How to Build a Robust Distributed FIFO Job Queue in YugabyteDB
The modern digital era demands applications that can instantly process a vast number of transactions. This makes efficient task management critical.
Job queues are essential for contemporary systems, as they allow asynchronous and scalable task execution across various domains, from ticketing systems to microservices communication.
As applications grow and become more complex, a single job queue can create performance bottlenecks, leading to delays and reduced efficiency.
Distributed job queues address this challenge by spreading tasks across multiple nodes. This approach significantly increases job processing capacity, enhances fault tolerance, and ensures system availability.
When developing both small applications and large-scale distributed systems, understanding and implementing distributed job queues can dramatically improve your infrastructure. This ensures your applications remain robust, scalable, and efficient.
This blog takes an in-depth look at distributed job queues, including their:
- Architecture
- Advantages
- Typical applications
Scenario
Let’s begin with a basic Jobs table, which includes details about each item: the ID, creation timestamp, current state (e.g., Queued, Running, Completed), and associated data. For this exercise, we will keep completed jobs in the queue, marked with a status of Success, Failed, or Canceled.
    -- Helper enum
    CREATE TYPE JobStatus AS ENUM ('Queued','Running','Canceled','Failed','Success');

    CREATE TABLE jobs (
        id BIGINT,
        create_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        status JobStatus DEFAULT 'Queued',
        data TEXT,
        bucket smallint DEFAULT floor(random()*4),
        PRIMARY KEY (id)
    );
The table is based on what we learned in our “Avoid Hotspot on Range-Based Indexes” blog.
To avoid hotspots on our timestamp-based index, we create multiple application-level buckets using the bucket column (there are four in the example above, but feel free to play with higher values). Here, the queue data will be distributed based on the hash of the item ID.
Data Generation
Let’s insert a million rows into the Jobs table.
    INSERT INTO Jobs(id, create_ts, data)
    SELECT n,
           now() - make_interval(mins=>1000000-n, secs=>EXTRACT (SECONDS FROM now())),
           'iteminfo-' || n
    FROM generate_series(1, 1000000) n;

Sample Data:

     id |      create_ts      | status |    data     | bucket
    ----+---------------------+--------+-------------+--------
      1 | 2021-01-07 04:58:00 | Queued | iteminfo-1  |      2
      2 | 2021-01-07 04:59:00 | Queued | iteminfo-2  |      0
      3 | 2021-01-07 05:00:00 | Queued | iteminfo-3  |      1
      4 | 2021-01-07 05:01:00 | Queued | iteminfo-4  |      0
      5 | 2021-01-07 05:02:00 | Queued | iteminfo-5  |      1
      6 | 2021-01-07 05:03:00 | Queued | iteminfo-6  |      2
      7 | 2021-01-07 05:04:00 | Queued | iteminfo-7  |      3
      8 | 2021-01-07 05:05:00 | Queued | iteminfo-8  |      1
      9 | 2021-01-07 05:06:00 | Queued | iteminfo-9  |      3
     10 | 2021-01-07 05:07:00 | Queued | iteminfo-10 |      3
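To check that the load really spread the rows across the application-level buckets, a per-bucket count can be run once the insert has finished. This is only a sanity-check sketch; the alias jobs_in_bucket is our own name, and because the bucket is chosen with random(), the counts should come out roughly, not exactly, equal.

```sql
-- Count the rows that landed in each application-level bucket.
-- With DEFAULT floor(random()*4), only buckets 0 through 3 should appear.
SELECT bucket, count(*) AS jobs_in_bucket
FROM jobs
GROUP BY bucket
ORDER BY bucket;
```

A heavily skewed result would mean one bucket's portion of the index takes most of the writes, which is the hotspot the bucket column is meant to avoid.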
Head of the Queue
To begin processing the jobs in sequence, we need to identify the earliest inserted item. This is essentially the head of the queue: the top element when the queued jobs are sorted by create_ts. To do this, we require an index in ascending order of create_ts. Additionally, we only want to retrieve the queued jobs that have yet to be processed. Let’s create an index on status and create_ts, prefixed with the bucket column.
CREATE INDEX idx ON jobs (bucket HASH, status, create_ts ASC) INCLUDE(id);
Note that we have added the id column in the INCLUDE clause to fetch it with an Index-Only scan (without making a trip to the Jobs table).
Let us fetch the first unprocessed item like this:
    SELECT id FROM jobs
    WHERE bucket IN (0,1,2,3) AND status = 'Queued'
    ORDER BY create_ts ASC LIMIT 1;

                                                      QUERY PLAN
    ---------------------------------------------------------------------------------------------------------------------
     Limit (actual time=624.707..624.709 rows=1 loops=1)
       Output: id, create_ts
       ->  Sort (actual time=624.705..624.706 rows=1 loops=1)
             Output: id, create_ts
             Sort Key: jobs.create_ts
             Sort Method: top-N heapsort  Memory: 25kB
             ->  Index Only Scan using idx on public.jobs (actual time=4.199..317.773 rows=1000000 loops=1)
                   Output: id, create_ts
                   Index Cond: ((jobs.bucket = ANY ('{0,1,2,3}'::integer[])) AND (jobs.status = 'Queued'::jobstatus))
                   Heap Fetches: 0
                   Storage Index Read Requests: 652
                   Storage Index Read Execution Time: 48.773 ms
                   Storage Index Rows Scanned: 1000000
     Planning Time: 10.236 ms
     Execution Time: 624.798 ms
     Storage Read Requests: 652
     Storage Read Execution Time: 48.773 ms
     Storage Rows Scanned: 1000000
     Catalog Read Requests: 7
     Catalog Read Execution Time: 8.175 ms
     Storage Execution Time: 56.949 ms
    (24 rows)

    Time: 670.058 ms
You will notice that the statement above is very slow, even though it uses an index. This is because it processed all of the million jobs (Storage Rows Scanned: 1000000) to fetch just one row. The index is not globally sorted on create_ts, but only sorted by create_ts within each unique value of the bucket column. We could create an index on create_ts instead of (bucket, create_ts), but that has the disadvantage of creating a hot spot on the index during writes.
Let’s write a VIEW based on the lessons we learned from this “Optimal Pagination for Distributed, Ordered Data” blog, to retrieve the data from all application-level buckets, while retaining order.
    CREATE OR REPLACE VIEW jobqueue AS
        (select id,create_ts from Jobs where bucket = 0 and status = 'Queued' order by create_ts ASC)
        union all
        (select id,create_ts from Jobs where bucket = 1 and status = 'Queued' order by create_ts ASC)
        union all
        (select id,create_ts from Jobs where bucket = 2 and status = 'Queued' order by create_ts ASC)
        union all
        (select id,create_ts from Jobs where bucket = 3 and status = 'Queued' order by create_ts ASC)
        order by create_ts ASC;
With the help of the above view, we should be able to get the unprocessed Jobs in the required order.
    select * from jobqueue limit 1;

                                                    QUERY PLAN
    ---------------------------------------------------------------------------------------------------------
     Limit (actual time=6.060..6.065 rows=1 loops=1)
       Output: jobs.id, jobs.create_ts
       ->  Merge Append (actual time=6.056..6.056 rows=1 loops=1)
             Sort Key: jobs.create_ts
             ->  Index Only Scan using idx on public.jobs (actual time=2.344..2.344 rows=1 loops=1)
                   Output: jobs.id, jobs.create_ts
                   Index Cond: ((jobs.bucket = 0) AND (jobs.status = 'Queued'::jobstatus))
                   Heap Fetches: 0
                   Storage Index Read Requests: 1
                   Storage Index Read Execution Time: 1.995 ms
                   Storage Index Rows Scanned: 1
             ->  Index Only Scan using idx on public.jobs jobs_1 (actual time=1.404..1.404 rows=1 loops=1)
                   Output: jobs_1.id, jobs_1.create_ts
                   Index Cond: ((jobs_1.bucket = 1) AND (jobs_1.status = 'Queued'::jobstatus))
                   Heap Fetches: 0
                   Storage Index Read Requests: 1
                   Storage Index Read Execution Time: 1.298 ms
                   Storage Index Rows Scanned: 1
             ->  Index Only Scan using idx on public.jobs jobs_2 (actual time=1.118..1.118 rows=1 loops=1)
                   Output: jobs_2.id, jobs_2.create_ts
                   Index Cond: ((jobs_2.bucket = 2) AND (jobs_2.status = 'Queued'::jobstatus))
                   Heap Fetches: 0
                   Storage Index Read Requests: 1
                   Storage Index Read Execution Time: 1.031 ms
                   Storage Index Rows Scanned: 1
             ->  Index Only Scan using idx on public.jobs jobs_3 (actual time=1.176..1.176 rows=1 loops=1)
                   Output: jobs_3.id, jobs_3.create_ts
                   Index Cond: ((jobs_3.bucket = 3) AND (jobs_3.status = 'Queued'::jobstatus))
                   Heap Fetches: 0
                   Storage Index Read Requests: 1
                   Storage Index Read Execution Time: 1.051 ms
                   Storage Index Rows Scanned: 1
     Planning Time: 0.651 ms
     Execution Time: 6.254 ms
     Storage Read Requests: 4
     Storage Read Execution Time: 5.375 ms
     Storage Rows Scanned: 4
     Storage Write Requests: 0
     Catalog Read Requests: 0
     Catalog Write Requests: 0
     Storage Flush Requests: 0
     Storage Execution Time: 5.375 ms
     Peak Memory Usage: 120 kB
    (43 rows)

    Time: 8.495 ms
You will notice that the execution time has dropped from 670 ms to about 8 ms, as only 4 rows (one per bucket) were scanned to find the top of the queue!
Item Selection
We now have a mechanism to fetch jobs from the queue in a distributed, correct, and first-in-first-out (FIFO) manner. To process a job, a worker has to pick the first item in the queue and quickly mark it as selected so that other workers accessing the queue won’t process that particular item.
In YugabyteDB, individual statements are transactional, so we just need to select one item, mark it as Running in one statement, and return the ID selected.
Let’s frame such an update:
    UPDATE Jobs SET status = 'Running'
    WHERE id = (SELECT id FROM jobqueue FOR UPDATE SKIP LOCKED LIMIT 1)
    RETURNING id;
This selects the item to be picked, quickly marks it as Running, and returns the item-id.
During execution, that row will be temporarily locked, so other workers don’t pick it. At the same time, other workers might try to pick the row and this would lead to contention. To mitigate this, YugabyteDB supports FOR UPDATE SKIP LOCKED. This lets other worker threads quickly skip over the temporarily locked row, instead of being blocked.
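Once a worker has finished the job it picked, it still has to record the outcome, since completed jobs stay in the table with a final status. A minimal sketch of that step is shown below; the literal 42 is a hypothetical placeholder for the ID the worker picked, and the extra check on status is our own defensive assumption rather than something the pattern requires.

```sql
-- Record the outcome of a finished job.
-- 42 is a placeholder for the id of the job this worker picked earlier.
UPDATE jobs
SET status = 'Success'        -- or 'Failed' / 'Canceled', as appropriate
WHERE id = 42
  AND status = 'Running';     -- guard: only close a job that is still marked Running
```

If the statement reports zero rows updated, the job was not in the Running state, which a worker could treat as a signal that something else already changed it.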
Executor hint
Our queue has been working well, but after it has processed many jobs, performance can degrade. This is because the executor has to skip over many tombstones (deleted jobs) to find the next item. This is described in our “The Curious Case of the Ever-Increasing Tombstones” blog.
Let’s delete a lot of jobs to simulate the passing of time:
DELETE FROM jobs WHERE id <= 500000;
When we try to fetch the next item, we will notice that it is much slower than before. It will continue to get slower as time progresses.
    select * from jobqueue limit 1;

       id   |      create_ts
    --------+---------------------
     500001 | 2023-07-20 06:42:00
    (1 row)

    Time: 388.923 ms
To avoid this problem, we need to give the executor a hint about where to start the search. The timestamp of the last processed item can be used as the hint:
    select * from jobqueue where create_ts > '2023-07-20 06:41:00' limit 1;

       id   |      create_ts
    --------+---------------------
     500001 | 2023-07-20 06:42:00
    (1 row)

    Time: 10.619 ms
This hint brings the execution time down from about 390 ms to about 10 ms.
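Since the hint changes with every processed item, one way to avoid rebuilding the query text each time is a prepared statement that takes the timestamp as a parameter. The sketch below assumes the application remembers the create_ts of the last item it processed; the statement name next_job is our own choice.

```sql
-- Prepare the head-of-queue lookup once per session,
-- with the hint timestamp as parameter $1.
PREPARE next_job (timestamp) AS
    SELECT id, create_ts
    FROM jobqueue
    WHERE create_ts > $1
    LIMIT 1;

-- Run it with the create_ts of the last processed item as the hint.
EXECUTE next_job('2023-07-20 06:41:00');
```

Note that the strict comparison skips any queued job whose create_ts is equal to or earlier than the hint, so this form fits best when timestamps are unique and jobs are never inserted with a timestamp in the past.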
Conclusion
In this blog, we have demonstrated how you can design a simple Ordered Distributed Queue using YugabyteDB.
Depending on your needs, you can extend this pattern to have multiple queues and support priorities. You can also fetch more jobs as a batch rather than just one, and distribute jobs across various workers to reduce contention. Download YugabyteDB today and try it out. The possibilities are endless!
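As a rough illustration of the batch idea, the single-item statement can be widened to pick several jobs at once. This is a sketch that mirrors the statement used earlier, with the batch size of 10 chosen arbitrarily; it assumes the locking clause behaves for a batch the same way it does for a single row, so it is worth verifying on your own cluster.

```sql
-- Pick up to 10 queued jobs in one statement and mark them as Running.
-- The batch size (10) is an arbitrary example value.
UPDATE jobs
SET status = 'Running'
WHERE id IN (SELECT id FROM jobqueue FOR UPDATE SKIP LOCKED LIMIT 10)
RETURNING id, create_ts;
```

The rows come back from RETURNING in no guaranteed order, so a worker that must process its batch in FIFO order should sort the returned rows by create_ts itself.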
Want to know more? Check out Yugabyte expert Franck Pachot’s DEV blog, Scalable Unordered Job Queue on top of YugabyteDB, for further insight.