diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java index 83203f964c4..16fc2af3f33 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java @@ -26,6 +26,7 @@ import org.apache.tinkerpop.gremlin.driver.Result; import org.apache.tinkerpop.gremlin.driver.ResultQueue; import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import org.apache.tinkerpop.gremlin.util.message.ResponseMessageV4; import org.apache.tinkerpop.gremlin.util.ser.SerializationException; @@ -68,7 +69,7 @@ protected void channelRead0(final ChannelHandlerContext channelHandlerContext, f final ResultQueue queue = pending.get(); if ((null == statusCode) || (statusCode == HttpResponseStatus.OK)) { - final List data = response.getResult().getData(); + final BulkSet data = response.getResult().getData(); // unrolls the collection into individual results to be handled by the queue. data.forEach(item -> queue.add(new Result(item))); } else { diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java index d2589e2df71..836ec61a532 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java @@ -21,6 +21,7 @@ import io.netty.channel.ChannelHandlerContext; import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor; import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptChecker; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.AbstractTraverser; import org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler; import org.apache.tinkerpop.gremlin.structure.Element; @@ -30,6 +31,7 @@ import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -191,13 +193,15 @@ private String determineMaterializeProperties() { : TokensV4.MATERIALIZE_PROPERTIES_ALL; } - public void handleDetachment(final List aggregate) { + public void handleDetachment(final BulkSet aggregate) { if (!aggregate.isEmpty() && !this.getMaterializeProperties().equals(TokensV4.MATERIALIZE_PROPERTIES_ALL)) { final Object firstElement = aggregate.get(0); if (firstElement instanceof Element) { - for (int i = 0; i < aggregate.size(); i++) - aggregate.set(i, ReferenceFactory.detach((Element) aggregate.get(i))); + for (Map.Entry element : aggregate.asBulk().entrySet()) { + aggregate.remove(element); + aggregate.add(ReferenceFactory.detach((Element) element.getKey()), element.getValue()); + } } else if (firstElement instanceof AbstractTraverser) { for (final Object item : aggregate) ((AbstractTraverser) item).detach(); diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java index e3b34379a24..c7557ae70ec 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java @@ -41,6 +41,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Scope; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet; import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException; import org.apache.tinkerpop.gremlin.server.Context; import org.apache.tinkerpop.gremlin.server.GraphManager; @@ -382,7 +383,7 @@ private void handleIterator(final Context context, final Iterator itty, final Me if (!itty.hasNext()) { ByteBuf chunk = null; try { - chunk = makeChunk(context, msg, serializer, new ArrayList<>(), false); + chunk = makeChunk(context, msg, serializer, new BulkSet<>(), false); nettyContext.writeAndFlush(new DefaultHttpContent(chunk)); } catch (Exception ex) { // Bytebuf is a countable release - if it does not get written downstream @@ -394,9 +395,10 @@ private void handleIterator(final Context context, final Iterator itty, final Me } // the batch size can be overridden by the request - final int resultIterationBatchSize = (Integer) msg.optionalField(TokensV4.ARGS_BATCH_SIZE) - .orElse(settings.resultIterationBatchSize); - List aggregate = new ArrayList<>(resultIterationBatchSize); +// final int resultIterationBatchSize = (Integer) msg.optionalField(TokensV4.ARGS_BATCH_SIZE) +// .orElse(settings.resultIterationBatchSize); + final int resultIterationBatchSize = 1024; + BulkSet aggregate = new BulkSet<>(); // use an external control to manage the loop as opposed to just checking hasNext() in the while. this // prevent situations where auto transactions create a new transaction after calls to commit() withing @@ -458,7 +460,7 @@ private void handleIterator(final Context context, final Iterator itty, final Me try { // only need to reset the aggregation list if there's more stuff to write if (hasMore) { - aggregate = new ArrayList<>(resultIterationBatchSize); + aggregate = new BulkSet<>(); } } catch (Exception ex) { // Bytebuf is a countable release - if it does not get written downstream @@ -507,7 +509,7 @@ private boolean acceptsDeflateEncoding(List encodings) { } private static ByteBuf makeChunk(final Context ctx, final RequestMessageV4 msg, - final MessageSerializerV4 serializer, final List aggregate, + final MessageSerializerV4 serializer, final BulkSet aggregate, final boolean hasMore) throws Exception { try { final ChannelHandlerContext nettyContext = ctx.getChannelHandlerContext(); diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/TextPlainMessageSerializerV4.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/TextPlainMessageSerializerV4.java index dadd1eb4687..4033fd35d72 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/TextPlainMessageSerializerV4.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/TextPlainMessageSerializerV4.java @@ -45,13 +45,13 @@ public Function getMapper() { @Override public ByteBuf serializeResponseAsBinary(final ResponseMessageV4 responseMessage, final ByteBufAllocator allocator) { return (responseMessage.getStatus().getCode() == HttpResponseStatus.OK) - ? convertStringData(responseMessage.getResult().getData(), false, allocator) + ? convertStringData(responseMessage.getResult().getListData(), false, allocator) : convertErrorString(responseMessage.getStatus().getMessage(), allocator); } @Override public ByteBuf writeHeader(ResponseMessageV4 responseMessage, ByteBufAllocator allocator) { - return convertStringData(responseMessage.getResult().getData(), false, allocator); + return convertStringData(responseMessage.getResult().getListData(), false, allocator); } @Override @@ -61,7 +61,7 @@ public ByteBuf writeChunk(Object aggregate, ByteBufAllocator allocator) { @Override public ByteBuf writeFooter(ResponseMessageV4 responseMessage, ByteBufAllocator allocator) { - return convertStringData(responseMessage.getResult().getData(), true, allocator); + return convertStringData(responseMessage.getResult().getListData(), true, allocator); } @Override diff --git a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseMessageV4.java b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseMessageV4.java index 10f0c0183bc..8f0bbc5dd94 100644 --- a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseMessageV4.java +++ b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseMessageV4.java @@ -19,6 +19,7 @@ package org.apache.tinkerpop.gremlin.util.message; import io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet; import java.util.Collections; import java.util.HashMap; @@ -100,7 +101,7 @@ public static Builder build() { public final static class Builder { private HttpResponseStatus code = null; - private List result = Collections.emptyList(); + private BulkSet result = new BulkSet<>(); private String statusMessage = null; private String exception = null; private Map attributes = Collections.emptyMap(); @@ -129,6 +130,11 @@ public Builder statusAttributes(final Map attributes) { } public Builder result(final List result) { + result.stream().forEach(res -> this.result.add(res)); + return this; + } + + public Builder result(final BulkSet result) { this.result = result; return this; } diff --git a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseResultV4.java b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseResultV4.java index 1711a3c0bc2..daa8ec1e863 100644 --- a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseResultV4.java +++ b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseResultV4.java @@ -18,6 +18,9 @@ */ package org.apache.tinkerpop.gremlin.util.message; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet; + +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -25,18 +28,24 @@ * Data model for the "result" portion of a {@link ResponseMessageV4}. */ public final class ResponseResultV4 { - private final List data; + private final BulkSet data; private final Map meta; - public ResponseResultV4(final List data, final Map meta) { + public ResponseResultV4(final BulkSet data, final Map meta) { this.data = data; this.meta = meta; } - public List getData() { + public BulkSet getData() { return data; } + public List getListData() { + List results = new ArrayList(data.size()); + data.stream().forEach(res -> results.add(res)); + return results; + } + public Map getMeta() { return meta; } diff --git a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/AbstractGraphSONMessageSerializerV4.java b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/AbstractGraphSONMessageSerializerV4.java index 4a216dd50e2..dec168fcc60 100644 --- a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/AbstractGraphSONMessageSerializerV4.java +++ b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/AbstractGraphSONMessageSerializerV4.java @@ -22,6 +22,7 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.util.ReferenceCountUtil; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.io.graphson.AbstractObjectDeserializer; import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper; @@ -378,7 +379,7 @@ public void ser(final ResponseMessageV4 responseMessage, final JsonGenerator jso GraphSONUtil.writeEndObject(responseMessage, jsonGenerator, typeSerializer); jsonGenerator.writeFieldName(SerTokensV4.TOKEN_RESULT); - final List result = responseMessage.getResult().getData(); + final List result = responseMessage.getResult().getListData(); if (result != null) { serializerProvider.findTypedValueSerializer(result.getClass(), true, null).serialize(result, jsonGenerator, serializerProvider); } else { @@ -476,7 +477,7 @@ public ResponseMessageV4 createObject(final Map data) { final Map status = (Map) data.get(SerTokensV4.TOKEN_STATUS); ResponseMessageV4.Builder response = ResponseMessageV4.build() .code(HttpResponseStatus.valueOf((Integer) status.get(SerTokensV4.TOKEN_CODE))) - .result((List) data.get(SerTokensV4.TOKEN_RESULT)); + .result((BulkSet) data.get(SerTokensV4.TOKEN_RESULT)); if (null != status.get(SerTokensV4.TOKEN_EXCEPTION)) { response.exception(String.valueOf(status.get(SerTokensV4.TOKEN_EXCEPTION))); diff --git a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphBinaryMessageSerializerV4.java b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphBinaryMessageSerializerV4.java index d98c7cfd852..0949965ffd8 100644 --- a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphBinaryMessageSerializerV4.java +++ b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphBinaryMessageSerializerV4.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.io.Buffer; import org.apache.tinkerpop.gremlin.structure.io.IoRegistry; @@ -232,8 +233,9 @@ private ByteBuf write(final ResponseMessageV4 responseMessage, final Object aggr ? responseMessage.getResult().getData() : aggregate; if (data != null) { - for (final Object item : (List) data) { - writer.write(item, buffer); + for (final Map.Entry item : ((BulkSet) data).asBulk().entrySet()) { + writer.write(item.getKey(), buffer); + writer.write(item.getValue(), buffer); } } } @@ -263,14 +265,17 @@ public ResponseMessageV4 deserializeBinaryResponse(final ByteBuf msg) throws Ser return readChunk(msg, true); } - private List readPayload(final Buffer buffer) throws IOException { - final List result = new ArrayList<>(); + private BulkSet readPayload(final Buffer buffer) throws IOException { + final BulkSet result = new BulkSet<>(); while (buffer.readableBytes() != 0) { final Object obj = reader.read(buffer); + if (Marker.END_OF_STREAM.equals(obj)) { break; } - result.add(obj); + + final Long bulk = reader.read(buffer); + result.add(obj, bulk); } return result; } @@ -290,7 +295,7 @@ public ResponseMessageV4 readChunk(final ByteBuf byteBuf, final boolean isFirstC try { // empty input buffer if (buffer.readableBytes() == 0) { - return ResponseMessageV4.build().result(Collections.emptyList()).create(); + return ResponseMessageV4.build().result(new BulkSet<>()).create(); } if (isFirstChunk) { @@ -303,7 +308,7 @@ public ResponseMessageV4 readChunk(final ByteBuf byteBuf, final boolean isFirstC } } - final List result = readPayload(buffer); + final BulkSet result = readPayload(buffer); // no footer if (buffer.readableBytes() == 0) { diff --git a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/ResponseMessageSerializerV4.java b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/ResponseMessageSerializerV4.java index fd5e600cf02..b6b078cb852 100644 --- a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/ResponseMessageSerializerV4.java +++ b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/ResponseMessageSerializerV4.java @@ -20,6 +20,7 @@ import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet; import org.apache.tinkerpop.gremlin.structure.io.Buffer; import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader; import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryWriter; @@ -52,7 +53,7 @@ public ResponseMessageV4 readValue(final ByteBuf byteBuf, final GraphBinaryReade .statusMessage(context.readValue(buffer, String.class, true)) .statusAttributes(context.readValue(buffer, Map.class, false)) .responseMetaData(context.readValue(buffer, Map.class, false)) - .result(context.read(buffer)) + .result(context.>read(buffer)) .create(); } catch (IOException ex) { throw new SerializationException(ex);