The building block of a PeerDB deployment is a peer, which could represent a database from a wide variety of providers. We can query these peers using the same Postgres dialect of SQL and also run mirrors to migrate data between two peers. To begin with, let us create two Postgres peers on the same cluster that PeerDB uses for internal state management. If you have another Postgres cluster, you can use that as well! Just be sure to substitute the connection details wherever needed.
Let's begin by connecting to the PeerDB internal cluster (which we will call the catalog from now on):
```bash
psql "host=localhost user=postgres password=postgres dbname=postgres port=5432"
```
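Since the same connection details come up several times in this guide, a small helper can save some typing. This is only a sketch: `conninfo` is a name made up here, and the defaults are simply the catalog settings used in this guide. `PGHOST`, `PGPORT`, `PGUSER` and `PGPASSWORD` are standard libpq environment variables, so set those if your cluster differs.

```bash
# Build a libpq connection string (hypothetical helper for this guide).
# $1: database name, defaults to "postgres".
# Each setting falls back to the catalog defaults used in this guide
# when the matching environment variable is unset or empty.
conninfo() {
    printf 'host=%s port=%s user=%s password=%s dbname=%s\n' \
        "${PGHOST:-localhost}" "${PGPORT:-5432}" \
        "${PGUSER:-postgres}" "${PGPASSWORD:-postgres}" "${1:-postgres}"
}

# Usage (needs a reachable Postgres cluster):
#   psql "$(conninfo)"           # connect to the catalog
#   psql "$(conninfo peerone)"   # connect to a specific database
```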
Then execute the following commands:
```bash
postgres=# CREATE DATABASE peerone;
CREATE DATABASE
postgres=# CREATE DATABASE peertwo;
CREATE DATABASE
```
Now switch over to `peerone` and create some sample tables.
```bash
postgres=# \c peerone
You are now connected to database "peerone" as user "postgres".
peerone=# CREATE TABLE tableone(id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, c1 INT, c2 INT, t TEXT);
CREATE TABLE
peerone=# CREATE TABLE tabletwo(id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, c3 INT, c4 INT, u TEXT);
CREATE TABLE
```
All done! Keep this terminal open though, as we'll come back to it later.
Now coming back to the terminal where we are connected to PeerDB itself, we run the following commands to create two Postgres peers.
```bash
kevin=> CREATE PEER peerone FROM POSTGRES WITH
(
host = 'catalog',
port = '5432',
user = 'postgres',
password = 'postgres',
database = 'peerone'
);
OK
kevin=> CREATE PEER peertwo FROM POSTGRES WITH
(
host = 'catalog',
port = '5432',
user = 'postgres',
password = 'postgres',
database = 'peertwo'
);
OK
```
Now check that we are able to query the two tables we created on `peerone`.
```bash
kevin=> SELECT * FROM peerone.public.tableone;
id | c1 | c2 | t
----+----+----+---
(0 rows)
kevin=> SELECT * FROM peerone.public.tabletwo;
id | c3 | c4 | u
----+----+----+---
(0 rows)
```
Now imagine the two tables we created in `peerone` have a lot of records being inserted, updated and deleted concurrently. We want a live copy of these tables in `peertwo`. We don't want to change the data in any way, just Extract it from `peerone` and Load it into `peertwo`.
PeerDB supports this via CDC mirrors, which build on Postgres' logical replication capabilities and efficiently keep the state of two tables in sync.
```bash
kevin=> CREATE MIRROR cdc_mirror FROM peerone TO peertwo WITH TABLE MAPPING (public.tableone:public.tableone_dst, public.tabletwo:public.tabletwo_dst);
CREATE MIRROR cdc_mirror
```
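With more than a couple of tables, typing out the mapping by hand gets tedious. As a rough sketch, the statement above could be generated from a list of `source:destination` pairs. The `cdc_mirror_sql` helper is made up for this guide and only prints the statement in the form shown above; it does not talk to PeerDB.

```bash
# Print a CREATE MIRROR statement for a CDC mirror (hypothetical helper).
# $1: mirror name, $2: source peer, $3: destination peer,
# remaining arguments: "source_table:destination_table" pairs.
cdc_mirror_sql() {
    name=$1 src=$2 dst=$3
    shift 3
    mapping=""
    for pair in "$@"; do
        # Join the pairs with ", " as in the statement above.
        mapping="${mapping:+$mapping, }$pair"
    done
    printf 'CREATE MIRROR %s FROM %s TO %s WITH TABLE MAPPING (%s);\n' \
        "$name" "$src" "$dst" "$mapping"
}

# Usage: print the statement, then paste it into the PeerDB terminal.
#   cdc_mirror_sql cdc_mirror peerone peertwo \
#       public.tableone:public.tableone_dst public.tabletwo:public.tabletwo_dst
```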
After a few seconds, you should be able to see the destination tables created automatically on `peertwo`.
```bash
kevin=> SELECT * FROM peertwo.public.tableone_dst;
c1 | c2 | id | t
----+----+----+---
(0 rows)
kevin=> SELECT * FROM peertwo.public.tabletwo_dst;
c3 | c4 | id | u
----+----+----+---
(0 rows)
```
To simulate the desired usage patterns on the two tables, we'll use `pgbench`, which is a standard tool for Postgres testing. It usually comes bundled with `psql`, so it should already be present on your system.
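If you'd rather check than assume, something like the following sketch works in any POSIX shell. `require_tool` is a name invented for this guide; it relies only on the standard `command -v` lookup.

```bash
# Return 0 if the named program is on PATH, otherwise complain on stderr
# and return 1 (hypothetical helper for this guide).
require_tool() {
    if command -v "$1" >/dev/null 2>&1; then
        return 0
    fi
    echo "missing required tool: $1" >&2
    return 1
}

# Usage:
#   require_tool psql && require_tool pgbench
```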
```bash
\set naccounts 1000000 * :scale
\set aid random(1, :naccounts)
\set bid random(1, :naccounts)
INSERT INTO public.tableone(c1,c2,t) VALUES (:aid,:bid,'pgbenchinsert'||substr(md5(random()::text), 0, 25));
INSERT INTO public.tabletwo(c3,c4,u) VALUES (:bid,:aid,'pgbenchinsert'||substr(md5(random()::text), 0, 25));
```
```bash
\set naccounts 1000000 * :scale
\set aid random(1, :naccounts)
\set bid random(1, :naccounts)
UPDATE public.tableone SET t='pgbenchupdate'||substr(md5(random()::text), 0, 25) where id = :aid;
UPDATE public.tabletwo SET u='pgbenchupdate'||substr(md5(random()::text), 0, 25) where id = :bid;
```
```bash
\set naccounts 1000000 * :scale
\set aid random(1, :naccounts)
\set bid random(1, :naccounts)
DELETE FROM public.tableone WHERE id = :aid;
DELETE FROM public.tabletwo WHERE id = :bid;
```
The first of these three pgbench scripts inserts records with random data into the two tables, while the other two attempt to update or delete a record if it exists. By running them concurrently, we can roughly simulate the sort of load we want.
After saving these scripts to files (named `pgbench_insert.sql`, `pgbench_update.sql` and `pgbench_delete.sql` in the commands below), we run them in parallel like so:
```bash
pgbench "host=localhost port=5432 user=postgres password=postgres dbname=peerone" -f pgbench_insert.sql -j 2 -c 8 -T 120 -P 60 \
& pgbench "host=localhost port=5432 user=postgres password=postgres dbname=peerone" -f pgbench_update.sql -j 2 -c 8 -T 120 -P 60 \
& pgbench "host=localhost port=5432 user=postgres password=postgres dbname=peerone" -f pgbench_delete.sql -j 2 -c 8 -T 120 -P 60
```
(For those who are curious, the flags ask `pgbench` to run each script on 8 parallel connections across 2 threads for 120 seconds, printing a progress report every 60 seconds.)
(NOTE: Windows PowerShell and Command Prompt users cannot run commands in parallel like this. There are other ways, but it is probably easier to run them in 3 separate terminals.)
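On a POSIX shell, chaining commands with `&` leaves the earlier runs in the background, so it is easy to lose track of whether they all finished cleanly. One possible way to tidy this up is sketched below. The `run_pgbench_scripts` function and the `PGBENCH` and `PGBENCH_CONN` variables are made up for this guide; the flags and default connection string are the ones used in the commands above.

```bash
# Run each pgbench script in the background, then wait for all of them.
# PGBENCH (default "pgbench") and PGBENCH_CONN are hypothetical overrides.
run_pgbench_scripts() {
    conn=${PGBENCH_CONN:-"host=localhost port=5432 user=postgres password=postgres dbname=peerone"}
    pids=""
    for script in "$@"; do
        ${PGBENCH:-pgbench} "$conn" -f "$script" -j 2 -c 8 -T 120 -P 60 &
        pids="$pids $!"
    done
    status=0
    for pid in $pids; do
        # wait returns the exit status of the job it waited for.
        wait "$pid" || status=1
    done
    return "$status"
}

# Usage:
#   run_pgbench_scripts pgbench_insert.sql pgbench_update.sql pgbench_delete.sql
```

The function returns non-zero if any of the runs failed, which makes it easier to use from a larger script.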
As soon as the rows are inserted into the source tables, PeerDB will start receiving logical replication messages and perform the same operations on the destination tables. We can observe the process from psql in the other terminal.
```bash
kevin=> SELECT (SELECT COUNT(*) FROM peertwo.public.tableone_dst) AS one, (SELECT COUNT(*) FROM peertwo.public.tabletwo_dst) AS two;
one | two
-----+-----
0 | 0
(1 row)
kevin=> \watch 5
... (every 5s)
one | two
------+------
7549 | 7543
(1 row)
... lines omitted ...
... (every 5s)
one | two
--------+--------
216463 | 216251
(1 row)
```
We can confirm that the records in both destination tables exactly match the records in the source tables. Do note that rows present in the source tables before the mirror was created are currently not replicated.
```bash
kevin=> SELECT (SELECT COUNT(*) FROM peertwo.public.tableone_dst WHERE t LIKE '%update%') AS one, (SELECT COUNT(*) FROM peertwo.public.tabletwo_dst WHERE u LIKE '%update%') AS two;
one | two
-------+-------
35061 | 34768
(1 row)
kevin=> SELECT (SELECT COUNT(*) FROM peerone.public.tableone) AS one, (SELECT COUNT(*) FROM peerone.public.tabletwo) AS two;
one | two
--------+--------
216463 | 216251
(1 row)
kevin=> SELECT (SELECT COUNT(*) FROM peerone.public.tableone WHERE t LIKE '%update%') AS one, (SELECT COUNT(*) FROM peerone.public.tabletwo WHERE u LIKE '%update%') AS two;
one | two
-------+-------
35061 | 34768
(1 row)
```
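Comparing counts by eye works for a demo, but the same check can be scripted. The sketch below polls two commands until their outputs agree or it runs out of attempts. `wait_for_match` is a name invented for this guide, and `PEERDB_CONN` in the usage comment is a placeholder for however you connect to PeerDB, which this guide does not spell out.

```bash
# Poll two commands until their outputs are identical (hypothetical helper).
# $1: source command, $2: destination command,
# $3: maximum attempts (default 12), $4: seconds between attempts (default 5).
wait_for_match() {
    attempts=${3:-12} delay=${4:-5}
    i=0
    while [ "$i" -lt "$attempts" ]; do
        src=$(eval "$1")
        dst=$(eval "$2")
        if [ "$src" = "$dst" ]; then
            echo "match: $src"
            return 0
        fi
        i=$((i + 1))
        sleep "$delay"
    done
    echo "no match: source=$src destination=$dst" >&2
    return 1
}

# Usage (PEERDB_CONN is an assumed variable holding your PeerDB connection string):
#   wait_for_match \
#     "psql \"\$PEERDB_CONN\" -Atc 'SELECT COUNT(*) FROM peerone.public.tableone'" \
#     "psql \"\$PEERDB_CONN\" -Atc 'SELECT COUNT(*) FROM peertwo.public.tableone_dst'"
```

Matching counts are only a quick sanity check; they do not prove the rows are identical.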
After some brainstorming, we realize that having a copy of both tables is unnecessary when all we do is run this one query against them.
```bash
peertwo=# SELECT one.id, one.c1+two.c3 AS c13, one.c2+two.c4 AS c24 FROM public.tableone_dst AS one JOIN public.tabletwo_dst AS two ON one.id=two.id;
```
It would therefore be ideal if we could just get the results of this query in one table. Well, PeerDB can do this too! PeerDB supports something called query replication (QRep) mirrors. They allow you to Extract the data from the source tables, Transform it in some way and then Load it into the destination tables.
We need to first create the table that the results are replicated to, keeping in mind that the results of the query we run must map to the columns of the destination table.
```bash
peertwo=# CREATE TABLE qrep_dst(id INT PRIMARY KEY, c13 INT, c24 INT);
CREATE TABLE
```
Now in the PeerDB terminal, we kick off the QRep mirror.
```bash
kevin=> CREATE MIRROR qrep_mirror FROM peerone TO peertwo FOR $$SELECT one.id, one.c1+two.c3 AS c13, one.c2+two.c4 AS c24 FROM public.tableone AS one JOIN public.tabletwo AS two ON one.id=two.id WHERE one.id BETWEEN {{.start}} AND {{.end}}$$ WITH (watermark_column='id', watermark_table_name='public.tableone', mode='append', parallelism=2, refresh_interval=30, num_rows_per_partition=100000, destination_table_name='public.qrep_dst');
CREATE MIRROR qrep_mirror
```
Note that this QRep mirror is operating in 'append' mode, where rows are inserted once and changes to them are not tracked. This is fine for our purposes because we are only tracking rows that don't change.
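To get a feel for what the `{{.start}}` and `{{.end}}` placeholders and `num_rows_per_partition` are for, here is a purely illustrative sketch. It assumes the watermark column holds contiguous integer ids and that each partition fills the placeholders with inclusive bounds; how PeerDB actually chooses partition boundaries is not covered in this guide, and `qrep_partition_clauses` is a name made up here.

```bash
# Print the WHERE clause for each partition of the id range [$1, $2],
# with at most $3 ids per partition (illustration only, not PeerDB's code).
qrep_partition_clauses() {
    start=$1 max=$2 size=$3
    while [ "$start" -le "$max" ]; do
        end=$((start + size - 1))
        if [ "$end" -gt "$max" ]; then
            end=$max
        fi
        echo "WHERE one.id BETWEEN $start AND $end"
        start=$((end + 1))
    done
}

# Usage: ids 1 to 250000 with 100000 rows per partition give three clauses.
#   qrep_partition_clauses 1 250000 100000
```

Under these assumptions, each clause could be processed independently, which is what would let a setting like `parallelism=2` work on two partitions at once.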
Within a few seconds, you should see the existing data from `peerone` show up on `peertwo` in the table we created.
```bash
kevin=> SELECT SUM(one.c1+two.c3), SUM(one.c2+two.c4) FROM peerone.public.tableone AS one JOIN peerone.public.tabletwo AS two ON one.id=two.id;
sum | sum
--------------+--------------
178035703078 | 177984263847
(1 row)
kevin=> SELECT SUM(c13), SUM(c24) FROM peertwo.public.qrep_dst;
sum | sum
--------------+--------------
178035703078 | 177984263847
(1 row)
```
We can even run our `pgbench` scripts from earlier and the changes should be reflected in `qrep_dst`. Also, since our CDC mirror from before is still running, the new rows will be reflected there as well.
```bash
pgbench "host=localhost port=5432 user=postgres password=postgres dbname=peerone" -f pgbench_insert.sql -j 2 -c 8 -T 120 -P 60 \
& pgbench "host=localhost port=5432 user=postgres password=postgres dbname=peerone" -f pgbench_update.sql -j 2 -c 8 -T 120 -P 60
```
```bash
kevin=> SELECT SUM(one.c1+two.c3), SUM(one.c2+two.c4) FROM peerone.public.tableone AS one JOIN peerone.public.tabletwo AS two ON one.id=two.id;
sum | sum
--------------+--------------
502025807680 | 501974368449
(1 row)
kevin=> SELECT SUM(c13), SUM(c24) FROM peertwo.public.qrep_dst;
sum | sum
--------------+--------------
502025807680 | 501974368449
(1 row)
kevin=> SELECT (SELECT COUNT(*) FROM peerone.public.tableone WHERE t LIKE '%update%') AS one, (SELECT COUNT(*) FROM peerone.public.tabletwo WHERE u LIKE '%update%') AS two;
one | two
--------+--------
154848 | 154972
(1 row)
kevin=> SELECT (SELECT COUNT(*) FROM peertwo.public.tableone_dst WHERE t LIKE '%update%') AS one, (SELECT COUNT(*) FROM peertwo.public.tabletwo_dst WHERE u LIKE '%update%') AS two;
one | two
--------+--------
154848 | 154972
(1 row)
```