Skip to content

Commit

Permalink
Merge pull request #37 from mit-pdos/history
Browse files Browse the repository at this point in the history
history
  • Loading branch information
sanjit-bhat authored Oct 23, 2024
2 parents 6dc6d03 + 4cbccb5 commit 5b767a3
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 125 deletions.
6 changes: 3 additions & 3 deletions kt/basictest.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func testBasic(setup *setupParams) {
primitive.Assume(!err2)

adtrs := mkRpcClients(setup.adtrAddrs)
updAdtrs(upd0, adtrs)
updAdtrs(upd1, adtrs)
updAdtrsOnce(upd0, adtrs)
updAdtrsOnce(upd1, adtrs)

// bob get.
bob := newClient(bobUid, setup.servAddr, setup.servSigPk, setup.servVrfPk)
Expand All @@ -80,7 +80,7 @@ func mkRpcClients(addrs []uint64) []*advrpc.Client {
return c
}

func updAdtrs(upd *UpdateProof, adtrs []*advrpc.Client) {
func updAdtrsOnce(upd *UpdateProof, adtrs []*advrpc.Client) {
for _, cli := range adtrs {
err := callAdtrUpdate(cli, upd)
primitive.Assume(!err)
Expand Down
21 changes: 21 additions & 0 deletions kt/history.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package kt

type HistEntry struct {
Epoch uint64
HistVal []byte
}

// GetHist searches hist at the epoch and rets the latest val, or false
// if there's no registered val.
func GetHist(o []*HistEntry, epoch uint64) (bool, []byte) {
var isReg bool
var val []byte
// entries inv: ordered by epoch field.
for _, e := range o {
if e.Epoch <= epoch {
isReg = true
val = e.HistVal
}
}
return isReg, val
}
7 changes: 3 additions & 4 deletions kt/kt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ import (
)

func TestAll(t *testing.T) {
serverAddr := makeUniqueAddr()
adtr0Addr := makeUniqueAddr()
adtr1Addr := makeUniqueAddr()
testAll(serverAddr, adtr0Addr, adtr1Addr)
servAddr := makeUniqueAddr()
adtrAddrs := []uint64{makeUniqueAddr(), makeUniqueAddr()}
testAllFull(servAddr, adtrAddrs)
}

func TestBasic(t *testing.T) {
Expand Down
146 changes: 49 additions & 97 deletions kt/test.go
Original file line number Diff line number Diff line change
@@ -1,152 +1,104 @@
package kt

// set global timing such that:
// - chaos interlaces enough with alice.
// - chaos mostly has up-to-date audits.
// - bob queries somewhere around halfway thru alice's puts.
// - before alice and bob finally check keys, the auditor has caught up.

import (
"github.com/goose-lang/primitive"
"github.com/goose-lang/std"
"github.com/mit-pdos/pav/advrpc"
"github.com/mit-pdos/pav/cryptoffi"
"sync"
)

const (
aliceUid uint64 = 0
bobUid uint64 = 1
charlieUid uint64 = 2
aliceUid uint64 = 0
bobUid uint64 = 1
)

func testAll(servAddr, adtr0Addr, adtr1Addr uint64) {
// start server and auditors.
serv, servSigPk, servVrfPk := newServer()
servRpc := newRpcServer(serv)
servRpc.Serve(servAddr)
adtr0, adtr0Pk := newAuditor(servSigPk)
adtr0Rpc := newRpcAuditor(adtr0)
adtr0Rpc.Serve(adtr0Addr)
adtr1, adtr1Pk := newAuditor(servSigPk)
adtr1Rpc := newRpcAuditor(adtr1)
adtr1Rpc.Serve(adtr1Addr)
primitive.Sleep(1_000_000)
func testAllFull(servAddr uint64, adtrAddrs []uint64) {
testAll(setup(servAddr, adtrAddrs))
}

// run background threads.
go func() {
charlie := newClient(charlieUid, servAddr, servSigPk, servVrfPk)
chaos(charlie, adtr0Addr, adtr1Addr, adtr0Pk, adtr1Pk)
}()
go func() {
syncAdtr(servAddr, adtr0Addr, adtr1Addr)
}()
func testAll(setup *setupParams) {
aliceCli := newClient(aliceUid, setup.servAddr, setup.servSigPk, setup.servVrfPk)
alice := &alice{cli: aliceCli}
bobCli := newClient(bobUid, setup.servAddr, setup.servSigPk, setup.servVrfPk)
bob := &bob{cli: bobCli}

// run alice and bob.
alice := &alice{}
aliceMu := new(sync.Mutex)
aliceMu.Lock()
aliceCli := newClient(aliceUid, servAddr, servSigPk, servVrfPk)
wg := new(sync.WaitGroup)
wg.Add(1)
wg.Add(1)
// alice does a bunch of puts.
go func() {
alice.run(aliceCli)
aliceMu.Unlock()
alice.run()
wg.Done()
}()
bob := &bob{}
bobMu := new(sync.Mutex)
bobMu.Lock()
bobCli := newClient(bobUid, servAddr, servSigPk, servVrfPk)
// bob does a get at some time in the middle of alice's puts.
go func() {
bob.run(bobCli)
bobMu.Unlock()
bob.run()
wg.Done()
}()
wg.Wait()

// wait for alice and bob to finish.
aliceMu.Lock()
bobMu.Lock()

// alice SelfMon + Audit. bob Audit. ordering irrelevant across clients.
primitive.Sleep(1000_000_000)
selfMonEp, err0 := aliceCli.SelfMon()
// alice self monitor. in real world, she'll come on-line at times and do this.
selfMonEp, err0 := alice.cli.SelfMon()
primitive.Assume(!err0.err)
// could also state this as bob.epoch <= last epoch in TS.
// this last self monitor will be our history bound.
primitive.Assume(bob.epoch <= selfMonEp)
err1 := aliceCli.Audit(adtr0Addr, adtr0Pk)
primitive.Assume(!err1.err)
err2 := aliceCli.Audit(adtr1Addr, adtr1Pk)
primitive.Assume(!err2.err)
err3 := bobCli.Audit(adtr0Addr, adtr0Pk)
primitive.Assume(!err3.err)
err4 := bobCli.Audit(adtr1Addr, adtr1Pk)
primitive.Assume(!err4.err)

// sync auditors. in real world, this'll happen periodically.
updAdtrsAll(setup.servAddr, setup.adtrAddrs)

// alice and bob audit. ordering irrelevant across clients.
doAudits(alice.cli, setup.adtrAddrs, setup.adtrPks)
doAudits(bob.cli, setup.adtrAddrs, setup.adtrPks)

// final check. bob got the right key.
isReg, aliceKey := GetTimeSeries(alice.pks, bob.epoch)
isReg, aliceKey := GetHist(alice.hist, bob.epoch)
primitive.Assert(isReg == bob.isReg)
if isReg {
primitive.Assert(std.BytesEqual(aliceKey, bob.alicePk))
}
}

type alice struct {
pks []*TimeSeriesEntry
cli *Client
hist []*HistEntry
}

func (a *alice) run(cli *Client) {
for i := byte(0); i < byte(20); i++ {
primitive.Sleep(50_000_000)
pk := []byte{i}
epoch, err0 := cli.Put(pk)
func (a *alice) run() {
for i := uint64(0); i < uint64(20); i++ {
primitive.Sleep(5_000_000)
pk := []byte{byte(i)}
epoch, err0 := a.cli.Put(pk)
primitive.Assume(!err0.err)
a.pks = append(a.pks, &TimeSeriesEntry{Epoch: epoch, TSVal: pk})
a.hist = append(a.hist, &HistEntry{Epoch: epoch, HistVal: pk})
}
}

type bob struct {
cli *Client
epoch uint64
isReg bool
alicePk []byte
}

func (b *bob) run(cli *Client) {
primitive.Sleep(550_000_000)
isReg, pk, epoch, err0 := cli.Get(aliceUid)
func (b *bob) run() {
primitive.Sleep(120_000_000)
isReg, pk, epoch, err0 := b.cli.Get(aliceUid)
primitive.Assume(!err0.err)
b.epoch = epoch
b.isReg = isReg
b.alicePk = pk
}

// chaos from Charlie running all the ops.
func chaos(charlie *Client, adtr0Addr, adtr1Addr uint64, adtr0Pk, adtr1Pk cryptoffi.PublicKey) {
for {
primitive.Sleep(40_000_000)
pk := []byte{2}
_, err0 := charlie.Put(pk)
primitive.Assume(!err0.err)
_, _, _, err1 := charlie.Get(aliceUid)
primitive.Assume(!err1.err)
_, err2 := charlie.SelfMon()
primitive.Assume(!err2.err)
charlie.Audit(adtr0Addr, adtr0Pk)
charlie.Audit(adtr1Addr, adtr1Pk)
}
}

func syncAdtr(servAddr, adtr0Addr, adtr1Addr uint64) {
func updAdtrsAll(servAddr uint64, adtrAddrs []uint64) {
servCli := advrpc.Dial(servAddr)
adtr0Cli := advrpc.Dial(adtr0Addr)
adtr1Cli := advrpc.Dial(adtr1Addr)
adtrs := mkRpcClients(adtrAddrs)
var epoch uint64
for {
primitive.Sleep(1_000_000)
upd, err0 := callServAudit(servCli, epoch)
if err0 {
continue
upd, err := callServAudit(servCli, epoch)
if err {
break
}
err1 := callAdtrUpdate(adtr0Cli, upd)
primitive.Assume(!err1)
err2 := callAdtrUpdate(adtr1Cli, upd)
primitive.Assume(!err2)
updAdtrsOnce(upd, adtrs)
epoch++
}
}
21 changes: 0 additions & 21 deletions kt/timeseries.go

This file was deleted.

0 comments on commit 5b767a3

Please sign in to comment.