# Implementing Custom Accumulators

An Accumulator is an alternative reduction technique that siphons data from Partitions into a custom data structure. The result is itself an Accumulator rather than a series of Partitions, so accumulating ends the job: no further operations may be performed against the data. The advantage is full control over the reduction technique, which can yield substantial performance benefits. Reduction is first performed locally on each worker, and all worker results are then reduced on the Coordinator. For that reason, Accumulators are best suited to small results. Distributed reductions via Reduce() are more efficient when the reduction result is large (i.e. a large number of buckets).
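To make the two-phase flow concrete, here is a minimal, framework-free sketch in plain Go. It does not use Sif's API. The hypothetical `partition` type and the `workerAccumulate` and `coordinatorMerge` functions only model the data flow described above: each worker reduces its own partitions into a local result, and the Coordinator then merges one result per worker. The merged structure here is a map of per-key counts, so the data each worker ships grows with the number of distinct keys. That growth is why a large number of buckets favours a distributed Reduce().

```go
package main

import "fmt"

// partition is a hypothetical stand-in for a Sif Partition: just a slice of keys.
type partition []string

// workerAccumulate reduces all partitions held by one worker into a local
// result (a count per key). In the real system this step runs on every worker.
func workerAccumulate(parts []partition) map[string]uint64 {
	local := make(map[string]uint64)
	for _, p := range parts {
		for _, key := range p {
			local[key]++
		}
	}
	return local
}

// coordinatorMerge combines one local result per worker into the final value.
// Its input size is roughly (number of workers) x (distinct keys per worker),
// so it stays cheap only while the accumulated result is small.
func coordinatorMerge(workerResults []map[string]uint64) map[string]uint64 {
	final := make(map[string]uint64)
	for _, res := range workerResults {
		for key, n := range res {
			final[key] += n
		}
	}
	return final
}

func main() {
	worker1 := workerAccumulate([]partition{{"a", "b"}, {"a"}})
	worker2 := workerAccumulate([]partition{{"b", "c"}})
	final := coordinatorMerge([]map[string]uint64{worker1, worker2})
	fmt.Println(final["a"], final["b"], final["c"]) // prints "2 2 1"
}
```

With a single scalar result (a sum or a count), each worker would ship a constant amount of data no matter how many rows it processed.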

Sif supports, and encourages, the definition of custom Accumulators which adhere to the following interface:

```go
type GenericAccumulator[T any] interface {
	// Accumulate adds a row to this Accumulator
	Accumulate(row Row) error
	// Merge merges another Accumulator into this one
	Merge(o Accumulator) error
	// ToBytes serializes this Accumulator
	ToBytes() ([]byte, error)
	// FromBytes produces a *new* Accumulator from serialized data
	FromBytes(buf []byte) (Accumulator, error)
	// Value returns the accumulated value of this Accumulator
	Value() T
}
```
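As an illustration, the following sketch implements the interface above for a simple row counter. It is not one of Sif's bundled Accumulators. So that it compiles on its own, it declares local stand-ins for `Row`, `Accumulator` and `GenericAccumulator[T]` that mirror the method set shown above; the real Sif definitions may be organised differently. `countAccumulator` is a hypothetical name. Serialization uses a fixed 8-byte big-endian encoding from the standard `encoding/binary` package.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Row is a hypothetical stand-in for sif.Row; a counter never inspects it.
type Row interface{}

// Accumulator is a local stand-in mirroring the non-generic methods above.
type Accumulator interface {
	Accumulate(row Row) error
	Merge(o Accumulator) error
	ToBytes() ([]byte, error)
	FromBytes(buf []byte) (Accumulator, error)
}

// GenericAccumulator is a local stand-in mirroring sif.GenericAccumulator[T].
type GenericAccumulator[T any] interface {
	Accumulator
	Value() T
}

// countAccumulator counts the rows it has seen (hypothetical example).
type countAccumulator struct {
	count uint64
}

// Accumulate adds a row to this Accumulator by incrementing the count.
func (c *countAccumulator) Accumulate(row Row) error {
	c.count++
	return nil
}

// Merge folds another countAccumulator into this one, rejecting other types.
func (c *countAccumulator) Merge(o Accumulator) error {
	other, ok := o.(*countAccumulator)
	if !ok {
		return fmt.Errorf("cannot merge %T into *countAccumulator", o)
	}
	c.count += other.count
	return nil
}

// ToBytes serializes the count as 8 big-endian bytes.
func (c *countAccumulator) ToBytes() ([]byte, error) {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, c.count)
	return buf, nil
}

// FromBytes returns a *new* countAccumulator decoded from buf; the receiver is untouched.
func (c *countAccumulator) FromBytes(buf []byte) (Accumulator, error) {
	if len(buf) != 8 {
		return nil, fmt.Errorf("expected 8 bytes, got %d", len(buf))
	}
	return &countAccumulator{count: binary.BigEndian.Uint64(buf)}, nil
}

// Value returns the accumulated count.
func (c *countAccumulator) Value() uint64 {
	return c.count
}

// Compile-time check that *countAccumulator satisfies the generic interface.
var _ GenericAccumulator[uint64] = (*countAccumulator)(nil)

func main() {
	worker := &countAccumulator{}
	for i := 0; i < 3; i++ {
		_ = worker.Accumulate(nil)
	}
	// Ship the worker's state as bytes, then rebuild and merge it elsewhere.
	buf, _ := worker.ToBytes()
	received, _ := (&countAccumulator{}).FromBytes(buf)
	total := &countAccumulator{count: 2}
	if err := total.Merge(received); err != nil {
		panic(err)
	}
	fmt.Println(total.Value()) // prints 5
}
```

`FromBytes` returns a fresh instance rather than mutating its receiver, which matches the "*new* Accumulator" contract in the interface comment.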

## Defining a Custom Accumulator

For reference implementations of various Accumulators, check out the accumulators package.

In addition to implementing the GenericAccumulator[T] interface, client code should also supply a factory function that returns a GenericAccumulatorFactory[T]. The factory allows instances of the Accumulator to be created easily, and allows values from Accumulators of this type to be accessed in a typed fashion:

```go
import (
	"fmt"

	"github.com/go-sif/sif"
)

// ...implementation of Accumulate, Merge, ToBytes, FromBytes and Value on *myAccumulator

type myAccumulatorFactory struct{}

// New produces a new instance of the Accumulator
func (a *myAccumulatorFactory) New() sif.Accumulator {
	return &myAccumulator{ /* ...init */ }
}

// Let's assume this accumulator returns a uint64.
// Value is responsible for type checking the incoming accumulator,
// casting it appropriately and then returning the typed value.
func (a *myAccumulatorFactory) Value(acc sif.Accumulator) (uint64, error) {
	// Type check the incoming accumulator before yielding the typed value
	tacc, ok := acc.(sif.GenericAccumulator[uint64])
	if !ok {
		return 0, fmt.Errorf("provided accumulator did not have the expected value type")
	}
	return tacc.Value(), nil
}

// MyAccumulator returns a new GenericAccumulatorFactory for myAccumulator
func MyAccumulator() sif.GenericAccumulatorFactory[uint64] {
	return &myAccumulatorFactory{}
}
```
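Putting the pieces together, here is a self-contained sketch of how a factory like the one above might be used. It creates instances with `New()`, accumulates, merges, and reads the typed result through the factory's `Value()`. The interfaces `Accumulator`, `GenericAccumulator[T]` and `GenericAccumulatorFactory[T]` are local stand-ins named after the types used above, so their exact shape in Sif may differ. To keep the sketch short, the stand-in `Accumulator` is trimmed to `Accumulate` and `Merge`; a real Sif Accumulator also needs `ToBytes` and `FromBytes`. The `Row` struct, `sumAccumulator` and `SumAccumulator()` are hypothetical.

```go
package main

import "fmt"

// Row is a hypothetical stand-in for sif.Row carrying a single number.
type Row struct{ N uint64 }

// Accumulator is a trimmed stand-in for sif.Accumulator (serialization omitted).
type Accumulator interface {
	Accumulate(row Row) error
	Merge(o Accumulator) error
}

// GenericAccumulator mirrors sif.GenericAccumulator[T] on top of the trimmed interface.
type GenericAccumulator[T any] interface {
	Accumulator
	Value() T
}

// GenericAccumulatorFactory mirrors the two factory methods shown above.
type GenericAccumulatorFactory[T any] interface {
	New() Accumulator
	Value(acc Accumulator) (T, error)
}

// sumAccumulator sums Row.N over all rows (hypothetical example).
type sumAccumulator struct{ sum uint64 }

func (s *sumAccumulator) Accumulate(row Row) error {
	s.sum += row.N
	return nil
}

func (s *sumAccumulator) Merge(o Accumulator) error {
	other, ok := o.(*sumAccumulator)
	if !ok {
		return fmt.Errorf("cannot merge %T into *sumAccumulator", o)
	}
	s.sum += other.sum
	return nil
}

func (s *sumAccumulator) Value() uint64 { return s.sum }

type sumAccumulatorFactory struct{}

// New produces a fresh, zeroed sumAccumulator.
func (f *sumAccumulatorFactory) New() Accumulator { return &sumAccumulator{} }

// Value type-checks acc before returning its typed value.
func (f *sumAccumulatorFactory) Value(acc Accumulator) (uint64, error) {
	tacc, ok := acc.(GenericAccumulator[uint64])
	if !ok {
		return 0, fmt.Errorf("provided accumulator did not have the expected value type")
	}
	return tacc.Value(), nil
}

// SumAccumulator returns the factory, mirroring MyAccumulator() above.
func SumAccumulator() GenericAccumulatorFactory[uint64] { return &sumAccumulatorFactory{} }

func main() {
	factory := SumAccumulator()
	a, b := factory.New(), factory.New()
	_ = a.Accumulate(Row{N: 3})
	_ = b.Accumulate(Row{N: 4})
	if err := a.Merge(b); err != nil {
		panic(err)
	}
	total, err := factory.Value(a)
	fmt.Println(total, err) // prints "7 <nil>"
}
```

Callers only ever hold the untyped `Accumulator`. The factory's `Value()` is the single place where the type assertion happens, so a mismatched accumulator surfaces as an error rather than a panic.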