#!/usr/bin/env python
"""
RSS 1.1 Validator
Author: Sean B. Palmer, inamidst.com
License: GPL 2; share and enjoy!
Nearby: http://inamidst.com/rss1.1/
"""

import sys
import os
import re
import urllib
import xml.sax

try:
    from cStringIO import StringIO
except ImportError:
    try:
        from StringIO import StringIO
    except ImportError:
        # Neither legacy module is available; fall back to the io module.
        from io import StringIO

# Prefix -> namespace URI for every prefix the schema tables refer to.
namespaces = {'rss': 'http://purl.org/net/rss1.1#',
              'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#',
              'xml': 'http://www.w3.org/XML/1998/namespace',
              'xsd': 'http://www.w3.org/2001/XMLSchema#'}

# Cf. http://ftp.davidashen.net/PreTI/RNV/ 1.7.6/xsd.c:298
patAnyURI = (r"^(([a-zA-Z][0-9a-zA-Z+\-\.]*:)?/{0,2}[0-9a-zA-Z;/?:@&=+" +
             r"$\.\-_!~*'()%]+)?(#[0-9a-zA-Z;/?:@&=+$\.\-_!~*'()%]+)?$")

# Anchored at both ends, like patAnyURI: the compiled patterns are applied
# with re.match(), which only anchors the start, so without the trailing "$"
# any value that merely *begins* with a language tag would be accepted.
patLanguage = (r"^(?:[a-zA-Z]{2}|[iI]-[a-zA-Z]+|" +
               r"[xX]-[a-zA-Z]{1,8})(?:-[a-zA-Z]{1,8})*$")

# Datatype name (as written in the schema tables) -> compiled pattern.
datatypes = {'xsd:string': re.compile(r'(?m)^.*$'),
             'xsd:anyURI': re.compile(patAnyURI),
             'xsd:language': re.compile(patLanguage)}

# Marker keys used inside the schema dictionaries.  They are one-element
# tuples, so they can never be confused with the plain-string element keys.
optAttrs = ('Optional Attributes',)
manAttrs = ('Mandatory Attributes',)
content = ('Content',)

# Cardinality markers.
ZeroOrMore = '*'
ZeroOrOne = '?'
One = '1'

# Transcribed from RELAX NG Compact:
# http://inamidst.com/rss1.1/schema.rnc
# Dictionaries work here since it's all interleaved.
# --- Schema tables ---------------------------------------------------------
# Each element schema is a dict.  The optAttrs / manAttrs / content marker
# keys describe attributes and text content; every other key is a child
# element tag mapped to a (child schema, cardinality) pair.  The '*' key
# stands for "any element whose namespace is not in `namespaces`".

Any = None

url = {content: 'xsd:anyURI'}

description = {optAttrs: {'xml:lang': 'xsd:language'},
               content: 'xsd:string'}

link = {content: 'xsd:anyURI'}

title = {optAttrs: {'xml:lang': 'xsd:language'},
         content: 'xsd:string'}

# Attribute values written as unicode literals (u'...') are literal values
# the attribute must equal; plain strings are datatype names.
image = {optAttrs: {'xml:lang': 'xsd:language'},
         manAttrs: {'rdf:parseType': u'Resource'},
         'rss:title': (title, One),
         'rss:link': (link, ZeroOrOne),
         'rss:url': (url, One),
         '*': (Any, '*')}

item = {optAttrs: {'xml:lang': 'xsd:language'},
        manAttrs: {'rdf:about': 'xsd:anyURI'},
        'rss:title': (title, One),
        'rss:link': (link, One),
        'rss:description': (description, ZeroOrOne),
        'rss:image': (image, ZeroOrOne),
        '*': (Any, ZeroOrMore)}

items = {optAttrs: {'xml:lang': 'xsd:language'},
         manAttrs: {'rdf:parseType': u'Collection'},
         'rss:item': (item, '*')}

Channel = {optAttrs: {'xml:lang': 'xsd:language',
                      'xml:base': 'xsd:anyURI'},
           manAttrs: {'rdf:about': 'xsd:anyURI'},
           'rss:title': (title, One),
           'rss:link': (link, One),
           'rss:description': (description, One),
           'rss:image': (image, ZeroOrOne),
           '*': (Any, ZeroOrMore),
           'rss:items': (items, One)}

schema = {'rss:Channel': (Channel, One)}


class Grab(urllib.URLopener):
    """URL opener that sends a custom User-Agent string."""

    def __init__(self, *args):
        self.version = 'Mozilla/5.0 (RSS 1.1 Validator)'
        urllib.URLopener.__init__(self, *args)

    def http_error_default(self, url, fp, errcode, errmsg, headers):
        """Wrap an HTTP error response in an addinfourl and return it."""
        return urllib.addinfourl(fp, [headers, errcode], "http:" + url)


# Install the opener so that urllib.urlopen() uses it.
urllib._urlopener = Grab()


def dictize(attrs):
    """Return a dict mapping each attribute's name (as given by
    getNameByQName) to its value for a SAX namespace-aware attrs object."""
    name = attrs.getNameByQName
    value = attrs.getValueByQName
    return dict((name(n), value(n)) for n in attrs.getQNames())


def tagToQName(tag):
    """Convert a 'prefix:name' tag into a (namespace URI, name) pair.

    Raises ValueError when the tag does not contain exactly one colon, and
    Exception when the prefix is not listed in `namespaces`.
    """
    if tag.count(':') != 1:
        raise ValueError('Expecting a QName with a single colon in it.')
    prefix, name = tag.split(':', 1)
    if prefix in namespaces:
        return (unicode(namespaces[prefix]), unicode(name))
    raise Exception('Prefix %s unknown.' % prefix)


def qnameToTag(qname):
    """Convert a (namespace URI, name) pair into a 'prefix:name' tag.

    Raises Exception when the namespace URI is not listed in `namespaces`.
    """
    uri, name = qname
    uri = uri.encode('utf-8')
    name = name.encode('utf-8')
    for (pfx, ns) in namespaces.items():
        if ns == uri:
            return '%s:%s' % (pfx, name)
    raise Exception('Namespace %s unknown.' % uri)


# NOTE(review): in the file as received this literal was split across
# physical lines inside a single-quoted raw string, which cannot be valid
# Python; it looks as if a token between "^\s*" and "\s*error" was lost.
# The two line breaks are kept here as "\n\n" -- confirm the intended
# pattern against the upstream source.
r_error = re.compile(r'(?mi)^\s*\n\n\s*error')


def validateRDF(s):
    """POST `s` to the W3C RDF validation servlet.

    Returns False when the first 4096 bytes of the response match `r_error`,
    True otherwise.
    """
    uri = 'http://www.w3.org/RDF/Validator/ARPServlet'
    data = {'RDF': s, 'EMBEDDED_RDF': '1'}
    u = urllib.urlopen(uri, urllib.urlencode(data))
    try:
        result = u.read(4096)
    finally:
        # Release the connection even if the read fails.
        u.close()
    return not r_error.search(result)


