How/Why to Sweep Async Tasks Under a Postgres Table
I like slim and stupid servers, where each endpoint wraps a very dumb DB query.
Dumb queries are fast. Fast queries make websites smooth and snappy. Keep those click/render loops sacred.
Sweep complexity under a task table:
router.post("/signup", async ctx => {
  const { email, password } = await ctx.request.body().value;
  const [{ usr_id } = { usr_id: null }] = await sql`
    with usr_ as (
      insert into usr (email, password)
        values (${email}, crypt(${password}, gen_salt('bf')))
        returning *
    ), task_ as (
      -- build params from the freshly inserted row; usr_id does not exist in JS yet
      insert into task (task_type, params)
        select 'SEND_EMAIL_WELCOME', jsonb_build_object('usr_id', usr_id, 'email', email)
        from usr_
    )
    select * from usr_
  `;
  await ctx.cookies.set("usr_id", usr_id);
  ctx.response.status = 204;
});
This example uses CTEs with postgres.js.
Of course using mailgun.send is easier than queuing it in a task table.
Adding indirection rarely makes systems less complex. But somehow I'm here to
advocate exactly that. You may ignore my manifesto and
skip to my implementation at the end.
- Secret Surface Error Area
- Never Handroll Your Own Two-Phase Commit
- One Way To Do Things
- TODO-Driven Development
- Human Fault Tolerance
- Show Me The Code
Secret Surface Error Area
Customers don't care about cosmic rays. They want a thing. More importantly, they want immediate confirmation of their thing. They want to offload the mental burden of their goal.
For them to delegate that responsibility, your DB is probably the only thing that matters. Once information is committed to your database, you can confidently say "we'll take it from here".
You can send emails later. You can process payments later. You can do almost anything later. Just tell your customer they can continue with their goddamn day.
Delight your customers with clear feedback.
Delight your computers by writing to one place at a time.
Never Handroll Your Own Two-Phase Commit
Writing to two places at "the same time" is sinful.
When the gods gave us computer storage, the people became unhappy. They cried, "What is consistency? Where are our guarantees? Why must I fsync?" And so they wore sackcloth and ashes for many years in their coding caves.
The people were overjoyed when the gods scrawled Postgres (and other inferior databases) onto stone tablets. The holy "database transactions" allowed humankind to pretend that they could read/write to multiple places at the same time.
To this day, databases sometimes work.
But some developers deny the works of the gods. They mix multiple tools, and so commit the sin of writing to multiple places.
"Oh, we'll just send a pubsub message after we insert the row." But data is lost. Message before insert row? Data lost. All blasphemers are doomed to reinvent two-phase commit.
One Way To Do Things
I like LEGO. I like Play-Doh. I like Lincoln Logs. I do not, however, like mixing them together.
It's painful to investigate systems when state is spread across SQS, Redis, PubSub, Celery, Airflow, etc. I shouldn't have to open a local detective agency to find out why a process isn't running as expected.
Most modern projects use SQL. Because I dislike mixing systems, I try to take SQL as far as possible.
Of all the SQL databases, Postgres currently offers the best mix of modern first-class features and third-party extensions. Postgres can be your knock-off Kafka, artificial Airflow, crappy Clickhouse, nasty Elasticsearch, poor man's PubSub, on-sale Celery, etc.
Sure, Postgres doesn't have all the fancy features of each specialized system. But colocating queue/pipeline/async data in your main database eliminates swaths of errors. In my experience, transaction guarantees supersede everything else.
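One of the errors that colocation removes is the half-finished dual write. Here's a toy sketch of it, entirely in memory: there is no real database or broker, and `Store`, `dualWrite`, `outboxWrite`, and `attempt` are hypothetical names made up for illustration. It simulates a crash between two separate writes, then the same crash inside a single all-or-nothing write.

```typescript
// Toy model only: arrays stand in for a database and a message broker.
type Row = { email: string };
type Task = { task_type: string; params: Row };

class Store {
  rows: Row[] = [];
  tasks: Task[] = [];

  // Applies every write or none, like a database transaction.
  transaction(fn: (draft: { rows: Row[]; tasks: Task[] }) => void): void {
    const draft = { rows: [...this.rows], tasks: [...this.tasks] };
    fn(draft); // if this throws, the draft is simply discarded
    this.rows = draft.rows;
    this.tasks = draft.tasks;
  }
}

// Dual write: insert the row, then publish to a separate system.
function dualWrite(db: Store, broker: Task[], row: Row, crashBetween: boolean): void {
  db.rows.push(row);
  if (crashBetween) throw new Error("process died after insert, before publish");
  broker.push({ task_type: "SEND_EMAIL_WELCOME", params: row });
}

// Single write: the row and its task commit together or not at all.
function outboxWrite(db: Store, row: Row, crashBetween: boolean): void {
  db.transaction((draft) => {
    draft.rows.push(row);
    if (crashBetween) throw new Error("process died mid-transaction");
    draft.tasks.push({ task_type: "SEND_EMAIL_WELCOME", params: row });
  });
}

// Swallow the simulated crash so the script can inspect what was left behind.
function attempt(fn: () => void): void {
  try {
    fn();
  } catch {
    /* simulated crash */
  }
}

const dbA = new Store();
const broker: Task[] = [];
attempt(() => dualWrite(dbA, broker, { email: "a@example.com" }, true));
// dbA has a user, broker has nothing: the welcome email is silently lost.

const dbB = new Store();
attempt(() => outboxWrite(dbB, { email: "b@example.com" }, true));
// dbB has neither: nothing half-done, and the signup can simply be retried.
```

In the first case the system is left in a state nobody asked for. In the second, the only possible states are "nothing happened" and "everything is recorded".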
TODO-Driven Development
while (true) {
  // const rows = await ...
  for (const { task_type, params } of rows)
    if (task_type in tasks) {
      await tasks[task_type](tx, params);
    } else {
      console.error(`Task type not implemented: ${task_type}`);
    }
}
With a simple retry system, asynchronous decoupling magically tracks all your incomplete flows.
No need to rely upon Jira -- bugs and unimplemented tasks will be logged and retried. Working recursively from error queues is truly a wonderful experience. All your live/urgent TODOs are printed to the same place (in development and in production).
With this paradigm, you'll gravitate towards scalable pipelines. Wishful thinking makes natural architecture.
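To see how the queue doubles as a TODO list, here's a small in-memory sketch. It is not the real worker: `runOnce`, the `CHARGE_CARD` task type, and the synchronous handlers are assumptions chosen to keep the example tiny. Tasks without a handler are logged and left in the queue, and they finish on a later pass once the handler exists.

```typescript
// Hypothetical in-memory stand-in for the task table and worker loop.
type Task = { task_id: number; task_type: string; params: Record<string, unknown> };
type Handler = (params: Record<string, unknown>) => void;

// One pass over the queue. Tasks that succeed are dropped; tasks that throw or
// have no handler yet are logged and kept, so the next pass retries them.
function runOnce(
  queue: Task[],
  handlers: Map<string, Handler>,
  log: (msg: string) => void,
): Task[] {
  const remaining: Task[] = [];
  for (const task of queue) {
    try {
      const handler = handlers.get(task.task_type);
      if (!handler) throw new Error(`Task type not implemented: ${task.task_type}`);
      handler(task.params);
    } catch (err) {
      log(err instanceof Error ? err.message : String(err));
      remaining.push(task);
    }
  }
  return remaining;
}

const handlers = new Map<string, Handler>([["SEND_EMAIL_WELCOME", () => {}]]);
const todos: string[] = [];
let queue: Task[] = [
  { task_id: 1, task_type: "SEND_EMAIL_WELCOME", params: { email: "a@example.com" } },
  { task_id: 2, task_type: "CHARGE_CARD", params: { usr_id: 7 } },
];

queue = runOnce(queue, handlers, (msg) => todos.push(msg));
// The welcome email is done; CHARGE_CARD is still queued and shows up in `todos`.

handlers.set("CHARGE_CARD", () => {}); // "implement" the missing task
queue = runOnce(queue, handlers, (msg) => todos.push(msg));
// The queue drains without anyone re-submitting anything.
```

The log of the first pass is the TODO list; shipping the handler is what closes the ticket.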
Human Fault Tolerance
Many systems foist useless retry-loops onto humans.
Humans should receive feedback for human errors. But humans should not receive feedback for problems that can be handled by computers (and their software developers).
Remember, all your retry-loops have to happen somewhere. Be careful what you delegate to customers and developers. Your business's bottom-line is bounded by human patience; computers have infinitely more patience than humans.
Show Me The Code
Here's the task table:
create table task
( task_id    bigint      primary key not null generated always as identity
, task_type  text        not null -- consider using enum
, params     jsonb       not null -- hstore also viable
, created_at timestamptz not null default now()
, unique (task_type, params) -- optional, for pseudo-idempotency
);
Don't use serial in Postgres.
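The optional unique constraint deserves a closer look. jsonb compares by value and ignores key order, so two inserts with the same task_type and the same params collide even if the keys were written in a different order. A common way to lean on that is `insert ... on conflict do nothing`. The sketch below mimics the behavior in memory; `canonical` and `TaskTable` are hypothetical helpers for illustration, not part of Postgres or postgres.js.

```typescript
// In-memory imitation of `unique (task_type, params)` on a jsonb column.
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

// Canonical text form with object keys sorted, so key order does not matter.
// This imitates how jsonb compares values, not how Postgres stores them.
function canonical(value: Json): string {
  if (value === null || typeof value !== "object") return JSON.stringify(value);
  if (Array.isArray(value)) return `[${value.map((v) => canonical(v)).join(",")}]`;
  const entries = Object.keys(value)
    .sort()
    .map((k) => `${JSON.stringify(k)}:${canonical(value[k])}`);
  return `{${entries.join(",")}}`;
}

class TaskTable {
  private pending = new Set<string>();

  // Returns true if the task was queued, false if an identical task is already
  // pending (roughly what `on conflict do nothing` would give you).
  enqueue(task_type: string, params: Json): boolean {
    const key = `${task_type}|${canonical(params)}`;
    if (this.pending.has(key)) return false;
    this.pending.add(key);
    return true;
  }

  // A finished task is deleted, so the same params can be queued again later.
  complete(task_type: string, params: Json): void {
    this.pending.delete(`${task_type}|${canonical(params)}`);
  }
}

const table = new TaskTable();
const queued = table.enqueue("SEND_EMAIL_WELCOME", { usr_id: 1, email: "a@example.com" });
const duplicate = table.enqueue("SEND_EMAIL_WELCOME", { email: "a@example.com", usr_id: 1 });
const other = table.enqueue("SEND_EMAIL_WELCOME", { usr_id: 2, email: "b@example.com" });
```

This is why it's only pseudo-idempotency: the constraint dedupes tasks that are still pending, and once a task is processed and deleted, nothing stops an identical one from being queued again.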
Here's the code for the task worker:
// Assumes `sql` (a postgres.js client), `delay(ms)`, and `sendEmail` are defined elsewhere.
const tasks = {
  SEND_EMAIL_WELCOME: async (tx, params) => {
    const { email } = params;
    if (!email) throw new Error(`Bad params ${JSON.stringify(params)}.`);
    await sendEmail({ email, body: "WELCOME" });
  },
};

(async () => {
  while (true) {
    try {
      while (true) {
        await sql.begin(async (tx: any) => {
          // Claim and delete one task; the delete only sticks if the handler succeeds.
          const rows = await tx`
            delete from task
            where task_id in
            ( select task_id
              from task
              order by random() -- use tablesample for better performance
              for update
              skip locked
              limit 1
            )
            returning task_id, task_type, params::jsonb as params
          `;
          for (const { task_type, params } of rows)
            if (task_type in tasks) {
              await tasks[task_type](tx, params);
            } else {
              throw new Error(`Task type not implemented: ${task_type}`);
            }
          if (rows.length <= 0) {
            await delay(10 * 1000);
          }
        });
      }
    } catch (err) {
      // Any failure rolls back the transaction, so the task stays queued.
      console.error(err);
      await delay(1 * 1000);
    }
  }
})();
A few notable features of this snippet:
- The task row will not be deleted if sendEmail fails. The PG transaction will be rolled back. The row and sendEmail will be retried.
- The PG transaction tx is passed along to tasks. This is convenient for marking rows as "processed", etc.
- Transactions make error-handling so much nicer. Always organize reversible queries before irreversible side-effects (e.g. mark DB status before sending the email). Remember that the DB commits at the end.
- Because of skip locked, you can run any number of these workers in parallel. They will not step on each other's toes.
- Random ordering is technically optional, but it makes the system more resilient to errors. With adequate randomness, a single task type cannot block the queue for all.
- Use order by (case task_type ... end), random() to create an easy prioritized queue.
- Limiting the number of retries makes the code more complicated, but it's definitely worth it for user-facing side-effects like emails.
- The if (rows.length <= 0) check prevents overzealous polling. Your DBA will be grateful.
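On limiting retries: the worker above has no retry counter, so what follows is only a sketch of the decision itself. It assumes a hypothetical `attempts` value stored with each task and a hypothetical `afterFailure` helper. Neither exists in the schema or code above.

```typescript
// Hypothetical shape: assumes an `attempts` counter is stored alongside each task.
type FailedTask = { task_type: string; attempts: number };
type Outcome =
  | { action: "retry"; attempts: number }
  | { action: "give_up"; attempts: number; reason: string };

// Decide what to do with a task whose handler just threw.
function afterFailure(task: FailedTask, err: Error, maxAttempts: number): Outcome {
  const attempts = task.attempts + 1; // count the attempt that just failed
  if (attempts >= maxAttempts) {
    return { action: "give_up", attempts, reason: err.message };
  }
  return { action: "retry", attempts };
}

const err = new Error("SMTP timeout");
const first = afterFailure({ task_type: "SEND_EMAIL_WELCOME", attempts: 0 }, err, 3);
const last = afterFailure({ task_type: "SEND_EMAIL_WELCOME", attempts: 2 }, err, 3);
// `first` asks for another try; `last` has used all three attempts and gives up.
```

The complicated part is persistence. A failed handler rolls back the whole transaction, so an incremented counter written inside it would be rolled back too. The count has to be recorded somewhere that survives the rollback, for example in a separate transaction.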