Leveraging PostgreSQL Notifications for Real-Time Updates in Python Applications

Introduction:

In database-driven applications, components often need to learn about data changes as soon as they happen. PostgreSQL, a powerful open-source relational database, supports asynchronous notifications through its LISTEN and NOTIFY commands, which let the database notify listening clients about events. In this blog post, we'll implement a PostgreSQL notification system that delivers real-time updates whenever rows in a table are updated.

Setting the Stage:

Our scenario involves a Python application that uses PostgreSQL as its backend database. We have a table named job that tracks job statuses, and we want to receive a notification whenever a job's status is updated. To achieve this, we'll write a trigger function in PL/pgSQL that sends a message on a notification channel when specific conditions are met during a row update. The example combines the job table with a job_states lookup table, which maps each status id to the status value included in the payload. You can use your own tables and write custom logic to decide when to notify; here we notify when a job's status changes to 3 or 4.

  1. Creating the Trigger Function:
CREATE OR REPLACE FUNCTION notify_job_status_update()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
DECLARE
    notification_data JSON;
BEGIN
    -- IS DISTINCT FROM also handles NULL statuses, which != would silently skip.
    IF (OLD.status IS DISTINCT FROM NEW.status AND NEW.status IN (3, 4)) THEN
        SELECT 
            jsonb_build_object(
                'job_id', NEW.job_id,
                'output_file_path', NEW.output_file_path,
                'error_description', NEW.error_description,
                'job_status', js.status,
                'context', NEW.context
            )::json
        INTO notification_data
        FROM
            job_states js
        WHERE
            js.id = NEW.status;

        PERFORM pg_notify('status_update', notification_data::text);
    END IF;

    RETURN NEW;
END;
$function$;
  2. Creating the Trigger:
CREATE TRIGGER job_status_update_trigger
AFTER UPDATE ON job
FOR EACH ROW
EXECUTE FUNCTION notify_job_status_update();
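
To check the trigger end to end, a status change can be issued from Python. The following is a minimal sketch, assuming an open psycopg2 connection is passed in; set_job_status is a hypothetical helper that is not part of the original application, and the column names are taken from the trigger function above.

```python
def set_job_status(conn, job_id, status):
    """Update one job's status and commit.

    `conn` is assumed to be an open psycopg2 connection. The row-level
    trigger runs as part of the UPDATE; PostgreSQL delivers the queued
    notification to listeners only once the transaction commits.
    """
    with conn.cursor() as cur:
        # Parameters are passed separately so the driver handles quoting.
        cur.execute(
            "UPDATE job SET status = %s WHERE job_id = %s",
            (status, job_id),
        )
        updated = cur.rowcount
    conn.commit()
    return updated
```

Calling set_job_status(conn, some_job_id, 3) on a job whose status is not already 3 should produce one message on the status_update channel, provided a matching row exists in job_states.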
  3. Listening for Notifications in Python:
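import json
import logging
import select

import psycopg2.extensions

from app import cfg
from app.dao import conn
from app.otel_settings.logging import setup_logging
from app.utils.sqs_util import send_sqs_task

logger = logging.getLogger("my-logger")
logger.debug("Logging context initialized successfully")

# LISTEN takes effect only when its transaction commits, so use autocommit mode.
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

cur = conn.cursor()
cur.execute("LISTEN status_update;")
logger.info("Listening for notifications on the 'status_update' channel")


def batch_update_listener():
    while True:
        # Wait until the connection's socket is readable (or 5 seconds pass)
        # instead of calling poll() in a tight loop, which would pin a CPU core.
        if select.select([conn], [], [], 5) == ([], [], []):
            continue
        conn.poll()
        # Drain every notification that arrived since the last poll.
        while conn.notifies:
            notify = conn.notifies.pop(0)
            logger.info(
                f"Received notification on channel '{notify.channel}': {notify.payload}"
            )
            payload = json.loads(notify.payload)
            # Replace with real handling, e.g. handing the payload to a task queue.
            print(payload)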


if __name__ == "__main__":
    batch_update_listener()
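
The listener above decodes the payload and prints it. Before passing the data on, it may be worth validating it so that an empty or malformed payload does not crash the loop. Here is a minimal sketch, assuming the keys emitted by the trigger function; JobStatusUpdate and parse_status_payload are hypothetical names introduced for illustration.

```python
import json
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class JobStatusUpdate:
    # Field names mirror the keys built by jsonb_build_object in the trigger.
    job_id: Any
    job_status: Any = None
    output_file_path: Optional[str] = None
    error_description: Optional[str] = None
    context: Any = None


def parse_status_payload(raw: str) -> Optional[JobStatusUpdate]:
    """Decode a notification payload; return None if it is unusable."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return None
    # Expect a JSON object that at least identifies the job.
    if not isinstance(data, dict) or "job_id" not in data:
        return None
    return JobStatusUpdate(
        job_id=data["job_id"],
        job_status=data.get("job_status"),
        output_file_path=data.get("output_file_path"),
        error_description=data.get("error_description"),
        context=data.get("context"),
    )
```

Inside the listener loop, this could replace the json.loads call: update = parse_status_payload(notify.payload), followed by a check for None. Keep in mind that in PostgreSQL's default configuration a NOTIFY payload must be shorter than 8000 bytes, so if context can grow large it may be safer to send only job_id and fetch the rest with a query.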

Conclusion:

In this blog post, we've implemented a PostgreSQL notification system that delivers real-time updates to a Python application. A trigger publishes a message whenever a job's status changes to one of the states we care about, and a listener on the same channel receives it, giving us a direct communication channel between the PostgreSQL database and our Python components.

This approach lets the application react promptly to changes in the database without repeatedly polling the job table. Consider integrating PostgreSQL notifications into your own applications when you need real-time updates.