From c649758a6d79a5d49ef62c6993357eb7697105df Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Mon, 11 Dec 2023 15:30:23 -0700 Subject: [PATCH] Use ByteBuffer to receive data from Python --- .../barrage/util/ArrowToTableConverter.java | 11 ++++++----- py/server/deephaven/arrow.py | 4 ++-- py/server/tests/test_arrow.py | 1 + 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java index 072e149f41d..12063e81b6e 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/ArrowToTableConverter.java @@ -48,10 +48,9 @@ public class ArrowToTableConverter { private volatile boolean completed = false; - private static BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final byte[] ipcMessage) throws IOException { + private BarrageProtoUtil.MessageInfo parseArrowIpcMessage(ByteBuffer bb) throws IOException { final BarrageProtoUtil.MessageInfo mi = new BarrageProtoUtil.MessageInfo(); - final ByteBuffer bb = ByteBuffer.wrap(ipcMessage); bb.order(ByteOrder.LITTLE_ENDIAN); final int continuation = bb.getInt(); final int metadata_size = bb.getInt(); @@ -70,7 +69,7 @@ private static BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final byte[] ip } @ScriptApi - public synchronized void setSchema(final byte[] ipcMessage) { + public synchronized void setSchema(final ByteBuffer ipcMessage) { if (completed) { throw new IllegalStateException("Conversion is complete; cannot process additional messages"); } @@ -82,7 +81,7 @@ public synchronized void setSchema(final byte[] ipcMessage) { } @ScriptApi - public synchronized void addRecordBatch(final byte[] ipcMessage) { + public synchronized void addRecordBatch(final ByteBuffer ipcMessage) { if (completed) { throw new IllegalStateException("Conversion is complete; cannot process additional messages"); } @@ -192,7 +191,7 @@ protected BarrageMessage createBarrageMessage(BarrageProtoUtil.MessageInfo mi, i return msg; } - private BarrageProtoUtil.MessageInfo getMessageInfo(byte[] ipcMessage) { + private BarrageProtoUtil.MessageInfo getMessageInfo(ByteBuffer ipcMessage) { final BarrageProtoUtil.MessageInfo mi; try { mi = parseArrowIpcMessage(ipcMessage); @@ -201,4 +200,6 @@ private BarrageProtoUtil.MessageInfo getMessageInfo(byte[] ipcMessage) { } return mi; } + + } diff --git a/py/server/deephaven/arrow.py b/py/server/deephaven/arrow.py index 9cefb97e03b..4b2cb6b90b7 100644 --- a/py/server/deephaven/arrow.py +++ b/py/server/deephaven/arrow.py @@ -101,12 +101,12 @@ def to_table(pa_table: pa.Table, cols: List[str] = None) -> Table: try: pa_buffer = dh_schema.serialize() - j_barrage_table_builder.setSchema(dtypes.array(dtypes.byte, pa_buffer)) + j_barrage_table_builder.setSchema(pa_buffer) record_batches = pa_table.to_batches() for rb in record_batches: pa_buffer = rb.serialize() - j_barrage_table_builder.addRecordBatch(dtypes.array(dtypes.byte, pa_buffer)) + j_barrage_table_builder.addRecordBatch(pa_buffer) j_barrage_table_builder.onCompleted() return Table(j_table=j_barrage_table_builder.getResultTable()) diff --git a/py/server/tests/test_arrow.py b/py/server/tests/test_arrow.py index 411128b2f4f..b56418a2d69 100644 --- a/py/server/tests/test_arrow.py +++ b/py/server/tests/test_arrow.py @@ -18,6 +18,7 @@ from deephaven.table import Table from tests.testbase import BaseTestCase + class ArrowTestCase(BaseTestCase): test_table: Table