Skip to content

vinicius-schulz/data-engineering-trabalho-final

Repository files navigation

Disciplina: Data Engineering (Big Data & Analitycs)

Professor:

  • Antonio Claudio Lopes

Integrantes:

  • Dabla Arévalo Ferreira
  • Gabrielle Brito Cadurim
  • Larissa Alves da Silva
  • Mateus Soares da Silva
  • Vinicius Miranda Lopes Schulz

Avaliação final - Trabalho em grupo

Decrição

Utilizando os arquivos de dados em anexo:

Construa uma solução de big data com os seguintes requisitos:

  1. Gráficos:
  • Total de acidentes com vítima por bairro em acidentes com embriaguez;
  • Total de acidentes por tipo de pavimento e condição do tempo;
  • Total de pessoas acidentadas por tipo de veiculo e tipo de pavimentação;
  • Média de idade dos condutores por tipo de veículo e tipo de acidente;
  • Média de idade dos condutores por indicativo de embriaguez;
  1. Ser implementada utilizando os recursos apresentados na disciplina;

A entrega deve conter:

  • Os códigos fonte;
  • Documento pdf com os gráficos.

Provisionando projeto vagrant para construção da máquina virtual com o Hadoop

Pré-requisitos:

Passos para iniciar a máquina:

  1. Abra o prompt de comando (se o SO do host for windows, abra como administrador) e digite:

git clone https://github.com/vinicius-schulz/data-engineering-trabalho-final.git

  1. Navegue até o diretório do projeto pelo usando o terminal de sua preferência

  2. Digite o comando para provisionar a VM usando o vagrant:

vagrant up --provider=virtualbox

ou (para máquina linux usando docker)

vagrant up --provider=docker

  1. Aguarde até o final do processo de provisionamento (deve demorar mais de 1h, dependendo da sua conexão com a internet)

  2. Editar o arquivo hosts (Windows - C:\Windows\System32\drivers\etc\hosts) ou (Linux - /etc/hosts) adicionando a linha abaixo

10.211.55.101 node1

  1. Para verificar se tudo está funcionando corretamente, utilize os links abaixo:

Criando estrutura de diretórios na VM e HDFS e cópia de arquivos

Faça o upload dos arquivos si_env-2019.csv, si-bol-2019.csv, si-log-2019.csv e sparks.py para a VM

vagrant upload "si_env-2019.csv" /home/vagrant/si_env.csv

vagrant upload "si-bol-2019.csv" /home/vagrant/si_bol.csv

vagrant upload "si-log-2019.csv" /home/vagrant/si_log.csv

vagrant upload "si_env-2019.csv" /home/vagrant/sparks.py

Conecte-se à VM usando o comando abaixo

vagrant ssh

Crie o diretorio output na pasta /home/vagrant/ da VM

mkdir /home/vagrant/output

Crie a estrutura de pastas no HDFS

hdfs dfs -mkdir /user/vagrant/

hdfs dfs -mkdir /user/vagrant/env

hdfs dfs -mkdir /user/vagrant/bol

hdfs dfs -mkdir /user/vagrant/log

hdfs dfs -mkdir /user/vagrant/output

Envie os arquivos da VM para o HDFS

hdfs dfs -put /home/vagrant/si_env.csv /user/vagrant/env/

hdfs dfs -put /home/vagrant/si_bol.csv /user/vagrant/bol/

hdfs dfs -put /home/vagrant/si_log.csv /user/vagrant/log/

Instalar Python3.7 e Libs Pandas e Matplotlib

Instale o pacote software-properties-common para usar o comando add-apt-repository

sudo apt-get install software-properties-common

Adicione o repositório com o Python3.7

sudo add-apt-repository ppa:deadsnakes/ppa

Atualize os pacotes

sudo apt-get update

Instale o Python3.7 (necessário instalar o Python3.6 antes)

sudo apt-get install python3.6

sudo apt-get install python3.7

Instale o PIP

sudo apt install python3-pip

Rode os comandos abaixo setar a prioridades de uso do Python

sudo update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.6 1

sudo update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.7 2

Instale o editor nano

sudo apt-get install nano

Inicializar o python3.7 como padrão na inicialização do sistema

nano ~/.bashrc

Adicione as linhas abaixo ao final do arquivo aberto no nano para trocar a versão padrão do python do spark para o python3. Salve o arquivo após a edição.

alias python=python3
export PYSPARK_PYTHON=/usr/bin/python3
export PYSPARK_DRIVER_PYTHON=/usr/bin/python3

Escreva o comando abaixo para recarregar o .bashrc

source ~/.bashrc

Instale as libs pandas e matplotlib

pip3 install pandas

pip3 install matplotlib

Inicializando o HIVE e criando tabelas

A partir da linha de comando da VM, inicie o HIVE com o comando abaixo

hive

Crie um database

CREATE DATABASE gpdb;

Conecte-se ao database criado

USE gpdb;

Crie as EXTERNAL TABLE

CREATE EXTERNAL TABLE IF NOT EXISTS si_env(`num_boletim` STRING, `data_hora_boletim` STRING, `Nº_envolvido` STRING, `condutor` STRING, `cod_severidade` STRING, `desc_severidade` STRING, `sexo` STRING, `cinto_seguranca` STRING, `Embreagues` STRING, `Idade` STRING, `nascimento` STRING, `categoria_habilitacao` STRING, `descricao_habilitacao` STRING, `declaracao_obito` STRING, `cod_severidade_antiga` STRING, `especie_veiculo` STRING, `pedestre` STRING, `passageiro` STRING)
COMMENT 'TABELA SI_ENV'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ';'
STORED AS TEXTFILE
LOCATION '/user/vagrant/env/';

CREATE EXTERNAL TABLE IF NOT EXISTS si_bol(`NUMERO_BOLETIM` STRING, `DATA HORA_BOLETIM` STRING, `DATA_INCLUSAO` STRING, `TIPO_ACIDENTE` STRING, `DESC_TIPO_ACIDENTE` STRING, `COD_TEMPO` STRING, `DESC_TEMPO` STRING, `COD_PAVIMENTO` STRING, `PAVIMENTO` STRING, `COD_REGIONAL` STRING, `DESC_REGIONAL` STRING, `ORIGEM_BOLETIM` STRING, `LOCAL_SINALIZADO` STRING, `VELOCIDADE_PERMITIDA` STRING, `COORDENADA_X` STRING, `COORDENADA_Y` STRING, `HORA_INFORMADA` STRING, `INDICADOR_FATALIDADE` STRING, `VALOR_UPS` STRING, `DESCRIÇÃO_UPS` STRING, `DATA_ALTERACAO_SMSA` STRING, `VALOR_UPS_ANTIGA` STRING, `DESCRIÇÃO_UPS_ANTIGA` STRING)
COMMENT 'TABELA SI_BOL'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ';'
STORED AS TEXTFILE
LOCATION '/user/vagrant/bol/';

CREATE EXTERNAL TABLE IF NOT EXISTS si_log(`Nº_boletim` STRING, `data_boletim` STRING, `Nº_municipio` STRING, `nome_municipio` STRING, `seq_logradouros` STRING, `Nº_logradouro` STRING, `tipo_logradouro` STRING, `nome_logradouro` STRING, `tipo_logradouro_anterior` STRING, `nome_logradouro_anterior` STRING, `Nº_bairro` STRING, `nome_bairro` STRING, `tipo_bairro` STRING, `descricao_tipo_bairro` STRING, `Nº_imovel` STRING, `Nº_imovel_proximo` STRING)
COMMENT 'TABELA SI_LOG'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ';'
STORED AS TEXTFILE
LOCATION '/user/vagrant/log/';

Geração dos resultados usando pyspark

Total de acidentes com vítima por bairro em acidentes com embriaguez;

Execute o comando pyspark

pyspark

Execute o código python abaixo dentro na linha de comando do pyspark

Código fonte da solução

from pyspark.sql import SparkSession
from pyspark.sql import Row
import matplotlib.pyplot as plt
spark = SparkSession.builder.appName("Total de acidentes com vitima por bairro em acidentes com embriaguez").enableHiveSupport().getOrCreate()
spark.sql("use gpdb")
df = spark.sql("SELECT trim(log.`nome_bairro`) as nome_bairro, COUNT(log.`nome_bairro`) as Quantidade FROM si_log log JOIN si_bol bol ON bol.`NUMERO_BOLETIM` = log.`Nº_boletim` WHERE EXISTS (SELECT * FROM si_env env where env.`Embreagues` = 'SIM' AND bol.`NUMERO_BOLETIM` = env.`num_boletim`) AND  bol.`DESC_TIPO_ACIDENTE` NOT LIKE '%SEM VITIMA%' GROUP BY log.`Nº_bairro`, log.`nome_bairro` ORDER BY log.`nome_bairro`")
dfpandas=df.toPandas()

size = 10
df_dict = {n: dfpandas.iloc[n:n+size, :] for n in range(0, len(dfpandas), size)}

for key in df_dict:
    plt.clf()
    plt.close()
    df_dict[key].plot.barh(y='Quantidade', x='nome_bairro', rot=75, figsize=(12, 12), fontsize=12, title='Número de Acidentes com Vítimas por Bairro', xlabel='Bairros')
    plt.savefig('output/output'+str(key)+'.png')

Total de acidentes por tipo de pavimento e condição do tempo;

Código fonte da solução

from pyspark.sql import SparkSession
from pyspark.sql import Row
import matplotlib.pyplot as plt
spark = SparkSession.builder.appName("Total de acidentes por tipo de pavimento e condição do tempo;").enableHiveSupport().getOrCreate()
spark.sql("use gpdb")
df = spark.sql(" SELECT CONCAT_WS(\" e \", trim(bol.DESC_TIPO_ACIDENTE), trim(bol.DESC_TEMPO)) as description, COUNT(bol.`PAVIMENTO`) as Pavimento FROM  si_bol bol GROUP BY bol.`DESC_TIPO_ACIDENTE`, bol.`DESC_TEMPO`")
dfpandas=df.toPandas()
plt.clf()
plt.close()
dfpandas.plot.barh(x="description", figsize=(30, 30), fontsize=12, title='Total de acidentes por tipo de pavimento e condição do tempo')
plt.savefig('output/output.png')

Total de pessoas acidentadas por tipo de veiculo e tipo de pavimentação;

Código fonte da solução

from pyspark.sql import SparkSession
from pyspark.sql import Row
import matplotlib.pyplot as plt
spark = SparkSession.builder.appName("Total de pessoas acidentadas por tipo de veiculo e tipo de pavimentação").enableHiveSupport().getOrCreate()
spark.sql("use gpdb")
df = spark.sql("SELECT CONCAT_WS(\" e \", trim(env.especie_veiculo), trim(bol.PAVIMENTO)) as description, count(*) as amount FROM si_log log JOIN si_bol bol ON bol.`NUMERO_BOLETIM` = log.`Nº_boletim` JOIN si_env env ON env.`num_boletim` = log.`Nº_boletim` GROUP BY env.especie_veiculo, bol.PAVIMENTO")
dfpandas=df.toPandas()

plt.clf()
plt.close()
dfpandas.plot.barh(x="description", figsize=(30, 30), fontsize=12, title='Total de pessoas acidentadas por tipo de veiculo e tipo de pavimentação')
plt.savefig('output/output.png')

Média de idade dos condutores por tipo de veículo e tipo de acidente;

Código fonte da solução

from pyspark.sql import SparkSession
from pyspark.sql import Row
import matplotlib.pyplot as plt
spark = SparkSession.builder.appName("Média de idade dos condutores por tipo de veículo e tipo de
acidente").enableHiveSupport().getOrCreate()
spark.sql("use gpdb")
df = spark.sql("SELECT CONCAT_WS(\" e \", trim(env.especie_veiculo), trim(bol.DESC_TIPO_ACIDENTE)) as
Tipo_Acidente, AVG(env.`Idade`)AS Media_Idade FROM si_log log JOIN si_bol bol ON
bol.`NUMERO_BOLETIM` = log.`Nº_boletim` JOIN si_env env ON env.`num_boletim` = log.`Nº_boletim` GROUP
BY env.especie_veiculo, bol.DESC_TIPO_ACIDENTE")
dfpandas=df.toPandas()
size = 10
df_dict = {n: dfpandas.iloc[n:n+size, :] for n in range(0, len(dfpandas), size)}
for dabla in df_dict:
    plt.clf()
    plt.close()
    df_dict[dabla].plot.barh(y='Media_Idade', x='Tipo_Acidente',rot=75, figsize=(12, 12), fontsize=12, title='Média de idade dos condutores por tipo de veículo e tipo de acidente', xlabel='Média')
    plt.savefig('output/'+str(dabla)+'.png')

Média de idade dos condutores por indicativo de embriaguez;

Código fonte da solução

from pyspark.sql import SparkSession
from pyspark.sql import Row
import matplotlib.pyplot as plt
spark = SparkSession.builder.appName("Média de Idade dos condutores por indicativo de Embreagues").enableHiveSupport().getOrCreate()
spark.sql("use gpdb")
df = spark.sql("SELECT CONCAT_WS(\" e \", trim(env.idade), trim(env.`Embreagues`)) as Embreagues, COUNT(env.`Idade`) as Idade FROM si_env env GROUP BY env.`Idade`, env.`Embreagues`")
dfpandas=df.toPandas()
size = 10
df_dict = {n: dfpandas.iloc[n:n+size, :] for n in range(0, len(dfpandas), size)}

for key in df_dict:
    plt.clf()
    plt.close()
    df_dict[key].plot.barh(y='Quantidade', x='nome_bairro', rot=75, figsize=(12, 12), fontsize=12, title='Número de Acidentes com Vítimas por Bairro', xlabel='Bairros')
    plt.savefig('output/output'+str(key)+'.png')

Também é possível executar uma aplicação escrita em um arquivo usando o comando spark-submit

spark-submit /home/vagrant/sparks.py

Após a execução e geração dos arquivos de gráfico saida do console do pyspark apertando Ctrl+D

Enviar arquivos gerados para o HDFS

  1. Execute o comando abaixo para copiar as imagens geradas para o HDFS

hdfs dfs -put -f /home/vagrant/output/ /user/vagrant/output/

  1. Os arquivos de imagem serão disponibilizados no diretorio '/user/vagrant/output/'. Será possível acessar o mesmo por meio do link

http://node1:50070/explorer.html#/user/vagrant/output

Parar a máquina virtual

Pelo linha de comando digite e aguarde o encerramento da VM

vagrant halt

Créditos

Antonio Claudio Lopes

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors 4

  •  
  •  
  •  
  •