- Data & AI Tech Immersion Workshop – Product Review Guide and Lab Instructions
- Data, Experience 7 - Open source databases at scale
- Technology overview
- Scenario overview
- Experience requirements
- Task 1: Connect to PostgreSQL
- Task 2: Create a table to store clickstream data
- Task 3: Shard tables across nodes and create rollup tables
- Task 4: Create rollup functions
- Task 5: Copy data into raw events table
- Task 6: Schedule periodic aggregation and execute dashboard queries
- Wrap-up
- Additional resources and more information
Azure Database for PostgreSQL is a fully managed service built on the open source PostgreSQL database engine. It provides high availability and performance with monitoring and alerting, enterprise-grade security and compliance, automatic backups, data secured at-rest and in-motion, and full compatibility with PostgreSQL extensions with little to no administration required.
Hyperscale clusters are enabled by Citus, an extension of PostgreSQL that allows you to horizontally scale queries across multiple machines using sharding and partitioning techniques. The query engine parallelizes incoming SQL queries across these servers for faster response times on large datasets. Data sharding is handled transparently for you, allowing your existing PostgreSQL applications to take advantage of the benefits that distributed data storage and querying provide, with minimal changes. This means that Contoso Auto's multi-tenant, real-time operational analytics requirements will be met with little effort on their part.
A Hyperscale server group (database cluster) consists of a coordinator node and several worker nodes. To scale up, you define the number of cores and storage you require per coordinator node and worker node. You can select up to 32 vCores with 8 GiB RAM per vCore and up to 2 TiB of storage with up to 3 IOPS / GiB per node. To scale out, you can define the number of worker nodes, between 2 and 20. If you require more than 20 worker nodes, you can submit a support request.
You can define the compute and storage settings independently for your nodes, giving you flexibility by adding more of one or the other depending on your needs. The storage includes database files, temporary files, transaction logs, and the Postgres server logs. The total amount of storage you provision also defines the I/O capacity available to each worker and coordinator node.
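As a quick illustration of the coordinator/worker layout (this is not a required lab step, and it assumes you are connected to the coordinator node with the citus extension loaded), you can inspect the cluster through standard Citus metadata tables:

```sql
-- List the coordinator and worker nodes registered in the server group
SELECT nodename, nodeport, noderole FROM pg_dist_node;

-- Once a distributed table exists (you create one in Task 2), show the hash
-- ranges assigned to each of its shards
SELECT shardid, shardminvalue, shardmaxvalue
FROM pg_dist_shard
WHERE logicalrelid = 'events'::regclass;
```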
Contoso Auto has a host of online stores for their catalog of vehicles, targeted toward vehicle lines such as consumer, government, and recreational (RVs, ATVs, etc.). They have been conducting aggressive marketing campaigns to promote online sales and have been experiencing rapid growth on these sites. This has made it more challenging to analyze user clickstream data, online ad performance, and other marketing campaigns at scale, and to provide insights to the marketing team in real-time.
Real-time marketing analysis is provided through interactive reports and dashboards on Contoso Auto's home-grown web platform, ContosoAutoMarket. This platform has served them well, but they are currently hindered by their inability to keep up with demand. ContosoAutoMarket's primary users are members of the marketing team, and the secondary users are shoppers on their various online platforms for whom website interaction behavior is being tracked. Other sources of data are fed from online ad data generated by ads run on social media platforms and email marketing campaigns. They use this type of data to evaluate ad effectiveness and customer reach, ultimately leading to sales conversions.
Contoso Auto needs to be able to efficiently build an analytical dashboard with the real-time marketing and website usage data that:
- Supports a large number of concurrent users
- Has subsecond response times
- Incorporates new data within minutes of arrival
- Supports advanced analytics
Contoso Auto also has an Oracle database that they would like to migrate to PostgreSQL on Azure. They would like to use a tool that will ensure success and will simplify the migration.
In this experience, you will use advanced features of the managed PostgreSQL PaaS service on Azure to make your database more scalable and able to handle the rapid ingest of streaming data while simultaneously generating and serving pre-aggregated data for reports. Then, you will migrate an Oracle database to PostgreSQL.
- Azure subscription
- pgAdmin 4 or greater
- Open the Azure portal and navigate to the resource group you created (tech-immersion-SUFFIX, where SUFFIX is your unique identifier).
- Find your PostgreSQL server group and select it. (The server group name will not have a suffix. Items with names ending in, for example, "-c", "-w0", or "-w1" are not the server group.)
- On the Overview blade, locate and copy the Coordinator name and Admin username values. Store these values in Notepad or a similar text editor for later.
- Select Networking in the left-hand menu underneath Security. In the Networking rules blade, select + Add current client IP address (xxx.xxx.xxx.xxx) to add your IP to the server group's firewall.
- Select Save to apply the new firewall rule.
- Launch pgAdmin. Select Add New Server on the home page.
- In the General tab of the Create Server dialog, enter Lab into the Name field.
- Select the Connection tab. Enter the following into the fields within the Connection tab:
  - Host name/address: paste the coordinator name value you copied earlier (for example, <your-server-name>.postgres.database.azure.com)
  - Port: 5432
  - Maintenance database: citus
  - Username: citus
  - Password: the administrative password (such as Abc!1234567890)
  - Save password?: check the box
- Click the Save button.
- Expand the newly added Lab server under the Servers tree on the pgAdmin home page. You should be able to expand the citus database.
In this task, you will create the raw events table to capture every clickstream event. This table is partitioned by event_time since we are using it to store time series data. The script you execute to create the schema creates a partition every 5 minutes, using pg_partman.

Partitioning is the key to high performance and to scaling out across several database nodes. One of the keys to fast data loading is to avoid using large indexes. Traditionally, you would use block-range (BRIN) indexes to speed up range scans over roughly-sorted data, but BRIN indexes tend to perform poorly on unsorted data. Partitioning helps keep indexes small by dividing tables into partitions, avoiding fragmentation of data while maintaining smaller indexes.
- With the Lab server expanded under the Servers tree in pgAdmin, expand Databases, then select citus. When the citus database is highlighted, select the Query Tool button above.
- Paste the following query into the Query Editor:

```sql
CREATE TABLE events(
    event_id serial,
    event_time timestamptz default now(),
    customer_id bigint,
    event_type text,
    country text,
    browser text,
    device_id bigint,
    session_id bigint
) PARTITION BY RANGE (event_time);

-- Create 5-minute partitions
SELECT partman.create_parent('public.events', 'event_time', 'native', '5 minutes');
UPDATE partman.part_config SET infinite_time_partitions = true;

SELECT create_distributed_table('events','customer_id');
```

- Press F5 to execute the query, or select the Execute button on the toolbar above.
- After executing the query, verify that the new events table was created under the citus database by expanding Schemas -> public -> Tables in the navigation tree on the left. You may have to refresh the Schemas list by right-clicking, then selecting Refresh.
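If you want to confirm that pg_partman is creating the 5-minute partitions, a minimal optional check (using the core pg_inherits catalog; not part of the lab steps) is to list the child partitions of the events table:

```sql
-- Each child partition covers one 5-minute range of event_time
SELECT inhrelid::regclass AS partition_name
FROM pg_inherits
WHERE inhparent = 'public.events'::regclass
ORDER BY partition_name::text;
```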
In this task, you will create two rollup tables for storing aggregated data pulled from the raw events table. Later, you will create rollup functions and schedule them to run periodically.
The two tables you will create are:
- rollup_events_5min: stores aggregated data in 5-minute intervals.
- rollup_events_1hr: stores aggregated data every 1 hour.
You will notice in the script below, as well as in the script above, that we are sharding each of the tables on the customer_id column. The sharding logic is handled for you by the Hyperscale server group (enabled by Citus), allowing you to horizontally scale your database across multiple managed Postgres servers. This provides multi-tenancy because the data is sharded by the same tenant ID (customer_id). Because we shard on the same ID for our raw events table and the rollup tables, the data stored in both types of table is automatically co-located for us by Citus. This means that aggregations can be performed locally, without crossing network boundaries, when we insert our events data into the rollup tables. Our dashboard queries that execute against the rollup tables are always for a particular tenant (customer ID). Hyperscale clusters allow us to parallelize our aggregations across shards, then perform a SELECT on a rollup for a particular customer from the dashboard and have it automatically routed to the appropriate shard.
Another important thing to note about the rollup tables is that we are using HyperLogLog (HLL) data types to very rapidly obtain distinct counts for devices and sessions (device_distinct_count and session_distinct_count). HyperLogLog is a fixed-size data structure that is extremely fast at estimating distinct value counts with tunable precision. For example, in 1280 bytes, HLL can estimate the count of tens of billions of distinct values with only a few percent error.

It is very common to run SELECT COUNT(DISTINCT) on your database to update a dashboard with the number of unique items, such as unique purchases of a particular item, unique users, unique page visits, etc. However, when you are using distributed systems, as Contoso Auto is in this situation, calculating unique counts is a difficult problem to solve. One reason for this is that there can be overlapping records across the workers. You could get around this by pulling all the data onto a single machine and performing the count, but this does not scale well. Another option is to perform map/reduce functions, which scale, but are very slow to execute. The better option, providing both scalability and speed, is to use approximation algorithms that produce distinct count results within mathematically provable error bounds. This is why we are using HyperLogLog.

If we were not using HLL, we would be limited to creating a large number of rollup tables: rollup tables for various time periods, plus rollup tables to calculate distinct counts constrained by combinations of columns. For example, if you pre-aggregate over minutes, then you cannot answer queries asking for distinct counts over an hour. If you try to sum each minute's results to find hourly visits to a specific page, for example, the result will be unreliable because you are likely to have overlapping records within those different minutes. This problem is further complicated when you want to return a count of page visits filtered by time, unique page visit counts by user, or a combination of the two. HLL allows us to use one or two rollup tables to answer all of these queries and more. This is because HLL overcomes the overlapping records problem by encoding the data in a way that allows summing individual unique counts without re-counting overlapping records. When we write data to the HLL columns, we also hash it to ensure uniform distribution. We'll go over this in a bit.
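As a quick standalone illustration of why HLL solves the overlapping-records problem, you can run the following in the Query Tool. It is a toy sketch (not part of the lab steps) and assumes the hll extension is enabled in the citus database, which the rollup tables in this task require anyway:

```sql
-- Two sketches built over overlapping ranges of IDs (1-6000 and 4001-10000)
-- can be unioned without double-counting the overlap: the estimate comes back
-- close to 10,000, not 12,000.
WITH
  a AS (SELECT hll_add_agg(hll_hash_bigint(i::bigint)) AS sketch
        FROM generate_series(1, 6000) AS s(i)),
  b AS (SELECT hll_add_agg(hll_hash_bigint(i::bigint)) AS sketch
        FROM generate_series(4001, 10000) AS s(i))
SELECT hll_cardinality(hll_union(a.sketch, b.sketch)) AS approx_distinct
FROM a, b;
```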
- With the Lab server expanded under the Servers tree in pgAdmin, expand Databases, then select citus. When the citus database is highlighted, select the Query Tool button above.
- Replace the query in the Query Editor with the following:

```sql
CREATE TABLE rollup_events_5min (
    customer_id bigint,
    event_type text,
    country text,
    browser text,
    minute timestamptz,
    event_count bigint,
    device_distinct_count hll,
    session_distinct_count hll,
    top_devices_1000 jsonb
);
CREATE UNIQUE INDEX rollup_events_5min_unique_idx
    ON rollup_events_5min(customer_id, event_type, country, browser, minute);
SELECT create_distributed_table('rollup_events_5min','customer_id');

CREATE TABLE rollup_events_1hr (
    customer_id bigint,
    event_type text,
    country text,
    browser text,
    hour timestamptz,
    event_count bigint,
    device_distinct_count hll,
    session_distinct_count hll,
    top_devices_1000 jsonb
);
CREATE UNIQUE INDEX rollup_events_1hr_unique_idx
    ON rollup_events_1hr(customer_id, event_type, country, browser, hour);
SELECT create_distributed_table('rollup_events_1hr','customer_id');
```

- Press F5 to execute the query, or select the Execute button on the toolbar above.
- After executing the query, verify that the new rollup_events_1hr and rollup_events_5min tables were created under the citus database by expanding Schemas -> public -> Tables in the navigation tree on the left. You may have to refresh the Schemas list by right-clicking, then selecting Refresh.
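Optionally, you can confirm that the rollup tables are co-located with the events table, as described earlier. This check uses standard Citus metadata and is not a required lab step:

```sql
-- All three tables should report the same colocationid, meaning their shards
-- for a given customer_id are placed on the same worker node
SELECT logicalrelid, colocationid
FROM pg_dist_partition
WHERE logicalrelid IN ('events'::regclass,
                       'rollup_events_5min'::regclass,
                       'rollup_events_1hr'::regclass);
```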
In Contoso Auto's pipeline, they are storing event source data (clickstream time series data) in PostgreSQL, within the partitioned events table you created earlier. The next step of the pipeline is to aggregate this data into rollup tables so it can be efficiently accessed by their dashboard app or BI tools without impacting performance on the raw data tables.
Rollups are an integral piece of this solution because they provide fast, indexed lookups of aggregates where compute-heavy work is performed periodically in the background. Because these rollups are compact, they can easily be consumed by various clients and kept over longer periods of time.
When you look at the SQL scripts for the five_minutely_aggregation and hourly_aggregation functions below, you will notice that we are using incremental aggregation to support late-arriving data. This is accomplished by using ON CONFLICT ... DO UPDATE in the INSERT statement.

When executing aggregations, you have the choice between append-only and incremental aggregation. Append-only aggregation (insert) supports all aggregates, including exact distinct counts and percentiles, but is more difficult to use when handling late data. This is because you have to keep track of which time periods have already been aggregated, since you aggregate events for a particular time period and append them to the rollup table only once all the data for that period is available. Incremental aggregation (upsert), on the other hand, easily supports processing late data. The trade-off is that it cannot handle all aggregates. We work around this limitation by using highly accurate approximation through HyperLogLog (HLL) and TopN. As stated previously, we are aggregating new events and upserting them into our rollup tables. You still need to be able to keep track of which events have already been aggregated.
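The upsert pattern itself is plain PostgreSQL. Here is a minimal, self-contained sketch of how ON CONFLICT ... DO UPDATE merges late-arriving rows into an existing aggregate; the page_counts table is hypothetical and used only for illustration:

```sql
-- Hypothetical table used only for this illustration
CREATE TABLE page_counts (page text PRIMARY KEY, views bigint);

-- The first insert creates the row; later inserts for the same page merge into
-- it instead of failing, which is how late data gets folded into a rollup
INSERT INTO page_counts (page, views)
VALUES ('/home', 5)
ON CONFLICT (page)
DO UPDATE SET views = page_counts.views + excluded.views;
```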
One way to keep track of which events have already been aggregated is to mark them as aggregated (SET aggregated = true). The problem with this approach is that it causes bloat and fragmentation. Another way would be to use a staging table to temporarily store events, but this can cause catalog bloat and high overhead per batch, depending on how often your aggregation runs. The recommended approach is to track the sequence number. This means that each event has a monotonically increasing sequence number (i). We store the sequence number S up to which all events have been aggregated. To aggregate, we pull a number from the sequence (E), briefly block writes to ensure there are no more in-flight transactions using sequence numbers <= E (EXECUTE format('LOCK %s IN EXCLUSIVE MODE', table_to_lock)), then incrementally aggregate all events with sequence numbers S < i <= E. Finally, we set S = E and repeat this process on each upsert. You can see exactly how we're doing this in the incremental_rollup_window function below. The rollups table keeps track of the sequence for us. The five_minutely_aggregation and hourly_aggregation functions call incremental_rollup_window to retrieve the range of page views that can be safely aggregated, using the start and end event_id values (start_id and end_id).
Advanced aggregation is accomplished by using HyperLogLog (HLL) and TopN, as discussed earlier. For this topic, reference the five_minutely_aggregation and hourly_aggregation functions below. Also note that where you see the special excluded table in the query, it is used to reference the values originally proposed for insertion. We are using hll_hash_bigint to hash the device_id and session_id values before adding them to the HLL columns. This hash function produces a uniformly distributed bit string; HLL then divides those values into streams and averages the results. The hll_add_agg and hll_union functions are used to do incremental rollups. TopN keeps track of a set of counters in JSONB with the explicit goal of determining the top N (like top 10) items, or our "heavy hitters". In our case, we're using it to return the top 1000 devices by device_id. Similar to HLL, we use topn_add_agg and topn_union to do incremental rollups. The topn_union function merges TopN objects across time periods and dimensions.
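If you'd like to see TopN in isolation before using it in the rollup functions, the following toy query (not part of the lab steps, and assuming the topn extension is enabled in the citus database) builds a counter over a handful of values and returns the most frequent items:

```sql
-- Devices 1 and 2 are the most frequent values, so they are returned first
SELECT (topn(topn_add_agg(device_id::text), 2)).item AS device_id
FROM (VALUES (1),(1),(1),(2),(2),(3)) AS v(device_id);
```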
- Open the Query Editor once more and replace the previous query with the following, then execute the query.

```sql
CREATE TABLE rollups (
    name text primary key,
    event_table_name text not null,
    event_id_sequence_name text not null,
    last_aggregated_id bigint default 0
);

CREATE OR REPLACE FUNCTION incremental_rollup_window(rollup_name text, OUT window_start bigint, OUT window_end bigint)
RETURNS record
LANGUAGE plpgsql
AS $function$
DECLARE
    table_to_lock regclass;
BEGIN
    /*
     * Perform aggregation from the last aggregated ID + 1 up to the last committed ID.
     * We do a SELECT .. FOR UPDATE on the row in the rollup table to prevent
     * aggregations from running concurrently.
     */
    SELECT event_table_name, last_aggregated_id+1, pg_sequence_last_value(event_id_sequence_name)
    INTO table_to_lock, window_start, window_end
    FROM rollups
    WHERE name = rollup_name FOR UPDATE;

    IF NOT FOUND THEN
        RAISE 'rollup ''%'' is not in the rollups table', rollup_name;
    END IF;

    IF window_end IS NULL THEN
        /* sequence was never used */
        window_end := 0;
        RETURN;
    END IF;

    /*
     * Play a little trick: We very briefly lock the table for writes in order to
     * wait for all pending writes to finish. That way, we are sure that there are
     * no more uncommitted writes with an identifier lower than or equal to window_end.
     * By throwing an exception, we release the lock immediately after obtaining it
     * such that writes can resume.
     */
    BEGIN
        EXECUTE format('LOCK %s IN EXCLUSIVE MODE', table_to_lock);
        RAISE 'release table lock';
    EXCEPTION WHEN OTHERS THEN
    END;

    /*
     * Remember the end of the window to continue from there next time.
     */
    UPDATE rollups SET last_aggregated_id = window_end WHERE name = rollup_name;
END;
$function$;

-- Add entries for the rollup tables so that they are tracked by the incremental rollup process.
INSERT INTO rollups (name, event_table_name, event_id_sequence_name)
VALUES ('rollup_events_5min', 'events', 'events_event_id_seq');
INSERT INTO rollups (name, event_table_name, event_id_sequence_name)
VALUES ('rollup_events_1hr', 'events', 'events_event_id_seq');
```
- Replace the previous query with the following in the Query Editor to create a rollup function that populates the 5-minute rollup table. Then execute the query.

```sql
CREATE OR REPLACE FUNCTION five_minutely_aggregation(OUT start_id bigint, OUT end_id bigint)
RETURNS record
LANGUAGE plpgsql
AS $function$
BEGIN
    /* determine which page views we can safely aggregate */
    SELECT window_start, window_end INTO start_id, end_id
    FROM incremental_rollup_window('rollup_events_5min');

    /* exit early if there are no new page views to aggregate */
    IF start_id > end_id THEN RETURN; END IF;

    /* aggregate the page views, merge results if the entry already exists */
    INSERT INTO rollup_events_5min
        SELECT customer_id, event_type, country, browser,
               date_trunc('seconds', (event_time - TIMESTAMP 'epoch') / 300) * 300 + TIMESTAMP 'epoch' AS minute,
               count(*) AS event_count,
               hll_add_agg(hll_hash_bigint(device_id)) AS device_distinct_count,
               hll_add_agg(hll_hash_bigint(session_id)) AS session_distinct_count,
               topn_add_agg(device_id::text) AS top_devices_1000
        FROM events
        WHERE event_id BETWEEN start_id AND end_id
        GROUP BY customer_id, event_type, country, browser, minute
        ON CONFLICT (customer_id, event_type, country, browser, minute)
        DO UPDATE SET
            event_count = rollup_events_5min.event_count + excluded.event_count,
            device_distinct_count = hll_union(rollup_events_5min.device_distinct_count, excluded.device_distinct_count),
            session_distinct_count = hll_union(rollup_events_5min.session_distinct_count, excluded.session_distinct_count),
            top_devices_1000 = topn_union(rollup_events_5min.top_devices_1000, excluded.top_devices_1000);
END;
$function$;
```
- Replace the previous query with the following in the Query Editor to create a rollup function that populates the hourly rollup table. Then execute the query.

```sql
CREATE OR REPLACE FUNCTION hourly_aggregation(OUT start_id bigint, OUT end_id bigint)
RETURNS record
LANGUAGE plpgsql
AS $function$
BEGIN
    /* determine which page views we can safely aggregate */
    SELECT window_start, window_end INTO start_id, end_id
    FROM incremental_rollup_window('rollup_events_1hr');

    /* exit early if there are no new page views to aggregate */
    IF start_id > end_id THEN RETURN; END IF;

    /* aggregate the page views, merge results if the entry already exists */
    INSERT INTO rollup_events_1hr
        SELECT customer_id, event_type, country, browser,
               date_trunc('hour', event_time) AS hour,
               count(*) AS event_count,
               hll_add_agg(hll_hash_bigint(device_id)) AS device_distinct_count,
               hll_add_agg(hll_hash_bigint(session_id)) AS session_distinct_count,
               topn_add_agg(device_id::text) AS top_devices_1000
        FROM events
        WHERE event_id BETWEEN start_id AND end_id
        GROUP BY customer_id, event_type, country, browser, hour
        ON CONFLICT (customer_id, event_type, country, browser, hour)
        DO UPDATE SET
            event_count = rollup_events_1hr.event_count + excluded.event_count,
            device_distinct_count = hll_union(rollup_events_1hr.device_distinct_count, excluded.device_distinct_count),
            session_distinct_count = hll_union(rollup_events_1hr.session_distinct_count, excluded.session_distinct_count),
            top_devices_1000 = topn_union(rollup_events_1hr.top_devices_1000, excluded.top_devices_1000);
END;
$function$;
```
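Before moving on, you can optionally confirm that the bookkeeping entries were created; this is a quick sanity check rather than a required step:

```sql
-- Until an aggregation has run, last_aggregated_id is 0 for both entries
SELECT name, event_table_name, last_aggregated_id FROM rollups;
```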
In this task, you will use the psql COPY utility to load raw clickstream event data into the events table. A few parallel COPY streams can load millions of events per second.
- On your desktop, or your lab VM if you were provided one, open a new Command Prompt. If you are unfamiliar with how to do this in Windows, click the Start button, type cmd, then press Enter.
- In the Command Prompt window, type the following, then press Enter to navigate to the lab files directory for this experience:

```
cd C:\lab-files\data\7
```

- For this next command, you will need the PostgreSQL coordinator name that you copied from the Azure portal earlier. It should be in the form of <your_postgres_name>-c.postgres.database.azure.com. Use this value to replace COORDINATOR_NAME in the following command:

```
psql -h COORDINATOR_NAME -d citus -U citus
```

- After modifying the command above, paste it into the Command Prompt window, then press Enter. When prompted, enter your PostgreSQL password.
- After successfully entering your password and connecting to PostgreSQL, paste the following at the citus=> prompt, then hit Enter. This will copy the first set of records into the events table:

```
\COPY events(customer_id,event_type,country,browser,device_id,session_id) FROM data/1.csv WITH (FORMAT CSV,HEADER TRUE);
```

The output should show that you copied 200,000 records from the CSV file.
- Leave the Command Prompt open, then switch back to pgAdmin.
- Open the Query Editor once more and replace the previous query with the following, then execute the query. This will run our 5-minute aggregation function:

```sql
SELECT five_minutely_aggregation();
```

- Replace the previous query with the following in the Query Editor to run our hourly aggregation function. Then execute the query.

```sql
SELECT hourly_aggregation();
```

- Switch back to your open Command Prompt window, then paste the following command into the window and press Enter.

```
\COPY events(customer_id,event_type,country,browser,device_id,session_id) FROM data/2.csv WITH (FORMAT CSV,HEADER TRUE);
```

This loads more data into our events table. We will now rerun the aggregations to demonstrate incrementally aggregating the new data into our rollup tables (an optional verification query follows these steps).

- Leave the Command Prompt open, then switch back to pgAdmin.
- Open the Query Editor once more and replace the previous query with the following, then execute the query. This will re-run our 5-minute aggregation function:

```sql
SELECT five_minutely_aggregation();
```

- Replace the previous query with the following in the Query Editor to re-run our hourly aggregation function. Then execute the query.

```sql
SELECT hourly_aggregation();
```
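Optionally, you can sanity-check the incremental aggregation at this point. The queries below are a minimal verification sketch (not part of the original lab steps); once both batches have been loaded and both aggregation functions have run, the raw count and the rolled-up total for a tenant should match:

```sql
-- Raw event count for tenant 1
SELECT count(*) AS raw_events FROM events WHERE customer_id = 1;

-- Total event count folded into the 5-minute rollup for the same tenant
SELECT sum(event_count) AS rolled_up_events FROM rollup_events_5min WHERE customer_id = 1;
```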
In this task, you will use pg_cron to run the aggregation functions on a periodic basis. Next, you will load the remainder of the test data and execute the dashboard queries.
You will then execute queries against the rollup tables that will be used for Contoso Auto's dashboard. This demonstrates that queries against pre-aggregated tables using the HLL and TopN advanced aggregation features deliver excellent query speed and flexibility.
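After you schedule the rollup functions in the first step below, you can optionally verify that the jobs were registered by querying pg_cron's metadata. This is a minimal check and assumes the pg_cron extension is enabled in the citus database, which this task relies on:

```sql
-- Each call to cron.schedule() adds a row to this table
SELECT jobid, schedule, command FROM cron.job;

-- To remove a job later, if needed:
-- SELECT cron.unschedule(<jobid>);
```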
- Replace the previous query with the following in the Query Editor to schedule the rollup functions to execute every 5 minutes, then execute the query.

```sql
SELECT cron.schedule('*/5 * * * *', 'SELECT five_minutely_aggregation();');
SELECT cron.schedule('*/5 * * * *', 'SELECT hourly_aggregation();');
```

- Switch back to your open Command Prompt window, then paste the following command into the window and press Enter. This script will import the remaining 1.6 million rows.

```
\i copy.sql
```

This script simply executes the COPY command against the remaining 8 CSV files in the data folder. You should see a result of COPY 200000 eight times.

- Switch back to pgAdmin. Although we set the cron schedule to run our aggregation functions every five minutes, it is possible that they have not yet run. For now, replace the previous query in the Query Editor with the following to manually run the 5-minute aggregation function.

```sql
SELECT five_minutely_aggregation();
```

- Replace the previous query with the following in the Query Editor to re-run our hourly aggregation function. Then execute the query.

```sql
SELECT hourly_aggregation();
```
- Clear the query window and paste the following to retrieve the total number of events and the count of distinct devices in the last 5 minutes:

```sql
SELECT sum(event_count) AS num_events,
       hll_cardinality(hll_union_agg(device_distinct_count)) AS distinct_devices
FROM rollup_events_5min
WHERE minute >= now() - interval '5 minutes'
  AND minute <= now()
  AND customer_id = 1;
```

Note: If you do not see any values in the result, try adjusting the 5 minutes interval to a higher value. If more than five minutes have passed since copying the data, you will not see results until you increase this value.
Clear the query window and paste the following to return the count of distinct sessions over the past week:
SELECT sum(event_count) num_events, hll_cardinality(hll_union_agg(device_distinct_count)) distinct_devices FROM rollup_events_1hr WHERE hour >=date_trunc('day',now())-interval '7 days' AND hour <=now() AND customer_id=1;
- Clear the query window and paste the following to return the trend of app usage in the past 2 days, broken down by hour:

```sql
SELECT hour,
       sum(event_count) AS event_count,
       hll_cardinality(hll_union_agg(device_distinct_count)) AS device_count,
       hll_cardinality(hll_union_agg(session_distinct_count)) AS session_count
FROM rollup_events_1hr
WHERE hour >= date_trunc('day', now()) - interval '2 days'
  AND hour <= now()
  AND customer_id = 1
GROUP BY hour;
```
- Clear the query window and paste the following to return the top devices in the past 30 minutes:

```sql
SELECT (topn(topn_union_agg(top_devices_1000), 10)).item AS device_id
FROM rollup_events_5min
WHERE minute >= now() - interval '30 minutes'
  AND minute <= now()
  AND customer_id = 2;
```
Thank you for participating in the open source databases at scale experience! We hope you have learned how managed PostgreSQL on Azure, using the Hyperscale (Citus) offering, can better help you manage and run high-scale PostgreSQL databases while meeting multi-tenancy and scale-out requirements.
To recap, you experienced:
- How to shard tables across nodes and create rollup tables for fast access to large data volumes.
- How to create and schedule rollup functions.
- How to use advanced aggregations with HyperLogLog and TopN.