Skip to content

Commit

Permalink
[2.1 cherry-pick] verify scan request range (tikv#4124)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangjinpeng87 authored Jan 28, 2019
1 parent 8fc3e65 commit 75b8d92
Show file tree
Hide file tree
Showing 7 changed files with 320 additions and 2 deletions.
1 change: 1 addition & 0 deletions benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ mod coprocessor;
mod raftkv;
mod serialization;
mod storage;
mod util;
mod writebatch;

use test::Bencher;
Expand Down
14 changes: 14 additions & 0 deletions benches/util/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

mod slice_compare;
62 changes: 62 additions & 0 deletions benches/util/slice_compare.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

use rand::{thread_rng, Rng};
use test::Bencher;

#[inline]
fn gen_rand_str(len: usize) -> Vec<u8> {
let mut rand_str = vec![0; len];
thread_rng().fill_bytes(&mut rand_str);
rand_str
}

fn bench_slice_compare_less(b: &mut Bencher, n: usize) {
let (s1, s2) = (gen_rand_str(n), gen_rand_str(n));
b.iter(|| s1 < s2);
}

fn bench_slice_compare_greater(b: &mut Bencher, n: usize) {
let (s1, s2) = (gen_rand_str(n), gen_rand_str(n));
b.iter(|| s1 > s2);
}

#[bench]
fn bench_slice_compare_less_32(b: &mut Bencher) {
bench_slice_compare_less(b, 32)
}

#[bench]
fn bench_slice_compare_less_64(b: &mut Bencher) {
bench_slice_compare_less(b, 64)
}

#[bench]
fn bench_slice_compare_less_128(b: &mut Bencher) {
bench_slice_compare_less(b, 128)
}

#[bench]
fn bench_slice_compare_greater_32(b: &mut Bencher) {
bench_slice_compare_greater(b, 32)
}

#[bench]
fn bench_slice_compare_greater_64(b: &mut Bencher) {
bench_slice_compare_greater(b, 64)
}

#[bench]
fn bench_slice_compare_greater_128(b: &mut Bencher) {
bench_slice_compare_greater(b, 128)
}
10 changes: 10 additions & 0 deletions src/storage/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ pub trait Snapshot: Send + Debug + Clone + Sized {
fn get_properties_cf(&self, _: CfName) -> Result<TablePropertiesCollection> {
Err(Error::RocksDb("no user properties".to_owned()))
}
// The minimum key this snapshot can retrieve.
#[inline]
fn lower_bound(&self) -> Option<&[u8]> {
None
}
// The maximum key can be fetched from the snapshot should less than the upper bound.
#[inline]
fn upper_bound(&self) -> Option<&[u8]> {
None
}
}

pub trait Iterator: Send + Sized {
Expand Down
10 changes: 10 additions & 0 deletions src/storage/engine/raftkv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,16 @@ impl Snapshot for RegionSnapshot {
fn get_properties_cf(&self, cf: CfName) -> engine::Result<TablePropertiesCollection> {
RegionSnapshot::get_properties_cf(self, cf).map_err(|e| e.into())
}

#[inline]
fn lower_bound(&self) -> Option<&[u8]> {
Some(self.get_start_key())
}

#[inline]
fn upper_bound(&self) -> Option<&[u8]> {
Some(self.get_end_key())
}
}

impl EngineIterator for RegionIterator {
Expand Down
23 changes: 23 additions & 0 deletions src/storage/txn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::io::Error as IoError;
pub use self::process::RESOLVE_LOCK_BATCH_SIZE;
pub use self::scheduler::{Msg, Scheduler, CMD_BATCH_SIZE};
pub use self::store::{SnapshotStore, StoreScanner};
use util::escape;

quick_error! {
#[derive(Debug)]
Expand Down Expand Up @@ -63,6 +64,17 @@ quick_error! {
start_ts,
commit_ts)
}
InvalidReqRange {start: Option<Vec<u8>>,
end: Option<Vec<u8>>,
lower_bound: Option<Vec<u8>>,
upper_bound: Option<Vec<u8>>} {
description("Invalid request range")
display("Request range exceeds bound, request range:[{:?}, end:{:?}), physical bound:[{:?}, {:?})",
start.as_ref().map(|s| escape(&s)),
end.as_ref().map(|e| escape(&e)),
lower_bound.as_ref().map(|s| escape(&s)),
upper_bound.as_ref().map(|s| escape(&s)))
}
}
}

Expand All @@ -79,6 +91,17 @@ impl Error {
start_ts,
commit_ts,
}),
Error::InvalidReqRange {
ref start,
ref end,
ref lower_bound,
ref upper_bound,
} => Some(Error::InvalidReqRange {
start: start.clone(),
end: end.clone(),
lower_bound: lower_bound.clone(),
upper_bound: upper_bound.clone(),
}),
Error::Other(_) | Error::ProtoBuf(_) | Error::Io(_) => None,
}
}
Expand Down
202 changes: 200 additions & 2 deletions src/storage/txn/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ impl<S: Snapshot> SnapshotStore<S> {
lower_bound: Option<Key>,
upper_bound: Option<Key>,
) -> Result<StoreScanner<S>> {
// Check request bounds with physical bound
self.verify_range(&lower_bound, &upper_bound)?;

let (forward_scanner, backward_scanner) = match mode {
ScanMode::Forward => {
let forward_scanner =
Expand Down Expand Up @@ -114,6 +117,35 @@ impl<S: Snapshot> SnapshotStore<S> {
backward_scanner,
})
}

fn verify_range(&self, lower_bound: &Option<Key>, upper_bound: &Option<Key>) -> Result<()> {
if let Some(ref l) = lower_bound {
if let Some(b) = self.snapshot.lower_bound() {
if !b.is_empty() && l.as_encoded().as_slice() < b {
return Err(Error::InvalidReqRange {
start: Some(l.as_encoded().clone()),
end: upper_bound.as_ref().map(|ref b| b.as_encoded().clone()),
lower_bound: Some(b.to_vec()),
upper_bound: self.snapshot.upper_bound().map(|b| b.to_vec()),
});
}
}
}
if let Some(ref u) = upper_bound {
if let Some(b) = self.snapshot.upper_bound() {
if !b.is_empty() && (u.as_encoded().as_slice() > b || u.as_encoded().is_empty()) {
return Err(Error::InvalidReqRange {
start: lower_bound.as_ref().map(|ref b| b.as_encoded().clone()),
end: Some(u.as_encoded().clone()),
lower_bound: self.snapshot.lower_bound().map(|b| b.to_vec()),
upper_bound: Some(b.to_vec()),
});
}
}
}

Ok(())
}
}

pub struct StoreScanner<S: Snapshot> {
Expand Down Expand Up @@ -163,9 +195,15 @@ impl<S: Snapshot> StoreScanner<S> {
mod test {
use super::SnapshotStore;
use kvproto::kvrpcpb::{Context, IsolationLevel};
use storage::engine::{self, Engine, RocksEngine, RocksSnapshot, TEMP_DIR};
use raftstore::store::engine::IterOption;
use storage::engine::{
self, Engine, Result as EngineResult, RocksEngine, RocksSnapshot, ScanMode, TEMP_DIR,
};
use storage::mvcc::MvccTxn;
use storage::{Key, KvPair, Mutation, Options, ScanMode, Statistics, ALL_CFS};
use storage::{
CfName, Cursor, Iterator, Key, KvPair, Mutation, Options, Snapshot, Statistics, Value,
ALL_CFS,
};

const KEY_PREFIX: &str = "key_prefix";
const START_TS: u64 = 10;
Expand Down Expand Up @@ -242,6 +280,89 @@ mod test {
}
}

// Snapshot with bound
#[derive(Clone, Debug)]
struct MockRangeSnapshot {
start: Vec<u8>,
end: Vec<u8>,
}

#[derive(Default)]
struct MockRangeSnapshotIter {}

impl Iterator for MockRangeSnapshotIter {
fn next(&mut self) -> bool {
true
}
fn prev(&mut self) -> bool {
true
}
fn seek(&mut self, _: &Key) -> EngineResult<bool> {
Ok(true)
}
fn seek_for_prev(&mut self, _: &Key) -> EngineResult<bool> {
Ok(true)
}
fn seek_to_first(&mut self) -> bool {
true
}
fn seek_to_last(&mut self) -> bool {
true
}
fn valid(&self) -> bool {
true
}
fn validate_key(&self, _: &Key) -> EngineResult<()> {
Ok(())
}
fn key(&self) -> &[u8] {
b""
}
fn value(&self) -> &[u8] {
b""
}
}

impl MockRangeSnapshot {
fn new(start: Vec<u8>, end: Vec<u8>) -> Self {
Self { start, end }
}
}

impl Snapshot for MockRangeSnapshot {
type Iter = MockRangeSnapshotIter;

fn get(&self, _: &Key) -> EngineResult<Option<Value>> {
Ok(None)
}
fn get_cf(&self, _: CfName, _: &Key) -> EngineResult<Option<Value>> {
Ok(None)
}
fn iter(&self, _: IterOption, _: ScanMode) -> EngineResult<Cursor<Self::Iter>> {
Ok(Cursor::new(
MockRangeSnapshotIter::default(),
ScanMode::Forward,
))
}
fn iter_cf(
&self,
_: CfName,
_: IterOption,
_: ScanMode,
) -> EngineResult<Cursor<Self::Iter>> {
Ok(Cursor::new(
MockRangeSnapshotIter::default(),
ScanMode::Forward,
))
}
fn lower_bound(&self) -> Option<&[u8]> {
Some(self.start.as_slice())
}
fn upper_bound(&self) -> Option<&[u8]> {
Some(self.end.as_slice())
}
}

#[test]
fn test_snapshot_store_get() {
let key_num = 100;
Expand Down Expand Up @@ -368,4 +489,81 @@ mod test {
}
assert_eq!(result, expected.into_iter().rev().collect::<Vec<_>>());
}

#[test]
fn test_scanner_verify_bound() {
// Store with a limited range
let snap = MockRangeSnapshot::new(b"b".to_vec(), b"c".to_vec());
let store = SnapshotStore::new(snap, 0, IsolationLevel::SI, true);
let bound_a = Key::from_encoded(b"a".to_vec());
let bound_b = Key::from_encoded(b"b".to_vec());
let bound_c = Key::from_encoded(b"c".to_vec());
let bound_d = Key::from_encoded(b"d".to_vec());
assert!(store.scanner(ScanMode::Forward, false, None, None).is_ok());
assert!(
store
.scanner(
ScanMode::Forward,
false,
Some(bound_b.clone()),
Some(bound_c.clone())
)
.is_ok()
);
assert!(
store
.scanner(
ScanMode::Forward,
false,
Some(bound_a.clone()),
Some(bound_c.clone())
)
.is_err()
);
assert!(
store
.scanner(
ScanMode::Forward,
false,
Some(bound_b.clone()),
Some(bound_d.clone())
)
.is_err()
);
assert!(
store
.scanner(
ScanMode::Forward,
false,
Some(bound_a.clone()),
Some(bound_d.clone())
)
.is_err()
);

// Store with whole range
let snap2 = MockRangeSnapshot::new(b"".to_vec(), b"".to_vec());
let store2 = SnapshotStore::new(snap2, 0, IsolationLevel::SI, true);
assert!(store2.scanner(ScanMode::Forward, false, None, None).is_ok());
assert!(
store2
.scanner(ScanMode::Forward, false, Some(bound_a.clone()), None)
.is_ok()
);
assert!(
store2
.scanner(
ScanMode::Forward,
false,
Some(bound_a.clone()),
Some(bound_b.clone())
)
.is_ok()
);
assert!(
store2
.scanner(ScanMode::Forward, false, None, Some(bound_c.clone()))
.is_ok()
);
}
}

0 comments on commit 75b8d92

Please sign in to comment.