# Copyright 2012, Sean B. Palmer
# Code at http://inamidst.com/duxlot/
# Apache License 2.0

import datetime
import decimal
import html.entities as entities
import json
import math
import os.path
import pickle
import re
import socket
import struct
import subprocess
import time
import unicodedata
import urllib.parse
import urllib.request

import duxlot


def data(name):
    # Path of a file called `name` inside the package's "data" directory.
    return os.path.join(duxlot.path, "data", name)


def copy(a, b):
    # or b, a...
    # Copies every key/value of storage `b` onto `a` as attributes.
    for key, value in b().items():
        setattr(a, key, value)


class Error(Exception):
    # Raised by the services in this module for user-reportable failures.
    ...


# @@ pre-wrapper for api.text services?

def service(collection):
    # Decorator factory: registers the decorated function on `collection`
    # under its own name. The registered callable accepts keyword arguments
    # only and hands them to the function as one duxlot.FrozenStorage.
    #
    # The wrapper inherits the function's __doc__, which is why services in
    # this file are documented with comments rather than docstrings.
    def decorate(function):
        def decorated(**kargs):
            # if collection.name == "text": # check args
            args = duxlot.FrozenStorage(kargs)
            # if collection.name == "text": # check result
            return function(args)
        setattr(collection, function.__name__, decorated)
        decorated.__doc__ = function.__doc__
        # @@ .name, canonicalised
        return decorated
    return decorate


### Module: Clock ###

clock = duxlot.Storage()
clock.name = "clock"


@service(clock)
def beats(args):
    # Current time as ".beat" style units: seconds since midnight at
    # UTC+1, divided by 86.4. Sets out.beats_int and out.beats ("@NNN").
    out = duxlot.Storage()
    beats = ((time.time() + 3600) % 86400) / 86.4
    out.beats_int = int(math.floor(beats))
    out.beats = "@%03i" % out.beats_int
    return out


@service(clock)
def cache_timezones_data(args):
    # Loads data/timezones.json into clock.timezones_data.
    with duxlot.filesystem.open(data("timezones.json"), encoding="utf-8") as f:
        clock.timezones_data = json.load(f)


@service(clock)
def date_utc(args):
    # @@ optional suffix?
    # UTC date (YYYY-MM-DD) of args.unixtime, or of now when not given.
    if "unixtime" in args:
        dt = datetime.datetime.utcfromtimestamp(args.unixtime)
    else:
        dt = datetime.datetime.utcnow()
    return dt.strftime("%Y-%m-%d")


@service(clock)
def datetime_utc(args):
    # @@ optional suffix?
    # UTC date and time of args.unixtime, or of now when not given.
    if "unixtime" in args:
        dt = datetime.datetime.utcfromtimestamp(args.unixtime)
    else:
        dt = datetime.datetime.utcnow()
    return dt.strftime("%Y-%m-%d %H:%M:%S")


@service(clock)
def duration_phrase(args):
    # tz, seconds, unixtime, offset
    # Phrase describing when args.unixtime falls; the level of detail is
    # chosen from args.seconds (under a minute, under 12 hours, or longer).
    tz = "Z" if (args.tz == "UTC") else " " + args.tz
    if args.seconds >= (3600 * 12):
        fmt = "on %d %b %Y at %H:%M" + tz
    elif args.seconds >= 60:
        fmt = "at %H:%M" + tz
    else:
        return "in %s secs" % int(args.seconds)
    return clock.format_datetime(
        unixtime=args.unixtime,
        offset=args.offset,
        format=fmt
    )


@service(clock)
def format_datetime(args):
    # format - string, can have $D and $TZ too
    # offset - in seconds
    # unixtime - OPT
    # tz - OPT
    if "unixtime" in args:
        dt = datetime.datetime.utcfromtimestamp(args.unixtime)
    else:
        dt = datetime.datetime.utcnow()
    delta = datetime.timedelta(seconds=args.offset)
    adjusted = dt + delta
    formatted = adjusted.strftime(args.format)
    if "$TZ" in args.format:
        if "tz" in args:
            formatted = formatted.replace("$TZ", args.tz)
        else:
            # No zone name given: show the signed numeric offset instead
            so = "+" + str(args.offset) if (args.offset >= 0) else str(args.offset)
            formatted = formatted.replace("$TZ", so)
    if "$D" in args.format:
        # $D is the day of the month without a leading zero
        day = adjusted.strftime("%d")
        formatted = formatted.replace("$D", day.lstrip("0"))
    return formatted


@service(clock)
def npl(args):
    # Sends one 48-byte request to UDP port 123 of ntp1.npl.co.uk and
    # decodes bytes 32..39 of the reply as a timestamp. Sets out.server,
    # out.timestamp and out.datetime. Raises Error on an empty reply.
    out = duxlot.Storage()
    out.server = "ntp1.npl.co.uk"
    # The context manager closes the socket even if the exchange fails
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client:
        client.sendto(b'\x1b' + 47 * b'\0', (out.server, 123))
        packet, address = client.recvfrom(1024)
    if not packet:
        raise Error("No data was received from %s" % out.server)
    buf = struct.unpack('B' * 48, packet)
    d = decimal.Decimal('0.0')
    for i in range(8):
        # Byte i carries weight 2 ** ((3 - i) * 8): four whole-second
        # bytes followed by four fractional bytes
        d += decimal.Decimal(buf[32 + i]) * \
            decimal.Decimal(str(math.pow(2, (3 - i) * 8)))
    # Shift from the 1900-based count to the Unix epoch
    d -= decimal.Decimal(2208988800)
    out.timestamp = str(d)
    a, b = str(d).split('.')
    f = '%Y-%m-%d %H:%M:%S'
    dt = datetime.datetime.utcfromtimestamp(d).strftime(f)
    out.datetime = dt + '.' + b[:6]
    return out


@service(clock)
def offset_datetime(args):
    # Current time shifted by args.offset hours, rendered with args.format
    # (optional). $TZ becomes args.tz if given, else the signed offset.
    fmt = args("format", "%d %b %Y, %H:%M:%S $TZ")
    now = datetime.datetime.utcnow()
    delta = datetime.timedelta(seconds=args.offset * 3600)
    dt = (now + delta).strftime(fmt)
    if "tz" in args:
        dt = dt.replace("$TZ", args.tz)
    else:
        so = "+" + str(args.offset) if (args.offset >= 0) else str(args.offset)
        dt = dt.replace("$TZ", so)
    if fmt.startswith("%d"):
        # Drop the leading zero of the day of the month
        dt = dt.lstrip("0")
    return dt


@service(clock)
def parse_zoneinfo(args):
    # Specification from http://69.36.11.139/tzdb/tzfile-format.html
    # tzfile(5) also gives the information, though less clearly
    #
    # Reads the zoneinfo file args.filename and returns a list of
    # (transition, offset, dst, abbreviation) tuples. The first entry has
    # transition None and is built from the file's first ttinfo record.
    with duxlot.filesystem.open(args.filename, "rb") as f:
        def get(struct_format):
            # Read and unpack one big-endian structure from the file
            struct_format = "> " + struct_format
            file_bytes = f.read(struct.calcsize(struct_format))
            return struct.unpack(struct_format, file_bytes)

        header, version, future_use = get("4s c 15s")

        counts = {}
        for name in ("ttisgmt", "ttisstd", "leap", "time", "type", "char"):
            counts[name] = get("l")[0]

        transitions = get("%sl" % counts["time"])
        indices = get("%sB" % counts["time"])

        ttinfo = []
        for current in range(counts["type"]):
            ttinfo_struct = get("l?B")
            ttinfo.append(ttinfo_struct)

        abbreviations = get("%sc" % counts["char"])

    # Map each abbreviation's byte position in the table to its text
    index = 0
    abbreviation_indices = {}
    for abbreviation in b"".join(abbreviations).split(b"\x00"):
        abbreviation_indices[index] = abbreviation.decode("us-ascii")
        index += len(abbreviation) + 1

    # Swap each record's abbreviation position for the abbreviation itself
    for current, ttinfo_struct in enumerate(ttinfo):
        replacement = abbreviation_indices[ttinfo_struct[2]]
        ttinfo[current] = (ttinfo_struct[0], ttinfo_struct[1], replacement)

    offset, dst, abbreviation = ttinfo[0]
    tzinfo = [(None, offset, dst, abbreviation)]
    for transition, index in zip(transitions, indices):
        offset, dst, abbreviation = ttinfo[index]
        tzinfo.append((transition, offset, dst, abbreviation))
    return tzinfo


# Length of each period in seconds, mapped to the unit names accepted for it
clock_dict_scales = {
    365.25 * 24 * 3600: ("years", "year", "yrs", "y"),
    29.53059 * 24 * 3600: ("months", "month", "mo"),
    7 * 24 * 3600: ("weeks", "week", "wks", "wk", "w"),
    24 * 3600: ("days", "day", "d"),
    3600: ("hours", "hour", "hrs", "hr", "h"),
    60: ("minutes", "minute", "mins", "min", "m"),
    1: ("seconds", "second", "secs", "sec", "s")
}

# Inverse lookup: unit name -> seconds
clock_dict_scaling = {}
for period, names in clock_dict_scales.items():
    for name in names:
        clock_dict_scaling[name] = period

clock_regex_period = re.compile(r"(?i)([0-9]+(?:\.[0-9]+)?) *([a-z]+)")


@service(clock)
def period_seconds(args):
    # Parses the start of args.period, e.g. "5 mins". Sets out.number,
    # out.unit (lower case), out.scale and out.seconds. Raises Error when
    # the syntax or the unit is not recognised.
    out = duxlot.Storage()
    match = clock_regex_period.match(args.period)
    if not match:
        raise Error("Invalid period syntax: %s" % args.period)
    number, unit = match.groups()
    out.number = float(number)
    out.unit = unit.lower()
    if out.unit not in clock_dict_scaling:
        raise Error("Invalid period unit: %s" % out.unit)
    # Look up the normalised unit: the regex is case-insensitive, so the
    # raw text ("Hours") is not necessarily a key of the table
    out.scale = clock_dict_scaling[out.unit]
    out.seconds = out.number * out.scale
    return out


@service(clock)
def periods_seconds(args):
    # Consumes consecutive periods from the start of args.text. Sets
    # out.seconds (total), out.periods (count), out.durations (one entry
    # per period) and out.remainder (the text left unparsed).
    out = duxlot.Storage()
    out.seconds = 0
    out.periods = 0
    out.durations = []
    out.remainder = args.text
    while True:
        out.remainder = out.remainder.lstrip()
        match = clock_regex_period.match(out.remainder)
        if not match:
            break
        period = match.group(0)
        try:
            p = clock.period_seconds(period=period)
        except Error:
            # Unknown unit: leave it, and what follows, in the remainder
            break
        out.seconds += p.seconds
        out.periods += 1
        out.durations.append(p.seconds)
        out.remainder = out.remainder[len(period):]
    return out


@service(clock)
def periods_unixtime(args):
    # As periods_seconds, plus out.basetime (now) and out.unixtime
    # (now + the parsed total).
    out = duxlot.Storage()
    out.basetime = time.time()
    copy(out, clock.periods_seconds(text=args.text))
    out.unixtime = out.basetime + out.seconds
    return out


@service(clock)
def time_utc(args):
    # @@ optional suffix?
    # UTC time of day (HH:MM:SS) of args.unixtime, or of now.
    if "unixtime" in args:
        dt = datetime.datetime.utcfromtimestamp(args.unixtime)
    else:
        dt = datetime.datetime.utcnow()
    return dt.strftime("%H:%M:%S")


@service(clock)
def timezone_convert(args):
    # Converts args.time ("HH:MM" or "HH:MM:SS") from zone args.source to
    # zone args.target. Raises Error for unknown zones or unparsable times.
    out = duxlot.Storage()
    source = clock.timezone_info(tz=args.source)
    target = clock.timezone_info(tz=args.target)
    if not "name" in source:
        raise Error("Unrecognized timezone: %s" % args.source)
    if not "name" in target:
        raise Error("Unrecognized timezone: %s" % args.target)
    try:
        numbers = args.time.split(":")
        numbers = [int(n.lstrip("0") or "0") for n in numbers]
        if len(numbers) > 3 or len(numbers) < 2:
            raise Error("Parser expected HH:MM[:SS]")
        # Built inside the try so out-of-range fields (e.g. hour 25) are
        # reported as Error like every other parse failure
        tobj = datetime.datetime(2000, 1, 1, *numbers)
    except Exception as err:
        raise Error("Parser reports: " + str(err))
    offset = source.offset - target.offset
    result = tobj - datetime.timedelta(seconds=offset * 3600)
    out.source_time = args.time
    out.source_code = args.source
    out.source_name = source.name
    out.target_code = args.target
    out.target_name = target.name
    out.target_time = result.strftime("%H:%M")
    return out


@service(clock)
def timezone_datetime(args):
    # Current date and time in the zone whose code is args.tz.
    tz = clock.timezone_info(tz=args.tz)
    return clock.offset_datetime(**tz())


@service(clock)
def timezone_info(args):
    # Looks up args.tz, case-insensitively, in clock.timezones_data. Sets
    # out.name and out.offset. Raises Error for an unknown code.
    out = duxlot.Storage()
    timezones = clock.timezones_data
    # Adds lower-case aliases to the shared table, in place
    for tz in list(timezones.keys()):
        timezones[tz.lower()] = timezones[tz]
    if args.tz.lower() in timezones:
        out.name, out.offset = timezones[args.tz.lower()]
    else:
        raise Error("Unknown timezone: %s" % args.tz)
    return out


@service(clock)
def tock(args):
    # Returns the Date header sent by tycho.usno.navy.mil as out.date.
    out = duxlot.Storage()
    page = web.request(url="http://tycho.usno.navy.mil/cgi-bin/timer.pl")
    out.server = "tycho.usno.navy.mil"
    if "date" in page.headers:
        out.date = page.headers["date"]
    else:
        raise Error("Server %s didn't return a Date header" % out.server)
    return out


@service(clock)
def unix_date(args):
    # Output of the system `date` command, run with TZ set to args.zone
    # when a zone is given. Raises Error for a malformed or missing zone.
    fmt = args("format", "%d %b %Y, %H:%M:%S $TZ")
    env = None
    if "zone" in args:
        if not clock.data.regex_zone.match(args.zone):
            raise Error("Bad zone format: %s" % args.zone)
        if not os.path.isfile("/usr/share/zoneinfo/" + args.zone):
            raise Error("Zone not supported: %s" % args.zone)
        # the fmt doesn't work
        # TZ goes through the environment, so no shell command line has
        # to be assembled from the zone name
        env = dict(os.environ, TZ=args.zone)
    p = subprocess.Popen(["date"], env=env, stdout=subprocess.PIPE)
    return p.communicate()[0].decode("utf-8", "replace")


@service(clock)
def version_number(args):
    # Date-derived version string: (years since args.epoch, default
    # 2012).(month).(day)-(HHMM), all in UTC.
    epoch = args("epoch", 2012)
    now = datetime.datetime.utcnow()
    major = now.year - epoch
    minor = now.month
    patch = now.day
    sub = "%02i%02i" % (now.hour, now.minute)
    return "%s.%s.%s-%s" % (major, minor, patch, sub)


@service(clock)
def yi(args):
    # True during the trailing part of each 1753200-second cycle, i.e. once
    # four whole 432000-second units of the cycle have elapsed.
    #
    # divmod keeps the quotient integral; true division produced a float
    # that equalled 4 for only a single second of every cycle.
    quadraels, remainder = divmod(int(time.time()), 1753200)
    extraraels, remainder = divmod(remainder, 432000)
    return extraraels == 4


@service(clock)
def zoneinfo_offset(args):
    # Offset and abbreviation currently in force according to the zoneinfo
    # file args.filename: the last entry whose transition time has passed.
    out = duxlot.Storage()
    now = time.time()
    tzinfo = clock.parse_zoneinfo(filename=args.filename)
    transition, offset, dst, abbreviation = tzinfo[0]
    out.offset = offset
    out.abbreviation = abbreviation
    for transition, offset, dst, abbreviation in tzinfo[1:]:
        if now >= transition:
            out.offset = offset
            out.abbreviation = abbreviation
        else:
            break
    return out


clock.data = duxlot.Storage()
clock.data.regex_number = re.compile(r"^([+-]?[0-9]+(?:\.[0-9]+)?)$")
clock.data.regex_zone = re.compile(r"^[A-Za-z]+(?:/[A-Za-z_]+)*$")

# @@ regex_link = re.compile(r"(http[s]?://[^<> \"\x01]+)[,.]?")


### Module: General ###

general = duxlot.Storage()
general.name = "general"


