#!/usr/bin/python
"""
NTriples Tools: Parses and serializes N-Triples documents.
http://infomesh.net/2001/10/ntriples/

Built on Aaron Swartz's RDF API: http://blogspace.com/rdf/rdfapi.txt
cf. http://www.w3.org/TR/2001/WD-rdf-testcases-20010912/#ntriples
"""

import sys
import string
import re
import urllib

import rdfapi as rdf

__author__ = "Sean B. Palmer with Aaron Swartz"
__version__ = '$Id$'
__license__ = """Copyright (C) 2001 Sean B. Palmer.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
02111-1307, USA."""

# When true, terms of the form ?name are accepted by parse() as universal
# variables. Read at call time, so it can be toggled after import.
DO_UNIVARS = 1

# Python 3 has no `unicode` builtin; fall back to `str` there.
try:
    _text_type = unicode
except NameError:
    _text_type = str

# A single term: <uri>, _:bnode, (optionally) ?univar, or "literal".
_TERM = r'(<[^>]*>|_:[^\s]+|\"(?:\\\"|[^"])*\")'
_TERM_WITH_UNIVARS = r'(<[^>]*>|_:[^\s]+|\?[^\s]+|\"(?:\\\"|[^"])*\")'


def _compile_triple(term):
    """Compile the regex for one triple line built from three *term* groups."""
    # The terminator is an escaped literal dot; an unescaped '.' would accept
    # any character (including the last character of a term) as terminator.
    return re.compile(r'[ \t]*' + term + r'[ \t]+' + term + r'[ \t]+' + term
                      + r'[ \t]*\.[ \t]*')


_TRIPLE_RE = _compile_triple(_TERM)
_TRIPLE_RE_WITH_UNIVARS = _compile_triple(_TERM_WITH_UNIVARS)
_COMMENT_RE = re.compile(r'(\#[^\n]*)')
_WHITESPACE_RE = re.compile(r'[ \t]+')
# NOTE(review): used with match(), so only the start of a name is validated
# (first character must be an ASCII letter); trailing characters are not
# checked. Kept as-is so that currently accepted names keep parsing.
_NAME_RE = re.compile(r'[A-Za-z][A-Za-z0-9]*', re.S)


def _term_to_node(term, bnodes, univars):
    """Turn one matched term into an rdf.Node.

    *bnodes* and *univars* map labels seen so far in the current document to
    their nodes, so that a repeated label yields the same node object. Both
    dictionaries are updated in place.

    Raises ValueError if the term, or a bNode/univar name, is not valid.
    """
    if term[0] == '<' and term[-1] == '>':
        return rdf.Node(term[1:-1])

    if term[:2] == '_:':
        # Term is an unlabelled node: bNode
        label = term[2:]
        if not _NAME_RE.match(label):
            raise ValueError('bnode: "' + label + '" is not a valid bNode')
        if label not in bnodes:
            bnodes[label] = rdf.Node(None)
        return bnodes[label]

    if term[0] == '?' and DO_UNIVARS:
        name = term[1:]
        if not _NAME_RE.match(name):
            raise ValueError('univar: "' + name + '" is not a valid univar')
        if name not in univars:
            univars[name] = rdf.Node(None, uni=1)
        return univars[name]

    if term[0] == '"' and term[-1] == '"':
        return rdf.Node(_text_type(term[1:-1]))

    raise ValueError('Term ' + str(term) + ' is not a valid NTriples term.')


def parse(document, store=None):
    """Parse an N-Triples document and add its triples to a store.

    document -- the N-Triples text; line endings may be \\n, \\r\\n or \\r.
    store    -- object whose triple(subject, predicate, object) method is
                called once per parsed line. When omitted, a fresh
                rdf.Store() is created for this call (it is not shared
                between calls).

    Returns the store.

    Raises ValueError if the document is empty, a line is neither a triple,
    a comment nor whitespace, or a term is invalid.
    """
    if store is None:
        store = rdf.Store()
    if not document:
        raise ValueError('Document has no content')

    if DO_UNIVARS:
        triple_re = _TRIPLE_RE_WITH_UNIVARS
    else:
        triple_re = _TRIPLE_RE

    bnodes, univars = {}, {}

    # Normalize the new lines in document
    document = document.replace('\r\n', '\n').replace('\r', '\n')

    for line in document.split('\n'):
        if not line:
            continue  # line has no content (a double '\n')
        found = triple_re.match(line)
        if found:
            subj, pred, obj = [_term_to_node(term, bnodes, univars)
                               for term in found.groups()]
            store.triple(subj, pred, obj)
        elif _COMMENT_RE.match(line):
            continue  # Line is a comment
        elif _WHITESPACE_RE.match(line):
            # NOTE(review): match() anchors only at the start, so any
            # non-triple line that begins with a space or tab is skipped
            # here, not just lines consisting solely of whitespace.
            continue
        else:
            raise ValueError('Line is invalid: %r' % (line,))  # Validity error
    return store


def serialize(store):
    """Return the triples in store.tripleList as an N-Triples string.

    Nodes that have a `universal` attribute are written as ?sN; other nodes
    without a `uri` attribute are written as _:sN. Nodes whose uri starts
    with 'data:,' are written as quoted literals via rdf.URIToLiteral, and
    everything else as <uri>. Trailing whitespace is stripped from the
    result (co-written).
    """
    bnode_ids, univar_ids = {}, {}
    to_literal = rdf.URIToLiteral
    lines = []

    for t in store.tripleList:
        positions = (t.subject, t.predicate, t.object)

        # First pass: give every new variable / bNode a serial label. The
        # checks are nested so that an already-labelled variable can never
        # fall through and also be registered as a bNode.
        for pos in positions:
            if hasattr(pos, 'universal'):
                if pos not in univar_ids:
                    univar_ids[pos] = 's' + repr(len(univar_ids) + 1)
            elif not hasattr(pos, 'uri'):
                if pos not in bnode_ids:
                    bnode_ids[pos] = 's' + repr(len(bnode_ids) + 1)

        # Second pass: write out the three terms followed by the terminator.
        parts = []
        for pos in positions:
            if pos in bnode_ids:
                parts.append('_:' + bnode_ids[pos])
            elif pos in univar_ids:
                parts.append('?' + univar_ids[pos])
            elif pos.uri[:6] == 'data:,':
                parts.append('"' + to_literal(pos.uri) + '"')
            else:
                parts.append('<' + pos.uri + '>')
        parts.append('.')
        lines.append(' '.join(parts) + '\n')

    return ''.join(lines).rstrip()


# Main program

if __name__ == "__main__":
    source = open(sys.argv[1])
    try:
        text = source.read()
    finally:
        source.close()  # do not leak the file handle
    print(serialize(parse(text)))

# Phew