#!/usr/bin/env python
"""
web.py - World Wide Web
Copyright 2007, Sean B. Palmer, inamidst.com
Licensed under the Eiffel Forum License 2.
Package: http://inamidst.com/sw/trio/

This module provides access to the World Wide Web with
convenience methods for Semantic Web tests.

   >>> uri = 'http://inamidst.com/sbp/foaf.rdf'
   >>> response = trio.web.doc(uri)
   >>> response.format()
   'rdfxml'
"""

import codecs
import re

try:
    import urllib2
except ImportError:
    # Python 3 moved urllib2's API into urllib.request; keep the old name
    # bound so the module still imports there.
    import urllib.request as urllib2

try:
    from htmlentitydefs import name2codepoint
except ImportError:
    # Python 3 location of the same table.
    from html.entities import name2codepoint

try:
    unichr
except NameError:
    # Python 3: chr() already covers the full Unicode range.
    unichr = chr


class Document(object):
    """Placeholder type; no behaviour is defined in the visible source."""
    pass


class PeekFile(object):
    """A file-like object with peek and read methods.

    Subclasses must provide ``bufread`` and initialise three attributes:

    * ``cache``  - everything obtained through ``peek`` so far
    * ``peeked`` - number of items held in ``cache``
    * ``pos``    - how much of ``cache`` ``read`` has already handed out
    """

    def __init__(self, *args, **kargs):
        raise NotImplementedError('Must be subclassed')

    def bufread(self, length=None):
        """Read from the underlying source; all of it if length is None."""
        raise NotImplementedError('Must be subclassed')

    def peek(self, length):
        """Return up to ``length`` items from the start of the cache.

        The cache is extended from the underlying source when it holds
        fewer than ``length`` items. Peeking never moves the read position.
        """
        missing = length - self.peeked
        if missing > 0:
            content = self.bufread(missing)
            # On the first fill, adopt the source's own string type instead
            # of concatenating onto the empty placeholder.
            if self.peeked:
                self.cache += content
            else:
                self.cache = content
            # Count what was actually obtained: the source may return fewer
            # items than requested (e.g. at end of input).
            self.peeked = len(self.cache)
        return self.cache[:length]

    def read(self, length=None):
        """Return the next ``length`` items, or everything left if None.

        Items that were peeked but not yet read are delivered first, then
        the underlying source is consulted. A negative ``length`` is treated
        like None, following the usual file-object convention.
        """
        if length is not None and length < 0:
            length = None

        cached = self.cache[self.pos:]
        if length is not None and length <= len(cached):
            # Fully satisfiable from the cache.
            self.pos += length
            return cached[:length]

        # The cache is used up by this call. Pin pos to the end of the cache
        # so the same data is never returned twice, and so data added by a
        # later peek() is not skipped.
        self.pos = len(self.cache)
        if length is None:
            fresh = self.bufread()
        else:
            fresh = self.bufread(length - len(cached))
        if not cached:
            # Avoid concatenating the empty placeholder with source data.
            return fresh
        return cached + fresh


class ByteFile(PeekFile):
    """PeekFile over any object exposing ``read`` and ``close``."""

    def __init__(self, entity):
        self.entity = entity
        self.peeked = 0
        self.cache = ''
        self.pos = 0

    def bufread(self, length=None):
        """Read ``length`` items from the wrapped object (all if None)."""
        if length is not None:
            return self.entity.read(length)
        return self.entity.read()

    def close(self):
        """Close the wrapped object."""
        self.entity.close()


class CharacterFile(PeekFile):
    """PeekFile that decodes ``bytefile`` with a codecs stream reader."""

    def __init__(self, bytefile, encoding):
        Reader = codecs.getreader(encoding)
        self.stream = Reader(bytefile)
        self.peeked = 0
        self.cache = u''
        self.pos = 0

    def bufread(self, length=None):
        """Read ``length`` decoded characters (everything if None)."""
        if length is not None:
            return self.stream.read(chars=length)
        return self.stream.read()

    def close(self):
        """Close the decoding stream."""
        self.stream.close()


# NOTE(review): r_doctype and r_comment look as if their '<!...>' content was
# removed by a markup stripper before this file was captured. They are left
# exactly as found; restore them from the upstream source.
r_doctype = re.compile(r'(?m)()')
# NOTE(review): the captured text was r'))*)\]\]>', which does not compile.
# The prefix below is reconstructed from the surviving tail and the parallel
# r_pi pattern - confirm against the upstream source.
r_cdata = re.compile(r'<!\[CDATA\[(([^\]]+|\](?!\]>))*)\]\]>')
r_comment = re.compile(r'')
r_pi = re.compile(r'<\?(\S+)[\t\n\r ]+(([^\?]+|\?(?!>))*)\?>')
r_tag = re.compile(r'<([^\'">]+("[^"]+"|\'[^\']+\')?)*>')
r_name = re.compile(r'\s*([^\s>]+)')
r_attribute = re.compile(r'\s*([^\s=]+)\s*=\s*("[^"]+"|\'[^\']+\')?')
r_entity = re.compile(r'&[A-Za-z0-9#]+;')


def entity(m):
    """Return the character for the entity reference matched by ``m``.

    Handles hexadecimal (``&#x41;``), decimal (``&#65;``) and named
    (``&amp;``) references. A reference that cannot be resolved - unknown
    name, malformed number, code point out of range - is returned unchanged.
    """
    e = m.group(0)
    try:
        if e[:3].lower() == '&#x':
            return unichr(int(e[3:-1], 16))
        if e.startswith('&#'):
            return unichr(int(e[2:-1]))
        return unichr(name2codepoint[e[1:-1]])
    except (KeyError, ValueError, OverflowError):
        return e


def text(value):
    """Strip the first and last character of ``value`` and expand entities.

    The slice matches the quoted alternatives captured by ``r_attribute``.
    Falsy input (None or an empty string) is returned as is.
    """
    if not value:
        return value
    value = value[1:-1]
    return r_entity.sub(entity, value)


class Event(object):
    """A tag event: a lower-cased name plus a dict of attributes."""

    def __init__(self, name):
        self.name = name.lower()
        self.attributes = {}

    def attribute(self, key, value):
        """Store ``value`` under the lower-cased ``key``."""
        self.attributes[key.lower()] = value


head_events = set([
    'html', 'head', 'title', '/title', 'base', '/base',
    'script', '/script', 'style', '/style', 'meta', '/meta',
    'link', '/link', 'object', '/object'
])


def parse(f):
    """Parse an HTML file into tag events.

    Raises ValueError when ``f`` is not a PeekFile instance.
    """
    if not isinstance(f, PeekFile):
        # Wrap in a tuple so a tuple argument cannot break the formatting.
        msg = 'Expected a web.PeekFile instance, got %r' % (f,)
        raise ValueError(msg)

    content = f.peek(32768)
    while content:
        try:
            i = content.index('<')
        except ValueError:
            break
        content = content[i:]
        # NOTE(review): the available source stops here, part-way through an
        # ``if content.startswith('...`` test. The rest of the loop is not
        # visible and is deliberately not reconstructed; restore it from the
        # upstream source.
        raise NotImplementedError(
            'parse() is truncated in the available source')