class RSSParser(xml.sax.handler.ContentHandler):
    """SAX content handler that validates a document against `schema`.

    Notes go to stdout, warnings and errors to stderr; the first error
    terminates the process with exit status 1.
    """

    def __init__(self, uri=None):
        """Set up parser state; when `uri` is given, validate it at once."""
        self.uri = uri or None
        self.data = None
        self.buffer = None
        self.lines = []  # read by pos(); filled in by validateURI()
        self.abouts = {}
        self.any = []
        self.nestedAny = False
        self.warnings = 0
        self.errors = 0
        self.stack = [['/',     # [0] -> Element name
                       schema,  # [1] -> Element schema part
                       One,     # [2] -> Cardinality
                       [],      # [3] -> Children found so far
                       '']]     # [4] -> Child text found so far
        xml.sax.handler.ContentHandler.__init__(self)
        if self.uri:
            self.validateURI(self.uri)

    def note(self, msg):
        """Write an informational message to stdout."""
        sys.stdout.write('NOTE: %s\n' % (msg,))

    def warning(self, msg):
        """Count a warning and write it to stderr."""
        self.warnings += 1
        sys.stderr.write('WARNING: %s\n' % (msg,))

    def error(self, msg):
        """Count an error, write it with its position to stderr, and exit."""
        self.errors += 1
        line, column, pos = self.pos()
        sys.stderr.write('ERROR:%s:%s:%s %s\n' % (line, column, msg, pos))
        sys.exit(1)

    def pos(self):
        """Return (line, column, repr of the source line with a '^' inserted
        at the column) as strings; ('0', '0', '""') when no locator is set."""
        if self._locator is None:
            return '0', '0', '""'
        linenum = self._locator.getLineNumber()
        try:
            line = self.lines[linenum - 1]
        except IndexError:
            line = ''
        column = self._locator.getColumnNumber()
        line = (line[:column] + '^' + line[column:])
        return str(linenum), str(column), ('%r' % line)

    def validateURI(self, uri):
        """Fetch `uri`, check its media type, and validate its content."""
        self.uri = uri
        u = urllib.urlopen(self.uri)
        try:
            info = u.info()
            # Best effort: a missing header (or missing info object) just
            # means there is no media type to check.
            try:
                mediatype = info['Content-Type']
            except Exception:
                mediatype = None

            if (':' in uri) and mediatype:
                if mediatype.startswith('application/rss+xml'):
                    self.note('Recommended media type used.')
                elif mediatype.startswith('application/rdf+xml'):
                    self.warning('application/rss+xml is preferred')
                else:
                    msg = '%s is NOT an allowed media type' % mediatype
                    self.error(msg)

            data = u.read()
        finally:
            # Close the handle on every path, including error() exiting.
            u.close()

        self.data = data
        self.lines = self.data.splitlines()
        self.buffer = StringIO(data)
        self.buffer.seek(0)
        self.validate()

    def validate(self):
        """Parse self.buffer with a namespace-aware SAX parser, using this
        object as the content handler."""
        self.parser = xml.sax.make_parser()
        self.parser.start_namespace_decl('xml', namespaces['xml'])
        self.parser.setFeature(xml.sax.handler.feature_namespaces, 1)
        namespacePrefixes = xml.sax.handler.feature_namespace_prefixes
        try:
            self.parser.setFeature(namespacePrefixes, 1)
        except (xml.sax._exceptions.SAXNotSupportedException,
                xml.sax._exceptions.SAXNotRecognizedException):
            sys.stderr.write("Warning: prefixes error\n")
        self.parser.setContentHandler(self)
        try:
            self.parser.parse(self.buffer)
        except xml.sax._exceptions.SAXParseException as e:
            self.error('XML Parser Error: %s' % e)

    def rdflibValidateRDF(self):
        """Fallback RDF check using rdflib, if it can be imported."""
        try:
            from rdflib.TripleStore import TripleStore
        except ImportError:
            self.warning("Couldn't import rdflib: only Tentatively Valid!")
        else:
            from rdflib.exceptions import Error
            try:
                TripleStore(self.uri)
            except (xml.sax._exceptions.SAXParseException, Error):
                self.error('RDF validation with rdflib failed!')
            else:
                self.note('RDF validation with rdflib was successful')
                self.warning('Only Tentatively Valid since rdflib is NV')

    def getSchema(self, tag):
        """Return the (schema, cardinality) entry for `tag` under the element
        currently on top of the stack.  Raises KeyError if there is none."""
        schemaPart = schema
        for stackItem in self.stack[1:]:
            schemaPart, cardinality = schemaPart[stackItem[0]]
        return schemaPart[tag]

    def startElementNS(self, nsname, tag, rawattrs):
        """SAX callback: validate an element's position, cardinality and
        attributes against its parent's schema, then push it on the stack.

        `nsname` is the (namespace URI, local name) pair supplied by SAX.
        """
        ns, name = nsname
        attrs = dictize(rawattrs)
        if tag is None:
            tag = name

        if (ns == namespaces['rss']) and (':' in tag):
            self.warning('Elements in the rss namespace should be prefixless.')

        if (ns == namespaces['rss']) and self.any:
            tagName = qnameToTag((ns, name))
            self.error('%s is not allowed in an Any section' % tagName)

        rdfabout = tagToQName('rdf:about')
        if rdfabout in attrs:
            aboutURI = attrs[rdfabout]
            if aboutURI in self.abouts:
                self.error('Repeat rdf:about value: %s' % aboutURI)
            else:
                self.abouts[aboutURI] = True

        # Anything that cannot be mapped to a known prefix is treated as an
        # "Any" element.
        try:
            fulltag = qnameToTag((ns, name))
        except Exception:
            fulltag = '*'

        parentSchema = self.stack[-1][1]
        if fulltag in parentSchema:
            self.stack[-1][3].append(fulltag)
            count = self.stack[-1][3].count(fulltag)
            # @@ Hence probably not needed in the stack
            cardinality = parentSchema[fulltag][1]
            if (cardinality == ZeroOrOne) and (count > 1):
                msg = 'Expected ZeroOrOne %s but now have %s' % (fulltag, count)
                self.error(msg)
            elif (cardinality == One) and (count != 1):
                msg = 'Expected One %s but now have %s' % (fulltag, count)
                self.error(msg)
            else:
                self.note('Validated %s successfully.' % fulltag)
        else:
            # Basically maximum cardinality was Zero
            self.error('Element %s is not allowed here.' % fulltag)

        if fulltag == '*':
            for (attrNS, attrLocal) in attrs.keys():
                if attrNS == namespaces['rss']:
                    attrName = qnameToTag((attrNS, attrLocal))
                    msg = "@%s is not allowed in an Any section" % attrName
                    self.error(msg)
            # Any elements are tracked by depth only, not pushed on the stack.
            self.any.append(True)
            if len(self.any) > 1:
                self.nestedAny = True
            return

        try:
            schemaPart, cardinality = self.getSchema(fulltag)
        except KeyError:
            schemaPart, cardinality = {}, One
        self.stack.append([fulltag, schemaPart, cardinality, [], ''])

        # Validate attributes
        for _attrs in (manAttrs, optAttrs):
            if _attrs not in schemaPart:
                continue
            for (key, value) in schemaPart[_attrs].items():
                attrkey = tagToQName(key)
                if attrkey in attrs:
                    if isinstance(value, unicode):
                        # unicode schema value: the attribute must equal it.
                        if value != attrs[attrkey]:
                            msg = "@%s's value must be: %r" % (key, value)
                            self.error(msg)
                        else:
                            self.note('Validated plain @%s successfully.' % key)
                    else:
                        # str schema value: a datatype name to match against.
                        regexp = datatypes[value]
                        pattern = regexp.pattern
                        if not regexp.match(attrs[attrkey]):
                            msg = "@%s's value must match: %r" % (key, pattern)
                            self.error(msg)
                        else:
                            self.note('Validated dtyped @%s successfully.' % key)
                    # Whatever is left in attrs afterwards is unexpected.
                    del attrs[attrkey]
                elif _attrs == manAttrs:
                    self.error('%s attribute is mandatory' % key)

        for key in attrs.keys():
            try:
                attrtag = qnameToTag(key)
            except Exception:
                attrtag = str(key)
            self.error('Attribute %s not allowed here.' % attrtag)

    def characters(self, chars):
        """SAX callback: accumulate text for the current element, unless the
        parser is inside an Any section."""
        if self.stack and (not self.any):
            self.stack[-1][-1] += chars

    def endElementNS(self, qname, tag):
        """SAX callback: pop the element and check its required children and
        text content; on closing rss:Channel, run RDF validation when nested
        Any elements were seen."""
        if self.any:
            self.any.pop()
            return

        elementName = self.stack[-1][0]
        schemaPart = self.stack[-1][1]
        elementChildren = self.stack[-1][3]
        elementContent = self.stack[-1][4]
        self.stack.pop()

        # @@ Will the following ever happen?
        if elementName == '*':
            return

        for key in schemaPart.keys():
            if key not in (optAttrs, manAttrs, content):
                childSchema, cardinality = schemaPart[key]
                count = elementChildren.count(key)
                if (cardinality == One) and (count != 1):
                    self.error('Expected to find a %s' % key)

        # @@ Content checking should go here...
        ws = ' \t\r\n'
        # @@ check value
        if content in schemaPart:
            contentValue = schemaPart[content]
            if isinstance(contentValue, unicode):
                if contentValue.strip(ws) != elementContent.strip(ws):
                    msg = "Expected %s's content to be %r; got %r"
                    self.error(msg % (elementName, contentValue, elementContent))
                else:
                    # Was self.msg(...), a method this class does not define.
                    self.note("Validated %s's content" % elementName)
            else:
                regexp = datatypes[contentValue]
                pattern = regexp.pattern
                if not regexp.match(elementContent.strip(ws)):
                    msg = "Expected %s's content to match %r; got %r"
                    self.error(msg % (elementName, pattern, elementContent))
                else:
                    self.note("Validated %s's content" % elementName)
        elif elementContent.strip(ws):
            self.error("Expected %s to have no text content" % elementName)

        if (elementName == 'rss:Channel') and self.nestedAny:
            # If the remote validator cannot be used for any reason, fall
            # back to the local rdflib check.
            try:
                valid = validateRDF(self.data)
            except Exception:
                self.rdflibValidateRDF()
            else:
                if valid:
                    self.note("W3C RDF Validation successful")
                else:
                    self.error("W3C RDF Validation was unsuccessful")


def removeCRLF(s):
    """Return `s` with every CR and LF replaced by a space."""
    s = s.replace('\r', ' ')
    return s.replace('\n', ' ')


class ReportRSSParser(RSSParser):
    """RSSParser that collects warnings and errors in `self.report` (a
    StringIO) instead of printing them, and does not exit on error."""

    def __init__(self, *args):
        # The report buffer must exist before the base initialiser runs,
        # because that may already start validating.
        self.report = StringIO()
        RSSParser.__init__(self, *args)

    def note(self, msg):
        return

    def warning(self, msg):
        self.warnings += 1
        msg = removeCRLF(msg)
        self.report.write('Warning: %s\n' % (msg,))

    def error(self, msg):
        self.errors += 1
        msg = removeCRLF(msg)
        line, column, pos = self.pos()
        marker = '%s:%s:' % (line, column)
        self.report.write('Error: %s %s\n' % (marker + msg, pos))


class QuietRSSParser(RSSParser):
    """RSSParser that only counts warnings and errors."""

    def note(self, msg):
        return

    def warning(self, msg):
        self.warnings += 1

    def error(self, msg):
        self.errors += 1


def validate(uri, parser=None):
    """Validate `uri` with an instance of `parser` (default RSSParser).

    Returns (parser instance, warning count, error count).
    """
    if parser is None:
        p = RSSParser()
    else:
        p = parser()
    p.validateURI(uri)
    return p, p.warnings, p.errors


