# File lib/zmq/socket.rb, line 11 def self.handle_fsm_errors(error, *methods) methods.each do |m| class_eval def #{m}(*args); super rescue SystemCallError => e raise(ZMQ::Error, "#{error} Please assert that you're not sending / receiving out of band data when using the REQ / REP socket pairs.") if e.errno == ZMQ::EFSM raise end, __FILE__, __LINE__ end end
Returns the socket AFFINITY value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.affinity => 0
/*
 * Getter for the socket AFFINITY option.
 *
 * obj - the receiver.
 *
 * Returns the result of zsockopt_affinity() converted with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_affinity(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    return INT2NUM(zsockopt_affinity(sock->socket));
}
Sets the socket AFFINITY value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.affinity = 1 => nil
/*
 * Setter for the socket AFFINITY option.
 *
 * obj   - the receiver.
 * value - the new option value.
 *
 * The ZmqSetSockOpt macro (defined elsewhere) is handed the czmq setter
 * zsockopt_set_affinity and the option name "AFFINITY".
 * NOTE(review): the macro presumably resolves `sock`, applies the option and
 * supplies the return value, since no return statement is visible here - confirm.
 */
static VALUE rb_czmq_socket_set_opt_affinity(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsockopt_set_affinity, "AFFINITY", value);
}
Returns the socket BACKLOG value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.backlog => 100
/*
 * Getter for the socket BACKLOG option.
 *
 * obj - the receiver.
 *
 * Returns the result of zsockopt_backlog() converted with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_backlog(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    return INT2NUM(zsockopt_backlog(sock->socket));
}
Sets the socket BACKLOG value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.backlog = 200 => nil
/*
 * Setter for the socket BACKLOG option.
 *
 * obj   - the receiver.
 * value - the new option value.
 *
 * The ZmqSetSockOpt macro (defined elsewhere) is handed the czmq setter
 * zsockopt_set_backlog and the option name "BACKLOG".
 * NOTE(review): the macro presumably resolves `sock`, applies the option and
 * supplies the return value, since no return statement is visible here - confirm.
 */
static VALUE rb_czmq_socket_set_opt_backlog(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsockopt_set_backlog, "BACKLOG", value);
}
Binds to a given endpoint. When the port number is '*', attempts to bind to a free port. Always returns the port number on success.
ctx = ZMQ::Context.new sock = ctx.socket(:PUSH) sock.bind("tcp://localhost:*") => 5432
/*
 * Binds the socket to an endpoint.
 *
 * obj      - the receiver.
 * endpoint - the endpoint to bind to; must be a Ruby String (Check_Type).
 *
 * Runs rb_czmq_nogvl_socket_bind through rb_thread_blocking_region. When the
 * result is -1, ZmqRaiseSysError() is invoked. Otherwise the socket state
 * becomes ZMQ_SOCKET_BOUND, a copy of the endpoint string is stored on the
 * wrapper, and the bind result is returned via INT2NUM.
 */
static VALUE rb_czmq_socket_bind(VALUE obj, VALUE endpoint)
{
    struct nogvl_conn_args args;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    ZmqSockGuardCrossThread(sock);
    Check_Type(endpoint, T_STRING);
    args.socket = sock;
    args.endpoint = StringValueCStr(endpoint);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_bind, (void *)&args, RUBY_UBF_IO, 0);
    if (rc == -1) ZmqRaiseSysError();
    /* %p requires a void * argument; obj is a VALUE, so cast it explicitly. */
    if (sock->verbose)
        zclock_log ("I: %s socket %p: bound \"%s\"", zsocket_type_str(sock->socket), (void *)obj, StringValueCStr(endpoint));
    sock->state = ZMQ_SOCKET_BOUND;
    sock->endpoint = rb_str_new4(endpoint);
    return INT2NUM(rc);
}
Closes a socket. The GC will take the same action if a socket object is not reachable anymore on the next GC cycle. This is a lower level API.
sock.close => nil
/*
 * Closes the socket.
 *
 * obj - the receiver.
 *
 * Resets the recorded state to ZMQ_SOCKET_PENDING, hands the wrapper to
 * rb_czmq_free_sock() and returns nil.
 */
static VALUE rb_czmq_socket_close(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    ZmqSockGuardCrossThread(sock);
    /* Resetting the state has no practical value: the state cannot be queried
       again and nothing can be assumed about the underlying connection.
       It is done only for the sake of correctness. */
    sock->state = ZMQ_SOCKET_PENDING;
    rb_czmq_free_sock(sock);
    return Qnil;
}
Attempts to connect to a given endpoint.
ctx = ZMQ::Context.new rep = ctx.socket(:REP) port = rep.bind("tcp://localhost:*") => 5432 req = ctx.socket(:REQ) req.connect("tcp://localhost:#{port}") => true
/*
 * Connects the socket to an endpoint.
 *
 * obj      - the receiver.
 * endpoint - the endpoint to connect to; must be a Ruby String (Check_Type).
 *
 * Runs rb_czmq_nogvl_socket_connect through rb_thread_blocking_region and
 * checks the result with ZmqAssert. Afterwards the socket state becomes
 * ZMQ_SOCKET_CONNECTED and a copy of the endpoint string is stored on the
 * wrapper. Returns true.
 */
