Getting data from the SFPD dataset, integrating it into a Datamart, and then proceeding with the analysis through a dashboard.
The objective is to analyze incidents happening in San Francisco via indicators in a dashboard, after going through an integration phase that applies some transformations. This project was done to practice and become familiar with some data engineering and data analysis tools: Apache Airflow, Docker, Python and Power BI.
- Some explanations and remarks about the dataset.
- Designing the Datamart model.
- Explaining the DAG.
- Fetching the relevant data, i.e. the incidents that were reported the day before.
- Creating the tables if they have not been created yet.
- Loading the dimension tables with the appropriate data.
- Loading the fact table and the incident_category table.
- Creating the dashboard after connecting Power BI to the Datamart.
- Final thoughts.
The dataset used in this project contains information about the incidents reported to the San Francisco Police Department. It contains multiple columns; the columns used in this project are:
- incident_id : a system generated identifier for the incident report.
- incident_description : the description of the incident.
- incident_datetime : the date and time when the incident occurred.
- incident_day_of_week
- incident_category
- incident_subcategory
- report_datetime : the date and time when the report was filed.
- report_type_code : a system code for report types.
- report_type_description
- police_district : the police district where the incident occurred.
- latitude : the latitude coordinate in WGS84.
- longitude
- resolution : the resolution of the incident at the time of the report. It can be one of: Cite or Arrest Adult, Exceptional Adult, Open or Active, Unfounded.
It is worth mentioning that the same incident_id can appear in multiple rows of the dataset, because an incident can belong to multiple categories.
In the example above, incident_id 1242744 is repeated three times, for the categories Assault, Robbery and Weapons Carrying Etc.
The model contains four dimensions and one fact table. In addition, a table incident_category is used to normalize the incident-to-category relationship. It would also be possible to add the category to the fact table in a denormalized manner, so that the incident_id is repeated in the table; this avoids an additional join operation, but makes the table harder to maintain than the normalized approach.
The time dimension is a role-playing dimension, used to keep track of both incident and report dates.
The grain is that each row in the fact table gives information about one incident that took place at a location and a time, was reported at a time, belongs to a category, was filed by a police district, and has a resolution status.
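To make the model concrete, here is a minimal sketch of such a star schema as PostgreSQL DDL executed from Python. The choice of the four dimensions shown (time, category, district, location), the table and column names, and the connection string are assumptions made for illustration, not the actual db_data scripts.

```python
# Illustrative star-schema DDL; names and types are assumptions, not the repo's exact scripts.
import psycopg2

DDL = """
CREATE TABLE IF NOT EXISTS dim_time (
    time_id      SERIAL PRIMARY KEY,
    full_date    DATE,
    year         INT,
    month        INT,
    day          INT,
    day_of_week  TEXT
);
CREATE TABLE IF NOT EXISTS dim_category (
    category_id  SERIAL PRIMARY KEY,
    category     TEXT,
    subcategory  TEXT
);
CREATE TABLE IF NOT EXISTS dim_district (
    district_id      SERIAL PRIMARY KEY,
    police_district  TEXT
);
CREATE TABLE IF NOT EXISTS dim_location (
    location_id  SERIAL PRIMARY KEY,
    latitude     DOUBLE PRECISION,
    longitude    DOUBLE PRECISION
);
CREATE TABLE IF NOT EXISTS fact_incident (
    incident_id       BIGINT PRIMARY KEY,
    incident_time_id  INT REFERENCES dim_time (time_id),      -- role-playing: incident date
    report_time_id    INT REFERENCES dim_time (time_id),      -- role-playing: report date
    district_id       INT REFERENCES dim_district (district_id),
    location_id       INT REFERENCES dim_location (location_id),
    resolution        TEXT
);
CREATE TABLE IF NOT EXISTS incident_category (                 -- bridge table
    incident_id  BIGINT REFERENCES fact_incident (incident_id),
    category_id  INT REFERENCES dim_category (category_id)
);
"""

conn = psycopg2.connect("dbname=datamart user=airflow")  # placeholder connection string
with conn, conn.cursor() as cur:
    cur.execute(DDL)
conn.close()
```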
The Directed Acyclic Graph in Apache Airflow represents a series of tasks.
First, the data is retrieved from the API and stored in a staging table. Then, a connection to the database is opened and the dimension tables are loaded. After that, the fact table and the incident_category table are loaded. Finally, the connection to the database is closed.
This DAG is scheduled using Airflow to run on a daily basis.
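Below is a minimal sketch of how such a DAG can be wired in Airflow with PythonOperator-based tasks; the dag_id, task ids and callables are illustrative placeholders rather than the exact code in the repository.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def get_data_from_api():      # placeholder: fetch yesterday's incidents into the staging table
    ...

def load_dimensions():        # placeholder: load the time and other dimension tables
    ...

def load_fact_and_bridge():   # placeholder: load the fact table and incident_category
    ...


with DAG(
    dag_id="sfpd_incidents_datamart",    # illustrative DAG id
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",          # the DAG runs once a day
    catchup=False,
) as dag:
    t_get = PythonOperator(task_id="get_data_from_api", python_callable=get_data_from_api)
    t_dims = PythonOperator(task_id="load_dimensions", python_callable=load_dimensions)
    t_fact = PythonOperator(task_id="load_fact_and_bridge", python_callable=load_fact_and_bridge)

    # staging -> dimensions -> fact + bridge, as described above
    t_get >> t_dims >> t_fact
```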
In this step, the data is fetched from the API provided by the SFPD, which contains information about incidents from 2018 to the present; the dataset is updated every day.
The goal is to retrieve only yesterday's data; however, calling the provided API endpoint https://data.sfgov.org/resource/wg3w-h783.csv as is loads 5000 random rows from the dataset. Fortunately, the API offers different ways to query the dataset using filters and SoQL queries, so it is possible to get only yesterday's data. The data is then inserted into a staging table named crimes.
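Here is a minimal sketch of such a "yesterday only" fetch, assuming a SoQL $where filter on the incident_date column and a raised $limit; the filter column and the staging insert via a SQLAlchemy engine are assumptions for illustration.

```python
import io
from datetime import date, timedelta

import pandas as pd
import requests

API_URL = "https://data.sfgov.org/resource/wg3w-h783.csv"

yesterday = date.today() - timedelta(days=1)
params = {
    "$where": f"incident_date = '{yesterday.isoformat()}'",  # assumed filter column
    "$limit": 50000,  # well above the default row cap, so a full day is never truncated
}

resp = requests.get(API_URL, params=params, timeout=60)
resp.raise_for_status()
df = pd.read_csv(io.StringIO(resp.text))

# Load into the staging table "crimes"; the engine is assumed to be defined elsewhere.
# df.to_sql("crimes", con=engine, if_exists="append", index=False)
```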
The task code can be found in the get data from api file.
The scripts for creating the staging, dimension and fact tables can be found in the db_data folder.
In the dimension loading step, the dimension tables are populated from the data present in the staging table.
Only yesterday's date is loaded into the time dimension table.
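A minimal sketch of building yesterday's row for the time dimension follows; the column names (full_date, year, month, day, day_of_week) are assumptions about the dim_time schema.

```python
from datetime import date, timedelta

import pandas as pd

yesterday = date.today() - timedelta(days=1)

dim_time_row = pd.DataFrame([{
    "full_date": yesterday,
    "year": yesterday.year,
    "month": yesterday.month,
    "day": yesterday.day,
    "day_of_week": yesterday.strftime("%A"),   # e.g. "Monday"
}])

# dim_time_row.to_sql("dim_time", con=engine, if_exists="append", index=False)  # engine assumed
```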
The logic followed for the other dimensions (a short sketch follows the list) is:
- Extract the distinct values related to the dimension from the staging table, load them into a dataframe df1 and apply some transformations.
- Extract the data already present in the dimension table, put it in df2 and apply some transformations.
- Keep only the values that are in df1 and not in df2 and put them in df3.
- Append df3 to the dimension table.
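Here is a minimal sketch of those four steps for one dimension (category), assuming pandas and a SQLAlchemy engine; the table, column and connection names are illustrative, not the exact repo code.

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://airflow:***@localhost/datamart")  # placeholder DSN

# 1. distinct values from the staging table, with light transformations (trim, drop nulls)
df1 = pd.read_sql(
    "SELECT DISTINCT incident_category, incident_subcategory FROM crimes", engine
).dropna(subset=["incident_category"])
df1["incident_category"] = df1["incident_category"].str.strip()

# 2. values already present in the dimension table
df2 = pd.read_sql("SELECT category, subcategory FROM dim_category", engine)
df2.columns = ["incident_category", "incident_subcategory"]

# 3. keep only the rows of df1 that are not yet in df2 (anti-join)
df3 = df1.merge(df2, how="left", indicator=True,
                on=["incident_category", "incident_subcategory"])
df3 = df3[df3["_merge"] == "left_only"].drop(columns="_merge")

# 4. append the new rows to the dimension table
df3.columns = ["category", "subcategory"]
df3.to_sql("dim_category", engine, if_exists="append", index=False)
```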
The tasks codes are present in the functions folder.
After loading the dimension tables of the datamart, the fact table is loaded by applying joins between the dimension tables and the staging table.
The logic followed is (a sketch follows the list):
- Extract data from the staging table ST and apply some transformations.
- Extract the dimension tables DTs and apply some transformations.
- Perform joins between ST and DTs.
- Append new data to the fact table.
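A minimal sketch of the fact-table load, reusing the illustrative table names from the earlier sketches: the joins map the natural keys in the staging table back to the dimension surrogate keys, and the role-playing time dimension is joined twice, once per role.

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://airflow:***@localhost/datamart")  # placeholder DSN

# staging rows, one per incident (categories are handled separately by the bridge table)
st = pd.read_sql("SELECT * FROM crimes", engine)
st["incident_date"] = pd.to_datetime(st["incident_datetime"]).dt.date
st["report_date"] = pd.to_datetime(st["report_datetime"]).dt.date
st = st.drop_duplicates(subset="incident_id")

# dimension tables with their surrogate keys
dim_time = pd.read_sql("SELECT time_id, full_date FROM dim_time", engine)
dim_district = pd.read_sql("SELECT district_id, police_district FROM dim_district", engine)

fact = (
    st.merge(dim_time.rename(columns={"time_id": "incident_time_id", "full_date": "incident_date"}),
             on="incident_date")
      .merge(dim_time.rename(columns={"time_id": "report_time_id", "full_date": "report_date"}),
             on="report_date")
      .merge(dim_district, on="police_district")
)

fact = fact[["incident_id", "incident_time_id", "report_time_id", "district_id", "resolution"]]
fact.to_sql("fact_incident", engine, if_exists="append", index=False)
```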
This task code can be found in the load fact file.
The logic followed for loading the incident_category table is (a sketch follows the list):
- Extract distinct value pairs of incident_id and category_id and put them in df1.
- Extract the data present in the existing incident_category table and put it in df2.
- Retrieve data that is present in df1 and not present in df2 and put it in df3.
- Append df3 to the incident_category table in the datamart.
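This is the same anti-join pattern as the dimension loads; a minimal sketch for the bridge table, again with illustrative table and column names, could look like this:

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://airflow:***@localhost/datamart")  # placeholder DSN

# 1. distinct (incident_id, category_id) pairs from the staging data joined to dim_category
pairs_sql = """
    SELECT DISTINCT c.incident_id, d.category_id
    FROM crimes c
    JOIN dim_category d
      ON d.category = c.incident_category
     AND d.subcategory = c.incident_subcategory
"""
df1 = pd.read_sql(pairs_sql, engine)

# 2. pairs already present in the bridge table
df2 = pd.read_sql("SELECT incident_id, category_id FROM incident_category", engine)

# 3. keep only the new pairs (anti-join), then 4. append them to the bridge table
df3 = df1.merge(df2, how="left", indicator=True, on=["incident_id", "category_id"])
df3 = df3[df3["_merge"] == "left_only"].drop(columns="_merge")
df3.to_sql("incident_category", engine, if_exists="append", index=False)
```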
The dashboard shows the number of incidents by date and time; it is possible to navigate through the hierarchy and to filter by year, month, date or day of week. It also shows the incident rate grouped by category and subcategory, and the number of incidents by resolution status both in a pie chart and on a map.
One thing I wanted to improve is the resolution status of the incident. After reading the API docs, I'm still not sure exactly what happens to the dataset when an incident gets resolved. After loading the whole dataset and querying it to find any incident whose resolution had changed, the result was an empty set.
Besides, the approach followed when normalizing incident and category data means that only one row per incident needs to be updated in the fact table. In addition, adding the date and time of the resolution to the dataset would be a welcome addition; this way, we could apply the slowly changing dimension (SCD) concept in the fact table and compute the time taken to resolve each incident.
Any advice, remark or criticism is welcome.