Skip to content

Instantly share code, notes, and snippets.

@augustfly
Forked from lupalberto/vaods9.py
Created April 8, 2011 17:17
Show Gist options
  • Select an option

  • Save augustfly/910305 to your computer and use it in GitHub Desktop.

Select an option

Save augustfly/910305 to your computer and use it in GitHub Desktop.
Gus's play on Omar's sampy code
# src: https://gist.github.com/906716/82e170c4952a75fde3fdc1e5c5f008979e1a8364
import sampy
import urlparse
mtype_lu = {}
mtype_lu['votable'] = {'mtype':'table.load.votable',
'params':['name','url','table-id']}
mtype_lu['fits_table'] = {'mtype':'table.fits.votable' ,
'params':['name','url','table-id']}
mtype_lu['fits_image'] = {'mtype':'image.load.fits' ,
'params':['name','url','image-id']}
mtype_lu['pan'] = {'mtype':'coord.pointAt.sky' ,
'params':['ra','dec']}
def furl(url):
f = urlparse.urlsplit(url)[:]
fl = list(f)
if f[0] == '': fl[0] = 'file'
return urlparse.urlunsplit(fl)
def ds9echo(response):
print response
class InterCom(sampy.SAMPIntegratedClient):
def __init__(self, name="VAO Python Client",
description="An interactive SAMP client for Python by the Virtual Astronomical Observatory",
version="0.0.1"):
self.metadata = {"samp.name": name,
"samp.description.text": description,
"client.version": version}
sampy.SAMPIntegratedClient.__init__(self)
if len(self.hub.getRunningHubs()) == 0:
self.samp_hub = sampy.SAMPHubServer()
self.samp_hub.start()
else:
self.samp_hub = self.hub.getRunningHubs()
self.connect()
self.declareMetadata(self.metadata)
self.mtype_lu = mtype_lu
def cleanup(self):
self.disconnect()
if isinstance(self.samp_hub,sampy.SAMPHubServer):
self.samp_hub.stop()
print "client disconnected"
def handler_wrapper(self, private_key, sender_id, msg_id, response):
response = response['samp.result']['value']
self.handler(response)
def messsages(self):
for x in self.mtype_lu: print x, self.mtype_lu[x]['params']
def send(self, mtype, *args):
mt = self.mtype_lu[mtype]['mtype']
pa = mtype_lu[mtype]['params']
ps = args
pt = dict(zip(pa,ps))
message = {'samp.mtype': mt,
'samp.params': pt}#{'name': name, 'url': url}}
self.callAll("vao_InterCom_send", message)
class ds9Com(InterCom):
""" subclass with ds9 private xpa-like functions """
def __init__(self):
InterCom.__init__(self)
def set(self, cmd, url=""):
message = {"samp.mtype": "ds9.set",
"samp.params":{"cmd": cmd, "url":url}}
self.callAll("vaods9set", message)
def get(self, cmd, url="", tag="vaods9get", function=ds9echo):
self.bindReceiveResponse(tag, self.handler_wrapper)
self.handler = function
message = {"samp.mtype": "ds9.get",
"samp.params":{"cmd": cmd, "url":url}}
self.callAll(tag, message)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment