Skip to content

Commit

Permalink
#5651 Move tuple order correction from runtime to network build time
Browse files Browse the repository at this point in the history
  • Loading branch information
mdproctor committed Jan 17, 2024
1 parent 41ac5c9 commit f450d27
Show file tree
Hide file tree
Showing 11 changed files with 281 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import jakarta.xml.bind.annotation.XmlAttribute;
import jakarta.xml.bind.annotation.XmlRootElement;

import org.drools.base.common.RuleBasePartitionId;
import org.drools.base.factmodel.traits.TraitTypeEnum;
import org.drools.base.rule.EntryPointId;
import org.drools.core.WorkingMemoryEntryPoint;
Expand Down Expand Up @@ -369,10 +370,6 @@ public void addLastLeftTuple( TupleImpl leftTuple ) {
linkedTuples.addLastLeftTuple( leftTuple );
}

public void addTupleInPosition( TupleImpl tuple ) {
linkedTuples.addTupleInPosition( tuple );
}

public void removeLeftTuple( TupleImpl leftTuple ) {
linkedTuples.removeLeftTuple( leftTuple );
}
Expand Down Expand Up @@ -510,48 +507,6 @@ public void addLastLeftTuple( TupleImpl leftTuple) {
lastLeftTuple = leftTuple;
}

@Override
public void addTupleInPosition( TupleImpl tuple ) {
boolean left = tuple.isLeftTuple();
ObjectTypeNodeId otnId = tuple.getInputOtnId();
if (otnId == null) { // can happen only in tests // @FIXME fix this
addLastTuple( tuple, left );
return;
}

TupleImpl previous = left ? lastLeftTuple : lastRightTuple;
if ( previous == null ) {
// no other LeftTuples, just add.
tuple.setHandlePrevious( null );
tuple.setHandleNext( null );
setFirstTuple( tuple, left );
setLastTuple( tuple, left );
return;
} else if (previous.getSink() == null || !otnId.before(previous.getInputOtnId()) ) {
// the last LeftTuple comes before the new one so just add it at the end
tuple.setHandlePrevious( previous );
tuple.setHandleNext( null );
previous.setHandleNext( tuple );
setLastTuple( tuple, left );
return;
}

TupleImpl next = previous;
previous = previous.getHandlePrevious();
while (previous != null && otnId.before( previous.getInputOtnId()) ) {
next = previous;
previous = previous.getHandlePrevious();
}
tuple.setHandleNext( next );
next.setHandlePrevious( tuple );
tuple.setHandlePrevious( previous );
if ( previous != null ) {
previous.setHandleNext( tuple );
} else {
setFirstTuple( tuple, left );
}
}

private void addLastTuple(TupleImpl tuple, boolean left) {
if (left) {
addLastLeftTuple(tuple);
Expand Down Expand Up @@ -715,6 +670,82 @@ public TupleImpl getFirstRightTuple(int partition) {
TupleImpl getFirstRightTuple() {
return firstRightTuple;
}

public TupleImpl detachLeftTupleAfter(RuleBasePartitionId partitionId, ObjectTypeNodeId otnId) {
TupleImpl tuple = lastLeftTuple;
TupleImpl detached = null;
// Find the first Tuple that comes after the current ID, so it can be detached.
while (tuple != null && otnId.before(tuple.getInputOtnId())) {
detached = tuple;
tuple = tuple.getHandlePrevious();
}

if (detached != null) {
if (firstLeftTuple == detached) {
firstLeftTuple = null;
}

if (lastLeftTuple == detached) {
lastLeftTuple = null;
}

if (detached.getHandlePrevious() != null) {
lastLeftTuple = detached.getHandlePrevious();
detached.setHandlePrevious(null);
lastLeftTuple.setHandleNext(null);
}
}

return detached;
}

public TupleImpl detachRightTupleAfter(RuleBasePartitionId partitionId, ObjectTypeNodeId otnId) {
TupleImpl tuple = lastRightTuple;
TupleImpl detached = null;
// Find the first Tuple that comes after the current ID, so it can be detached.
while (tuple != null && otnId.before(tuple.getInputOtnId())) {
detached = tuple;
tuple = tuple.getHandlePrevious();
}

if (detached != null) {
if (firstRightTuple == detached) {
firstRightTuple = null;
}

if (lastRightTuple == detached) {
lastRightTuple = null;
}

if (detached.getHandlePrevious() != null) {
lastRightTuple = detached.getHandlePrevious();
detached.setHandlePrevious(null);
lastRightTuple.setHandleNext(null);
}
}

return detached;
}

public void reattachToLeft(TupleImpl tuple) {
if (lastLeftTuple == null) {
lastLeftTuple = tuple;
} else {
lastLeftTuple.setHandleNext(tuple);
tuple.setHandlePrevious(lastLeftTuple);
lastLeftTuple = tuple;
}
}

public void reattachToRight(TupleImpl tuple) {
if (lastRightTuple == null) {
lastRightTuple = tuple;
} else {
lastRightTuple.setHandleNext(tuple);
tuple.setHandlePrevious(lastRightTuple);
lastRightTuple = tuple;
}
}
}

public static class CompositeLinkedTuples implements LinkedTuples {
Expand Down Expand Up @@ -780,11 +811,6 @@ public void addLastLeftTuple( TupleImpl leftTuple ) {
getOrCreatePartitionedTuple(leftTuple).addLastLeftTuple( leftTuple );
}

@Override
public void addTupleInPosition( TupleImpl tuple ) {
getOrCreatePartitionedTuple(tuple).addTupleInPosition( tuple );
}

@Override
public void removeLeftTuple( TupleImpl leftTuple ) {
getPartitionedTuple(leftTuple).removeLeftTuple( leftTuple );
Expand All @@ -807,6 +833,26 @@ public void removeRightTuple( TupleImpl rightTuple ) {
}
}

@Override
public TupleImpl detachLeftTupleAfter(RuleBasePartitionId partitionId, ObjectTypeNodeId otnId) {
return getOrCreatePartitionedTuple(partitionId.getId()).detachLeftTupleAfter(partitionId, otnId);
}

@Override
public TupleImpl detachRightTupleAfter(RuleBasePartitionId partitionId, ObjectTypeNodeId otnId) {
return getOrCreatePartitionedTuple(partitionId.getId()).detachRightTupleAfter(partitionId, otnId);
}

@Override
public void reattachToLeft(TupleImpl tuple) {
getOrCreatePartitionedTuple(tuple).reattachToLeft(tuple);
}

@Override
public void reattachToRight(TupleImpl tuple) {
getOrCreatePartitionedTuple(tuple).reattachToRight(tuple);
}

@Override
public void clearLeftTuples() {
for (int i = 0; i < partitionedTuples.length; i++) {
Expand Down Expand Up @@ -900,11 +946,6 @@ public void addLastLeftTuple(TupleImpl leftTuple) {
throw new UnsupportedOperationException();
}

@Override
public void addTupleInPosition(TupleImpl tuple) {
throw new UnsupportedOperationException();
}

@Override
public void removeLeftTuple(TupleImpl leftTuple) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,6 @@ public void addLastRightTuple(TupleImpl rightTuple) {
throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_ERROR_MESSAGE);
}

public void addTupleInPosition(TupleImpl rightTuple) {
throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_ERROR_MESSAGE);
}

public void removeRightTuple(TupleImpl rightTuple) {
throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_ERROR_MESSAGE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.drools.base.factmodel.traits.TraitTypeEnum;
import org.drools.base.rule.EntryPointId;
import org.drools.core.WorkingMemoryEntryPoint;
import org.drools.core.reteoo.ObjectTypeNodeId;
import org.drools.core.reteoo.TupleImpl;
import org.kie.api.runtime.rule.FactHandle;

Expand Down Expand Up @@ -112,8 +113,6 @@ default String getEntryPointName() {

void removeRightTuple( TupleImpl rightTuple);

void addTupleInPosition( TupleImpl tuple);

boolean isNegated();
void setNegated(boolean negated);

Expand Down Expand Up @@ -142,9 +141,7 @@ interface LinkedTuples extends Serializable {

void addFirstLeftTuple( TupleImpl leftTuple);
void addLastLeftTuple( TupleImpl leftTuple);

void addTupleInPosition( TupleImpl tuple);


void removeLeftTuple( TupleImpl leftTuple);

void addFirstRightTuple( TupleImpl rightTuple);
Expand All @@ -171,6 +168,22 @@ default TupleImpl getFirstLeftTuple(RuleBasePartitionId partitionId) {
default TupleImpl getFirstRightTuple(RuleBasePartitionId partitionId) {
return getFirstRightTuple( partitionId.getParallelEvaluationSlot() );
}

default TupleImpl detachLeftTupleAfter(RuleBasePartitionId partitionId, ObjectTypeNodeId otnId) {
throw new UnsupportedOperationException();
}

default TupleImpl detachRightTupleAfter(RuleBasePartitionId partitionId, ObjectTypeNodeId otnId) {
throw new UnsupportedOperationException();
}

default void reattachToLeft(TupleImpl tuple) {
throw new UnsupportedOperationException();
}

default void reattachToRight(TupleImpl tuple) {
throw new UnsupportedOperationException();
}
}

static InternalFactHandle dummyFactHandleOf(Object object) {
Expand Down Expand Up @@ -345,11 +358,6 @@ public void removeRightTuple( TupleImpl rightTuple) {
throw new UnsupportedOperationException();
}

@Override
public void addTupleInPosition( TupleImpl tuple) {
throw new UnsupportedOperationException();
}

@Override
public boolean isNegated() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,6 @@ public void addLastRightTuple( TupleImpl rightTuple ) {
throw new UnsupportedOperationException( "QueryElementFactHandle does not support this method" );
}

public void addTupleInPosition( TupleImpl rightTuple ) {
throw new UnsupportedOperationException( "QueryElementFactHandle does not support this method" );
}

public void removeRightTuple( TupleImpl rightTuple ) {
throw new UnsupportedOperationException( "QueryElementFactHandle does not support this method" );
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.drools.core.phreak;

import org.drools.core.common.DefaultFactHandle;
import org.drools.core.reteoo.TupleImpl;

public class DetachedTuple {
private DefaultFactHandle fh;
private TupleImpl tuple;

public DetachedTuple(DefaultFactHandle fh, TupleImpl tuple) {
this.fh = fh;
this.tuple = tuple;
}

public void reattachToLeft() {
fh.getLinkedTuples().reattachToLeft(tuple);
}

public void reattachToRight() {
fh.getLinkedTuples().reattachToRight(tuple);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@
import org.drools.core.reteoo.AlphaTerminalNode;
import org.drools.core.reteoo.BetaMemory;
import org.drools.core.reteoo.BetaNode;
import org.drools.core.reteoo.BetaNode.RightTupleSinkAdapter;
import org.drools.core.reteoo.FromNode.FromMemory;
import org.drools.core.reteoo.LeftInputAdapterNode;
import org.drools.core.reteoo.LeftInputAdapterNode.RightTupleSinkAdapter;
import org.drools.core.reteoo.LeftInputAdapterNode.LeftTupleSinkAdapter;
import org.drools.core.reteoo.LeftTuple;
import org.drools.core.reteoo.LeftTupleNode;
import org.drools.core.reteoo.LeftTupleSinkNode;
Expand Down Expand Up @@ -268,11 +269,26 @@ public static void insertLiaFacts(LeftTupleNode startNode, InternalWorkingMemory
final PropagationContext pctx = pctxFactory.createPropagationContext(wm.getNextPropagationIdCounter(), PropagationContext.Type.RULE_ADDITION, null, null, null);
LeftInputAdapterNode lian = (LeftInputAdapterNode) startNode;
if (allBranches && visited.add(lian.getId()) || lian.getAssociatedTerminalsSize() == 1 ) {
RightTupleSinkAdapter liaAdapter = new RightTupleSinkAdapter(lian);
lian.getObjectSource().updateSink(liaAdapter, pctx, wm);
attachAdapterAndPropagate(wm, lian, pctx);
}
}

public static void attachAdapterAndPropagate(InternalWorkingMemory wm, LeftInputAdapterNode lian, PropagationContext pctx) {
List<DetachedTuple> detachedTuples = new ArrayList<>();
LeftTupleSinkAdapter liaAdapter = new LeftTupleSinkAdapter(lian, detachedTuples);
lian.getObjectSource().updateSink(liaAdapter, pctx, wm);
detachedTuples.forEach(d -> d.reattachToLeft());
}

public static void attachAdapterAndPropagate(InternalWorkingMemory wm, BetaNode bn) {
PropagationContextFactory pctxFactory = RuntimeComponentFactory.get().getPropagationContextFactory();
final PropagationContext pctx = pctxFactory.createPropagationContext(wm.getNextPropagationIdCounter(), PropagationContext.Type.RULE_ADDITION, null, null, null);
List<DetachedTuple> detachedTuples = new ArrayList<>();
RightTupleSinkAdapter bnAdapter = new RightTupleSinkAdapter(bn, detachedTuples);
bn.getRightInput().updateSink(bnAdapter, pctx, wm);
detachedTuples.forEach(d -> d.reattachToRight());
}

public static SegmentPrototype processSplit(LeftTupleNode splitNode, InternalRuleBase kbase, Collection<InternalWorkingMemory> wms, Set<SegmentMemoryPair> smemsToNotify) {
LeftTupleNode segmentRoot = BuildtimeSegmentUtilities.findSegmentRoot(splitNode);
SegmentPrototype proto1 = kbase.getSegmentPrototype(segmentRoot);
Expand Down Expand Up @@ -326,9 +342,7 @@ public static void insertFacts(TerminalNode tn, InternalWorkingMemory wm, Set<In
BetaNode bn = (BetaNode) node;

if (!bn.isRightInputIsRiaNode()) {
PropagationContextFactory pctxFactory = RuntimeComponentFactory.get().getPropagationContextFactory();
final PropagationContext pctx = pctxFactory.createPropagationContext(wm.getNextPropagationIdCounter(), PropagationContext.Type.RULE_ADDITION, null, null, null);
bn.getRightInput().updateSink(bn, pctx, wm);
attachAdapterAndPropagate(wm, bn);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.drools.core.reteoo.FromNode;
import org.drools.core.reteoo.FromNode.FromMemory;
import org.drools.core.reteoo.LeftInputAdapterNode;
import org.drools.core.reteoo.LeftInputAdapterNode.RightTupleSinkAdapter;
import org.drools.core.reteoo.LeftTuple;
import org.drools.core.reteoo.LeftTupleNode;
import org.drools.core.reteoo.LeftTupleSink;
Expand Down Expand Up @@ -95,6 +94,7 @@
import static org.drools.core.phreak.BuildtimeSegmentUtilities.isSet;
import static org.drools.core.phreak.BuildtimeSegmentUtilities.nextNodePosMask;
import static org.drools.core.phreak.BuildtimeSegmentUtilities.updateNodeTypesMask;
import static org.drools.core.phreak.EagerPhreakBuilder.Add.attachAdapterAndPropagate;
import static org.drools.core.phreak.EagerPhreakBuilder.deleteLeftTuple;
import static org.drools.core.phreak.RuntimeSegmentUtilities.createRiaSegmentMemory;
import static org.drools.core.phreak.RuntimeSegmentUtilities.getOrCreateSegmentMemory;
Expand Down Expand Up @@ -711,8 +711,7 @@ private static void insertLiaFacts(LeftTupleNode startNode, InternalWorkingMemor
PropagationContextFactory pctxFactory = RuntimeComponentFactory.get().getPropagationContextFactory();
final PropagationContext pctx = pctxFactory.createPropagationContext(wm.getNextPropagationIdCounter(), PropagationContext.Type.RULE_ADDITION, null, null, null);
LeftInputAdapterNode lian = (LeftInputAdapterNode) startNode;
RightTupleSinkAdapter liaAdapter = new RightTupleSinkAdapter(lian);
lian.getObjectSource().updateSink(liaAdapter, pctx, wm);
attachAdapterAndPropagate(wm, lian, pctx);
}

private static void insertFacts(PathEndNodes endNodes, Collection<InternalWorkingMemory> wms) {
Expand All @@ -730,9 +729,7 @@ private static void insertFacts(PathEndNodes endNodes, Collection<InternalWorkin

if (!bn.isRightInputIsRiaNode()) {
for ( InternalWorkingMemory wm : wms ) {
PropagationContextFactory pctxFactory = RuntimeComponentFactory.get().getPropagationContextFactory();
final PropagationContext pctx = pctxFactory.createPropagationContext(wm.getNextPropagationIdCounter(), PropagationContext.Type.RULE_ADDITION, null, null, null);
bn.getRightInput().updateSink(bn, pctx, wm);
attachAdapterAndPropagate(wm, bn);
}
}
}
Expand Down
Loading

0 comments on commit f450d27

Please sign in to comment.