Skip to content

Instantly share code, notes, and snippets.

@dejw
Created January 12, 2010 19:25
Show Gist options
  • Select an option

  • Save dejw/275515 to your computer and use it in GitHub Desktop.

Select an option

Save dejw/275515 to your computer and use it in GitHub Desktop.
require "rubygems"
require "Node"
require "Configuration"
require "CoreService"
require "MonitoringObjectsAttributes"
require "example-services"
require "mocha"
class NameService
def initialize(bus)
(@mock = Mocha::Mock.new).stubs(:register_address).returns("Agent1", "Agent2")
end
def register_address(suggest)
@mock.register_address
end
end
class Service < CoreService::Service
def finalize
super()
children.values.min {|a, b| a.finalize <=> b.finalize }.finalize
end
def notify(who)
self.logger.info("CoreService: '#{who.address}' stopped")
super(who)
end
def detached
self.logger.info("Minimum of f(x) = #{Function.to_s} is #{self.finalize}")
self.logger.debug("CoreService: detached")
end
def init
super()
self.children.each do |address, agent|
@bus.get(:communicationService).register_listener(agent, address)
end
end
end
class Function
class << self
attr_accessor :a, :b, :line
end
def self.call(x)
eval(to_s)
end
def self.to_s
self.line.strip
end
end
class CommunicationService
def initialize(bus)
@bus = bus
@listeners = Hash.new
@queue = []
@mutex = Mutex.new
@cond = ConditionVariable.new
@bus.get(:logger).info("Communication: init")
end
def init
@running = true
@thread = Thread.new do
@bus.get(:logger).info("Communication: start")
@mutex.synchronize do
while running? do
@cond.wait(@mutex) if @queue.empty?
until @queue.empty? do
agent, message, listener = @queue.shift
@listeners[agent].on_received_message(message, listener)
end
end
@bus.stopped(self)
end
end
end
def running?
@running
end
def stop
@mutex.synchronize do
@running = false
@cond.signal
end
end
# Comm: FIXME: Wywalic sprawdzanie typu!!! zostawic tylko sprawdzanie obecnosci metod
def register_listener(l, name)
@listeners[name] = l
end
def send_unicast_message_to_agent(agentname, message, listener)
@mutex.synchronize do
raise "listener named '#{agentname}' does not exist!" unless @listeners.has_key?(agentname)
@queue.push([agentname, message, listener])
@cond.signal
end
end
def send_unicast_message(node_address, service_name, message, listener)
send_unicast_message_to_agent(service_name, message, listener)
end
end
# FIXME: bledna nazwa typu wiadomosci!
# FIXME: kiedy nie ma nazwy a processMoaMessage() zwraca true
# TODO: nazwy parametrow jako symbole ?
# FIXME:
# @toNotify[observable + propertyName]=Array.new if @toNotify[observable + propertyName]==nil
# zamienic na:
# @toNotify[observable + propertyName] ||= Array.new
#
# FIXME: to wyzej jest niebezpieczne np "test" / "owyParametr" lub "testowy" / "Parametr" zarejestruje w zlym miejscu
# rozwiazanie -> podwojny Hash.new { Hash.new { Array.new } }
# odwolywac sie @toNotify[observable][property].push(...)
#
module MonitoringObjectsAttributes::Observable
def properties
@properties ||= MonitoringObjectsAttributes::MonitoredHash.new(self)
end
def []=(prop, value)
self.properties[prop] = value
end
attr_reader :observers
end
class Agent < CoreService::Workplace
attr_reader :minimum
attr_writer :max_iterations
def on_received_message(message, sender, *args)
message.type = :getProperties if message.type == :getproperties # Hack! Hack! Hack!
logger.info("#{self.address}: received message typed as '#{message.type}' from '#{sender}'")
self.service.send_message(self.address, :moa, message) # TODO: rzutowanie typow ?
end
with_message :moa do
logger.info("#{self.address}: MoaMessage: #{receiveMOAMessage(self.message.content)}")
end
def sendMOAMessage(where, msg)
self.service.send_node_message(where, :moaService, msg)
end
after :init do
@observers = Hash.new
@a = Function.a
@b = Function.b
@function = Function
@minimum = 1/0.0
@other = self.address == "Agent1" ? "Agent2" : "Agent1"
@iterations_without_new_min = 0
self.stop_condition = CoreService::Condition::Block.new do
@iterations_without_new_min >= @max_iterations
end
self["result"] = 0
end
def perform
n = self.fetch_all_messages
# say("received #{n} message#{n != 1 ? "s" : ""}") if n > 0
x = @a + rand()*(@b - @a)
if (min = @function.call(x)) < @minimum
@minimum = min
self.service.send_message(@other, :found, @minimum)
# say("found new minimum = #{@minimum}")
@iterations_without_new_min = 0
self["result"] = @minimum
else
@iterations_without_new_min += 1
end
end
with_message :found do
new_min = self.message.content
if @minimum > new_min
@minimum = new_min
self["result"] = @minimum
@iterations_without_new_min = 0
# say("received new minimum from agent named '#{@other}'")
end
end
def finalize
@minimum
end
protected
def say(msg)
self.logger.info("#{self.address}: #{msg}")
end
end
class MyObserver
include MonitoringObjectsAttributes::Observer
def initialize(bus)
@bus = bus
@mutex = Mutex.new
@cond = ConditionVariable.new
@bus.get(:logger).info("Observer: init")
end
def running?
@mutex.synchronize { @running }
end
def stop
@mutex.synchronize do
@running = false
@cond.signal
end
end
def start
@thread = Thread.new do
@bus.get(:logger).info("MyObserver: start")
@bus.get(:logger).info("MyObserver: getting properties")
props = @bus.get(:moaService).getProperties("test", "Agent1") # FIXME: moze jakis timeout ?
@bus.get(:logger).info("MyObserver: received props: #{props.inspect}")
@bus.get(:moaService).register(self, "Agent1", "result")
while running? do
sleep(1)
@bus.get(:logger).info("Observer: sleeping...")
end
@bus.stopped(self)
end
end
def update(observable, property, value)
@bus.get(:logger).info("Observer: #{observable} has improved minimum to #{value}")
end
end
SERVICES = {
:logger => ExampleServices::LoggerService,
:configService => Configuration::Configuration,
:moaService => MonitoringObjectsAttributes::MOAService, # moze MOA::Service
:communicationService => CommunicationService,
:nameService => NameService,
:coreService => Service,
:observer => MyObserver,
}
config = <<EOF
# mozna zmienic domyslna nazwe drzewa
agents_tree_name:
value: `custom_agents_tree`
custom_agents_tree:
value:
- agent
- agent
agent:
class: Agent
autowire: false
setter-parameters:
max_iterations: 10000
EOF
print "Give f(x) = "
Function.line = gets
print "Give range <a, b> = "
Function.a, Function.b = gets.split(" ").collect(&:to_f)
bus = Node::Bus.new(SERVICES)
bus.get(:configService).set_configuration(YAML::load(config.gsub("\t", " ")))
bus.start
sleep(10)
bus.stop
bus.wait
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment