#
# xmlrpi.py 1.0 by Mark Paschal
# Sends and responds to XML-RPI messages over AIM.
#
# XML-RPI
#   http://frontier.userland.com/tcpIm
# Requires Jamie Turner's PyTOC
#   http://www.jamwt.com/Py-TOC/
#
# Changelog:
#   1.0 17 October 2002: first version
#
# Future enhancements:
#   Handle methods with periods in their names correctly.
#   Better handling of potential feedback loops.
#

import sys
from types import MethodType, StringType
import xmllib
import xmlrpclib
import base64

from toc import TocTalk, TOCError

# xmlrpclib picks its XML parser at run time.  Depending on which one is
# available, a message that is not well-formed XML surfaces either as
# xmllib.Error or as expat's ExpatError, so both are treated as "this was
# not an XML-RPC message".
try:
    from xml.parsers.expat import ExpatError
    _PARSE_ERRORS = (xmllib.Error, ExpatError)
except ImportError:
    _PARSE_ERRORS = (xmllib.Error,)


def xmldecode(msg):
    """Return msg with the five standard XML character entities decoded.

    '&amp;' is decoded last so that an escaped entity such as '&amp;lt;'
    comes out as the literal text '&lt;' instead of being decoded twice.
    """
    return (msg.replace('&lt;', '<')
               .replace('&gt;', '>')
               .replace('&apos;', "'")
               .replace('&quot;', '"')
               .replace('&amp;', '&'))


def xmlencode(msg):
    """Return msg with XML's five special characters entity encoded.

    '&' is encoded first so that the ampersands introduced by the other
    replacements are not themselves re-encoded.
    """
    return (msg.replace('&', '&amp;')
               .replace('<', '&lt;')
               .replace('>', '&gt;')
               .replace("'", '&apos;')
               .replace('"', '&quot;'))


def _map_strings(func, values):
    """Return a list copy of values with func applied to every str item."""
    result = []
    for value in values:
        if isinstance(value, StringType):
            value = func(value)
        result.append(value)
    return result


class XmlrpiBot(TocTalk):
    """
    A PyTOC bot that sends and responds to XML-RPI messages. Has protection
    against immediate feedback loops.

    To add your own responder, subclass XmlrpiBot and define your function to
    invoke as a method, with signature:

        fnName(self, sname, *args)

    sname is the screenname of the caller. The args array will be the
    arguments from afar. Method names starting with an underscore are never
    invoked on behalf of a remote caller.

    Added methods:

        do_SEND_RPI(self, sname, params, methodname=None,
                    methodresponse=None, encoding=None)
    """

    def __init__(self, sname, pw):
        """Set up the bot for screenname sname with password pw."""
        TocTalk.__init__(self, sname, pw)
        # Debug output is printed when this is greater than zero.
        self._debug = 0
        # The last (sname, flags, msg) that was answered with the plain-text
        # complaint; used to avoid answering the same message twice in a row.
        self.lastNormalMessage = None

    def on_IM_IN(self, data):
        """Handle an incoming IM: decode it as XML-RPI and invoke the method.

        data is split as "screenname:flags:message". The message is entity
        decoded and parsed as XML-RPC; string parameters are base64 decoded;
        the caller's screenname is prepended to the parameters; and the
        method of this object named by the call is invoked. Failures are
        reported back to the caller as XML-RPC faults.
        """
        try:
            # Only the first two colons delimit fields; the message body may
            # itself contain colons, so it must not be split any further.
            (sname, flags, msg) = data.split(":", 2)
        except ValueError:
            # Since we can't even get the screenname to send an error, we
            # can't do anything.
            return

        # Translate the XML-RPC message body into params and method.
        try:
            (params, method) = xmlrpclib.loads(xmldecode(msg))
        except _PARSE_ERRORS:
            # Not an XML-RPC message.
            # Might we be in a loop?
            if (sname, flags, msg) != self.lastNormalMessage:
                # No, it's OK to say something.
                self.do_SEND_IM(sname, "I only accept XML-RPC calls, buddy.", 1)
                self.lastNormalMessage = (sname, flags, msg)
                if self._debug > 0:
                    print("Got a non-XML-RPI message from %s: %s" % (sname, msg))
            else:
                if self._debug > 0:
                    print("Got a non-XML-RPI message from %s to which I'm not responding: %s" % (sname, msg))
            return
        except xmlrpclib.Fault:
            # xmlrpclib.loads raises the Fault carried by a fault response.
            # There is no call to dispatch, and answering a fault with
            # another fault invites a feedback loop, so just note it.
            if self._debug > 0:
                print("Got an XML-RPI fault from %s: %s" % (sname, sys.exc_info()[1]))
            return

        if method is None:
            # A methodResponse carries no method name; nothing to invoke.
            if self._debug > 0:
                print("Got an XML-RPI response from %s: %s" % (sname, params))
            return

        # String parameters are base64ed. Unbase64 them.
        params = _map_strings(base64.decodestring, params)
        if self._debug > 0:
            print("XML-RPI message from %s: %s" % (sname, params))
        params.insert(0, sname)

        # Invoke the method.
        # NOTE(review): the method name comes from the remote peer. Names
        # starting with "_" are refused, but every other callable attribute
        # of the bot (including inherited ones such as do_SEND_IM) remains
        # remotely invocable -- consider an explicit allow-list.
        if (not method.startswith("_") and hasattr(self, method)
                and callable(getattr(self, method))):
            try:
                getattr(self, method)(*params)
            except (KeyboardInterrupt, SystemExit):
                # Never turn a shutdown request into a fault reply.
                raise
            except:
                # Anything else the responder raises is reported to the
                # caller rather than taking the bot down.
                etype, evalue = sys.exc_info()[:2]
                self.do_SEND_RPI(sname, xmlrpclib.Fault(1, "%s:%s" % (etype, evalue)))
            return
        self.do_SEND_RPI(sname, xmlrpclib.Fault(-32601, "No such method '%s'" % (method)))

    def do_SEND_RPI(self, sname, params, methodname=None, methodresponse=None, encoding=None):
        """
        do_SEND_RPI(self, sname, params, methodname=None,
                    methodresponse=None, encoding=None)

        Sends the given XML-RPI invocation to the given screenname (sname).
        The other parameters are the same as xmlrpclib.dumps' parameters:
        params is either a sequence of arguments or an xmlrpclib.Fault.
        String arguments are base64 encoded before marshalling; the caller's
        sequence is not modified.
        """
        if not isinstance(params, xmlrpclib.Fault):
            # String parameters are base64ed. Base64 them.
            params = tuple(_map_strings(base64.encodestring, params))
        # Send as normal.
        self.do_SEND_IM(sname, xmlencode(xmlrpclib.dumps(params, methodname, methodresponse, encoding)))