static VALUE rb_czmq_socket_connect(VALUE obj, VALUE endpoint)
{
    struct nogvl_conn_args args;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    ZmqSockGuardCrossThread(sock);
    Check_Type(endpoint, T_STRING);
    args.socket = sock;
    args.endpoint = StringValueCStr(endpoint);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_connect, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    /* %p requires a void * argument; obj is a VALUE, so cast it explicitly. */
    if (sock->verbose)
        zclock_log ("I: %s socket %p: connected \"%s\"", zsocket_type_str(sock->socket), (void *)obj, StringValueCStr(endpoint));
    sock->state = ZMQ_SOCKET_CONNECTED;
    sock->endpoint = rb_str_new4(endpoint);
    return Qtrue;
}
Returns the endpoint this socket is currently bound or connected to, if any.
ctx = ZMQ::Context.new sock = ctx.socket(:PUSH) sock.endpoint => nil sock.bind("inproc://test") sock.endpoint => "inproc://test"
/*
 * Returns the endpoint value stored on the socket wrapper.
 *
 * obj - the receiver.
 */
static VALUE rb_czmq_socket_endpoint(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    return sock->endpoint;
}
Query if this socket is in a readable or writable state.
ctx = ZMQ::Context.new sock = ctx.socket(:SUB) sock.events => ZMQ::POLLIN
/*
 * Getter for the socket EVENTS option.
 *
 * obj - the receiver.
 *
 * Returns the result of zsockopt_events() converted with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_events(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    return INT2NUM(zsockopt_events(sock->socket));
}
Returns a file descriptor reference for integrating this socket with an external event loop or multiplexer. Edge-triggered notification of I/O state changes.
ctx = ZMQ::Context.new sock = ctx.socket(:PUSH) sock.fd => -1 sock.bind("inproc://test") sock.fd => 4
/*
 * Returns the socket's file descriptor as a Ruby Integer.
 *
 * obj - the receiver.
 *
 * While the socket is still in the ZMQ_SOCKET_PENDING state the result is -1;
 * otherwise it is whatever zsockopt_fd() reports.
 */
static VALUE rb_czmq_socket_fd(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    if (sock->state != ZMQ_SOCKET_PENDING) {
        return INT2NUM(zsockopt_fd(sock->socket));
    }
    return INT2NUM(-1);
}
Returns the socket HWM (High Water Mark) value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.hwm => 0
/*
 * Getter for the socket HWM option.
 *
 * obj - the receiver.
 *
 * Returns the result of zsockopt_hwm() converted with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_hwm(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    return INT2NUM(zsockopt_hwm(sock->socket));
}
Sets the socket HWM (High Water Mark) value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.hwm = 100 => nil sock.hwm => 100
/*
 * Setter for the socket HWM option.
 *
 * obj   - the receiver.
 * value - the new option value.
 *
 * The ZmqSetSockOpt macro (defined elsewhere) is handed the czmq setter
 * zsockopt_set_hwm and the option name "HWM".
 * NOTE(review): the macro presumably resolves `sock`, applies the option and
 * supplies the return value, since no return statement is visible here - confirm.
 */
static VALUE rb_czmq_socket_set_opt_hwm(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsockopt_set_hwm, "HWM", value);
}
Sets the socket IDENTITY value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.identity = "anonymous" => nil
/*
 * Setter for the socket IDENTITY option.
 *
 * obj   - the receiver.
 * value - the identity; must be a Ruby String (Check_Type).
 *
 * Raises rb_eZmqError when the string is empty or longer than 255 bytes.
 * Otherwise passes the C string to zsockopt_set_identity() and returns nil.
 */
static VALUE rb_czmq_socket_set_opt_identity(VALUE obj, VALUE value)
{
    char *val;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    ZmqSockGuardCrossThread(sock);
    Check_Type(value, T_STRING);
    if (RSTRING_LEN(value) == 0) rb_raise(rb_eZmqError, "socket identity cannot be empty.");
    if (RSTRING_LEN(value) > 255) rb_raise(rb_eZmqError, "maximum socket identity is 255 chars.");
    val = StringValueCStr(value);
    zsockopt_set_identity(sock->socket, val);
    /* %p requires a void * argument; obj is a VALUE, so cast it explicitly. */
    if (sock->verbose)
        zclock_log ("I: %s socket %p: set option \"IDENTITY\" \"%s\"", zsocket_type_str(sock->socket), (void *)obj, val);
    return Qnil;
}
Returns the socket LINGER value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.linger => -1
/*
 * Getter for the socket LINGER option.
 *
 * obj - the receiver.
 *
 * Returns the result of zsockopt_linger() converted with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_linger(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    return INT2NUM(zsockopt_linger(sock->socket));
}
Sets the socket LINGER value in ms.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.linger = 1000 => nil
/*
 * Setter for the socket LINGER option.
 *
 * obj   - the receiver.
 * value - the new option value.
 *
 * The ZmqSetSockOpt macro (defined elsewhere) is handed the czmq setter
 * zsockopt_set_linger and the option name "LINGER".
 * NOTE(review): the macro presumably resolves `sock`, applies the option and
 * supplies the return value, since no return statement is visible here - confirm.
 */
