From 4068a67d9c5842a9efe46b25638604684c9109ab Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Wed, 22 Apr 2026 19:49:41 +0530 Subject: [PATCH 1/9] feat(redis): survive transient Redis outages with bounded reconnects The broker's consume() loop previously rethrew any RedisException raised during the blocking pop, crashing the worker on every transient network blip. The connection layer also opened a brand-new socket on the first call with no retry, so a single DNS or TCP hiccup during boot would take the process down. Connection layer (Redis, RedisCluster): - getRedis() now retries up to 5 attempts with exponential backoff (100ms base, 3s cap) and full jitter to avoid thundering herd on recovery. - close() is best-effort and swallows Throwable so a dead socket doesn't mask the original error. Broker (Redis): - On RedisException during pop, drop the stale connection and retry with exponential backoff (100ms base, 5s cap, full jitter). Worker stays alive across outages instead of crash-looping under a supervisor. - Attempt counter resets on the first successful pop so each outage starts from a fresh backoff. Relies on the SWOOLE_HOOK_ALL hook flags set in Adapter/Swoole.php so usleep yields cooperatively inside coroutines rather than blocking the reactor. --- src/Queue/Broker/Redis.php | 21 +++++++++++- src/Queue/Connection/Redis.php | 46 +++++++++++++++++++++------ src/Queue/Connection/RedisCluster.php | 37 ++++++++++++++++++--- 3 files changed, 89 insertions(+), 15 deletions(-) diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php index b36e1b3..8718690 100644 --- a/src/Queue/Broker/Redis.php +++ b/src/Queue/Broker/Redis.php @@ -11,6 +11,9 @@ class Redis implements Publisher, Consumer { private const int POP_TIMEOUT = 2; + private const int RECONNECT_BASE_BACKOFF_MS = 100; + private const int RECONNECT_MAX_BACKOFF_MS = 5_000; + private const int RECONNECT_BACKOFF_CAP_SHIFT = 10; private bool $closed = false; @@ -20,18 +23,34 @@ public function __construct(private readonly Connection $connection) public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void { + $reconnectAttempts = 0; + while (!$this->closed) { /** * Waiting for next Job. */ try { $nextMessage = $this->connection->rightPopArray("{$queue->namespace}.queue.{$queue->name}", self::POP_TIMEOUT); + $reconnectAttempts = 0; } catch (\RedisException $e) { if ($this->closed) { break; } - throw $e; + // Drop the stale connection so the next pop opens a fresh one, then + // back off with full jitter before retrying. Keeps the worker alive + // across transient Redis outages instead of crash-looping. + $this->connection->close(); + + $reconnectAttempts++; + $shift = \min(self::RECONNECT_BACKOFF_CAP_SHIFT, $reconnectAttempts - 1); + $backoffMs = \min( + self::RECONNECT_MAX_BACKOFF_MS, + self::RECONNECT_BASE_BACKOFF_MS * (2 ** $shift), + ); + \usleep(\mt_rand(0, $backoffMs) * 1000); + + continue; } if (!$nextMessage) { diff --git a/src/Queue/Connection/Redis.php b/src/Queue/Connection/Redis.php index 8900277..a344357 100644 --- a/src/Queue/Connection/Redis.php +++ b/src/Queue/Connection/Redis.php @@ -6,6 +6,10 @@ class Redis implements Connection { + protected const int CONNECT_MAX_ATTEMPTS = 5; + protected const int CONNECT_BASE_BACKOFF_MS = 100; + protected const int CONNECT_MAX_BACKOFF_MS = 3_000; + protected string $host; protected int $port; protected ?string $user; @@ -178,25 +182,49 @@ public function ping(): bool public function close(): void { - $this->redis?->close(); - $this->redis = null; + try { + $this->redis?->close(); + } catch (\Throwable) { + // best-effort: underlying socket may already be dead + } finally { + $this->redis = null; + } } protected function getRedis(): \Redis { - if ($this->redis) { + if ($this->redis instanceof \Redis) { return $this->redis; } - $this->redis = new \Redis(); - $connectTimeout = $this->connectTimeout < 0 ? 0 : $this->connectTimeout; - $this->redis->connect($this->host, $this->port, $connectTimeout); - if ($this->readTimeout >= 0) { - $this->redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->readTimeout); + for ($attempt = 1; $attempt <= self::CONNECT_MAX_ATTEMPTS; $attempt++) { + $redis = new \Redis(); + + try { + $redis->connect($this->host, $this->port, $connectTimeout); + + if ($this->readTimeout >= 0) { + $redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->readTimeout); + } + + $this->redis = $redis; + return $this->redis; + } catch (\RedisException $e) { + if ($attempt === self::CONNECT_MAX_ATTEMPTS) { + throw $e; + } + + // Exponential backoff with full jitter to avoid thundering herd on recovery. + $backoffMs = \min( + self::CONNECT_MAX_BACKOFF_MS, + self::CONNECT_BASE_BACKOFF_MS * (2 ** ($attempt - 1)), + ); + \usleep(\mt_rand(0, $backoffMs) * 1000); + } } - return $this->redis; + throw new \RedisException('Unreachable: connect loop exited without success or exception.'); } } diff --git a/src/Queue/Connection/RedisCluster.php b/src/Queue/Connection/RedisCluster.php index 1c8c0a9..101a00c 100644 --- a/src/Queue/Connection/RedisCluster.php +++ b/src/Queue/Connection/RedisCluster.php @@ -6,6 +6,10 @@ class RedisCluster implements Connection { + protected const int CONNECT_MAX_ATTEMPTS = 5; + protected const int CONNECT_BASE_BACKOFF_MS = 100; + protected const int CONNECT_MAX_BACKOFF_MS = 3_000; + protected array $seeds; protected float $connectTimeout; protected float $readTimeout; @@ -175,19 +179,42 @@ public function ping(): bool public function close(): void { - $this->redis?->close(); - $this->redis = null; + try { + $this->redis?->close(); + } catch (\Throwable) { + // best-effort: underlying socket may already be dead + } finally { + $this->redis = null; + } } protected function getRedis(): \RedisCluster { - if ($this->redis) { + if ($this->redis instanceof \RedisCluster) { return $this->redis; } $connectTimeout = $this->connectTimeout < 0 ? 0 : $this->connectTimeout; $readTimeout = $this->readTimeout < 0 ? 0 : $this->readTimeout; - $this->redis = new \RedisCluster(null, $this->seeds, $connectTimeout, $readTimeout); - return $this->redis; + + for ($attempt = 1; $attempt <= self::CONNECT_MAX_ATTEMPTS; $attempt++) { + try { + $this->redis = new \RedisCluster(null, $this->seeds, $connectTimeout, $readTimeout); + return $this->redis; + } catch (\RedisClusterException $e) { + if ($attempt === self::CONNECT_MAX_ATTEMPTS) { + throw $e; + } + + // Exponential backoff with full jitter to avoid thundering herd on recovery. + $backoffMs = \min( + self::CONNECT_MAX_BACKOFF_MS, + self::CONNECT_BASE_BACKOFF_MS * (2 ** ($attempt - 1)), + ); + \usleep(\mt_rand(0, $backoffMs) * 1000); + } + } + + throw new \RedisClusterException('Unreachable: connect loop exited without success or exception.'); } } From 05fbae21f0d6689427684d497f5ac6b2eb46bb4a Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Thu, 23 Apr 2026 08:01:59 +0530 Subject: [PATCH 2/9] fix(redis): handle cluster reconnect reviews --- src/Queue/Broker/Redis.php | 20 +++++--------------- src/Queue/Connection/Redis.php | 15 ++++++++------- src/Queue/Connection/RedisCluster.php | 9 ++------- 3 files changed, 15 insertions(+), 29 deletions(-) diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php index 8718690..1f34b5f 100644 --- a/src/Queue/Broker/Redis.php +++ b/src/Queue/Broker/Redis.php @@ -13,7 +13,6 @@ class Redis implements Publisher, Consumer private const int POP_TIMEOUT = 2; private const int RECONNECT_BASE_BACKOFF_MS = 100; private const int RECONNECT_MAX_BACKOFF_MS = 5_000; - private const int RECONNECT_BACKOFF_CAP_SHIFT = 10; private bool $closed = false; @@ -23,7 +22,7 @@ public function __construct(private readonly Connection $connection) public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void { - $reconnectAttempts = 0; + $reconnectBackoffMs = self::RECONNECT_BASE_BACKOFF_MS; while (!$this->closed) { /** @@ -31,24 +30,15 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe */ try { $nextMessage = $this->connection->rightPopArray("{$queue->namespace}.queue.{$queue->name}", self::POP_TIMEOUT); - $reconnectAttempts = 0; - } catch (\RedisException $e) { + $reconnectBackoffMs = self::RECONNECT_BASE_BACKOFF_MS; + } catch (\RedisException|\RedisClusterException $e) { if ($this->closed) { break; } - // Drop the stale connection so the next pop opens a fresh one, then - // back off with full jitter before retrying. Keeps the worker alive - // across transient Redis outages instead of crash-looping. $this->connection->close(); - - $reconnectAttempts++; - $shift = \min(self::RECONNECT_BACKOFF_CAP_SHIFT, $reconnectAttempts - 1); - $backoffMs = \min( - self::RECONNECT_MAX_BACKOFF_MS, - self::RECONNECT_BASE_BACKOFF_MS * (2 ** $shift), - ); - \usleep(\mt_rand(0, $backoffMs) * 1000); + \usleep(\mt_rand(0, $reconnectBackoffMs) * 1000); + $reconnectBackoffMs = \min(self::RECONNECT_MAX_BACKOFF_MS, $reconnectBackoffMs * 2); continue; } diff --git a/src/Queue/Connection/Redis.php b/src/Queue/Connection/Redis.php index a344357..eecaf19 100644 --- a/src/Queue/Connection/Redis.php +++ b/src/Queue/Connection/Redis.php @@ -182,13 +182,8 @@ public function ping(): bool public function close(): void { - try { - $this->redis?->close(); - } catch (\Throwable) { - // best-effort: underlying socket may already be dead - } finally { - $this->redis = null; - } + $this->redis?->close(); + $this->redis = null; } protected function getRedis(): \Redis @@ -201,9 +196,11 @@ protected function getRedis(): \Redis for ($attempt = 1; $attempt <= self::CONNECT_MAX_ATTEMPTS; $attempt++) { $redis = new \Redis(); + $connected = false; try { $redis->connect($this->host, $this->port, $connectTimeout); + $connected = true; if ($this->readTimeout >= 0) { $redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->readTimeout); @@ -212,6 +209,10 @@ protected function getRedis(): \Redis $this->redis = $redis; return $this->redis; } catch (\RedisException $e) { + if ($connected) { + $redis->close(); + } + if ($attempt === self::CONNECT_MAX_ATTEMPTS) { throw $e; } diff --git a/src/Queue/Connection/RedisCluster.php b/src/Queue/Connection/RedisCluster.php index 101a00c..0eaca1e 100644 --- a/src/Queue/Connection/RedisCluster.php +++ b/src/Queue/Connection/RedisCluster.php @@ -179,13 +179,8 @@ public function ping(): bool public function close(): void { - try { - $this->redis?->close(); - } catch (\Throwable) { - // best-effort: underlying socket may already be dead - } finally { - $this->redis = null; - } + $this->redis?->close(); + $this->redis = null; } protected function getRedis(): \RedisCluster From 1b6702b8123faf6d94d3d1f891613849c1513920 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Thu, 23 Apr 2026 08:06:58 +0530 Subject: [PATCH 3/9] fix(redis): avoid masking reconnect failures --- src/Queue/Broker/Redis.php | 6 +++++- src/Queue/Connection/Redis.php | 5 ++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php index 1f34b5f..b1c0a62 100644 --- a/src/Queue/Broker/Redis.php +++ b/src/Queue/Broker/Redis.php @@ -36,7 +36,11 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe break; } - $this->connection->close(); + try { + $this->connection->close(); + } catch (\Throwable) { + } + \usleep(\mt_rand(0, $reconnectBackoffMs) * 1000); $reconnectBackoffMs = \min(self::RECONNECT_MAX_BACKOFF_MS, $reconnectBackoffMs * 2); diff --git a/src/Queue/Connection/Redis.php b/src/Queue/Connection/Redis.php index eecaf19..05ea803 100644 --- a/src/Queue/Connection/Redis.php +++ b/src/Queue/Connection/Redis.php @@ -210,7 +210,10 @@ protected function getRedis(): \Redis return $this->redis; } catch (\RedisException $e) { if ($connected) { - $redis->close(); + try { + $redis->close(); + } catch (\Throwable) { + } } if ($attempt === self::CONNECT_MAX_ATTEMPTS) { From acf40726e325f5de6e9156e1b7d32ea06a938726 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Thu, 23 Apr 2026 08:16:17 +0530 Subject: [PATCH 4/9] chore(redis): keep cached connection guard unchanged --- src/Queue/Connection/Redis.php | 2 +- src/Queue/Connection/RedisCluster.php | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Queue/Connection/Redis.php b/src/Queue/Connection/Redis.php index 05ea803..80bee3d 100644 --- a/src/Queue/Connection/Redis.php +++ b/src/Queue/Connection/Redis.php @@ -188,7 +188,7 @@ public function close(): void protected function getRedis(): \Redis { - if ($this->redis instanceof \Redis) { + if ($this->redis) { return $this->redis; } diff --git a/src/Queue/Connection/RedisCluster.php b/src/Queue/Connection/RedisCluster.php index 0eaca1e..5531f62 100644 --- a/src/Queue/Connection/RedisCluster.php +++ b/src/Queue/Connection/RedisCluster.php @@ -185,7 +185,7 @@ public function close(): void protected function getRedis(): \RedisCluster { - if ($this->redis instanceof \RedisCluster) { + if ($this->redis) { return $this->redis; } From 1005a1d725fed975cc48c408351a3db96b94a280 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Thu, 23 Apr 2026 08:19:12 +0530 Subject: [PATCH 5/9] fix(redis): add retry failure context --- src/Queue/Broker/Redis.php | 6 +++--- src/Queue/Connection/Redis.php | 23 +++++++++++++++++++---- src/Queue/Connection/RedisCluster.php | 21 +++++++++++++++++---- 3 files changed, 39 insertions(+), 11 deletions(-) diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php index b1c0a62..6eaf3a3 100644 --- a/src/Queue/Broker/Redis.php +++ b/src/Queue/Broker/Redis.php @@ -11,7 +11,7 @@ class Redis implements Publisher, Consumer { private const int POP_TIMEOUT = 2; - private const int RECONNECT_BASE_BACKOFF_MS = 100; + private const int RECONNECT_BACKOFF_MS = 100; private const int RECONNECT_MAX_BACKOFF_MS = 5_000; private bool $closed = false; @@ -22,7 +22,7 @@ public function __construct(private readonly Connection $connection) public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void { - $reconnectBackoffMs = self::RECONNECT_BASE_BACKOFF_MS; + $reconnectBackoffMs = self::RECONNECT_BACKOFF_MS; while (!$this->closed) { /** @@ -30,7 +30,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe */ try { $nextMessage = $this->connection->rightPopArray("{$queue->namespace}.queue.{$queue->name}", self::POP_TIMEOUT); - $reconnectBackoffMs = self::RECONNECT_BASE_BACKOFF_MS; + $reconnectBackoffMs = self::RECONNECT_BACKOFF_MS; } catch (\RedisException|\RedisClusterException $e) { if ($this->closed) { break; diff --git a/src/Queue/Connection/Redis.php b/src/Queue/Connection/Redis.php index 80bee3d..29161fc 100644 --- a/src/Queue/Connection/Redis.php +++ b/src/Queue/Connection/Redis.php @@ -7,7 +7,7 @@ class Redis implements Connection { protected const int CONNECT_MAX_ATTEMPTS = 5; - protected const int CONNECT_BASE_BACKOFF_MS = 100; + protected const int CONNECT_BACKOFF_MS = 100; protected const int CONNECT_MAX_BACKOFF_MS = 3_000; protected string $host; @@ -217,18 +217,33 @@ protected function getRedis(): \Redis } if ($attempt === self::CONNECT_MAX_ATTEMPTS) { - throw $e; + throw new \RedisException( + \sprintf( + 'Failed to connect to Redis at %s:%d after %d attempts: %s', + $this->host, + $this->port, + self::CONNECT_MAX_ATTEMPTS, + $e->getMessage(), + ), + (int)$e->getCode(), + $e, + ); } // Exponential backoff with full jitter to avoid thundering herd on recovery. $backoffMs = \min( self::CONNECT_MAX_BACKOFF_MS, - self::CONNECT_BASE_BACKOFF_MS * (2 ** ($attempt - 1)), + self::CONNECT_BACKOFF_MS * (2 ** ($attempt - 1)), ); \usleep(\mt_rand(0, $backoffMs) * 1000); } } - throw new \RedisException('Unreachable: connect loop exited without success or exception.'); + throw new \RedisException(\sprintf( + 'Unreachable: Redis connect loop for %s:%d exited after %d attempts without success or exception.', + $this->host, + $this->port, + self::CONNECT_MAX_ATTEMPTS, + )); } } diff --git a/src/Queue/Connection/RedisCluster.php b/src/Queue/Connection/RedisCluster.php index 5531f62..8a4e1a4 100644 --- a/src/Queue/Connection/RedisCluster.php +++ b/src/Queue/Connection/RedisCluster.php @@ -7,7 +7,7 @@ class RedisCluster implements Connection { protected const int CONNECT_MAX_ATTEMPTS = 5; - protected const int CONNECT_BASE_BACKOFF_MS = 100; + protected const int CONNECT_BACKOFF_MS = 100; protected const int CONNECT_MAX_BACKOFF_MS = 3_000; protected array $seeds; @@ -198,18 +198,31 @@ protected function getRedis(): \RedisCluster return $this->redis; } catch (\RedisClusterException $e) { if ($attempt === self::CONNECT_MAX_ATTEMPTS) { - throw $e; + throw new \RedisClusterException( + \sprintf( + 'Failed to connect to Redis cluster seeds [%s] after %d attempts: %s', + \implode(', ', $this->seeds), + self::CONNECT_MAX_ATTEMPTS, + $e->getMessage(), + ), + (int)$e->getCode(), + $e, + ); } // Exponential backoff with full jitter to avoid thundering herd on recovery. $backoffMs = \min( self::CONNECT_MAX_BACKOFF_MS, - self::CONNECT_BASE_BACKOFF_MS * (2 ** ($attempt - 1)), + self::CONNECT_BACKOFF_MS * (2 ** ($attempt - 1)), ); \usleep(\mt_rand(0, $backoffMs) * 1000); } } - throw new \RedisClusterException('Unreachable: connect loop exited without success or exception.'); + throw new \RedisClusterException(\sprintf( + 'Unreachable: Redis cluster connect loop for seeds [%s] exited after %d attempts without success or exception.', + \implode(', ', $this->seeds), + self::CONNECT_MAX_ATTEMPTS, + )); } } From c590239512b4d1424b08d687414ef282e832d109 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Thu, 23 Apr 2026 08:23:06 +0530 Subject: [PATCH 6/9] chore(redis): clarify cluster node error text --- src/Queue/Connection/RedisCluster.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Queue/Connection/RedisCluster.php b/src/Queue/Connection/RedisCluster.php index 8a4e1a4..c3bc354 100644 --- a/src/Queue/Connection/RedisCluster.php +++ b/src/Queue/Connection/RedisCluster.php @@ -200,7 +200,7 @@ protected function getRedis(): \RedisCluster if ($attempt === self::CONNECT_MAX_ATTEMPTS) { throw new \RedisClusterException( \sprintf( - 'Failed to connect to Redis cluster seeds [%s] after %d attempts: %s', + 'Failed to connect to Redis cluster nodes [%s] after %d attempts: %s', \implode(', ', $this->seeds), self::CONNECT_MAX_ATTEMPTS, $e->getMessage(), @@ -220,7 +220,7 @@ protected function getRedis(): \RedisCluster } throw new \RedisClusterException(\sprintf( - 'Unreachable: Redis cluster connect loop for seeds [%s] exited after %d attempts without success or exception.', + 'Unreachable: Redis cluster connect loop for nodes [%s] exited after %d attempts without success or exception.', \implode(', ', $this->seeds), self::CONNECT_MAX_ATTEMPTS, )); From 86ad826206026d50c64c54578c15626135d46c3e Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Thu, 23 Apr 2026 08:29:36 +0530 Subject: [PATCH 7/9] fix(redis): always clear closed handles --- src/Queue/Connection/Redis.php | 8 ++++++-- src/Queue/Connection/RedisCluster.php | 8 ++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/Queue/Connection/Redis.php b/src/Queue/Connection/Redis.php index 29161fc..ee979af 100644 --- a/src/Queue/Connection/Redis.php +++ b/src/Queue/Connection/Redis.php @@ -182,8 +182,12 @@ public function ping(): bool public function close(): void { - $this->redis?->close(); - $this->redis = null; + try { + $this->redis?->close(); + } catch (\Throwable) { + } finally { + $this->redis = null; + } } protected function getRedis(): \Redis diff --git a/src/Queue/Connection/RedisCluster.php b/src/Queue/Connection/RedisCluster.php index c3bc354..8886f6c 100644 --- a/src/Queue/Connection/RedisCluster.php +++ b/src/Queue/Connection/RedisCluster.php @@ -179,8 +179,12 @@ public function ping(): bool public function close(): void { - $this->redis?->close(); - $this->redis = null; + try { + $this->redis?->close(); + } catch (\Throwable) { + } finally { + $this->redis = null; + } } protected function getRedis(): \RedisCluster From 81b7779c7b6ff6e261803768c390a15e8e9ad436 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Thu, 23 Apr 2026 10:34:50 +0530 Subject: [PATCH 8/9] feat(redis): expose reconnect callback --- src/Queue/Broker/Redis.php | 32 +++- .../Adapter/RedisReconnectCallbackTest.php | 162 ++++++++++++++++++ 2 files changed, 193 insertions(+), 1 deletion(-) create mode 100644 tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php index 6eaf3a3..c558536 100644 --- a/src/Queue/Broker/Redis.php +++ b/src/Queue/Broker/Redis.php @@ -15,14 +15,26 @@ class Redis implements Publisher, Consumer private const int RECONNECT_MAX_BACKOFF_MS = 5_000; private bool $closed = false; + /** + * @var (callable(Queue, \Throwable, int, int): void)|null + */ + private $reconnectCallback = null; public function __construct(private readonly Connection $connection) { } + public function setReconnectCallback(?callable $callback): self + { + $this->reconnectCallback = $callback; + + return $this; + } + public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void { $reconnectBackoffMs = self::RECONNECT_BACKOFF_MS; + $reconnectAttempt = 0; while (!$this->closed) { /** @@ -31,17 +43,23 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe try { $nextMessage = $this->connection->rightPopArray("{$queue->namespace}.queue.{$queue->name}", self::POP_TIMEOUT); $reconnectBackoffMs = self::RECONNECT_BACKOFF_MS; + $reconnectAttempt = 0; } catch (\RedisException|\RedisClusterException $e) { if ($this->closed) { break; } + $reconnectAttempt++; + try { $this->connection->close(); } catch (\Throwable) { } - \usleep(\mt_rand(0, $reconnectBackoffMs) * 1000); + $sleepMs = \mt_rand(0, $reconnectBackoffMs); + $this->triggerReconnectCallback($queue, $e, $reconnectAttempt, $sleepMs); + + \usleep($sleepMs * 1000); $reconnectBackoffMs = \min(self::RECONNECT_MAX_BACKOFF_MS, $reconnectBackoffMs * 2); continue; @@ -117,6 +135,18 @@ public function close(): void $this->closed = true; } + private function triggerReconnectCallback(Queue $queue, \Throwable $error, int $attempt, int $sleepMs): void + { + if (!\is_callable($this->reconnectCallback)) { + return; + } + + try { + ($this->reconnectCallback)($queue, $error, $attempt, $sleepMs); + } catch (\Throwable) { + } + } + public function enqueue(Queue $queue, array $payload, bool $priority = false): bool { $payload = [ diff --git a/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php b/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php new file mode 100644 index 0000000..44010b5 --- /dev/null +++ b/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php @@ -0,0 +1,162 @@ +setReconnectCallback(function (Queue $queue, \Throwable $error, int $attempt, int $sleepMs) use (&$calls, $broker): void { + $calls[] = [ + 'queue' => $queue, + 'error' => $error, + 'attempt' => $attempt, + 'sleepMs' => $sleepMs, + ]; + + $broker->close(); + }); + + $broker->consume( + $queue, + fn () => null, + fn () => null, + fn () => null, + ); + + $this->assertSame(1, $connection->popAttempts); + $this->assertCount(1, $calls); + $this->assertSame($queue, $calls[0]['queue']); + $this->assertInstanceOf(\RedisException::class, $calls[0]['error']); + $this->assertSame(1, $calls[0]['attempt']); + $this->assertIsInt($calls[0]['sleepMs']); + $this->assertGreaterThanOrEqual(0, $calls[0]['sleepMs']); + $this->assertLessThanOrEqual(100, $calls[0]['sleepMs']); + } +} + +class FailingRedisConnection implements Connection +{ + public int $popAttempts = 0; + + public function rightPushArray(string $queue, array $payload): bool + { + return true; + } + + public function rightPopArray(string $queue, int $timeout): array|false + { + $this->popAttempts++; + + throw new \RedisException('Redis is unavailable.'); + } + + public function rightPopLeftPushArray(string $queue, string $destination, int $timeout): array|false + { + return false; + } + + public function leftPushArray(string $queue, array $payload): bool + { + return true; + } + + public function leftPopArray(string $queue, int $timeout): array|false + { + return false; + } + + public function rightPush(string $queue, string $payload): bool + { + return true; + } + + public function rightPop(string $queue, int $timeout): string|false + { + return false; + } + + public function rightPopLeftPush(string $queue, string $destination, int $timeout): string|false + { + return false; + } + + public function leftPush(string $queue, string $payload): bool + { + return true; + } + + public function leftPop(string $queue, int $timeout): string|false + { + return false; + } + + public function listRemove(string $queue, string $key): bool + { + return true; + } + + public function listSize(string $key): int + { + return 0; + } + + public function listRange(string $key, int $total, int $offset): array + { + return []; + } + + public function remove(string $key): bool + { + return true; + } + + public function move(string $queue, string $destination): bool + { + return true; + } + + public function set(string $key, string $value, int $ttl = 0): bool + { + return true; + } + + public function get(string $key): array|string|null + { + return null; + } + + public function setArray(string $key, array $value, int $ttl = 0): bool + { + return true; + } + + public function increment(string $key): int + { + return 1; + } + + public function decrement(string $key): int + { + return 0; + } + + public function ping(): bool + { + return false; + } + + public function close(): void + { + } +} From 1bee0057a40d90dc7b5dc97d2cb826c0e219311a Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Thu, 23 Apr 2026 10:54:18 +0530 Subject: [PATCH 9/9] feat(redis): expose reconnect success callback --- src/Queue/Broker/Redis.php | 27 ++++++++++++ .../Adapter/RedisReconnectCallbackTest.php | 44 +++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php index c558536..161e687 100644 --- a/src/Queue/Broker/Redis.php +++ b/src/Queue/Broker/Redis.php @@ -19,6 +19,10 @@ class Redis implements Publisher, Consumer * @var (callable(Queue, \Throwable, int, int): void)|null */ private $reconnectCallback = null; + /** + * @var (callable(Queue, int): void)|null + */ + private $reconnectSuccessCallback = null; public function __construct(private readonly Connection $connection) { @@ -31,6 +35,13 @@ public function setReconnectCallback(?callable $callback): self return $this; } + public function setReconnectSuccessCallback(?callable $callback): self + { + $this->reconnectSuccessCallback = $callback; + + return $this; + } + public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void { $reconnectBackoffMs = self::RECONNECT_BACKOFF_MS; @@ -42,6 +53,10 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe */ try { $nextMessage = $this->connection->rightPopArray("{$queue->namespace}.queue.{$queue->name}", self::POP_TIMEOUT); + if ($reconnectAttempt > 0) { + $this->triggerReconnectSuccessCallback($queue, $reconnectAttempt); + } + $reconnectBackoffMs = self::RECONNECT_BACKOFF_MS; $reconnectAttempt = 0; } catch (\RedisException|\RedisClusterException $e) { @@ -147,6 +162,18 @@ private function triggerReconnectCallback(Queue $queue, \Throwable $error, int $ } } + private function triggerReconnectSuccessCallback(Queue $queue, int $attempts): void + { + if (!\is_callable($this->reconnectSuccessCallback)) { + return; + } + + try { + ($this->reconnectSuccessCallback)($queue, $attempts); + } catch (\Throwable) { + } + } + public function enqueue(Queue $queue, array $payload, bool $priority = false): bool { $payload = [ diff --git a/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php b/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php index 44010b5..481ab9e 100644 --- a/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php +++ b/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php @@ -43,6 +43,36 @@ public function testReconnectCallbackReceivesAttemptContext(): void $this->assertGreaterThanOrEqual(0, $calls[0]['sleepMs']); $this->assertLessThanOrEqual(100, $calls[0]['sleepMs']); } + + public function testReconnectSuccessCallbackReceivesAttemptCount(): void + { + $queue = new Queue('reconnect-success-callback'); + $connection = new RecoveringRedisConnection(); + $broker = new RedisBroker($connection); + $calls = []; + + $broker->setReconnectCallback(fn () => null); + $broker->setReconnectSuccessCallback(function (Queue $queue, int $attempts) use (&$calls, $broker): void { + $calls[] = [ + 'queue' => $queue, + 'attempts' => $attempts, + ]; + + $broker->close(); + }); + + $broker->consume( + $queue, + fn () => null, + fn () => null, + fn () => null, + ); + + $this->assertSame(2, $connection->popAttempts); + $this->assertCount(1, $calls); + $this->assertSame($queue, $calls[0]['queue']); + $this->assertSame(1, $calls[0]['attempts']); + } } class FailingRedisConnection implements Connection @@ -160,3 +190,17 @@ public function close(): void { } } + +class RecoveringRedisConnection extends FailingRedisConnection +{ + public function rightPopArray(string $queue, int $timeout): array|false + { + $this->popAttempts++; + + if ($this->popAttempts === 1) { + throw new \RedisException('Redis is unavailable.'); + } + + return false; + } +}