@service(general)
def duxlot_version(args):
    # Contents of data/version with trailing whitespace removed.
    with duxlot.filesystem.open(data("version"), "r", encoding="ascii") as f:
        version = f.read()
    version = version.rstrip()
    return version


@service(general)
def wolfram_alpha(args):
    # Sends args.query to Wolfram|Alpha and collects the "stringified"
    # values found in the response. Sets out.expression (the first one),
    # out.expressions (the rest) and out.text (a summary no longer than
    # args.limit bytes of UTF-8, default 1024).
    page = web.request(
        url="http://www.wolframalpha.com/input/",
        follow=True,
        query={
            "asynchronous": "false",
            "i": args.query
        }
    )

    # NOTE(review): the third key is an empty string in this copy, which
    # makes that replacement a no-op; it may have lost its content.
    simples = {
        r"\/": "/",
        r"\'": "'",
        "": "",
        " °": "°",
        "~~": " ~",
        "~~ ": " ~"
    }

    patterns = {
        r"\\n( *\| *)?": ", ",
        r"~~ *\(": "~(",
        r"[ \t]+": " ",
        r"([0-9]{12})[0-9]+": r"\g<1>"
    }

    r_parens = re.compile(r"(\s*)\(\s*(.*?)\s*\)")

    def parentheses(text):
        # Trim padding inside parentheses; keep at most one space before
        def replacement(match):
            pre = " " if match.group(1) else ""
            content = match.group(2)
            return pre + "(" + content + ")"
        return r_parens.sub(replacement, text)

    r_superscript = re.compile(r"\^\(?(-?[0-9]+)\)?")

    def superscript(text):
        # Render ^N and ^(N) with Unicode superscript characters
        superscripts = {
            "0": "⁰", "1": "¹", "2": "²", "3": "³", "4": "⁴", "5": "⁵",
            "6": "⁶", "7": "⁷", "8": "⁸", "9": "⁹", "-": "⁻"
        }

        def replacement(match):
            characters = []
            for character in match.group(1):
                characters.append(superscripts.get(character, character))
            return "".join(characters)
        return r_superscript.sub(replacement, text)

    def pretty(text):
        for key, value in simples.items():
            text = text.replace(key, value)
        for key, value in patterns.items():
            text = re.sub(key, value, text)
        text = re.sub(r"[ \t]+\|[ \t]+", ": ", text)
        text = parentheses(text)
        return superscript(text)

    r_stringified = re.compile(r'"stringified":\s"([^"]+)"')

    out = duxlot.Storage()
    out.expression = ""
    out.expressions = []
    for i, stringified in enumerate(r_stringified.findall(page.text)):
        # print(">", stringified)
        exp = pretty(stringified)
        if not i:
            out.expression = exp[:]
        else:
            # Drop a repeat of the input expression from the front
            if exp.startswith(out.expression):
                exp = exp[len(out.expression):].lstrip(" =")
            out.expressions.append(exp)
            # print("*", exp)

    sep = "; "
    limit = args().get("limit", 1024)
    if not out.expression:
        out.text = "No results found"
        # @@ move to wa(...)?
        return out

    out.text = out.expression + " = "
    for exp in out.expressions:
        if len((out.text + exp + sep).encode("utf-8")) > limit:
            break
        out.text += exp + sep
    out.text = out.text.rstrip(sep)
    return out


### Module: Geo ###

geo = duxlot.Storage()
geo.name = "geo"


@service(geo)
def flight(args):
    # Decoded JSON response of the flights service for args.flight.
    page = web.request(
        url="http://dial-a-page.dpk.org.uk/flights/",
        query={"flight": args.flight}
    )
    return json.loads(page.text)


@service(geo)
def timezone(args):
    # Current date and time at the location reported for IP args.ip.
    out = duxlot.Storage()
    page = web.request(
        url="http://dial-a-page.dpk.org.uk/iptime/",
        query={"ip": args.ip}
    )
    out.json = json.loads(page.text)
    out.zone = out.json["tzinfo"]
    out.tz = out.json["abbreviation"]
    out.offset = out.json["offset_hours"]
    return clock.offset_datetime(**out())


# argh