static VALUE rb_czmq_socket_set_opt_linger(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsockopt_set_linger, "LINGER", value);
}
Sets the socket MCAST_LOOP value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.mcast_loop = false => nil
/*
 * Setter for the socket MCAST_LOOP option.
 *
 * obj   - the receiver.
 * value - the new option value.
 *
 * The ZmqSetBooleanSockOpt macro (defined elsewhere) is handed the czmq
 * setter zsockopt_set_mcast_loop and the option name "MCAST_LOOP".
 * NOTE(review): the macro presumably resolves `sock`, converts `value` to a
 * boolean, applies it and supplies the return value - confirm.
 */
static VALUE rb_czmq_socket_set_opt_mcast_loop(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetBooleanSockOpt(obj, zsockopt_set_mcast_loop, "MCAST_LOOP", value);
}
Returns the socket MCAST_LOOP status.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.mcast_loop? => true
/*
 * Reports the socket MCAST_LOOP status.
 *
 * obj - the receiver.
 *
 * Returns true when zsockopt_mcast_loop() yields exactly 1, false otherwise.
 */
static VALUE rb_czmq_socket_opt_mcast_loop(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    if (zsockopt_mcast_loop(sock->socket) == 1) {
        return Qtrue;
    }
    return Qfalse;
}
Poll all sockets for readable states by default
# File lib/zmq/socket.rb, line 68
#
# Whether this socket should be polled for readable state.
#
# Returns true unconditionally.
def poll_readable?
  true
end
Poll all sockets for writable states by default
# File lib/zmq/socket.rb, line 73
#
# Whether this socket should be polled for writable state.
#
# Returns true unconditionally.
def poll_writable?
  true
end
Returns the socket RATE value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.rate => 40000
/*
 * Getter for the socket RATE option.
 *
 * obj - the receiver.
 *
 * Returns the result of zsockopt_rate() converted with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_rate(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    return INT2NUM(zsockopt_rate(sock->socket));
}
Sets the socket RATE value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.rate = 50000 => nil
/*
 * Setter for the socket RATE option.
 *
 * obj   - the receiver.
 * value - the new option value.
 *
 * The ZmqSetSockOpt macro (defined elsewhere) is handed the czmq setter
 * zsockopt_set_rate and the option name "RATE".
 * NOTE(review): the macro presumably resolves `sock`, applies the option and
 * supplies the return value, since no return statement is visible here - confirm.
 */
static VALUE rb_czmq_socket_set_opt_rate(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsockopt_set_rate, "RATE", value);
}
Returns the socket RCVBUF value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.rcvbuf => 0
/*
 * Getter for the socket RCVBUF option.
 *
 * obj - the receiver.
 *
 * Returns the result of zsockopt_rcvbuf() converted with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_rcvbuf(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    return INT2NUM(zsockopt_rcvbuf(sock->socket));
}
Sets the socket RCVBUF value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.rcvbuf = 1000 => nil
/*
 * Setter for the socket RCVBUF option.
 *
 * obj   - the receiver.
 * value - the new option value.
 *
 * The ZmqSetSockOpt macro (defined elsewhere) is handed the czmq setter
 * zsockopt_set_rcvbuf and the option name "RCVBUF".
 * NOTE(review): the macro presumably resolves `sock`, applies the option and
 * supplies the return value, since no return statement is visible here - confirm.
 */
static VALUE rb_czmq_socket_set_opt_rcvbuf(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsockopt_set_rcvbuf, "RCVBUF", value);
}
Query if there are more messages to receive.
ctx = ZMQ::Context.new sock = ctx.socket(:SUB) sock.rcvmore => true
/*
 * Reports the socket RCVMORE status.
 *
 * obj - the receiver.
 *
 * Returns true when zsockopt_rcvmore() yields exactly 1, false otherwise.
 */
static VALUE rb_czmq_socket_opt_rcvmore(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    if (zsockopt_rcvmore(sock->socket) == 1) {
        return Qtrue;
    }
    return Qfalse;
}
Determines if there are one or more messages to read from this socket. Should be used in conjunction with the ZMQ_FD socket option for edge-triggered notifications.
socket.readable? => true
# File lib/zmq/socket.rb, line 29
#
# Tests whether the ZMQ::POLLIN bits are all set in the value returned by
# +events+.
#
# Returns true or false.
def readable?
  (events & ZMQ::POLLIN) == ZMQ::POLLIN
end
Returns the socket RECONNECT_IVL value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.reconnect_ivl => 100
/*
 * Getter for the socket RECONNECT_IVL option.
 *
 * obj - the receiver.
 *
 * Returns the result of zsockopt_reconnect_ivl() converted with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_reconnect_ivl(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    return INT2NUM(zsockopt_reconnect_ivl(sock->socket));
}
Sets the socket RECONNECT_IVL value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.reconnect_ivl = 200 => nil
/*
 * Setter for the socket RECONNECT_IVL option.
 *
 * obj   - the receiver.
 * value - the new option value.
 *
 * The ZmqSetSockOpt macro (defined elsewhere) is handed the czmq setter
 * zsockopt_set_reconnect_ivl and the option name "RECONNECT_IVL".
 * NOTE(review): the macro presumably resolves `sock`, applies the option and
 * supplies the return value, since no return statement is visible here - confirm.
 */
static VALUE rb_czmq_socket_set_opt_reconnect_ivl(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsockopt_set_reconnect_ivl, "RECONNECT_IVL", value);
}
Returns the socket RECONNECT_IVL_MAX value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.reconnect_ivl_max => 0
/*
 * Getter for the socket RECONNECT_IVL_MAX option.
 *
 * obj - the receiver.
 *
 * Returns the result of zsockopt_reconnect_ivl_max() converted with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_reconnect_ivl_max(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    return INT2NUM(zsockopt_reconnect_ivl_max(sock->socket));
}
Sets the socket RECONNECT_IVL_MAX value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.reconnect_ivl_max = 5 => nil
/*
 * Setter for the socket RECONNECT_IVL_MAX option.
 *
 * obj   - the receiver.
 * value - the new option value.
 *
 * The ZmqSetSockOpt macro (defined elsewhere) is handed the czmq setter
 * zsockopt_set_reconnect_ivl_max and the option name "RECONNECT_IVL_MAX".
 * NOTE(review): the macro presumably resolves `sock`, applies the option and
 * supplies the return value, since no return statement is visible here - confirm.
 */
static VALUE rb_czmq_socket_set_opt_reconnect_ivl_max(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsockopt_set_reconnect_ivl_max, "RECONNECT_IVL_MAX", value);
}
Returns the socket RECOVERY_IVL value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.recovery_ivl => 10
/*
 * Getter for the socket RECOVERY_IVL option.
 *
 * obj - the receiver.
 *
 * Returns the result of zsockopt_recovery_ivl() converted with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_recovery_ivl(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    return INT2NUM(zsockopt_recovery_ivl(sock->socket));
}
Sets the socket RECOVERY_IVL value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.recovery_ivl = 20 => nil
/*
 * Setter for the socket RECOVERY_IVL option.
 *
 * obj   - the receiver.
 * value - the new option value.
 *
 * The ZmqSetSockOpt macro (defined elsewhere) is handed the czmq setter
 * zsockopt_set_recovery_ivl and the option name "RECOVERY_IVL".
 * NOTE(review): the macro presumably resolves `sock`, applies the option and
 * supplies the return value, since no return statement is visible here - confirm.
 */
static VALUE rb_czmq_socket_set_opt_recovery_ivl(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsockopt_set_recovery_ivl, "RECOVERY_IVL", value);
}
Returns the socket RECOVERY_IVL_MSEC value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.recovery_ivl_msec => -1
/*
 * Getter for the socket RECOVERY_IVL_MSEC option.
 *
 * obj - the receiver.
 *
 * Returns the result of zsockopt_recovery_ivl_msec() converted with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_recovery_ivl_msec(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    return INT2NUM(zsockopt_recovery_ivl_msec(sock->socket));
}
Sets the socket RECOVERY_IVL_MSEC value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.recovery_ivl_msec = 20 => nil
/*
 * Setter for the socket RECOVERY_IVL_MSEC option.
 *
 * obj   - the receiver.
 * value - the new option value.
 *
 * The ZmqSetSockOpt macro (defined elsewhere) is handed the czmq setter
 * zsockopt_set_recovery_ivl_msec and the option name "RECOVERY_IVL_MSEC".
 * NOTE(review): the macro presumably resolves `sock`, applies the option and
 * supplies the return value, since no return statement is visible here - confirm.
 */
static VALUE rb_czmq_socket_set_opt_recovery_ivl_msec(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsockopt_set_recovery_ivl_msec, "RECOVERY_IVL_MSEC", value);
}
Receive a string from this ZMQ socket. May block depending on the socket type.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.bind("inproc://test") sock.recv => "message"
/*
 * Receives a string from the socket.
 *
 * obj - the receiver.
 *
 * Runs rb_czmq_nogvl_recv through rb_thread_blocking_region. Returns nil when
 * that call yields NULL; otherwise wraps the C string in a Ruby String, passes
 * it through ZmqEncode, frees the C buffer and returns the Ruby String.
 */
