diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8779fef34..f06aec6a7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -171,7 +171,7 @@ jobs: - name: Stop shared PgDog run: bash -lc 'source integration/common.sh; stop_pgdog' - name: Data sync - run: bash integration/copy_data/dev.sh + run: bash integration/copy_data/run.sh - name: Python run: bash integration/python/run.sh - name: Load balancer diff --git a/.rwx/integration.yml b/.rwx/integration.yml index b0152d527..8051c7704 100644 --- a/.rwx/integration.yml +++ b/.rwx/integration.yml @@ -423,13 +423,14 @@ tasks: - key: integration-copy-data use: integration-build-pgdog-cov + docker: true background-processes: *postgres-bg–processes - timeout: 15m + timeout: 20m run: | export LLVM_PROFILE_FILE="$PWD/target/llvm-cov-target/profiles/copy-data-%p-%m.profraw" bash integration/setup.sh - timeout --signal=TERM --kill-after=30s 10m bash integration/copy_data/dev.sh + timeout --signal=TERM --kill-after=30s 18m bash integration/copy_data/run.sh cargo llvm-cov report --release --package pgdog --lcov --output-path copy-data.lcov outputs: diff --git a/.schema/pgdog.schema.json b/.schema/pgdog.schema.json index aa70e980a..c1d94a18d 100644 --- a/.schema/pgdog.schema.json +++ b/.schema/pgdog.schema.json @@ -92,6 +92,8 @@ "regex_parser_limit": 1000, "reload_schema_on_ddl": true, "resharding_copy_format": "binary", + "resharding_copy_retry_max_attempts": 5, + "resharding_copy_retry_min_delay": 1000, "resharding_parallel_copies": 1, "rollback_timeout": 5000, "server_lifetime": 86400000, @@ -964,6 +966,20 @@ "$ref": "#/$defs/CopyFormat", "default": "binary" }, + "resharding_copy_retry_max_attempts": { + "description": "Maximum number of retries for a failed table copy during resharding (per-table).\nRetries use exponential backoff starting at `resharding_copy_retry_min_delay`.\n_Default:_ `5`", + "type": "integer", + "format": "uint", + "default": 5, + "minimum": 0 + }, + "resharding_copy_retry_min_delay": { + "description": "Base delay in milliseconds between table copy retries.\nEach successive attempt doubles the delay, capped at 32×.\n_Default:_ `1000`", + "type": "integer", + "format": "uint64", + "default": 1000, + "minimum": 0 + }, "resharding_parallel_copies": { "description": "How many parallel copies to launch, irrespective of the number of available replicas.", "type": "integer", diff --git a/docs/RESHARDING.md b/docs/RESHARDING.md new file mode 100644 index 000000000..c65d7f36b --- /dev/null +++ b/docs/RESHARDING.md @@ -0,0 +1,292 @@ +# Resharding — Implementation + +This document describes how the resharding pipeline works at the code level. For the user-facing +prerequisites, step-by-step guide, and cutover configuration see +[Resharding Postgres](https://docs.pgdog.dev/features/sharding/resharding/) and the companion +blog post [Shard Postgres with one command](https://pgdog.dev/blog/shard-postgres-with-one-command). +For sharding routing internals see [SHARDING.md](./SHARDING.md). + +--- + +## Entry point — `RESHARD` command + +```sql +RESHARD ; +``` + +Issued against the admin database. Parsed in [`pgdog/src/admin/reshard.rs`](../pgdog/src/admin/reshard.rs), which calls +`Orchestrator::new(source, destination, publication, slot_name)` and then +`orchestrator.replicate_and_cutover().await`. + +> **Multi-node deployments:** Traffic cutover via `RESHARD` is supported on single-node PgDog only. 
+> The [Enterprise Edition control plane](https://docs.pgdog.dev/enterprise_edition/control_plane/) +> is required for coordinated cutover across multiple PgDog containers. + +--- + +## Orchestrator + +`Orchestrator` in [`pgdog/src/backend/replication/logical/orchestrator.rs`](../pgdog/src/backend/replication/logical/orchestrator.rs) owns: +- `source: Cluster` / `destination: Cluster` — connection handles to the two database clusters +- `publisher: Arc>` — manages replication slots, table list, and lag tracking +- `replication_slot: String` — auto-generated as `__pgdog_repl_` unless overridden + +`replicate_and_cutover()` is the top-level method and calls the five steps below in sequence: + +```mermaid +flowchart LR + A["1. load_schema
pg_dump on source"] + B["2. schema_sync_pre
pre-data to dest
reload schema cache"] + C["3. data_sync
ParallelSyncManager
binary COPY"] + D["4. schema_sync_post
secondary indexes"] + E["5. replicate().cutover()
WAL drain
traffic swap"] + + A --> B --> C --> D --> E +``` + +--- + +## Step 1 — Schema dump + +`Orchestrator::load_schema()` creates a `PgDump` ([`pgdog/src/backend/schema/sync/pg_dump.rs`](../pgdog/src/backend/schema/sync/pg_dump.rs)) +with the source cluster and publication name, calls `pg_dump.dump().await`, and stores the +`PgDumpOutput` on the orchestrator. This output carries pre-data (tables, types, extensions, +primary key constraints), secondary index DDL, post-cutover operations, and sequences — split +into `SyncState` phases so they can be applied in the right order later. + +--- + +## Step 2 — Pre-data schema sync + +`schema_sync_pre()` restores `SyncState::PreData` from the dump to the destination cluster, then: +1. Calls `reload_from_existing()` to refresh PgDog's in-memory schema cache so subsequent routing + decisions reflect the new destination schema. +2. Re-fetches `source` and `destination` clusters from `databases()` (addresses may have changed + after the reload). +3. If the destination has `RewriteMode::RewriteOmni`, installs the sharded sequence schema via + `Schema::install()`. + +> **Prerequisite:** all tables in the publication must have a primary key. `Table::valid()` in +> [`pgdog/src/backend/replication/logical/publisher/table.rs`](../pgdog/src/backend/replication/logical/publisher/table.rs) checks this and returns +> `Error::NoPrimaryKey(table)` before any data moves. Without a PK, the upsert conflict target +> is undefined and the replication stream cannot be made idempotent. + +--- + +## Step 3 — Data sync (parallel COPY) + +`Orchestrator::data_sync()` delegates to `Publisher::data_sync()`, which builds a +`ParallelSyncManager` and calls `manager.run().await`. + +### ParallelSyncManager ([`publisher/parallel_sync.rs`](../pgdog/src/backend/replication/logical/publisher/parallel_sync.rs)) + +`ParallelSyncManager::new()` takes the table list, a set of source replica connection pools, and +the destination cluster. It sizes a `Semaphore` to +`replicas.len() × dest.resharding_parallel_copies()`. Each table is spawned as a `tokio::spawn` +task via `ParallelSync::run()`. All tasks share an `UnboundedSender`; the manager collects +completions via `rx.recv()`. Replicas are round-robined across tasks. + +> **Replica isolation:** replicas tagged `resharding_only = true` in `pgdog.toml` are included +> here and excluded from normal application traffic. The `Semaphore` ensures the source replicas +> and destination shards are not overwhelmed. + +> **WAL disk space:** each per-table `ReplicationSlot` created during the copy prevents PostgreSQL +> from recycling WAL on the source until the slot is drained. Estimate WAL write rate × copy +> duration and provision that headroom before starting. An orphaned slot from a failed reshard +> accumulates WAL indefinitely — drop it before retrying (see "When things go wrong" below). + +### Per-table copy flow ([`Table::data_sync()`](../pgdog/src/backend/replication/logical/publisher/table.rs)) + +Each task performs this sequence against its assigned source replica: + +1. Creates a `CopySubscriber` — opens connections to all destination shards. +2. Creates a `ReplicationSlot::data_sync()` — opens a streaming replication connection to the + source replica. +3. `slot.create_slot()` — creates a **temporary** logical replication slot, returning the current + LSN. This pins the WAL position atomically inside the same transaction as the copy. +4. `copy.start()` — issues `COPY table TO STDOUT (FORMAT BINARY)` on the source. +5. 
Streams each row through `copy_sub.copy_data(row)` — the `CopySubscriber` runs the same + `ContextBuilder` → `Context::apply()` sharding pipeline used for live queries, and forwards + each row to the correct destination shard(s). +6. `copy_sub.copy_done()` — sends `CopyDone` to each destination shard, flushes, disconnects. +7. `slot.start_replication()` + drain loop — replays any WAL accumulated since slot creation, + then sends a status update confirming the slot position. The slot is `TEMPORARY` and is + automatically dropped when the replication connection closes. +8. `COMMIT` closes the transaction on the source replica. + +The recorded LSN becomes the replay watermark for that table's WAL stream in Step 5. + +--- + +## Step 4 — Post-data schema sync + +`schema_sync_post()` restores `SyncState::PostData` — secondary indexes, non-PK constraints, +and any other DDL that was deferred. Deferring index creation until after the bulk copy avoids +index maintenance overhead during the high-throughput copy phase. + +--- + +## Step 5 — Replication and cutover + +`replicate()` creates a `ReplicationWaiter` that wraps a `Waiter` from `Publisher::replicate()`. +`ReplicationWaiter::cutover()` then runs two serial wait phases followed by the atomic swap. + +### Publisher and StreamSubscriber + +`Publisher` in [`publisher/publisher_impl.rs`](../pgdog/src/backend/replication/logical/publisher/publisher_impl.rs) owns the replication slot, table list, and lag map. +It opens a logical replication connection to the source and streams `XLogPayload` messages to +`StreamSubscriber` in [`subscriber/stream.rs`](../pgdog/src/backend/replication/logical/subscriber/stream.rs). + +`StreamSubscriber` maintains: +- `relations: HashMap` — table OID metadata sent once per connection +- `statements: HashMap` — one set of prepared statements per table OID, + generated once from `Table::insert()` / `update()` / `delete()`: + - `insert`: `INSERT INTO "schema"."table" ($1,$2,...) ON CONFLICT (pk_cols) DO UPDATE SET non_pk=$N` + - `update`: `UPDATE "schema"."table" SET non_pk=$N WHERE pk=$M` + - `delete`: `DELETE FROM "schema"."table" WHERE pk=$N` +- `table_lsns: HashMap` — per-table replay watermark (set from Step 3 LSNs) +- `connections: Vec` — one open connection per destination shard + +**Per-message handling:** + +| WAL message | Action | +|---|---| +| `Insert` | Check `lsn_applied(oid)` → if above watermark, run upsert prepared statement on correct shard | +| `Update` | PK change → decomposed into `delete(old) + insert(new)`; no PK change → `update` statement | +| `Delete` | `delete` prepared statement on correct shard | +| `Commit` | Send `Sync` to all open shard connections to close the transaction | + +**LSN guard** (`lsn_applied()`): if `current_lsn ≤ table_lsns[oid]`, the row was already +bulk-copied in Step 3 and is skipped. Watermarks advance per-table on `COMMIT`. + +**Omnisharded tables** (`statements.omni = true`): upsert is sent to all shards simultaneously. + +**Routing**: `StreamContext::shard()` in [`subscriber/context.rs`](../pgdog/src/backend/replication/logical/subscriber/context.rs) runs `ContextBuilder` + `Context::apply()` on the sharding +key column extracted from the WAL tuple — identical to the live query routing path. + +### Cutover phases + +**Phase 1 — `wait_for_replication()`**: polls lag every 1 second. When +`lag ≤ cutover_traffic_stop_threshold`: +1. Calls `maintenance_mode::start()` — new queries queue behind a barrier. +2. 
Calls `cancel_all(source_db)` — cancels any queries already in flight. + +**Phase 2 — `wait_for_cutover()`**: polls at 50 ms intervals. Three independent triggers can fire +cutover (whichever comes first): + +| Trigger | Config key | Action | +|---|---|---| +| `lag ≤ threshold` | `cutover_replication_lag_threshold` | `CutoverReason::Lag` → proceed | +| elapsed ≥ timeout | `cutover_timeout` | `CutoverReason::Timeout` → proceed or abort (see `cutover_timeout_action`) | +| no transactions for N ms | `cutover_last_transaction_delay` | `CutoverReason::LastTransaction` → proceed | + +**Point of no return** — the `ok_or_abort!` macro wraps every subsequent call. Any failure resumes +traffic immediately via `maintenance_mode::stop()` and returns an error. Steps in order: + +1. `publisher.request_stop()` + `waiter.wait()` — stops the replication stream; drains remaining WAL. +2. `schema_sync_cutover()` — applies `SyncState::Cutover` operations (e.g. drops sequences that + won't be used in the sharded cluster). +3. `cutover(source_db, dest_db)` in [`pgdog/src/backend/databases.rs`](../pgdog/src/backend/databases.rs) — atomically swaps source and + destination in the in-memory routing table. +4. `orchestrator.refresh()` — re-fetches both clusters from `databases()` so the orchestrator now + treats the new cluster as source for reverse replication. +5. `schema_sync_post_cutover()` — applies `SyncState::PostCutover` (removes blockers that would + prevent reverse replication, such as unique constraints on sequence columns). +6. `orchestrator.replicate()` — starts reverse replication (new cluster → old cluster) as a + background `AsyncTasks` task. This enables rollback without data loss. +7. `maintenance_mode::stop()` — releases the barrier; queued and new queries flow to the new cluster. + +--- + +## Error handling and fault tolerance + +### Pre-cutover failures — plain propagation + +Steps 1–4 (`load_schema`, `schema_sync_pre`, `data_sync`, `schema_sync_post`) propagate errors +with `?` directly from `replicate_and_cutover()`. Maintenance mode is never entered during these +steps. A failure here leaves traffic unaffected and the source untouched, making a full restart safe. + +### Schema DDL — intentional error tolerance + +`schema_sync_pre`, `schema_sync_post`, `schema_sync_cutover`, and `schema_sync_post_cutover` are +all called with `ignore_errors = true`. The `PgDumpOutput::restore()` method logs errors and +continues when this flag is set. The intent is to tolerate pre-existing objects on the destination +— a common condition when a previous reshard attempt failed mid-schema-sync and left partial DDL +behind. Re-running `RESHARD` after such a failure will not abort on `table already exists` or +similar conflicts. + +### Data sync — abort propagation and cooperative cancellation + +[`Table::data_sync()`](../pgdog/src/backend/replication/logical/publisher/table.rs) runs the COPY row loop under a `tokio::select!` that races two futures: +the next row from the source, and `AbortSignal::aborted()`. `AbortSignal` wraps the closed-state +of the `UnboundedSender` shared with `ParallelSyncManager` — it resolves when the channel is +dropped. If the channel closes mid-copy (e.g. because another table's task failed and the manager +is torn down), the loop returns `Error::CopyAborted`. The task does not need to be explicitly +cancelled. + +[`ParallelSync::run()`](../pgdog/src/backend/replication/logical/publisher/parallel_sync.rs) checks `tx.is_closed()` before acquiring the semaphore permit. 
A task that +wakes after the channel is already closed returns `Error::DataSyncAborted` immediately without +starting a copy. + +Error propagation from the manager: `run()` drives completion via `rx.recv()`. The first `Err` +returned by any task surfaces via `table?` and aborts the manager's loop. Remaining tasks run to +completion or abort via their own `AbortSignal`, but their results are ignored once the channel +is dropped. + +### Temporary vs permanent replication slots + +Per-table slots created in [`Table::data_sync()`](../pgdog/src/backend/replication/logical/publisher/table.rs) are `TEMPORARY` — PostgreSQL drops them +automatically when the replication connection closes, including on error or panic. A failed copy +task leaves no orphaned per-table slot. + +The `Publisher`'s named replication slot (the one used for the WAL streaming phase) is permanent. +[`Publisher::cleanup()`](../pgdog/src/backend/replication/logical/publisher/publisher_impl.rs) drops it by calling `slot.drop_slot()`, which issues +`DROP_REPLICATION_SLOT "name" WAIT` over the replication protocol connection. `cleanup()` is an +explicit method on `Orchestrator` — it is not called automatically inside `replicate_and_cutover()`. +If the orchestrator is dropped after Step 5 begins but before `cleanup()` is called (e.g. a +process crash), the permanent slot survives and continues accumulating WAL on the source. + +### The `ok_or_abort!` macro — guaranteed traffic resumption after cutover starts + +```rust +macro_rules! ok_or_abort { + ($expr:expr) => { + match $expr { + Ok(res) => res, + Err(err) => { + maintenance_mode::stop(); + cutover_state(CutoverState::Abort { error: err.to_string() }); + return Err(err.into()); + } + } + }; +} +``` + +Once `maintenance_mode::start()` is called in `wait_for_replication()`, traffic is paused. +`ok_or_abort!` is the only place that calls `maintenance_mode::stop()` for the remaining steps. +Every call after the point of no return — `waiter.wait()`, `schema_sync_cutover()`, `cutover()`, +`orchestrator.refresh()`, `schema_sync_post_cutover()`, `orchestrator.replicate()` — is wrapped +in it. This guarantees traffic always resumes, regardless of which step fails. + +The macro also transitions the global `CutoverState` to `Abort`, which is visible via +`SHOW REPLICATION_SLOTS` in the admin database. + +### AbortTimeout — the one pre-point-of-no-return stop + +When `cutover_timeout_action = "abort"` and the timeout fires in `wait_for_cutover()`, the code +explicitly calls `maintenance_mode::stop()` before returning `Err(Error::AbortTimeout)`. This is +the only code path that stops maintenance mode without being inside `ok_or_abort!` — it is the +case where the cutover was never attempted, so no data was moved and no swap occurred. 
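+
+A self-contained sketch of that branch (the enum, names, and function shape here are
+illustrative assumptions; the real logic is inline in `wait_for_cutover()`):
+
+```rust
+use std::time::{Duration, Instant};
+
+// Illustrative only — models the decision, not the real control flow.
+enum TimeoutAction {
+    Abort,   // cutover_timeout_action = "abort"
+    Proceed, // cutover_timeout_action = "proceed"
+}
+
+fn on_timeout_tick(
+    started: Instant,
+    timeout: Duration,
+    action: &TimeoutAction,
+) -> Result<bool, String> {
+    if started.elapsed() < timeout {
+        return Ok(false); // trigger not fired; keep polling at 50 ms
+    }
+    match action {
+        // No swap happened and no data moved, so traffic resumes right here
+        // (maintenance_mode::stop()) instead of going through ok_or_abort!.
+        TimeoutAction::Abort => Err("AbortTimeout".into()),
+        // Proceed with cutover despite the lag: CutoverReason::Timeout.
+        TimeoutAction::Proceed => Ok(true),
+    }
+}
+```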
+ +### Idempotency guarantees + +Several mechanisms make it safe to replay data across a restart: + +| Mechanism | Where | Effect | +|---|---|---| +| Temporary replication slots | `Table::data_sync()` | Auto-dropped on connection close; no orphaned per-table slots | +| `ignore_errors = true` | All schema sync steps | Pre-existing DDL on destination does not abort the run | +| LSN watermark guard | `StreamSubscriber::lsn_applied()` | Rows bulk-copied in Step 3 are skipped during WAL replay in Step 5 | +| Upsert on INSERT messages | `Table::insert(upsert=true)` | `ON CONFLICT (pk) DO UPDATE SET` prevents duplicates on WAL re-delivery | +| PK validation | `Table::valid()` | Fails before any data moves; restart is clean | \ No newline at end of file diff --git a/docs/SHARDING.md b/docs/SHARDING.md new file mode 100644 index 000000000..8aa7ba503 --- /dev/null +++ b/docs/SHARDING.md @@ -0,0 +1,250 @@ +# Sharding — Implementation + +This document describes how PgDog routes queries across shards at the code level. For user-facing +configuration and concepts see [Sharding basics](https://docs.pgdog.dev/features/sharding/basics/) and +[Sharding functions](https://docs.pgdog.dev/features/sharding/sharding-functions/). For the resharding +workflow see [RESHARDING.md](./RESHARDING.md). + +--- + +## Architecture overview + +```mermaid +flowchart TD + APP["Application
(unmodified)"] + + subgraph PgDog + FE["Frontend
(client connection)"] + QP["QueryParser
(SQL → Shard)"] + COPY["CopyRow
(COPY → Shard)"] + ROUTER["Router
(Shard dispatcher)"] + end + + subgraph Shards + S0["Shard 0"] + S1["Shard 1"] + SN["Shard N"] + end + + APP -->|"SQL / COPY"| FE + FE --> QP + FE --> COPY + QP --> ROUTER + COPY --> ROUTER + ROUTER --> S0 + ROUTER --> S1 + ROUTER --> SN +``` + +The universal routing token is `Shard` in [`pgdog/src/frontend/router/parser/route.rs`](../pgdog/src/frontend/router/parser/route.rs): + +```rust +pub enum Shard { + Direct(usize), // single shard by index + Multi(Vec), // explicit subset + All, // broadcast to every shard +} +``` + +Every routing decision — SQL, COPY, WAL replay — produces one of these three variants and nothing else. +The rest of the system only needs to know which variant it received. + +--- + +## Router entry point + +`Router` in [`pgdog/src/frontend/router/mod.rs`](../pgdog/src/frontend/router/mod.rs) holds a `QueryParser` and the last-computed `Command`. + +- **SQL messages** → `Router::query()` calls `QueryParser::parse()` and stores the result as + `Command::Query(Route)`. In COPY mode it returns the cached command without reparsing. +- **COPY data** → `Router::copy_data()` matches on `Command::Copy` and calls `Copy::shard()`, + which returns `Vec`. If the current command is not a COPY (i.e. not a sharded table), + every row gets `CopyRow::omnishard` — `Shard::All`. + +`Route` (also in [`route.rs`](../pgdog/src/frontend/router/parser/route.rs)) wraps a `Shard` together with metadata the connection pool and response +merger need: `read: bool` (primary vs replica), `order_by`, `aggregate`, `limit`, `distinct`, and +`rewrite_plan`. The connection pool reads `route.shard()` to select the backend; the response merger +reads the rest to reassemble cross-shard results. + +--- + +## SQL routing — QueryParser + +`QueryParser` in [`pgdog/src/frontend/router/parser/query/mod.rs`](../pgdog/src/frontend/router/parser/query/mod.rs) is re-created per query. + +### Pre-parse shortcuts + +Before touching the SQL AST, `parse()` checks two bypass conditions: + +1. **Comment override** — `/* shard=N */` or `/* role=primary */` embedded in the SQL is extracted + from `Ast::comment_shard` and pushed onto `shards_calculator` before the AST walk. The role + override (`/* role=primary */`) sets `write_override = true`. + +2. **Parser bypass** — if a shard is already known (from a prior `SET pgdog.shard = N` command or + a sticky connection) and the cluster has only one shard, the full AST walk is skipped entirely + via `query_parser_bypass()`, which returns a `Route` directly. + +### Per-statement dispatch + +After the pre-parse phase, the root `NodeEnum` is matched: + +| NodeEnum | Handler | Notes | +|---|---|---| +| `SelectStmt` | `select()` | Key extraction + aggregation metadata | +| `InsertStmt` | `insert()` | Key from VALUES column list | +| `UpdateStmt` | `update()` | Key from SET + WHERE; may trigger shard-key rewrite (see below) | +| `DeleteStmt` | `delete()` | Key from WHERE | +| `CopyStmt` | `copy()` | Sets up `Command::Copy` for row-level routing | +| `VariableSetStmt` | `set()` | Handles `SET pgdog.shard` / `SET pgdog.role` | +| `VariableShowStmt` | `show()` | Admin SHOW commands | +| `DeallocateStmt` | — | Returns `Command::Deallocate` immediately | + +Empty queries (no FROM clause, e.g. `SELECT 1` or `SELECT NOW()`) are round-robined to +`Shard::Direct(round_robin::next() % shards)` and never hit the WHERE clause walker. 
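+
+Every handler in the table above ultimately reduces to one of the three `Shard` variants. A
+self-contained sketch of the fan-out a dispatcher performs on them (`target_shards()` is a
+hypothetical helper; the real backend selection lives in the connection pool):
+
+```rust
+// Mirrors the Shard enum quoted earlier; the fan-out helper is illustrative.
+pub enum Shard {
+    Direct(usize),
+    Multi(Vec<usize>),
+    All,
+}
+
+fn target_shards(shard: &Shard, total: usize) -> Vec<usize> {
+    match shard {
+        Shard::Direct(n) => vec![*n],       // one backend, no merge needed
+        Shard::Multi(list) => list.clone(), // explicit subset
+        Shard::All => (0..total).collect(), // broadcast; results merged later
+    }
+}
+```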
+ +### SELECT routing decision tree + +`select()` in [`pgdog/src/frontend/router/parser/query/select.rs`](../pgdog/src/frontend/router/parser/query/select.rs) applies this decision sequence: + +1. **Already routed** — if `shards_calculator.shard().is_direct()` (set by a prior `SET` or comment), + skip the AST walk and return immediately with that shard. + +2. **WHERE clause key** — `StatementParser::from_select().shard()` walks the AST looking for an + equality predicate on the configured sharding column. If found, produces `Shard::Direct(hash % n)`. + +3. **Vector ORDER BY** — if the ORDER BY contains an `<->` expression (L2 distance) against a + vector-type sharding column, `Centroids::shard()` is called on the query vector. The nearest + centroid's shard index is used. + +4. **Sharded table, no key** — table is in `[[sharded_tables]]` but WHERE has no equality on the + sharding key. Result: `Shard::All`. Aggregates, ORDER BY, and LIMIT recorded for cross-shard merge. + +5. **Unsharded table (omnishard)** — table is not in `[[sharded_tables]]`: + - If marked `sticky` → routes to `sticky.omni_index` (connection-pinned, same shard for the + session). + - Otherwise → `round_robin::next() % shards`. + +6. **Single-shard cluster** — after the above, if result is `Shard::All` or `Shard::Multi` but + `context.shards == 1`, it is collapsed to `Shard::Direct(0)`. + +Cross-shard queries (`Shard::All` or `Shard::Multi`) carry `AggregateRewritePlan` in the `Route`. +The plan describes which columns are aggregated (`COUNT`, `SUM`, `MAX`, `MIN`, `AVG`, etc.) so the +response handler can merge partial results from each shard before returning them to the client. + +### UPDATE on the sharding key + +When an UPDATE sets the sharding key to a new value the row must move shards. `StatementRewrite` in +[`pgdog/src/frontend/router/parser/rewrite/statement/`](../pgdog/src/frontend/router/parser/rewrite/statement/) detects this case and rewrites the statement +as three operations: `SELECT` (fetch the full old row), `DELETE` (remove from the source shard), and +`INSERT ... ON CONFLICT DO UPDATE` (upsert on the destination shard). The rewrite is transparent to +the client. It is enabled by default and can be disabled via `rewrite.shard_key = "ignore"` in +`pgdog.toml`. + +--- + +## Sharding functions + +### Operator selection + +`ContextBuilder` in [`pgdog/src/frontend/router/sharding/context_builder.rs`](../pgdog/src/frontend/router/sharding/context_builder.rs) reads the `ShardedTable` +config entry for the matched column and constructs an [`Operator`](../pgdog/src/frontend/router/sharding/operator.rs) in this priority order: + +1. `centroids` populated → `Operator::Centroids { shards, probes, centroids }` +2. `mapping` is `Mapping::Range(_)` → `Operator::Range(ranges)` +3. `mapping` is `Mapping::List(_)` → `Operator::List(lists)` +4. 
Otherwise → `Operator::Shards(n)` (hash) + +`Context::apply()` in [`context.rs`](../pgdog/src/frontend/router/sharding/context.rs) matches on the operator and calls the appropriate function, +returning `Shard::All` if the value cannot be parsed (rather than erroring): + +``` +Operator::Shards(n) → hash(value) % n → Shard::Direct +Operator::Centroids → nearest centroid index → Shard::Direct or Shard::Multi +Operator::Range → ranges.shard(value) → Shard::Direct or Shard::All (no match) +Operator::List → lists.shard(value) → Shard::Direct or Shard::All (no match) +``` + +### Hash functions + +`bigint()`, `uuid()`, `varchar()` in [`pgdog/src/frontend/router/sharding/mod.rs`](../pgdog/src/frontend/router/sharding/mod.rs) all call into +[`hashfn.c`](../pgdog/src/frontend/router/sharding/hashfn.c) via FFI ([`pgdog/src/frontend/router/sharding/ffi.rs`](../pgdog/src/frontend/router/sharding/ffi.rs)). The functions are PostgreSQL's own +`hashint8extended` and `hash_bytes_extended`, so `hash(42) % N` in PgDog produces the same shard +as PostgreSQL's own hash partitioning would. + +`shard_value()` handles text-format parameters; `shard_binary()` handles wire-format binary +parameters by decoding them first. `shard_str()` is called when the type is unknown — it tries +`i64` parse, then `Uuid` parse, then falls through to varchar. This is a best-effort path with a +TODO noting that having the type OID would be more reliable. + +For the `SHA1` hasher (configured via `hasher = "sha1"`), `Hasher::Sha1` routes through +[`pgdog/src/frontend/router/sharding/hasher.rs`](../pgdog/src/frontend/router/sharding/hasher.rs) instead of the FFI functions. + +### List and Range: unmatched values + +Neither `Lists::shard()` nor `Ranges::shard()` errors on a value that matches no rule — both return +`Shard::All`. This means a misconfigured mapping silently broadcasts instead of failing. If strict +routing is required, all possible values must be covered in the mapping. + +### Vector routing + +`Centroids` lives in the `pgdog-vector` crate (re-exported from +[`pgdog/src/frontend/router/sharding/vector.rs`](../pgdog/src/frontend/router/sharding/vector.rs)). `Centroids::shard()` computes the L2 distance from +the query vector to each centroid using SIMD (AVX2 on x86-64, NEON on ARM64) and returns the index +of the nearest centroid. `centroid_probes` controls how many centroids are checked — higher values +improve recall at the cost of more shards being queried. + +--- + +## COPY routing + +`CopyRow` in [`pgdog/src/frontend/router/copy.rs`](../pgdog/src/frontend/router/copy.rs) wraps a `CopyData` message with a `Shard`. + +The `Copy` router (in [`pgdog/src/frontend/router/parser/copy.rs`](../pgdog/src/frontend/router/parser/copy.rs)) is set up when the parser sees a +`CopyStmt` targeting a sharded table. As each `CopyData` arrives, `Copy::shard()` extracts the +sharding key column from the row (text or binary format), runs it through the same +`ContextBuilder` → `Context::apply()` pipeline, and produces a tagged `CopyRow`. + +Two special cases: +- **CSV/text headers** — the header row (column names) is sent to all shards via `CopyRow::headers()`. +- **Non-sharded table** — if `Command::Copy` is not set (the target table is unsharded), every row + becomes `CopyRow::omnishard(row)` with `Shard::All`. + +The binary COPY format is handled transparently; `shard_binary()` decodes each column value from the +PostgreSQL wire format before hashing. 
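+
+A conceptual, self-contained sketch of the per-row decision (the modulo hash is a stand-in for
+the FFI call into `hashfn.c`, and the helper names are assumptions):
+
+```rust
+enum Shard {
+    Direct(usize),
+    All,
+}
+
+// Stand-in for the ContextBuilder → Context::apply() pipeline: pull the
+// sharding key column out of a decoded COPY row, hash it, tag the row.
+fn route_copy_row(columns: &[Option<String>], key_index: usize, shards: usize) -> Shard {
+    match columns.get(key_index).and_then(|c| c.as_deref()) {
+        Some(value) => match value.parse::<i64>() {
+            // The real code hashes with PostgreSQL's hashint8extended so the
+            // result matches native hash partitioning; `%` alone does not.
+            Ok(key) => Shard::Direct((key.unsigned_abs() % shards as u64) as usize),
+            // An unparsable key broadcasts instead of erroring, like Context::apply().
+            Err(_) => Shard::All,
+        },
+        None => Shard::All,
+    }
+}
+```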
+ +--- + +## Configuration + +Config types are in [`pgdog-config/src/sharding.rs`](../pgdog-config/src/sharding.rs). The TOML keys map directly to struct fields +(all `snake_case`, `deny_unknown_fields`). + +Key fields on `ShardedTable`: + +| Field | Type | Notes | +|---|---|---| +| `database` | `String` | Matches a `[[databases]]` entry | +| `name` | `Option` | Restricts rule to one table; absent = all tables with this column | +| `schema` | `Option` | PostgreSQL schema scope | +| `column` | `String` | Sharding key column name | +| `data_type` | `DataType` | `bigint` (default), `uuid`, `varchar`, `vector` | +| `hasher` | `Hasher` | `postgres` (default, FFI to `hashint8extended`) or `sha1` | +| `centroids` | `Vec` | Inline centroid vectors for vector sharding | +| `centroids_path` | `Option` | External JSON file for large centroid sets | +| `centroid_probes` | `usize` | Probes per query; defaults to `√(centroid count)` | +| `mapping` | `Option` | Resolved from `[[sharded_mappings]]` at startup; not set in TOML directly | +| `primary` | `bool` | Marks this table as the sharding anchor for FK-based query resolution | + +`[[sharded_mappings]]` entries resolve to either `Mapping::List(ListShards)` or `Mapping::Range(Ranges)` +and are joined to their `ShardedTable` at startup. A mapping that covers only some values leaves the +rest as `Shard::All` — there is no error. A `ShardedMappingKind::Default` entry acts as a catch-all. + +### Schema-based sharding + +`[[sharded_schemas]]` in `pgdog.toml` maps a PostgreSQL schema name to a fixed shard index. This is +useful for schema-per-tenant deployments where each tenant's data lives in a dedicated schema. +`SchemaSharder` in [`pgdog/src/frontend/router/sharding/schema.rs`](../pgdog/src/frontend/router/sharding/schema.rs) resolves the current +`search_path` against the `ShardedSchema` list (`pgdog-config/src/sharding.rs`). A `name = null` +entry acts as the catch-all; a specific schema name overrides it even if the catch-all was matched first. + +Schema routing takes effect at the connection level and is re-evaluated whenever `search_path` changes. \ No newline at end of file diff --git a/docs/issues/894-897-copy-data-retry.md b/docs/issues/894-897-copy-data-retry.md new file mode 100644 index 000000000..e603b9708 --- /dev/null +++ b/docs/issues/894-897-copy-data-retry.md @@ -0,0 +1,62 @@ +# COPY_DATA Retry Reliability (Issues #894 + #897) + +**#894:** During `COPY_DATA` resharding, if a destination shard temporarily went down, PgDog +continued processing the next tables without retrying. The destination was left incompletely +populated (276 started, 63 finished in the reported case). + +**#897:** Two failure modes need coverage: +1. **Destination shard is down** — connection to dest fails or drops mid-COPY +2. **Origin shard is down** — source connection drops, temporary replication slot is lost + +--- + +## Solution + +### Retry mechanism + +`Table::data_sync()` in `publisher/table.rs` opens fresh connections on every call, so a +single retry loop covers both failure modes: + +- **Dest down** → new `CopySubscriber` reconnects to destination. +- **Origin down** → new `ReplicationSlot` with a fresh random name. The old slot was + `TEMPORARY` and was auto-dropped by Postgres when its connection closed. + +The retry loop lives in `ParallelSync::run_with_retry()` (`publisher/parallel_sync.rs`): +exponential backoff starting at `resharding_copy_retry_min_delay`, doubling each attempt, +capped at 32×, up to `resharding_copy_retry_max_attempts` tries. 
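+
+The schedule this produces, sketched from the backoff expression in `run_with_retry()` (the
+wrapper function is illustrative; the real code inlines it):
+
+```rust
+use std::time::Duration;
+
+// base × 2^attempt with the exponent clamped at 5, i.e. capped at 32× base.
+fn backoff(base: Duration, attempt: u32) -> Duration {
+    base * 2u32.pow(attempt.min(5))
+}
+// Defaults (base = 1000 ms) give: 1s, 2s, 4s, 8s, 16s, 32s, 32s, ...
+```
+
+The two config keys that drive it: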
+ +| Key | Default | Description | +|-----|---------|-------------| +| `resharding_copy_retry_max_attempts` | `5` | Per-table retry attempts | +| `resharding_copy_retry_min_delay` | `1000` ms | Base backoff; doubles each attempt, capped at 32× | + +### Truncation + +Each table COPY runs inside PostgreSQL's own implicit transaction — no explicit `BEGIN`. +There is no cross-shard atomicity. + +| Failure point | Destination state | +|---|---| +| During row streaming | PG rolls back implicit tx → dest empty | +| Inside `copy_done()` — some shards committed, others not | Partially committed | +| After `copy_done()`, before `data_sync` returns `Ok` | All shards have rows | + +The common case (drop during streaming) leaves the destination clean. The rare race is a +commit that landed before the connection dropped — a retry then hits primary key violations. + +PgDog does not auto-TRUNCATE before retrying (wrong-cluster risk). On fatal failure, +`Table::destination_has_rows` checks each shard and logs a `warn!` with the exact `TRUNCATE` +statement to run manually. Auto-truncate is stubbed as a future extension in `run_with_retry()`. + +### Error handling + +`is_retryable()` in `error.rs` uses a whitelist: `Net`, `Pool`, `NotConnected`, `NoPrimary`, +and `ReplicationTimeout` return `true`; everything else defaults to `false`. Non-retryable +errors (`CopyAborted`, `DataSyncAborted`, `NoPrimaryKey`, `NoReplicaIdentity`) bypass the +retry loop immediately but still trigger the destination row check. + +### Tests + +`integration/copy_data/retry_test/run.sh` kills shard_1 mid-sync, brings it back after ~2 s, +and asserts exit 0 with correct row counts. Uses faster retry settings in +`retry_test/pgdog.toml` for CI speed. diff --git a/integration/copy_data/init.sql b/integration/copy_data/data_sync/init.sql similarity index 96% rename from integration/copy_data/init.sql rename to integration/copy_data/data_sync/init.sql index 771564768..038f36787 100644 --- a/integration/copy_data/init.sql +++ b/integration/copy_data/data_sync/init.sql @@ -10,4 +10,3 @@ DROP SCHEMA IF EXISTS copy_data CASCADE; DROP SCHEMA IF EXISTS copy_data CASCADE; \c pgdog SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots; -\i setup.sql diff --git a/integration/copy_data/pgbench.sql b/integration/copy_data/data_sync/pgbench.sql similarity index 100% rename from integration/copy_data/pgbench.sql rename to integration/copy_data/data_sync/pgbench.sql diff --git a/integration/copy_data/pgdog.toml b/integration/copy_data/data_sync/pgdog.toml similarity index 100% rename from integration/copy_data/pgdog.toml rename to integration/copy_data/data_sync/pgdog.toml diff --git a/integration/copy_data/dev.sh b/integration/copy_data/data_sync/run.sh old mode 100644 new mode 100755 similarity index 57% rename from integration/copy_data/dev.sh rename to integration/copy_data/data_sync/run.sh index e39a6c245..94c684202 --- a/integration/copy_data/dev.sh +++ b/integration/copy_data/data_sync/run.sh @@ -1,8 +1,20 @@ #!/bin/bash -set -e +# Integration test: 0→2 and 2→2 resharding with live write traffic. +# +# Requires: +# - local postgres at port 5432 +# - databases: pgdog, pgdog1, pgdog2, shard_0, shard_1 (created by integration/setup.sh) +# - max_replication_slots >= 32 in postgresql.conf +# Each data-sync creates one permanent slot per source shard plus one temporary +# slot per parallel table copy. With resharding_parallel_copies=5 and a 2-shard +# source, peak usage is 3 permanent + 2×5 temporary = 13 slots. 
The default of 10 +# is not enough; set max_replication_slots = 32 in postgresql.conf and reload. +set -euo pipefail SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) -DEFAULT_BIN="${SCRIPT_DIR}/../../target/debug/pgdog" +DEFAULT_BIN="${SCRIPT_DIR}/../../../target/debug/pgdog" PGDOG_BIN=${PGDOG_BIN:-$DEFAULT_BIN} +PGDOG_CONFIG="${SCRIPT_DIR}/pgdog.toml" +PGDOG_USERS="${SCRIPT_DIR}/users.toml" export PGUSER=pgdog export PGDATABASE=pgdog @@ -10,6 +22,9 @@ export PGHOST=127.0.0.1 export PGPORT=5432 export PGPASSWORD=pgdog +BENCH_PID="" +REPL_PID="" + cleanup() { if [ -n "${BENCH_PID}" ]; then kill ${BENCH_PID} 2>/dev/null || true @@ -24,7 +39,6 @@ trap cleanup EXIT start_pgbench() { ( - pgbench -h 127.0.0.1 -p 5432 -U pgdog pgdog \ -t 100000000 -c 3 --protocol extended \ -f "${SCRIPT_DIR}/pgbench.sql" -P 1 @@ -41,16 +55,24 @@ stop_pgbench() { fi } +SHARDED_TABLES="copy_data.users copy_data.orders copy_data.order_items copy_data.log_actions copy_data.with_identity" +OMNI_TABLES="copy_data.countries copy_data.currencies copy_data.categories" + pushd ${SCRIPT_DIR} -psql -f init.sql +# Teardown: drop stale slots and schemas. +psql -f "${SCRIPT_DIR}/init.sql" +# Setup: populate source database. +psql -f "${SCRIPT_DIR}/../setup.sql" # # 0 -> 2 # -${PGDOG_BIN} schema-sync --from-database source --to-database destination --publication pgdog +${PGDOG_BIN} --config "${PGDOG_CONFIG}" --users "${PGDOG_USERS}" \ + schema-sync --from-database source --to-database destination --publication pgdog start_pgbench -${PGDOG_BIN} data-sync --from-database source --to-database destination --publication pgdog & +${PGDOG_BIN} --config "${PGDOG_CONFIG}" --users "${PGDOG_USERS}" \ + data-sync --from-database source --to-database destination --publication pgdog & REPL_PID=$! # Give replication a moment to connect. @@ -82,14 +104,18 @@ if [ ${REPL_EXIT} -ne 0 ] && [ ${REPL_EXIT} -ne 130 ] && [ ${REPL_EXIT} -ne 143 fi stop_pgbench -${PGDOG_BIN} schema-sync --from-database source --to-database destination --publication pgdog --cutover +${PGDOG_BIN} --config "${PGDOG_CONFIG}" --users "${PGDOG_USERS}" \ + schema-sync --from-database source --to-database destination --publication pgdog --cutover # # 2 --> 2 # -${PGDOG_BIN} schema-sync --from-database destination --to-database destination2 --publication pgdog -${PGDOG_BIN} data-sync --sync-only --from-database destination --to-database destination2 --publication pgdog --replication-slot copy_data_2 -${PGDOG_BIN} schema-sync --from-database destination --to-database destination2 --publication pgdog --cutover +${PGDOG_BIN} --config "${PGDOG_CONFIG}" --users "${PGDOG_USERS}" \ + schema-sync --from-database destination --to-database destination2 --publication pgdog +${PGDOG_BIN} --config "${PGDOG_CONFIG}" --users "${PGDOG_USERS}" \ + data-sync --sync-only --from-database destination --to-database destination2 --publication pgdog --replication-slot copy_data_2 +${PGDOG_BIN} --config "${PGDOG_CONFIG}" --users "${PGDOG_USERS}" \ + schema-sync --from-database destination --to-database destination2 --publication pgdog --cutover # Check row counts: destination (pgdog1 + pgdog2) vs destination2 (shard_0 + shard_1) echo "Checking row counts: destination -> destination2..." 
@@ -118,6 +144,6 @@ for TABLE in ${OMNI_TABLES}; do echo "OK ${TABLE}: ${SRC} rows on each shard" done -psql -f init.sql +psql -f "${SCRIPT_DIR}/init.sql" popd diff --git a/integration/copy_data/users.toml b/integration/copy_data/data_sync/users.toml similarity index 100% rename from integration/copy_data/users.toml rename to integration/copy_data/data_sync/users.toml diff --git a/integration/copy_data/docker-compose.yml b/integration/copy_data/docker-compose.yml index d7cdaf10a..6975bc019 100644 --- a/integration/copy_data/docker-compose.yml +++ b/integration/copy_data/docker-compose.yml @@ -1,6 +1,6 @@ services: source: - image: postgres:18 + image: postgres:16 command: postgres -c wal_level=logical environment: POSTGRES_USER: pgdog @@ -14,7 +14,7 @@ services: - postgres shard_0: - image: postgres:18 + image: postgres:16 command: postgres -c wal_level=logical environment: POSTGRES_USER: pgdog @@ -26,7 +26,7 @@ services: - postgres shard_1: - image: postgres:18 + image: postgres:16 command: postgres -c wal_level=logical environment: POSTGRES_USER: pgdog diff --git a/integration/copy_data/retry_test/init.sql b/integration/copy_data/retry_test/init.sql new file mode 100644 index 000000000..a5077e964 --- /dev/null +++ b/integration/copy_data/retry_test/init.sql @@ -0,0 +1,6 @@ +-- Source-only reset for the docker-compose retry test. +-- Each database lives in its own container, so \c cannot cross server boundaries. +-- Destination shards are reset by retry_test.sh via separate psql invocations. +DROP SCHEMA IF EXISTS copy_data CASCADE; +SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots; +\i setup.sql diff --git a/integration/copy_data/retry_test/pgdog.toml b/integration/copy_data/retry_test/pgdog.toml new file mode 100644 index 000000000..daa29a71f --- /dev/null +++ b/integration/copy_data/retry_test/pgdog.toml @@ -0,0 +1,32 @@ +[general] +resharding_copy_format = "binary" +resharding_copy_retry_max_attempts = 8 +resharding_copy_retry_min_delay = 500 + +[[databases]] +name = "source" +host = "127.0.0.1" +port = 15432 +database_name = "pgdog" + +[[databases]] +name = "destination" +host = "127.0.0.1" +port = 15433 +database_name = "pgdog1" +shard = 0 + +[[databases]] +name = "destination" +host = "127.0.0.1" +port = 15434 +database_name = "pgdog2" +shard = 1 + +[[sharded_tables]] +database = "destination" +column = "tenant_id" +data_type = "bigint" + +[admin] +password = "pgdog" diff --git a/integration/copy_data/retry_test/run.sh b/integration/copy_data/retry_test/run.sh new file mode 100755 index 000000000..bed0022a5 --- /dev/null +++ b/integration/copy_data/retry_test/run.sh @@ -0,0 +1,164 @@ +#!/bin/bash +# Integration test: data-sync retries when a destination shard connection drops mid-copy. +# +# What is tested: +# - data-sync --sync-only completes (exit 0) when shard_1 is killed mid-copy and +# brought back while the retry loop is running. +# - Row counts on all destination tables match the source after sync completes. +# +# Manages its own docker-compose stack — no pre-started containers required. +set -euo pipefail + +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +COMPOSE_DIR="${SCRIPT_DIR}/.." 
+DEFAULT_BIN="${SCRIPT_DIR}/../../../target/debug/pgdog" +PGDOG_BIN=${PGDOG_BIN:-$DEFAULT_BIN} +PGDOG_CONFIG="${SCRIPT_DIR}/pgdog.toml" +USERS_CONFIG="${SCRIPT_DIR}/users.toml" +export PGPASSWORD=pgdog + +SYNC_PID="" + +cleanup() { + if [ -n "${SYNC_PID}" ]; then + kill "${SYNC_PID}" 2>/dev/null || true + wait "${SYNC_PID}" 2>/dev/null || true + fi + cd "${COMPOSE_DIR}" && docker compose down 2>/dev/null || true +} +trap cleanup EXIT + +pushd "${COMPOSE_DIR}" + +# Start the docker-compose stack and wait for all three postgres instances. +echo "[retry_test] Starting docker-compose stack..." +docker compose up -d + +echo "[retry_test] Waiting for postgres instances to be ready..." +for PORT in 15432 15433 15434; do + READY_ATTEMPTS=0 + until pg_isready -h 127.0.0.1 -p "${PORT}" -q 2>/dev/null; do + READY_ATTEMPTS=$((READY_ATTEMPTS + 1)) + if [ "${READY_ATTEMPTS}" -ge 60 ]; then + echo "[retry_test] FAIL: postgres on port ${PORT} not ready after 30s" + exit 1 + fi + sleep 0.5 + done +done +echo "[retry_test] All postgres instances ready." + +# Reset destination shards then source — each is a separate container. +psql -h 127.0.0.1 -p 15433 -U pgdog pgdog1 -c "DROP SCHEMA IF EXISTS copy_data CASCADE;" +psql -h 127.0.0.1 -p 15434 -U pgdog pgdog2 -c "DROP SCHEMA IF EXISTS copy_data CASCADE;" +psql -h 127.0.0.1 -p 15432 -U pgdog pgdog -f "${SCRIPT_DIR}/init.sql" + +# Schema sync: create tables on destination shards. +"${PGDOG_BIN}" --config "${PGDOG_CONFIG}" --users "${USERS_CONFIG}" \ + schema-sync --from-database source --to-database destination --publication pgdog + +# Start data-sync with all shards up so the connection pool initialises cleanly +# and CopySubscriber can connect to both shards before we inject a failure. +"${PGDOG_BIN}" --config "${PGDOG_CONFIG}" --users "${USERS_CONFIG}" \ + data-sync --sync-only \ + --from-database source \ + --to-database destination \ + --publication pgdog & +SYNC_PID=$! + +# Wait until data-sync has created a temporary replication slot on the source. +# The slot is created after copy_sub.connect() — meaning CopySubscriber already +# holds open connections to both shards. Killing shard_1 now lands a mid-copy +# network error that run_with_retry() must handle, not a pre-connection timeout. +echo "[retry_test] Waiting for data-sync to start copying..." +WAIT_ATTEMPTS=0 +until psql -h 127.0.0.1 -p 15432 -U pgdog pgdog -tAc \ + "SELECT 1 FROM pg_replication_slots WHERE temporary = true LIMIT 1" 2>/dev/null | grep -q 1; do + WAIT_ATTEMPTS=$((WAIT_ATTEMPTS + 1)) + if [ "${WAIT_ATTEMPTS}" -ge 200 ]; then + echo "[retry_test] FAIL: data-sync never created a replication slot after $((WAIT_ATTEMPTS / 20))s" + exit 1 + fi + if ! kill -0 "${SYNC_PID}" 2>/dev/null; then + echo "[retry_test] FAIL: data-sync exited before copy started" + exit 1 + fi + sleep 0.05 +done + +# SIGKILL the container immediately — no grace period — so the kill lands before +# the in-flight COPY can finish. docker compose stop has a 10s grace period. +echo "[retry_test] Killing shard_1 during active copy..." +docker compose kill shard_1 + +# Let the retry loop run a few cycles before bringing shard_1 back. +sleep 2 + +echo "[retry_test] Starting shard_1..." +docker compose start shard_1 + +# Wait for shard_1 postgres to be ready. 
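+# pg_isready succeeds only once postgres accepts connections; the container being
+# up is not enough. Budget here: 120 attempts × 0.5 s = 60 s.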
+READY_ATTEMPTS=0 +until pg_isready -h 127.0.0.1 -p 15434 -U pgdog -d pgdog2 -q; do + READY_ATTEMPTS=$((READY_ATTEMPTS + 1)) + if [ "${READY_ATTEMPTS}" -ge 120 ]; then + echo "[retry_test] FAIL: shard_1 not ready after $((READY_ATTEMPTS / 2))s" + exit 1 + fi + sleep 0.5 +done +echo "[retry_test] shard_1 is ready." + +# Wait for data-sync to complete. +set +e +wait "${SYNC_PID}" +SYNC_EXIT=$? +set -e +SYNC_PID="" + +if [ "${SYNC_EXIT}" -ne 0 ]; then + echo "[retry_test] FAIL: data-sync exited with code ${SYNC_EXIT}" + exit "${SYNC_EXIT}" +fi + +# Verify row counts. +# Sharded tables: sum across both destination shards must equal source. +SHARDED_TABLES="copy_data.users copy_data.orders copy_data.order_items copy_data.log_actions copy_data.with_identity" +# Omni tables: each shard must have the full source row count. +OMNI_TABLES="copy_data.countries copy_data.currencies copy_data.categories" + +FAILED=0 + +for TABLE in ${SHARDED_TABLES}; do + SRC=$(psql -h 127.0.0.1 -p 15432 -U pgdog pgdog -tAc "SELECT COUNT(*) FROM ${TABLE}") + DST0=$(psql -h 127.0.0.1 -p 15433 -U pgdog pgdog1 -tAc "SELECT COUNT(*) FROM ${TABLE}") + DST1=$(psql -h 127.0.0.1 -p 15434 -U pgdog pgdog2 -tAc "SELECT COUNT(*) FROM ${TABLE}") + DST=$((DST0 + DST1)) + if [ "${SRC}" -ne "${DST}" ]; then + echo "[retry_test] MISMATCH ${TABLE}: source=${SRC} total_dest=${DST} (shard0=${DST0} shard1=${DST1})" + FAILED=1 + else + echo "[retry_test] OK ${TABLE}: ${SRC} rows" + fi +done + +for TABLE in ${OMNI_TABLES}; do + SRC=$(psql -h 127.0.0.1 -p 15432 -U pgdog pgdog -tAc "SELECT COUNT(*) FROM ${TABLE}") + DST0=$(psql -h 127.0.0.1 -p 15433 -U pgdog pgdog1 -tAc "SELECT COUNT(*) FROM ${TABLE}") + DST1=$(psql -h 127.0.0.1 -p 15434 -U pgdog pgdog2 -tAc "SELECT COUNT(*) FROM ${TABLE}") + if [ "${SRC}" -ne "${DST0}" ] || [ "${SRC}" -ne "${DST1}" ]; then + echo "[retry_test] MISMATCH ${TABLE} (omni): source=${SRC} shard0=${DST0} shard1=${DST1}" + FAILED=1 + else + echo "[retry_test] OK ${TABLE} (omni): ${SRC} rows on each shard" + fi +done + +if [ "${FAILED}" -ne 0 ]; then + echo "[retry_test] FAIL: row count mismatches detected." + exit 1 +fi + +echo "[retry_test] PASS: all row counts match. Retry test succeeded." +docker compose down +popd diff --git a/integration/copy_data/retry_test/users.toml b/integration/copy_data/retry_test/users.toml new file mode 100644 index 000000000..67142d309 --- /dev/null +++ b/integration/copy_data/retry_test/users.toml @@ -0,0 +1,11 @@ +[[users]] +database = "source" +name = "pgdog" +password = "pgdog" +schema_admin = true + +[[users]] +database = "destination" +name = "pgdog" +password = "pgdog" +schema_admin = true diff --git a/integration/copy_data/run.sh b/integration/copy_data/run.sh new file mode 100644 index 000000000..17d13a608 --- /dev/null +++ b/integration/copy_data/run.sh @@ -0,0 +1,19 @@ +#!/bin/bash +# Runs all copy_data integration tests in sequence. 
+# +# Tests: +# data_sync/run.sh — 0→2 and 2→2 resharding with live write traffic +# (uses local postgres from integration/setup.sh) +# retry_test/run.sh — data-sync retry loop under mid-copy shard failure +# (manages its own docker-compose stack) +set -e + +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) + +echo "=== [copy_data] data_sync ===" +bash "${SCRIPT_DIR}/data_sync/run.sh" + +echo "=== [copy_data] retry_test ===" +bash "${SCRIPT_DIR}/retry_test/run.sh" + +echo "=== [copy_data] all tests passed ===" diff --git a/pgdog-config/src/general.rs b/pgdog-config/src/general.rs index bb3bd6863..bcf9a2e2e 100644 --- a/pgdog-config/src/general.rs +++ b/pgdog-config/src/general.rs @@ -600,6 +600,18 @@ pub struct General { #[serde(default = "General::resharding_parallel_copies")] pub resharding_parallel_copies: usize, + /// Maximum number of retries for a failed table copy during resharding (per-table). + /// Retries use exponential backoff starting at `resharding_copy_retry_min_delay`. + /// _Default:_ `5` + #[serde(default = "General::resharding_copy_retry_max_attempts")] + pub resharding_copy_retry_max_attempts: usize, + + /// Base delay in milliseconds between table copy retries. + /// Each successive attempt doubles the delay, capped at 32×. + /// _Default:_ `1000` + #[serde(default = "General::resharding_copy_retry_min_delay")] + pub resharding_copy_retry_min_delay: u64, + /// Automatically reload the schema cache used by PgDog to route queries upon detecting DDL statements. /// /// **Note:** This setting requires PgDog Enterprise Edition to work as expected. If using the open source edition, it will only work with single-node PgDog deployments, e.g., in local development or CI. @@ -745,6 +757,8 @@ impl Default for General { omnisharded_sticky: bool::default(), resharding_copy_format: CopyFormat::default(), resharding_parallel_copies: Self::resharding_parallel_copies(), + resharding_copy_retry_max_attempts: Self::resharding_copy_retry_max_attempts(), + resharding_copy_retry_min_delay: Self::resharding_copy_retry_min_delay(), reload_schema_on_ddl: Self::reload_schema_on_ddl(), load_schema: Self::load_schema(), cutover_replication_lag_threshold: Self::cutover_replication_lag_threshold(), @@ -966,6 +980,14 @@ impl General { 1 } + fn resharding_copy_retry_max_attempts() -> usize { + 5 + } + + fn resharding_copy_retry_min_delay() -> u64 { + 1000 + } + fn default_shutdown_termination_timeout() -> Option { Self::env_option("PGDOG_SHUTDOWN_TERMINATION_TIMEOUT") } diff --git a/pgdog-config/src/users.rs b/pgdog-config/src/users.rs index a69962f1b..f855d5535 100644 --- a/pgdog-config/src/users.rs +++ b/pgdog-config/src/users.rs @@ -300,7 +300,7 @@ impl User { .passwords .clone() .into_iter() - .map(|p| PasswordKind::Plain(p)) + .map(PasswordKind::Plain) .collect(); if !self.password().is_empty() { passwords.push(PasswordKind::Plain(self.password().to_string())); diff --git a/pgdog/src/backend/error.rs b/pgdog/src/backend/error.rs index 6024079df..e75b8f527 100644 --- a/pgdog/src/backend/error.rs +++ b/pgdog/src/backend/error.rs @@ -163,4 +163,22 @@ impl Error { _ => false, } } + + /// Transient network/pool fault worth retrying. + pub fn is_retryable(&self) -> bool { + match self { + Self::Io(_) => true, + Self::Net(inner) => inner.is_retryable(), + Self::Pool(inner) => inner.is_retryable(), + // Connection dropped between operations. 
+ Self::NotConnected + | Self::DirectToShardNotConnected + | Self::MultiShardNotConnected + | Self::CopyNotConnected + | Self::ClusterNotConnected => true, + // Server stopped responding mid-stream. + Self::ReadTimeout => true, + _ => false, + } + } } diff --git a/pgdog/src/backend/pool/cluster.rs b/pgdog/src/backend/pool/cluster.rs index 26523c44b..b2c117a1e 100644 --- a/pgdog/src/backend/pool/cluster.rs +++ b/pgdog/src/backend/pool/cluster.rs @@ -82,6 +82,8 @@ pub struct Cluster { reload_schema_on_ddl: bool, load_schema: LoadSchema, resharding_parallel_copies: usize, + resharding_copy_retry_max_attempts: usize, + resharding_copy_retry_min_delay: Duration, regex_parser: RegexParser, } @@ -159,6 +161,8 @@ pub struct ClusterConfig<'a> { pub reload_schema_on_ddl: bool, pub load_schema: LoadSchema, pub resharding_parallel_copies: usize, + pub resharding_copy_retry_max_attempts: usize, + pub resharding_copy_retry_min_delay: u64, pub regex_parser_limit: usize, } @@ -213,6 +217,8 @@ impl<'a> ClusterConfig<'a> { reload_schema_on_ddl: general.reload_schema_on_ddl, load_schema: general.load_schema, resharding_parallel_copies: general.resharding_parallel_copies, + resharding_copy_retry_max_attempts: general.resharding_copy_retry_max_attempts, + resharding_copy_retry_min_delay: general.resharding_copy_retry_min_delay, regex_parser_limit: general.regex_parser_limit, } } @@ -251,6 +257,8 @@ impl Cluster { reload_schema_on_ddl, load_schema, resharding_parallel_copies, + resharding_copy_retry_max_attempts, + resharding_copy_retry_min_delay, regex_parser_limit, } = config; @@ -301,6 +309,8 @@ impl Cluster { reload_schema_on_ddl, load_schema, resharding_parallel_copies, + resharding_copy_retry_max_attempts, + resharding_copy_retry_min_delay: Duration::from_millis(resharding_copy_retry_min_delay), regex_parser: RegexParser::new(regex_parser_limit, query_parser), } } @@ -553,6 +563,16 @@ impl Cluster { self.resharding_parallel_copies } + /// Maximum retries for a per-table copy during resharding. + pub fn resharding_copy_retry_max_attempts(&self) -> usize { + self.resharding_copy_retry_max_attempts + } + + /// Base delay between table copy retry attempts. Doubles each attempt, capped at 32×. + pub fn resharding_copy_retry_min_delay(&self) -> &Duration { + &self.resharding_copy_retry_min_delay + } + /// Launch the connection pools. pub(crate) fn launch(&self) { for shard in self.shards() { @@ -791,7 +811,7 @@ mod test { database: "pgdog".into(), }); - let cluster = Cluster { + Cluster { shards: vec![Shard::new(ShardConfig { number: 0, primary: &Some(PoolConfig { @@ -816,9 +836,7 @@ mod test { two_phase_commit: config.config.general.two_phase_commit, two_phase_commit_auto: config.config.general.two_phase_commit_auto.unwrap_or(false), ..Default::default() - }; - - cluster + } } pub fn set_read_write_strategy(&mut self, rw_strategy: ReadWriteStrategy) { diff --git a/pgdog/src/backend/pool/error.rs b/pgdog/src/backend/pool/error.rs index 9d854d6d5..27e32ba8b 100644 --- a/pgdog/src/backend/pool/error.rs +++ b/pgdog/src/backend/pool/error.rs @@ -80,3 +80,63 @@ pub enum Error { #[error("replica lag")] ReplicaLag, } + +impl Error { + /// Transient availability fault worth retrying. + /// + /// Non-retryable: config errors, admin decisions, programming errors. + /// Everything else (timeouts, server faults, lag, health misses) is transient. + pub fn is_retryable(&self) -> bool { + !matches!( + self, + // Config / wiring errors — retrying changes nothing. 
+ Self::NullBytes + | Self::NoShard(_) + | Self::NoDatabases + | Self::PubSubDisabled + | Self::PoolNoHealthTarget(_) + | Self::MappingMissing(_) + // Admin decisions — respect them. + | Self::ManualBan + // Programming errors. + | Self::UntrackedConnCheckin(_) + // Deliberate shutdown. + | Self::FastShutdown + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn retryable() { + assert!(Error::CheckoutTimeout.is_retryable()); + assert!(Error::ConnectTimeout.is_retryable()); + assert!(Error::ReplicaCheckoutTimeout.is_retryable()); + assert!(Error::NoPrimary.is_retryable()); + assert!(Error::AllReplicasDown.is_retryable()); + assert!(Error::Banned.is_retryable()); + assert!(Error::NoReplicas.is_retryable()); + assert!(Error::ServerError.is_retryable()); + assert!(Error::HealthcheckTimeout.is_retryable()); + assert!(Error::HealthcheckError.is_retryable()); + assert!(Error::PrimaryLsnQueryFailed.is_retryable()); + assert!(Error::ReplicaLsnQueryFailed.is_retryable()); + assert!(Error::Offline.is_retryable()); + assert!(Error::ReplicaLag.is_retryable()); + assert!(Error::PoolUnhealthy.is_retryable()); + } + + #[test] + fn not_retryable() { + assert!(!Error::ManualBan.is_retryable()); + assert!(!Error::NullBytes.is_retryable()); + assert!(!Error::NoDatabases.is_retryable()); + assert!(!Error::PubSubDisabled.is_retryable()); + assert!(!Error::FastShutdown.is_retryable()); + assert!(!Error::NoShard(0).is_retryable()); + assert!(!Error::MappingMissing(0).is_retryable()); + } +} diff --git a/pgdog/src/backend/replication/logical/error.rs b/pgdog/src/backend/replication/logical/error.rs index 3f9bbf910..362c73d7f 100644 --- a/pgdog/src/backend/replication/logical/error.rs +++ b/pgdog/src/backend/replication/logical/error.rs @@ -147,3 +147,86 @@ impl From for Error { Self::SchemaSync(Box::new(value)) } } + +impl Error { + /// Whether the table copy should be retried after this error. + pub fn is_retryable(&self) -> bool { + match self { + Self::Net(inner) => inner.is_retryable(), + Self::Pool(inner) => inner.is_retryable(), + Self::Backend(inner) => inner.is_retryable(), + // No connection yet, or primary is down. + Self::NotConnected | Self::NoPrimary => true, + // Replication stalled; temporary slot is gone, next attempt starts fresh. + Self::ReplicationTimeout => true, + // TODO: escape-hatch when using ParallelConnection wrapper + // the underlying error could be anything and to handler it properly + // either the ParallelConnection wrapper should be removed or + // the proper error should be propagated + Self::ParallelConnection => true, + _ => false, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::backend::pool::Error as PE; + use crate::backend::replication::publisher::PublicationTable; + use crate::net::Error as NE; + + #[test] + fn retryable() { + let io = std::io::Error::new(std::io::ErrorKind::ConnectionReset, "reset"); + assert!(Error::Net(NE::Io(io)).is_retryable()); + assert!(Error::Net(NE::UnexpectedEof).is_retryable()); + assert!(Error::Pool(PE::NoPrimary).is_retryable()); + assert!(Error::Pool(PE::CheckoutTimeout).is_retryable()); + assert!(Error::NotConnected.is_retryable()); + assert!(Error::NoPrimary.is_retryable()); + assert!(Error::ReplicationTimeout.is_retryable()); + } + + #[test] + fn retryable_via_backend_wrapper() { + use crate::backend::Error as BE; + + // IO reset wrapped as Backend — the common path for network drops during COPY. 
+        let io = std::io::Error::new(std::io::ErrorKind::ConnectionReset, "reset");
+        assert!(Error::Backend(BE::Io(io)).is_retryable());
+
+        // Read timeout mid-stream.
+        assert!(Error::Backend(BE::ReadTimeout).is_retryable());
+
+        // Pool couldn't hand out a connection.
+        assert!(Error::Backend(BE::Pool(PE::CheckoutTimeout)).is_retryable());
+        assert!(Error::Backend(BE::Pool(PE::NoPrimary)).is_retryable());
+        assert!(Error::Backend(BE::Pool(PE::AllReplicasDown)).is_retryable());
+
+        // Connection variants.
+        assert!(Error::Backend(BE::NotConnected).is_retryable());
+        assert!(Error::Backend(BE::ClusterNotConnected).is_retryable());
+    }
+
+    #[test]
+    fn not_retryable_via_backend_wrapper() {
+        use crate::backend::Error as BE;
+        use crate::net::messages::ErrorResponse;
+
+        // Postgres-level error response: permanent, not a network fault.
+        let pg_err = ErrorResponse::default();
+        assert!(!Error::Backend(BE::ConnectionError(Box::new(pg_err))).is_retryable());
+
+        // Protocol violations are not transient.
+        assert!(!Error::Backend(BE::ProtocolOutOfSync).is_retryable());
+    }
+
+    #[test]
+    fn not_retryable() {
+        assert!(!Error::CopyAborted(PublicationTable::default()).is_retryable());
+        assert!(!Error::DataSyncAborted.is_retryable());
+        assert!(!Error::NoPrimaryKey(PublicationTable::default()).is_retryable());
+        assert!(!Error::NoReplicaIdentity("s".into(), "t".into()).is_retryable());
+    }
+}
diff --git a/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs b/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs
index c0cb3ef6d..3b2047f62 100644
--- a/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs
+++ b/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs
@@ -12,8 +12,9 @@ use tokio::{
         Semaphore,
     },
     task::JoinHandle,
+    time::sleep,
 };
-use tracing::info;
+use tracing::{info, warn};
 
 use super::super::Error;
 use super::AbortSignal;
@@ -40,9 +41,10 @@ impl ParallelSync {
 
         // This won't acquire until we have at least 1 available permit.
         // Permit will be given back when this task completes.
-        let _permit = self
-            .permit
-            .acquire()
+        // acquire_owned() consumes a cloned Arc, returning an OwnedSemaphorePermit with
+        // no lifetime tied to `self`, which allows the subsequent `&mut self` borrow.
+        let _permit = Arc::clone(&self.permit)
+            .acquire_owned()
             .await
             .map_err(|_| Error::ParallelConnection)?;
 
@@ -50,26 +52,75 @@
             return Err(Error::DataSyncAborted);
         }
 
+            self.run_with_retry(&tracker).await
+        })
+    }
+
+    /// Retry loop: retry the table copy on transient failures up to `max_retries`
+    /// times, i.e., at most `max_retries + 1` attempts in total.
+    /// Abort signals and schema errors are not retried.
+    async fn run_with_retry(&mut self, tracker: &TableCopy) -> Result<(), Error> {
+        let max_retries = self.dest.resharding_copy_retry_max_attempts();
+        let base_delay = *self.dest.resharding_copy_retry_min_delay();
+        let mut attempt = 0usize;
+
+        loop {
             let abort = AbortSignal::new(self.tx.clone());
-            let result = match self
+            match self
                 .table
-                .data_sync(&self.addr, &self.dest, abort, &tracker)
+                .data_sync(&self.addr, &self.dest, abort, tracker)
                 .await
             {
-                Ok(_) => Ok(self.table),
-                Err(err) => {
+                Ok(_) => {
+                    self.tx
+                        .send(Ok(self.table.clone()))
+                        .map_err(|_| Error::ParallelConnection)?;
+                    return Ok(());
+                }
+                Err(err) if !err.is_retryable() || attempt >= max_retries => {
                     tracker.error(&err);
+                    // COPY is usually atomic, but rows may remain if the connection dropped
+                    // after COMMIT. Warn so the user can truncate manually before retrying.
+ match self.table.destination_has_rows(&self.dest).await { + Ok(true) => warn!( + "data sync for \"{}\".\"{}\" failed with rows remaining in destination; \ + truncate manually before retrying: TRUNCATE \"{}\".\"{}\";", + self.table.table.schema, + self.table.table.name, + self.table.table.destination_schema(), + self.table.table.destination_name(), + ), + Ok(false) => {} // destination is clean; next run starts fresh + Err(check_err) => warn!( + "could not check destination row count for \"{}\".\"{}\" after failure: {check_err}", + self.table.table.schema, + self.table.table.name, + ), + } return Err(err); } - }; - - self.tx - .send(result) - .map_err(|_| Error::ParallelConnection)?; - - Ok::<(), Error>(()) - }) + Err(err) => { + let backoff = base_delay * 2u32.pow(attempt.min(5) as u32); + attempt += 1; + + warn!( + "data sync for \"{}\".\"{}\" failed (attempt {}/{}): {err}, retrying after {}ms...", + self.table.table.schema, + self.table.table.name, + attempt, + max_retries, + backoff.as_millis(), + ); + + // Reset counters so the next attempt's progress is reported accurately. + tracker.reset(); + + sleep(backoff).await; + // FUTURE: truncate before retry to handle the COPY-committed-but-dropped + // race (rows remain → PK violations). Safe once source-guard checks exist. + } + } + } } } @@ -89,6 +140,11 @@ impl ParallelSyncManager { } Ok(Self { + // TODO: this single shared semaphore cannot enforce per-replica limits — all + // permits could be consumed by tasks that round-robin happened to assign to the + // same replica, leaving others idle. Fix: replace with one Semaphore per replica, + // each sized to `parallel_copies`, and have each ParallelSync acquire from its + // assigned replica's semaphore. permit: Arc::new(Semaphore::new( replicas.len() * dest.resharding_parallel_copies(), )), @@ -106,22 +162,19 @@ impl ParallelSyncManager { self.permit.available_permits() / self.replicas.len(), ); - let mut replicas_iter = self.replicas.iter(); - // Loop through replicas, one at a time. - // This works around Rust iterators not having a "rewind" function. - let replica = loop { - if let Some(replica) = replicas_iter.next() { - break replica; - } else { - replicas_iter = self.replicas.iter(); - } - }; + // cycle() is the idiomatic "rewind": it restarts the iterator from the + // beginning once exhausted, giving round-robin distribution across replicas. + let mut replicas_iter = self.replicas.iter().cycle(); let (tx, mut rx) = unbounded_channel(); let mut tables = vec![]; let mut handles = vec![]; for table in self.tables { + // SAFETY: cycle() on a non-empty slice never returns None. 
+            let replica = replicas_iter
+                .next()
+                .expect("replicas is non-empty; checked in new()");
             handles.push(
                 ParallelSync {
                     table,
diff --git a/pgdog/src/backend/replication/logical/publisher/table.rs b/pgdog/src/backend/replication/logical/publisher/table.rs
index ad3bc1b75..8e0dae637 100644
--- a/pgdog/src/backend/replication/logical/publisher/table.rs
+++ b/pgdog/src/backend/replication/logical/publisher/table.rs
@@ -10,10 +10,12 @@ use crate::backend::pool::Address;
 use crate::backend::replication::publisher::progress::Progress;
 use crate::backend::replication::publisher::Lsn;
+use crate::backend::pool::Request;
 use crate::backend::replication::status::TableCopy;
 use crate::backend::{Cluster, Server, ShardedTables};
 use crate::config::config;
 use crate::frontend::router::parser::Column;
+use crate::net::messages::Protocol;
 use crate::net::replication::StatusUpdate;
 use crate::util::escape_identifier;
 
@@ -285,6 +287,28 @@ impl Table {
 
         Ok(self.lsn)
     }
+
+    /// Returns `true` if the destination table has any rows on any shard.
+    ///
+    /// COPY is transactional and normally auto-rolls back on failure, leaving
+    /// the destination empty. This check catches the rare race where COPY
+    /// committed but an error was returned afterward (e.g., network drop during
+    /// CommandComplete), resulting in rows the retry would collide with.
+    pub async fn destination_has_rows(&self, dest: &Cluster) -> Result<bool, Error> {
+        let sql = format!(
+            "SELECT 1 FROM \"{}\".\"{}\" LIMIT 1",
+            escape_identifier(self.table.destination_schema()),
+            escape_identifier(self.table.destination_name()),
+        );
+        for (shard, _) in dest.shards().iter().enumerate() {
+            let mut server = dest.primary(shard, &Request::default()).await?;
+            let messages = server.execute_checked(sql.as_str()).await?;
+            if messages.iter().any(|m| m.code() == 'D') {
+                return Ok(true);
+            }
+        }
+        Ok(false)
+    }
 }
 
 #[cfg(test)]
diff --git a/pgdog/src/backend/replication/logical/status.rs b/pgdog/src/backend/replication/logical/status.rs
index b4e575031..f9560aee2 100644
--- a/pgdog/src/backend/replication/logical/status.rs
+++ b/pgdog/src/backend/replication/logical/status.rs
@@ -79,6 +79,19 @@ impl TableCopy {
             state.sql = Arc::new(sql.to_owned());
         }
     }
+
+    /// Reset byte and row counters before retrying a failed table copy.
+    /// Keeps counts accumulated by a discarded attempt from inflating totals
+    /// and throughput calculations across retries.
+ pub(crate) fn reset(&self) { + if let Some(mut state) = TableCopies::get().get_mut(self) { + state.bytes = 0; + state.rows = 0; + state.bytes_per_sec = 0; + state.last_update = SystemTime::now(); + data_sync_progress(self, &state); + } + } } impl Drop for TableCopy { diff --git a/pgdog/src/frontend/client/query_engine/two_pc/server_transactions.rs b/pgdog/src/frontend/client/query_engine/two_pc/server_transactions.rs index ca3df408c..8e95be6f0 100644 --- a/pgdog/src/frontend/client/query_engine/two_pc/server_transactions.rs +++ b/pgdog/src/frontend/client/query_engine/two_pc/server_transactions.rs @@ -32,11 +32,11 @@ impl TwoPcTransactions { let mut transactions = vec![]; for record in records { - let transaction = record.get_text(1).and_then(|name| { + let transaction = record.get_text(1).map(|name| { if let Ok(ours) = TwoPcTransaction::from_str(&name) { - Some(TwoPcServerTransaction::Ours(ours)) + TwoPcServerTransaction::Ours(ours) } else { - Some(TwoPcServerTransaction::Other { name }) + TwoPcServerTransaction::Other { name } } }); diff --git a/pgdog/src/frontend/prepared_statements/mod.rs b/pgdog/src/frontend/prepared_statements/mod.rs index 62f8bddea..af96195f2 100644 --- a/pgdog/src/frontend/prepared_statements/mod.rs +++ b/pgdog/src/frontend/prepared_statements/mod.rs @@ -300,7 +300,7 @@ mod test { let global = statements.global.read(); assert_eq!(global.statements().len(), 2); - for (_, stmt) in global.statements() { + for stmt in global.statements().values() { if stmt.name() == first_name { // Old entry: should be decremented to 0 (no longer referenced). assert_eq!(stmt.used, 0, "old entry should be decremented"); diff --git a/pgdog/src/frontend/router/parser/context.rs b/pgdog/src/frontend/router/parser/context.rs index e66702702..712a29c7e 100644 --- a/pgdog/src/frontend/router/parser/context.rs +++ b/pgdog/src/frontend/router/parser/context.rs @@ -93,7 +93,7 @@ impl<'a> QueryParserContext<'a> { pub(super) fn use_parser(&self) -> bool { self.router_context .cluster - .use_query_parser(&self.router_context.client_request) + .use_query_parser(self.router_context.client_request) } /// Get the query we're parsing, if any. diff --git a/pgdog/src/frontend/router/parser/statement.rs b/pgdog/src/frontend/router/parser/statement.rs index 2c0eba9a1..cec05c806 100644 --- a/pgdog/src/frontend/router/parser/statement.rs +++ b/pgdog/src/frontend/router/parser/statement.rs @@ -182,7 +182,7 @@ fn integer_arg(node: &Node, bind: Option<&Bind>) -> Option { fn is_param_ref(node: &Node) -> bool { match node.node.as_ref() { Some(NodeEnum::ParamRef(_)) => true, - Some(NodeEnum::TypeCast(cast)) => cast.arg.as_deref().map_or(false, is_param_ref), + Some(NodeEnum::TypeCast(cast)) => cast.arg.as_deref().is_some_and(is_param_ref), _ => false, } } diff --git a/pgdog/src/net/error.rs b/pgdog/src/net/error.rs index 18288ed12..7d6ce0b39 100644 --- a/pgdog/src/net/error.rs +++ b/pgdog/src/net/error.rs @@ -103,3 +103,34 @@ pub enum Error { #[error("{0}")] TypeError(#[from] pgdog_postgres_types::Error), } + +impl Error { + /// Transient network fault worth retrying. 
+ pub fn is_retryable(&self) -> bool { + matches!( + self, + Self::Io(_) | Self::UnexpectedEof | Self::ConnectionDown + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn retryable() { + let io = std::io::Error::new(std::io::ErrorKind::ConnectionReset, "reset"); + assert!(Error::Io(io).is_retryable()); + assert!(Error::UnexpectedEof.is_retryable()); + assert!(Error::ConnectionDown.is_retryable()); + } + + #[test] + fn not_retryable() { + // TLS and protocol errors are permanent. + assert!(!Error::UnexpectedMessage('Z', 'Q').is_retryable()); + assert!(!Error::NotTextEncoding.is_retryable()); + assert!(!Error::UnexpectedPayload.is_retryable()); + } +}
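Two details of the retry behavior above are worth spelling out. The backoff expression in `run_with_retry` doubles the configured base delay on each retry and clamps the exponent at 5, so the wait never exceeds 32× `resharding_copy_retry_min_delay`. A minimal standalone sketch of that schedule, assuming a 1-second base purely for illustration (the helper name `copy_retry_delay` is hypothetical, not part of the patch):

```rust
use std::time::Duration;

// Mirrors the backoff expression in run_with_retry:
// base_delay * 2u32.pow(attempt.min(5) as u32)
fn copy_retry_delay(base: Duration, attempt: usize) -> Duration {
    base * 2u32.pow(attempt.min(5) as u32)
}

fn main() {
    let base = Duration::from_secs(1); // example value only
    for attempt in 0..8 {
        // Prints 1s, 2s, 4s, 8s, 16s, 32s, 32s, 32s: the delay doubles per
        // retry and is capped at 32x the base once `attempt` reaches 5.
        println!("retry {}: {:?}", attempt + 1, copy_retry_delay(base, attempt));
    }
}
```

Second, because `attempt` is compared against the maximum before it is incremented, a table copy runs at most `resharding_copy_retry_max_attempts + 1` times in total: the initial attempt plus the configured number of retries.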
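The TODO on `ParallelSyncManager`'s shared semaphore describes replacing it with one semaphore per replica, so round-robin assignment cannot concentrate every permit on a single replica. One way to express that fix, sketched under the assumption that each task knows its assigned replica index (the `ReplicaPermits` type and its methods are hypothetical):

```rust
use std::sync::Arc;
use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore};

// Hypothetical per-replica limiter: one semaphore per replica, each sized
// to `parallel_copies`, so a busy replica can't starve the others.
struct ReplicaPermits {
    per_replica: Vec<Arc<Semaphore>>,
}

impl ReplicaPermits {
    fn new(replicas: usize, parallel_copies: usize) -> Self {
        Self {
            per_replica: (0..replicas)
                .map(|_| Arc::new(Semaphore::new(parallel_copies)))
                .collect(),
        }
    }

    // Each ParallelSync task would acquire from its assigned replica's
    // semaphore instead of the shared pool.
    async fn acquire(&self, replica: usize) -> Result<OwnedSemaphorePermit, AcquireError> {
        Arc::clone(&self.per_replica[replica]).acquire_owned().await
    }
}
```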