Skip to content

Commit

Permalink
initial attempt at returning results with a BulkSet
Browse files Browse the repository at this point in the history
  • Loading branch information
kenhuuu committed Jul 26, 2024
1 parent 3862e93 commit 1482dea
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.tinkerpop.gremlin.driver.Result;
import org.apache.tinkerpop.gremlin.driver.ResultQueue;
import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.apache.tinkerpop.gremlin.util.message.ResponseMessageV4;
import org.apache.tinkerpop.gremlin.util.ser.SerializationException;
Expand Down Expand Up @@ -68,7 +69,7 @@ protected void channelRead0(final ChannelHandlerContext channelHandlerContext, f
final ResultQueue queue = pending.get();

if ((null == statusCode) || (statusCode == HttpResponseStatus.OK)) {
final List<Object> data = response.getResult().getData();
final BulkSet<Object> data = response.getResult().getData();
// unrolls the collection into individual results to be handled by the queue.
data.forEach(item -> queue.add(new Result(item)));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.channel.ChannelHandlerContext;
import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptChecker;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.AbstractTraverser;
import org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler;
import org.apache.tinkerpop.gremlin.structure.Element;
Expand All @@ -30,6 +31,7 @@
import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -191,13 +193,15 @@ private String determineMaterializeProperties() {
: TokensV4.MATERIALIZE_PROPERTIES_ALL;
}

public void handleDetachment(final List<Object> aggregate) {
public void handleDetachment(final BulkSet<Object> aggregate) {
if (!aggregate.isEmpty() && !this.getMaterializeProperties().equals(TokensV4.MATERIALIZE_PROPERTIES_ALL)) {
final Object firstElement = aggregate.get(0);

if (firstElement instanceof Element) {
for (int i = 0; i < aggregate.size(); i++)
aggregate.set(i, ReferenceFactory.detach((Element) aggregate.get(i)));
for (Map.Entry<Object, Long> element : aggregate.asBulk().entrySet()) {
aggregate.remove(element);
aggregate.add(ReferenceFactory.detach((Element) element.getKey()), element.getValue());
}
} else if (firstElement instanceof AbstractTraverser) {
for (final Object item : aggregate)
((AbstractTraverser) item).detach();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.tinkerpop.gremlin.process.traversal.Scope;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.server.Context;
import org.apache.tinkerpop.gremlin.server.GraphManager;
Expand Down Expand Up @@ -382,7 +383,7 @@ private void handleIterator(final Context context, final Iterator itty, final Me
if (!itty.hasNext()) {
ByteBuf chunk = null;
try {
chunk = makeChunk(context, msg, serializer, new ArrayList<>(), false);
chunk = makeChunk(context, msg, serializer, new BulkSet<>(), false);
nettyContext.writeAndFlush(new DefaultHttpContent(chunk));
} catch (Exception ex) {
// Bytebuf is a countable release - if it does not get written downstream
Expand All @@ -394,9 +395,10 @@ private void handleIterator(final Context context, final Iterator itty, final Me
}

// the batch size can be overridden by the request
final int resultIterationBatchSize = (Integer) msg.optionalField(TokensV4.ARGS_BATCH_SIZE)
.orElse(settings.resultIterationBatchSize);
List<Object> aggregate = new ArrayList<>(resultIterationBatchSize);
// final int resultIterationBatchSize = (Integer) msg.optionalField(TokensV4.ARGS_BATCH_SIZE)
// .orElse(settings.resultIterationBatchSize);
final int resultIterationBatchSize = 1024;
BulkSet<Object> aggregate = new BulkSet<>();

// use an external control to manage the loop as opposed to just checking hasNext() in the while. this
// prevent situations where auto transactions create a new transaction after calls to commit() withing
Expand Down Expand Up @@ -458,7 +460,7 @@ private void handleIterator(final Context context, final Iterator itty, final Me
try {
// only need to reset the aggregation list if there's more stuff to write
if (hasMore) {
aggregate = new ArrayList<>(resultIterationBatchSize);
aggregate = new BulkSet<>();
}
} catch (Exception ex) {
// Bytebuf is a countable release - if it does not get written downstream
Expand Down Expand Up @@ -507,7 +509,7 @@ private boolean acceptsDeflateEncoding(List<String> encodings) {
}

private static ByteBuf makeChunk(final Context ctx, final RequestMessageV4 msg,
final MessageSerializerV4<?> serializer, final List<Object> aggregate,
final MessageSerializerV4<?> serializer, final BulkSet<Object> aggregate,
final boolean hasMore) throws Exception {
try {
final ChannelHandlerContext nettyContext = ctx.getChannelHandlerContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ public Function<Object, String> getMapper() {
@Override
public ByteBuf serializeResponseAsBinary(final ResponseMessageV4 responseMessage, final ByteBufAllocator allocator) {
return (responseMessage.getStatus().getCode() == HttpResponseStatus.OK)
? convertStringData(responseMessage.getResult().getData(), false, allocator)
? convertStringData(responseMessage.getResult().getListData(), false, allocator)
: convertErrorString(responseMessage.getStatus().getMessage(), allocator);
}

@Override
public ByteBuf writeHeader(ResponseMessageV4 responseMessage, ByteBufAllocator allocator) {
return convertStringData(responseMessage.getResult().getData(), false, allocator);
return convertStringData(responseMessage.getResult().getListData(), false, allocator);
}

@Override
Expand All @@ -61,7 +61,7 @@ public ByteBuf writeChunk(Object aggregate, ByteBufAllocator allocator) {

@Override
public ByteBuf writeFooter(ResponseMessageV4 responseMessage, ByteBufAllocator allocator) {
return convertStringData(responseMessage.getResult().getData(), true, allocator);
return convertStringData(responseMessage.getResult().getListData(), true, allocator);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.tinkerpop.gremlin.util.message;

import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;

import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -100,7 +101,7 @@ public static Builder build() {

public final static class Builder {
private HttpResponseStatus code = null;
private List<Object> result = Collections.emptyList();
private BulkSet<Object> result = new BulkSet<>();
private String statusMessage = null;
private String exception = null;
private Map<String, Object> attributes = Collections.emptyMap();
Expand Down Expand Up @@ -129,6 +130,11 @@ public Builder statusAttributes(final Map<String, Object> attributes) {
}

public Builder result(final List<Object> result) {
result.stream().forEach(res -> this.result.add(res));
return this;
}

public Builder result(final BulkSet<Object> result) {
this.result = result;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,34 @@
*/
package org.apache.tinkerpop.gremlin.util.message;

import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Data model for the "result" portion of a {@link ResponseMessageV4}.
*/
public final class ResponseResultV4 {
private final List<Object> data;
private final BulkSet<Object> data;
private final Map<String, Object> meta;

public ResponseResultV4(final List<Object> data, final Map<String, Object> meta) {
public ResponseResultV4(final BulkSet<Object> data, final Map<String, Object> meta) {
this.data = data;
this.meta = meta;
}

public List<Object> getData() {
public BulkSet<Object> getData() {
return data;
}

public List<Object> getListData() {
List results = new ArrayList(data.size());
data.stream().forEach(res -> results.add(res));
return results;
}

public Map<String, Object> getMeta() {
return meta;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.ReferenceCountUtil;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.io.graphson.AbstractObjectDeserializer;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
Expand Down Expand Up @@ -378,7 +379,7 @@ public void ser(final ResponseMessageV4 responseMessage, final JsonGenerator jso
GraphSONUtil.writeEndObject(responseMessage, jsonGenerator, typeSerializer);

jsonGenerator.writeFieldName(SerTokensV4.TOKEN_RESULT);
final List<Object> result = responseMessage.getResult().getData();
final List<Object> result = responseMessage.getResult().getListData();
if (result != null) {
serializerProvider.findTypedValueSerializer(result.getClass(), true, null).serialize(result, jsonGenerator, serializerProvider);
} else {
Expand Down Expand Up @@ -476,7 +477,7 @@ public ResponseMessageV4 createObject(final Map<String, Object> data) {
final Map<String, Object> status = (Map<String, Object>) data.get(SerTokensV4.TOKEN_STATUS);
ResponseMessageV4.Builder response = ResponseMessageV4.build()
.code(HttpResponseStatus.valueOf((Integer) status.get(SerTokensV4.TOKEN_CODE)))
.result((List) data.get(SerTokensV4.TOKEN_RESULT));
.result((BulkSet<Object>) data.get(SerTokensV4.TOKEN_RESULT));

if (null != status.get(SerTokensV4.TOKEN_EXCEPTION)) {
response.exception(String.valueOf(status.get(SerTokensV4.TOKEN_EXCEPTION)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.io.Buffer;
import org.apache.tinkerpop.gremlin.structure.io.IoRegistry;
Expand Down Expand Up @@ -232,8 +233,9 @@ private ByteBuf write(final ResponseMessageV4 responseMessage, final Object aggr
? responseMessage.getResult().getData()
: aggregate;
if (data != null) {
for (final Object item : (List) data) {
writer.write(item, buffer);
for (final Map.Entry<Object, Long> item : ((BulkSet<Object>) data).asBulk().entrySet()) {
writer.write(item.getKey(), buffer);
writer.write(item.getValue(), buffer);
}
}
}
Expand Down Expand Up @@ -263,14 +265,17 @@ public ResponseMessageV4 deserializeBinaryResponse(final ByteBuf msg) throws Ser
return readChunk(msg, true);
}

private List<Object> readPayload(final Buffer buffer) throws IOException {
final List<Object> result = new ArrayList<>();
private BulkSet<Object> readPayload(final Buffer buffer) throws IOException {
final BulkSet<Object> result = new BulkSet<>();
while (buffer.readableBytes() != 0) {
final Object obj = reader.read(buffer);

if (Marker.END_OF_STREAM.equals(obj)) {
break;
}
result.add(obj);

final Long bulk = reader.read(buffer);
result.add(obj, bulk);
}
return result;
}
Expand All @@ -290,7 +295,7 @@ public ResponseMessageV4 readChunk(final ByteBuf byteBuf, final boolean isFirstC
try {
// empty input buffer
if (buffer.readableBytes() == 0) {
return ResponseMessageV4.build().result(Collections.emptyList()).create();
return ResponseMessageV4.build().result(new BulkSet<>()).create();
}

if (isFirstChunk) {
Expand All @@ -303,7 +308,7 @@ public ResponseMessageV4 readChunk(final ByteBuf byteBuf, final boolean isFirstC
}
}

final List<Object> result = readPayload(buffer);
final BulkSet<Object> result = readPayload(buffer);

// no footer
if (buffer.readableBytes() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
import org.apache.tinkerpop.gremlin.structure.io.Buffer;
import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryWriter;
Expand Down Expand Up @@ -52,7 +53,7 @@ public ResponseMessageV4 readValue(final ByteBuf byteBuf, final GraphBinaryReade
.statusMessage(context.readValue(buffer, String.class, true))
.statusAttributes(context.readValue(buffer, Map.class, false))
.responseMetaData(context.readValue(buffer, Map.class, false))
.result(context.read(buffer))
.result(context.<BulkSet<Object>>read(buffer))
.create();
} catch (IOException ex) {
throw new SerializationException(ex);
Expand Down

0 comments on commit 1482dea

Please sign in to comment.