Skip to content

Commit

Permalink
Fix for inconsistent tests (#4483)
Browse files Browse the repository at this point in the history
* Fix for inconsistent test

* Extra logging in the third suspicious test

* Look up row keys properly instead of using `.getColumnSource().get(0)`. Include `function_generated_table` in `__all__`.

* Remove redundant utility function
  • Loading branch information
rbasralian authored Sep 15, 2023
1 parent c8c9676 commit 2bafd80
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,28 @@
*/
package io.deephaven.engine.table.impl.util;

import io.deephaven.UncheckedDeephavenException;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.QueryTableTest;
import io.deephaven.engine.testutil.ColumnInfo;
import io.deephaven.engine.testutil.ControlledUpdateGraph;
import io.deephaven.engine.testutil.EvalNugget;
import io.deephaven.engine.testutil.EvalNuggetInterface;
import io.deephaven.engine.testutil.generator.IntGenerator;
import io.deephaven.engine.testutil.generator.SetGenerator;
import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase;
import io.deephaven.engine.util.TableDiff;
import io.deephaven.qst.type.Type;
import io.deephaven.util.function.ThrowingRunnable;

import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

import static io.deephaven.engine.table.impl.util.TestKeyedArrayBackedMutableTable.handleDelayedRefresh;
import static io.deephaven.engine.testutil.TstUtils.*;
import static io.deephaven.engine.util.TableTools.*;

public class TestFunctionBackedTableFactory extends RefreshingTableTestCase {
public class TestFunctionGeneratedTableFactory extends RefreshingTableTestCase {
public void testIterative() {
Random random = new Random(0);
ColumnInfo<?, ?>[] columnInfo;
Expand Down Expand Up @@ -98,42 +93,4 @@ public void testMultipleSources() throws Exception {
stringCol("StringCol", "MyString"),
intCol("IntCol", 12345)), functionBacked);
}

/**
* See {@link io.deephaven.engine.table.impl.util.TestKeyedArrayBackedMutableTable#handleDelayedRefresh}
*/
public static void handleDelayedRefresh(final ThrowingRunnable<IOException> action,
final BaseArrayBackedMutableTable... tables) throws Exception {
final Thread refreshThread;
final CountDownLatch gate = new CountDownLatch(tables.length);

Arrays.stream(tables).forEach(t -> t.setOnPendingChange(gate::countDown));
try {
final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
refreshThread = new Thread(() -> {
// If this unexpected interruption happens, the test thread may hang in action.run()
// indefinitely. Best to hope it's already queued the pending action and proceed with run.
updateGraph.runWithinUnitTestCycle(() -> {
try {
gate.await();
} catch (InterruptedException ignored) {
// If this unexpected interruption happens, the test thread may hang in action.run()
// indefinitely. Best to hope it's already queued the pending action and proceed with run.
}
Arrays.stream(tables).forEach(BaseArrayBackedMutableTable::run);
});
});

refreshThread.start();
action.run();
} finally {
Arrays.stream(tables).forEach(t -> t.setOnPendingChange(null));
}
try {
refreshThread.join();
} catch (InterruptedException e) {
throw new UncheckedDeephavenException(
"Interrupted unexpectedly while waiting for run cycle to complete", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.FailureListener;
import io.deephaven.engine.table.impl.TableUpdateValidator;
import io.deephaven.engine.testutil.ControlledUpdateGraph;
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import io.deephaven.engine.util.TableTools;
import io.deephaven.engine.util.config.InputTableStatusListener;
import io.deephaven.engine.util.config.MutableInputTable;
import io.deephaven.engine.table.impl.FailureListener;
import io.deephaven.engine.table.impl.TableUpdateValidator;
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import io.deephaven.util.function.ThrowingRunnable;
import junit.framework.TestCase;
import org.jetbrains.annotations.NotNull;
Expand All @@ -23,12 +23,13 @@
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import static io.deephaven.engine.testutil.TstUtils.assertTableEquals;
import static io.deephaven.engine.util.TableTools.showWithRowSet;
import static io.deephaven.engine.util.TableTools.stringCol;
import static io.deephaven.engine.testutil.TstUtils.assertTableEquals;

public class TestKeyedArrayBackedMutableTable {

Expand All @@ -53,31 +54,31 @@ public void testSimple() throws Exception {

final Table input2 = TableTools.newTable(stringCol("Name", "Randy"), stringCol("Employer", "USGS"));

handleDelayedRefresh(kabut, () -> mutableInputTable.add(input2));
handleDelayedRefresh(() -> mutableInputTable.add(input2), kabut);
assertTableEquals(TableTools.merge(input, input2), kabut);

final Table input3 = TableTools.newTable(stringCol("Name", "Randy"), stringCol("Employer", "Tegridy"));
handleDelayedRefresh(kabut, () -> mutableInputTable.add(input3));
handleDelayedRefresh(() -> mutableInputTable.add(input3), kabut);
assertTableEquals(TableTools.merge(input, input3), kabut);


final Table input4 = TableTools.newTable(stringCol("Name", "George"), stringCol("Employer", "Cogswell"));
handleDelayedRefresh(kabut, () -> mutableInputTable.add(input4));
handleDelayedRefresh(() -> mutableInputTable.add(input4), kabut);
showWithRowSet(kabut);

assertTableEquals(TableTools.merge(input, input3, input4).lastBy("Name"), kabut);

final Table input5 =
TableTools.newTable(stringCol("Name", "George"), stringCol("Employer", "Spacely Sprockets"));
handleDelayedRefresh(kabut, () -> mutableInputTable.add(input5));
handleDelayedRefresh(() -> mutableInputTable.add(input5), kabut);
showWithRowSet(kabut);

assertTableEquals(TableTools.merge(input, input3, input4, input5).lastBy("Name"), kabut);

final long sizeBeforeDelete = kabut.size();
System.out.println("KABUT.rowSet before delete: " + kabut.getRowSet());
final Table delete1 = TableTools.newTable(stringCol("Name", "Earl"));
handleDelayedRefresh(kabut, () -> mutableInputTable.delete(delete1));
handleDelayedRefresh(() -> mutableInputTable.delete(delete1), kabut);
System.out.println("KABUT.rowSet after delete: " + kabut.getRowSet());
final long sizeAfterDelete = kabut.size();
TestCase.assertEquals(sizeBeforeDelete - 1, sizeAfterDelete);
Expand Down Expand Up @@ -113,7 +114,7 @@ public void testAppendOnly() throws Exception {
final Table input2 =
TableTools.newTable(stringCol("Name", "Randy", "George"), stringCol("Employer", "USGS", "Cogswell"));

handleDelayedRefresh(aoabmt, () -> mutableInputTable.add(input2));
handleDelayedRefresh(() -> mutableInputTable.add(input2), aoabmt);
assertTableEquals(TableTools.merge(input, input2), aoabmt);
}

Expand All @@ -137,7 +138,7 @@ public void testFilteredAndSorted() throws Exception {

final Table delete = TableTools.newTable(stringCol("Name", "Fred"));

handleDelayedRefresh(kabut, () -> mutableInputTable.delete(delete));
handleDelayedRefresh(() -> mutableInputTable.delete(delete), kabut);
assertTableEquals(input.where("Name != `Fred`"), kabut);
}

Expand Down Expand Up @@ -203,13 +204,13 @@ public void testAddBack() throws Exception {
final Table input2 =
TableTools.newTable(stringCol("Name", "George"), stringCol("Employer", "Spacely Sprockets"));

handleDelayedRefresh(kabut, () -> mutableInputTable.add(input2));
handleDelayedRefresh(() -> mutableInputTable.add(input2), kabut);
assertTableEquals(input2, kabut);

handleDelayedRefresh(kabut, () -> mutableInputTable.delete(input2.view("Name")));
handleDelayedRefresh(() -> mutableInputTable.delete(input2.view("Name")), kabut);
assertTableEquals(input, kabut);

handleDelayedRefresh(kabut, () -> mutableInputTable.add(input2));
handleDelayedRefresh(() -> mutableInputTable.add(input2), kabut);
assertTableEquals(input2, kabut);
}

Expand Down Expand Up @@ -295,12 +296,12 @@ private synchronized void assertFailure(@NotNull final Class<? extends Throwable
}
}

private void handleDelayedRefresh(final BaseArrayBackedMutableTable table,
final ThrowingRunnable<IOException> action) throws Exception {
public static void handleDelayedRefresh(final ThrowingRunnable<IOException> action,
final BaseArrayBackedMutableTable... tables) throws Exception {
final Thread refreshThread;
final CountDownLatch gate = new CountDownLatch(1);
final CountDownLatch gate = new CountDownLatch(tables.length);

table.setOnPendingChange(gate::countDown);
Arrays.stream(tables).forEach(t -> t.setOnPendingChange(gate::countDown));
try {
final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
refreshThread = new Thread(() -> {
Expand All @@ -313,14 +314,14 @@ private void handleDelayedRefresh(final BaseArrayBackedMutableTable table,
// If this unexpected interruption happens, the test thread may hang in action.run()
// indefinitely. Best to hope it's already queued the pending action and proceed with run.
}
table.run();
Arrays.stream(tables).forEach(BaseArrayBackedMutableTable::run);
});
});

refreshThread.start();
action.run();
} finally {
table.setOnPendingChange(null);
Arrays.stream(tables).forEach(t -> t.setOnPendingChange(null));
}
try {
refreshThread.join();
Expand Down
4 changes: 2 additions & 2 deletions py/server/deephaven/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@
from .dbc import read_sql

__all__ = ["read_csv", "write_csv", "kafka_consumer", "kafka_producer", "empty_table", "time_table", "merge",
"merge_sorted", "new_table", "input_table", "ring_table", "DynamicTableWriter", "TableReplayer",
"garbage_collect", "read_sql", "DHError", "SortDirection"]
"merge_sorted", "new_table", "input_table", "ring_table", "function_generated_table", "DynamicTableWriter",
"TableReplayer", "garbage_collect", "read_sql", "DHError", "SortDirection"]
88 changes: 49 additions & 39 deletions py/server/tests/test_function_generated_table.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from time import sleep
from datetime import datetime
from typing import Any

import deephaven.dtypes as dht
from deephaven import empty_table, input_table, new_table, update_graph, function_generated_table
from deephaven.column import string_col, int_col
from deephaven.execution_context import get_exec_ctx
from deephaven.table import Table
from tests.testbase import BaseTestCase


Expand All @@ -20,17 +22,22 @@ def test_generated_table_timed_refresh(self):
def table_generator_function():
return empty_table(1).update("Timestamp = io.deephaven.base.clock.Clock.system().currentTimeMillis()")

result_table = function_generated_table(table_generator_function, refresh_interval_ms=2000)
with update_graph.exclusive_lock(self.test_update_graph):
result_table = function_generated_table(table_generator_function, refresh_interval_ms=2000)
self.assertEqual(result_table.size, 1)
first_row_key = get_row_key(0, result_table)
initial_time = result_table.j_table.getColumnSource("Timestamp").get(first_row_key)

self.assertEqual(result_table.size, 1)
initial_time = result_table.j_table.getColumnSource("Timestamp").get(0)
sleep(5)
later_time = result_table.j_table.getColumnSource("Timestamp").get(0)
if not result_table.await_update(5_000):
raise RuntimeError("Result table did not update within 5 seconds")

first_row_key = get_row_key(0, result_table)
later_time = result_table.j_table.getColumnSource("Timestamp").get(first_row_key)

# Make sure it ticked at least once within 5 seconds. It should have ticked twice,
# but leaving a wider margin to ensure the test passes -- as long as it ticks at all
# we can be confident it's working.
self.assertGreaterEqual(later_time, initial_time + 2000)
self.assertGreater(later_time, initial_time)

def test_generated_table_1trigger(self):
col_defs = {
Expand All @@ -39,69 +46,72 @@ def test_generated_table_1trigger(self):
append_only_input_table = input_table(col_defs=col_defs)

def table_generator_function():
print("Running table_generator_function() at time: " + str(datetime.now()))
return append_only_input_table.last_by().update('ResultStr = MyStr')

result_table = function_generated_table(table_generator_function, source_tables=append_only_input_table)

self.assertEqual(result_table.size, 0)

print("Adding row at time: " + str(datetime.now()))
append_only_input_table.add(new_table([string_col(name='MyStr', data=['test string'])]))
print("add() returned at time: " + str(datetime.now()))
self.wait_ticking_table_update(result_table, row_count=1, timeout=30)
self.assertEqual(result_table.size, 1)
print("result_table has 1 row at time: " + str(datetime.now()))

result_str = result_table.j_table.getColumnSource("ResultStr").get(0)
first_row_key = get_row_key(0, result_table)
result_str = result_table.j_table.getColumnSource("ResultStr").get(first_row_key)
self.assertEqual(result_str, 'test string')

def test_generated_table_2triggers(self):
# NOTE: This tests that both trigger tables cause the refresh function to run.
# It does not test updating two source tables in the same cycle (that is covered by
# io.deephaven.engine.table.impl.util.TestFunctionGeneratedTableFactory.testMultipleSources).
append_only_input_table1 = input_table(col_defs={"MyStr": dht.string})
append_only_input_table2 = input_table(col_defs={"MyInt": dht.int32})

def table_generator_function():
my_str = append_only_input_table1.last_by().j_table.getColumnSource('MyStr').get(0)
my_int = append_only_input_table2.last_by().j_table.getColumnSource('MyInt').getInt(0)
t1 = append_only_input_table1.last_by()
t2 = append_only_input_table2.last_by()

my_str = t1.j_table.getColumnSource('MyStr').get(get_row_key(0, t1))
my_int = t2.j_table.getColumnSource('MyInt').getInt(get_row_key(0, t2))

return new_table([
string_col('ResultStr', [my_str]),
int_col('ResultInt', [my_int]),
])

result_table = function_generated_table(table_generator_function,
source_tables=[append_only_input_table1, append_only_input_table2])
source_tables=[append_only_input_table1, append_only_input_table2])

self.assertEqual(result_table.size, 1)
result_str = result_table.j_table.getColumnSource("ResultStr").get(0)
result_int = result_table.j_table.getColumnSource("ResultInt").get(0)
first_row_key = get_row_key(0, result_table)
result_str = result_table.j_table.getColumnSource("ResultStr").get(first_row_key)
result_int = result_table.j_table.getColumnSource("ResultInt").get(first_row_key)
self.assertEqual(result_str, None)
self.assertEqual(result_int, None)

append_only_input_table1.add(new_table([string_col(name='MyStr', data=['test string'])]))
self.wait_ticking_table_update(append_only_input_table1, row_count=1, timeout=30)

with update_graph.exclusive_lock(self.test_update_graph):
append_only_input_table1.add(new_table([string_col(name='MyStr', data=['test string'])]))

self.assertEqual(result_table.size, 1)
result_str = result_table.j_table.getColumnSource("ResultStr").get(0)
result_int = result_table.j_table.getColumnSource("ResultInt").get(0)
self.assertEqual(result_str, 'test string')
self.assertEqual(result_int, None)
self.assertEqual(result_table.size, 1)
first_row_key = get_row_key(0, result_table)
result_str = result_table.j_table.getColumnSource("ResultStr").get(first_row_key)
result_int = result_table.j_table.getColumnSource("ResultInt").get(first_row_key)
self.assertEqual(result_str, 'test string')
self.assertEqual(result_int, None)

append_only_input_table2.add(new_table([int_col(name='MyInt', data=[12345])]))
self.wait_ticking_table_update(append_only_input_table2, row_count=1, timeout=30)
append_only_input_table2.add(new_table([int_col(name='MyInt', data=[12345])]))

self.assertEqual(result_table.size, 1)
result_str = result_table.j_table.getColumnSource("ResultStr").get(0)
result_int = result_table.j_table.getColumnSource("ResultInt").get(0)
self.assertEqual(result_str, 'test string')
self.assertEqual(result_int, 12345)
self.assertEqual(result_table.size, 1)

with update_graph.exclusive_lock(self.test_update_graph):
append_only_input_table1.add(new_table([string_col(name='MyStr', data=['test string 2'])]))
append_only_input_table2.add(new_table([int_col(name='MyInt', data=[54321])]))
first_row_key = get_row_key(0, result_table)
result_str = result_table.j_table.getColumnSource("ResultStr").get(first_row_key)
result_int = result_table.j_table.getColumnSource("ResultInt").get(first_row_key)
self.assertEqual(result_str, 'test string')
self.assertEqual(result_int, 12345)

self.wait_ticking_table_update(append_only_input_table1, row_count=2, timeout=30)
self.wait_ticking_table_update(append_only_input_table2, row_count=2, timeout=30)

self.assertEqual(result_table.size, 1)
result_str = result_table.j_table.getColumnSource("ResultStr").get(0)
result_int = result_table.j_table.getColumnSource("ResultInt").get(0)
self.assertEqual(result_str, 'test string 2')
self.assertEqual(result_int, 54321)
def get_row_key(row_position: int, t: Table) -> Any:
return t.j_table.getRowSet().get(row_position)

0 comments on commit 2bafd80

Please sign in to comment.