Near real-time data pipeline that evaluates the accuracy of price feed contracts against two public cryptocurrency APIs, Coingecko and Coinpaprika, for three pairs: ETH-USD, LINK-USD and BTC-USD. The accuracy metric is the percentage difference between the value returned by a contract and the average of the values returned by the APIs.
N.B.: This repo was written as part of a job interview and isn't necessarily a good example of a production Kafka use case.
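To make the metric concrete, here is a minimal sketch of it in Python (the function name and shape are illustrative, not taken from the repo):

```python
# Percentage difference between a contract price and the average of the
# API prices -- the accuracy metric described above. Illustrative only.
def percentage_difference(contract_price: float, api_prices: list[float]) -> float:
    api_average = sum(api_prices) / len(api_prices)
    return (contract_price - api_average) / api_average * 100

# A contract price of 101.0 against API prices of 99.0 and 101.0 -> 1.0 (%)
print(percentage_difference(101.0, [99.0, 101.0]))
```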
- `docker-compose.yml` - This is the base composition. It allows you to bring up the stack with just `docker-compose up`. Please be patient, as downloading the images will take a while. Don't forget to run `docker-compose down` when finished!
- `kafka` - Basic Kafka setup: 1 broker, 2 topics (`contracts` and `apis`) with one partition each and a replication factor of 1 (since there is only one broker). This setup is not suitable for a production environment.
- `zookeeper` - Kafka configuration manager. Single instance.
- `api_producer` - The source and `Dockerfile` for the API Producer service. Instantiates a Kafka producer and polls the relevant APIs with synchronous requests, then publishes the messages to the `apis` topic.
- `producer` - The source and `Dockerfile` for the Contract Producer service. Instantiates a Kafka producer and listens asynchronously for `AnswerUpdated` events on the relevant price feeds, then publishes the messages to the `contracts` topic (see the sketch after this list).
- `api_consumer` - The source and `Dockerfile` for the API Consumer service. Consumes messages from the `apis` topic and inserts them into the `api_price` table.
- `consumer` - The source and `Dockerfile` for the Contract Consumer service. Consumes messages from the `contracts` topic and inserts them into the `contract_price` table.
- `store` - TimescaleDB (a PostgreSQL extension): a time-series database designed to support high insert rates and fast time-based queries.
- `admin` - Adminer: a lightweight, UI-based database management tool that can be used to quickly visualize the tables and run analytics queries against TimescaleDB.
- `tester` - Integration tests (to be done).
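For orientation, here is a minimal sketch of the produce/consume flow the services above implement, written against the kafka-python client. The client library, broker address and message schema are assumptions; the repo's actual services may differ.

```python
import json
from kafka import KafkaProducer, KafkaConsumer

# Producer side: serialize a dict to JSON and publish it to the apis topic.
producer = KafkaProducer(
    bootstrap_servers="kafka:9092",  # assumed broker address
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
producer.send("apis", {"event_type": "cg", "ethereum": {"usd": 3500.0}})
producer.flush()

# Consumer side: deserialize each message and hand it to the DB sink.
consumer = KafkaConsumer(
    "apis",
    bootstrap_servers="kafka:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    auto_offset_reset="earliest",
)
for message in consumer:
    print(message.value)  # the real consumer inserts this into api_price
```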
Docker Compose is required. The version used for this project is 1.24.0; it is not guaranteed to work with other versions.
Create an Infura project and copy-paste the project ID into an env file at the root level called `producer.env`, like this:

```
INFURA_PROJECT_ID=<YOUR-PROJECT-ID>
```
Similarly, create `database.env`, `api_consumer.env` and `consumer.env` with the following ENV variables:

```
DB_USER=postgres
DB_PASSWORD=<YOUR-PASSWORD>
DB_NAME=kafka_sink
```
If you wish to add a new price feed contract, simply specify the desired address and ABI in the `config.ini` file.
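A new entry might look something like the following; the section and key names here are hypothetical, so mirror the format of the existing entries in `config.ini`:

```ini
; Hypothetical entry -- copy the structure of the existing feeds
[AAVE-USD]
address = <PRICE-FEED-CONTRACT-ADDRESS>
abi = <PRICE-FEED-ABI>
```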
If you wish to add a new API, please leverage the `fetch` (for standard GET and POST requests) and `fetch_custom` (for custom requests, e.g. GET with path parameters) methods from the `APIHook` class. Additionally, you might need to write a dedicated transform method in case the response is not in JSON format.
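As a rough illustration, a new hook might look like the sketch below. The method signatures, class layout and import path are assumptions; consult the `APIHook` class for the real interface.

```python
# Hypothetical sketch -- APIHook, fetch and fetch_custom exist in the repo,
# but their exact signatures are assumed here.
from api_hook import APIHook  # assumed import path


class BinancePriceHook(APIHook):
    def get_eth_price(self):
        # fetch is described as handling standard GET and POST requests
        response = self.fetch("https://api.binance.com/api/v3/ticker/price?symbol=ETHUSDT")
        return self.transform(response)

    def transform(self, response):
        # Dedicated transform: reshape the raw payload into the event
        # format the consumers expect.
        return {"event_type": "binance", "ethereum": {"usd": float(response["price"])}}
```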
Run `docker-compose up`.
In another terminal you can then run:
```
docker exec -it kafka-demo_store_1 psql -U postgres -d kafka_sink -c "SELECT * FROM contract_price;"
docker exec -it kafka-demo_store_1 psql -U postgres -d kafka_sink -c "SELECT * FROM api_price;"
```
Both target tables store event data in a `jsonb` column called `event`. You can query it by making use of Postgres JSON operators. For example, use the following query to get the latest ETH price from the Coingecko (`cg`) API:
```sql
SELECT (event -> 'ethereum' ->> 'usd')::numeric AS price
FROM api_price
WHERE event ->> 'event_type' = 'cg'
ORDER BY updated_at DESC
LIMIT 1;
```
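On the insert side, here is a minimal sketch of how a consumed event could land in that column, assuming the psycopg2 driver and a default on `updated_at` (the repo's consumers may use a different driver or schema):

```python
import json
import psycopg2

# Hypothetical connection settings -- match them to your env files.
conn = psycopg2.connect(
    host="store", dbname="kafka_sink", user="postgres", password="<YOUR-PASSWORD>"
)
with conn, conn.cursor() as cur:
    event = {"event_type": "cg", "ethereum": {"usd": 3500.0}}
    # Only the jsonb payload is supplied; updated_at is assumed to default.
    cur.execute("INSERT INTO api_price (event) VALUES (%s)", (json.dumps(event),))
```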
This is just a quick demo of Kafka's pub/sub using Python. It's missing a lot of things you would want if you were going to use this in a production environment, including:
- payload validation
- authentication and authorization
- improved error handling
- managing partitions
- probably other things