if '__main__' == __name__:
    uname, passw, debuglevel = None, None, 0

    import getopt
    try:
        opts, args = getopt.getopt(sys.argv[1:], "d:u:p:")
        for opt, arg in opts:
            if opt == "-d":
                # A non-integer level raises ValueError, handled below.
                debuglevel = int(arg)
            elif opt == "-u":
                uname = arg
            elif opt == "-p":
                passw = arg
        if uname is None or passw is None:
            raise getopt.GetoptError("Missing option", None)
    except (getopt.GetoptError, ValueError):
        print("Try: %s -u SCREENNAME -p PASSWORD [-d 1|-d 2]" % (sys.argv[0]))
        sys.exit(1)

    class StateBot(XmlrpiBot):
        """Trivial XmlrpiBot. Responds to getStateName(number) by calling
        XML-RPI function examples.msg(name), where "name" is the name of the
        number'th state of the United States (number indexes the
        alphabetical list below, starting at zero)."""

        states = ('Alabama', 'Alaska', 'Arizona', 'Arkansas', 'California',
                  'Colorado', 'Connecticut', 'Delaware', 'Florida', 'Georgia',
                  'Hawaii', 'Idaho', 'Illinois', 'Indiana', 'Iowa', 'Kansas',
                  'Kentucky', 'Louisiana', 'Maine', 'Maryland',
                  'Massachusetts', 'Michigan', 'Minnesota', 'Mississippi',
                  'Missouri', 'Montana', 'Nebraska', 'Nevada',
                  'New Hampshire', 'New Jersey', 'New Mexico', 'New York',
                  'North Carolina', 'North Dakota', 'Ohio', 'Oklahoma',
                  'Oregon', 'Pennsylvania', 'Rhode Island', 'South Carolina',
                  'South Dakota', 'Tennessee', 'Texas', 'Utah', 'Vermont',
                  'Virginia', 'Washington', 'West Virginia', 'Wisconsin',
                  'Wyoming')

        def getStateName(self, sname, *args):
            """Send the caller the state name at index args[0].

            An out-of-range or non-integer index raises here; on_IM_IN
            reports that to the caller as a fault.
            """
            self.do_SEND_RPI(sname, (StateBot.states[args[0]],), "examples.msg")

    bot = StateBot(uname, passw)
    if debuglevel > 0:
        bot._debug = debuglevel
    try:
        bot.go()
    except TOCError:
        print("Oops, %s: %s" % sys.exc_info()[:2])