managed_db.go
/*
 * Copyright 2017 Dgraph Labs, Inc. and Contributors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package badger

import (
	"math"
	"sync"
	"sync/atomic"
	"time"

	"github.com/dgraph-io/badger/y"
	"github.com/pkg/errors"
)

// OpenManaged returns a new DB, which allows more control over setting
// transaction timestamps, by setting managedDB=true.
//
// This is only useful for databases built on top of Badger (like Dgraph), and
// can be ignored by most users.
func OpenManaged(opts Options) (*DB, error) {
	opts.managedTxns = true
	return Open(opts)
}
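
// openManagedSketch is an illustrative usage sketch added for documentation
// purposes; it is not part of the upstream file. It assumes DefaultOptions is
// still a package-level Options value in this revision of the API, and the
// directory path below is a hypothetical placeholder.
func openManagedSketch() (*DB, error) {
	opts := DefaultOptions // assumption: package-level default Options value
	opts.Dir = "/tmp/badger-managed"
	opts.ValueDir = "/tmp/badger-managed"
	// OpenManaged behaves like Open, except that managed transactions are
	// forced on, so the caller controls read and commit timestamps.
	return OpenManaged(opts)
}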

// NewTransactionAt follows the same logic as DB.NewTransaction(), but uses the
// provided read timestamp.
//
// This is only useful for databases built on top of Badger (like Dgraph), and
// can be ignored by most users.
func (db *DB) NewTransactionAt(readTs uint64, update bool) *Txn {
	if !db.opt.managedTxns {
		panic("Cannot use NewTransactionAt with managedDB=false. Use NewTransaction instead.")
	}
	txn := db.newTransaction(update, true)
	txn.readTs = readTs
	return txn
}
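
// readAtTimestampSketch is an illustrative usage sketch added for
// documentation purposes; it is not part of the upstream file. It reads a
// single key through a read-only managed transaction pinned at a fixed read
// timestamp. The key name and the timestamp are hypothetical.
func readAtTimestampSketch(db *DB) ([]byte, error) {
	const readTs = 42 // see the latest versions committed at ts <= 42
	txn := db.NewTransactionAt(readTs, false)
	defer txn.Discard()

	item, err := txn.Get([]byte("some-key"))
	if err != nil {
		return nil, err
	}
	return item.ValueCopy(nil)
}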

// CommitAt commits the transaction, following the same logic as Commit(), but
// at the given commit timestamp. This will panic if not used with managed transactions.
//
// This is only useful for databases built on top of Badger (like Dgraph), and
// can be ignored by most users.
func (txn *Txn) CommitAt(commitTs uint64, callback func(error)) error {
	if !txn.db.opt.managedTxns {
		panic("Cannot use CommitAt with managedDB=false. Use Commit instead.")
	}
	txn.commitTs = commitTs
	if callback == nil {
		return txn.Commit()
	}
	txn.CommitWith(callback)
	return nil
}
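
// commitAtSketch is an illustrative usage sketch added for documentation
// purposes; it is not part of the upstream file. It writes one key in a
// managed update transaction and commits it at an explicit timestamp, waiting
// on the asynchronous callback. The key, value and timestamps are hypothetical.
func commitAtSketch(db *DB) error {
	const (
		readTs   = 10 // snapshot the write is based on
		commitTs = 11 // timestamp at which the write becomes visible
	)
	txn := db.NewTransactionAt(readTs, true)
	defer txn.Discard()

	if err := txn.Set([]byte("some-key"), []byte("some-value")); err != nil {
		return err
	}
	done := make(chan error, 1)
	// With a non-nil callback, CommitAt does not wait for the commit to be
	// applied; the callback reports the final commit error.
	if err := txn.CommitAt(commitTs, func(cerr error) { done <- cerr }); err != nil {
		return err
	}
	return <-done
}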

// SetDiscardTs sets a timestamp at or below which any invalid or deleted
// versions can be discarded from the LSM tree, and thence from the value log,
// to reclaim disk space. Can only be used with managed transactions.
func (db *DB) SetDiscardTs(ts uint64) {
	if !db.opt.managedTxns {
		panic("Cannot use SetDiscardTs with managedDB=false.")
	}
	db.orc.setDiscardTs(ts)
}
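
// setDiscardTsSketch is an illustrative usage sketch added for documentation
// purposes; it is not part of the upstream file. safeTs is assumed to be
// tracked by the caller: a timestamp at or below which no active reader still
// needs older versions.
func setDiscardTsSketch(db *DB, safeTs uint64) {
	// Invalid or deleted versions at or below safeTs become eligible for
	// removal from the LSM tree (and eventually the value log).
	db.SetDiscardTs(safeTs)
}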

var errDone = errors.New("Done deleting keys")

// DropAll drops all the data stored in Badger. It does so in the following way:
// - Stop accepting new writes.
// - Pause the compactions.
// - Pick all tables from all levels, create a changeset to delete all these
//   tables and apply it to the manifest. Do not pick up the latest table from
//   level 0, to preserve the (persistent) badgerHead key.
// - Iterate over the KVs in Level 0, and run deletes on them via transactions.
// - The deletions are done at the same timestamp as the latest version of the
//   key. Thus, we could write the keys back at the same timestamp as before.
//
// DropAll is only available with managed transactions.
func (db *DB) DropAll() error {
	if !db.opt.managedTxns {
		panic("DropAll is only available with managedDB=true.")
	}

	// Stop accepting new writes.
	atomic.StoreInt32(&db.blockWrites, 1)

	// Wait for writeCh to drain. This is not ideal, but it is a very simple
	// way to let writeCh flush out before we proceed.
	tick := time.NewTicker(100 * time.Millisecond)
	for range tick.C {
		if len(db.writeCh) == 0 {
			break
		}
	}
	tick.Stop()

	// Stop the compactions.
	if db.closers.compactors != nil {
		db.closers.compactors.SignalAndWait()
	}

	_, err := db.lc.deleteLSMTree()
	// Allow writes so that we can run transactions. The user must ensure that
	// no other writes happen concurrently while this operation is in progress.
	atomic.StoreInt32(&db.blockWrites, 0)
	// Compactions need to run so the deletes below can be flushed out.
	if db.closers.compactors != nil {
		db.closers.compactors = y.NewCloser(1)
		db.lc.startCompact(db.closers.compactors)
	}
	if err != nil {
		return err
	}

	type KV struct {
		key     []byte
		version uint64
	}

	var kvs []KV
	getKeys := func() error {
		txn := db.NewTransactionAt(math.MaxUint64, false)
		defer txn.Discard()

		opts := DefaultIteratorOptions
		opts.PrefetchValues = false
		itr := txn.NewIterator(opts)
		defer itr.Close()

		for itr.Rewind(); itr.Valid(); itr.Next() {
			item := itr.Item()
			kvs = append(kvs, KV{item.KeyCopy(nil), item.Version()})
		}
		return nil
	}
	if err := getKeys(); err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)
	for _, kv := range kvs {
		wg.Add(1)
		txn := db.NewTransactionAt(math.MaxUint64, true)
		if err := txn.Delete(kv.key); err != nil {
			return err
		}
		if err := txn.CommitAt(kv.version, func(rerr error) {
			if rerr != nil {
				select {
				case errCh <- rerr:
				default:
				}
			}
			wg.Done()
		}); err != nil {
			return err
		}
	}
	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}