#!/usr/bin/python
"""absenceData.py -- convert absence.db to RDF Calendar

Dan Connolly Sep 2005
W3C Open Source. Share and Enjoy.

part of RDF Calendar Workspace
http://www.w3.org/2002/12/cal/

The code is written to run unchanged on Python 2.6+ and Python 3.
"""

__version__ = "$Id: absenceData.py,v 1.10 2007/11/29 13:54:34 connolly Exp $"

import io
import os
import re
import sys

try:
    # Not used by this module; kept so existing importers that reach for
    # absenceData.cgi keep working.  The module is gone from the standard
    # library as of Python 3.13, hence the guard.
    import cgi
except ImportError:
    cgi = None

RDF_mediaType = "application/rdf+xml"

# Deployment path used at one time:
#   DataFile = '/usr/local/apache/htdocs/absence/absence.db'

# Encoding that record values were decoded with in the original code.
_DATA_ENCODING = 'iso8859-1'


def main(argv, DataFile='absence.db', Current="2007-04-01",
         Dummy_User_Id='71'):
    """Write the absence data in DataFile to stdout as RDF/XML.

    @param argv: command line arguments (currently unused)
    @param DataFile: path of the absence database file
    @param Current: ISO date (YYYY-MM-DD); events starting earlier are skipped
    @param Dummy_User_Id: person_id whose events are skipped
    """
    wr = RDFOut(sys.stdout.write)

    def checkEvent(e):
        # ISO dates compare correctly as plain strings.
        return (e['person_id'] != Dummy_User_Id
                and isoDate(e['start']) >= Current)

    # eachSection() reads lazily, so the file must stay open until
    # rdfcal() has consumed every section.
    with io.open(DataFile, encoding=_DATA_ENCODING) as lines:
        rdfcal(wr, eachSection(lines), checkEvent)


def serveRequest(env, DataFile='absence.db'):
    """Write a CGI response (headers plus RDF/XML body) to stdout.

    @param env: the CGI environment (currently unused)
    @param DataFile: path of the absence database file
    """
    out = sys.stdout.write
    out("Status: 200 OK\n")
    out("Content-Type: %s\n" % RDF_mediaType)
    out("\n")

    def checkEvent(e):
        return True

    # The original referenced an undefined name `wr` here.
    wr = RDFOut(out)
    with io.open(DataFile, encoding=_DATA_ENCODING) as lines:
        rdfcal(wr, eachSection(lines), checkEvent)


def rdfcal(wr, sections, checkEvent):
    """Write absence data as RDF

    The sections are consumed in a fixed order: groups, a section that is
    read and ignored (USERS), PEOPLE, then RESERVATIONS.

    @param wr: an RDFOut
    @param sections: an iterator over absence data sections, i.e. over
                     (name, list-of-record-dicts) pairs
    @param checkEvent: a boolean function of absence event records
                       to include
    """
    sections = iter(sections)
    cal = Namespace('c', 'http://www.w3.org/2002/12/cal/icaltzd#')
    foaf = Namespace('foaf', 'http://xmlns.com/foaf/0.1/')
    wr.start((RDF, cal, foaf))

    n, groups = next(sections)
    for group in groups:
        gt = wr.term('group_%s' % group['id'])
        wr.add(gt, RDF.type, foaf.Group)
        wr.add(gt, foaf.name, olit=group['name'])

    who = {}  # map ids to people
    n, dummy = next(sections)  # USERS:
    n, people = next(sections)
    assert n == 'PEOPLE', "expected PEOPLE section; found: %s" % n
    for record in people:
        who[record['id']] = record
        pt = wr.term("person_%s" % record['id'])
        wr.add(pt, RDF.type, foaf.Person)
        if "email" in record:
            wr.add(pt, foaf.mbox, ouri="mailto:%s" % record['email'])
        wr.add(pt, foaf.name, olit=record['name'])
        for group in record['group'].split(","):
            gt = wr.term("group_%s" % group)
            wr.add(gt, RDF.type, foaf.Group)
            wr.add(gt, foaf.member, pt)

    n, reservations = next(sections)
    assert n == 'RESERVATIONS', "expected RESERVATIONS section; found: %s" % n
    ct = wr.term("cal")
    wr.add(ct, RDF.type, cal.Vcalendar)
    for e in reservations:
        if not checkEvent(e):
            continue
        et = wr.term("res_" + e['id'])
        wr.add(ct, cal.component, et)
        wr.add(et, RDF.type, cal.Vevent)
        wr.add(et, cal.dtstart, olit=isoDate(e['start']))  # hmm... datatype?
        wr.add(et, cal.dtend, olit=isoDate(e['end']))  # @@ add one day
        person = who[e['person_id']]
        pt = wr.term("person_%s" % e['person_id'])
        wr.add(et, cal.attendee, pt)
        if 'email' in person:
            wr.add(pt, cal.calAddress, ouri='mailto:%s' % person['email'])
        wr.add(pt, cal.cn, olit=person['name'])
        wr.add(et, cal.categories, olit=e['type'])
        if 'desc' in e:
            wr.add(et, cal.summary, olit=e['desc'])
    wr.end()


def isoDate(dmy):
    """Convert a 'day.month.year' string to ISO 'YYYY-MM-DD'.

    >>> isoDate('1.4.2007')
    '2007-04-01'

    @param dmy: three integers separated by dots, in day, month, year order
    @raise ValueError: when there are not exactly three integer fields
    """
    d, m, y = dmy.split('.')
    return "%04d-%02d-%02d" % (int(y), int(m), int(d))


########
# RDF Serializer, quick n dirty

class Namespace(object):
    """A namespace prefix/URI pair that hands out Terms.

    Terms are cached, so asking twice for the same local name returns the
    same Term object; RDFOut relies on that identity to track the current
    subject.  Terms are available as attributes (RDF.type) or through
    term() for names that are not valid identifiers.
    """

    def __init__(self, pfx, t):
        self._pfx = pfx    # prefix used in qnames
        self._t = t        # namespace URI that local names are appended to
        self._terms = {}   # local name -> Term

    def term(self, n):
        """Return the (cached) Term with local name n."""
        # Go through __dict__ so that a half-initialised instance cannot
        # send the `_terms` lookup back into __getattr__ forever.
        terms = self.__dict__.setdefault('_terms', {})
        try:
            return terms[n]
        except KeyError:
            t = terms[n] = Term(self, n)
            return t

    def __getattr__(self, n):
        # Only called when normal attribute lookup fails.  Refuse dunder
        # names so protocol probes (copy, pickle, test collectors, ...)
        # get AttributeError instead of a bogus Term.
        if n.startswith('__') and n.endswith('__'):
            raise AttributeError(n)
        return self.term(n)


RDF = Namespace('rdf', 'http://www.w3.org/1999/02/22-rdf-syntax-ns#')


class Term(object):
    """A local name within a Namespace."""

    def __init__(self, ns, ln):
        self._ns = ns
        self._ln = ln

    def qname(self):
        """Return the prefixed name, e.g. 'rdf:type'."""
        return "%s:%s" % (self._ns._pfx, self._ln)

    def uri(self, basens):
        """Return a URI reference for this term.

        Terms that belong to basens are written as a '#fragment';
        all others as namespace URI + local name.
        """
        if self._ns is basens:
            return "#" + self._ln
        return "%s%s" % (self._ns._t, self._ln)


class RDFOut(Namespace):
    """Simple RDF serializer.

    No nesting, no blank nodes.
    We assume one global namespace prefix mapping

    An RDFOut is itself the Namespace of the document being written, so
    wr.term('x') names the node written as rdf:about='#x'.
    """

    def __init__(self, w, base="file:/dev/stdout#"):
        """
        @param w: a write function taking one string
        @param base: namespace URI of the output document
        """
        Namespace.__init__(self, "out", base)
        self._w = w
        self._subj = None
        self._stag = None  # tag used for subject element

    def start(self, namespaces):
        """Write the rdf:RDF start tag declaring the given namespaces."""
        assert RDF in namespaces
        attrs = {}
        for ns in namespaces:
            attrs["xmlns:%s" % ns._pfx] = ns._t
        startTag(self._w, RDF.RDF.qname(), attrs)

    def end(self):
        """Close the open subject element, if any, and the rdf:RDF element."""
        if self._stag:
            endTag(self._w, self._stag)
        endTag(self._w, RDF.RDF.qname())
        self._w("\n")

    def add(self, s, p, oterm=None, ouri=None, olit=None):
        """Write one statement; exactly one object form should be given.

        Consecutive statements about the same subject share one element.
        A statement about a different subject closes the open element and
        starts a new one: a typed node element when the statement is an
        rdf:type with a term object, rdf:Description otherwise.

        @param s: subject Term
        @param p: property Term
        @param oterm: object given as a Term
        @param ouri: object given as a URI string
        @param olit: object given as a literal string
        @raise RuntimeError: when none of oterm, ouri, olit is given
        """
        w = self._w
        newSubject = s != self._subj

        if newSubject:
            if self._stag:
                endTag(w, self._stag)
                self._stag = None
                self._subj = None

            if p is RDF.type and oterm:
                # Typed-node abbreviation.  It is only valid when opening
                # a subject; a further rdf:type on the subject that is
                # already open falls through and is written as an ordinary
                # property (the original nested a new element and lost
                # the outer end tag).
                tagn = oterm.qname()
                startTag(w, tagn, {RDF.about.qname(): s.uri(self)})
                self._subj = s
                self._stag = tagn
                return

            e = RDF.Description.qname()
            startTag(w, e, {RDF.about.qname(): s.uri(self)})
            self._subj = s
            self._stag = e

        e = p.qname()
        if oterm:
            startTag(w, e, {RDF.resource.qname(): oterm.uri(self)})
            endTag(w, e)
        elif olit is not None:
            startTag(w, e)
            doChars(w, olit)
            endTag(w, e)
        elif ouri:
            startTag(w, e, {RDF.resource.qname(): ouri})
            endTag(w, e)
        else:
            raise RuntimeError("must give one of oterm, ouri, olit")


