#!/usr/bin/env python
"""
Vanilla RDF Graph Isomorphism Tester
Author: Sean B. Palmer, inamidst.com
Uses the pyrple algorithm

Usage: ./rdfdiff-vanilla.py <ntriples-uri-1> <ntriples-uri-2>

Prints 'yes' when the two graphs produce the same hash, otherwise 'no'.

Requirements:
   Python2.3+
   http://inamidst.com/proj/rdf/ntriples.py

References:
   http://inamidst.com/proj/rdf/rdfdiff.py
   http://miscoranda.com/comments/129#id2005010405560004
"""

import sys
import re
import md5
import urllib

import ntriples
from ntriples import bNode

# Override the ntriples module's URI-reference pattern: anything between
# angle brackets that contains no whitespace, '"', '<' or '>'.
ntriples.r_uriref = re.compile(r'<([^\s"<>]+)>')


class Graph(object):
    """Triples loaded from an N-Triples document, with a structural hash.

    Blank nodes contribute to the hash through the shape of the triples
    they occur in (see vhash), never through their labels.
    """

    def __init__(self, uri):
        """Create the graph and populate it from the document at *uri*."""
        self.triples = {}  # (subject, predicate, object) -> True; used as a set
        self.cache = {}    # (term, done) -> memoised vhash() result
        self.parse(uri)

    def parse(self, uri):
        """Open *uri* with urllib and record every triple the parser emits."""

        class Sink(object):
            # The first parameter is named 'sink' so that 'self' keeps
            # referring to the enclosing Graph instance.
            def triple(sink, s, p, o):
                self.triples[(s, p, o)] = True

        parser = ntriples.NTriplesParser(sink=Sink())
        stream = urllib.urlopen(uri)
        try:
            parser.parse(stream)
        finally:
            # Release the handle even when parsing fails part-way through.
            stream.close()

    def __hash__(self):
        """Return a hash built from the sorted MD5 digests of all triples.

        Within a triple, a blank node is represented by str() of its vhash
        signature; every other term is fed to the digest as-is.
        """
        digests = []
        for triple in self.triples:
            tripleHash = md5.new()
            # NOTE(review): the three terms are fed to the digest back to
            # back with no separator between them.
            for term in triple:
                if isinstance(term, bNode):
                    tripleHash.update(str(self.vhashmemo(term)))
                else:
                    tripleHash.update(term)
            digests.append(tripleHash.digest())
        # Sorting makes the result independent of dict iteration order.
        digests.sort()
        return hash(tuple(digests))

    def vhashmemo(self, term, done=False):
        """Return vhash(term, done), computing it at most once per pair."""
        key = (term, done)
        if key not in self.cache:
            self.cache[key] = self.vhash(term, done=done)
        return self.cache[key]

    def vhash(self, term, done=False):
        """Return a sorted tuple describing the triples that mention *term*.

        For each position of each triple containing *term*:
          - a non-blank term is recorded itself;
          - *term* itself, or any blank node when *done* is true, is
            recorded as its position index (0, 1 or 2);
          - any other blank node is recorded as its own signature computed
            with done=True, so the expansion stops after one level.
        """
        result = []
        for triple in self.triples:
            if term not in triple:
                continue
            for pos, item in enumerate(triple):
                if not isinstance(item, bNode):
                    result.append(item)
                elif done or (item == term):
                    result.append(pos)
                else:
                    # Same value as self.vhash(item, done=True), but
                    # memoised so each neighbour is only expanded once.
                    result.append(self.vhashmemo(item, done=True))
        result.sort()
        return tuple(result)


def compare(p, q):
    """Return True when the graphs at URIs *p* and *q* hash identically."""
    return hash(Graph(p)) == hash(Graph(q))


def main():
    """Compare the two documents named on the command line; print yes/no."""
    if len(sys.argv) < 3:
        sys.stderr.write(
            'Usage: %s <ntriples-uri-1> <ntriples-uri-2>\n' % sys.argv[0])
        sys.exit(2)
    result = compare(sys.argv[1], sys.argv[2])
    # Parenthesised so the statement prints the same text on Python 2 and
    # is also valid as a function call.
    print(('no', 'yes')[result])


if __name__ == "__main__":
    main()