#!/usr/bin/python
"""
An Atom Extensibility Framework Parser.
Sean B. Palmer, , 2003-08.
Released under the same license as Python 2.2.
"""

import sys
import re
import io
import urllib
import xml.sax
import xml.sax.handler

try:
    import cStringIO
except ImportError:  # Python 3 has no cStringIO; the io module is used instead
    cStringIO = None

try:
    from uripath import join as urijoin
except ImportError:
    try:
        from urlparse import urljoin as urijoin
    except ImportError:  # Python 3
        from urllib.parse import urljoin as urijoin

try:
    urlopen = urllib.urlopen
except AttributeError:  # Python 3
    from urllib.request import urlopen

try:
    unicode
except NameError:  # Python 3: the text type is simply str
    unicode = str


class ParseError(ValueError):
    """Raised when a literal is requested with contradictory options."""


class Namespace(unicode):
    """A namespace URI; ``ns.name`` evaluates to the URI with *name* appended."""

    def __getattr__(self, name):
        # Leave dunder lookups alone so introspection (copy, pickle, test
        # collectors) sees a normal AttributeError instead of a bogus string.
        if name.startswith('__'):
            raise AttributeError(name)
        return self + name


class NullAttr(unicode):
    """The empty namespace; ``nons.name`` is the attribute key ``('', name)``."""

    def __getattr__(self, name):
        if name.startswith('__'):
            raise AttributeError(name)
        return ('', name)


atom = Namespace("http://purl.org/atom/ns#")
rdf = Namespace("http://www.w3.org/1999/02/22-rdf-syntax-ns#")
x = Namespace("http://www.w3.org/XML/1998/namespace")
nons = NullAttr()


# # # # # # # # # # # # # # # # # # # # #
# Atom Extensibility Framework Grammar
#

class AtomEFGrammar(object):
    """Walks an Element tree and reports it as triples.

    The term constructors (``uri``, ``bNode``, ``strlit``) and the ``triple``
    callback are no-ops here; subclasses supply real implementations.
    """

    def triple(self, *args):
        pass

    def uri(self, *args):
        pass

    def bNode(self, *args):
        pass

    def strlit(self, *args):
        pass

    def document(self, doc):
        """Process the root element *doc*.

        A root named ``atom`` in the Atom namespace is treated as a wrapper:
        only its children are processed. Any other root is itself a property.
        """
        if doc.xmlns == atom and doc.name == 'atom':
            for child in doc.children:
                self.property(child)
        else:
            self.property(doc)

    def property(self, e):
        """Dispatch *e* to the literal, complex-literal or simple handler."""
        # Attributes other than the two control attributes, mode and ref.
        extra = [key for key in e.attrs if key not in (nons.mode, nons.ref)]
        has_ref = nons.ref in e.attrs
        has_mode = nons.mode in e.attrs
        if not extra and not has_ref and (has_mode or not e.children):
            self.literalProp(e)
        elif extra:
            self.complexLiteralProp(e)
        else:
            self.simpleProp(e)

    def simpleProp(self, e):
        """Handle an element that denotes a resource (a URI or a bNode)."""
        if nons.ref in e.attrs:
            e.subject = self.uri(urijoin(e.base, e.attrs[nons.ref]))
        else:
            e.subject = self.bNode()
        if e.parent and e.parent.subject:
            self.triple(e.parent.subject, self.uri(e.URI), e.subject)
        for child in e.children:
            self.property(child)

    def literalProp(self, e):
        """Handle an element whose value is a plain, XML or typed literal."""
        mode = e.attrs.get(nons.mode)
        content = e.xtext[1]
        # Children of the wrapping root have a parent with no subject; there
        # is nothing to attach the literal to (simpleProp guards the same way).
        if not (e.parent and e.parent.subject):
            return
        if not mode and not e.children:
            if e.lang:
                obj = self.strlit(content, lang=e.lang)
            else:
                obj = self.strlit(content)
        elif mode == 'xml':
            obj = self.strlit(content, dtype=rdf.XMLLiteral)
        elif mode:
            obj = self.strlit(content, dtype=atom + mode)
        else:
            # Empty mode attribute together with child elements: no triple.
            return
        self.triple(e.parent.subject, self.uri(e.URI), obj)

    def complexLiteralProp(self, e):
        """Handle an element carrying extra attributes.

        The element becomes a bNode, each extra attribute becomes a property
        of that node, and the element content is attached as ``rdf:value``.
        """
        mode = e.attrs.get(nons.mode)
        content = e.xtext[1]
        e.subject = self.bNode()
        if e.parent and e.parent.subject:
            self.triple(e.parent.subject, self.uri(e.URI), e.subject)

        for key in e.attrs:
            if key in (nons.mode, nons.ref):
                continue
            if e.lang:
                obj = self.strlit(e.attrs[key], lang=e.lang)
            else:
                obj = self.strlit(e.attrs[key])
            # Unqualified attributes take the namespace of their element.
            prefix = key[0] or e.xmlns
            self.triple(e.subject, self.uri(prefix + key[1]), obj)

        if e.children or mode:
            if e.children or mode == 'xml':
                obj = self.strlit(content, dtype=rdf.XMLLiteral)
            else:
                obj = self.strlit(content, dtype=atom + mode)
        elif e.lang:
            obj = self.strlit(content, lang=e.lang)
        else:
            obj = self.strlit(content)
        self.triple(e.subject, self.uri(rdf.value), obj)


# # # # # # # # # # # # # # # # # # # # #

class Element(object):
    """One XML element plus the state the grammar attaches to it.

    ``xtext`` is ``[start tag, inner markup, end tag]``.
    NOTE: character data and attribute values are copied into ``xtext``
    without XML escaping.
    """

    def __init__(self, xmlns, name, attrs, p=None, base=None):
        xmlns = xmlns or ''  # elements in no namespace arrive with None
        self.xmlns, self.name, self.attrs = xmlns, name, attrs or {}
        self.URI = xmlns + name

        # xml:* attributes are stored under (namespace, local-name) keys.
        inherited_base = (p is not None and p.base) or base or ''
        own_base = attrs.x.get((x, 'base'))
        if own_base and inherited_base:
            own_base = urijoin(inherited_base, own_base)
        self.base = own_base or inherited_base
        self.lang = attrs.x.get((x, 'lang')) or (p is not None and p.lang) or ''

        self.parent, self.children, self.text, self.subject = p, [], '', None
        self.xtext = ['<%s xmlns="%s" %r>' % (name, xmlns, attrs), '', '']

    def __getitem__(self, attr):
        return self.attrs[attr]


class Attributes(dict):
    """Attribute map keyed by ``(namespace, local-name)``.

    Attributes in the XML namespace (xml:base, xml:lang, ...) are kept apart
    in ``self.x``; everything else is stored in the dict itself. ``self.attrs``
    keeps the original item list for serialisation.
    """

    def __init__(self, attrs):
        self.attrs, self.x = list(attrs.items()), {}
        for (xmlns, name), value in self.attrs:
            # Use the text type: str() would fail on non-ASCII under Python 2.
            xmlns, name = unicode(xmlns or ''), unicode(name)
            if xmlns == x:
                self.x[(xmlns, name)] = unicode(value)
            else:
                dict.__setitem__(self, (xmlns, name), unicode(value))

    def __repr__(self):
        """Serialise the original attributes as they appear in a start tag."""
        rendered = []
        for i, ((ns, local), value) in enumerate(self.attrs):
            if ns is None:
                rendered.append('%s="%s"' % (local, value))
            else:
                rendered.append('xmlns:ns%s="%s" ns%s:%s="%s"'
                                % (i, ns, i, local, value))
        return ' '.join(rendered)


r_id = re.compile(r'^i([rd]+)')
# No longer used by strlit; retained so existing importers keep working.
r_quot = re.compile(r'(^|[^\\])"')

_SIMPLE_ESCAPES = {
    '\\': '\\\\',
    '"': '\\"',
    '\n': '\\n',
    '\r': '\\r',
    '\t': '\\t',
}


def _escape_literal(text):
    """Return *text* escaped for use inside a double-quoted N-Triples string."""
    out = []
    for ch in text:
        if ch in _SIMPLE_ESCAPES:
            out.append(_SIMPLE_ESCAPES[ch])
            continue
        code = ord(ch)
        if 0x20 <= code < 0x7F:
            out.append(ch)
        elif code <= 0xFFFF:
            out.append('\\u%04X' % code)
        else:
            out.append('\\U%08X' % code)
    return ''.join(out)


class Term(str):
    """A serialised term that also remembers its raw value in ``val``."""

    def __new__(cls, s, v):
        term = str.__new__(cls, s)
        term.val = v
        return term

    def __repr__(self):
        # val is an int for generated bNodes; __repr__ must return a string.
        value = self.val
        if isinstance(value, (str, unicode)):
            return value
        return str(value)


class AtomEFParser(AtomEFGrammar):
    """Builds the Element tree from tag events and feeds triples to a sink.

    *sink* must provide ``triple(s, p, o)``; *base* is the base URI used to
    resolve relative ``ref`` values.
    """

    def __init__(self, sink, base=None):
        self.triple = sink.triple
        self.tree = []
        self.base = base or ''
        self.genID = 0

    def startTag(self, xmlns, name, attrs):
        """Push a new element; the root receives the parser's base URI."""
        if self.tree:
            e = Element(xmlns, name, Attributes(attrs), self.tree[-1])
        else:
            e = Element(xmlns, name, Attributes(attrs), base=self.base)
        self.tree.append(e)

    def characterData(self, chars):
        """Append character data to the element currently open, if any."""
        if self.tree:
            current = self.tree[-1]
            current.text += chars
            current.xtext[1] += chars

    def endTag(self, xmlns, name):
        """Close the current element; on the root, run the grammar."""
        element = self.tree.pop()
        # Record the closing tag so the serialised markup is balanced.
        element.xtext[2] += '</%s>' % name
        if self.tree:
            parent = self.tree[-1]
            parent.children.append(element)
            parent.xtext[1] += ''.join(element.xtext)
        else:
            self.document(element)

    def uri(self, u):
        """Return the term ``<u>``."""
        return Term("<%s>" % u, u)

    def bNode(self, label=None):
        """Return a blank node: labelled if *label* is given, else generated."""
        if label:
            if not label[0].isalpha():
                label = 'b' + label
            # Rewrite a leading "i[rd]+" so labels cannot clash with the
            # generated "_:idN" identifiers below.
            return '_:' + r_id.sub(r'ir\g<1>', label)
        number = self.genID
        self.genID = number + 1
        return Term('_:id%s' % number, number)

    def strlit(self, s, lang=None, dtype=None):
        """Return a literal term for *s* with an optional language or datatype.

        Raises ParseError if both *lang* and *dtype* are given.
        """
        if lang and dtype:
            raise ParseError("Can't have both")
        parts = ['"%s"' % _escape_literal(unicode(s))]
        if lang:
            parts.append("@" + lang)
        if dtype:
            parts.append("^^<%s>" % dtype)
        return Term(''.join(parts), s)


class SAXAtomEFParser(xml.sax.handler.ContentHandler, AtomEFParser):
    """Adapter that forwards namespace-aware SAX events to AtomEFParser."""

    def __init__(self, sink, base=None):
        xml.sax.handler.ContentHandler.__init__(self)
        # Pass the caller's base through (it used to be replaced by None).
        AtomEFParser.__init__(self, sink, base)

    def startElementNS(self, name, qname, attribs):
        xmlns, local = name
        self.startTag(xmlns, local, dict(attribs.items()))

    def characters(self, chars):
        self.characterData(chars)

    def endElementNS(self, name, qname):
        xmlns, local = name
        self.endTag(xmlns, local)


class Sink(object):
    """Default sink: accumulates triples as N-Triples text in ``result``."""

    def __init__(self):
        self.result = ""

    def triple(self, s, p, o):
        self.result += "%s %s %s .\n" % (s, p, o)

    def write(self):
        """Return the accumulated text, right-stripped and UTF-8 encoded."""
        return self.result.rstrip().encode('utf-8')


def _open_source(s):
    """Wrap the document *s* (bytes or text) in a file-like object."""
    if cStringIO is not None:  # Python 2: same object the parser always used
        return cStringIO.StringIO(s)
    if isinstance(s, bytes):
        return io.BytesIO(s)
    return io.StringIO(s)


def parseAtomEF(s, base=None, sink=None):
    """Parse the document *s* and return the sink holding its triples.

    *base* is the base URI for relative references. *sink* defaults to a
    fresh Sink; any object with a ``triple(s, p, o)`` method works.
    """
    sink = sink or Sink()
    parser = xml.sax.make_parser()
    # NOTE(review): this runs before our handler is installed, so it appears
    # to reach only the reader's default handler - confirm it is still needed.
    if hasattr(parser, 'start_namespace_decl'):
        parser.start_namespace_decl("xml", x)
    parser.setFeature(xml.sax.handler.feature_namespaces, 1)
    try:
        parser.setFeature(xml.sax.handler.feature_namespace_prefixes, 1)
    except (xml.sax.SAXNotSupportedException,
            xml.sax.SAXNotRecognizedException):
        # Optional feature; carry on when the reader rejects it.
        pass
    parser.setContentHandler(SAXAtomEFParser(sink, base))
    parser.parse(_open_source(s))
    return sink


def parseURI(uri, sink=None):
    """Fetch *uri*, parse it with itself as the base URI, return the sink."""
    stream = urlopen(uri)
    try:
        data = stream.read()
    finally:
        stream.close()
    return parseAtomEF(data, base=uri, sink=sink)


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print(__doc__)
    else:
        output = parseURI(sys.argv[1]).write()
        if not isinstance(output, str):  # bytes on Python 3
            output = output.decode('utf-8')
        print(output)