#!/usr/bin/env python
"""
RSS 1.1 Validator
Author: Sean B. Palmer, inamidst.com
License: GPL 2; share and enjoy!
Nearby: http://inamidst.com/rss1.1/
"""
import sys, os, re, urllib, xml.sax
try: from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
# Prefix -> namespace URI table. The QName helpers below translate between
# 'prefix:name' tags and (namespace URI, local name) pairs through it.
namespaces = {
    'rss': 'http://purl.org/net/rss1.1#',
    'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#',
    'xml': 'http://www.w3.org/XML/1998/namespace',
    'xsd': 'http://www.w3.org/2001/XMLSchema#',
}
# Cf. http://ftp.davidashen.net/PreTI/RNV/ 1.7.6/xsd.c:298
patAnyURI = (r"^(([a-zA-Z][0-9a-zA-Z+\-\.]*:)?/{0,2}[0-9a-zA-Z;/?:@&=+" +
             r"$\.\-_!~*'()%]+)?(#[0-9a-zA-Z;/?:@&=+$\.\-_!~*'()%]+)?$")

# Anchored at both ends, like patAnyURI: the compiled patterns are applied
# with .match(), which only pins the start, so without the trailing '$' any
# value that merely begins with a language tag would be accepted.
patLanguage = (r"^(?:[a-zA-Z]{2}|[iI]-[a-zA-Z]+|" +
               r"[xX]-[a-zA-Z]{1,8})(?:-[a-zA-Z]{1,8})*$")

# Datatype name (as used in the schema dictionaries) -> compiled pattern
# that a conforming attribute value or element content must match.
datatypes = {
    'xsd:string': re.compile(r'(?m)^.*$'),
    'xsd:anyURI': re.compile(patAnyURI),
    'xsd:language': re.compile(patLanguage),
}
# Marker keys used inside the schema dictionaries. They are one-element
# tuples so that they can never collide with a 'prefix:name' element key.
optAttrs = ('Optional Attributes',)
manAttrs = ('Mandatory Attributes',)
content = ('Content',)

# Cardinality markers for child elements.
ZeroOrMore = '*'
ZeroOrOne = '?'
One = '1'

# Transcribed from RELAX NG Compact:
# http://inamidst.com/rss1.1/schema.rnc
# Dictionaries work here since it's all interleaved.
#
# Each element description maps:
#   optAttrs / manAttrs -> {attribute tag: datatype name or literal value}
#   content             -> datatype name of the text content
#   'prefix:name'       -> (child element description, cardinality)
#   '*'                 -> (Any, cardinality) for foreign-namespace children
# Attribute values written as unicode literals are compared verbatim by the
# parser; plain strings are looked up in `datatypes`.

Any = None

url = {
    content: 'xsd:anyURI',
}

description = {
    optAttrs: {'xml:lang': 'xsd:language'},
    content: 'xsd:string',
}

link = {
    content: 'xsd:anyURI',
}

title = {
    optAttrs: {'xml:lang': 'xsd:language'},
    content: 'xsd:string',
}

image = {
    optAttrs: {'xml:lang': 'xsd:language'},
    manAttrs: {'rdf:parseType': u'Resource'},
    'rss:title': (title, One),
    'rss:link': (link, ZeroOrOne),
    'rss:url': (url, One),
    '*': (Any, ZeroOrMore),
}

item = {
    optAttrs: {'xml:lang': 'xsd:language'},
    manAttrs: {'rdf:about': 'xsd:anyURI'},
    'rss:title': (title, One),
    'rss:link': (link, One),
    'rss:description': (description, ZeroOrOne),
    'rss:image': (image, ZeroOrOne),
    '*': (Any, ZeroOrMore),
}

items = {
    optAttrs: {'xml:lang': 'xsd:language'},
    manAttrs: {'rdf:parseType': u'Collection'},
    'rss:item': (item, ZeroOrMore),
}

Channel = {
    optAttrs: {'xml:lang': 'xsd:language',
               'xml:base': 'xsd:anyURI'},
    manAttrs: {'rdf:about': 'xsd:anyURI'},
    'rss:title': (title, One),
    'rss:link': (link, One),
    'rss:description': (description, One),
    'rss:image': (image, ZeroOrOne),
    '*': (Any, ZeroOrMore),
    'rss:items': (items, One),
}

# Document root: exactly one rss:Channel.
schema = {
    'rss:Channel': (Channel, One),
}
class Grab(urllib.URLopener):
    """URL opener that announces itself as the RSS 1.1 validator.

    HTTP error responses are not raised; they are handed back as ordinary
    response objects so the caller can still read the body.
    """

    def __init__(self, *args):
        # Set before delegating to the base initialiser, as in the original
        # (presumably URLopener reads it for the User-Agent header - confirm).
        self.version = 'Mozilla/5.0 (RSS 1.1 Validator)'
        urllib.URLopener.__init__(self, *args)

    def http_error_default(self, url, fp, errcode, errmsg, headers):
        """Wrap an HTTP error response instead of failing.

        The "headers" slot of the returned object is the list
        [headers, errcode], not a headers object, so a later
        info()['Content-Type'] lookup on it does not succeed.
        """
        info = [headers, errcode]
        return urllib.addinfourl(fp, info, "http:" + url)


# Make urllib.urlopen() use the validator's opener from here on.
urllib._urlopener = Grab()
def dictize(attrs):
    """Copy a namespace-aware SAX attributes object into a plain dict.

    Each qualified name reported by attrs.getQNames() is resolved with
    getNameByQName() for the key and getValueByQName() for the value.
    """
    plain = {}
    for qname in attrs.getQNames():
        plain[attrs.getNameByQName(qname)] = attrs.getValueByQName(qname)
    return plain
def tagToQName(tag):
    """Turn a 'prefix:name' tag into a (namespace URI, local name) pair.

    Both members of the returned tuple are unicode objects.

    Raises ValueError when the tag does not contain exactly one colon and
    Exception when the prefix is not a key of `namespaces`.
    """
    if tag.count(':') != 1:
        raise ValueError('Expecting a QName with a single colon in it.')
    prefix, name = tag.split(':', 1)
    if prefix not in namespaces:
        raise Exception('Prefix %s unknown.' % prefix)
    return (unicode(namespaces[prefix]), unicode(name))
def qnameToTag((uri, name)):
uri = uri.encode('utf-8')
name = name.encode('utf-8')
for (pfx, ns) in namespaces.iteritems():
if (ns == uri):
return '%s:%s' % (pfx, name)
raise Exception, 'Namespace %s unknown.' % uri
# Pattern searched for in the RDF validator's response to detect a failure:
# the word "error" (any case) opening a line, preceded by a line break.
# The line break is written as \n because a raw string literal cannot span
# two physical lines.
# NOTE(review): the break sat in the middle of the literal, which suggests
# markup (possibly an HTML tag) was lost from the pattern at some point;
# confirm against the upstream validator script.
r_error = re.compile(r'(?mi)^\s*\n\s*error')
def validateRDF(s):
    """Submit *s* to the W3C RDF validation service and report the verdict.

    The document is POSTed as the 'RDF' form field. Only the first 4096
    bytes of the response are inspected.

    Returns False when `r_error` matches that prefix of the response and
    True otherwise. Network failures propagate to the caller.
    """
    uri = 'http://www.w3.org/RDF/Validator/ARPServlet'
    data = {'RDF': s, 'EMBEDDED_RDF': '1'}
    u = urllib.urlopen(uri, urllib.urlencode(data))
    try:
        result = u.read(4096)
    finally:
        # Release the connection even when reading the response fails.
        u.close()
    return r_error.search(result) is None
class RSSParser(xml.sax.handler.ContentHandler):
def __init__(self, uri=None):
self.uri = uri or None
self.data = None
self.buffer = None
self.abouts = {}
self.any = []
self.nestedAny = False
self.warnings = 0
self.errors = 0
self.stack = [['/', # [0] -> Element name
schema, # [1] -> Element schema part
One, # [2] -> Cardinality
[], # [3] -> Children found so far
'']] # [4] -> Child text found so far
xml.sax.handler.ContentHandler.__init__(self)
if self.uri: self.validateURI(self.uri)
def note(self, msg):
print 'NOTE:', msg
def warning(self, msg):
self.warnings += 1
print >> sys.stderr, 'WARNING:', msg
def error(self, msg):
self.errors += 1
line, column, pos = self.pos()
print >> sys.stderr, ('ERROR:%s:%s:' % (line, column)) + msg, pos
sys.exit(1)
def pos(self):
if self._locator is None:
return '0', '0', '""'
linenum = self._locator.getLineNumber()
try: line = self.lines[linenum - 1]
except IndexError: line = ''
column = self._locator.getColumnNumber()
line = (line[:column] + '^' + line[column:])
return str(linenum), str(column), ('%r' % line)
def validateURI(self, uri):
self.uri = uri
u = urllib.urlopen(self.uri)
info = u.info()
# if (':' in uri) and info.has_key('Content-Type'):
try: mediatype = info['Content-Type']
except: mediatype = None
if (':' in uri) and mediatype:
if mediatype.startswith('application/rss+xml'):
self.note('Recommended media type used.')
elif mediatype.startswith('application/rdf+xml'):
self.warning('application/rss+xml is preferred')
else:
msg = '%s is NOT an allowed media type' % mediatype
self.error(msg)
data = u.read()
self.buffer = u
u.close()
self.data = data
self.lines = self.data.splitlines()
self.buffer = StringIO(data)
self.buffer.seek(0)
self.validate()
# u.close()
def validate(self):
self.parser = xml.sax.make_parser()
self.parser.start_namespace_decl('xml', namespaces['xml'])
self.parser.setFeature(xml.sax.handler.feature_namespaces, 1)
namespacePrefixes = xml.sax.handler.feature_namespace_prefixes
try: self.parser.setFeature(namespacePrefixes, 1)
except (xml.sax._exceptions.SAXNotSupportedException,
xml.sax._exceptions.SAXNotRecognizedException):
print >> sys.stderr, "Warning: prefixes error"
self.parser.setContentHandler(self)
try: self.parser.parse(self.buffer)
except xml.sax._exceptions.SAXParseException, e:
self.error('XML Parser Error: %s' % e)
def rdflibValidateRDF(self):
try: from rdflib.TripleStore import TripleStore
except ImportError:
self.warning("Couldn't import rdflib: only Tentatively Valid!")
else:
from rdflib.exceptions import Error
try: G = TripleStore(self.uri)
except (xml.sax._exceptions.SAXParseException, Error):
self.error('RDF validation with rdflib failed!')
else:
self.note('RDF validation with rdflib was successful')
self.warning('Only Tentatively Valid since rdflib is NV')
def getSchema(self, tag):
schemaPart = schema
for stackItem in self.stack[1:]:
schemaPart, cardinality = schemaPart[stackItem[0]]
return schemaPart[tag]
def startElementNS(self, (ns, name), tag, rawattrs):
attrs = dictize(rawattrs)
if tag is None: tag = name
if (ns == namespaces['rss']) and (':' in tag):
self.warning('Elements in the rss namespace should be prefixless.')
if (ns == namespaces['rss']) and self.any:
tagName = qnameToTag((ns, name))
self.error('%s is not allowed in an Any section' % tagName)
rdfabout = tagToQName('rdf:about')
if attrs.has_key(rdfabout):
aboutURI = attrs[rdfabout]
if self.abouts.has_key(aboutURI):
self.error('Repeat rdf:about value: %s' % aboutURI)
else: self.abouts[aboutURI] = True
try: fulltag = qnameToTag((ns, name))
except: fulltag = '*'
parentSchema = self.stack[-1][1]
# print 'parent-content:', parentSchema.keys()
if parentSchema.has_key(fulltag):
self.stack[-1][3].append(fulltag)
count = self.stack[-1][3].count(fulltag)
# @@ Hence probably not needed in the stack
cardinality = parentSchema[fulltag][1]
if ((cardinality == ZeroOrOne) and (count > 1)):
msg = 'Expected ZeroOrOne %s but now have %s' % (fulltag, count)
self.error(msg)
elif ((cardinality == One) and (count != 1)):
msg = 'Expected One %s but now have %s' % (fulltag, count)
self.error(msg)
else: self.note('Validated %s successfully.' % fulltag)
else: # Basically maximum cardinality was Zero
self.error('Element %s is not allowed here.' % fulltag)
if fulltag == '*':
for (ns, name) in attrs.iterkeys():
if ns == namespaces['rss']:
attrName = qnameToTag((ns, name))
msg = "@%s is not allowed in an Any section" % attrName
self.error(msg)
self.any.append(True)
# print self.any
if len(self.any) > 1:
self.nestedAny = True
return
try: schemaPart, cardinality = self.getSchema(fulltag)
except KeyError: schemaPart, cardinality = {}, One
self.stack.append([fulltag, schemaPart, cardinality, [], ''])
# Validate attributes
for _attrs in (manAttrs, optAttrs):
if schemaPart.has_key(_attrs):
for (key, value) in schemaPart[_attrs].iteritems():
attrkey = tagToQName(key)
if attrs.has_key(attrkey):
if isinstance(value, unicode):
if (value != attrs[attrkey]):
msg = "@%s's value must be: %r" % (key, value)
self.error(msg)
else:
self.note('Validated plain @%s successfully.' % key)
del attrs[attrkey]
else:
regexp = datatypes[value]
pattern = regexp.pattern
if not regexp.match(attrs[attrkey]):
msg = "@%s's value must match: %r" % (key, pattern)
self.error(msg)
else:
self.note('Validated dtyped @%s successfully.' % key)
del attrs[attrkey]
elif _attrs == manAttrs:
self.error('%s attribute is mandatory' % key)
if attrs:
for key in attrs.iterkeys():
try: attrtag = qnameToTag(key)
except: attrtag = str(key)
self.error('Attribute %s not allowed here.' % attrtag)
# print 'start', fulltag, tag, attrs
def characters(self, chars):
if self.stack and (not self.any):
self.stack[-1][-1] += chars
def endElementNS(self, qname, tag):
if self.any:
self.any.pop()
return
elementName = self.stack[-1][0]
schemaPart = self.stack[-1][1]
elementChildren = self.stack[-1][3]
elementContent = self.stack[-1][4]
self.stack.pop()
# @@ Will the following ever happen?
if elementName == '*': return
for key in schemaPart.iterkeys():
if key not in (optAttrs, manAttrs, content):
tag, cardinality = schemaPart[key]
count = elementChildren.count(key)
if ((cardinality == One) and (count != 1)):
self.error('Expected to find a %s' % key)
# @@ Content checking should go here...
ws = ' \t\r\n' # @@ check value
if schemaPart.has_key(content):
contentValue = schemaPart[content]
if isinstance(contentValue, unicode):
if contentValue.strip(ws) != elementContent.strip(ws):
msg = "Expected %s's content to be %r; got %r"
self.error(msg % (elementName, contentValue, elementContent))
else: self.msg("Validated %s's content" % elementName)
else:
regexp = datatypes[contentValue]
pattern = regexp.pattern
if not regexp.match(elementContent.strip(ws)):
msg = "Expected %s's content to match %r; got %r"
self.error(msg % (elementName, pattern, elementContent))
else: self.note("Validated %s's content" % elementName)
elif elementContent.strip(ws):
# print 'Got:', `elementContent[:-25]`
self.error("Expected %s to have no text content" % elementName)
# def endDocument(self):
if (elementName == 'rss:Channel') and self.nestedAny:
# if self.nestedAny:
try: valid = validateRDF(self.data)
except: self.rdflibValidateRDF()
else:
if valid: self.note("W3C RDF Validation successful")
else: self.error("W3C RDF Validation was unsuccessful")
def removeCRLF(s):
    """Return *s* with every carriage return and line feed replaced by a space."""
    for lineBreak in ('\r', '\n'):
        s = s.replace(lineBreak, ' ')
    return s
