Skip to content

Commit 957765c

Browse files
committed
chore: add workerstop
1 parent eed8b7c commit 957765c

File tree

11 files changed

+344
-240
lines changed

11 files changed

+344
-240
lines changed

src/Queue/Adapter/Swoole.php

Lines changed: 42 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,25 @@
33
namespace Utopia\Queue\Adapter;
44

55
use Swoole\Constant;
6+
use Swoole\Process;
67
use Swoole\Process\Pool;
7-
use Utopia\CLI\Console;
8+
9+
use Utopia\Console;
810
use Utopia\Queue\Adapter;
911
use Utopia\Queue\Consumer;
1012

13+
use function Swoole\Coroutine\go;
14+
1115
class Swoole extends Adapter
1216
{
1317
protected Pool $pool;
1418

15-
/** @var callable */
16-
private $onStop;
17-
18-
public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue')
19-
{
19+
public function __construct(
20+
Consumer $consumer,
21+
int $workerNum,
22+
string $queue,
23+
string $namespace = "utopia-queue",
24+
) {
2025
parent::__construct($workerNum, $queue, $namespace);
2126

2227
$this->consumer = $consumer;
@@ -25,36 +30,15 @@ public function __construct(Consumer $consumer, int $workerNum, string $queue, s
2530

2631
public function start(): self
2732
{
28-
$this->pool->set(['enable_coroutine' => true]);
29-
30-
// Register signal handlers in the main process before starting pool
31-
if (extension_loaded('pcntl')) {
32-
pcntl_signal(SIGTERM, function () {
33-
Console::info("[Swoole] Received SIGTERM, initiating graceful shutdown...");
34-
$this->stop();
35-
});
36-
37-
pcntl_signal(SIGINT, function () {
38-
Console::info("[Swoole] Received SIGINT, initiating graceful shutdown...");
39-
$this->stop();
40-
});
41-
42-
// Enable async signals
43-
pcntl_async_signals(true);
44-
} else {
45-
Console::warning("[Swoole] pcntl extension is not loaded, worker will not shutdown gracefully.");
46-
}
33+
// Enable coroutine hooks for Redis and other extensions
34+
$this->pool->set(["enable_coroutine" => true]);
4735

4836
$this->pool->start();
4937
return $this;
5038
}
5139

5240
public function stop(): self
5341
{
54-
if ($this->onStop) {
55-
call_user_func($this->onStop);
56-
}
57-
5842
Console::info("[Swoole] Shutting down process pool...");
5943
$this->pool->shutdown();
6044
Console::success("[Swoole] Process pool stopped.");
@@ -63,33 +47,42 @@ public function stop(): self
6347

6448
public function workerStart(callable $callback): self
6549
{
66-
$this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) {
67-
// Register signal handlers in each worker process for graceful shutdown
68-
if (extension_loaded('pcntl')) {
69-
pcntl_signal(SIGTERM, function () use ($workerId) {
70-
Console::info("[Worker] Worker {$workerId} received SIGTERM, closing consumer...");
71-
$this->consumer->close();
72-
});
73-
74-
pcntl_signal(SIGINT, function () use ($workerId) {
75-
Console::info("[Worker] Worker {$workerId} received SIGINT, closing consumer...");
76-
$this->consumer->close();
77-
});
78-
79-
pcntl_async_signals(true);
80-
}
81-
82-
call_user_func($callback, $workerId);
50+
$this->pool->on(Constant::EVENT_WORKER_START, function (
51+
Pool $pool,
52+
string $workerId,
53+
) use ($callback) {
54+
// Register signal handlers for graceful shutdown
55+
Process::signal(SIGTERM, function () use ($workerId) {
56+
Console::info(
57+
"[Swoole] Worker {$workerId} received SIGTERM, stopping consumer...",
58+
);
59+
$this->consumer->close();
60+
});
61+
62+
Process::signal(SIGINT, function () use ($workerId) {
63+
Console::info(
64+
"[Swoole] Worker {$workerId} received SIGINT, stopping consumer...",
65+
);
66+
$this->consumer->close();
67+
});
68+
69+
// Run consume loop in a coroutine to allow event loop to process signals
70+
// The coroutine container waits for all child coroutines before worker exits
71+
go(function () use ($callback, $workerId) {
72+
\call_user_func($callback, $workerId);
73+
});
8374
});
8475

8576
return $this;
8677
}
8778

8879
public function workerStop(callable $callback): self
8980
{
90-
$this->onStop = $callback;
91-
$this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) {
92-
call_user_func($callback, $workerId);
81+
$this->pool->on(Constant::EVENT_WORKER_STOP, function (
82+
Pool $pool,
83+
string $workerId,
84+
) use ($callback) {
85+
\call_user_func($callback, $workerId);
9386
});
9487

9588
return $this;

src/Queue/Broker/Pool.php

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,26 +30,36 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int
3030
return $this->delegatePublish(__FUNCTION__, \func_get_args());
3131
}
3232

33-
public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void
34-
{
33+
public function consume(
34+
Queue $queue,
35+
callable $messageCallback,
36+
callable $successCallback,
37+
callable $errorCallback,
38+
): void {
3539
$this->delegateConsumer(__FUNCTION__, \func_get_args());
3640
}
3741

3842
public function close(): void
3943
{
40-
$this->delegateConsumer(__FUNCTION__, \func_get_args());
44+
// TODO: Implement closing all connections in the pool
4145
}
4246

4347
protected function delegatePublish(string $method, array $args): mixed
4448
{
45-
return $this->publisher?->use(function (Publisher $adapter) use ($method, $args) {
49+
return $this->publisher?->use(function (Publisher $adapter) use (
50+
$method,
51+
$args,
52+
) {
4653
return $adapter->$method(...$args);
4754
});
4855
}
4956

5057
protected function delegateConsumer(string $method, array $args): mixed
5158
{
52-
return $this->consumer?->use(function (Consumer $adapter) use ($method, $args) {
59+
return $this->consumer?->use(function (Consumer $adapter) use (
60+
$method,
61+
$args,
62+
) {
5363
return $adapter->$method(...$args);
5464
});
5565
}

src/Queue/Broker/Redis.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
9292
public function close(): void
9393
{
9494
$this->closed = true;
95+
$this->connection->close();
9596
}
9697

9798
public function enqueue(Queue $queue, array $payload): bool

src/Queue/Connection.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,5 @@ public function setArray(string $key, array $value): bool;
2525
public function increment(string $key): int;
2626
public function decrement(string $key): int;
2727
public function ping(): bool;
28+
public function close(): void;
2829
}

src/Queue/Connection/Redis.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,12 @@ public function ping(): bool
169169
}
170170
}
171171

172+
public function close(): void
173+
{
174+
$this->redis?->close();
175+
$this->redis = null;
176+
}
177+
172178
protected function getRedis(): \Redis
173179
{
174180
if ($this->redis) {

src/Queue/Connection/RedisCluster.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,12 @@ public function ping(): bool
166166
}
167167
}
168168

169+
public function close(): void
170+
{
171+
$this->redis?->close();
172+
$this->redis = null;
173+
}
174+
169175
protected function getRedis(): \RedisCluster
170176
{
171177
if ($this->redis) {

0 commit comments

Comments
 (0)