How to Select the First Row of Each Set of Grouped Rows Using GROUP BY
You can partition data in your queries using GROUP BY, but instead of aggregating all rows in each group to sum or count them, you may want to take only the first row of each group when sorted on specific columns. Here are some examples:
- Latest measure for each metric in a timeseries database
- Last contract with each supplier
- First purchase from each client
- Employee with the lowest salary in each department
- Lowest value for each sample taken at one time
There are many ways to write such SQL queries:
- Use window functions with a PARTITION and an ORDER BY clause.
- Combine DISTINCT to get the groups and LIMIT in a self-join to retrieve the first row of each.
- Utilize recursive queries that iterate like loops in procedural languages.
- Take advantage of PostgreSQL’s DISTINCT ON clause, which is not part of the SQL Standard.
The execution plans may vary; the performance goal is to keep the execution time scalable as the number of rows increases. Ideally, the time complexity should depend on the size of the result (the number of groups), not on the number of rows in each group.
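As a reminder of why a plain GROUP BY aggregation is not enough, here is a minimal sketch (it assumes the demo table defined below): min() returns the lowest value per group, but cannot also return the other columns of the row that holds it:

```sql
-- the lowest c per group, but the other columns (d) of that row are lost
select b, min(c) as lowest_c
from demo
where a = 1
group by b;
```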
To show those techniques, let’s use a generic table with 4 columns:
- a is the one I filter on (where a=1)
- b is the one I group on (distinct b)
- c is the one I sort on (order by b,c)
- d is the one to fetch additional information from (select a,b,c,d)
I created this table and loaded rows (20000 rows in each group; 10 groups for each value of a):

```sql
create table demo (
  primary key (a, b, c),
  a int,
  b timestamptz,
  c float,
  d text
);

insert into demo
select a, now() as b, random() as c, repeat('x',5) as d
from generate_series(1,5) a, generate_series(1,20000) c
-- ignore bad-luck random() collisions:
on conflict do nothing;

-- run it 9 more times (now() will be different), for 10 groups in total:
\watch count=9

vacuum analyze demo;
```
The primary key is on (a, b, c), which provides fast access to the first row to retrieve: a=1, the first value of b, and the first row when sorted by c. The challenge, in terms of performance, is to skip to the next value of b without scanning all the other rows of the first group that have higher values of c. PostgreSQL cannot do this in a single Index Scan.
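To illustrate that fast access, here is a minimal sketch on the demo table above: the first row of the first group should come back with only a few buffer hits because the index on (a, b, c) delivers it directly:

```sql
-- first row of the first group: the index finds it without a large scan
select a, b, c, d
from demo
where a = 1
order by b, c
limit 1;
```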
In PostgreSQL, use DISTINCT ON … ORDER BY …
The simplest way to write it is with DISTINCT ON, which splits the grouping (distinct on (b)) and the sort (order by b, c):

```sql
--explain (buffers, analyze, costs off)
select distinct on (b) a, b, c, d
from demo
where a=1
order by b, c;
```
Here is the output, with 10 groups and the lowest value of c in each:
```
 a |               b               |           c            |   d
---+-------------------------------+------------------------+-------
 1 | 2023-06-07 12:54:46.912624+00 | 1.0889238778233334e-05 | xxxxx
 1 | 2023-06-07 12:54:47.476301+00 | 1.7264625085111618e-05 | xxxxx
 1 | 2023-06-07 12:54:49.476387+00 | 1.2618070031500395e-05 | xxxxx
 1 | 2023-06-07 12:54:51.476418+00 | 7.189342356572759e-05  | xxxxx
 1 | 2023-06-07 12:54:53.476414+00 | 0.00012155201528107895 | xxxxx
 1 | 2023-06-07 12:54:55.476427+00 | 4.9272867939098575e-06 | xxxxx
 1 | 2023-06-07 12:54:57.476406+00 | 0.0001299710969948631  | xxxxx
 1 | 2023-06-07 12:54:59.47642+00  | 1.9242032286648225e-05 | xxxxx
 1 | 2023-06-07 12:55:01.476429+00 | 2.6737312972535676e-06 | xxxxx
 1 | 2023-06-07 12:55:03.476409+00 | 0.0001007765065004218  | xxxxx
(10 rows)
```
The execution plan looks simple:
```
                                        QUERY PLAN
------------------------------------------------------------------------------------------
 Unique (actual time=0.025..101.114 rows=10 loops=1)
   Buffers: shared hit=199930
   ->  Index Scan using demo_pkey on demo (actual time=0.024..82.024 rows=200000 loops=1)
         Index Cond: (a = 1)
         Buffers: shared hit=199930
 Planning Time: 0.087 ms
 Execution Time: 101.135 ms
(7 rows)
```
However, even though an index is used, it reads rows=200000 from the table, one by one, with a shared buffer hit for each, to finally return only rows=10. The index was used to filter on a=1 and to get the rows sorted for grouping, but it does not help to read only the first row of each group.
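Since the title mentions GROUP BY, it is worth showing the classic aggregate-and-join-back pattern as well. This is a sketch on the same table, with the caveats that it returns several rows per group if the minimum c is duplicated, and that it shares the same scalability issue because all rows are aggregated first:

```sql
-- join each group's minimum c back to the table to fetch d
select demo.a, demo.b, demo.c, demo.d
from demo
join (
  select b, min(c) as c
  from demo
  where a = 1
  group by b
) grp on demo.a = 1 and demo.b = grp.b and demo.c = grp.c;
```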
You can also use row_number() over (partition by … order by) in PostgreSQL
To filter on the first row of each group, a window function can be used, but it must be evaluated within a subquery because window functions cannot appear in the WHERE clause.
```sql
--explain (buffers, analyze, costs off)
with demo as (
  -- add the row number for each group
  select *, row_number() over (partition by b order by b, c) as "#"
  from demo
  where a=1
)
select a, b, c, d from demo where "#"=1;
```
The execution plan shows, again, an index scan returning 200000 rows:
```
                                        QUERY PLAN
------------------------------------------------------------------------------------------
 Subquery Scan on demo (actual time=0.044..103.769 rows=10 loops=1)
   Filter: (demo."#" = 1)
   Buffers: shared hit=199930
   ->  WindowAgg (actual time=0.031..103.751 rows=10 loops=1)
         Run Condition: (row_number() OVER (?) <= 1)
         Buffers: shared hit=199930
         ->  Index Scan using demo_pkey on demo demo_1 (actual time=0.022..81.349 rows=200000 loops=1)
               Index Cond: (a = 1)
               Buffers: shared hit=199930
 Planning Time: 0.091 ms
 Execution Time: 103.795 ms
(11 rows)
```
Before applying the Run Condition filter, all index entries were read, with their table rows (nearly 200000 buffer hits), to finally eliminate all but 10 rows.
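If several rows can tie on the lowest c and all of them should be returned, rank() can replace row_number(). This is a sketch under that assumption about the desired semantics; it has the same plan shape and the same scalability limit:

```sql
-- a sketch: return all rows tied on the lowest c in each group
with ranked as (
  select *, rank() over (partition by b order by c) as rnk
  from demo
  where a=1
)
select a, b, c, d from ranked where rnk = 1;
```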
To make this query scalable, we need to skip the unnecessary rows rather than scan them only to eliminate them later.
Use Recursive Common Table Expression (CTE) in PostgreSQL
PostgreSQL lacks a built-in mechanism to skip to the next group while scanning the index before accessing the table. Nevertheless, I can take control by implementing a form of procedural code with a recursive CTE.
```sql
--explain (buffers, analyze, costs off)
with recursive skip_scan as (
  (
    -- get the first row
    select * from demo
    where a=1
    order by b,c limit 1
  )
  union all
  (
    -- get the next row
    select demo.*
    from skip_scan, lateral(
      select * from demo
      where demo.a = skip_scan.a and demo.b > skip_scan.b
      order by b,c limit 1
    ) demo
  )
)
select * from skip_scan;
```
This query execution goes to the first row, where my primary key index provides fast access, and then runs another query to do the same for a higher value of b (demo.b > skip_scan.b). This second query joins the table to the previous iteration’s result to evaluate this predicate in a correlated subquery (lateral join). The recursion executes this second query as long as the join returns a result. Each iteration gets only the first row of its group (order by b,c limit 1).
Downside? It’s much more complex to code, understand, and maintain. Upside? It’s much faster:
```
                                        QUERY PLAN
------------------------------------------------------------------------------------------
 CTE Scan on skip_scan (actual time=0.026..0.132 rows=10 loops=1)
   Buffers: shared hit=44
   CTE skip_scan
     ->  Recursive Union (actual time=0.025..0.127 rows=10 loops=1)
           Buffers: shared hit=44
           ->  Limit (actual time=0.024..0.024 rows=1 loops=1)
                 Buffers: shared hit=4
                 ->  Index Scan using demo_pkey on demo (actual time=0.023..0.023 rows=1 loops=1)
                       Index Cond: (a = 1)
                       Buffers: shared hit=4
           ->  Nested Loop (actual time=0.009..0.010 rows=1 loops=10)
                 Buffers: shared hit=40
                 ->  WorkTable Scan on skip_scan skip_scan_1 (actual time=0.000..0.000 rows=1 loops=10)
                 ->  Limit (actual time=0.009..0.009 rows=1 loops=10)
                       Buffers: shared hit=40
                       ->  Index Scan using demo_pkey on demo demo_1 (actual time=0.008..0.008 rows=1 loops=10)
                             Index Cond: ((a = skip_scan_1.a) AND (b > skip_scan_1.b))
                             Buffers: shared hit=40
 Planning Time: 0.147 ms
 Execution Time: 0.155 ms
(20 rows)
```
This emulates what exists in other databases and is known under many names: Loose Index Scan, Skip Scan, Jump Scan, or Hybrid Scan. The technique is described in the PostgreSQL Wiki: Loose indexscan.
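If the first row of each group is needed for all values of a, not only a=1, the same pattern can in principle be extended with a row-wise comparison that seeks to the first row of the next (a, b) pair. This is a sketch under that assumption, not benchmarked here:

```sql
-- a sketch: loose scan over all (a, b) groups, not only a=1
with recursive skip_scan as (
  (
    -- get the very first row of the whole table
    select * from demo order by a,b,c limit 1
  )
  union all
  (
    -- seek to the first row of the next (a, b) group
    select demo.*
    from skip_scan, lateral(
      select * from demo
      where (demo.a, demo.b) > (skip_scan.a, skip_scan.b)
      order by a,b,c limit 1
    ) demo
  )
)
select * from skip_scan;
```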
Distinct subquery and correlated self-join
There is a simpler way to decompose the query in PostgreSQL: get the distinct values first (distinct a,b), and then get the first row for each one (order by a, b, c limit 1):
```sql
--explain (buffers, analyze, costs off)
select * from (
  -- get each distinct value
  select distinct a,b from demo where a=1
) skip_scan, lateral (
  -- for each, get the first row
  select * from demo
  where demo.a=skip_scan.a and demo.b=skip_scan.b
  order by a, b, c limit 1
) demo;
```
In SQL, LATERAL in the FROM clause acts like a nested loop: the inner subquery is executed for each outer row, and the join condition is a WHERE clause in the inner subquery.
On PostgreSQL, this is not faster, but it can run in parallel:
```
                                        QUERY PLAN
------------------------------------------------------------------------------------------
 Nested Loop (actual time=23.275..24.222 rows=10 loops=1)
   Buffers: shared hit=1323
   ->  Unique (actual time=23.236..24.108 rows=10 loops=1)
         Buffers: shared hit=1283
         ->  Gather Merge (actual time=23.235..24.103 rows=21 loops=1)
               Workers Planned: 2
               Workers Launched: 2
               Buffers: shared hit=1283
               ->  Unique (actual time=0.020..14.148 rows=7 loops=3)
                     Buffers: shared hit=1283
                     ->  Parallel Index Only Scan using demo_pkey on demo (actual time=0.019..9.515 rows=66667 loops=3)
                           Index Cond: (a = 1)
                           Heap Fetches: 0
                           Buffers: shared hit=1283
   ->  Limit (actual time=0.009..0.010 rows=1 loops=10)
         Buffers: shared hit=40
         ->  Index Scan using demo_pkey on demo demo_1 (actual time=0.009..0.009 rows=1 loops=10)
               Index Cond: ((a = demo.a) AND (b = demo.b))
               Buffers: shared hit=40
 Planning Time: 0.155 ms
 Execution Time: 24.253 ms
(21 rows)
```
The problem here is that all rows were read to get the distinct values. On a large system, this can be faster because it is a Parallel Index Only Scan, but it takes more resources than necessary.
But there is another way.
PostgreSQL-Compatible YugabyteDB
When using YugabyteDB, all the queries above work identically, since it reuses the PostgreSQL code in its query layer. The only requirement is that the columns on which DISTINCT and ORDER BY are applied use range sharding instead of hash sharding. By default, the hash partition key is the first column of the primary key, which is fine here since it is filtered with an equality predicate (where a=1). It is easy to verify the sharding type (HASH, or ASC/DESC for range) from the index definition.
```
yugabyte=# \d demo;
                       Table "public.demo"
 Column |           Type           | Collation | Nullable | Default
--------+--------------------------+-----------+----------+---------
 a      | integer                  |           | not null |
 b      | timestamp with time zone |           | not null |
 c      | double precision         |           | not null |
 d      | text                     |           |          |
Indexes:
    "demo_pkey" PRIMARY KEY, lsm (a HASH, b ASC, c ASC)
```
In short, the index definition should match the query’s WHERE clause and ORDER BY: equality on a and ascending order on b and c.
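For reference, here is a sketch of how such a primary key can be declared in YugabyteDB, whose YSQL syntax accepts HASH/ASC/DESC modifiers in the primary key definition, matching the \d output above:

```sql
create table demo (
  a int,
  b timestamptz,
  c float,
  d text,
  -- hash sharding on a (equality filter), range sharding on b and c
  primary key (a hash, b asc, c asc)
);
```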
Even if the recursive CTE performs well in PostgreSQL, YugabyteDB’s storage is different and better here. It provides higher performance and scalability by distributing table rows and index entries to multiple nodes. Tables and indexes are stored in LSM-Tree structures rather than in a combination of heap tables and B-Tree indexes, and the table rows are stored in the primary key index, so they need no additional hop to the table.
The last query, which was reading all 200000 index entries in PostgreSQL, reads only the 10 distinct ones in YugabyteDB (rows=10):
```
                                        QUERY PLAN
------------------------------------------------------------------------------------------
 Nested Loop (actual time=0.909..3.570 rows=10 loops=1)
   ->  Unique (actual time=0.616..0.633 rows=10 loops=1)
         ->  Index Scan using demo_pkey on demo (actual time=0.615..0.626 rows=10 loops=1)
               Index Cond: (a = 1)
   ->  Limit (actual time=0.292..0.292 rows=1 loops=10)
         ->  Index Scan using demo_pkey on demo demo_1 (actual time=0.284..0.284 rows=1 loops=10)
               Index Cond: ((a = demo.a) AND (b = demo.b))
 Planning Time: 0.145 ms
 Execution Time: 3.611 ms
 Peak Memory Usage: 24 kB
(10 rows)
```
The plan shows nothing special because this optimization resides at a level lower than the SQL execution layer, but rows=10 as the output of the Index Scan is the proof that the DISTINCT operation has been pushed down to the storage layer. The Index Scan time, less than one millisecond to get the 10 distinct values from the range of 200000 index entries, gives a clue about what happens.
YugabyteDB tables are sharded into tablets, and each tablet is an LSM-Tree. The YugabyteDB LSM-Tree implementation is based on RocksDB, where rows are accessed by ‘seek’ and ‘next’ operations. YugabyteDB has incorporated a Hybrid Scan, where a single scan performs multiple seek() operations to skip a group of rows. This is how the DISTINCT is pushed down to the lowest level: after reading a row, the scan can efficiently seek to the next distinct value.
To Summarize…
When performance and scalability are not a concern, using DISTINCT ON is a good alternative since it is shorter to write, with no subqueries required. However, it is not standard SQL and is not optimized to skip between distinct ranges in the index.
Window functions are the standard SQL approach to group and sort in a single query. However, filtering on the result is done on the output, which necessitates scanning and fetching all rows first. This approach is not scalable.
When aiming to minimize the necessary workload, you have two possibilities, whose performance depends on the storage layer’s access methods:
- Recursive CTE: This involves reading distinct values one by one, serving as a procedural workaround for the absence of skip scan. While it may be more challenging to code, it is the least resource-intensive option to run on PostgreSQL.
- Distinct subquery and correlated self-join: This method retrieves the first row of each group and is easier to code and maintain. Additionally, YugabyteDB optimizes it further by pushing the DISTINCT operation down to its storage layer.
For more explanations about this YugabyteDB optimization, you can check out this presentation.