diff --git a/integration/rust/tests/tokio_postgres/copy.rs b/integration/rust/tests/tokio_postgres/copy.rs index 4473bedc9..45bc88a83 100644 --- a/integration/rust/tests/tokio_postgres/copy.rs +++ b/integration/rust/tests/tokio_postgres/copy.rs @@ -1,32 +1,65 @@ -// use futures_util::{TryStreamExt, pin_mut}; -// use tokio_postgres::binary_copy::{BinaryCopyInWriter, BinaryCopyOutStream}; -// use tokio_postgres::types::Type; +use bytes::{BufMut, BytesMut}; +use futures_util::SinkExt; +use tokio_postgres::NoTls; -// use rust::setup::connections; +/// Demonstrate that COPY FROM STDIN via extended protocol (tokio-postgres) +/// works correctly through pgdog. +/// +/// tokio-postgres sends COPY using Bind+Execute+Sync (extended protocol). +/// PostgreSQL ignores the Sync during COPY IN mode, producing only one +/// ReadyForQuery instead of two. Without the remove_one_rfq() fix, the +/// stale ReadyForQuery expectation desyncs the state machine and the +/// connection becomes unusable for subsequent queries. +#[tokio::test] +async fn test_copy_in_extended_protocol() { + let (conn, connection) = tokio_postgres::connect( + "host=127.0.0.1 user=pgdog dbname=pgdog password=pgdog port=6432", + NoTls, + ) + .await + .unwrap(); -// #[tokio::test] -// async fn test_copy() { -// for conn in connections().await { -// conn.batch_execute( -// "DROP SCHEMA IF EXISTS rust_test_insert CASCADE; -// CREATE SCHEMA rust_test_insert; -// CREATE TABLE rust_test_insert.sharded (id BIGINT PRIMARY KEY, value VARCHAR); -// SET search_path TO rust_test_insert,public;", -// ) -// .await -// .unwrap(); + tokio::spawn(async move { + if let Err(e) = connection.await { + eprintln!("connection error: {}", e); + } + }); -// let sink = conn -// .copy_in("COPY sharded (id, value) FROM STDIN BINARY") -// .await -// .unwrap(); -// let writer = BinaryCopyInWriter::new(sink, &[Type::INT8, Type::TEXT]); -// for i in 0..25 { -// let writer = tokio::pin!(writer); -// writer. -// .write(&[&1_i64, &"foobar"]) -// .await -// .unwrap(); -// } -// } -// } + // Setup: clean slate + conn.batch_execute( + "DROP TABLE IF EXISTS _copy_test; + CREATE TABLE _copy_test (id BIGINT, value TEXT);", + ) + .await + .unwrap(); + + // COPY FROM STDIN — tokio-postgres sends this via extended protocol + // (Parse, Bind, Execute, Sync), triggering the double-Sync pattern. + let sink = conn + .copy_in("COPY _copy_test (id, value) FROM STDIN") + .await + .unwrap(); + + // Write some tab-delimited rows + let mut buf = BytesMut::new(); + for i in 0..10_i64 { + buf.put_slice(format!("{}\trow_{}\n", i, i).as_bytes()); + } + futures_util::pin_mut!(sink); + sink.send(buf.freeze()).await.unwrap(); + let rows_copied = sink.finish().await.unwrap(); + assert_eq!(rows_copied, 10); + + // This query AFTER the copy is the real test. + // Without the fix, the state machine has a stale ReadyForQuery + // and this query will either hang, error, or return wrong results. + let rows = conn + .query("SELECT count(*) FROM _copy_test", &[]) + .await + .unwrap(); + let count: i64 = rows[0].get(0); + assert_eq!(count, 10); + + // Cleanup + conn.execute("DROP TABLE _copy_test", &[]).await.unwrap(); +} diff --git a/pgdog/src/backend/prepared_statements.rs b/pgdog/src/backend/prepared_statements.rs index 76ad8480a..2af38bc13 100644 --- a/pgdog/src/backend/prepared_statements.rs +++ b/pgdog/src/backend/prepared_statements.rs @@ -228,6 +228,10 @@ impl PreparedStatements { 'G' => { self.state.prepend('G'); // Next thing we'll see is a CopyFail or CopyDone. + // PostgreSQL ignores Sync during COPY IN (protocol spec §55.2.6). + // Remove the ReadyForQuery that was expected from the initial + // Bind+Execute+Sync — the server won't send it. + self.state.remove_one_rfq(); } // Backend told us the copy is done. @@ -374,3 +378,53 @@ impl PreparedStatements { close } } + +#[cfg(test)] +mod test { + use super::*; + use crate::net::messages::Payload; + + /// Build a minimal backend Message with the given code byte. + fn msg(code: char) -> Message { + Message::new(Payload::named(code).freeze()) + } + + /// Simulate a client that sends Bind+Execute+Sync for COPY FROM STDIN + /// (the tokio-postgres double-Sync pattern). PostgreSQL ignores Sync + /// during COPY IN mode, so only one ReadyForQuery is produced. + /// + /// This test exercises the real `forward()` code path — it must clean + /// up the stale ReadyForQuery when it sees CopyInResponse. + #[test] + fn test_copy_in_with_client_double_sync() { + let mut ps = PreparedStatements::new(); + + // Client: Bind + Execute + Sync → state expects [BindComplete, ExecutionCompleted, RFQ] + ps.state_mut().add('2'); + ps.state_mut().add(ExecutionCode::ExecutionCompleted); + ps.state_mut().add('Z'); + + // Server responds: BindComplete + assert!(ps.forward(&msg('2')).unwrap()); + + // Server responds: CopyInResponse — forward() should drop the stale RFQ + assert!(ps.forward(&msg('G')).unwrap()); + + // Queue should be [Copy] only — no stale ReadyForQuery. + assert!(ps.in_copy_mode()); + assert!(!ps.done()); // still in COPY mode + + // Client sends CopyDone (handled through handle()) + ps.state_mut().action('c').unwrap(); + + // Client sends second Sync + ps.state_mut().add('Z'); + + // Server: CommandComplete + ReadyForQuery (one pair, not two) + assert!(ps.forward(&msg('C')).unwrap()); + assert!(ps.forward(&msg('Z')).unwrap()); + + // Clean — no stale entries. + assert!(ps.done()); + } +} diff --git a/pgdog/src/backend/protocol/state.rs b/pgdog/src/backend/protocol/state.rs index ca86915d2..67a2c0bf1 100644 --- a/pgdog/src/backend/protocol/state.rs +++ b/pgdog/src/backend/protocol/state.rs @@ -195,6 +195,21 @@ impl ProtocolState { self.queue.front() == Some(&ExecutionItem::Code(ExecutionCode::Copy)) } + /// Remove one ReadyForQuery expectation from the queue. + /// + /// Called when the server enters COPY IN mode (sends CopyInResponse). + /// PostgreSQL ignores Sync during COPY IN (protocol spec §55.2.6), + /// so the ReadyForQuery that was expected from the initial + /// Bind+Execute+Sync will never arrive. Leaving it in the queue + /// would desync the state machine on the next query. + pub(crate) fn remove_one_rfq(&mut self) { + if let Some(pos) = self.queue.iter().position(|item| { + matches!(item, ExecutionItem::Code(ExecutionCode::ReadyForQuery)) + }) { + self.queue.remove(pos); + } + } + pub(crate) fn is_empty(&self) -> bool { self.len() == 0 } @@ -847,4 +862,5 @@ mod test { assert_eq!(state.action('Z').unwrap(), Action::Forward); assert!(state.is_empty()); } + }