From 6550abdfccefad062a3076bed4e5c1f5e6cd2ce1 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 23 Mar 2023 16:53:50 +0200 Subject: [PATCH 01/21] introduce execution lanes --- chain/vm/execution.go | 173 ++++++++++++++++++++++++++++++++++++++++++ chain/vm/vm.go | 2 + chain/vm/vmi.go | 41 +++++++++- 3 files changed, 215 insertions(+), 1 deletion(-) create mode 100644 chain/vm/execution.go diff --git a/chain/vm/execution.go b/chain/vm/execution.go new file mode 100644 index 00000000000..e55883dae41 --- /dev/null +++ b/chain/vm/execution.go @@ -0,0 +1,173 @@ +package vm + +import ( + "context" + "errors" + "os" + "strconv" + "sync" + + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/lotus/chain/types" +) + +const ( + DefaultAvailableExecutionLanes = 4 + DefaultPriorityExecutionLanes = 2 +) + +var ErrExecutorDone = errors.New("executor has been released") + +// the execution environment; see below for definition, methods, and initilization +var execution *executionEnv + +// implementation of vm executor with simple sanity check preventing use after free. 
+type vmExecutor struct { + lk sync.RWMutex + vmi Interface + token *executionToken + done bool +} + +var _ Executor = (*vmExecutor)(nil) + +func newVMExecutor(vmi Interface, token *executionToken) Executor { + return &vmExecutor{vmi: vmi, token: token} +} + +func (e *vmExecutor) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) { + e.lk.RLock() + defer e.lk.RUnlock() + + if e.done { + return nil, ErrExecutorDone + } + + return e.vmi.ApplyMessage(ctx, cmsg) +} + +func (e *vmExecutor) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) { + e.lk.RLock() + defer e.lk.RUnlock() + + if e.done { + return nil, ErrExecutorDone + } + + return e.vmi.ApplyImplicitMessage(ctx, msg) +} + +func (e *vmExecutor) Flush(ctx context.Context) (cid.Cid, error) { + e.lk.RLock() + defer e.lk.RUnlock() + + if e.done { + return cid.Undef, ErrExecutorDone + } + + return e.vmi.Flush(ctx) +} + +func (e *vmExecutor) Done() { + e.lk.Lock() + defer e.lk.Unlock() + + e.token.Done() + e.token = nil + e.done = true +} + +type executionToken struct { + reserved int +} + +func (token *executionToken) Done() { + execution.putToken(token) +} + +type executionEnv struct { + mx *sync.Mutex + cond *sync.Cond + + // available executors + available int + // reserved executors + reserved int +} + +func (e *executionEnv) getToken(lane ExecutionLane) *executionToken { + e.mx.Lock() + defer e.mx.Unlock() + + switch lane { + case ExecutionLaneDefault: + for e.available <= e.reserved { + e.cond.Wait() + } + + e.available-- + return &executionToken{reserved: 0} + + case ExecutionLanePriority: + for e.available == 0 { + e.cond.Wait() + } + + e.available-- + + reserving := 0 + if e.reserved > 0 { + e.reserved-- + reserving = 1 + } + return &executionToken{reserved: reserving} + + default: + // already checked at interface boundary in NewVM, so this is appropriate + panic("bogus execution lane") + } +} + +func (e *executionEnv) putToken(token *executionToken) { + 
e.mx.Lock() + defer e.mx.Unlock() + + e.available++ + e.reserved += token.reserved + + e.cond.Broadcast() +} + +func init() { + var available, priority int + var err error + + concurrency := os.Getenv("LOTUS_FVM_CONCURRENCY") + if concurrency == "" { + available = DefaultAvailableExecutionLanes + } + available, err = strconv.Atoi(concurrency) + if err != nil { + panic(err) + } + + reserved := os.Getenv("LOTUS_FVM_CONCURRENCY_RESERVED") + if reserved == "" { + priority = DefaultPriorityExecutionLanes + } + priority, err = strconv.Atoi(reserved) + if err != nil { + panic(err) + } + + mx := &sync.Mutex{} + cond := sync.NewCond(mx) + + execution = &executionEnv{ + mx: mx, + cond: cond, + available: available, + reserved: priority, + } +} diff --git a/chain/vm/vm.go b/chain/vm/vm.go index c8e3f251907..6fbe7933b68 100644 --- a/chain/vm/vm.go +++ b/chain/vm/vm.go @@ -250,6 +250,8 @@ type VMOpts struct { Tracing bool // ReturnEvents decodes and returns emitted events. ReturnEvents bool + // ExecutionLane specifies the execution priority of the created vm + ExecutionLane ExecutionLane } func NewLegacyVM(ctx context.Context, opts *VMOpts) (*LegacyVM, error) { diff --git a/chain/vm/vmi.go b/chain/vm/vmi.go index 01b32d4ad17..e5d5daff8a6 100644 --- a/chain/vm/vmi.go +++ b/chain/vm/vmi.go @@ -2,6 +2,7 @@ package vm import ( "context" + "fmt" "os" cid "github.com/ipfs/go-cid" @@ -17,6 +18,15 @@ var ( StatApplied uint64 ) +type ExecutionLane int + +const ( + // ExecutionLaneDefault signifies a default, non prioritized execution lane. + ExecutionLaneDefault ExecutionLane = iota + // ExecutionLanePriority signifies a prioritized execution lane with reserved resources. 
+ ExecutionLanePriority +) + type Interface interface { // Applies the given message onto the VM's current state, returning the result of the execution ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) @@ -27,13 +37,24 @@ type Interface interface { Flush(ctx context.Context) (cid.Cid, error) } +// Executor is the general vm execution interface, which is prioritized according to execution langes. +// User must call Done when it is done with this executor to release resource holds by the execution +// environment +type Executor interface { + Interface + + // Done must be called when done with the executor to release resource holds. + // It is an error to invoke Interface methods after Done has been called. + Done() +} + // WARNING: You will not affect your node's execution by misusing this feature, but you will confuse yourself thoroughly! // An envvar that allows the user to specify debug actors bundles to be used by the FVM // alongside regular execution. This is basically only to be used to print out specific logging information. // Message failures, unexpected terminations,gas costs, etc. should all be ignored. 
var useFvmDebug = os.Getenv("LOTUS_FVM_DEVELOPER_DEBUG") == "1" -func NewVM(ctx context.Context, opts *VMOpts) (Interface, error) { +func newVM(ctx context.Context, opts *VMOpts) (Interface, error) { if opts.NetworkVersion >= network.Version16 { if useFvmDebug { return NewDualExecutionFVM(ctx, opts) @@ -43,3 +64,21 @@ func NewVM(ctx context.Context, opts *VMOpts) (Interface, error) { return NewLegacyVM(ctx, opts) } + +func NewVM(ctx context.Context, opts *VMOpts) (Executor, error) { + switch opts.ExecutionLane { + case ExecutionLaneDefault, ExecutionLanePriority: + default: + return nil, fmt.Errorf("invalid execution lane: %d", opts.ExecutionLane) + } + + token := execution.getToken(opts.ExecutionLane) + + vmi, err := newVM(ctx, opts) + if err != nil { + token.Done() + return nil, err + } + + return newVMExecutor(vmi, token), nil +} From 7362556c02877b3c8d4f8069a10b303f287c0552 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 23 Mar 2023 17:17:46 +0200 Subject: [PATCH 02/21] update VM interface references to use the executor, and call Done where appropriate --- chain/consensus/compute_state.go | 10 ++++++++-- chain/gen/genesis/genesis.go | 1 + chain/gen/genesis/miners.go | 6 +++++- chain/stmgr/forks_test.go | 30 ++++++++++++++++++------------ chain/stmgr/stmgr.go | 8 ++++---- conformance/driver.go | 2 +- 6 files changed, 37 insertions(+), 20 deletions(-) diff --git a/chain/consensus/compute_state.go b/chain/consensus/compute_state.go index e627a62d2ac..cf05d612d26 100644 --- a/chain/consensus/compute_state.go +++ b/chain/consensus/compute_state.go @@ -93,7 +93,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, }() ctx = blockstore.WithHotView(ctx) - makeVm := func(base cid.Cid, e abi.ChainEpoch, timestamp uint64) (vm.Interface, error) { + makeVm := func(base cid.Cid, e abi.ChainEpoch, timestamp uint64) (vm.Executor, error) { vmopt := &vm.VMOpts{ StateBase: base, Epoch: e, @@ -109,6 +109,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, 
TipSetGetter: stmgr.TipSetGetterForTipset(sm.ChainStore(), ts), Tracing: vmTracing, ReturnEvents: sm.ChainStore().IsStoringEvents(), + ExecutionLane: vm.ExecutionLanePriority, } return sm.VMConstructor()(ctx, vmopt) @@ -116,7 +117,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, var cronGas int64 - runCron := func(vmCron vm.Interface, epoch abi.ChainEpoch) error { + runCron := func(vmCron vm.Executor, epoch abi.ChainEpoch) error { cronMsg := &types.Message{ To: cron.Address, From: builtin.SystemActorAddr, @@ -169,13 +170,17 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, // run cron for null rounds if any if err = runCron(vmCron, i); err != nil { + vmCron.Done() return cid.Undef, cid.Undef, xerrors.Errorf("running cron: %w", err) } pstate, err = vmCron.Flush(ctx) if err != nil { + vmCron.Done() return cid.Undef, cid.Undef, xerrors.Errorf("flushing cron vm: %w", err) } + + vmCron.Done() } // handle state forks @@ -195,6 +200,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, if err != nil { return cid.Undef, cid.Undef, xerrors.Errorf("making vm: %w", err) } + defer vmi.Done() var ( receipts []*types.MessageReceipt diff --git a/chain/gen/genesis/genesis.go b/chain/gen/genesis/genesis.go index 3e88480218e..3ef8de968ec 100644 --- a/chain/gen/genesis/genesis.go +++ b/chain/gen/genesis/genesis.go @@ -496,6 +496,7 @@ func VerifyPreSealedData(ctx context.Context, cs *store.ChainStore, sys vm.Sysca if err != nil { return cid.Undef, xerrors.Errorf("failed to create VM: %w", err) } + defer vm.Done() for mi, m := range template.Miners { for si, s := range m.Sectors { diff --git a/chain/gen/genesis/miners.go b/chain/gen/genesis/miners.go index 5f741fd7c58..6e5be0b0adf 100644 --- a/chain/gen/genesis/miners.go +++ b/chain/gen/genesis/miners.go @@ -88,7 +88,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal return big.Zero(), nil } - newVM := func(base cid.Cid) (vm.Interface, error) { + newVM := func(base 
cid.Cid) (vm.Executor, error) { vmopt := &vm.VMOpts{ StateBase: base, Epoch: 0, @@ -108,6 +108,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal if err != nil { return cid.Undef, fmt.Errorf("creating vm: %w", err) } + defer genesisVm.Done() if len(miners) == 0 { return cid.Undef, xerrors.New("no genesis miners") @@ -338,6 +339,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal return cid.Undef, xerrors.Errorf("flushing state tree: %w", err) } + genesisVm.Done() genesisVm, err = newVM(nh) if err != nil { return cid.Undef, fmt.Errorf("creating new vm: %w", err) @@ -410,6 +412,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal return cid.Undef, xerrors.Errorf("flushing state tree: %w", err) } + genesisVm.Done() genesisVm, err = newVM(nh) if err != nil { return cid.Undef, fmt.Errorf("creating new vm: %w", err) @@ -517,6 +520,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal return cid.Undef, xerrors.Errorf("flushing state tree: %w", err) } + genesisVm.Done() genesisVm, err = newVM(nh) if err != nil { return cid.Undef, fmt.Errorf("creating new vm: %w", err) diff --git a/chain/stmgr/forks_test.go b/chain/stmgr/forks_test.go index f91d8997d6c..d852e2fdd2a 100644 --- a/chain/stmgr/forks_test.go +++ b/chain/stmgr/forks_test.go @@ -56,6 +56,12 @@ const testForkHeight = 40 type testActor struct { } +type mockExecutor struct { + vm.Interface +} + +func (*mockExecutor) Done() {} + // must use existing actor that an account is allowed to exec. 
func (testActor) Code() cid.Cid { return builtin0.PaymentChannelActorCodeID } func (testActor) State() cbor.Er { return new(testActorState) } @@ -178,13 +184,13 @@ func TestForkHeightTriggers(t *testing.T) { registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}}) inv.Register(actorstypes.Version0, nil, registry) - sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) { + sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) { nvm, err := vm.NewLegacyVM(ctx, vmopt) if err != nil { return nil, err } nvm.SetInvoker(inv) - return nvm, nil + return &mockExecutor{nvm}, nil }) cg.SetStateManager(sm) @@ -296,13 +302,13 @@ func testForkRefuseCall(t *testing.T, nullsBefore, nullsAfter int) { registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}}) inv.Register(actorstypes.Version0, nil, registry) - sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) { + sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) { nvm, err := vm.NewLegacyVM(ctx, vmopt) if err != nil { return nil, err } nvm.SetInvoker(inv) - return nvm, nil + return &mockExecutor{nvm}, nil }) cg.SetStateManager(sm) @@ -518,13 +524,13 @@ func TestForkPreMigration(t *testing.T) { registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}}) inv.Register(actorstypes.Version0, nil, registry) - sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) { + sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) { nvm, err := vm.NewLegacyVM(ctx, vmopt) if err != nil { return nil, err } nvm.SetInvoker(inv) - return nvm, nil + return &mockExecutor{nvm}, nil }) cg.SetStateManager(sm) @@ -592,11 +598,11 @@ func TestDisablePreMigration(t *testing.T) { registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}}) inv.Register(actorstypes.Version0, nil, registry) - sm.SetVMConstructor(func(ctx context.Context, vmopt 
*vm.VMOpts) (vm.Interface, error) { + sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) { nvm, err := vm.NewLegacyVM(ctx, vmopt) require.NoError(t, err) nvm.SetInvoker(inv) - return nvm, nil + return &mockExecutor{nvm}, nil }) cg.SetStateManager(sm) @@ -647,11 +653,11 @@ func TestMigrtionCache(t *testing.T) { registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}}) inv.Register(actorstypes.Version0, nil, registry) - sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) { + sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) { nvm, err := vm.NewLegacyVM(ctx, vmopt) require.NoError(t, err) nvm.SetInvoker(inv) - return nvm, nil + return &mockExecutor{nvm}, nil }) cg.SetStateManager(sm) @@ -691,11 +697,11 @@ func TestMigrtionCache(t *testing.T) { index.DummyMsgIndex, ) require.NoError(t, err) - sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) { + sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) { nvm, err := vm.NewLegacyVM(ctx, vmopt) require.NoError(t, err) nvm.SetInvoker(inv) - return nvm, nil + return &mockExecutor{nvm}, nil }) ctx := context.Background() diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index 827aeeee571..5f201cf3230 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -125,7 +125,7 @@ type StateManager struct { compWait map[string]chan struct{} stlk sync.Mutex genesisMsigLk sync.Mutex - newVM func(context.Context, *vm.VMOpts) (vm.Interface, error) + newVM func(context.Context, *vm.VMOpts) (vm.Executor, error) Syscalls vm.SyscallBuilder preIgnitionVesting []msig0.State postIgnitionVesting []msig0.State @@ -439,12 +439,12 @@ func (sm *StateManager) ValidateChain(ctx context.Context, ts *types.TipSet) err return nil } -func (sm *StateManager) SetVMConstructor(nvm func(context.Context, *vm.VMOpts) (vm.Interface, error)) { +func (sm *StateManager) 
SetVMConstructor(nvm func(context.Context, *vm.VMOpts) (vm.Executor, error)) { sm.newVM = nvm } -func (sm *StateManager) VMConstructor() func(context.Context, *vm.VMOpts) (vm.Interface, error) { - return func(ctx context.Context, opts *vm.VMOpts) (vm.Interface, error) { +func (sm *StateManager) VMConstructor() func(context.Context, *vm.VMOpts) (vm.Executor, error) { + return func(ctx context.Context, opts *vm.VMOpts) (vm.Executor, error) { return sm.newVM(ctx, opts) } } diff --git a/conformance/driver.go b/conformance/driver.go index e0d56d07410..c3041be7136 100644 --- a/conformance/driver.go +++ b/conformance/driver.go @@ -158,7 +158,7 @@ func (d *Driver) ExecuteTipset(bs blockstore.Blockstore, ds ds.Batching, params results: []*vm.ApplyRet{}, } - sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) { + sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) { vmopt.CircSupplyCalc = func(context.Context, abi.ChainEpoch, *state.StateTree) (abi.TokenAmount, error) { return big.Zero(), nil } From ee6c0f857068587646d9e98847aa2303f8d77913 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 23 Mar 2023 17:28:08 +0200 Subject: [PATCH 03/21] only call Atoi on non empty strings --- chain/vm/execution.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/chain/vm/execution.go b/chain/vm/execution.go index e55883dae41..8db0e43132e 100644 --- a/chain/vm/execution.go +++ b/chain/vm/execution.go @@ -146,19 +146,21 @@ func init() { concurrency := os.Getenv("LOTUS_FVM_CONCURRENCY") if concurrency == "" { available = DefaultAvailableExecutionLanes - } - available, err = strconv.Atoi(concurrency) - if err != nil { - panic(err) + } else { + available, err = strconv.Atoi(concurrency) + if err != nil { + panic(err) + } } reserved := os.Getenv("LOTUS_FVM_CONCURRENCY_RESERVED") if reserved == "" { priority = DefaultPriorityExecutionLanes - } - priority, err = strconv.Atoi(reserved) - if err != 
nil { - panic(err) + } else { + priority, err = strconv.Atoi(reserved) + if err != nil { + panic(err) + } } mx := &sync.Mutex{} From 2bb89d9c30403fc7609e84af0fea5bc769db63ac Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 23 Mar 2023 17:34:59 +0200 Subject: [PATCH 04/21] call Executor.Done where appropriate in stmgr uses --- chain/stmgr/call.go | 3 +++ chain/stmgr/utils.go | 1 + 2 files changed, 4 insertions(+) diff --git a/chain/stmgr/call.go b/chain/stmgr/call.go index 901fc2d1253..aa09fbfd37a 100644 --- a/chain/stmgr/call.go +++ b/chain/stmgr/call.go @@ -159,6 +159,8 @@ func (sm *StateManager) callInternal(ctx context.Context, msg *types.Message, pr if err != nil { return nil, xerrors.Errorf("failed to set up vm: %w", err) } + defer vmi.Done() + for i, m := range priorMsgs { _, err = vmi.ApplyMessage(ctx, m) if err != nil { @@ -191,6 +193,7 @@ func (sm *StateManager) callInternal(ctx context.Context, msg *types.Message, pr vmopt.BaseFee = big.Zero() vmopt.StateBase = stateCid + vmi.Done() vmi, err = sm.newVM(ctx, vmopt) if err != nil { return nil, xerrors.Errorf("failed to set up estimation vm: %w", err) diff --git a/chain/stmgr/utils.go b/chain/stmgr/utils.go index c93267d50f8..78129cb164d 100644 --- a/chain/stmgr/utils.go +++ b/chain/stmgr/utils.go @@ -106,6 +106,7 @@ func ComputeState(ctx context.Context, sm *StateManager, height abi.ChainEpoch, if err != nil { return cid.Undef, nil, err } + defer vmi.Done() for i, msg := range msgs { // TODO: Use the signed message length for secp messages From 2a0660447a674111c5a72a849dcc1d709c041b89 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 23 Mar 2023 17:38:36 +0200 Subject: [PATCH 05/21] make token.Done idempotent --- chain/vm/execution.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/chain/vm/execution.go b/chain/vm/execution.go index 8db0e43132e..66f82280b24 100644 --- a/chain/vm/execution.go +++ b/chain/vm/execution.go @@ -73,9 +73,11 @@ func (e *vmExecutor) Done() { e.lk.Lock() defer 
e.lk.Unlock() - e.token.Done() - e.token = nil - e.done = true + if !e.done { + e.token.Done() + e.token = nil + e.done = true + } } type executionToken struct { From 317a87d6699c7b03467e683cf2c0804b5289de99 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 23 Mar 2023 18:52:37 +0200 Subject: [PATCH 06/21] add some sanity checks for execution concurrency parameters --- chain/vm/execution.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/chain/vm/execution.go b/chain/vm/execution.go index 66f82280b24..b2573dffc61 100644 --- a/chain/vm/execution.go +++ b/chain/vm/execution.go @@ -165,6 +165,15 @@ func init() { } } + // some sanity checks + if available < 2 { + panic("insufficient execution concurrency") + } + + if priority > available-1 { + panic("insufficient default execution concurrency") + } + mx := &sync.Mutex{} cond := sync.NewCond(mx) From f11a7f8940071cb369c234c31f04e93a1acf4a5a Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 23 Mar 2023 19:51:23 +0200 Subject: [PATCH 07/21] fix incorrect deferred vm release --- chain/gen/genesis/miners.go | 2 +- chain/stmgr/call.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/chain/gen/genesis/miners.go b/chain/gen/genesis/miners.go index 6e5be0b0adf..900389f565d 100644 --- a/chain/gen/genesis/miners.go +++ b/chain/gen/genesis/miners.go @@ -108,7 +108,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal if err != nil { return cid.Undef, fmt.Errorf("creating vm: %w", err) } - defer genesisVm.Done() + defer func() { genesisVm.Done() }() if len(miners) == 0 { return cid.Undef, xerrors.New("no genesis miners") diff --git a/chain/stmgr/call.go b/chain/stmgr/call.go index aa09fbfd37a..9633d6417d7 100644 --- a/chain/stmgr/call.go +++ b/chain/stmgr/call.go @@ -159,7 +159,7 @@ func (sm *StateManager) callInternal(ctx context.Context, msg *types.Message, pr if err != nil { return nil, xerrors.Errorf("failed to set up vm: %w", err) } - defer vmi.Done() + defer func() { 
vmi.Done() }() for i, m := range priorMsgs { _, err = vmi.ApplyMessage(ctx, m) From 4b590e2102a36ae2fd0b9511a914c3de5221dee6 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 24 Mar 2023 15:16:22 +0200 Subject: [PATCH 08/21] add vm execution metrics --- chain/vm/execution.go | 39 +++++++++++++++++++++++++++++++++++++-- metrics/metrics.go | 15 +++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/chain/vm/execution.go b/chain/vm/execution.go index b2573dffc61..d984d0095c5 100644 --- a/chain/vm/execution.go +++ b/chain/vm/execution.go @@ -7,9 +7,13 @@ import ( "strconv" "sync" + "go.opencensus.io/stats" + "go.opencensus.io/tag" + "github.com/ipfs/go-cid" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/metrics" ) const ( @@ -81,6 +85,7 @@ func (e *vmExecutor) Done() { } type executionToken struct { + lane ExecutionLane reserved int } @@ -99,6 +104,9 @@ type executionEnv struct { } func (e *executionEnv) getToken(lane ExecutionLane) *executionToken { + metricsUp(metrics.VMExecutionWaiting, lane) + defer metricsDown(metrics.VMExecutionWaiting, lane) + e.mx.Lock() defer e.mx.Unlock() @@ -109,7 +117,9 @@ func (e *executionEnv) getToken(lane ExecutionLane) *executionToken { } e.available-- - return &executionToken{reserved: 0} + + metricsUp(metrics.VMExecutionActive, lane) + return &executionToken{lane: lane, reserved: 0} case ExecutionLanePriority: for e.available == 0 { @@ -123,7 +133,9 @@ func (e *executionEnv) getToken(lane ExecutionLane) *executionToken { e.reserved-- reserving = 1 } - return &executionToken{reserved: reserving} + + metricsUp(metrics.VMExecutionActive, lane) + return &executionToken{lane: lane, reserved: reserving} default: // already checked at interface boundary in NewVM, so this is appropriate @@ -139,6 +151,29 @@ func (e *executionEnv) putToken(token *executionToken) { e.reserved += token.reserved e.cond.Broadcast() + + metricsDown(metrics.VMExecutionActive, token.lane) +} + +func 
metricsUp(metric *stats.Int64Measure, lane ExecutionLane) { + metricsAdjust(metric, lane, 1) +} + +func metricsDown(metric *stats.Int64Measure, lane ExecutionLane) { + metricsAdjust(metric, lane, -1) +} + +func metricsAdjust(metric *stats.Int64Measure, lane ExecutionLane, delta int) { + laneName := "default" + if lane > ExecutionLaneDefault { + laneName = "priority" + } + + ctx, _ := tag.New( + context.Background(), + tag.Upsert(metrics.ExecutionLane, laneName), + ) + stats.Record(ctx, metric.M(int64(delta))) } func init() { diff --git a/metrics/metrics.go b/metrics/metrics.go index ca638ac273a..61bd86fbd14 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -58,6 +58,9 @@ var ( ProtocolID, _ = tag.NewKey("proto") Direction, _ = tag.NewKey("direction") UseFD, _ = tag.NewKey("use_fd") + + // vm execution + ExecutionLane, _ = tag.NewKey("lane") ) // Measures @@ -121,6 +124,8 @@ var ( VMApplyFlush = stats.Float64("vm/applyblocks_flush", "Time spent flushing vm state", stats.UnitMilliseconds) VMSends = stats.Int64("vm/sends", "Counter for sends processed by the VM", stats.UnitDimensionless) VMApplied = stats.Int64("vm/applied", "Counter for messages (including internal messages) processed by the VM", stats.UnitDimensionless) + VMExecutionWaiting = stats.Int64("vm/execution_waiting", "Counter for VM executions waiting to be assigned to a lane", stats.UnitDimensionless) + VMExecutionActive = stats.Int64("vm/execution_default", "Counter for active VM executions", stats.UnitDimensionless) // miner WorkerCallsStarted = stats.Int64("sealing/worker_calls_started", "Counter of started worker tasks", stats.UnitDimensionless) @@ -363,6 +368,16 @@ var ( Measure: VMApplied, Aggregation: view.LastValue(), } + VMExecutionWaitingView = &view.View{ + Measure: VMExecutionWaiting, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{ExecutionLane}, + } + VMExecutionActiveView = &view.View{ + Measure: VMExecutionActive, + Aggregation: view.LastValue(), + TagKeys: 
[]tag.Key{ExecutionLane}, + } // miner WorkerCallsStartedView = &view.View{ From 08134552a49da645b3c08f8f748be37ed6891dcb Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 24 Mar 2023 15:48:58 +0200 Subject: [PATCH 09/21] address review comments --- chain/consensus/compute_state.go | 1 + chain/gen/genesis/miners.go | 1 + chain/stmgr/call.go | 1 + chain/vm/execution.go | 14 ++++++++++---- chain/vm/vmi.go | 2 +- metrics/metrics.go | 6 +++--- 6 files changed, 17 insertions(+), 8 deletions(-) diff --git a/chain/consensus/compute_state.go b/chain/consensus/compute_state.go index cf05d612d26..3c1bab9ca78 100644 --- a/chain/consensus/compute_state.go +++ b/chain/consensus/compute_state.go @@ -196,6 +196,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, cronGas = 0 partDone = metrics.Timer(ctx, metrics.VMApplyMessages) + // TODO reorg the code to minimize the execution critical section vmi, err := makeVm(pstate, epoch, ts.MinTimestamp()) if err != nil { return cid.Undef, cid.Undef, xerrors.Errorf("making vm: %w", err) diff --git a/chain/gen/genesis/miners.go b/chain/gen/genesis/miners.go index 900389f565d..09b46a6e76c 100644 --- a/chain/gen/genesis/miners.go +++ b/chain/gen/genesis/miners.go @@ -108,6 +108,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal if err != nil { return cid.Undef, fmt.Errorf("creating vm: %w", err) } + // Note: genesisVm is mutated, so this has to happen in a deferred func; go horror show. defer func() { genesisVm.Done() }() if len(miners) == 0 { diff --git a/chain/stmgr/call.go b/chain/stmgr/call.go index 9633d6417d7..8e18c25df96 100644 --- a/chain/stmgr/call.go +++ b/chain/stmgr/call.go @@ -159,6 +159,7 @@ func (sm *StateManager) callInternal(ctx context.Context, msg *types.Message, pr if err != nil { return nil, xerrors.Errorf("failed to set up vm: %w", err) } + // Note: vmi is mutated, so this has to happen in a deferred func; go horror show. 
defer func() { vmi.Done() }() for i, m := range priorMsgs { diff --git a/chain/vm/execution.go b/chain/vm/execution.go index d984d0095c5..0edb1388484 100644 --- a/chain/vm/execution.go +++ b/chain/vm/execution.go @@ -17,8 +17,14 @@ import ( ) const ( + // DefaultAvailableExecutionLanes is the number of available execution lanes; it is the bound of + // concurrent active executions. + // This is the default value in filecoin-ffi DefaultAvailableExecutionLanes = 4 - DefaultPriorityExecutionLanes = 2 + // DefaultPriorityExecutionLanes is the number of reserved execution lanes for priority computations. + // This is purely userspace, but we believe it is a reasonable default, even with more available + // lanes. + DefaultPriorityExecutionLanes = 2 ) var ErrExecutorDone = errors.New("executor has been released") @@ -118,7 +124,7 @@ func (e *executionEnv) getToken(lane ExecutionLane) *executionToken { e.available-- - metricsUp(metrics.VMExecutionActive, lane) + metricsUp(metrics.VMExecutionRunning, lane) return &executionToken{lane: lane, reserved: 0} case ExecutionLanePriority: @@ -134,7 +140,7 @@ func (e *executionEnv) getToken(lane ExecutionLane) *executionToken { reserving = 1 } - metricsUp(metrics.VMExecutionActive, lane) + metricsUp(metrics.VMExecutionRunning, lane) return &executionToken{lane: lane, reserved: reserving} default: @@ -152,7 +158,7 @@ func (e *executionEnv) putToken(token *executionToken) { e.cond.Broadcast() - metricsDown(metrics.VMExecutionActive, token.lane) + metricsDown(metrics.VMExecutionRunning, token.lane) } func metricsUp(metric *stats.Int64Measure, lane ExecutionLane) { diff --git a/chain/vm/vmi.go b/chain/vm/vmi.go index e5d5daff8a6..7aa52b58534 100644 --- a/chain/vm/vmi.go +++ b/chain/vm/vmi.go @@ -37,7 +37,7 @@ type Interface interface { Flush(ctx context.Context) (cid.Cid, error) } -// Executor is the general vm execution interface, which is prioritized according to execution langes. 
+// Executor is the general vm execution interface, which is prioritized according to execution lanes. // User must call Done when it is done with this executor to release resource holds by the execution // environment type Executor interface { diff --git a/metrics/metrics.go b/metrics/metrics.go index 61bd86fbd14..bd1295d1778 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -125,7 +125,7 @@ var ( VMSends = stats.Int64("vm/sends", "Counter for sends processed by the VM", stats.UnitDimensionless) VMApplied = stats.Int64("vm/applied", "Counter for messages (including internal messages) processed by the VM", stats.UnitDimensionless) VMExecutionWaiting = stats.Int64("vm/execution_waiting", "Counter for VM executions waiting to be assigned to a lane", stats.UnitDimensionless) - VMExecutionActive = stats.Int64("vm/execution_default", "Counter for active VM executions", stats.UnitDimensionless) + VMExecutionRunning = stats.Int64("vm/execution_running", "Counter for running VM executions", stats.UnitDimensionless) // miner WorkerCallsStarted = stats.Int64("sealing/worker_calls_started", "Counter of started worker tasks", stats.UnitDimensionless) @@ -373,8 +373,8 @@ var ( Aggregation: view.LastValue(), TagKeys: []tag.Key{ExecutionLane}, } - VMExecutionActiveView = &view.View{ - Measure: VMExecutionActive, + VMExecutionRunningView = &view.View{ + Measure: VMExecutionRunning, Aggregation: view.LastValue(), TagKeys: []tag.Key{ExecutionLane}, } From ddebdfb37cc1efb034aa8fce09c907dab3a5c014 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 27 Mar 2023 20:56:07 +0300 Subject: [PATCH 10/21] add execution metrics to the chain node views --- metrics/metrics.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/metrics/metrics.go b/metrics/metrics.go index bd1295d1778..465fab63d01 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -742,6 +742,8 @@ var ChainNodeViews = append([]*view.View{ VMApplyFlushView, VMSendsView, VMAppliedView, + VMExecutionWaitingView, + 
VMExecutionRunningView, }, DefaultViews...) var MinerNodeViews = append([]*view.View{ From a0f908da5739be310558a96908f9cb8b44bcffde Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 27 Mar 2023 23:11:56 +0300 Subject: [PATCH 11/21] use Count instead of LastValue --- metrics/metrics.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metrics/metrics.go b/metrics/metrics.go index 465fab63d01..58d235acef4 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -370,12 +370,12 @@ var ( } VMExecutionWaitingView = &view.View{ Measure: VMExecutionWaiting, - Aggregation: view.LastValue(), + Aggregation: view.Count(), TagKeys: []tag.Key{ExecutionLane}, } VMExecutionRunningView = &view.View{ Measure: VMExecutionRunning, - Aggregation: view.LastValue(), + Aggregation: view.Count(), TagKeys: []tag.Key{ExecutionLane}, } From 6ecaf826af63a01b5f6df76e90adb7cab4cfc8cc Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 27 Mar 2023 23:17:41 +0300 Subject: [PATCH 12/21] no, Sum it is. --- metrics/metrics.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metrics/metrics.go b/metrics/metrics.go index 58d235acef4..13627a663d5 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -370,12 +370,12 @@ var ( } VMExecutionWaitingView = &view.View{ Measure: VMExecutionWaiting, - Aggregation: view.Count(), + Aggregation: view.Sum(), TagKeys: []tag.Key{ExecutionLane}, } VMExecutionRunningView = &view.View{ Measure: VMExecutionRunning, - Aggregation: view.Count(), + Aggregation: view.Sum(), TagKeys: []tag.Key{ExecutionLane}, } From dcd9869842d64fbce4c46d07a608d90f0cbb4baf Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 28 Mar 2023 16:58:09 +0300 Subject: [PATCH 13/21] make gen --- chain/vm/execution.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/chain/vm/execution.go b/chain/vm/execution.go index 0edb1388484..1642dc2f8df 100644 --- a/chain/vm/execution.go +++ b/chain/vm/execution.go @@ -7,11 +7,10 @@ import ( "strconv" "sync" + 
"github.com/ipfs/go-cid" "go.opencensus.io/stats" "go.opencensus.io/tag" - "github.com/ipfs/go-cid" - "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/metrics" ) From b2b78e9dfa110186ae868b953d60cbc7015022ba Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 28 Mar 2023 17:56:35 +0300 Subject: [PATCH 14/21] Update chain/vm/execution.go Co-authored-by: Aayush Rajasekaran --- chain/vm/execution.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/vm/execution.go b/chain/vm/execution.go index 1642dc2f8df..58f61ca6d9d 100644 --- a/chain/vm/execution.go +++ b/chain/vm/execution.go @@ -210,7 +210,7 @@ func init() { panic("insufficient execution concurrency") } - if priority > available-1 { + if available <= priority { panic("insufficient default execution concurrency") } From b27121612e1b679a1b558ed51bf297ea9ccd65e7 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 28 Mar 2023 18:03:55 +0300 Subject: [PATCH 15/21] rename confusing variable --- chain/consensus/compute_state.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chain/consensus/compute_state.go b/chain/consensus/compute_state.go index 3c1bab9ca78..449a0ed207c 100644 --- a/chain/consensus/compute_state.go +++ b/chain/consensus/compute_state.go @@ -264,7 +264,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, return cid.Cid{}, cid.Cid{}, err } - vmCron := partDone() + vmDoCron := partDone() partDone = metrics.Timer(ctx, metrics.VMApplyFlush) rectarr := blockadt.MakeEmptyArray(sm.ChainStore().ActorStore(ctx)) @@ -303,7 +303,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, vmFlush := partDone() partDone = func() time.Duration { return time.Duration(0) } - log.Infow("ApplyBlocks stats", "early", vmEarly, "earlyCronGas", earlyCronGas, "vmMsg", vmMsg, "msgGas", msgGas, "vmCron", vmCron, "cronGas", cronGas, "vmFlush", vmFlush, "epoch", epoch, "tsk", ts.Key()) + log.Infow("ApplyBlocks stats", "early", vmEarly, 
"earlyCronGas", earlyCronGas, "vmMsg", vmMsg, "msgGas", msgGas, "vmCron", vmDoCron, "cronGas", cronGas, "vmFlush", vmFlush, "epoch", epoch, "tsk", ts.Key()) stats.Record(ctx, metrics.VMSends.M(int64(atomic.LoadUint64(&vm.StatSends))), metrics.VMApplied.M(int64(atomic.LoadUint64(&vm.StatApplied)))) From 71650cd8a4907deac2f1089d7354b17e91cc2a48 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 28 Mar 2023 18:05:00 +0300 Subject: [PATCH 16/21] rename newVM to makeVM for a happy yushie --- chain/vm/vmi.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chain/vm/vmi.go b/chain/vm/vmi.go index 7aa52b58534..a19c38fceab 100644 --- a/chain/vm/vmi.go +++ b/chain/vm/vmi.go @@ -54,7 +54,7 @@ type Executor interface { // Message failures, unexpected terminations,gas costs, etc. should all be ignored. var useFvmDebug = os.Getenv("LOTUS_FVM_DEVELOPER_DEBUG") == "1" -func newVM(ctx context.Context, opts *VMOpts) (Interface, error) { +func makeVM(ctx context.Context, opts *VMOpts) (Interface, error) { if opts.NetworkVersion >= network.Version16 { if useFvmDebug { return NewDualExecutionFVM(ctx, opts) @@ -74,7 +74,7 @@ func NewVM(ctx context.Context, opts *VMOpts) (Executor, error) { token := execution.getToken(opts.ExecutionLane) - vmi, err := newVM(ctx, opts) + vmi, err := makeVM(ctx, opts) if err != nil { token.Done() return nil, err From 4184ce9c7571a68821ccf74a7dd7ce329801df97 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 29 Mar 2023 16:45:45 +0300 Subject: [PATCH 17/21] refactor execution lanes: hide the lock --- chain/consensus/compute_state.go | 9 ++----- chain/gen/genesis/genesis.go | 1 - chain/gen/genesis/miners.go | 7 +---- chain/stmgr/call.go | 3 --- chain/stmgr/stmgr.go | 8 +++--- chain/stmgr/utils.go | 1 - chain/vm/execution.go | 46 +++++++------------------------- chain/vm/vmi.go | 18 ++----------- 8 files changed, 18 insertions(+), 75 deletions(-) diff --git a/chain/consensus/compute_state.go b/chain/consensus/compute_state.go index 
449a0ed207c..8442b55e9a7 100644 --- a/chain/consensus/compute_state.go +++ b/chain/consensus/compute_state.go @@ -93,7 +93,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, }() ctx = blockstore.WithHotView(ctx) - makeVm := func(base cid.Cid, e abi.ChainEpoch, timestamp uint64) (vm.Executor, error) { + makeVm := func(base cid.Cid, e abi.ChainEpoch, timestamp uint64) (vm.Interface, error) { vmopt := &vm.VMOpts{ StateBase: base, Epoch: e, @@ -117,7 +117,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, var cronGas int64 - runCron := func(vmCron vm.Executor, epoch abi.ChainEpoch) error { + runCron := func(vmCron vm.Interface, epoch abi.ChainEpoch) error { cronMsg := &types.Message{ To: cron.Address, From: builtin.SystemActorAddr, @@ -170,17 +170,13 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, // run cron for null rounds if any if err = runCron(vmCron, i); err != nil { - vmCron.Done() return cid.Undef, cid.Undef, xerrors.Errorf("running cron: %w", err) } pstate, err = vmCron.Flush(ctx) if err != nil { - vmCron.Done() return cid.Undef, cid.Undef, xerrors.Errorf("flushing cron vm: %w", err) } - - vmCron.Done() } // handle state forks @@ -201,7 +197,6 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, if err != nil { return cid.Undef, cid.Undef, xerrors.Errorf("making vm: %w", err) } - defer vmi.Done() var ( receipts []*types.MessageReceipt diff --git a/chain/gen/genesis/genesis.go b/chain/gen/genesis/genesis.go index 3ef8de968ec..3e88480218e 100644 --- a/chain/gen/genesis/genesis.go +++ b/chain/gen/genesis/genesis.go @@ -496,7 +496,6 @@ func VerifyPreSealedData(ctx context.Context, cs *store.ChainStore, sys vm.Sysca if err != nil { return cid.Undef, xerrors.Errorf("failed to create VM: %w", err) } - defer vm.Done() for mi, m := range template.Miners { for si, s := range m.Sectors { diff --git a/chain/gen/genesis/miners.go b/chain/gen/genesis/miners.go index 09b46a6e76c..5f741fd7c58 100644 --- 
a/chain/gen/genesis/miners.go +++ b/chain/gen/genesis/miners.go @@ -88,7 +88,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal return big.Zero(), nil } - newVM := func(base cid.Cid) (vm.Executor, error) { + newVM := func(base cid.Cid) (vm.Interface, error) { vmopt := &vm.VMOpts{ StateBase: base, Epoch: 0, @@ -108,8 +108,6 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal if err != nil { return cid.Undef, fmt.Errorf("creating vm: %w", err) } - // Note: genesisVm is mutated, so this has to happen in a deferred func; go horror show. - defer func() { genesisVm.Done() }() if len(miners) == 0 { return cid.Undef, xerrors.New("no genesis miners") @@ -340,7 +338,6 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal return cid.Undef, xerrors.Errorf("flushing state tree: %w", err) } - genesisVm.Done() genesisVm, err = newVM(nh) if err != nil { return cid.Undef, fmt.Errorf("creating new vm: %w", err) @@ -413,7 +410,6 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal return cid.Undef, xerrors.Errorf("flushing state tree: %w", err) } - genesisVm.Done() genesisVm, err = newVM(nh) if err != nil { return cid.Undef, fmt.Errorf("creating new vm: %w", err) @@ -521,7 +517,6 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal return cid.Undef, xerrors.Errorf("flushing state tree: %w", err) } - genesisVm.Done() genesisVm, err = newVM(nh) if err != nil { return cid.Undef, fmt.Errorf("creating new vm: %w", err) diff --git a/chain/stmgr/call.go b/chain/stmgr/call.go index 8e18c25df96..816f50d1036 100644 --- a/chain/stmgr/call.go +++ b/chain/stmgr/call.go @@ -159,8 +159,6 @@ func (sm *StateManager) callInternal(ctx context.Context, msg *types.Message, pr if err != nil { return nil, xerrors.Errorf("failed to set up vm: %w", err) } - // Note: vmi is mutated, so this has to happen in a deferred func; go horror show. 
- defer func() { vmi.Done() }() for i, m := range priorMsgs { _, err = vmi.ApplyMessage(ctx, m) @@ -194,7 +192,6 @@ func (sm *StateManager) callInternal(ctx context.Context, msg *types.Message, pr vmopt.BaseFee = big.Zero() vmopt.StateBase = stateCid - vmi.Done() vmi, err = sm.newVM(ctx, vmopt) if err != nil { return nil, xerrors.Errorf("failed to set up estimation vm: %w", err) diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index 5f201cf3230..827aeeee571 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -125,7 +125,7 @@ type StateManager struct { compWait map[string]chan struct{} stlk sync.Mutex genesisMsigLk sync.Mutex - newVM func(context.Context, *vm.VMOpts) (vm.Executor, error) + newVM func(context.Context, *vm.VMOpts) (vm.Interface, error) Syscalls vm.SyscallBuilder preIgnitionVesting []msig0.State postIgnitionVesting []msig0.State @@ -439,12 +439,12 @@ func (sm *StateManager) ValidateChain(ctx context.Context, ts *types.TipSet) err return nil } -func (sm *StateManager) SetVMConstructor(nvm func(context.Context, *vm.VMOpts) (vm.Executor, error)) { +func (sm *StateManager) SetVMConstructor(nvm func(context.Context, *vm.VMOpts) (vm.Interface, error)) { sm.newVM = nvm } -func (sm *StateManager) VMConstructor() func(context.Context, *vm.VMOpts) (vm.Executor, error) { - return func(ctx context.Context, opts *vm.VMOpts) (vm.Executor, error) { +func (sm *StateManager) VMConstructor() func(context.Context, *vm.VMOpts) (vm.Interface, error) { + return func(ctx context.Context, opts *vm.VMOpts) (vm.Interface, error) { return sm.newVM(ctx, opts) } } diff --git a/chain/stmgr/utils.go b/chain/stmgr/utils.go index 78129cb164d..c93267d50f8 100644 --- a/chain/stmgr/utils.go +++ b/chain/stmgr/utils.go @@ -106,7 +106,6 @@ func ComputeState(ctx context.Context, sm *StateManager, height abi.ChainEpoch, if err != nil { return cid.Undef, nil, err } - defer vmi.Done() for i, msg := range msgs { // TODO: Use the signed message length for secp messages diff 
--git a/chain/vm/execution.go b/chain/vm/execution.go index 58f61ca6d9d..fb86a3a7d5e 100644 --- a/chain/vm/execution.go +++ b/chain/vm/execution.go @@ -33,62 +33,34 @@ var execution *executionEnv // implementation of vm executor with simple sanity check preventing use after free. type vmExecutor struct { - lk sync.RWMutex - vmi Interface - token *executionToken - done bool + vmi Interface + lane ExecutionLane } -var _ Executor = (*vmExecutor)(nil) +var _ Interface = (*vmExecutor)(nil) -func newVMExecutor(vmi Interface, token *executionToken) Executor { - return &vmExecutor{vmi: vmi, token: token} +func newVMExecutor(vmi Interface, lane ExecutionLane) Interface { + return &vmExecutor{vmi: vmi, lane: lane} } func (e *vmExecutor) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) { - e.lk.RLock() - defer e.lk.RUnlock() - - if e.done { - return nil, ErrExecutorDone - } + token := execution.getToken(e.lane) + defer token.Done() return e.vmi.ApplyMessage(ctx, cmsg) } func (e *vmExecutor) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) { - e.lk.RLock() - defer e.lk.RUnlock() - - if e.done { - return nil, ErrExecutorDone - } + token := execution.getToken(e.lane) + defer token.Done() return e.vmi.ApplyImplicitMessage(ctx, msg) } func (e *vmExecutor) Flush(ctx context.Context) (cid.Cid, error) { - e.lk.RLock() - defer e.lk.RUnlock() - - if e.done { - return cid.Undef, ErrExecutorDone - } - return e.vmi.Flush(ctx) } -func (e *vmExecutor) Done() { - e.lk.Lock() - defer e.lk.Unlock() - - if !e.done { - e.token.Done() - e.token = nil - e.done = true - } -} - type executionToken struct { lane ExecutionLane reserved int diff --git a/chain/vm/vmi.go b/chain/vm/vmi.go index a19c38fceab..042621ca2d4 100644 --- a/chain/vm/vmi.go +++ b/chain/vm/vmi.go @@ -37,17 +37,6 @@ type Interface interface { Flush(ctx context.Context) (cid.Cid, error) } -// Executor is the general vm execution interface, which is prioritized according to 
execution lanes. -// User must call Done when it is done with this executor to release resource holds by the execution -// environment -type Executor interface { - Interface - - // Done must be called when done with the executor to release resource holds. - // It is an error to invoke Interface methods after Done has been called. - Done() -} - // WARNING: You will not affect your node's execution by misusing this feature, but you will confuse yourself thoroughly! // An envvar that allows the user to specify debug actors bundles to be used by the FVM // alongside regular execution. This is basically only to be used to print out specific logging information. @@ -65,20 +54,17 @@ func makeVM(ctx context.Context, opts *VMOpts) (Interface, error) { return NewLegacyVM(ctx, opts) } -func NewVM(ctx context.Context, opts *VMOpts) (Executor, error) { +func NewVM(ctx context.Context, opts *VMOpts) (Interface, error) { switch opts.ExecutionLane { case ExecutionLaneDefault, ExecutionLanePriority: default: return nil, fmt.Errorf("invalid execution lane: %d", opts.ExecutionLane) } - token := execution.getToken(opts.ExecutionLane) - vmi, err := makeVM(ctx, opts) if err != nil { - token.Done() return nil, err } - return newVMExecutor(vmi, token), nil + return newVMExecutor(vmi, opts.ExecutionLane), nil } From 52d70d563af0849771ddf4708aa2748e95680e93 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 29 Mar 2023 16:46:37 +0300 Subject: [PATCH 18/21] fix tests --- chain/stmgr/forks_test.go | 30 ++++++++++++------------------ conformance/driver.go | 2 +- 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/chain/stmgr/forks_test.go b/chain/stmgr/forks_test.go index d852e2fdd2a..f91d8997d6c 100644 --- a/chain/stmgr/forks_test.go +++ b/chain/stmgr/forks_test.go @@ -56,12 +56,6 @@ const testForkHeight = 40 type testActor struct { } -type mockExecutor struct { - vm.Interface -} - -func (*mockExecutor) Done() {} - // must use existing actor that an account is allowed to exec. 
func (testActor) Code() cid.Cid { return builtin0.PaymentChannelActorCodeID } func (testActor) State() cbor.Er { return new(testActorState) } @@ -184,13 +178,13 @@ func TestForkHeightTriggers(t *testing.T) { registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}}) inv.Register(actorstypes.Version0, nil, registry) - sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) { + sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) { nvm, err := vm.NewLegacyVM(ctx, vmopt) if err != nil { return nil, err } nvm.SetInvoker(inv) - return &mockExecutor{nvm}, nil + return nvm, nil }) cg.SetStateManager(sm) @@ -302,13 +296,13 @@ func testForkRefuseCall(t *testing.T, nullsBefore, nullsAfter int) { registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}}) inv.Register(actorstypes.Version0, nil, registry) - sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) { + sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) { nvm, err := vm.NewLegacyVM(ctx, vmopt) if err != nil { return nil, err } nvm.SetInvoker(inv) - return &mockExecutor{nvm}, nil + return nvm, nil }) cg.SetStateManager(sm) @@ -524,13 +518,13 @@ func TestForkPreMigration(t *testing.T) { registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}}) inv.Register(actorstypes.Version0, nil, registry) - sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) { + sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) { nvm, err := vm.NewLegacyVM(ctx, vmopt) if err != nil { return nil, err } nvm.SetInvoker(inv) - return &mockExecutor{nvm}, nil + return nvm, nil }) cg.SetStateManager(sm) @@ -598,11 +592,11 @@ func TestDisablePreMigration(t *testing.T) { registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}}) inv.Register(actorstypes.Version0, nil, registry) - sm.SetVMConstructor(func(ctx context.Context, vmopt 
*vm.VMOpts) (vm.Executor, error) { + sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) { nvm, err := vm.NewLegacyVM(ctx, vmopt) require.NoError(t, err) nvm.SetInvoker(inv) - return &mockExecutor{nvm}, nil + return nvm, nil }) cg.SetStateManager(sm) @@ -653,11 +647,11 @@ func TestMigrtionCache(t *testing.T) { registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}}) inv.Register(actorstypes.Version0, nil, registry) - sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) { + sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) { nvm, err := vm.NewLegacyVM(ctx, vmopt) require.NoError(t, err) nvm.SetInvoker(inv) - return &mockExecutor{nvm}, nil + return nvm, nil }) cg.SetStateManager(sm) @@ -697,11 +691,11 @@ func TestMigrtionCache(t *testing.T) { index.DummyMsgIndex, ) require.NoError(t, err) - sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) { + sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) { nvm, err := vm.NewLegacyVM(ctx, vmopt) require.NoError(t, err) nvm.SetInvoker(inv) - return &mockExecutor{nvm}, nil + return nvm, nil }) ctx := context.Background() diff --git a/conformance/driver.go b/conformance/driver.go index c3041be7136..e0d56d07410 100644 --- a/conformance/driver.go +++ b/conformance/driver.go @@ -158,7 +158,7 @@ func (d *Driver) ExecuteTipset(bs blockstore.Blockstore, ds ds.Batching, params results: []*vm.ApplyRet{}, } - sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) { + sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) { vmopt.CircSupplyCalc = func(context.Context, abi.ChainEpoch, *state.StateTree) (abi.TokenAmount, error) { return big.Zero(), nil } From 54a80a8a97e635d215c6dc6d9283bc17be48487a Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 30 Mar 2023 18:11:44 +0300 Subject: [PATCH 19/21] revert 
dead code --- chain/consensus/compute_state.go | 5 ++--- chain/stmgr/call.go | 1 - chain/vm/execution.go | 4 +--- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/chain/consensus/compute_state.go b/chain/consensus/compute_state.go index 8442b55e9a7..056aa07250b 100644 --- a/chain/consensus/compute_state.go +++ b/chain/consensus/compute_state.go @@ -192,7 +192,6 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, cronGas = 0 partDone = metrics.Timer(ctx, metrics.VMApplyMessages) - // TODO reorg the code to minimize the execution critical section vmi, err := makeVm(pstate, epoch, ts.MinTimestamp()) if err != nil { return cid.Undef, cid.Undef, xerrors.Errorf("making vm: %w", err) @@ -259,7 +258,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, return cid.Cid{}, cid.Cid{}, err } - vmDoCron := partDone() + vmCron := partDone() partDone = metrics.Timer(ctx, metrics.VMApplyFlush) rectarr := blockadt.MakeEmptyArray(sm.ChainStore().ActorStore(ctx)) @@ -298,7 +297,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, vmFlush := partDone() partDone = func() time.Duration { return time.Duration(0) } - log.Infow("ApplyBlocks stats", "early", vmEarly, "earlyCronGas", earlyCronGas, "vmMsg", vmMsg, "msgGas", msgGas, "vmCron", vmDoCron, "cronGas", cronGas, "vmFlush", vmFlush, "epoch", epoch, "tsk", ts.Key()) + log.Infow("ApplyBlocks stats", "early", vmEarly, "earlyCronGas", earlyCronGas, "vmMsg", vmMsg, "msgGas", msgGas, "vmCron", vmCron, "cronGas", cronGas, "vmFlush", vmFlush, "epoch", epoch, "tsk", ts.Key()) stats.Record(ctx, metrics.VMSends.M(int64(atomic.LoadUint64(&vm.StatSends))), metrics.VMApplied.M(int64(atomic.LoadUint64(&vm.StatApplied)))) diff --git a/chain/stmgr/call.go b/chain/stmgr/call.go index 816f50d1036..901fc2d1253 100644 --- a/chain/stmgr/call.go +++ b/chain/stmgr/call.go @@ -159,7 +159,6 @@ func (sm *StateManager) callInternal(ctx context.Context, msg *types.Message, pr if err != nil { return nil, 
xerrors.Errorf("failed to set up vm: %w", err) } - for i, m := range priorMsgs { _, err = vmi.ApplyMessage(ctx, m) if err != nil { diff --git a/chain/vm/execution.go b/chain/vm/execution.go index fb86a3a7d5e..dfa9d98d273 100644 --- a/chain/vm/execution.go +++ b/chain/vm/execution.go @@ -26,9 +26,7 @@ const ( DefaultPriorityExecutionLanes = 2 ) -var ErrExecutorDone = errors.New("executor has been released") - -// the execution environment; see below for definition, methods, and initilization +// the execution environment; see below for definition, methods, and initialization var execution *executionEnv // implementation of vm executor with simple sanity check preventing use after free. From 7b4e68249a2eec6c6fadf14992261682988f51b2 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 30 Mar 2023 18:13:08 +0300 Subject: [PATCH 20/21] add comment about Signal unsoundness --- chain/vm/execution.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/chain/vm/execution.go b/chain/vm/execution.go index dfa9d98d273..37abedb6dc4 100644 --- a/chain/vm/execution.go +++ b/chain/vm/execution.go @@ -125,6 +125,8 @@ func (e *executionEnv) putToken(token *executionToken) { e.available++ e.reserved += token.reserved + // Note: Signal is unsound, because a priority token could wake up a non-priority + // goroutine and lead to deadlock. So Broadcast it must be. 
e.cond.Broadcast() metricsDown(metrics.VMExecutionRunning, token.lane) From d71b52825300037dc3bfebb26c96e798b23c91e0 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 30 Mar 2023 18:15:13 +0300 Subject: [PATCH 21/21] reorg initialization code for better readability, remove unused import --- chain/vm/execution.go | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/chain/vm/execution.go b/chain/vm/execution.go index 37abedb6dc4..ea3a9719341 100644 --- a/chain/vm/execution.go +++ b/chain/vm/execution.go @@ -2,7 +2,6 @@ package vm import ( "context" - "errors" "os" "strconv" "sync" @@ -154,23 +153,18 @@ func metricsAdjust(metric *stats.Int64Measure, lane ExecutionLane, delta int) { } func init() { - var available, priority int var err error - concurrency := os.Getenv("LOTUS_FVM_CONCURRENCY") - if concurrency == "" { - available = DefaultAvailableExecutionLanes - } else { + available := DefaultAvailableExecutionLanes + if concurrency := os.Getenv("LOTUS_FVM_CONCURRENCY"); concurrency != "" { available, err = strconv.Atoi(concurrency) if err != nil { panic(err) } } - reserved := os.Getenv("LOTUS_FVM_CONCURRENCY_RESERVED") - if reserved == "" { - priority = DefaultPriorityExecutionLanes - } else { + priority := DefaultPriorityExecutionLanes + if reserved := os.Getenv("LOTUS_FVM_CONCURRENCY_RESERVED"); reserved != "" { priority, err = strconv.Atoi(reserved) if err != nil { panic(err)