From 2bafd80b2de8da06bf547e8f4838f44d55db5d08 Mon Sep 17 00:00:00 2001 From: rbasralian Date: Fri, 15 Sep 2023 19:40:57 -0400 Subject: [PATCH] Fix for inconsistent tests (#4483) * Fix for inconsistent test * Extra logging in the third suspicious test * Look up row keys properly instead of using `.getColumnSource().get(0)`. Include `function_generated_table` in `__all__`. * Remove redundant utility function --- ...=> TestFunctionGeneratedTableFactory.java} | 47 +--------- .../TestKeyedArrayBackedMutableTable.java | 41 ++++----- py/server/deephaven/__init__.py | 4 +- .../tests/test_function_generated_table.py | 88 +++++++++++-------- 4 files changed, 74 insertions(+), 106 deletions(-) rename engine/table/src/test/java/io/deephaven/engine/table/impl/util/{TestFunctionBackedTableFactory.java => TestFunctionGeneratedTableFactory.java} (65%) diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestFunctionBackedTableFactory.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestFunctionGeneratedTableFactory.java similarity index 65% rename from engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestFunctionBackedTableFactory.java rename to engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestFunctionGeneratedTableFactory.java index d4f5814127b..809180c7d45 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestFunctionBackedTableFactory.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestFunctionGeneratedTableFactory.java @@ -3,7 +3,6 @@ */ package io.deephaven.engine.table.impl.util; -import io.deephaven.UncheckedDeephavenException; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.Table; @@ -11,7 +10,6 @@ import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.table.impl.QueryTableTest; import io.deephaven.engine.testutil.ColumnInfo; -import io.deephaven.engine.testutil.ControlledUpdateGraph; import io.deephaven.engine.testutil.EvalNugget; import io.deephaven.engine.testutil.EvalNuggetInterface; import io.deephaven.engine.testutil.generator.IntGenerator; @@ -19,17 +17,14 @@ import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase; import io.deephaven.engine.util.TableDiff; import io.deephaven.qst.type.Type; -import io.deephaven.util.function.ThrowingRunnable; -import java.io.IOException; -import java.util.Arrays; import java.util.Random; -import java.util.concurrent.CountDownLatch; +import static io.deephaven.engine.table.impl.util.TestKeyedArrayBackedMutableTable.handleDelayedRefresh; import static io.deephaven.engine.testutil.TstUtils.*; import static io.deephaven.engine.util.TableTools.*; -public class TestFunctionBackedTableFactory extends RefreshingTableTestCase { +public class TestFunctionGeneratedTableFactory extends RefreshingTableTestCase { public void testIterative() { Random random = new Random(0); ColumnInfo[] columnInfo; @@ -98,42 +93,4 @@ public void testMultipleSources() throws Exception { stringCol("StringCol", "MyString"), intCol("IntCol", 12345)), functionBacked); } - - /** - * See {@link io.deephaven.engine.table.impl.util.TestKeyedArrayBackedMutableTable#handleDelayedRefresh} - */ - public static void handleDelayedRefresh(final ThrowingRunnable action, - final BaseArrayBackedMutableTable... tables) throws Exception { - final Thread refreshThread; - final CountDownLatch gate = new CountDownLatch(tables.length); - - Arrays.stream(tables).forEach(t -> t.setOnPendingChange(gate::countDown)); - try { - final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); - refreshThread = new Thread(() -> { - // If this unexpected interruption happens, the test thread may hang in action.run() - // indefinitely. Best to hope it's already queued the pending action and proceed with run. - updateGraph.runWithinUnitTestCycle(() -> { - try { - gate.await(); - } catch (InterruptedException ignored) { - // If this unexpected interruption happens, the test thread may hang in action.run() - // indefinitely. Best to hope it's already queued the pending action and proceed with run. - } - Arrays.stream(tables).forEach(BaseArrayBackedMutableTable::run); - }); - }); - - refreshThread.start(); - action.run(); - } finally { - Arrays.stream(tables).forEach(t -> t.setOnPendingChange(null)); - } - try { - refreshThread.join(); - } catch (InterruptedException e) { - throw new UncheckedDeephavenException( - "Interrupted unexpectedly while waiting for run cycle to complete", e); - } - } } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestKeyedArrayBackedMutableTable.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestKeyedArrayBackedMutableTable.java index 14fcc5a3c69..a211071cbe5 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestKeyedArrayBackedMutableTable.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/util/TestKeyedArrayBackedMutableTable.java @@ -8,13 +8,13 @@ import io.deephaven.datastructures.util.CollectionUtil; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.impl.FailureListener; +import io.deephaven.engine.table.impl.TableUpdateValidator; import io.deephaven.engine.testutil.ControlledUpdateGraph; +import io.deephaven.engine.testutil.junit4.EngineCleanup; import io.deephaven.engine.util.TableTools; import io.deephaven.engine.util.config.InputTableStatusListener; import io.deephaven.engine.util.config.MutableInputTable; -import io.deephaven.engine.table.impl.FailureListener; -import io.deephaven.engine.table.impl.TableUpdateValidator; -import io.deephaven.engine.testutil.junit4.EngineCleanup; import io.deephaven.util.function.ThrowingRunnable; import junit.framework.TestCase; import org.jetbrains.annotations.NotNull; @@ -23,12 +23,13 @@ import org.junit.Test; import java.io.IOException; +import java.util.Arrays; import java.util.Map; import java.util.concurrent.CountDownLatch; +import static io.deephaven.engine.testutil.TstUtils.assertTableEquals; import static io.deephaven.engine.util.TableTools.showWithRowSet; import static io.deephaven.engine.util.TableTools.stringCol; -import static io.deephaven.engine.testutil.TstUtils.assertTableEquals; public class TestKeyedArrayBackedMutableTable { @@ -53,23 +54,23 @@ public void testSimple() throws Exception { final Table input2 = TableTools.newTable(stringCol("Name", "Randy"), stringCol("Employer", "USGS")); - handleDelayedRefresh(kabut, () -> mutableInputTable.add(input2)); + handleDelayedRefresh(() -> mutableInputTable.add(input2), kabut); assertTableEquals(TableTools.merge(input, input2), kabut); final Table input3 = TableTools.newTable(stringCol("Name", "Randy"), stringCol("Employer", "Tegridy")); - handleDelayedRefresh(kabut, () -> mutableInputTable.add(input3)); + handleDelayedRefresh(() -> mutableInputTable.add(input3), kabut); assertTableEquals(TableTools.merge(input, input3), kabut); final Table input4 = TableTools.newTable(stringCol("Name", "George"), stringCol("Employer", "Cogswell")); - handleDelayedRefresh(kabut, () -> mutableInputTable.add(input4)); + handleDelayedRefresh(() -> mutableInputTable.add(input4), kabut); showWithRowSet(kabut); assertTableEquals(TableTools.merge(input, input3, input4).lastBy("Name"), kabut); final Table input5 = TableTools.newTable(stringCol("Name", "George"), stringCol("Employer", "Spacely Sprockets")); - handleDelayedRefresh(kabut, () -> mutableInputTable.add(input5)); + handleDelayedRefresh(() -> mutableInputTable.add(input5), kabut); showWithRowSet(kabut); assertTableEquals(TableTools.merge(input, input3, input4, input5).lastBy("Name"), kabut); @@ -77,7 +78,7 @@ public void testSimple() throws Exception { final long sizeBeforeDelete = kabut.size(); System.out.println("KABUT.rowSet before delete: " + kabut.getRowSet()); final Table delete1 = TableTools.newTable(stringCol("Name", "Earl")); - handleDelayedRefresh(kabut, () -> mutableInputTable.delete(delete1)); + handleDelayedRefresh(() -> mutableInputTable.delete(delete1), kabut); System.out.println("KABUT.rowSet after delete: " + kabut.getRowSet()); final long sizeAfterDelete = kabut.size(); TestCase.assertEquals(sizeBeforeDelete - 1, sizeAfterDelete); @@ -113,7 +114,7 @@ public void testAppendOnly() throws Exception { final Table input2 = TableTools.newTable(stringCol("Name", "Randy", "George"), stringCol("Employer", "USGS", "Cogswell")); - handleDelayedRefresh(aoabmt, () -> mutableInputTable.add(input2)); + handleDelayedRefresh(() -> mutableInputTable.add(input2), aoabmt); assertTableEquals(TableTools.merge(input, input2), aoabmt); } @@ -137,7 +138,7 @@ public void testFilteredAndSorted() throws Exception { final Table delete = TableTools.newTable(stringCol("Name", "Fred")); - handleDelayedRefresh(kabut, () -> mutableInputTable.delete(delete)); + handleDelayedRefresh(() -> mutableInputTable.delete(delete), kabut); assertTableEquals(input.where("Name != `Fred`"), kabut); } @@ -203,13 +204,13 @@ public void testAddBack() throws Exception { final Table input2 = TableTools.newTable(stringCol("Name", "George"), stringCol("Employer", "Spacely Sprockets")); - handleDelayedRefresh(kabut, () -> mutableInputTable.add(input2)); + handleDelayedRefresh(() -> mutableInputTable.add(input2), kabut); assertTableEquals(input2, kabut); - handleDelayedRefresh(kabut, () -> mutableInputTable.delete(input2.view("Name"))); + handleDelayedRefresh(() -> mutableInputTable.delete(input2.view("Name")), kabut); assertTableEquals(input, kabut); - handleDelayedRefresh(kabut, () -> mutableInputTable.add(input2)); + handleDelayedRefresh(() -> mutableInputTable.add(input2), kabut); assertTableEquals(input2, kabut); } @@ -295,12 +296,12 @@ private synchronized void assertFailure(@NotNull final Class action) throws Exception { + public static void handleDelayedRefresh(final ThrowingRunnable action, + final BaseArrayBackedMutableTable... tables) throws Exception { final Thread refreshThread; - final CountDownLatch gate = new CountDownLatch(1); + final CountDownLatch gate = new CountDownLatch(tables.length); - table.setOnPendingChange(gate::countDown); + Arrays.stream(tables).forEach(t -> t.setOnPendingChange(gate::countDown)); try { final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); refreshThread = new Thread(() -> { @@ -313,14 +314,14 @@ private void handleDelayedRefresh(final BaseArrayBackedMutableTable table, // If this unexpected interruption happens, the test thread may hang in action.run() // indefinitely. Best to hope it's already queued the pending action and proceed with run. } - table.run(); + Arrays.stream(tables).forEach(BaseArrayBackedMutableTable::run); }); }); refreshThread.start(); action.run(); } finally { - table.setOnPendingChange(null); + Arrays.stream(tables).forEach(t -> t.setOnPendingChange(null)); } try { refreshThread.join(); diff --git a/py/server/deephaven/__init__.py b/py/server/deephaven/__init__.py index a5ce6b412e4..d299242a7a3 100644 --- a/py/server/deephaven/__init__.py +++ b/py/server/deephaven/__init__.py @@ -29,5 +29,5 @@ from .dbc import read_sql __all__ = ["read_csv", "write_csv", "kafka_consumer", "kafka_producer", "empty_table", "time_table", "merge", - "merge_sorted", "new_table", "input_table", "ring_table", "DynamicTableWriter", "TableReplayer", - "garbage_collect", "read_sql", "DHError", "SortDirection"] + "merge_sorted", "new_table", "input_table", "ring_table", "function_generated_table", "DynamicTableWriter", + "TableReplayer", "garbage_collect", "read_sql", "DHError", "SortDirection"] diff --git a/py/server/tests/test_function_generated_table.py b/py/server/tests/test_function_generated_table.py index 82ae96b6bfa..3b82acf45cd 100644 --- a/py/server/tests/test_function_generated_table.py +++ b/py/server/tests/test_function_generated_table.py @@ -1,9 +1,11 @@ -from time import sleep +from datetime import datetime +from typing import Any import deephaven.dtypes as dht from deephaven import empty_table, input_table, new_table, update_graph, function_generated_table from deephaven.column import string_col, int_col from deephaven.execution_context import get_exec_ctx +from deephaven.table import Table from tests.testbase import BaseTestCase @@ -20,17 +22,22 @@ def test_generated_table_timed_refresh(self): def table_generator_function(): return empty_table(1).update("Timestamp = io.deephaven.base.clock.Clock.system().currentTimeMillis()") - result_table = function_generated_table(table_generator_function, refresh_interval_ms=2000) + with update_graph.exclusive_lock(self.test_update_graph): + result_table = function_generated_table(table_generator_function, refresh_interval_ms=2000) + self.assertEqual(result_table.size, 1) + first_row_key = get_row_key(0, result_table) + initial_time = result_table.j_table.getColumnSource("Timestamp").get(first_row_key) - self.assertEqual(result_table.size, 1) - initial_time = result_table.j_table.getColumnSource("Timestamp").get(0) - sleep(5) - later_time = result_table.j_table.getColumnSource("Timestamp").get(0) + if not result_table.await_update(5_000): + raise RuntimeError("Result table did not update within 5 seconds") + + first_row_key = get_row_key(0, result_table) + later_time = result_table.j_table.getColumnSource("Timestamp").get(first_row_key) # Make sure it ticked at least once within 5 seconds. It should have ticked twice, # but leaving a wider margin to ensure the test passes -- as long as it ticks at all # we can be confident it's working. - self.assertGreaterEqual(later_time, initial_time + 2000) + self.assertGreater(later_time, initial_time) def test_generated_table_1trigger(self): col_defs = { @@ -39,26 +46,36 @@ def test_generated_table_1trigger(self): append_only_input_table = input_table(col_defs=col_defs) def table_generator_function(): + print("Running table_generator_function() at time: " + str(datetime.now())) return append_only_input_table.last_by().update('ResultStr = MyStr') result_table = function_generated_table(table_generator_function, source_tables=append_only_input_table) self.assertEqual(result_table.size, 0) + print("Adding row at time: " + str(datetime.now())) append_only_input_table.add(new_table([string_col(name='MyStr', data=['test string'])])) + print("add() returned at time: " + str(datetime.now())) self.wait_ticking_table_update(result_table, row_count=1, timeout=30) - self.assertEqual(result_table.size, 1) + print("result_table has 1 row at time: " + str(datetime.now())) - result_str = result_table.j_table.getColumnSource("ResultStr").get(0) + first_row_key = get_row_key(0, result_table) + result_str = result_table.j_table.getColumnSource("ResultStr").get(first_row_key) self.assertEqual(result_str, 'test string') def test_generated_table_2triggers(self): + # NOTE: This tests that both trigger tables cause the refresh function to run. + # It does not test updating two source tables in the same cycle (that is covered by + # io.deephaven.engine.table.impl.util.TestFunctionGeneratedTableFactory.testMultipleSources). append_only_input_table1 = input_table(col_defs={"MyStr": dht.string}) append_only_input_table2 = input_table(col_defs={"MyInt": dht.int32}) def table_generator_function(): - my_str = append_only_input_table1.last_by().j_table.getColumnSource('MyStr').get(0) - my_int = append_only_input_table2.last_by().j_table.getColumnSource('MyInt').getInt(0) + t1 = append_only_input_table1.last_by() + t2 = append_only_input_table2.last_by() + + my_str = t1.j_table.getColumnSource('MyStr').get(get_row_key(0, t1)) + my_int = t2.j_table.getColumnSource('MyInt').getInt(get_row_key(0, t2)) return new_table([ string_col('ResultStr', [my_str]), @@ -66,42 +83,35 @@ def table_generator_function(): ]) result_table = function_generated_table(table_generator_function, - source_tables=[append_only_input_table1, append_only_input_table2]) + source_tables=[append_only_input_table1, append_only_input_table2]) self.assertEqual(result_table.size, 1) - result_str = result_table.j_table.getColumnSource("ResultStr").get(0) - result_int = result_table.j_table.getColumnSource("ResultInt").get(0) + first_row_key = get_row_key(0, result_table) + result_str = result_table.j_table.getColumnSource("ResultStr").get(first_row_key) + result_int = result_table.j_table.getColumnSource("ResultInt").get(first_row_key) self.assertEqual(result_str, None) self.assertEqual(result_int, None) - append_only_input_table1.add(new_table([string_col(name='MyStr', data=['test string'])])) - self.wait_ticking_table_update(append_only_input_table1, row_count=1, timeout=30) - + with update_graph.exclusive_lock(self.test_update_graph): + append_only_input_table1.add(new_table([string_col(name='MyStr', data=['test string'])])) - self.assertEqual(result_table.size, 1) - result_str = result_table.j_table.getColumnSource("ResultStr").get(0) - result_int = result_table.j_table.getColumnSource("ResultInt").get(0) - self.assertEqual(result_str, 'test string') - self.assertEqual(result_int, None) + self.assertEqual(result_table.size, 1) + first_row_key = get_row_key(0, result_table) + result_str = result_table.j_table.getColumnSource("ResultStr").get(first_row_key) + result_int = result_table.j_table.getColumnSource("ResultInt").get(first_row_key) + self.assertEqual(result_str, 'test string') + self.assertEqual(result_int, None) - append_only_input_table2.add(new_table([int_col(name='MyInt', data=[12345])])) - self.wait_ticking_table_update(append_only_input_table2, row_count=1, timeout=30) + append_only_input_table2.add(new_table([int_col(name='MyInt', data=[12345])])) - self.assertEqual(result_table.size, 1) - result_str = result_table.j_table.getColumnSource("ResultStr").get(0) - result_int = result_table.j_table.getColumnSource("ResultInt").get(0) - self.assertEqual(result_str, 'test string') - self.assertEqual(result_int, 12345) + self.assertEqual(result_table.size, 1) - with update_graph.exclusive_lock(self.test_update_graph): - append_only_input_table1.add(new_table([string_col(name='MyStr', data=['test string 2'])])) - append_only_input_table2.add(new_table([int_col(name='MyInt', data=[54321])])) + first_row_key = get_row_key(0, result_table) + result_str = result_table.j_table.getColumnSource("ResultStr").get(first_row_key) + result_int = result_table.j_table.getColumnSource("ResultInt").get(first_row_key) + self.assertEqual(result_str, 'test string') + self.assertEqual(result_int, 12345) - self.wait_ticking_table_update(append_only_input_table1, row_count=2, timeout=30) - self.wait_ticking_table_update(append_only_input_table2, row_count=2, timeout=30) - self.assertEqual(result_table.size, 1) - result_str = result_table.j_table.getColumnSource("ResultStr").get(0) - result_int = result_table.j_table.getColumnSource("ResultInt").get(0) - self.assertEqual(result_str, 'test string 2') - self.assertEqual(result_int, 54321) +def get_row_key(row_position: int, t: Table) -> Any: + return t.j_table.getRowSet().get(row_position)