- Introduction
- Build_Tools
- Pre-requisites
- ETL Process
- Database Normalization
- Contributions
- Bug / Feature Request
- Authors
- This project is aimed at providing actionable insights to support the United Nations' (UN) Sustainable Development Goal (SDG) Number 16.
- The SDGs are goals universally adopted by the UN Member States to end poverty, protect the planet, and ensure peace and prosperity for all people.
- According to the UN, SDG Number 16 is directed to "Promote peaceful and inclusive societies for sustainable development, provide access to justice for all and build effective, accountable and inclusive institutions at all levels."
- This project supports the achievement of SDG 16 by providing data to generate actionable insights for stakeholders regarding the 2022 Presidential Elections, Police Brutality, and the Propagation of Hate Speech, on one of Kenya's most widely used social media platforms -- Twitter.
- Python 3
- Tweepy -- Python library for Twitter API
- PostgreSQL
- JSON
- SQLAlchemy
- Pandas
You need to have the following to run the application:
- Linux OS (installed by dual booting with Windows, or on Windows using WSL 2)
- FTP clients, e.g. FileZilla or WinSCP -- for file transfers between Windows and Linux environments (or a remote Linux server)
- PostgreSQL RDBMS installed
- Twitter API Keys -- access tokens, consumer keys
- Python 3 environment with the Tweepy, Pandas, NumPy, SQLAlchemy, json, and glob libraries installed
- ETL is short for Extract, Transform, Load -- a three-phase data integration process for moving data into a data warehouse
To extract the data (tweets) from Twitter, we create a Streaming Pipeline using the steps below:
- Import the Tweepy package, and then authenticate against the Twitter API using the access tokens and consumer keys
- Specify the keywords (filters) used to stream the tweets. These revolve around the 2022 Presidential Elections, Police Brutality, and the Propagation of Hate Speech
- Instantiate `SListener` and `Stream` objects, and loop the streaming filter so it runs continuously in a streaming fashion
- Include an `except` clause to handle network disconnections, by reconnecting the stream automatically
- The streamed tweets (captured by the `SListener` object) are dumped into JSON files (the staging area), using Python's file writing operations. Note that each filename is derived from the time the file was being written, and each file is capped at 500 tweets
Note that the JSON files created are lists of key: value pairs, containing information about each tweet (e.g. time of creation, tweet text, tweet sender).
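Below is a minimal sketch of such a streaming pipeline, assuming Tweepy 3.x's `StreamListener` API; the credential placeholders, keyword list, and filename pattern are illustrative and not the project's actual values:

```python
import time

import tweepy

# Illustrative credentials -- substitute your own Twitter API keys
CONSUMER_KEY, CONSUMER_SECRET = "xxx", "xxx"
ACCESS_TOKEN, ACCESS_TOKEN_SECRET = "xxx", "xxx"

# Example filter keywords around the elections, police brutality and hate speech
KEYWORDS = ["KenyaDecides2022", "police brutality", "hate speech"]


class SListener(tweepy.StreamListener):
    """Dumps raw tweet JSON into staging files capped at 500 tweets each."""

    def __init__(self, api=None, max_tweets=500):
        super().__init__(api)
        self.max_tweets = max_tweets
        self._open_new_file()

    def _open_new_file(self):
        # Filename derived from the time the file is opened
        self.counter = 0
        self.output = open(time.strftime("tweets_%Y%m%d-%H%M%S.json"), "w")

    def on_data(self, data):
        self.output.write(data)  # each line is one tweet's raw JSON payload
        self.counter += 1
        if self.counter >= self.max_tweets:
            self.output.close()
            self._open_new_file()
        return True


auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

# Loop so streaming reconnects automatically after network disconnections
while True:
    try:
        tweepy.Stream(auth, SListener()).filter(track=KEYWORDS)
    except Exception:
        continue
```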
To transform the data, we create a Data Transform Pipeline using the steps below:
- Create a function `json2df`, which takes a JSON filename as a parameter, to load the JSON file into a list of Python dictionaries
- Utilize Pandas to convert the dictionaries into Pandas DataFrames
- Add the converted DataFrames to a list named `to_sql`, where each DataFrame corresponds to a table in the database
- The function `json2df` then returns the `to_sql` list, which is used in the last step of the ETL process
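A minimal sketch of such a `json2df` function is shown below; the split into `tweets` and `users` DataFrames, and the specific fields selected, are illustrative assumptions rather than the project's exact schema:

```python
import json

import pandas as pd


def json2df(filename):
    """Load one staged JSON file and return the `to_sql` list of DataFrames,
    where each DataFrame corresponds to a table in the database."""
    # Each line of the staging file holds one tweet's raw JSON payload
    with open(filename) as f:
        tweets = [json.loads(line) for line in f if line.strip()]

    # Illustrative split into two tables: tweet details and sender details
    tweets_df = pd.DataFrame(
        [
            {
                "tweet_id": t["id_str"],
                "created_at": t["created_at"],
                "text": t.get("text"),
                "user_id": t["user"]["id_str"],
            }
            for t in tweets
        ]
    )
    users_df = pd.DataFrame(
        [
            {
                "user_id": t["user"]["id_str"],
                "screen_name": t["user"]["screen_name"],
                "followers_count": t["user"]["followers_count"],
            }
            for t in tweets
        ]
    )

    to_sql = [tweets_df, users_df]
    return to_sql
```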
To load the data into the PostgreSQL data warehouse, we create a Data Load Pipeline using the steps below:
- Create a new database in the PostgreSQL RDBMS. This can be achieved graphically using pgAdmin 4 (a management tool for Postgres), or programmatically as below:
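For instance, a minimal sketch of the programmatic route using SQLAlchemy is shown below; the connection string and database name are placeholders:

```python
from sqlalchemy import create_engine, text

# Connect to the default `postgres` database with autocommit enabled,
# since CREATE DATABASE cannot run inside a transaction block
engine = create_engine(
    "postgresql://db_user:db_password@localhost:5432/postgres",
    isolation_level="AUTOCOMMIT",
)

with engine.connect() as conn:
    conn.execute(text("CREATE DATABASE twitter_elections"))
```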
- To create tables for data storage in the database, we employ SQLAlchemy. SQLAlchemy allows tables to be created from Pandas DataFrames automatically, without explicitly specifying the columns and column types for each table. This comes in handy, as we have many tables and columns to load into our database
- Proceed to load data into the database tables from the Pandas DataFrames, using the `pandas.DataFrame.to_sql()` function, as sketched below:
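A minimal sketch of the table creation and load step, reusing the `json2df` function from the transform step; the connection string, staging filename, and table names are illustrative:

```python
from sqlalchemy import create_engine

engine = create_engine(
    "postgresql://db_user:db_password@localhost:5432/twitter_elections"
)

# One table name per DataFrame in the `to_sql` list
table_names = ["tweets", "users"]

for df, table in zip(json2df("tweets_20220809-043000.json"), table_names):
    # to_sql creates the table (inferring column types) if it does not
    # already exist, then appends the rows on subsequent runs
    df.to_sql(table, engine, if_exists="append", index=False)
```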
- We then automate data loading into the database by batch processing with Linux's Crontab
- We process the previous day's files by adding a Crontab entry
- The full Crontab command, for execution on a daily cadence at 4:30 A.M. UTC, is as below:
```
30 4 * * * /home/imwendwa/.pyenv/versions/twitterScraping/bin/python
/home/imwendwa/analytics/policeAndElectionsTwitterScraping/police_and_elections_etl_prod.py >>
/home/imwendwa/analytics/policeAndElectionsTwitterScraping/etl_logs/`date +\%Y-\%m-\%d_\%H:\%M:\%S`-police-and-elections-etl-logs.log 2>&1
```
- The first line of the command runs the script at 4:30 AM UTC using the project's Python interpreter (.../bin/python)
- The second line is the path of the ETL script, whose output we append to a log file using the `>>` operator
- The third/fourth line is the path of the ETL log file produced by each run. Its filename is prefixed with a timestamp generated by `date`, and `2>&1` redirects errors into the same log
- Database normalization is a database design principle for structuring data in an organized and consistent manner
- It is aimed at avoiding redundancy and maintaining the integrity of the database, by reducing complexity and eliminating duplicates
- In normalization, data is divided into several tables linked together by relationships, achieved using primary, foreign and composite keys
- We note that the tables created using SQLAlchemy do not have primary/foreign keys specified, nor do they have any explicit relationships defined -- implying a lot of redundancy in the tables
- To correct this, we do the following to normalize our database:
- Add constraints to the tables by creating primary and foreign keys in the tables
- Delete duplicate records in the tables
- Delete unnecessary columns from the tables
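A minimal sketch of these corrections, executed against the database via SQLAlchemy, is shown below; the table and column names (`users`, `tweets`, `user_id`) are illustrative assumptions rather than the project's exact schema:

```python
from sqlalchemy import create_engine, text

engine = create_engine(
    "postgresql://db_user:db_password@localhost:5432/twitter_elections"
)

with engine.begin() as conn:
    # Remove duplicate user records before a primary key can be added
    conn.execute(text("""
        DELETE FROM users a
        USING users b
        WHERE a.ctid < b.ctid
          AND a.user_id = b.user_id;
    """))
    # Add primary and foreign key constraints to link the tables
    conn.execute(text("ALTER TABLE users ADD PRIMARY KEY (user_id);"))
    conn.execute(text(
        "ALTER TABLE tweets ADD FOREIGN KEY (user_id) REFERENCES users (user_id);"
    ))
```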
- The normalized database will now look as below:
- This can be summarized into the forms below:
Contributions are welcome using pull requests. To contribute, follow these steps:
- Fork this repository.
- Create a branch: `git checkout -b <branch_name>`
- Make your changes to the relevant file(s)
- Check the status of your changes: `git status`
- Add and commit the file(s) to the repo using: `git add <file(s)>` and `git commit -m "<message>"`
- Push the branch to GitHub: `git push origin <branch_name>`
- Create the pull request. See the GitHub documentation on creating a pull request.
If you find a bug (the website couldn't handle the query and/or gave undesired results), kindly open an issue here by including your search query and the expected result.
If you'd like to request a new function, feel free to do so by opening an issue here. Please include sample queries and their corresponding results.
See also the list of Contributors who participated in this project.