This will create the configured number of CONSUMERS and PRODUCERS then start pushing messages as fast as it can.
you will have to use the command sudo killall python -9 to stop the process.
| #!/usr/bin/env python | |
| from boto import sqs | |
| from boto.sqs.message import Message | |
| import threading, logging, time | |
| CONSUMERS = 30 | |
| PRODUCERS = 5 | |
| MAX_MESAGES = 10 | |
| class Counter(threading.Thread): | |
| daemon = True | |
| name = None | |
| interval = 1 | |
| count = 0 | |
| total = 0 | |
| lock = threading.Lock() | |
| def __init__(self, *args, **kwargs): | |
| self.name = kwargs.get('name', None) | |
| super(Counter, self).__init__(*args, **kwargs) | |
| def increment(self): | |
| with self.lock: | |
| self.count += 1 | |
| self.total += 1 | |
| def value(self): | |
| with self.lock: | |
| return self.count | |
| def run(self): | |
| start = time.time() | |
| time.clock() | |
| elapsed = 0 | |
| while elapsed < self.interval: | |
| elapsed = time.time() - start | |
| print('timed:%s:%s'% (self.name, self.count, self.total)) | |
| self.count = 0 | |
| time.sleep(1) | |
| self.run() | |
| class Producer(threading.Thread): | |
| daemon = True | |
| count = 0 | |
| start_time = time.time() | |
| def __init__(self, *args, **kwargs): | |
| # keys should be in your boto config | |
| self.conn = sqs.connect_to_region("us-west-2") | |
| self.queue = self.conn.create_queue('test_test_test_deleteme') | |
| super(Producer, self).__init__(*args, **kwargs) | |
| def message(self, x): | |
| self.count = self.count + 1 | |
| message = Message() | |
| message.set_body('message:%s:%s' % (self.count, x)) | |
| return message | |
| def bulkMessage(self, x): | |
| self.count = self.count + 1 | |
| return ('%s%s'%(self.count, x), 'message:%s:%s'%(self.count, x), 0) | |
| def run(self): | |
| messages = [(self.bulkMessage(i)) for i in range(0,MAX_MESAGES)] | |
| self.queue.write_batch(messages) | |
| end_time = time.time() | |
| self.run() | |
| class Consumer(threading.Thread): | |
| daemon = True | |
| start_time = time.time() | |
| def __init__(self, counter, *args, **kwargs): | |
| # keys should be in your boto config | |
| self.counter = counter | |
| self.conn = sqs.connect_to_region("us-west-2") | |
| self.queue = self.conn.create_queue('test_test_test_deleteme') | |
| super(Consumer, self).__init__(*args, **kwargs) | |
| def run(self): | |
| messages = self.queue.get_messages( | |
| num_messages=MAX_MESAGES, | |
| visibility_timeout=10 | |
| ) | |
| for message in messages or []: | |
| if not message: | |
| return | |
| print(message.get_body()) | |
| self.counter.increment() | |
| if messages: | |
| self.queue.delete_message_batch(messages) | |
| print(self.counter.value()) | |
| self.run(); | |
| def main(): | |
| counter = Counter() | |
| threads = [counter] | |
| [threads.append(Producer()) for i in range(0,PRODUCERS)] | |
| [threads.append(Consumer(counter)) for i in range(0,CONSUMERS)] | |
| for t in threads: t.start() | |
| time.sleep(100) | |
| if __name__ == "__main__": | |
| main() |