#!/usr/bin/env python
"""
turtle.py - Turtle Parser
Copyright 2007, Sean B. Palmer, inamidst.com
Licensed under the Eiffel Forum License 2.

Package: http://inamidst.com/sw/trio/
Cf. http://www.dajobe.org/2004/01/turtle/
"""

import re

if __import__('sys').maxunicode <= 0xffff:
    warning = ("Warning: You're using a Narrow Python build",
               "This means that Turtle parsing won't be fully compliant",
               "Use a python compiled with --enable-unicode=ucs4 to fix this")
    for line in warning:
        print >> __import__('sys').stderr, line

r_boolean = re.compile(r'true|false')
r_comment = re.compile(r'#[^\r\n]*')
# r_decimal = re.compile(r'[+-]?([0-9]+\.[0-9]*|\.([0-9])+|([0-9])+)')
r_decimal = re.compile(r'[+-]?([0-9]+\.[0-9]*|\.([0-9])+)')
r_double = re.compile(r'[+-]?([0-9]+\.[0-9]*[eE][+-]?[0-9]+|\.[0-9]+[eE]' +
                      r'[+-]?[0-9]+|[0-9]+[eE][+-]?[0-9]+)')
r_integer = re.compile(r'[+-]?[0-9]+')
r_language = re.compile(r'[a-z]+(-[a-z0-9]+)*')
r_lcharacters = re.compile(r'(?s)[^"\\]*(?:(?:\\.|"(?!""))[^"\\]*)*')
r_line = re.compile(r'([^\r\n]+[\r\n]+)(?=[^\r\n])')
r_nameChar_extra = re.compile(ur'[-0-9\xB7\u0300-\u036F\u203F-\u2040]')

try:
    r_nameStartChar = re.compile(
        u'[A-Z_a-z\u00C0-\u00D6\u00D8-\u00F6\u00F8-\u02FF\u0370-\u037D' +
        u'\u037F-\u1FFF\u200C-\u200D\u2070-\u218F\u2C00-\u2FEF\u3001-\uD7FF' +
        u'\uF900-\uFDCF\uFDF0-\uFFFD\U00010000-\U000EFFFF]'
    )
except re.error:
    # Narrow builds can't compile the astral range, so fall back to the BMP
    r_nameStartChar = re.compile(
        u'[A-Z_a-z\u00C0-\u00D6\u00D8-\u00F6\u00F8-\u02FF\u0370-\u037D' +
        u'\u037F-\u1FFF\u200C-\u200D\u2070-\u218F\u2C00-\u2FEF\u3001-\uD7FF' +
        u'\uF900-\uFDCF\uFDF0-\uFFFD]'
    )

try:
    r_nameStartChar_minus_underscore = re.compile(
        u'[A-Za-z\u00C0-\u00D6\u00D8-\u00F6\u00F8-\u02FF\u0370-\u037D' +
        u'\u037F-\u1FFF\u200C-\u200D\u2070-\u218F\u2C00-\u2FEF\u3001-\uD7FF' +
        u'\uF900-\uFDCF\uFDF0-\uFFFD\U00010000-\U000EFFFF]'
    )
except re.error:
    r_nameStartChar_minus_underscore = re.compile(
        u'[A-Za-z\u00C0-\u00D6\u00D8-\u00F6\u00F8-\u02FF\u0370-\u037D' +
        u'\u037F-\u1FFF\u200C-\u200D\u2070-\u218F\u2C00-\u2FEF\u3001-\uD7FF' +
        u'\uF900-\uFDCF\uFDF0-\uFFFD]'
    )

r_scharacters = re.compile(r'[^"\\]*(?:\\.[^"\\]*)*')
r_ucharacters = re.compile(r'[^>\\]*(?:\\.[^>\\]*)*')

URI = type('URI', (unicode,), {})
bNode = type('bNode', (unicode,), {})
Literal = type('Literal', (tuple,), {
    '__new__': lambda cls, lexical, language:
        tuple.__new__(cls, [lexical, language])
})
DatatypedLiteral = type('DatatypedLiteral', (tuple,), {
    '__new__': lambda cls, lexical, datatype:
        tuple.__new__(cls, [lexical, datatype])
})

rdf = 'http://www.w3.org/1999/02/22-rdf-syntax-ns#'
xsd = 'http://www.w3.org/2001/XMLSchema#'

import decimal

def typed(value, datatype):
    if datatype == xsd + 'integer':
        value = str(int(value))
    elif datatype == xsd + 'double':
        value = str(float(value))
    elif datatype == xsd + 'decimal':
        value = decimal.Decimal(value)
        context = decimal.Context(17, decimal.ROUND_HALF_DOWN)
        # value = value.normalize(context)
        try:
            value = str(value.quantize(value, context=context))
        except decimal.InvalidOperation:
            value = str(value.normalize(context))
        # Drop trailing zeros after the decimal point, keeping at least '.0'.
        # (A bare value.rstrip('.0') would also eat significant zeros,
        # e.g. '100.0' -> '1'.)
        if '.' in value:
            value = value.rstrip('0').rstrip('.')
        if '.' not in value:
            value = value + '.0'
    return DatatypedLiteral(value, datatype)
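
# A few illustrative examples of the canonical lexical forms produced by
# typed() above (a documentation sketch; these literals are invented):
#
#     typed('+5', xsd + 'integer')    # -> DatatypedLiteral ('5', xsd + 'integer')
#     typed('1e0', xsd + 'double')    # -> DatatypedLiteral ('1.0', xsd + 'double')
#     typed('1.50', xsd + 'decimal')  # -> DatatypedLiteral ('1.5', xsd + 'decimal')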

def generate_bnode_id():
    generate_bnode_id.nextid += 1
    return 'n%02i' % generate_bnode_id.nextid
generate_bnode_id.nextid = 0

import urlparse

def join(base, uri):
    if base == uri:
        return unicode(uri)
    return unicode(urlparse.urljoin(base, uri))

def parse_short(s):
    s = s.replace('\\"', '"')
    s = s.replace('\\t', '\t')
    s = s.replace('\\r', '\r')
    s = s.replace('\\n', '\n')
    # @@ utf-8
    return s.decode('unicode-escape')

def parse_long(s):
    s = s.replace('\\"', '"')
    s = s.replace('\\t', '\t')
    s = s.replace('\\r', '\r')
    s = s.replace('\\n', '\n')
    return s.decode('unicode-escape')

class TurtleDocument(object):
    def __init__(self, uri, input):
        self.uri = uri
        self.baseURI = uri
        self.input = input
        self.buffer = u''
        self.lines = self.readlines()
        # turtleDoc and ws and long string can read into this
        self.tokens = None
        self.bindings = {}

    def readlines(self):
        while True:
            data = self.input.read(8192)
            if not data:
                break
            if isinstance(data, unicode):
                text = data
            else:
                text = data.decode('utf-8')
            self.buffer += text
            while True:
                m = r_line.match(self.buffer)
                if m:
                    line = m.group(1)
                    yield line
                    self.buffer = self.buffer[m.end():]
                else:
                    break
        if self.buffer:
            yield self.buffer
            self.buffer = u''

    def eat(self, thing):
        if not self.tokens:
            raise ValueError('No tokens')
        if isinstance(thing, basestring):
            if self.tokens.startswith(thing):
                self.tokens = self.tokens[len(thing):]
                return thing
            else:
                print 'TOKENS: %r' % self.tokens[:50]
                raise ValueError('Expected: %s' % thing)
        elif isinstance(thing, int):
            token = self.tokens[:thing]
            self.tokens = self.tokens[thing:]
            return token
        elif hasattr(thing, 'pattern'):
            m = thing.match(self.tokens)
            if m:
                self.tokens = self.tokens[m.end():]
                return m.group(0)
            raise ValueError('Expected: %s' % thing.pattern)
        print type(thing), thing
        raise Exception

    def test(self, thing):
        if isinstance(thing, basestring):
            if self.tokens.startswith(thing):
                return True
            return False
        print type(thing), thing
        raise Exception

    def triple(self, s, p, o):
        for t in [type(s), type(p), type(o)]:
            if t not in (URI, bNode, Literal, DatatypedLiteral):
                print type(s), type(p), type(o)
                raise Exception('%s %s %s' % (s, p, o))
        print s, p, o, '.'

    def parse(self):
        self.turtleDoc()

    def turtleDoc(self):
        # statement*
        try:
            self.tokens = self.lines.next()
        except StopIteration:
            return
        while self.statement_test():
            self.statement()

    def statement_test(self):
        if self.tokens:
            return True
        return False

    def statement(self):
        # directive ws* '.' ws* | triples ws* '.' ws* | ws+
        if self.directive_test():
            self.directive()
            while self.ws_test():
                self.ws()
            self.eat('.')
            while self.ws_test():
                self.ws()
        elif self.triples_test():
            self.triples()
            while self.ws_test():
                self.ws()
            self.eat('.')
            while self.ws_test():
                self.ws()
        else:
            self.ws()
            while self.ws_test():
                self.ws()

    def directive_test(self):
        # between directive | triples | ws
        # directives must start with @, triples must not
        if self.tokens.startswith('@'):
            return True
        return False

    def directive(self):
        # prefixID | base
        if self.prefixID_test():
            self.prefixID()
        else:
            self.base()

    def prefixID_test(self):
        # between prefixID | base. prefixID is @prefix, base is @base
        if self.tokens.startswith('@prefix'):
            return True
        return False

    def prefixID(self):
        # '@prefix' ws+ prefixName? ':' ws+ uriref
        self.eat('@prefix')
        self.ws()
        while self.ws_test():
            self.ws()
        if self.prefixName_test():
            prefix = self.prefixName()
        else:
            prefix = ''
        self.eat(':')
        self.ws()
        while self.ws_test():
            self.ws()
        uri = self.uriref()
        self.bindings[prefix] = uri

    def base(self):
        # '@base' ws+ uriref
        self.eat('@base')
        self.ws()
        while self.ws_test():
            self.ws()
        self.baseURI = join(self.baseURI, self.uriref())

    def triples_test(self):
        # between triples and ws. disjoint, so easy enough
        if self.tokens[0] not in set(['\r', '\n', '\t', ' ', '#']):
            return True
        return False

    def triples(self):
        # subject ws+ predicateObjectList
        subj = self.subject()
        self.ws()
        while self.ws_test():
            self.ws()
        for (pred, objt) in self.predicateObjectList():
            self.triple(subj, pred, objt)

    def predicateObjectList(self):
        # verb ws+ objectList ( ws* ';' ws* verb ws+ objectList )* (ws* ';')?
        pred = self.verb()
        self.ws()
        while self.ws_test():
            self.ws()
        for objt in self.objectList():
            yield (pred, objt)
        while self.ws_test() or self.test(';'):
            while self.ws_test():
                self.ws()
            self.eat(';')
            while self.ws_test():
                self.ws()
            if self.verb_test(): # @@
                pred = self.verb()
                self.ws()
                while self.ws_test():
                    self.ws()
                for objt in self.objectList():
                    yield (pred, objt)
            else:
                break

    def objectList(self):
        # object (ws* ',' ws* object)*
        yield self.object()
        while self.ws_test():
            self.ws()
        while self.test(','):
            while self.ws_test():
                self.ws()
            self.eat(',')
            while self.ws_test():
                self.ws()
            yield self.object()
            while self.ws_test():
                self.ws()

    def verb_test(self):
        if self.tokens[0] != '.':
            return True
        return False

    def verb(self):
        # predicate | a
        if self.predicate_test():
            return self.predicate()
        else:
            self.eat('a')
            return URI(rdf + 'type')

    def comment(self):
        # '#' ( [^#xA#xD] )*
        self.eat(r_comment)

    def subject(self):
        # resource | blank
        if self.resource_test():
            return self.resource()
        else:
            return self.blank()

    def predicate_test(self):
        # between this and 'a'... a little tricky
        # if it's a, it'll be followed by whitespace; whitespace is mandatory
        # after a verb, which is the only thing predicate appears in
        if not self.tokens.startswith('a'):
            return True
        elif self.tokens[1] not in set(['\r', '\n', '\t', ' ', '#']):
            return True
        return False

    def predicate(self):
        # resource
        return self.resource()

    def object(self):
        # resource | blank | literal
        if self.resource_test():
            return self.resource()
        elif self.blank_test():
            return self.blank()
        else:
            return self.literal()

    def literal(self):
        # quotedString ( '@' language )? | datatypeString | integer |
        # double | decimal | boolean
        # datatypeString = quotedString '^^' resource
        # (so we change this around a bit to make it parsable without a huge
        # multiple lookahead)
        if self.quotedString_test():
            value = self.quotedString()
            if self.test('@'):
                self.eat('@')
                lang = self.language()
                return Literal(value, lang)
            elif self.test('^^'):
                self.eat('^^')
                dtype = self.resource()
                return typed(value, dtype)
            else:
                return Literal(value, None)
        elif self.double_test():
            return self.double()
        elif self.decimal_test():
            return self.decimal()
        elif self.integer_test():
            return self.integer()
        else:
            return self.boolean()

    def double_test(self):
        if r_double.match(self.tokens):
            return True
        return False

    def double(self):
        # ('-' | '+') ? ( [0-9]+ '.' [0-9]* exponent | '.' ([0-9])+ exponent
        # | ([0-9])+ exponent )
        # exponent = [eE] ('-' | '+')? [0-9]+
        token = self.eat(r_double)
        return typed(token, xsd + 'double')

    def decimal_test(self):
        if r_decimal.match(self.tokens):
            return True
        return False

    def decimal(self):
        # ('-' | '+')? ( [0-9]+ '.' [0-9]* | '.' ([0-9])+ | ([0-9])+ )
        token = self.eat(r_decimal)
        return typed(token, xsd + 'decimal')

    def integer_test(self):
        if r_integer.match(self.tokens):
            return True
        return False

    def integer(self):
        # ('-' | '+') ? [0-9]+
        token = self.eat(r_integer)
        return typed(token, xsd + 'integer')

    def boolean(self):
        # 'true' | 'false'
        token = self.eat(r_boolean)
        return DatatypedLiteral(token, xsd + 'boolean')

    def blank_test(self):
        # between this and literal. urgh!
        # this can start with...
        # _ | [ | (
        # literal can start with...
        # * " | + | - | digit | t | f
        if self.tokens[0] in set(['_', '[', '(']):
            return True
        return False

    def blank(self):
        # nodeID | '[]' | '[' ws* predicateObjectList ws* ']' | collection
        if self.nodeID_test():
            return bNode(self.nodeID())
        elif self.test('[]'):
            self.eat('[]')
            return bNode(generate_bnode_id())
        elif self.test('['):
            self.eat('[')
            subj = bNode(generate_bnode_id())
            while self.ws_test():
                self.ws()
            for (pred, objt) in self.predicateObjectList():
                self.triple(subj, pred, objt)
            while self.ws_test():
                self.ws()
            self.eat(']')
            return subj
        else:
            return self.collection()

    def itemList_test(self):
        # between this and whitespace or ')'
        if self.tokens[0] not in set('\r\n\t #)'):
            return True
        return False

    def itemList(self):
        # object (ws+ object)*
        yield self.object()
        while self.ws_test():
            self.ws()
            while self.ws_test():
                self.ws()
            if not self.test(')'):
                yield self.object()

    def collection(self):
        # '(' ws* itemList? ws* ')'
        b = bNode(generate_bnode_id())
        this, rest = b, None
        self.eat('(')
        while self.ws_test():
            self.ws()
        if self.itemList_test():
            for objt in self.itemList():
                if rest is not None:
                    this = bNode(generate_bnode_id())
                    self.triple(rest, URI(rdf + 'rest'), this)
                self.triple(this, URI(rdf + 'first'), objt)
                rest = this
            if rest is not None:
                self.triple(rest, URI(rdf + 'rest'), URI(rdf + 'nil'))
        else:
            b = URI(rdf + 'nil')
        while self.ws_test():
            self.ws()
        self.eat(')')
        return b

    def ws_test(self):
        if not self.tokens:
            return False # @@@@@@@@@
        if self.tokens[0] in set(['\t', '\r', '\n', ' ', '#']):
            return True
        return False

    def ws(self):
        # #x9 | #xA | #xD | #x20 | comment
        if self.test('#'):
            self.comment()
        else:
            self.eat(1)
        if not self.tokens:
            try:
                self.tokens = self.lines.next()
            except StopIteration:
                return

    def resource_test(self):
        # between this and blank and literal
        # quotedString ( '@' language )? | datatypeString | integer |
        # double | decimal | boolean
        # datatypeString = quotedString '^^' resource
        r_booltest = re.compile(r'(true|false)\b')
        if self.tokens[0] not in set('_[("+-0123456789') and \
           not r_booltest.match(self.tokens):
            return True
        return False

    def resource(self):
        # uriref | qname
        if self.uriref_test():
            return URI(join(self.baseURI, self.uriref()))
        else:
            return URI(join(self.baseURI, self.qname()))

    def nodeID_test(self):
        # between this (_) and []
        if self.tokens[0] == '_':
            return True
        return False

    def nodeID(self):
        # '_:' name
        self.eat('_:')
        return self.name()

    def qname(self):
        # prefixName? ':' name?
        if self.prefixName_test():
            prefix = self.prefixName()
        else:
            prefix = ''
        self.eat(':')
        if self.name_test():
            name = self.name()
        else:
            name = ''
        uri = self.bindings[prefix]
        return uri + name

    def uriref_test(self):
        # between this and qname
        if self.tokens.startswith('<'):
            return True
        return False

    def uriref(self):
        # '<' relativeURI '>'
        self.eat('<')
        value = self.relativeURI()
        self.eat('>')
        return value

    def language(self):
        # [a-z]+ ('-' [a-z0-9]+ )*
        token = self.eat(r_language)
        return token

    def nameStartChar_test(self):
        if r_nameStartChar.match(self.tokens):
            return True
        return False

    def nameStartChar(self):
        # [A-Z] | "_" | [a-z] | [#x00C0-#x00D6] | [#x00D8-#x00F6] |
        # [#x00F8-#x02FF] | [#x0370-#x037D] | [#x037F-#x1FFF] | [#x200C-#x200D]
        # | [#x2070-#x218F] | [#x2C00-#x2FEF] | [#x3001-#xD7FF] |
        # [#xF900-#xFDCF] | [#xFDF0-#xFFFD] | [#x10000-#xEFFFF]
        nc = self.eat(r_nameStartChar)
        return nc

    def nameChar_test(self):
        if r_nameStartChar.match(self.tokens):
            return True
        elif r_nameChar_extra.match(self.tokens):
            return True
        return False

    def nameChar(self):
        # nameStartChar | '-' | [0-9] | #x00B7 | [#x0300-#x036F] |
        # [#x203F-#x2040]
        if self.nameStartChar_test():
            nc = self.nameStartChar()
            return nc
        else:
            nce = self.eat(r_nameChar_extra)
            return nce

    def name_test(self):
        # between this and ws?
        if r_nameStartChar.match(self.tokens):
            return True
        return False

    def name(self):
        # nameStartChar nameChar*
        parts = []
        nsc = self.nameStartChar()
        parts.append(nsc)
        while self.nameChar_test():
            nc = self.nameChar()
            parts.append(nc)
        return ''.join(parts)

    def prefixName_test(self):
        # between this and colon
        if r_nameStartChar_minus_underscore.match(self.tokens):
            return True
        return False

    def prefixName(self):
        # ( nameStartChar - '_' ) nameChar*
        parts = []
        nscmu = self.eat(r_nameStartChar_minus_underscore)
        parts.append(nscmu)
        while self.nameChar_test():
            nc = self.nameChar()
            parts.append(nc)
        return ''.join(parts)

    def relativeURI(self):
        # ucharacter*
        token = self.eat(r_ucharacters)
        return token

    def quotedString_test(self):
        if self.tokens[0] == '"':
            return True
        return False

    def quotedString(self):
        # string | longString
        if self.longString_test():
            return self.longString()
        else:
            return self.string()

    def string(self):
        # #x22 scharacter* #x22
        self.eat('"')
        value = self.eat(r_scharacters)
        self.eat('"')
        return parse_short(value)

    def longString_test(self):
        if self.tokens.startswith('"""'):
            return True
        return False

    def longString(self):
        # #x22 #x22 #x22 lcharacter* #x22 #x22 #x22
        while self.tokens.count('"""') < 2:
            self.tokens += self.lines.next()
        self.eat('"""')
        value = self.eat(r_lcharacters)
        self.eat('"""')
        return parse_long(value)

def parseFile(uri, f):
    doc = TurtleDocument(uri, f)
    doc.parse()

r_hex4_32 = re.compile(ur'([\x00-\x08\x0b\x0C\x0E-\x1F\x7F-\uFFFF]+)')
r_hex4_33 = re.compile(ur'([\x00-\x08\x0b\x0C\x0E-\x1F\x3E\x7F-\uFFFF]+)')

# Cf. http://bugs.python.org/issue1477
try:
    r_hex6 = re.compile(u'([\U00010000-\U0010FFFF]+)')
except re.error:
    r_hex6 = None

def hex4(m):
    return u''.join('\\u%04X' % ord(c) for c in m.group(1))

def hex6(m):
    return u''.join('\\U%08X' % ord(c) for c in m.group(1))

def escaped32(s):
    # http://www.w3.org/TR/rdf-testcases/#ntrip_strings
    s = s.replace('\\', '\\\\')
    s = s.replace('\t', '\\t')
    s = s.replace('\n', '\\n')
    s = s.replace('\r', '\\r')
    s = s.replace('"', '\\"')
    s = r_hex4_32.sub(hex4, s)
    if r_hex6 is not None:
        return r_hex6.sub(hex6, s)
    return s

def escaped33(s):
    # http://lists.w3.org/Archives/Public/www-rdf-comments/2007OctDec/0008
    s = s.replace('\\', '\\\\')
    s = s.replace('\t', '\\t')
    s = s.replace('\n', '\\n')
    s = s.replace('\r', '\\r')
    s = s.replace('"', '\\"')
    s = r_hex4_33.sub(hex4, s)
    if r_hex6 is not None:
        return r_hex6.sub(hex6, s)
    return s

def serialise(term):
    if isinstance(term, URI):
        result = u'<' + term + u'>'
    elif isinstance(term, bNode):
        result = u'_:' + term
    elif isinstance(term, Literal):
        value = escaped32(term[0])
        result = u'"' + value + u'"'
        if term[1] is not None:
            result += u'@' + term[1]
    elif isinstance(term, DatatypedLiteral):
        value = escaped32(term[0])
        result = u'"' + value + u'"^^<' + term[1] + u'>'
    else:
        raise ValueError(term)
    return result.encode('utf-8')

def convert(uri, f):
    class NTriplesConverter(TurtleDocument):
        def triple(self, s, p, o):
            print serialise(s), serialise(p), serialise(o), '.'
    doc = NTriplesConverter(uri, f)
    doc.parse()

def main(argv=None):
    import sys
    if argv is None:
        argv = sys.argv
    if len(argv) == 2:
        import urllib
        u = urllib.urlopen(argv[1])
        convert(argv[1], u)
        u.close()
    else:
        print 'Usage: ./turtle.py <uri>'

if __name__ == '__main__':
    main()
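
# Usage sketch (illustrative only; the URI and data below are invented for
# documentation and are not part of the original package):
#
#   From the command line, converting Turtle to N-Triples on stdout:
#
#       ./turtle.py http://example.org/doc.ttl
#
#   Or programmatically, passing convert() any file-like object:
#
#       from StringIO import StringIO
#       data = StringIO('@prefix ex: <http://example.org/> .\n'
#                       'ex:s ex:p "hello" .\n')
#       convert('http://example.org/doc.ttl', data)
#
#   which prints one N-Triples statement per parsed triple:
#
#       <http://example.org/s> <http://example.org/p> "hello" .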