Hey! I recorded a video course!

My first video course is out and it's called: Build an MVP with Elixir
If you like this article, you will also like the course! Check it out here!

Welcome back to another blog post :) Before we begin, one question:

Why do cows have hooves instead of feet?

Because they lactose!

Alright, now let’s dive into today’s topic: How to listen to database changes using Postgres triggers and Elixir

🔗 Why listen at all?

In almost all codebases, you have subsequent processes depending on previous processes. Imagine a sign-up flow for new users. After a user signs up, you might want to start sending marketing emails periodically. This is called a downstream process. The sign-up to the marketing emails depends on the user’s registration. When your software is still simple enough, you usually get away with calling these processes in sequence. Your code might look something like this:

def register_user(params) do
  with {:ok, user} <- Repo.create_user(params),
       :ok <- sign_up_for_marketing_emails(user) do
    :ok
  end
end

In this simple case, you only have one downstream process: the sign-up for the marketing emails. But what if your software becomes more complex and you add more and more such downstream processes? Your register_user/1 function will grow and grow. Maybe your company grows as well and adds more dev teams. That’s great, but now you have more teams adding their own downstream calls to register_user/1, making them all dependent on each other.

Imagine that one of the many calls fails. Usually, the entire user registration will roll back, undoing the changes in every team that placed their downstream call before the failed one. In the worst case, you might have an incomplete registration, where some calls succeed and others fail and you didn’t roll back properly. Now, the teams with failed calls need to somehow fetch the missing user data and reconcile it with everything that might have happened after the registration. It’s simply a mess.

A frequent solution to untangle these dependencies is to implement an event system. Phoenix.PubSub is such a system for example. Event systems have many advantages (and disadvantages), but in this case, they help us to reverse the code dependency between upstream and downstream processes. Without an event system, register_user/1 has to call sign_up_for_marketing_emails/1 directly. register_user/1 is aware of the downstream process and it needs to change if the downstream process changes, for example when we need to provide more info, like which marketing emails the user subscribed to and which ones they didn’t. Our upstream code, register_user/1, depends on the downstream process, sign_up_for_marketing_emails/1.

Now, with an event system, we can reverse that dependency. Our register_user/1 call could simply broadcast an event using PubSub.broadcast/4. Our downstream process can then subscribe and react to the events. Now, our upstream code is unaware of the downstream process. It doesn’t care whether it succeeds or fails or if it changes. Instead, the downstream code now depends on register_user/1 to publish the necessary data. That’s it. If we want to change the marketing email sign-up, we simply handle the received data differently in sign_up_for_marketing_emails/1. No need to change register_user/1. Perfect!

🔗 How to listen?

I mentioned Phoenix.PubSub before and if I’m implementing an Elixir application, that would be my go-to event system if I need to send and react to basic notifications. If I need more advanced systems, Kafka, RabbitMQ, or Amazon's SQS would come to mind. But there’s another, low-effort solution that you probably already have in your tech stack: Postgres.

Postgres offers quick and simple Notifications that can help you react to changes in your database without much overhead. They are particularly interesting if you can’t use Phoenix’s PubSub, for example, if another non-Elixir application also makes changes to your database.

Let’s see how you can set up Postgres notifications in your Phoenix app.

🔗 The Migration

The first step is to add the notifications using Postgres SQL. The migration below adds a trigger to a table called appointments that sends a notification whenever an appointment is canceled or uncanceled. Let’s have a look:

defmodule Demo.Repo.Migrations.AddAppointmentCanceledTrigger do
  use Ecto.Migration

  def up do
    execute """
    CREATE OR REPLACE FUNCTION notify_canceled_changed()
      RETURNS trigger AS $trigger$
      DECLARE
        payload TEXT;
      BEGIN
        IF (TG_OP = 'UPDATE') AND (OLD.canceled != NEW.canceled) THEN
          payload := json_build_object('id',OLD.id,'old',row_to_json(OLD),'new',row_to_json(NEW));
          PERFORM pg_notify('appointments_canceled_changed', payload);
        END IF;

        RETURN NEW;
      END;
      $trigger$ LANGUAGE plpgsql;
    """

    execute """
    CREATE TRIGGER appointments_canceled_changed_trigger
      AFTER UPDATE ON appointments FOR EACH ROW
      WHEN ( OLD.canceled IS DISTINCT FROM NEW.canceled )
      EXECUTE PROCEDURE notify_canceled_changed();
    """
  end

  def down do
    execute """
    DROP TRIGGER appointments_canceled_changed_trigger ON appointments;
    """

    execute """
    DROP FUNCTION notify_canceled_changed();
    """
  end
end

That’s a lot of SQL, but in short, the migration first adds a function called notify_canceled_changed() which sends a notification to the channel appoiments_canceled_changed with the id of the appointment and the before and after state of the appointment encoded as JSON. We add the function as a trigger to the appointments table using CREATE TRIGGER .... The function is only executed if the appointment is updated, so not inserted or deleted, and only if the appointment.canceled boolean field has changed.

In our notify_canceled_changed function, we double-check these validations as a precaution (i.e. the record is updated and canceled has changed). We wouldn’t need to though since Postgres will execute this function only if these requirements are met. But it never hurts to double-check!

You might have noticed that we don’t use an EVENT TRIGGER here. The difference between a TRIGGER and an EVENT TRIGGER is that the latter handles database-wide events (e.g. CREATE TABLE) whereas the former handles events specific to a given table (e.g. INSERT|UPDATE|READ|DELETE| row). We are only interested in changes to the appointments table which is why we use a simple TRIGGER.

Another detail you might have spotted is that we execute the trigger after a row was updated with AFTER UPDATE ON. For our use-case, the distinction between BEFORE and AFTER update doesn’t matter, but if you wanted to change the new data before it is written to the row, you would need to use the BEFORE UPDATE statement. Read this for more details on the difference between the two.

If you want more examples of triggers, here are some.

Now with the migration in place, run mix ecto.migrate to add the function and trigger to your database. However, if you run your application now and update an appointment, you won’t see anything. That’s because we’re not yet listening to such events from our Elixir application. Let’s change that.

🔗 The Listener

We will use the Postgrex.Notifications module to listen to and handle messages from Postgres. Here’s our listener:

defmodule Demo.DatabaseListener do
  use GenServer

  @channel "appointments_canceled_changed"

  def start_link(init_args) do
    GenServer.start_link(__MODULE__, [init_args], name: __MODULE__)
  end

  def init(_args) do
    repo_config = Demo.Repo.config()
    
    {:ok, pid} = Postgrex.Notifications.start_link(repo_config)
    {:ok, ref} = Postgrex.Notifications.listen(pid, @channel)

    {:ok, {pid, ref}}
  end

  def handle_info({:notification, _pid, _ref, @channel, payload}, state) do
    payload = Jason.decode!(payload)
    IO.puts("Received a notification for appointment #{payload["id"]}")
    IO.puts(payload)

    {:noreply, state}
  end
end

We created a GenServer for our listener with its own message queue to handle incoming Postgres messages. Be aware that this listener can easily become a bottleneck if you have lots of messages. If you can’t handle the messages quickly enough, the message queue will fill up and crash your application. If you’re worried about this case, you could create one listener per channel or use a PartitionSupervisor to start more handlers and spread out the work.

Our listener is pretty straightforward. First, we spawn another process that connects to Postgres using our Repo config. Then, we start listening to that process which means that we’ll receive any Postgres messages it might catch. Whenever a notification comes in, our handle_info/2 callback will handle it. In this case, we simply decode its JSON payload and log it to the console.

The last step is to start our listener whenever our application starts. Open your lib/application.ex and add the listener to the supervisor’s children, like this:

# lib/application.ex
def start(_type, _args) do
  children = [
    # Other modules
    Demo.Repo, 
    # Make sure to start the listener AFTER your Repo.
    # Otherwise, your listener won't connect.
    Demo.DatabaseListener
  ]

  opts = [strategy: :one_for_one, name: Demo.Supervisor]
  Supervisor.start_link(children, opts)
end

And that’s it! Now, whenever we start our application, we establish a link to Postgres and listen to messages on the given channel. If such a message comes in, our handle_info/2 will log it. Let’s try it out!

🔗 The Test

Now with everything in place, we can test-run the setup. Let’s create and update an appointment:

iex> {:ok, a} = Demo.Appointments.create_appointment(%{canceled: false})
{:ok, %Appointment{...}}
iex> {:ok, a} = Demo.Appointments.update_appointment(a, %{canceled: true})
{:ok, %Appointment{...}}
"Received a notification for appointment 1"
%{
  "id" => 1,
  "old" => %{
    "id" => 1,
    "canceled" => false,
    "inserted_at" => "2023-02-11T17:36:56",
    "updated_at" => "2023-02-11T17:37:03"
  },
  "new" => %{
    "id" => 1,
    "canceled" => true,
    "inserted_at" => "2023-02-11T17:36:56",
    "updated_at" => "2023-02-11T17:44:48"
  }
}

After updating the canceled-flag of our appointment, we received a message from Postgres! It works! Yey!

🔗 Conclusion

And that’s it! I hope you enjoyed this article! If you have questions or comments, let’s discuss them on Twitter. Follow me on Twitter or subscribe to my newsletter below if you want to get notified when I publish the next blog post. Until the next time! Cheerio 👋

Liked this post?

Get notified about new posts