diff --git a/chain/consensus/compute_state.go b/chain/consensus/compute_state.go index e627a62d2ac..056aa07250b 100644 --- a/chain/consensus/compute_state.go +++ b/chain/consensus/compute_state.go @@ -109,6 +109,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, TipSetGetter: stmgr.TipSetGetterForTipset(sm.ChainStore(), ts), Tracing: vmTracing, ReturnEvents: sm.ChainStore().IsStoringEvents(), + ExecutionLane: vm.ExecutionLanePriority, } return sm.VMConstructor()(ctx, vmopt) diff --git a/chain/vm/execution.go b/chain/vm/execution.go new file mode 100644 index 00000000000..ea3a9719341 --- /dev/null +++ b/chain/vm/execution.go @@ -0,0 +1,192 @@ +package vm + +import ( + "context" + "os" + "strconv" + "sync" + + "github.com/ipfs/go-cid" + "go.opencensus.io/stats" + "go.opencensus.io/tag" + + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/metrics" +) + +const ( + // DefaultAvailableExecutionLanes is the number of available execution lanes; it is the bound of + // concurrent active executions. + // This is the default value in filecoin-ffi + DefaultAvailableExecutionLanes = 4 + // DefaultPriorityExecutionLanes is the number of reserved execution lanes for priority computations. + // This is purely userspace, but we believe it is a reasonable default, even with more available + // lanes. + DefaultPriorityExecutionLanes = 2 +) + +// the execution environment; see below for definition, methods, and initialization +var execution *executionEnv + +// implementation of vm executor with simple sanity check preventing use after free. +type vmExecutor struct { + vmi Interface + lane ExecutionLane +} + +var _ Interface = (*vmExecutor)(nil) + +func newVMExecutor(vmi Interface, lane ExecutionLane) Interface { + return &vmExecutor{vmi: vmi, lane: lane} +} + +func (e *vmExecutor) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) { + token := execution.getToken(e.lane) + defer token.Done() + + return e.vmi.ApplyMessage(ctx, cmsg) +} + +func (e *vmExecutor) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) { + token := execution.getToken(e.lane) + defer token.Done() + + return e.vmi.ApplyImplicitMessage(ctx, msg) +} + +func (e *vmExecutor) Flush(ctx context.Context) (cid.Cid, error) { + return e.vmi.Flush(ctx) +} + +type executionToken struct { + lane ExecutionLane + reserved int +} + +func (token *executionToken) Done() { + execution.putToken(token) +} + +type executionEnv struct { + mx *sync.Mutex + cond *sync.Cond + + // available executors + available int + // reserved executors + reserved int +} + +func (e *executionEnv) getToken(lane ExecutionLane) *executionToken { + metricsUp(metrics.VMExecutionWaiting, lane) + defer metricsDown(metrics.VMExecutionWaiting, lane) + + e.mx.Lock() + defer e.mx.Unlock() + + switch lane { + case ExecutionLaneDefault: + for e.available <= e.reserved { + e.cond.Wait() + } + + e.available-- + + metricsUp(metrics.VMExecutionRunning, lane) + return &executionToken{lane: lane, reserved: 0} + + case ExecutionLanePriority: + for e.available == 0 { + e.cond.Wait() + } + + e.available-- + + reserving := 0 + if e.reserved > 0 { + e.reserved-- + reserving = 1 + } + + metricsUp(metrics.VMExecutionRunning, lane) + return &executionToken{lane: lane, reserved: reserving} + + default: + // already checked at interface boundary in NewVM, so this is appropriate + panic("bogus execution lane") + } +} + +func (e *executionEnv) putToken(token *executionToken) { + e.mx.Lock() + defer e.mx.Unlock() + + e.available++ + e.reserved += token.reserved + + // Note: Signal is unsound, because a priority token could wake up a non-priority + // goroutnie and lead to deadlock. So Broadcast it must be. + e.cond.Broadcast() + + metricsDown(metrics.VMExecutionRunning, token.lane) +} + +func metricsUp(metric *stats.Int64Measure, lane ExecutionLane) { + metricsAdjust(metric, lane, 1) +} + +func metricsDown(metric *stats.Int64Measure, lane ExecutionLane) { + metricsAdjust(metric, lane, -1) +} + +func metricsAdjust(metric *stats.Int64Measure, lane ExecutionLane, delta int) { + laneName := "default" + if lane > ExecutionLaneDefault { + laneName = "priority" + } + + ctx, _ := tag.New( + context.Background(), + tag.Upsert(metrics.ExecutionLane, laneName), + ) + stats.Record(ctx, metric.M(int64(delta))) +} + +func init() { + var err error + + available := DefaultAvailableExecutionLanes + if concurrency := os.Getenv("LOTUS_FVM_CONCURRENCY"); concurrency != "" { + available, err = strconv.Atoi(concurrency) + if err != nil { + panic(err) + } + } + + priority := DefaultPriorityExecutionLanes + if reserved := os.Getenv("LOTUS_FVM_CONCURRENCY_RESERVED"); reserved != "" { + priority, err = strconv.Atoi(reserved) + if err != nil { + panic(err) + } + } + + // some sanity checks + if available < 2 { + panic("insufficient execution concurrency") + } + + if available <= priority { + panic("insufficient default execution concurrency") + } + + mx := &sync.Mutex{} + cond := sync.NewCond(mx) + + execution = &executionEnv{ + mx: mx, + cond: cond, + available: available, + reserved: priority, + } +} diff --git a/chain/vm/vm.go b/chain/vm/vm.go index c8e3f251907..6fbe7933b68 100644 --- a/chain/vm/vm.go +++ b/chain/vm/vm.go @@ -250,6 +250,8 @@ type VMOpts struct { Tracing bool // ReturnEvents decodes and returns emitted events. ReturnEvents bool + // ExecutionLane specifies the execution priority of the created vm + ExecutionLane ExecutionLane } func NewLegacyVM(ctx context.Context, opts *VMOpts) (*LegacyVM, error) { diff --git a/chain/vm/vmi.go b/chain/vm/vmi.go index 01b32d4ad17..042621ca2d4 100644 --- a/chain/vm/vmi.go +++ b/chain/vm/vmi.go @@ -2,6 +2,7 @@ package vm import ( "context" + "fmt" "os" cid "github.com/ipfs/go-cid" @@ -17,6 +18,15 @@ var ( StatApplied uint64 ) +type ExecutionLane int + +const ( + // ExecutionLaneDefault signifies a default, non prioritized execution lane. + ExecutionLaneDefault ExecutionLane = iota + // ExecutionLanePriority signifies a prioritized execution lane with reserved resources. + ExecutionLanePriority +) + type Interface interface { // Applies the given message onto the VM's current state, returning the result of the execution ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) @@ -33,7 +43,7 @@ type Interface interface { // Message failures, unexpected terminations,gas costs, etc. should all be ignored. var useFvmDebug = os.Getenv("LOTUS_FVM_DEVELOPER_DEBUG") == "1" -func NewVM(ctx context.Context, opts *VMOpts) (Interface, error) { +func makeVM(ctx context.Context, opts *VMOpts) (Interface, error) { if opts.NetworkVersion >= network.Version16 { if useFvmDebug { return NewDualExecutionFVM(ctx, opts) @@ -43,3 +53,18 @@ func NewVM(ctx context.Context, opts *VMOpts) (Interface, error) { return NewLegacyVM(ctx, opts) } + +func NewVM(ctx context.Context, opts *VMOpts) (Interface, error) { + switch opts.ExecutionLane { + case ExecutionLaneDefault, ExecutionLanePriority: + default: + return nil, fmt.Errorf("invalid execution lane: %d", opts.ExecutionLane) + } + + vmi, err := makeVM(ctx, opts) + if err != nil { + return nil, err + } + + return newVMExecutor(vmi, opts.ExecutionLane), nil +} diff --git a/metrics/metrics.go b/metrics/metrics.go index ca638ac273a..13627a663d5 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -58,6 +58,9 @@ var ( ProtocolID, _ = tag.NewKey("proto") Direction, _ = tag.NewKey("direction") UseFD, _ = tag.NewKey("use_fd") + + // vm execution + ExecutionLane, _ = tag.NewKey("lane") ) // Measures @@ -121,6 +124,8 @@ var ( VMApplyFlush = stats.Float64("vm/applyblocks_flush", "Time spent flushing vm state", stats.UnitMilliseconds) VMSends = stats.Int64("vm/sends", "Counter for sends processed by the VM", stats.UnitDimensionless) VMApplied = stats.Int64("vm/applied", "Counter for messages (including internal messages) processed by the VM", stats.UnitDimensionless) + VMExecutionWaiting = stats.Int64("vm/execution_waiting", "Counter for VM executions waiting to be assigned to a lane", stats.UnitDimensionless) + VMExecutionRunning = stats.Int64("vm/execution_running", "Counter for running VM executions", stats.UnitDimensionless) // miner WorkerCallsStarted = stats.Int64("sealing/worker_calls_started", "Counter of started worker tasks", stats.UnitDimensionless) @@ -363,6 +368,16 @@ var ( Measure: VMApplied, Aggregation: view.LastValue(), } + VMExecutionWaitingView = &view.View{ + Measure: VMExecutionWaiting, + Aggregation: view.Sum(), + TagKeys: []tag.Key{ExecutionLane}, + } + VMExecutionRunningView = &view.View{ + Measure: VMExecutionRunning, + Aggregation: view.Sum(), + TagKeys: []tag.Key{ExecutionLane}, + } // miner WorkerCallsStartedView = &view.View{ @@ -727,6 +742,8 @@ var ChainNodeViews = append([]*view.View{ VMApplyFlushView, VMSendsView, VMAppliedView, + VMExecutionWaitingView, + VMExecutionRunningView, }, DefaultViews...) var MinerNodeViews = append([]*view.View{