Handle Aurora PostgreSQL Triggers with Lambda

database, postgresql, aws

PostgreSQL supports BEFORE and AFTER triggers, as MySQL does, and trigger functions can be written in Python when PL/Python is installed. It also has a Pub/Sub mechanism (NOTIFY/LISTEN) similar to Redis; combined with triggers, it delivers notifications of data changes in real time.

CREATE FUNCTION notify_change() RETURNS TRIGGER AS $$
  BEGIN
    PERFORM pg_notify('data_changes', json_build_object('table', TG_TABLE_NAME, 'id', NEW.id)::text);
    RETURN NEW;
  END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER users_notify
  AFTER INSERT ON users
  FOR EACH ROW
  EXECUTE FUNCTION notify_change();

LISTEN data_changes; -- from another session

INSERT INTO users (name) VALUES ('Alice');
-- Notified: {"table": "users", "id": 1}

This makes it possible to handle trigger events in application code even on databases such as Aurora, where PL/Python cannot be installed.

$ PGPASSWORD='*****' psql -h *****.cluster-*****.ap-northeast-1.rds.amazonaws.com -U postgres -d testdb -c "SELECT * FROM pg_extension;"
  oid  | extname | extowner | extnamespace | extrelocatable | extversion | extconfig | extcondition
-------+---------+----------+--------------+----------------+------------+-----------+--------------
 14501 | plpgsql |       10 |           11 | f              | 1.0        |           |
(1 row)

$ cat listen.py
import psycopg

conn = psycopg.connect(
  host="*****.cluster-*****.ap-northeast-1.rds.amazonaws.com",
  dbname="testdb",
  user="postgres",
  password="*****",
  autocommit=True
)

conn.execute("LISTEN data_changes;")

try:
  for notify in conn.notifies():
    print(f"Received: {notify.payload}")
except KeyboardInterrupt:
  print("\nStopped listening")
finally:
  conn.close()
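
The listener above only prints each payload. Since the trigger sends JSON text with `table` and `id` keys, a handler could decode it and dispatch by table name. The following is a minimal sketch; `parse_change`, `handle_change`, and the `handlers` mapping are hypothetical names, not part of psycopg.

```python
import json


def parse_change(payload):
    """Decode a NOTIFY payload such as '{"table": "users", "id": 1}'."""
    data = json.loads(payload)
    return data["table"], data["id"]


def handle_change(payload, handlers):
    """Call the handler registered for the payload's table, if there is one.

    `handlers` maps a table name to a callable that takes the row id.
    Returns True when a handler ran and False when the table is not registered.
    """
    table, row_id = parse_change(payload)
    handler = handlers.get(table)
    if handler is None:
        return False
    handler(row_id)
    return True
```

Inside the `for notify in conn.notifies():` loop, this could be called as `handle_change(notify.payload, handlers)`.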

However, this approach requires a client that listens continuously, and any notification sent while it is disconnected is lost, because NOTIFY does not queue messages for sessions that are not listening.

Aurora provides the aws_lambda extension; once it is installed, Lambda functions can be invoked from SQL.

CREATE EXTENSION IF NOT EXISTS aws_lambda CASCADE;

Grant the cluster permission to invoke the function by associating an IAM role with it (AWS CDK, TypeScript).

const auroraLambdaRole = new iam.Role(this, 'AuroraLambdaRole', {
  assumedBy: new iam.ServicePrincipal('rds.amazonaws.com'),
});

testFunction.grantInvoke(auroraLambdaRole);

const cfnDbCluster = dbCluster.node.defaultChild as rds.CfnDBCluster;
cfnDbCluster.associatedRoles = [{
  roleArn: auroraLambdaRole.roleArn,
  featureName: 'Lambda',
}];

Then you can call aws_lambda.invoke() from a trigger, eliminating the need to maintain connections.
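
As a rough sketch of such a trigger, the helper below renders DDL modeled on the notify_change() example, with pg_notify replaced by aws_lambda.invoke(). The names `render_trigger_sql`, `invoke_lambda_on_change`, and the `{table}_invoke_lambda` trigger name are illustrative assumptions, and the `'Event'` invocation type is chosen here so that the INSERT does not wait for the function to finish.

```python
import re

# Trigger DDL template; the function and trigger names are hypothetical.
TRIGGER_SQL = """\
CREATE OR REPLACE FUNCTION invoke_lambda_on_change() RETURNS TRIGGER AS $$
  BEGIN
    PERFORM aws_lambda.invoke(
      aws_commons.create_lambda_function_arn('{function_name}', '{region}'),
      json_build_object('table', TG_TABLE_NAME, 'id', NEW.id),
      'Event'  -- asynchronous invocation
    );
    RETURN NEW;
  END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER {table}_invoke_lambda
  AFTER INSERT ON {table}
  FOR EACH ROW
  EXECUTE FUNCTION invoke_lambda_on_change();
"""

_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")  # unquoted SQL identifier
_LAMBDA_REF = re.compile(r"^[A-Za-z0-9_:\-]+$")         # function name, ARN, or region


def render_trigger_sql(table, function_name, region):
    """Return the DDL text, rejecting values that could break out of the SQL."""
    if not _IDENTIFIER.match(table):
        raise ValueError(f"unsafe table name: {table!r}")
    for value in (function_name, region):
        if not _LAMBDA_REF.match(value):
            raise ValueError(f"unsafe value: {value!r}")
    return TRIGGER_SQL.format(table=table, function_name=function_name, region=region)
```

The rendered text could then be executed once through psql or psycopg. Before wiring it into a trigger, the invocation itself can be tried directly from psql: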

\x
SELECT * FROM aws_lambda.invoke(
  aws_commons.create_lambda_function_arn('(function_name)', 'ap-northeast-1'),
  '{"message": "Hello from Aurora"}'::json
  -- , 'Event' /* async */
);

-[ RECORD 1 ]----+----------------------------------------------------------------------------------------------------------------------
status_code      | 200
payload          | {"statusCode": 200, "body": "{\"message\": \"Hello from Lambda!\", \"event\": {\"message\": \"Hello from Aurora\"}}"}
executed_version | $LATEST
log_result       |
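
The Lambda function behind this output is not shown here. A handler that would produce a payload of this shape might look like the following sketch; the name `handler` and the Python runtime are assumptions.

```python
import json


def handler(event, context):
    """Echo the event received from Aurora back inside a JSON-encoded body."""
    return {
        "statusCode": 200,
        "body": json.dumps({"message": "Hello from Lambda!", "event": event}),
    }
```

With a trigger as the caller, `event` would instead carry whatever JSON the trigger function passes as the payload, for example the table name and row id.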