Skip to content

Commit

Permalink
Optimize entity map
Browse files Browse the repository at this point in the history
  • Loading branch information
bserdar committed Nov 8, 2022
1 parent 39b2115 commit 66ea392
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 49 deletions.
4 changes: 4 additions & 0 deletions layers/cmd/ingest_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,10 @@ func (cji *CSVJoinIngester) Run(pipeline *pipeline.PipelineContext) error {
var doneErr error
var newGraphStart int = cji.StartRow
for row := 0; !done; row++ {
pipeline.EntryLogger(pipeline, map[string]interface{}{
"input": entryInfo.GetName(),
"row": row,
})
func() {
defer func() {
if err := recover(); err != nil {
Expand Down
75 changes: 66 additions & 9 deletions pkg/ls/docgraph.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package ls

import (
"strings"

"github.com/cloudprivacylabs/lpg"
)

Expand All @@ -37,18 +39,16 @@ func RegisterNewDocGraphHook(f func(*lpg.Graph)) {

// EntityInfo contains the entity information in the doc graph
type EntityInfo struct {
root *lpg.Node
sch string
root *lpg.Node
sch string
valueType []string
id []string
}

func (e EntityInfo) GetRoot() *lpg.Node { return e.root }
func (e EntityInfo) GetEntitySchema() string { return e.sch }
func (e EntityInfo) GetID() []string {
return AsPropertyValue(e.root.GetProperty(EntityIDTerm)).MustStringSlice()
}
func (e EntityInfo) GetValueType() []string {
return FilterNonLayerTypes(e.root.GetLabels().Slice())
}
func (e EntityInfo) GetID() []string { return e.id }
func (e EntityInfo) GetValueType() []string { return e.valueType }

// GetEntityInfo returns all the nodes that are entity roots,
// i.e. nodes containing EntitySchemaTerm
Expand All @@ -58,12 +58,69 @@ func GetEntityInfo(g *lpg.Graph) map[*lpg.Node]EntityInfo {
node := nodes.Node()
sch := AsPropertyValue(node.GetProperty(EntitySchemaTerm)).AsString()
if len(sch) > 0 {
ret[node] = EntityInfo{root: node, sch: sch}
types := FilterNonLayerTypes(node.GetLabels().Slice())
ret[node] = EntityInfo{
root: node,
sch: sch,
valueType: types,
id: AsPropertyValue(node.GetProperty(EntityIDTerm)).MustStringSlice(),
}
}
}
return ret
}

// GetEntityInfoIndex returns a fast-access entity info
func GetEntityInfoIndex(g *lpg.Graph) EntityInfoIndex {
return IndexEntityInfo(GetEntityInfo(g))
}

type EntityInfoIndex struct {
indexByType map[string]map[string][]*lpg.Node
}

func (e EntityInfoIndex) getFkHash(fk []string) string {
if len(fk) == 1 {
return fk[0]
}
return strings.Join(fk, " ")
}

func (e EntityInfoIndex) Find(entityName string, fk []string) []*lpg.Node {
m := e.indexByType[entityName]
if m == nil {
return nil
}
h := e.getFkHash(fk)
return m[h]
}

// IndexEntityInfo returns a fast-access version of entity info
func IndexEntityInfo(entityInfo map[*lpg.Node]EntityInfo) EntityInfoIndex {
ix := EntityInfoIndex{
indexByType: make(map[string]map[string][]*lpg.Node),
}

add := func(t, hash string, node *lpg.Node) {
m := ix.indexByType[t]
if m == nil {
m = make(map[string][]*lpg.Node)
ix.indexByType[t] = m
}
m[hash] = append(m[hash], node)
}
for node, ei := range entityInfo {
hash := ix.getFkHash(ei.GetID())
add(ei.sch, hash, node)
for _, t := range ei.valueType {
if t != ei.sch {
add(t, hash, node)
}
}
}
return ix
}

// GetParentDocumentNodes returns the document nodes that have incoming edges to this node
func GetParentDocumentNodes(node *lpg.Node) []*lpg.Node {
out := make(map[*lpg.Node]struct{})
Expand Down
16 changes: 5 additions & 11 deletions pkg/ls/graphbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,14 +311,7 @@ func (gb GraphBuilder) ValueAsProperty(schemaNode *lpg.Node, graphPath []*lpg.No
if !schemaNode.HasLabel(AttributeTypeValue) {
return ErrSchemaValidation{Msg: "A value expected here"}
}
asPropertyOf := AsPropertyValue(schemaNode.GetProperty(AsPropertyOfTerm)).AsString()
propertyName := AsPropertyValue(schemaNode.GetProperty(PropertyNameTerm)).AsString()
if len(propertyName) == 0 {
propertyName = AsPropertyValue(schemaNode.GetProperty(AttributeNameTerm)).AsString()
}
if len(propertyName) == 0 {
propertyName = GetNodeID(schemaNode)
}
asPropertyOf, propertyName := GetIngestAsProperty(schemaNode)
if len(propertyName) == 0 {
return ErrCannotDeterminePropertyName{SchemaNodeID: GetNodeID(schemaNode)}
}
Expand Down Expand Up @@ -521,7 +514,7 @@ func (gb GraphBuilder) NewUniqueEdge(fromNode, toNode *lpg.Node, label string, p
// `spec` is the link spec. `docNode` contains the ingested document
// node that will be linked. It can be nil. `parentNode` is the
// document node containing the docNode.
func (gb GraphBuilder) LinkNode(spec *LinkSpec, docNode, parentNode *lpg.Node, entityInfo map[*lpg.Node]EntityInfo) error {
func (gb GraphBuilder) LinkNode(spec *LinkSpec, docNode, parentNode *lpg.Node, entityInfo EntityInfoIndex) error {
entityRoot := GetEntityRoot(parentNode)
if entityRoot == nil {
return ErrCannotResolveLink(*spec)
Expand Down Expand Up @@ -623,6 +616,7 @@ func (gb GraphBuilder) LinkNode(spec *LinkSpec, docNode, parentNode *lpg.Node, e
}

func (gb GraphBuilder) LinkNodes(ctx *Context, schema *Layer, entityInfo map[*lpg.Node]EntityInfo) error {
eix := IndexEntityInfo(entityInfo)
for nodes := schema.Graph.GetNodes(); nodes.Next(); {
attrNode := nodes.Node()
ls, err := GetLinkSpec(attrNode)
Expand Down Expand Up @@ -653,12 +647,12 @@ func (gb GraphBuilder) LinkNodes(ctx *Context, schema *Layer, entityInfo map[*lp
}
// childNode is an instance of attrNode, which is a link
childFound = true
if err := gb.LinkNode(ls, childNode, parent, entityInfo); err != nil {
if err := gb.LinkNode(ls, childNode, parent, eix); err != nil {
return err
}
}
if !childFound {
if err := gb.LinkNode(ls, nil, parent, entityInfo); err != nil {
if err := gb.LinkNode(ls, nil, parent, eix); err != nil {
return err
}
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/ls/ingestdoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,18 @@ func GetIngestAs(schemaNode *lpg.Node) string {
return "node"
}

func GetIngestAsProperty(schemaNode *lpg.Node) (asPropertyOf, propertyName string) {
asPropertyOf = AsPropertyValue(schemaNode.GetProperty(AsPropertyOfTerm)).AsString()
propertyName = AsPropertyValue(schemaNode.GetProperty(PropertyNameTerm)).AsString()
if len(propertyName) == 0 {
propertyName = AsPropertyValue(schemaNode.GetProperty(AttributeNameTerm)).AsString()
}
if len(propertyName) == 0 {
propertyName = GetNodeID(schemaNode)
}
return
}

func ingestWithCursor(builder GraphBuilder, cursor ingestCursor) (bool, *lpg.Node, error) {
root := cursor.getInput()
schemaNode := root.GetSchemaNode()
Expand Down
31 changes: 2 additions & 29 deletions pkg/ls/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,35 +208,8 @@ func GetLinkSpec(schemaNode *lpg.Node) (*LinkSpec, error) {
}

// FindReference finds the root nodes with entitySchema=spec.Schema, with entityId=fk
func (spec *LinkSpec) FindReference(entityInfo map[*lpg.Node]EntityInfo, fk []string) ([]*lpg.Node, error) {
ret := make([]*lpg.Node, 0)
for _, ei := range entityInfo {
var exists bool
for _, typeName := range ei.GetValueType() {
if typeName == spec.TargetEntity {
exists = true
break
}
}
if exists || ei.GetEntitySchema() == spec.TargetEntity {
id := ei.GetID()
if len(fk) > 0 && len(id) != len(fk) {
continue
}
found := true
for i := range fk {
if id[i] != fk[i] {
found = false
break
}
}
if found {
ret = append(ret, ei.GetRoot())
}
}
}

return ret, nil
func (spec *LinkSpec) FindReference(entityInfo EntityInfoIndex, fk []string) ([]*lpg.Node, error) {
return entityInfo.Find(spec.TargetEntity, fk), nil
}

// GetForeignKeys returns the foreign keys for the link spec given the entity root node
Expand Down

0 comments on commit 66ea392

Please sign in to comment.