static VALUE rb_czmq_socket_recv(VALUE obj)
{
    char *str = NULL;
    struct nogvl_recv_args args;
    VALUE result = Qnil;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    str = (char *)rb_thread_blocking_region(rb_czmq_nogvl_recv, (void *)&args, RUBY_UBF_IO, 0);
    /* Nothing was received: return nil (the initial value of result). */
    if (str == NULL) return result;
    /* NOTE(review): if this macro can raise, `str` is not freed on that path - confirm. */
    ZmqAssertSysError();
    if (sock->verbose)
        zclock_log ("I: %s socket %p: recv \"%s\"", zsocket_type_str(sock->socket), sock->socket, str);
    /* rb_str_new2 stops at the first NUL byte of str. */
    result = ZmqEncode(rb_str_new2(str));
    free(str);
    return result;
}
Receives a ZMQ frame from this socket.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.bind("inproc://test") sock.recv_frame => ZMQ::Frame or nil
/*
 * Receives a frame from the socket.
 *
 * obj - the receiver.
 *
 * Runs rb_czmq_nogvl_recv_frame through rb_thread_blocking_region. Returns nil
 * when that call yields NULL; otherwise returns the result of
 * rb_czmq_alloc_frame() for the received frame.
 */
static VALUE rb_czmq_socket_recv_frame(VALUE obj)
{
    zframe_t *frame = NULL;
    struct nogvl_recv_args args;
    /* print_prefix and cur_time are not referenced directly below;
       presumably the ZmqDumpFrame macro uses them - confirm. */
    char print_prefix[255];
    char *cur_time = NULL;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    frame = (zframe_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_frame, (void *)&args, RUBY_UBF_IO, 0);
    if (frame == NULL) return Qnil;
    if (sock->verbose) {
        cur_time = rb_czmq_formatted_current_time();
        ZmqDumpFrame("recv_frame", frame);
    }
    return rb_czmq_alloc_frame(frame);
}
Receives a ZMQ frame from this socket. Does not block
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.bind("inproc://test") sock.recv_frame_nonblock => ZMQ::Frame or nil
/*
 * Receives a frame from the socket using zframe_recv_nowait().
 *
 * obj - the receiver.
 *
 * Returns nil when no frame is obtained; otherwise returns the result of
 * rb_czmq_alloc_frame() for the received frame.
 */
static VALUE rb_czmq_socket_recv_frame_nonblock(VALUE obj)
{
    zframe_t *frame = NULL;
    /* print_prefix and cur_time are not referenced directly below;
       presumably the ZmqDumpFrame macro uses them - confirm. */
    char print_prefix[255];
    char *cur_time = NULL;
    zmq_sock_wrapper *sock = NULL;
    /* Kept below the declarations so the function is valid C90 and does not
       trip -Wdeclaration-after-statement.
       NOTE(review): errno is cleared here but never inspected in this function. */
    errno = 0;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    frame = zframe_recv_nowait(sock->socket);
    if (frame == NULL) return Qnil;
    if (sock->verbose) {
        cur_time = rb_czmq_formatted_current_time();
        ZmqDumpFrame("recv_frame_nonblock", frame);
    }
    return rb_czmq_alloc_frame(frame);
}
Receives a ZMQ message from this socket.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.bind("inproc://test") sock.recv_message => ZMQ::Message or nil
/*
 * Receives a message from the socket.
 *
 * obj - the receiver.
 *
 * Runs rb_czmq_nogvl_recv_message through rb_thread_blocking_region. Returns
 * nil when that call yields NULL; otherwise returns the result of
 * rb_czmq_alloc_message() for the received message.
 */
static VALUE rb_czmq_socket_recv_message(VALUE obj)
{
    zmsg_t *message = NULL;
    struct nogvl_recv_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    message = (zmsg_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_message, (void *)&args, RUBY_UBF_IO, 0);
    if (message == NULL) return Qnil;
    if (sock->verbose) ZmqDumpMessage("recv_message", message);
    return rb_czmq_alloc_message(message);
}
Receive a string from this ZMQ socket. Does not block.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.bind("inproc://test") sock.recv_nonblock => "message"
/*
 * Receives a string from the socket using zstr_recv_nowait().
 *
 * obj - the receiver.
 *
 * Returns nil when no string is obtained; otherwise wraps the C string in a
 * Ruby String, passes it through ZmqEncode, frees the C buffer and returns
 * the Ruby String.
 */