class ReportRSSParser(RSSParser):
    """Parser that collects warnings and errors in `self.report`.

    Notes are dropped. Errors are recorded and counted but do not stop the
    run. Each message is written on a single line.
    """

    def __init__(self, *args):
        # The buffer has to exist before the base initialiser runs, since
        # that may start validating (and reporting) straight away.
        self.report = StringIO()
        RSSParser.__init__(self, *args)

    def note(self, msg):
        """Ignore informational messages."""
        return

    def warning(self, msg):
        """Count the warning and append it to the report."""
        self.warnings += 1
        flat = removeCRLF(msg)
        print >> self.report, 'Warning:', flat

    def error(self, msg):
        """Count the error and append it, with its position, to the report."""
        self.errors += 1
        flat = removeCRLF(msg)
        line, column, pos = self.pos()
        marker = '%s:%s:' % (line, column)
        print >> self.report, 'Error:', (marker + flat), pos
class QuietRSSParser(RSSParser):
    """Parser that prints nothing and only counts warnings and errors."""

    def note(self, msg):
        """Discard the message."""
        return None

    def warning(self, msg):
        """Count the warning without printing it."""
        self.warnings = self.warnings + 1

    def error(self, msg):
        """Count the error without printing it or exiting."""
        self.errors = self.errors + 1
def validate(uri, parser=None):
    """Validate *uri* and return (parser instance, warnings, errors).

    *parser* is a zero-argument callable producing the handler to use;
    RSSParser is used when it is None.
    """
    factory = parser
    if factory is None:
        factory = RSSParser
    p = factory()
    p.validateURI(uri)
    return p, p.warnings, p.errors
def report(uri):
    """Validate *uri* and return the outcome as a list of 'Key: value' lines.

    The list starts with the URI, continues with the collected warning and
    error lines, and ends with the two counters and a Valid/Invalid summary.
    """
    result = ['URI: ' + uri]
    p, warnings, errors = validate(uri, parser=ReportRSSParser)
    p.report.seek(0)
    result.extend([line.rstrip('\r\n') for line in p.report])
    result.append('Warnings: %s' % warnings)
    result.append('Errors: %s' % errors)
    if errors:
        summary = 'Summary: Invalid'
    else:
        summary = 'Summary: Valid'
    result.append(summary)
    return result
def pyreport(uri):
    """Validate *uri* and return the report as a dictionary.

    'Warning' and 'Error' lines are gathered into the lists stored under
    'WarningMessages' and 'ErrorMessages'. Every other report line is
    stored under its own key.
    """
    lines = report(uri)
    reportdata = {'WarningMessages': [],
                  'ErrorMessages': []}
    listFor = {'Warning': 'WarningMessages',
               'Error': 'ErrorMessages'}
    for line in lines:
        key, value = line.split(': ', 1)
        if key in listFor:
            reportdata[listFor[key]].append(value)
        else:
            reportdata[key] = value
    return reportdata
def doCGI():
import cgi, cgitb
cgitb.enable()
form = cgi.FieldStorage()
form.__call__ = lambda s: form[s].value
uri = form('uri')
f = open("/tmp/grr", "a")
f.write(uri + "\n")
f.flush()
f.close()
print 'Content-Type: text/plain'
print
print '\n'.join(report(uri))
def test(quiet=False):
"""Quis custodiet ipsos custodes?"""
# Cf. http://inamidst.com/rss1.1/test/
import glob
tests = glob.glob('test/*.rss')
tests.sort()
exitcode = 0
for fn in tests:
# print fn
p, warnings, errors = validate(fn, parser=QuietRSSParser)
if errors > 0:
errors = True
else: errors = False
if fn.startswith('test/neg-'):
errors = (not errors)
if errors:
if not quiet: print 'FAIL:', fn
exitcode = 1
elif not quiet: print 'Pass:', fn
# For using "if ./validate.cgi --test [...]"
if quiet and exitcode:
print 'FAIL: some tests failed'
elif quiet: print 'Pass: all tests passed'
sys.exit(exitcode)
def main():
if os.environ.has_key('SCRIPT_NAME'):
doCGI()
elif len(sys.argv) > 1:
uri = sys.argv[1]
if uri == '--test':
test()
elif uri == '-qt':
test(quiet=True)
else: validate(uri)
else: print __doc__
if __name__=="__main__":
main()