Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions pgdog/src/backend/replication/logical/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,6 @@ pub enum Error {
#[error("shard {0} has no replication slot")]
NoReplicationSlot(usize),

#[error("parallel connection error")]
ParallelConnection,

#[error("no replicas available for table sync")]
NoReplicas,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl ParallelSync {
.permit
.acquire()
.await
.map_err(|_| Error::ParallelConnection)?;
.map_err(|_| Error::DataSyncAborted)?;

if self.tx.is_closed() {
return Err(Error::DataSyncAborted);
Expand All @@ -64,9 +64,7 @@ impl ParallelSync {
}
};

self.tx
.send(result)
.map_err(|_| Error::ParallelConnection)?;
self.tx.send(result).map_err(|_| Error::DataSyncAborted)?;

Ok::<(), Error>(())
})
Expand Down
69 changes: 40 additions & 29 deletions pgdog/src/backend/replication/logical/subscriber/copy.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
//! Shard a COPY stream from one source
//! across N shards.

use futures::future::try_join_all;
use pg_query::{parse_raw, NodeEnum};
use pgdog_config::QueryParserEngine;
use tracing::debug;

use crate::{
backend::{replication::subscriber::ParallelConnection, Cluster, ConnectReason},
backend::{Cluster, ConnectReason, Server},
config::Role,
frontend::router::parser::{CopyParser, Shard},
net::{CopyData, CopyDone, ErrorResponse, FromBytes, Protocol, Query, ToBytes},
Expand All @@ -24,7 +25,7 @@ pub struct CopySubscriber {
copy: CopyParser,
cluster: Cluster,
buffer: Vec<CopyData>,
connections: Vec<ParallelConnection>,
connections: Vec<Server>,
stmt: CopyStatement,
bytes_sharded: usize,
}
Expand Down Expand Up @@ -85,7 +86,7 @@ impl CopySubscriber {
.1
.standalone(ConnectReason::Replication)
.await?;
servers.push(ParallelConnection::new(primary)?);
servers.push(primary);
}

self.connections = servers;
Expand All @@ -95,38 +96,38 @@ impl CopySubscriber {

/// Disconnect from all shards.
pub async fn disconnect(&mut self) -> Result<(), Error> {
for conn in std::mem::take(&mut self.connections) {
conn.reattach().await?;
}
self.connections.clear();

Ok(())
}

/// Start COPY on all shards.
pub async fn start_copy(&mut self) -> Result<(), Error> {
let stmt = Query::new(self.stmt.copy_in());

if self.connections.is_empty() {
self.connect().await?;
}

for server in &mut self.connections {
debug!("{} [{}]", stmt.query(), server.addr());
let stmt = Query::new(self.stmt.copy_in());

server.send_one(&stmt.clone().into()).await?;
server.flush().await?;
// Start COPY IN on all shards concurrently.
try_join_all(self.connections.iter_mut().map(|server| {
let msg: crate::net::ProtocolMessage = stmt.clone().into();
debug!("{} [{}]", stmt.query(), server.addr());

let msg = server.read().await?;
match msg.code() {
'G' => (),
'E' => {
return Err(Error::PgError(Box::new(ErrorResponse::from_bytes(
msg.to_bytes()?,
)?)))
async move {
server.send_one(&msg).await?;
server.flush().await?;
let reply = server.read().await?;
match reply.code() {
'G' => Ok(()),
'E' => Err(Error::PgError(Box::new(ErrorResponse::from_bytes(
reply.to_bytes()?,
)?))),
c => Err(Error::OutOfSync(c)),
}
c => return Err(Error::OutOfSync(c)),
}
}
}))
.await?;

Ok(())
}
Expand All @@ -135,20 +136,20 @@ impl CopySubscriber {
pub async fn copy_done(&mut self) -> Result<(), Error> {
self.flush().await?;

for server in &mut self.connections {
// Finalise COPY on all shards concurrently.
try_join_all(self.connections.iter_mut().map(|server| async move {
server.send_one(&CopyDone.into()).await?;
server.flush().await?;

let command_complete = server.read().await?;
match command_complete.code() {
let cc = server.read().await?;
match cc.code() {
'E' => {
let error = ErrorResponse::from_bytes(command_complete.to_bytes()?)?;
let error = ErrorResponse::from_bytes(cc.to_bytes()?)?;
if error.code == "08P01" && error.message == "insufficient data left in message"
{
return Err(Error::BinaryFormatMismatch(Box::new(error)));
} else {
return Err(Error::PgError(Box::new(error)));
}
return Err(Error::PgError(Box::new(error)));
}
'C' => (),
c => return Err(Error::OutOfSync(c)),
Expand All @@ -158,7 +159,9 @@ impl CopySubscriber {
if rfq.code() != 'Z' {
return Err(Error::OutOfSync(rfq.code()));
}
}
Ok(())
}))
.await?;

Ok(())
}
Expand All @@ -174,12 +177,19 @@ impl CopySubscriber {
}

async fn flush(&mut self) -> Result<(usize, usize), Error> {
if self.buffer.is_empty() {
return Ok((0, 0));
}

let result = self.copy.shard(&self.buffer)?;
self.buffer.clear();

let rows = result.len();
let bytes = result.iter().map(|row| row.len()).sum::<usize>();
self.bytes_sharded += bytes;

// Route each row to the right shard(s). send_one is a buffered write
// so this loop does no I/O — no concurrency needed here.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not true. send_one does a buffered write only if there is space in the buffer. Once that space runs out, it will flush the data to the socket.

With copy, where it writes a lot of data, this happens very frequently. This part definitely needs to be parallelized.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right about send_one.

But in the end it is the same send_one in the listener that will block the listener loop when the I/O buffer is full. So by using ParallelConnection we are defining an additional buffer (the channel) on top of that. And there is a third buffer above it, inside copy_data.
Task offloading can help avoid blocking reads from the source while writing to the destination, but the computation cost should be the same (actually slightly higher, due to the additional machinery).
My bet is that the buffer size is the important part, since with a large enough buffer we can achieve parallel reading and writing; where that buffer is defined is less important.

So, in any case, we need to test and benchmark it.

for row in &result {
for (shard, server) in self.connections.iter_mut().enumerate() {
match row.shard() {
Expand All @@ -198,7 +208,8 @@ impl CopySubscriber {
}
}

self.bytes_sharded += result.iter().map(|c| c.len()).sum::<usize>();
// Flush all shards concurrently — this is the actual socket write.
try_join_all(self.connections.iter_mut().map(|s| s.flush())).await?;

Ok((rows, bytes))
}
Expand Down
2 changes: 0 additions & 2 deletions pgdog/src/backend/replication/logical/subscriber/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
pub mod context;
pub mod copy;
pub mod parallel_connection;
pub mod stream;

#[cfg(test)]
mod tests;

pub use context::StreamContext;
pub use copy::CopySubscriber;
pub use parallel_connection::ParallelConnection;
pub use stream::StreamSubscriber;
Loading
Loading