Let's assume you have a task FOO that can be queued once every minute, and a pool of 50 workers consuming a queue that can be paused. The queue is paused for 10 minutes, and 10 FOO tasks are queued. When the queue is resumed, the 10 FOO tasks will be executed almost concurrently (because there are more workers than tasks).
In this case, I need to ensure that no more than 1 FOO task is performed per minute (the interval can vary).
One solution, using Redis, is to take advantage of Redis's atomic commands and the TTL of a key. When a FOO task starts, it checks whether the key worker:FOO exists. If it does, the task exits; if it does not, the task sets the value and a TTL equal to the minimum interval between runs. This is easy to achieve using SETNX worker:FOO whatever and then, if that command returned 1, using EXPIRE worker:FOO with the interval in seconds.
Because SETNX is atomic, I won't fall into the case where two FOO tasks are executed because of the race condition between the GET and the SET.
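As a minimal sketch of the Redis side: setting the key and its expiry as two separate commands leaves a window in which a worker can die after creating the key but before giving it a TTL, so the key never expires. Redis's SET command accepts NX and EX options that do both in one step. The code below assumes a client exposing `set(name, value, nx=..., ex=...)` in the style of redis-py; `InMemoryRedis` and `try_start` are hypothetical names, and the in-memory class exists only so the sketch runs without a server.

```python
import time


class InMemoryRedis:
    """Hypothetical stand-in for a Redis client, only for running this sketch.

    It mimics the one call used below: set(name, value, nx=..., ex=...).
    """

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._expires_at = {}  # key -> expiry time measured on self._clock

    def set(self, name, value, nx=False, ex=None):
        now = self._clock()
        expiry = self._expires_at.get(name)
        live = expiry is not None and expiry > now
        if nx and live:
            return None  # key still exists, nothing is written
        self._expires_at[name] = now + ex if ex is not None else float("inf")
        return True


def try_start(client, task, min_interval_seconds):
    """Return True if this worker may run `task` now, False if it must skip."""
    # SET key value NX EX seconds: create the key only if it is absent and
    # attach the TTL in the same command.
    acquired = client.set(
        f"worker:{task}", "whatever", nx=True, ex=min_interval_seconds
    )
    return bool(acquired)
```

A worker would call `try_start(client, "FOO", 60)` before doing any work and return immediately when it gets `False`.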
Now the question is: what is the correct way to achieve the same result using PostgreSQL? I can have a table with a key and an executed_on timestamp value, but how can I ensure that there is no case where two FOO tasks are both executed because of the delay between FOO 1 checking the record and writing a lock?
1 Answer
Since you're trying to serialize work, I'd update a record in a table.
CREATE TABLE task_keys (
    task varchar(10) primary key,
    last_executed timestamp with time zone not null,
    by_worker_id integer
);

INSERT INTO task_keys(task, last_executed)
VALUES ('FOO', '-infinity');
Then, to see if you can run a task yet:
UPDATE task_keys SET
    last_executed = current_timestamp,
    by_worker_id = $1
WHERE task = 'FOO'
  AND last_executed < (current_timestamp - INTERVAL '1' MINUTE)
RETURNING *;
The isolation rules of READ COMMITTED guarantee that if this query successfully updates the table and returns a row, no other query can have concurrently done so. A row lock is taken on the relevant task_keys row. If another UPDATE tries to affect the same row, it'll wait until the row lock is released by a commit or rollback of the holding transaction, and then it will re-check the WHERE clause. If the other transaction committed, the WHERE clause will no longer match, so the waiting UPDATE will affect zero rows.
See the documentation on transaction isolation.
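The check-and-claim pattern can be tried out with a small sketch. This one uses Python's built-in sqlite3 module purely as a stand-in so it runs without a server; it does not reproduce PostgreSQL's READ COMMITTED row locking, only the idea of testing the timestamp and claiming the run in a single UPDATE. Timestamps are stored as epoch seconds, and `make_db` and `try_claim` are hypothetical helper names.

```python
import sqlite3
import time


def make_db():
    """Create an in-memory table shaped like task_keys, with epoch-second times."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE task_keys ("
        " task TEXT PRIMARY KEY,"
        " last_executed REAL NOT NULL,"
        " by_worker_id INTEGER)"
    )
    # 0.0 plays the role of '-infinity': the task has never run.
    conn.execute("INSERT INTO task_keys (task, last_executed) VALUES ('FOO', 0.0)")
    conn.commit()
    return conn


def try_claim(conn, task, worker_id, min_interval_seconds, now=None):
    """Return True if this worker claimed the run, False if it is too soon."""
    now = time.time() if now is None else now
    # The age check and the write happen in one statement, so there is no
    # gap between "check the record" and "write the lock".
    cur = conn.execute(
        "UPDATE task_keys SET last_executed = ?, by_worker_id = ?"
        " WHERE task = ? AND last_executed < ?",
        (now, worker_id, task, now - min_interval_seconds),
    )
    conn.commit()  # commit right away so the claim is visible to other workers
    return cur.rowcount == 1
```

A worker runs the task only when `try_claim` returns `True`; an affected-row count of zero means another worker ran it too recently.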
If you need concurrency, this gets a little trickier. What you really want is a token pool that refills on a timer, where workers can grab tokens from the pool to do work. That's effectively what we're doing here, with one token, so one option is to add more rows for the same task and grab the first row where the last_executed timestamp is old enough.
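A rough sketch of the multi-row variant, again using sqlite3 only as a runnable stand-in for PostgreSQL. The `task_tokens` table, its `id` column, and the helper names are assumptions made for illustration. The age check is repeated on the outer UPDATE so that a row picked by the subquery is only claimed if it is still old enough; in PostgreSQL a worker that gets zero rows may simply have lost a race for that row and can retry.

```python
import sqlite3
import time


def make_pool(task, tokens):
    """Create an in-memory pool with `tokens` rows for `task`, none used yet."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE task_tokens ("
        " id INTEGER PRIMARY KEY,"
        " task TEXT NOT NULL,"
        " last_executed REAL NOT NULL)"
    )
    conn.executemany(
        "INSERT INTO task_tokens (task, last_executed) VALUES (?, 0.0)",
        [(task,)] * tokens,
    )
    conn.commit()
    return conn


def try_take_token(conn, task, refill_seconds, now=None):
    """Return True if a token was taken, False if none is old enough."""
    now = time.time() if now is None else now
    cutoff = now - refill_seconds
    # Pick the first sufficiently old row for the task, then claim it only if
    # it still passes the same age check.
    cur = conn.execute(
        "UPDATE task_tokens SET last_executed = ?"
        " WHERE id = (SELECT id FROM task_tokens"
        "             WHERE task = ? AND last_executed < ?"
        "             ORDER BY id LIMIT 1)"
        " AND last_executed < ?",
        (now, task, cutoff, cutoff),
    )
    conn.commit()
    return cur.rowcount == 1
```

With two rows for FOO and a 60-second refill, two workers can start within the same minute and a third has to wait until one of the rows is old enough again.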
There are two flaws with this whole approach though:
- It doesn't know when a task finishes, so long-running tasks could overlap; and
- It doesn't care whether a task succeeds or fails.
To solve those you need to use a proper work queue implementation. Those are currently very difficult to implement correctly in the database, so I suggest you look at using an external message queue / work queue system to manage them. In PostgreSQL 9.5 the new FOR UPDATE SKIP LOCKED feature will make implementing work queues like this in the database quite simple, though.
BTW, advisory locking is often a good choice for this sort of thing, but it won't help you with the need to expire the lock automatically after a certain amount of time.
- "advisory locks won't help to expire the lock automatically" - how would the lock obtained through an UPDATE expire automatically? If the update is never committed, it won't expire either. Or am I missing something? – user1822, Apr 8, 2015 at 12:24
- @a_horse_with_no_name The update is committed, and sets a timestamp for when the task started. So it works much like the Redis approach outlined in the question. But it isn't very smart about failed jobs, when jobs finish, etc.; this is not a proper queue system, which is why I'm pointing the OP at proper queuing tools too. – Craig Ringer, Apr 8, 2015 at 13:23