Skip to content

Commit

Permalink
Merge pull request #10551 from filecoin-project/vyzo/feat/exec-lanes
Browse files Browse the repository at this point in the history
feat: VM Execution Lanes
  • Loading branch information
vyzo authored Mar 30, 2023
2 parents 3af5ef1 + d71b528 commit bf666a3
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 1 deletion.
1 change: 1 addition & 0 deletions chain/consensus/compute_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
TipSetGetter: stmgr.TipSetGetterForTipset(sm.ChainStore(), ts),
Tracing: vmTracing,
ReturnEvents: sm.ChainStore().IsStoringEvents(),
ExecutionLane: vm.ExecutionLanePriority,
}

return sm.VMConstructor()(ctx, vmopt)
Expand Down
192 changes: 192 additions & 0 deletions chain/vm/execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package vm

import (
"context"
"os"
"strconv"
"sync"

"github.com/ipfs/go-cid"
"go.opencensus.io/stats"
"go.opencensus.io/tag"

"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/metrics"
)

const (
// DefaultAvailableExecutionLanes is the default number of available execution lanes; it is the
// bound of concurrent active executions.
// This is the default value in filecoin-ffi. It is used unless the LOTUS_FVM_CONCURRENCY
// environment variable is set (see init).
DefaultAvailableExecutionLanes = 4
// DefaultPriorityExecutionLanes is the default number of reserved execution lanes for priority
// computations. It is used unless the LOTUS_FVM_CONCURRENCY_RESERVED environment variable is
// set (see init).
// This is purely userspace, but we believe it is a reasonable default, even with more available
// lanes.
DefaultPriorityExecutionLanes = 2
)

// execution is the package-wide execution environment from which lane tokens are acquired and
// to which they are returned. It is assigned in init; see below for the definition and methods
// of executionEnv.
var execution *executionEnv

// vmExecutor is an Interface implementation that wraps another Interface and holds an
// execution token for its lane for the duration of every ApplyMessage and
// ApplyImplicitMessage call. Flush is forwarded without acquiring a token.
type vmExecutor struct {
// vmi is the wrapped VM that actually performs the work.
vmi Interface
// lane is the execution lane used when requesting tokens.
lane ExecutionLane
}

// compile-time check that *vmExecutor satisfies Interface.
var _ Interface = (*vmExecutor)(nil)

// newVMExecutor wraps vmi in a vmExecutor bound to the given execution lane and returns it
// as an Interface.
func newVMExecutor(vmi Interface, lane ExecutionLane) Interface {
	executor := new(vmExecutor)
	executor.vmi = vmi
	executor.lane = lane
	return executor
}

// ApplyMessage acquires an execution token for the executor's lane (blocking until one is
// available), applies cmsg on the wrapped VM, and releases the token when the call returns.
func (e *vmExecutor) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) {
	// getToken is evaluated right here, at the defer statement; only Done is postponed
	// until the function returns.
	defer execution.getToken(e.lane).Done()

	ret, err := e.vmi.ApplyMessage(ctx, cmsg)
	return ret, err
}

// ApplyImplicitMessage acquires an execution token for the executor's lane (blocking until
// one is available), applies the implicit message on the wrapped VM, and releases the token
// when the call returns.
func (e *vmExecutor) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) {
	// getToken is evaluated right here, at the defer statement; only Done is postponed
	// until the function returns.
	defer execution.getToken(e.lane).Done()

	ret, err := e.vmi.ApplyImplicitMessage(ctx, msg)
	return ret, err
}

// Flush forwards to the wrapped VM's Flush. No execution token is acquired for it.
func (e *vmExecutor) Flush(ctx context.Context) (cid.Cid, error) {
	root, err := e.vmi.Flush(ctx)
	return root, err
}

// executionToken represents one execution slot taken from the execution environment.
// It is handed out by getToken and must be released with Done.
type executionToken struct {
// lane is the lane the token was acquired on; it tags the running metric on release.
lane ExecutionLane
// reserved is the number of reserved slots this token consumed (0 or 1); it is added back
// to the environment's reserved count when the token is released.
reserved int
}

// Done releases the token by handing it back to the package-wide execution environment.
func (token *executionToken) Done() {
	env := execution
	env.putToken(token)
}

// executionEnv tracks how many execution slots are currently free and how many of those are
// held back for the priority lane. All fields are accessed with mx held.
type executionEnv struct {
// mx guards available and reserved; cond is built on top of it.
mx *sync.Mutex
// cond is broadcast whenever a token is returned so that waiters re-check their condition.
cond *sync.Cond

// available executors
available int
// reserved executors; default-lane requests wait while available <= reserved
reserved int
}

// getToken blocks until an execution slot can be granted to the given lane and returns a
// token for it. The caller must release the token with Done.
//
// A default-lane request waits until more slots are free than are reserved, so it never
// eats into the reserved pool. A priority-lane request only needs one free slot, and it
// draws from the reserved pool first while any reservation remains.
//
// The waiting metric is raised for the duration of the call and the running metric is
// raised once the slot is granted. An unknown lane panics.
func (e *executionEnv) getToken(lane ExecutionLane) *executionToken {
	metricsUp(metrics.VMExecutionWaiting, lane)
	defer metricsDown(metrics.VMExecutionWaiting, lane)

	e.mx.Lock()
	defer e.mx.Unlock()

	// number of reserved slots this token takes; stays 0 for the default lane
	taken := 0

	switch lane {
	case ExecutionLaneDefault:
		for e.available <= e.reserved {
			e.cond.Wait()
		}

	case ExecutionLanePriority:
		for e.available == 0 {
			e.cond.Wait()
		}

		if e.reserved > 0 {
			taken = 1
		}

	default:
		// already checked at interface boundary in NewVM, so this is appropriate
		panic("bogus execution lane")
	}

	e.available--
	e.reserved -= taken

	metricsUp(metrics.VMExecutionRunning, lane)
	return &executionToken{lane: lane, reserved: taken}
}

// putToken returns the token's slot (and any reserved slot it consumed) to the environment,
// wakes all waiters, and lowers the running metric for the token's lane.
func (e *executionEnv) putToken(token *executionToken) {
	e.mx.Lock()
	defer e.mx.Unlock()

	e.reserved += token.reserved
	e.available++

	// Broadcast rather than Signal: Signal could wake a single default-lane goroutine that
	// is still not allowed to proceed, leaving a priority waiter asleep and deadlocking.
	e.cond.Broadcast()

	metricsDown(metrics.VMExecutionRunning, token.lane)
}

// metricsUp records a +1 adjustment of metric, tagged with the given lane.
func metricsUp(metric *stats.Int64Measure, lane ExecutionLane) {
	const increment = 1
	metricsAdjust(metric, lane, increment)
}

// metricsDown records a -1 adjustment of metric, tagged with the given lane.
func metricsDown(metric *stats.Int64Measure, lane ExecutionLane) {
	const decrement = -1
	metricsAdjust(metric, lane, decrement)
}

// metricsAdjust records delta against metric, tagged with the lane name: "priority" for any
// lane above ExecutionLaneDefault, "default" otherwise. The measurement is recorded on a
// fresh background context; the error returned by tag.New is ignored.
func metricsAdjust(metric *stats.Int64Measure, lane ExecutionLane, delta int) {
	var laneName string
	if lane > ExecutionLaneDefault {
		laneName = "priority"
	} else {
		laneName = "default"
	}

	laneTag := tag.Upsert(metrics.ExecutionLane, laneName)
	ctx, _ := tag.New(context.Background(), laneTag)

	measurement := metric.M(int64(delta))
	stats.Record(ctx, measurement)
}

// init builds the package-wide execution environment.
//
// The total number of execution lanes defaults to DefaultAvailableExecutionLanes and can be
// overridden with LOTUS_FVM_CONCURRENCY; the number of lanes reserved for priority
// executions defaults to DefaultPriorityExecutionLanes and can be overridden with
// LOTUS_FVM_CONCURRENCY_RESERVED. Both overrides must parse as integers.
//
// It panics when a value does not parse, when fewer than two lanes are available, when the
// reserved count is negative, or when the reserved lanes would leave no lane for default
// executions.
func init() {
	var err error

	available := DefaultAvailableExecutionLanes
	if concurrency := os.Getenv("LOTUS_FVM_CONCURRENCY"); concurrency != "" {
		available, err = strconv.Atoi(concurrency)
		if err != nil {
			panic(err)
		}
	}

	priority := DefaultPriorityExecutionLanes
	if reserved := os.Getenv("LOTUS_FVM_CONCURRENCY_RESERVED"); reserved != "" {
		priority, err = strconv.Atoi(reserved)
		if err != nil {
			panic(err)
		}
	}

	// some sanity checks
	if available < 2 {
		panic("insufficient execution concurrency")
	}

	// A negative reservation would let default-lane requests drive the available count below
	// zero, after which the priority-lane wait (available == 0) never blocks again and the
	// concurrency bound is lost.
	if priority < 0 {
		panic("invalid reserved execution concurrency")
	}

	if available <= priority {
		panic("insufficient default execution concurrency")
	}

	mx := &sync.Mutex{}
	cond := sync.NewCond(mx)

	execution = &executionEnv{
		mx:        mx,
		cond:      cond,
		available: available,
		reserved:  priority,
	}
}
2 changes: 2 additions & 0 deletions chain/vm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ type VMOpts struct {
Tracing bool
// ReturnEvents decodes and returns emitted events.
ReturnEvents bool
// ExecutionLane specifies the execution priority of the created vm
ExecutionLane ExecutionLane
}

func NewLegacyVM(ctx context.Context, opts *VMOpts) (*LegacyVM, error) {
Expand Down
27 changes: 26 additions & 1 deletion chain/vm/vmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package vm

import (
"context"
"fmt"
"os"

cid "github.com/ipfs/go-cid"
Expand All @@ -17,6 +18,15 @@ var (
StatApplied uint64
)

// ExecutionLane identifies the execution lane (priority class) a VM runs its executions on.
// NewVM rejects any value other than the constants declared below.
type ExecutionLane int

const (
// ExecutionLaneDefault signifies a default, non prioritized execution lane.
// It is the zero value of ExecutionLane.
ExecutionLaneDefault ExecutionLane = iota
// ExecutionLanePriority signifies a prioritized execution lane with reserved resources.
ExecutionLanePriority
)

type Interface interface {
// Applies the given message onto the VM's current state, returning the result of the execution
ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error)
Expand All @@ -33,7 +43,7 @@ type Interface interface {
// Message failures, unexpected terminations,gas costs, etc. should all be ignored.
var useFvmDebug = os.Getenv("LOTUS_FVM_DEVELOPER_DEBUG") == "1"

func NewVM(ctx context.Context, opts *VMOpts) (Interface, error) {
func makeVM(ctx context.Context, opts *VMOpts) (Interface, error) {
if opts.NetworkVersion >= network.Version16 {
if useFvmDebug {
return NewDualExecutionFVM(ctx, opts)
Expand All @@ -43,3 +53,18 @@ func NewVM(ctx context.Context, opts *VMOpts) (Interface, error) {

return NewLegacyVM(ctx, opts)
}

// NewVM validates opts.ExecutionLane, constructs the underlying VM with makeVM, and wraps it
// in an executor for the requested lane.
//
// It returns an error if the lane is not one of the known ExecutionLane values or if makeVM
// fails.
func NewVM(ctx context.Context, opts *VMOpts) (Interface, error) {
	lane := opts.ExecutionLane
	if lane != ExecutionLaneDefault && lane != ExecutionLanePriority {
		return nil, fmt.Errorf("invalid execution lane: %d", opts.ExecutionLane)
	}

	inner, err := makeVM(ctx, opts)
	if err != nil {
		return nil, err
	}

	return newVMExecutor(inner, lane), nil
}
17 changes: 17 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ var (
ProtocolID, _ = tag.NewKey("proto")
Direction, _ = tag.NewKey("direction")
UseFD, _ = tag.NewKey("use_fd")

// vm execution
ExecutionLane, _ = tag.NewKey("lane")
)

// Measures
Expand Down Expand Up @@ -121,6 +124,8 @@ var (
VMApplyFlush = stats.Float64("vm/applyblocks_flush", "Time spent flushing vm state", stats.UnitMilliseconds)
VMSends = stats.Int64("vm/sends", "Counter for sends processed by the VM", stats.UnitDimensionless)
VMApplied = stats.Int64("vm/applied", "Counter for messages (including internal messages) processed by the VM", stats.UnitDimensionless)
VMExecutionWaiting = stats.Int64("vm/execution_waiting", "Counter for VM executions waiting to be assigned to a lane", stats.UnitDimensionless)
VMExecutionRunning = stats.Int64("vm/execution_running", "Counter for running VM executions", stats.UnitDimensionless)

// miner
WorkerCallsStarted = stats.Int64("sealing/worker_calls_started", "Counter of started worker tasks", stats.UnitDimensionless)
Expand Down Expand Up @@ -363,6 +368,16 @@ var (
Measure: VMApplied,
Aggregation: view.LastValue(),
}
VMExecutionWaitingView = &view.View{
Measure: VMExecutionWaiting,
Aggregation: view.Sum(),
TagKeys: []tag.Key{ExecutionLane},
}
VMExecutionRunningView = &view.View{
Measure: VMExecutionRunning,
Aggregation: view.Sum(),
TagKeys: []tag.Key{ExecutionLane},
}

// miner
WorkerCallsStartedView = &view.View{
Expand Down Expand Up @@ -727,6 +742,8 @@ var ChainNodeViews = append([]*view.View{
VMApplyFlushView,
VMSendsView,
VMAppliedView,
VMExecutionWaitingView,
VMExecutionRunningView,
}, DefaultViews...)

var MinerNodeViews = append([]*view.View{
Expand Down

0 comments on commit bf666a3

Please sign in to comment.