Batch CRUD: SQL

This post covers batch CRUD operations using native mechanisms provided by an RDBMS
The chosen dialect is PostgreSQL, but the following snippets can easily be adapted to other database management systems

Model

Record model

First, let’s define a simplified database record model:

package model

// Record is an abstract database record
type Record struct {
  ID         string // ID is an internal id we'll use in CRUD within our logic
  TenantID   string // TenantID is a synthetic tenant id (same for ID and ExternalID)
  ExternalID string // ExternalID is an id of the record we've received from external service
}

We’ll use a slightly more complicated case with multiple ids to show how to work with the ON CONFLICT clause under such conditions:

create table if not exists service_record
(
  id          text   not null primary key,
  tenant_id   text   not null,
  external_id text   not null,
  constraint service_record_unique_record unique (tenant_id, external_id)
);

Concurrency model

Let’s create a common table used by the different locking approaches for Read and Write operations:

create table if not exists service_replica_lock
(
  kind       text    not null,
  ts         bigint  not null, -- current epoch timestamp (when acquired)
  ttl        integer not null, -- configurable timeout (release lock after)
  constraint service_replica_lock_unique_kind unique (kind)
);

See Distributed locks: SQL for more examples

Storage model

The storage will use the native Go database/sql connector:

package storage

import "database/sql"

// Storage is a record data repository
type Storage struct {
  db *sql.DB
}
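
For completeness, here’s a minimal wiring sketch; the constructor name, driver choice and DSN handling are assumptions, any database/sql-compatible Postgres driver works:

// NewStorage is a hypothetical constructor opening the connection pool;
// it assumes a registered database/sql Postgres driver (e.g. lib/pq or pgx stdlib)
func NewStorage(dsn string) (*Storage, error) {
  db, err := sql.Open("postgres", dsn)
  if err != nil {
    return nil, err
  }
  return &Storage{db: db}, nil
}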

Controller model

The controller in our example only holds access to the storage and a logger:

package controller

import "log/slog"

// Controller is a single query entry point
type Controller struct {
  s      storage.Storage // the project's storage package
  logger *slog.Logger
}

Read operations

Concurrency control

This time we may or may not need reads to be performed by only one replica (e.g. when running some specific scheduled task)

So to allow only one replica to read at a time, we can simply rely on the Rows Affected value while setting a newer timestamp

Register/re-register one record for this exact case using the kind field:

insert into service_replica_lock (kind, ts, ttl) values ($1, $2, $3)
on conflict on constraint service_replica_lock_unique_kind do update set ttl = $3

Try to acquire the lock:

update service_replica_lock set ts = $1 where kind = $2 and ts + ttl < $1
-- UPDATE 1 // lock acquired, only one of the clients will receive this
-- or
-- UPDATE 0 // lock already acquired
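
In Go this check may look like the following sketch (the method name and the epoch-seconds time unit are assumptions; the unit must match whatever is stored in ts and ttl):

// tryAcquireReadLock reports whether this replica acquired the lock
func (s *Storage) tryAcquireReadLock(ctx context.Context, kind string) (bool, error) {
  now := time.Now().Unix() // epoch seconds, same unit as ts/ttl
  res, err := s.db.ExecContext(
    ctx,
    `update service_replica_lock set ts = $1 where kind = $2 and ts + ttl < $1`,
    now, kind,
  )
  if err != nil {
    return false, err
  }
  affected, err := res.RowsAffected()
  if err != nil {
    return false, err
  }
  return affected == 1, nil // UPDATE 1 = acquired, UPDATE 0 = already acquired
}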

Batch Read

Constructor  Caller      Signature
-----------  ----------  ----------------------------------------------
Storage      Controller  type RecordIterator func(RecordYieldFunc)
Storage      Controller  type RecordReadFunc func() (*Record, error)
Controller   Storage     type RecordYieldFunc func(RecordReadFunc) bool

We’ll create an iterator function that (in our case) iterates over records via the rows cursor
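
Spelled out in the model package, the signatures from the table above look like this (the doc comments are mine):

package model

// RecordReadFunc reads the current record from the underlying cursor
type RecordReadFunc func() (*Record, error)

// RecordYieldFunc consumes one record via the given read function;
// returning false stops the iteration
type RecordYieldFunc func(RecordReadFunc) bool

// RecordIterator drives the iteration, calling yield once per record
type RecordIterator func(RecordYieldFunc)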

Storage

In our repository we’ll define a function BuildRecordIterator with:

  • a select query with a where clause
  • a returned RecordIterator function with a predefined RecordReadFunc (a simple rows scan) and the RecordYieldFunc left as a dependency

// BuildRecordIterator returns a record iterator built for the specified tenantID
func (s *Storage) BuildRecordIterator(
  ctx context.Context, 
  tenantID string,
) (model.RecordIterator, error) {
  
  const query = `
    select id,
           tenant_id,
           external_id
    from service_record
    where tenant_id = $1`
  rows, err := s.db.QueryContext(ctx, query, tenantID)
  if err != nil {
    return nil, err
  }
  
  return func(yieldFunc model.RecordYieldFunc) {
    defer func() {
      _ = rows.Close()
    }()
    readFunc := model.RecordReadFunc(
      func() (*model.Record, error) {
        record := model.Record{}
        if err := rows.Scan(
          &record.ID,
          &record.TenantID,
          &record.ExternalID,
        ); err != nil {
          return nil, err
        }
        return &record, nil
      })
    for cont := true; cont; cont = rows.NextResultSet() {
      for rows.Next() {
        if !yieldFunc(readFunc) {
          return
        }
      }
    }
  }, nil
}

Controller

The place we’ll use our iterator from must define the RecordYieldFunc (which receives access to the RecordReadFunc)

Passing the yield func to the iterator then launches the iteration process

// queryRecords iterates over all the records found for given tenantID
func (ctrl *Controller) queryRecords(
  ctx context.Context, 
  tenantID string,
) error {
  
  iterator, err := ctrl.s.BuildRecordIterator(ctx, tenantID)
  if err != nil {
    return err
  }

  yieldFunc := model.RecordYieldFunc(
    func(readFunc model.RecordReadFunc) bool {
      record, err := readFunc()
      if err != nil {
        ctrl.logger.Error("failed to read record",
          slog.Any("error", err),
        )
        return true // true = continue, false = break
      }
      ctrl.logger.Debug("found record",
        slog.String("id", record.ID),
        slog.String("tenant id", record.TenantID),
        slog.String("external id", record.ExternalID),
      )
      return true // true = continue, false = break
    },
  )

  iterator(yieldFunc)
  return nil
}

Write operations

Concurrency control

Having multiple batch write replicas brings up two possible scenarios:

  • every replica tries to create or update different data objects (unique by identifier)
  • every replica tries to create or update the same data objects, or a mix of both

To cover the second case we must avoid a possible deadlock (concurrent updates of the same record)

Serialize concurrent transactions

The easiest way to process one batch write transaction at a time is to lock one specific row for update

To do that we have a special lock table, which is obviously better than locking rows within the actual data (we’ll use exactly this approach in Batch Upsert below)

  1. Register one record for this exact case using the kind field (the ts and ttl columns are unused by this approach, so zeroes will do):
insert into service_replica_lock (kind, ts, ttl) values ($1, 0, 0) on conflict do nothing;
  2. Try to lock when writing a batch (the lock will be acquired right after the previous transaction finishes), otherwise wait:
select 1 from service_replica_lock where kind = $1 for update

Avoid concurrent transactions

On the other hand, we can simply use pg_try_advisory_xact_lock

select pg_try_advisory_xact_lock($1)

But this time we’ll receive the acquired / already acquired boolean immediately

So this solution is more convenient when it’s acceptable to cancel the transaction altogether (changes can be canceled or rescheduled)
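
A sketch of this check inside an open transaction (deriving an int64 lockKey from the lock kind is an assumption; pg_try_advisory_xact_lock expects a bigint):

var acquired bool
if err := tx.QueryRowContext(
  ctx, `select pg_try_advisory_xact_lock($1)`, lockKey,
).Scan(&acquired); err != nil {
  return err
}
if !acquired {
  return nil // another replica is writing: cancel or reschedule the batch
}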

Batch Upsert

In general, the upsert function will have the following steps:

  1. pre-build the insert query using the multirow VALUES syntax and (optionally) define ON CONFLICT conditions
  2. begin transaction
  3. execute lock part of the transaction
    (serialize by lock for update or avoid concurrency by pg_try_advisory_xact_lock)
  4. execute pre-built insert query
  5. commit transaction

// UpdateOrCreateRecords updates or creates given records and returns the number of records changed
func (s *Storage) UpdateOrCreateRecords(
  ctx context.Context, 
  records map[string]model.Record, 
) (int64, error) {
  
  const (
    query = `
      insert into service_record (
        id,
        tenant_id,
        external_id
      )
      values %s
      on conflict do nothing` // or do update set additional_field = value
    fieldCount = 3 // id + tenant_id + external_id = reflect.TypeOf(model.Record{}).NumField()
    lockKind   = "record_batch_write"
  )
    
  // 1. pre-build insert query using multirow VALUES syntax

  valueStrings := make([]string, 0, len(records))
  valueArgs := make([]interface{}, 0, len(records)*fieldCount)

  var recordInd int

  for _, record := range records {
    valueString := fmt.Sprintf(
      "($%d, $%d, $%d)", 
      recordInd*fieldCount+1, recordInd*fieldCount+2, recordInd*fieldCount+3,
    )
    valueStrings = append(valueStrings, valueString)
    valueArgs = append(valueArgs,
      GenerateRecordID(record),
      record.TenantID,
      record.ExternalID,
    )
    recordInd++
  }
    
  var recordsChanged int64

  err := func() error {
    // 2. begin transaction
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
      return err
    }
    defer func() {
      _ = tx.Rollback()
    }()
        
    // 3. execute lock part of the transaction
    _, err = tx.ExecContext(
      ctx, 
      `select 1 from service_replica_lock where kind = $1 for update`,
      lockKind, 
    )
    if err != nil {
      return err
    }
        
    // 4. execute pre-built insert query
    stmt := fmt.Sprintf(query, strings.Join(valueStrings, ","))
    stmtRes, err := tx.ExecContext(ctx, stmt, valueArgs...)
    if err != nil { 
      return err
    }
        
    recordsChanged, _ = stmtRes.RowsAffected()
        
    // 5. commit transaction
    return tx.Commit()
  }()
    
  return recordsChanged, err
}

An example of the executed query:

begin;
select 1 from service_replica_lock where kind = 'record_batch_write' for update;
insert into service_record (
  id,
  tenant_id,
  external_id
)
values ('record-1', 'tenant_a', 'external-tenant_a-record-1'),
       ('record-2', 'tenant_a', 'external-tenant_a-record-2'),
       ('record-3', 'tenant_a', 'external-tenant_a-record-3'),
       ------------------------------------------------------
       ('record-n', 'tenant_a', 'external-tenant_a-record-n')
       ---------------------------------- n = up to 65535 / 3
on conflict do nothing;
commit;

Limitations

model field count

This value is required because an RDBMS limits the number of parameters that can be bound in a single statement, which in turn limits the number of records that can be written at a time

For example, Postgres accepts only up to 65535 bind parameters per query, so the maximum batch size depends on this number:

$$ \max(bs) = \left\lfloor \frac{\mathit{maxFieldsPerQuery}}{\mathit{fieldCount}} \right\rfloor $$
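
For our three-field record this works out as follows (a quick arithmetic check, assuming the 65535 limit):

const maxFieldsPerQuery = 65535 // PostgreSQL bind parameter limit per query

maxBatchSize := maxFieldsPerQuery / fieldCount // 65535 / 3 = 21845 records per insert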

record id generation

Integer-only record ids are a bad choice for distributed systems

On the other hand, RDBMS may not provide an easy workaround for the composite id generation you want to use

So for our example we hid the implementation of this logic within GenerateRecordID(record)

It is better to make this function depend on the current timestamp to avoid ON CONFLICT scenarios on rescheduled updates of data that already exists
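
One possible shape of this function (purely illustrative; the actual implementation stays hidden):

// GenerateRecordID builds a composite record id with a time-based component;
// this is a hypothetical sketch, not the original implementation
func GenerateRecordID(record model.Record) string {
  return fmt.Sprintf(
    "%s-%s-%d",
    record.TenantID,
    record.ExternalID,
    time.Now().UnixNano(),
  )
}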

Monitoring

It makes sense to expose:

  • gauge with number of records processed within ctrl.queryRecords
  • gauge with number of records changed by storage.UpdateOrCreateRecords
  • duration of both ctrl.queryRecords and storage.UpdateOrCreateRecords
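
With the Prometheus Go client this could look like the following sketch (metric and label names are assumptions):

import "github.com/prometheus/client_golang/prometheus"

var (
  recordsProcessed = prometheus.NewGauge(prometheus.GaugeOpts{
    Name: "service_records_processed",
    Help: "Number of records processed within ctrl.queryRecords",
  })
  recordsChanged = prometheus.NewGauge(prometheus.GaugeOpts{
    Name: "service_records_changed",
    Help: "Number of records changed by storage.UpdateOrCreateRecords",
  })
  batchDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
    Name: "service_batch_duration_seconds",
    Help: "Duration of batch read/write operations",
  }, []string{"operation"}) // "query_records" / "update_or_create_records"
)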