@service(geo)
def timezone_info(args):
    # Zone details reported for IP args.address: out.json (raw response),
    # out.zone, out.tz and out.offset.
    out = duxlot.Storage()
    page = web.request(
        url="http://dial-a-page.dpk.org.uk/iptime/",
        query={"ip": args.address}
    )
    out.json = json.loads(page.text)
    out.zone = out.json["tzinfo"]
    out.tz = out.json["abbreviation"]
    out.offset = out.json["offset_hours"]
    return out


### Module: Google ###

google = duxlot.Storage()
google.name = "google"


@service(google)
def calculator(args):
    # Evaluates args.expression with the Google calculator endpoint. Sets
    # out.expression_substituted, out.url, out.google_left,
    # out.google_right and out.response. Raises Error when the reply
    # carries an error or has no "rhs" value.
    out = duxlot.Storage()
    substitutions = {
        "ϕ": "phi",
        "π": "pi",
        "tau": "(pi*2)",
        "τ": "(pi*2)"
    }
    expression = args.expression
    for a, b in substitutions.items():
        expression = expression.replace(a, b)
    out.expression_substituted = expression

    page = web.request(
        url="http://www.google.com/ig/calculator",
        query={"q": expression}
    )
    out.url = page.url

    def parse(text):
        # Yield (key, value) pairs from the reply, which is read here as
        # a brace-wrapped list of key: "value" entries
        text = text.strip("{}")
        regex_entry = re.compile(r"(\w+):\s*\"([^\"]*)\",?\s*")
        while text:
            match = regex_entry.match(text)
            if not match:
                break
            yield match.groups()
            text = text[match.end():]

    fields = dict(parse(page.text))
    out.google_left = fields.get("lhs")
    out.google_right = fields.get("rhs")
    if fields.get("error"):
        raise Error("Google indicates that the input may be malformed")

    right = fields.get("rhs", "")
    if not right:
        raise Error("Google indicates a bad 'rhs' field. Malformed input?")

    right = right.encode("iso-8859-1")
    right = right.decode("unicode-escape")
    # NOTE(review): in this copy the first two keys were both empty
    # strings, which would insert ")" between every character of the
    # result. The <sup> tags are inferred from the "^(" and ")" values -
    # confirm against the upstream source.
    substitutions = {
        "<sup>": "^(",
        "</sup>": ")",
        "\xA0": ","  # nbsp
    }
    for a, b in substitutions.items():
        right = right.replace(a, b)
    # this html.decode_entities is needed: source is JSON, not HTML
    out.response = html.decode_entities(html=right)
    return out


@service(google)
def count(args):
    # Result-count estimate for args.phrase by args.method: api, vend,
    # end, site, or all (api and site combined). Raises Error otherwise.
    arg = args.phrase

    # @@ unused, move to some formatting collection?
    def concise(number):
        if number.endswith(",000,000,000"):
            return number[:-12] + "b"
        if number.endswith(",000,000"):
            return number[:-8] + "m"
        if number.endswith(",000"):
            return number[:-4] + "k"
        return number

    if args.method in {None, "", "*", "all"}:
        a = google.count_api(phrase=arg)
        # v = google.count_verbatim(phrase=arg)
        # e = google.count_end(phrase=arg)
        s = google.count_site(phrase=arg)
        return ", ".join((
            a + " (api)",
            # v + " (vend)",
            # e + " (end)",
            s + " (site)"
        ))
    elif args.method in {"a", "api"}:
        return google.count_api(phrase=arg)
    elif args.method in {"v", "vend"}:
        return google.count_verbatim(phrase=arg)
    elif args.method in {"e", "end"}:
        return google.count_end(phrase=arg)
    elif args.method in {"s", "site"}:
        return google.count_site(phrase=arg)
    raise Error("Unknown method: %s" % args.method)


@service(google)
def count_api(args):
    # Estimated result count from the search API, formatted with
    # thousands separators; "0" when the response carries no estimate.
    data = google.search_api_json(**args())
    if "responseData" in data:
        if "cursor" in data["responseData"]:
            if "estimatedResultCount" in data["responseData"]["cursor"]:
                count = data["responseData"]["cursor"]["estimatedResultCount"]
                return format(int(count), ",")
    return "0"
    # raise Error("Google API JSON didn't contain an estimated result count")


@service(google)
def counts_api(args):
    # Compares result counts for up to six terms parsed from args.terms,
    # using args.method, and lists them from the largest count down.
    terms = search.terms(text=args.terms)
    method = args.method
    if len(terms) > 6:
        raise Error("Can only compare up to six terms inclusive")

    results = []
    for i, term in enumerate(terms):
        term = term.strip("[]")
        # bleh, "phrase=term". also use "query" too
        if method in {"a", "api"}:
            count = google.count_api(phrase=term)
        elif method in {"v", "vend"}:
            count = google.count_verbatim(phrase=term)
        elif method in {"e", "end"}:
            count = google.count_end(phrase=term)
        elif method in {"s", "site"}:
            count = google.count_site(phrase=term)
        else:
            raise Error("Unknown method: %s" % method)
        count = count.replace(",", "")
        # @@
        # except api.Error: count = "0"
        results.append((int(count), term))
        # The pause grows with each term already looked up
        time.sleep(i * 0.2)

    results = list(reversed(sorted(results)))
    return ", ".join("%s (%s)" % (b, format(a, ",")) for (a, b) in results)


@service(google)
def count_site(args):
    # Result count scraped from a Google results page for args.phrase.
    # args.option (optional) may be "end" or "verbatim". Returns "0" for
    # pages that report no results, and None when no count is found.
    regex_google_site_results = re.compile(r"(?i)([0-9,]+) results?")
    regex_google_end_results = re.compile(
        r"(?i)very similar to the ([0-9,]+) already displayed"
    )
    query = {
        "hl": "en",
        "safe": "off",
        "nfpr": "1",
        "q": args.phrase
    }
    option = args("option")
    if option in {"end", "verbatim"}:
        query["prmd"] = "imvns"
        query["start"] = "950"
    if option == "verbatim":
        query["tbs"] = "li:1"

    page = web.request(
        url="https://www.google.com/search",
        query=query
    )
    if "No results found for" in page.text:
        return "0"
    elif "did not match any documents" in page.text:
        return "0"

    # Each loop returns its first match, if there is one
    if "start" in query:
        for result in regex_google_end_results.findall(page.text):
            return result
    for result in regex_google_site_results.findall(page.text):
        return result


@service(google)
def count_end(args):
    # count_site with option="end".
    return google.count_site(option="end", **args())


@service(google)
def count_verbatim(args):
    # count_site with option="verbatim".
    return google.count_site(option="verbatim", **args())


