#!/usr/bin/env python
"""
srx.py - SPARQL XML Query Results Parser
Copyright 2007, Sean B. Palmer, inamidst.com
Licensed under the Eiffel Forum License 2.

Package: http://inamidst.com/sw/trio/

Walks a SPARQL XML results document that has been parsed with
xml.dom.minidom and reports it as triples using the DAWG result-set
vocabulary. Every triple is handed to Document.triple(), which prints
it by default; subclass Document and override triple() to collect the
triples instead.

The module runs on Python 2.6+ and on Python 3.

Example:

    parse('http://www.w3.org/TR/rdf-sparql-XMLres/output.srx')
"""

import itertools
import xml.dom.minidom

try:  # Python 2
    _text_type = unicode
    _string_types = basestring
except NameError:  # Python 3
    _text_type = str
    _string_types = str


class URI(_text_type):
    """A URI reference term."""


class bNode(_text_type):
    """A blank node term; the string value is the node's label."""


class Literal(tuple):
    """A plain literal, stored as the tuple (lexical, language).

    language is None when the literal carries no language tag.
    """

    def __new__(cls, lexical, language):
        return tuple.__new__(cls, (lexical, language))


class DatatypedLiteral(tuple):
    """A typed literal, stored as the tuple (lexical, datatype)."""

    def __new__(cls, lexical, datatype):
        return tuple.__new__(cls, (lexical, datatype))


def elements(nodes):
    """Yield, in document order, the nodes that are DOM elements."""
    for node in nodes:
        if isinstance(node, xml.dom.minidom.Element):
            yield node


def text(nodes):
    """Return the concatenated character data of the text nodes in nodes.

    Only Text nodes (which includes CDATA sections in minidom) contribute.
    Comments and processing instructions also expose a .data attribute,
    but they are not part of an element's value and are skipped, as are
    child elements.
    """
    return u''.join(
        node.data for node in nodes
        if isinstance(node, xml.dom.minidom.Text)
    )


def nextID(hint='n'):
    """Return a fresh identifier: hint followed by a module-wide counter."""
    return hint + str(next(nextID.counter))


# One counter is shared by every hint, so identifiers are unique per process.
nextID.counter = itertools.count(1)

rdf = u'http://www.w3.org/1999/02/22-rdf-syntax-ns#'
rs = u'http://www.w3.org/2001/sw/DataAccess/tests/result-set#'


class Document(object):
    """Converts a parsed SPARQL XML results document into triples."""

    def __init__(self, doc):
        """Store doc, a minidom Document object, for a later parse()."""
        self.doc = doc

    def triple(self, s, p, o):
        """Receive one triple; the default implementation prints it."""
        # A single parenthesised argument behaves identically as a Python 2
        # print statement and as a Python 3 print() call.
        print('%s %s %s .' % (s, p, o))

    def parse(self):
        """Process the head element, then the results or boolean element.

        The first element child of the root is treated as the head and the
        second as the body. A body that is neither 'results' nor 'boolean'
        is ignored.

        Raises:
            ValueError: if the root has fewer than two element children.
        """
        root = self.doc.documentElement
        children = elements(root.childNodes)

        first = next(children, None)
        if first is None:
            raise ValueError('Results document has no head element')
        resultSet = self.head(first)

        second = next(children, None)
        if second is None:
            raise ValueError(
                'Results document has no results or boolean element')

        if second.localName == 'results':
            self.results(second, resultSet)
        elif second.localName == 'boolean':
            self.boolean(second, resultSet)

    def head(self, element):
        """Emit the result set node and its variables; return the node.

        Only 'variable' children of element are used; any other child
        element is ignored.
        """
        resultSet = bNode(nextID('set'))
        self.triple(resultSet, URI(rdf + u'type'), URI(rs + u'ResultSet'))
        for child in elements(element.childNodes):
            if child.localName == 'variable':
                # minidom already returns text, so no byte decoding is needed.
                name = _text_type(child.getAttribute('name'))
                self.triple(resultSet, URI(rs + u'resultVariable'),
                            Literal(name, None))
        return resultSet

    def results(self, element, resultSet):
        """Emit solution, binding, variable and value triples.

        Each element child of element is one solution, and each element
        child of a solution is one binding.

        Raises:
            ValueError: if a binding has no value element, or its value
                element is not 'uri', 'bnode' or 'literal'.
        """
        for solution in elements(element.childNodes):
            sol = bNode(nextID('solution'))
            self.triple(resultSet, URI(rs + u'solution'), sol)

            for binding in elements(solution.childNodes):
                bind = bNode(nextID('binding'))
                self.triple(sol, URI(rs + u'binding'), bind)

                name = _text_type(binding.getAttribute('name'))
                self.triple(bind, URI(rs + u'variable'), Literal(name, None))

                term = self._term(binding, name)
                self.triple(bind, URI(rs + u'value'), term)

    def _term(self, binding, name):
        """Build the RDF term held by the first element child of binding."""
        value = next(elements(binding.childNodes), None)
        if value is None:
            raise ValueError('Binding %r has no value element' % (name,))

        kind = value.localName
        if kind == 'uri':
            return URI(text(value.childNodes))
        if kind == 'bnode':
            return bNode(text(value.childNodes))
        if kind == 'literal':
            lexical = text(value.childNodes)
            datatype = _text_type(value.getAttribute('datatype'))
            if datatype:
                return DatatypedLiteral(lexical, URI(datatype))
            # getAttribute returns '' for a missing attribute; use None so an
            # untagged literal matches the Literal(..., None) values used for
            # variable names.
            lang = value.getAttribute('xml:lang') or None
            return Literal(lexical, lang)

        # Fail loudly rather than reusing the previous binding's value.
        raise ValueError(
            'Binding %r has unsupported value element %r' % (name, kind))

    def boolean(self, element, resultSet):
        """Handle an ASK result; not implemented.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError('Not implemented')


def parse(filename_or_file):
    """Parse a SPARQL XML results document and emit its triples.

    filename_or_file may be a local filename, a string containing ':'
    (treated as a URL and fetched), or an open file-like object. Anything
    that is not a string, including a file object supplied by the caller,
    is closed before returning, even when parsing fails.

    NOTE: URLs are fetched and parsed with xml.dom.minidom, which is not
    hardened against maliciously constructed XML; only use this on
    trusted sources.
    """
    source = filename_or_file
    if isinstance(source, _string_types) and ':' in source:
        try:
            from urllib.request import urlopen  # Python 3
        except ImportError:
            from urllib import urlopen  # Python 2
        source = urlopen(source)

    try:
        Document(xml.dom.minidom.parse(source)).parse()
    finally:
        if not isinstance(source, _string_types):
            source.close()