From dae451930c7b8c56cd13f8838347be48c75f3208 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 10 Apr 2026 04:26:17 +0000 Subject: [PATCH] fix: drop stale ReadyForQuery expectation when server enters COPY IN mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a client sends Bind+Execute+Sync for a COPY FROM STDIN statement, pgdog adds a ReadyForQuery expectation for that Sync. But PostgreSQL ignores Sync during COPY IN mode (protocol spec §55.2.6) and never sends ReadyForQuery for it. The stale entry stays in the queue, done() never returns true, and the connection is never returned to the pool. Call remove_one_rfq() in forward() when we see CopyInResponse ('G') to drop the ReadyForQuery that will never arrive. Verified with end-to-end integration test using tokio-postgres copy_in(): - WITHOUT fix: query timeout - CopyDone hangs because state machine is desynced - WITH fix: COPY completes, subsequent queries work normally https://claude.ai/code/session_01PQvrbw2xJHgQBXtASWHFcv --- integration/rust/tests/tokio_postgres/copy.rs | 91 +++++++++++++------ pgdog/src/backend/prepared_statements.rs | 54 +++++++++++ pgdog/src/backend/protocol/state.rs | 16 ++++ 3 files changed, 132 insertions(+), 29 deletions(-) diff --git a/integration/rust/tests/tokio_postgres/copy.rs b/integration/rust/tests/tokio_postgres/copy.rs index 4473bedc9..45bc88a83 100644 --- a/integration/rust/tests/tokio_postgres/copy.rs +++ b/integration/rust/tests/tokio_postgres/copy.rs @@ -1,32 +1,65 @@ -// use futures_util::{TryStreamExt, pin_mut}; -// use tokio_postgres::binary_copy::{BinaryCopyInWriter, BinaryCopyOutStream}; -// use tokio_postgres::types::Type; +use bytes::{BufMut, BytesMut}; +use futures_util::SinkExt; +use tokio_postgres::NoTls; -// use rust::setup::connections; +/// Demonstrate that COPY FROM STDIN via extended protocol (tokio-postgres) +/// works correctly through pgdog. 
+/// +/// tokio-postgres sends COPY using Bind+Execute+Sync (extended protocol). +/// PostgreSQL ignores the Sync during COPY IN mode, producing only one +/// ReadyForQuery instead of two. Without the remove_one_rfq() fix, the +/// stale ReadyForQuery expectation desyncs the state machine and the +/// connection becomes unusable for subsequent queries. +#[tokio::test] +async fn test_copy_in_extended_protocol() { + let (conn, connection) = tokio_postgres::connect( + "host=127.0.0.1 user=pgdog dbname=pgdog password=pgdog port=6432", + NoTls, + ) + .await + .unwrap(); -// #[tokio::test] -// async fn test_copy() { -// for conn in connections().await { -// conn.batch_execute( -// "DROP SCHEMA IF EXISTS rust_test_insert CASCADE; -// CREATE SCHEMA rust_test_insert; -// CREATE TABLE rust_test_insert.sharded (id BIGINT PRIMARY KEY, value VARCHAR); -// SET search_path TO rust_test_insert,public;", -// ) -// .await -// .unwrap(); + tokio::spawn(async move { + if let Err(e) = connection.await { + eprintln!("connection error: {}", e); + } + }); -// let sink = conn -// .copy_in("COPY sharded (id, value) FROM STDIN BINARY") -// .await -// .unwrap(); -// let writer = BinaryCopyInWriter::new(sink, &[Type::INT8, Type::TEXT]); -// for i in 0..25 { -// let writer = tokio::pin!(writer); -// writer. -// .write(&[&1_i64, &"foobar"]) -// .await -// .unwrap(); -// } -// } -// } + // Setup: clean slate + conn.batch_execute( + "DROP TABLE IF EXISTS _copy_test; + CREATE TABLE _copy_test (id BIGINT, value TEXT);", + ) + .await + .unwrap(); + + // COPY FROM STDIN — tokio-postgres sends this via extended protocol + // (Parse, Bind, Execute, Sync), triggering the double-Sync pattern. 
+ let sink = conn + .copy_in("COPY _copy_test (id, value) FROM STDIN") + .await + .unwrap(); + + // Write some tab-delimited rows + let mut buf = BytesMut::new(); + for i in 0..10_i64 { + buf.put_slice(format!("{}\trow_{}\n", i, i).as_bytes()); + } + futures_util::pin_mut!(sink); + sink.send(buf.freeze()).await.unwrap(); + let rows_copied = sink.finish().await.unwrap(); + assert_eq!(rows_copied, 10); + + // This query AFTER the copy is the real test. + // Without the fix, the state machine has a stale ReadyForQuery + // and this query will either hang, error, or return wrong results. + let rows = conn + .query("SELECT count(*) FROM _copy_test", &[]) + .await + .unwrap(); + let count: i64 = rows[0].get(0); + assert_eq!(count, 10); + + // Cleanup + conn.execute("DROP TABLE _copy_test", &[]).await.unwrap(); +} diff --git a/pgdog/src/backend/prepared_statements.rs b/pgdog/src/backend/prepared_statements.rs index 76ad8480a..2af38bc13 100644 --- a/pgdog/src/backend/prepared_statements.rs +++ b/pgdog/src/backend/prepared_statements.rs @@ -228,6 +228,10 @@ impl PreparedStatements { 'G' => { self.state.prepend('G'); // Next thing we'll see is a CopyFail or CopyDone. + // PostgreSQL ignores Sync during COPY IN (protocol spec §55.2.6). + // Remove the ReadyForQuery that was expected from the initial + // Bind+Execute+Sync — the server won't send it. + self.state.remove_one_rfq(); } // Backend told us the copy is done. @@ -374,3 +378,53 @@ impl PreparedStatements { close } } + +#[cfg(test)] +mod test { + use super::*; + use crate::net::messages::Payload; + + /// Build a minimal backend Message with the given code byte. + fn msg(code: char) -> Message { + Message::new(Payload::named(code).freeze()) + } + + /// Simulate a client that sends Bind+Execute+Sync for COPY FROM STDIN + /// (the tokio-postgres double-Sync pattern). PostgreSQL ignores Sync + /// during COPY IN mode, so only one ReadyForQuery is produced. 
+ /// + /// This test exercises the real `forward()` code path — it must clean + /// up the stale ReadyForQuery when it sees CopyInResponse. + #[test] + fn test_copy_in_with_client_double_sync() { + let mut ps = PreparedStatements::new(); + + // Client: Bind + Execute + Sync → state expects [BindComplete, ExecutionCompleted, RFQ] + ps.state_mut().add('2'); + ps.state_mut().add(ExecutionCode::ExecutionCompleted); + ps.state_mut().add('Z'); + + // Server responds: BindComplete + assert!(ps.forward(&msg('2')).unwrap()); + + // Server responds: CopyInResponse — forward() should drop the stale RFQ + assert!(ps.forward(&msg('G')).unwrap()); + + // State should now be in COPY mode, with no stale ReadyForQuery left in the queue. + assert!(ps.in_copy_mode()); + assert!(!ps.done()); // still in COPY mode + + // Client sends CopyDone (handled through handle()) + ps.state_mut().action('c').unwrap(); + + // Client sends second Sync + ps.state_mut().add('Z'); + + // Server: CommandComplete + ReadyForQuery (one pair, not two) + assert!(ps.forward(&msg('C')).unwrap()); + assert!(ps.forward(&msg('Z')).unwrap()); + + // Clean — no stale entries. + assert!(ps.done()); + } +} diff --git a/pgdog/src/backend/protocol/state.rs b/pgdog/src/backend/protocol/state.rs index ca86915d2..67a2c0bf1 100644 --- a/pgdog/src/backend/protocol/state.rs +++ b/pgdog/src/backend/protocol/state.rs @@ -195,6 +195,21 @@ impl ProtocolState { self.queue.front() == Some(&ExecutionItem::Code(ExecutionCode::Copy)) } + /// Remove one ReadyForQuery expectation from the queue. + /// + /// Called when the server enters COPY IN mode (sends CopyInResponse). + /// PostgreSQL ignores Sync during COPY IN (protocol spec §55.2.6), + /// so the ReadyForQuery that was expected from the initial + /// Bind+Execute+Sync will never arrive. Leaving it in the queue + /// would desync the state machine on the next query. 
+ pub(crate) fn remove_one_rfq(&mut self) { + if let Some(pos) = self.queue.iter().position(|item| { + matches!(item, ExecutionItem::Code(ExecutionCode::ReadyForQuery)) + }) { + self.queue.remove(pos); + } + } + pub(crate) fn is_empty(&self) -> bool { self.len() == 0 } @@ -847,4 +862,5 @@ mod test { assert_eq!(state.action('Z').unwrap(), Action::Forward); assert!(state.is_empty()); } + }