Sometimes you need to process database data in batches. It may be necessary to retrieve the rows from the database into your application first and then process them in code, iterating over the results. In that case you split the query's result set into chunks in your code, as in the following Python snippet:
```python
from typing import Any, Iterator, Sequence

def get_chunks(sequence: Sequence[Any], chunk_size: int) -> Iterator[Sequence[Any]]:
    for i in range(0, len(sequence), chunk_size):
        yield sequence[i : i + chunk_size]

# account_ids_from_db is a list of IDs fetched from the database earlier
for ids_chunk in get_chunks(sequence=account_ids_from_db, chunk_size=10):
    # do something with a chunk here
    for account_id in ids_chunk:
        ...
```
Depending on the number of rows to be retrieved, the SQL query plan, and other factors, you may need to use LIMIT and OFFSET clauses in your SQL and make multiple queries instead of selecting all the rows at once. That saves some memory in the application.
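As a minimal sketch of that approach, the helper below generates the paginated query strings; the helper name and the base query are illustrative, not part of the original example. Note that OFFSET pagination only behaves predictably with a stable ORDER BY.

```python
from typing import Iterator

def paginated_queries(base_query: str, page_size: int, total_rows: int) -> Iterator[str]:
    """Yield LIMIT/OFFSET variants of base_query that together cover all rows."""
    for offset in range(0, total_rows, page_size):
        yield f"{base_query} LIMIT {page_size} OFFSET {offset}"

# 25 rows in pages of 10 require three queries
queries = list(paginated_queries("SELECT id FROM blacklist ORDER BY id", 10, 25))
```

Each string in `queries` would then be executed in turn, processing one page of results at a time.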
But sometimes you may fully rely on the DBMS features to process data in batches. Let’s take a look at the example.
Problem: update blacklist expiration timestamps in chunks
We’ve got a blacklist of customers. Each row in the blacklist table may have an optional value in the expiration timestamp column. If the value is NULL, the customer is considered permanently blacklisted. Otherwise the row will be removed from the blacklist table (for example, by a cron job) once the expiration timestamp is less than the current time.
Problem: we want to set an expiration date for all permanently blacklisted customers so that they get removed from the list in daily batches of size 10. When splitting into chunks, we want to order the rows so that the customers with the most recent suspicious activity come first. The activity log is defined in another database table.
In other words, we want to update all rows in the table where certain conditions are met (the expiry value is NULL in this case), but we have to do it in chunks of a constant size.
We could do it like in the example above: select all the rows that satisfy the condition, then iterate over batches of size 10 and update them. But there is nothing here that a modern RDBMS couldn’t do by itself, without any extra application code. So let’s see how to do it with a pure PostgreSQL 9+ query.
Solution: PostgreSQL query for updating rows in chunks using row_number() function
First, let’s see what the table’s schema may look like.
```sql
CREATE TABLE blacklist(
    customer_id INTEGER PRIMARY KEY,
    expiry TIMESTAMP WITHOUT TIME ZONE DEFAULT NULL,
    timestamp TIMESTAMP WITHOUT TIME ZONE DEFAULT (now() AT TIME ZONE 'utc')
);
```
The log of suspicious activity is stored in the suspicious_activity table, which we can join using the customer_id column.
The algorithm we are going to use when writing the final SQL query is the following:
- Join activity table, filter and select the data to be updated
- Get row numbers using PostgreSQL’s window function row_number()
- Get batch numbers using simple arithmetic operations (row number integer division by chunk size)
- Get new expiry timestamps using current batch number and PostgreSQL’s interval type arithmetic operations
- Wrap up all previous steps into a subquery; update expiry column for customers selected in the subquery.
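The chunk-number and expiry arithmetic from the steps above can be sketched in plain Python (the function names here are illustrative; the SQL query does the same calculation in-database):

```python
from datetime import datetime, timedelta

CHUNK_SIZE = 10

def batch_number(row_num: int, chunk_size: int = CHUNK_SIZE) -> int:
    # mirrors ((row_num - 1) / 10) + 1 — integer division by the chunk size
    return ((row_num - 1) // chunk_size) + 1

def new_expiry(row_num: int, now: datetime, chunk_size: int = CHUNK_SIZE) -> datetime:
    # mirrors NOW() + row_batched * INTERVAL '1 day'
    return now + timedelta(days=batch_number(row_num, chunk_size))
```

Rows 1–10 land in batch 1 and expire one day from now, rows 11–20 in batch 2 expiring two days from now, and so on.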
The final SQL query then is the following:
```sql
-- main: update expiry timestamp column in chunks
UPDATE blacklist
SET expiry = subquery.new_expiry_timestamp
FROM (
    -- subquery 3: get new expiration timestamps based on a chunk number
    SELECT
        row_num,
        row_batched,
        customer_id,
        NOW()::TIMESTAMP + row_batched * INTERVAL '1 day' AS new_expiry_timestamp,
        latest_suspicious_activity,
        blacklist_timestamp
    FROM (
        -- subquery 2: get chunk numbers based on a row number
        SELECT
            row_num,
            ((row_num - 1) / 10) + 1 AS row_batched,
            customer_id,
            latest_suspicious_activity,
            blacklist_timestamp
        FROM (
            -- subquery 1: get row numbers
            SELECT
                row_number() OVER (ORDER BY latest_suspicious_activity DESC NULLS LAST) AS row_num,
                customer_id,
                latest_suspicious_activity,
                blacklist_timestamp
            FROM (
                -- subquery 0: order blacklist
                SELECT
                    blacklist.customer_id AS customer_id,
                    MAX(suspicious_activity.created) AS latest_suspicious_activity,
                    blacklist.timestamp AS blacklist_timestamp
                FROM blacklist
                INNER JOIN suspicious_activity
                    ON blacklist.customer_id = suspicious_activity.customer_id
                WHERE blacklist.expiry IS NULL
                GROUP BY blacklist.customer_id
                ORDER BY latest_suspicious_activity DESC, blacklist_timestamp DESC
            ) AS ordered
        ) AS numbered
    ) AS offsetted
) AS subquery
WHERE blacklist.customer_id = subquery.customer_id;
```
The SQL above may not be fully SQL-standard compliant, but it works on PostgreSQL 9.6 and later.
The comments, labels, and the algorithm above should make the query self-explanatory, although some basic knowledge of SQL is needed to understand what it really does.
Basically, we need only the new_expiry_timestamp and customer_id columns from the subquery. But some extra columns (e.g. latest_suspicious_activity, row_num) are passed from the deepest subquery all the way to the top, so that you can run the subquery alone and see how the new expiry timestamps are calculated before updating any data. BEGIN/ROLLBACK may also be of great help here.
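For a quick, self-contained dry run of the row-numbering and chunking logic, here is a sketch using Python's built-in sqlite3 module with an in-memory database (SQLite 3.25+ supports window functions, and its `/` on integers is integer division, just like in the PostgreSQL query). The schema is trimmed down and the data is made up; a chunk size of 2 is used for brevity:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE blacklist (customer_id INTEGER PRIMARY KEY, expiry TIMESTAMP);
    INSERT INTO blacklist (customer_id) VALUES (1), (2), (3), (4), (5);
""")

# Run only the numbering/chunking subqueries — no UPDATE — to inspect the result
rows = conn.execute("""
    SELECT customer_id,
           row_num,
           ((row_num - 1) / 2) + 1 AS row_batched
    FROM (
        SELECT customer_id,
               row_number() OVER (ORDER BY customer_id) AS row_num
        FROM blacklist
        WHERE expiry IS NULL
    )
""").fetchall()

for customer_id, row_num, row_batched in rows:
    print(customer_id, row_num, row_batched)
```

Customers 1 and 2 fall into batch 1, customers 3 and 4 into batch 2, and customer 5 into batch 3.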
All in all, the query above:
- Is easy to read
- Requires no extra application code and does all the batch processing in PostgreSQL’s query language
- Runs much faster than doing the extra processing in your app’s code
- Can easily take advantage of PostgreSQL transactions and atomic updates