Skip to content

Instantly share code, notes, and snippets.

@arnaudmorin
Created September 22, 2025 16:49
Show Gist options
  • Select an option

  • Save arnaudmorin/89e9376b8d18f3dcfb7041dd02e5dbdd to your computer and use it in GitHub Desktop.

Select an option

Save arnaudmorin/89e9376b8d18f3dcfb7041dd02e5dbdd to your computer and use it in GitHub Desktop.
oslo.messaging ipc unix
from oslo_config import cfg
import oslo_messaging as m
# Standalone RPC client: configure oslo.concurrency's lock_path, open the
# 'unix:/tmp' transport, and make one blocking call on topic 'test'.
cfg.CONF.set_override(
    'lock_path',
    '/tmp',
    group='oslo_concurrency',
)
transport = m.get_rpc_transport(
    cfg.CONF,
    url='unix:/tmp',
)
target = m.Target(topic='test')
client = m.get_rpc_client(transport, target)

# Invoke the remote 'coucou' method with arg='boy' (timeout=10); no request
# context is supplied (None is passed as the context argument).
prepared = client.prepare(timeout=10)
r = prepared.call(None, 'coucou', arg='boy')

# The `{r=}` form prints both the variable name and the repr of the reply.
print(f'{r=}')
from oslo_config import cfg
from oslo_context.context import RequestContext
import oslo_messaging as m
import time
import threading
# Shared setup for both threads below: override oslo.concurrency's lock_path
# to /tmp, then build a single RPC transport from the 'unix:/tmp' URL.
# NOTE(review): presumably lock_path must be set before the transport is
# created — confirm against the driver in use.
cfg.CONF.set_override(
    'lock_path',
    '/tmp',
    group='oslo_concurrency',
)
transport = m.get_rpc_transport(
    cfg.CONF,
    url='unix:/tmp',
)
class TestEndpoint:
    """RPC endpoint exposing a single greeting method, ``coucou``."""

    def coucou(self, ctx, arg: str) -> str:
        """Return the greeting ``'Coucou '`` followed by *arg*.

        Args:
            ctx: Request context passed in by the caller; not used here.
            arg: Text appended to the greeting.

        Returns:
            The string ``'Coucou ' + arg``.

        Raises:
            TypeError: If *arg* cannot be concatenated to a ``str``.
        """
        return 'Coucou ' + arg
def server_t():
    """Serve ``TestEndpoint`` on topic 'test' as 'server1', then shut down.

    Builds an RPC server on the module-level ``transport``, starts it,
    sleeps for 4 seconds, then stops it and waits for it to finish.
    """
    rpc_server = m.get_rpc_server(
        transport,
        m.Target(topic='test', server='server1'),
        [TestEndpoint()],
    )
    rpc_server.start()
    # Fixed serving window: the server is only up while this sleep runs.
    time.sleep(4)
    rpc_server.stop()
    rpc_server.wait()
def client_t():
    """Call ``coucou`` on 'server1' over topic 'test' and print the reply.

    Builds an RPC client on the module-level ``transport``, sleeps for one
    second, then issues a blocking call with ``arg='boy'`` and a
    ``RequestContext`` whose ``user_id`` is ``'a'``.
    """
    rpc_client = m.get_rpc_client(
        transport,
        m.Target(topic='test', server='server1'),
    )
    # NOTE(review): presumably this delay lets the server thread start
    # listening before the call is sent — confirm.
    time.sleep(1)
    call_context = rpc_client.prepare(timeout=10)
    r = call_context.call(RequestContext(user_id='a'), 'coucou', arg='boy')
    # The `{r=}` form prints both the variable name and the repr of the reply.
    print(f'{r=}')
# Run the server and the client concurrently in this process: start the
# server thread first, then the client thread, and join them in that order.
thread_s = threading.Thread(target=server_t)
thread_c = threading.Thread(target=client_t)
for worker in (thread_s, thread_c):
    worker.start()
for worker in (thread_s, thread_c):
    worker.join()
from oslo_config import cfg
import oslo_messaging as m
import time
class TestEndpoint:
    """RPC endpoint exposing a single greeting method, ``coucou``."""

    def coucou(self, ctx, arg: str) -> str:
        """Return the greeting ``'Coucou '`` followed by *arg*.

        Args:
            ctx: Request context passed in by the caller; not used here.
            arg: Text appended to the greeting.

        Returns:
            The string ``'Coucou ' + arg``.

        Raises:
            TypeError: If *arg* cannot be concatenated to a ``str``.
        """
        return 'Coucou ' + arg
# Standalone RPC server: configure oslo.concurrency's lock_path, open the
# 'unix:/tmp' transport, and serve TestEndpoint on topic 'test' as 'server1'.
cfg.CONF.set_override(
    'lock_path',
    '/tmp',
    group='oslo_concurrency',
)
transport = m.get_rpc_transport(
    cfg.CONF,
    url='unix:/tmp',
)
target = m.Target(topic='test', server='server1')
endpoints = [TestEndpoint()]
server = m.get_rpc_server(transport, target, endpoints)

try:
    print('Starting server')
    server.start()
    # Idle in one-second naps; the loop only ends when an exception
    # (normally Ctrl-C) interrupts it.
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Stopping server")

# Reached only after a KeyboardInterrupt: stop the server and wait for it.
server.stop()
server.wait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment