SPARQL operations in-memory #255

Closed

giacomociti opened this issue Jan 9, 2024 · 11 comments · Fixed by #271

Comments

@giacomociti (Contributor)

We can use the oxigraph in-memory store to run SPARQL on each chunk in a pipeline (assuming each chunk is a dataset or an array of quads), for example:

<main> a p:Pipeline , p:Readable ;
  p:steps
    [
      p:stepList ( 
        [ base:stdin () ]
        [ n3:parse () ]
        [ splitDataset:bySubject () ]
        [ sparql:update ("""
            PREFIX ex1: <http://example1.org/>
            PREFIX ex2: <http://example2.org/>
            
            DELETE { ?s ex1:knows ?o }
            INSERT { 
                ?s ex2:knows ?o .
                ?o ex2:knows ?s .
            } 
            WHERE { ?s ex1:knows ?o }
          """
        )]
        [ base:flatten () ]
        [ ntriples:serialize () ]
      )
    ]
.

The implementation could be:

import { Transform } from 'stream'
import oxigraph from 'oxigraph'

// load each chunk into a fresh in-memory store, run the update,
// then emit the whole modified dataset
export const update = sparql =>
  new Transform({
    objectMode: true,
    transform: (chunk, encoding, callback) => {
      const store = new oxigraph.Store([...chunk])
      store.update(sparql)
      callback(null, store.match())
    },
  })

// run a (CONSTRUCT) query against each chunk and emit only its results
export const query = sparql =>
  new Transform({
    objectMode: true,
    transform: (chunk, encoding, callback) => {
      const store = new oxigraph.Store([...chunk])
      const result = store.query(sparql)
      callback(null, result)
    },
  })
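As a quick sanity check, the update step can also be driven outside a pipeline. A minimal sketch, assuming oxigraph's RDF/JS-style factory functions:

import oxigraph from 'oxigraph'

// one chunk = one array of quads
const step = update(`
  PREFIX ex1: <http://example1.org/>
  PREFIX ex2: <http://example2.org/>
  DELETE { ?s ex1:knows ?o }
  INSERT { ?s ex2:knows ?o . ?o ex2:knows ?s }
  WHERE  { ?s ex1:knows ?o }
`)

step.on('data', quads => console.log(quads.length)) // 2: the pair of ex2:knows triples
step.write([
  oxigraph.quad(
    oxigraph.namedNode('http://example1.org/a'),
    oxigraph.namedNode('http://example1.org/knows'),
    oxigraph.namedNode('http://example1.org/b')
  ),
])
step.end()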

I think it's useful to have both: an update operation yielding the modified chunk and a query operation yielding only the (CONSTRUCT) query results.

Even though the implementation is simple enough, I think it's yet another valuable addition to the toolbox.
I'm not sure whether it should be in its own package or whether we can add it to the sparql package (the main question is whether we want to add the oxigraph dependency to a package mainly used to query HTTP endpoints).

@tpluscode (Contributor)

OOH this is super cool

@tpluscode (Contributor)

It's a good question whether we could add this to the b59:sparql package. If we find good, descriptive identifiers for the manifest, that should be enough. What about a dedicated namespace to separate it from the remote SPARQL calls?

prefix in-memory-sparql: <.../operations/sparql/in-memory/>

[
  in-memory-sparql:update ( "DELETE/INSERT" )
]
[
  in-memory-sparql:query ( "CONSTRUCT" )
] 

@tpluscode (Contributor) · Jan 10, 2024

On a related note, I'd like SPARQL loaded from files, supporting VALUES:

[ sparql:update (
  [
    a code:SparqlQuery ;
    code:link <file:query.rq> ;
    code:arguments [
      code:name "foo" ;
      code:value "bar"^^code:VariableName ;
    ]
  ]
)]

That would insert VALUES ?foo { "value of bar" } into the query.

@giacomociti (Contributor, Author)

Loading queries from files and injecting values are two things I strongly support.
I was even experimenting with injecting values coming as bindings from a preceding step:

[ in-memory-sparql:query ("""
    SELECT ?s ?name
    WHERE { ?s schema:name ?name }
  """)
]
[ sparql:update ("""
    INSERT { ?s schema:name ?name }
    WHERE {
      VALUES (?s ?name) {
        # values to be injected here
      }
    }
  """)]

The following code snippet injects the bindings into the VALUES clause:

import sparqljs from 'sparqljs'

const setValues = (results, query) => {
  const parsed = new sparqljs.Parser().parse(query)
  // the WHERE clause sits at the top level for SELECT/CONSTRUCT queries,
  // but inside the update operation for DELETE/INSERT updates
  const where = parsed.where ?? parsed.updates[0].where
  const values = where.find(({ type }) => type === 'values').values
  results.forEach(bindings => {
    const row = {}
    for (const [key, value] of bindings) {
      row[`?${key}`] = value // sparqljs keys VALUES rows by '?'-prefixed variable names
    }
    values.push(row)
  })
  return new sparqljs.Generator().stringify(parsed)
}
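For example, with oxigraph SELECT results (an array of Maps from variable names to terms), it could be wired up like this. A sketch, where chunk and updateQuery (the INSERT above) are assumed to be in scope:

import oxigraph from 'oxigraph'

const store = new oxigraph.Store([...chunk]) // `chunk` as in the steps above
// oxigraph returns SELECT bindings as Maps, which setValues iterates directly
const results = store.query('SELECT ?s ?name WHERE { ?s <http://schema.org/name> ?name }')
store.update(setValues(results, updateQuery))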

@tpluscode (Contributor)

We're veering off subject, but I'm not seeing how a setup like the above would work. In stream logic, I'd expect the SELECT step to push binding chunks from the result set. What would the INSERT then operate on?

I think this would have to be set up more like the SHACL steps, where the shapes are loaded from a stream passed in as an argument. Expanding on my snippet above, I imagine that, using a positional argument instead of named arguments, you could provide the result of a SELECT query (or any other stream of objects supplying the values for VALUES):

[ sparql:update (
  [
    a code:SparqlQuery ;
    code:link <file:query.rq> ;
    code:arguments ( [
      sparql:select ( "SELECT ..." )
    ] )
  ]
)]

Note that we cannot currently use a step outside a pipeline, but I don't see why this couldn't be made possible.
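On the implementing side, the operation could then drain the argument stream before building the query. A rough sketch, reusing the setValues helper from the previous comment (the factory signature is hypothetical):

import { Transform } from 'stream'
import oxigraph from 'oxigraph'

export async function update(query, bindings) {
  // collect the bindings pushed by the inner SELECT step
  const rows = []
  for await (const row of bindings) rows.push(row)
  const sparql = setValues(rows, query) // inject them into the VALUES clause

  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      const store = new oxigraph.Store([...chunk])
      store.update(sparql)
      callback(null, store.match())
    },
  })
}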

@giacomociti (Contributor, Author)

What you suggest probably makes more sense; anyway, that was just a digression. We can start with the basic thing and discuss adding more features later.

@ktk (Member) · Jan 11, 2024

Something popped into my head last night: do we have the equivalent for Notation-3/eye yet? I think it would work in a similar way, just with a different language.

@giacomociti (Contributor, Author)

For Notation-3/eye there's a dedicated issue with a suggested prototype implementation. But once the batch operation is merged, I'd suggest a simpler implementation, much like this in-memory SPARQL step:

import { Transform } from 'stream'
import fs from 'fs'
import { n3reasoner } from 'eyereasoner'
import { Parser } from 'n3'

async function infer(rules) {
  // parse the N3 rules once, up front
  const parser = new Parser({ format: 'text/n3' })
  const text = fs.readFileSync(rules).toString()
  const ruleQuads = parser.parse(text)

  return new Transform({
    objectMode: true,
    async transform(chunk, encoding, callback) {
      try {
        // run the reasoner over each chunk, with the pre-parsed rules
        callback(null, await n3reasoner(chunk, ruleQuads))
      } catch (err) {
        callback(err)
      }
    },
  })
}

export default infer

although loading the rules from a file, as in the example above, may not be the best option.
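A variant taking the rules as a string argument would keep them in the pipeline definition instead; a sketch:

import { Transform } from 'stream'
import { n3reasoner } from 'eyereasoner'
import { Parser } from 'n3'

// hypothetical variant: the N3 rules arrive inline as a string argument
export default function infer(rules) {
  const ruleQuads = new Parser({ format: 'text/n3' }).parse(rules)

  return new Transform({
    objectMode: true,
    async transform(chunk, encoding, callback) {
      try {
        callback(null, await n3reasoner(chunk, ruleQuads))
      } catch (err) {
        callback(err)
      }
    },
  })
}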

@giacomociti (Contributor, Author)

What I really wanted to address in a previous comment is a use case in which each chunk is a dataset and we want to enrich it with data from an external endpoint. I first tried federation:

INSERT { ?s :city ?city }
WHERE {
    ?s :name ?name
    SERVICE <http://my.endpoint/query> {
        ?s :city ?city
    }
}

but apparently federation is not supported in oxigraph for JS. So I thought of implementing it manually with VALUES injection, but what I showed in that comment is not enough. We need a step which does three things:

  • select values from the chunk:

    SELECT ?s WHERE { ?s :name ?name }

  • query the endpoint, injecting the bindings:

    CONSTRUCT { ?s :city ?city } WHERE {
      ?s :city ?city
      VALUES ?s { :a :b :c }
    }

  • add the triples from the endpoint to the chunk

We could implement such an operation with arguments for the in-memory SELECT query, the remote CONSTRUCT query, and the endpoint URL. But before proposing such an ad hoc solution, I'm wondering about Eigensolutions: can we identify some lower-level, more general operations whose combination achieves the same?
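For the record, a rough sketch of such an ad hoc step, assuming sparql-http-client's ParsingClient for the remote call, the setValues helper from above, and that oxigraph accepts RDF/JS quads from other factories (the names enrich, selectQuery, constructQuery, endpointUrl are all hypothetical):

import { Transform } from 'stream'
import oxigraph from 'oxigraph'
import ParsingClient from 'sparql-http-client/ParsingClient.js'

export const enrich = (selectQuery, constructQuery, endpointUrl) => {
  const client = new ParsingClient({ endpointUrl })

  return new Transform({
    objectMode: true,
    async transform(chunk, encoding, callback) {
      try {
        // 1. select values from the chunk
        const store = new oxigraph.Store([...chunk])
        const bindings = store.query(selectQuery)
        // 2. query the endpoint, injecting the bindings into the VALUES clause
        const quads = await client.query.construct(setValues(bindings, constructQuery))
        // 3. add the triples from the endpoint to the chunk
        quads.forEach(quad => store.add(quad))
        callback(null, store.match())
      } catch (err) {
        callback(err)
      }
    },
  })
}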

@ktk (Member) · Jan 16, 2024

Not what you asked, but another remark: I see you use splitDataset as well; I think this should be easier for people to use. I had to talk to @tpluscode to get it to work properly, and I also simply wanted a dataset. In other words, the whole Duplex stuff should IMO be abstracted away.

The method itself looks like this now:

<readFile> a p:Pipeline , p:Readable;
  p:steps
    [
      p:stepList (
        [ op:core\/fs\/createReadStream ( "simple.nt" ) ]
        [ op:formats\/n3\/parse () ]
        [ op:rdf\/splitDataset\/bySubject ()]
        <opensearchPush>
      )
    ] .

<opensearchPush> a p:Step ;
  code:implementedBy
  [
    a code:EcmaScriptModule ;
    code:link <file:../lib/opensearch.js#push>
  ]
 .

import { Duplex } from 'stream'

export function push () {
  const { logger } = this
  return Duplex.from(async function * (stream) {
    for await (const dataset of stream) {
      // do something with each dataset
    }
  })
}

@giacomociti
Copy link
Contributor Author

Not sure I get your point; isn't base.map the abstraction needed?
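With base.map, push would shrink to a plain per-chunk function, something like:

// hypothetical: base.map applies this to each dataset chunk,
// no Duplex wiring needed
export function push(dataset) {
  // do something with dataset, then pass it on
  return dataset
}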
