Skip to content

Commit

Permalink
Merge pull request #315 from Consensys/307-memory-usage-during-trace-…
Browse files Browse the repository at this point in the history
…expansion

Support batching for trace expansion
  • Loading branch information
DavePearce authored Oct 3, 2024
2 parents f822511 + 7834324 commit ce1f798
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pkg/cmd/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func checkTraceWithLowering(cols []tr.RawColumn, schema *hir.Schema, cfg checkCo
}

func checkTrace(ir string, cols []tr.RawColumn, schema sc.Schema, cfg checkConfig) bool {
builder := sc.NewTraceBuilder(schema).Expand(cfg.expand).Parallel(cfg.parallelExpansion)
builder := sc.NewTraceBuilder(schema).Expand(cfg.expand).Parallel(cfg.parallelExpansion).BatchSize(cfg.batchSize)
//
for n := cfg.padding.Left; n <= cfg.padding.Right; n++ {
stats := util.NewPerfStats()
Expand Down
35 changes: 25 additions & 10 deletions pkg/schema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,37 @@ type TraceBuilder struct {
// parallel. This should be the default, but a sequential option is
// retained for debugging purposes.
parallel bool
// Specify the maximum size of any dispatched batch.
batchSize uint
}

// NewTraceBuilder constructs a default trace builder for the given schema. The
// idea is that this could then be customized as needed following the builder
// pattern. By default, trace expansion is enabled, padding is zero, expansion
// is performed in parallel, and the batch size is math.MaxUint (i.e.
// effectively unbounded).
func NewTraceBuilder(schema Schema) TraceBuilder {
return TraceBuilder{schema, true, 0, true}
return TraceBuilder{schema, true, 0, true, math.MaxUint}
}

// Expand updates a given builder configuration to perform trace expansion (or
// not). The receiver is taken by value, so a fresh builder is returned in which
// only the expand flag differs; the remaining settings are copied across.
func (tb TraceBuilder) Expand(flag bool) TraceBuilder {
return TraceBuilder{tb.schema, flag, tb.padding, tb.parallel}
return TraceBuilder{tb.schema, flag, tb.padding, tb.parallel, tb.batchSize}
}

// Padding updates a given builder configuration to use a given amount of
// padding. The receiver is taken by value, so a fresh builder is returned in
// which only the padding differs; the remaining settings are copied across.
func (tb TraceBuilder) Padding(padding uint) TraceBuilder {
return TraceBuilder{tb.schema, tb.expand, padding, tb.parallel}
return TraceBuilder{tb.schema, tb.expand, padding, tb.parallel, tb.batchSize}
}

// Parallel updates a given builder configuration to allow trace expansion to be
// performed concurrently (or not). The receiver is taken by value, so a fresh
// builder is returned in which only the parallel flag differs; the remaining
// settings are copied across.
func (tb TraceBuilder) Parallel(parallel bool) TraceBuilder {
return TraceBuilder{tb.schema, tb.expand, tb.padding, parallel}
return TraceBuilder{tb.schema, tb.expand, tb.padding, parallel, tb.batchSize}
}

// BatchSize sets the maximum size of any batch dispatched during parallel
// trace expansion, i.e. it bounds how many jobs are dispatched in a single
// batch (not how many batches run). A fresh builder is returned in which only
// the batch size differs; the remaining settings are copied across.
func (tb TraceBuilder) BatchSize(batchSize uint) TraceBuilder {
return TraceBuilder{tb.schema, tb.expand, tb.padding, tb.parallel, batchSize}
}

// Build takes the given builder configuration, along with a given set of input
Expand All @@ -70,7 +78,7 @@ func (tb TraceBuilder) Build(columns []trace.RawColumn) (trace.Trace, []error) {
// Expand trace
if tb.parallel {
// Run (parallel) trace expansion
if err := parallelTraceExpansion(tb.schema, tr); err != nil {
if err := parallelTraceExpansion(tb.batchSize, tb.schema, tr); err != nil {
return nil, append(errs, err)
}
} else if err := sequentialTraceExpansion(tb.schema, tr); err != nil {
Expand Down Expand Up @@ -272,17 +280,19 @@ func sequentialTraceExpansion(schema Schema, trace *tr.ArrayTrace) error {
// is for two reasons: firstly, the latter would require locks that would slow
// down evaluation performance; secondly, the vast majority of jobs are run in
// the very first wave.
func parallelTraceExpansion(schema Schema, trace *tr.ArrayTrace) error {
func parallelTraceExpansion(batchsize uint, schema Schema, trace *tr.ArrayTrace) error {
batch := 0
// Construct a communication channel for collecting batch results.
ch := make(chan columnBatch, 100)
ch := make(chan columnBatch, 1024)
// Determine number of input columns
ninputs := schema.InputColumns().Count()
// Determine number of columns to compute
ntodo := schema.Assignments().Count()
// Iterate until all columns completed.
for ntodo > 0 {
stats := util.NewPerfStats()
// Dispatch next batch of assignments.
n := dispatchReadyAssignments(ninputs, schema, trace, ch)
n := dispatchReadyAssignments(batchsize, ninputs, schema, trace, ch)
//
batches := make([]columnBatch, n)
// Collect all the results
Expand All @@ -301,6 +311,10 @@ func parallelTraceExpansion(schema Schema, trace *tr.ArrayTrace) error {
//
ntodo--
}
// Log stats about this batch
stats.Log(fmt.Sprintf("Expansion batch %d (remaining %d)", batch, ntodo))
// Increment batch
batch++
}
// Done
return nil
Expand All @@ -310,10 +324,11 @@ func parallelTraceExpansion(schema Schema, trace *tr.ArrayTrace) error {
// results being fed back into the shared channel. This returns the number of
// jobs which have been dispatched (i.e. so the caller knows how many results to
// expect).
func dispatchReadyAssignments(ninputs uint, schema Schema, trace *tr.ArrayTrace, ch chan columnBatch) uint {
func dispatchReadyAssignments(batchsize uint, ninputs uint, schema Schema,
trace *tr.ArrayTrace, ch chan columnBatch) uint {
count := uint(0)
//
for iter, cid := schema.Assignments(), ninputs; iter.HasNext(); {
for iter, cid := schema.Assignments(), ninputs; iter.HasNext() && count < batchsize; {
ith := iter.Next()
// Check whether this assignment has already been computed and, if not,
// whether or not it is ready.
Expand Down

0 comments on commit ce1f798

Please sign in to comment.