How to Build a Robust Distributed FIFO Job Queue in YugabyteDB

Premkumar Thangamani

The modern digital era demands applications that can instantly process a vast number of transactions. This makes efficient task management critical.

Job queues are essential for contemporary systems, as they allow asynchronous and scalable task execution across various domains, from ticketing systems to microservices communication.

As applications grow and become more complex, a single job queue can create performance bottlenecks, leading to delays and reduced efficiency.

Distributed job queues address this challenge by spreading tasks across multiple nodes. This approach significantly increases job processing capacity, enhances fault tolerance, and ensures system availability.

Whether you are developing small applications or large-scale distributed systems, understanding and implementing distributed job queues can dramatically improve your infrastructure, ensuring your applications remain robust, scalable, and efficient.

This blog takes an in-depth look at distributed job queues, including their:

  • Architecture
  • Advantages
  • Typical applications

Scenario

Let’s begin with a basic Jobs table that stores details about each job: its ID, creation timestamp, current status (e.g., Queued, Running), and associated data. For this exercise, we will keep completed jobs in the queue, marked with a status of Success, Failed, or Canceled.

-- Helper enum
CREATE TYPE JobStatus AS ENUM ('Queued','Running','Canceled','Failed','Success');

CREATE TABLE jobs (
  id BIGINT,
  create_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  status JobStatus DEFAULT 'Queued',
  data TEXT,
  bucket smallint DEFAULT floor(random()*4),

  PRIMARY KEY (id)
);

This table design is based on what we learned in the Avoid Hotspot on Range-Based Indexes blog.

To avoid hotspots on our timestamp-based index, we create multiple application-level buckets using the bucket column (four in the example above, but feel free to experiment with higher values). The queue data itself is distributed based on the hash of the item ID.
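
The bucket count is a tunable knob. As a sketch, switching to eight buckets would only change the column default as shown below; note that every query and view that enumerates the buckets (shown later) must then cover values 0 through 7:

-- hypothetical variant: eight application-level buckets instead of four
bucket smallint DEFAULT floor(random()*8),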

Data Generation

Let’s insert a million rows into the Jobs table.

INSERT INTO jobs(id, create_ts, data)
 SELECT n,
        -- space rows one minute apart, aligned to whole minutes
        now() - make_interval(mins => 1000000 - n, secs => EXTRACT(SECONDS FROM now())),
        'iteminfo-' || n
 FROM generate_series(1, 1000000) n;
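
We can spot-check the generated rows with a quick query:

SELECT * FROM jobs ORDER BY id LIMIT 10;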

Sample Data:
 id |      create_ts      | status |    data     | bucket
----+---------------------+--------+-------------+----------
  1 | 2021-01-07 04:58:00 | Queued | iteminfo-1  |        2
  2 | 2021-01-07 04:59:00 | Queued | iteminfo-2  |        0
  3 | 2021-01-07 05:00:00 | Queued | iteminfo-3  |        1
  4 | 2021-01-07 05:01:00 | Queued | iteminfo-4  |        0
  5 | 2021-01-07 05:02:00 | Queued | iteminfo-5  |        1
  6 | 2021-01-07 05:03:00 | Queued | iteminfo-6  |        2
  7 | 2021-01-07 05:04:00 | Queued | iteminfo-7  |        3
  8 | 2021-01-07 05:05:00 | Queued | iteminfo-8  |        1
  9 | 2021-01-07 05:06:00 | Queued | iteminfo-9  |        3
 10 | 2021-01-07 05:07:00 | Queued | iteminfo-10 |        3

Head of the Queue

To begin processing the jobs in sequence, we need to identify the earliest inserted item. This is essentially the head of the queue: the top element when the queued jobs are sorted by create_ts. To find it efficiently, we need an index in ascending order of create_ts. Additionally, we only want to retrieve the queued jobs that have yet to be processed. Let’s create an index on (bucket, status, create_ts):

CREATE INDEX idx ON jobs (bucket HASH, status, create_ts ASC) INCLUDE(id);

Note that we have added the id column in the INCLUDE clause so it can be fetched with an index-only scan (without making a trip to the Jobs table).

Let us fetch the first unprocessed item like this:

EXPLAIN (ANALYZE, DIST, COSTS OFF)
SELECT id FROM jobs
  WHERE bucket IN (0,1,2,3) AND status = 'Queued'
  ORDER BY create_ts ASC LIMIT 1;
                                                     QUERY PLAN
---------------------------------------------------------------------------------------------------------------------
 Limit (actual time=624.707..624.709 rows=1 loops=1)
   Output: id, create_ts
   ->  Sort (actual time=624.705..624.706 rows=1 loops=1)
         Output: id, create_ts
         Sort Key: jobs.create_ts
         Sort Method: top-N heapsort  Memory: 25kB
         ->  Index Only Scan using idx on public.jobs (actual time=4.199..317.773 rows=1000000 loops=1)
               Output: id, create_ts
               Index Cond: ((jobs.bucket = ANY ('{0,1,2,3}'::integer[])) AND (jobs.status = 'Queued'::jobstatus))
               Heap Fetches: 0
               Storage Index Read Requests: 652
               Storage Index Read Execution Time: 48.773 ms
               Storage Index Rows Scanned: 1000000
 Planning Time: 10.236 ms
 Execution Time: 624.798 ms
 Storage Read Requests: 652
 Storage Read Execution Time: 48.773 ms
 Storage Rows Scanned: 1000000
 Catalog Read Requests: 7
 Catalog Read Execution Time: 8.175 ms
 Storage Execution Time: 56.949 ms
(24 rows)

Time: 670.058 ms

You will notice that the statement above is very slow, even though it uses an index. This is because it scanned all one million jobs (Storage Rows Scanned: 1000000) to fetch just one row. The index is not globally sorted on create_ts; it is sorted by create_ts only within each distinct value of the bucket column. We could index create_ts alone instead of (bucket, status, create_ts), but that has the disadvantage of creating a hotspot on the index during writes.
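
For comparison, that rejected alternative would look like the sketch below (idx_hot is a hypothetical name). Every new job arrives with status 'Queued' and an ever-increasing create_ts, so all index writes would land on the same tablet:

-- anti-pattern sketch: a globally ordered index concentrates writes at its tail
CREATE INDEX idx_hot ON jobs (status ASC, create_ts ASC) INCLUDE(id);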

Let’s write a VIEW, based on the lessons we learned from the “Optimal Pagination for Distributed, Ordered Data” blog, to retrieve the data from all application-level buckets while retaining order.

CREATE OR REPLACE VIEW jobqueue AS
    (SELECT id, create_ts FROM jobs WHERE
          bucket = 0 AND status = 'Queued' ORDER BY create_ts ASC) UNION ALL
    (SELECT id, create_ts FROM jobs WHERE
          bucket = 1 AND status = 'Queued' ORDER BY create_ts ASC) UNION ALL
    (SELECT id, create_ts FROM jobs WHERE
          bucket = 2 AND status = 'Queued' ORDER BY create_ts ASC) UNION ALL
    (SELECT id, create_ts FROM jobs WHERE
          bucket = 3 AND status = 'Queued' ORDER BY create_ts ASC)
    ORDER BY create_ts ASC;

With the help of the above view, we should be able to get the unprocessed Jobs in the required order.

EXPLAIN (ANALYZE, DIST, COSTS OFF)
SELECT * FROM jobqueue LIMIT 1;

                                               QUERY PLAN
---------------------------------------------------------------------------------------------------------
 Limit (actual time=6.060..6.065 rows=1 loops=1)
   Output: jobs.id, jobs.create_ts
   ->  Merge Append (actual time=6.056..6.056 rows=1 loops=1)
         Sort Key: jobs.create_ts
         ->  Index Only Scan using idx on public.jobs (actual time=2.344..2.344 rows=1 loops=1)
               Output: jobs.id, jobs.create_ts
               Index Cond: ((jobs.bucket = 0) AND (jobs.status = 'Queued'::jobstatus))
               Heap Fetches: 0
               Storage Index Read Requests: 1
               Storage Index Read Execution Time: 1.995 ms
               Storage Index Rows Scanned: 1
         ->  Index Only Scan using idx on public.jobs jobs_1 (actual time=1.404..1.404 rows=1 loops=1)
               Output: jobs_1.id, jobs_1.create_ts
               Index Cond: ((jobs_1.bucket = 1) AND (jobs_1.status = 'Queued'::jobstatus))
               Heap Fetches: 0
               Storage Index Read Requests: 1
               Storage Index Read Execution Time: 1.298 ms
               Storage Index Rows Scanned: 1
         ->  Index Only Scan using idx on public.jobs jobs_2 (actual time=1.118..1.118 rows=1 loops=1)
               Output: jobs_2.id, jobs_2.create_ts
               Index Cond: ((jobs_2.bucket = 2) AND (jobs_2.status = 'Queued'::jobstatus))
               Heap Fetches: 0
               Storage Index Read Requests: 1
               Storage Index Read Execution Time: 1.031 ms
               Storage Index Rows Scanned: 1
         ->  Index Only Scan using idx on public.jobs jobs_3 (actual time=1.176..1.176 rows=1 loops=1)
               Output: jobs_3.id, jobs_3.create_ts
               Index Cond: ((jobs_3.bucket = 3) AND (jobs_3.status = 'Queued'::jobstatus))
               Heap Fetches: 0
               Storage Index Read Requests: 1
               Storage Index Read Execution Time: 1.051 ms
               Storage Index Rows Scanned: 1
 Planning Time: 0.651 ms
 Execution Time: 6.254 ms
 Storage Read Requests: 4
 Storage Read Execution Time: 5.375 ms
 Storage Rows Scanned: 4
 Storage Write Requests: 0
 Catalog Read Requests: 0
 Catalog Write Requests: 0
 Storage Flush Requests: 0
 Storage Execution Time: 5.375 ms
 Peak Memory Usage: 120 kB
(43 rows)

Time: 8.495 ms

You will notice that the execution time has reduced from 670 ms to 8 ms, as only 4 rows were scanned to find the top of the queue!

Item Selection

We now have a mechanism to fetch jobs from the queue in a distributed, correct, first-in-first-out (FIFO) manner. Next, a worker has to pick the first item in the queue and quickly mark it as selected, so that other workers accessing the queue won’t process that same item.

In YugabyteDB, individual statements are transactional, so we can select one item, mark it as Running, and return the selected ID, all in a single statement.

Let’s frame such an update:

UPDATE jobs SET status = 'Running'
WHERE id = (SELECT id FROM jobqueue FOR UPDATE SKIP LOCKED LIMIT 1)
RETURNING id;

This selects the item to be picked, quickly marks it as Running, and returns the item-id.

During execution, that row is temporarily locked, so other workers don’t pick it. But other workers might try to pick the same row at the same moment, which would lead to contention. To mitigate this, YugabyteDB supports FOR UPDATE SKIP LOCKED, which lets other workers quickly skip over the temporarily locked row instead of blocking on it.
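
Putting it together, a single worker’s claim-process-complete cycle might look like the following sketch (:claimed_id is a placeholder for the ID returned by the claim statement, not a real column):

-- claim the head of the queue; SKIP LOCKED avoids contention between workers
UPDATE jobs SET status = 'Running'
WHERE id = (SELECT id FROM jobqueue FOR UPDATE SKIP LOCKED LIMIT 1)
RETURNING id;

-- ... the worker performs the actual work for the claimed job ...

-- record the outcome; completed jobs stay in the table, per our design
UPDATE jobs SET status = 'Success' WHERE id = :claimed_id;  -- or 'Failed' / 'Canceled'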

Executor Hint

Our queue has been working well, but after it has processed many jobs, performance can degrade over time. This is because the executor has to skip over multiple tombstones (markers left behind by deleted jobs) to find the next item. This is described in our “The Curious Case of the Ever-Increasing Tombstones” blog.

Let’s delete a lot of jobs to simulate the passing of time:

DELETE FROM jobs WHERE id <= 500000;

When we try to fetch the next item, we will notice that it is much slower than before. It will continue to get slower as time progresses.

select * from jobqueue limit 1;

   id   |      create_ts
--------+---------------------
 500001 | 2023-07-20 06:42:00
(1 row)

Time: 388.923 ms

To avoid this problem, we need to give the executor a hint about where to start the search. The timestamp of the last processed item can serve as that hint:

select * from jobqueue where create_ts > '2023-07-20 06:41:00' limit 1;

   id   |      create_ts
--------+---------------------
 500001 | 2023-07-20 06:42:00
(1 row)

Time: 10.619 ms

This hint brings the execution time down from roughly 400 ms to about 10 ms.
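
A worker can fold this hint into the claim statement from the Item Selection section. In the sketch below, :last_claimed_ts is a placeholder for the create_ts of the last job this worker claimed:

UPDATE jobs SET status = 'Running'
WHERE id = (SELECT id FROM jobqueue
              WHERE create_ts > :last_claimed_ts  -- executor hint
              FOR UPDATE SKIP LOCKED LIMIT 1)
RETURNING id, create_ts;  -- remember create_ts for the next iteration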

Conclusion

In this blog, we have demonstrated how you can design a simple Ordered Distributed Queue using YugabyteDB.

Depending on your needs, you can extend this pattern to multiple queues and priority support. You can also fetch jobs as a batch rather than one at a time (see the sketch below), and distribute jobs across various workers to reduce contention. Download YugabyteDB today and try it out; the possibilities are endless!
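
For example, claiming a batch of ten jobs instead of one is a small change to the claim statement (a sketch; tune the batch size to your workload):

UPDATE jobs SET status = 'Running'
WHERE id IN (SELECT id FROM jobqueue FOR UPDATE SKIP LOCKED LIMIT 10)
RETURNING id;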

Want to know more? Check out Yugabyte expert Franck Pachot’s DEV blog, Scalable Unordered Job Queue on top of YugabyteDB, for further insight.