@service(google)
def dictionary(args):
    # NOTE(review): this copy of the file is cut off part-way through this
    # function, inside the regex_definition pattern literal. Only the
    # visible prefix is reproduced; the interrupted statement is kept below
    # as a comment and the rest of the body must be restored from upstream.
    ua = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_0) AppleWebKit/537.1 "
    ua += "(KHTML, like Gecko) Chrome/21.0.1180.82 Safari/537.1"
    page = web.request(
        url="https://www.google.com/search",
        # ?q=%s&tbs=dfn:1" % urllib.parse.quote(args.term),
        headers={"user-agent": ua},
        query={
            "q": args.term,
            "tbs": "dfn:1"
        }
    )
    regex_whitespace = re.compile(r"[ \t\r\n]+")
    # regex_definition = re.compile(r'(?ims)   <-- pattern literal cut off here
)")
regex_twitter_tag = re.compile(r"(?ims)<[^>]+>")
regex_twitter_anchor = re.compile(r"(?ims)( ") and block.endswith(" " in line):
etymology = text(line)
elif (mode is not None) and ("(.*?) ')
def normalise(text):
text = html.scrape(html=text)
text = text.replace("(Brit.)", "")
text = text.replace("adj.", "a.")
text = text.replace("[", "(")
text = text.replace("]", ")")
return regex_whitespace.sub(" ", text.strip())
definitions = [normalise(td) for td in regex_td.findall(page.text)]
pairs = zip(definitions[::2], definitions[1::2])
pairs = [(a, b) for a, b in pairs if not '(Amer.)' in a and not '(Amer.)' in b]
order = []
translations = {}
for a, b in pairs:
if args.term in b:
a, b = b, a
try: translations[a].append(b)
except KeyError:
order.append(a)
translations[a] = [b]
result = []
for entry in order[:5]:
result.append(entry + " = " + ", ".join(translations[entry][:5]))
result[-1] = result[-1].replace(" | ", ", ").strip(" ,|")
out.results = result
out.text = " / ".join(result)
out.url = page.url
return out
@service(word)
def rhymes(args):
if not args.word.isalpha():
raise Error("Word must be alphabetical only")
ua = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_0) AppleWebKit/537.1 "
ua += "(KHTML, like Gecko) Chrome/21.0.1180.82 Safari/537.1"
page = web.request(
url="http://www.rhymezone.com/r/rhyme.cgi",
query={
"Word": args.word,
"typeofrhyme": "perfect",
# "org1": "syl",
# "org2": "l",
# "org3": "y"
},
headers={
"user-agent": ua,
# "referer": "http://www.rhymezone.com/"
}
)
if "was not found" in page.text:
return "Can't find words that rhyme with %s" % args.word
results = []
length = 0
text = page.text.split("syllable", 1).pop()
text = text.split("
(.*?)", text):
word = html.scrape(html=bold)
if word == "Word:":
return "Can't find words that rhyme with %s" % args.word
if (" " in word) or ("\xA0" in word):
continue
results.append(word)
length += len(word) + 2
if length >= 256:
results.append("...")
break
return ", ".join(results) + " (rhymezone.com)"
@service(word)
def thesaurus(args):
    # Fetches the thesaurus.com page for args.word and returns the longest
    # "Synonyms:" capture as a comma-separated string (at most 256
    # characters) followed by " (thesaurus.com)". Returns a "Can't find"
    # message when nothing matches. Raises Error unless args.word is
    # purely alphabetic.
    if not args.word.isalpha():
        raise Error("Word must be alphabetical only")

    agent = (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_0) AppleWebKit/537.1 "
        "(KHTML, like Gecko) Chrome/21.0.1180.82 Safari/537.1"
    )
    response = web.request(
        url="http://thesaurus.com/browse/" + args.word.lower(),
        query={"s": "t"},
        headers={"user-agent": agent}
    )

    # Keep only what precedes the "Concept Thesaurus" section; when the
    # marker is absent, split() hands back the whole text as element 0
    markup = response.text.split("Concept Thesaurus")[0]

    # NOTE(review): the pattern ends in a lazy group with nothing after
    # it, so every capture is an empty string - verify against upstream.
    candidates = re.findall("(?ims)Synonyms:(.*?)", markup)
    if not candidates:
        return "Can't find synonyms of %s" % args.word

    # The sort is stable, so the final element is the longest capture
    # (and the latest one on the page among equally long captures)
    longest = sorted(candidates, key=len)[-1]

    listing = html.scrape(html=longest)
    # Normalise to exactly one space after each comma, drop asterisks
    listing = listing.replace(", ", ",")
    listing = listing.replace(",", ", ")
    listing = listing.replace("*", "")
    return listing.strip()[:256] + " (thesaurus.com)"
@service(word)
def wiktionary(args):
    # Fetches the article via word.wiktionary_article, passing this
    # service's own arguments through, then hands the fields of the result
    # to word.wiktionary_format and returns what that produces.
    fetched = word.wiktionary_article(**args())
    fields = fetched()
    return word.wiktionary_format(**fields)
@service(word)
def wiktionary_article(args):
out = duxlot.Storage()
regex_wiktionary_ul = re.compile(r"(?ims).*?
")
def text(input):
text = html.scrape(html=input)
text = text.replace("\n", " ")
text = text.replace("\r", "")
text = text.replace("(intransitive", "(intr.")
text = text.replace("(transitive", "(trans.")
return text
page = web.request(
url="http://en.wiktionary.org/w/index.php",
query={"title": args.word, "printable": "yes"}
)
content = page.text
content = regex_wiktionary_ul.sub("", content)
mode = None
etymology = None
definitions = {}
for line in content.splitlines():
if 'id="Etymology"' in line:
mode = "etymology"
elif 'id="Noun"' in line:
mode = "noun"
elif 'id="Verb"' in line:
mode = "verb"
elif 'id="Adjective"' in line:
mode = "adjective"
elif 'id="Adverb"' in line:
mode = "adverb"
elif 'id="Interjection"' in line:
mode = "interjection"
elif 'id="Particle"' in line:
mode = "particle"
elif 'id="Preposition"' in line:
mode = "preposition"
elif 'id="' in line:
mode = None
elif (mode == "etmyology") and ("