
I'm using a logged_actions audit trigger in PostgreSQL, and I'm encountering a problem: event_ids are not guaranteed to be committed to the database in ascending order, which causes my application to skip rows during processing.

Currently I process rows by querying for event_ids greater than the last one I processed, but event_ids may not be committed to the database in that order. For example, if the event_ids are committed in the order 1, 2, 3, 5, 6, 4 and my application has already processed 5 by the time 4 is committed, it will skip 4.

This is my code defining the logged_actions table and the trigger that inserts rows into it:

CREATE EXTENSION IF NOT EXISTS hstore;
CREATE TABLE logged_actions (
    event_id bigserial primary key,
    table_name text not null,
    action_tstamp_stm TIMESTAMP WITH TIME ZONE NOT NULL,
    action TEXT NOT NULL CHECK (action IN ('I','D','U','T')),
    row_data hstore,
    changed_fields hstore
);
CREATE INDEX logged_actions_action_tstamp_tx_stm_idx ON logged_actions(action_tstamp_stm);
CREATE INDEX logged_actions_action_idx ON logged_actions(action);
CREATE OR REPLACE FUNCTION if_modified_func() RETURNS TRIGGER AS $body$
DECLARE
    audit_row logged_actions;
    include_values boolean;
    log_diffs boolean;
    h_old hstore;
    h_new hstore;
    excluded_cols text[] = ARRAY[]::text[];
BEGIN
    IF TG_WHEN <> 'AFTER' THEN
        RAISE EXCEPTION 'if_modified_func() may only run as an AFTER trigger';
    END IF;
    audit_row = ROW(
        nextval('logged_actions_event_id_seq'), -- event_id
        TG_TABLE_NAME::text,                    -- table_name
        statement_timestamp(),                  -- action_tstamp_stm
        substring(TG_OP,1,1),                   -- action
        NULL, NULL                              -- row_data, changed_fields
    );
    IF TG_ARGV[1] IS NOT NULL THEN
        excluded_cols = TG_ARGV[1]::text[];
    END IF;
    IF (TG_OP = 'UPDATE' AND TG_LEVEL = 'ROW') THEN
        audit_row.row_data = hstore(OLD.*) - excluded_cols;
        audit_row.changed_fields = (hstore(NEW.*) - audit_row.row_data) - excluded_cols;
        IF audit_row.changed_fields = hstore('') THEN
            -- All changed fields are ignored. Skip this update.
            RETURN NULL;
        END IF;
    ELSIF (TG_OP = 'DELETE' AND TG_LEVEL = 'ROW') THEN
        audit_row.row_data = hstore(OLD.*) - excluded_cols;
    ELSIF (TG_OP = 'INSERT' AND TG_LEVEL = 'ROW') THEN
        audit_row.row_data = hstore(NEW.*) - excluded_cols;
    ELSIF (TG_LEVEL = 'STATEMENT' AND TG_OP IN ('INSERT','UPDATE','DELETE','TRUNCATE')) THEN
        -- statement-level trigger: log the bare action row only
    ELSE
        RAISE EXCEPTION '[if_modified_func] - Trigger func added as trigger for unhandled case: %, %', TG_OP, TG_LEVEL;
        RETURN NULL;
    END IF;
    INSERT INTO logged_actions VALUES (audit_row.*);
    RETURN NULL;
END;
$body$
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog, public;
CREATE OR REPLACE FUNCTION audit_table(target_table regclass, audit_rows boolean, audit_query_text boolean, ignored_cols text[]) RETURNS void AS $body$
DECLARE
    stm_targets text = 'INSERT OR UPDATE OR DELETE OR TRUNCATE';
    _q_txt text;
    _ignored_cols_snip text = '';
BEGIN
    EXECUTE 'DROP TRIGGER IF EXISTS audit_trigger_row ON ' || target_table;
    EXECUTE 'DROP TRIGGER IF EXISTS audit_trigger_stm ON ' || target_table;
    IF audit_rows THEN
        IF array_length(ignored_cols,1) > 0 THEN
            _ignored_cols_snip = ', ' || quote_literal(ignored_cols);
        END IF;
        _q_txt = 'CREATE TRIGGER audit_trigger_row AFTER INSERT OR UPDATE OR DELETE ON ' ||
                 target_table ||
                 ' FOR EACH ROW EXECUTE PROCEDURE if_modified_func(' ||
                 quote_literal(audit_query_text) || _ignored_cols_snip || ');';
        RAISE NOTICE '%', _q_txt;
        EXECUTE _q_txt;
        stm_targets = 'TRUNCATE';
    END IF;
    _q_txt = 'CREATE TRIGGER audit_trigger_stm AFTER ' || stm_targets || ' ON ' ||
             target_table ||
             ' FOR EACH STATEMENT EXECUTE PROCEDURE if_modified_func(' ||
             quote_literal(audit_query_text) || ');';
    RAISE NOTICE '%', _q_txt;
    EXECUTE _q_txt;
END;
$body$
LANGUAGE plpgsql;
SELECT audit_table('orders', 't', 'f', ARRAY[]::text[]);
SELECT audit_table('deposit', 't', 'f', ARRAY[]::text[]);

Then from my application I do:

// get the last processed `event_id` from Redis
const lastId = await getLastId();
// order by event_id so rows within one batch are processed in sequence
const query = `select * from logged_actions where event_id > ${lastId} order by event_id`;
const rows = await db.query(query);
for (const row of rows) {
  await processRow(row);

  // save the last processed row's event_id in Redis
  await setLastId(row.event_id);
}

If the table contains the data below, the application will skip event_id 1003, since it has already processed 1004 by the time 1003 is committed.

event_id  action_tstamp_stm
1001      2023-01-01 00:00:01
1002      2023-01-01 00:00:02
1004      2023-01-01 00:00:02
1003      2023-01-01 00:00:03
1005      2023-01-01 00:00:03
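
Concretely, once 1004 has been saved as the last processed id, no later poll can ever return the late-committed row:

-- lastId is 1004 when row 1003 finally commits
SELECT * FROM logged_actions WHERE event_id > 1004 ORDER BY event_id;
-- returns only 1005; row 1003 stays behind the high-water mark for good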

I would like to avoid locking the logged_actions table as that would significantly degrade the performance of my application.

Is there a way to ensure that event_ids get committed to the database in ascending order without locking the logged_actions table? Ultimately, my goal is to never skip a row at the application layer while also ensuring that every row is processed exactly once.

asked Nov 30, 2023 at 13:57

1 Answer


What you want is impossible in PostgreSQL unless you serialize the transactions so that no two of them can run in parallel, which is highly undesirable for performance reasons.
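
For illustration only, a minimal sketch of what that serialization could look like (the advisory lock key 42 is an arbitrary value of mine): take a transaction-level advisory lock as the very first statement in if_modified_func()'s BEGIN block, before nextval() assigns the event_id:

-- hypothetical sketch, not a recommendation: serialize all audited writes
PERFORM pg_advisory_xact_lock(42);  -- arbitrary key, held until COMMIT/ROLLBACK

Because the lock is only released at commit or rollback, event_ids are assigned and become visible in strictly ascending order, but all write concurrency on the audited tables is lost.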

The best solution is to allow for some slack by ignoring any entries that are less than, say, 5 seconds old. If none of your transactions take that long, you can be certain that no row with an earlier timestamp or event_id will be committed later.
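
A minimal sketch of that approach, assuming no writing transaction runs longer than 5 seconds (the interval is a knob to tune to your workload):

-- only pick up rows whose statement timestamp is at least 5 seconds old;
-- any still-open transaction holding a smaller event_id has had time to commit
SELECT *
FROM logged_actions
WHERE event_id > 1004  -- last processed id, e.g. the value stored in Redis
  AND action_tstamp_stm < now() - interval '5 seconds'
ORDER BY event_id;

The price is a fixed processing delay equal to the slack interval.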

answered Nov 30, 2023 at 15:44