static VALUE rb_czmq_socket_recv_nonblock(VALUE obj)
{
    char *str = NULL;
    VALUE result = Qnil;
    zmq_sock_wrapper *sock = NULL;
    /* Kept below the declarations so the function is valid C90 and does not
       trip -Wdeclaration-after-statement. */
    errno = 0;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    str = zstr_recv_nowait(sock->socket);
    /* Nothing was received: return nil (the initial value of result). */
    if (str == NULL) return result;
    /* NOTE(review): if this macro can raise, `str` is not freed on that path - confirm. */
    ZmqAssertSysError();
    if (sock->verbose)
        zclock_log ("I: %s socket %p: recv_nonblock \"%s\"", zsocket_type_str(sock->socket), sock->socket, str);
    result = ZmqEncode(rb_str_new2(str));
    free(str);
    return result;
}
Returns the recv timeout currently associated with this socket.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.recv_timeout = 5 sock.recv_timeout => 5
/*
 * Returns the recv_timeout value stored on the socket wrapper.
 *
 * obj - the receiver.
 */
static VALUE rb_czmq_socket_recv_timeout(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    return sock->recv_timeout;
}
Sets a receive timeout for this socket.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.recv_timeout = 5 => nil
/*
 * Stores a receive timeout on the socket wrapper.
 *
 * obj     - the receiver.
 * timeout - the timeout; must be a Fixnum or a Float.
 *
 * Raises TypeError, naming the class of the rejected argument, for any other
 * type. Returns nil.
 */
static VALUE rb_czmq_socket_set_recv_timeout(VALUE obj, VALUE timeout)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    /* The message reports a type, so format the argument's class name rather
       than its to_s value. */
    if (TYPE(timeout) != T_FIXNUM && TYPE(timeout) != T_FLOAT)
        rb_raise(rb_eTypeError, "wrong timeout type %s (expected Fixnum or Float)", rb_obj_classname(timeout));
    /* NOTE(review): the VALUE is kept in a C struct; a Float may be a heap
       object, so the wrapper's mark function should mark it - confirm. */
    sock->recv_timeout = timeout;
    return Qnil;
}
Sends a string to this ZMQ socket.
ctx = ZMQ::Context.new sock = ctx.socket(:REQ) sock.connect("inproc://test") sock.send("message") => true
/*
 * Sends a string to the socket.
 *
 * obj - the receiver.
 * msg - the payload; must be a Ruby String (Check_Type).
 *
 * Runs rb_czmq_nogvl_zstr_send through rb_thread_blocking_region and checks
 * the result with ZmqAssert. Returns true.
 */
static VALUE rb_czmq_socket_send(VALUE obj, VALUE msg)
{
    int rc;
    struct nogvl_send_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    Check_Type(msg, T_STRING);
    args.socket = sock;
    args.msg = StringValueCStr(msg);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_send, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    /* %p requires a void * argument; obj is a VALUE, so cast it explicitly. */
    if (sock->verbose)
        zclock_log ("I: %s socket %p: send \"%s\"", zsocket_type_str(sock->socket), (void *)obj, StringValueCStr(msg));
    return Qtrue;
}
Sends a ZMQ::Frame instance to this socket.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.bind("inproc://test") frame = ZMQ::Frame("frame") sock.send_frame(frame) => true frame = ZMQ::Frame("multi") sock.send_frame(frame, ZMQ::Frame::MORE)
/*
 * Sends a frame to the socket.
 *
 * argc, argv - one required argument (the frame object) and one optional
 *              argument (flags), as parsed by rb_scan_args with "11".
 * obj        - the receiver.
 *
 * Flags may be nil (treated as 0), a Symbol (looked up as a constant on
 * rb_cZmqFrame) or a Fixnum. Runs rb_czmq_nogvl_send_frame through
 * rb_thread_blocking_region and checks the result with ZmqAssert.
 * Returns true.
 */
static VALUE rb_czmq_socket_send_frame(int argc, VALUE *argv, VALUE obj)
{
    struct nogvl_send_frame_args args;
    VALUE frame_obj;
    VALUE flags;
    /* print_prefix and cur_time are not referenced directly below;
       presumably the ZmqDumpFrame macro uses them - confirm. */
    char print_prefix[255];
    char *cur_time = NULL;
    zframe_t *print_frame = NULL;
    int rc, flgs;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    rb_scan_args(argc, argv, "11", &frame_obj, &flags);
    /* `frame` is not declared in this function; presumably this macro declares it - confirm. */
    ZmqGetFrame(frame_obj);
    if (NIL_P(flags)) {
        flgs = 0;
    } else {
        /* A Symbol is resolved to the constant of the same name on the frame class. */
        if (SYMBOL_P(flags)) flags = rb_const_get_at(rb_cZmqFrame, rb_to_id(flags));
        Check_Type(flags, T_FIXNUM);
        flgs = FIX2INT(flags);
    }
    if (sock->verbose) {
        cur_time = rb_czmq_formatted_current_time();
        /* With ZFRAME_REUSE the frame itself is logged; otherwise a copy is
           taken before sending.
           NOTE(review): it is not visible here who releases that copy, in
           particular if ZmqAssert raises - confirm. */
        print_frame = (flgs & ZFRAME_REUSE) ? frame : zframe_dup(frame);
    }
    args.socket = sock;
    args.frame = frame;
    args.flags = flgs;
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_send_frame, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose) ZmqDumpFrame("send_frame", print_frame);
    return Qtrue;
}
Sends a ZMQ::Message instance to this socket.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.bind("inproc://test") msg = ZMQ::Message.new msg.push ZMQ::Frame("header") sock.send_message(msg) => nil
/*
 * Sends a message to the socket.
 *
 * obj         - the receiver.
 * message_obj - the message object to send.
 *
 * Runs rb_czmq_nogvl_send_message through rb_thread_blocking_region, then
 * sets ZMQ_MESSAGE_DESTROYED in the message wrapper's flags. Returns nil.
 */
static VALUE rb_czmq_socket_send_message(VALUE obj, VALUE message_obj)
{
    struct nogvl_send_message_args args;
    zmsg_t *print_message = NULL;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    /* `message` is not declared in this function; presumably this macro declares it - confirm. */
    ZmqGetMessage(message_obj);
    /* Take a copy for logging before the send. */
    if (sock->verbose) print_message = zmsg_dup(message->message);
    args.socket = sock;
    args.message = message->message;
    /* NOTE(review): the result of the send call is discarded, so a failure is not reported here. */
    rb_thread_blocking_region(rb_czmq_nogvl_send_message, (void *)&args, RUBY_UBF_IO, 0);
    message->flags |= ZMQ_MESSAGE_DESTROYED;
    if (sock->verbose) ZmqDumpMessage("send_message", print_message);
    return Qnil;
}
Returns the send timeout currently associated with this socket.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.send_timeout = 5 sock.send_timeout => 5
/*
 * Returns the send_timeout value stored on the socket wrapper.
 *
 * obj - the receiver.
 */
static VALUE rb_czmq_socket_send_timeout(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    return sock->send_timeout;
}
Sets a send timeout for this socket.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.send_timeout = 5 => nil
/*
 * Stores a send timeout on the socket wrapper.
 *
 * obj     - the receiver.
 * timeout - the timeout; must be a Fixnum or a Float.
 *
 * Raises TypeError, naming the class of the rejected argument, for any other
 * type. Returns nil.
 */
static VALUE rb_czmq_socket_set_send_timeout(VALUE obj, VALUE timeout)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    /* The message reports a type, so format the argument's class name rather
       than its to_s value. */
    if (TYPE(timeout) != T_FIXNUM && TYPE(timeout) != T_FLOAT)
        rb_raise(rb_eTypeError, "wrong timeout type %s (expected Fixnum or Float)", rb_obj_classname(timeout));
    /* NOTE(review): the VALUE is kept in a C struct; a Float may be a heap
       object, so the wrapper's mark function should mark it - confirm. */
    sock->send_timeout = timeout;
    return Qnil;
}
Sends a string to this ZMQ socket, with a more flag set.
ctx = ZMQ::Context.new sock = ctx.socket(:REQ) sock.connect("inproc://test") sock.sendm("mes") => true sock.sendm("sage") => true
/*
 * Sends a string to the socket through rb_czmq_nogvl_zstr_sendm.
 *
 * obj - the receiver.
 * msg - the payload; must be a Ruby String (Check_Type).
 *
 * The call runs through rb_thread_blocking_region and its result is checked
 * with ZmqAssert. Returns true.
 */
static VALUE rb_czmq_socket_sendm(VALUE obj, VALUE msg)
{
    int rc;
    struct nogvl_send_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    Check_Type(msg, T_STRING);
    args.socket = sock;
    args.msg = StringValueCStr(msg);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_sendm, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: sendm \"%s\"", zsocket_type_str(sock->socket), sock->socket, StringValueCStr(msg));
    return Qtrue;
}
Returns the socket SNDBUF value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.sndbuf => 0
/*
 * Getter for the socket SNDBUF option.
 *
 * obj - the receiver.
 *
 * Returns the result of zsockopt_sndbuf() converted with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_sndbuf(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    return INT2NUM(zsockopt_sndbuf(sock->socket));
}
Sets the socket SNDBUF value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.sndbuf = 1000 => nil
/*
 * Setter for the socket SNDBUF option.
 *
 * obj   - the receiver.
 * value - the new option value.
 *
 * The ZmqSetSockOpt macro (defined elsewhere) is handed the czmq setter
 * zsockopt_set_sndbuf and the option name "SNDBUF".
 * NOTE(review): the macro presumably resolves `sock`, applies the option and
 * supplies the return value, since no return statement is visible here - confirm.
 */
static VALUE rb_czmq_socket_set_opt_sndbuf(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsockopt_set_sndbuf, "SNDBUF", value);
}
Returns the current socket state, one of ZMQ::Socket::PENDING, ZMQ::Socket::BOUND or ZMQ::Socket::CONNECTED
ctx = ZMQ::Context.new sock = ctx.socket(:PUSH) sock.state => ZMQ::Socket::PENDING sock.bind("inproc://test") sock.state => ZMQ::Socket::BOUND
/*
 * Returns the state recorded on the socket wrapper, converted with INT2NUM.
 *
 * obj - the receiver.
 */
