Created
January 12, 2010 19:25
-
-
Save dejw/275515 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| require "rubygems" | |
| require "Node" | |
| require "Configuration" | |
| require "CoreService" | |
| require "MonitoringObjectsAttributes" | |
| require "example-services" | |
| require "mocha" | |
# Stand-in name service that hands out agent addresses in a fixed order.
#
# The first registration receives "Agent1" and every later registration
# receives "Agent2". This is the same sequence the previous implementation
# produced with a Mocha stub (`returns("Agent1", "Agent2")`), but it no longer
# needs a mocking library object at runtime.
class NameService
  # Addresses handed out, in order. The last entry is repeated once the list
  # has been exhausted.
  ADDRESSES = ["Agent1", "Agent2"].freeze

  # @param bus [Object] the service bus; accepted so the class can be
  #   constructed like the other services, but not used here
  def initialize(bus)
    @remaining = ADDRESSES.dup
  end

  # Returns the next address in the fixed sequence.
  #
  # @param suggest [Object] suggested address; ignored
  # @return [String] "Agent1" on the first call, "Agent2" afterwards
  def register_address(suggest)
    # Keep the final address in place so it can be returned repeatedly.
    @remaining.size > 1 ? @remaining.shift : @remaining.first
  end
end
# Core service for this example: wires every child agent to the communication
# service and reports the smallest minimum found by any of them.
class Service < CoreService::Service
  # Runs the parent's finalisation, then collects each child's result.
  #
  # @return [Object, nil] the smallest value returned by a child's `finalize`,
  #   or nil when there are no children
  def finalize
    super()
    # Ask every child exactly once and take the minimum of the results; an
    # empty child list yields nil instead of raising NoMethodError.
    children.values.map(&:finalize).min
  end

  # Logs that a child has stopped, then delegates to the parent.
  #
  # @param who [#address] the object that stopped
  def notify(who)
    logger.info("CoreService: '#{who.address}' stopped")
    super(who)
  end

  # Logs the overall result.
  # NOTE(review): this calls #finalize, which invokes the parent's finalize
  # again — confirm that running it a second time is harmless.
  def detached
    logger.info("Minimum of f(x) = #{Function.to_s} is #{finalize}")
    logger.debug("CoreService: detached")
  end

  # Runs the parent's initialisation, then registers every child with the
  # communication service under its address.
  def init
    super()
    communication = @bus.get(:communicationService)
    children.each do |address, agent|
      communication.register_listener(agent, address)
    end
  end
end
# Holder for the function being minimised and the search range.
#
# `line` is the raw expression text in terms of `x`; `a` and `b` are the range
# bounds. Everything lives on the class itself, so there is exactly one
# function per process.
class Function
  class << self
    attr_accessor :a, :b, :line

    # Evaluates the stored expression for the given value of `x`.
    #
    # SECURITY: the expression is executed with `eval`, so it can run
    # arbitrary Ruby. Only feed it text typed by the person running the
    # script; never use it on untrusted input.
    #
    # @param x [Numeric] value bound to `x` inside the expression
    # @return [Object] whatever the expression evaluates to
    # @raise [ArgumentError] when no expression has been set
    def call(x)
      # An empty expression would make eval return nil and fail later with a
      # confusing comparison error, so reject it up front.
      raise ArgumentError, "no function expression given" if to_s.empty?

      eval(to_s)
    end

    # @return [String] the expression without surrounding whitespace; an empty
    #   string when `line` is nil (for example after `gets` hit end of input)
    def to_s
      line.to_s.strip
    end
  end
end
# In-process message dispatcher: senders enqueue messages for a named
# listener and a single worker thread delivers them in order.
class CommunicationService
  # @param bus [#get, #stopped] service bus used to look up the logger and to
  #   report when the worker thread has finished
  def initialize(bus)
    @bus = bus
    @listeners = {}
    @queue = []
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @bus.get(:logger).info("Communication: init")
  end

  # Starts the worker thread. It delivers queued messages until #stop is
  # called and the queue is drained, then reports `stopped` to the bus.
  def init
    @running = true
    @thread = Thread.new do
      @bus.get(:logger).info("Communication: start")
      while (batch = next_batch)
        # Deliver outside the lock: a listener may send further messages from
        # inside its callback, and the mutex is not reentrant.
        batch.each do |target, message, listener|
          target.on_received_message(message, listener)
        end
      end
      @bus.stopped(self)
    end
  end

  # @return [Boolean, nil] true between #init and #stop
  def running?
    @running
  end

  # Asks the worker thread to finish and wakes it if it is waiting.
  def stop
    @mutex.synchronize do
      @running = false
      @cond.signal
    end
  end

  # Registers (or replaces) the listener that receives messages sent to `name`.
  #
  # FIXME: drop the type check!!! keep only the check for the presence of the
  # required methods.
  #
  # @param l [#on_received_message] the listener
  # @param name [Object] key the listener is addressed by
  def register_listener(l, name)
    @mutex.synchronize { @listeners[name] = l }
  end

  # Queues `message` for the listener registered as `agentname`.
  #
  # @param agentname [Object] key of the target listener
  # @param message [Object] payload handed to the listener
  # @param listener [Object] passed through as the second argument of the
  #   target's `on_received_message`
  # @raise [RuntimeError] when no listener is registered under `agentname`
  def send_unicast_message_to_agent(agentname, message, listener)
    @mutex.synchronize do
      raise "listener named '#{agentname}' does not exist!" unless @listeners.has_key?(agentname)
      @queue.push([agentname, message, listener])
      @cond.signal
    end
  end

  # Same as #send_unicast_message_to_agent; `node_address` is ignored and
  # `service_name` is used as the listener key.
  def send_unicast_message(node_address, service_name, message, listener)
    send_unicast_message_to_agent(service_name, message, listener)
  end

  private

  # Blocks until messages are available or the service has been stopped.
  #
  # @return [Array<Array>, nil] `[target, message, listener]` triples to
  #   deliver, or nil once the service is stopped and the queue is empty
  def next_batch
    @mutex.synchronize do
      # Loop around the wait so spurious wake-ups are harmless.
      @cond.wait(@mutex) while @running && @queue.empty?
      return nil if @queue.empty?

      batch = @queue.map do |name, message, listener|
        [@listeners[name], message, listener]
      end
      @queue.clear
      batch
    end
  end
end
# FIXME: wrong message type name!
# FIXME: the case where there is no name but processMoaMessage() returns true
# TODO: use symbols for parameter names?
# FIXME:
# @toNotify[observable + propertyName]=Array.new if @toNotify[observable + propertyName]==nil
# should be replaced with:
# @toNotify[observable + propertyName] ||= Array.new
#
# FIXME: the concatenated key above is unsafe: e.g. "test" / "owyParametr" and
# "testowy" / "Parametr" produce the same key, so one of them gets registered
# in the wrong place.
# Solution -> a nested Hash.new { Hash.new { Array.new } },
# accessed as @toNotify[observable][property].push(...)
#
# Additions to the Observable mixin: a lazily created property store and
# hash-style assignment that writes into it.
module MonitoringObjectsAttributes::Observable
  # Exposes @observers to readers.
  attr_reader :observers

  # @return [MonitoringObjectsAttributes::MonitoredHash] the property store
  #   for this object, created on first access
  def properties
    return @properties if @properties

    @properties = MonitoringObjectsAttributes::MonitoredHash.new(self)
  end

  # Stores `value` under `prop` in the property store.
  def []=(prop, value)
    properties[prop] = value
  end
end
# Random-search agent: each step samples a point in [a, b], evaluates the
# function there and shares any improvement with the other agent.
class Agent < CoreService::Workplace
  attr_reader :minimum
  attr_writer :max_iterations

  # Callback for messages arriving from outside; re-posts them to this agent
  # as a :moa message.
  def on_received_message(message, sender, *args)
    # Hack: rewrite the all-lowercase type name to the camel-cased one.
    if message.type == :getproperties
      message.type = :getProperties
    end
    logger.info("#{address}: received message typed as '#{message.type}' from '#{sender}'")
    service.send_message(address, :moa, message) # TODO: type casting?
  end

  # Passes the content of a :moa message to receiveMOAMessage and logs the
  # result.
  with_message :moa do
    logger.info("#{address}: MoaMessage: #{receiveMOAMessage(message.content)}")
  end

  # Sends `msg` to the :moaService on node `where`.
  def sendMOAMessage(where, msg)
    service.send_node_message(where, :moaService, msg)
  end

  # Per-agent state, set up once initialisation has run.
  after :init do
    @observers = Hash.new
    @function = Function
    @a = Function.a
    @b = Function.b
    @minimum = 1.0 / 0 # positive infinity: any finite value improves on it
    @other =
      if address == "Agent1"
        "Agent2"
      else
        "Agent1"
      end
    @iterations_without_new_min = 0
    # Stop once this many consecutive steps brought no improvement.
    self.stop_condition = CoreService::Condition::Block.new do
      @iterations_without_new_min >= @max_iterations
    end
    self["result"] = 0
  end

  # One search step: process pending messages, sample a point, and record and
  # announce the value when it beats the current minimum.
  def perform
    fetch_all_messages
    x = @a + rand * (@b - @a)
    candidate = @function.call(x)
    if candidate < @minimum
      @minimum = candidate
      service.send_message(@other, :found, @minimum)
      @iterations_without_new_min = 0
      self["result"] = @minimum
    else
      @iterations_without_new_min += 1
    end
  end

  # Adopts a minimum reported by the other agent when it is better than ours.
  with_message :found do
    reported = message.content
    if @minimum > reported
      @minimum = reported
      self["result"] = @minimum
      @iterations_without_new_min = 0
    end
  end

  # @return [Object] the best value found so far
  def finalize
    @minimum
  end

  protected

  # Logs `msg` prefixed with this agent's address.
  def say(msg)
    logger.info("#{address}: #{msg}")
  end
