# File lib/zmq/socket.rb, line 11 def self.handle_fsm_errors(error, *methods) methods.each do |m| class_eval def #{m}(*args); super rescue SystemCallError => e raise(ZMQ::Error, "#{error} Please assert that you're not sending / receiving out of band data when using the REQ / REP socket pairs.") if e.errno == ZMQ::EFSM raise end, __FILE__, __LINE__ end end
Returns the socket AFFINITY value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.affinity => 0
static VALUE rb_czmq_socket_opt_affinity(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsockopt_affinity(sock->socket));
}
Sets the socket AFFINITY value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.affinity = 1 => nil
static VALUE rb_czmq_socket_set_opt_affinity(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsockopt_set_affinity, "AFFINITY", value);
}
Returns the socket BACKLOG value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.backlog => 100
static VALUE rb_czmq_socket_opt_backlog(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsockopt_backlog(sock->socket));
}
Sets the socket BACKLOG value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.backlog = 200 => nil
static VALUE rb_czmq_socket_set_opt_backlog(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsockopt_set_backlog, "BACKLOG", value);
}
Binds to a given endpoint. When the port number is '*', attempts to bind to a free port. Always returns the port number on success.
ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.bind("tcp://localhost:*") => 5432
static VALUE rb_czmq_socket_bind(VALUE obj, VALUE endpoint)
{
struct nogvl_conn_args args;
int rc;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
Check_Type(endpoint, T_STRING);
args.socket = sock;
args.endpoint = StringValueCStr(endpoint);
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_bind, (void *)&args, RUBY_UBF_IO, 0);
if (rc == -1) ZmqRaiseSysError();
if (sock->verbose)
zclock_log ("I: %s socket %p: bound \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
sock->state = ZMQ_SOCKET_BOUND;
sock->endpoint = rb_str_new4(endpoint);
return INT2NUM(rc);
}
Closes a socket. The GC will take the same action if a socket object is not reachable anymore on the next GC cycle. This is a lower level API.
sock.close => nil
static VALUE rb_czmq_socket_close(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
/* This is useless for production / real use cases as we can't query the state again OR assume
anything about the underlying connection. Merely doing the right thing. */
sock->state = ZMQ_SOCKET_PENDING;
rb_czmq_free_sock(sock);
return Qnil;
}
Attempts to connect to a given endpoint.
ctx = ZMQ::Context.new
rep = ctx.socket(:REP)
port = rep.bind("tcp://localhost:*") => 5432
req = ctx.socket(:REQ)
req.connect("tcp://localhost:#{port}") => true
static VALUE rb_czmq_socket_connect(VALUE obj, VALUE endpoint)
{
struct nogvl_conn_args args;
int rc;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
Check_Type(endpoint, T_STRING);
args.socket = sock;
args.endpoint = StringValueCStr(endpoint);
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_connect, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if (sock->verbose)
zclock_log ("I: %s socket %p: connected \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
sock->state = ZMQ_SOCKET_CONNECTED;
sock->endpoint = rb_str_new4(endpoint);
return Qtrue;
}
Returns the endpoint this socket is currently connected to, if any.
ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.endpoint => nil
sock.bind("inproc://test")
sock.endpoint => "inproc://test"
static VALUE rb_czmq_socket_endpoint(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return sock->endpoint;
}
Query if this socket is in a readable or writable state.
ctx = ZMQ::Context.new sock = ctx.socket(:SUB) sock.events => ZMQ::POLLIN
static VALUE rb_czmq_socket_opt_events(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsockopt_events(sock->socket));
}
Returns a file descriptor reference for integrating this socket with an externel event loop or multiplexor. Edge-triggered notification of I/O state changes.
ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.fd => -1
sock.bind("inproc://test")
sock.fd => 4
static VALUE rb_czmq_socket_fd(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
if (sock->state == ZMQ_SOCKET_PENDING) return INT2NUM(-1);
return INT2NUM(zsockopt_fd(sock->socket));
}
Returns the socket HWM (High Water Mark) value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.hwm => 0
static VALUE rb_czmq_socket_opt_hwm(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsockopt_hwm(sock->socket));
}
Sets the socket HWM (High Water Mark() value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.hwm = 100 => nil sock.hwm => 100
static VALUE rb_czmq_socket_set_opt_hwm(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsockopt_set_hwm, "HWM", value);
}
Sets the socket IDENTITY value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.identity = "anonymous" => nil
static VALUE rb_czmq_socket_set_opt_identity(VALUE obj, VALUE value)
{
char *val;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
Check_Type(value, T_STRING);
if (RSTRING_LEN(value) == 0) rb_raise(rb_eZmqError, "socket identity cannot be empty.");
if (RSTRING_LEN(value) > 255) rb_raise(rb_eZmqError, "maximum socket identity is 255 chars.");
val = StringValueCStr(value);
zsockopt_set_identity(sock->socket, val);
if (sock->verbose)
zclock_log ("I: %s socket %p: set option \"IDENTITY\" \"%s\"", zsocket_type_str(sock->socket), obj, val);
return Qnil;
}
Returns the socket LINGER value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.linger => -1
static VALUE rb_czmq_socket_opt_linger(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsockopt_linger(sock->socket));
}
Sets the socket LINGER value in ms.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.linger = 1000 => nil
static VALUE rb_czmq_socket_set_opt_linger(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsockopt_set_linger, "LINGER", value);
}
Sets the socket MCAST_LOOP value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.mcast_loop = false => nil
static VALUE rb_czmq_socket_set_opt_mcast_loop(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetBooleanSockOpt(obj, zsockopt_set_mcast_loop, "MCAST_LOOP", value);
}
Returns the socket MCAST_LOOP status.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.mcast_loop? => true
static VALUE rb_czmq_socket_opt_mcast_loop(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return (zsockopt_mcast_loop(sock->socket) == 1) ? Qtrue : Qfalse;
}
Poll all sockets for readbable states by default
# File lib/zmq/socket.rb, line 68 def poll_readable? true end
Poll all sockets for writable states by default
# File lib/zmq/socket.rb, line 73 def poll_writable? true end
Returns the socket RATE value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.rate => 40000
static VALUE rb_czmq_socket_opt_rate(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsockopt_rate(sock->socket));
}
Sets the socket RATE value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.rate = 50000 => nil
static VALUE rb_czmq_socket_set_opt_rate(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsockopt_set_rate, "RATE", value);
}
Returns the socket RCVBUF value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.rcvbuf => 0
static VALUE rb_czmq_socket_opt_rcvbuf(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsockopt_rcvbuf(sock->socket));
}
Sets the socket RCVBUF value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.rcvbuf = 1000 => nil
static VALUE rb_czmq_socket_set_opt_rcvbuf(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsockopt_set_rcvbuf, "RCVBUF", value);
}
Query if there's more messages to receive.
ctx = ZMQ::Context.new sock = ctx.socket(:SUB) sock.rcvmore => true
static VALUE rb_czmq_socket_opt_rcvmore(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return (zsockopt_rcvmore(sock->socket) == 1) ? Qtrue : Qfalse;
}
Determines if there are one or more messages to read from this socket. Should be used in conjunction with the ZMQ_FD socket option for edge-triggered notifications.
socket.readable? => true
# File lib/zmq/socket.rb, line 29 def readable? (events & ZMQ::POLLIN) == ZMQ::POLLIN end
Returns the socket RECONNECT_IVL value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.reconnect_ivl => 100
static VALUE rb_czmq_socket_opt_reconnect_ivl(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsockopt_reconnect_ivl(sock->socket));
}
Sets the socket RECONNECT_IVL value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.reconnect_ivl = 200 => nil
static VALUE rb_czmq_socket_set_opt_reconnect_ivl(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsockopt_set_reconnect_ivl, "RECONNECT_IVL", value);
}
Returns the socket RECONNECT_IVL_MAX value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.reconnect_ivl_max => 0
static VALUE rb_czmq_socket_opt_reconnect_ivl_max(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsockopt_reconnect_ivl_max(sock->socket));
}
Sets the socket RECONNECT_IVL_MAX value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.reconnect_ivl_max = 5 => nil
static VALUE rb_czmq_socket_set_opt_reconnect_ivl_max(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsockopt_set_reconnect_ivl_max, "RECONNECT_IVL_MAX", value);
}
Returns the socket RECOVERY_IVL value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.recovery_ivl => 10
static VALUE rb_czmq_socket_opt_recovery_ivl(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsockopt_recovery_ivl(sock->socket));
}
Sets the socket RECOVERY_IVL value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.recovery_ivl = 20 => nil
static VALUE rb_czmq_socket_set_opt_recovery_ivl(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsockopt_set_recovery_ivl, "RECOVERY_IVL", value);
}
Returns the socket RECOVERY_IVL_MSEC value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.recovery_ivl_msec => -1
static VALUE rb_czmq_socket_opt_recovery_ivl_msec(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsockopt_recovery_ivl_msec(sock->socket));
}
Sets the socket RECOVERY_IVL_MSEC value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.recovery_ivl_msec = 20 => nil
static VALUE rb_czmq_socket_set_opt_recovery_ivl_msec(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsockopt_set_recovery_ivl_msec, "RECOVERY_IVL_MSEC", value);
}
Receive a string from this ZMQ socket. May block depending on the socket type.
ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv => "message"
static VALUE rb_czmq_socket_recv(VALUE obj)
{
char *str = NULL;
struct nogvl_recv_args args;
VALUE result = Qnil;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
args.socket = sock;
str = (char *)rb_thread_blocking_region(rb_czmq_nogvl_recv, (void *)&args, RUBY_UBF_IO, 0);
if (str == NULL) return result;
ZmqAssertSysError();
if (sock->verbose)
zclock_log ("I: %s socket %p: recv \"%s\"", zsocket_type_str(sock->socket), sock->socket, str);
result = ZmqEncode(rb_str_new2(str));
free(str);
return result;
}
Receives a ZMQ frame from this socket.
ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_frame => ZMQ::Frame or nil
static VALUE rb_czmq_socket_recv_frame(VALUE obj)
{
zframe_t *frame = NULL;
struct nogvl_recv_args args;
char print_prefix[255];
char *cur_time = NULL;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
args.socket = sock;
frame = (zframe_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_frame, (void *)&args, RUBY_UBF_IO, 0);
if (frame == NULL) return Qnil;
if (sock->verbose) {
cur_time = rb_czmq_formatted_current_time();
ZmqDumpFrame("recv_frame", frame);
}
return rb_czmq_alloc_frame(frame);
}
Receives a ZMQ frame from this socket. Does not block
ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_frame_nonblock => ZMQ::Frame or nil
static VALUE rb_czmq_socket_recv_frame_nonblock(VALUE obj)
{
zframe_t *frame = NULL;
char print_prefix[255];
char *cur_time = NULL;
errno = 0;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
frame = zframe_recv_nowait(sock->socket);
if (frame == NULL) return Qnil;
if (sock->verbose) {
cur_time = rb_czmq_formatted_current_time();
ZmqDumpFrame("recv_frame_nonblock", frame);
}
return rb_czmq_alloc_frame(frame);
}
Receives a ZMQ message from this socket.
ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_message => ZMQ::Message or nil
static VALUE rb_czmq_socket_recv_message(VALUE obj)
{
zmsg_t *message = NULL;
struct nogvl_recv_args args;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
args.socket = sock;
message = (zmsg_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_message, (void *)&args, RUBY_UBF_IO, 0);
if (message == NULL) return Qnil;
if (sock->verbose) ZmqDumpMessage("recv_message", message);
return rb_czmq_alloc_message(message);
}
Receive a string from this ZMQ socket. Does not block.
ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_nonblock => "message"
static VALUE rb_czmq_socket_recv_nonblock(VALUE obj)
{
char *str = NULL;
errno = 0;
VALUE result = Qnil;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
str = zstr_recv_nowait(sock->socket);
if (str == NULL) return result;
ZmqAssertSysError();
if (sock->verbose)
zclock_log ("I: %s socket %p: recv_nonblock \"%s\"", zsocket_type_str(sock->socket), sock->socket, str);
result = ZmqEncode(rb_str_new2(str));
free(str);
return result;
}
Returns the recv timeout currently associated with this socket.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.recv_timeout = 5 sock.recv_timeout => 5
static VALUE rb_czmq_socket_recv_timeout(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return sock->recv_timeout;
}
Sets a receive timeout for this socket.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.recv_timeout = 5 => nil
static VALUE rb_czmq_socket_set_recv_timeout(VALUE obj, VALUE timeout)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
if (TYPE(timeout) != T_FIXNUM && TYPE(timeout) != T_FLOAT) rb_raise(rb_eTypeError, "wrong timeout type %s (expected Fixnum or Float)", RSTRING_PTR(rb_obj_as_string(timeout)));
sock->recv_timeout = timeout;
return Qnil;
}
Sends a string to this ZMQ socket.
ctx = ZMQ::Context.new
sock = ctx.socket(:REQ)
sock.connect("inproc://test")
sock.send("message") => true
static VALUE rb_czmq_socket_send(VALUE obj, VALUE msg)
{
int rc;
struct nogvl_send_args args;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
Check_Type(msg, T_STRING);
args.socket = sock;
args.msg = StringValueCStr(msg);
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_send, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if (sock->verbose)
zclock_log ("I: %s socket %p: send \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(msg));
return Qtrue;
}
Sends a ZMQ::Frame instance to this socket.
ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
frame = ZMQ::Frame("frame")
sock.send_frame(frame) => nil
frame = ZMQ::Frame("multi")
sock.send_frame(frame, ZMQ::Frame::MORE)
static VALUE rb_czmq_socket_send_frame(int argc, VALUE *argv, VALUE obj)
{
struct nogvl_send_frame_args args;
VALUE frame_obj;
VALUE flags;
char print_prefix[255];
char *cur_time = NULL;
zframe_t *print_frame = NULL;
int rc, flgs;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
rb_scan_args(argc, argv, "11", &frame_obj, &flags);
ZmqGetFrame(frame_obj);
if (NIL_P(flags)) {
flgs = 0;
} else {
if (SYMBOL_P(flags)) flags = rb_const_get_at(rb_cZmqFrame, rb_to_id(flags));
Check_Type(flags, T_FIXNUM);
flgs = FIX2INT(flags);
}
if (sock->verbose) {
cur_time = rb_czmq_formatted_current_time();
print_frame = (flgs & ZFRAME_REUSE) ? frame : zframe_dup(frame);
}
args.socket = sock;
args.frame = frame;
args.flags = flgs;
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_send_frame, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if (sock->verbose) ZmqDumpFrame("send_frame", print_frame);
return Qtrue;
}
Sends a ZMQ::Message instance to this socket.
ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
msg = ZMQ::Message.new
msg.push ZMQ::Frame("header")
sock.send_message(msg) => nil
static VALUE rb_czmq_socket_send_message(VALUE obj, VALUE message_obj)
{
struct nogvl_send_message_args args;
zmsg_t *print_message = NULL;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
ZmqGetMessage(message_obj);
if (sock->verbose) print_message = zmsg_dup(message->message);
args.socket = sock;
args.message = message->message;
rb_thread_blocking_region(rb_czmq_nogvl_send_message, (void *)&args, RUBY_UBF_IO, 0);
message->flags |= ZMQ_MESSAGE_DESTROYED;
if (sock->verbose) ZmqDumpMessage("send_message", print_message);
return Qnil;
}
Returns the send timeout currently associated with this socket.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.send_timeout = 5 sock.send_timeout => 5
static VALUE rb_czmq_socket_send_timeout(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return sock->send_timeout;
}
Sets a send timeout for this socket.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.send_timeout = 5 => nil
static VALUE rb_czmq_socket_set_send_timeout(VALUE obj, VALUE timeout)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
if (TYPE(timeout) != T_FIXNUM && TYPE(timeout) != T_FLOAT) rb_raise(rb_eTypeError, "wrong timeout type %s (expected Fixnum or Float)", RSTRING_PTR(rb_obj_as_string(timeout)));
sock->send_timeout = timeout;
return Qnil;
}
Sends a string to this ZMQ socket, with a more flag set.
ctx = ZMQ::Context.new
sock = ctx.socket(:REQ)
sock.connect("inproc://test")
sock.sendm("mes") => true
sock.sendm("sage") => true
static VALUE rb_czmq_socket_sendm(VALUE obj, VALUE msg)
{
int rc;
struct nogvl_send_args args;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
Check_Type(msg, T_STRING);
args.socket = sock;
args.msg = StringValueCStr(msg);
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_sendm, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if (sock->verbose)
zclock_log ("I: %s socket %p: sendm \"%s\"", zsocket_type_str(sock->socket), sock->socket, StringValueCStr(msg));
return Qtrue;
}
Returns the socket SNDBUF value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.sndbuf => 0
static VALUE rb_czmq_socket_opt_sndbuf(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsockopt_sndbuf(sock->socket));
}
Sets the socket SNDBUF value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.sndbuf = 1000 => nil
static VALUE rb_czmq_socket_set_opt_sndbuf(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsockopt_set_sndbuf, "SNDBUF", value);
}
Returns the current socket state, one of ZMQ::Socket::PENDING, ZMQ::Socket::BOUND or ZMQ::Socket::CONNECTED
ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.state => ZMQ::Socket::PENDING
sock.bind("inproc://test")
sock.state => ZMQ::Socket::BOUND
static VALUE rb_czmq_socket_state(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(sock->state);
}
Subscribes this SUB socket to a topic.
ctx = ZMQ::Context.new sock = ctx.socket(:SUB) sock.subscribe "ruby" => nil
static VALUE rb_czmq_socket_set_opt_subscribe(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetStringSockOpt(obj, zsockopt_set_subscribe, "SUBSCRIBE", value, {
ZmqAssertSockOptFor(ZMQ_SUB)
});
}
Returns the socket SWAP value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.swap => 0
static VALUE rb_czmq_socket_opt_swap(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsockopt_swap(sock->socket));
}
Sets the socket SWAP value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.swap = 100 => nil
static VALUE rb_czmq_socket_set_opt_swap(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsockopt_set_swap, "SWAP", value);
}
Generates a string representation of the current socket state
socket = ctx.bind(:PUB, “tcp://127.0.0.1:5000”) socket.to_s => “PUB socket bound to tcp://127.0.0.1:5000”
# File lib/zmq/socket.rb, line 56 def to_s case state when BOUND "#{type_str} socket bound to #{endpoint}" when CONNECTED "#{type_str} socket connected to #{endpoint}" else "#{type_str} socket" end end
Generates a string representation of this socket type
socket = ctx.socket(:PUB) socket.type_str => “PUB”
# File lib/zmq/socket.rb, line 47 def type_str self.class.const_get(:TYPE_STR) end
Unsubscribes this SUB socket from a topic.
ctx = ZMQ::Context.new sock = ctx.socket(:SUB) sock.unsubscribe "ruby" => nil
static VALUE rb_czmq_socket_set_opt_unsubscribe(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetStringSockOpt(obj, zsockopt_set_unsubscribe, "UNSUBSCRIBE", value, {
ZmqAssertSockOptFor(ZMQ_SUB)
});
}
Let this socket be verbose - dumps a lot of data to stdout for debugging.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.verbose = true => nil
static VALUE rb_czmq_socket_set_verbose(VALUE obj, VALUE level)
{
Bool vlevel;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
vlevel = (level == Qtrue) ? TRUE : FALSE;
sock->verbose = vlevel;
return Qnil;
}
Generated with the Darkfish Rdoc Generator 2.