Microbatch is a small Go library for micro batch processing.
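Micro batching is a data processing technique in which incoming items are collected into small batches and each batch is processed as a unit, instead of handling every item individually as it arrives or accumulating large batches. A batch is typically flushed once it reaches a size limit or once a time interval has elapsed, whichever comes first. This spreads per-item overhead (system calls, network round trips, database writes) across the batch, improving throughput and resource utilization, while the time limit keeps the added delay small and bounded, far lower than with large periodic batch jobs. Micro batching therefore sits between real-time (per-item) processing and traditional batch processing.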
Micro batching is particularly beneficial in the following situations:

- Real-Time Data Processing: When data must be processed in near real time, but handling each data point individually as it arrives would add too much overhead.
- Resource Optimization: When you want to reduce how often a system must wake up to process data, saving CPU and memory.
- Network Efficiency: When sending data over a network, batching reduces the number of requests, cutting per-request overhead and improving throughput.
- Error Handling: When errors and retries are easier to manage for a whole batch than for individual records.
- Data Aggregation: When data is aggregated for analytics or reporting, micro batching lets you collect and process it at regular intervals, keeping results timely.

By using micro batching, developers can build applications that process data efficiently while staying responsive.
Here’s a simple example of how to use the Microbatch library:

```go
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/blackmamba23/microbatch"
)

// ExampleBatchProcessor simulates processing of jobs.
type ExampleBatchProcessor struct{}

// Process handles one batch and returns one result per job.
func (m *ExampleBatchProcessor) Process(jobs []microbatch.Job) ([]microbatch.JobResult, error) {
	results := make([]microbatch.JobResult, 0, len(jobs))
	for _, job := range jobs {
		// Simulate processing; a real processor would do work here and set Err on failure.
		results = append(results, microbatch.JobResult{JobID: job.ID, Err: nil})
	}
	return results, nil
}

func main() {
	// Configure the micro batcher (batch size and flush interval).
	config := microbatch.Config{
		BatchSize:          3,
		BatchFrequency:     100 * time.Millisecond,
		DeterministicOrder: true,
		EnableLogging:      true,
	}

	// Create a new micro batcher.
	processor := &ExampleBatchProcessor{}
	batcher := microbatch.NewMicroBatcher(config, processor)

	jobs := []microbatch.Job{
		{ID: "job1", Data: "data1"},
		{ID: "job2", Data: "data2"},
		{ID: "job3", Data: "data3"},
		{ID: "job4", Data: "data4"},
	}

	// Submit all jobs before waiting on any result so they can be grouped
	// into batches. Waiting on each result before submitting the next job
	// would send every job through its own batch.
	resultChans := make([]<-chan microbatch.JobResult, 0, len(jobs))
	for _, job := range jobs {
		resultChans = append(resultChans, batcher.Submit(job))
	}

	// Collect the results.
	for i, resultChan := range resultChans {
		result := <-resultChan
		if result.Err != nil {
			log.Printf("Error processing job %s: %v", jobs[i].ID, result.Err)
		} else {
			fmt.Printf("Processed job: %s\n", result.JobID)
		}
	}

	// Shut down the batcher.
	batcher.Shutdown()
}
```

Run the tests with the following command:

```shell
go test -cover ./...
```