static VALUE rb_czmq_socket_state(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    return INT2NUM(sock->state);
}
Subscribes this SUB socket to a topic.
ctx = ZMQ::Context.new sock = ctx.socket(:SUB) sock.subscribe "ruby" => nil
/*
 * Setter for the socket SUBSCRIBE option.
 *
 * obj   - the receiver.
 * value - the topic to subscribe to.
 *
 * The ZmqSetStringSockOpt macro (defined elsewhere) is handed the czmq setter
 * zsockopt_set_subscribe, the option name "SUBSCRIBE" and an extra block that
 * invokes ZmqAssertSockOptFor(ZMQ_SUB).
 * NOTE(review): the macro presumably resolves `sock`, runs the extra block as
 * a socket-type check, applies the option and supplies the return value - confirm.
 */
static VALUE rb_czmq_socket_set_opt_subscribe(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetStringSockOpt(obj, zsockopt_set_subscribe, "SUBSCRIBE", value, { ZmqAssertSockOptFor(ZMQ_SUB) });
}
Returns the socket SWAP value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.swap => 0
/*
 * Getter for the socket SWAP option.
 *
 * obj - the receiver.
 *
 * Returns the result of zsockopt_swap() converted with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_swap(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj); /* macro defined elsewhere; presumably points `sock` at obj's wrapper - confirm */
    return INT2NUM(zsockopt_swap(sock->socket));
}
Sets the socket SWAP value.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.swap = 100 => nil
/*
 * Setter for the socket SWAP option.
 *
 * obj   - the receiver.
 * value - the new option value.
 *
 * The ZmqSetSockOpt macro (defined elsewhere) is handed the czmq setter
 * zsockopt_set_swap and the option name "SWAP".
 * NOTE(review): the macro presumably resolves `sock`, applies the option and
 * supplies the return value, since no return statement is visible here - confirm.
 */
static VALUE rb_czmq_socket_set_opt_swap(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsockopt_set_swap, "SWAP", value);
}
Generates a string representation of the current socket state
socket = ctx.bind(:PUB, "tcp://127.0.0.1:5000") socket.to_s => "PUB socket bound to tcp://127.0.0.1:5000"
# File lib/zmq/socket.rb, line 56
#
# Builds a human-readable description of the socket from +type_str+, +state+
# and +endpoint+.
#
# Returns "<type> socket bound to <endpoint>" when the state is BOUND,
# "<type> socket connected to <endpoint>" when it is CONNECTED, and
# "<type> socket" for any other state.
def to_s
  case state
  when BOUND
    "#{type_str} socket bound to #{endpoint}"
  when CONNECTED
    "#{type_str} socket connected to #{endpoint}"
  else
    "#{type_str} socket"
  end
end
Generates a string representation of this socket type
socket = ctx.socket(:PUB) socket.type_str => "PUB"
# File lib/zmq/socket.rb, line 47
#
# Looks up the TYPE_STR constant on the receiver's class.
#
# Returns the value of that constant.
def type_str
  self.class.const_get(:TYPE_STR)
end
Unsubscribes this SUB socket from a topic.
ctx = ZMQ::Context.new sock = ctx.socket(:SUB) sock.unsubscribe "ruby" => nil
/*
 * Setter for the socket UNSUBSCRIBE option.
 *
 * obj   - the receiver.
 * value - the topic to unsubscribe from.
 *
 * The ZmqSetStringSockOpt macro (defined elsewhere) is handed the czmq setter
 * zsockopt_set_unsubscribe, the option name "UNSUBSCRIBE" and an extra block
 * that invokes ZmqAssertSockOptFor(ZMQ_SUB).
 * NOTE(review): the macro presumably resolves `sock`, runs the extra block as
 * a socket-type check, applies the option and supplies the return value - confirm.
 */
static VALUE rb_czmq_socket_set_opt_unsubscribe(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetStringSockOpt(obj, zsockopt_set_unsubscribe, "UNSUBSCRIBE", value, { ZmqAssertSockOptFor(ZMQ_SUB) });
}
Let this socket be verbose - dumps a lot of data to stdout for debugging.
ctx = ZMQ::Context.new sock = ctx.socket(:REP) sock.verbose = true => nil
/*
 * Switches verbose mode for the socket on or off.
 *
 * obj   - the receiver.
 * level - verbose mode is enabled only when this is exactly Ruby `true`;
 *         every other value disables it.
 *
 * Returns nil.
 */
static VALUE rb_czmq_socket_set_verbose(VALUE obj, VALUE level)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    if (level == Qtrue) {
        sock->verbose = TRUE;
    } else {
        sock->verbose = FALSE;
    }
    return Qnil;
}
Generated with the Darkfish Rdoc Generator 2.