Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the heap spike #213

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ lazy val testSchemasDomainClasses = project
)

// currently relies on a self-published version of codepropertygraph and joern based on the respective `michael/flatgraph` branches
/*
lazy val benchmarks = project
.in(file("benchmarks"))
.enablePlugins(JavaAppPackaging, JmhPlugin)
Expand All @@ -185,6 +186,8 @@ lazy val benchmarks = project
),
publish / skip := true
)
*/


ThisBuild / libraryDependencies ++= Seq(
"org.slf4j" % "slf4j-simple" % slf4jVersion % Test,
Expand Down
9 changes: 5 additions & 4 deletions core/src/main/scala/flatgraph/DNode.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ trait DNode extends DiffGraphBuilder.RawUpdate with DNodeOrNode {
def storedRef: Option[StoredNodeType]
def storedRef_=(ref: Option[GNode]): Unit

def flattenProperties(interface: BatchedUpdateInterface): Unit
def countAndVisitProperties(interface: BatchedUpdateInterface): Unit
}

trait BatchedUpdateInterface {
def insertProperty(node: DNode, propertyKind: Int, propertyValues: IterableOnce[Any]): Unit
def countProperty(node: DNode, propertyKind: Int, num: Int): Unit
def visitContainedNode(contained: DNodeOrNode): Unit
}
class GenericDNode(val nodeKind: Short, var storedRef: Option[GNode] = None) extends DNode {
final class GenericDNode(val nodeKind: Short, var storedRef: Option[GNode] = None) extends DNode {
override type StoredNodeType = GNode
override def flattenProperties(interface: BatchedUpdateInterface): Unit = {}
override def countAndVisitProperties(interface: BatchedUpdateInterface): Unit = {}
}
52 changes: 34 additions & 18 deletions core/src/main/scala/flatgraph/DiffGraphApplier.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,29 @@ object DiffGraphApplier {
}
}

abstract class NewNodePropertyInsertionHelper {
def insertNewNodeProperties(newNodes: mutable.ArrayBuffer[DNode], dst: AnyRef, idxs: Array[Int]): Unit = {}
}

/** The class that is responsible for applying diffgraphs. This is not supposed to be public API, users should stick to applyDiff
*/
private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder, schemaViolationReporter: SchemaViolationReporter) {
val newNodes = new Array[mutable.ArrayBuffer[DNode]](graph.schema.getNumberOfNodeKinds)
// newEdges and delEdges are oversized, in order to permit usage of the same indexing function
val newEdges = new Array[mutable.ArrayBuffer[AddEdgeProcessed]](graph.neighbors.size)
val delEdges = new Array[mutable.ArrayBuffer[EdgeRepr]](graph.neighbors.size)
val setEdgeProperties = new Array[mutable.ArrayBuffer[EdgeRepr]](graph.neighbors.size)
val deferred = new mutable.ArrayDeque[DNode]()
val delNodes = new mutable.ArrayBuffer[GNode]()
val setNodeProperties = new Array[mutable.ArrayBuffer[Any]](graph.properties.size)
val newEdges = new Array[mutable.ArrayBuffer[AddEdgeProcessed]](graph.neighbors.size)
val delEdges = new Array[mutable.ArrayBuffer[EdgeRepr]](graph.neighbors.size)
val setEdgeProperties = new Array[mutable.ArrayBuffer[EdgeRepr]](graph.neighbors.size)
val deferred = new mutable.ArrayDeque[DNode]()
val delNodes = new mutable.ArrayBuffer[GNode]()
val setNodeProperties = new Array[mutable.ArrayBuffer[Any]](graph.properties.size)
val newNodeNewProperties = new Array[Int](graph.properties.size)

object NewNodeInterface extends BatchedUpdateInterface {
override def insertProperty(node: DNode, propertyKind: Int, propertyValues: IterableOnce[Any]): Unit = {
val iter = propertyValues.iterator
if (iter.hasNext) {
insertProperty0(node.storedRef.get, propertyKind, iter)
}
override def visitContainedNode(contained: DNodeOrNode): Unit = { if (contained != null) getGNode(contained) }

override def countProperty(node: DNode, propertyKind: Int, num: Int): Unit = {
val pos = graph.schema.propertyOffsetArrayIndex(node.nodeKind, propertyKind)
newNodeNewProperties(pos) += num
}
}

Expand Down Expand Up @@ -67,7 +72,7 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder,

private def drainDeferred(): Unit = {
while (deferred.nonEmpty) {
deferred.removeHead().flattenProperties(NewNodeInterface)
deferred.removeHead().countAndVisitProperties(NewNodeInterface)
}
}

Expand Down Expand Up @@ -582,12 +587,18 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder,
private def setNodeProperties(nodeKind: Int, propertyKind: Int): Unit = {
val schema = graph.schema
val pos = schema.propertyOffsetArrayIndex(nodeKind, propertyKind)
val propertyBuf = setNodeProperties(pos)
if (propertyBuf != null) {
val setPropertyPositions = setNodeProperties(pos + 1).asInstanceOf[mutable.ArrayBuffer[SetPropertyDesc]]
val viaNewNode = newNodeNewProperties(pos)
val propertyBuf = Option(setNodeProperties(pos)).getOrElse(mutable.ArrayBuffer.empty)
if (setNodeProperties(pos) != null || viaNewNode > 0) {
val setPropertyPositions =
Option(setNodeProperties(pos + 1)).getOrElse(mutable.ArrayBuffer.empty).asInstanceOf[mutable.ArrayBuffer[SetPropertyDesc]]
graph.inverseIndices.set(pos, null)
setPropertyPositions.sortInPlaceBy(_.node.seq())
dedupBy(setPropertyPositions, (setProp: SetPropertyDesc) => setProp.node.seq())
val oldQty = Option(graph.properties(pos).asInstanceOf[Array[Int]]).getOrElse(new Array[Int](1))
val lengthDelta = setPropertyPositions.iterator.map { setP =>
setP.length - (get(oldQty, setP.node.seq()) - get(oldQty, setP.node.seq() + 1))
}.sum
val nodeCount = graph.nodesArray(nodeKind).length

val setPropertyValues = schema.getNodePropertyFormalType(nodeKind, propertyKind).allocate(propertyBuf.size)
Expand All @@ -596,14 +607,14 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder,
} else {
copyToArray(propertyBuf, setPropertyValues)

val oldQty = Option(graph.properties(pos).asInstanceOf[Array[Int]]).getOrElse(new Array[Int](1))
val oldProperty = Option(graph.properties(pos + 1))
.getOrElse(schema.getNodePropertyFormalType(nodeKind, propertyKind).allocate(0))
.asInstanceOf[Array[?]]
if (oldProperty == null) schemaViolationReporter.illegalNodeProperty(nodeKind, propertyKind, schema)

val newQty = new Array[Int](nodeCount + 1)
val newProperty = schema.getNodePropertyFormalType(nodeKind, propertyKind).allocate(get(oldQty, nodeCount) + propertyBuf.size)
val newQty = new Array[Int](nodeCount + 1)
val newProperty =
schema.getNodePropertyFormalType(nodeKind, propertyKind).allocate(get(oldQty, nodeCount) + lengthDelta + viaNewNode)

val insertionIter = setPropertyPositions.iterator
var copyStartSeq = 0
Expand Down Expand Up @@ -631,6 +642,11 @@ private[flatgraph] class DiffGraphApplier(graph: Graph, diff: DiffGraphBuilder,
copyStartSeq = insertionSeq + 1
}
newQty(nodeCount) = outIndex
// now need to write the newproperties
if (viaNewNode > 0) {
val inserter = schema.getNewNodePropertyInserter(nodeKind, propertyKind)
inserter.insertNewNodeProperties(newNodes(nodeKind), newProperty, newQty)
}

graph.properties(pos) = newQty
// fixme: need to support graphs with unknown schema. Then we need to homogenize the array here.
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/flatgraph/Schema.scala
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ abstract class Schema {
def allocateEdgeProperty(nodeKind: Int, direction: Direction, edgeKind: Int, size: Int): Array[?]
def getNodePropertyFormalType(nodeKind: Int, propertyKind: Int): FormalQtyType.FormalType
def getNodePropertyFormalQuantity(nodeKind: Int, propertyKind: Int): FormalQtyType.FormalQuantity
def getNewNodePropertyInserter(ndoeKind: Int, propertyKind: Int): NewNodePropertyInsertionHelper

def verifyNodeKindIsValid(kind: Int): Unit = {
assert(
Expand All @@ -157,6 +158,8 @@ abstract class Schema {
}
}

object FreeSchemaInsertionHelper extends NewNodePropertyInsertionHelper

class FreeSchema(
nodeLabels: Array[String],
propertyLabels: Array[String], // important: array order corresponds to `nodePropertyPrototypes` order!
Expand Down Expand Up @@ -212,4 +215,5 @@ class FreeSchema(
}
else formalQuantities(propertyOffsetArrayIndex(nodeKind, propertyKind))

override def getNewNodePropertyInserter(ndoeKind: Int, propertyKind: Int): NewNodePropertyInsertionHelper = FreeSchemaInsertionHelper
}
4 changes: 3 additions & 1 deletion core/src/test/scala/flatgraph/GraphTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,7 @@ class GraphTests extends AnyWordSpec with Matchers {
._setNodeProperty(V1_0.storedRef.get, 0, null)
._setNodeProperty(V0_1.storedRef.get, 1, null :: Nil)
)

debugDump(g) shouldBe
"""#Node numbers (kindId, nnodes) (0: 3), (1: 2), total 5
|Node kind 0. (eid, nEdgesOut, nEdgesIn):
Expand Down Expand Up @@ -890,6 +891,7 @@ class GraphTests extends AnyWordSpec with Matchers {
}.getMessage should include("unsupported property type")
}

/*
"Support custom domain classes for detached nodes" in {
class CustomNode extends DNode {
override type StoredNodeType = GNode
Expand Down Expand Up @@ -931,7 +933,7 @@ class GraphTests extends AnyWordSpec with Matchers {
|""".stripMargin
testSerialization(g)
}

*/
"support indexed lookups" in {
val schema = TestSchema.make(1, 0, 1, nodePropertyPrototypes = Array(new Array[String](0)))
val g = new Graph(schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,96 @@ package flatgraph.codegen

object CodeSnippets {

object NewNodeInserters {
def forSingleItem(nameCamelCase: String, nodeType: String, propertyType: String, isNode: Boolean): String = {
s"""object NewNodeInserter_${nodeType}_${nameCamelCase} extends flatgraph.NewNodePropertyInsertionHelper {
| override def insertNewNodeProperties(newNodes: mutable.ArrayBuffer[flatgraph.DNode], dst: AnyRef, offsets: Array[Int]): Unit = {
| if(newNodes.isEmpty) return
| val dstCast = dst.asInstanceOf[Array[${propertyType}]]
| val seq = newNodes.head.storedRef.get.seq()
| var offset = offsets(seq)
| var idx = 0
| while(idx < newNodes.length){
| val nn = newNodes(idx)
| nn match {
| case generated: New${nodeType} =>
| dstCast(offset) = ${
if (isNode)
s"generated.$nameCamelCase match {case newV:flatgraph.DNode => newV.storedRef.get; case oldV: flatgraph.GNode => oldV; case null => null}"
else s"generated.${nameCamelCase}"
}
| offset += 1
| case _ =>
| }
| assert(seq + idx == nn.storedRef.get.seq(), "internal consistency check")
| idx += 1
| offsets(idx + seq) = offset
| }
| }
|}""".stripMargin
}
def forOptionalItem(nameCamelCase: String, nodeType: String, propertyType: String, isNode: Boolean): String = {
s"""object NewNodeInserter_${nodeType}_${nameCamelCase} extends flatgraph.NewNodePropertyInsertionHelper {
| override def insertNewNodeProperties(newNodes: mutable.ArrayBuffer[flatgraph.DNode], dst: AnyRef, offsets: Array[Int]): Unit = {
| if(newNodes.isEmpty) return
| val dstCast = dst.asInstanceOf[Array[${propertyType}]]
| val seq = newNodes.head.storedRef.get.seq()
| var offset = offsets(seq)
| var idx = 0
| while(idx < newNodes.length){
| val nn = newNodes(idx)
| nn match {
| case generated: New${nodeType} =>
| generated.${nameCamelCase} match {
| case Some(item) =>
| dstCast(offset) = ${
if (isNode) s"item match {case newV:flatgraph.DNode => newV.storedRef.get; case oldV: flatgraph.GNode => oldV; case null => null}"
else "item"
}
| offset += 1
| case _ =>
| }
| case _ =>
| }
| assert(seq + idx == nn.storedRef.get.seq(), "internal consistency check")
| idx += 1
| offsets(idx + seq) = offset
| }
| }
|}""".stripMargin
}

def forMultiItem(nameCamelCase: String, nodeType: String, propertyType: String, isNode: Boolean): String = {
s"""object NewNodeInserter_${nodeType}_${nameCamelCase} extends flatgraph.NewNodePropertyInsertionHelper {
| override def insertNewNodeProperties(newNodes: mutable.ArrayBuffer[flatgraph.DNode], dst: AnyRef, offsets: Array[Int]): Unit = {
| if(newNodes.isEmpty) return
| val dstCast = dst.asInstanceOf[Array[${propertyType}]]
| val seq = newNodes.head.storedRef.get.seq()
| var offset = offsets(seq)
| var idx = 0
| while(idx < newNodes.length){
| val nn = newNodes(idx)
| nn match {
| case generated: New${nodeType} =>
| for(item <- generated.${nameCamelCase}){
| dstCast(offset) = ${
if (isNode) s"item match {case newV:flatgraph.DNode => newV.storedRef.get; case oldV: flatgraph.GNode => oldV; case null => null}"
else "item"
}
| offset += 1
| }
| case _ =>
| }
| assert(seq + idx == nn.storedRef.get.seq(), "internal consistency check")
| idx += 1
| offsets(idx + seq) = offset
| }
| }
|}""".stripMargin
}

}

object FilterSteps {

def forSingleString(nameCamelCase: String, baseType: String, propertyId: Int) = {
Expand Down
Loading
Loading