taylor.town about now spam rss

How/Why to Sweep Async Tasks Under a Postgres Table

I like slim and stupid servers, where each endpoint wraps a very dumb DB query.

Dumb queries are fast. Fast queries make websites smooth and snappy. Keep those click/render loops sacred.

Sweep complexity under a task table:

router.post("/signup", async ctx => {
  const { email, password } = await ctx.request.body().value;
  const [{ usr_id } = { usr_id: null }] = await sql`
    with usr_ as (
      insert into usr (email, password)
      values (${email}, crypt(${password}, gen_salt('bf')))
      returning *
    ), task_ as (
      insert into task (task_type, params)
      values ('SEND_EMAIL_WELCOME', ${sql({ usr_id })})
    )
    select * from usr_
  `;
  await ctx.cookies.set("usr_id", usr_id);
  ctx.response.status = 204;
});

This example uses CTEs with postgres.js.

Of course using mailgun.send is easier than queuing it in a task table. Adding indirection rarely makes systems less complex. But somehow I'm here to advocate exactly that. You may ignore my manifesto and skip to my implementation at the end.

Secret Surface Error Area

Customers don't care about cosmic rays. They want a thing. More imporantly, they want immediate confirmation of their thing. They want to offload the mental burden of their goal.

For them to delegate that responsibility, your DB is probably the only thing that matters. Once information is committed to your database, you can confidently say "we'll take it from here".

You can send emails later. You can process payments later. You can do almost anything later. Just tell your customer they can continue with their goddamn day.

Delight your customers with clear feedback.

Delight your computers by writing to one place at a time.

Never Handroll Your Own Two-Phase Commit

Writing to two places at "the same time" is sinful.

When the gods gave us computer storage, the people became unhappy. They cried, "What is consistency? Where are our guarantees? Why must I fsync?" And so they wore sackloth and ashes for many years in their coding caves.

The people were overjoyed when the gods scrawled Postgres (and other inferior databases) onto stone tablets. The holy "database transactions" allowed humankind to pretend that they could read/write to multiple places at the same time.

To this day, databases sometimes work.

But some developers deny the works of the gods. They mix multiple tools, and so commit the sin of writing to multiple places.

"Oh, we'll just send a pubsub message after we insert the row." But data is lost. Message before insert row? Data lost. All blasphemers are doomed to reinvent two-phase commit.

One Way To Do Things

I like LEGO. I like Play-Doh. I like Lincoln Logs. I do not, however, like mixing them together.

It's painful to investigate systems when state is spread across SQS, Redis, PubSub, Celery, Airflow, etc. I shouldn't have to open a local detective agency find out why a process isn't running as expected.

Most modern projects use SQL. Because I dislike mixing systems, I try to take SQL as far as possible.

Of all the SQL databases, Postgres currently offers the best mix of modern first-class features and third-party extensions. Postgres can be your knock-off Kafka, artificial Airflow, crappy Clickhouse, nasty Elasticsearch, poor man's PubSub, on-sale Celery, etc.

Sure, Postgres doesn't have all the fancy features of each specialized system. But colocating queue/pipeline/async data in your main database eliminates swaths of errors. In my experience, transaction guarantees supercede everything else.

TODO-Driven Development

while (true) {
  // const rows = await ...
  for (const { task_type, params } of rows)
    if (task_type in tasks) {
      await tasks[task_type](tx, params);
    } else {
      console.error(`Task type not implemented: ${task_type}`);
    }
}

With a simple retry system, asynchronous decoupling magically tracks all your incomplete flows.

No need to rely upon Jira -- bugs and unimplemented tasks will be logged and retried. Working recursively from error queues is truly a wonderful experience. All your live/urgent TODOs are printed to the same place (in development and in production).

With this paradigm, you'll gravitate towards scalable pipelines. Wishful thinking makes natural architecture.

Human Fault Tolerance

Many systems foist useless retry-loops onto humans.

Humans should receive feedback for human errors. But humans should not receive feedback for problems that can be handled by computers (and their software developers).

Remember, all your retry-loops have to happen somewhere. Be careful what you delegate to customers and developers. Your business's bottom-line is bounded by human patience; computers have infinitely more patience than humans.

Show Me The Code

Here's the task table:

create table task
( task_id bigint primary key not null generated always as identity
, task_type text not null -- consider using enum
, params jsonb not null -- hstore also viable
, created_at timestamptz not null default now()
, unique (task_type, params) -- optional, for pseudo-idempotency
)

Don't use serial in Postgres.

Here's the code for the task worker:

const tasks = {
  SEND_EMAIL_WELCOME: async (tx, params) => {
    const { email } = params;
    if (!email) throw new Error(`Bad params ${JSON.stringify(params)}.`);
    await sendEmail({ email, body: "WELCOME" });
  },
};

(async () => {
  while (true) {
    try {
      while (true) {
        await sql.begin(async (tx: any) => {
          const rows = await tx`
            delete from task
            where task_id in
            ( select task_id
              from task
              order by random() -- use tablesample for better performance
              for update
              skip locked
              limit 1
            )
            returning task_id, task_type, params::jsonb as params
          `;
          for (const { task_type, params } of rows)
            if (task_type in tasks) {
              await tasks[task_type](tx, params);
            } else {
              throw new Error(`Task type not implemented: ${task_type}`);
            }
          if (rows.length <= 0) {
            await delay(10 * 1000);
          }
        });
      }
    } catch (err) {
      console.error(err);
      await delay(1 * 1000);
    }
  }
})();

A few notable features of this snippet: