Skip to content

Commit

Permalink
Merge branch 'fix/metrics-ethcalls' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Oct 24, 2023
2 parents 277cf43 + 11654ab commit 9daeae8
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 65 deletions.
5 changes: 5 additions & 0 deletions docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

### Fixed
* Fixed metrics for time spent in eth_calls within modules stats (server and GUI)

## v1.1.18

### Fixed
Expand Down
139 changes: 82 additions & 57 deletions metrics/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/streamingfast/dmetrics"
pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand All @@ -29,6 +30,7 @@ type Stats struct {
runningJobs runningJobs
completedJobsStats map[string]*pbssinternal.ModuleStats

localProcessedBlockCount uint64
completedJobsBytesRead uint64
completedJobsBytesWritten uint64

Expand All @@ -53,6 +55,13 @@ func cloneStats(in *pbssinternal.ModuleStats) *pbssinternal.ModuleStats {
}
}

func (j runningJobs) blocksProcessed() (count uint64) {
for _, job := range j {
count += job.ProcessedBlocks
}
return
}

func (j runningJobs) ModuleStats(module string) (out *pbssinternal.ModuleStats) {
for _, job := range j {
for _, stat := range job.modulesStats {
Expand Down Expand Up @@ -143,7 +152,20 @@ type extendedStats struct {
processedBlocksInCompleteJobs uint64
storeOperationTime time.Duration
processingTime time.Duration
externalCallMetrics map[string]*extendedCallMetric

// uniqueID -> startTime
inprocessSince map[uint64]time.Time

// extension --> metric
externalCallMetrics map[string]*extendedCallMetric

// uniqueID -> metric
inprocessCallMetrics map[uint64]inprocessCall
}

type inprocessCall struct {
startTime time.Time
extension string
}

type extendedCallMetric struct {
Expand All @@ -154,14 +176,25 @@ type extendedCallMetric struct {
// updateDurations should be called while locked
func (s *extendedStats) updateDurations() {
s.ModuleStats.ProcessingTimeMs = uint64(s.processingTime.Milliseconds())
for _, inproc := range s.inprocessSince {
s.ModuleStats.ProcessingTimeMs += uint64(time.Since(inproc).Milliseconds())
}

s.ModuleStats.ExternalCallMetrics = make([]*pbssinternal.ExternalCallMetric, len(s.externalCallMetrics))
i := 0
for k, v := range s.externalCallMetrics {
s.ModuleStats.ExternalCallMetrics[i] = &pbssinternal.ExternalCallMetric{
callMetric := &pbssinternal.ExternalCallMetric{
Name: k,
Count: v.count,
TimeMs: uint64(v.time.Milliseconds()),
}
for _, inproc := range s.inprocessCallMetrics {
if inproc.extension == k {
callMetric.TimeMs += uint64(time.Since(inproc.startTime).Milliseconds())
}
}

s.ModuleStats.ExternalCallMetrics[i] = callMetric
sort.Slice(s.ModuleStats.ExternalCallMetrics, func(i, j int) bool {
return s.ModuleStats.ExternalCallMetrics[i].Name < s.ModuleStats.ExternalCallMetrics[j].Name
})
Expand Down Expand Up @@ -252,27 +285,61 @@ func (s *Stats) RecordEndSubrequest(jobIdx uint64) {
delete(s.runningJobs, jobIdx)
}

// RecordModuleWasmBlock should be called once per module per block. `elapsed` is the time spent in executing the WASM code, including store and extension calls
func (s *Stats) RecordModuleWasmBlock(moduleName string, elapsed time.Duration) {
// RecordModuleWasmBlockBegin should be called once per module per block
func (s *Stats) RecordModuleWasmBlockBegin(moduleName string) uint64 {
s.Lock()
defer s.Unlock()
uniqueID := uniqueIDCounter.Inc()
mod := s.moduleStats(moduleName)
mod.processingTime += elapsed
mod.inprocessSince[uniqueID] = time.Now()

return uniqueID
}

// RecordModuleWasmExternalCall can be called multiple times per module per block, for each external module call (ex: eth_call). `elapsed` is the time spent in executing that call.
func (s *Stats) RecordModuleWasmExternalCall(moduleName string, extension string, elapsed time.Duration) {
// RecordModuleWasmBlockEnd should be called once per module per block. `elapsed` is the time spent in executing the WASM code, including store and extension calls
func (s *Stats) RecordModuleWasmBlockEnd(moduleName string, uniqueID uint64) {
s.Lock()
defer s.Unlock()
mod := s.moduleStats(moduleName)
mod.processingTime += time.Since(mod.inprocessSince[uniqueID])
delete(mod.inprocessSince, uniqueID)
}

var uniqueIDCounter = atomic.NewUint64(0)

// RecordModuleWasmExternalCallBegin can be called multiple times per module per block, for each external module call (ex: eth_call).
func (s *Stats) RecordModuleWasmExternalCallBegin(moduleName string, extension string) uint64 {
s.Lock()
defer s.Unlock()

mod := s.moduleStats(moduleName)
uniqueID := uniqueIDCounter.Inc()

// initialize map
mod.inprocessCallMetrics[uniqueID] = inprocessCall{
startTime: time.Now(),
extension: extension,
}

return uniqueID
}

// RecordModuleWasmExternalCallEnd can be called multiple times per module per block, for each external module call (ex: eth_call). `elapsed` is the time spent in executing that call.
func (s *Stats) RecordModuleWasmExternalCallEnd(moduleName string, extension string, uniqueID uint64) {
s.Lock()
defer s.Unlock()

mod := s.moduleStats(moduleName)
met, ok := mod.externalCallMetrics[extension]
if !ok {
met = &extendedCallMetric{}
mod.externalCallMetrics[extension] = met
}
met.count++
met.time += elapsed
inproc := mod.inprocessCallMetrics[uniqueID]
met.time += time.Since(inproc.startTime)

delete(mod.inprocessCallMetrics, uniqueID)
}

// RecordModuleWasmStoreRead can be called multiple times per module per block `elapsed` is the time spent in executing that operation.
Expand Down Expand Up @@ -306,14 +373,17 @@ func (s *Stats) RecordModuleWasmStoreDeletePrefix(moduleName string, sizeBytes u

func (s *Stats) RecordBlock(ref bstream.BlockRef) {
s.blockRate.Add(1)
s.localProcessedBlockCount += 1
}

func newExtendedStats(moduleName string) *extendedStats {
return &extendedStats{
ModuleStats: &pbssinternal.ModuleStats{
Name: moduleName,
},
externalCallMetrics: make(map[string]*extendedCallMetric),
externalCallMetrics: make(map[string]*extendedCallMetric),
inprocessCallMetrics: make(map[uint64]inprocessCall),
inprocessSince: make(map[uint64]time.Time),
}
}

Expand Down Expand Up @@ -369,7 +439,7 @@ func (s *Stats) LocalModulesStats() []*pbssinternal.ModuleStats {
ProcessingTimeMs: uint64(v.processingTime.Milliseconds()),
StoreOperationTimeMs: uint64(v.storeOperationTime.Milliseconds()),
StoreReadCount: v.StoreReadCount,
ExternalCallMetrics: mergeCallMetricsMap(v.ExternalCallMetrics, v.externalCallMetrics),
ExternalCallMetrics: v.ExternalCallMetrics,
StoreWriteCount: v.StoreWriteCount,
StoreDeleteprefixCount: v.StoreDeleteprefixCount,
StoreSizeBytes: v.StoreSizeBytes,
Expand Down Expand Up @@ -450,35 +520,6 @@ func cloneCallMetrics(in []*pbssinternal.ExternalCallMetric) []*pbssinternal.Ext
return out
}

func mergeCallMetricsMap(completeJobsMetrics []*pbssinternal.ExternalCallMetric, localMetrics map[string]*extendedCallMetric) (out []*pbssinternal.ExternalCallMetric) {
seen := make(map[string]bool)
for _, m := range completeJobsMetrics {
seen[m.Name] = true
cloned := &pbssinternal.ExternalCallMetric{
Name: m.Name,
Count: m.Count,
TimeMs: m.TimeMs,
}
if local, ok := localMetrics[m.Name]; ok {
cloned.Count += local.count
cloned.TimeMs += uint64(local.time.Milliseconds())
}
out = append(out, cloned)
}

for k, lm := range localMetrics {
if !seen[k] {
out = append(out, &pbssinternal.ExternalCallMetric{
Name: k,
Count: lm.count,
TimeMs: uint64(lm.time.Milliseconds()),
})
}
}

return out
}

func (s *Stats) Stage(module string) (uint32, *pbsubstreamsrpc.Stage) {
for i, ss := range s.stages {
for _, mod := range ss.Modules {
Expand Down Expand Up @@ -525,11 +566,11 @@ func (s *Stats) AggregatedModulesStats() []*pbsubstreamsrpc.ModuleStats {
TotalProcessingTimeMs: uint64(v.processingTime.Milliseconds()),
TotalStoreOperationTimeMs: uint64(v.storeOperationTime.Milliseconds()),
TotalStoreReadCount: v.StoreReadCount,
ExternalCallMetrics: toRPCCallMetrics(mergeCallMetricsMap(v.ExternalCallMetrics, v.externalCallMetrics)),
ExternalCallMetrics: toRPCCallMetrics(v.ExternalCallMetrics),
TotalStoreWriteCount: v.StoreWriteCount,
TotalStoreDeleteprefixCount: v.StoreDeleteprefixCount,
StoreSizeBytes: v.StoreSizeBytes,
TotalProcessedBlockCount: v.processedBlocksInCompleteJobs,
TotalProcessedBlockCount: v.processedBlocksInCompleteJobs + s.runningJobs.blocksProcessed() + s.localProcessedBlockCount,
TotalStoreMergingTimeMs: uint64(v.mergingTime.Milliseconds()),
StoreCurrentlyMerging: v.merging,
}
Expand All @@ -548,22 +589,6 @@ func (s *Stats) AggregatedModulesStats() []*pbsubstreamsrpc.ModuleStats {
return out
}

func toRPCExternalCallMetrics(in []*pbssinternal.ExternalCallMetric) []*pbsubstreamsrpc.ExternalCallMetric {
if in == nil {
return nil
}

out := make([]*pbsubstreamsrpc.ExternalCallMetric, len(in))
for i := range in {
out[i] = &pbsubstreamsrpc.ExternalCallMetric{
Name: in[i].Name,
Count: in[i].Count,
TimeMs: in[i].TimeMs,
}
}
return out
}

func (s *Stats) LogAndClose() {
s.blockRate.SyncNow()
s.blockRate.Stop()
Expand Down
5 changes: 2 additions & 3 deletions pipeline/exec/module_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package exec
import (
"context"
"fmt"
"time"

"github.com/streamingfast/substreams/storage/execout"

Expand Down Expand Up @@ -50,12 +49,12 @@ func RunModule(ctx context.Context, executor ModuleExecutor, execOutput execout.
return moduleOutput, outputBytes, nil
}

t0 := time.Now()
uid := reqctx.ReqStats(ctx).RecordModuleWasmBlockBegin(modName)
outputBytes, moduleOutput, err := executor.run(ctx, execOutput)
if err != nil {
return nil, nil, fmt.Errorf("execute: %w", err)
}
reqctx.ReqStats(ctx).RecordModuleWasmBlock(modName, time.Since(t0))
reqctx.ReqStats(ctx).RecordModuleWasmBlockEnd(modName, uid)

fillModuleOutputMetadata(executor, moduleOutput)

Expand Down
2 changes: 1 addition & 1 deletion pipeline/storeboundary.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (r *storeBoundary) GetStoreFlushRanges(isSubRequest bool, reqStopBlockNum u
}

out := []uint64{}
for v, _ := range boundaries {
for v := range boundaries {
out = append(out, v)
}
sort.Slice(out, func(i, j int) bool {
Expand Down
2 changes: 1 addition & 1 deletion tui2/pages/progress/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (p *Progress) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
}
var externalMetrics string
for _, ext := range mod.ExternalCallMetrics {
externalMetrics += fmt.Sprintf(" [%s (%d): %d%%]", ext.Name, ext.Count, ext.TimeMs/mod.TotalProcessingTimeMs)
externalMetrics += fmt.Sprintf(" [%s (%d): %d%%]", ext.Name, ext.Count, ext.TimeMs*100/mod.TotalProcessingTimeMs)
}
var storeMetrics string
if mod.TotalStoreOperationTimeMs != 0 {
Expand Down
7 changes: 4 additions & 3 deletions wasm/wazero/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/api"
Expand Down Expand Up @@ -175,12 +174,14 @@ func addExtensionFunctions(ctx context.Context, runtime wazero.Runtime, registry
data := readBytes(inst, ptr, length)
call := wasm.FromContext(ctx)

t0 := time.Now()
metricID := reqctx.ReqStats(ctx).RecordModuleWasmExternalCallBegin(call.ModuleName, fmt.Sprintf("%s:%s", namespace, importName))

out, err := f(ctx, reqctx.Details(ctx).UniqueIDString(), call.Clock, data)
if err != nil {
panic(fmt.Errorf(`running wasm extension "%s::%s": %w`, namespace, importName, err))
}
reqctx.ReqStats(ctx).RecordModuleWasmExternalCall(call.ModuleName, fmt.Sprintf("%s:%s", namespace, importName), time.Since(t0))

reqctx.ReqStats(ctx).RecordModuleWasmExternalCallEnd(call.ModuleName, fmt.Sprintf("%s:%s", namespace, importName), metricID)

if ctx.Err() == context.Canceled {
// Sometimes long-running extensions will come back to a canceled context.
Expand Down

0 comments on commit 9daeae8

Please sign in to comment.