triple stream processing

I figured I'd share my approach.

Essentially, the problem is: given a stream of triples, transform it somehow.

 Triple -> Triple is out as a type signature,
  since you might want to return multiple triples (converting to SIOC while keeping the original, or adding provenance)
  or zero triples (grep-style filtering, access control, or saving bandwidth for APIs/views that need only certain fields).

 Triple -> [Triple] is a possible type signature;

 applied to [t,t,t,t] it results in

  [[t,t],[],[t],[t,t,t]]

which rules out #flatten for cleaning up the result, since I'd like to keep the option of using an Array to hold a triple's components (#flatten would flatten those as well).
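A quick illustration of the nesting problem, assuming triples are three-element Arrays (the representation the post wants to keep open). The `expand` lambda is hypothetical: it adds a provenance triple to one input, drops another, and passes the rest through.

```ruby
# Triples as 3-element Arrays: [subject, predicate, object].
triples = [
  ["a", "p", "1"],
  ["b", "p", "2"],
  ["c", "q", "3"],
  ["d", "p", "4"],
]

# Hypothetical Triple -> [Triple] function returning 0, 1 or 2 triples.
expand = lambda do |(s, p, o)|
  case s
  when "a" then [[s, p, o], [s, "prov", "source.nt"]] # keep + add provenance
  when "b" then []                                    # filtered out
  else [[s, p, o]]                                    # passed through
  end
end

# One list of triples per input triple:
nested = triples.map(&expand)
# => [[["a", "p", "1"], ["a", "prov", "source.nt"]], [], [["c", "q", "3"]], [["d", "p", "4"]]]

# #flatten also flattens each triple, so the triple boundaries are lost:
flat = nested.flatten
# => ["a", "p", "1", "a", "prov", "source.nt", "c", "q", "3", "d", "p", "4"]
```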

I've got a 1 GB N-Triples file that RDF::NTriples is going to provide, triple by triple:


NTriples('stream.nt') do |s,p,o|
  # actA, actB, actC... each triple is handled here, one at a time
end
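A minimal sketch of a block-based source in that style, which never holds more than one line in memory. `each_ntriple` is a hypothetical name (not RDF.rb's API), and the regex only handles the simple `subject predicate object .` line shape; a real N-Triples parser handles escapes, blank nodes and edge cases properly.

```ruby
require 'stringio'

# Hypothetical streaming source: yields (s, p, o) for each N-Triples line.
# Accepts any IO-like object (File, StringIO) and reads one line at a time.
def each_ntriple(io)
  io.each_line do |line|
    line = line.strip
    next if line.empty? || line.start_with?('#') # skip blanks and comments
    # naive split: two whitespace-free terms, then everything up to the final "."
    # (lines that don't fit this shape are silently skipped)
    if (m = line.match(/\A(\S+)\s+(\S+)\s+(.+?)\s*\.\z/))
      yield m[1], m[2], m[3]
    end
  end
end

doc = StringIO.new(<<~NT)
  <http://ex.org/a> <http://ex.org/p> "hello world" .
  # a comment
  <http://ex.org/b> <http://ex.org/p> <http://ex.org/c> .
NT

each_ntriple(doc) do |s, p, o|
  puts [s, p, o].join(' | ')
end
```

For a file on disk the same method would be used as `File.open('stream.nt') { |f| each_ntriple(f) { |s,p,o| ... } }`.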

I'm also using RDF files as configuration, where a triple names a source file to stream from (again, too large to comfortably load into RAM).

The array creation that Triple -> [Triple] entails is not a solution to these problems.


(For a simple grep()-style scenario, where filters never return more triples than they are given, a clever composition function over a sequence of filter functions, internally using #concat/*splat and intermediate output buffers, could at least support the first case, though not necessarily with the comfort of knowing it won't blow up in your face with OOM kills. Maybe that's enough for most devs; I don't know.)
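A rough sketch of such a composition function, assuming each filter is a lambda from one triple to an Array of triples (Triple -> [Triple]). `compose`, `only_title` and `with_prov` are hypothetical names. Each stage appends its outputs into a fresh buffer with #concat, so triples stay intact without flattening, but every intermediate buffer is fully materialized, which is exactly the memory concern raised in the parenthetical.

```ruby
# Hypothetical helper: compose Triple -> [Triple] filters left to right.
# Each stage's outputs are appended (#concat) into a new buffer, so
# triples stay intact as [s, p, o] Arrays; nothing is flattened.
def compose(*filters)
  lambda do |triple|
    filters.reduce([triple]) do |buffer, filter|
      out = []
      buffer.each { |t| out.concat(filter.call(t)) }
      out
    end
  end
end

# grep-style filter: keep only 'title' triples (0 or 1 out)
only_title = lambda { |(s, p, o)| p == 'title' ? [[s, p, o]] : [] }
# expanding filter: keep the triple and add a provenance triple (2 out)
with_prov  = lambda { |(s, p, o)| [[s, p, o], [s, 'source', 'feed.xml']] }

pipeline = compose(only_title, with_prov)
pipeline.call(['post1', 'title', 'Hello'])
# => [["post1", "title", "Hello"], ["post1", "source", "feed.xml"]]
pipeline.call(['post1', 'date', '2010-09-14'])
# => []
```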


Ruby blocks make this relatively comfortable without going whole hog into Iteratee [1] / Conal-land [2], so I'd like to stick to Ruby 1.8 features rather than building a custom "pipeline" library on Ruby 1.9's coroutines (aka Fibers).


This is how it works:

def filterTriples *a
  send(*a) do |s,p,o|
    # transform s,p,o into s',p',o' here, then pass them downstream
    yield s,p,o
  end
end

send() invokes the method named by the first argument (a closure/stack frame/what-have-you), passing the remaining arguments along; this recurses until the entire pipeline is set up, and then triples are yielded up through the stack.
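A self-contained sketch of that send-based wiring, assuming each stage is a method on the same object and triples travel as three block arguments. The method names (`source`, `upcase_objects`, `drop_type`) are hypothetical stand-ins; only the `send(*f) { |s,p,o| ... yield ... }` shape comes from the post. Because each filter yields per triple, nothing is buffered: one triple at a time moves up the stack to the final block.

```ruby
class Pipeline
  # source (tripleStream): yields triples, has no upstream
  def source
    yield 'a', 'type', 'Post'
    yield 'a', 'title', 'hello'
    yield 'b', 'title', 'world'
  end

  # filter (tripleStream -> tripleStream): one triple in, one out
  def upcase_objects(*f)
    send(*f) { |s, p, o| yield s, p, o.upcase }
  end

  # filter (tripleStream -> tripleStream): one triple in, zero or one out
  def drop_type(*f)
    send(*f) { |s, p, o| yield s, p, o unless p == 'type' }
  end
end

# drop_type(:upcase_objects, :source) sends to upcase_objects(:source),
# which sends to source; triples are then yielded back up the chain.
Pipeline.new.drop_type(:upcase_objects, :source) do |s, p, o|
  puts "#{s} #{p} #{o}"
end
# prints:
# a title HELLO
# b title WORLD
```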

Here's a feed pipeline:

 parsing a raw feed,
 normalizing various date fields to DC::Date,
 running them through a stdlib parser/serializer so they're ISO 8601 clean,
 and converting various predicates to SIOC before yielding up to whatever may want them:

The two type signatures distinguish sources from filters. Sinks (not pictured here) provide blocks but don't yield.
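Since sinks aren't pictured, here is a hedged sketch of what two might look like under the same conventions: they take an upstream spec for send(*f) and supply a block, but never yield. The class and method names (`Stream`, `items`, `write_nt`, `count`) are hypothetical.

```ruby
require 'stringio'

class Stream
  # source: yields triples, has no upstream
  def items
    yield '<a>', '<title>', '"hello"'
    yield '<b>', '<title>', '"world"'
  end

  # sink: writes each triple as an N-Triples-style line; supplies a block, never yields
  def write_nt(io, *f)
    send(*f) { |s, p, o| io.puts "#{s} #{p} #{o} ." }
    io
  end

  # sink: counts triples without keeping any of them in memory
  def count(*f)
    n = 0
    send(*f) { |_s, _p, _o| n += 1 }
    n
  end
end

s = Stream.new
puts s.write_nt(StringIO.new, :items).string
puts s.count(:items) # prints 2
```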

# tripleStream
  def feed &f
    # conceptually dateNorm(feedSIOCize(feedRaw)), wired up via send
    dateNorm :feedSIOCize,:feedRaw,&f
  end
 

# tripleStream
  def feedRaw &f
    read.extend(FeedParse).parse &f
  end

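require 'time' # Time.parse, Time#iso8601

# tripleStream -> tripleStream
  def dateNorm *f
    send(*f){|s,p,o|
      # date-valued predicates are rewritten to Date, with the object
      # re-serialized as ISO 8601 in UTC; other triples pass through unchanged
      yield *({E::RSS+'pubDate' => true,
                E::Date => true,
                E::Purl+'dc/elements/1.1/date' => true,
                Atom+'published' => true,
                Atom+'updated' => true
              }[p] ?
              [s,
               Date,
               Time.parse(o).utc.iso8601] :[s,p,o])}
  end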

# tripleStream -> tripleStream
  def feedSIOCize *f
    send(*f){|s,p,o|
      # map known DC/Atom/RSS predicates to Creator/Content/Title/SIOC
      # predicates; unknown predicates pass through unchanged
      yield s,
      { Purl+'dc/elements/1.1/creator' => Creator,
        Purl+'dc/elements/1.1/subject' => SIOC+'subject',
        Atom+'author' => Creator,
        RSS+'description' => Content,
        Atom+'content' => Content,
        RSS+'title' => Title,
        Atom+'title' => Title,
      }[p]||p,
      o }
  end
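A hedged, runnable sketch of the predicate mapping chained onto a stub source. All namespace strings and the `CREATOR`/`CONTENT`/`TITLE` targets are assumptions standing in for the post's `Purl`, `Atom`, `RSS`, `SIOC`, `Creator`, `Content` and `Title` constants; `raw` and `sioc_ize` are hypothetical names. `Hash#fetch(p, p)` replaces `[p]||p`; the two agree here because no mapped value is nil or false.

```ruby
# Assumed namespace strings; the post's constants are defined elsewhere.
SIOC_NS = 'http://rdfs.org/sioc/ns#'
DC_NS   = 'http://purl.org/dc/elements/1.1/'
ATOM_NS = 'http://www.w3.org/2005/Atom#'
RSS_NS  = 'http://purl.org/rss/1.0/'
CREATOR = SIOC_NS + 'has_creator' # hypothetical targets
CONTENT = SIOC_NS + 'content'
TITLE   = DC_NS + 'title'

# Built once rather than per triple.
SIOC_MAP = {
  DC_NS + 'creator'      => CREATOR,
  DC_NS + 'subject'      => SIOC_NS + 'subject',
  ATOM_NS + 'author'     => CREATOR,
  RSS_NS + 'description' => CONTENT,
  ATOM_NS + 'content'    => CONTENT,
  RSS_NS + 'title'       => TITLE,
  ATOM_NS + 'title'      => TITLE,
}.freeze

class Feed
  # hypothetical stub source standing in for feedRaw
  def raw
    yield 'post1', ATOM_NS + 'author', 'alice'
    yield 'post1', RSS_NS + 'title',   'hello'
    yield 'post1', RSS_NS + 'link',    'http://ex.org/1'
  end

  # tripleStream -> tripleStream: rename known predicates, keep the rest
  def sioc_ize(*f)
    send(*f) { |s, p, o| yield s, SIOC_MAP.fetch(p, p), o }
  end
end

Feed.new.sioc_ize(:raw) { |s, p, o| puts [s, p, o].inspect }
```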


anyways..


[1] http://okmij.org/ftp/Streams.html
[2] http://conal.net/papers/

Received on Tuesday, 14 September 2010 15:45:05 UTC