From b073296902270a5ac7046e317483131068456f81 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Mon, 26 Feb 2024 17:56:31 -0600 Subject: [PATCH 01/10] Update gradle checksum to match the documented value (#5198) --- gradle/wrapper/gradle-wrapper.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 39fcd6c5e4e..865f1ba80d1 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionSha256Sum=97a52d145762adc241bad7fd18289bf7f6801e08ece6badf80402fe2b9f250b1 +distributionSha256Sum=85719317abd2112f021d4f41f09ec370534ba288432065f4b477b6a3b652910d distributionUrl=https\://services.gradle.org/distributions/gradle-8.6-all.zip networkTimeout=10000 validateDistributionUrl=true From d741ec3316084ba73f303c4e66129a545e1948cd Mon Sep 17 00:00:00 2001 From: bierus Date: Tue, 27 Feb 2024 04:34:07 +0100 Subject: [PATCH 02/10] Added py.typed marker for deephaven (#5196) * Added py.typed marker for deephaven * Added py.typed to setup.py * Update setup.py * Update py/server/setup.py Co-authored-by: Jianfeng Mao <4297243+jmao-denver@users.noreply.github.com> * Add py.typed marker for Py client/Py embd server --------- Co-authored-by: Jianfeng Mao <4297243+jmao-denver@users.noreply.github.com> Co-authored-by: jianfengmao --- py/client/pydeephaven/py.typed | 0 py/client/setup.py | 3 ++- py/embedded-server/deephaven_server/py.typed | 0 py/embedded-server/setup.py | 2 +- py/server/deephaven/py.typed | 1 + py/server/setup.py | 3 ++- 6 files changed, 6 insertions(+), 3 deletions(-) create mode 100644 py/client/pydeephaven/py.typed create mode 100644 py/embedded-server/deephaven_server/py.typed create mode 100644 py/server/deephaven/py.typed diff --git a/py/client/pydeephaven/py.typed b/py/client/pydeephaven/py.typed new file mode 100644 index 
00000000000..e69de29bb2d diff --git a/py/client/setup.py b/py/client/setup.py index 8b36c44c8a7..c4719d404ad 100644 --- a/py/client/setup.py +++ b/py/client/setup.py @@ -54,5 +54,6 @@ def _compute_version(): install_requires=['pyarrow', 'bitstring', 'grpcio', - 'protobuf'] + 'protobuf'], + package_data={'pydeephaven': ['py.typed']} ) diff --git a/py/embedded-server/deephaven_server/py.typed b/py/embedded-server/deephaven_server/py.typed new file mode 100644 index 00000000000..e69de29bb2d diff --git a/py/embedded-server/setup.py b/py/embedded-server/setup.py index 037357212dc..82db40063ed 100644 --- a/py/embedded-server/setup.py +++ b/py/embedded-server/setup.py @@ -36,7 +36,7 @@ def _compute_version(): long_description=_get_readme(), long_description_content_type='text/markdown', packages=find_namespace_packages(exclude=("tests")), - package_data={'deephaven_server': ['jars/*']}, + package_data={'deephaven_server': ['jars/*', 'py.typed']}, url='https://deephaven.io/', author='Deephaven Data Labs', author_email='python@deephaven.io', diff --git a/py/server/deephaven/py.typed b/py/server/deephaven/py.typed new file mode 100644 index 00000000000..8b137891791 --- /dev/null +++ b/py/server/deephaven/py.typed @@ -0,0 +1 @@ + diff --git a/py/server/setup.py b/py/server/setup.py index d144c670718..31ef430a0d9 100644 --- a/py/server/setup.py +++ b/py/server/setup.py @@ -72,5 +72,6 @@ def _compute_version(): }, entry_points={ 'deephaven.plugin': ['registration_cls = deephaven.pandasplugin:PandasPluginRegistration'] - } + }, + package_data={'deephaven': ['py.typed']} ) From 6f27a68a1b99278d04e85449c26c6791ff7df160 Mon Sep 17 00:00:00 2001 From: Alex Peters <80283343+alexpeters1208@users.noreply.github.com> Date: Tue, 27 Feb 2024 10:55:51 -0600 Subject: [PATCH 03/10] Fix learn dependency issue (#5201) --- py/server/deephaven/learn/__init__.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/py/server/deephaven/learn/__init__.py 
b/py/server/deephaven/learn/__init__.py index 6677ebe371d..e652e49a101 100644 --- a/py/server/deephaven/learn/__init__.py +++ b/py/server/deephaven/learn/__init__.py @@ -153,11 +153,14 @@ def learn(table: Table = None, model_func: Callable = None, inputs: List[Input] result = _create_non_conflicting_col_name(table, "__Result") + # calling __computer.clear() in a separate update ensures calculations are complete before computer is cleared return (table .update(formulas=[ f"{future_offset} = __computer.compute(k)", - f"{result} = {future_offset}.getFuture().get()", - f"{clean} = __computer.clear()", + f"{result} = {future_offset}.getFuture().get()" + ]) + .update(formulas=[ + f"{clean} = __computer.clear()" ]) .drop_columns(cols=[ f"{future_offset}", From f20b29884aac1a6efb6a81b83c08f1220e1a6926 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 27 Feb 2024 21:53:02 +0000 Subject: [PATCH 04/10] Update web version 0.66.0 (#5202) Release notes https://github.com/deephaven/web-client-ui/releases/tag/v0.66.0 # [0.66.0](https://github.com/deephaven/web-client-ui/compare/v0.65.0...v0.66.0) (2024-02-27) ### Bug Fixes * Fixed svg url ([#1839](https://github.com/deephaven/web-client-ui/issues/1839)) ([63fe035](https://github.com/deephaven/web-client-ui/commit/63fe0354df2df40e318aa1738ff2bb916c0aea8e)) * keep active cell selection in first column from going offscreen ([#1823](https://github.com/deephaven/web-client-ui/issues/1823)) ([69e8cdd](https://github.com/deephaven/web-client-ui/commit/69e8cdd1d138c661ed56bbd5e03e31713e8113a4)) * spectrum textfield validation icon position with set content-box ([#1825](https://github.com/deephaven/web-client-ui/issues/1825)) ([8d95212](https://github.com/deephaven/web-client-ui/commit/8d952125009ddc4e4039833be4a80404d82ed7d7)) ### Features * exposes editor-line-number-active-fg theme variable ([#1833](https://github.com/deephaven/web-client-ui/issues/1833)) 
([448f0f0](https://github.com/deephaven/web-client-ui/commit/448f0f0d5bf99be14845e3f6b0e063f55a8de775)) * Lazy loading and code splitting ([#1802](https://github.com/deephaven/web-client-ui/issues/1802)) ([25d1c09](https://github.com/deephaven/web-client-ui/commit/25d1c09b2f55f9f10eff5918501d385554f237e6)) * Picker Component ([#1821](https://github.com/deephaven/web-client-ui/issues/1821)) ([e50f0f6](https://github.com/deephaven/web-client-ui/commit/e50f0f6c0402717f1bb8adb8a08a217a0f8d1f45)) ### BREAKING CHANGES * the duplicate `spectrum-Textfield-validationIcon` css in DHE should be removed Co-authored-by: deephaven-internal --- web/client-ui/Dockerfile | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/web/client-ui/Dockerfile b/web/client-ui/Dockerfile index c7011ac6437..ca9e9096a17 100644 --- a/web/client-ui/Dockerfile +++ b/web/client-ui/Dockerfile @@ -2,10 +2,10 @@ FROM deephaven/node:local-build WORKDIR /usr/src/app # Most of the time, these versions are the same, except in cases where a patch only affects one of the packages -ARG WEB_VERSION=0.65.0 -ARG GRID_VERSION=0.65.0 -ARG CHART_VERSION=0.65.0 -ARG WIDGET_VERSION=0.65.0 +ARG WEB_VERSION=0.66.0 +ARG GRID_VERSION=0.66.0 +ARG CHART_VERSION=0.66.0 +ARG WIDGET_VERSION=0.66.0 # Pull in the published code-studio package from npmjs and extract is RUN set -eux; \ From c2ab633a070cf3f0fd8a116ab5b0a3734441ad6e Mon Sep 17 00:00:00 2001 From: Chip Kent <5250374+chipkent@users.noreply.github.com> Date: Tue, 27 Feb 2024 14:54:53 -0700 Subject: [PATCH 05/10] Provide upperBin and lowerBin signatures that accept Durations (#5148) * Added upperBin and lowerBin methods that accept durations. 
--- .../java/io/deephaven/time/DateTimeUtils.java | 208 ++++++++++++++++-- .../io/deephaven/time/TestDateTimeUtils.java | 43 ++++ 2 files changed, 229 insertions(+), 22 deletions(-) diff --git a/engine/time/src/main/java/io/deephaven/time/DateTimeUtils.java b/engine/time/src/main/java/io/deephaven/time/DateTimeUtils.java index 12a5985a618..b96a3c65746 100644 --- a/engine/time/src/main/java/io/deephaven/time/DateTimeUtils.java +++ b/engine/time/src/main/java/io/deephaven/time/DateTimeUtils.java @@ -3152,8 +3152,8 @@ public static ZonedDateTime atMidnight(@Nullable final ZonedDateTime dateTime) { /** * Returns an {@link Instant} value, which is at the starting (lower) end of a time range defined by the interval - * nanoseconds. For example, a 5*MINUTE intervalNanos value would return the instant value for the start of the - * five-minute window that contains the input instant. + * nanoseconds. For example, a five-minute {@code intervalNanos} value would return the instant value for the start + * of the five-minute window that contains the input instant. * * @param instant instant for which to evaluate the start of the containing window * @param intervalNanos size of the window in nanoseconds @@ -3170,10 +3170,30 @@ public static Instant lowerBin(@Nullable final Instant instant, long intervalNan return epochNanosToInstant(Numeric.lowerBin(epochNanos(instant), intervalNanos)); } + /** + * Returns an {@link Instant} value, which is at the starting (lower) end of a time range defined by the interval + * nanoseconds. For example, a five-minute {@code interval} value would return the instant value for the start of + * the five-minute window that contains the input instant. 
+ * + * @param instant instant for which to evaluate the start of the containing window + * @param interval size of the window + * @return {@code null} if either input is {@code null}; otherwise, an {@link Instant} representing the start of the + * window + */ + @ScriptApi + @Nullable + public static Instant lowerBin(@Nullable final Instant instant, Duration interval) { + if (instant == null || interval == null) { + return null; + } + + return lowerBin(instant, interval.toNanos()); + } + /** * Returns a {@link ZonedDateTime} value, which is at the starting (lower) end of a time range defined by the - * interval nanoseconds. For example, a 5*MINUTE intervalNanos value would return the zoned date time value for the - * start of the five-minute window that contains the input zoned date time. + * interval nanoseconds. For example, a five-minute {@code intervalNanos} value would return the zoned date time + * value for the start of the five-minute window that contains the input zoned date time. * * @param dateTime zoned date time for which to evaluate the start of the containing window * @param intervalNanos size of the window in nanoseconds @@ -3190,16 +3210,36 @@ public static ZonedDateTime lowerBin(@Nullable final ZonedDateTime dateTime, lon return epochNanosToZonedDateTime(Numeric.lowerBin(epochNanos(dateTime), intervalNanos), dateTime.getZone()); } + /** + * Returns a {@link ZonedDateTime} value, which is at the starting (lower) end of a time range defined by the + * interval nanoseconds. For example, a five-minute {@code interval} value would return the zoned date time value + * for the start of the five-minute window that contains the input zoned date time. 
+ * + * @param dateTime zoned date time for which to evaluate the start of the containing window + * @param interval size of the window + * @return {@code null} if either input is {@code null}; otherwise, a {@link ZonedDateTime} representing the start + * of the window + */ + @ScriptApi + @Nullable + public static ZonedDateTime lowerBin(@Nullable final ZonedDateTime dateTime, Duration interval) { + if (dateTime == null || interval == null) { + return null; + } + + return lowerBin(dateTime, interval.toNanos()); + } + /** * Returns an {@link Instant} value, which is at the starting (lower) end of a time range defined by the interval - * nanoseconds. For example, a 5*MINUTE intervalNanos value would return the instant value for the start of the - * five-minute window that contains the input instant. + * nanoseconds. For example, a five-minute {@code intervalNanos} value would return the instant value for the start + * of the five-minute window that contains the input instant. * * @param instant instant for which to evaluate the start of the containing window * @param intervalNanos size of the window in nanoseconds * @param offset The window start offset in nanoseconds. For example, a value of MINUTE would offset all windows by * one minute. - * @return {@code null} if either input is {@code null}; otherwise, an {@link Instant} representing the start of the + * @return {@code null} if any input is {@code null}; otherwise, an {@link Instant} representing the start of the * window */ @ScriptApi @@ -3212,17 +3252,38 @@ public static Instant lowerBin(@Nullable final Instant instant, long intervalNan return epochNanosToInstant(Numeric.lowerBin(epochNanos(instant) - offset, intervalNanos) + offset); } + /** + * Returns an {@link Instant} value, which is at the starting (lower) end of a time range defined by the interval + * nanoseconds. 
For example, a five-minute {@code interval} value would return the instant value for the start of + * the five-minute window that contains the input instant. + * + * @param instant instant for which to evaluate the start of the containing window + * @param interval size of the window + * @param offset The window start offset. For example, a value of 'PT1m' would offset all windows by one minute. + * @return {@code null} if any input is {@code null}; otherwise, an {@link Instant} representing the start of the + * window + */ + @ScriptApi + @Nullable + public static Instant lowerBin(@Nullable final Instant instant, Duration interval, Duration offset) { + if (instant == null || interval == null || offset == null) { + return null; + } + + return lowerBin(instant, interval.toNanos(), offset.toNanos()); + } + /** * Returns a {@link ZonedDateTime} value, which is at the starting (lower) end of a time range defined by the - * interval nanoseconds. For example, a 5*MINUTE intervalNanos value would return the zoned date time value for the - * start of the five-minute window that contains the input zoned date time. + * interval nanoseconds. For example, a five-minute {@code intervalNanos} value would return the zoned date time + * value for the start of the five-minute window that contains the input zoned date time. * * @param dateTime zoned date time for which to evaluate the start of the containing window * @param intervalNanos size of the window in nanoseconds * @param offset The window start offset in nanoseconds. For example, a value of MINUTE would offset all windows by * one minute. 
- * @return {@code null} if either input is {@code null}; otherwise, a {@link ZonedDateTime} representing the start - * of the window + * @return {@code null} if any input is {@code null}; otherwise, a {@link ZonedDateTime} representing the start of + * the window */ @ScriptApi @Nullable @@ -3235,10 +3296,31 @@ public static ZonedDateTime lowerBin(@Nullable final ZonedDateTime dateTime, lon dateTime.getZone()); } + /** + * Returns a {@link ZonedDateTime} value, which is at the starting (lower) end of a time range defined by the + * interval nanoseconds. For example, a five-minute {@code interval} value would return the zoned date + * time value for the start of the five-minute window that contains the input zoned date time. + * + * @param dateTime zoned date time for which to evaluate the start of the containing window + * @param interval size of the window + * @param offset The window start offset. For example, a value of 'PT1m' would offset all windows by one minute. + * @return {@code null} if any input is {@code null}; otherwise, a {@link ZonedDateTime} representing the start of + * the window + */ + @ScriptApi + @Nullable + public static ZonedDateTime lowerBin(@Nullable final ZonedDateTime dateTime, Duration interval, Duration offset) { + if (dateTime == null || interval == null || offset == null) { + return null; + } + + return lowerBin(dateTime, interval.toNanos(), offset.toNanos()); + } + /** * Returns an {@link Instant} value, which is at the ending (upper) end of a time range defined by the interval - * nanoseconds. For example, a 5*MINUTE intervalNanos value would return the instant value for the end of the - * five-minute window that contains the input instant. + * nanoseconds. For example, a five-minute {@code intervalNanos} value would return the instant value for the end of + * the five-minute window that contains the input instant. 
* * @param instant instant for which to evaluate the start of the containing window * @param intervalNanos size of the window in nanoseconds @@ -3255,10 +3337,30 @@ public static Instant upperBin(@Nullable final Instant instant, long intervalNan return epochNanosToInstant(Numeric.upperBin(epochNanos(instant), intervalNanos)); } + /** + * Returns an {@link Instant} value, which is at the ending (upper) end of a time range defined by the interval + * nanoseconds. For example, a five-minute {@code interval} value would return the instant value for the end of the + * five-minute window that contains the input instant. + * + * @param instant instant for which to evaluate the start of the containing window + * @param interval size of the window + * @return {@code null} if either input is {@code null}; otherwise, an {@link Instant} representing the end of the + * window + */ + @ScriptApi + @Nullable + public static Instant upperBin(@Nullable final Instant instant, Duration interval) { + if (instant == null || interval == null) { + return null; + } + + return upperBin(instant, interval.toNanos()); + } + /** * Returns a {@link ZonedDateTime} value, which is at the ending (upper) end of a time range defined by the interval - * nanoseconds. For example, a 5*MINUTE intervalNanos value would return the zoned date time value for the end of - * the five-minute window that contains the input zoned date time. + * nanoseconds. For example, a five-minute {@code intervalNanos} value would return the zoned date time value for + * the end of the five-minute window that contains the input zoned date time. 
* * @param dateTime zoned date time for which to evaluate the start of the containing window * @param intervalNanos size of the window in nanoseconds @@ -3275,16 +3377,36 @@ public static ZonedDateTime upperBin(@Nullable final ZonedDateTime dateTime, lon return epochNanosToZonedDateTime(Numeric.upperBin(epochNanos(dateTime), intervalNanos), dateTime.getZone()); } + /** + * Returns a {@link ZonedDateTime} value, which is at the ending (upper) end of a time range defined by the interval + * nanoseconds. For example, a five-minute {@code interval} value would return the zoned date time value for the end + * of the five-minute window that contains the input zoned date time. + * + * @param dateTime zoned date time for which to evaluate the start of the containing window + * @param interval size of the window + * @return {@code null} if either input is {@code null}; otherwise, a {@link ZonedDateTime} representing the end of + * the window + */ + @ScriptApi + @Nullable + public static ZonedDateTime upperBin(@Nullable final ZonedDateTime dateTime, Duration interval) { + if (dateTime == null || interval == null) { + return null; + } + + return upperBin(dateTime, interval.toNanos()); + } + /** * Returns an {@link Instant} value, which is at the ending (upper) end of a time range defined by the interval - * nanoseconds. For example, a 5*MINUTE intervalNanos value would return the instant value for the end of the - * five-minute window that contains the input instant. + * nanoseconds. For example, a five-minute {@code intervalNanos} value would return the instant value for the end of + * the five-minute window that contains the input instant. * * @param instant instant for which to evaluate the start of the containing window * @param intervalNanos size of the window in nanoseconds * @param offset The window start offset in nanoseconds. For example, a value of MINUTE would offset all windows by * one minute. 
- * @return {@code null} if either input is {@code null}; otherwise, an {@link Instant} representing the end of the + * @return {@code null} if any input is {@code null}; otherwise, an {@link Instant} representing the end of the * window */ @ScriptApi @@ -3298,17 +3420,38 @@ public static Instant upperBin(@Nullable final Instant instant, long intervalNan return epochNanosToInstant(Numeric.upperBin(epochNanos(instant) - offset, intervalNanos) + offset); } + /** + * Returns an {@link Instant} value, which is at the ending (upper) end of a time range defined by the interval + * nanoseconds. For example, a five-minute {@code interval} value would return the instant value for the end of the + * five-minute window that contains the input instant. + * + * @param instant instant for which to evaluate the start of the containing window + * @param interval size of the window + * @param offset The window start offset. For example, a value of 'PT1m' would offset all windows by one minute. + * @return {@code null} if any input is {@code null}; otherwise, an {@link Instant} representing the end of the + * window + */ + @ScriptApi + @Nullable + public static Instant upperBin(@Nullable final Instant instant, Duration interval, Duration offset) { + if (instant == null || interval == null || offset == null) { + return null; + } + + return upperBin(instant, interval.toNanos(), offset.toNanos()); + } + /** * Returns a {@link ZonedDateTime} value, which is at the ending (upper) end of a time range defined by the interval - * nanoseconds. For example, a 5*MINUTE intervalNanos value would return the zoned date time value for the end of - * the five-minute window that contains the input zoned date time. + * nanoseconds. For example, a five-minute {@code intervalNanos} value would return the zoned date time value for + * the end of the five-minute window that contains the input zoned date time. 
* * @param dateTime zoned date time for which to evaluate the start of the containing window * @param intervalNanos size of the window in nanoseconds * @param offset The window start offset in nanoseconds. For example, a value of MINUTE would offset all windows by * one minute. - * @return {@code null} if either input is {@code null}; otherwise, a {@link ZonedDateTime} representing the end of - * the window + * @return {@code null} if any input is {@code null}; otherwise, a {@link ZonedDateTime} representing the end of the + * window */ @ScriptApi @Nullable @@ -3322,6 +3465,27 @@ public static ZonedDateTime upperBin(@Nullable final ZonedDateTime dateTime, lon dateTime.getZone()); } + /** + * Returns a {@link ZonedDateTime} value, which is at the ending (upper) end of a time range defined by the interval + * nanoseconds. For example, a five-minute {@code interval} value would return the zoned date time value for the end + * of the five-minute window that contains the input zoned date time. + * + * @param dateTime zoned date time for which to evaluate the start of the containing window + * @param interval size of the window + * @param offset The window start offset. For example, a value of 'PT1m' would offset all windows by one minute. 
+ * @return {@code null} if any input is {@code null}; otherwise, a {@link ZonedDateTime} representing the end of the + * window + */ + @ScriptApi + @Nullable + public static ZonedDateTime upperBin(@Nullable final ZonedDateTime dateTime, Duration interval, Duration offset) { + if (dateTime == null || interval == null || offset == null) { + return null; + } + + return upperBin(dateTime, interval.toNanos(), offset.toNanos()); + } + // endregion // region Format diff --git a/engine/time/src/test/java/io/deephaven/time/TestDateTimeUtils.java b/engine/time/src/test/java/io/deephaven/time/TestDateTimeUtils.java index 4bbe4dcf693..1e83430e8ac 100644 --- a/engine/time/src/test/java/io/deephaven/time/TestDateTimeUtils.java +++ b/engine/time/src/test/java/io/deephaven/time/TestDateTimeUtils.java @@ -1837,6 +1837,11 @@ public void testLowerBin() { TestCase.assertEquals(DateTimeUtils.lowerBin(instant, second), DateTimeUtils.lowerBin(DateTimeUtils.lowerBin(instant, second), second)); + TestCase.assertEquals(DateTimeUtils.lowerBin(instant, Duration.ofMinutes(1)), + DateTimeUtils.lowerBin(instant, minute)); + TestCase.assertNull(DateTimeUtils.lowerBin((Instant) null, Duration.ofMinutes(1))); + TestCase.assertNull(DateTimeUtils.lowerBin(instant, (Duration) null)); + final ZonedDateTime zdt = DateTimeUtils.toZonedDateTime(instant, TZ_AL); TestCase.assertEquals(DateTimeUtils.toZonedDateTime(DateTimeUtils.lowerBin(instant, second), TZ_AL), @@ -1854,6 +1859,10 @@ public void testLowerBin() { TestCase.assertEquals(DateTimeUtils.lowerBin(zdt, second), DateTimeUtils.lowerBin(DateTimeUtils.lowerBin(zdt, second), second)); + TestCase.assertEquals(DateTimeUtils.lowerBin(zdt, Duration.ofMinutes(1)), + DateTimeUtils.lowerBin(zdt, minute)); + TestCase.assertNull(DateTimeUtils.lowerBin((ZonedDateTime) null, Duration.ofMinutes(1))); + TestCase.assertNull(DateTimeUtils.lowerBin(zdt, (Duration) null)); } public void testLowerBinWithOffset() { @@ -1871,6 +1880,12 @@ public void 
testLowerBinWithOffset() { TestCase.assertEquals(DateTimeUtils.lowerBin(instant, second, second), DateTimeUtils.lowerBin(DateTimeUtils.lowerBin(instant, second, second), second, second)); + TestCase.assertEquals(DateTimeUtils.lowerBin(instant, Duration.ofMinutes(1), Duration.ofSeconds(2)), + DateTimeUtils.lowerBin(instant, minute, 2 * second)); + TestCase.assertNull(DateTimeUtils.lowerBin((Instant) null, Duration.ofMinutes(1), Duration.ofSeconds(2))); + TestCase.assertNull(DateTimeUtils.lowerBin(instant, (Duration) null, Duration.ofSeconds(2))); + TestCase.assertNull(DateTimeUtils.lowerBin(instant, Duration.ofMinutes(1), (Duration) null)); + final ZonedDateTime zdt = DateTimeUtils.toZonedDateTime(instant, TZ_AL); TestCase.assertEquals(DateTimeUtils.parseZonedDateTime("2010-06-15T06:11:00 NY").withZoneSameInstant(TZ_AL), @@ -1881,6 +1896,12 @@ public void testLowerBinWithOffset() { TestCase.assertEquals(DateTimeUtils.lowerBin(zdt, second, second), DateTimeUtils.lowerBin(DateTimeUtils.lowerBin(zdt, second, second), second, second)); + + TestCase.assertEquals(DateTimeUtils.lowerBin(zdt, Duration.ofMinutes(1), Duration.ofSeconds(2)), + DateTimeUtils.lowerBin(zdt, minute, 2 * second)); + TestCase.assertNull(DateTimeUtils.lowerBin((ZonedDateTime) null, Duration.ofMinutes(1), Duration.ofSeconds(2))); + TestCase.assertNull(DateTimeUtils.lowerBin(zdt, (Duration) null, Duration.ofSeconds(2))); + TestCase.assertNull(DateTimeUtils.lowerBin(zdt, Duration.ofMinutes(1), (Duration) null)); } public void testUpperBin() { @@ -1902,6 +1923,11 @@ public void testUpperBin() { TestCase.assertEquals(DateTimeUtils.upperBin(instant, second), DateTimeUtils.upperBin(DateTimeUtils.upperBin(instant, second), second)); + TestCase.assertEquals(DateTimeUtils.upperBin(instant, Duration.ofMinutes(1)), + DateTimeUtils.upperBin(instant, minute)); + TestCase.assertNull(DateTimeUtils.upperBin((Instant) null, Duration.ofMinutes(1))); + TestCase.assertNull(DateTimeUtils.upperBin(instant, (Duration) 
null)); + final ZonedDateTime zdt = DateTimeUtils.toZonedDateTime(instant, TZ_AL); TestCase.assertEquals(DateTimeUtils.parseZonedDateTime("2010-06-15T06:14:02 NY").withZoneSameInstant(TZ_AL), @@ -1915,6 +1941,11 @@ public void testUpperBin() { TestCase.assertEquals(DateTimeUtils.upperBin(zdt, second), DateTimeUtils.upperBin(DateTimeUtils.upperBin(zdt, second), second)); + + TestCase.assertEquals(DateTimeUtils.upperBin(zdt, Duration.ofMinutes(1)), + DateTimeUtils.upperBin(zdt, minute)); + TestCase.assertNull(DateTimeUtils.upperBin((ZonedDateTime) null, Duration.ofMinutes(1))); + TestCase.assertNull(DateTimeUtils.upperBin(zdt, (Duration) null)); } public void testUpperBinWithOffset() { @@ -1932,6 +1963,12 @@ public void testUpperBinWithOffset() { TestCase.assertEquals(DateTimeUtils.upperBin(instant, second, second), DateTimeUtils.upperBin(DateTimeUtils.upperBin(instant, second, second), second, second)); + TestCase.assertEquals(DateTimeUtils.upperBin(instant, Duration.ofMinutes(1), Duration.ofSeconds(2)), + DateTimeUtils.upperBin(instant, minute, 2 * second)); + TestCase.assertNull(DateTimeUtils.upperBin((Instant) null, Duration.ofMinutes(1), Duration.ofSeconds(2))); + TestCase.assertNull(DateTimeUtils.upperBin(instant, (Duration) null, Duration.ofSeconds(2))); + TestCase.assertNull(DateTimeUtils.upperBin(instant, Duration.ofMinutes(1), (Duration) null)); + final ZonedDateTime zdt = DateTimeUtils.toZonedDateTime(instant, TZ_AL); TestCase.assertEquals(DateTimeUtils.parseZonedDateTime("2010-06-15T06:16:00 NY").withZoneSameInstant(TZ_AL), @@ -1942,6 +1979,12 @@ public void testUpperBinWithOffset() { TestCase.assertEquals(DateTimeUtils.upperBin(zdt, second, second), DateTimeUtils.upperBin(DateTimeUtils.upperBin(zdt, second, second), second, second)); + + TestCase.assertEquals(DateTimeUtils.upperBin(zdt, Duration.ofMinutes(1), Duration.ofSeconds(2)), + DateTimeUtils.upperBin(zdt, minute, 2 * second)); + TestCase.assertNull(DateTimeUtils.upperBin((ZonedDateTime) null, 
Duration.ofMinutes(1), Duration.ofSeconds(2))); + TestCase.assertNull(DateTimeUtils.upperBin(zdt, (Duration) null, Duration.ofSeconds(2))); + TestCase.assertNull(DateTimeUtils.upperBin(zdt, Duration.ofMinutes(1), (Duration) null)); } public void testPlusLocalDate() { From 45081344b19455da0f47534bb2bfd0e78f859fc0 Mon Sep 17 00:00:00 2001 From: Jianfeng Mao <4297243+jmao-denver@users.noreply.github.com> Date: Wed, 28 Feb 2024 08:26:05 +0800 Subject: [PATCH 06/10] Support use of DType instance in UDF annotation (#5200) --- py/server/deephaven/_udf.py | 10 ++++++++++ py/server/deephaven/dtypes.py | 3 +++ py/server/tests/test_udf_return_java_values.py | 10 ++++++++++ 3 files changed, 23 insertions(+) diff --git a/py/server/deephaven/_udf.py b/py/server/deephaven/_udf.py index f3ad8ba10db..b6b69479805 100644 --- a/py/server/deephaven/_udf.py +++ b/py/server/deephaven/_udf.py @@ -119,6 +119,11 @@ def _parse_type_no_nested(annotation: Any, p_param: _ParsedParam, t: Union[type, t = eval(t) if isinstance(t, str) else t p_param.orig_types.add(t) + + # if the annotation is a DH DType instance, we'll use its numpy type + if isinstance(t, dtypes.DType): + t = t.np_type + tc = _encode_param_type(t) if "[" in tc: p_param.has_array = True @@ -157,6 +162,11 @@ def _parse_return_annotation(annotation: Any) -> _ParsedReturnAnnotation: t = annotation.__args__[0] elif annotation.__args__[0] == type(None): # noqa: E721 t = annotation.__args__[1] + + # if the annotation is a DH DType instance, we'll use its numpy type + if isinstance(t, dtypes.DType): + t = t.np_type + component_char = _component_np_dtype_char(t) if component_char: pra.encoded_type = "[" + component_char diff --git a/py/server/deephaven/dtypes.py b/py/server/deephaven/dtypes.py index 63994270d84..2c8e09369c8 100644 --- a/py/server/deephaven/dtypes.py +++ b/py/server/deephaven/dtypes.py @@ -412,6 +412,9 @@ def _component_np_dtype_char(t: type) -> Optional[str]: component_type = None if isinstance(t, _GenericAlias) and 
issubclass(t.__origin__, Sequence): component_type = t.__args__[0] + # if the component type is a DType, get its numpy type + if isinstance(component_type, DType): + component_type = component_type.np_type if not component_type: component_type = _np_ndarray_component_type(t) diff --git a/py/server/tests/test_udf_return_java_values.py b/py/server/tests/test_udf_return_java_values.py index 1a7c78e3aa9..5a89a8a75e4 100644 --- a/py/server/tests/test_udf_return_java_values.py +++ b/py/server/tests/test_udf_return_java_values.py @@ -293,6 +293,16 @@ def nbsin(x): t3 = empty_table(10).update(["X3 = nbsin(i)"]) self.assertEqual(t3.columns[0].data_type, dtypes.double) + def test_java_instant_return(self): + from deephaven.time import to_j_instant + + t = empty_table(10).update(["X1 = to_j_instant(`2021-01-01T00:00:00Z`)"]) + self.assertEqual(t.columns[0].data_type, dtypes.Instant) + + def udf() -> List[dtypes.Instant]: + return [to_j_instant("2021-01-01T00:00:00Z")] + t = empty_table(10).update(["X1 = udf()"]) + self.assertEqual(t.columns[0].data_type, dtypes.instant_array) if __name__ == '__main__': unittest.main() From 38dda6bb2672ef9f97164b65cb073d585804a79f Mon Sep 17 00:00:00 2001 From: Devin Smith Date: Tue, 27 Feb 2024 20:19:56 -0800 Subject: [PATCH 07/10] Bump to jetty 11.0.20 (#5203) --- buildSrc/src/main/groovy/Classpaths.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildSrc/src/main/groovy/Classpaths.groovy b/buildSrc/src/main/groovy/Classpaths.groovy index 80412826241..33df627126b 100644 --- a/buildSrc/src/main/groovy/Classpaths.groovy +++ b/buildSrc/src/main/groovy/Classpaths.groovy @@ -117,7 +117,7 @@ class Classpaths { static final String JETTY11_GROUP = 'org.eclipse.jetty' static final String JETTY11_NAME = 'jetty-bom' - static final String JETTY11_VERSION = '11.0.19' + static final String JETTY11_VERSION = '11.0.20' static final String GUAVA_GROUP = 'com.google.guava' static final String GUAVA_NAME = 'guava' From 
ca4dd87e9aee0338cc93eeda92e766259fc5bd81 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Wed, 28 Feb 2024 15:47:16 -0500 Subject: [PATCH 08/10] Improve WindowCheck memory usage. (#5197) * Port changes from legacy. * Use iterator for modified row nanosecond retrieval. * Do not reread nanos for modified singleton entries. * test changes 1. * Apply suggestions from code review Co-authored-by: Ryan Caudy * some changes * some more formatting * rest of review. * Correct two bugs in entry merging. * fix comment formatting * Update engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java Co-authored-by: Ryan Caudy * Reduce seed number. * Optimize the getLong away for singletons. --------- Co-authored-by: Ryan Caudy --- engine/table/build.gradle | 1 + .../table/impl/select/TimeSeriesFilter.java | 7 +- .../io/deephaven/engine/util/WindowCheck.java | 737 +++++++++++++++--- .../engine/util/TestWindowCheck.java | 254 +++++- engine/test-utils/build.gradle | 2 +- .../engine/testutil/GenerateTableUpdates.java | 9 + .../engine/testutil/QueryTableTestBase.java | 21 +- .../testcase/RefreshingTableTestCase.java | 2 +- 8 files changed, 889 insertions(+), 144 deletions(-) diff --git a/engine/table/build.gradle b/engine/table/build.gradle index d72c1a39543..83f2b935e7f 100644 --- a/engine/table/build.gradle +++ b/engine/table/build.gradle @@ -34,6 +34,7 @@ dependencies { implementation 'com.tdunning:t-digest:3.2' implementation 'com.squareup:javapoet:1.13.0' implementation 'io.github.classgraph:classgraph:4.8.165' + implementation 'it.unimi.dsi:fastutil:8.5.13' implementation project(':plugin') implementation depCommonsLang3 diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java index 3a736ff098f..8e203d88c78 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java +++ 
b/engine/table/src/main/java/io/deephaven/engine/table/impl/select/TimeSeriesFilter.java @@ -24,7 +24,12 @@ import java.util.List; /** - * This will filter a table for the most recent N nanoseconds (must be on a date time column). + * This will filter a table for the most recent N nanoseconds (must be on an {@link Instant} column). + * + *

+ * Note, this filter rescans the source table. You should prefer to use {@link io.deephaven.engine.util.WindowCheck} + * instead. + *

*/ public class TimeSeriesFilter extends WhereFilterLivenessArtifactImpl diff --git a/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java b/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java index 846a25a2fd7..08fdd21b532 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java @@ -11,7 +11,7 @@ import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.WritableObjectChunk; import io.deephaven.chunk.attributes.Values; -import io.deephaven.datastructures.util.CollectionUtil; +import io.deephaven.engine.primitive.iterator.CloseablePrimitiveIteratorOfLong; import io.deephaven.engine.rowset.*; import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys; @@ -21,7 +21,9 @@ import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableUpdate; import io.deephaven.engine.table.impl.TableUpdateImpl; +import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; import io.deephaven.engine.table.impl.sources.ReinterpretUtils; +import io.deephaven.engine.table.iterators.ChunkedLongColumnIterator; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; import io.deephaven.time.DateTimeUtils; @@ -30,11 +32,12 @@ import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.impl.MutableColumnSourceGetDefaults; import io.deephaven.base.RAPriQueue; -import gnu.trove.map.hash.TLongObjectHashMap; import io.deephaven.util.QueryConstants; +import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.LongBidirectionalIterator; import org.jetbrains.annotations.NotNull; -import java.time.Instant; import java.util.*; /** @@ -58,6 +61,10 @@ private WindowCheck() {} * The resultant table ticks whenever the input 
table ticks, or modifies a row when it passes out of the window. *

* + *

+ * The timestamp column must be an Instant or a long value expressed as nanoseconds since the epoch. + *

+ * * @param table the input table * @param timestampColumn the timestamp column to monitor in table * @param windowNanos how many nanoseconds in the past a timestamp can be before it is out of the window @@ -67,7 +74,9 @@ private WindowCheck() {} @SuppressWarnings("unused") public static Table addTimeWindow(QueryTable table, String timestampColumn, long windowNanos, String inWindowColumn) { - return addTimeWindowInternal(null, table, timestampColumn, windowNanos, inWindowColumn, true).first; + return QueryPerformanceRecorder.withNugget("addTimeWindow(" + timestampColumn + ", " + windowNanos + ")", + table.sizeForInstrumentation(), + () -> addTimeWindowInternal(null, table, timestampColumn, windowNanos, inWindowColumn, true).first); } private static class WindowListenerRecorder extends ListenerRecorder { @@ -105,8 +114,10 @@ static Pair addTimeWindowInternal(Clock clock, QueryT final TimeWindowListener timeWindowListener = new TimeWindowListener(inWindowColumn, inWindowColumnSource, recorder, table, result); recorder.setMergedListener(timeWindowListener); - table.addUpdateListener(recorder); - timeWindowListener.addRowSequence(table.getRowSet()); + if (table.isRefreshing()) { + table.addUpdateListener(recorder); + } + timeWindowListener.addRowSequence(table.getRowSet(), false); result.addParentReference(timeWindowListener); result.manage(table); if (addToMonitor) { @@ -119,42 +130,67 @@ static Pair addTimeWindowInternal(Clock clock, QueryT * The TimeWindowListener maintains a priority queue of rows that are within a configured window, when they pass out * of the window, the InWindow column is set to false and a modification tick happens. * + *

* It implements {@link Runnable}, so that we can be inserted into the {@link PeriodicUpdateGraph}. + *

*/ static class TimeWindowListener extends MergedListener implements Runnable { private final InWindowColumnSource inWindowColumnSource; private final QueryTable result; - /** a priority queue of InWindow entries, with the least recent timestamps getting pulled out first. */ + /** + * A priority queue of entries within our window, with the least recent timestamps getting pulled out first. + */ private final RAPriQueue priorityQueue; - /** a map from table indices to our entries. */ - private final TLongObjectHashMap rowKeyToEntry; + /** + * A sorted map from the last row key in an entry, to our entries. + */ + private final Long2ObjectAVLTreeMap rowKeyToEntry; private final ModifiedColumnSet.Transformer mcsTransformer; - private final ModifiedColumnSet mcsNewColumns; - private final ModifiedColumnSet reusableModifiedColumnSet; + private final ModifiedColumnSet mcsResultWindowColumn; + private final ModifiedColumnSet mcsSourceTimestamp; private final Table source; private final ListenerRecorder recorder; /** - * An intrusive entry inside of indexToEntry and priorityQueue. + * An intrusive entry in priorityQueue, also stored in rowKeyToEntry (for tables with + * modifications/removes/shifts). + * + *

+ * Each entry contains a contiguous range of row keys, with non-descending timestamps. + *

*/ private static class Entry { - /** position in the priority queue */ + /** + * position in the priority queue + */ int pos; - /** the timestamp */ + /** + * the timestamp of the first row key + */ long nanos; - /** the row key within the source (and result) table */ - long rowKey; - Entry(long rowKey, long timestamp) { - this.rowKey = Require.geqZero(rowKey, "rowKey"); - this.nanos = timestamp; + /** + * the first row key within the source (and result) table + */ + long firstRowKey; + /** + * the last row key within the source (and result) table + */ + long lastRowKey; + + + Entry(final long firstRowKey, final long lastRowKey, final long firstTimestamp) { + this.firstRowKey = Require.geqZero(firstRowKey, "firstRowKey"); + this.lastRowKey = Require.geq(lastRowKey, "lastRowKey", firstRowKey, "firstRowKey"); + this.nanos = firstTimestamp; } @Override public String toString() { return "Entry{" + "nanos=" + nanos + - ", rowKey=" + rowKey + + ", firstRowKey=" + firstRowKey + + ", lastRowKey=" + lastRowKey + '}'; } } @@ -177,27 +213,31 @@ private TimeWindowListener(final String inWindowColumnName, final InWindowColumn // queue; we'll just depend on exponential doubling to get us there if need be this.priorityQueue = new RAPriQueue<>(4096, new RAPriQueue.Adapter<>() { @Override - public boolean less(Entry a, Entry b) { + public boolean less(final Entry a, final Entry b) { return a.nanos < b.nanos; } @Override - public void setPos(Entry el, int pos) { + public void setPos(final Entry el, final int pos) { el.pos = pos; } @Override - public int getPos(Entry el) { + public int getPos(final Entry el) { return el.pos; } }, Entry.class); - this.rowKeyToEntry = new TLongObjectHashMap<>(); + if (source.isAddOnly()) { + this.rowKeyToEntry = null; + } else { + this.rowKeyToEntry = new Long2ObjectAVLTreeMap<>(); + } this.mcsTransformer = source.newModifiedColumnSetTransformer(result, source.getDefinition().getColumnNamesArray()); - this.mcsNewColumns = 
result.newModifiedColumnSet(inWindowColumnName); - this.reusableModifiedColumnSet = new ModifiedColumnSet(this.mcsNewColumns); + this.mcsSourceTimestamp = source.newModifiedColumnSet(inWindowColumnSource.timeStampName); + this.mcsResultWindowColumn = result.newModifiedColumnSet(inWindowColumnName); } @Override @@ -205,58 +245,124 @@ protected void process() { if (recorder.recordedVariablesAreValid()) { final TableUpdate upstream = recorder.getUpdate(); - // remove the removed indices from the priority queue - removeIndex(upstream.removed()); + // remove the removed row keys from the priority queue + removeRowSet(upstream.removed(), true); // anything that was shifted needs to be placed in the proper slots - try (final RowSet preShiftRowSet = source.getRowSet().copyPrev()) { + try (final WritableRowSet preShiftRowSet = source.getRowSet().copyPrev()) { + preShiftRowSet.remove(upstream.removed()); upstream.shifted().apply((start, end, delta) -> { - final RowSet subRowSet = preShiftRowSet.subSetByKeyRange(start, end); - - final RowSet.SearchIterator it = - delta < 0 ? 
subRowSet.searchIterator() : subRowSet.reverseIterator(); - while (it.hasNext()) { - final long idx = it.nextLong(); - final Entry entry = rowKeyToEntry.remove(idx); - if (entry != null) { - entry.rowKey = idx + delta; - rowKeyToEntry.put(idx + delta, entry); - } + try (final RowSet subRowSet = preShiftRowSet.subSetByKeyRange(start, end)) { + shiftSubRowset(subRowSet, delta); } }); } - // TODO: improve performance with getChunk - // TODO: reinterpret inWindowColumnSource so that it compares longs instead of objects - // figure out for all the modified row keys if the timestamp or row key changed - upstream.forAllModified((oldIndex, newIndex) -> { - final long currentTimestamp = inWindowColumnSource.timeStampSource.getLong(newIndex); - final long prevTimestamp = inWindowColumnSource.timeStampSource.getPrevLong(oldIndex); - if (currentTimestamp != prevTimestamp) { - updateRow(newIndex, currentTimestamp); + if (upstream.modifiedColumnSet().containsAny(mcsSourceTimestamp)) { + final RowSetBuilderSequential changedTimestampRowsToRemovePost = RowSetFactory.builderSequential(); + final RowSetBuilderSequential changedTimestampRowsToAddPost = RowSetFactory.builderSequential(); + + final int chunkSize = (int) Math.min(upstream.modified().size(), 4096); + + try (final ChunkSource.GetContext prevContext = + inWindowColumnSource.timeStampSource.makeGetContext(chunkSize); + final ChunkSource.GetContext currContext = + inWindowColumnSource.timeStampSource.makeGetContext(chunkSize); + final RowSequence.Iterator prevIt = upstream.getModifiedPreShift().getRowSequenceIterator(); + final RowSequence.Iterator currIt = upstream.modified().getRowSequenceIterator()) { + while (currIt.hasMore()) { + final RowSequence prevRows = prevIt.getNextRowSequenceWithLength(chunkSize); + final RowSequence currRows = currIt.getNextRowSequenceWithLength(chunkSize); + final LongChunk chunkKeys = currRows.asRowKeyChunk(); + final LongChunk prevTimestamps = inWindowColumnSource.timeStampSource + 
.getPrevChunk(prevContext, prevRows).asLongChunk(); + final LongChunk currTimestamps = + inWindowColumnSource.timeStampSource.getChunk(currContext, currRows).asLongChunk(); + + for (int ii = 0; ii < prevTimestamps.size(); ++ii) { + final long prevTimestamp = prevTimestamps.get(ii); + final long currentTimestamp = currTimestamps.get(ii); + if (currentTimestamp != prevTimestamp) { + final boolean prevInWindow = prevTimestamp != QueryConstants.NULL_LONG + && inWindowColumnSource.computeInWindowUnsafePrev(prevTimestamp); + final boolean curInWindow = currentTimestamp != QueryConstants.NULL_LONG + && inWindowColumnSource.computeInWindowUnsafe(currentTimestamp); + final long rowKey = chunkKeys.get(ii); + if (prevInWindow && curInWindow) { + // we might not have actually reordered anything, if we can check that "easily" + // we should do it to avoid churn and reading from the column, first find the + // entry based on our row key + final LongBidirectionalIterator iterator = + rowKeyToEntry.keySet().iterator(rowKey - 1); + // we have to have an entry, otherwise we would not be in the window + Assert.assertion(iterator.hasNext(), "iterator.hasNext()"); + final Entry foundEntry = rowKeyToEntry.get(iterator.nextLong()); + Assert.neqNull(foundEntry, "foundEntry"); + + if (foundEntry.firstRowKey == rowKey + && foundEntry.lastRowKey == foundEntry.firstRowKey) { + // we should update the nanos for this entry + foundEntry.nanos = currentTimestamp; + priorityQueue.enter(foundEntry); + continue; + } + + /* + * If we want to get fancier, there are some more cases where we could determine + * that there is no need to re-read the data. In particular, we would have to + * know that we have both the previous and next values in our chunk; otherwise + * we would be re-reading data anyway. 
The counterpoint is that if we are + * actually in those cases, where we are modifying Timestamps that are in the + * window it seems unlikely that the table is going to have consecutive + * timestamp ranges. To encode that logic would be fairly complex, and I think + * not actually worth it. + */ + } + if (prevInWindow) { + changedTimestampRowsToRemovePost.appendKey(rowKey); + } + if (curInWindow) { + changedTimestampRowsToAddPost.appendKey(rowKey); + } + } + } + } } - }); + + // we should have shifted values where relevant above, so we only operate on the new row key + try (final RowSet changedTimestamps = changedTimestampRowsToRemovePost.build()) { + if (changedTimestamps.isNonempty()) { + removeRowSet(changedTimestamps, false); + } + } + try (final RowSet changedTimestamps = changedTimestampRowsToAddPost.build()) { + if (changedTimestamps.isNonempty()) { + addRowSequence(changedTimestamps, rowKeyToEntry != null); + } + } + } // now add the new timestamps - addRowSequence(upstream.added()); + addRowSequence(upstream.added(), rowKeyToEntry != null); + + final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream); - final WritableRowSet downstreamModified = upstream.modified().copy(); try (final RowSet modifiedByTime = recomputeModified()) { if (modifiedByTime.isNonempty()) { - downstreamModified.insert(modifiedByTime); + downstream.modified.writableCast().insert(modifiedByTime); } } // everything that was added, removed, or modified stays added removed or modified - if (downstreamModified.isNonempty()) { - mcsTransformer.clearAndTransform(upstream.modifiedColumnSet(), reusableModifiedColumnSet); - reusableModifiedColumnSet.setAll(mcsNewColumns); + downstream.modifiedColumnSet = result.getModifiedColumnSetForUpdates(); + if (downstream.modified.isNonempty()) { + mcsTransformer.clearAndTransform(upstream.modifiedColumnSet(), downstream.modifiedColumnSet); + downstream.modifiedColumnSet.setAll(mcsResultWindowColumn); } else { - 
reusableModifiedColumnSet.clear(); + downstream.modifiedColumnSet.clear(); } - result.notifyListeners(new TableUpdateImpl(upstream.added().copy(), upstream.removed().copy(), - downstreamModified, upstream.shifted(), reusableModifiedColumnSet)); + result.notifyListeners(downstream); } else { final RowSet modifiedByTime = recomputeModified(); if (modifiedByTime.isNonempty()) { @@ -265,9 +371,9 @@ protected void process() { downstream.added = RowSetFactory.empty(); downstream.removed = RowSetFactory.empty(); downstream.shifted = RowSetShiftData.EMPTY; - downstream.modifiedColumnSet = reusableModifiedColumnSet; - downstream.modifiedColumnSet().clear(); - downstream.modifiedColumnSet().setAll(mcsNewColumns); + downstream.modifiedColumnSet = result.getModifiedColumnSetForUpdates(); + downstream.modifiedColumnSet.clear(); + downstream.modifiedColumnSet.setAll(mcsResultWindowColumn); result.notifyListeners(downstream); } else { modifiedByTime.close(); @@ -275,79 +381,367 @@ protected void process() { } } - /** - * Handles modified rowSets. If they are outside of the window, they need to be removed from the queue. If they - * are inside the window, they need to be (re)inserted into the queue. - */ - private void updateRow(final long rowKey, long currentTimestamp) { - Entry entry = rowKeyToEntry.remove(rowKey); - if (currentTimestamp == QueryConstants.NULL_LONG) { - if (entry != null) { - priorityQueue.remove(entry); - } - return; - } - if (inWindowColumnSource.computeInWindow(currentTimestamp, inWindowColumnSource.currentTime)) { - if (entry == null) { - entry = new Entry(rowKey, 0); - } - entry.nanos = currentTimestamp; - priorityQueue.enter(entry); - rowKeyToEntry.put(entry.rowKey, entry); - } else if (entry != null) { - priorityQueue.remove(entry); - } - } - /** * If the value of the timestamp is within the window, insert it into the queue and map. 
* * @param rowSequence the row sequence to insert into the table + * @param tryCombine try to combine newly added ranges with those already in the maps. For initial addition, + * there is nothing to combine with, so we do not spend the time on map lookups. For add-only tables, we + * do not maintain the rowKeyToEntry map, so cannot find adjacent ranges for combination. */ - private void addRowSequence(RowSequence rowSequence) { + private void addRowSequence(RowSequence rowSequence, boolean tryCombine) { final int chunkSize = (int) Math.min(rowSequence.size(), 4096); + Entry pendingEntry = null; + long lastNanos = Long.MAX_VALUE; + try (final ChunkSource.GetContext getContext = inWindowColumnSource.timeStampSource.makeGetContext(chunkSize); - final RowSequence.Iterator okit = rowSequence.getRowSequenceIterator()) { - while (okit.hasMore()) { - final RowSequence chunkOk = okit.getNextRowSequenceWithLength(chunkSize); - final LongChunk keyIndices = chunkOk.asRowKeyChunk(); + final RowSequence.Iterator rsit = rowSequence.getRowSequenceIterator()) { + while (rsit.hasMore()) { + final RowSequence chunkRows = rsit.getNextRowSequenceWithLength(chunkSize); + final LongChunk rowKeys = chunkRows.asRowKeyChunk(); final LongChunk timestampValues = - inWindowColumnSource.timeStampSource.getChunk(getContext, chunkOk).asLongChunk(); - for (int ii = 0; ii < keyIndices.size(); ++ii) { + inWindowColumnSource.timeStampSource.getChunk(getContext, chunkRows).asLongChunk(); + for (int ii = 0; ii < rowKeys.size(); ++ii) { + final long currentRowKey = rowKeys.get(ii); final long currentTimestamp = timestampValues.get(ii); if (currentTimestamp == QueryConstants.NULL_LONG) { + if (pendingEntry != null) { + enter(pendingEntry, lastNanos, tryCombine); + pendingEntry = null; + } continue; } - if (inWindowColumnSource.computeInWindowUnsafe( - currentTimestamp, inWindowColumnSource.currentTime)) { - final Entry el = new Entry(keyIndices.get(ii), currentTimestamp); - priorityQueue.enter(el); - 
rowKeyToEntry.put(el.rowKey, el); + if (pendingEntry != null && (currentTimestamp < lastNanos + || pendingEntry.lastRowKey + 1 != currentRowKey)) { + enter(pendingEntry, lastNanos, tryCombine); + pendingEntry = null; + } + if (inWindowColumnSource.computeInWindowUnsafe(currentTimestamp)) { + lastNanos = currentTimestamp; + if (pendingEntry == null) { + if (tryCombine) { + // see if this can be combined with the prior entry + final Entry priorEntry = rowKeyToEntry.get(currentRowKey - 1); + if (priorEntry != null && priorEntry.nanos <= currentTimestamp) { + Assert.eq(priorEntry.lastRowKey, "priorEntry.lastRowKey", currentRowKey - 1, + "currentRowKey - 1"); + final boolean canCombine; + if (priorEntry.firstRowKey != priorEntry.lastRowKey) { + final long priorEntryLastNanos = + inWindowColumnSource.timeStampSource.getLong(priorEntry.lastRowKey); + canCombine = priorEntryLastNanos <= currentTimestamp; + } else { + canCombine = true; + } + if (canCombine) { + rowKeyToEntry.remove(currentRowKey - 1); + // Since we might be combining this with an entry later, we should remove it + // so that we don't have extra entries + priorityQueue.remove(priorEntry); + priorEntry.lastRowKey = currentRowKey; + pendingEntry = priorEntry; + continue; + } + } + } + pendingEntry = new Entry(currentRowKey, currentRowKey, currentTimestamp); + } else { + Assert.eq(pendingEntry.lastRowKey, "pendingEntry.lastRowKey", currentRowKey - 1, + "currentRowKey - 1"); + pendingEntry.lastRowKey = currentRowKey; + } + } else { + Assert.eqNull(pendingEntry, "pendingEntry"); } } } + if (pendingEntry != null) { + enter(pendingEntry, lastNanos, tryCombine); + } + } + } + + /** + * Add an entry into the priority queue, and if applicable the reverse map + * + * @param pendingEntry the entry to insert + */ + void enter(@NotNull final Entry pendingEntry) { + priorityQueue.enter(pendingEntry); + if (rowKeyToEntry != null) { + rowKeyToEntry.put(pendingEntry.lastRowKey, pendingEntry); + } + } + + /** + * Insert 
pendingEntry into the queue and map (if applicable). + * + * @param pendingEntry the entry to insert into our queue and reverse map + * @param lastNanos the final nanosecond value of the pending entry to insert, used to determine if we may + * combine with the next entry + * @param tryCombine true if we should combine values with the next entry, previous entries would have been + * combined during addRowSequence + */ + void enter(@NotNull final Entry pendingEntry, final long lastNanos, final boolean tryCombine) { + if (tryCombine) { + final LongBidirectionalIterator it = rowKeyToEntry.keySet().iterator(pendingEntry.lastRowKey); + if (it.hasNext()) { + final long nextKey = it.nextLong(); + final Entry nextEntry = rowKeyToEntry.get(nextKey); + if (nextEntry.firstRowKey == pendingEntry.lastRowKey + 1 && nextEntry.nanos >= lastNanos) { + // we can combine ourselves into next entry, because it is contiguous and has a timestamp + // greater than or equal to our entries last timestamp + nextEntry.nanos = pendingEntry.nanos; + nextEntry.firstRowKey = pendingEntry.firstRowKey; + priorityQueue.enter(nextEntry); + return; + } + } } + enter(pendingEntry); } /** * If the keys are in the window, remove them from the map and queue. * * @param rowSet the row keys to remove + * @param previous whether to operate in previous space */ - private void removeIndex(final RowSet rowSet) { - rowSet.forAllRowKeys((final long key) -> { - final Entry e = rowKeyToEntry.remove(key); - if (e != null) { - priorityQueue.remove(e); + private void removeRowSet(final RowSet rowSet, final boolean previous) { + if (rowSet.isEmpty()) { + return; + } + Assert.neqNull(rowKeyToEntry, "rowKeyToEntry"); + + RANGE: for (final RowSet.RangeIterator rangeIterator = rowSet.rangeIterator(); rangeIterator.hasNext();) { + rangeIterator.next(); + long start = rangeIterator.currentRangeStart(); + final long end = rangeIterator.currentRangeEnd(); + + // We have some range in the rowSet that is removed. 
This range (or part thereof) may or may not exist + // in one or more entries. We process from the front of the range to the end of the range, possibly + // advancing the range start. + + while (start <= end) { + // we look for start - 1, so that we will find start if it exists + // https://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/longs/LongSortedSet.html#iterator(long) + // "The next element of the returned iterator is the least element of the set that is greater than + // the starting point (if there are no elements greater than the starting point, hasNext() will + // return false)." + final LongBidirectionalIterator reverseMapIterator = rowKeyToEntry.keySet().iterator(start - 1); + // if there is no next, then the reverse map contains no values that are greater than or equal to + // start, we can actually break out of the entire loop + if (!reverseMapIterator.hasNext()) { + break RANGE; + } + + final long entryLastKey = reverseMapIterator.nextLong(); + final Entry entry = rowKeyToEntry.get(entryLastKey); + if (entry.firstRowKey > end) { + // there is nothing here for us + start = entry.lastRowKey + 1; + continue; + } + + // there is some part of our start to end range that could be present in this entry. 
+ if (entry.firstRowKey >= start) { + // we have visually one of the following three situations when start == firstRowKey: + // @formatter:off + // [ RANGE ] + // [ ENTRY ] - the entry exceeds the range ( case a) + // [ ENTRY ] - the whole entry is contained (case b) + // [ ENTRY ] - the entry is a prefix - (case c) + // @formatter:on + + // we have visually one of the following three situations when start > firstRowKey: + // @formatter:off + // [ RANGE ] + // [ ENTRY ] - the entry starts in the middle and terminates after (case a); so we remove a prefix of the entry + // [ ENTRY ] - entry starts in the middle and terminates the at same value (case b); delete the entry + // [ ENTRY ] - this cannot happen based on the search (case c) + // @formatter:on + + if (entry.lastRowKey > end) { // (case a) + // slice off the beginning of the entry + entry.firstRowKey = end + 1; + entry.nanos = previous + ? inWindowColumnSource.timeStampSource.getPrevLong(entry.firstRowKey) + : inWindowColumnSource.timeStampSource.getLong(entry.firstRowKey); + priorityQueue.enter(entry); + } else { // (case b and c) + // we are consuming the entire entry, so can remove it from the queue + reverseMapIterator.remove(); + priorityQueue.remove(entry); + } + // and we look for the next entry after this one + start = entry.lastRowKey + 1; + } else { + // our entry is at least partially before end (because of the check after retrieving it), + // and is after start (because of how we searched in the map). 
+ + // we have visually one of the following three situations: + // @formatter:off + // [ RANGE ] + // [ ENTRY ] - the entry exceeds the range ( case a), we must split into two entries + // [ ENTRY ] - the entry starts before the range but ends with the range (case b); so we remove a suffix of the entry + // [ ENTRY ] - the entry starts before the range and ends inside the range(case c); so we must remove a suffix of the entry + // @formatter:on + + if (entry.lastRowKey > end) { + final Entry frontEntry = new Entry(entry.firstRowKey, start - 1, entry.nanos); + enter(frontEntry); + + entry.firstRowKey = end + 1; + entry.nanos = previous + ? inWindowColumnSource.timeStampSource.getPrevLong(entry.firstRowKey) + : inWindowColumnSource.timeStampSource.getLong(entry.firstRowKey); + priorityQueue.enter(entry); + } else { // case b and c + entry.lastRowKey = start - 1; + reverseMapIterator.remove(); + rowKeyToEntry.put(entry.lastRowKey, entry); + } + } + } + } + } + + private void shiftSubRowset(final RowSet rowSet, final long delta) { + Assert.neqNull(rowKeyToEntry, "rowKeyToEntry"); + + // We need to be careful about reinserting entries into the correct order, if we are traversing forward, + // then we need to add the entries in opposite order to avoid overwriting another entry. We remove the + // entries + // in the loop, and if entriesToInsert is non-null add them to the list. If entriesToInsert is null, then + // we add them to the map. + final List entriesToInsert = delta > 0 ? new ArrayList<>() : null; + + RANGE: for (final RowSet.RangeIterator rangeIterator = rowSet.rangeIterator(); rangeIterator.hasNext();) { + rangeIterator.next(); + long start = rangeIterator.currentRangeStart(); + final long end = rangeIterator.currentRangeEnd(); + + // We have some range in the rowSet that has been moved about. This range (or part thereof) may or may + // not exist in one or more entries. 
We process from the front of the range to the end of the range, + // possibly advancing the range start. + + while (start <= end) { + // we look for start - 1, so that we will find start if it exists + // https://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/longs/LongSortedSet.html#iterator(long) + // "The next element of the returned iterator is the least element of the set that is greater than + // the starting point (if there are no elements greater than the starting point, hasNext() will + // return false)." + final LongBidirectionalIterator reverseMapIterator = rowKeyToEntry.keySet().iterator(start - 1); + // if there is no next, then the reverse map contains no values that are greater than or equal to + // start, we can actually break out of the entire loop + if (!reverseMapIterator.hasNext()) { + break RANGE; + } + + final long entryLastKey = reverseMapIterator.nextLong(); + final Entry entry = rowKeyToEntry.get(entryLastKey); + if (entry.firstRowKey > end) { + // there is nothing here for us + start = entry.lastRowKey + 1; + continue; + } + + // there is some part of our start to end range that could be present in this entry. 
+ if (entry.firstRowKey >= start) { + + // @formatter:off + // we have visually one of the following three situations when start == firstRowKey: + // [ RANGE ] + // [ ENTRY ] - the entry exceeds the range ( case a) + // [ ENTRY ] - the whole entry is contained (case b) + // [ ENTRY ] - the entry is a prefix - (case c) + + // we have visually one of the following three situations when start > firstRowKey: + // [ RANGE ] + // [ ENTRY ] - the entry starts in the middle and terminates after (case a) + // [ ENTRY ] - entry starts in the middle and terminates the at same value (case b) + // [ ENTRY ] - this cannot happen based on the search (case c) + // @formatter:on + + // we look for the next entry after this one, but need to make sure to keep that happening in + // pre-shift space + start = entry.lastRowKey + 1; + + if (entry.lastRowKey > end) { // (case a) + // slice off the beginning of the entry, creating a new entry for the shift + final Entry newEntry = new Entry(entry.firstRowKey + delta, end + delta, entry.nanos); + + entry.firstRowKey = end + 1; + entry.nanos = inWindowColumnSource.timeStampSource.getPrevLong(entry.firstRowKey); + priorityQueue.enter(entry); + priorityQueue.enter(newEntry); + + addOrDeferEntry(entriesToInsert, newEntry); + } else { // (case b and c) + // we are consuming the entire entry, so can leave it in the queue as is, but need to change + // its reverse mapping + entry.firstRowKey += delta; + entry.lastRowKey += delta; + reverseMapIterator.remove(); + addOrDeferEntry(entriesToInsert, entry); + } + } else { + // our entry is at least partially before end (because of the check after retrieving it), + // and is after start (because of how we searched in the map). 
+ + // we have visually one of the following three situations: + // @formatter:off + // [ RANGE ] + // [ ENTRY ] - the entry exceeds the range ( case a), we must split into three entries; + // but we would be splatting over stuff, so this is not permitted in a reasonable shift + // [ ENTRY ] - the entry starts before the range but ends with the range (case b) + // [ ENTRY ] - the entry starts before the range and ends inside the range(case c) + // @formatter:on + + if (entry.lastRowKey > end) { + throw new IllegalStateException(); + } else { // case b and c + + final long backNanos = inWindowColumnSource.timeStampSource.getPrevLong(start); + final Entry backEntry = new Entry(start + delta, entry.lastRowKey + delta, backNanos); + priorityQueue.enter(backEntry); + + // the nanos stays the same, so entry just needs an adjust last rowSet and the reverse map + entry.lastRowKey = start - 1; + + // by reinserting, we preserve the things that we have not changed to enable us to find them + // in the rest of the processing + reverseMapIterator.remove(); + rowKeyToEntry.put(entry.lastRowKey, entry); + + addOrDeferEntry(entriesToInsert, backEntry); + } + } + } + } + if (entriesToInsert != null) { + for (int ii = entriesToInsert.size() - 1; ii >= 0; ii--) { + final Entry entry = entriesToInsert.get(ii); + rowKeyToEntry.put(entry.lastRowKey, entry); } - }); + } + } + + private void addOrDeferEntry(final List entriesToInsert, final Entry entry) { + if (entriesToInsert == null) { + rowKeyToEntry.put(entry.lastRowKey, entry); + } else { + entriesToInsert.add(entry); + } } /** * Pop elements out of the queue until we find one that is in the window. * + *

* Send a modification to the resulting table. + *

*/ @Override public void run() { @@ -364,14 +758,44 @@ private RowSet recomputeModified() { break; } - if (inWindowColumnSource.computeInWindowUnsafe(entry.nanos, inWindowColumnSource.currentTime)) { + if (inWindowColumnSource.computeInWindowUnsafe(entry.nanos)) { break; - } else { - // take it out of the queue, and mark it as modified - final Entry taken = priorityQueue.removeTop(); - Assert.equals(entry, "entry", taken, "taken"); - builder.addKey(entry.rowKey); - rowKeyToEntry.remove(entry.rowKey); + } + + // take it out of the queue, and mark it as modified + final Entry taken = priorityQueue.removeTop(); + Assert.equals(entry, "entry", taken, "taken"); + + + // now scan the rest of the entry, which requires reading from the timestamp source; + // this would ideally be done as a chunk, reusing the context + long newFirst = entry.firstRowKey + 1; + if (newFirst <= entry.lastRowKey) { + try (final RowSequence rowSequence = + RowSequenceFactory.forRange(entry.firstRowKey + 1, entry.lastRowKey); + final CloseablePrimitiveIteratorOfLong timestampIterator = + new ChunkedLongColumnIterator(inWindowColumnSource.timeStampSource, + rowSequence)) { + while (newFirst <= entry.lastRowKey) { + final long nanos = timestampIterator.nextLong(); + if (inWindowColumnSource.computeInWindowUnsafe(nanos)) { + // nothing more to do, we've passed out of the window, note the new nanos for this entry + entry.nanos = nanos; + break; + } + ++newFirst; + } + } + } + + builder.addRange(entry.firstRowKey, newFirst - 1); + + // if anything is left, we need to reinsert it into the priority queue + if (newFirst <= entry.lastRowKey) { + entry.firstRowKey = newFirst; + priorityQueue.enter(entry); + } else if (rowKeyToEntry != null) { + rowKeyToEntry.remove(entry.lastRowKey); } } @@ -384,16 +808,78 @@ void validateQueue() { final Entry[] entries = new Entry[priorityQueue.size()]; priorityQueue.dump(entries, 0); - Arrays.stream(entries).mapToLong(entry -> entry.rowKey).forEach(builder::addKey); + + 
if (rowKeyToEntry != null && entries.length != rowKeyToEntry.size()) { + dumpQueue(); + Assert.eq(entries.length, "entries.length", rowKeyToEntry.size(), "rowKeyToEntry.size()"); + } + + long entrySize = 0; + for (final Entry entry : entries) { + builder.addRange(entry.firstRowKey, entry.lastRowKey); + entrySize += (entry.lastRowKey - entry.firstRowKey + 1); + if (rowKeyToEntry != null) { + final Entry check = rowKeyToEntry.get(entry.lastRowKey); + if (check != entry) { + dumpQueue(); + Assert.equals(check, "check", entry, "entry"); + } + } + // validate that the entry is non-descending + if (entry.lastRowKey > entry.firstRowKey) { + long lastNanos = inWindowColumnSource.timeStampSource.getLong(entry.firstRowKey); + for (long rowKey = entry.firstRowKey + 1; rowKey <= entry.lastRowKey; ++rowKey) { + long nanos = inWindowColumnSource.timeStampSource.getLong(rowKey); + if (nanos < lastNanos) { + dumpQueue(); + Assert.geq(nanos, "nanos at " + rowKey, lastNanos, "lastNanos"); + } + lastNanos = nanos; + } + } + } final RowSet inQueue = builder.build(); - Assert.eq(inQueue.size(), "inQueue.size()", priorityQueue.size(), "priorityQueue.size()"); + Assert.eq(inQueue.size(), "inQueue.size()", entrySize, "entrySize"); + final boolean condition = inQueue.subsetOf(resultRowSet); if (!condition) { + dumpQueue(); // noinspection ConstantConditions Assert.assertion(condition, "inQueue.subsetOf(resultRowSet)", inQueue, "inQueue", resultRowSet, "resultRowSet", inQueue.minus(resultRowSet), "inQueue.minus(resultRowSet)"); } + + // Verify that the size of inQueue is equal to the number of values in the window + final RowSetBuilderSequential inWindowBuilder = RowSetFactory.builderSequential(); + try (final CloseablePrimitiveIteratorOfLong valueIt = + new ChunkedLongColumnIterator(inWindowColumnSource.timeStampSource, source.getRowSet())) { + source.getRowSet().forAllRowKeys(key -> { + long value = valueIt.nextLong(); + if (value != QueryConstants.NULL_LONG && 
inWindowColumnSource.computeInWindowUnsafe(value)) { + inWindowBuilder.appendKey(key); + } + }); + } + try (final RowSet rowsInWindow = inWindowBuilder.build()) { + Assert.equals(rowsInWindow, "rowsInWindow", inQueue, "inQueue"); + } + } + + void dumpQueue() { + final Entry[] entries = new Entry[priorityQueue.size()]; + priorityQueue.dump(entries, 0); + System.out.println("Queue size: " + entries.length); + for (final Entry entry : entries) { + System.out.println(entry); + } + + if (rowKeyToEntry != null) { + System.out.println("Map size: " + rowKeyToEntry.size()); + for (final Long2ObjectMap.Entry x : rowKeyToEntry.long2ObjectEntrySet()) { + System.out.println(x.getLongKey() + ": " + x.getValue()); + } + } } @Override @@ -422,24 +908,31 @@ private static class InWindowColumnSource extends AbstractColumnSource implements MutableColumnSourceGetDefaults.ForBoolean { private final long windowNanos; private final ColumnSource timeStampSource; + private final String timeStampName; - private long prevTime; - private long currentTime; + private long prevTime = 0; + private long currentTime = 0; private long clockStep; private final long initialStep; InWindowColumnSource(Table table, String timestampColumn, long windowNanos) { super(Boolean.class); this.windowNanos = windowNanos; + this.timeStampName = timestampColumn; clockStep = updateGraph.clock().currentStep(); initialStep = clockStep; - final ColumnSource timeStampSource = table.getColumnSource(timestampColumn); - if (!Instant.class.isAssignableFrom(timeStampSource.getType())) { - throw new IllegalArgumentException(timestampColumn + " is not of type Instant!"); + final ColumnSource timeStampSource = table.getColumnSource(timestampColumn); + final ColumnSource reinterpreted = ReinterpretUtils.maybeConvertToPrimitive(timeStampSource); + Class timestampType = reinterpreted.getType(); + if (timestampType == long.class) { + // noinspection unchecked + this.timeStampSource = (ColumnSource) reinterpreted; + } else { + throw 
new IllegalArgumentException("The timestamp column, " + timestampColumn + + ", cannot be interpreted as a long, it should be a supported time type (e.g. long, Instant, ZonedDateTime...)"); } - this.timeStampSource = ReinterpretUtils.instantToLongSource(timeStampSource); } /** @@ -480,6 +973,14 @@ private boolean computeInWindowUnsafe(long tableNanos, long time) { return (time - tableNanos) < windowNanos; } + private boolean computeInWindowUnsafe(long tableNanos) { + return computeInWindowUnsafe(tableNanos, currentTime); + } + + private boolean computeInWindowUnsafePrev(long tableNanos) { + return computeInWindowUnsafe(tableNanos, timeStampForPrev()); + } + @Override public boolean isImmutable() { return false; diff --git a/engine/table/src/test/java/io/deephaven/engine/util/TestWindowCheck.java b/engine/table/src/test/java/io/deephaven/engine/util/TestWindowCheck.java index 26d26b477d3..4d22e210975 100644 --- a/engine/table/src/test/java/io/deephaven/engine/util/TestWindowCheck.java +++ b/engine/table/src/test/java/io/deephaven/engine/util/TestWindowCheck.java @@ -5,7 +5,11 @@ import io.deephaven.base.Pair; import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.context.QueryScope; import io.deephaven.engine.rowset.RowSet; +import io.deephaven.engine.rowset.RowSetFactory; +import io.deephaven.engine.rowset.TrackingWritableRowSet; +import io.deephaven.engine.rowset.WritableRowSet; import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableUpdate; @@ -13,6 +17,7 @@ import io.deephaven.engine.table.impl.InstrumentedTableUpdateListener; import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.table.impl.TableUpdateValidator; +import io.deephaven.engine.table.impl.util.RuntimeMemory; import io.deephaven.engine.testutil.*; import io.deephaven.engine.testutil.generator.IntGenerator; import io.deephaven.engine.testutil.generator.UnsortedInstantGenerator; @@ -27,10 
+32,12 @@ import org.junit.Test; import org.junit.experimental.categories.Category; +import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.time.Instant; import java.util.Arrays; +import java.util.Collections; import java.util.Map; import java.util.Random; import java.util.stream.Stream; @@ -38,6 +45,7 @@ import static io.deephaven.engine.testutil.TstUtils.*; import static io.deephaven.engine.util.TableTools.col; import static io.deephaven.engine.util.TableTools.intCol; +import static org.junit.jupiter.api.Assertions.assertTrue; @Category(OutOfBandTest.class) public class TestWindowCheck { @@ -47,21 +55,34 @@ public class TestWindowCheck { @Test public void testWindowCheckIterative() { for (int seed = 0; seed < 1; ++seed) { - testWindowCheckIterative(seed); + testWindowCheckIterative(seed, true); + } + } + + @Test + public void testWindowCheckIterativeNoShifts() { + for (int seed = 0; seed < 1; ++seed) { + testWindowCheckIterative(seed, false); } } /** * Run a window check over the course of a simulated day. * + *

* We have a Timestamp column and a sentinel column. + *

* + *

* Time advances by one second per step, which randomly modifies the source table. + *

* + *

* The WindowEvalNugget verifies the original columns are unchanged and that the value of the InWindow column is * correct. A prev checker is added to ensure that getPrev works on the new table. + *

*/ - private void testWindowCheckIterative(int seed) { + private void testWindowCheckIterative(int seed, boolean withShifts) { final Random random = new Random(seed); final Random combinedRandom = new Random(seed); @@ -103,15 +124,21 @@ private void testWindowCheckIterative(int seed) { ++step; final boolean combined = combinedRandom.nextBoolean(); + final GenerateTableUpdates.SimulationProfile profile = + withShifts ? GenerateTableUpdates.DEFAULT_PROFILE : GenerateTableUpdates.NO_SHIFT_PROFILE; + if (combined) { + if (RefreshingTableTestCase.printTableUpdates) { + System.out.println("Combined Step " + step); + } updateGraph.runWithinUnitTestCycle(() -> { - advanceTime(clock, en); - GenerateTableUpdates.generateShiftAwareTableUpdates(GenerateTableUpdates.DEFAULT_PROFILE, size, + advanceTime(clock, en, 5 * DateTimeUtils.SECOND); + GenerateTableUpdates.generateShiftAwareTableUpdates(profile, size, random, table, columnInfo); }); TstUtils.validate("Step " + step, en); } else { - updateGraph.runWithinUnitTestCycle(() -> advanceTime(clock, en)); + updateGraph.runWithinUnitTestCycle(() -> advanceTime(clock, en, 5 * DateTimeUtils.SECOND)); if (RefreshingTableTestCase.printTableUpdates) { TstUtils.validate("Step = " + step + " time = " + DateTimeUtils.epochNanosToInstant(clock.now), en); } @@ -120,15 +147,17 @@ private void testWindowCheckIterative(int seed) { if (RefreshingTableTestCase.printTableUpdates) { System.out.println("Step " + step + "-" + ii); } - RefreshingTableTestCase.simulateShiftAwareStep(step + "-" + ii, stepSize, random, table, columnInfo, + RefreshingTableTestCase.simulateShiftAwareStep(profile, step + "-" + ii, stepSize, random, table, + columnInfo, en); + TstUtils.validate("Step " + step + "-" + ii, en); } } } } - private void advanceTime(TestClock clock, WindowEvalNugget[] en) { - clock.now += 5 * DateTimeUtils.SECOND; + private void advanceTime(final TestClock clock, final WindowEvalNugget[] en, final long nanosToAdvance) { + clock.now += 
nanosToAdvance; if (RefreshingTableTestCase.printTableUpdates) { System.out.println("Ticking time to " + DateTimeUtils.epochNanosToInstant(clock.now)); } @@ -154,7 +183,7 @@ public void testWindowCheckEmptyInitial() { () -> WindowCheck.addTimeWindowInternal(clock, tableToCheck, "Timestamp", DateTimeUtils.SECOND * 60, "InWindow", false)); - TableTools.showWithRowSet(windowed.first); + TableTools.showWithRowSet(windowed.first, 200); updateGraph.runWithinUnitTestCycle(windowed.second::run); @@ -374,7 +403,212 @@ public void validate(String msg) { @Override public void show() { - TableTools.show(windowed.first); + TableTools.showWithRowSet(windowed.first, 200); + windowed.second.dumpQueue(); } } + + @Test + public void testMemoryUsageInWindow() throws IOException { + final TestClock timeProvider = new TestClock(); + final Instant startTime = DateTimeUtils.parseInstant("2022-07-14T09:30:00 NY"); + timeProvider.now = DateTimeUtils.epochNanos(startTime); + + QueryScope.addParam("startTime", startTime); + + final QueryTable inputTable = + (QueryTable) TableTools.emptyTable(100_000_000).updateView("Timestamp = startTime"); + inputTable.setRefreshing(true); + System.gc(); + final RuntimeMemory.Sample sample = new RuntimeMemory.Sample(); + RuntimeMemory.getInstance().read(sample); + final long memStart = sample.totalMemory - sample.freeMemory; + System.out.println("Start Memory: " + memStart); + final Pair withCheck = WindowCheck.addTimeWindowInternal(timeProvider, + inputTable, + "Timestamp", + 60 * DateTimeUtils.SECOND, + "InLastXSeconds", + false); + System.gc(); + RuntimeMemory.getInstance().read(sample); + final long memEnd = sample.totalMemory - sample.freeMemory; + System.out.println("End Memory: " + memEnd); + final long memChange = memEnd - memStart; + System.out.println("Change: " + memChange); + assertTrue(memChange < 100_000_000); // this previously would require about 645MB, so we're doing better + assertTableEquals(inputTable.updateView("InLastXSeconds=true"), 
withCheck.first); + } + + @Test + public void testSequentialRanges() throws IOException { + final TestClock timeProvider = new TestClock(); + final Instant startTime = DateTimeUtils.parseInstant("2022-07-14T09:30:00 NY"); + // start about three minutes in so we'll take things off more directly + timeProvider.now = DateTimeUtils.epochNanos(startTime) + 180 * 1_000_000_000L; + + QueryScope.addParam("startTime", startTime); + + // each row is 10ms, we have 50_000 rows so the span of the table is 500 seconds + final Table inputRanges = TableTools.emptyTable(50_000).updateView("Timestamp = startTime + (ii * 10_000_000)"); + ((QueryTable) inputRanges).setRefreshing(true); + + final Table[] duplicated = new Table[10]; + Arrays.fill(duplicated, inputRanges); + final Table inputTable = TableTools.merge(duplicated); + + final WindowEvalNugget[] en; + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + updateGraph.exclusiveLock().lock(); + try { + en = new WindowEvalNugget[] { + new WindowEvalNugget(timeProvider, (QueryTable) inputTable) + }; + } finally { + updateGraph.exclusiveLock().unlock(); + } + + int step = 0; + while (timeProvider.now < DateTimeUtils.epochNanos(startTime) + 600 * DateTimeUtils.SECOND) { + step++; + updateGraph.runWithinUnitTestCycle(() -> advanceTime(timeProvider, en, 5 * DateTimeUtils.SECOND)); + TstUtils.validate("Step " + step, en); + } + } + + @Test + public void testSequentialRangesAddOnly() throws IOException { + final TestClock timeProvider = new TestClock(); + final Instant startTime = DateTimeUtils.parseInstant("2022-07-14T09:30:00 NY"); + // start about three minutes in so we'll take things off more directly + timeProvider.now = DateTimeUtils.epochNanos(startTime) + 180 * 1_000_000_000L; + final long regionSize = 1_000_000L; + + QueryScope.addParam("startTime", startTime); + QueryScope.addParam("regionSize", regionSize); + + final TrackingWritableRowSet inputRowSet = RowSetFactory.fromRange(0, 
9999).toTracking(); + + inputRowSet.insertRange(regionSize, regionSize + 9_999); + final QueryTable rowsetTable = TstUtils.testRefreshingTable(inputRowSet); + // each chunk of 10_000 rows should account for one minute, or 60_000_000_000 / 10_000 = 6_000_000 nanos per row + // we start 3 minutes behind the start, so everything is in the five-minute window + final Table inputTable = rowsetTable.updateView("Timestamp = startTime + ((k % regionSize) * 6_000_000)") + .withAttributes(Collections.singletonMap(Table.ADD_ONLY_TABLE_ATTRIBUTE, true)); + + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + + final WindowEvalNugget[] en; + final PrintListener pl; + updateGraph.exclusiveLock().lock(); + try { + en = new WindowEvalNugget[] { + new WindowEvalNugget(timeProvider, (QueryTable) inputTable) + }; + pl = new PrintListener("windowed", en[0].windowed.first, 0); + } finally { + updateGraph.exclusiveLock().unlock(); + } + + for (int step = 1; step < 10; ++step) { + final int fstep = step; + updateGraph.runWithinUnitTestCycle(() -> { + final WritableRowSet added = + RowSetFactory.fromRange(fstep * 10_000, fstep * 10_000 + 9_999); + added.insertRange(fstep * 10_000 + regionSize, fstep * 10_000 + 9_999 + regionSize); + rowsetTable.getRowSet().writableCast().insert(added); + rowsetTable.notifyListeners(added, i(), i()); + advanceTime(timeProvider, en, fstep * DateTimeUtils.MINUTE); + }); + TstUtils.validate("Step " + step, en); + } + } + + @Test + public void testCombination() { + final TestClock timeProvider = new TestClock(); + final Instant startTime = DateTimeUtils.parseInstant("2024-02-28T09:29:01 NY"); + timeProvider.now = DateTimeUtils.epochNanos(startTime); + + final Instant[] initialValues = Arrays.stream( + new String[] {"2024-02-28T09:25:00 NY", "2024-02-28T09:27:00 NY"}) + .map(DateTimeUtils::parseInstant).toArray(Instant[]::new); + final QueryTable tableToCheck = testRefreshingTable(i(0, 1).toTracking(), + 
col("Timestamp", initialValues), + intCol("Sentinel", 1, 2)); + + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + final Pair windowed = updateGraph.sharedLock().computeLocked( + () -> WindowCheck.addTimeWindowInternal( + timeProvider, tableToCheck, "Timestamp", DateTimeUtils.MINUTE * 5, "InWindow", false)); + + TableTools.showWithRowSet(windowed.first); + + updateGraph.runWithinUnitTestCycle(windowed.second::run); + assertTableEquals(tableToCheck.updateView("InWindow = true"), windowed.first); + + updateGraph.runWithinUnitTestCycle(() -> { + final Instant[] newValue = new Instant[] {DateTimeUtils.parseInstant("2024-02-28T09:26:00 NY")}; + TstUtils.addToTable(tableToCheck, i(2), col("Timestamp", newValue), intCol("Sentinel", 3)); + tableToCheck.notifyListeners(i(2), i(), i()); + }); + TableTools.showWithRowSet(windowed.first); + windowed.second.validateQueue(); + assertTableEquals(tableToCheck.updateView("InWindow = true"), windowed.first); + + // advance the clock to make sure we actually work + timeProvider.now += DateTimeUtils.MINUTE; // 9:30:01, passing the first entry out of the window + updateGraph.runWithinUnitTestCycle(windowed.second::run); + TableTools.showWithRowSet(windowed.first); + assertTableEquals(tableToCheck.updateView("InWindow = Sentinel > 1"), windowed.first); + + timeProvider.now += DateTimeUtils.MINUTE; // 9:32:01, passing the second entry out of the window + updateGraph.runWithinUnitTestCycle(windowed.second::run); + TableTools.showWithRowSet(windowed.first); + assertTableEquals(tableToCheck.updateView("InWindow = Sentinel == 2"), windowed.first); + + timeProvider.now += DateTimeUtils.MINUTE; // 9:33:01, passing the all entries out of the window + updateGraph.runWithinUnitTestCycle(windowed.second::run); + TableTools.showWithRowSet(windowed.first); + assertTableEquals(tableToCheck.updateView("InWindow = false"), windowed.first); + } + + @Test + public void testCombination2() { + final 
TestClock timeProvider = new TestClock(); + final Instant startTime = DateTimeUtils.parseInstant("2024-02-28T09:29:01 NY"); + timeProvider.now = DateTimeUtils.epochNanos(startTime); + + final Instant[] initialValues = Arrays.stream( + new String[] {"2024-02-28T09:25:00 NY", "2024-02-28T09:27:00 NY", "2024-02-28T09:26:00 NY"}) + .map(DateTimeUtils::parseInstant).toArray(Instant[]::new); + final QueryTable tableToCheck = testRefreshingTable(i(0, 1, 2).toTracking(), + col("Timestamp", initialValues), + intCol("Sentinel", 1, 2, 3)); + + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + final Pair windowed = updateGraph.sharedLock().computeLocked( + () -> WindowCheck.addTimeWindowInternal( + timeProvider, tableToCheck, "Timestamp", DateTimeUtils.MINUTE * 5, "InWindow", false)); + + TableTools.showWithRowSet(windowed.first); + assertTableEquals(tableToCheck.updateView("InWindow = true"), windowed.first); + windowed.second.validateQueue(); + + // advance the clock to make sure we actually work + timeProvider.now += DateTimeUtils.MINUTE; // 9:30:01, passing the first entry out of the window + updateGraph.runWithinUnitTestCycle(windowed.second::run); + TableTools.showWithRowSet(windowed.first); + assertTableEquals(tableToCheck.updateView("InWindow = Sentinel > 1"), windowed.first); + + timeProvider.now += DateTimeUtils.MINUTE; // 9:32:01, passing the second entry out of the window + updateGraph.runWithinUnitTestCycle(windowed.second::run); + TableTools.showWithRowSet(windowed.first); + assertTableEquals(tableToCheck.updateView("InWindow = Sentinel == 2"), windowed.first); + + timeProvider.now += DateTimeUtils.MINUTE; // 9:33:01, passing the all entries out of the window + updateGraph.runWithinUnitTestCycle(windowed.second::run); + TableTools.showWithRowSet(windowed.first); + assertTableEquals(tableToCheck.updateView("InWindow = false"), windowed.first); + } } diff --git a/engine/test-utils/build.gradle 
b/engine/test-utils/build.gradle index 743e05a1dc5..bb3b8f172ca 100644 --- a/engine/test-utils/build.gradle +++ b/engine/test-utils/build.gradle @@ -18,7 +18,7 @@ dependencies { implementation depCommonsLang3 implementation depTrove3 - implementation 'it.unimi.dsi:fastutil:8.5.11' + implementation 'it.unimi.dsi:fastutil:8.5.13' Classpaths.inheritJUnitClassic(project, 'implementation') Classpaths.inheritJUnitPlatform(project, 'implementation') diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/GenerateTableUpdates.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/GenerateTableUpdates.java index 9e6daaea98c..b046fce019e 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/GenerateTableUpdates.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/GenerateTableUpdates.java @@ -144,6 +144,15 @@ private void validateGroup(int... opts) { public static final SimulationProfile DEFAULT_PROFILE = new SimulationProfile(); + public static final SimulationProfile NO_SHIFT_PROFILE = + new SimulationProfile() { + { + SHIFT_10_PERCENT_KEY_SPACE = 0; + SHIFT_10_PERCENT_POS_SPACE = 0; + SHIFT_AGGRESSIVELY = 0; + } + }; + public static void generateShiftAwareTableUpdates(final SimulationProfile profile, final int targetUpdateSize, final Random random, final QueryTable table, final ColumnInfo[] columnInfo) { diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/QueryTableTestBase.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/QueryTableTestBase.java index 6121df0b38e..b345b4f8c8e 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/QueryTableTestBase.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/QueryTableTestBase.java @@ -23,21 +23,13 @@ public abstract class QueryTableTestBase extends RefreshingTableTestCase { public final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd"); - private static final 
GenerateTableUpdates.SimulationProfile NO_SHIFT_PROFILE = - new GenerateTableUpdates.SimulationProfile() { - { - SHIFT_10_PERCENT_KEY_SPACE = 0; - SHIFT_10_PERCENT_POS_SPACE = 0; - SHIFT_AGGRESSIVELY = 0; - } - }; - public final JoinIncrement leftStep = new JoinIncrement() { @Override public void step(int leftSize, int rightSize, QueryTable leftTable, QueryTable rightTable, ColumnInfo[] leftColumnInfo, ColumnInfo[] rightColumnInfo, EvalNuggetInterface[] en, Random random) { - simulateShiftAwareStep(NO_SHIFT_PROFILE, toString(), leftSize, random, leftTable, leftColumnInfo, en); + simulateShiftAwareStep(GenerateTableUpdates.NO_SHIFT_PROFILE, toString(), leftSize, random, leftTable, + leftColumnInfo, en); } @Override @@ -63,7 +55,8 @@ public String toString() { public void step(int leftSize, int rightSize, QueryTable leftTable, QueryTable rightTable, ColumnInfo[] leftColumnInfo, ColumnInfo[] rightColumnInfo, EvalNuggetInterface[] en, Random random) { - simulateShiftAwareStep(NO_SHIFT_PROFILE, toString(), rightSize, random, rightTable, rightColumnInfo, en); + simulateShiftAwareStep(GenerateTableUpdates.NO_SHIFT_PROFILE, toString(), rightSize, random, rightTable, + rightColumnInfo, en); } @Override @@ -89,8 +82,10 @@ public String toString() { public void step(int leftSize, int rightSize, QueryTable leftTable, QueryTable rightTable, ColumnInfo[] leftColumnInfo, ColumnInfo[] rightColumnInfo, EvalNuggetInterface[] en, Random random) { - simulateShiftAwareStep(NO_SHIFT_PROFILE, toString(), leftSize, random, leftTable, leftColumnInfo, en); - simulateShiftAwareStep(NO_SHIFT_PROFILE, toString(), rightSize, random, rightTable, rightColumnInfo, en); + simulateShiftAwareStep(GenerateTableUpdates.NO_SHIFT_PROFILE, toString(), leftSize, random, leftTable, + leftColumnInfo, en); + simulateShiftAwareStep(GenerateTableUpdates.NO_SHIFT_PROFILE, toString(), rightSize, random, rightTable, + rightColumnInfo, en); } @Override diff --git 
a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java index 813cd115d5e..44fdb6611cd 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/testcase/RefreshingTableTestCase.java @@ -160,7 +160,7 @@ public static void simulateShiftAwareStep(final String ctxt, int targetUpdateSiz en); } - protected static void simulateShiftAwareStep(final GenerateTableUpdates.SimulationProfile simulationProfile, + public static void simulateShiftAwareStep(final GenerateTableUpdates.SimulationProfile simulationProfile, final String ctxt, int targetUpdateSize, Random random, QueryTable table, ColumnInfo[] columnInfo, EvalNuggetInterface[] en) { final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); From 440e26bcfb9fca5a9bfcdc8fe7a377573643d11b Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 28 Feb 2024 20:54:54 +0000 Subject: [PATCH 09/10] Update web version 0.66.1 (#5205) Release notes https://github.com/deephaven/web-client-ui/releases/tag/v0.66.1 ## [0.66.1](https://github.com/deephaven/web-client-ui/compare/v0.66.0...v0.66.1) (2024-02-28) ### Bug Fixes * Load default dashboard data from workspace data ([#1810](https://github.com/deephaven/web-client-ui/issues/1810)) ([6dd9814](https://github.com/deephaven/web-client-ui/commit/6dd9814d5dde7928c3ad765ce8a0e25f770c1871)) * Spectrum actionbar selector ([#1841](https://github.com/deephaven/web-client-ui/issues/1841)) ([67de0e0](https://github.com/deephaven/web-client-ui/commit/67de0e09d11ba340aa546be71c400852a5a2092c)) Co-authored-by: deephaven-internal --- web/client-ui/Dockerfile | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git 
a/web/client-ui/Dockerfile b/web/client-ui/Dockerfile index ca9e9096a17..222bfb37160 100644 --- a/web/client-ui/Dockerfile +++ b/web/client-ui/Dockerfile @@ -2,10 +2,10 @@ FROM deephaven/node:local-build WORKDIR /usr/src/app # Most of the time, these versions are the same, except in cases where a patch only affects one of the packages -ARG WEB_VERSION=0.66.0 -ARG GRID_VERSION=0.66.0 -ARG CHART_VERSION=0.66.0 -ARG WIDGET_VERSION=0.66.0 +ARG WEB_VERSION=0.66.1 +ARG GRID_VERSION=0.66.1 +ARG CHART_VERSION=0.66.1 +ARG WIDGET_VERSION=0.66.1 # Pull in the published code-studio package from npmjs and extract is RUN set -eux; \ From 9a9938b4dea4e7abafb167cbc2bd09bf68b9a4c8 Mon Sep 17 00:00:00 2001 From: Nate Bauernfeind Date: Wed, 28 Feb 2024 17:36:03 -0700 Subject: [PATCH 10/10] Fix bugs in moveColumns and renameColumns (#5193) --- .../java/io/deephaven/engine/table/Table.java | 88 ++++++- .../engine/table/impl/BaseTable.java | 21 +- .../engine/table/impl/QueryTable.java | 244 ++++++++++-------- .../engine/table/impl/TableAdapter.java | 2 +- .../engine/table/impl/TableDefaults.java | 9 +- .../engine/table/impl/UncoalescedTable.java | 4 +- .../engine/table/impl/QueryTableTest.java | 193 +++++++++++++- .../engine/table/impl/TestMoveColumns.java | 48 +--- py/server/deephaven/table.py | 29 ++- py/server/tests/test_update_graph.py | 15 -- .../main/java/io/deephaven/api/Strings.java | 4 + 11 files changed, 453 insertions(+), 204 deletions(-) diff --git a/engine/api/src/main/java/io/deephaven/engine/table/Table.java b/engine/api/src/main/java/io/deephaven/engine/table/Table.java index 56091230495..94138b1dab9 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/Table.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/Table.java @@ -326,10 +326,56 @@ public interface Table extends @ConcurrentMethod Table dropColumnFormats(); + /** + * Produce a new table with the specified columns renamed using the specified {@link Pair pairs}. 
The renames are + * simultaneous and unordered, enabling direct swaps between column names. The resulting table retains the original + * column ordering after applying the specified renames. + *

+ * {@link IllegalArgumentException} will be thrown: + *

    + *
  • if a source column does not exist
  • + *
  • if a source column is used more than once
  • + *
  • if a destination column is used more than once
  • + *
+ * + * @param pairs The columns to rename + * @return The new table, with the columns renamed + */ + @ConcurrentMethod Table renameColumns(Collection pairs); + /** + * Produce a new table with the specified columns renamed using the syntax {@code "NewColumnName=OldColumnName"}. + * The renames are simultaneous and unordered, enabling direct swaps between column names. The resulting table + * retains the original column ordering after applying the specified renames. + *

+ * {@link IllegalArgumentException} will be thrown: + *

    + *
  • if a source column does not exist
  • + *
  • if a source column is used more than once
  • + *
  • if a destination column is used more than once
  • + *
+ * + * @param pairs The columns to rename + * @return The new table, with the columns renamed + */ + @ConcurrentMethod Table renameColumns(String... pairs); + /** + * Produce a new table with the specified columns renamed using the provided function. The renames are simultaneous + * and unordered, enabling direct swaps between column names. The resulting table retains the original column + * ordering after applying the specified renames. + *

+ * {@link IllegalArgumentException} will be thrown: + *

    + *
  • if a destination column is used more than once
  • + *
+ * + * @param renameFunction The function to apply to each column name + * @return The new table, with the columns renamed + */ + @ConcurrentMethod Table renameAllColumns(UnaryOperator renameFunction); @ConcurrentMethod @@ -343,27 +389,56 @@ public interface Table extends /** * Produce a new table with the specified columns moved to the leftmost position. Columns can be renamed with the - * usual syntax, i.e. {@code "NewColumnName=OldColumnName")}. + * usual syntax, i.e. {@code "NewColumnName=OldColumnName")}. The renames are simultaneous and unordered, enabling + * direct swaps between column names. All other columns are left in their original order. + *

+ * {@link IllegalArgumentException} will be thrown: + *

    + *
  • if a source column does not exist
  • + *
  • if a source column is used more than once
  • + *
  • if a destination column is used more than once
  • + *
* * @param columnsToMove The columns to move to the left (and, optionally, to rename) - * @return The new table, with the columns rearranged as explained above {@link #moveColumns(int, String...)} + * @return The new table, with the columns rearranged as explained above */ @ConcurrentMethod Table moveColumnsUp(String... columnsToMove); /** * Produce a new table with the specified columns moved to the rightmost position. Columns can be renamed with the - * usual syntax, i.e. {@code "NewColumnName=OldColumnName")}. + * usual syntax, i.e. {@code "NewColumnName=OldColumnName")}. The renames are simultaneous and unordered, enabling + * direct swaps between column names. All other columns are left in their original order. + *

+ * {@link IllegalArgumentException} will be thrown: + *

    + *
  • if a source column does not exist
  • + *
  • if a source column is used more than once
  • + *
  • if a destination column is used more than once
  • + *
* * @param columnsToMove The columns to move to the right (and, optionally, to rename) - * @return The new table, with the columns rearranged as explained above {@link #moveColumns(int, String...)} + * @return The new table, with the columns rearranged as explained above */ @ConcurrentMethod Table moveColumnsDown(String... columnsToMove); /** * Produce a new table with the specified columns moved to the specified {@code index}. Column indices begin at 0. - * Columns can be renamed with the usual syntax, i.e. {@code "NewColumnName=OldColumnName")}. + * Columns can be renamed with the usual syntax, i.e. {@code "NewColumnName=OldColumnName")}. The renames are + * simultaneous and unordered, enabling direct swaps between column names. The resulting table retains the original + * column ordering except for the specified columns, which are inserted at the specified index, in the order of + * {@code columnsToMove}, after the effects of applying any renames. + *

+ * {@link IllegalArgumentException} will be thrown: + *

    + *
  • if a source column does not exist
  • + *
  • if a source column is used more than once
  • + *
  • if a destination column is used more than once
  • + *
+ *

+ * Values of {@code index} outside the range of 0 to the number of columns in the table (exclusive) will be clamped + * to the nearest valid index. * * @param index The index to which the specified columns should be moved * @param columnsToMove The columns to move to the specified index (and, optionally, to rename) @@ -372,9 +447,6 @@ public interface Table extends @ConcurrentMethod Table moveColumns(int index, String... columnsToMove); - @ConcurrentMethod - Table moveColumns(int index, boolean moveToEnd, String... columnsToMove); - // ----------------------------------------------------------------------------------------------------------------- // Slice Operations // ----------------------------------------------------------------------------------------------------------------- diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java index 412ec955bc2..d56c8e0096a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java @@ -5,6 +5,7 @@ import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; +import io.deephaven.api.Pair; import io.deephaven.base.Base64; import io.deephaven.base.log.LogOutput; import io.deephaven.base.reference.SimpleReference; @@ -1034,7 +1035,7 @@ public void copySortableColumns( destination.setSortableColumns(currentSortableColumns.stream().filter(shouldCopy).collect(Collectors.toList())); } - void copySortableColumns(BaseTable destination, MatchPair[] renamedColumns) { + void copySortableColumns(BaseTable destination, Collection renamedColumns) { final Collection currentSortableColumns = getSortableColumns(); if (currentSortableColumns == null) { return; @@ -1047,9 +1048,9 @@ void copySortableColumns(BaseTable destination, MatchPair[] renamedColumns) { // b) The original column exists, and has not been 
replaced by another. For example // T1 = [ Col1, Col2, Col3 ]; T1.renameColumns(Col1=Col3, Col2]; if (renamedColumns != null) { - for (MatchPair mp : renamedColumns) { + for (final Pair pair : renamedColumns) { // Only the last grouping matters. - columnMapping.forcePut(mp.leftColumn(), mp.rightColumn()); + columnMapping.forcePut(pair.output().name(), pair.input().name()); } } @@ -1158,7 +1159,9 @@ void maybeCopyColumnDescriptions(final BaseTable destination) { * @param destination the table which shall possibly have a column-description attribute created * @param renamedColumns an array of the columns which have been renamed */ - void maybeCopyColumnDescriptions(final BaseTable destination, final MatchPair[] renamedColumns) { + void maybeCopyColumnDescriptions( + final BaseTable destination, + final Collection renamedColumns) { // noinspection unchecked final Map oldDescriptions = (Map) getAttribute(Table.COLUMN_DESCRIPTIONS_ATTRIBUTE); @@ -1168,11 +1171,13 @@ void maybeCopyColumnDescriptions(final BaseTable destination, final MatchPair } final Map sourceDescriptions = new HashMap<>(oldDescriptions); - if (renamedColumns != null && renamedColumns.length != 0) { - for (final MatchPair mp : renamedColumns) { - final String desc = sourceDescriptions.remove(mp.rightColumn()); + if (renamedColumns != null) { + for (final Pair pair : renamedColumns) { + final String desc = sourceDescriptions.remove(pair.input().name()); if (desc != null) { - sourceDescriptions.put(mp.leftColumn(), desc); + sourceDescriptions.put(pair.output().name(), desc); + } else { + sourceDescriptions.remove(pair.output().name()); } } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index 72c4d20aff0..ba1f33dae58 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -3,16 
+3,7 @@ */ package io.deephaven.engine.table.impl; -import io.deephaven.UncheckedDeephavenException; -import io.deephaven.api.AsOfJoinMatch; -import io.deephaven.api.AsOfJoinRule; -import io.deephaven.api.ColumnName; -import io.deephaven.api.JoinAddition; -import io.deephaven.api.JoinMatch; -import io.deephaven.api.RangeJoinMatch; -import io.deephaven.api.Selectable; -import io.deephaven.api.SortColumn; -import io.deephaven.api.Strings; +import io.deephaven.api.*; import io.deephaven.api.agg.*; import io.deephaven.api.agg.spec.AggSpec; import io.deephaven.api.agg.spec.AggSpecColumnReferences; @@ -21,7 +12,6 @@ import io.deephaven.api.snapshot.SnapshotWhenOptions.Flag; import io.deephaven.api.updateby.UpdateByOperation; import io.deephaven.api.updateby.UpdateByControl; -import io.deephaven.base.Pair; import io.deephaven.base.verify.Assert; import io.deephaven.base.verify.Require; import io.deephaven.chunk.attributes.Values; @@ -47,8 +37,6 @@ import io.deephaven.engine.table.impl.perf.BasePerformanceEntry; import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget; import io.deephaven.engine.table.impl.rangejoin.RangeJoinOperation; -import io.deephaven.engine.table.impl.select.MatchPairFactory; -import io.deephaven.engine.table.impl.select.SelectColumnFactory; import io.deephaven.engine.table.impl.updateby.UpdateBy; import io.deephaven.engine.table.impl.select.analyzers.SelectAndViewAnalyzerWrapper; import io.deephaven.engine.table.impl.sources.ring.RingTableTools; @@ -79,6 +67,7 @@ import io.deephaven.util.annotations.TestUseOnly; import io.deephaven.util.annotations.VisibleForTesting; import org.apache.commons.lang3.mutable.Mutable; +import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.mutable.MutableObject; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -90,6 +79,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import 
java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -475,8 +465,9 @@ public ModifiedColumnSet.Transformer newModifiedColumnSetTransformer(QueryTable * @param matchPairs the columns that map one-to-one with the result table * @return a transformer that passes dirty details via an identity mapping */ - public ModifiedColumnSet.Transformer newModifiedColumnSetTransformer(QueryTable resultTable, - MatchPair... matchPairs) { + public ModifiedColumnSet.Transformer newModifiedColumnSetTransformer( + @NotNull final QueryTable resultTable, + @NotNull final MatchPair... matchPairs) { final ModifiedColumnSet[] columnSets = new ModifiedColumnSet[matchPairs.length]; for (int ii = 0; ii < matchPairs.length; ++ii) { columnSets[ii] = resultTable.newModifiedColumnSet(matchPairs[ii].leftColumn()); @@ -484,6 +475,27 @@ public ModifiedColumnSet.Transformer newModifiedColumnSetTransformer(QueryTable return newModifiedColumnSetTransformer(MatchPair.getRightColumns(matchPairs), columnSets); } + /** + * Create a {@link ModifiedColumnSet.Transformer} that can be used to propagate dirty columns from this table to + * listeners of the provided resultTable. + * + * @param resultTable the destination table + * @param pairs the columns that map one-to-one with the result table + * @return a transformer that passes dirty details via an identity mapping + */ + public ModifiedColumnSet.Transformer newModifiedColumnSetTransformer( + @NotNull final QueryTable resultTable, + @NotNull final Pair... 
pairs) { + return newModifiedColumnSetTransformer( + Arrays.stream(pairs) + .map(Pair::output) + .map(ColumnName::name) + .toArray(String[]::new), + Arrays.stream(pairs) + .map(pair -> resultTable.newModifiedColumnSet(pair.input().name())) + .toArray(ModifiedColumnSet[]::new)); + } + /** * Create a {@link ModifiedColumnSet.Transformer} that can be used to propagate dirty columns from this table to * listeners of the table used to construct columnSets. It is an error if {@code columnNames} and {@code columnSets} @@ -924,54 +936,11 @@ private String getCastFormulaInternal(Class dataType) { } @Override - public Table moveColumns(int index, boolean moveToEnd, String... columnsToMove) { + public Table moveColumns(final int index, String... columnsToMove) { final UpdateGraph updateGraph = getUpdateGraph(); try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) { - // Get the current columns - final List currentColumns = getDefinition().getColumnNames(); - - // Create a Set from columnsToMove. This way, we can rename and rearrange columns at once. - final Set leftColsToMove = new HashSet<>(); - final Set rightColsToMove = new HashSet<>(); - int extraCols = 0; - - for (final String columnToMove : columnsToMove) { - final String left = MatchPairFactory.getExpression(columnToMove).leftColumn; - final String right = MatchPairFactory.getExpression(columnToMove).rightColumn; - - if (!leftColsToMove.add(left) || !currentColumns.contains(left) || (rightColsToMove.contains(left) - && !left.equals(right) && leftColsToMove.stream().anyMatch(col -> col.equals(right)))) { - extraCols++; - } - if (currentColumns.stream().anyMatch(currentColumn -> currentColumn.equals(right)) - && !left.equals(right) - && rightColsToMove.add(right) && !rightColsToMove.contains(left)) { - extraCols--; - } - } - index += moveToEnd ? 
extraCols : 0; - - // vci for write, cci for currentColumns, ctmi for columnsToMove - final SelectColumn[] viewColumns = new SelectColumn[currentColumns.size() + extraCols]; - for (int vci = 0, cci = 0, ctmi = 0; vci < viewColumns.length;) { - if (vci >= index && ctmi < columnsToMove.length) { - viewColumns[vci++] = SelectColumnFactory.getExpression(columnsToMove[ctmi++]); - } else { - // Don't add the column if it's one of the columns we're moving or if it has been renamed. - final String currentColumn = currentColumns.get(cci++); - if (!leftColsToMove.contains(currentColumn) - && Arrays.stream(viewColumns).noneMatch( - viewCol -> viewCol != null - && viewCol.getMatchPair().leftColumn.equals(currentColumn)) - && Arrays.stream(columnsToMove) - .noneMatch(colToMove -> MatchPairFactory.getExpression(colToMove).rightColumn - .equals(currentColumn))) { - - viewColumns[vci++] = SelectColumnFactory.getExpression(currentColumn); - } - } - } - return viewOrUpdateView(Flavor.View, viewColumns); + return renameColumnsImpl("moveColumns(" + index + ", ", Math.max(0, index), + Pair.from(columnsToMove)); } } @@ -1217,7 +1186,7 @@ private QueryTable whereInternal(final WhereFilter... filters) { } List selectFilters = new LinkedList<>(); - List>>> shiftColPairs = new LinkedList<>(); + List>>> shiftColPairs = new LinkedList<>(); for (final WhereFilter filter : filters) { filter.init(getDefinition()); if (filter instanceof AbstractConditionFilter @@ -1817,76 +1786,139 @@ public void onUpdate(final TableUpdate upstream) { } @Override - public Table renameColumns(Collection pairs) { + public Table renameColumns(Collection pairs) { final UpdateGraph updateGraph = getUpdateGraph(); try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) { - return renameColumnsImpl(MatchPair.fromPairs(pairs)); + return renameColumnsImpl("renameColumns(", -1, pairs); } } - private Table renameColumnsImpl(MatchPair... 
pairs) { - return QueryPerformanceRecorder.withNugget("renameColumns(" + matchString(pairs) + ")", + private Table renameColumnsImpl( + @NotNull final String methodNuggetPrefix, + final int movePosition, + @NotNull final Collection pairs) { + final String pairsLogString = Strings.ofPairs(pairs); + return QueryPerformanceRecorder.withNugget(methodNuggetPrefix + pairsLogString + ")", sizeForInstrumentation(), () -> { - if (pairs == null || pairs.length == 0) { + if (pairs.isEmpty()) { return prepareReturnThis(); } - checkInitiateOperation(); + Set notFound = null; + Set duplicateSource = null; + Set duplicateDest = null; - final Map pairLookup = new HashMap<>(); - for (final MatchPair pair : pairs) { - if (pair.leftColumn == null || pair.leftColumn.equals("")) { - throw new IllegalArgumentException( - "Bad left column in rename pair \"" + pair + "\""); + final Set newNames = new HashSet<>(); + final Map pairLookup = new LinkedHashMap<>(); + for (final Pair pair : pairs) { + if (!columns.containsKey(pair.input().name())) { + (notFound == null ? notFound = new LinkedHashSet<>() : notFound) + .add(pair.input().name()); } - if (null == columns.get(pair.rightColumn)) { - throw new IllegalArgumentException("Column \"" + pair.rightColumn + "\" not found"); + if (pairLookup.put(pair.input(), pair.output()) != null) { + (duplicateSource == null ? duplicateSource = new LinkedHashSet<>(1) : duplicateSource) + .add(pair.input().name()); + } + if (!newNames.add(pair.output())) { + (duplicateDest == null ? duplicateDest = new LinkedHashSet<>() : duplicateDest) + .add(pair.output().name()); } - pairLookup.put(pair.rightColumn, pair.leftColumn); } - int mcsPairIdx = 0; - final MatchPair[] modifiedColumnSetPairs = new MatchPair[columns.size()]; + // if we accumulated any errors, build one mega error message and throw it + if (notFound != null || duplicateSource != null || duplicateDest != null) { + throw new IllegalArgumentException(Stream.of( + notFound == null ? 
null : "Column(s) not found: " + String.join(", ", notFound), + duplicateSource == null ? null + : "Duplicate source column(s): " + String.join(", ", duplicateSource), + duplicateDest == null ? null + : "Duplicate destination column(s): " + String.join(", ", duplicateDest)) + .filter(Objects::nonNull).collect(Collectors.joining("\n"))); + } + final MutableInt mcsPairIdx = new MutableInt(); + final Pair[] modifiedColumnSetPairs = new Pair[columns.size()]; final Map> newColumns = new LinkedHashMap<>(); + + final Runnable moveColumns = () -> { + for (final Map.Entry rename : pairLookup.entrySet()) { + final ColumnName oldName = rename.getKey(); + final ColumnName newName = rename.getValue(); + final ColumnSource columnSource = columns.get(oldName.name()); + newColumns.put(newName.name(), columnSource); + modifiedColumnSetPairs[mcsPairIdx.getAndIncrement()] = + Pair.of(newName, oldName); + } + }; + for (final Map.Entry> entry : columns.entrySet()) { - final String oldName = entry.getKey(); + final ColumnName oldName = ColumnName.of(entry.getKey()); final ColumnSource columnSource = entry.getValue(); - String newName = pairLookup.get(oldName); + ColumnName newName = pairLookup.get(oldName); if (newName == null) { + if (newNames.contains(oldName)) { + // this column is being replaced by a rename + continue; + } newName = oldName; + } else if (movePosition >= 0) { + // we move this column when we get to the right position + continue; } - modifiedColumnSetPairs[mcsPairIdx++] = new MatchPair(newName, oldName); - newColumns.put(newName, columnSource); + + if (mcsPairIdx.intValue() == movePosition) { + moveColumns.run(); + } + + modifiedColumnSetPairs[mcsPairIdx.getAndIncrement()] = + Pair.of(newName, oldName); + newColumns.put(newName.name(), columnSource); } - final QueryTable queryTable = new QueryTable(rowSet, newColumns); - if (isRefreshing()) { - final ModifiedColumnSet.Transformer mcsTransformer = - newModifiedColumnSetTransformer(queryTable, modifiedColumnSetPairs); - 
addUpdateListener(new ListenerImpl("renameColumns(" + Arrays.deepToString(pairs) + ')', - this, queryTable) { - @Override - public void onUpdate(final TableUpdate upstream) { - final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream); - downstream.modifiedColumnSet = queryTable.getModifiedColumnSetForUpdates(); - if (upstream.modified().isNonempty()) { - mcsTransformer.clearAndTransform(upstream.modifiedColumnSet(), - downstream.modifiedColumnSet); - } else { - downstream.modifiedColumnSet.clear(); - } - queryTable.notifyListeners(downstream); - } - }); + if (mcsPairIdx.intValue() <= movePosition) { + moveColumns.run(); } - propagateFlatness(queryTable); - copyAttributes(queryTable, CopyAttributeOperation.RenameColumns); - copySortableColumns(queryTable, pairs); - maybeCopyColumnDescriptions(queryTable, pairs); + final Mutable result = new MutableObject<>(); + + final OperationSnapshotControl snapshotControl = + createSnapshotControlIfRefreshing(OperationSnapshotControl::new); + initializeWithSnapshot("renameColumns", snapshotControl, (usePrev, beforeClockValue) -> { + final QueryTable resultTable = new QueryTable(rowSet, newColumns); + propagateFlatness(resultTable); + + copyAttributes(resultTable, CopyAttributeOperation.RenameColumns); + copySortableColumns(resultTable, pairs); + maybeCopyColumnDescriptions(resultTable, pairs); + + if (snapshotControl != null) { + final ModifiedColumnSet.Transformer mcsTransformer = + newModifiedColumnSetTransformer(resultTable, modifiedColumnSetPairs); + final ListenerImpl listener = new ListenerImpl( + methodNuggetPrefix + pairsLogString + ')', this, resultTable) { + @Override + public void onUpdate(final TableUpdate upstream) { + final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream); + if (upstream.modified().isNonempty()) { + downstream.modifiedColumnSet = resultTable.getModifiedColumnSetForUpdates(); + mcsTransformer.clearAndTransform(upstream.modifiedColumnSet(), + downstream.modifiedColumnSet); + } 
else { + downstream.modifiedColumnSet = ModifiedColumnSet.EMPTY; + } + resultTable.notifyListeners(downstream); + } + }; + snapshotControl.setListenerAndResult(listener, resultTable); + } + + result.setValue(resultTable); + + return true; + }); + + return result.getValue(); - return queryTable; }); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/TableAdapter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/TableAdapter.java index 4b25f59be2a..c724ad397bb 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/TableAdapter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/TableAdapter.java @@ -219,7 +219,7 @@ default Table renameColumns(Collection pairs) { } @Override - default Table moveColumns(int index, boolean moveToEnd, String... columnsToMove) { + default Table moveColumns(int index, String... columnsToMove) { return throwUnsupported(); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/TableDefaults.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/TableDefaults.java index 836ca71260e..f4e251f227f 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/TableDefaults.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/TableDefaults.java @@ -243,14 +243,7 @@ default Table moveColumnsUp(String... columnsToMove) { @ConcurrentMethod @FinalDefault default Table moveColumnsDown(String... columnsToMove) { - return moveColumns(numColumns() - columnsToMove.length, true, columnsToMove); - } - - @Override - @ConcurrentMethod - @FinalDefault - default Table moveColumns(int index, String... 
columnsToMove) { - return moveColumns(index, false, columnsToMove); + return moveColumns(numColumns() - columnsToMove.length, columnsToMove); } // ----------------------------------------------------------------------------------------------------------------- diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/UncoalescedTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/UncoalescedTable.java index 4599d3cdd4b..23c827532e1 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/UncoalescedTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/UncoalescedTable.java @@ -271,8 +271,8 @@ public Table renameColumns(Collection pairs) { @Override @ConcurrentMethod - public Table moveColumns(int index, boolean moveToEnd, String... columnsToMove) { - return coalesce().moveColumns(index, moveToEnd, columnsToMove); + public Table moveColumns(int index, String... columnsToMove) { + return coalesce().moveColumns(index, columnsToMove); } @Override diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java index 58ef8ac5996..4d8b0708dec 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java @@ -664,6 +664,25 @@ public void testRenameColumns() { fail("Expected exception"); } catch (RuntimeException ignored) { } + + // Can't rename to a dest column twice. 
+ Assert.assertThrows(IllegalArgumentException.class, () -> { + table.renameColumns("O = Int", "O = Int"); + }); + + // Check what happens when we override a column by name + final Table override = table.renameColumns("Double = Int"); + Assert.assertEquals(override.getColumnSource("Double").getType(), int.class); + Assert.assertFalse(override.getColumnSourceMap().containsKey("Int")); + // Check that ordering of source columns does not matter + final Table override2 = table.renameColumns("Int = Double"); + Assert.assertEquals(override2.getColumnSource("Int").getType(), double.class); + Assert.assertFalse(override2.getColumnSourceMap().containsKey("Double")); + + // Validate that we can swap two columns simultaneously + final Table swapped = table.renameColumns("Double = Int", "Int = Double"); + Assert.assertEquals(swapped.getColumnSource("Double").getType(), int.class); + Assert.assertEquals(swapped.getColumnSource("Int").getType(), double.class); } public void testRenameColumnsIncremental() { @@ -680,14 +699,9 @@ public void testRenameColumnsIncremental() { final EvalNugget[] en = new EvalNugget[] { EvalNugget.from(() -> queryTable.renameColumns(List.of())), EvalNugget.from(() -> queryTable.renameColumns("Symbol=Sym")), - EvalNugget.from(() -> queryTable.renameColumns("Symbol=Sym", "Symbols=Sym")), EvalNugget.from(() -> queryTable.renameColumns("Sym2=Sym", "intCol2=intCol", "doubleCol2=doubleCol")), }; - // Verify our assumption that columns can be renamed at most once. 
- Assert.assertTrue(queryTable.renameColumns("Symbol=Sym", "Symbols=Sym").hasColumns("Symbols")); - Assert.assertFalse(queryTable.renameColumns("Symbol=Sym", "Symbols=Sym").hasColumns("Symbol")); - final int steps = 100; for (int i = 0; i < steps; i++) { if (printTableUpdates) { @@ -697,6 +711,175 @@ public void testRenameColumnsIncremental() { } } + public void testMoveColumnsUp() { + final Table table = emptyTable(1).update("A = 1", "B = 2", "C = 3", "D = 4", "E = 5"); + + assertTableEquals( + emptyTable(1).update("C = 3", "A = 1", "B = 2", "D = 4", "E = 5"), + table.moveColumnsUp("C")); + + assertTableEquals( + emptyTable(1).update("C = 3", "B = 2", "A = 1", "D = 4", "E = 5"), + table.moveColumnsUp("C", "B")); + + assertTableEquals( + emptyTable(1).update("D = 4", "A = 1", "C = 3", "B = 2", "E = 5"), + table.moveColumnsUp("D", "A", "C")); + + // test trivial do nothing case + assertTableEquals(table, table.moveColumnsUp()); + + // Can't move a column up twice. + Assert.assertThrows(IllegalArgumentException.class, () -> { + table.moveColumnsUp("C", "C"); + }); + + // Can't rename a source column twice. + Assert.assertThrows(IllegalArgumentException.class, () -> { + table.moveColumnsUp("A1 = A", "A2 = A"); + }); + + // Can't rename to a dest column twice. 
+ Assert.assertThrows(IllegalArgumentException.class, () -> { + table.moveColumnsUp("O = A", "O = B"); + }); + + assertTableEquals( + emptyTable(1).update("B = 3", "A = 1", "D = 4", "E = 5"), + table.moveColumnsUp("B = C")); + assertTableEquals( + emptyTable(1).update("B = 1", "C = 3", "D = 4", "E = 5"), + table.moveColumnsUp("B = A")); + assertTableEquals( + emptyTable(1).update("B = 1", "A = 2", "C = 3", "D = 4", "E = 5"), + table.moveColumnsUp("B = A", "A = B")); + } + + public void testMoveColumnsDown() { + final Table table = emptyTable(1).update("A = 1", "B = 2", "C = 3", "D = 4", "E = 5"); + + assertTableEquals( + emptyTable(1).update("A = 1", "B = 2", "D = 4", "E = 5", "C = 3"), + table.moveColumnsDown("C")); + + assertTableEquals( + emptyTable(1).update("A = 1", "D = 4", "E = 5", "C = 3", "B = 2"), + table.moveColumnsDown("C", "B")); + + assertTableEquals( + emptyTable(1).update("B = 2", "E = 5", "D = 4", "A = 1", "C = 3"), + table.moveColumnsDown("D", "A", "C")); + + // test trivial do nothing case + assertTableEquals(table, table.moveColumnsDown()); + + // Can't move a column down twice. + Assert.assertThrows(IllegalArgumentException.class, () -> { + table.moveColumnsDown("C", "C"); + }); + + // Can't rename a source column twice. + Assert.assertThrows(IllegalArgumentException.class, () -> { + table.moveColumnsDown("A1 = A", "A2 = A"); + }); + + // Can't rename to a dest column twice. 
+ Assert.assertThrows(IllegalArgumentException.class, () -> { + table.moveColumnsDown("O = A", "O = B"); + }); + + assertTableEquals( + emptyTable(1).update("A = 1", "D = 4", "E = 5", "B = 3"), + table.moveColumnsDown("B = C")); + assertTableEquals( + emptyTable(1).update("C = 3", "D = 4", "E = 5", "B = 1"), + table.moveColumnsDown("B = A")); + assertTableEquals( + emptyTable(1).update("C = 3", "D = 4", "E = 5", "B = 1", "A = 2"), + table.moveColumnsDown("B = A", "A = B")); + } + + public void testMoveColumns() { + final Table table = emptyTable(1).update("A = 1", "B = 2", "C = 3", "D = 4", "E = 5"); + + // single column + assertTableEquals( + emptyTable(1).update("C = 3", "A = 1", "B = 2", "D = 4", "E = 5"), + table.moveColumns(-1, "C")); + assertTableEquals( + emptyTable(1).update("C = 3", "A = 1", "B = 2", "D = 4", "E = 5"), + table.moveColumns(0, "C")); + assertTableEquals( + emptyTable(1).update("A = 1", "C = 3", "B = 2", "D = 4", "E = 5"), + table.moveColumns(1, "C")); + assertTableEquals( + emptyTable(1).update("A = 1", "B = 2", "C = 3", "D = 4", "E = 5"), + table.moveColumns(2, "C")); + assertTableEquals( + emptyTable(1).update("A = 1", "B = 2", "D = 4", "C = 3", "E = 5"), + table.moveColumns(3, "C")); + assertTableEquals( + emptyTable(1).update("A = 1", "B = 2", "D = 4", "E = 5", "C = 3"), + table.moveColumns(4, "C")); + assertTableEquals( + emptyTable(1).update("A = 1", "B = 2", "D = 4", "E = 5", "C = 3"), + table.moveColumns(10, "C")); + + // two columns + assertTableEquals( + emptyTable(1).update("C = 3", "B = 2", "A = 1", "D = 4", "E = 5"), + table.moveColumns(-1, "C", "B")); + assertTableEquals( + emptyTable(1).update("C = 3", "B = 2", "A = 1", "D = 4", "E = 5"), + table.moveColumns(0, "C", "B")); + assertTableEquals( + emptyTable(1).update("A = 1", "C = 3", "B = 2", "D = 4", "E = 5"), + table.moveColumns(1, "C", "B")); + assertTableEquals( + emptyTable(1).update("A = 1", "D = 4", "C = 3", "B = 2", "E = 5"), + table.moveColumns(2, "C", "B")); + 
assertTableEquals( + emptyTable(1).update("A = 1", "D = 4", "E = 5", "C = 3", "B = 2"), + table.moveColumns(3, "C", "B")); + assertTableEquals( + emptyTable(1).update("A = 1", "D = 4", "E = 5", "C = 3", "B = 2"), + table.moveColumns(4, "C", "B")); + assertTableEquals( + emptyTable(1).update("A = 1", "D = 4", "E = 5", "C = 3", "B = 2"), + table.moveColumns(10, "C", "B")); + + // test trivial do nothing case + for (int ii = -1; ii < 10; ++ii) { + assertTableEquals(table, table.moveColumns(ii)); + } + + // Can't move a column down twice. + Assert.assertThrows(IllegalArgumentException.class, () -> { + table.moveColumns(2, "C", "C"); + }); + + // Can't rename a source column twice. + Assert.assertThrows(IllegalArgumentException.class, () -> { + table.moveColumns(2, "A1 = A", "A2 = A"); + }); + + // Can't rename to a dest column twice. + Assert.assertThrows(IllegalArgumentException.class, () -> { + table.moveColumns(2, "O = A", "O = B"); + }); + + + assertTableEquals( + emptyTable(1).update("A = 1", "D = 4", "B = 3", "E = 5"), + table.moveColumns(2, "B = C")); + assertTableEquals( + emptyTable(1).update("C = 3", "D = 4", "B = 1", "E = 5"), + table.moveColumns(2, "B = A")); + assertTableEquals( + emptyTable(1).update("C = 3", "D = 4", "B = 1", "A = 2", "E = 5"), + table.moveColumns(2, "B = A", "A = B")); + } + public static WhereFilter stringContainsFilter( String columnName, String... 
values) { diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestMoveColumns.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestMoveColumns.java index 3edb395a20b..f82f26c7978 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestMoveColumns.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestMoveColumns.java @@ -69,29 +69,13 @@ public void testMoveColumns() { checkColumnOrder(temp, "dexab"); checkColumnValueOrder(temp, "45123"); - temp = table.moveColumns(0, "x=a", "b=x"); - checkColumnOrder(temp, "xbcde"); - checkColumnValueOrder(temp, "11345"); - - temp = table.moveColumns(0, "x=a", "y=a", "z=a"); - checkColumnOrder(temp, "xyzbcde"); - checkColumnValueOrder(temp, "1112345"); + temp = table.moveColumns(0, "x=a", "d"); + checkColumnOrder(temp, "xdbce"); + checkColumnValueOrder(temp, "14235"); temp = table.moveColumns(0, "b=a", "a=b"); checkColumnOrder(temp, "bacde"); - checkColumnValueOrder(temp, "11345"); - - temp = table.moveColumns(0, "d=c", "d=a", "x=e"); - checkColumnOrder(temp, "dxb"); - checkColumnValueOrder(temp, "152"); - - temp = table.moveColumns(0, "a=b", "a=c"); - checkColumnOrder(temp, "ade"); - checkColumnValueOrder(temp, "345"); - - temp = table.moveColumns(0, "a=b", "a=c", "a=d", "a=e"); - checkColumnOrder(temp, "a"); - checkColumnValueOrder(temp, "5"); + checkColumnValueOrder(temp, "12345"); } public void testMoveUpColumns() { @@ -110,14 +94,6 @@ public void testMoveUpColumns() { temp = table.moveColumnsUp("x=e"); checkColumnOrder(temp, "xabcd"); checkColumnValueOrder(temp, "51234"); - - temp = table.moveColumnsUp("x=a", "x=b"); - checkColumnOrder(temp, "xcde"); - checkColumnValueOrder(temp, "2345"); - - temp = table.moveColumnsUp("x=a", "y=a"); - checkColumnOrder(temp, "xybcde"); - checkColumnValueOrder(temp, "112345"); } public void testMoveDownColumns() { @@ -134,19 +110,11 @@ public void testMoveDownColumns() { temp = table.moveColumnsDown("b=a", "a=b", "c"); 
checkColumnOrder(temp, "debac"); - checkColumnValueOrder(temp, "45113"); - - temp = table.moveColumnsDown("b=a", "a=b", "c", "d=a"); - checkColumnOrder(temp, "ebacd"); - checkColumnValueOrder(temp, "51131"); - - temp = table.moveColumnsDown("x=a", "x=b"); - checkColumnOrder(temp, "cdex"); - checkColumnValueOrder(temp, "3452"); + checkColumnValueOrder(temp, "45123"); - temp = table.moveColumnsDown("x=a", "y=a"); - checkColumnOrder(temp, "bcdexy"); - checkColumnValueOrder(temp, "234511"); + temp = table.moveColumnsDown("b=a", "a=b", "c"); + checkColumnOrder(temp, "debac"); + checkColumnValueOrder(temp, "45123"); } private void checkColumnOrder(Table t, String expectedOrder) { diff --git a/py/server/deephaven/table.py b/py/server/deephaven/table.py index 922e6b3dcd1..5508a543a4b 100644 --- a/py/server/deephaven/table.py +++ b/py/server/deephaven/table.py @@ -80,11 +80,11 @@ class NodeType(Enum): """An enum of node types for RollupTable""" AGGREGATED = _JNodeType.Aggregated """Nodes at an aggregated (rolled up) level in the RollupTable. An aggregated level is above the constituent ( - leaf) level. These nodes have column names and types that result from applying aggregations on the source table + leaf) level. These nodes have column names and types that result from applying aggregations on the source table of the RollupTable. """ CONSTITUENT = _JNodeType.Constituent - """Nodes at the leaf level when :meth:`~deephaven.table.Table.rollup` method is called with - include_constituent=True. The constituent level is the lowest in a rollup table. These nodes have column names + """Nodes at the leaf level when :meth:`~deephaven.table.Table.rollup` method is called with + include_constituent=True. The constituent level is the lowest in a rollup table. These nodes have column names and types from the source table of the RollupTable. 
""" @@ -665,10 +665,12 @@ def drop_columns(self, cols: Union[str, Sequence[str]]) -> Table: def move_columns(self, idx: int, cols: Union[str, Sequence[str]]) -> Table: """The move_columns method creates a new table with specified columns moved to a specific column index value. + Columns may be renamed with the same semantics as rename_columns. The renames are simultaneous and unordered, + enabling direct swaps between column names. Specifying a source or destination more than once is prohibited. Args: idx (int): the column index where the specified columns will be moved in the new table. - cols (Union[str, Sequence[str]]) : the column name(s) + cols (Union[str, Sequence[str]]) : the column name(s) or the column rename expr(s) as "X = Y" Returns: a new table @@ -684,10 +686,12 @@ def move_columns(self, idx: int, cols: Union[str, Sequence[str]]) -> Table: def move_columns_down(self, cols: Union[str, Sequence[str]]) -> Table: """The move_columns_down method creates a new table with specified columns appearing last in order, to the far - right. + right. Columns may be renamed with the same semantics as rename_columns. The renames are simultaneous and + unordered, enabling direct swaps between column names. Specifying a source or destination more than once is + prohibited. Args: - cols (Union[str, Sequence[str]]) : the column name(s) + cols (Union[str, Sequence[str]]) : the column name(s) or the column rename expr(s) as "X = Y" Returns: a new table @@ -703,10 +707,12 @@ def move_columns_down(self, cols: Union[str, Sequence[str]]) -> Table: def move_columns_up(self, cols: Union[str, Sequence[str]]) -> Table: """The move_columns_up method creates a new table with specified columns appearing first in order, to the far - left. + left. Columns may be renamed with the same semantics as rename_columns. The renames are simultaneous and + unordered, enabling direct swaps between column names. Specifying a source or destination more than once is + prohibited. 
Args: - cols (Union[str, Sequence[str]]) : the column name(s) + cols (Union[str, Sequence[str]]) : the column name(s) or the column rename expr(s) as "X = Y" Returns: a new table @@ -721,7 +727,9 @@ def move_columns_up(self, cols: Union[str, Sequence[str]]) -> Table: raise DHError(e, "table move_columns_up operation failed.") from e def rename_columns(self, cols: Union[str, Sequence[str]]) -> Table: - """The rename_columns method creates a new table with the specified columns renamed. + """The rename_columns method creates a new table with the specified columns renamed. The renames are + simultaneous and unordered, enabling direct swaps between column names. Specifying a source or + destination more than once is prohibited. Args: cols (Union[str, Sequence[str]]) : the column rename expr(s) as "X = Y" @@ -734,8 +742,7 @@ def rename_columns(self, cols: Union[str, Sequence[str]]) -> Table: """ try: cols = to_sequence(cols) - with auto_locking_ctx(self): - return Table(j_table=self.j_table.renameColumns(*cols)) + return Table(j_table=self.j_table.renameColumns(*cols)) except Exception as e: raise DHError(e, "table rename_columns operation failed.") from e diff --git a/py/server/tests/test_update_graph.py b/py/server/tests/test_update_graph.py index 2cdf0d116cc..4da6db8e058 100644 --- a/py/server/tests/test_update_graph.py +++ b/py/server/tests/test_update_graph.py @@ -161,21 +161,6 @@ def test_auto_locking_joins(self): with self.subTest(op=op): result_table = left_table.aj(right_table, on="X") - def test_auto_locking_rename_columns(self): - with ug.shared_lock(self.test_update_graph): - test_table = time_table("PT00:00:00.001").update(["X=i", "Y=i%13", "Z=X*Y"]) - - cols_to_rename = [ - f"{f.name + '_2'} = {f.name}" for f in test_table.columns[::2] - ] - - with self.assertRaises(DHError) as cm: - result_table = test_table.rename_columns(cols_to_rename) - self.assertRegex(str(cm.exception), r"IllegalStateException") - - ug.auto_locking = True - result_table = 
test_table.rename_columns(cols_to_rename) - def test_auto_locking_ungroup(self): with ug.shared_lock(self.test_update_graph): test_table = time_table("PT00:00:00.001").update(["X=i", "Y=i%13"]) diff --git a/table-api/src/main/java/io/deephaven/api/Strings.java b/table-api/src/main/java/io/deephaven/api/Strings.java index af285661400..b4c7e478780 100644 --- a/table-api/src/main/java/io/deephaven/api/Strings.java +++ b/table-api/src/main/java/io/deephaven/api/Strings.java @@ -116,6 +116,10 @@ public static String of(FilterPattern pattern, boolean invert) { return (invert ? "!" : "") + inner; } + public static String ofPairs(Collection pairs) { + return pairs.stream().map(Strings::of).collect(Collectors.joining(",", "[", "]")); + } + public static String of(Pair pair) { if (pair.input().equals(pair.output())) { return of(pair.output());