Skip to content

Instantly share code, notes, and snippets.

Created March 8, 2016 10:11
Show Gist options
  • Select an option

  • Save anonymous/bc82a0bea35a5cb3ce01 to your computer and use it in GitHub Desktop.

Select an option

Save anonymous/bc82a0bea35a5cb3ce01 to your computer and use it in GitHub Desktop.
ZMQ Socket with timeout
from functools import update_wrapper
import zmq
class Socket(zmq.Socket):
    """A ``zmq.Socket`` whose ``send*``/``recv*`` methods accept a timeout.

    Every ``zmq.Socket`` method whose name starts with ``send`` or ``recv``
    is re-exposed here with an extra keyword argument ``timeout`` (seconds,
    may be fractional).  When no ``timeout`` is passed, ``default_timeout``
    is used; when that is ``None`` as well, the call goes straight to the
    underlying method without polling.

    If the socket does not become ready within the timeout, the result of
    ``on_timeout()`` is returned instead of calling the underlying method.
    """

    def __init__(self, ctx, type, default_timeout=None):
        """Create the socket and remember the default timeout.

        ``ctx`` and ``type`` are forwarded to ``zmq.Socket.__init__``;
        ``default_timeout`` is the timeout in seconds applied when a call
        does not supply its own (``None`` disables polling).
        """
        zmq.Socket.__init__(self, ctx, type)
        self.default_timeout = default_timeout

    def on_timeout(self):
        """Return the value handed to the caller when a wait times out.

        This implementation returns ``None``.
        """
        return None

    def _timeout_wrapper(f, flags):
        """Wrap ``f`` so it polls the socket for ``flags`` before running.

        ``flags`` is the poll direction the wrapped method needs
        (``zmq.POLLIN`` for receiving, ``zmq.POLLOUT`` for sending).
        Polling for only that direction matters: registering with the
        default ``POLLIN | POLLOUT`` would report a writable socket as
        "ready" for a ``recv`` (and vice versa), after which the real call
        would block and the timeout would never fire.
        """
        def wrapper(self, *args, **kwargs):
            # `timeout` is consumed here and never reaches the wrapped method.
            timeout = kwargs.pop('timeout', self.default_timeout)
            if timeout is not None:
                # Poller.poll takes milliseconds; the public API is seconds.
                timeout = int(timeout * 1000)
                poller = zmq.Poller()
                poller.register(self, flags)
                if not poller.poll(timeout):
                    return self.on_timeout()
            return f(self, *args, **kwargs)
        return update_wrapper(wrapper, f, ('__name__', '__doc__'))

    # Install a wrapped version of every send*/recv* method directly into the
    # class namespace while the class body is being executed.
    for _meth in dir(zmq.Socket):
        if _meth.startswith('send'):
            locals()[_meth] = _timeout_wrapper(
                getattr(zmq.Socket, _meth), zmq.POLLOUT)
        elif _meth.startswith('recv'):
            locals()[_meth] = _timeout_wrapper(
                getattr(zmq.Socket, _meth), zmq.POLLIN)

    # Keep the construction helpers out of the finished class.
    del _meth, _timeout_wrapper
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment