I'm using the logged_actions audit system in PostgreSQL, and I'm encountering a problem where `event_id` values are not guaranteed to be committed to the database in ascending order. This causes my application to skip certain rows during processing.
Currently I process rows by looking for `event_id` values greater than the last one I processed, but `event_id` values may not be committed to the database in order. For example, if the `event_id` values get inserted in the order 1, 2, 3, 5, 6, 4 and my application has already processed 5, it will skip 4.
This is my code defining the `logged_actions` table and the trigger that inserts rows into it:
```sql
CREATE EXTENSION IF NOT EXISTS hstore;

CREATE TABLE logged_actions (
    event_id bigserial primary key,
    table_name text not null,
    action_tstamp_stm TIMESTAMP WITH TIME ZONE NOT NULL,
    action TEXT NOT NULL CHECK (action IN ('I','D','U','T')),
    row_data hstore,
    changed_fields hstore
);

CREATE INDEX logged_actions_action_tstamp_tx_stm_idx ON logged_actions(action_tstamp_stm);
CREATE INDEX logged_actions_action_idx ON logged_actions(action);
```
```sql
CREATE OR REPLACE FUNCTION if_modified_func() RETURNS TRIGGER AS $body$
DECLARE
    audit_row logged_actions;
    include_values boolean;
    log_diffs boolean;
    h_old hstore;
    h_new hstore;
    excluded_cols text[] = ARRAY[]::text[];
BEGIN
    IF TG_WHEN <> 'AFTER' THEN
        RAISE EXCEPTION 'if_modified_func() may only run as an AFTER trigger';
    END IF;

    audit_row = ROW(
        nextval('logged_actions_event_id_seq'), -- event_id
        TG_TABLE_NAME::text,                    -- table_name
        statement_timestamp(),                  -- action_tstamp_stm
        substring(TG_OP,1,1),                   -- action
        NULL, NULL                              -- row_data, changed_fields
    );

    IF TG_ARGV[1] IS NOT NULL THEN
        excluded_cols = TG_ARGV[1]::text[];
    END IF;

    IF (TG_OP = 'UPDATE' AND TG_LEVEL = 'ROW') THEN
        audit_row.row_data = hstore(OLD.*) - excluded_cols;
        audit_row.changed_fields = (hstore(NEW.*) - audit_row.row_data) - excluded_cols;
        IF audit_row.changed_fields = hstore('') THEN
            -- All changed fields are ignored. Skip this update.
            RETURN NULL;
        END IF;
    ELSIF (TG_OP = 'DELETE' AND TG_LEVEL = 'ROW') THEN
        audit_row.row_data = hstore(OLD.*) - excluded_cols;
    ELSIF (TG_OP = 'INSERT' AND TG_LEVEL = 'ROW') THEN
        audit_row.row_data = hstore(NEW.*) - excluded_cols;
    ELSIF (TG_LEVEL = 'STATEMENT' AND TG_OP IN ('INSERT','UPDATE','DELETE','TRUNCATE')) THEN
        -- Statement-level events are logged with no row data.
    ELSE
        RAISE EXCEPTION '[if_modified_func] - Trigger func added as trigger for unhandled case: %, %', TG_OP, TG_LEVEL;
        RETURN NULL;
    END IF;

    INSERT INTO logged_actions VALUES (audit_row.*);
    RETURN NULL;
END;
$body$
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog, public;
```
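To show what the hstore logic above produces, here is a rough walk-through of a single update. The `orders` columns used here (`id`, `status`) are hypothetical examples:

```sql
-- Hypothetical walk-through, assuming an audited table orders(id int, status text)
-- currently holding the row (1, 'pending'):
UPDATE orders SET status = 'shipped' WHERE id = 1;

-- The AFTER ROW trigger then inserts a logged_actions row along these lines:
--   action         = 'U'
--   row_data       = '"id"=>"1", "status"=>"pending"'   (the OLD row, as hstore)
--   changed_fields = '"status"=>"shipped"'              (hstore(NEW.*) minus unchanged fields)
```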
```sql
CREATE OR REPLACE FUNCTION audit_table(target_table regclass, audit_rows boolean, audit_query_text boolean, ignored_cols text[]) RETURNS void AS $body$
DECLARE
    stm_targets text = 'INSERT OR UPDATE OR DELETE OR TRUNCATE';
    _q_txt text;
    _ignored_cols_snip text = '';
BEGIN
    EXECUTE 'DROP TRIGGER IF EXISTS audit_trigger_row ON ' || target_table;
    EXECUTE 'DROP TRIGGER IF EXISTS audit_trigger_stm ON ' || target_table;

    IF audit_rows THEN
        IF array_length(ignored_cols,1) > 0 THEN
            _ignored_cols_snip = ', ' || quote_literal(ignored_cols);
        END IF;
        _q_txt = 'CREATE TRIGGER audit_trigger_row AFTER INSERT OR UPDATE OR DELETE ON ' ||
                 target_table ||
                 ' FOR EACH ROW EXECUTE PROCEDURE if_modified_func(' ||
                 quote_literal(audit_query_text) || _ignored_cols_snip || ');';
        RAISE NOTICE '%', _q_txt;
        EXECUTE _q_txt;
        stm_targets = 'TRUNCATE';
    END IF;

    _q_txt = 'CREATE TRIGGER audit_trigger_stm AFTER ' || stm_targets || ' ON ' ||
             target_table ||
             ' FOR EACH STATEMENT EXECUTE PROCEDURE if_modified_func(' ||
             quote_literal(audit_query_text) || ');';
    RAISE NOTICE '%', _q_txt;
    EXECUTE _q_txt;
END;
$body$
LANGUAGE plpgsql;

SELECT audit_table('orders', true, false, ARRAY[]::text[]);
SELECT audit_table('deposit', true, false, ARRAY[]::text[]);
```
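With `audit_rows = true`, `audit_query_text = false`, and no ignored columns, these calls build and execute trigger DDL equivalent to the following for `orders` (the `RAISE NOTICE` output shows the exact statements):

```sql
-- Roughly what audit_table('orders', true, false, ARRAY[]::text[]) executes:
CREATE TRIGGER audit_trigger_row AFTER INSERT OR UPDATE OR DELETE ON orders
    FOR EACH ROW EXECUTE PROCEDURE if_modified_func('false');
-- Row-level auditing is enabled, so the statement-level trigger only covers TRUNCATE:
CREATE TRIGGER audit_trigger_stm AFTER TRUNCATE ON orders
    FOR EACH STATEMENT EXECUTE PROCEDURE if_modified_func('false');
```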
Then from my application I do:

```js
// get the last processed event_id from Redis
const lastId = await getLastId();

// order by event_id so that lastId only ever moves forward
const query = `select * from logged_actions where event_id > ${lastId} order by event_id asc`;
const rows = await db.query(query);

for (const row of rows) {
  await processRow(row);
  // save the last processed row's event_id in Redis
  await setLastId(row.event_id);
}
```
If the table has the data below, the application will skip `event_id` 1003, since it had already processed 1004 by the time 1003 was inserted.
| event_id | action_tstamp_stm   |
|----------|---------------------|
| 1001     | 2023-01-01 00:00:01 |
| 1002     | 2023-01-01 00:00:02 |
| 1004     | 2023-01-01 00:00:02 |
| 1003     | 2023-01-01 00:00:03 |
| 1005     | 2023-01-01 00:00:03 |
I would like to avoid locking the `logged_actions` table, as that would significantly degrade the performance of my application. Is there a way to ensure that `event_id` values are committed to the database in ascending order without locking the `logged_actions` table? Ultimately, my goal is to not skip any rows at the application layer while also ensuring that every row is processed exactly once.
1 Answer
What you want is impossible in PostgreSQL, unless you serialize the transactions, so that no two of them can run in parallel. That is highly undesirable for performance reasons.
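To make concrete what that serialization would look like (and why you do not want it), every transaction that writes to an audited table would have to queue behind the previous one, for example with an advisory lock. This is a sketch only; the lock key 42 is an arbitrary choice:

```sql
-- Sketch of the serialization you would need (not recommended).
-- Inside if_modified_func(), before the nextval() call:
PERFORM pg_advisory_xact_lock(42);  -- 42 is an arbitrary, application-chosen lock key

-- The lock is held until the transaction commits, so event_ids are both assigned
-- and committed in ascending order, but all audited writes now run one at a time.
```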
The best solution is to allow for some slack by ignoring any table entries that are more recent than, say, 5 seconds. If none of your transactions take that long, you can be certain that no row with an earlier timestamp or `event_id` can be committed later.
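A minimal sketch of that polling query, assuming a 5-second window and the schema above (`$1` stands for the last processed `event_id`):

```sql
-- Only fetch rows old enough that no transaction writing an earlier
-- event_id can still be in flight (assumes no transaction runs > 5 seconds).
SELECT *
FROM logged_actions
WHERE event_id > $1                                     -- last processed event_id
  AND action_tstamp_stm < now() - interval '5 seconds'  -- slack for in-flight transactions
ORDER BY event_id;
```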