DBT + Iceberg + Athena = ❤️
Descubra como a combinação de DBT, Apache Iceberg e AWS Athena transforma a gestão de dados, proporcionando um pipeline poderoso, eficiente e escalável para transformação e modelagem de dados.
1. Introdução
Neste artigo, vamos explorar como a integração entre DBT (Data Build Tool), Apache Iceberg e AWS Athena pode melhorar nossa camada de transformação de dados em larga escala. Todo o código desse artigo estará no GitHub.
Apache Iceberg, um projeto open-source, oferece um formato de tabela avançado projetado especificamente para melhorar a eficiência e escalabilidade de operações em data lakes. O Iceberg traz soluções inovadoras para problemas tradicionais em data lakes, como o versionamento de dados, manuseio de esquemas evolutivos e, mais crucialmente, transações de dados complexas como upserts. Essas operações de upsert, que combinam atualizações e inserções, são essenciais em ambientes de dados dinâmicos, onde a mutabilidade dos dados é uma realidade. O Iceberg gerencia essas transações com um mecanismo de snapshot robusto, que mantém o histórico de dados de forma eficiente, permitindo rollbacks rápidos e auditorias detalhadas sem sacrificar o desempenho.
O DBT, uma ferramenta moderna de transformação de dados, facilita o processamento e a modelagem de dados utilizando linguagem SQL. Quando integrado com o Iceberg, o DBT permite a definição de modelos e lógicas de negócio que são aplicados diretamente sobre as estruturas de dados otimizadas pelo Iceberg, maximizando a eficiência e a governança dos dados.
Por sua vez, o AWS Athena complementa esta arquitetura ao oferecer um serviço de consulta SQL serverless, permitindo que usuários executem consultas diretamente em dados armazenados em formatos como o do Iceberg no Amazon S3. Esta capacidade de executar consultas interativas sem a necessidade de infraestrutura dedicada é fundamental para a agilidade e a eficiência operacional, especialmente em análises de big data onde a demanda por acessos rápidos e flexíveis aos dados é alta.
A combinação de DBT, Iceberg e Athena não apenas resolve desafios técnicos complexos de gerenciamento de dados, mas também promove uma abordagem escalável, eficiente e custo-efetiva para transformação e análise de dados em larga escala. Este artigo vai demonstrar como podemos implementar essa solução e explorar as vantagens que ela oferece.
2. Criando nosso pipeline
Neste artigo, nosso objetivo é disponibilizar uma tabela com o status mais atualizado dos registros. Para isso, utilizaremos tabelas Iceberg para realizar operações de upsert, que abordaremos mais adiante, visando atualizar nossos registros.
A base de dados que usaremos consiste em reclamações de clientes, onde cada reclamação possui um case_id único.
1. Landing Zone (Camada de Entrada):
- Nesta primeira camada, vamos inserir registros sem realizar qualquer tipo de deduplicação. Portanto, um registro pode aparecer mais de uma vez, já que ele pode sofrer alterações ao longo do tempo.
2. Bronze (Camada Intermediária):
- Na segunda camada, manteremos apenas a última ocorrência de cada registro, garantindo que cada case_id tenha apenas uma entrada correspondente ao seu estado mais recente.
3. Silver (Camada Final):
- Na terceira camada, disponibilizaremos uma tabela materializada no formato Iceberg contendo os registros mais atualizados.
Todas essas transformações serão orquestrada pelo DBT e usando o AWS Athena como SQL Engine.
2.1 Instalando e Inicializando nosso projeto DBT
Nessa etapa vamos instalar o DBT Core localmente na nossa máquina, que é um processo bem simples. Conforme a documentação, podemos executar os seguintes comandos:
- Primeiro, vamos criar um ambiente virtual para isolar as dependências do nosso projeto
python -m venv dbt-env # create the environment
- Agora, vamos ativar o ambiente virtual recém-criado com o comando apropriado para seu sistema operacional
source dbt-env/bin/activate # activate the environment for Mac and Linux OR
dbt-env\Scripts\activate # activate the environment for Windows
- Por fim, vamos instalar o plugin desenvolvido pela comunidade para o DBT conseguir usar o Athena como a ferramenta de SQL Engine. Na documentação, conseguimos ver como podemos configurar o DBT para conseguir usar o Athena da nossa conta AWS.
pip install dbt-athena-community
Agora, já temos a nossa ferramenta de transformação de dados devidamente instalada e configurada. Conseguimos criar o projeto o nosso projeto com o seguinte comando, o nosso projeto de chamará dbt_athena_iceberg:
dbt init dbt_athena_iceberg
Dando tudo certo, vamos ter o seguinte projeto criado:
O principal arquivo que vamos mexer é o dbt_project.yml, pois nele contém informações importantes que informam ao DBT como operar seu projeto.
Na pasta models é onde os desenvolvedores passam a maior parte do tempo. Os models no DBT são arquivos SQL que descrevem como transformar os dados brutos em informações estruturadas e úteis. Cada model é essencialmente uma instrução SQL que define uma tabela ou view transformada. Esses modelos permitem a criação de pipelines de dados que seguem uma lógica clara e bem documentada.
2.2 Criando nossa tabela
Para criar a tabela base deste artigo, usamos um script Python que gera e armazena dados de reclamações de clientes no Amazon S3 e já disponibliza no Athena. O código completo estará disponível no GitHub.
num_registros = 100
df = generate_consumer_cases(num_registros)
bucket = "<BUCKET_NAME>"
tabela = 'consumer_cases'
database = 'landing_zone'
layer = 'landing_zone'
path_s3 = f's3://{bucket}/{layer}/{database}/{tabela}'
wr.s3.to_parquet(
df = df,
path = path_s3,
dataset = True,
mode = 'append',
database = database,
table = tabela,
dtype = {'updated_at': 'timestamp', 'date_reported':'date', 'resolution_date': 'date'}
)
Primeiramente, definimos a variável num_registros, que representa a quantidade de registros de reclamações. A função generate_consumer_cases gera um DataFrame com os dados simulados de reclamações de clientes. A função não está explicitada no script, mas é assumido que cria um DataFrame com colunas relevantes como case_id, updated_at, date_reported etc.
Configuramos alguns parâmetros essenciais, como o nome do bucket S3, o nome da tabela, o banco de dados no Athena e a camada. O caminho S3 completo é então definido para organizar os dados hierarquicamente.
Usamos a função wr.s3.to_parquet
da biblioteca AWS Data Wrangler para salvar o DataFrame no S3 em formato Parquet e já disponibilizar esse DataFrame como uma tabela no Athena. Configuramos o caminho, o modo de gravação (append
), o banco de dados, o nome da tabela e os tipos de dados de algumas colunas (dtype
).
Assim, ao acessar nossa conta AWS e visitar o Athena, encontramos a tabela recém-criada disponível para consulta:
2.3 Criando nossa camada Landing Zone
Agora, vamos criar o nosso primeiro modelo no DBT, que vai ser para criar nossa camada de landing zone, então basicamente irá criar três arquivos:
- _landing_zone__sources.yml: Este arquivo é utilizado para definir as fontes de dados no DBT. Aqui, especificamos a origem dos dados que alimentam a camada de landing zone, como conexões com bases de dados específicas ou caminhos em sistemas de arquivos. Esse arquivo serve como um ponto central para gerenciar de onde os dados são extraídos, facilitando a manutenção e a escalabilidade do projeto.
_landing_zone__models.yml: Este arquivo contém a configuração dos modelos relacionados à landing zone no DBT. Aqui, definimos propriedades como materialização dos modelos (por exemplo, tabelas ou visualizações), descrições dos modelos, e configurações de testes que garantem a qualidade e a integridade dos dados processados. Ele ajuda a organizar e documentar as transformações aplicadas, tornando o processo de desenvolvimento mais claro e estruturado.
- landing_zone_consumer_cases.sql: Arquivo SQL que contém a lógica de transformação dos dados para a camada de landing zone. Neste script, são definidas a query que transforma os dados brutos em um formato mais organizado e útil para análises subsequentes. Este arquivo é fundamental para a execução das transformações de dados especificadas no modelo do DBT.
Esses arquivos são a base para construir e organizar a camada de landing zone dentro da estrutura do DBT, facilitando a gestão e escalabilidade das operações de dados.
O DBT utiliza Common Table Expressions (CTEs) para melhorar a legibilidade, manutenibilidade e reusabilidade do código SQL. CTEs ajudam a organizar consultas complexas em partes menores e mais gerenciáveis, facilitando a depuração e manutenção. Além disso, permitem um melhor desempenho em algumas situações, por otimizarem a execução das consultas, e suportam a modularidade, permitindo que partes do código sejam reutilizadas dentro de uma mesma consulta. Essa abordagem modular e clara é ideal para a construção de pipelines de transformação de dados eficientes e organizados
Quando executarmos o comando dbt run, vamos criar uma tabela as is da tabela consumer_cases, fazermos isso por boas práticas do DBT e mantermos a rastreabilidade da nossa tabela.
2.4 Criando nossa camada Bronze
Continuando com o desenvolvimento do nosso pipeline de dados, vamos construir a camada bronze. Esta camada é essencial pois nela manteremos apenas a última ocorrência de cada registro, eliminando duplicidades. Essa configuração garante que tenhamos dados atualizados e prontos para serem utilizados nas etapas subsequentes do pipeline, especialmente na atualização das tabelas das camadas superiores.
Para implementar essa camada, criaremos um novo modelo no DBT, que será composto por dois arquivos fundamentais:
- bronze__models.yml: Este arquivo configura o modelo da camada bronze no DBT. Ele define como o modelo será materializado, por padrão é view, e inclui metadados como descrições e configurações de teste, conforme a imagem a seguir.
- bronze_consumer_cases.sql: Este arquivo SQL contém a lógica de transformação específica para a camada bronze. Ele executa a filtragem e a consolidação dos dados para garantir que apenas a última ocorrência de cada registro seja mantida. Esse script é crucial para a transformação dos dados brutos em um formato otimizado para processos subsequentes.
A consulta usa a função row_number()
para numerar os registros dentro de cada grupo, identificado pelo case_id
, baseando-se na coluna updated_at
em ordem descendente. Apenas o registro mais recente (onde ROW_NUM = 1
) de cada grupo é mantido, resultando em uma camada bronze sem duplicatas e com dados atualizados.
2.5 Criando nossa camada Silver
Por fim, na última etapa do nosso artigo, vamos criar a camada silver, onde nossa tabela de teste será disponibilizada de forma materializada e no formato Apache Iceberg, contendo apenas os registros mais atualizados.
Para concluir essa parte do projeto no DBT, criaremos mais um modelo que envolverá a criação de dois arquivos principais:
- silver__models.yml: Este arquivo será bastante similar aos anteriores, onde declararemos nosso modelo. Nele, podemos incluir metadados relevantes e configurar testes específicos para garantir a integridade e a correção dos dados
- silver_consumer_cases.sql: Este arquivo SQL será responsável pela criação da tabela materializada no formato Iceberg. Aqui, definiremos a estrutura da tabela e especificaremos as transformações necessárias para obter os registros atualizados, garantindo que a tabela final reflita apenas as informações mais recentes de cada caso.
Nessa consulta há uma configuração especial do modelo utilizando a função config()
. Aqui, estamos especificando detalhes importantes sobre como a tabela será criada e gerenciada pelo DBT:
table_type='iceberg'
: Indica que a tabela será gerenciada pelo Apache Iceberg.materialized='incremental'
: Define que a tabela será materializada de forma incremental, ou seja, será atualizada incrementalmente à medida que novos dados estiverem disponíveis.incremental_strategy='merge'
: Especifica a estratégia de atualização incremental como "merge", o que implica que os novos dados serão mesclados com os dados existentes na tabela.unique_key=['case_id']
: Define a chave única da tabela como sendo o campocase_id
, garantindo a integridade dos dados.update_condition='target.case_id = src.case_id'
: Estabelece a condição de atualização, indicando que os registros serão atualizados com base nocase_id
.format='parquet'
: Especifica o formato de armazenamento dos dados como Parquet.
Utilizamos a função ref()
do DBT para criar uma dependência dinâmica entre este modelo e o modelo bronze_consumer_cases
, assegurando que a execução respeite a ordem correta das dependências e utilize a versão mais recente dos dados.
Por fim, realizamos uma seleção dos campos de interesse da tabela bronze_consumer_cases
e aplicamos uma condição condicional {% if is_incremental() %}
para filtrar os dados de forma incremental, onde apenas os registros com updated_at
posterior à última execução do modelo serão considerados.
Quando executarmos o comando dbt run, vamos ter nossa tabela criada no Athena:
3. Resultados
Após a implementação das etapas do pipeline de dados, podemos observar os seguintes resultados:
- Eficiência na Transformação de Dados: A integração do DBT com o Apache Iceberg e o AWS Athena promoveu uma gestão eficiente dos dados, desde a ingestão até a disponibilização das tabelas materializadas. Isso resultou em um processo simplificado e organizado para a transformação de dados, otimizando a produtividade e a eficácia das operações.
- Atualização de Registros e Eliminação de Duplicidades: As camadas bronze e silver do pipeline garantiram a remoção de duplicidades e a disponibilização dos registros mais atualizados para análise. Isso proporcionou maior confiabilidade e precisão nos dados, aproveitando as soluções inovadoras oferecidas pelo Iceberg. O Iceberg traz benefícios significativos para problemas tradicionais em data lakes, como as transações de dados complexas como upserts. Sua arquitetura robusta e seus recursos avançados contribuíram para a integridade e a consistência dos dados em nosso pipeline, fortalecendo nossas transformações e decisões baseadas em dados.
Além dos benefícios mencionados, é importante destacar que a implementação do DBT proporcionou documentação automatizada do pipeline de dados e metadados de tabela. A geração automática de documentação pelo DBT oferece uma visão clara e abrangente das transformações de dados realizadas em cada modelo, facilitando a compreensão do fluxo de dados e agilizando processos de auditoria e revisão.
Além disso, a rastreabilidade dos dados foi aprimorada significativamente. O DBT facilita o rastreamento dos dados desde sua origem até a sua transformação final em tabelas materializadas. Isso permite uma maior transparência e controle sobre o processo de transformação de dados.
4. Conclusão
A implementação do pipeline de dados, utilizando o DBT em conjunto com o Apache Iceberg e o AWS Athena, representa uma combinação poderosa que pode gerar resultados significativos para a gestão de dados em qualquer organização. Além disso, a documentação gerada pelo DBT, com a rastreabilidade, teste, e documentação dos dados, oferece uma visão clara e abrangente do fluxo de dados, facilitando processos de auditoria e governança de dados.
Todo o código desse artigo tá no GitHub. Espero que tenha sido útil para você esse artigo! Caso desejem enviar sugestões , queiram outros temas de post, feedbacks ou apenas trocar um papo legal podem falar comigo no meu Linkedin :) . Se gostou , curte e compartilha para chegar em mais pessoas :).