Skip to content

Instantly share code, notes, and snippets.

@dejw
Created April 3, 2012 18:58
Show Gist options
  • Select an option

  • Save dejw/2294688 to your computer and use it in GitHub Desktop.

Select an option

Save dejw/2294688 to your computer and use it in GitHub Desktop.
API for remote procedure library done right
# -*- coding: utf-8 -*-
# client.py
from common import FooClient, LockedError, UnauthorizedError
from jsonrpclib import Server, ProtocolError as JsonProtocolError
if __name__ == "__main__":
# using remote
client = FooClient() # in fact FooClient can be also an instance of
# ServiceImplementation class, because it raises
# the same errors, which is awesome!
try:
client.login("invalid-password")
except UnauthorizedError:
client.login("secret")
print client.lock("alice")
try:
print client.lock("bob")
except LockedError, err:
print "already locked by", err.data['username'], "unlock and try again"
print client.unlock("alice")
# the same using pure jsonrpclib and related
# what if my service provider changes his endpoint? I am screwed because I
# need to update my endpoint as well; no need to worry in first case, because
# service endpoint is defined in FooClient
client = Server("http://localhost:9000")
try:
client.login("invalid-password")
except JsonProtocolError, err:
# self explanatory...
code, message = err.args[0]
if code == -32102:
client.login("secret")
else:
raise err
print client.lock("alice")
try:
print client.lock("bob")
except JsonProtocolError, err:
# noo, not again....
code, message = err.args[0]
if code == -32101:
# sigh :(
print "already locked by", "I DO NOT KNOW WHO, BECAUSE I HAVE NO DATA", "unlock and try again"
else:
raise err
print client.unlock("alice")
# -*- coding: utf-8 -*-
# common.py
from remote import ProtocolError, ClientBuilder, JsonTransport
class LockedError(ProtocolError):
code = -32101
class UnauthorizedError(ProtocolError):
code = -32102
FooClient = (ClientBuilder("FooClient")
.endpoint("http://localhost:9000")
.errorClasses(
LockedError,
UnauthorizedError
)
.transport(JsonTransport)
.build())
# -*- coding: utf-8 -*-
# remote.py
# Implementation of remote goes here... (not opensourced yet)
# -*- coding: utf-8 -*-
# server.py
from common import LockedError, UnauthorizedError
from remote import startService
if __name__ == "__main__":
class ServiceImplementation(object):
def __init__(self):
self.locked_by = None
def login(self, password):
if password != "secret":
raise UnauthorizedError
def lock(self, username):
if self.locked_by is not None and self.locked_by != username:
raise LockedError("Locked by %s" % self.locked_by,
data={"username" : self.locked_by})
self.locked_by = username
return {"locked" : username}
def unlock(self, username):
if self.locked_by == username:
self.locked_by = None
return {"unlocked" : self.locked_by == None}
startService(ServiceImplementation())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment