Skip to content

Instantly share code, notes, and snippets.

@janzhen
Last active August 29, 2015 14:00
Show Gist options
  • Select an option

  • Save janzhen/11204046 to your computer and use it in GitHub Desktop.

Select an option

Save janzhen/11204046 to your computer and use it in GitHub Desktop.
Distributed Lock
"""Distributing lock."""
from contextlib import contextmanager
# Seconds to sleep between two acquisition attempts. The lock helpers also
# divide the caller's timeout by this value to get the number of attempts.
SLEEP_SECONDS = .1
@contextmanager
def redis_lock(redis_client, key, timeout, expires):
    """Hold a Redis-backed lock for the duration of a ``with`` block.

    The lock is the key ``key``; its value is the Unix time after which the
    lock is considered abandoned.  A free lock is taken with SETNX.  A lock
    whose stored time has already passed is taken over with GETSET.

    Args:
        redis_client: Object exposing ``setnx``, ``get``, ``getset`` and
            ``delete`` (for example a ``redis.StrictRedis``).
        key: Name of the Redis key that represents the lock.
        timeout: Roughly how many seconds to keep retrying.  One attempt is
            made per ``SLEEP_SECONDS``; at least one attempt is always made.
        expires: Seconds the lock stays valid once gained.  One extra second
            is added to the stored expiry as a margin.

    Yields:
        bool: True if the lock was gained, False if every attempt failed.
        The ``with`` body runs in both cases, so callers must check it.
    """
    import time

    locked = False
    try:
        # ``range`` works on Python 2 and 3; max() guarantees one attempt even
        # when the timeout is shorter than a single sleep interval.
        for _ in range(max(1, int(timeout / SLEEP_SECONDS))):
            expires_time = time.time() + expires + 1
            if redis_client.setnx(key, expires_time):
                # Key did not exist: gained the lock.
                locked = True
                break

            current_expires_time = redis_client.get(key)
            if current_expires_time and float(current_expires_time) < time.time():
                # The holder's lease has run out; try to replace it.  GETSET
                # returns the value it replaced, so only the first client to
                # swap out the stale value sees an expired timestamp.
                previous = redis_client.getset(key, expires_time)
                # Replies are strings, so they must be converted before being
                # compared with a float.  ``None`` means the key vanished
                # between GET and GETSET, in which case our value now owns it.
                if previous is None or float(previous) < time.time():
                    locked = True
                    break
                # Otherwise another client won the race.  Our GETSET replaced
                # its timestamp with ours, which differs only by a moment.

            # Failed, wait and try again.
            time.sleep(SLEEP_SECONDS)

        # Either locked, or timed out with locked == False.
        yield locked
    finally:
        # Delete only while our own lease is still valid.  After that another
        # client may have taken the key over and it is no longer ours.
        if locked and time.time() < expires_time:
            redis_client.delete(key)
@contextmanager
def memcache_lock(memcache_client, key, timeout, expires):
    """Hold a memcached-backed lock for the duration of a ``with`` block.

    The lock is taken with ``add``, which only succeeds when ``key`` does not
    exist yet.  The key is stored with a time-to-live of ``expires``.

    Args:
        memcache_client: Object exposing ``add(key, value, time)`` and
            ``delete(key)`` (for example a ``memcache.Client``).
        key: Name of the memcached key that represents the lock.
        timeout: Roughly how many seconds to keep retrying.  One attempt is
            made per ``SLEEP_SECONDS``; at least one attempt is always made.
        expires: Time-to-live in seconds handed to ``add``.  ``0`` is treated
            as "never expires", so the lock is then always released on exit.

    Yields:
        bool: True if the lock was gained, False if every attempt failed.
        The ``with`` body runs in both cases, so callers must check it.
    """
    import time

    locked = False
    deadline = None  # None means the key has no expiry of its own
    try:
        # ``range`` works on Python 2 and 3; max() guarantees one attempt even
        # when the timeout is shorter than a single sleep interval.
        for _ in range(max(1, int(timeout / SLEEP_SECONDS))):
            # Timestamp taken *before* the add so that our idea of the lease
            # never outlives the TTL the server applies to the key.
            attempt_started = time.time()
            if memcache_client.add(key, 1, expires):
                # Gained the lock.
                locked = True
                if expires:
                    deadline = attempt_started + expires
                break
            # Failed, wait and try again.
            time.sleep(SLEEP_SECONDS)

        # Either locked, or timed out with locked == False.
        yield locked
    finally:
        # Once the TTL has passed the key may already belong to another
        # client, so deleting it then would release someone else's lock.
        if locked and (deadline is None or time.time() < deadline):
            memcache_client.delete(key)
def test():
    """Exercise both locks with ten competing threads each.

    Needs the ``redis`` and ``memcache`` packages plus live servers: Redis
    reached through ``redis.StrictRedis()`` with its default settings, and
    memcached at ``localhost:11211``.  Progress is printed; nothing is
    asserted.
    """
    from time import sleep
    from threading import Thread

    import redis
    import memcache

    class LockTestingThread(Thread):
        """Try to take 'test-key' through ``lock`` and hold it for a second."""

        def __init__(self, lock, client):
            Thread.__init__(self)
            self.lock = lock
            self.client = client

        def run(self):
            # Single-argument print(...) behaves the same on Python 2 and 3.
            print('Try to get a lock...')
            with self.lock(self.client, 'test-key', 10, 20) as locked:
                if locked:
                    print('Gained lock. Sleep for a while.')
                    sleep(1)
                else:
                    print('Failed.')

    def run_threads(lock, client):
        """Start ten threads sharing ``client`` and wait for all of them."""
        thread_list = [LockTestingThread(lock, client) for _ in range(10)]
        for thread in thread_list:
            thread.start()
        for thread in thread_list:
            thread.join()

    redis_client = redis.StrictRedis()
    memcache_client = memcache.Client(['localhost:11211'])

    print('Test redis lock...')
    run_threads(redis_lock, redis_client)

    print('Test memcache lock...')
    run_threads(memcache_lock, memcache_client)
# Run the live-server smoke test only when this file is executed as a script,
# not when it is imported.
if __name__ == '__main__':
    test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment