Skip to content

Commit

Permalink
Implementing reverseScan
Browse files Browse the repository at this point in the history
Signed-off-by: Jiaming Lu <jiaming.lu@simplytyped.cn>
  • Loading branch information
Jiaming Lu committed Jan 11, 2023
1 parent d278e3a commit 11cfac0
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 37 deletions.
32 changes: 26 additions & 6 deletions src/main/java/org/tikv/common/KVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,25 @@ public List<KvPair> batchGet(BackOffer backOffer, List<ByteString> keys, long ve
*/
public List<KvPair> scan(ByteString startKey, ByteString endKey, long version)
throws GrpcException {
Iterator<KvPair> iterator = scanIterator(conf, clientBuilder, startKey, endKey, version);
return scan(startKey, endKey, version, false);
}

/**
* Scan key-value pairs from TiKV in range [startKey, endKey) or if reversely, [endKey, startKey)
*
* @param startKey start key, inclusive
* @param endKey end key, exclusive
* @param reverse whether to scan reversely
* @return list of key-value pairs in range
*/
public List<KvPair> scan(ByteString startKey, ByteString endKey, long version, boolean reverse)
throws GrpcException {
Iterator<KvPair> iterator;
if (reverse) {
iterator = scanIterator(conf, clientBuilder, endKey, startKey, version, reverse);
} else {
iterator = scanIterator(conf, clientBuilder, startKey, endKey, version, reverse);
}
List<KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
return result;
Expand All @@ -115,7 +133,7 @@ public List<KvPair> scan(ByteString startKey, ByteString endKey, long version)
* @return list of key-value pairs in range
*/
public List<KvPair> scan(ByteString startKey, long version, int limit) throws GrpcException {
Iterator<KvPair> iterator = scanIterator(conf, clientBuilder, startKey, version, limit);
Iterator<KvPair> iterator = scanIterator(conf, clientBuilder, startKey, version, limit, false);
List<KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
return result;
Expand Down Expand Up @@ -183,16 +201,18 @@ private Iterator<KvPair> scanIterator(
RegionStoreClientBuilder builder,
ByteString startKey,
ByteString endKey,
long version) {
return new ConcreteScanIterator(conf, builder, startKey, endKey, version);
long version,
boolean reverse) {
return new ConcreteScanIterator(conf, builder, startKey, endKey, version, reverse);
}

private Iterator<KvPair> scanIterator(
TiConfiguration conf,
RegionStoreClientBuilder builder,
ByteString startKey,
long version,
int limit) {
return new ConcreteScanIterator(conf, builder, startKey, version, limit);
int limit,
boolean reverse) {
return new ConcreteScanIterator(conf, builder, startKey, version, limit, reverse);
}
}
38 changes: 36 additions & 2 deletions src/main/java/org/tikv/common/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,24 @@ public Iterator<KvPair> scan(ByteString startKey) {
session.getRegionStoreClientBuilder(),
startKey,
timestamp.getVersion(),
Integer.MAX_VALUE);
Integer.MAX_VALUE,
false);
}

/**
* scan all keys becofe startKey, inclusive
*
* @param startKey start of keys
* @return iterator of kvPair
*/
public Iterator<KvPair> reverseScan(ByteString startKey) {
return new ConcreteScanIterator(
session.getConf(),
session.getRegionStoreClientBuilder(),
startKey,
timestamp.getVersion(),
Integer.MAX_VALUE,
true);
}

/**
Expand All @@ -173,7 +190,24 @@ public Iterator<KvPair> scanPrefix(ByteString prefix) {
session.getRegionStoreClientBuilder(),
prefix,
nextPrefix,
timestamp.getVersion());
timestamp.getVersion(),
false);
}
/**
* scan all keys with prefix, reversely
*
* @param prefix prefix of keys
* @return iterator of kvPair
*/
public Iterator<KvPair> reverseScanPrefix(ByteString prefix) {
ByteString nextPrefix = Key.toRawKey(prefix).nextPrefix().toByteString();
return new ConcreteScanIterator(
session.getConf(),
session.getRegionStoreClientBuilder(),
nextPrefix,
prefix,
timestamp.getVersion(),
true);
}

public TiConfiguration getConf() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,21 @@ public ConcreteScanIterator(
RegionStoreClientBuilder builder,
ByteString startKey,
long version,
int limit) {
int limit,
boolean reverse) {
// Passing endKey as ByteString.EMPTY means that endKey is +INF by default,
this(conf, builder, startKey, ByteString.EMPTY, version, limit);
this(conf, builder, startKey, ByteString.EMPTY, version, limit, reverse);
}

public ConcreteScanIterator(
TiConfiguration conf,
RegionStoreClientBuilder builder,
ByteString startKey,
ByteString endKey,
long version) {
long version,
boolean reverse) {
// Passing endKey as ByteString.EMPTY means that endKey is +INF by default,
this(conf, builder, startKey, endKey, version, Integer.MAX_VALUE);
this(conf, builder, startKey, endKey, version, Integer.MAX_VALUE, reverse);
}

private ConcreteScanIterator(
Expand All @@ -65,8 +67,9 @@ private ConcreteScanIterator(
ByteString startKey,
ByteString endKey,
long version,
int limit) {
super(conf, builder, startKey, endKey, limit, false);
int limit,
boolean reverse) {
super(conf, builder, startKey, endKey, limit, false, reverse);
this.version = version;
}

Expand All @@ -76,7 +79,7 @@ TiRegion loadCurrentRegionToCache() throws GrpcException {
try (RegionStoreClient client = builder.build(startKey)) {
client.setTimeout(conf.getScanTimeout());
BackOffer backOffer = ConcreteBackOffer.newScannerNextMaxBackOff();
currentCache = client.scan(backOffer, startKey, version);
currentCache = client.scan(backOffer, startKey, version, reverse);
// If we get region before scan, we will use region from cache which
// may have wrong end key. This may miss some regions that split from old region.
// Client will get the newest region during scan. So we need to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ public RawScanIterator(
ByteString endKey,
int limit,
boolean keyOnly,
boolean reverse,
BackOffer scanBackOffer) {
super(conf, builder, startKey, endKey, limit, keyOnly);
super(conf, builder, startKey, endKey, limit, keyOnly, reverse);

this.scanBackOffer = scanBackOffer;
}
Expand All @@ -56,7 +57,7 @@ TiRegion loadCurrentRegionToCache() throws GrpcException {
currentCache = null;
} else {
try {
currentCache = client.rawScan(backOffer, startKey, limit, keyOnly);
currentCache = client.rawScan(backOffer, startKey, limit, keyOnly, reverse);
// Client will get the newest region during scan. So we need to
// update region after scan.
region = client.getRegion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,24 @@ public abstract class ScanIterator implements Iterator<Kvrpcpb.KvPair> {
protected Key endKey;
protected boolean hasEndKey;
protected boolean processingLastBatch = false;
protected boolean reverse = false;

ScanIterator(
TiConfiguration conf,
RegionStoreClientBuilder builder,
ByteString startKey,
ByteString endKey,
int limit,
boolean keyOnly) {
boolean keyOnly,
boolean reverse) {
this.startKey = requireNonNull(startKey, "start key is null");
this.endKey = Key.toRawKey(requireNonNull(endKey, "end key is null"));
this.hasEndKey = !endKey.isEmpty();
this.limit = limit;
this.keyOnly = keyOnly;
this.conf = conf;
this.builder = builder;
this.reverse = reverse;
}

/**
Expand Down
23 changes: 15 additions & 8 deletions src/main/java/org/tikv/common/region/RegionStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@
import org.tikv.common.util.HistogramUtils;
import org.tikv.common.util.Pair;
import org.tikv.common.util.RangeSplitter;
import org.tikv.kvproto.Coprocessor;
import org.tikv.kvproto.Errorpb;
import org.tikv.kvproto.*;
import org.tikv.kvproto.Kvrpcpb.BatchGetRequest;
import org.tikv.kvproto.Kvrpcpb.BatchGetResponse;
import org.tikv.kvproto.Kvrpcpb.CommitRequest;
Expand Down Expand Up @@ -109,8 +108,6 @@
import org.tikv.kvproto.Kvrpcpb.SplitRegionResponse;
import org.tikv.kvproto.Kvrpcpb.TxnHeartBeatRequest;
import org.tikv.kvproto.Kvrpcpb.TxnHeartBeatResponse;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.TikvGrpc;
import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
import org.tikv.kvproto.TikvGrpc.TikvFutureStub;
import org.tikv.txn.AbstractLockResolverClient;
Expand Down Expand Up @@ -336,7 +333,7 @@ private List<KvPair> handleBatchGetResponse(
}

public List<KvPair> scan(
BackOffer backOffer, ByteString startKey, long version, boolean keyOnly) {
BackOffer backOffer, ByteString startKey, long version, boolean keyOnly, boolean reverse) {
boolean forWrite = false;
while (true) {
Supplier<ScanRequest> request =
Expand All @@ -348,6 +345,7 @@ public List<KvPair> scan(
.setStartKey(codec.encodeKey(startKey))
.setVersion(version)
.setKeyOnly(keyOnly)
.setReverse(reverse)
.setLimit(getConf().getScanBatchSize())
.build();

Expand Down Expand Up @@ -417,6 +415,11 @@ public List<KvPair> scan(BackOffer backOffer, ByteString startKey, long version)
return scan(backOffer, startKey, version, false);
}

public List<KvPair> scan(
BackOffer backOffer, ByteString startKey, long version, boolean reverse) {
return scan(backOffer, startKey, version, false, reverse);
}

/**
* Prewrite batch keys
*
Expand Down Expand Up @@ -1238,9 +1241,11 @@ private void handleRawBatchDelete(RawBatchDeleteResponse resp) {
* @param backOffer BackOffer
* @param key startKey
* @param keyOnly true if value of KvPair is not needed
* @param reverse
* @return KvPair list
*/
public List<KvPair> rawScan(BackOffer backOffer, ByteString key, int limit, boolean keyOnly) {
public List<KvPair> rawScan(
BackOffer backOffer, ByteString key, int limit, boolean keyOnly, boolean reverse) {
Long clusterId = pdClient.getClusterId();
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_scan", clusterId.toString()).startTimer();
Expand All @@ -1254,6 +1259,7 @@ public List<KvPair> rawScan(BackOffer backOffer, ByteString key, int limit, bool
.setEndKey(range.second)
.setKeyOnly(keyOnly)
.setLimit(limit)
.setReverse(reverse)
.build();
};

Expand All @@ -1271,8 +1277,9 @@ public List<KvPair> rawScan(BackOffer backOffer, ByteString key, int limit, bool
}
}

public List<KvPair> rawScan(BackOffer backOffer, ByteString key, boolean keyOnly) {
return rawScan(backOffer, key, getConf().getScanBatchSize(), keyOnly);
public List<KvPair> rawScan(
BackOffer backOffer, ByteString key, boolean keyOnly, boolean reverse) {
return rawScan(backOffer, key, getConf().getScanBatchSize(), keyOnly, reverse);
}

private List<KvPair> rawScanHelper(RawScanResponse resp) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/tikv/raw/RawKVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1011,7 +1011,7 @@ private Iterator<KvPair> rawScanIterator(
if (limit > MAX_RAW_SCAN_LIMIT) {
throw ERR_MAX_SCAN_LIMIT_EXCEEDED;
}
return new RawScanIterator(conf, builder, startKey, endKey, limit, keyOnly, backOffer);
return new RawScanIterator(conf, builder, startKey, endKey, limit, keyOnly, false, backOffer);
}

/**
Expand Down
64 changes: 54 additions & 10 deletions src/main/java/org/tikv/txn/KVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,37 @@ public List<Kvrpcpb.KvPair> batchGet(BackOffer backOffer, List<ByteString> keys,
*/
public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey, long version)
throws GrpcException {
return scan(startKey, endKey, version, false);
}
/**
* Scan key-value pairs from TiKV reversely in range (startKey, endKey]
*
* @param startKey start key, inclusive
* @param endKey end key, exclusive
* @return list of key-value pairs in range
*/
public List<Kvrpcpb.KvPair> reverseScan(ByteString startKey, ByteString endKey, long version)
throws GrpcException {
return scan(endKey, startKey, version, true);
}

public List<Kvrpcpb.KvPair> scan(
ByteString startKey, ByteString endKey, long version, boolean reverse) throws GrpcException {
Iterator<Kvrpcpb.KvPair> iterator;
if (reverse) {
iterator = scanIterator(conf, clientBuilder, endKey, startKey, version, reverse);
} else {
iterator = scanIterator(conf, clientBuilder, startKey, endKey, version, reverse);
}
List<Kvrpcpb.KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
return result;
}

public List<Kvrpcpb.KvPair> scan(ByteString startKey, long version, int limit, boolean reverse)
throws GrpcException {
Iterator<Kvrpcpb.KvPair> iterator =
scanIterator(conf, clientBuilder, startKey, endKey, version);
scanIterator(conf, clientBuilder, startKey, version, limit, reverse);
List<Kvrpcpb.KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
return result;
Expand All @@ -130,14 +159,27 @@ public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey, long ve
*/
public List<Kvrpcpb.KvPair> scan(ByteString startKey, long version, int limit)
throws GrpcException {
Iterator<Kvrpcpb.KvPair> iterator = scanIterator(conf, clientBuilder, startKey, version, limit);
List<Kvrpcpb.KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
return result;
return scan(startKey, version, limit, false);
}

/**
* Scan key-value pairs reversively from TiKV in range ('', endKey], maximum to `limit` pairs
*
* @param endKey start key, inclusive
* @param limit limit of kv pairs
* @return list of key-value pairs in range
*/
public List<Kvrpcpb.KvPair> reverseScan(ByteString endKey, long version, int limit)
throws GrpcException {
return scan(endKey, version, limit, true);
}

public List<Kvrpcpb.KvPair> scan(ByteString startKey, long version) throws GrpcException {
return scan(startKey, version, Integer.MAX_VALUE);
return scan(startKey, version, Integer.MAX_VALUE, false);
}

public List<Kvrpcpb.KvPair> reverseScan(ByteString endKey, long version) throws GrpcException {
return scan(endKey, version, Integer.MAX_VALUE, true);
}

public synchronized void ingest(List<Pair<ByteString, ByteString>> list) throws GrpcException {
Expand Down Expand Up @@ -264,17 +306,19 @@ private Iterator<Kvrpcpb.KvPair> scanIterator(
RegionStoreClientBuilder builder,
ByteString startKey,
ByteString endKey,
long version) {
return new ConcreteScanIterator(conf, builder, startKey, endKey, version);
long version,
boolean reverse) {
return new ConcreteScanIterator(conf, builder, startKey, endKey, version, reverse);
}

private Iterator<Kvrpcpb.KvPair> scanIterator(
TiConfiguration conf,
RegionStoreClientBuilder builder,
ByteString startKey,
long version,
int limit) {
return new ConcreteScanIterator(conf, builder, startKey, version, limit);
int limit,
boolean reverse) {
return new ConcreteScanIterator(conf, builder, startKey, version, limit, reverse);
}

private void doIngest(TiRegion region, List<Pair<ByteString, ByteString>> sortedList)
Expand Down

0 comments on commit 11cfac0

Please sign in to comment.