Skip to content

Commit

Permalink
Use ByteBuffer to receive data from Python
Browse files Browse the repository at this point in the history
  • Loading branch information
jmao-denver committed Dec 14, 2023
1 parent 482cd8e commit bb34d3b
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,9 @@ public class ArrowToTableConverter {

private volatile boolean completed = false;

private static BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final byte[] ipcMessage) throws IOException {
private BarrageProtoUtil.MessageInfo parseArrowIpcMessage(ByteBuffer bb) throws IOException {
final BarrageProtoUtil.MessageInfo mi = new BarrageProtoUtil.MessageInfo();

final ByteBuffer bb = ByteBuffer.wrap(ipcMessage);
bb.order(ByteOrder.LITTLE_ENDIAN);
final int continuation = bb.getInt();
final int metadata_size = bb.getInt();
Expand All @@ -70,7 +69,7 @@ private static BarrageProtoUtil.MessageInfo parseArrowIpcMessage(final byte[] ip
}

@ScriptApi
public synchronized void setSchema(final byte[] ipcMessage) {
public synchronized void setSchema(final ByteBuffer ipcMessage) {
if (completed) {
throw new IllegalStateException("Conversion is complete; cannot process additional messages");
}
Expand All @@ -82,7 +81,7 @@ public synchronized void setSchema(final byte[] ipcMessage) {
}

@ScriptApi
public synchronized void addRecordBatch(final byte[] ipcMessage) {
public synchronized void addRecordBatch(final ByteBuffer ipcMessage) {
if (completed) {
throw new IllegalStateException("Conversion is complete; cannot process additional messages");
}
Expand Down Expand Up @@ -192,7 +191,7 @@ protected BarrageMessage createBarrageMessage(BarrageProtoUtil.MessageInfo mi, i
return msg;
}

private BarrageProtoUtil.MessageInfo getMessageInfo(byte[] ipcMessage) {
private BarrageProtoUtil.MessageInfo getMessageInfo(ByteBuffer ipcMessage) {
final BarrageProtoUtil.MessageInfo mi;
try {
mi = parseArrowIpcMessage(ipcMessage);
Expand All @@ -201,4 +200,6 @@ private BarrageProtoUtil.MessageInfo getMessageInfo(byte[] ipcMessage) {
}
return mi;
}


}
4 changes: 2 additions & 2 deletions py/server/deephaven/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,12 @@ def to_table(pa_table: pa.Table, cols: List[str] = None) -> Table:

try:
pa_buffer = dh_schema.serialize()
j_barrage_table_builder.setSchema(dtypes.array(dtypes.byte, pa_buffer))
j_barrage_table_builder.setSchema(pa_buffer)

record_batches = pa_table.to_batches()
for rb in record_batches:
pa_buffer = rb.serialize()
j_barrage_table_builder.addRecordBatch(dtypes.array(dtypes.byte, pa_buffer))
j_barrage_table_builder.addRecordBatch(pa_buffer)
j_barrage_table_builder.onCompleted()

return Table(j_table=j_barrage_table_builder.getResultTable())
Expand Down
1 change: 1 addition & 0 deletions py/server/tests/test_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from deephaven.table import Table
from tests.testbase import BaseTestCase


class ArrowTestCase(BaseTestCase):
test_table: Table

Expand Down

0 comments on commit bb34d3b

Please sign in to comment.