Batch CRUD: SQL
This post covers batch CRUD operations using the native mechanisms provided by an RDBMS
The chosen dialect is PostgreSQL, but the following snippets can be easily adapted to other database management systems
Model
Record model
First, let’s define a simplified database record model:
package model
// Record is an abstract database record
type Record struct {
ID string // ID is an internal id we'll use in CRUD within our logic
TenantID string // TenantID is a synthetic tenant id (same for ID and ExternalID)
ExternalID string // ExternalID is an id of the record we've received from external service
}
We’ll use a slightly more complicated case with multiple ids to show how to work with the ON CONFLICT clause under such conditions:
create table if not exists service_record
(
id text not null primary key,
tenant_id text not null,
external_id text not null,
constraint service_record_unique_record unique (tenant_id, external_id)
);
Concurrency model
Create a common table for different locking approaches for Read and Write operations:
create table if not exists service_replica_lock
(
kind text not null,
ts bigint not null, -- current epoch timestamp (when acquired)
ttl integer not null, -- configurable timeout (release lock after)
constraint service_replica_lock_unique_kind unique (kind)
);
See Distributed locks: SQL for more examples
Storage model
Storage will use the native Go database/sql connector:
package storage
// Storage is a record data repository
type Storage struct {
db *sql.DB
}
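A constructor is not shown here, but a minimal sketch could look like this (assuming the pgx stdlib driver; NewStorage and the DSN handling are our own, not part of the original):
import (
	"database/sql"

	_ "github.com/jackc/pgx/v5/stdlib" // assumption: any database/sql Postgres driver works here
)

// NewStorage opens a connection pool and wraps it into a Storage
func NewStorage(dsn string) (*Storage, error) {
	db, err := sql.Open("pgx", dsn)
	if err != nil {
		return nil, err
	}
	if err := db.Ping(); err != nil {
		return nil, err
	}
	return &Storage{db: db}, nil
}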
Controller model
The controller in our example only holds references to the storage and a logger:
package controller
// Controller is a single query entry point
type Controller struct {
s storage.Storage
logger *slog.Logger
}
Read operations
Concurrency control
This time we may also need reads to be performed by only one replica (e.g. when running some specific task)
So to use one replica at a time, we can simply rely on the Rows Affected value while setting a bigger timestamp value
Register/re-register one record for this exact case using the kind field:
insert into service_replica_lock (kind, ts, ttl) values ($1, $2, $3)
on conflict on constraint service_replica_lock_unique_kind do update set ttl = $3
Try to acquire the lock:
update service_replica_lock set ts = $1 where kind = $2 and ts + ttl < $1
-- UPDATE 1 // lock acquired, only one of the clients will receive this
-- or
-- UPDATE 0 // lock already acquired
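On the Go side, the check is a minimal sketch like this (tryAcquireReadLock and its signature are our own naming, not from the original):
// tryAcquireReadLock bumps the lock timestamp; only the replica that
// receives RowsAffected == 1 proceeds with the read task
func (s *Storage) tryAcquireReadLock(ctx context.Context, kind string, now int64) (bool, error) {
	res, err := s.db.ExecContext(
		ctx,
		`update service_replica_lock set ts = $1 where kind = $2 and ts + ttl < $1`,
		now, kind,
	)
	if err != nil {
		return false, err
	}
	affected, err := res.RowsAffected()
	if err != nil {
		return false, err
	}
	return affected == 1, nil
}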
Batch Read
| Constructed by | Called by | Signature |
|---|---|---|
| Storage | Controller | type RecordIterator func(RecordYieldFunc) |
| Storage | Controller | type RecordReadFunc func() (*Record, error) |
| Controller | Storage | type RecordYieldFunc func(RecordReadFunc) bool |
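Derived from the table above, these types could be declared in the model package as follows (a sketch):
package model

// RecordReadFunc reads the current record from the underlying cursor
type RecordReadFunc func() (*Record, error)

// RecordYieldFunc consumes one record via readFunc; returning false stops the iteration
type RecordYieldFunc func(readFunc RecordReadFunc) bool

// RecordIterator walks over found records, calling yieldFunc once per record
type RecordIterator func(yieldFunc RecordYieldFunc)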
We’ll create an iterator function that (in our case) iterates over records via the rows cursor
Storage
In our repository we’ll define the function BuildRecordIterator with:
- a select query with a where clause
- a returned RecordIterator function with a predefined RecordReadFunc (a simple rows scan) and the RecordYieldFunc dependency left open
// BuildRecordIterator returns record iterator built for specified tenantID
func (s *Storage) BuildRecordIterator(
ctx context.Context,
tenantID string,
) (model.RecordIterator, error) {
const query = `
select id,
tenant_id,
external_id
from service_record
where tenant_id = $1`
rows, err := s.db.QueryContext(ctx, query, tenantID)
if err != nil {
return nil, err
}
return func(yieldFunc model.RecordYieldFunc) {
if rows != nil {
defer func() {
_ = rows.Close()
}()
}
readFunc := model.RecordReadFunc(
func() (*model.Record, error) {
record := model.Record{}
err = rows.Scan(
&record.ID,
&record.TenantID,
&record.ExternalID,
)
if err != nil {
return nil, err
}
return &record, nil
})
for cont := true; cont; cont = rows.NextResultSet() {
for rows.Next() {
if !yieldFunc(readFunc) {
return
}
}
}
}, nil
}
Controller
The place we’ll use our iterator from must contain the RecordYieldFunc definition (with access to the RecordReadFunc)
Passing the yield func to the iterator then launches the iteration process
// queryRecords iterates over all the records found for given tenantID
func (ctrl *Controller) queryRecords(
ctx context.Context,
tenantID string,
) error {
iterator, err := ctrl.s.BuildRecordIterator(ctx, tenantID)
if err != nil {
return err
}
yieldFunc := model.RecordYieldFunc(
func(readFunc model.RecordReadFunc) bool {
var record *model.Record
record, err = readFunc()
if err != nil {
ctrl.logger.Error("failed to read record",
slog.Any("error", err),
)
return true // true = continue, false = break
}
ctrl.logger.Debug("found record",
slog.String("id", record.ID),
slog.String("tenant id", record.TenantID),
slog.String("external id", record.ExternalID),
)
return true // true = continue, false = break
},
)
iterator(yieldFunc)
return nil
}
Write operations
Concurrency control
Having multiple batch write replicas brings up 2 possible scenarios:
- every replica tries to create or update different data objects (unique by their identifiers)
- every replica tries to create or update the same data objects, or a mix of both
To cover the second case we must avoid a possible deadlock (concurrent updates of the same record)
Serialize concurrent transactions
The easiest way to process one batch write transaction at a time is to lock one specific row for update
To do that we have the dedicated lock table, which is clearly better than locking rows within the actual data
- Register one record for this exact case using the kind field:
insert into service_replica_lock (kind, ts, ttl) values ($1, 0, 0) on conflict do nothing; -- ts/ttl are not used by row locking
- Try to lock when writing batch (lock will be acquired right after the previous transaction), otherwise wait:
select 1 from service_replica_lock where kind = $1 for update
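The registration step could run once at startup; a minimal sketch on top of our Storage (registerWriteLock is our own naming):
// registerWriteLock ensures the lock row for batch writes exists
func (s *Storage) registerWriteLock(ctx context.Context, kind string) error {
	_, err := s.db.ExecContext(
		ctx,
		`insert into service_replica_lock (kind, ts, ttl) values ($1, 0, 0)
		 on conflict do nothing`,
		kind,
	)
	return err
}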
Avoid concurrent transactions
On the other hand, we can simply use pg_try_advisory_xact_lock:
select pg_try_advisory_xact_lock($1)
But this time we’ll receive the acquired / alreadyAcquired boolean immediately
So this solution is more convenient when it’s acceptable to cancel the transaction altogether (changes can be dropped or rescheduled)
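Wired into a transaction, this could look like the following sketch (tryAdvisoryLock is our own naming; the key is an arbitrary int64 agreed on per batch kind):
// tryAdvisoryLock attempts to take a transaction-scoped advisory lock;
// it returns false immediately when another transaction already holds it
func tryAdvisoryLock(ctx context.Context, tx *sql.Tx, key int64) (bool, error) {
	var acquired bool
	err := tx.QueryRowContext(
		ctx,
		`select pg_try_advisory_xact_lock($1)`,
		key,
	).Scan(&acquired)
	return acquired, err
}
The lock is released automatically when the transaction commits or rolls back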
Batch Upsert
In general, the upsert function will have the following steps:
- pre-build the insert query using the multirow VALUES syntax and (optionally) define ON CONFLICT conditions
- begin transaction
- execute the lock part of the transaction (serialize by lock for update or avoid concurrency by pg_try_advisory_xact_lock)
- execute the pre-built insert query
- commit transaction
// UpdateOrCreateRecords updates or creates the given records and returns the number of records changed
func (s *Storage) UpdateOrCreateRecords(
ctx context.Context,
records map[string]model.Record,
) (int64, error) {
const (
query = `
insert into service_record (
id,
tenant_id,
external_id
)
values %s
on conflict do nothing` // or do update set additional_field = value
fieldCount = 3 // id + tenant_id + ... = reflect.TypeOf(model.Record{}).NumField()
lockKind = "record_batch_write"
)
// 1. pre-build insert query using multirow VALUES syntax
valueStrings := make([]string, 0, len(records))
valueArgs := make([]interface{}, 0, len(records)*fieldCount)
var recordInd int
for _, record := range records {
valueString := fmt.Sprintf(
"($%d, $%d, $%d)",
recordInd*fieldCount+1, recordInd*fieldCount+2, recordInd*fieldCount+3,
)
valueStrings = append(valueStrings, valueString)
valueArgs = append(valueArgs,
GenerateRecordID(record),
record.TenantID,
record.ExternalID,
)
recordInd++
}
var recordsChanged int64
err := func() error {
// 2. begin transaction
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
// 3. execute lock part of the transaction
_, err = tx.ExecContext(
ctx,
`select 1 from service_replica_lock where kind = $1 for update`,
lockKind,
)
if err != nil {
return err
}
// 4. execute pre-built insert query
stmt := fmt.Sprintf(query, strings.Join(valueStrings, ","))
stmtRes, err := tx.ExecContext(ctx, stmt, valueArgs...)
if err != nil {
return err
}
recordsChanged, _ = stmtRes.RowsAffected()
// 5. commit transaction
return tx.Commit()
}()
return recordsChanged, err
}
An example of the executed query:
begin;
select 1 from service_replica_lock where kind = 'record_batch_write' for update;
insert into service_record (
id,
tenant_id,
external_id
)
values ('record-1', 'tenant_a', 'external-tenant_a-record-1'),
('record-2', 'tenant_a', 'external-tenant_a-record-2'),
('record-3', 'tenant_a', 'external-tenant_a-record-3'),
------------------------------------------------------
('record-n', 'tenant_a', 'external-tenant_a-record-n')
---------------------------------- n = up to 65535 / 3
on conflict do nothing;
commit;
Limitations
model field count
This value is required because an SQL RDBMS limits the number of parameters that can be bound in a single query
For example, Postgres accepts at most 65535 bind parameters per query, so the maximum batch size depends on this number:
$$ \max(batchSize) = \lfloor storage.maxFieldsPerQuery / fieldCount \rfloor $$
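Respecting this limit, a caller could split the batch before invoking UpdateOrCreateRecords; a sketch only (maxParamsPerQuery and chunkRecords are our own names):
const maxParamsPerQuery = 65535

// chunkRecords splits the batch so that every chunk fits into a single query
func chunkRecords(records []model.Record, fieldCount int) [][]model.Record {
	maxBatchSize := maxParamsPerQuery / fieldCount
	chunks := make([][]model.Record, 0, len(records)/maxBatchSize+1)
	for len(records) > maxBatchSize {
		chunks = append(chunks, records[:maxBatchSize])
		records = records[maxBatchSize:]
	}
	if len(records) > 0 {
		chunks = append(chunks, records)
	}
	return chunks
}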
record id generation
Integer-only record ids are a bad choice for distributed systems
On the other hand, an RDBMS may not provide an easy workaround for the composite id generation you want to use
So for our example we hid the implementation of this logic behind GenerateRecordID(record)
It is better to make this function depend on the current timestamp to avoid CONFLICT scenarios with rescheduled updates of data that already exists
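A hypothetical implementation (the actual one stays hidden above) could combine both external identifiers with the current timestamp:
// GenerateRecordID builds a composite id; the timestamp part prevents
// primary key conflicts on rescheduled updates of already existing data
func GenerateRecordID(record model.Record) string {
	return fmt.Sprintf(
		"%s-%s-%d",
		record.TenantID,
		record.ExternalID,
		time.Now().UnixNano(),
	)
}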
Monitoring
It makes sense to expose:
- a gauge with the number of records processed within ctrl.queryRecords
- a gauge with the number of records changed by storage.UpdateOrCreateRecords
- the duration of both ctrl.queryRecords and storage.UpdateOrCreateRecords
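With the Prometheus Go client, for example, this set of metrics could be declared like so (a sketch; the metric names are our own):
var (
	recordsProcessed = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "service_records_processed",
		Help: "number of records processed within ctrl.queryRecords",
	})
	recordsChanged = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "service_records_changed",
		Help: "number of records changed by storage.UpdateOrCreateRecords",
	})
	batchDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name: "service_batch_duration_seconds",
		Help: "duration of batch read and write operations",
	}, []string{"operation"})
)

func init() {
	prometheus.MustRegister(recordsProcessed, recordsChanged, batchDuration)
}
Each batch operation then records its elapsed time into batchDuration with the matching "read" or "write" label and sets the gauges from its record counters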