O material abaixo foi inspirado na guia de objetivos fornecido pela Hortonworks em seu site.
- Ingestão de Dados
- Importação de tabela de um RDBMS para o HDFS usando o Sqoop
- Importe os resultados de uma query a um banco de dados para o HDFS
- Importe uma tabela de um banco de dados relacional para uma tabela do Hive
- Insira ou atualize dados do HDFS para um tabela de uma banco de dados relacional
- Inicie um agente do Flume a partir de um arquivo de configuração
- Configure um
channel
de memória com um tamanho específico
- Transformação de Dados
- Escreva execute um script do Pig
- Carregue dados para uma relação do Pig sem definir um esquema
- Carregue dados para uma relação do Pig definindo um esquema
- Carregue dados de uma tabela do Hive para uma relação do Pig
- Use o Pig para transformar dados para um formato específico
- Transforme os dados para um esquema pŕe-definido do Hive
- Agrupe os dados em uma ou mais relações do Pig
- Use o Pig para remover valores ausentes em uma relação
- Armazene os dados de uma relação no Pig em uma pasta no HDFS
- Armazene os dados de uma relação no Pig em uma tabela do Hive
- Ordene a saída de uma relação do Pig
- Remove as tuplas duplicadas de uma relação do Pig
- Especifique o número de reducers a serem usados no Pig
- Junte dois datasets usando o Pig
- Perform a replicated join using Pig
- Run a Pig job using Tez
- Within a Pig script, register a JAR file of User Defined Functions
- Within a Pig script, define an alias for a User Defined Function
- Within a Pig script, invoke a User Defined Function
- Análise de Dados
- Write and execute a Hive query
- Define a Hive-managed table
- Define a Hive external table
- Define a partitioned Hive table
- Define a bucketed Hive table
- Define a Hive table from a select query
- Define a Hive table that uses the ORCFile format
- Create a new ORCFile table from the data in an existing non-ORCFile Hive table
- Specify the storage format of a Hive table
- Specify the delimiter of a Hive table
- Load data into a Hive table from a local directory
- Load data into a Hive table from an HDFS directory
- Load data into a Hive table as the result of a query
- Load a compressed data file into a Hive table
- Update a row in a Hive table
- Delete a row from a Hive table
- Insert a new row into a Hive table
- Join two Hive tables
- Run a Hive query using Tez
- Run a Hive query using vectorization
- Output the execution plan for a Hive query
- Use a subquery within a Hive query
- Output data from a Hive query that is totally ordered across multiple reducers
- Set a Hadoop or Hive configuration property from within a Hive query
- Output the execution plan for a Hive query
- Use a subquery within a Hive query
- Output data from a Hive query that is totally ordered across multiple reducers
- Set a Hadoop or Hive configuration property from within a Hive query
Para isso, nós devemos usar o sqoop import
e sempre devemos
usar o argumento --connect
que vai permitir que o sqoop
se conecte ao banco
de dados. Por exemplo:
$ sqoop import \
--connect jdbc:mysql://database.example.com:3306/employees
No exemplo acima, o sqoop
se conectará a uma base de dados MySQL nomeada
employees
na máquina host
database.example.com
.
Podemos listar o nome de todas as base de dados disponíveis com sqoop list-databases
e as tabelas na base de dados usando o sqoop list-tables
.
Na importação, o nome da tabela deve ser informada usando o argumento
--table
. Caso deseje, é possível importar todas as tabelas do banco de dados
usando sqoop import--all-tables
.
É provável que seja necessário fornecer um nome de usuário (--username
) e
senha para o sqoop
. Para o nome de usuários, nós usamos --username
e para a
senha, nós usamos diretamente via --password
ou usando um arquivo com o
--password-file
(mais seguro).
O sqoop
nos permite salvar a tabela no HDFS nos formatos
- Texto (padrão):
--as-textfile
- Avro:
--as-avrodatafile
- SequenceFiles:
--as-sequencefile
- Parquet:
--as-parquetfile
Vamos ver um exemplo de como importar uma tabela chamada vendas
de um banco
de dados MySQL chamado ecommerce.db
para dentro do HDFS not formato texto:
$ sqoop import \
--connect jdbc:mysql://database.example.com:3306/ecommerce.db \
--username fulano \
--password 123456 \
--table vendas \
--as-textfile
Note que o último argumento, --as-textfile
, não é obrigatório já que o
sqoop
importa para formato texto por padrão. Uma pasta chamada vendas
será
criada na pasta do usuário no HDFS.
Para maiores informações, recomendo a leitura da documentação.
Podemos realizar uma query em um banco de dados relacional e importar o resultado direto para o HDFS. Para isso, nós precisaremos usar dois argumentos:
--query
, em que passamos aquery
para osqoop
; e--target-dir
, em que passamos o diretório no HDFS em que os dados serão gravados.
Por exemplo:
$ sqoop import \
--connect jdbc:mysql://database.example.com:3306/ecommerce.db \
--username fulano \
--password 123456 \
--query 'SELECT vendas.*, fornecedores.* FROM vendas JOIN fornecedores on (vendas.forn_id == fornecedores.id) WHERE vendas.preco > 5000' \
--target-dir /user/fulano/meus-resultados
A documentação mostra alguns argumentos extras para casos especiais.
Para importar uma tabela direto para o Hive, nós podemos usar os seguintes argumentos:
--hive-import
: este parâmetro é obrigatório.--create-hive-table
: opcional. Cria a tabela caso esta não exista. Se existir, retorna um erro.--hive-overwrite
: opcional. Sobrescreve uma tabela caso esta já exista.--hive-table <db_name>.<table_name>
: opcional. Informa em qual banco de dados e em qual tabela salvar.
Vamos ver um exemplo:
$ sqoop import \
--connect jdbc:mysql://database.example.com:3306/ecommerce.db \
--username fulano \
--password 123456 \
--table vendas \
--hive-import \
--create-hive-table \
--hive-table ecommerce.vendas
Para maiores detalhes, recomendo a documentação e este tutorial.
Além de inserir tabelas de um banco de dados relacional para o HDFS, o sqoop
também nos permite fazer o processo contrário. Vamos ver agora como exportar
uma tabela do HDFS para um banco de dados.
Ao invés do import
nós devemos usar o export
, e alguns dos argumentos são
os mesmos, tais quais --connect <jdbc-uri>
, --username
e --password
.
Vamos ver um exemplo:
$ sqoop export \
--connect jdbc:mysql://database.example.com:3306/ecommerce.db \
--username fulano \
--password 123456 \
--table vendas \
--export-dir /user/fulano/vendas
O comando acima vai exportar uma tabela localizada no diretório
/user/fulano/vendas
no HDFS para uma banco de dados relacional MySQL chamado
ecommerce.db
.
O comando export
aceita outros argumentos e recomendo a
documentação para uma melhor explicação.
Nós iniciamos um agente do Flume através do terminal. Para isso
$ flume-ng agent --name nome_do_agente --conf /etc/flume/conf --conf-file flume.conf
O comando agent
inicia o agente coletor do Flume. Vamos ver agora os
argumentos passados para o Flume:
--name
(-n
): nome do agente (obrigatório):--conf
(-c
): aponta o diretório onde Flume encontrará seus principais configurações.conf-file
(-f
): caminho para o arquivo de configuração do Flume onde o agente está definido.
Para maiores detalhes, recomendo a documentação oficial e este guia da Hortonworks que mostra como começar o agente no HDP.
Um channel
é um repositório onde os eventos capturados pelo course
são
armazenados até que o sink
os removam. Um channel
de removam é o tipo mais
básico de channel
e, abaixo, mostramos uma configuração típica, considerando
que um source
e um sink
já foram configurados.
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.byteCapacityBufferPercentage = 20
Vamos ver esta configuração linha a linha:
- Um
channel
de nomea1
é definido para o agentea1
. - O tipo é definido,
memory
. - Definimos a capacidade do
channel
que, neste exemplo, é de 10000 eventos. - Nesta linha, definimos a capacidade máxima de eventos que o
channel
vai passar dosource
para osink
. Neste caso, são 10000 eventos por transação. - Quantidade total de memória, em bytes, que é a soma de todos os eventos
armazenados. Vale notar que, da forma que o
channel
de memória foi implementado, obyteCapacity
corresponde apenas aos dados dobody
do evento. - Nesta última linha, definimos a a quantidade de memória para o cabeçalho dos
eventos. O valor é a porcentagem do buffer entre o
byteCapacity
e a soma estimada de todos os eventos que, neste caso, foi definida como 20%.
O exemplo acima foi retirado da documentação que também
exemplifica os outros tipos de channels
.
O Pig é uma ferramenta de alto nível para processamento de dados. Ele nos permite processar dados de forma mais prática do que se fôssemos usar diretamente o MapReduce.
Nós podemos processar dados através um um ambiente REPL (Read-Eval-Print-Loop) chamado Grunt ou através de execução de scripts criados no nosso editor de preferência. Vamos ver aqui um exemplo simples de script e como executá-lo, mas saiba que é possível rodar o script linha a linha no ambiente REPL.
Vamos ver um script simples obtido da documentação:
/* meu_script.pig
Meu script é simples.
É composto apenas de três declarações
*/
A = LOAD 'vendas' USING PigStorage() AS (item:chararray, preco:float, qtde:int); -- carregando os dados
B = FOREACH A GENERATE item; -- transformando os dados
DUMP B; -- obtendo os resultados
As quatro primeiras linhas são um comentário. Na sexta linha, nós carregamos
uma tabela chamada vendas
em que três colunas são definidas: item
definida
como uma string, preco
definida como um número do tipo float e qtde
definida como um número inteiro. Na sétima linha, executamos um loop na
tabela carregada em A
e executamos uma operação que, neste caso, é só
retornar o valor de item
. Na última linha, nós retornamos o resultado da
relação B
.
É importante salientar, que no Pig, A
e B
não são chamados de variáveis,
mas de relações.
Para executar este script, nós devemos executar o seguinte commando:
$ pig meu_script.pig
Nós utilizamos a declaração LOAD
para carregar um conjunto de dados para uma
relação do Pig. A forma mais básica de carregar um conjunto de dados é:
A = LOAD 'vendas';
Em que carregamos uma tabela chamada vendas
usando o método padrão,
PigStorage
.
É importante saber que o Pig considera tabulação como delimitador padrão. Para
definir um delimitador explictamente precisamos usar USING PigStorage('delimitador')
. Por exemplo, para carregar um arquivo CSV, nós
temos que fazer o seguinte:
A = LOAD 'vendas' USING PigStorage(',');
A documentação possui mais detalhes.
Agora, veremos como definir um esquema ao carregar uma tabela de dados no Pig.
A = LOAD 'vendas' AS (item:chararray, preco:float, qtde:int);
Nós carregamos a mesma tabela, só que desta vez nós definimos o nome de cada coluna e o tipo de dado que cada coluna armazena. Não é necessário definir o tipo da variável e, caso não façamos, será carregada como texto.
O Pig nos permite obter dados de outras fontes como de uma tabela armazenada no
Hive. Para isso, nós vamos precisar o argumento USING
junto com o LOAD
:
A = LOAD 'vendas' USING org.apache.hive.hcatalog.pig.HCatLoader();
Veja que utilizamos o HCatLoader
ao invés do PigStorage
. Para maiores detalhes, veja a documentação.
Após carregar os dados usando LOAD
, nós vamos quere executar alguma operação
neles e para isso podemos usar o comando FOREACH
que executa um loop linha
a linha. Por exemplo:
A = LOAD 'vendas';
B = FOREACH A GENERATE $0;
Nós carregamos a tabela acima na primeira linha e criamos uma relação B
que
contém apenas a informação da primeira coluna.
Um exemplo mais interessante é calcular o valor da receita de cada venda:
A = LOAD 'vendas' AS (item:chararray, preco:float, qtde:int);
B = FOREACH A GENERATE preco * qtde;
Veja a documentação do FOREACH
para maiores detalhes.
Neste tópico, o objetivo é transformar uma tabela carregada com o Pig para um
esquema já pré-definido do Hive. Para isso, basta usarmos o FOREACH
para
colocar as colunas da tabela na mesma ordem da tabela do Hive. Por exemplo:
A = LOAD 'vendas' AS (item:chararray, preco:float, qtde:int);
B = FOREACH A GENERATE qtde, preco, item;
Em que o esquema do Hive teria o esquema qtde
, preco
e item
.
Em processos de análise de dados, o argupamento de dados é uma operação básica,
essencial e corriqueira. No Pig, nós utilizamos o GROUP BY
:
A = LOAD 'vendas' AS (item:chararray, preco:float, qtde:int);
B = GROUP A BY item;
Também é possível agrupar por mais de um campo:
A = LOAD 'vendas' AS (item:chararray, preco:float, qtde:int);
B = GROUP A BY (item, preco);
Caso tenhamos duas relações, podemos fazer um COGROUP
:
A = LOAD 'vendas' AS (item:chararray, preco:float, qtde:int);
B = LOAD 'estoque' AS (item_nome:chararray, qtde_estoque:int);
C = COGROUP A BY item, B BY item_nome;
A documentação possui mais detalhes e exemplos.
É comum termos valores ausentes em nosso dados e, em muita das vezes,
precisamos excluir estas entradas. Para fazer isto com o Pig, nós usamos o
comando FILTER
:
A = LOAD 'vendas' AS (item:chararray, preco:float, qtde:int);
B = FILTER A BY item != '';
A documentação do FILTER
possui mais detalhes.
Após realizarmos nossas operações nos dados, é bem provável que iremos querer
salvar o conjunto de dados trasnformado para futuras consultas. Para isso, nós
utilizamos o método STORE
. No exemplo abaixo, exemplifico como armazenar uma
relação A
:
STORE A INTO 'meus_dados' USING PigStorage(',');
Estamos salvando uma relação A
em uma pasta chamada meus_dados
. Esta pasta
contém arquivos típicos de uma saída de um processo MapReduce: um arquivo
chamado _SUCCESS
apontando o sucesso da operação, e um ou mais arquivos com
os dados.
Aqui, nós estamos forçando que os elementos de cada linha sejam separadas por
vírgula (USING PigStorage(',')
). Se nós não usarmos este argumento, o Pig
salvará usando tabulação que é seu delimitador padrão.
No nosso exemplo acima, é possível que o Pig salve os dados na máquina local;
vai depender de como o pig foi iniciado (e.g., pig -x local
). para nos
certificarmos que a relação A
será salva no HDFS, nós temos que passar o
endereço correto do HDFS.
STORE A INTO 'hdfs://sandbox-hdp.hortonworks.com:8020/user/root/meus_dados';
A documentação do STORE
possui mais exemplos.
Além de salvar o arquivo em um diretório do HDFS, nós também podemos salvar diretamente para um banco de dados do Hive. Para isso, nós precisaremos primeiro criar a tabela no Hive. Você pode fazer isso na interface gráfica do Ambari ou na interface do Hive no terminal:
hive> CREATE TABLE vendas (
> item string,
> preco float,
> qtde int
> );
Com isso, teremos uma tabela criada no Hive. Por padrão, o Hive criara no banco
de dados default
. Agora, para podermos exportar o dado para o Hive, nós
precisaremos usar a flag -useHCatalog
, tanto se estivermos rodando um
script quanto para abrir o grunt:
$ pig -useHCatalog
...
grunt> A = LOAD 'vendas' AS (item:chararray, preco:float, qtde:int);
grunt> STORE A INTO 'nome_da_db.vendas' using org.apache.hive.hcatalog.pig.HCatStorer();
Outro requisito é que os dados tenham um esquema definido e que seja o mesmo da tabela criada no Hive.
Para maioes detalhes, veja a documentação.
Para ordernar uma tabela nós usmaos o ORDER BY
:
A = LOAD 'vendas' AS (item:chararray, preco:float, qtde:int);
B = ORDER A BY preco;
C = ORDER A BY qtde DESC;
Por padrão, o Pig ordena de forma crescente, que é eemplificado na segunda
linha do código acima (relação B
). Para ordernar de forma descendente, nós
usamos o argumento DESC
, exemplificado na terciera linha na relação C
).
A documentação traz mais exemplos.
Um outro processo de manuseio de dados muito importante é a remoção de
duplicatas, e para isso, nós utilizamos o DISTINCT
.
B = DISTINCT A
Este comando vai retornar apenas as tuplas distintas. Vale notar aqui que cada linha da tabela vai ser tratada como se fosse uma tupla, assim o `DISTINCT só verifica por linhas distintas. Em caso de duplictas, o Pig retorna apenas uma tupla e descarta as outras.
Veja a documentação para maiores detalhes.
O Pig nos permite definir manualmente o número de * reducers* a serem usados em suas tarefas MapReduce. Veja que só podemos faer isto para os reducers ; a quantidade de paraleleismo para os maps é definido pela arquivo de entrada (um map para cada bloco no HDFS).
No exemplor abaixo, mostro como executar 10 reducers num processo de agrupamento:
A = LOAD 'vendas' AS (item:chararray, preco:float, qtde:int);
B = GROUP A BY item PARALLEL 10;
Podemos também definir globalmente a quantidade de reducers em nossos scripts:
SET default_parallel 10;
A = LOAD 'vendas' AS (item:chararray, preco:float, qtde:int);
B = GROUP A BY item ;
Caos o nível de paralelismo não tenha sido defiido manualmente, o Pig decidirá por você. Para maiores detalhes, recomendo a documentação.
Outro processo bem comum é juntar dois conjuntos de dados. Por padrão, o Pig
faz um outer join e o usuário pode definir o tipo: LEFT
, RIGHT
ou FULL
.
Vamos ver um exemploi de um left outer join:
A = LOAD 'vendas' AS (item:chararray, preco:float, qtde:int);
B = LOAD 'estoque' AS (item_nome:chararray, qtde_estoque:int);
C = JOIN A BY item LEFT OUTER, B BY item_nome;
Para mais exemplos e detalhes, sugiro ler a documentação: