#!/usr/bin/env python
"""
ntriples.py - N-Triples Parser
Copyright 2007, Sean B. Palmer, inamidst.com
Licensed under the Eiffel Forum License 2.

Package: http://inamidst.com/sw/trio/
"""

import re

r_comment = re.compile(r'#[\x20-\x7E]*')
r_eoln = re.compile(r'\r\n|\r|\n')
r_language = re.compile(r'[a-z]+(-[a-z0-9]+)*')
r_line = re.compile(r'([^\r\n]+[\r\n]+)(?=[^\r\n])')
r_nodeID = re.compile(r'_:[A-Za-z][A-Za-z0-9]*')
r_string = re.compile(r'"[^"\\]*(?:\\.[^"\\]*)*"')
r_universal = re.compile(r'\?[A-Za-z][A-Za-z0-9]*')
r_uriref = re.compile(r'<[^>]*>')
r_ws_plus = re.compile(r'[ \t]+')
r_ws_star = re.compile(r'[ \t]*')

# Set to True to allow the non-standard productions (literal subjects,
# bNode predicates, and ?variables).
extended = False

def parse(s):
    # Unescape an N-Triples string or URI body.
    s = s.replace('\\"', '"')
    s = s.replace('\\t', '\t')
    s = s.replace('\\r', '\r')
    s = s.replace('\\n', '\n')
    return s.decode('unicode-escape')

URI = type('URI', (unicode,), {})
bNode = type('bNode', (unicode,), {})
# Var is needed by the extended (?variable) productions below.
Var = type('Var', (unicode,), {})

PlainLiteral = type('PlainLiteral', (tuple,), {
    '__new__': lambda cls, lexical, language:
        tuple.__new__(cls, [lexical, language])
})

TypedLiteral = type('TypedLiteral', (tuple,), {
    '__new__': lambda cls, lexical, datatype:
        tuple.__new__(cls, [lexical, datatype])
})

class Document(object):
    def __init__(self, uri, input):
        self.uri = uri
        self.baseURI = uri
        self.input = input
        self.buffer = u''
        self.lines = self.readlines()
        self.tokens = None
        self.bindings = {}

    def readlines(self):
        # Generator: read the input in 8 KiB chunks and yield complete
        # lines (terminator included) as they become available.
        while True:
            bytes = self.input.read(8192)
            if not bytes:
                break
            text = bytes.decode('utf-8')
            self.buffer += text

            while True:
                m = r_line.match(self.buffer)
                if m:
                    line = m.group(1)
                    yield line
                    self.buffer = self.buffer[m.end():]
                else:
                    break

        if self.buffer:
            yield self.buffer
            self.buffer = u''

    def eat(self, thing):
        # Consume a literal string or a regexp match from the front of
        # the current token buffer.
        if not self.tokens:
            raise ValueError('No tokens')

        if isinstance(thing, basestring):
            if self.tokens.startswith(thing):
                self.tokens = self.tokens[len(thing):]
                return thing
            else:
                print 'TOKENS: %r' % self.tokens[:50]
                raise ValueError('Expected: %s' % thing)
        elif hasattr(thing, 'pattern'):
            m = thing.match(self.tokens)
            if m:
                self.tokens = self.tokens[m.end():]
                return m.group(0)
            raise ValueError('Expected: %s' % thing.pattern)

        print type(thing), thing
        raise Exception

    def triple(self, s, p, o):
        for t in [type(s), type(p), type(o)]:
            if t not in (URI, bNode, Var, PlainLiteral, TypedLiteral):
                print type(s), type(p), type(o)
                raise Exception('%s %s %s' % (s, p, o))
        print s, p, o, '.'

    def parse(self):
        # ntripleDoc = line*
        try:
            self.tokens = self.lines.next()
        except StopIteration:
            return

        while self.tokens:
            self.line()

    def line(self):
        # ws* ( comment | triple )? eoln
        self.eat(r_ws_star)

        if (not self.tokens.startswith('#') and
            not self.tokens.startswith('\r') and
            not self.tokens.startswith('\n')):
            self.triple_production()
        elif self.tokens.startswith('#'):
            self.comment()

        self.eoln()

    def comment(self):
        # '#' ( character - ( cr | lf ) )*
        self.eat(r_comment)

    def triple_production(self):
        # subject ws+ predicate ws+ object ws* '.' ws*
        s = self.subject()
        self.eat(r_ws_plus)

        p = self.predicate()
        self.eat(r_ws_plus)

        o = self.object()

        self.triple(s, p, o)

        self.eat(r_ws_star)
        self.eat('.')
        self.eat(r_ws_star)

    def subject(self):
        # uriref | nodeID
        if self.tokens.startswith('<'):
            return URI(self.uriref())
        elif self.tokens.startswith('_:'):
            return bNode(self.nodeID())
        if extended:
            if self.tokens.startswith('"'):
                return self.literal()
            elif self.tokens.startswith('?'):
                return Var(self.universal())
        raise ValueError('Expected a valid subject')

    def predicate(self):
        # uriref
        if self.tokens.startswith('<'):
            return URI(self.uriref())
        if extended:
            if self.tokens.startswith('_:'):
                return bNode(self.nodeID())
            elif self.tokens.startswith('?'):
                return Var(self.universal())
        raise ValueError('Expected a valid predicate')

    def object(self):
        # uriref | nodeID | literal
        if self.tokens.startswith('<'):
            return URI(self.uriref())
        elif self.tokens.startswith('_:'):
            return bNode(self.nodeID())
        elif self.tokens.startswith('"'):
            return self.literal()
        if extended:
            if self.tokens.startswith('?'):
                return Var(self.universal())
        raise ValueError('Expected a valid object')

    def uriref(self):
        uri = self.eat(r_uriref)
        if (not extended) and (len(uri) < 3):
            raise ValueError('URI too small')
        return parse(uri[1:-1])

    def nodeID(self):
        node = self.eat(r_nodeID)
        return node[2:]

    def literal(self):
        # langString | datatypeString
        string = self.eat(r_string)
        value = parse(string[1:-1])

        if self.tokens.startswith('^^'):
            self.eat('^^')
            uri = self.uriref()
            return TypedLiteral(value, uri)
        elif self.tokens.startswith('@'):
            self.eat('@')
            language = self.language()
        else:
            language = None
        return PlainLiteral(value, language)

    def universal(self):
        univar = self.eat(r_universal)
        return univar[1:]

    def language(self):
        lang = self.eat(r_language)
        return lang

    def eoln(self):
        self.eat(r_eoln)
        try:
            self.tokens = self.lines.next()
        except StopIteration:
            return

def parseFile(uri, f):
    doc = Document(uri, f)
    doc.parse()

if __name__ == '__main__':
    print __doc__
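# Example usage (a minimal sketch; the filename and base URI below are
# hypothetical -- substitute a real N-Triples source):
#
#     import ntriples
#     f = open('example.nt', 'rb')
#     ntriples.parseFile('http://example.org/example.nt', f)
#     f.close()
#
# where example.nt contains lines such as:
#
#     <http://example.org/s> <http://example.org/p> "o" .
#
# Document.triple simply prints each parsed statement as "s p o ."; to
# collect triples instead, subclass Document and override triple().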