Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion src/Queue/Broker/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
class Redis implements Publisher, Consumer
{
private const int POP_TIMEOUT = 2;
private const int RECONNECT_BASE_BACKOFF_MS = 100;
private const int RECONNECT_MAX_BACKOFF_MS = 5_000;
private const int RECONNECT_BACKOFF_CAP_SHIFT = 10;

private bool $closed = false;

Expand All @@ -20,18 +23,34 @@ public function __construct(private readonly Connection $connection)

public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void
{
$reconnectAttempts = 0;

while (!$this->closed) {
/**
* Waiting for next Job.
*/
try {
$nextMessage = $this->connection->rightPopArray("{$queue->namespace}.queue.{$queue->name}", self::POP_TIMEOUT);
$reconnectAttempts = 0;
} catch (\RedisException $e) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 RedisCluster outages not caught

The broker catches \RedisException, but when the connection layer is Connection/RedisCluster, phpredis throws \RedisClusterException (a sibling of \RedisException under RuntimeException, not a subclass). Any cluster outage during brPop will propagate uncaught and crash the worker just as before this PR — the new reconnect logic silently doesn't apply to the cluster path.

To cover both adapters, widen the catch:

Suggested change
} catch (\RedisException $e) {
} catch (\RedisException|\RedisClusterException $e) {

if ($this->closed) {
break;
}

throw $e;
// Drop the stale connection so the next pop opens a fresh one, then
// back off with full jitter before retrying. Keeps the worker alive
// across transient Redis outages instead of crash-looping.
$this->connection->close();

$reconnectAttempts++;
$shift = \min(self::RECONNECT_BACKOFF_CAP_SHIFT, $reconnectAttempts - 1);
$backoffMs = \min(
self::RECONNECT_MAX_BACKOFF_MS,
self::RECONNECT_BASE_BACKOFF_MS * (2 ** $shift),
);
Comment on lines +46 to +50
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 RECONNECT_BACKOFF_CAP_SHIFT doesn't prevent integer overflow

The constant is described as guarding against integer overflow, but RECONNECT_MAX_BACKOFF_MS (5 000 ms) is always the binding cap: the largest value the shift cap allows, 100 × 2 ** 10 = 102 400 ms, is never used because min(5000, 102400) = 5000. In fact the 5 000 ms limit already applies from the seventh consecutive failure onward (100 × 2 ** 6 = 6 400 ms). The constant is harmless, but the overflow-prevention justification is misleading; consider updating the comment to clarify that it simply limits the exponent for readability.

\usleep(\mt_rand(0, $backoffMs) * 1000);

continue;
}

if (!$nextMessage) {
Expand Down
46 changes: 37 additions & 9 deletions src/Queue/Connection/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

class Redis implements Connection
{
protected const int CONNECT_MAX_ATTEMPTS = 5;
protected const int CONNECT_BASE_BACKOFF_MS = 100;
protected const int CONNECT_MAX_BACKOFF_MS = 3_000;

protected string $host;
protected int $port;
protected ?string $user;
Expand Down Expand Up @@ -178,25 +182,49 @@ public function ping(): bool

public function close(): void
{
$this->redis?->close();
$this->redis = null;
try {
$this->redis?->close();
} catch (\Throwable) {
// best-effort: underlying socket may already be dead
} finally {
$this->redis = null;
}
}

protected function getRedis(): \Redis
{
if ($this->redis) {
if ($this->redis instanceof \Redis) {
return $this->redis;
}

$this->redis = new \Redis();

$connectTimeout = $this->connectTimeout < 0 ? 0 : $this->connectTimeout;
$this->redis->connect($this->host, $this->port, $connectTimeout);

if ($this->readTimeout >= 0) {
$this->redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->readTimeout);
for ($attempt = 1; $attempt <= self::CONNECT_MAX_ATTEMPTS; $attempt++) {
$redis = new \Redis();

try {
$redis->connect($this->host, $this->port, $connectTimeout);

if ($this->readTimeout >= 0) {
$redis->setOption(\Redis::OPT_READ_TIMEOUT, $this->readTimeout);
}

$this->redis = $redis;
return $this->redis;
Comment on lines +205 to +213
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Leaked socket if setOption() throws

$redis has an active TCP connection after connect() succeeds. If setOption() then throws a \RedisException, the exception is caught by the retry loop, a backoff sleep is applied, and a brand-new $redis is created — but the successfully-connected socket from the previous attempt is never explicitly closed. Each failed setOption() attempt leaks a file descriptor. While setOption() rarely throws, the safe pattern is to call $redis->close() on the failure path (at the top of the catch block, before the backoff or rethrow). Note that a plain try/finally around the whole attempt would also run on the successful return and close the connection that was just stored, so the cleanup must be limited to the exception case.

} catch (\RedisException $e) {
if ($attempt === self::CONNECT_MAX_ATTEMPTS) {
throw $e;
}

// Exponential backoff with full jitter to avoid thundering herd on recovery.
$backoffMs = \min(
self::CONNECT_MAX_BACKOFF_MS,
self::CONNECT_BASE_BACKOFF_MS * (2 ** ($attempt - 1)),
);
\usleep(\mt_rand(0, $backoffMs) * 1000);
}
}

return $this->redis;
throw new \RedisException('Unreachable: connect loop exited without success or exception.');
}
}
37 changes: 32 additions & 5 deletions src/Queue/Connection/RedisCluster.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

class RedisCluster implements Connection
{
protected const int CONNECT_MAX_ATTEMPTS = 5;
protected const int CONNECT_BASE_BACKOFF_MS = 100;
protected const int CONNECT_MAX_BACKOFF_MS = 3_000;

protected array $seeds;
protected float $connectTimeout;
protected float $readTimeout;
Expand Down Expand Up @@ -175,19 +179,42 @@ public function ping(): bool

public function close(): void
{
$this->redis?->close();
$this->redis = null;
try {
$this->redis?->close();
} catch (\Throwable) {
// best-effort: underlying socket may already be dead
} finally {
$this->redis = null;
}
}

protected function getRedis(): \RedisCluster
{
if ($this->redis) {
if ($this->redis instanceof \RedisCluster) {
return $this->redis;
}

$connectTimeout = $this->connectTimeout < 0 ? 0 : $this->connectTimeout;
$readTimeout = $this->readTimeout < 0 ? 0 : $this->readTimeout;
$this->redis = new \RedisCluster(null, $this->seeds, $connectTimeout, $readTimeout);
return $this->redis;

for ($attempt = 1; $attempt <= self::CONNECT_MAX_ATTEMPTS; $attempt++) {
try {
$this->redis = new \RedisCluster(null, $this->seeds, $connectTimeout, $readTimeout);
return $this->redis;
} catch (\RedisClusterException $e) {
if ($attempt === self::CONNECT_MAX_ATTEMPTS) {
throw $e;
}

// Exponential backoff with full jitter to avoid thundering herd on recovery.
$backoffMs = \min(
self::CONNECT_MAX_BACKOFF_MS,
self::CONNECT_BASE_BACKOFF_MS * (2 ** ($attempt - 1)),
);
\usleep(\mt_rand(0, $backoffMs) * 1000);
}
}

throw new \RedisClusterException('Unreachable: connect loop exited without success or exception.');
}
}
Loading