Last active
December 20, 2015 03:29
-
-
Save quixoten/6063655 to your computer and use it in GitHub Desktop.
An example ZMQ architecture for distributing work.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| require 'rubygems' | |
| require 'ffi-rzmq' | |
| NUMBER_OF_CLIENTS = 10 | |
| NUMBER_OF_SCAN_PROCESSORS = 3 | |
| def client(identity) | |
| context = ZMQ::Context.new | |
| socket = context.socket ZMQ::REQ | |
| socket.identity = identity | |
| socket.connect "ipc://frontend.ipc" | |
| socket.send_string (1 + rand(3)).to_s | |
| socket.recv_string reply = "" | |
| printf "%11s: REP %2s\n", identity, reply | |
| ensure | |
| socket.close | |
| context.terminate | |
| end | |
| def scan_processor(identity) | |
| context = ZMQ::Context.new | |
| socket = context.socket ZMQ::REQ | |
| socket.identity = identity | |
| socket.connect "ipc://backend.ipc" | |
| # This initial message lets the main server know | |
| # that we're ready to receive jobs | |
| socket.send_string "READY" | |
| loop do | |
| socket.recv_string client = "" | |
| socket.recv_string empty = "" | |
| socket.recv_string request = "" | |
| printf "%11s: REQ %2s from %s\n", identity, request, client | |
| sleep request.to_i | |
| socket.send_strings [client, empty, "OK from #{identity}"] | |
| end | |
| ensure | |
| socket.close | |
| context.terminate | |
| end | |
| def main_server | |
| context = ZMQ::Context.new | |
| frontend_socket = context.socket ZMQ::ROUTER | |
| backend_socket = context.socket ZMQ::ROUTER | |
| frontend_socket.bind "ipc://frontend.ipc" | |
| backend_socket.bind "ipc://backend.ipc" | |
| available_processors = [] | |
| poller = ZMQ::Poller.new | |
| poller.register_readable backend_socket | |
| poller.register_readable frontend_socket | |
| # The poller will continuously poll the backend_socket and will poll the | |
| # frontend_socket when there is at least one processor available. | |
| while poller.poll > 0 | |
| poller.readables.each do |readable| | |
| if readable === backend_socket | |
| # A router socket prefixes the senders id and an empty frame to the | |
| # frames originally sent. In other words, if "PROCESSOR-2" sends "READY" | |
| # the frames received from the router will be: ["PROCESSOR-2", "", "READY"] | |
| # | |
| # When replying through a router socket, the same prefix is expected. | |
| # In other words, to send "DO STUFF" to "PROCESSOR-2" through a router | |
| # socket, use the frames: ["PROCESSOR-2", "", "DO STUFF"]. | |
| # | |
| # When sending, the router removes the id frame and the empty frame and | |
| # sends the remaining frames through the connection identified in the | |
| # id frame. | |
| # | |
| # In this implementation, when "PROCESSR-2" is responding to a request from | |
| # "CLIENT-1" with "OK", the frames received here will be: | |
| # ["PROCESSOR-2", "", "CLIENT-1", "", "OK"] | |
| # | |
| # Only the first two frames are automatically given by the router socket. | |
| # The second two: ["CLIENT-1", ""] are actually part of the message | |
| # "PROCESSOR-2" sent us. This allows us to easily route the mssage back | |
| # to the client by simply removing the first two frames, ["PROCESSOR-2", ""], | |
| # and sending the remaining frames, ["CLIENT-1", "", "OK"], which is exactly | |
| # what is done below. | |
| # Remove and store the processor id from the frames received | |
| backend_socket.recv_string processor = "" | |
| # Remove and store the empty frame from the frames received (storing not really necessary) | |
| backend_socket.recv_string empty = "" | |
| # Read the remaining frames into <reply> | |
| backend_socket.recv_strings reply = [] | |
| # At this point <reply> will either be | |
| # 1) ["READY"] | |
| # or | |
| # 2) [CLIENT_ID, "", REPLY] | |
| # | |
| # In either case, the processor id we stored is added to the list of | |
| # <available_processors> (it either started up and is ready for its first | |
| # job, or it just completed and is ready for its nth job). | |
| # | |
| # In the second case, <reply> is sent out the <frontend_socket>. Because | |
| # it's a router socket, it will use the first two frames, [CLIENT_ID, ""], | |
| # to send the remaining frames, [REPLY], to the correct client. | |
| frontend_socket.send_strings reply unless reply == ["READY"] | |
| available_processors << processor | |
| elsif readable === frontend_socket && available_processors.any? | |
| # Read the request from the client and forward it to the LRU processor | |
| # As stated above, because we're using a router socket, messages | |
| # received will be prefixed with the sender's id and an empty frame. | |
| # In this particular case (receiving requests from clients) it will | |
| # have the form: [CLIENT_ID, "", FRAME_1, FRAME_2, ..., FRAME_N] | |
| # | |
| # For this example, we'll say that "CLIENT-3" sent the frames ["1"] and | |
| # "PROCESSOR-2" is our LRU (Least Recently Used) processor. | |
| # | |
| # When we read the request from the router socket, it is received as: | |
| # ["CLIENT-3", "", "1"] | |
| # | |
| # In order to send the request to PROCESSOR-2, we have to prefix the | |
| # request with the frames: ["PROCESSOR-2", ""] | |
| # | |
| # If we send ["PROCESSOR-2, "", "1"] then "PROCESSOR-2" will receive the | |
| # single frame ["1"], which would be enough to process the job but not | |
| # enough to route the response correctly. | |
| # | |
| # Instead, we send: ["PROCESSOR-2", "", "CLIENT-3", "", "1"], simply | |
| # prefixing the frames we receive with the processor's id and an empty frame. | |
| # Now, "PROCESSOR-2" will receive the frames: ["CLIENT-3", "", "1"] | |
| # | |
| # As you can see in the scan_processor definition, it reads the client id and the | |
| # empty frame and sends it as part of the response. | |
| frontend_socket.recv_strings request = [] | |
| backend_socket.send_strings [available_processors.shift, ""] + request | |
| end | |
| end | |
| end | |
| ensure | |
| frontend_socket.close | |
| backend_socket.close | |
| context.terminate | |
| end | |
| # NOTE: Because we're using ZMQ, we can start the systems in any order without | |
| # impairing functionality. | |
| # Start the main server | |
| Thread.new { main_server } | |
| # Start the processors | |
| NUMBER_OF_SCAN_PROCESSORS.times do |processor_id| | |
| Thread.new { scan_processor "PROCESSOR-#{processor_id}" } | |
| end | |
| # Start the clients | |
| client_threads = NUMBER_OF_CLIENTS.times.map do |client_id| | |
| Thread.new { client "CLIENT-#{client_id}" } | |
| end | |
| # Wait for all the clients to get their responses | |
| client_threads.each &:join |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment