Skip to content

Instantly share code, notes, and snippets.

@lurenx
Created March 24, 2016 09:58
Show Gist options
  • Select an option

  • Save lurenx/477f0559af7bc4e72673 to your computer and use it in GitHub Desktop.

Select an option

Save lurenx/477f0559af7bc4e72673 to your computer and use it in GitHub Desktop.
ruby avro kafka
require 'poseidon'
require 'avro'
require 'json'
schema = Avro::Schema.parse(File.open("item.avsc", "rb").read)
dw = Avro::IO::DatumWriter.new(schema)
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new(buffer)
datum = {"name" => "Desktop", "description" => "Office and Personal Usage", "price" => 30000}
dw.write(datum, encoder)
producer = Poseidon::Producer.new(["192.168.100.88:9092"], "my_avro_test_producer")
messages = []
messages << Poseidon::MessageToSend.new("avro_test",buffer.string)
producer.send_messages(messages)
consumer = Poseidon::PartitionConsumer.new("my_avro_test_consumer", "192.168.100.88", 9092, "avro_test", 0, :earliest_offset)
messages = consumer.fetch
messages.each do |fm|
p fm.value
stringreader = StringIO.new(fm.value)
decoder = Avro::IO::BinaryDecoder.new(stringreader)
datumreader = Avro::IO::DatumReader.new(schema)
#read the message
read_value = datumreader.read(decoder)
p read_value
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment