Skip to content

Instantly share code, notes, and snippets.

@quixoten
Last active December 20, 2015 03:29
Show Gist options
  • Select an option

  • Save quixoten/6063655 to your computer and use it in GitHub Desktop.

Select an option

Save quixoten/6063655 to your computer and use it in GitHub Desktop.
An example ZMQ architecture for distributing work.
require 'rubygems'
require 'ffi-rzmq'
NUMBER_OF_CLIENTS = 10
NUMBER_OF_SCAN_PROCESSORS = 3
def client(identity)
context = ZMQ::Context.new
socket = context.socket ZMQ::REQ
socket.identity = identity
socket.connect "ipc://frontend.ipc"
socket.send_string (1 + rand(3)).to_s
socket.recv_string reply = ""
printf "%11s: REP %2s\n", identity, reply
ensure
socket.close
context.terminate
end
def scan_processor(identity)
context = ZMQ::Context.new
socket = context.socket ZMQ::REQ
socket.identity = identity
socket.connect "ipc://backend.ipc"
# This initial message lets the main server know
# that we're ready to receive jobs
socket.send_string "READY"
loop do
socket.recv_string client = ""
socket.recv_string empty = ""
socket.recv_string request = ""
printf "%11s: REQ %2s from %s\n", identity, request, client
sleep request.to_i
socket.send_strings [client, empty, "OK from #{identity}"]
end
ensure
socket.close
context.terminate
end
def main_server
context = ZMQ::Context.new
frontend_socket = context.socket ZMQ::ROUTER
backend_socket = context.socket ZMQ::ROUTER
frontend_socket.bind "ipc://frontend.ipc"
backend_socket.bind "ipc://backend.ipc"
available_processors = []
poller = ZMQ::Poller.new
poller.register_readable backend_socket
poller.register_readable frontend_socket
# The poller will continuously poll the backend_socket and will poll the
# frontend_socket when there is at least one processor available.
while poller.poll > 0
poller.readables.each do |readable|
if readable === backend_socket
# A router socket prefixes the senders id and an empty frame to the
# frames originally sent. In other words, if "PROCESSOR-2" sends "READY"
# the frames received from the router will be: ["PROCESSOR-2", "", "READY"]
#
# When replying through a router socket, the same prefix is expected.
# In other words, to send "DO STUFF" to "PROCESSOR-2" through a router
# socket, use the frames: ["PROCESSOR-2", "", "DO STUFF"].
#
# When sending, the router removes the id frame and the empty frame and
# sends the remaining frames through the connection identified in the
# id frame.
#
# In this implementation, when "PROCESSR-2" is responding to a request from
# "CLIENT-1" with "OK", the frames received here will be:
# ["PROCESSOR-2", "", "CLIENT-1", "", "OK"]
#
# Only the first two frames are automatically given by the router socket.
# The second two: ["CLIENT-1", ""] are actually part of the message
# "PROCESSOR-2" sent us. This allows us to easily route the mssage back
# to the client by simply removing the first two frames, ["PROCESSOR-2", ""],
# and sending the remaining frames, ["CLIENT-1", "", "OK"], which is exactly
# what is done below.
# Remove and store the processor id from the frames received
backend_socket.recv_string processor = ""
# Remove and store the empty frame from the frames received (storing not really necessary)
backend_socket.recv_string empty = ""
# Read the remaining frames into <reply>
backend_socket.recv_strings reply = []
# At this point <reply> will either be
# 1) ["READY"]
# or
# 2) [CLIENT_ID, "", REPLY]
#
# In either case, the processor id we stored is added to the list of
# <available_processors> (it either started up and is ready for its first
# job, or it just completed and is ready for its nth job).
#
# In the second case, <reply> is sent out the <frontend_socket>. Because
# it's a router socket, it will use the first two frames, [CLIENT_ID, ""],
# to send the remaining frames, [REPLY], to the correct client.
frontend_socket.send_strings reply unless reply == ["READY"]
available_processors << processor
elsif readable === frontend_socket && available_processors.any?
# Read the request from the client and forward it to the LRU processor
# As stated above, because we're using a router socket, messages
# received will be prefixed with the sender's id and an empty frame.
# In this particular case (receiving requests from clients) it will
# have the form: [CLIENT_ID, "", FRAME_1, FRAME_2, ..., FRAME_N]
#
# For this example, we'll say that "CLIENT-3" sent the frames ["1"] and
# "PROCESSOR-2" is our LRU (Least Recently Used) processor.
#
# When we read the request from the router socket, it is received as:
# ["CLIENT-3", "", "1"]
#
# In order to send the request to PROCESSOR-2, we have to prefix the
# request with the frames: ["PROCESSOR-2", ""]
#
# If we send ["PROCESSOR-2, "", "1"] then "PROCESSOR-2" will receive the
# single frame ["1"], which would be enough to process the job but not
# enough to route the response correctly.
#
# Instead, we send: ["PROCESSOR-2", "", "CLIENT-3", "", "1"], simply
# prefixing the frames we receive with the processor's id and an empty frame.
# Now, "PROCESSOR-2" will receive the frames: ["CLIENT-3", "", "1"]
#
# As you can see in the scan_processor definition, it reads the client id and the
# empty frame and sends it as part of the response.
frontend_socket.recv_strings request = []
backend_socket.send_strings [available_processors.shift, ""] + request
end
end
end
ensure
frontend_socket.close
backend_socket.close
context.terminate
end
# NOTE: Because we're using ZMQ, we can start the systems in any order without
# impairing functionality.
# Start the main server
Thread.new { main_server }
# Start the processors
NUMBER_OF_SCAN_PROCESSORS.times do |processor_id|
Thread.new { scan_processor "PROCESSOR-#{processor_id}" }
end
# Start the clients
client_threads = NUMBER_OF_CLIENTS.times.map do |client_id|
Thread.new { client "CLIENT-#{client_id}" }
end
# Wait for all the clients to get their responses
client_threads.each &:join
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment