Contents

Batch Event Processing

This post contains a simple batch event processing Golang implementation example

Event model

First, let’s define a simplified event model:

package model

type Event struct {
  ID string
}

This model at least must contain a unique identifier

In this example we’ll use a map as a data structure for batch to show how multiple duplicates of the same event can be handled

The event model may also include the Version integer field that could help with checking what version is higher before updating batch entry

Processor

Structure

package processor

import (
  "sync/atomic"
  "time"

  "processor/internal/model"
)

type EventProcessor struct {
  batchSize            int
  batchProcessInterval time.Duration
  batchProcessFunc     BatchProcessFunc

  queue chan model.Event

  // suspend signalling
  suspendThreshold int
  suspended        atomic.Bool

  // graceful stop signalling
  stop    chan struct{}
  stopped atomic.Bool
}

func New(options ...Option) *EventProcessor {
  ep := &EventProcessor{
    batchSize:            defaultBatchSize,
    batchProcessInterval: defaultBatchProcessInterval,
    batchProcessFunc:     defaultBatchProcessFunc,
    suspendThreshold:     defaultSuspendThresholdPercent,
    queue:                make(chan model.Event, defaultQueueSize),
    stop:                 make(chan struct{}),
  }
  for _, option := range options {
    option(ep)
  }
  return ep
}

Configuration

package processor

import (
  "context"
  "time"

  "processor/internal/model"
)

const (
  defaultQueueSize               = 10000
  defaultBatchSize               = 100
  defaultBatchProcessInterval    = time.Minute
  defaultSuspendThresholdPercent = 99
)

type BatchProcessFunc func(ctx context.Context, events map[string]model.Event) (succeed bool)

var defaultBatchProcessFunc = BatchProcessFunc(
  func(ctx context.Context, events map[string]model.Event) (succeed bool) {
    return true
  },
)

The event processor itself is an abstraction that requires 4 basic parameters:

type Option func(*EventProcessor)

queue size

func OptionQueueSize(queueSize int) func(*EventProcessor) {
  return func(ep *EventProcessor) {
    ep.queue = make(chan model.Event, queueSize)
  }
}

Max number of events stored in queue at the moment
When BatchProcessFunc fails, we keep the events for the next process attempt

$$ qs = maxRSS / sizeOf(event)$$

Proper value of this variable depends on the maximum acceptable service memory consumption, divided by the size of an average model.Event structure object

batch size

func OptionBatchSize(batchSize int) func(*EventProcessor) {
  return func(ep *EventProcessor) {
    ep.batchSize = batchSize
  }
}

Number of events, our service will process at the moment

$$ max(bs) = storage.maxFieldsPerQuery / fieldCount(model.Event) $$

If the process involves database upserts, maximum value of this variable depends on the maximum records we can update in single operation

For example, Postgres can update up to 65535 parameters, so the batch size can not be higher than 65535 divided by number of fields in model.Event

$$ bs \uparrow \downarrow bps $$

We tend to increase the batch size and decrease a batch process interval

batch process interval

func OptionBatchProcessInterval(batchProcessInterval time.Duration) func(*EventProcessor) {
  return func(ep *EventProcessor) {
    ep.batchProcessInterval = batchProcessInterval
  }
}

Normally, we process the event batch as soon as it’s size reaches batch size,
this is an additional parameter to guarantee that batch will also be forcibly processed every batch process interval

batch process function

func OptionBatchProcessFunc(batchProcessFunc BatchProcessFunc) func(*EventProcessor) {
  return func(ep *EventProcessor) {
    ep.batchProcessFunc = batchProcessFunc
  }
}

This is a function we pass to the processor, that will be called for every batch
The function must return boolean value to evaluate new batch creation is required

suspend threshold

func OptionSuspendThreshold(thresholdPercent int) func(*EventProcessor) {
  return func(ep *EventProcessor) {
    ep.suspendThreshold = thresholdPercent
  }
}

When batch process function return false too often (usually because of some problems related with external dependencies e.g. database) we keep collecting old data and getting close to reaching the maximum queue size, which means all the future requests will end up timed out

We can prevent that by an additional parameter allowing us to preventively respond with error on new requests

func (ep *EventProcessor) isSuspendThresholdExceeded() bool {
  if ep.suspendThreshold == 0 {
    return false
  }
  thresholdQueueSize := (cap(ep.queue) / 100) * ep.suspendThreshold
  currentQueueSize := len(ep.queue)
  if currentQueueSize >= thresholdQueueSize {
    return true
  }
  return false
}

func (ep *EventProcessor) suspendQueueing() {
  if !ep.isSuspended() {
    ep.suspended.Store(true)
  }
}

func (ep *EventProcessor) continueQueuing() {
  if ep.isSuspended() {
    ep.suspended.Store(false)
  }
}

func (ep *EventProcessor) isSuspended() bool {
  return ep.suspended.Load()
}

Event processing

Every received event will be queued via public interface
A dedicated goroutine will aggregate event batch and try to execute the batchProcessFunc on:

  • every batchProcessInterval
  • batch size reaches the max configured value

If batchProcessFunc succeeds, we create a new batch, otherwise retry on the next iteration

package processor

import (
  "context"
  "time"

  "processor/internal/model"
)

func (ep *EventProcessor) Start(ctx context.Context) {
  ticker := time.NewTicker(ep.batchProcessInterval)
  defer ticker.Stop()

  events := map[string]model.Event{}

  // when Stop(ctx) is called, some of the events can still be stored in ep.queue chan
  // we need to save them in different way, otherwise this data will be lost
  onStop := func() {
    for event := range ep.queue {
      events[event.ID] = event
    }
    ctx = context.Background()
    ep.batchProcessFunc(ctx, events)
    close(ep.stop)
  }
  defer onStop()

  for {
    select {
    // ok is false after the Stop(ctx) is called
    // since we cannot close ep.queue as a receiver,
    // this is the only case we execute the deferred function 
    // and return (no <-ctx.Done handling here)
    case event, ok := <-ep.queue:
      if !ok {
        return
      }
      events[event.ID] = event
      if len(events) >= ep.batchSize {
        if ep.batchProcessFunc(ctx, events) {
          events = map[string]model.Event{}
        }
      }
    case <-ticker.C:
      if ep.isSuspendThresholdExceeded() {
        ep.suspendQueueing()
      } else {
        ep.continueQueuing()
      }
      if ep.batchProcessFunc(ctx, events) {
        events = map[string]model.Event{}
      }
    }
  }
}

func (ep *EventProcessor) Stop(ctx context.Context) {
  ep.stopped.Store(true)
  close(ep.queue)
  <-ep.stop
}

func (ep *EventProcessor) isStopped() bool {
  return ep.stopped.Load()
}

Public interface

We’ll use Queue function as a public interface that either put event to the queue, or return error when:

  • processor is stopped
  • queueing is suspended

Both errors will be used as a signal for client side that processor is discarding all new events at the moment

package processor

import (
  "context"
  "errors"

  "processor/internal/model"
)

var (
  ErrProcessorStopped  = errors.New("processor has been stopped")
  ErrQueueingSuspended = errors.New("queueing has been suspended")
)

func (ep *EventProcessor) Queue(ctx context.Context, events map[string]model.Event) error {
  if ep.isStopped() {
    return ErrProcessorStopped
  }
  if ep.isSuspended() {
    return ErrQueueingSuspended
  }
  for _, event := range events {
    ep.queue <- event
  }
  return nil
}

Monitoring

It takes to expose:

  • gauge with actual length of ep.queue event channel
  • resident set size to tune the queue size parameter
  • counter with number of events received / processed
  • duration of batchProcessFunc

Example

Let’s generate and queue 100-element batch, stop the processor and try again:

package main

import (
  "context"
  "log"
  "strconv"
  "time"

  "processor/internal/model"
  "processor/internal/processor"
)

func main() {
  batchProcessFunc := processor.BatchProcessFunc(
    func(ctx context.Context, events map[string]model.Event) (succeed bool) {
      if len(events) == 0 {
        return true
      }
      log.Printf("processed %d unique events", len(events))
      return true
    },
  )

  ep := processor.New(
    processor.OptionQueueSize(100),
    processor.OptionBatchSize(10),
    processor.OptionBatchProcessInterval(time.Second),
    processor.OptionBatchProcessFunc(batchProcessFunc),
  )

  ctx := context.Background()

  go ep.Start(ctx)

  generateAndSaveEvents(ctx, ep, 100)

  ep.Stop(ctx)
  log.Println("processor is stopped")

  generateAndSaveEvents(ctx, ep, 100)
}

func generateAndSaveEvents(ctx context.Context, ep *processor.EventProcessor, batchSize int) {
  events := make(map[string]model.Event)
  for i := 0; i < batchSize; i++ {
    event := model.Event{ID: strconv.Itoa(i)}
    events[event.ID] = event
  }

  err := ep.Queue(ctx, events)
  if err != nil {
    log.Printf("discarded %d unique events: %s", len(events), err.Error())
    return
  }

  log.Printf("queued %d unique events", len(events))
  return
}

// Output:
//
// 2025/05/31 20:25:27 queued 100 unique events
// 2025/05/31 20:25:27 processed 10 unique events
// 2025/05/31 20:25:27 processed 10 unique events
// 2025/05/31 20:25:27 processed 10 unique events
// 2025/05/31 20:25:27 processed 10 unique events
// 2025/05/31 20:25:27 processed 10 unique events
// 2025/05/31 20:25:27 processed 10 unique events
// 2025/05/31 20:25:27 processed 10 unique events
// 2025/05/31 20:25:27 processed 10 unique events
// 2025/05/31 20:25:27 processed 10 unique events
// 2025/05/31 20:25:27 processed 10 unique events
// 2025/05/31 20:25:27 processor is stopped
// 2025/05/31 20:25:27 discarded 100 unique events: processor has been stopped