########
# format the records as XML

def asXML(lines):
    """Dump the absence data to stdout as XML.

    Each record becomes an empty element named after its section, with
    the record's fields as attributes, inside one <absence> element.

    @param lines: an iterable of absence data lines
    """
    w = sys.stdout.write
    startTag(w, "absence")
    for n, records in eachSection(lines):
        for record in records:
            startTag(w, n, record, empty=1)
    endTag(w, "absence")


def startTag(w, n, attrs=None, empty=0):
    """Write a start tag (or an empty-element tag) through w.

    Line breaks are placed inside the tag so that element content is not
    touched by them.

    @param w: a write function taking one string
    @param n: element name
    @param attrs: mapping of attribute name to value; values are escaped
    @param empty: true to write an empty-element tag
    """
    w("<%s" % n)
    for name, value in (attrs or {}).items():
        w("\n %s='" % name)
        doChars(w, value)
        w("'")
    if empty:
        w("\n/>")
    else:
        w("\n>")


markupChar = re.compile(r"[\n\r<>&']")

if str is bytes:
    # Python 2: writers take byte strings, so text is encoded here.
    def _utf8(text):
        return text.encode('utf-8')
else:
    # Python 3: writers take text; the stream does the encoding.
    def _utf8(text):
        return text


def doChars(w, ch, start=0, length=-1):
    """Write character data through w, escaping markup characters.

    Newline, carriage return, <, >, & and ' are written as numeric
    character references, so the result is safe both as element content
    and inside a single-quoted attribute value.

    @param w: a write function taking one string
    @param ch: the text to write
    @param start: index of the first character to write
    @param length: number of characters to write; negative means
                   everything from start to the end of ch
    """
    if length < 0:
        end = len(ch)
    else:
        # The original used the window only as a loop bound and still
        # searched and wrote through to the end of ch.
        end = min(start + length, len(ch))

    i = start
    while i < end:
        m = markupChar.search(ch, i, end)
        if not m:
            w(_utf8(ch[i:end]))
            break
        j = m.start()
        if j > i:
            w(_utf8(ch[i:j]))
        w("&#%d;" % (ord(ch[j]),))
        i = j + 1


def endTag(w, n):
    """Write the end tag for element n through w."""
    # NOTE(review): the format string was lost from the source as received
    # (it read `"" % n`, which raises TypeError).  This literal mirrors the
    # newline-inside-the-tag layout of startTag -- confirm against the
    # original revision.
    w("</%s\n>" % n)


##########
# parse absence data
#

def _text(s):
    """Return s as text, decoding byte strings with the data encoding."""
    if isinstance(s, bytes):
        return s.decode(_DATA_ENCODING)
    return s


def _readRecord(lines, section):
    """Read 'name:value' lines up to END; return them as a dict."""
    record = {}
    for line in lines:
        line = _text(line).strip()
        if line == 'END':
            return record
        # Split on the first colon only; values may contain colons.
        name, value = line.split(':', 1)
        record[name] = value.strip()
    raise ValueError("unterminated record in section %s" % section)


def eachSection(lines):
    """Parse absence data; generate (section name, records) pairs.

    The expected layout is a header line ending in ':' followed by zero
    or more records, each of the form::

        START
        name:value
        ...
        END

    The first line that is not START ends the section.  Blank lines
    between sections are skipped.  A trace line is written to stderr
    for every section found.

    @param lines: an iterable of lines, as text or iso8859-1 bytes
    @raise ValueError: when a section header is expected but something
                       else is found, when a record line has no colon,
                       or when the input ends inside a record
    """
    lines = iter(lines)
    pending = None  # the line that ended the previous section
    while True:
        header = pending
        pending = None
        while not header:
            try:
                header = _text(next(lines)).strip()
            except StopIteration:
                # End of input between sections: we are done.  A plain
                # return is required here; letting StopIteration escape
                # a generator is an error under PEP 479.
                return
        if not header.endswith(":"):
            raise ValueError("expected FOO:; found: %s" % header)

        section = header[:-1]
        records = []
        sys.stderr.write("@@found section: %s\n" % section)

        for line in lines:
            line = _text(line).strip()
            if line != 'START':
                pending = line
                break
            records.append(_readRecord(lines, section))
        yield section, records


############

def _test():
    """Dump ./absence.db as XML on stdout."""
    DataFile = 'absence.db'
    # NOTE(review): the original called asCalendar(), which is not defined
    # anywhere in this file; asXML() takes the same argument -- confirm
    # that it is the intended target.
    with io.open(DataFile, encoding=_DATA_ENCODING) as lines:
        asXML(lines)


############

if __name__ == '__main__':
    if '--test' in sys.argv:
        _test()
    elif 0:  # @@ cgi -- deliberately disabled
        serveRequest(os.environ)
    else:
        main(sys.argv)