Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit ce601bc

Browse files
feat: add support for task locks (#60)
* feat: add support for task locks * make phpcpd happy
1 parent 835ba88 commit ce601bc

File tree

2 files changed

+229
-9
lines changed

2 files changed

+229
-9
lines changed

‎src/Commands/QueueWork.php

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,15 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i
239239
timer()->start('work');
240240
$payload = $work->payload;
241241

242+
$payloadMetadata = null;
243+
242244
try {
245+
// Load payload metadata
246+
$payloadMetadata = PayloadMetadata::fromArray($payload['metadata'] ?? []);
247+
248+
// Renew lock if needed
249+
$this->renewLock($payloadMetadata);
250+
243251
$class = $config->resolveJobClass($payload['job']);
244252
$job = new $class($payload['data']);
245253
$job->process();
@@ -250,9 +258,7 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i
250258
CLI::write('The processing of this job was successful', 'green');
251259

252260
// Check chained jobs
253-
if (isset($payload['metadata']) && $payload['metadata'] !== []) {
254-
$this->processNextJobInChain($payload['metadata']);
255-
}
261+
$this->processNextJobInChain($payloadMetadata);
256262
} catch (Throwable $err) {
257263
if (isset($job) && ++$work->attempts < ($tries ?? $job->getTries())) {
258264
// Schedule for later
@@ -263,6 +269,9 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i
263269
}
264270
CLI::write('The processing of this job failed', 'red');
265271
} finally {
272+
// Remove lock if needed
273+
$this->clearLock($payloadMetadata);
274+
266275
timer()->stop('work');
267276
CLI::write(sprintf('It took: %s sec', timer()->getElapsedTime('work')) . PHP_EOL, 'cyan');
268277
}
@@ -271,10 +280,8 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i
271280
/**
272281
* Process the next job in the chain
273282
*/
274-
private function processNextJobInChain(array $payloadMetadata): void
283+
private function processNextJobInChain(PayloadMetadata $payloadMetadata): void
275284
{
276-
$payloadMetadata = PayloadMetadata::fromArray($payloadMetadata);
277-
278285
if (! $payloadMetadata->hasChainedJobs()) {
279286
return;
280287
}
@@ -305,6 +312,40 @@ private function processNextJobInChain(array $payloadMetadata): void
305312
CLI::write(sprintf('Chained job: %s has been placed in the queue: %s', $nextPayload->getJob(), $nextPayload->getQueue()), 'green');
306313
}
307314

315+
/**
316+
* Renew task lock
317+
*/
318+
private function renewLock(PayloadMetadata $payloadMetadata): void
319+
{
320+
if (! $payloadMetadata->has('taskLockTTL') || ! $payloadMetadata->has('taskLockKey')) {
321+
return;
322+
}
323+
324+
$ttl = $payloadMetadata->get('taskLockTTL');
325+
$key = $payloadMetadata->get('taskLockKey');
326+
327+
// Permanent lock, no need to renew
328+
if ($ttl === 0) {
329+
return;
330+
}
331+
332+
cache()->save($key, [], $ttl);
333+
}
334+
335+
/**
336+
* Remove task lock
337+
*/
338+
private function clearLock(PayloadMetadata $payloadMetadata): void
339+
{
340+
if (! $payloadMetadata->has('taskLockKey')) {
341+
return;
342+
}
343+
344+
$key = $payloadMetadata->get('taskLockKey');
345+
346+
cache()->delete($key);
347+
}
348+
308349
private function maxJobsCheck(int $maxJobs, int $countJobs): bool
309350
{
310351
if ($maxJobs > 0 && $countJobs >= $maxJobs) {

‎tests/Commands/QueueWorkTest.php

Lines changed: 182 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
namespace Tests\Commands;
1515

16+
use CodeIgniter\Cache\CacheInterface;
17+
use CodeIgniter\Config\Services;
1618
use CodeIgniter\I18n\Time;
1719
use CodeIgniter\Queue\Models\QueueJobModel;
1820
use CodeIgniter\Test\Filters\CITestStreamFilter;
@@ -123,13 +125,17 @@ public function testRunWithChainedQueueSucceed(): void
123125
'job' => 'success',
124126
'data' => ['key' => 'value'],
125127
'metadata' => [
126-
'queue' => 'queue',
128+
'queue' => 'test',
127129
'chainedJobs' => [
128130
[
129-
'job' => 'success', 'data' => ['key2' => 'value2'], 'metadata' => [
131+
'job' => 'success',
132+
'data' => [
133+
'key3' => 'value3',
134+
],
135+
'metadata' => [
130136
'queue' => 'queue',
131137
'priority' => 'high',
132-
'delay' => 10,
138+
'delay' => 30,
133139
],
134140
],
135141
],
@@ -154,5 +160,178 @@ public function testRunWithChainedQueueSucceed(): void
154160
$this->assertSame('The processing of this job was successful', $this->getLine(4));
155161
$this->assertSame('Chained job: success has been placed in the queue: queue', $this->getLine(5));
156162
$this->assertSame('No job available. Stopping.', $this->getLine(8));
163+
164+
$this->seeInDatabase('queue_jobs', [
165+
'queue' => 'queue',
166+
'payload' => json_encode([
167+
'job' => 'success',
168+
'data' => ['key3' => 'value3'],
169+
'metadata' => [
170+
'queue' => 'queue',
171+
'priority' => 'high',
172+
'delay' => 30,
173+
],
174+
]),
175+
]);
176+
}
177+
178+
public function testRunWithTaskLock(): void
179+
{
180+
$lockKey = 'test_lock_key';
181+
$lockTTL = 300; // 5 minutes
182+
183+
Time::setTestNow('2023年12月19日 14:15:16');
184+
185+
$cache = $this->createMock(CacheInterface::class);
186+
187+
// Set up expectations
188+
$cache->expects($this->once())
189+
->method('save')
190+
->with($lockKey, $this->anything(), $lockTTL)
191+
->willReturn(true);
192+
193+
$cache->expects($this->once())
194+
->method('delete')
195+
->with($lockKey)
196+
->willReturn(true);
197+
198+
// Replace the cache service
199+
Services::injectMock('cache', $cache);
200+
201+
fake(QueueJobModel::class, [
202+
'connection' => 'database',
203+
'queue' => 'test',
204+
'payload' => [
205+
'job' => 'success',
206+
'data' => ['key' => 'value'],
207+
'metadata' => [
208+
'taskLockKey' => $lockKey,
209+
'taskLockTTL' => $lockTTL,
210+
'queue' => 'test',
211+
],
212+
],
213+
'priority' => 'default',
214+
'status' => 0,
215+
'attempts' => 0,
216+
'available_at' => 1_702_977_074,
217+
]);
218+
219+
CITestStreamFilter::registration();
220+
CITestStreamFilter::addOutputFilter();
221+
222+
$this->assertNotFalse(command('queue:work test sleep 1 --stop-when-empty'));
223+
$this->parseOutput(CITestStreamFilter::$buffer);
224+
225+
CITestStreamFilter::removeOutputFilter();
226+
227+
$this->assertSame('Listening for the jobs with the queue: test', $this->getLine(0));
228+
$this->assertSame('Starting a new job: success, with ID: 1', $this->getLine(3));
229+
$this->assertSame('The processing of this job was successful', $this->getLine(4));
230+
}
231+
232+
public function testRunWithPermanentTaskLock(): void
233+
{
234+
$lockKey = 'permanent_lock_key';
235+
$lockTTL = 0; // Permanent lock
236+
237+
Time::setTestNow('2023年12月19日 14:15:16');
238+
239+
$cache = $this->createMock(CacheInterface::class);
240+
241+
// For permanent lock (TTL=0), save should NOT be called
242+
$cache->expects($this->never())
243+
->method('save');
244+
245+
$cache->expects($this->once())
246+
->method('delete')
247+
->with($lockKey)
248+
->willReturn(true);
249+
250+
// Replace the cache service
251+
Services::injectMock('cache', $cache);
252+
253+
fake(QueueJobModel::class, [
254+
'connection' => 'database',
255+
'queue' => 'test',
256+
'payload' => [
257+
'job' => 'success',
258+
'data' => ['key4' => 'value4'],
259+
'metadata' => [
260+
'taskLockKey' => $lockKey,
261+
'taskLockTTL' => $lockTTL,
262+
'queue' => 'test',
263+
],
264+
],
265+
'priority' => 'default',
266+
'status' => 0,
267+
'attempts' => 0,
268+
'available_at' => 1_702_977_074,
269+
]);
270+
271+
CITestStreamFilter::registration();
272+
CITestStreamFilter::addOutputFilter();
273+
274+
$this->assertNotFalse(command('queue:work test sleep 1 --stop-when-empty'));
275+
$this->parseOutput(CITestStreamFilter::$buffer);
276+
277+
CITestStreamFilter::removeOutputFilter();
278+
279+
$this->assertSame('Listening for the jobs with the queue: test', $this->getLine(0));
280+
$this->assertSame('Starting a new job: success, with ID: 1', $this->getLine(3));
281+
$this->assertSame('The processing of this job was successful', $this->getLine(4));
282+
}
283+
284+
public function testLockClearedOnFailure(): void
285+
{
286+
$lockKey = 'failure_lock_key';
287+
$lockTTL = 300;
288+
289+
Time::setTestNow('2023年12月19日 14:15:16');
290+
291+
$cache = $this->createMock(CacheInterface::class);
292+
293+
// Set up expectations
294+
$cache->expects($this->once())
295+
->method('save')
296+
->with($lockKey, $this->anything(), $lockTTL)
297+
->willReturn(true);
298+
299+
$cache->expects($this->once())
300+
->method('delete')
301+
->with($lockKey)
302+
->willReturn(true);
303+
304+
// Replace the cache service
305+
Services::injectMock('cache', $cache);
306+
307+
fake(QueueJobModel::class, [
308+
'connection' => 'database',
309+
'queue' => 'test',
310+
'payload' => [
311+
'job' => 'failure',
312+
'data' => ['key' => 'value'],
313+
'metadata' => [
314+
'taskLockKey' => $lockKey,
315+
'taskLockTTL' => $lockTTL,
316+
'queue' => 'test',
317+
],
318+
],
319+
'priority' => 'default',
320+
'status' => 0,
321+
'attempts' => 0,
322+
'available_at' => 1_702_977_074,
323+
]);
324+
325+
CITestStreamFilter::registration();
326+
CITestStreamFilter::addOutputFilter();
327+
328+
$this->assertNotFalse(command('queue:work test sleep 1 --stop-when-empty'));
329+
$this->parseOutput(CITestStreamFilter::$buffer);
330+
331+
CITestStreamFilter::removeOutputFilter();
332+
333+
$this->assertSame('Listening for the jobs with the queue: test', $this->getLine(0));
334+
$this->assertSame('Starting a new job: failure, with ID: 1', $this->getLine(3));
335+
$this->assertSame('The processing of this job failed', $this->getLine(4));
157336
}
158337
}

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /