Skip to content

Instantly share code, notes, and snippets.

@dejw
Created January 13, 2010 22:56
Show Gist options
  • Select an option

  • Save dejw/276649 to your computer and use it in GitHub Desktop.

Select an option

Save dejw/276649 to your computer and use it in GitHub Desktop.
require "rubygems"
require "Node"
require "Configuration"
require "CoreService"
require "MonitoringObjectsAttributes"
require "sga"
require "SimpleAgents"
require "actions"
require "example-services"
require "mocha"
include SimpleGeneticAlgorithm
# This script requires the file:
# https://lab.iisg.agh.edu.pl/svn/toik/branches/rageA/CoreService/integration/sga.rb
# which fixes bugs in the genetic algorithm.
# TODO & FIXME (module = line, ...)
# Actions = 91
# MOA = 96
# SimpleAgents = 135, 154
# Stand-in name service: every address request is answered by a Mocha mock
# that was stubbed with a fixed list of names.
class NameService
  # bus - accepted so the constructor matches the other services; unused here.
  def initialize(bus)
    canned_addresses = ["Genetic", "Simple1", "Aggregate1", "Simple2", "Simple3"]
    @mock = Mocha::Mock.new
    @mock.stubs(:register_address).returns(*canned_addresses)
  end

  # Returns whatever the stubbed mock answers; the suggested name is ignored.
  def register_address(suggest)
    @mock.register_address
  end
end
# In-process stand-in for the communication service. Senders enqueue
# messages; a dedicated worker thread hands them to the listener registered
# under the target name.
class CommunicationService
  # bus - service bus; it must answer get(:logger) and stopped(service).
  def initialize(bus)
    @bus = bus
    @listeners = Hash.new
    @queue = []
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @bus.get(:logger).info("Communication: init")
  end

  # Starts the delivery thread and returns it. The thread runs until #stop is
  # called and then reports itself to the bus via stopped(self).
  def init
    @running = true
    @thread = Thread.new do
      @bus.get(:logger).info("Communication: start")
      # Messages are taken off the queue under the lock but delivered outside
      # it: Mutex is not reentrant, so a listener that sends a message (or
      # calls #stop) from its callback would otherwise raise ThreadError and
      # kill this thread.
      while (batch = next_batch)
        batch.each { |agent, message, listener| deliver(agent, message, listener) }
      end
      @bus.stopped(self)
    end
  end

  # True between #init and #stop.
  def running?
    @running
  end

  # Asks the delivery thread to finish and wakes it up if it is idle.
  def stop
    @mutex.synchronize do
      @running = false
      @cond.signal
    end
  end

  # Registers +l+ as the receiver of messages addressed to +name+.
  def register_listener(l, name)
    @listeners[name] = l
  end

  # NOTE(review): +agentname+ is ignored and the message is routed to the
  # listener registered under +listener+. This is kept exactly as in the
  # original; confirm that routing by listener name is intended.
  def send_unicast_message_to_agent(agentname, message, listener)
    send_unicast_message(nil, listener, message, listener)
  end

  # Queues +message+ for the listener registered as +service_name+.
  # +node_address+ is not used by this implementation.
  #
  # Raises RuntimeError when no listener is registered under +service_name+.
  def send_unicast_message(node_address, service_name, message, listener)
    @mutex.synchronize do
      raise "listener named '#{service_name}' does not exist!" unless @listeners.has_key?(service_name)
      @queue.push([service_name, message, listener])
      @cond.signal
    end
  end

  private

  # Blocks until there is something to do. Returns the pending messages
  # (possibly an empty array after a wake-up without data), or nil once the
  # service has been stopped.
  def next_batch
    @mutex.synchronize do
      return nil unless running?
      @cond.wait(@mutex) if @queue.empty?
      @queue.shift(@queue.size)
    end
  end

  # Logs and hands a single message to its listener.
  def deliver(agent, message, listener)
    @bus.get(:logger).info("Communication: sending '#{listener}' -> '#{agent}'")
    @listeners[agent].on_received_message(message, listener)
  end
end
# TODO: move this into the actions module itself.
# Mixes Actions::ActionExecutor into Aggregate; `include` is invoked through
# __send__ so the call is made on the class from outside its body.
Aggregate.__send__(:include, Actions::ActionExecutor)
module MOA
  class MOAService
    # FIXME: timeout handling corrected here - the whole method is meant to be
    # copied back into the MOA service.
    #
    # Asks +observable+ for its properties and blocks until the answer is
    # available or +timeout+ seconds have passed.
    #
    # observer   - requesting party; only checked for nil here.
    # observable - name of the object whose properties are requested.
    # timeout    - seconds to wait for the answer (default 2).
    #
    # Returns the value stored in @incomingLists for +observable+.
    # Raises ArgumentError when +observer+ or +observable+ is nil,
    # TimeoutException when no answer arrived in time, CommunicationException
    # when the waiter was woken but nothing was stored, and re-raises any
    # exception object stored as the answer.
    def getProperties(observer, observable, timeout = 2)
      # The original tested +observer+ twice and never looked at +observable+.
      raise ArgumentError, "Nil argument" if observer.nil? || observable.nil?

      condVar = ConditionVariable.new
      (@awaitingObservers[observable] ||= []).push(condVar)

      # Send the request, then wait on a helper thread so the wait can be
      # bounded with Thread#join(timeout).
      sendMessage(observable, MOAMessage.new(:getproperties, nil, observable, nil))
      waiter = Thread.new do
        @mutex.synchronize { condVar.wait(@mutex) }
      end
      unless waiter.join(timeout)
        # Do not leave the helper thread parked on the condition variable.
        waiter.kill
        @incomingLists[observable] = TimeoutException.new()
      end

      ret = @incomingLists[observable]
      if !ret.nil? && !ret.kind_of?(Exception)
        waiters = @awaitingObservers[observable]
        waiters.delete(condVar)
        # Array#delete leaves an empty array, never nil, so test for emptiness;
        # the original also referenced the undefined @incomingList here.
        if waiters.empty?
          @awaitingObservers.delete(observable)
          @incomingLists.delete(observable)
        end
      end

      raise CommunicationException.new if ret.nil?
      raise ret if ret.kind_of?(Exception)
      ret
    end
  end
end
# FIXME: methods that still have to be added to SimpleAgent itself.
class SimpleAgent
  # The original note says this mixin is what allows the declarative style
  # used further down in MySimpleAgent.
  include CoreService::Filters

  attr_reader :parent

  # The address is kept in the 'agentID' property.
  def address
    properties['agentID']
  end

  def address=(address)
    properties['agentID'] = address
  end

  # Agents log through the logger of their parent.
  def logger
    @parent.logger
  end
end
# FIXME: methods that still have to be added to Aggregate itself.
class Aggregate
  # Replaces the current children: every value of @children is passed to
  # remove_child, then every element of +agents+ is passed to add_child.
  def agents=(agents)
    @children.each_value do |current_child|
      remove_child(current_child)
    end
    agents.each do |new_child|
      add_child(new_child)
    end
  end

  # Runs the queued actions by delegating to executeActions.
  def process_actions
    executeActions
  end
end
# Core service used by this script. MOA messages are wrapped into remote
# agent messages, incoming remote messages are handed to per-type processor
# blocks, and every child is registered with the communication service.
class Service < CoreService::Service
  # -----
  # Keep the inherited dispatcher reachable before it is replaced below.
  alias_method :real_dispatch_moa_message, :dispatch_moa_message

  # Message currently being processed; set only while a processor block runs.
  attr_accessor :message

  # Returns (creating on demand) the list of processor blocks for +type+.
  def self.message_processors(type)
    @@message_processors ||= {}
    @@message_processors[type] ||= []
  end

  # Registers a block that is called while a message of the given type is
  # being processed. The block is evaluated in the context of the service
  # instance. See Workplace#with_message.
  #
  # _type_:: type of the messages to process
  # _block_:: processing block
  def self.with_message(type = nil, &block)
    raise ArgumentError, "needs a block" if block.nil?
    message_processors(type).push(block)
  end

  # Wraps the MOA message into an AgentMessage and sends it as a remote
  # message instead of dispatching it directly.
  def dispatch_moa_message(address, message)
    wrapped = CoreService::AgentMessage.new(address, :moa, message)
    send_remote_message(wrapped)
  end

  # Incoming :moa messages go to the original (aliased) dispatcher.
  with_message :moa do
    current = message
    real_dispatch_moa_message(current.receiver, current.content)
  end

  # Runs the processors registered for the message's type and then those
  # registered for nil. Anything that is not an AgentMessage is ignored.
  def process_remote_message(message)
    return unless message.is_a?(CoreService::AgentMessage)

    [message.type, nil].each do |type|
      self.class.message_processors(type).each do |processor|
        self.message = message
        instance_eval(&processor)
        self.message = nil
      end
    end
  end

  # -----
  def notify(who)
    logger.info("CoreService: '#{who.address}' stopped")
    super(who)
  end

  def detached
    logger.debug("CoreService: detached")
  end

  # After the inherited initialisation, registers each child with the
  # communication service under its address.
  def init
    super()
    children.each do |address, agent|
      @bus.get(:communicationService).register_listener(agent, address)
    end
  end
end
# Workplace that evolves a population with a simple genetic algorithm. Lower
# evaluations count as better: evaluate_population keeps the smallest value
# seen so far in the 'best_evaluation' property.
class GeneticWorkplace < CoreService::Workplace
  # ---- To be moved into CoreService/Agent.rb -----

  # Sends the MOA message to this workplace when +address+ is its own
  # address; otherwise offers it to every agent in @agents.
  def dispatch_moa_message(address, message)
    if address == self.address
      self.send_message(address, :moa, message)
    else
      @agents.each_value do |agent|
        agent.dispatch_moa_message(address, message)
      end
    end
  end

  # Not supported here. The original raised the misspelled constant
  # NotImplementerError, which surfaced as a NameError instead.
  def on_received_message(message, sender, *args)
    raise NotImplementedError, "!!"
  end

  # Registers the workplace and its children and marks it initialised.
  #
  # Raises StateError unless the workplace is in the :created state.
  # Returns self.
  def init(service)
    raise StateError, "Workplace already or not initialized!" unless @__state == :created
    @service = service
    @address = @service.register_address( WorkplaceAddress )
    self.children.each do |agent|
      agent.parent = self
      agent.address = self.register_address
      agent.init
      @agents[agent.address] = agent
    end
    @__state = :initialized
    self
  end
  # ----

  # gen_operators - genetic operators (any number of instances of classes
  # derived from GeneticOperator)
  attr_writer :dimension, :gen_operators, :init_size, :iterations

  after :init do
    @gen_operators ||= []
    @fit_function = FitnessFunction.new(@dimension)
    @gen_repr = GenotypeRepresentation.new(@dimension)
    self.stop_condition = CoreService::Condition::Times.new(@iterations)
    self.logger.info("Genetic: dimension = #{@dimension}")
    self.logger.info("Genetic: population_size = #{@init_size}")
    self.logger.info("Genetic: iterations = #{@iterations}")
    self.logger.info("Genetic: gen_operators = [#{@gen_operators.collect(&:class).join(", ")}]")
    self.population_size = @init_size
    self['best_evaluation'] = @fit_function.evaluate(@population[0])
    self['best_inidividual'] = @gen_repr.copy_individual(@population[0])
    self[:iteration] = @iterations
    @last_population = evaluate_population()
  end

  # One generation: select a new population according to the previous
  # evaluations, apply every genetic operator, then re-evaluate.
  def perform
    self.fetch_all_messages
    self[:iteration] = self.stop_condition.times if self.stop_condition.times % 100 == 0

    # Cumulative distribution of the selection probabilities.
    pop_distribution = selection_distribution(@last_population)

    # Build the new population.
    new_population = Array.new(@population_size) do
      @gen_repr.copy_individual(@population[give_index(pop_distribution)])
    end

    # Apply each operator @population_size times to randomly picked individuals.
    @gen_operators.each do |operator|
      @population_size.times do
        selected_indices = Array.new(operator.no_of_args) { rand(@population_size) }
        selected_individuals = selected_indices.map { |i| new_population[i] }
        mutated_individuals = operator.perform(*selected_individuals)
        selected_indices.each_with_index { |i, index| new_population[i] = mutated_individuals[index] }
      end
    end
    @population = new_population

    # Evaluate the current population (after the genetic operators ran).
    @last_population = evaluate_population()
  end

  protected

  # Accessor that sets a new population size and regenerates the population
  # when the size actually changes.
  def population_size=(pop_size)
    if pop_size != @population_size
      raise ArgumentError, "Population size must be positive integer" unless pop_size > 0
      @population_size = pop_size
      reset_population()
    end
  end

  # Drops the current population and generates a new one.
  def reset_population
    (@population ||= []).clear
    @population_size.times { @population.push(@gen_repr.create_individual) }
    @best_current_evaluation = @fit_function.evaluate(@population[0])
    @best_current_individual = @gen_repr.copy_individual(@population[0])
  end

  # Evaluates every individual of the current population and updates the
  # best individual found so far. Returns the array of evaluations.
  def evaluate_population()
    pop_evaluation = Array.new(@population_size)
    best_current_evaluation = self['best_evaluation']
    best_current_individual = self['best_inidividual']
    0.upto(@population_size - 1) do |i|
      pop_evaluation[i] = @fit_function.evaluate(@population[i])
      if pop_evaluation[i] < best_current_evaluation
        best_current_evaluation = pop_evaluation[i]
        best_current_individual = @gen_repr.copy_individual(@population[i])
      end
    end
    if best_current_evaluation < self['best_evaluation']
      self['best_evaluation'] = best_current_evaluation
      self['best_inidividual'] = @gen_repr.copy_individual(best_current_individual)
    end
    pop_evaluation
  end

  # Builds the cumulative selection distribution for the given evaluations.
  #
  # Individual i gets the weight 1 - e_i / sum(e), so smaller evaluations are
  # more likely to be chosen. The original used e_0 for every individual, did
  # not normalise the weights (they add up to n - 1, so the cumulative values
  # passed 1.0 almost immediately) and could fall into integer division.
  # NOTE(review): assumes evaluations are non-negative - confirm against
  # FitnessFunction.
  #
  # Returns an array of non-decreasing floats whose last element is 1.0.
  def selection_distribution(pop_evaluation)
    size = pop_evaluation.size
    eval_sum = pop_evaluation.inject(0.0) { |sum, value| sum + value }
    weights =
      if eval_sum.zero?
        Array.new(size, 1.0)
      else
        pop_evaluation.map { |value| 1.0 - value / eval_sum }
      end
    total = weights.inject(0.0) { |sum, weight| sum + weight }
    if total.zero?
      # Happens for a single individual: fall back to a uniform choice.
      weights = Array.new(size, 1.0)
      total = size.to_f
    end
    running = 0.0
    distribution = weights.map { |weight| running += weight / total }
    # Pin the end point so rounding can never leave a gap below 1.0.
    distribution[-1] = 1.0 unless distribution.empty?
    distribution
  end

  # Returns a random index drawn according to the given cumulative distribution.
  def give_index(pop_distr)
    rand_num = rand
    pop_distr.index { |cumulative| rand_num < cumulative } || pop_distr.size - 1
  end
end
# Observer service that asks the MOA service for the properties of the
# "Genetic" observable and registers itself for each of them.
class GeneticObserver
  include MOA::Observer

  # bus - service bus used to reach the logger and the MOA service.
  def initialize(bus)
    @bus = bus
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @bus.get(:logger).info("#{self.class}: init")
  end

  # Does the work on a background thread: fetch the property list, register
  # for every key, then tell the bus that this service has stopped.
  def start
    @running = true
    @thread = Thread.new do
      log = @bus.get(:logger)
      log.info("#{self.class}: start")
      log.info("#{self.class}: getting properties")
      moa = @bus.get(:moaService)
      props = moa.getProperties("test", "Genetic")
      log.info("#{self.class}: received properties: #{props.inspect}")
      props.each_key { |key| moa.register(self, "Genetic", key) }
      @bus.stopped(self)
    end
  end

  # Logs the reported property change.
  def update(observable, property, value)
    description = "#{observable} has changed '#{property}' to #{value.inspect}"
    @bus.get(:logger).info("#{self.class}: #{description}")
  end
end
# Same procedure as its superclass, but aimed at the "Simple2" observable.
class SimpleObserver < GeneticObserver
  # Fetches the properties of "Simple2" on a background thread, registers for
  # each key and finally reports to the bus that this service has stopped.
  def start
    @running = true
    @thread = Thread.new do
      log = @bus.get(:logger)
      log.info("#{self.class}: start")
      log.info("#{self.class}: getting properties")
      moa = @bus.get(:moaService)
      props = moa.getProperties("test", "Simple2")
      log.info("#{self.class}: received properties: #{props.inspect}")
      props.each_key { |key| moa.register(self, "Simple2", key) }
      @bus.stopped(self)
    end
  end
end
# Strategy that randomly nudges every element of every individual held by
# the agent it was created for.
class MutationStrategy
  # agent - object that answers +individuals+ with a collection of arrays.
  def initialize(agent)
    @agent = agent
  end

  # Adds a random offset between -2 and 2 to each element, in place.
  def run
    @agent.individuals.each do |genome|
      genome.each_with_index do |_gene, position|
        magnitude = rand(3)
        direction = rand(2) == 0 ? 1 : -1
        genome[position] += magnitude * direction
      end
    end
  end
end
# Aggregate used by the sample configuration; it only logs its own
# initialisation.
class MyAggregate < Aggregate
  after :init do
    message = "#{address}: init"
    logger.info(message)
  end
end
# Sample agent: each step it recomputes the first element of every
# individual, tracks the largest value in 'best_sum' and, when its parent
# supports actions, queues a mutation of its individuals.
class MySimpleAgent < SimpleAgent
  attr_accessor :individuals

  after :init do
    logger.info("#{address}: init")
    self['best_sum'] = 0
    @individuals ||= []
  end

  after :step do
    # Occasionally touch a throw-away property.
    if rand() < 0.1
      self['dummy'] = rand()
    end

    @individuals.each do |row|
      row[0] = row[1] + row[2]
      if self['best_sum'] < row[0]
        self['best_sum'] = row[0]
      end
    end

    owner = parent
    if owner.respond_to?(:addAction)
      strategy = MutationStrategy.new(self)
      owner.addAction(Actions::UseStrategyAction.new(strategy))
    end
  end
end
# YAML description of the agent tree. It is parsed at the bottom of the
# script, after tab characters have been replaced by spaces.
# NOTE(review): the nested entries below carry no indentation in this copy of
# the file - confirm that the original (tab) indentation is still present in
# the real source, otherwise the YAML structure is lost.
config = <<EOF
agents_tree:
value:
- genetic
genetic:
class: GeneticWorkplace
setter-parameters:
iterations: 100
init_size: 500
dimension: 2
gen_operators:
type: array
contents:
0..4: mutation
5: crossover
children:
- simple
- aggregate
aggregate:
class: MyAggregate
setter-parameters:
agents:
- simple2
- simple
simple:
class: MySimpleAgent
simple2:
class: MySimpleAgent
setter-parameters:
individuals:
-
- 0
- 2
- 3
-
- 0
- 7
- 1
mutation:
class: SimpleGeneticAlgorithm::MutationOperator
constructor-parameters:
- 1
crossover:
class: SimpleGeneticAlgorithm::CrossoverOperator
constructor-parameters:
- 1
EOF
# Bus that shuts itself down as soon as the core Service reports that it
# has stopped.
class Bus < Node::Bus
  # Forwards the notification to the superclass; when the stopped service is
  # the core Service, the bus is stopped and waited for as well.
  def stopped(service)
    super(service)
    return unless service.is_a?(Service)

    stop
    wait
  end
end
# Services hosted on the bus, keyed by the name they are looked up under.
SERVICES = {
  :logger               => ExampleServices::LoggerService,
  :configService        => Configuration::Configuration,
  :moaService           => MOA::MOAService,
  :communicationService => CommunicationService,
  :nameService          => NameService,
  :coreService          => Service,
  :simpleObserver       => SimpleObserver,
  :geneticObserver      => GeneticObserver,
}

# Build the bus, hand the parsed configuration (tabs turned into spaces) to
# the configuration service, then run until the bus finishes.
bus = Bus.new(SERVICES)
config_service = bus.get(:configService)
config_service.set_configuration(YAML.load(config.tr("\t", " ")))
bus.start
bus.wait
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment