Created
March 8, 2016 10:11
-
-
Save anonymous/bc82a0bea35a5cb3ce01 to your computer and use it in GitHub Desktop.
ZMQ Socket with timeout
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from functools import update_wrapper | |
| import zmq | |
class Socket(zmq.Socket):
    """ZMQ socket whose ``send*``/``recv*`` methods accept a timeout.

    Every callable attribute of ``zmq.Socket`` whose name starts with
    ``send`` or ``recv`` is wrapped so that it takes an extra, optional
    ``timeout`` keyword argument expressed in seconds.  When a timeout is in
    effect the socket is polled first; if it does not become ready in time
    the call returns ``self.on_timeout()`` instead of invoking the
    underlying method.

    Args:
        ctx: The context passed through to ``zmq.Socket.__init__``.
        type: The socket type passed through to ``zmq.Socket.__init__``.
        default_timeout: Timeout in seconds used when a call does not pass
            ``timeout`` explicitly.  ``None`` means "no timeout": the
            wrapped method is called directly without polling.
    """

    # Declared at class level so the assignment in __init__ is a plain
    # attribute assignment: pyzmq treats attribute names that are not
    # declared on the class as socket options and rejects unknown ones.
    default_timeout = None

    def __init__(self, ctx, type, default_timeout=None):
        zmq.Socket.__init__(self, ctx, type)
        self.default_timeout = default_timeout

    def on_timeout(self):
        """Return the value handed back to the caller when a call times out.

        Subclasses may override this, e.g. to raise an exception instead.
        """
        return None

    def _timeout_wrapper(f, events):
        """Wrap *f* so that it polls for *events* first when a timeout is set.

        *events* is the poll mask to wait for (``zmq.POLLIN`` for receiving,
        ``zmq.POLLOUT`` for sending).  Polling for the wrong direction would
        report the socket as ready immediately and defeat the timeout.
        """

        def wrapper(self, *args, **kwargs):
            timeout = kwargs.pop('timeout', self.default_timeout)
            if timeout is not None:
                poller = zmq.Poller()
                poller.register(self, events)
                # Poller.poll() takes milliseconds; callers pass seconds.
                if not poller.poll(int(timeout * 1000)):
                    return self.on_timeout()
            return f(self, *args, **kwargs)

        return update_wrapper(wrapper, f, ('__name__', '__doc__'))

    # Install a wrapped version of every send*/recv* method on this class.
    # Inside a class body, locals() is the namespace the class is built from,
    # so assigning into it defines class attributes.
    for _meth in dir(zmq.Socket):
        if not callable(getattr(zmq.Socket, _meth)):
            continue
        if _meth.startswith('send'):
            locals()[_meth] = _timeout_wrapper(
                getattr(zmq.Socket, _meth), zmq.POLLOUT)
        elif _meth.startswith('recv'):
            locals()[_meth] = _timeout_wrapper(
                getattr(zmq.Socket, _meth), zmq.POLLIN)

    # Keep the construction helpers out of the finished class namespace.
    del _meth, _timeout_wrapper
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment