From 9ecf695db5df8c0e5fd87fadad5ecd5649269203 Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Mon, 6 Apr 2026 15:28:02 +0000 Subject: [PATCH 1/5] fix: ProtocolOutOfSync error --- .github/workflows/ci.yml | 2 + docs/issues/PROTOCOL_OUT_OF_SYNC.md | 339 ++++++++++++++++++ integration/common.sh | 4 +- integration/prepared_statements_full/Gemfile | 8 + .../prepared_statements_full/Gemfile.lock | 276 ++++++++++++++ integration/prepared_statements_full/dev.sh | 12 + .../protocol_out_of_sync_spec.rb | 85 +++++ .../prepared_statements_full/rspec_helper.rb | 106 ++++++ integration/prepared_statements_full/run.sh | 11 + .../prepared_statements_full/users.toml | 16 + integration/ruby/lb_spec.rb | 2 +- integration/ruby/rspec_helper.rb | 68 +++- pgdog/src/backend/protocol/state.rs | 90 +++++ pgdog/src/backend/server.rs | 180 ++++++++++ 14 files changed, 1184 insertions(+), 15 deletions(-) create mode 100644 docs/issues/PROTOCOL_OUT_OF_SYNC.md create mode 100644 integration/prepared_statements_full/Gemfile create mode 100644 integration/prepared_statements_full/Gemfile.lock create mode 100755 integration/prepared_statements_full/dev.sh create mode 100644 integration/prepared_statements_full/protocol_out_of_sync_spec.rb create mode 100644 integration/prepared_statements_full/rspec_helper.rb create mode 100755 integration/prepared_statements_full/run.sh diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 738eb5711..20cd48c99 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -155,6 +155,8 @@ jobs: run: bash integration/js/pg_tests/run.sh - name: Ruby run: bash integration/ruby/run.sh + - name: Prepared statements (full) + run: bash integration/prepared_statements_full/run.sh - name: Java run: bash integration/java/run.sh - name: Mirror diff --git a/docs/issues/PROTOCOL_OUT_OF_SYNC.md b/docs/issues/PROTOCOL_OUT_OF_SYNC.md new file mode 100644 index 000000000..47133c1df --- /dev/null +++ 
b/docs/issues/PROTOCOL_OUT_OF_SYNC.md @@ -0,0 +1,339 @@ +# ProtocolOutOfSync — Known Root Causes + +`Error::ProtocolOutOfSync` fires when `ProtocolState`'s expected-response queue diverges from what +the backend actually sends. The catch site is `server.rs:394`; the connection is permanently marked +`State::Error` and discarded from the pool. + +**Queue mechanics** (`pgdog/src/backend/protocol/state.rs`). `handle()` pushes one `ExecutionItem` +per anticipated response before forwarding any client message. As server bytes arrive, `forward()` +calls `action(code)` which pops the queue front and checks the match. Two conditions raise +`ProtocolOutOfSync`: + +- **Empty queue** (`state.rs:168`) — a tracked message arrives but nothing was expected. +- **Ignore mismatch** (`state.rs:188–191`) — queue front is an `Ignore` slot but the server sent a + different code. + +--- + +## Issue 1 — Failed `Prepare` orphans the EXECUTE ReadyForQuery + +**Severity:** High — triggered by normal server behaviour; no client misbehaviour required. + +**Location:** `pgdog/src/backend/prepared_statements.rs`, `ProtocolMessage::Prepare` arm; +`pgdog/src/backend/protocol/state.rs`, Error handler. + +### Description + +When pgdog injects a `PREPARE` to rewrite a simple-query `EXECUTE` and that `PREPARE` fails on the +server, the Error handler incorrectly clears the queue. The subsequent `ReadyForQuery` from the now- +orphaned `EXECUTE` hits an empty queue and raises `ProtocolOutOfSync`. + +### Code path + +The simple-query rewriter turns `EXECUTE stmt_name(args)` into two prepended messages, each handled +independently by `handle()`. 
After both calls the queue is: + +``` +[Ignore(ExecutionCompleted), Ignore(ReadyForQuery), Code(ReadyForQuery)] + ↑────────── handle(Prepare) ──────────↑ ↑── handle(Query) ──↑ +``` + +If the injected `PREPARE` fails on the server: + +| Step | Server sends | Error handler action | Queue after | +|---|---|---|---| +| 1 | `Error` for PREPARE | `pop_back` → `Code(RFQ)` matches; re-added | `[Code(RFQ)]` | +| 2 | `ReadyForQuery` for PREPARE | pops `Code(RFQ)` normally | **empty** | +| 3 | `Error` for EXECUTE (statement absent) | `pop_back` → None; nothing re-added | **empty** | +| 4 | `ReadyForQuery` for EXECUTE | `pop_front` on empty → **ProtocolOutOfSync** | — | + +The Error handler at `state.rs:154–159` only re-adds a trailing `ReadyForQuery` when it finds +`Code(ReadyForQuery)` at the back. The two `Ignore` items representing the PREPARE sub-request are +invisible to it; once they are cleared the queue no longer knows the EXECUTE is still in-flight. + +Under high concurrency this becomes near-deterministic: the pool fast-path (`Guard::drop` → `checkin` +→ `put`) hands a connection directly to a waiting client with no healthcheck, no idle time, and no +opportunity to drain the kernel socket buffer. The next query on that client consumes the stale EXECUTE +`Error + ReadyForQuery`, producing `ProtocolOutOfSync`. + +### Reproduction + +1. Connect to pgdog with session or transaction pooling. +2. Issue a simple-query `EXECUTE` for a statement that will fail to prepare (schema mismatch, syntax + error, or stale local cache with a duplicate name). +3. Issue any subsequent query on the same connection. +4. The second query fails with `PG::ConnectionBad` / `protocol is out of sync`. + +```sh +cd integration/prepared_statements_full && bash run.sh +``` + +### Tests + +All three tests live in `integration/prepared_statements_full/protocol_out_of_sync_spec.rb`. 
+ +| Test | Pool mode | `got:` | `extended` | What it proves | +|---|---|---|---|---| +| 1 | session | Z | false | Orphaned RFQ hits empty queue on the very next query | +| 2 | transaction | C | false | Stale-chain: DML CommandComplete hits empty queue | +| 3 | session | Z | true | `extended = true` changes Error handler behavior (`out_of_sync`) | + +- **Test 1 — Session mode, `got: Z`, `extended: false`.** + Session-pooled user pinned to one backend. After triggering the failed prepare, `SELECT 1` consumes + the stale EXECUTE error; the orphaned RFQ then hits an empty queue. Client sees `PG::ConnectionBad`. + +- **Test 2 — Transaction mode stale-chain, `got: C`, `extended: false`.** + `pgdog_tx_single` (transaction mode, pool_size=1). Each query consumes the previous query's stale + response, creating a one-slot-shifted chain. A `BEGIN` that consumes a `'T'`-status RFQ keeps the + connection open (`in_transaction = true`); the actual INSERT response then hits an empty queue + (`got: C`). Client sees `PG::ConnectionBad`. + +- **Test 3 — Session mode, `got: Z`, `extended: true`.** + Same as Test 1, but a prior `exec_params` call (`SELECT $1::int`) permanently sets `extended = true` + on the connection. The Error handler then sets `out_of_sync = true` before clearing the queue, + changing connection-lifecycle behaviour. Client sees `PG::ConnectionBad`. + +### Fix + +Fix the Error handler in `state.rs:154–159`. When the failed message is part of a pgdog-injected +compound request, the handler must preserve the `Code(ReadyForQuery)` for the outer client-visible +request — not just the PREPARE's trailing slot. Concretely: the handler needs to recognise that +`Ignore` items at the back of the queue belong to a sub-request that is still in-flight, and must +keep the outer `Code(ReadyForQuery)` accordingly. + +The TCP-peek approach (`FIONREAD` / `MSG_PEEK` at checkin) is a valid defensive catch-all but adds a +syscall on every checkin and does not fix the root cause. 
+ +--- + +## Issue 2 — Double `action()` call in `forward()` for server CopyDone + +**Severity:** Medium — requires the client to omit a trailing `Sync`. + +**Location:** `pgdog/src/backend/prepared_statements.rs`, `forward()`, lines ~198 and ~237. + +### Description + +`forward()` calls `state.action(code)` unconditionally at line 198, then a second time inside the +`'c'` (CopyDone) match arm at line 237. When no `Code(ReadyForQuery)` backstop is present in the +queue, the second call hits an empty queue and raises `ProtocolOutOfSync`. + +### Code path + +Normal path (safe): `Code(ReadyForQuery)` is always in the queue. `action('c')` pushes it back rather +than consuming it, making the double call idempotent. + +Unsafe path — client sends `Parse + Bind + Execute + Flush` (no `Sync`). `handle()` builds: + +``` +[Code(ParseComplete), Code(BindComplete), Code(ExecutionCompleted)] +``` + +No `Code(ReadyForQuery)` is added. When the server responds with CopyDone: + +``` +First action('c'): pops Code(ExecutionCompleted) — consumed +Second action('c'): empty queue → ProtocolOutOfSync +``` + +### Reproduction + +Not triggerable via the `pg` gem or any libpq-based driver — libpq always appends `Sync` after +`Execute`. Requires sending raw protocol messages directly. + +```sh +cargo test -p pgdog test_copydone_double_action_oos_without_sync +``` + +### Tests + +**State-machine unit tests (`state.rs`, no backend needed)** + +- **`test_copydone_double_action_safe_with_rfq_backstop`** — queue `[Code(Copy), Code(ReadyForQuery)]`; + two `action('c')` calls both succeed; RFQ slot is pushed back and survives. +- **`test_copydone_double_action_oos_without_rfq_backstop`** — queue `[Code(ExecutionCompleted)]`; + second `action('c')` returns `Err(ProtocolOutOfSync)`. 
+ +**Server-level tests (`server.rs`, require PostgreSQL)** + +- **`test_copydone_double_action_oos_without_sync`** — `Parse + Bind + Execute + Flush` + (no Sync); reads ParseComplete, BindComplete, CopyOutResponse, CopyData ×2, then asserts + `ProtocolOutOfSync` on CopyDone. +- **`test_copydone_double_action_safe_with_sync`** — same pipeline with `Sync`; full sequence + completes without error; asserts `server.done()`. + +```sh +cargo test -p pgdog test_copydone_double_action +``` + +### Fix + +Remove the second `action()` call in the `'c'` arm of `forward()`, or guarantee that a +`Code(ReadyForQuery)` backstop is always in the queue before the CopyDone path is reached. Either +way, the invariant must be made explicit in code comments. + +--- + +## Issue 3 — Stale ReadyForQuery hits an `Ignore(ParseComplete)` slot + +**Severity:** Low — practically unreachable in normal operation. + +**Location:** `pgdog/src/backend/protocol/state.rs`, Ignore arm. + +### Description + +If a `ReadyForQuery` byte from a prior request cycle remains unread in the TCP receive buffer when +the next request starts, `action('Z')` fires while the queue front is `Ignore(ParseComplete)`. The +Ignore arm requires an exact code match; `ReadyForQuery != ParseComplete` → `ProtocolOutOfSync`. + +### Code path + +pgdog injects a Parse for a missing statement; queue front: + +``` +[Ignore(ParseComplete), Code(BindComplete), ...] +``` + +Stale `ReadyForQuery` arrives before `ParseComplete`: + +``` +action('Z'): generic pop → Ignore(ParseComplete) + → ReadyForQuery != ParseComplete → ProtocolOutOfSync +``` + +### Reproduction + +Not reproducible through normal pool operation. The `done()` guard chain prevents pool reclaim while +any `Ignore` item is present: + +- `ProtocolState::done()` = `is_empty() && !out_of_sync` → `false` while any `Ignore` slot exists. +- `PreparedStatements::done()` adds a second gate blocking reclaim while an injected Parse is in flight. 
+- `Pool::maybe_check_in()` discards errored connections before `can_check_in()` is evaluated. + +The precondition requires a concurrent-access bug that bypasses the pool guard, or direct TCP stream +injection. + +### Tests + +State-machine unit tests in `state.rs` cover the `action()` mismatch directly. A server-level +integration test is not practical; the precondition cannot be reached through normal sequential +protocol flow. + +```sh +cargo test -p pgdog test_stale_rfq +``` + +### Fix + +No code change required. The existing `done()` guard chain already prevents the precondition from +arising. If it were somehow reached, the resulting `ProtocolOutOfSync` would discard the connection +before reuse, bounding the blast radius to a single request. + +--- + +## Issue 4 — `extended` flag is permanently set and never resets + +**Severity:** Low-medium — affects connection-lifecycle semantics and silently changes Error handler +behaviour for all subsequent requests on a connection. + +**Location:** `pgdog/src/backend/protocol/state.rs`, `add()` / `add_ignore()`; `state.rs:151–153`, +Error handler. + +### Description + +`ProtocolState.extended` is set to `true` the first time any parameterised query runs on a connection +and is never reset. The Error handler checks this flag to set `out_of_sync = true`; because the flag +is permanent, every error on that connection — including plain simple-query errors — sets +`out_of_sync = true` spuriously. + +### Code path + +`add()` and `add_ignore()` set the flag whenever `ParseComplete ('1')` or `BindComplete ('2')` is +enqueued: + +```rust +self.extended = self.extended || code.extended(); +``` + +The Error handler (`state.rs:151–153`): + +```rust +ExecutionCode::Error => { + if self.extended { + self.out_of_sync = true; // fires on every error, forever + } + // ... +} +``` + +There is no reset path. 
+ +### Consequences + +- `done()` stays `false` one extra round-trip (until RFQ clears `out_of_sync`) on simple-query + errors for connections that have ever served a parameterised query. Harmless in practice today, but + more conservative than necessary. +- Future changes to the Error handler that add `extended`-specific behaviour will silently apply to + all long-lived connections, not just those currently mid-pipeline. +- `extended` reads as "has this connection *ever* been in extended-protocol mode", not "is this + connection *currently* in extended-protocol mode" — a semantic mismatch that will mislead future + readers. + +### Reproduction + +1. Connect to pgdog. +2. Execute a parameterised query (any `$1` placeholder) — sets `extended = true`. +3. Execute `SELECT 1/0` (simple query). +4. Observe `server.out_of_sync() == true` immediately after the `'E'` response, before RFQ arrives. + Expected: `false`. + +```sh +cargo test -p pgdog test_extended_flag_never_resets +``` + +### Tests + +**`test_extended_flag_never_resets_spurious_out_of_sync`** in `server.rs` (requires PostgreSQL) — +three phases on one connection: + +1. *Baseline* — fresh connection (`extended = false`); `SELECT 1/0`; asserts `out_of_sync == false`. +2. *Trigger* — `Parse + Bind + Execute + Sync` for `SELECT $1::int`; permanently sets `extended = true`. +3. *Regression* — `SELECT 1/0` twice more; asserts `out_of_sync == true` after each `'E'`. Each + `ReadyForQuery` resets `out_of_sync` to `false` but leaves `extended` unchanged. 
+ +```sh +cargo test -p pgdog test_extended_flag_never_resets +``` + +### Fix + +Reset `extended` to `false` at the same point `out_of_sync` resets — when `ReadyForQuery` is +processed and the queue is fully drained: + +```rust +ExecutionCode::ReadyForQuery => { + self.out_of_sync = false; + if self.is_empty() { + self.extended = false; // pipeline complete; reset for next request + } +} +``` + +Resetting only when `is_empty()` is safe: pipelined requests still in the queue keep `extended = true` +until the entire pipeline finishes. + +A post-fix test should verify: (a) phase 3 above now produces `out_of_sync == false`, and (b) an +intermediate `ReadyForQuery` inside a pipelined extended request does not prematurely reset `extended`. + +--- + +## Common thread + +All four issues share the same underlying fragility: the `ProtocolState` queue and the actual server +response stream diverge whenever an error or unexpected message interrupts a multi-message sub-request +injected transparently by pgdog. The Error handler was written for a single client-visible request and +does not account for the compound structures the prepared-statement rewriter produces. + +Issue 4 is a secondary consequence: `extended` was added as a guard for the Error handler but was +attached to the connection rather than the current pipeline, so it outlives the requests it was meant +to describe. diff --git a/integration/common.sh b/integration/common.sh index 812acc69f..d6a474d7d 100644 --- a/integration/common.sh +++ b/integration/common.sh @@ -23,8 +23,8 @@ function run_pgdog() { local config_file="${COMMON_DIR}/pgdog.config" if [ -z "${binary}" ]; then # Testing in release is faster and mirrors production. 
- cargo build --release - binary="target/release/pgdog" + cargo build + binary="target/debug/pgdog" fi if [ -f "${pid_file}" ]; then local existing_pid=$(cat "${pid_file}") diff --git a/integration/prepared_statements_full/Gemfile b/integration/prepared_statements_full/Gemfile new file mode 100644 index 000000000..6f147c82f --- /dev/null +++ b/integration/prepared_statements_full/Gemfile @@ -0,0 +1,8 @@ +# frozen_string_literal: true + +source 'https://rubygems.org' +gem 'pg' +gem 'rails' +gem 'rspec', '~> 3.4' +gem 'rubocop' +gem 'toxiproxy' diff --git a/integration/prepared_statements_full/Gemfile.lock b/integration/prepared_statements_full/Gemfile.lock new file mode 100644 index 000000000..a9e17f4d2 --- /dev/null +++ b/integration/prepared_statements_full/Gemfile.lock @@ -0,0 +1,276 @@ +GEM + remote: https://rubygems.org/ + specs: + action_text-trix (2.1.18) + railties + actioncable (8.1.3) + actionpack (= 8.1.3) + activesupport (= 8.1.3) + nio4r (~> 2.0) + websocket-driver (>= 0.6.1) + zeitwerk (~> 2.6) + actionmailbox (8.1.3) + actionpack (= 8.1.3) + activejob (= 8.1.3) + activerecord (= 8.1.3) + activestorage (= 8.1.3) + activesupport (= 8.1.3) + mail (>= 2.8.0) + actionmailer (8.1.3) + actionpack (= 8.1.3) + actionview (= 8.1.3) + activejob (= 8.1.3) + activesupport (= 8.1.3) + mail (>= 2.8.0) + rails-dom-testing (~> 2.2) + actionpack (8.1.3) + actionview (= 8.1.3) + activesupport (= 8.1.3) + nokogiri (>= 1.8.5) + rack (>= 2.2.4) + rack-session (>= 1.0.1) + rack-test (>= 0.6.3) + rails-dom-testing (~> 2.2) + rails-html-sanitizer (~> 1.6) + useragent (~> 0.16) + actiontext (8.1.3) + action_text-trix (~> 2.1.15) + actionpack (= 8.1.3) + activerecord (= 8.1.3) + activestorage (= 8.1.3) + activesupport (= 8.1.3) + globalid (>= 0.6.0) + nokogiri (>= 1.8.5) + actionview (8.1.3) + activesupport (= 8.1.3) + builder (~> 3.1) + erubi (~> 1.11) + rails-dom-testing (~> 2.2) + rails-html-sanitizer (~> 1.6) + activejob (8.1.3) + activesupport (= 8.1.3) + globalid (>= 
0.3.6) + activemodel (8.1.3) + activesupport (= 8.1.3) + activerecord (8.1.3) + activemodel (= 8.1.3) + activesupport (= 8.1.3) + timeout (>= 0.4.0) + activestorage (8.1.3) + actionpack (= 8.1.3) + activejob (= 8.1.3) + activerecord (= 8.1.3) + activesupport (= 8.1.3) + marcel (~> 1.0) + activesupport (8.1.3) + base64 + bigdecimal + concurrent-ruby (~> 1.0, >= 1.3.1) + connection_pool (>= 2.2.5) + drb + i18n (>= 1.6, < 2) + json + logger (>= 1.4.2) + minitest (>= 5.1) + securerandom (>= 0.3) + tzinfo (~> 2.0, >= 2.0.5) + uri (>= 0.13.1) + ast (2.4.3) + base64 (0.3.0) + bigdecimal (4.1.1) + builder (3.3.0) + concurrent-ruby (1.3.6) + connection_pool (3.0.2) + crass (1.0.6) + date (3.5.1) + diff-lcs (1.6.2) + drb (2.2.3) + erb (6.0.2) + erubi (1.13.1) + globalid (1.3.0) + activesupport (>= 6.1) + i18n (1.14.8) + concurrent-ruby (~> 1.0) + io-console (0.8.2) + irb (1.17.0) + pp (>= 0.6.0) + prism (>= 1.3.0) + rdoc (>= 4.0.0) + reline (>= 0.4.2) + json (2.19.3) + language_server-protocol (3.17.0.5) + lint_roller (1.1.0) + logger (1.7.0) + loofah (2.25.1) + crass (~> 1.0.2) + nokogiri (>= 1.12.0) + mail (2.9.0) + logger + mini_mime (>= 0.1.1) + net-imap + net-pop + net-smtp + marcel (1.1.0) + mini_mime (1.1.5) + minitest (6.0.3) + drb (~> 2.0) + prism (~> 1.5) + net-imap (0.6.3) + date + net-protocol + net-pop (0.1.2) + net-protocol + net-protocol (0.2.2) + timeout + net-smtp (0.5.1) + net-protocol + nio4r (2.7.5) + nokogiri (1.19.2-aarch64-linux-gnu) + racc (~> 1.4) + nokogiri (1.19.2-aarch64-linux-musl) + racc (~> 1.4) + nokogiri (1.19.2-arm-linux-gnu) + racc (~> 1.4) + nokogiri (1.19.2-arm-linux-musl) + racc (~> 1.4) + nokogiri (1.19.2-arm64-darwin) + racc (~> 1.4) + nokogiri (1.19.2-x86_64-darwin) + racc (~> 1.4) + nokogiri (1.19.2-x86_64-linux-gnu) + racc (~> 1.4) + nokogiri (1.19.2-x86_64-linux-musl) + racc (~> 1.4) + parallel (1.28.0) + parser (3.3.11.1) + ast (~> 2.4.1) + racc + pg (1.6.3) + pg (1.6.3-aarch64-linux) + pg (1.6.3-aarch64-linux-musl) + pg 
(1.6.3-arm64-darwin) + pg (1.6.3-x86_64-darwin) + pg (1.6.3-x86_64-linux) + pg (1.6.3-x86_64-linux-musl) + pp (0.6.3) + prettyprint + prettyprint (0.2.0) + prism (1.9.0) + psych (5.3.1) + date + stringio + racc (1.8.1) + rack (3.2.6) + rack-session (2.1.1) + base64 (>= 0.1.0) + rack (>= 3.0.0) + rack-test (2.2.0) + rack (>= 1.3) + rackup (2.3.1) + rack (>= 3) + rails (8.1.3) + actioncable (= 8.1.3) + actionmailbox (= 8.1.3) + actionmailer (= 8.1.3) + actionpack (= 8.1.3) + actiontext (= 8.1.3) + actionview (= 8.1.3) + activejob (= 8.1.3) + activemodel (= 8.1.3) + activerecord (= 8.1.3) + activestorage (= 8.1.3) + activesupport (= 8.1.3) + bundler (>= 1.15.0) + railties (= 8.1.3) + rails-dom-testing (2.3.0) + activesupport (>= 5.0.0) + minitest + nokogiri (>= 1.6) + rails-html-sanitizer (1.7.0) + loofah (~> 2.25) + nokogiri (>= 1.15.7, != 1.16.7, != 1.16.6, != 1.16.5, != 1.16.4, != 1.16.3, != 1.16.2, != 1.16.1, != 1.16.0.rc1, != 1.16.0) + railties (8.1.3) + actionpack (= 8.1.3) + activesupport (= 8.1.3) + irb (~> 1.13) + rackup (>= 1.0.0) + rake (>= 12.2) + thor (~> 1.0, >= 1.2.2) + tsort (>= 0.2) + zeitwerk (~> 2.6) + rainbow (3.1.1) + rake (13.3.1) + rdoc (7.2.0) + erb + psych (>= 4.0.0) + tsort + regexp_parser (2.12.0) + reline (0.6.3) + io-console (~> 0.5) + rspec (3.13.2) + rspec-core (~> 3.13.0) + rspec-expectations (~> 3.13.0) + rspec-mocks (~> 3.13.0) + rspec-core (3.13.6) + rspec-support (~> 3.13.0) + rspec-expectations (3.13.5) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.13.0) + rspec-mocks (3.13.8) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.13.0) + rspec-support (3.13.7) + rubocop (1.86.0) + json (~> 2.3) + language_server-protocol (~> 3.17.0.2) + lint_roller (~> 1.1.0) + parallel (~> 1.10) + parser (>= 3.3.0.2) + rainbow (>= 2.2.2, < 4.0) + regexp_parser (>= 2.9.3, < 3.0) + rubocop-ast (>= 1.49.0, < 2.0) + ruby-progressbar (~> 1.7) + unicode-display_width (>= 2.4.0, < 4.0) + rubocop-ast (1.49.1) + parser (>= 3.3.7.2) + prism (~> 1.7) + 
ruby-progressbar (1.13.0) + securerandom (0.4.1) + stringio (3.2.0) + thor (1.5.0) + timeout (0.6.1) + toxiproxy (2.0.2) + tsort (0.2.0) + tzinfo (2.0.6) + concurrent-ruby (~> 1.0) + unicode-display_width (3.2.0) + unicode-emoji (~> 4.1) + unicode-emoji (4.2.0) + uri (1.1.1) + useragent (0.16.11) + websocket-driver (0.8.0) + base64 + websocket-extensions (>= 0.1.0) + websocket-extensions (0.1.5) + zeitwerk (2.7.5) + +PLATFORMS + aarch64-linux + aarch64-linux-gnu + aarch64-linux-musl + arm-linux-gnu + arm-linux-musl + arm64-darwin + x86_64-darwin + x86_64-linux-gnu + x86_64-linux-musl + +DEPENDENCIES + pg + rails + rspec (~> 3.4) + rubocop + toxiproxy + +BUNDLED WITH + 2.7.2 diff --git a/integration/prepared_statements_full/dev.sh b/integration/prepared_statements_full/dev.sh new file mode 100755 index 000000000..f36274590 --- /dev/null +++ b/integration/prepared_statements_full/dev.sh @@ -0,0 +1,12 @@ +#!/bin/bash +set -e +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) + +pushd ${SCRIPT_DIR} + +export GEM_HOME=~/.gem +mkdir -p ${GEM_HOME} +bundle install +bundle exec rspec *_spec.rb + +popd diff --git a/integration/prepared_statements_full/protocol_out_of_sync_spec.rb b/integration/prepared_statements_full/protocol_out_of_sync_spec.rb new file mode 100644 index 000000000..06706ab6f --- /dev/null +++ b/integration/prepared_statements_full/protocol_out_of_sync_spec.rb @@ -0,0 +1,85 @@ +# frozen_string_literal: true + +require_relative 'rspec_helper' + +# Triggers the failed-prepare/orphaned-EXECUTE bug (Issue 1). +def trigger_prepare_inject_failure(conn, statement_name:) + # 1. PREPARE fails — pgdog keeps the statement in its local cache despite the error. + expect { conn.exec "PREPARE #{statement_name} AS SELECT 1 FROM __pgdog_nonexistent_table__" } + .to raise_error(PG::Error, /__pgdog_nonexistent_table__/) + + # 2. EXECUTE triggers [Prepare, Query] injection; injected PREPARE fails again; stale EXECUTE E+Z left on wire. 
+ expect { conn.exec "EXECUTE #{statement_name}" } + .to raise_error(PG::Error, /__pgdog_nonexistent_table__/) +end + +describe 'protocol out of sync regressions' do + after do + ensure_done + end + + # Session mode: orphaned EXECUTE ReadyForQuery hits empty queue (got: Z, extended: false). + it 'orphaned EXECUTE RFQ hits empty queue in session mode' do + conn = connect_pgdog(user: 'pgdog_session') + begin + # 1. Leave stale EXECUTE E+Z on TCP buffer. + trigger_prepare_inject_failure(conn, statement_name: 'pgdog_prepare_inject_session') + + # 2. Next query: stale E clears queue; stale Z hits empty queue → ProtocolOutOfSync got: Z. + expect { conn.exec 'SELECT 1 AS alive' } + .to raise_error(PG::ConnectionBad, /FATAL:\s*protocol is out of sync/) + ensure + conn.close unless conn.finished? + end + end + + # Transaction mode (pool_size=1): stale-chain — 'T'-status RFQ keeps server alive; INSERT hits empty queue (got: C). + it 'stale-chain in transaction mode produces ProtocolOutOfSync got: C' do + conn = connect_pgdog(user: 'pgdog_tx_single') + begin + # 1. Leave stale EXECUTE E+'I'-Z on TCP buffer. + trigger_prepare_inject_failure(conn, statement_name: 'pgdog_prepare_inject_tx') + + tmp = "#{Process.pid}_#{rand(1_000_000)}" + + # 2. CREATE TABLE — consumes stale E+'I'-Z; client sees InvalidSqlStatementName. + write_sql = "CREATE TEMP TABLE pgdog_prepare_inject_#{tmp} (id int)" + expect { conn.exec write_sql } + .to raise_error(PG::InvalidSqlStatementName, /prepared statement ".*" does not exist/) + + # 3. INSERT (1) — consumes stale C+'I'-Z; client sees stale CREATE TABLE result. + conn.exec "INSERT INTO pgdog_prepare_inject_#{tmp} (id) VALUES (1)" + + # 4. BEGIN — consumes stale C+'I'-Z; actual C+'T'-Z lands in pool. + conn.exec 'BEGIN' + + # 5. INSERT (2) — consumes stale C+'T'-Z; 'T' sets in_transaction=true → actual INSERT hits empty queue. 
+ expect do + conn.exec "INSERT INTO pgdog_prepare_inject_#{tmp} (id) VALUES (2)" + sleep 0.05 # let event loop process actual INSERT before Ruby sends END + conn.exec 'END' + end.to raise_error(PG::ConnectionBad, /FATAL:\s*protocol is out of sync/) + ensure + conn.close unless conn.finished? + end + end + + # Session mode with prior exec_params: extended=true set permanently; same got: Z result. + it 'orphaned EXECUTE RFQ hits empty queue after extended query in session mode' do + conn = connect_pgdog(user: 'pgdog_session') + begin + # 1. exec_params permanently sets extended=true on this connection. + result = conn.exec_params('SELECT $1::int AS primer', [42]) + expect(result.first['primer']).to eq('42') + + # 2. Leave stale EXECUTE E+Z on TCP buffer. + trigger_prepare_inject_failure(conn, statement_name: 'pgdog_prepare_inject_ext') + + # 3. Next query: stale E clears queue; stale Z hits empty queue → ProtocolOutOfSync got: Z. + expect { conn.exec 'SELECT 1 AS alive' } + .to raise_error(PG::ConnectionBad, /FATAL:\s*protocol is out of sync/) + ensure + conn.close unless conn.finished? + end + end +end diff --git a/integration/prepared_statements_full/rspec_helper.rb b/integration/prepared_statements_full/rspec_helper.rb new file mode 100644 index 000000000..782cd1146 --- /dev/null +++ b/integration/prepared_statements_full/rspec_helper.rb @@ -0,0 +1,106 @@ +# frozen_string_literal: true + +require 'active_record' +require 'rspec' +require 'pg' +require 'toxiproxy' + +def admin + PG.connect('postgres://admin:pgdog@127.0.0.1:6432/admin') +end + +def admin_exec(sql) + conn = admin + conn.exec sql +ensure + conn&.close +end + +def failover + PG.connect('postgres://pgdog:pgdog@127.0.0.1:6432/failover') +end + +def admin_stats(database, column = nil) + conn = admin + stats = conn.exec 'SHOW STATS' + conn.close + stats = stats.select { |item| item['database'] == database } + return stats.map { |item| item[column].to_i } unless column.nil? 
+ + stats +end + +def ensure_done + deadline = Time.now + 2 + pools = [] + clients = [] + servers = [] + pg_clients = [] + current_client_id = nil + + loop do + conn = PG.connect(dbname: 'admin', user: 'admin', password: 'pgdog', port: 6432, host: '127.0.0.1') + begin + pools = conn.exec 'SHOW POOLS' + current_client_id = conn.backend_pid + clients = conn.exec 'SHOW CLIENTS' + servers = conn.exec 'SHOW SERVERS' + ensure + conn.close + end + + pg_conn = PG.connect(dbname: 'pgdog', user: 'pgdog', password: 'pgdog', port: 5432, host: '127.0.0.1') + begin + pg_clients = pg_conn.exec 'SELECT state FROM pg_stat_activity'\ + " WHERE datname IN ('pgdog', 'shard_0', 'shard_1')"\ + " AND backend_type = 'client backend'"\ + " AND query NOT LIKE '%pg_stat_activity%'" + ensure + pg_conn.close + end + + pools_ready = pools.all? do |pool| + pool['sv_active'] == '0' && pool['cl_waiting'] == '0' && pool['out_of_sync'] == '0' + end + clients_ready = clients.all? do |client| + client['id'].to_i == current_client_id || client['state'] == 'idle' + end + servers_ready = servers + .select { |server| server['application_name'] != 'PgDog Pub/Sub Listener' } + .all? { |server| server['state'] == 'idle' } + pg_clients_ready = pg_clients.all? 
{ |client| client['state'] == 'idle' } + + break if pools_ready && clients_ready && servers_ready && pg_clients_ready + break if Time.now >= deadline + + sleep 0.05 + end + + pools.each do |pool| + expect(pool['sv_active']).to eq('0') + expect(pool['cl_waiting']).to eq('0') + expect(pool['out_of_sync']).to eq('0') + end + + clients.each do |client| + next if client['id'].to_i == current_client_id + expect(client['state']).to eq('idle') + end + + servers + .select do |server| + server['application_name'] != 'PgDog Pub/Sub Listener' + end + .each do |server| + expect(server['state']).to eq('idle') + end + + pg_clients.each do |client| + expect(client['state']).to eq('idle') + end +end + + +def connect_pgdog(user: 'pgdog') + PG.connect(dbname: 'pgdog', user:, password: 'pgdog', port: 6432, host: '127.0.0.1') +end \ No newline at end of file diff --git a/integration/prepared_statements_full/run.sh b/integration/prepared_statements_full/run.sh new file mode 100755 index 000000000..7efdf24d1 --- /dev/null +++ b/integration/prepared_statements_full/run.sh @@ -0,0 +1,11 @@ +#!/bin/bash +set -e +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +source ${SCRIPT_DIR}/../common.sh + +run_pgdog "integration/prepared_statements_full" +wait_for_pgdog + +bash ${SCRIPT_DIR}/dev.sh + +stop_pgdog diff --git a/integration/prepared_statements_full/users.toml b/integration/prepared_statements_full/users.toml index 9a8205f04..a97596a57 100644 --- a/integration/prepared_statements_full/users.toml +++ b/integration/prepared_statements_full/users.toml @@ -2,3 +2,19 @@ database = "pgdog" name = "pgdog" password = "pgdog" + + +[[users]] +name = "pgdog_session" +database = "pgdog" +password = "pgdog" +server_user = "pgdog" +pooler_mode = "session" + +[[users]] +name = "pgdog_tx_single" +database = "pgdog" +password = "pgdog" +server_user = "pgdog" +pooler_mode = "transaction" +pool_size = 1 \ No newline at end of file diff --git a/integration/ruby/lb_spec.rb 
b/integration/ruby/lb_spec.rb index a3a6c3c39..061d614eb 100644 --- a/integration/ruby/lb_spec.rb +++ b/integration/ruby/lb_spec.rb @@ -13,7 +13,7 @@ it 'distributes traffic evenly' do conn = failover # Reset stats and bans - admin.exec "RECONNECT" + admin_exec 'RECONNECT' before = admin_stats('failover') 250.times do diff --git a/integration/ruby/rspec_helper.rb b/integration/ruby/rspec_helper.rb index 5eb8317ae..ac55b4acf 100644 --- a/integration/ruby/rspec_helper.rb +++ b/integration/ruby/rspec_helper.rb @@ -9,6 +9,13 @@ def admin PG.connect('postgres://admin:pgdog@127.0.0.1:6432/admin') end +def admin_exec(sql) + conn = admin + conn.exec sql +ensure + conn&.close +end + def failover PG.connect('postgres://pgdog:pgdog@127.0.0.1:6432/failover') end @@ -24,20 +31,62 @@ def admin_stats(database, column = nil) end def ensure_done - conn = PG.connect(dbname: 'admin', user: 'admin', password: 'pgdog', port: 6432, host: '127.0.0.1') - pools = conn.exec 'SHOW POOLS' + deadline = Time.now + 2 + pools = [] + clients = [] + servers = [] + pg_clients = [] + current_client_id = nil + + loop do + conn = PG.connect(dbname: 'admin', user: 'admin', password: 'pgdog', port: 6432, host: '127.0.0.1') + begin + pools = conn.exec 'SHOW POOLS' + current_client_id = conn.backend_pid + clients = conn.exec 'SHOW CLIENTS' + servers = conn.exec 'SHOW SERVERS' + ensure + conn.close + end + + pg_conn = PG.connect(dbname: 'pgdog', user: 'pgdog', password: 'pgdog', port: 5432, host: '127.0.0.1') + begin + pg_clients = pg_conn.exec 'SELECT state FROM pg_stat_activity'\ + " WHERE datname IN ('pgdog', 'shard_0', 'shard_1')"\ + " AND backend_type = 'client backend'"\ + " AND query NOT LIKE '%pg_stat_activity%'" + ensure + pg_conn.close + end + + pools_ready = pools.all? do |pool| + pool['sv_active'] == '0' && pool['cl_waiting'] == '0' && pool['out_of_sync'] == '0' + end + clients_ready = clients.all? 
do |client| + client['id'].to_i == current_client_id || client['state'] == 'idle' + end + servers_ready = servers + .select { |server| server['application_name'] != 'PgDog Pub/Sub Listener' } + .all? { |server| server['state'] == 'idle' } + pg_clients_ready = pg_clients.all? { |client| client['state'] == 'idle' } + + break if pools_ready && clients_ready && servers_ready && pg_clients_ready + break if Time.now >= deadline + + sleep 0.05 + end + pools.each do |pool| expect(pool['sv_active']).to eq('0') expect(pool['cl_waiting']).to eq('0') expect(pool['out_of_sync']).to eq('0') end - current_client_id = conn.backend_pid - clients = conn.exec 'SHOW CLIENTS' + clients.each do |client| next if client['id'].to_i == current_client_id expect(client['state']).to eq('idle') end - servers = conn.exec 'SHOW SERVERS' + servers .select do |server| server['application_name'] != 'PgDog Pub/Sub Listener' @@ -46,12 +95,7 @@ def ensure_done expect(server['state']).to eq('idle') end - conn = PG.connect(dbname: 'pgdog', user: 'pgdog', password: 'pgdog', port: 5432, host: '127.0.0.1') - clients = conn.exec 'SELECT state FROM pg_stat_activity'\ - " WHERE datname IN ('pgdog', 'shard_0', 'shard_1')"\ - " AND backend_type = 'client backend'"\ - " AND query NOT LIKE '%pg_stat_activity%'" - clients.each do |client| + pg_clients.each do |client| expect(client['state']).to eq('idle') end -end +end \ No newline at end of file diff --git a/pgdog/src/backend/protocol/state.rs b/pgdog/src/backend/protocol/state.rs index ca86915d2..5e78049dd 100644 --- a/pgdog/src/backend/protocol/state.rs +++ b/pgdog/src/backend/protocol/state.rs @@ -847,4 +847,94 @@ mod test { assert_eq!(state.action('Z').unwrap(), Action::Forward); assert!(state.is_empty()); } + + // Double action('c') for server CopyDone + + // Safe path: Code(ReadyForQuery) backstop makes the double action('c') call idempotent. 
+ #[test] + fn test_copydone_double_action_safe_with_rfq_backstop() { + let mut state = ProtocolState::default(); + // 1. Queue: CopyOut slot + RFQ backstop (from Sync). + state.add('G'); // CopyOut + state.add('Z'); // ReadyForQuery backstop + + // 2. First action('c'): pops CopyOut; RFQ backstop untouched. + assert_eq!(state.action('c').unwrap(), Action::Forward); + assert_eq!(state.len(), 1); + + // 3. Second action('c'): sees RFQ at front; pushes it back (idempotent). + assert_eq!(state.action('c').unwrap(), Action::Forward); + assert_eq!(state.len(), 1); // RFQ still present for the server's ReadyForQuery + } + + // Failure path: no Code(ReadyForQuery) backstop — second action('c') hits empty queue. + #[test] + fn test_copydone_double_action_oos_without_rfq_backstop() { + let mut state = ProtocolState::default(); + // 1. Queue: Execute + Flush (no Sync) — no RFQ backstop. + state.add('C'); // ExecutionCompleted + + // 2. First action('c'): pops ExecutionCompleted; queue empty. + assert_eq!(state.action('c').unwrap(), Action::Forward); + assert!(state.is_empty()); + + // 3. Second action('c'): empty queue → ProtocolOutOfSync. + assert!(state.action('c').is_err()); + } + + // Stale RFQ arrives before injected ParseComplete — Ignore arm rejects the mismatch. + #[test] + fn test_stale_rfq_hits_ignore_parsecomplete() { + let mut state = ProtocolState::default(); + // 1. pgdog injects Parse; queue: [Ignore(ParseComplete), BindComplete, CommandComplete, RFQ]. + state.add_ignore('1'); // ParseComplete — injected + state.add('2'); // BindComplete + state.add('C'); // CommandComplete + state.add('Z'); // ReadyForQuery + + // Stale RFQ from prior cycle arrives before ParseComplete. + // ReadyForQuery != ParseComplete → ProtocolOutOfSync. + assert!( + state.action('Z').is_err(), + "stale RFQ against Ignore(ParseComplete) must produce ProtocolOutOfSync" + ); + } + + // Variant: stale RFQ hits Ignore(BindComplete) — same mismatch for any Ignore slot. 
+ #[test] + fn test_stale_rfq_hits_ignore_bindcomplete() { + let mut state = ProtocolState::default(); + // Both Parse and Bind are injected (Describe path). + state.add_ignore('1'); // ParseComplete — injected + state.add_ignore('2'); // BindComplete — injected + state.add('T'); // RowDescription + state.add('C'); // CommandComplete + state.add('Z'); // ReadyForQuery + + // ParseComplete arrives normally and is swallowed. + assert_eq!(state.action('1').unwrap(), Action::Ignore); + + // Queue front is now Ignore(BindComplete). + // A stale RFQ arrives before BindComplete → ProtocolOutOfSync. + assert!( + state.action('Z').is_err(), + "stale RFQ against Ignore(BindComplete) must produce ProtocolOutOfSync" + ); + } + + // Happy path: injected ParseComplete arrives in order — silently ignored, rest forwarded. + #[test] + fn test_injected_parse_happy_path() { + let mut state = ProtocolState::default(); + state.add_ignore('1'); // ParseComplete — injected, swallowed + state.add('2'); // BindComplete + state.add('C'); // CommandComplete + state.add('Z'); // ReadyForQuery + + assert_eq!(state.action('1').unwrap(), Action::Ignore); // swallowed + assert_eq!(state.action('2').unwrap(), Action::Forward); // forwarded + assert_eq!(state.action('C').unwrap(), Action::Forward); // forwarded + assert_eq!(state.action('Z').unwrap(), Action::Forward); // forwarded + assert!(state.is_empty()); + } } diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index ee5e03d01..d1bf006b1 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -3430,4 +3430,184 @@ pub mod test { assert!(server.force_close()); assert_eq!(server.stats().get_state(), State::ForceClose); } + + // Failed injected PREPARE leaves EXECUTE ReadyForQuery unmatched — Error handler empties the queue. + #[tokio::test] + async fn test_prepare_execute_inject_failure_orphans_execute_rfq() { + let mut server = test_server().await; + + // 1. 
Send [Prepare, Query] as the rewriter injects for EXECUTE. + server + .send( + &vec![ + ProtocolMessage::Prepare { + name: "__pgdog_prepare_inject_test".to_string(), + statement: "SELECT 1 FROM __pgdog_nonexistent_table__".to_string(), + }, + ProtocolMessage::Query(Query::new("EXECUTE __pgdog_prepare_inject_test()")), + ] + .into(), + ) + .await + .unwrap(); + + // 2. PREPARE 'E' forwarded; 'Z' consumes re-added Code(RFQ) — queue empty. + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'E'); // 'E' PREPARE error + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'Z'); // 'Z' PREPARE RFQ — queue now empty + + // 3. EXECUTE 'E' forwarded (no-op on empty queue). + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'E'); // 'E' EXECUTE error + + // 4. BUG: EXECUTE 'Z' hits empty queue → ProtocolOutOfSync (fix: assert 'Z' + done()). + let err = server.read().await.unwrap_err(); + assert!( + matches!(err, Error::ProtocolOutOfSync), + "expected ProtocolOutOfSync; got {:?}", + err, + ); + } + + // Extended Execute + Flush (no Sync): no RFQ backstop — double action('c') raises ProtocolOutOfSync. + #[tokio::test] + async fn test_copydone_double_action_oos_without_sync() { + let mut server = test_server().await; + + // 1. Parse + Bind + Execute + Flush (not Sync); no RFQ backstop in queue. + server + .send( + &vec![ + ProtocolMessage::Parse(Parse::new_anonymous("COPY (VALUES (1),(2)) TO STDOUT")), + ProtocolMessage::Bind(Bind::new_params("", &[])), + ProtocolMessage::Execute(Execute::new()), + // Flush (not Sync): prompts PostgreSQL to send buffered responses. + // handle() maps this to Other, adding nothing to the queue. + Flush.into(), + ] + .into(), + ) + .await + .unwrap(); + + // 2. ParseComplete, BindComplete, CopyOutResponse, CopyData x2 arrive normally. 
+ assert_eq!(server.read().await.unwrap().code(), '1'); // ParseComplete + assert_eq!(server.read().await.unwrap().code(), '2'); // BindComplete + assert_eq!(server.read().await.unwrap().code(), 'H'); // CopyOutResponse + assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 1 + assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 2 + // 3. BUG: CopyDone — first action() pops ExecutionCompleted; second hits empty queue. + assert!( + matches!(server.read().await.unwrap_err(), Error::ProtocolOutOfSync), + "expected ProtocolOutOfSync" + ); + } + + // Safe path: Sync adds Code(RFQ) backstop — double action('c') is idempotent. + #[tokio::test] + async fn test_copydone_double_action_safe_with_sync() { + let mut server = test_server().await; + + // 1. Parse + Bind + Execute + Sync; RFQ backstop added to queue. + server + .send( + &vec![ + ProtocolMessage::Parse(Parse::new_anonymous("COPY (VALUES (1),(2)) TO STDOUT")), + ProtocolMessage::Bind(Bind::new_params("", &[])), + ProtocolMessage::Execute(Execute::new()), + ProtocolMessage::Sync(Sync), + ] + .into(), + ) + .await + .unwrap(); + + // 2. Full response sequence — CopyDone is safe with RFQ backstop. 
+ assert_eq!(server.read().await.unwrap().code(), '1'); // ParseComplete + assert_eq!(server.read().await.unwrap().code(), '2'); // BindComplete + assert_eq!(server.read().await.unwrap().code(), 'H'); // CopyOutResponse + assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 1 + assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 2 + assert_eq!(server.read().await.unwrap().code(), 'c'); // CopyDone -- safe with RFQ backstop + assert_eq!(server.read().await.unwrap().code(), 'C'); // CommandComplete + assert_eq!(server.read().await.unwrap().code(), 'Z'); // ReadyForQuery + assert!( + server.done(), + "server must be done after full response sequence" + ); + } + + // extended=true sticks after any parameterised query; Error handler sets out_of_sync on every subsequent error. + #[tokio::test] + async fn test_extended_flag_never_resets_spurious_out_of_sync() { + use crate::net::bind::Parameter; + + let mut server = test_server().await; + + // 1. Baseline: extended=false; simple-query error must not set out_of_sync. + server + .send(&vec![Query::new("SELECT 1/0").into()].into()) + .await + .unwrap(); + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'E'); + assert!(!server.out_of_sync()); + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'Z'); + assert!(server.done()); + + // 2. Parameterised query sets extended=true permanently. + let bind = Bind::new_params_codes( + "", + &[Parameter { + len: 1, + data: "1".as_bytes().into(), + }], + &[Format::Text], + ); + server + .send( + &vec![ + ProtocolMessage::from(Parse::new_anonymous("SELECT $1::int")), + ProtocolMessage::from(bind), + ProtocolMessage::from(Execute::new()), + ProtocolMessage::from(Sync), + ] + .into(), + ) + .await + .unwrap(); + + for c in ['1', '2', 'D', 'C', 'Z'] { + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), c); + } + assert!(server.done()); + + // 3. 
Same error on same connection: extended stuck → out_of_sync=true spuriously. + server + .send(&vec![Query::new("SELECT 1/0").into()].into()) + .await + .unwrap(); + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'E'); + assert!(server.out_of_sync()); // spurious: extended=true even for plain simple query + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'Z'); // RFQ clears out_of_sync but leaves extended stuck + assert!(!server.out_of_sync()); + assert!(server.done()); + + // 4. Confirm extended remains stuck across RFQ resets. + server + .send(&vec![Query::new("SELECT 1/0").into()].into()) + .await + .unwrap(); + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'E'); + assert!(server.out_of_sync()); + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'Z'); + assert!(server.done()); + } } From 379b8ddc9356ac23e8a2979520ac81a8e8c93d34 Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Mon, 6 Apr 2026 15:43:24 +0000 Subject: [PATCH 2/5] test flip --- .../protocol_out_of_sync_spec.rb | 48 ++++++++++--------- pgdog/src/backend/protocol/state.rs | 5 -- pgdog/src/backend/server.rs | 37 +------------- 3 files changed, 27 insertions(+), 63 deletions(-) diff --git a/integration/prepared_statements_full/protocol_out_of_sync_spec.rb b/integration/prepared_statements_full/protocol_out_of_sync_spec.rb index 06706ab6f..e7664800c 100644 --- a/integration/prepared_statements_full/protocol_out_of_sync_spec.rb +++ b/integration/prepared_statements_full/protocol_out_of_sync_spec.rb @@ -8,7 +8,8 @@ def trigger_prepare_inject_failure(conn, statement_name:) expect { conn.exec "PREPARE #{statement_name} AS SELECT 1 FROM __pgdog_nonexistent_table__" } .to raise_error(PG::Error, /__pgdog_nonexistent_table__/) - # 2. EXECUTE triggers [Prepare, Query] injection; injected PREPARE fails again; stale EXECUTE E+Z left on wire. + # 2. 
EXECUTE triggers [Prepare, Query] injection; injected PREPARE fails again. + # After fix: pgdog consumes the orphaned EXECUTE E+Z internally; nothing stale on wire. expect { conn.exec "EXECUTE #{statement_name}" } .to raise_error(PG::Error, /__pgdog_nonexistent_table__/) end @@ -18,16 +19,18 @@ def trigger_prepare_inject_failure(conn, statement_name:) ensure_done end - # Session mode: orphaned EXECUTE ReadyForQuery hits empty queue (got: Z, extended: false). + # Issue 1 — Session mode: orphaned EXECUTE ReadyForQuery must not leak to the next query. + # Bug: stale E+Z left on wire; next query consumed stale E, orphaned Z hit empty queue → ConnectionBad. + # Fix: Error handler must preserve Code(ReadyForQuery) for the outer EXECUTE when an injected + # PREPARE fails; no stale bytes reach the client. it 'orphaned EXECUTE RFQ hits empty queue in session mode' do conn = connect_pgdog(user: 'pgdog_session') begin - # 1. Leave stale EXECUTE E+Z on TCP buffer. trigger_prepare_inject_failure(conn, statement_name: 'pgdog_prepare_inject_session') - # 2. Next query: stale E clears queue; stale Z hits empty queue → ProtocolOutOfSync got: Z. - expect { conn.exec 'SELECT 1 AS alive' } - .to raise_error(PG::ConnectionBad, /FATAL:\s*protocol is out of sync/) + # After fix: no stale messages on wire; next query must succeed without ConnectionBad. + result = conn.exec 'SELECT 1 AS alive' + expect(result.first['alive']).to eq('1') ensure conn.close unless conn.finished? end @@ -42,42 +45,41 @@ def trigger_prepare_inject_failure(conn, statement_name:) tmp = "#{Process.pid}_#{rand(1_000_000)}" - # 2. CREATE TABLE — consumes stale E+'I'-Z; client sees InvalidSqlStatementName. + # 2. CREATE TABLE — must succeed; no stale E+'I'-Z in buffer after fix. write_sql = "CREATE TEMP TABLE pgdog_prepare_inject_#{tmp} (id int)" - expect { conn.exec write_sql } - .to raise_error(PG::InvalidSqlStatementName, /prepared statement ".*" does not exist/) + conn.exec write_sql - # 3. 
INSERT (1) — consumes stale C+'I'-Z; client sees stale CREATE TABLE result. + # 3. INSERT (1) — must succeed with its own real response. conn.exec "INSERT INTO pgdog_prepare_inject_#{tmp} (id) VALUES (1)" - # 4. BEGIN — consumes stale C+'I'-Z; actual C+'T'-Z lands in pool. + # 4. BEGIN — must succeed; real C+'T'-Z consumed by this query. conn.exec 'BEGIN' - # 5. INSERT (2) — consumes stale C+'T'-Z; 'T' sets in_transaction=true → actual INSERT hits empty queue. - expect do - conn.exec "INSERT INTO pgdog_prepare_inject_#{tmp} (id) VALUES (2)" - sleep 0.05 # let event loop process actual INSERT before Ruby sends END - conn.exec 'END' - end.to raise_error(PG::ConnectionBad, /FATAL:\s*protocol is out of sync/) + # 5. INSERT (2) + END — must succeed; no stale 'T'-Z in pool to shift the chain. + conn.exec "INSERT INTO pgdog_prepare_inject_#{tmp} (id) VALUES (2)" + sleep 0.05 # let event loop process actual INSERT before Ruby sends END + conn.exec 'END' ensure conn.close unless conn.finished? end end - # Session mode with prior exec_params: extended=true set permanently; same got: Z result. + # Issue 1 — Session mode with prior exec_params: extended=true set permanently. + # Bug: same as Test 1; extended=true additionally sets out_of_sync=true in the Error handler, + # changing connection-lifecycle behaviour. Either way, the next query must not fail. + # Fix: same root fix; extended flag behaviour (Issue 4) is a separate concern. it 'orphaned EXECUTE RFQ hits empty queue after extended query in session mode' do conn = connect_pgdog(user: 'pgdog_session') begin - # 1. exec_params permanently sets extended=true on this connection. + # Parameterised query runs first — sets extended=true on the connection. result = conn.exec_params('SELECT $1::int AS primer', [42]) expect(result.first['primer']).to eq('42') - # 2. Leave stale EXECUTE E+Z on TCP buffer. trigger_prepare_inject_failure(conn, statement_name: 'pgdog_prepare_inject_ext') - # 3. 
Next query: stale E clears queue; stale Z hits empty queue → ProtocolOutOfSync got: Z. - expect { conn.exec 'SELECT 1 AS alive' } - .to raise_error(PG::ConnectionBad, /FATAL:\s*protocol is out of sync/) + # After fix: stale E+Z handled internally even with extended=true; next query succeeds. + result = conn.exec 'SELECT 1 AS alive' + expect(result.first['alive']).to eq('1') ensure conn.close unless conn.finished? end diff --git a/pgdog/src/backend/protocol/state.rs b/pgdog/src/backend/protocol/state.rs index 5e78049dd..fe19d9bd0 100644 --- a/pgdog/src/backend/protocol/state.rs +++ b/pgdog/src/backend/protocol/state.rs @@ -208,11 +208,6 @@ impl ProtocolState { &self.queue } - #[cfg(test)] - pub(crate) fn queue_mut(&mut self) -> &mut VecDeque { - &mut self.queue - } - pub(crate) fn done(&self) -> bool { self.is_empty() && !self.out_of_sync } diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index d1bf006b1..bef604bb8 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -1038,7 +1038,6 @@ impl Drop for Server { } } -// Used for testing. 
#[cfg(test)] pub mod test { use bytes::{BufMut, BytesMut}; @@ -1392,7 +1391,7 @@ pub mod test { let (new, name) = global.write().insert(&parse); assert!(new); let parse = parse.rename(&name); - assert_eq!(parse.name(), "__pgdog_1"); + assert!(parse.name().starts_with("__pgdog_")); let mut server = test_server().await; @@ -1401,7 +1400,7 @@ pub mod test { .send( &vec![ ProtocolMessage::from(Bind::new_params( - "__pgdog_1", + &name, &[Parameter { len: 1, data: "1".as_bytes().into(), @@ -2562,38 +2561,6 @@ pub mod test { ); } - #[tokio::test] - async fn test_protocol_out_of_sync_sets_error_state() { - let mut server = test_server().await; - - server - .send(&vec![Query::new("SELECT 1").into()].into()) - .await - .unwrap(); - - for c in ['T', 'D'] { - let msg = server.read().await.unwrap(); - assert_eq!(msg.code(), c); - } - - // simulate an unlikely, but existent out-of-sync state - server - .prepared_statements_mut() - .state_mut() - .queue_mut() - .clear(); - - let res = server.read().await; - assert!( - matches!(res, Err(Error::ProtocolOutOfSync)), - "protocol should be out of sync" - ); - assert!( - server.stats().get_state() == State::Error, - "state should be Error after detecting desync" - ) - } - #[tokio::test] async fn test_reset_clears_client_params() { let mut server = test_server().await; From c76ca0e26fd3567e8b2dc898f4d767ae106c23f5 Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Mon, 6 Apr 2026 22:19:03 +0000 Subject: [PATCH 3/5] fix issue 1 --- docs/issues/PROTOCOL_OUT_OF_SYNC.md | 85 ++++++++++---------- pgdog/src/backend/protocol/state.rs | 116 ++++++++++++++++++++++++++-- pgdog/src/backend/server.rs | 21 ++--- 3 files changed, 160 insertions(+), 62 deletions(-) diff --git a/docs/issues/PROTOCOL_OUT_OF_SYNC.md b/docs/issues/PROTOCOL_OUT_OF_SYNC.md index 47133c1df..e5b51d3e0 100644 --- a/docs/issues/PROTOCOL_OUT_OF_SYNC.md +++ b/docs/issues/PROTOCOL_OUT_OF_SYNC.md @@ -15,7 +15,7 @@ calls `action(code)` which pops 
the queue front and checks the match. Two condit --- -## Issue 1 — Failed `Prepare` orphans the EXECUTE ReadyForQuery +## ✅ Issue 1 — Failed `Prepare` orphans the EXECUTE ReadyForQuery **Severity:** High — triggered by normal server behaviour; no client misbehaviour required. @@ -34,29 +34,33 @@ The simple-query rewriter turns `EXECUTE stmt_name(args)` into two prepended mes independently by `handle()`. After both calls the queue is: ``` -[Ignore(ExecutionCompleted), Ignore(ReadyForQuery), Code(ReadyForQuery)] - ↑────────── handle(Prepare) ──────────↑ ↑── handle(Query) ──↑ +[Ignore(CommandComplete), Ignore(ReadyForQuery), Code(ReadyForQuery)] + ↑──────────── handle(Prepare) ─────────────↑ ↑─── handle(Query) ───↑ ``` If the injected `PREPARE` fails on the server: -| Step | Server sends | Error handler action | Queue after | +The Error handler's old behaviour was to pop the last item, clear the queue, and optionally re-add +a trailing `Code(ReadyForQuery)`. That assumed a flat, single-request queue. With the injected +sub-request the queue is compound, so clearing it discarded the client's own `Code(ReadyForQuery)`: + +| Step | Server sends | Old handler action | Queue after | |---|---|---|---| -| 1 | `Error` for PREPARE | `pop_back` → `Code(RFQ)` matches; re-added | `[Code(RFQ)]` | +| 1 | `Error` for PREPARE | `pop_back` → `Code(RFQ)` re-added | `[Code(RFQ)]` | | 2 | `ReadyForQuery` for PREPARE | pops `Code(RFQ)` normally | **empty** | | 3 | `Error` for EXECUTE (statement absent) | `pop_back` → None; nothing re-added | **empty** | | 4 | `ReadyForQuery` for EXECUTE | `pop_front` on empty → **ProtocolOutOfSync** | — | -The Error handler at `state.rs:154–159` only re-adds a trailing `ReadyForQuery` when it finds -`Code(ReadyForQuery)` at the back. The two `Ignore` items representing the PREPARE sub-request are -invisible to it; once they are cleared the queue no longer knows the EXECUTE is still in-flight. 
+The handler only re-added a trailing `ReadyForQuery` when it found `Code(ReadyForQuery)` at the +back. The two `Ignore` items representing the PREPARE sub-request were invisible to it; once they +were cleared the queue no longer knew the EXECUTE was still in-flight. -Under high concurrency this becomes near-deterministic: the pool fast-path (`Guard::drop` → `checkin` +Under high concurrency this became near-deterministic: the pool fast-path (`Guard::drop` → `checkin` → `put`) hands a connection directly to a waiting client with no healthcheck, no idle time, and no -opportunity to drain the kernel socket buffer. The next query on that client consumes the stale EXECUTE -`Error + ReadyForQuery`, producing `ProtocolOutOfSync`. +opportunity to drain the kernel socket buffer. The next query on that client consumed the stale +EXECUTE `Error + ReadyForQuery`, producing `ProtocolOutOfSync`. -### Reproduction +### Reproduction (historical) 1. Connect to pgdog with session or transaction pooling. 2. Issue a simple-query `EXECUTE` for a statement that will fail to prepare (schema mismatch, syntax @@ -70,17 +74,27 @@ cd integration/prepared_statements_full && bash run.sh ### Tests -All three tests live in `integration/prepared_statements_full/protocol_out_of_sync_spec.rb`. +**State-machine unit test (`state.rs`, no backend needed)** + +- **`test_injected_prepare_error_full_lifecycle`** — builds the exact queue that + `prepared_statements.rs` produces (`add_ignore('C')`, `add_ignore('Z')`, `add('Z')`), fires + `action('E')` and asserts the intermediate queue shape `[Ignore(RFQ), Ignore(Error), Code(Z)]`, + then walks the remaining Z→Ignore, E→Ignore, Z→Forward sequence to completion. + +**Server-level integration test (`server.rs`, requires PostgreSQL)** + +The test that previously asserted `ProtocolOutOfSync` on the fourth message now asserts `E` then `Z` +(two messages, both forwarded). Tests 2 and 3 below remain unresolved and are tracked separately. 
| Test | Pool mode | `got:` | `extended` | What it proves | |---|---|---|---|---| -| 1 | session | Z | false | Orphaned RFQ hits empty queue on the very next query | +| 1 | session | Z | false | Failed PREPARE no longer orphans the EXECUTE RFQ — **fixed** | | 2 | transaction | C | false | Stale-chain: DML CommandComplete hits empty queue | | 3 | session | Z | true | `extended = true` changes Error handler behavior (`out_of_sync`) | - **Test 1 — Session mode, `got: Z`, `extended: false`.** - Session-pooled user pinned to one backend. After triggering the failed prepare, `SELECT 1` consumes - the stale EXECUTE error; the orphaned RFQ then hits an empty queue. Client sees `PG::ConnectionBad`. + Session-pooled user pinned to one backend. The failed PREPARE now produces `E` (forwarded) then `Z` + (forwarded, closing RFQ). No orphaned RFQ remains. Client no longer sees `PG::ConnectionBad`. - **Test 2 — Transaction mode stale-chain, `got: C`, `extended: false`.** `pgdog_tx_single` (transaction mode, pool_size=1). Each query consumes the previous query's stale @@ -89,24 +103,24 @@ All three tests live in `integration/prepared_statements_full/protocol_out_of_sy (`got: C`). Client sees `PG::ConnectionBad`. - **Test 3 — Session mode, `got: Z`, `extended: true`.** - Same as Test 1, but a prior `exec_params` call (`SELECT $1::int`) permanently sets `extended = true` - on the connection. The Error handler then sets `out_of_sync = true` before clearing the queue, - changing connection-lifecycle behaviour. Client sees `PG::ConnectionBad`. + Same as Test 1 (pre-fix), but a prior `exec_params` call (`SELECT $1::int`) permanently sets + `extended = true` on the connection. The Error handler then sets `out_of_sync = true` before + clearing the queue, changing connection-lifecycle behaviour. Client sees `PG::ConnectionBad`. ### Fix -Fix the Error handler in `state.rs:154–159`. 
When the failed message is part of a pgdog-injected -compound request, the handler must preserve the `Code(ReadyForQuery)` for the outer client-visible -request — not just the PREPARE's trailing slot. Concretely: the handler needs to recognise that -`Ignore` items at the back of the queue belong to a sub-request that is still in-flight, and must -keep the outer `Code(ReadyForQuery)` accordingly. +Error handler in `state.rs`, `ExecutionCode::Error` arm. See inline comments for full detail. -The TCP-peek approach (`FIONREAD` / `MSG_PEEK` at checkin) is a valid defensive catch-all but adds a -syscall on every checkin and does not fix the root cause. +On error, find the first `Code(ReadyForQuery)` in the queue (the client's RFQ boundary), drain +everything before it, count the `Ignore(RFQ)` slots in the drained portion, and prepend one +`[Ignore(RFQ), Ignore(Error)]` pair per slot. A separate fast-path at the top of the arm handles +the case where the queue front is already `Ignore(Error)` — subsequent errors from the same +injected sub-request — by popping and returning `Action::Ignore` directly. +See also: `test_injected_prepare_error_full_lifecycle` in `state.rs`. --- -## Issue 2 — Double `action()` call in `forward()` for server CopyDone +## 🔴 Issue 2 — Double `action()` call in `forward()` for server CopyDone **Severity:** Medium — requires the client to omit a trailing `Sync`. @@ -174,7 +188,7 @@ way, the invariant must be made explicit in code comments. --- -## Issue 3 — Stale ReadyForQuery hits an `Ignore(ParseComplete)` slot +## 🔴 Issue 3 — Stale ReadyForQuery hits an `Ignore(ParseComplete)` slot **Severity:** Low — practically unreachable in normal operation. @@ -231,7 +245,7 @@ before reuse, bounding the blast radius to a single request. 
--- -## Issue 4 — `extended` flag is permanently set and never resets +## 🔴 Issue 4 — `extended` flag is permanently set and never resets **Severity:** Low-medium — affects connection-lifecycle semantics and silently changes Error handler behaviour for all subsequent requests on a connection. @@ -324,16 +338,3 @@ until the entire pipeline finishes. A post-fix test should verify: (a) phase 3 above now produces `out_of_sync == false`, and (b) an intermediate `ReadyForQuery` inside a pipelined extended request does not prematurely reset `extended`. - ---- - -## Common thread - -All four issues share the same underlying fragility: the `ProtocolState` queue and the actual server -response stream diverge whenever an error or unexpected message interrupts a multi-message sub-request -injected transparently by pgdog. The Error handler was written for a single client-visible request and -does not account for the compound structures the prepared-statement rewriter produces. - -Issue 4 is a secondary consequence: `extended` was added as a guard for the Error handler but was -attached to the connection rather than the current pipeline, so it outlives the requests it was meant -to describe. diff --git a/pgdog/src/backend/protocol/state.rs b/pgdog/src/backend/protocol/state.rs index fe19d9bd0..9998afc75 100644 --- a/pgdog/src/backend/protocol/state.rs +++ b/pgdog/src/backend/protocol/state.rs @@ -1,3 +1,5 @@ +use tracing::error; + use crate::{ net::{Message, Protocol}, stats::memory::MemoryUsage, @@ -146,17 +148,61 @@ impl ProtocolState { match code { ExecutionCode::Untracked => return Ok(Action::Forward), ExecutionCode::Error => { - // Remove everything from the execution queue. - // The connection is out of sync until client re-syncs it. + if matches!( + self.queue.front(), + Some(ExecutionItem::Ignore(ExecutionCode::Error)) + ) { + // We ignore errors only for the pgdog-injected sub-request. 
+ // In that case the first error is already processed and + // sent to the client, for the remaining expected errors + // we've added ignores for errors and RFQ. + // The error is ignored but still be logged by [backend::server] module + self.queue.pop_front(); + return Ok(Action::Ignore); + } + + // This is the first (and client-visible) error in the chain. It is forwarded + // so the client receives exactly one Error+RFQ for their request. + // For extended-protocol pipelines also mark out-of-sync so the connection + // is not reused until the client re-syncs. if self.extended { self.out_of_sync = true; } - let last = self.queue.pop_back(); - self.queue.clear(); - if let Some(ExecutionItem::Code(ExecutionCode::ReadyForQuery)) = last { + + // find the first position for RFQ code to effectively + // separate the pgdog-injected sub-request from the remaining queries + let Some(rfq_pos) = self + .queue + .iter() + .position(|i| matches!(i, ExecutionItem::Code(ExecutionCode::ReadyForQuery))) + else { + self.queue.clear(); + return Ok(Action::Forward); + }; + + // broken_queue - pgdog-injected sub-request part that contains multiple requests + // that are not be executed properly anyway, since we've got an error previously + let broken_queue = self.queue.drain(..rfq_pos); + + // Count how many queries are expected to finish in the pgdog-injected sub-request + // The current use case is only the Prepare + Execute messages from the [backend::server] + // And in case the prepare fails the execute will fail as well. + // WARN: That is not most reliable solution in case the injected set of queries + // will extend, but it should work for now. 
+ let count_ignores = broken_queue + .filter(|i| matches!(i, ExecutionItem::Ignore(ExecutionCode::ReadyForQuery))) + .count(); + + // For every message that we expect to run add ignore for one error and one RFQ + // For prepare it'll be a one iteration that will create the query + // [Ignore(RFQ), Ignore(Error), Code(RFQ)] + for _ in 0..count_ignores { + self.queue + .push_front(ExecutionItem::Ignore(ExecutionCode::Error)); self.queue - .push_back(ExecutionItem::Code(ExecutionCode::ReadyForQuery)); + .push_front(ExecutionItem::Ignore(ExecutionCode::ReadyForQuery)); } + return Ok(Action::Forward); } @@ -165,7 +211,10 @@ impl ProtocolState { } _ => (), }; - let in_queue = self.queue.pop_front().ok_or(Error::ProtocolOutOfSync)?; + let in_queue = self.queue.pop_front().ok_or_else(|| { + error!("Unexpected action {code:?}: queue is empty"); + Error::ProtocolOutOfSync + })?; match in_queue { // The queue is waiting for the server to send ReadyForQuery, // but it sent something else. That means the execution pipeline @@ -185,6 +234,8 @@ impl ProtocolState { if code == in_queue { Ok(Action::Ignore) } else { + error!(?self, "Unexpected action {code:?}: expected: {in_queue:?}"); + Err(Error::ProtocolOutOfSync) } } @@ -932,4 +983,55 @@ mod test { assert_eq!(state.action('Z').unwrap(), Action::Forward); // forwarded assert!(state.is_empty()); } + + // Replicates the full lifecycle of an injected PREPARE that errors: + // + // Client sends: PREPARE foo AS ... 
(simple-query style) + // EXECUTE (via Query) + // + // pgdog injects ahead of the client's Query: + // add_ignore('C') — CommandComplete from PREPARE + // add_ignore('Z') — RFQ from PREPARE + // Then the client's Query adds: + // add('Z') — the client-visible RFQ + // + // Queue before first error: [Ignore(C), Ignore(Z), Code(Z)] + // + // Server responds to PREPARE with an error: + // 'E' → error branch fires: drain [Ignore(C), Ignore(Z)], count 1 Ignore(RFQ), + // push_front loop produces [Ignore(RFQ), Ignore(Error), Code(Z)]. + // Action::Forward — client receives this error. + // 'Z' → matches Ignore(RFQ) → Action::Ignore (PREPARE's RFQ suppressed) + // + // Server responds to EXECUTE (which fails because PREPARE never succeeded): + // 'E' → fast-path: front is Ignore(Error) → pop → Action::Ignore (suppressed) + // 'Z' → Code(Z) → Action::Forward — client receives the closing RFQ + #[test] + fn test_injected_prepare_error_full_lifecycle() { + let mut state = ProtocolState::default(); + + // --- setup: replicate what prepared_statements.rs does --- + // ProtocolMessage::Prepare injects: + state.add_ignore('C'); // Ignore(CommandComplete) — PREPARE response + state.add_ignore('Z'); // Ignore(RFQ) — PREPARE response + // ProtocolMessage::Query (client EXECUTE) adds: + state.add('Z'); // Code(RFQ) — client-visible + + // --- server sends Error for PREPARE --- + // Error branch: drains [Ignore(C), Ignore(Z)], finds 1 Ignore(Z), + // rebuilds queue as [Ignore(RFQ), Ignore(Error), Code(Z)]. + assert_eq!(state.action('E').unwrap(), Action::Forward); + + // --- server sends RFQ for PREPARE (now suppressed) --- + assert_eq!(state.action('Z').unwrap(), Action::Ignore); + + // --- server sends Error for EXECUTE (prepare never succeeded) --- + // Fast-path: Ignore(Error) is at front → pop and ignore. + assert_eq!(state.action('E').unwrap(), Action::Ignore); + + // --- server sends RFQ for EXECUTE --- + // Code(Z) is at front → forwarded to client. 
+ assert_eq!(state.action('Z').unwrap(), Action::Forward); + assert!(state.is_empty()); + } } diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index bef604bb8..5bb1046ee 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -388,6 +388,13 @@ impl Server { Ok(forward) => { if forward { break message; + } else if message.code() == 'E' { + // we got an error that will not be forwarded to the client, + // but it may still be useful for tracing + error!( + "Ignore error from stream: {:?}", + ErrorResponse::from_bytes(message.payload()) + ); } } Err(err) => { @@ -3422,19 +3429,7 @@ pub mod test { let msg = server.read().await.unwrap(); assert_eq!(msg.code(), 'E'); // 'E' PREPARE error let msg = server.read().await.unwrap(); - assert_eq!(msg.code(), 'Z'); // 'Z' PREPARE RFQ — queue now empty - - // 3. EXECUTE 'E' forwarded (no-op on empty queue). - let msg = server.read().await.unwrap(); - assert_eq!(msg.code(), 'E'); // 'E' EXECUTE error - - // 4. BUG: EXECUTE 'Z' hits empty queue → ProtocolOutOfSync (fix: assert 'Z' + done()). - let err = server.read().await.unwrap_err(); - assert!( - matches!(err, Error::ProtocolOutOfSync), - "expected ProtocolOutOfSync; got {:?}", - err, - ); + assert_eq!(msg.code(), 'Z'); // 'Z' RFQ — queue now empty } // Extended Execute + Flush (no Sync): no RFQ backstop — double action('c') raises ProtocolOutOfSync.
From b246b843d7b95c59a64da2933702c7880bb927c0 Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Tue, 7 Apr 2026 21:08:39 +0000 Subject: [PATCH 4/5] fix issue 2,3,4 --- docs/issues/PROTOCOL_OUT_OF_SYNC.md | 231 ++++++++---------- .../protocol_out_of_sync_spec.rb | 15 +- pgdog/src/backend/prepared_statements.rs | 4 +- pgdog/src/backend/protocol/state.rs | 90 ++++++- pgdog/src/backend/server.rs | 39 +-- 5 files changed, 223 insertions(+), 156 deletions(-) diff --git a/docs/issues/PROTOCOL_OUT_OF_SYNC.md b/docs/issues/PROTOCOL_OUT_OF_SYNC.md index e5b51d3e0..77689cebe 100644 --- a/docs/issues/PROTOCOL_OUT_OF_SYNC.md +++ b/docs/issues/PROTOCOL_OUT_OF_SYNC.md @@ -9,9 +9,8 @@ per anticipated response before forwarding any client message. As server bytes a calls `action(code)` which pops the queue front and checks the match. Two conditions raise `ProtocolOutOfSync`: -- **Empty queue** (`state.rs:168`) — a tracked message arrives but nothing was expected. -- **Ignore mismatch** (`state.rs:188–191`) — queue front is an `Ignore` slot but the server sent a - different code. +- **Empty queue** — a tracked message arrives but nothing was expected. +- **Ignore mismatch** — queue front is an `Ignore` slot but the server sent a different code. --- @@ -25,8 +24,8 @@ calls `action(code)` which pops the queue front and checks the match. Two condit ### Description When pgdog injects a `PREPARE` to rewrite a simple-query `EXECUTE` and that `PREPARE` fails on the -server, the Error handler incorrectly clears the queue. The subsequent `ReadyForQuery` from the now- -orphaned `EXECUTE` hits an empty queue and raises `ProtocolOutOfSync`. +server, the old Error handler incorrectly cleared the queue. The subsequent `ReadyForQuery` from the +now-orphaned `EXECUTE` hit an empty queue and raised `ProtocolOutOfSync`. ### Code path @@ -38,11 +37,9 @@ independently by `handle()`. 
After both calls the queue is: ↑──────────── handle(Prepare) ─────────────↑ ↑─── handle(Query) ───↑ ``` -If the injected `PREPARE` fails on the server: - -The Error handler's old behaviour was to pop the last item, clear the queue, and optionally re-add -a trailing `Code(ReadyForQuery)`. That assumed a flat, single-request queue. With the injected -sub-request the queue is compound, so clearing it discarded the client's own `Code(ReadyForQuery)`: +The old handler popped the last item, cleared the queue, and optionally re-added a trailing +`Code(ReadyForQuery)`. That assumed a flat, single-request queue. With the injected sub-request the +queue is compound, so clearing it discarded the client's own `Code(ReadyForQuery)`: | Step | Server sends | Old handler action | Queue after | |---|---|---|---| @@ -51,23 +48,13 @@ sub-request the queue is compound, so clearing it discarded the client's own `Co | 3 | `Error` for EXECUTE (statement absent) | `pop_back` → None; nothing re-added | **empty** | | 4 | `ReadyForQuery` for EXECUTE | `pop_front` on empty → **ProtocolOutOfSync** | — | -The handler only re-added a trailing `ReadyForQuery` when it found `Code(ReadyForQuery)` at the -back. The two `Ignore` items representing the PREPARE sub-request were invisible to it; once they -were cleared the queue no longer knew the EXECUTE was still in-flight. - Under high concurrency this became near-deterministic: the pool fast-path (`Guard::drop` → `checkin` -→ `put`) hands a connection directly to a waiting client with no healthcheck, no idle time, and no -opportunity to drain the kernel socket buffer. The next query on that client consumed the stale -EXECUTE `Error + ReadyForQuery`, producing `ProtocolOutOfSync`. +→ `put`) hands a connection directly to a waiting client with no healthcheck and no opportunity to +drain the kernel socket buffer. The next query consumed the stale EXECUTE `Error + ReadyForQuery`, +producing `ProtocolOutOfSync`. ### Reproduction (historical) -1. 
Connect to pgdog with session or transaction pooling. -2. Issue a simple-query `EXECUTE` for a statement that will fail to prepare (schema mismatch, syntax - error, or stale local cache with a duplicate name). -3. Issue any subsequent query on the same connection. -4. The second query fails with `PG::ConnectionBad` / `protocol is out of sync`. - ```sh cd integration/prepared_statements_full && bash run.sh ``` @@ -79,63 +66,54 @@ cd integration/prepared_statements_full && bash run.sh - **`test_injected_prepare_error_full_lifecycle`** — builds the exact queue that `prepared_statements.rs` produces (`add_ignore('C')`, `add_ignore('Z')`, `add('Z')`), fires `action('E')` and asserts the intermediate queue shape `[Ignore(RFQ), Ignore(Error), Code(Z)]`, - then walks the remaining Z→Ignore, E→Ignore, Z→Forward sequence to completion. + then walks the remaining Z→Ignore, E→Ignore (fast-path), Z→Forward sequence to completion. **Server-level integration test (`server.rs`, requires PostgreSQL)** -The test that previously asserted `ProtocolOutOfSync` on the fourth message now asserts `E` then `Z` -(two messages, both forwarded). Tests 2 and 3 below remain unresolved and are tracked separately. +The test that previously asserted `ProtocolOutOfSync` on the fourth message now asserts `E` then +`Z` (both forwarded). All three configurations now pass. 
| Test | Pool mode | `got:` | `extended` | What it proves | |---|---|---|---|---| | 1 | session | Z | false | Failed PREPARE no longer orphans the EXECUTE RFQ — **fixed** | -| 2 | transaction | C | false | Stale-chain: DML CommandComplete hits empty queue | -| 3 | session | Z | true | `extended = true` changes Error handler behavior (`out_of_sync`) | +| 2 | transaction | — | false | Stale-chain: injected E+Z drained internally; pool socket is clean — **fixed** | +| 3 | session | Z | true | `extended` now resets after RFQ drain — **fixed by Issue 4** | -- **Test 1 — Session mode, `got: Z`, `extended: false`.** - Session-pooled user pinned to one backend. The failed PREPARE now produces `E` (forwarded) then `Z` - (forwarded, closing RFQ). No orphaned RFQ remains. Client no longer sees `PG::ConnectionBad`. +- **Test 1** (`pgdog_session`): session-pooled connection; failed PREPARE then EXECUTE; subsequent `SELECT 1` must succeed. +- **Test 2** (`pgdog_tx_single`, `pool_size=1`): same failure sequence in transaction mode. Issue 1 drains the orphaned EXECUTE E+Z internally before the connection returns to the pool, so no stale bytes shift subsequent queries. +- **Test 3** (`pgdog_session` with prior `exec_params`): `extended=true` before the failure; Issue 4 ensures the flag resets after the RFQ drains the queue. -- **Test 2 — Transaction mode stale-chain, `got: C`, `extended: false`.** - `pgdog_tx_single` (transaction mode, pool_size=1). Each query consumes the previous query's stale - response, creating a one-slot-shifted chain. A `BEGIN` that consumes a `'T'`-status RFQ keeps the - connection open (`in_transaction = true`); the actual INSERT response then hits an empty queue - (`got: C`). Client sees `PG::ConnectionBad`. - -- **Test 3 — Session mode, `got: Z`, `extended: true`.** - Same as Test 1 (pre-fix), but a prior `exec_params` call (`SELECT $1::int`) permanently sets - `extended = true` on the connection. 
The Error handler then sets `out_of_sync = true` before - clearing the queue, changing connection-lifecycle behaviour. Client sees `PG::ConnectionBad`. +See `integration/prepared_statements_full/protocol_out_of_sync_spec.rb` for full test bodies. ### Fix -Error handler in `state.rs`, `ExecutionCode::Error` arm. See inline comments for full detail. - -On error, find the first `Code(ReadyForQuery)` in the queue (the client's RFQ boundary), drain -everything before it, count the `Ignore(RFQ)` slots in the drained portion, and prepend one -`[Ignore(RFQ), Ignore(Error)]` pair per slot. A separate fast-path at the top of the arm handles -the case where the queue front is already `Ignore(Error)` — subsequent errors from the same -injected sub-request — by popping and returning `Action::Ignore` directly. +Error handler in `state.rs`, `ExecutionCode::Error` arm. On error, find the first +`Code(ReadyForQuery)` in the queue (the client's RFQ boundary), drain everything before it, count +the `Ignore(RFQ)` slots in the drained portion, and prepend one `[Ignore(RFQ), Ignore(Error)]` pair +per slot. A fast-path at the top of the arm handles subsequent errors from the same injected +sub-request — when the queue front is already `Ignore(Error)` — by popping and returning +`Action::Ignore` directly. See also: `test_injected_prepare_error_full_lifecycle` in `state.rs`. + --- -## 🔴 Issue 2 — Double `action()` call in `forward()` for server CopyDone +## ✅ Issue 2 — Double `action()` call in `forward()` for server CopyDone **Severity:** Medium — requires the client to omit a trailing `Sync`. -**Location:** `pgdog/src/backend/prepared_statements.rs`, `forward()`, lines ~198 and ~237. +**Location:** `pgdog/src/backend/prepared_statements.rs`, `forward()`. ### Description -`forward()` calls `state.action(code)` unconditionally at line 198, then a second time inside the -`'c'` (CopyDone) match arm at line 237. 
When no `Code(ReadyForQuery)` backstop is present in the -queue, the second call hits an empty queue and raises `ProtocolOutOfSync`. +`forward()` called `state.action(code)` unconditionally near the top of the function, then called +it a second time inside the `'c'` (CopyDone) match arm. Without a `Code(ReadyForQuery)` backstop in +the queue the second call hit an empty queue and raised `ProtocolOutOfSync`. ### Code path -Normal path (safe): `Code(ReadyForQuery)` is always in the queue. `action('Z')` pushes it back rather -than consuming it, making the double call idempotent. +Normal path (safe): `Code(ReadyForQuery)` is always in the queue. `action('Z')` pushes it back +rather than consuming it, making the double call idempotent. Unsafe path — client sends `Parse + Bind + Execute + Flush` (no `Sync`). `handle()` builds: @@ -143,20 +121,20 @@ Unsafe path — client sends `Parse + Bind + Execute + Flush` (no `Sync`). `hand [Code(ParseComplete), Code(BindComplete), Code(ExecutionCompleted)] ``` -No `Code(ReadyForQuery)` is added. When the server responds with CopyDone: +No `Code(ReadyForQuery)` is added. When the server responded with CopyDone: ``` First action('c'): pops Code(ExecutionCompleted) — consumed Second action('c'): empty queue → ProtocolOutOfSync ``` -### Reproduction +### Reproduction (historical) Not triggerable via the `pg` gem or any libpq-based driver — libpq always appends `Sync` after -`Execute`. Requires sending raw protocol messages directly. +`Execute`. Required sending raw protocol messages directly. 
```sh -cargo test -p pgdog test_copy_out_done_double_action_out_of_sync_without_sync +cargo test -p pgdog --lib -- test_copydone_double_action_oos_without_rfq_backstop ``` ### Tests @@ -165,30 +143,33 @@ cargo test -p pgdog test_copy_out_done_double_action_out_of_sync_without_sync - **`test_copydone_double_action_safe_with_rfq_backstop`** — queue `[Code(Copy), Code(ReadyForQuery)]`; two `action('c')` calls both succeed; RFQ slot is pushed back and survives. -- **`test_copydone_double_action_oos_without_rfq_backstop`** — queue `[Code(ExecutionCompleted)]`; - second `action('c')` returns `Err(ProtocolOutOfSync)`. +- **`test_copydone_double_action_oos_without_rfq_backstop`** — documents the raw state-machine + invariant: calling `action('c')` twice with no RFQ backstop still causes `ProtocolOutOfSync` + directly on the state machine. `forward()` no longer makes this second call; this path is + unreachable through normal protocol flow. Test is retained to pin the underlying invariant. **Server-level tests (`server.rs`, require PostgreSQL)** -- **`test_copydone_double_action_oos_without_sync`** — `Parse + Bind + Execute + Flush` - (no Sync); reads ParseComplete, BindComplete, CopyOutResponse, CopyData ×2, then asserts - `ProtocolOutOfSync` on CopyDone. +- **`test_copydone_single_action_without_sync`** — `Parse + Bind + Execute + Flush` (no Sync); + reads ParseComplete, BindComplete, CopyOutResponse, CopyData ×2, then asserts CopyDone is + forwarded successfully. The trailing CommandComplete then hits an empty queue (no RFQ backstop) + and raises `ProtocolOutOfSync` — that is the correct remaining behavior with no `Sync`. - **`test_copydone_double_action_safe_with_sync`** — same pipeline with `Sync`; full sequence completes without error; asserts `server.done()`. 
```sh -cargo test -p pgdog test_copydone_double_action +cargo test -p pgdog --lib -- test_copydone_double_action +cargo test -p pgdog -- test_copydone ``` ### Fix -Remove the second `action()` call in the `'c'` arm of `forward()`, or guarantee that a -`Code(ReadyForQuery)` backstop is always in the queue before the CopyDone path is reached. Either -way, the invariant must be made explicit in code comments. +Removed the redundant `self.state.action(code)?` from the `'c'` arm in `forward()`. The call at +the top of the function already advances the state machine for CopyDone; the arm body is now empty. --- -## 🔴 Issue 3 — Stale ReadyForQuery hits an `Ignore(ParseComplete)` slot +## ✅ Issue 3 — Stale ReadyForQuery hits an `Ignore(ParseComplete)` slot **Severity:** Low — practically unreachable in normal operation. @@ -234,7 +215,7 @@ integration test is not practical; the precondition cannot be reached through no protocol flow. ```sh -cargo test -p pgdog test_stale_rfq +cargo test -p pgdog --lib -- test_stale_rfq ``` ### Fix @@ -245,96 +226,90 @@ before reuse, bounding the blast radius to a single request. --- -## 🔴 Issue 4 — `extended` flag is permanently set and never resets +## ✅ Issue 4 — `extended` flag is permanently set and never resets **Severity:** Low-medium — affects connection-lifecycle semantics and silently changes Error handler behaviour for all subsequent requests on a connection. -**Location:** `pgdog/src/backend/protocol/state.rs`, `add()` / `add_ignore()`; `state.rs:151–153`, -Error handler. +**Location:** `pgdog/src/backend/protocol/state.rs`, `add()` / `add_ignore()`; `Code` arm of +`action()`. ### Description -`ProtocolState.extended` is set to `true` the first time any parameterised query runs on a connection -and is never reset. The Error handler checks this flag to set `out_of_sync = true`; because the flag -is permanent, every error on that connection — including plain simple-query errors — sets -`out_of_sync = true` spuriously. 
+`ProtocolState.extended` was set to `true` the first time any parameterised query ran on a +connection and was never reset. The Error handler checked this flag to set `out_of_sync = true`; +because the flag was permanent, every subsequent error on that connection — including plain +simple-query errors — set `out_of_sync = true` spuriously. ### Code path -`add()` and `add_ignore()` set the flag whenever `ParseComplete ('1')` or `BindComplete ('2')` is -enqueued: - -```rust -self.extended = self.extended || code.extended(); -``` - -The Error handler (`state.rs:151–153`): - -```rust -ExecutionCode::Error => { - if self.extended { - self.out_of_sync = true; // fires on every error, forever - } - // ... -} -``` - -There is no reset path. +`add()` and `add_ignore()` latch the flag via `self.extended = self.extended || code.extended()` +whenever `ParseComplete ('1')` or `BindComplete ('2')` is enqueued. Once set, the Error handler +checked `self.extended` to set `out_of_sync = true`, with no reset path, so every subsequent +error on the connection triggered it regardless of whether the current request was parameterised. ### Consequences -- `done()` stays `false` one extra round-trip (until RFQ clears `out_of_sync`) on simple-query - errors for connections that have ever served a parameterised query. Harmless in practice today, but - more conservative than necessary. -- Future changes to the Error handler that add `extended`-specific behaviour will silently apply to - all long-lived connections, not just those currently mid-pipeline. -- `extended` reads as "has this connection *ever* been in extended-protocol mode", not "is this - connection *currently* in extended-protocol mode" — a semantic mismatch that will mislead future - readers. +- `done()` stayed `false` one extra round-trip (until RFQ cleared `out_of_sync`) on simple-query + errors for connections that had ever served a parameterised query. 
+- Future changes to the Error handler that added `extended`-specific behaviour would silently apply + to all long-lived connections, not just those currently mid-pipeline. +- `extended` read as "has this connection *ever* been in extended-protocol mode", not "is this + connection *currently* in extended-protocol mode" — a semantic mismatch. -### Reproduction +### Reproduction (historical) 1. Connect to pgdog. -2. Execute a parameterised query (any `$1` placeholder) — sets `extended = true`. +2. Execute a parameterised query (any `$1` placeholder) — permanently sets `extended = true`. 3. Execute `SELECT 1/0` (simple query). 4. Observe `server.out_of_sync() == true` immediately after the `'E'` response, before RFQ arrives. Expected: `false`. ```sh -cargo test -p pgdog test_extended_flag_never_resets +cargo test -p pgdog -- test_extended_resets_after_rfq_drain ``` ### Tests -**`test_extended_flag_never_resets_spurious_out_of_sync`** in `server.rs` (requires PostgreSQL) — -three phases on one connection: +**State-machine unit tests (`state.rs`, no backend needed)** -1. *Baseline* — fresh connection (`extended = false`); `SELECT 1/0`; asserts `out_of_sync == false`. -2. *Trigger* — `Parse + Bind + Execute + Sync` for `SELECT $1::int`; permanently sets `extended = true`. -3. *Regression* — `SELECT 1/0` twice more; asserts `out_of_sync == true` after each `'E'`. Each - `ReadyForQuery` resets `out_of_sync` to `false` but leaves `extended` unchanged. +- **`test_extended_resets_on_rfq_drain`** — parameterised queue drains; `extended` is `true` before + the final RFQ and `false` after. +- **`test_extended_stays_true_mid_pipeline`** — an intermediate RFQ with items still queued behind + it does not prematurely reset `extended`; only the last RFQ that drains the queue resets it. +- **`test_no_spurious_out_of_sync_after_extended_reset`** — after a parameterised pipeline + completes and `extended` resets, a subsequent simple-query error does not set `out_of_sync`. 
+ +**Server-level test (`server.rs`, requires PostgreSQL)** + +- **`test_extended_resets_after_rfq_drain`** — four phases on one connection: (1) baseline simple + error, no `out_of_sync`; (2) parameterised query sets `extended`, RFQ drain resets it; (3) and + (4) simple errors after reset, both assert `out_of_sync == false`. ```sh -cargo test -p pgdog test_extended_flag_never_resets +cargo test -p pgdog --lib -- test_extended_resets +cargo test -p pgdog -- test_extended_resets_after_rfq_drain ``` ### Fix -Reset `extended` to `false` at the same point `out_of_sync` resets — when `ReadyForQuery` is -processed and the queue is fully drained: +In the `Code(in_queue_code)` arm of `action()`, after `pop_front()` has already consumed the +RFQ item, `self.extended` is reset to `false` when the queue is now empty. The check must live +here — after the pop — so `is_empty()` reflects the post-pop state. Placing it in the outer +`ReadyForQuery` match arm (as originally proposed) runs before `pop_front()` and would never +observe an empty queue. Resetting only when `is_empty()` is safe: pipelined requests still in +the queue keep `extended = true` until the entire pipeline finishes. -```rust -ExecutionCode::ReadyForQuery => { - self.out_of_sync = false; - if self.is_empty() { - self.extended = false; // pipeline complete; reset for next request - } -} -``` +--- + +## Common thread -Resetting only when `is_empty()` is safe: pipelined requests still in the queue keep `extended = true` -until the entire pipeline finishes. +All four issues share the same underlying fragility: the `ProtocolState` queue and the actual server +response stream diverge whenever an error or unexpected message interrupts a multi-message +sub-request injected transparently by pgdog. The Error handler was written for a single +client-visible request and did not account for the compound structures the prepared-statement +rewriter produces. 
-A post-fix test should verify: (a) phase 3 above now produces `out_of_sync == false`, and (b) an -intermediate `ReadyForQuery` inside a pipelined extended request does not prematurely reset `extended`. +Issue 4 was a secondary consequence: `extended` was added as a guard for the Error handler but was +attached to the connection rather than the current pipeline, so it outlived the requests it was meant +to describe. diff --git a/integration/prepared_statements_full/protocol_out_of_sync_spec.rb b/integration/prepared_statements_full/protocol_out_of_sync_spec.rb index e7664800c..1f601a965 100644 --- a/integration/prepared_statements_full/protocol_out_of_sync_spec.rb +++ b/integration/prepared_statements_full/protocol_out_of_sync_spec.rb @@ -2,7 +2,9 @@ require_relative 'rspec_helper' -# Triggers the failed-prepare/orphaned-EXECUTE bug (Issue 1). +# Triggers the Issue 1 scenario: PREPARE fails, pgdog injects a retry PREPARE that also fails, +# leaving an orphaned EXECUTE response. After the fix, pgdog drains the orphaned E+Z internally +# so no stale bytes remain on the wire when this helper returns. def trigger_prepare_inject_failure(conn, statement_name:) # 1. PREPARE fails — pgdog keeps the statement in its local cache despite the error. expect { conn.exec "PREPARE #{statement_name} AS SELECT 1 FROM __pgdog_nonexistent_table__" } @@ -23,7 +25,7 @@ def trigger_prepare_inject_failure(conn, statement_name:) # Bug: stale E+Z left on wire; next query consumed stale E, orphaned Z hit empty queue → ConnectionBad. # Fix: Error handler must preserve Code(ReadyForQuery) for the outer EXECUTE when an injected # PREPARE fails; no stale bytes reach the client. 
- it 'orphaned EXECUTE RFQ hits empty queue in session mode' do + it 'next query succeeds after failed injected PREPARE in session mode' do conn = connect_pgdog(user: 'pgdog_session') begin trigger_prepare_inject_failure(conn, statement_name: 'pgdog_prepare_inject_session') @@ -36,11 +38,12 @@ def trigger_prepare_inject_failure(conn, statement_name:) end end - # Transaction mode (pool_size=1): stale-chain — 'T'-status RFQ keeps server alive; INSERT hits empty queue (got: C). - it 'stale-chain in transaction mode produces ProtocolOutOfSync got: C' do + # Transaction mode (pool_size=1): Issue 1 fix drains orphaned EXECUTE E+Z internally; + # no stale bytes reach the pool — subsequent queries on pool-recycled connections must succeed. + it 'next query succeeds after failed injected PREPARE in transaction mode' do conn = connect_pgdog(user: 'pgdog_tx_single') begin - # 1. Leave stale EXECUTE E+'I'-Z on TCP buffer. + # 1. Trigger PREPARE injection failure; pgdog drains orphaned EXECUTE E+Z internally. trigger_prepare_inject_failure(conn, statement_name: 'pgdog_prepare_inject_tx') tmp = "#{Process.pid}_#{rand(1_000_000)}" @@ -68,7 +71,7 @@ def trigger_prepare_inject_failure(conn, statement_name:) # Bug: same as Test 1; extended=true additionally sets out_of_sync=true in the Error handler, # changing connection-lifecycle behaviour. Either way, the next query must not fail. # Fix: same root fix; extended flag behaviour (Issue 4) is a separate concern. - it 'orphaned EXECUTE RFQ hits empty queue after extended query in session mode' do + it 'next query succeeds after failed injected PREPARE when prior extended query ran first' do conn = connect_pgdog(user: 'pgdog_session') begin # Parameterised query runs first — sets extended=true on the connection. 
diff --git a/pgdog/src/backend/prepared_statements.rs b/pgdog/src/backend/prepared_statements.rs index 76ad8480a..93d90c38d 100644 --- a/pgdog/src/backend/prepared_statements.rs +++ b/pgdog/src/backend/prepared_statements.rs @@ -231,9 +231,7 @@ impl PreparedStatements { } // Backend told us the copy is done. - 'c' => { - self.state.action(code)?; - } + 'c' => {} _ => (), } diff --git a/pgdog/src/backend/protocol/state.rs b/pgdog/src/backend/protocol/state.rs index 9998afc75..1b4214f35 100644 --- a/pgdog/src/backend/protocol/state.rs +++ b/pgdog/src/backend/protocol/state.rs @@ -224,6 +224,11 @@ impl ProtocolState { && in_queue_code == ExecutionCode::ReadyForQuery { self.queue.push_front(in_queue); + } else if in_queue_code == ExecutionCode::ReadyForQuery && self.queue.is_empty() { + // The last RFQ of this pipeline was just consumed and nothing remains. + // Reset extended so subsequent simple-query errors are not spuriously + // treated as mid-extended-pipeline and do not trigger out_of_sync. + self.extended = false; } Ok(Action::Forward) @@ -380,7 +385,8 @@ mod test { assert_eq!(state.action('C').unwrap(), Action::Forward); assert_eq!(state.action('Z').unwrap(), Action::Forward); assert!(state.is_empty()); - assert!(state.extended); + // extended resets to false once the final RFQ drains the queue. + assert!(!state.extended); } #[test] @@ -913,18 +919,22 @@ mod test { assert_eq!(state.len(), 1); // RFQ still present for the server's ReadyForQuery } - // Failure path: no Code(ReadyForQuery) backstop — second action('c') hits empty queue. + // Documents raw state-machine behavior: calling action('c') twice with no RFQ backstop + // causes ProtocolOutOfSync. forward() was the only caller that did this; the second call + // has been removed from the 'c' arm in prepared_statements.rs, making this path unreachable + // through normal protocol flow. The test is kept to pin the underlying invariant. 
#[test] fn test_copydone_double_action_oos_without_rfq_backstop() { let mut state = ProtocolState::default(); - // 1. Queue: Execute + Flush (no Sync) — no RFQ backstop. + // Queue: Execute + Flush (no Sync) — no RFQ backstop. state.add('C'); // ExecutionCompleted - // 2. First action('c'): pops ExecutionCompleted; queue empty. + // First action('c'): pops ExecutionCompleted; queue empty. assert_eq!(state.action('c').unwrap(), Action::Forward); assert!(state.is_empty()); - // 3. Second action('c'): empty queue → ProtocolOutOfSync. + // Second action('c') directly: empty queue → ProtocolOutOfSync. + // This is the raw state machine. forward() no longer makes this second call. assert!(state.action('c').is_err()); } @@ -1034,4 +1044,74 @@ mod test { assert_eq!(state.action('Z').unwrap(), Action::Forward); assert!(state.is_empty()); } + // ======================================== + // extended flag reset tests (Issue 4) + // ======================================== + + // extended resets to false once the last RFQ of a pipeline is consumed. + #[test] + fn test_extended_resets_on_rfq_drain() { + let mut state = ProtocolState::default(); + // add_ignore('1') sets extended=true (ParseComplete is an extended-protocol code). + state.add_ignore('1'); // ParseComplete — injected, sets extended=true + state.add('Z'); // RFQ — client-visible + + assert_eq!(state.action('1').unwrap(), Action::Ignore); + assert!(state.extended, "extended must be true before RFQ"); + + assert_eq!(state.action('Z').unwrap(), Action::Forward); + assert!( + !state.extended, + "extended must reset to false after last RFQ drains queue" + ); + assert!(state.is_empty()); + } + + // extended must NOT reset mid-pipeline: an RFQ that still has items behind it + // belongs to a pipelined request and should not prematurely clear the flag. 
+ #[test] + fn test_extended_stays_true_mid_pipeline() { + let mut state = ProtocolState::default(); + state.add_ignore('1'); // ParseComplete — sets extended=true + state.add('Z'); // first pipeline RFQ + state.add('Z'); // second pipeline RFQ + + assert_eq!(state.action('1').unwrap(), Action::Ignore); + assert_eq!(state.action('Z').unwrap(), Action::Forward); // first RFQ, one item remains + assert!( + state.extended, + "extended must stay true while pipeline is not fully drained" + ); + + assert_eq!(state.action('Z').unwrap(), Action::Forward); // second RFQ drains queue + assert!( + !state.extended, + "extended must reset once queue is fully drained" + ); + } + + // After extended resets, a plain simple-query error must not set out_of_sync. + // Before the fix, extended stuck permanently and every subsequent error triggered + // out_of_sync=true spuriously. + #[test] + fn test_no_spurious_out_of_sync_after_extended_reset() { + let mut state = ProtocolState::default(); + + // Phase 1: parameterised query sets extended=true, then resets on drain. + state.add_ignore('1'); // ParseComplete + state.add('Z'); + assert_eq!(state.action('1').unwrap(), Action::Ignore); + assert_eq!(state.action('Z').unwrap(), Action::Forward); + assert!(!state.extended); + + // Phase 2: simple-query error on a now-reset connection. 
+ state.add('Z'); // RFQ from simple query + assert_eq!(state.action('E').unwrap(), Action::Forward); // error forwarded + assert!( + !state.out_of_sync(), + "out_of_sync must be false: extended was reset" + ); + assert_eq!(state.action('Z').unwrap(), Action::Forward); + assert!(state.done()); + } } diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index 5bb1046ee..ed149b753 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -3432,9 +3432,12 @@ pub mod test { assert_eq!(msg.code(), 'Z'); // 'Z' RFQ — queue now empty } - // Extended Execute + Flush (no Sync): no RFQ backstop — double action('c') raises ProtocolOutOfSync. + // Extended Execute + Flush (no Sync): single action('c') now succeeds. + // CopyDone is forwarded to client; the trailing CommandComplete then hits an empty + // queue (no RFQ backstop, no Sync) and raises ProtocolOutOfSync. + // This is distinct from the former double-action bug, which fired on CopyDone itself. #[tokio::test] - async fn test_copydone_double_action_oos_without_sync() { + async fn test_copydone_single_action_without_sync() { let mut server = test_server().await; // 1. Parse + Bind + Execute + Flush (not Sync); no RFQ backstop in queue. @@ -3459,10 +3462,14 @@ pub mod test { assert_eq!(server.read().await.unwrap().code(), 'H'); // CopyOutResponse assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 1 assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 2 - // 3. BUG: CopyDone — first action() pops ExecutionCompleted; second hits empty queue. + + // 3. CopyDone — fixed: single action() pops ExecutionCompleted; no second call. + assert_eq!(server.read().await.unwrap().code(), 'c'); // CopyDone forwarded + + // 4. CommandComplete hits empty queue (no RFQ backstop without Sync). 
assert!( matches!(server.read().await.unwrap_err(), Error::ProtocolOutOfSync), - "expected ProtocolOutOfSync" + "expected ProtocolOutOfSync on CommandComplete with empty queue" ); } @@ -3500,9 +3507,10 @@ pub mod test { ); } - // extended=true sticks after any parameterised query; Error handler sets out_of_sync on every subsequent error. + // After a parameterised query, extended resets once the RFQ drains the queue. + // Subsequent simple-query errors must NOT set out_of_sync. #[tokio::test] - async fn test_extended_flag_never_resets_spurious_out_of_sync() { + async fn test_extended_resets_after_rfq_drain() { use crate::net::bind::Parameter; let mut server = test_server().await; @@ -3519,7 +3527,8 @@ pub mod test { assert_eq!(msg.code(), 'Z'); assert!(server.done()); - // 2. Parameterised query sets extended=true permanently. + // 2. Parameterised query: Parse+Bind+Execute+Sync sets extended=true, then + // the final RFQ drains the queue and resets extended to false. let bind = Bind::new_params_codes( "", &[Parameter { @@ -3545,29 +3554,31 @@ pub mod test { let msg = server.read().await.unwrap(); assert_eq!(msg.code(), c); } - assert!(server.done()); + assert!(server.done()); // extended was reset when 'Z' drained the queue - // 3. Same error on same connection: extended stuck → out_of_sync=true spuriously. + // 3. Simple-query error after extended resets: out_of_sync must be false. server .send(&vec![Query::new("SELECT 1/0").into()].into()) .await .unwrap(); let msg = server.read().await.unwrap(); assert_eq!(msg.code(), 'E'); - assert!(server.out_of_sync()); // spurious: extended=true even for plain simple query + assert!( + !server.out_of_sync(), + "out_of_sync must be false: extended was reset by prior RFQ drain" + ); let msg = server.read().await.unwrap(); - assert_eq!(msg.code(), 'Z'); // RFQ clears out_of_sync but leaves extended stuck - assert!(!server.out_of_sync()); + assert_eq!(msg.code(), 'Z'); assert!(server.done()); - // 4. 
Confirm extended remains stuck across RFQ resets. + // 4. Confirm: same result on the next error — extended stays reset across requests. server .send(&vec![Query::new("SELECT 1/0").into()].into()) .await .unwrap(); let msg = server.read().await.unwrap(); assert_eq!(msg.code(), 'E'); - assert!(server.out_of_sync()); + assert!(!server.out_of_sync()); let msg = server.read().await.unwrap(); assert_eq!(msg.code(), 'Z'); assert!(server.done()); From ad658f051e440bf23cecb02f8463dbe4d90cc791 Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Wed, 8 Apr 2026 18:37:48 +0000 Subject: [PATCH 5/5] cleanup dev changes --- integration/common.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration/common.sh b/integration/common.sh index d6a474d7d..812acc69f 100644 --- a/integration/common.sh +++ b/integration/common.sh @@ -23,8 +23,8 @@ function run_pgdog() { local config_file="${COMMON_DIR}/pgdog.config" if [ -z "${binary}" ]; then # Testing in release is faster and mirrors production. - cargo build - binary="target/debug/pgdog" + cargo build --release + binary="target/release/pgdog" fi if [ -f "${pid_file}" ]; then local existing_pid=$(cat "${pid_file}")