diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java index b7f0f5c074d74..9d09ded8023b3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java @@ -27,6 +27,8 @@ import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; +import javax.annotation.Nullable; + import java.util.Collection; import java.util.Collections; @@ -39,8 +41,11 @@ @PublicEvolving public class GlobalWindows extends WindowAssigner { private static final long serialVersionUID = 1L; + @Nullable private final Trigger defaultTrigger; - private GlobalWindows() {} + private GlobalWindows(Trigger defaultTrigger) { + this.defaultTrigger = defaultTrigger; + } @Override public Collection assignWindows( @@ -56,22 +61,29 @@ public Trigger getDefaultTrigger(StreamExecutionEnvironmen @Override public Trigger getDefaultTrigger() { - return new NeverTrigger(); + return defaultTrigger == null ? new NeverTrigger() : defaultTrigger; } @Override public String toString() { - return "GlobalWindows()"; + return "GlobalWindows(trigger=" + getDefaultTrigger().getClass().getSimpleName() + ")"; } /** - * Creates a new {@code GlobalWindows} {@link WindowAssigner} that assigns all elements to the - * same {@link GlobalWindow}. - * - * @return The global window policy. + * Creates a {@link WindowAssigner} that assigns all elements to the same {@link GlobalWindow}. + * The window is only useful if you also specify a custom trigger. Otherwise, the window will + * never be triggered and no computation will be performed. */ public static GlobalWindows create() { - return new GlobalWindows(); + return new GlobalWindows(new NeverTrigger()); + } + + /** + * Creates a {@link WindowAssigner} that assigns all elements to the same {@link GlobalWindow} + * and the window is triggered if and only if the input stream is ended. + */ + public static GlobalWindows createWithEndOfStreamTrigger() { + return new GlobalWindows(new EndOfStreamTrigger()); } /** A trigger that never fires, as default Trigger for GlobalWindows. */ @@ -107,6 +119,35 @@ public TypeSerializer getWindowSerializer(ExecutionConfig executio return new GlobalWindow.Serializer(); } + /** A trigger that fires iff the input stream reaches EndOfStream. */ + @Internal + public static class EndOfStreamTrigger extends Trigger { + private static final long serialVersionUID = 1L; + + @Override + public TriggerResult onElement( + Object element, long timestamp, GlobalWindow window, TriggerContext ctx) { + ctx.registerEventTimeTimer(window.maxTimestamp()); + return TriggerResult.CONTINUE; + } + + @Override + public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) { + return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE; + } + + @Override + public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) { + return TriggerResult.CONTINUE; + } + + @Override + public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {} + + @Override + public void onMerge(GlobalWindow window, OnMergeContext ctx) {} + } + @Override public boolean isEventTime() { return false; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java index d543eb84980c2..ca00ab9bdbce3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java @@ -58,5 +58,8 @@ public void testProperties() { assertEquals( new GlobalWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig())); assertThat(assigner.getDefaultTrigger(), instanceOf(GlobalWindows.NeverTrigger.class)); + assigner = GlobalWindows.createWithEndOfStreamTrigger(); + assertThat( + assigner.getDefaultTrigger(), instanceOf(GlobalWindows.EndOfStreamTrigger.class)); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index dd9710b39df79..0cfe877b72b02 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.operators.windowing; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.serialization.SerializerConfigImpl; @@ -1334,6 +1335,72 @@ public void testCountTrigger() throws Exception { testHarness.close(); } + @Test + public void testEndOfStreamTrigger() throws Exception { + ReducingStateDescriptor> stateDesc = + new ReducingStateDescriptor<>( + "window-contents", + new SumReducer(), + STRING_INT_TUPLE.createSerializer(new ExecutionConfig())); + + WindowOperator< + String, + Tuple2, + Tuple2, + Tuple2, + GlobalWindow> + operator = + new WindowOperator<>( + GlobalWindows.createWithEndOfStreamTrigger(), + new GlobalWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer( + new ExecutionConfig()), + stateDesc, + new InternalSingleValueWindowFunction<>( + new PassThroughWindowFunction< + String, GlobalWindow, Tuple2>()), + GlobalWindows.createWithEndOfStreamTrigger().getDefaultTrigger(), + 0, + null /* late data output tag */); + + OneInputStreamOperatorTestHarness, Tuple2> + testHarness = createTestHarness(operator); + + testHarness.open(); + + // add elements out-of-order + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); + + TestHarnessUtil.assertOutputEqualsSorted( + "Output was not correct.", + Collections.EMPTY_LIST, + testHarness.getOutput(), + new Tuple2ResultSortComparator()); + + testHarness.processWatermark(Watermark.MAX_WATERMARK); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), Long.MAX_VALUE)); + expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE)); + expectedOutput.add(Watermark.MAX_WATERMARK); + + TestHarnessUtil.assertOutputEqualsSorted( + "Output was not correct.", + expectedOutput, + testHarness.getOutput(), + new Tuple2ResultSortComparator()); + + testHarness.close(); + } + @Test public void testProcessingTimeTumblingWindows() throws Throwable { final int windowSize = 3;