Skip to content

Commit

Permalink
rocksdb: commit acquires lock (#576)
Browse files Browse the repository at this point in the history
* commit acquires lock directly

* check conditions after acquiring lock
  • Loading branch information
chm-diederichs authored Oct 9, 2024
1 parent 1dd49ed commit 338cb58
Showing 1 changed file with 43 additions and 46 deletions.
89 changes: 43 additions & 46 deletions lib/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -389,32 +389,6 @@ class SessionState {
this.mutex.unlock()
}
}

async upgrade (start, end, batch, values, keyPair) {
await this.mutex.lock()

const update = this.createUpdate()

try {
// upsert compat manifest
if (this.core.verifier === null && keyPair) this.core._setManifest(update, null, keyPair)

this.blocks.putBatch(update.batch, start, values)

update.bitfield.setRange(start, end, true)
update.flushTreeBatch(batch)

const bitfield = { start, length: end - start, drop: false }
const status = batch.upgraded ? 0b0001 : 0

update.coreUpdate({ status, bitfield, value: null, from: null })

await this.flushUpdate(update)
} finally {
this._clearActiveBatch()
this.mutex.unlock()
}
}
}

module.exports = class Core {
Expand Down Expand Up @@ -850,37 +824,60 @@ module.exports = class Core {
}

async commit (state, { signature, keyPair = this.header.keyPair, length = state.tree.length, treeLength = state.treeLength } = {}) {
if (this.tree.fork !== state.tree.fork) return null
await this.state.mutex.lock()

if (this.tree.length > state.tree.length) return null // TODO: partial commit in the future if possible
const update = this.state.createUpdate()

if (this.tree.length > treeLength) {
for (const root of this.tree.roots) {
const batchRoot = await state.tree.get(root.index)
if (batchRoot.size !== root.size || !b4a.equals(batchRoot.hash, root.hash)) {
return null
try {
if (this.tree.fork !== state.tree.fork) return null

if (this.tree.length > state.tree.length) return null // TODO: partial commit in the future if possible

if (this.tree.length > treeLength) {
for (const root of this.tree.roots) {
const batchRoot = await state.tree.get(root.index)
if (batchRoot.size !== root.size || !b4a.equals(batchRoot.hash, root.hash)) {
return null
}
}
}
}

const promises = []
const promises = []

const reader = state.storage.createReadBatch()
for (let i = treeLength; i < length; i++) promises.push(reader.getBlock(i))
reader.tryFlush()
const reader = state.storage.createReadBatch()
for (let i = treeLength; i < length; i++) promises.push(reader.getBlock(i))
reader.tryFlush()

const values = await Promise.all(promises)
const values = await Promise.all(promises)

const batch = await this.tree.reconcile(state.tree, length, treeLength)
if (batch.upgraded) batch.signature = signature || this.verifier.sign(batch, keyPair)
const batch = await this.tree.reconcile(state.tree, length, treeLength)
if (batch.upgraded) batch.signature = signature || this.verifier.sign(batch, keyPair)

await this.state.upgrade(treeLength, length, batch, values, keyPair)
// upsert compat manifest
if (this.verifier === null && keyPair) this._setManifest(update, null, keyPair)

state.treeLength = batch.length
this.state.blocks.putBatch(update.batch, treeLength, values)

return {
length: batch.length,
byteLength: batch.byteLength
update.bitfield.setRange(treeLength, length, true)
update.flushTreeBatch(batch)

const bitfield = { start: treeLength, length: length - treeLength, drop: false }
const status = batch.upgraded ? 0b0001 : 0

update.coreUpdate({ status, bitfield, value: null, from: null })

await this.state.flushUpdate(update)

state.treeLength = batch.length

return {
length: batch.length,
byteLength: batch.byteLength
}
} finally {
this.state._clearActiveBatch()
this.updating = false
this.state.mutex.unlock()
}
}

Expand Down

0 comments on commit 338cb58

Please sign in to comment.