Batch Event Processing
This post walks through a simple batch event processing implementation in Go.
Event model
First, let’s define a simplified event model:
package model
type Event struct {
ID string
}
At a minimum, this model must contain a unique identifier.
In this example we’ll use a map as the batch data structure, to show how multiple duplicates of the same event can be handled.
The event model may also include an integer Version field that could help to check which version is higher before overwriting a batch entry (see the sketch below).
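As a minimal sketch of that idea (the VersionedEvent type and PutIfNewer helper below are hypothetical, not part of the example model), a version-aware update of a batch entry could look like this:
package model

// VersionedEvent is a hypothetical variant of Event with a Version field.
type VersionedEvent struct {
	ID      string
	Version int
}

// PutIfNewer overwrites a batch entry only when the incoming event
// carries a strictly higher version than the one already stored.
func PutIfNewer(batch map[string]VersionedEvent, event VersionedEvent) {
	if current, ok := batch[event.ID]; ok && current.Version >= event.Version {
		return // keep the entry that is already in the batch
	}
	batch[event.ID] = event
}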
Processor
Structure
package processor
import (
"sync/atomic"
"time"
"processor/internal/model"
)
type EventProcessor struct {
batchSize int
batchProcessInterval time.Duration
batchProcessFunc BatchProcessFunc
queue chan model.Event
// suspend signalling
suspendThreshold int
suspended atomic.Bool
// graceful stop signalling
stop chan struct{}
stopped atomic.Bool
}
func New(options ...Option) *EventProcessor {
ep := &EventProcessor{
batchSize: defaultBatchSize,
batchProcessInterval: defaultBatchProcessInterval,
batchProcessFunc: defaultBatchProcessFunc,
suspendThreshold: defaultSuspendThresholdPercent,
queue: make(chan model.Event, defaultQueueSize),
stop: make(chan struct{}),
}
for _, option := range options {
option(ep)
}
return ep
}
Configuration
package processor
import (
"context"
"time"
"processor/internal/model"
)
const (
defaultQueueSize = 10000
defaultBatchSize = 100
defaultBatchProcessInterval = time.Minute
defaultSuspendThresholdPercent = 99
)
type BatchProcessFunc func(ctx context.Context, events map[string]model.Event) (succeed bool)
var defaultBatchProcessFunc = BatchProcessFunc(
func(ctx context.Context, events map[string]model.Event) (succeed bool) {
return true
},
)
The event processor itself is an abstraction configured through four basic parameters (plus an additional suspend threshold, described below), passed as functional options:
type Option func(*EventProcessor)
queue size
func OptionQueueSize(queueSize int) func(*EventProcessor) {
return func(ep *EventProcessor) {
ep.queue = make(chan model.Event, queueSize)
}
}
The maximum number of events stored in the queue at any moment.
When BatchProcessFunc fails, we keep the events for the next process attempt.
$$ qs = maxRSS / sizeOf(event) $$
A proper value for this parameter is the maximum acceptable memory consumption of the service divided by the size of an average model.Event object.
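For instance (purely illustrative numbers), with an acceptable memory overhead of 100 MB and an average event of roughly 1 KB:
$$ qs = 100\ MB / 1\ KB = 100000 $$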
batch size
func OptionBatchSize(batchSize int) func(*EventProcessor) {
return func(ep *EventProcessor) {
ep.batchSize = batchSize
}
}
The number of events our service will process in a single batch.
$$ max(bs) = storage.maxFieldsPerQuery / fieldCount(model.Event) $$
If processing involves database upserts, the maximum value of this parameter is limited by how many records we can write in a single operation.
For example, PostgreSQL allows at most 65535 bind parameters per query, so the batch size cannot exceed 65535 divided by the number of fields in model.Event.
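For instance (illustrative numbers), if model.Event were upserted with 2 fields per row:
$$ max(bs) = 65535 / 2 = 32767 $$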
$$ bs \uparrow \downarrow bps $$
We tend to increase the batch size and decrease the batch process interval.
batch process interval
func OptionBatchProcessInterval(batchProcessInterval time.Duration) func(*EventProcessor) {
return func(ep *EventProcessor) {
ep.batchProcessInterval = batchProcessInterval
}
}
Normally, we process the event batch as soon as its size reaches the configured batch size; this additional parameter guarantees that the batch will also be forcibly processed every batch process interval.
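For example (illustrative numbers), with a batch size of 100 and a steady inflow of only 10 events per minute, a batch would take 10 minutes to fill; a batch process interval of time.Minute caps the processing delay at roughly one minute.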
batch process function
func OptionBatchProcessFunc(batchProcessFunc BatchProcessFunc) func(*EventProcessor) {
return func(ep *EventProcessor) {
ep.batchProcessFunc = batchProcessFunc
}
}
This is the function we pass to the processor; it will be called for every batch.
The function must return a boolean indicating whether the batch was processed successfully, so the processor can decide whether a new batch should be created.
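As a sketch of what a real implementation could look like, assuming a PostgreSQL table events(id text primary key) accessed through a *sql.DB handle (the table, its schema, and the newUpsertBatchProcessFunc helper are assumptions, not part of the example):
package main

import (
	"context"
	"database/sql"
	"fmt"
	"strings"

	"processor/internal/model"
	"processor/internal/processor"
)

// newUpsertBatchProcessFunc builds a BatchProcessFunc that writes the whole
// batch with a single multi-row upsert statement.
func newUpsertBatchProcessFunc(db *sql.DB) processor.BatchProcessFunc {
	return func(ctx context.Context, events map[string]model.Event) (succeed bool) {
		if len(events) == 0 {
			return true
		}
		placeholders := make([]string, 0, len(events))
		args := make([]any, 0, len(events))
		for _, event := range events {
			placeholders = append(placeholders, fmt.Sprintf("($%d)", len(args)+1))
			args = append(args, event.ID)
		}
		query := "INSERT INTO events (id) VALUES " +
			strings.Join(placeholders, ", ") +
			" ON CONFLICT (id) DO NOTHING"
		// returning false keeps the current batch for the next attempt
		_, err := db.ExecContext(ctx, query, args...)
		return err == nil
	}
}
Such a function would then be plugged in via processor.OptionBatchProcessFunc(newUpsertBatchProcessFunc(db)).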
suspend threshold
func OptionSuspendThreshold(thresholdPercent int) func(*EventProcessor) {
return func(ep *EventProcessor) {
ep.suspendThreshold = thresholdPercent
}
}
When the batch process function returns false too often (usually because of problems with external dependencies, e.g. the database), we keep accumulating old data and approach the maximum queue size, which means all future requests will end up timing out.
We can prevent that with an additional parameter that allows us to preemptively respond with an error to new requests.
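For example, with the defaults above (queue size 10000, suspend threshold 99), queueing is suspended once len(ep.queue) reaches 10000 / 100 * 99 = 9900 events.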
func (ep *EventProcessor) isSuspendThresholdExceeded() bool {
if ep.suspendThreshold == 0 {
return false
}
thresholdQueueSize := (cap(ep.queue) / 100) * ep.suspendThreshold
currentQueueSize := len(ep.queue)
if currentQueueSize >= thresholdQueueSize {
return true
}
return false
}
func (ep *EventProcessor) suspendQueueing() {
if !ep.isSuspended() {
ep.suspended.Store(true)
}
}
func (ep *EventProcessor) continueQueuing() {
if ep.isSuspended() {
ep.suspended.Store(false)
}
}
func (ep *EventProcessor) isSuspended() bool {
return ep.suspended.Load()
}
Event processing
Every received event is queued via the public interface.
A dedicated goroutine aggregates the event batch and tries to execute the batchProcessFunc:
- on every batchProcessInterval tick
- when the batch size reaches the configured maximum
If batchProcessFunc succeeds, we create a new batch; otherwise we retry on the next iteration.
package processor
import (
"context"
"time"
"processor/internal/model"
)
func (ep *EventProcessor) Start(ctx context.Context) {
ticker := time.NewTicker(ep.batchProcessInterval)
defer ticker.Stop()
events := map[string]model.Event{}
// when Stop(ctx) is called, some events may still be buffered in the ep.queue chan;
// we drain and process them here with a fresh context (the original one may already
// be cancelled during shutdown), otherwise this data would be lost
onStop := func() {
for event := range ep.queue {
events[event.ID] = event
}
ctx = context.Background()
ep.batchProcessFunc(ctx, events)
close(ep.stop)
}
defer onStop()
for {
select {
// ok becomes false after Stop(ctx) closes ep.queue;
// since we cannot close ep.queue from the receiver side,
// this is the only case where we return and let the
// deferred onStop run (there is no <-ctx.Done() handling here)
case event, ok := <-ep.queue:
if !ok {
return
}
events[event.ID] = event
if len(events) >= ep.batchSize {
if ep.batchProcessFunc(ctx, events) {
events = map[string]model.Event{}
}
}
case <-ticker.C:
if ep.isSuspendThresholdExceeded() {
ep.suspendQueueing()
} else {
ep.continueQueuing()
}
if ep.batchProcessFunc(ctx, events) {
events = map[string]model.Event{}
}
}
}
}
func (ep *EventProcessor) Stop(ctx context.Context) {
ep.stopped.Store(true)
close(ep.queue)
<-ep.stop
}
func (ep *EventProcessor) isStopped() bool {
return ep.stopped.Load()
}
Public interface
We’ll use the Queue function as the public interface; it either puts events into the queue or returns an error when:
- the processor is stopped
- queueing is suspended
Both errors signal to the client that the processor is currently discarding all new events.
package processor
import (
"context"
"errors"
"processor/internal/model"
)
var (
ErrProcessorStopped = errors.New("processor has been stopped")
ErrQueueingSuspended = errors.New("queueing has been suspended")
)
func (ep *EventProcessor) Queue(ctx context.Context, events map[string]model.Event) error {
if ep.isStopped() {
return ErrProcessorStopped
}
if ep.isSuspended() {
return ErrQueueingSuspended
}
for _, event := range events {
ep.queue <- event
}
return nil
}
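As an illustration of how a caller might react to these errors, here is a hypothetical HTTP handler (the eventsHandler function and its placeholder event are not part of the example) that maps them to backpressure status codes:
package main

import (
	"errors"
	"net/http"

	"processor/internal/model"
	"processor/internal/processor"
)

// eventsHandler forwards incoming events to the processor and translates
// queueing errors into backpressure responses; request parsing is omitted
// and a single placeholder event is queued instead.
func eventsHandler(ep *processor.EventProcessor) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		events := map[string]model.Event{"placeholder": {ID: "placeholder"}}
		err := ep.Queue(r.Context(), events)
		switch {
		case errors.Is(err, processor.ErrQueueingSuspended),
			errors.Is(err, processor.ErrProcessorStopped):
			// the processor is discarding events: ask the client to retry later
			w.WriteHeader(http.StatusServiceUnavailable)
		case err != nil:
			w.WriteHeader(http.StatusInternalServerError)
		default:
			w.WriteHeader(http.StatusAccepted)
		}
	}
}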
Monitoring
It is worth exposing:
- a gauge with the actual length of the ep.queue event channel
- the resident set size, to tune the queue size parameter
- a counter with the number of events received / processed
- the duration of batchProcessFunc
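A minimal sketch of that instrumentation, assuming the Prometheus client library (github.com/prometheus/client_golang) and a RegisterMetrics method added inside the processor package so it can read len(ep.queue); the method and the metric names are illustrative, not part of the example:
package processor

import (
	"github.com/prometheus/client_golang/prometheus"
)

// RegisterMetrics registers the monitoring signals listed above and returns
// the collectors the rest of the code is expected to update.
func (ep *EventProcessor) RegisterMetrics(reg prometheus.Registerer) (
	received, processed prometheus.Counter,
	batchDuration prometheus.Histogram,
) {
	// gauge with the actual length of the ep.queue event channel
	queueLength := prometheus.NewGaugeFunc(
		prometheus.GaugeOpts{Name: "event_processor_queue_length"},
		func() float64 { return float64(len(ep.queue)) },
	)
	received = prometheus.NewCounter(
		prometheus.CounterOpts{Name: "event_processor_events_received_total"},
	)
	processed = prometheus.NewCounter(
		prometheus.CounterOpts{Name: "event_processor_events_processed_total"},
	)
	// duration of batchProcessFunc, observed around each call
	batchDuration = prometheus.NewHistogram(
		prometheus.HistogramOpts{Name: "event_processor_batch_duration_seconds"},
	)
	reg.MustRegister(queueLength, received, processed, batchDuration)
	return received, processed, batchDuration
}
The resident set size is usually already covered by the standard process collector, and the duration histogram can be fed by wrapping the configured batchProcessFunc and observing time.Since(start).Seconds() around each call.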
Example
Let’s generate and queue a 100-element batch, stop the processor, and try again:
package main
import (
"context"
"log"
"strconv"
"time"
"processor/internal/model"
"processor/internal/processor"
)
func main() {
batchProcessFunc := processor.BatchProcessFunc(
func(ctx context.Context, events map[string]model.Event) (succeed bool) {
if len(events) == 0 {
return true
}
log.Printf("processed %d unique events", len(events))
return true
},
)
ep := processor.New(
processor.OptionQueueSize(100),
processor.OptionBatchSize(10),
processor.OptionBatchProcessInterval(time.Second),
processor.OptionBatchProcessFunc(batchProcessFunc),
)
ctx := context.Background()
go ep.Start(ctx)
generateAndSaveEvents(ctx, ep, 100)
ep.Stop(ctx)
log.Println("processor is stopped")
generateAndSaveEvents(ctx, ep, 100)
}
func generateAndSaveEvents(ctx context.Context, ep *processor.EventProcessor, batchSize int) {
events := make(map[string]model.Event)
for i := 0; i < batchSize; i++ {
event := model.Event{ID: strconv.Itoa(i)}
events[event.ID] = event
}
err := ep.Queue(ctx, events)
if err != nil {
log.Printf("discarded %d unique events: %s", len(events), err.Error())
return
}
log.Printf("queued %d unique events", len(events))
return
}
// Output:
//
// 2025/05/31 20:25:27 queued 100 unique events
// 2025/05/31 20:25:27 processed 10 unique events
// 2025/05/31 20:25:27 processed 10 unique events
// 2025/05/31 20:25:27 processed 10 unique events
// 2025/05/31 20:25:27 processed 10 unique events
// 2025/05/31 20:25:27 processed 10 unique events
// 2025/05/31 20:25:27 processed 10 unique events
// 2025/05/31 20:25:27 processed 10 unique events
// 2025/05/31 20:25:27 processed 10 unique events
// 2025/05/31 20:25:27 processed 10 unique events
// 2025/05/31 20:25:27 processed 10 unique events
// 2025/05/31 20:25:27 processor is stopped
// 2025/05/31 20:25:27 discarded 100 unique events: processor has been stopped