forked from talent-plan/tinykv
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
104 lines (86 loc) · 3.18 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package server
import (
"context"
"github.com/pingcap-incubator/tinykv/kv/coprocessor"
"github.com/pingcap-incubator/tinykv/kv/storage"
"github.com/pingcap-incubator/tinykv/kv/storage/raft_storage"
"github.com/pingcap-incubator/tinykv/kv/transaction/latches"
coppb "github.com/pingcap-incubator/tinykv/proto/pkg/coprocessor"
"github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb"
"github.com/pingcap-incubator/tinykv/proto/pkg/tinykvpb"
"github.com/pingcap/tidb/kv"
)
var _ tinykvpb.TinyKvServer = new(Server)
// Server is a TinyKV server, it 'faces outwards', sending and receiving messages from clients such as TinySQL.
type Server struct {
storage storage.Storage
// (Used in 4A/4B)
Latches *latches.Latches
// coprocessor API handler, out of course scope
copHandler *coprocessor.CopHandler
}
func NewServer(storage storage.Storage) *Server {
return &Server{
storage: storage,
Latches: latches.NewLatches(),
}
}
// The below functions are Server's gRPC API (implements TinyKvServer).
// Raft commands (tinykv <-> tinykv)
// Only used for RaftStorage, so trivially forward it.
func (server *Server) Raft(stream tinykvpb.TinyKv_RaftServer) error {
return server.storage.(*raft_storage.RaftStorage).Raft(stream)
}
// Snapshot stream (tinykv <-> tinykv)
// Only used for RaftStorage, so trivially forward it.
func (server *Server) Snapshot(stream tinykvpb.TinyKv_SnapshotServer) error {
return server.storage.(*raft_storage.RaftStorage).Snapshot(stream)
}
// Transactional API.
func (server *Server) KvGet(_ context.Context, req *kvrpcpb.GetRequest) (*kvrpcpb.GetResponse, error) {
// Your Code Here (4B).
return nil, nil
}
func (server *Server) KvPrewrite(_ context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) {
// Your Code Here (4B).
return nil, nil
}
func (server *Server) KvCommit(_ context.Context, req *kvrpcpb.CommitRequest) (*kvrpcpb.CommitResponse, error) {
// Your Code Here (4B).
return nil, nil
}
func (server *Server) KvScan(_ context.Context, req *kvrpcpb.ScanRequest) (*kvrpcpb.ScanResponse, error) {
// Your Code Here (4C).
return nil, nil
}
func (server *Server) KvCheckTxnStatus(_ context.Context, req *kvrpcpb.CheckTxnStatusRequest) (*kvrpcpb.CheckTxnStatusResponse, error) {
// Your Code Here (4C).
return nil, nil
}
func (server *Server) KvBatchRollback(_ context.Context, req *kvrpcpb.BatchRollbackRequest) (*kvrpcpb.BatchRollbackResponse, error) {
// Your Code Here (4C).
return nil, nil
}
func (server *Server) KvResolveLock(_ context.Context, req *kvrpcpb.ResolveLockRequest) (*kvrpcpb.ResolveLockResponse, error) {
// Your Code Here (4C).
return nil, nil
}
// SQL push down commands.
func (server *Server) Coprocessor(_ context.Context, req *coppb.Request) (*coppb.Response, error) {
resp := new(coppb.Response)
reader, err := server.storage.Reader(req.Context)
if err != nil {
if regionErr, ok := err.(*raft_storage.RegionError); ok {
resp.RegionError = regionErr.RequestErr
return resp, nil
}
return nil, err
}
switch req.Tp {
case kv.ReqTypeDAG:
return server.copHandler.HandleCopDAGRequest(reader, req), nil
case kv.ReqTypeAnalyze:
return server.copHandler.HandleCopAnalyzeRequest(reader, req), nil
}
return nil, nil
}