Skip to content

Commit

Permalink
Closes #83
Browse files Browse the repository at this point in the history
  • Loading branch information
sylvainhalle committed Oct 9, 2024
1 parent 3680d09 commit 26d9762
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 4 deletions.
41 changes: 41 additions & 0 deletions Core/src/ca/uqac/lif/cep/CallAfterConnect.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
BeepBeep, an event stream processor
Copyright (C) 2008-2024 Sylvain Hallé
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package ca.uqac.lif.cep;

/**
* An object carrying a {@link Processor} to be connected, and which expects
* a call to its {@link #call()} method after the connection is established.
* Currently the only use of this object is discussed in detail in
* {@link GroupProcessor#out(Processor)}.
* @see GroupProcessor#out(Processor)
* @author Sylvain Hallé
* @since 0.11.4
*/
public interface CallAfterConnect
{
/**
* Gets the processor carried by this object.
* @return The processor
*/
public Processor getProcessor();

/**
* Performs whatever action is needed after the connection is established.
*/
public void call();
}
62 changes: 58 additions & 4 deletions Core/src/ca/uqac/lif/cep/GroupProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -553,16 +553,31 @@ public Processor in(Processor p)
* The single line creates the pipeline, associates P1 as the input of the
* group, P3 as its output, and automatically adds P1, P2 and P3 to the
* group.
* <p>
* The reason why this method returns a {@link CallAfterConnect} object,
* instead of just a processor, comes from the fact that in the context of a
* Groovy script, method {@code out} is called <em>before</em> the processor
* is connected upstream to the rest of the chain (hence in the previous
* example, before P3 is connected to P2). Therefore, trying to
* harvest other processors in the group by working up the chain from
* {@code p}, in the context of this method, would not return anything.
* <p>
* The workaround is therefore to pass this object, which will allow the
* processor to be connected, and then for upstream processors to be
* harvested by {@link #collectProcessors(Processor)} through the call to
* {@link CallAfterConnect#call()}.
*
* @param p The processor to set as the output of the group
* @return That processor
* @return A {@link CallAfterConnect} object, which allows the underlying
* processor to be connected, and <em>then</em> for upstream processors to be
* harvested by {@link #collectProcessors(Processor)}.
* @since 0.11.3
*/
public Processor out(Processor p)
public CallAfterConnect out(Processor p)
{
addProcessor(p);
associateOutput(p);
collectProcessors(p);
return p;
return new OutputCallAfterConnect(p);
}

/**
Expand Down Expand Up @@ -1202,4 +1217,43 @@ public Object getState()
}
return group_state;
}

/**
* A {@link CallAfterConnect} object that can be used to connect the
* underlying processor of a {@link GroupProcessor}, and then to collect all
* processors that are part of the group. Currently the only use of this
* class is to be returned by the {@link #out(Processor)} method. (And in
* turn, the only real use of this method is in Groovy scripts to save
* a few keystrokes when creating a group.)
* @since 0.11.4
*/
protected class OutputCallAfterConnect implements CallAfterConnect
{
/**
* The processor to be connected.
*/
private final Processor m_processor;

/**
* Creates a new {@link OutputCallAfterConnect} object.
* @param p The processor to be connected
*/
public OutputCallAfterConnect(Processor p)
{
super();
m_processor = p;
}

@Override
public Processor getProcessor()
{
return m_processor;
}

@Override
public void call()
{
collectProcessors(m_processor);
}
}
}
18 changes: 18 additions & 0 deletions Core/src/ca/uqac/lif/cep/Processor.java
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,24 @@ public Processor or(Processor p)
Connector.connect(this, p);
return p;
}

/**
* Operates similar to {@link #or(Processor)}, but also calls a method
* after the connection has been established. Currently there is a single use
* for this method, which is when {@link GroupProcessor#or(Processor)} is
* called &mdash;which, again, typically only occurs in the context of a
* Groovy script.
* @param c The object that contains the method to call
* @return The processor encapsulated in {@code c}
* @since 0.11.4
*/
public Processor or(CallAfterConnect c)
{
Processor p = c.getProcessor();
Connector.connect(this, p);
c.call();
return p;
}

/**
* Connects the output at index 0 of the current processor to the input
Expand Down

0 comments on commit 26d9762

Please sign in to comment.