Before we dive into today’s blog post, please consider the following question:

What has two butts and kills people?

The answer is obvious:

An assassin

Just as assassins have two butts, we will also talk about two outstanding things today: insert and update operations on the database. In particular, this post provides a complete guide to their funky fusion: the upsert operation.

Upserts are a combination of insert and update. They allow you to update a schema without checking if it exists in your database first. A conventional update throws an error if it doesn’t exist. An upsert updates the schema if it exists and creates it if it doesn’t.
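As a mental model only (this is plain Elixir, not Ecto), `Map.update/4` behaves like a tiny in-memory upsert: it stores a default when the key is missing and applies a function when the key already exists. The `UsageCounter` module and the `{user_id, date}` key are made up for this sketch.

```elixir
defmodule UsageCounter do
  # Hypothetical in-memory stand-in for a usage table.
  # Keys are {user_id, date} tuples, values are counters.
  def track(counters, user_id, date) do
    # Stores 1 if the key is missing, otherwise increments the existing value.
    Map.update(counters, {user_id, date}, 1, fn counter -> counter + 1 end)
  end
end

today = ~D[2024-01-01]

counters =
  %{}
  |> UsageCounter.track(1, today)
  |> UsageCounter.track(1, today)
  |> UsageCounter.track(2, today)

IO.inspect(counters)
```

The caller never checks whether an entry exists. That is the same convenience an upsert gives you at the database level.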

🔗 When to use Upserts

Upserts are useful in situations where you don’t care whether a schema exists or not, you just want to update it. Let’s say you want to track how often a user calls your API on a given day. In this case, you don’t care whether it’s the first or the hundredth API call. You just want to increase the number of API calls the user made.

Without upserts, you would need two database calls: First, a read to fetch any existing schema for the user and today’s date combination. If a schema exists, you bump its counter and execute an update. If no schema exists, you insert a new one. In both scenarios, you always need a read and a write call. With upserts, you only need one write call to the database (the read is optional though, more about this later).

So, instead of doing this:

schema = Repo.get_by(Usage, user_id: user.id, date: Date.utc_today())

if schema do
  changeset = Ecto.Changeset.change(schema, counter: schema.counter + 1)
  Repo.update(changeset)
else
  new_schema = %Usage{
    user_id: user.id,
    date: Date.utc_today(),
    counter: 1
  }

  Repo.insert(new_schema)
end

You can simply do this:

Repo.insert(
  %Usage{user_id: user.id, date: Date.utc_today(), counter: 1}, 
  on_conflict: [inc: [counter: 1], set: [updated_at: DateTime.utc_now()]],
  conflict_target: [:user_id, :date]
)

Much better! We condensed 14 lines of code into 5, reduced the complexity of our code, and halved the number of database calls! We replaced our insert and update calls with a single upsert. The options that made this possible are on_conflict and conflict_target. Let’s have a closer look at them.

🔗 Configuring an Upsert

The reason upserts only need one database call is the on_conflict option for Ecto.Repo.insert/2. It tells the database what to do when you try to insert a schema that violates a uniqueness constraint. A uniqueness constraint could be a unique_index on one or more fields or a primary_key. If you try to insert a duplicate into one of these fields, Ecto will raise an error like this:

** (Ecto.ConstraintError) constraint error when attempting to insert struct:

    * usages_user_id_date_index (unique_constraint)

You can set the on_conflict option to any of the values described in the documentation for Ecto.Repo.insert/2. If you don’t define an on_conflict option, Ecto raises an error by default. If you want to ignore the conflict, you can instruct it to do :nothing. You can overwrite the existing schema with :replace_all, overwrite all fields except for some with {:replace_all_except, fields}, or overwrite only specific fields with {:replace, fields}.
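To make two of these options a bit more concrete, here is a sketch of how they could be passed to an insert. The `ConflictOptions` module and its function names are invented for this example, and the repo is passed in as an argument only so that the snippet compiles without a database. In your app, you would call your own `Repo.insert/2` directly.

```elixir
defmodule ConflictOptions do
  # Hypothetical helpers. `repo` is any module with an insert/2 function,
  # such as your application's Ecto repo.
  @target [:user_id, :date]

  # Keep the existing row untouched when the constraint is violated.
  def insert_or_ignore(repo, usage) do
    repo.insert(usage, on_conflict: :nothing, conflict_target: @target)
  end

  # Overwrite only the listed fields of the existing row
  # with the values of the struct we tried to insert.
  def insert_or_replace_counter(repo, usage) do
    repo.insert(usage,
      on_conflict: {:replace, [:counter, :updated_at]},
      conflict_target: @target
    )
  end
end
```

Both helpers only differ in the on_conflict value, so switching strategies later is a one-line change.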

You can also define an update operator like inc, set, push, or pull.

  • inc will increase (or decrease if the given value is negative) a numeric field with a given value.
  • set overwrites a field with a new value.
  • push appends a value to an array field.
  • pull removes a value from an array field.
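If it helps, here is a plain-Elixir model of what the four operators do to the existing row. It is not how Ecto implements them (the database does the work), and the `UpdateOps` module as well as the `status` and `tags` fields are made up for this sketch. `pull` is modelled as removing every matching element.

```elixir
defmodule UpdateOps do
  # Applies a keyword list of operators to a map that stands in
  # for the row that already exists in the database.
  def apply_ops(row, ops) do
    Enum.reduce(ops, row, fn {op, changes}, acc ->
      Enum.reduce(changes, acc, fn {field, value}, row_acc ->
        Map.update!(row_acc, field, &apply_op(op, &1, value))
      end)
    end)
  end

  defp apply_op(:inc, current, value), do: current + value
  defp apply_op(:set, _current, value), do: value
  defp apply_op(:push, current, value), do: current ++ [value]
  defp apply_op(:pull, current, value), do: Enum.reject(current, &(&1 == value))
end

row = %{counter: 5, status: "old", tags: ["a", "b"]}

updated =
  UpdateOps.apply_ops(row,
    inc: [counter: -2],
    set: [status: "new"],
    push: [tags: "c"],
    pull: [tags: "a"]
  )

IO.inspect(updated)
```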

Lastly, you can also provide a custom update query like this:

import Ecto.Query

query =
  from(u in Usage,
    update: [
      inc: [counter: 1],
      set: [updated_at: ^DateTime.utc_now()]
    ]
  )

# Repo.insert/2 takes the struct to insert, followed by the options
Repo.insert(%Usage{...}, on_conflict: query, conflict_target: [...])

The second option needed for implementing upserts is the conflict_target option. It specifies the constraint for which the on_conflict operation should step in as a fallback. In our case, we define the [:user_id, :date] constraint. If our insertion violates that constraint, Postgres will execute the on_conflict fallback. But if our insertion violates another constraint (e.g. the primary_key constraint), Postgres will not execute our fallback and return a constraint error instead. We don’t want to update our schema if we violate just any constraint. conflict_target makes sure of that.

Heads up: The conflict_target option is only available for Postgres and not for MySQL databases. In MySQL, you can just omit it.

🔗 Preparing a Schema for Upserts

Upserts rely on uniqueness constraints in the database. We usually create such constraints in the migration that creates the table of the schema. Let’s have a look at different constraints and how to create them.

First, we can leverage the uniqueness constraint of primary keys for our upserts. In such cases, we typically use a field other than the autogenerated id as the primary key for the schema. In our example above, we use a combination of user_id and date to identify the correct database entry. Let’s create a migration that sets these two fields as - what Postgres calls - a “composite primary key”:

defmodule Demo.Repo.Migrations.CreateUsages do
  use Ecto.Migration

  def change do
    create table(:usages, primary_key: false) do
      add(
        :user_id, 
        references(:users, on_delete: :nothing), 
        primary_key: true
      )
      add(:date, :date, primary_key: true)
      add(:counter, :integer, default: 0)

      timestamps()
    end
  end
end

This migration creates a usages table and adds a uniqueness constraint for the user_id and date combination since these columns act as primary keys for the table. If you prefer having an autogenerated id instead of a composite primary key, you can also create the uniqueness constraint for the two columns yourself:

defmodule Demo.Repo.Migrations.CreateUsages do
  use Ecto.Migration

  def change do
    create table(:usages) do
      add(:user_id, references(:users, on_delete: :nothing))
      add(:date, :date)
      add(:counter, :integer, default: 0)

      timestamps()
    end

    create(unique_index(:usages, [:user_id, :date]))
  end
end

The migration above creates an autogenerated id primary key and adds a uniqueness constraint on the [:user_id, :date] columns. Both approaches work just fine. It is up to you to decide which one you prefer.
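If you go with the composite primary key, the schema has to mirror it, otherwise Ecto will still expect an id column. Here is a sketch of what that could look like, assuming a schema module named `Demo.Usages.Usage`. The `Mix.install/1` line is only there so the snippet runs as a standalone script.

```elixir
# Only needed when running this as a standalone script.
# Remove it inside a Mix project, where Ecto is already a dependency.
Mix.install([{:ecto, "~> 3.0"}])

defmodule Demo.Usages.Usage do
  use Ecto.Schema

  # Disable the default autogenerated `id` primary key ...
  @primary_key false
  schema "usages" do
    # ... and mark both columns as parts of the primary key instead.
    field(:user_id, :id, primary_key: true)
    field(:date, :date, primary_key: true)
    field(:counter, :integer, default: 0)

    timestamps()
  end
end

# Lists the fields Ecto treats as the primary key of the schema.
IO.inspect(Demo.Usages.Usage.__schema__(:primary_key))
```

With the unique_index variant, you can keep the default id and leave out the `@primary_key false` line and the `primary_key: true` options.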

🔗 Upserting Usage Data

Here is an example function that upserts data into the usages table:

def upsert_usage(user, increment \\ 1, date \\ Date.utc_today()) do
  Repo.insert(
    %Usage{user_id: user.id, date: date, counter: increment}, 
    on_conflict: [
      inc: [counter: increment], 
      set: [updated_at: DateTime.utc_now()]
    ],
    conflict_target: [:user_id, :date]
  )
end

One nitty-gritty detail is the second part of the on_conflict instruction: the set: [updated_at: DateTime.utc_now()] part. The on_conflict option behaves similarly to the Repo.update_all/3 operation: it does not update autogenerated fields like inserted_at or updated_at, so we need to set updated_at ourselves. Usually, we don’t create autogenerated timestamps when using upserts and rely on the date field instead. This allows us to remove the set operation above. However, adding the timestamps is unproblematic as long as you remember to update them manually in your upsert function.

🔗 Testing your Upserts

Testing your upsert functionality is straightforward. Have a look at the example test below:

test "upsert_usage/1 creates and then updates a usage", %{user: user} do
  assert [] = Usages.list_usages()
  
  assert {:ok, _usage} = Usages.upsert_usage(user)
  
  assert [usage] = Usages.list_usages()
  assert usage.user_id == user.id
  assert usage.date == Date.utc_today()
  assert usage.counter == 1

  assert {:ok, _usage} = Usages.upsert_usage(user)

  assert [usage] = Usages.list_usages()
  assert usage.counter == 2
end

You might wonder why we fetched the schema from the database using Usages.list_usages/0 instead of using the one returned in {:ok, usage}. The reason is that the on_conflict fallback does not return the updated schema. It returns the schema that we tried to insert instead, which has a counter of 1. This becomes clearer when we run the upsert in an IEx shell:

# Return values were shortened for brevity
iex> Usages.upsert_usage(user)
{:ok, %Usage{counter: 1}}

iex> Usages.upsert_usage(user)
{:ok, %Usage{counter: 1}}

iex> Usages.list_usages()
[%Usage{counter: 2}]

As you can see, Usages.upsert_usage/1 always returns a counter of 1. You have to fetch the updated counter yourself. There are different ways to achieve this, so let’s have a look at them.

🔗 Fetching the new State

If you want to return the updated schema from your upsert function, you need to add the returning: true option on the Repo.insert/2 like this:

Repo.insert(
  %Usage{user_id: user.id, date: date, counter: increment}, 
  on_conflict: [
    inc: [counter: increment], 
    set: [updated_at: DateTime.utc_now()]
  ],
  conflict_target: [:user_id, :date],
  # Add the following option:
  returning: true
)

This option adds a RETURNING clause to the insert statement, so Postgres sends back the row as it looks after the on_conflict operation. This doesn’t require a second database call, but it returns every field of the schema by default. If you care about the performance of your upserts and rarely need the result, leave this option out and fetch the schema manually whenever you need it.

Now, with the returning: true option in place, let’s repeat our instructions in the IEx shell:

# Return values were shortened for brevity
iex> Usages.upsert_usage(user)
{:ok, %Usage{counter: 1}}

iex> Usages.upsert_usage(user)
{:ok, %Usage{counter: 2}}

iex> Usages.list_usages()
[%Usage{counter: 2}]

This looks more like expected! Ecto now fetches the entire schema after the update.

If you want to fetch only certain fields and not the entire schema, you can specify them by returning: [:my_field, :another_field]. This will instruct Ecto to only read the specified fields after the update and not the entire schema.

Another option is to add read_after_writes: true to the fields in your schema definition:

defmodule Demo.Usages.Usage do
  use Ecto.Schema
  import Ecto.Changeset

  schema "usages" do
    field(:date, :date)
    # The upsert examples set user_id on the struct, so the schema needs it.
    field(:user_id, :id)
    # Instruct Ecto to always read this field
    # from the database after every insert or update.
    field(:counter, :integer, read_after_writes: true)

    timestamps()
  end
end

If we recompile our code and execute the IEx instructions again, it will show the same result. Now, only the counter field is read after the on_conflict operation finishes. Be aware that Ecto will also read the counter field from the database on every other insert and update. So, even if you update another field of the schema, Ecto will always fetch its counter field, even when the counter didn’t change.

Heads up: The returning: true and read_after_writes: true options only work for Postgres and raise an error in MySQL.

🔗 Using Upserts with Binary IDs

Ecto generates a new id whenever it tries to insert a schema with a binary_id. Upserting such schemas can become confusing if you don’t use returning: true. Have a look at the following IEx instructions:

# Return values were shortened for brevity
iex> Usages.upsert_usage(user)
{:ok, %Usage{
  id: "c3181047-ee1a-4621-94bc-e3a4b0a88d12", 
  user_id: 1, 
  counter: 1
}}

iex> Usages.upsert_usage(user)
{:ok, %Usage{
  id: "e8bd4951-ea2c-44b0-8123-45f314d6010f", 
  user_id: 1, 
  counter: 1
}}

iex> Usages.list_usages()
[%Usage{id: "c3181047-ee1a-4621-94bc-e3a4b0a88d12", user_id: 1, counter: 2}]

At first glance, you might wonder why Ecto inserts a new schema with a new id for the same user instead of upserting an existing one. The confusing part here is that it doesn’t. When we upsert our schema for the first time, Ecto generates a binary_id and saves it to the database. All good. When we upsert the schema again, Ecto generates a binary_id again and tries to insert it. Because of the constraint conflict though, that new binary_id is never persisted. The database will perform our on_conflict operation instead and ignore the generated id.

Now comes the tricky part: Once the on_conflict operation finishes, Ecto will return the schema it tried to insert, not the existing schema that was updated! That’s why it returns a completely new binary_id instead of the existing one! Once we fetch the schema from the database, we see that it uses the binary_id generated during the first upsert. So, all works as expected.

When you run this code in production, this detail probably won’t bother you. But don’t forget this fact when you write your tests! Look at the following test and try to find the error we made:

test "upsert_usage/1 actually upserts a schema", %{user: user} do
  assert {:ok, usage} = Usages.upsert_usage(user)
  assert %{counter: 1} = Repo.get(Usage, usage.id)

  assert {:ok, usage} = Usages.upsert_usage(user)
  assert %{counter: 2} = Repo.get(Usage, usage.id)
end

Did you see the mistake we made?

Unless you added returning: true or at least returning: [:id] to the upsert function, the last assertion will fail. The second upsert returned the schema with the new but ignored id, not the original id. Our database found a constraint violation and never persisted the new id! So, the second Repo.get(Usage, usage.id) will return nil.

Here is the test again, this time without the mistake:

test "upsert_usage/1 actually upserts a schema", %{user: user} do
  assert {:ok, original_usage} = Usages.upsert_usage(user)
  assert %{counter: 1} = Repo.get(Usage, original_usage.id)

  assert {:ok, _usage} = Usages.upsert_usage(user)
  assert %{counter: 2} = Repo.get(Usage, original_usage.id)
end

🔗 Exploring more funky ID behaviors

Another interesting behavior of upserts is that they cause gaps between primary keys. Have a look at the following IEx instructions which upsert a schema that has an integer primary key. For this demo, we added returning: true to the upsert function.

# Return values were shortened for brevity
iex> Usages.upsert_usage(user_1)
{:ok, %Usage{id: 1, user_id: 1, counter: 1}}

iex> Usages.upsert_usage(user_1)
{:ok, %Usage{id: 1, user_id: 1, counter: 2}}

iex> Usages.upsert_usage(user_2)
{:ok, %Usage{id: 3, user_id: 2, counter: 1}}

iex> Usages.list_usages()
[%Usage{id: 1}, %Usage{id: 3}]

That’s weird. Since we only inserted two schemas, we would expect the second schema to have an id of 2, but it doesn’t! There’s a gap between the id of the first schema (1) and the id of the second schema (3), and its size is exactly the number of upserts that ran into the on_conflict fallback. But why is that?

The reason is complex and described in more detail here, but this is the short version: Before Postgres does anything with the data you provide, it resolves the schema’s default values. So, if you define a default value for a field and don’t provide data for that field in your insert request, Postgres will fill in its default value. For integer IDs, that default value is the next number in a sequence of IDs for the table. Whenever you insert a schema, Postgres fetches the next ID from the sequence which increments the sequence by one. This way, it guarantees that the next ID has never been used before.

Practically, our upsert is an insert with a fallback, and therefore Postgres fills in its id, even if the insert is aborted due to a constraint violation! So, Postgres increments the sequence even if we haven’t inserted anything and the on_conflict operation was executed. Once we insert the next schema, it will have the id of the previous schema plus every upsert we called since plus one. Funny, right?
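The following toy model replays that ordering in plain Elixir. It is not Postgres, and the `FakeUsages` module is made up for this sketch. It only imitates the one detail that matters here: the id is drawn from the sequence before the conflict is detected.

```elixir
defmodule FakeUsages do
  # Toy table with a sequence-backed id and a unique user_id.
  def new, do: %{next_id: 1, rows: %{}}

  def upsert(%{next_id: id, rows: rows} = table, user_id) do
    # The sequence advances on every attempt, conflict or not.
    table = %{table | next_id: id + 1}

    case rows do
      %{^user_id => row} ->
        # Conflict: keep the existing id, bump the counter, discard the new id.
        %{table | rows: Map.put(rows, user_id, %{row | counter: row.counter + 1})}

      _ ->
        # No conflict: the freshly drawn id is persisted.
        %{table | rows: Map.put(rows, user_id, %{id: id, counter: 1})}
    end
  end
end

table =
  FakeUsages.new()
  |> FakeUsages.upsert(1)
  |> FakeUsages.upsert(1)
  |> FakeUsages.upsert(2)

ids = table.rows |> Map.values() |> Enum.map(& &1.id) |> Enum.sort()
IO.inspect(ids)
```

The second upsert for user 1 consumes id 2 without storing it, which is why user 2 ends up with id 3.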

In practice, this fun fact probably has no implication (how often do you check for gaps in your primary keys anyway?). It is important to be aware of it though. A SERIAL id in Postgres is a 4-byte integer with a maximum value of 2_147_483_647. Imagine that you upsert your usage counter 10 times per second. After almost 7 years, your sequence of IDs will reach its maximum and you won’t be able to insert new schemas or upsert the existing ones anymore! This scenario is not unrealistic for large enterprise software, so keep this limitation in mind if your table uses a SERIAL id.

If you want to be on the safe side, you can always use a BIGSERIAL integer as a primary key with 8 bytes and a maximum value of 9_223_372_036_854_775_807. It would take around 29 billion years to reach its maximum, which is roughly 21 billion years after the sun explodes. So, you’ll probably be fine 👍
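If you want to check these numbers yourself, here is the back-of-the-envelope math as a small script. The `SequenceMath` module is made up for this sketch, and it assumes a constant rate of 10 upserts per second and a year of 365.25 days.

```elixir
defmodule SequenceMath do
  # Seconds in a year of 365.25 days.
  @seconds_per_year 31_557_600

  # Years until a sequence with the given maximum is used up
  # at a constant rate of upserts per second.
  def years_until_exhausted(max_id, upserts_per_second) do
    max_id / (upserts_per_second * @seconds_per_year)
  end
end

serial_years = SequenceMath.years_until_exhausted(2_147_483_647, 10)
bigserial_years = SequenceMath.years_until_exhausted(9_223_372_036_854_775_807, 10)

# Prints "SERIAL: 6.8 years"
IO.puts("SERIAL: #{Float.round(serial_years, 1)} years")
# Prints "BIGSERIAL: 29.2 billion years"
IO.puts("BIGSERIAL: #{Float.round(bigserial_years / 1.0e9, 1)} billion years")
```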

🔗 Conclusion

This has been a thorough deep-dive into all things upserts with Ecto. I hope you enjoyed it! If you have questions or comments, let’s discuss them on Twitter. Follow me on Twitter or subscribe to my newsletter below if you want to get notified when I publish the next blog post. Until then, thanks for reading!
