diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php index b36e1b3..161e687 100644 --- a/src/Queue/Broker/Redis.php +++ b/src/Queue/Broker/Redis.php @@ -11,27 +11,73 @@ class Redis implements Publisher, Consumer { private const int POP_TIMEOUT = 2; + private const int RECONNECT_BACKOFF_MS = 100; + private const int RECONNECT_MAX_BACKOFF_MS = 5_000; private bool $closed = false; + /** + * @var (callable(Queue, \Throwable, int, int): void)|null + */ + private $reconnectCallback = null; + /** + * @var (callable(Queue, int): void)|null + */ + private $reconnectSuccessCallback = null; public function __construct(private readonly Connection $connection) { } + public function setReconnectCallback(?callable $callback): self + { + $this->reconnectCallback = $callback; + + return $this; + } + + public function setReconnectSuccessCallback(?callable $callback): self + { + $this->reconnectSuccessCallback = $callback; + + return $this; + } + public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void { + $reconnectBackoffMs = self::RECONNECT_BACKOFF_MS; + $reconnectAttempt = 0; + while (!$this->closed) { /** * Waiting for next Job. */ try { $nextMessage = $this->connection->rightPopArray("{$queue->namespace}.queue.{$queue->name}", self::POP_TIMEOUT); - } catch (\RedisException $e) { + if ($reconnectAttempt > 0) { + $this->triggerReconnectSuccessCallback($queue, $reconnectAttempt); + } + + $reconnectBackoffMs = self::RECONNECT_BACKOFF_MS; + $reconnectAttempt = 0; + } catch (\RedisException|\RedisClusterException $e) { if ($this->closed) { break; } - throw $e; + $reconnectAttempt++; + + try { + $this->connection->close(); + } catch (\Throwable) { + } + + $sleepMs = \mt_rand(0, $reconnectBackoffMs); + $this->triggerReconnectCallback($queue, $e, $reconnectAttempt, $sleepMs); + + \usleep($sleepMs * 1000); + $reconnectBackoffMs = \min(self::RECONNECT_MAX_BACKOFF_MS, $reconnectBackoffMs * 2); + + continue; } if (!$nextMessage) { @@ -104,6 +150,30 @@ public function close(): void $this->closed = true; } + private function triggerReconnectCallback(Queue $queue, \Throwable $error, int $attempt, int $sleepMs): void + { + if (!\is_callable($this->reconnectCallback)) { + return; + } + + try { + ($this->reconnectCallback)($queue, $error, $attempt, $sleepMs); + } catch (\Throwable) { + } + } + + private function triggerReconnectSuccessCallback(Queue $queue, int $attempts): void + { + if (!\is_callable($this->reconnectSuccessCallback)) { + return; + } + + try { + ($this->reconnectSuccessCallback)($queue, $attempts); + } catch (\Throwable) { + } + } + public function enqueue(Queue $queue, array $payload, bool $priority = false): bool { $payload = [ diff --git a/src/Queue/Connection/Redis.php b/src/Queue/Connection/Redis.php index 8900277..ee979af 100644 --- a/src/Queue/Connection/Redis.php +++ b/src/Queue/Connection/Redis.php @@ -6,6 +6,10 @@ class Redis implements Connection { + protected const int CONNECT_MAX_ATTEMPTS = 5; + protected const int CONNECT_BACKOFF_MS = 100; + protected const int CONNECT_MAX_BACKOFF_MS = 3_000; + protected string $host; protected int $port; protected ?string $user; @@ -178,8 +182,12 @@ public function ping(): bool public function close(): void { - $this->redis?->close(); - $this->redis = null; + try { + $this->redis?->close(); + } catch (\Throwable) { + } finally { + $this->redis = null; + } } protected function getRedis(): \Redis @@ -188,15 +196,58 @@ protected function getRedis(): \Redis return $this->redis; } - $this->redis = new \Redis(); - $connectTimeout = $this->connectTimeout < 0 ? 0 : $this->connectTimeout; - $this->redis->connect($this->host, $this->port, $connectTimeout); - if ($this->readTimeout >= 0) { - $this->redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->readTimeout); + for ($attempt = 1; $attempt <= self::CONNECT_MAX_ATTEMPTS; $attempt++) { + $redis = new \Redis(); + $connected = false; + + try { + $redis->connect($this->host, $this->port, $connectTimeout); + $connected = true; + + if ($this->readTimeout >= 0) { + $redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->readTimeout); + } + + $this->redis = $redis; + return $this->redis; + } catch (\RedisException $e) { + if ($connected) { + try { + $redis->close(); + } catch (\Throwable) { + } + } + + if ($attempt === self::CONNECT_MAX_ATTEMPTS) { + throw new \RedisException( + \sprintf( + 'Failed to connect to Redis at %s:%d after %d attempts: %s', + $this->host, + $this->port, + self::CONNECT_MAX_ATTEMPTS, + $e->getMessage(), + ), + (int)$e->getCode(), + $e, + ); + } + + // Exponential backoff with full jitter to avoid thundering herd on recovery. + $backoffMs = \min( + self::CONNECT_MAX_BACKOFF_MS, + self::CONNECT_BACKOFF_MS * (2 ** ($attempt - 1)), + ); + \usleep(\mt_rand(0, $backoffMs) * 1000); + } } - return $this->redis; + throw new \RedisException(\sprintf( + 'Unreachable: Redis connect loop for %s:%d exited after %d attempts without success or exception.', + $this->host, + $this->port, + self::CONNECT_MAX_ATTEMPTS, + )); } } diff --git a/src/Queue/Connection/RedisCluster.php b/src/Queue/Connection/RedisCluster.php index 1c8c0a9..8886f6c 100644 --- a/src/Queue/Connection/RedisCluster.php +++ b/src/Queue/Connection/RedisCluster.php @@ -6,6 +6,10 @@ class RedisCluster implements Connection { + protected const int CONNECT_MAX_ATTEMPTS = 5; + protected const int CONNECT_BACKOFF_MS = 100; + protected const int CONNECT_MAX_BACKOFF_MS = 3_000; + protected array $seeds; protected float $connectTimeout; protected float $readTimeout; @@ -175,8 +179,12 @@ public function ping(): bool public function close(): void { - $this->redis?->close(); - $this->redis = null; + try { + $this->redis?->close(); + } catch (\Throwable) { + } finally { + $this->redis = null; + } } protected function getRedis(): \RedisCluster @@ -187,7 +195,38 @@ protected function getRedis(): \RedisCluster $connectTimeout = $this->connectTimeout < 0 ? 0 : $this->connectTimeout; $readTimeout = $this->readTimeout < 0 ? 0 : $this->readTimeout; - $this->redis = new \RedisCluster(null, $this->seeds, $connectTimeout, $readTimeout); - return $this->redis; + + for ($attempt = 1; $attempt <= self::CONNECT_MAX_ATTEMPTS; $attempt++) { + try { + $this->redis = new \RedisCluster(null, $this->seeds, $connectTimeout, $readTimeout); + return $this->redis; + } catch (\RedisClusterException $e) { + if ($attempt === self::CONNECT_MAX_ATTEMPTS) { + throw new \RedisClusterException( + \sprintf( + 'Failed to connect to Redis cluster nodes [%s] after %d attempts: %s', + \implode(', ', $this->seeds), + self::CONNECT_MAX_ATTEMPTS, + $e->getMessage(), + ), + (int)$e->getCode(), + $e, + ); + } + + // Exponential backoff with full jitter to avoid thundering herd on recovery. + $backoffMs = \min( + self::CONNECT_MAX_BACKOFF_MS, + self::CONNECT_BACKOFF_MS * (2 ** ($attempt - 1)), + ); + \usleep(\mt_rand(0, $backoffMs) * 1000); + } + } + + throw new \RedisClusterException(\sprintf( + 'Unreachable: Redis cluster connect loop for nodes [%s] exited after %d attempts without success or exception.', + \implode(', ', $this->seeds), + self::CONNECT_MAX_ATTEMPTS, + )); } } diff --git a/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php b/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php new file mode 100644 index 0000000..481ab9e --- /dev/null +++ b/tests/Queue/E2E/Adapter/RedisReconnectCallbackTest.php @@ -0,0 +1,206 @@ +setReconnectCallback(function (Queue $queue, \Throwable $error, int $attempt, int $sleepMs) use (&$calls, $broker): void { + $calls[] = [ + 'queue' => $queue, + 'error' => $error, + 'attempt' => $attempt, + 'sleepMs' => $sleepMs, + ]; + + $broker->close(); + }); + + $broker->consume( + $queue, + fn () => null, + fn () => null, + fn () => null, + ); + + $this->assertSame(1, $connection->popAttempts); + $this->assertCount(1, $calls); + $this->assertSame($queue, $calls[0]['queue']); + $this->assertInstanceOf(\RedisException::class, $calls[0]['error']); + $this->assertSame(1, $calls[0]['attempt']); + $this->assertIsInt($calls[0]['sleepMs']); + $this->assertGreaterThanOrEqual(0, $calls[0]['sleepMs']); + $this->assertLessThanOrEqual(100, $calls[0]['sleepMs']); + } + + public function testReconnectSuccessCallbackReceivesAttemptCount(): void + { + $queue = new Queue('reconnect-success-callback'); + $connection = new RecoveringRedisConnection(); + $broker = new RedisBroker($connection); + $calls = []; + + $broker->setReconnectCallback(fn () => null); + $broker->setReconnectSuccessCallback(function (Queue $queue, int $attempts) use (&$calls, $broker): void { + $calls[] = [ + 'queue' => $queue, + 'attempts' => $attempts, + ]; + + $broker->close(); + }); + + $broker->consume( + $queue, + fn () => null, + fn () => null, + fn () => null, + ); + + $this->assertSame(2, $connection->popAttempts); + $this->assertCount(1, $calls); + $this->assertSame($queue, $calls[0]['queue']); + $this->assertSame(1, $calls[0]['attempts']); + } +} + +class FailingRedisConnection implements Connection +{ + public int $popAttempts = 0; + + public function rightPushArray(string $queue, array $payload): bool + { + return true; + } + + public function rightPopArray(string $queue, int $timeout): array|false + { + $this->popAttempts++; + + throw new \RedisException('Redis is unavailable.'); + } + + public function rightPopLeftPushArray(string $queue, string $destination, int $timeout): array|false + { + return false; + } + + public function leftPushArray(string $queue, array $payload): bool + { + return true; + } + + public function leftPopArray(string $queue, int $timeout): array|false + { + return false; + } + + public function rightPush(string $queue, string $payload): bool + { + return true; + } + + public function rightPop(string $queue, int $timeout): string|false + { + return false; + } + + public function rightPopLeftPush(string $queue, string $destination, int $timeout): string|false + { + return false; + } + + public function leftPush(string $queue, string $payload): bool + { + return true; + } + + public function leftPop(string $queue, int $timeout): string|false + { + return false; + } + + public function listRemove(string $queue, string $key): bool + { + return true; + } + + public function listSize(string $key): int + { + return 0; + } + + public function listRange(string $key, int $total, int $offset): array + { + return []; + } + + public function remove(string $key): bool + { + return true; + } + + public function move(string $queue, string $destination): bool + { + return true; + } + + public function set(string $key, string $value, int $ttl = 0): bool + { + return true; + } + + public function get(string $key): array|string|null + { + return null; + } + + public function setArray(string $key, array $value, int $ttl = 0): bool + { + return true; + } + + public function increment(string $key): int + { + return 1; + } + + public function decrement(string $key): int + { + return 0; + } + + public function ping(): bool + { + return false; + } + + public function close(): void + { + } +} + +class RecoveringRedisConnection extends FailingRedisConnection +{ + public function rightPopArray(string $queue, int $timeout): array|false + { + $this->popAttempts++; + + if ($this->popAttempts === 1) { + throw new \RedisException('Redis is unavailable.'); + } + + return false; + } +}