Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit e642299

Browse files
Merge branch '4.x-dev' into 4.x-dev
2 parents 7a0f7df + b377efa commit e642299

File tree

8 files changed

+128
-15
lines changed

8 files changed

+128
-15
lines changed

‎src/CloudTasksQueue.php‎

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
class CloudTasksQueue extends LaravelQueue implements QueueContract
2626
{
27-
private Closure | array $headers = [];
27+
private Closure|array $headers = [];
2828
private static ?Closure $handlerUrlCallback = null;
2929

3030
public function __construct(public array $config, public CloudTasksClient $client, public $dispatchAfterCommit = false)
@@ -160,7 +160,7 @@ private function taskName(string $queueName, array $payload): string
160160
$this->config['project'],
161161
$this->config['location'],
162162
$queueName,
163-
$displayName.'-'.bin2hex(random_bytes(8)),
163+
$displayName.'-'.bin2hex(random_bytes(8)),
164164
);
165165
}
166166

@@ -180,8 +180,7 @@ private function enrichPayloadWithInternalData(
180180
string $queueName,
181181
string $taskName,
182182
string $connectionName,
183-
): array
184-
{
183+
): array {
185184
$payload['internal'] = [
186185
'attempts' => $payload['internal']['attempts'] ?? 0,
187186
'queue' => $queueName,
@@ -197,7 +196,7 @@ public function addPayloadToTask(array $payload, Task $task, mixed $job): Task
197196
{
198197
$headers = value($this->headers, $payload) ?: [];
199198

200-
if (!empty($this->config['app_engine'])) {
199+
if (!empty($this->config['app_engine'])) {
201200
$path = \Safe\parse_url(route('cloud-tasks.handle-task'), PHP_URL_PATH);
202201

203202
$appEngineRequest = new AppEngineHttpRequest();
@@ -206,7 +205,7 @@ public function addPayloadToTask(array $payload, Task $task, mixed $job): Task
206205
$appEngineRequest->setBody(json_encode($payload));
207206
$appEngineRequest->setHeaders($headers);
208207

209-
if (!empty($service = $this->config['app_engine_service'])) {
208+
if (!empty($service = $this->config['app_engine_service'])) {
210209
$routing = new AppEngineRouting();
211210
$routing->setService($service);
212211
$appEngineRequest->setAppEngineRouting($routing);
@@ -264,14 +263,14 @@ public function getHandler(mixed $job): string
264263

265264
$handler = rtrim($this->config['handler'], '/');
266265

267-
if (str_ends_with($handler, '/'.config('cloud-tasks.uri'))) {
266+
if (str_ends_with($handler, '/'.config('cloud-tasks.uri'))) {
268267
return $handler;
269268
}
270269

271-
return $handler.'/'.config('cloud-tasks.uri');
270+
return $handler.'/'.config('cloud-tasks.uri');
272271
}
273272

274-
public function setTaskHeaders(Closure | array $headers): void
273+
public function setTaskHeaders(Closure|array $headers): void
275274
{
276275
$this->headers = $headers;
277276
}

‎src/CloudTasksServiceProvider.php‎

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
namespace Stackkit\LaravelGoogleCloudTasksQueue;
66

77
use Google\Cloud\Tasks\V2\Client\CloudTasksClient;
8+
use Illuminate\Contracts\Debug\ExceptionHandler;
9+
use Illuminate\Foundation\Application;
810
use Illuminate\Queue\Events\JobExceptionOccurred;
911
use Illuminate\Queue\Events\JobFailed;
1012
use Illuminate\Queue\Events\JobProcessed;
@@ -29,6 +31,15 @@ private function registerClient(): void
2931
return new CloudTasksClient();
3032
});
3133

34+
$this->app->singleton('cloud-tasks.worker', function (Application $app) {
35+
return new Worker(
36+
$app['queue'],
37+
$app['events'],
38+
$app[ExceptionHandler::class],
39+
fn () => $app->isDownForMaintenance(),
40+
);
41+
});
42+
3243
$this->app->bind('cloud-tasks-api', CloudTasksApiConcrete::class);
3344
}
3445

‎src/IncomingTask.php‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
namespace Stackkit\LaravelGoogleCloudTasksQueue;
66

77
use Safe\Exceptions\JsonException;
8+
89
use function Safe\json_decode;
910

1011
class IncomingTask

‎src/TaskHandler.php‎

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
use Google\Cloud\Tasks\V2\Client\CloudTasksClient;
88
use Illuminate\Container\Container;
9-
use Illuminate\Queue\Worker;
109
use Illuminate\Queue\WorkerOptions;
1110

1211
class TaskHandler
@@ -51,7 +50,7 @@ private function run(IncomingTask $task): void
5150

5251
$job->setAttempts($job->attempts() + 1);
5352

54-
tap(app('queue.worker'), fn (Worker $worker) => $worker->process(
53+
tap(app('cloud-tasks.worker'), fn (Worker $worker) => $worker->process(
5554
connectionName: $job->getConnectionName(),
5655
job: $job,
5756
options: $this->getWorkerOptions()

‎src/Worker.php‎

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Stackkit\LaravelGoogleCloudTasksQueue;
6+
7+
use Illuminate\Queue\Worker as LaravelWorker;
8+
use Illuminate\Queue\WorkerOptions;
9+
10+
/**
11+
* Custom worker class to handle specific requirements for Google Cloud Tasks.
12+
*
13+
* This class modifies the behavior of the Laravel queue worker to better
14+
* integrate with Google Cloud Tasks, particularly focusing on job timeout
15+
* handling and graceful shutdowns to avoid interrupting the HTTP lifecycle.
16+
*
17+
* Firstly, the 'supportsAsyncSignals', 'listenForSignals', and 'registerTimeoutHandler' methods
18+
* are protected and called within the queue while(true) loop. We want (and need!) to have that
19+
* too in order to support job timeouts. So, to make it work, we create a public method that
20+
* can call the private signal methods.
21+
*
22+
* Secondly, we need to override the 'kill' method because it tends to kill the server process (artisan serve, octane),
23+
* as well as abort the HTTP request from Cloud Tasks. This is not the desired behavior.
24+
* Instead, it should just fire the WorkerStopped event and return a normal status code.
25+
*/
26+
class Worker extends LaravelWorker
27+
{
28+
public function process($connectionName, $job, WorkerOptions $options)
29+
{
30+
if ($this->supportsAsyncSignals()) {
31+
$this->listenForSignals();
32+
33+
$this->registerTimeoutHandler($job, $options);
34+
}
35+
36+
return parent::process($connectionName, $job, $options);
37+
}
38+
39+
public function kill($status = 0, $options = null): void
40+
{
41+
parent::stop($status, $options);
42+
43+
// When running tests, we cannot run exit because it will kill the PHPunit process.
44+
// So, to still test that the application has exited, we will simply rely on the
45+
// WorkerStopped event that is fired when the worker is stopped.
46+
if (app()->runningUnitTests()) {
47+
return;
48+
}
49+
50+
exit($status);
51+
}
52+
}

‎tests/QueueTest.php‎

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
use Illuminate\Support\Facades\Queue;
1717
use PHPUnit\Framework\Attributes\Test;
1818
use Stackkit\LaravelGoogleCloudTasksQueue\CloudTasksApi;
19-
use Stackkit\LaravelGoogleCloudTasksQueue\CloudTasksQueue;
2019
use Stackkit\LaravelGoogleCloudTasksQueue\Events\JobReleased;
2120
use Tests\Support\FailingJob;
2221
use Tests\Support\FailingJobWithExponentialBackoff;
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests\Support;
6+
7+
use Illuminate\Queue\Events\WorkerStopping;
8+
use Illuminate\Support\Facades\Event;
9+
10+
class SimpleJobWithTimeout extends SimpleJob
11+
{
12+
public $timeout = 3;
13+
14+
public function handle()
15+
{
16+
Event::listen(WorkerStopping::class, function () {
17+
event(new JobOutput('SimpleJobWithTimeout:worker-stopping'));
18+
});
19+
20+
event(new JobOutput('SimpleJobWithTimeout:1'));
21+
sleep(1);
22+
event(new JobOutput('SimpleJobWithTimeout:2'));
23+
sleep(1);
24+
event(new JobOutput('SimpleJobWithTimeout:3'));
25+
sleep(1);
26+
event(new JobOutput('SimpleJobWithTimeout:4'));
27+
sleep(1);
28+
event(new JobOutput('SimpleJobWithTimeout:5'));
29+
}
30+
}

‎tests/TaskHandlerTest.php‎

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use Tests\Support\FailingJobWithUnlimitedTries;
1818
use Tests\Support\JobOutput;
1919
use Tests\Support\SimpleJob;
20+
use Tests\Support\SimpleJobWithTimeout;
2021

2122
class TaskHandlerTest extends TestCase
2223
{
@@ -37,7 +38,7 @@ public function it_can_run_a_task()
3738
$this->dispatch(new SimpleJob())->runWithoutExceptionHandler();
3839

3940
// Assert
40-
Event::assertDispatched(fn(JobOutput $event) => $event->output === 'SimpleJob:success');
41+
Event::assertDispatched(fn(JobOutput $event) => $event->output === 'SimpleJob:success');
4142
}
4243

4344
#[Test]
@@ -54,7 +55,7 @@ public function it_can_run_a_task_using_the_task_connection()
5455
$this->dispatch($job)->runWithoutExceptionHandler();
5556

5657
// Assert
57-
Event::assertDispatched(fn(JobOutput $event) => $event->output === 'SimpleJob:success');
58+
Event::assertDispatched(fn(JobOutput $event) => $event->output === 'SimpleJob:success');
5859
}
5960

6061
#[Test]
@@ -185,7 +186,7 @@ public function it_can_handle_encrypted_jobs()
185186
decrypt($job->payloadAsArray('data.command')),
186187
);
187188

188-
Event::assertDispatched(fn(JobOutput $event) => $event->output === 'EncryptedJob:success');
189+
Event::assertDispatched(fn(JobOutput $event) => $event->output === 'EncryptedJob:success');
189190
}
190191

191192
#[Test]
@@ -248,4 +249,25 @@ public function retried_jobs_get_a_new_name()
248249
$this->assertCount(2, $this->createdTasks);
249250
$this->assertNotEquals($this->createdTasks[0]->getName(), $this->createdTasks[1]->getName());
250251
}
252+
253+
#[Test]
254+
public function test_job_timeout()
255+
{
256+
// Arrange
257+
Event::fake(JobOutput::class);
258+
259+
// Act
260+
$this->dispatch(new SimpleJobWithTimeout())->run();
261+
262+
// Assert
263+
$events = Event::dispatched(JobOutput::class)->map(fn ($event) => $event[0]->output)->toArray();
264+
$this->assertEquals([
265+
'SimpleJobWithTimeout:1',
266+
'SimpleJobWithTimeout:2',
267+
'SimpleJobWithTimeout:3',
268+
'SimpleJobWithTimeout:worker-stopping',
269+
'SimpleJobWithTimeout:4',
270+
'SimpleJobWithTimeout:5',
271+
], $events);
272+
}
251273
}

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /