@lurenx
Created March 24, 2016 10:19
Kafka Consumer
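from kafka import KafkaConsumer
import avro.schema
import avro.io
import io

# Subscribe to the topic as part of a consumer group
consumer = KafkaConsumer('my-topic',
                         group_id='my_group',
                         bootstrap_servers=['localhost:9092'])

# Load the Avro schema used to decode the messages
schema_path = "user.avsc"
with open(schema_path) as f:
    # The older avro-python3 package spells this avro.schema.Parse
    schema = avro.schema.parse(f.read())

# One reader can be reused for every message
reader = avro.io.DatumReader(schema)

for msg in consumer:
    # msg.value holds the raw Avro-encoded bytes of one record
    bytes_reader = io.BytesIO(msg.value)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    user1 = reader.read(decoder)
    print(user1)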
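
To make clearer what the decoding step in the loop does with `msg.value`, here is a minimal standard-library sketch of Avro's binary encoding for a record. It is not the `avro` library's implementation. The two-field schema (`name` as string, `favorite_number` as int) is a hypothetical stand-in for `user.avsc`, whose contents are not shown here, and `read_long`, `read_string`, `write_long`, and `decode_user` are illustrative names. It relies only on the Avro specification's rules: ints and longs are zigzag varints, strings are a length followed by UTF-8 bytes, and a record is its fields concatenated in schema order.

```python
import io


def read_long(buf):
    """Read one Avro int/long: a base-128 varint, then undo the zigzag mapping."""
    shift = 0
    raw = 0
    while True:
        chunk = buf.read(1)
        if not chunk:
            raise EOFError("truncated varint")
        b = chunk[0]
        raw |= (b & 0x7F) << shift
        if not (b & 0x80):  # high bit clear marks the last byte
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1)


def read_string(buf):
    """Read an Avro string: a long byte count followed by UTF-8 data."""
    length = read_long(buf)
    return buf.read(length).decode("utf-8")


def write_long(n):
    """Encode a 64-bit integer the way Avro does (zigzag, then varint)."""
    raw = (n << 1) ^ (n >> 63)
    out = bytearray()
    while raw & ~0x7F:
        out.append((raw & 0x7F) | 0x80)
        raw >>= 7
    out.append(raw)
    return bytes(out)


def decode_user(payload):
    """Decode a record under the hypothetical schema: name (string), favorite_number (int)."""
    buf = io.BytesIO(payload)
    return {"name": read_string(buf), "favorite_number": read_long(buf)}


# Hand-built payload standing in for msg.value
payload = write_long(6) + b"Alyssa" + write_long(256)
user = decode_user(payload)
print(user)
```

Because the bytes carry no field names or type tags, the consumer has to use a schema compatible with the one the producer wrote with; otherwise the reader will misinterpret the bytes or fail.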