Goのジョブキューライブラリ River 使ってみた

こんにちは。サイオステクノロジーの和田です。前回はこちらでアウトボックスパターンという設計パターンを紹介しましたが、今回はその設計パターンを実現できる River というライブラリを使ってみたので紹介したいと思います。それではいきましょう。

River とは

River は Go 言語で書かれた PostgreSQL 専用のジョブキューライブラリです。PostgreSQL をバックエンドとして使用することで、アウトボックスパターンを簡単に実装できます。主な特徴として、Redis や RabbitMQ などの外部ジョブキューを必要とせず River 単体でジョブキューを管理することができます。

ディレクトリ構成

今回作るサンプルアプリは river_examples 配下に、以下のような構成で配置します。

river_examples
├── db
│   └── init.sql
├── docker-compose.yml
├── go.mod
├── go.sum
├── handlers
│   └── user.go
├── jobs
│   └── send_email.go
└── main.go

実装

まず、必要なパッケージを公式の手順に従ってインストールします。

go get github.com/riverqueue/river
go get github.com/riverqueue/river/riverdriver/riverpgxv5
go mod tidy

River を動かすためには PostgreSQL が必須なので、Docker Compose で DB を作っていきます。
ここでは、River 本体が使うテーブルのマイグレーションと、今回サンプルで作成するアプリケーションで使うテーブルのマイグレーションを行います。

まず最初にアプリケーションで使うユーザーテーブルを作成するための SQL を作ります。

-- filepath: db/init.sql

CREATE TABLE IF NOT EXISTS users (
  id BIGSERIAL PRIMARY KEY,
  email TEXT NOT NULL UNIQUE,
  password_hash TEXT NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

次に、postgres 本体とマイグレーション用のコンテナを Docker Compose で作ります。
River にはマイグレーション用の go コマンドがあるので、そちらを利用してマイグレーションを行っています。

# filepath: docker-compose.yml
services:
  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
      POSTGRES_DB: mydb
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data
      - ./db/init.sql:/docker-entrypoint-initdb.d/001-init.sql:ro
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U user -d mydb"]
      interval: 2s
      timeout: 5s
      retries: 30

  river-migrate:
    image: golang:1.24
    depends_on:
      db:
        condition: service_healthy
    environment:
      DATABASE_URL: postgres://user:password@db:5432/mydb?sslmode=disable
    command:
      - bash
      - -c
      - |
        set -euo pipefail
        go install github.com/riverqueue/river/cmd/river@v0.28.0
        /go/bin/river migrate-up --line main --database-url "$$DATABASE_URL"
    restart: "no"

volumes:
  pgdata:

以下のコマンドで作成します。

cd river_examples

docker compose up -d db

docker compose run --rm river-migrate

1. ジョブの定義

続いて、メール送信ジョブを定義します

// filepath: jobs/send_email.go
package jobs

import (
    "context"
    "fmt"
    "log"

    "github.com/riverqueue/river"
)

// SendEmailArgs はメール送信ジョブの引数
type SendEmailArgs struct {
    UserID  int64  `json:"user_id"`
    Email   string `json:"email"`
    Subject string `json:"subject"`
    Body    string `json:"body"`
}

// Kind はジョブの種類を識別する名前を返す
func (SendEmailArgs) Kind() string {
    return "send_email"
}

// SendEmailWorker はメール送信を実行するワーカー
type SendEmailWorker struct {
    river.WorkerDefaults[SendEmailArgs]
}

// Work は実際のメール送信処理を実行
func (w *SendEmailWorker) Work(ctx context.Context, job *river.Job[SendEmailArgs]) error {
    log.Printf("Sending email to %s (UserID: %d)", job.Args.Email, job.Args.UserID)
    log.Printf("Subject: %s", job.Args.Subject)

    // ここで実際のメール送信処理を行う
    // 例:外部のメール配信サービスAPIを呼び出す
    err := sendEmailViaSMTP(job.Args.Email, job.Args.Subject, job.Args.Body)
    if err != nil {
        return fmt.Errorf("failed to send email: %w", err)
    }

    log.Printf("Email sent successfully to %s", job.Args.Email)
    return nil
}

func sendEmailViaSMTP(email, subject, body string) error {
    // 実際のメール送信ロジック
    // ここでは簡略化のため省略
    return nil
}

2. メイン処理

メイン処理では主に以下を行います。

  • River クライアントの初期化
  • River ワーカーの登録
  • ユーザー登録処理
// filepath: main.go
package main

import (
    "context"
    "log"
    "time"

    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/riverqueue/river"
    "github.com/riverqueue/river/riverdriver/riverpgxv5"

    "river_examples/handlers"
    "river_examples/jobs"
)

func setupRiver(ctx context.Context, dbPool *pgxpool.Pool) (*river.Client[pgx.Tx], error) {
    workers := river.NewWorkers()

    // メール送信ワーカーを登録
    river.AddWorker(workers, &jobs.SendEmailWorker{})

    riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
        Queues: map[string]river.QueueConfig{
            river.QueueDefault: {MaxWorkers: 100},
        },
        Workers: workers,
    })
    if err != nil {
        return nil, err
    }

    return riverClient, nil
}

func main() {
    ctx := context.Background()

    // PostgreSQL接続プールの作成
    dbPool, err := pgxpool.New(ctx, "postgres://user:password@localhost:5432/mydb?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer dbPool.Close()

    // Riverクライアントのセットアップ
    riverClient, err := setupRiver(ctx, dbPool)
    if err != nil {
        log.Fatal(err)
    }

    // Riverワーカーを起動
    if err := riverClient.Start(ctx); err != nil {
        log.Fatal(err)
    }
    defer riverClient.Stop(ctx)

    log.Println("River worker started")

    // アプリケーションのメイン処理
    userHandler := handlers.NewUserHandler(dbPool, riverClient)
    if err := userHandler.RegisterUser(ctx, "test@example.com", "password"); err != nil {
        log.Fatal(err)
    }

    // 動作確認用(ジョブが処理されるまで少し待つ)
    time.Sleep(2 * time.Second)
}

3. ユーザー登録処理

ユーザー登録時に、DB への書き込みとジョブの投入を同じトランザクションで行います

// filepath: handlers/user.go
package handlers

import (
    "context"
    "fmt"

    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/riverqueue/river"

    "river_examples/jobs"
)

type UserHandler struct {
    dbPool      *pgxpool.Pool
    riverClient *river.Client[pgx.Tx]
}

func NewUserHandler(dbPool *pgxpool.Pool, riverClient *river.Client[pgx.Tx]) *UserHandler {
    return &UserHandler{
        dbPool:      dbPool,
        riverClient: riverClient,
    }
}

func (h *UserHandler) RegisterUser(ctx context.Context, email, password string) error {
    // トランザクション開始
    tx, err := h.dbPool.Begin(ctx)
    if err != nil {
        return fmt.Errorf("failed to begin transaction: %w", err)
    }
    defer tx.Rollback(ctx)

    // 1. ユーザーテーブルに挿入
    var userID int64
    err = tx.QueryRow(ctx, `
        INSERT INTO users (email, password_hash, created_at)
        VALUES ($1, $2, NOW())
        RETURNING id
    `, email, hashPassword(password)).Scan(&userID)
    if err != nil {
        return fmt.Errorf("failed to insert user: %w", err)
    }

    // 2. メール送信ジョブをエンキュー(同じトランザクション内)
    _, err = h.riverClient.InsertTx(ctx, tx, jobs.SendEmailArgs{
        UserID:  userID,
        Email:   email,
        Subject: "Welcome to Our Service!",
        Body:    "Thank you for registering. Please verify your email...",
    }, nil)
    if err != nil {
        return fmt.Errorf("failed to enqueue email job: %w", err)
    }

    // 3. トランザクションをコミット
    if err := tx.Commit(ctx); err != nil {
        return fmt.Errorf("failed to commit transaction: %w", err)
    }

    return nil
}

func hashPassword(password string) string {
    // パスワードハッシュ化(今回はそのまま登録)
    return password
}

サンプルでは UserHandlermain から呼び出す形にしていて、実行すると users への INSERT と River のジョブテーブルへの INSERT が同一トランザクションで行われ、ワーカーがジョブを処理します。

動作の流れ

実際の処理の流れは以下のようになります。

  1. ユーザー登録リクエストが来る
  2. トランザクション開始
  3. ユーザーテーブルに挿入(INSERT)
  4. River のジョブテーブルに挿入(InsertTx)
  5. トランザクションコミット
  6. River ワーカーがジョブテーブルを監視
  7. メール送信ジョブを実行

これにより、ユーザー登録が成功した場合のみメール送信ジョブが確実に実行され、前回説明したアウトボックスパターンが実現できます。

まとめ

今回は、River を使ったサンプルアプリを作ってみました。River を使うことで、 PostgreSQL のトランザクション機能を活用し、アウトボックスパターンを簡単に実装できます。データベースへの書き込みとジョブの投入を単一のトランザクションで行えるため、データの整合性を保ちながら非同期処理を実現できます。

ご覧いただきありがとうございます! この投稿はお役に立ちましたか?

役に立った 役に立たなかった

0人がこの投稿は役に立ったと言っています。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です