def report(uri):
    """Validate `uri` and return the report as a list of 'Key: value' lines."""
    result = []
    result.append('URI: ' + uri)
    p, warnings, errors = validate(uri, parser=ReportRSSParser)
    p.report.seek(0)
    for line in p.report:
        result.append(line.rstrip('\r\n'))
    result.append('Warnings: %s' % warnings)
    result.append('Errors: %s' % errors)
    if errors:
        result.append('Summary: Invalid')
    else:
        result.append('Summary: Valid')
    return result


def pyreport(uri):
    """Validate `uri` and return the report as a dict.

    Warning and error lines are gathered into the 'WarningMessages' and
    'ErrorMessages' lists; every other report line becomes its own key.
    """
    lines = report(uri)
    reportdata = {'WarningMessages': [], 'ErrorMessages': []}
    for line in lines:
        key, value = line.split(': ', 1)
        if key == 'Warning':
            reportdata['WarningMessages'].append(value)
        elif key == 'Error':
            reportdata['ErrorMessages'].append(value)
        else:
            reportdata[key] = value
    return reportdata


def doCGI():
    """CGI entry point: validate the 'uri' form field and print the report
    as text/plain."""
    import cgi, cgitb
    cgitb.enable()
    form = cgi.FieldStorage()
    # NOTE(review): `uri` is untrusted request input and is handed to
    # urllib.urlopen() unchanged, and error lines echo source text back to
    # the client.  Consider restricting the accepted schemes.
    uri = form['uri'].value

    with open("/tmp/grr", "a") as f:
        f.write(uri + "\n")

    sys.stdout.write('Content-Type: text/plain\n')
    sys.stdout.write('\n')
    sys.stdout.write('\n'.join(report(uri)) + '\n')


def test(quiet=False):
    """Quis custodiet ipsos custodes?"""
    # Cf. http://inamidst.com/rss1.1/test/
    import glob
    tests = glob.glob('test/*.rss')
    tests.sort()
    exitcode = 0
    for fn in tests:
        p, warnings, errors = validate(fn, parser=QuietRSSParser)
        # Files named test/neg-* are expected to be invalid, so a test fails
        # when "has errors" and "is a negative test" disagree.
        failed = (errors > 0) != fn.startswith('test/neg-')
        if failed:
            if not quiet:
                sys.stdout.write('FAIL: %s\n' % fn)
            exitcode = 1
        elif not quiet:
            sys.stdout.write('Pass: %s\n' % fn)

    # For using "if ./validate.cgi --test [...]"
    if quiet and exitcode:
        sys.stdout.write('FAIL: some tests failed\n')
    elif quiet:
        sys.stdout.write('Pass: all tests passed\n')
    sys.exit(exitcode)


def main():
    """Dispatch to CGI mode, the self-test, or command-line validation."""
    if 'SCRIPT_NAME' in os.environ:
        doCGI()
    elif len(sys.argv) > 1:
        uri = sys.argv[1]
        if uri == '--test':
            test()
        elif uri == '-qt':
            test(quiet=True)
        else:
            validate(uri)
    else:
        sys.stdout.write('%s\n' % (__doc__,))


if __name__ == "__main__":
    main()