Транзакции и автокоммит¶
go-activerecord не поддерживает транзакции. Все операции выполняются в режиме автокоммита.
Почему нет транзакций¶
Распределённые данные¶
ORM проектировался для работы с данными, распределёнными по нескольким базам данных:
graph LR
subgraph App["Приложение"]
AR[go-activerecord]
end
AR --> DB1[(PostgreSQL<br/>users)]
AR --> DB2[(PostgreSQL<br/>orders)]
AR --> DB3[(Octopus<br/>sessions)]
Транзакция в рамках одной БД не решает проблему согласованности между разными БД:
// Псевдокод: что могло бы быть с транзакциями
tx1 := usersDB.Begin()
tx2 := ordersDB.Begin()
user.SetBalance(100)
user.Update(ctx) // tx1
order.SetStatus("paid")
order.Update(ctx) // tx2
tx1.Commit() // ✅ Успех
tx2.Commit() // ❌ Ошибка — user уже обновлён, order нет
Ложное чувство безопасности
Транзакции в рамках одной БД создают иллюзию согласованности, но не защищают от рассинхрона между разными хранилищами.
Заблокированные соединения¶
Открытая транзакция удерживает соединение из пула на всё время своего существования:
sequenceDiagram
participant App as Приложение
participant Pool as Пул соединений
participant DB as База данных
App->>Pool: Получить соединение
Pool->>DB: BEGIN
Note over Pool,DB: Соединение заблокировано
App->>DB: UPDATE users...
App->>App: Внешний API вызов (2 сек)
App->>DB: UPDATE orders...
App->>DB: COMMIT
Pool->>Pool: Соединение возвращено
Note over Pool: 2 секунды соединение<br/>было недоступно
Проблемы:
| Проблема | Последствие |
|---|---|
| Долгие транзакции | Исчерпание пула соединений |
| Забытый Rollback | Утечка соединений |
| Deadlock в БД | Каскадные таймауты |
| Высокая нагрузка | Все соединения заняты транзакциями |
Octopus/Tarantool
In-memory БД особенно чувствительны к долгим транзакциям — они блокируют не только соединение, но и могут блокировать другие операции из-за однопоточной модели.
Архитектурный подход¶
Автокоммит по умолчанию¶
Каждая операция — отдельный автокоммит:
user.SetName("Alice")
user.Update(ctx) // Автокоммит
order.SetStatus("paid")
order.Update(ctx) // Автокоммит
Мутаторы vs Идемпотентность¶
go-activerecord предоставляет два подхода к обновлению данных:
Мутаторы — атомарные операции¶
Мутаторы (Inc, Dec, SetBit, ClearBit) выполняются атомарно на уровне БД:
// Атомарный инкремент — безопасен при конкурентных запросах
user.IncBalance(100)
user.IncLoginCount(1)
user.Update(ctx)
// → UPDATE users SET balance = balance + 100, login_count = login_count + 1 WHERE id = ?
Когда использовать мутаторы:
| Сценарий | Пример |
|---|---|
| Счётчики | IncViewCount(1), IncLoginCount(1) |
| Балансы с конкурентным доступом | IncBalance(100), DecBalance(50) |
| Битовые флаги | SetBitFlags(FlagPremium), ClearBitFlags(FlagBanned) |
| Рейтинги | IncRating(1), DecRating(1) |
// Конкурентное обновление счётчика — мутатор безопасен
// Горутина 1:
user.IncViewCount(1) // → view_count = view_count + 1
// Горутина 2 (параллельно):
user.IncViewCount(1) // → view_count = view_count + 1
// Результат: view_count увеличился на 2 (корректно)
Идемпотентные операции — для повторных запросов¶
Идемпотентность нужна когда операция может быть повторена (retry, сбой сети, дубликат сообщения):
// Обработка платежа — должна быть идемпотентной
func ProcessPayment(ctx context.Context, paymentId string, amount int64) error {
// Проверка дубликата
existing, err := payment.SelectByExternalId(ctx, paymentId)
if err == nil {
return nil // Уже обработан
}
user, _ := user.SelectById(ctx, userId)
user.SetBalance(user.GetBalance() + amount) // Идемпотентно с проверкой выше
user.Update(ctx)
p := payment.New(ctx)
p.SetExternalId(paymentId) // Уникальный ключ для дедупликации
p.SetAmount(amount)
return p.Insert(ctx)
}
Когда нужна идемпотентность:
| Сценарий | Решение |
|---|---|
| Webhook от платёжной системы | Уникальный external_id |
| Обработка сообщений из очереди | idempotency_key |
| Retry после таймаута | Проверка существующей записи |
Комбинируйте подходы
Мутаторы и идемпотентность не исключают друг друга. Используйте мутаторы для атомарности, а идемпотентность — для защиты от дубликатов.
Валидация согласованности¶
При изменении данных в нескольких БД сначала записывайте намерение, потом выполняйте:
graph TD
subgraph Runtime["Основной поток"]
A[Записать в очередь] --> B[Обновить user]
B --> C[Обновить order]
C --> D[Подтвердить в очереди]
end
subgraph Validator["Валидатор (отдельный процесс)"]
E[Читать неподтверждённые] --> F{Таймаут?}
F -->|Да| G[Проверить состояние]
G --> H{Согласовано?}
H -->|Да| I[Подтвердить]
H -->|Нет| J[Исправить / Алерт]
end
Порядок важен:
- Сначала очередь — записываем намерение (что собираемся сделать)
- Потом изменения — выполняем операции в БД
- Подтверждение — помечаем операцию как завершённую
Если процесс упал между шагами 2 и 3, валидатор обнаружит неподтверждённую запись и проверит фактическое состояние.
// Запись намерения ПЕРЕД изменениями
func TransferBalance(ctx context.Context, fromId, toId, amount int64) error {
// 1. Записываем намерение
op := operation.New(ctx)
op.SetType("transfer")
op.SetFromUserId(fromId)
op.SetToUserId(toId)
op.SetAmount(amount)
op.SetStatus("pending")
if err := op.Insert(ctx); err != nil {
return err
}
// 2. Выполняем изменения
from, _ := user.SelectById(ctx, fromId)
from.IncBalance(-amount)
if err := from.Update(ctx); err != nil {
return err // Валидатор увидит pending операцию
}
to, _ := user.SelectById(ctx, toId)
to.IncBalance(amount)
if err := to.Update(ctx); err != nil {
return err // Валидатор увидит частично выполненную операцию
}
// 3. Подтверждаем
op.SetStatus("completed")
return op.Update(ctx)
}
Компенсирующие транзакции (Saga)¶
Для сложных операций используйте паттерн Saga:
func CreateOrderSaga(ctx context.Context, userId int64, amount int64) error {
// Шаг 1: Резервируем баланс
user, err := user.SelectById(ctx, userId)
if err != nil {
return err
}
user.SetReservedBalance(user.GetReservedBalance() + amount)
if err := user.Update(ctx); err != nil {
return err
}
// Шаг 2: Создаём заказ
order := order.New(ctx)
order.SetUserId(userId)
order.SetAmount(amount)
order.SetStatus("pending")
if err := order.Insert(ctx); err != nil {
// Компенсация: откатываем резерв
user.SetReservedBalance(user.GetReservedBalance() - amount)
user.Update(ctx)
return err
}
// Шаг 3: Подтверждаем
user.SetBalance(user.GetBalance() - amount)
user.SetReservedBalance(user.GetReservedBalance() - amount)
if err := user.Update(ctx); err != nil {
// Компенсация: отменяем заказ
order.SetStatus("cancelled")
order.Update(ctx)
return err
}
order.SetStatus("confirmed")
return order.Update(ctx)
}
Оптимистичная блокировка (SetAndCheck)¶
go-activerecord предоставляет методы SetAndCheck* для обнаружения гонок без блокировок на уровне БД.
Как это работает¶
sequenceDiagram
participant App as Приложение
participant AR as go-activerecord
participant DB as База данных
App->>DB: SELECT * FROM users WHERE id=1
DB-->>App: {id:1, balance:100, version:5}
Note over App: user.SetAndCheckVersion(6)
Note over App: user.SetBalance(150)
App->>AR: user.Update(ctx)
AR->>DB: UPDATE users SET balance=150, version=6<br/>WHERE id=1 AND version=5
alt Никто не менял version
DB-->>AR: affected: 1
AR-->>App: nil (успех)
else Кто-то изменил version
DB-->>AR: affected: 0
AR-->>App: error (гонка обнаружена)
end
SetAndCheck* запоминает текущее значение поля и добавляет его в WHERE при Update:
user, _ := user.SelectById(ctx, 123)
// user.GetVersion() == 5
// SetAndCheck запоминает текущее значение (5) и устанавливает новое (6)
user.SetAndCheckVersion(user.GetVersion() + 1)
user.SetBalance(150)
err := user.Update(ctx)
// SQL: UPDATE users SET balance=150, version=6 WHERE id=123 AND version=5
if err != nil {
// Гонка! Кто-то изменил version между SELECT и UPDATE
// Нужно перечитать и повторить
}
Преимущества перед блокировками¶
| Аспект | Пессимистичная блокировка (SELECT FOR UPDATE) | Оптимистичная (SetAndCheck) |
|---|---|---|
| Блокировка соединения | Да, на всё время транзакции | Нет |
| Deadlock | Возможен | Невозможен |
| Производительность | Низкая при конкуренции | Высокая |
| Retry при конфликте | Не нужен | Нужен |
| Подходит для | Редкие конфликты, критичные данные | Частые чтения, редкие записи |
Без deadlock
SetAndCheck не захватывает блокировки в БД, поэтому deadlock невозможен. При обнаружении гонки вы просто перечитываете данные и повторяете операцию.
Пример: безопасное обновление баланса¶
func UpdateBalanceWithRetry(ctx context.Context, userId int64, delta int64) error {
maxRetries := 3
for i := 0; i < maxRetries; i++ {
user, err := user.SelectById(ctx, userId)
if err != nil {
return err
}
// Проверяем version при Update
if err := user.SetAndCheckVersion(user.GetVersion() + 1); err != nil {
return err
}
user.SetBalance(user.GetBalance() + delta)
err = user.Update(ctx)
if err == nil {
return nil // Успех
}
// Гонка — повторяем
log.Printf("Race detected, retry %d/%d", i+1, maxRetries)
}
return fmt.Errorf("failed after %d retries", maxRetries)
}
Когда использовать SetAndCheck¶
| Сценарий | Рекомендация |
|---|---|
| Обновление баланса с проверкой | SetAndCheckVersion |
| Изменение статуса (только из определённого) | SetAndCheckStatus |
| Редактирование профиля | SetAndCheckUpdatedAt |
| Конкурентное изменение настроек | SetAndCheckVersion |
// Изменение статуса только если он "pending"
order, _ := order.SelectById(ctx, orderId)
if order.GetStatus() != "pending" {
return fmt.Errorf("order is not pending")
}
// SetAndCheck гарантирует, что статус всё ещё "pending" в момент UPDATE
order.SetAndCheckStatus("processing")
if err := order.Update(ctx); err != nil {
// Кто-то уже изменил статус
return fmt.Errorf("order status changed concurrently")
}
Ограничения¶
- Можно вызвать
SetAndCheck*для поля только один раз (повторный вызов вернёт ошибку) - Нельзя комбинировать
SetAndCheck*иSet*для одного поля - При гонке нужно реализовать retry логику
Когда нужны транзакции¶
Если транзакции действительно необходимы (данные в одной БД, критичная согласованность), используйте драйвер напрямую:
PostgreSQL (pgx)¶
import "github.com/jackc/pgx/v5"
func WithTransaction(ctx context.Context, db *pgx.Conn, fn func(tx pgx.Tx) error) error {
tx, err := db.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
if err := fn(tx); err != nil {
return err
}
return tx.Commit(ctx)
}
Вне go-activerecord
При использовании транзакций напрямую через драйвер, вы обходите go-activerecord. Объекты AR не знают о транзакции и работают через свои соединения.
Best Practices¶
1. Минимизируйте окно несогласованности¶
// ❌ Плохо: долгое окно несогласованности
user.SetStatus("processing")
user.Update(ctx)
result := callExternalAPI() // 2 секунды
user.SetStatus(result.Status)
user.Update(ctx)
// ✅ Хорошо: атомарное обновление
result := callExternalAPI()
user.SetStatus(result.Status)
user.SetProcessedAt(time.Now().Unix())
user.Update(ctx)
2. Используйте статусы и timestamps¶
type FieldsOrder struct {
Id int64 `ar:"primary_key;init_by_db"`
Status string `ar:"size:32"` // pending → processing → done
ProcessedAt int64 `ar:""` // Когда обработан
IdempotencyKey string `ar:"unique;size:64"` // Для дедупликации
}
3. Логируйте для аудита¶
user.SetBalance(newBalance)
if err := user.Update(ctx); err != nil {
return err
}
// Лог для восстановления
log.Info("Balance updated",
"user_id", user.GetId(),
"old_balance", oldBalance,
"new_balance", newBalance,
"operation_id", operationId,
)
4. Eventual Consistency¶
Примите, что данные в конечном итоге станут согласованными:
// Очередь на обработку
queue.Push(Event{
Type: "balance_changed",
UserId: user.GetId(),
Amount: amount,
})
// Воркер обрабатывает и валидирует
func ProcessEvents(ctx context.Context) {
for event := range queue.Consume() {
validateAndFix(ctx, event)
}
}
Следующие шаги¶
- CRUD операции — базовые операции
- BulkUpdate — особенности массового обновления
- Бэкенды — сравнение PostgreSQL и Octopus