end
# Example observer: reads Agent1's properties once, subscribes to its
# "result" property and logs every update until stopped.
class MyObserver
  include MonitoringObjectsAttributes::Observer

  # @param bus [#get, #stopped] service bus used to reach the logger and the
  #   MOA service and to report when the observer thread has finished
  def initialize(bus)
    @bus = bus
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @bus.get(:logger).info("Observer: init")
  end

  # @return [Boolean, nil] true between #start and #stop
  def running?
    @mutex.synchronize { @running }
  end

  # Asks the observer thread to finish.
  def stop
    @mutex.synchronize do
      @running = false
      @cond.signal
    end
  end

  # Starts the observer thread.
  def start
    # Mark the observer as running before the thread starts. Previously the
    # flag was never set, so the loop below was skipped and the observer
    # reported itself stopped straight away.
    @mutex.synchronize { @running = true }
    @thread = Thread.new do
      logger = @bus.get(:logger)
      moa = @bus.get(:moaService)
      logger.info("MyObserver: start")
      logger.info("MyObserver: getting properties")
      props = moa.getProperties("test", "Agent1") # FIXME: maybe add a timeout?
      logger.info("MyObserver: received props: #{props.inspect}")
      moa.register(self, "Agent1", "result")
      while running?
        sleep(1)
        logger.info("Observer: sleeping...")
      end
      @bus.stopped(self)
    end
  end

  # Logs a new value of an observed property.
  #
  # @param observable [Object] the object whose property changed
  # @param property [Object] the property name; not used in the log line
  # @param value [Object] the new value
  def update(observable, property, value)
    @bus.get(:logger).info("Observer: #{observable} has improved minimum to #{value}")
  end
end
# Service classes handed to Node::Bus, keyed by the name each one is looked
# up under via `bus.get(name)`.
SERVICES = {
  :logger               => ExampleServices::LoggerService,
  :configService        => Configuration::Configuration,
  :moaService           => MonitoringObjectsAttributes::MOAService, # maybe rename to MOA::Service
  :communicationService => CommunicationService,
  :nameService          => NameService,
  :coreService          => Service,
  :observer             => MyObserver,
}
# YAML configuration of the agent tree: two agents of class Agent, each with
# max_iterations set to 10000. The YAML comment on the first line reads
# "the default tree name can be changed".
#
# The backtick-wrapped value is quoted because a plain YAML scalar may not
# start with a backtick (Psych rejects it); the loaded string is unchanged.
# NOTE(review): the nesting below was reconstructed from the key names —
# confirm it matches what the configuration service expects, and that the
# backticks are meant to be part of the value.
config = <<EOF
# mozna zmienic domyslna nazwe drzewa
agents_tree_name:
  value: '`custom_agents_tree`'
custom_agents_tree:
  value:
    - agent
    - agent
agent:
  class: Agent
  autowire: false
  setter-parameters:
    max_iterations: 10000
EOF
# Interactive entry point: read the function and the search range, run the
# agents for ten seconds, then shut everything down.

print "Give f(x) = "
expression = gets
# `gets` returns nil at end of input; stop with a message instead of crashing
# later inside the agents.
abort "No function given." if expression.nil? || expression.strip.empty?
# SECURITY: the expression is later evaluated as Ruby code by Function.call.
Function.line = expression

print "Give range <a, b> = "
bounds = gets.to_s.split(" ")
abort "Expected two numbers for the range, e.g. \"-1 1\"." if bounds.size < 2
begin
  # Float() rejects malformed numbers, whereas to_f would silently yield 0.0.
  Function.a, Function.b = bounds.first(2).map { |token| Float(token) }
rescue ArgumentError
  abort "The range bounds must be numbers, e.g. \"-1 1\"."
end

bus = Node::Bus.new(SERVICES)
# Tabs are not valid YAML indentation, so turn them into spaces before parsing.
bus.get(:configService).set_configuration(YAML.load(config.gsub("\t", " ")))
bus.start
sleep(10) # let the agents search for ten seconds
bus.stop
bus.wait
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment