-
-
Save litnimax/088d5f51e3ed05a71f48 to your computer and use it in GitHub Desktop.
ZMQ Socket with timeout
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from functools import update_wrapper | |
| import zmq | |
class Socket(zmq.Socket):
    """ZeroMQ socket whose ``send*``/``recv*`` methods accept a timeout.

    Every ``zmq.Socket`` method whose name starts with ``send`` or ``recv``
    is replaced by a wrapper taking an extra keyword argument ``timeout``
    (in seconds, may be fractional).  When the keyword is omitted,
    ``default_timeout`` is used; ``None`` means "block as usual".

    Before delegating to the real method the wrapper polls the socket for
    the matching readiness event (readable for ``recv*``, writable for
    ``send*``).  If the socket is not ready within the timeout, the result
    of :meth:`on_timeout` is returned instead of calling the method.
    """

    # Declared at class level so that plain attribute assignment works:
    # pyzmq interprets assignment to names that are not class-defined as an
    # attempt to set a socket option and rejects unknown ones.
    default_timeout = None

    def __init__(self, ctx, type, default_timeout=None):
        """Create the socket.

        ctx             -- the ``zmq.Context`` the socket belongs to.
        type            -- the ZeroMQ socket type (e.g. ``zmq.REQ``).
        default_timeout -- timeout in seconds applied to every wrapped call
                           that does not pass ``timeout=``; ``None``
                           disables the timeout.
        """
        zmq.Socket.__init__(self, ctx, type)
        self.default_timeout = default_timeout

    def on_timeout(self):
        """Hook invoked when a wrapped call times out.

        Its return value becomes the return value of the timed-out call.
        Override it to raise an exception or return a sentinel instead of
        the default ``None``.
        """
        return None

    def _timeout_wrapper(f, event):
        """Wrap method *f* so it first polls the socket for *event*.

        Helper used only while the class body executes; it is deleted
        afterwards and is not part of the class.
        """
        def wrapper(self, *args, **kwargs):
            timeout = kwargs.pop('timeout', self.default_timeout)
            if timeout is not None:
                poller = zmq.Poller()
                # Register only the event this operation needs.  Using the
                # default (POLLIN | POLLOUT) would report a merely writable
                # socket as "ready" for recv, which would then block and
                # defeat the timeout.
                poller.register(self, event)
                # Poller.poll() takes milliseconds.
                if not poller.poll(int(timeout * 1000)):
                    return self.on_timeout()
            return f(self, *args, **kwargs)
        # Copy only name and docstring; not every wrapped method is
        # guaranteed to expose the full default set of attributes.
        return update_wrapper(wrapper, f, ('__name__', '__doc__'))

    # Install a wrapper for every send*/recv* method of the base class.
    # Inside a class body, locals() is the class namespace under
    # construction, so assigning into it defines methods.
    for _meth in dir(zmq.Socket):
        if _meth.startswith('send'):
            locals()[_meth] = _timeout_wrapper(
                getattr(zmq.Socket, _meth), zmq.POLLOUT)
        elif _meth.startswith('recv'):
            locals()[_meth] = _timeout_wrapper(
                getattr(zmq.Socket, _meth), zmq.POLLIN)

    # Keep the helper names out of the finished class.
    del _meth, _timeout_wrapper
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment