Orquestrando seus DBT Models com Airflow

Ricardo Junior
5 min readJun 23, 2024

--

Neste artigo, construiremos um pipeline com Airflow para orquestrar nossos modelos DBT. Abordaremos todo o ciclo de vida, desde a criação e testes até a implantação e monitoramento.

1. Introdução

No artigo anterior, criamos um projeto usando DBT, Iceberg e AWS Athena. Onde o nosso objetivo era criar um pipeline de transformação de dados, partindo da nossa camada de pouso de dados até a camada silver, onde fizemos várias transformações em nossos dados, operações de upsert e muito mais.

Agora, nosso objetivo é automatizar esse projeto, orquestrando a execução das diferentes camadas. Quando se fala em orquestração de dados atualmente, uma das primeiras ferramentas que vem à mente de qualquer profissional da área de dados é o Apache Airflow.

O Airflow é uma ferramenta altamente difundida no mercado por ser uma plataforma robusta e altamente escalável, ideal para a criação, agendamento e monitoramento de workflows. Além disso, por ser open-source, possui uma vasta comunidade ativa que continuamente cria integrações com diversas ferramentas e serviços.

Neste artigo, vamos explorar como configurar o Airflow para orquestrar nosso pipeline de transformações de dados realizadas pelo DBT. Todo o código desse artigo estará no GitHub.

2. Configuração do Ambiente

Nesse tutorial vamos subir o Airflow localmente na nossa máquina usando o Docker, para isso vamos usar como base a documentação do Airflow.

Primeira passo que vamos dar é baixar o arquivo do docker compose do airflow e criar nossas pastas dags (pasta onde colocaremos nossas DAGs), logs, config, plugins. Esse arquivo contém vários serviços:

  • airflow-scheduler — O scheduler monitora todas as tarefas e DAGs, e então aciona as instâncias de tarefas quando suas dependências são concluídas.
  • airflow-webserver — O servidor web que estará disponível em http://localhost:8080.
  • airflow-worker — O worker que executa as tarefas dadas pelo scheduler.
  • airflow-triggerer — O triggerer executa um loop de eventos para tarefas adiáveis.
  • airflow-init — O serviço de inicialização.
  • postgres — O banco de dados.
  • redis — broker que encaminha mensagens do scheduler para o worker.

Agora, podemos executar o seguinte comando (DICA: setar para false a configuração de carregar as DAGs de exemplo — linha 62 do docker-compose):

docker-compose up --build

Se tudo ocorreu corretamente, vamos ter o nosso Airflow disponível em http://localhost:8080/home

Airflow Local

3. Instalando o DBT Core

Depois de subir o Airflow, vamos configurá-lo para executar nosso projeto DBT. Para isso, precisaremos customizar nosso arquivo docker-compose.yaml, instalando o dbt-core e, o mais importante para o Airflow, o astronomer-cosmos. Em julho de 2023, a Astronomer lançou o Cosmos, um pacote open-source que permite integrar jobs DBT ao Airflow, criando automaticamente tarefas do Airflow a partir de modelos DBT.

Dito isso, o primeiro passo é criarmos nosso Dockerfileno mesmo diretório que está o nosso docker-compose.yaml . Para conseguirmos usar a nossa imagem temos que comentar a linha 52 e descomentar a linha 53 do arquivo docker-compose. A nosso Dockerfile terá as seguintes configurações:

FROM apache/airflow:2.9.1

COPY requirements.txt ./

USER airflow

RUN python -m virtualenv dbt_venv && source dbt_venv/bin/activate

RUN pip install apache-airflow==${AIRFLOW_VERSION} && \
pip install -r requirements.txt

Usamos a imagem do Apache Airflow na versão 2.9.1. Em seguida, copiamos o arquivo requirements.txt, onde indicamos a instalação do dbt-coree astronomer-cosmos,para a imagem. Criamos e ativamos um ambiente virtual Python e instalamos o Airflow junto com as dependências listadas no requirements.txt. Dessa forma, garantimos que todas as bibliotecas necessárias estejam presentes em um ambiente isolado, evitando conflitos e proporcionando um ambiente consistente para a execução de workflows no Airflow.

Antes de subirmos o nosso ambiente, configuramos um novo volume no nosso docker-compose, para armazenar nossos projetos do dbt. Assim, vamos ter os seguintes volumes:

Volumes — docker-compose.yaml
Estruturas de Pastas

4. Criando nossa DAG

Subimos novamente nossa infraestrutura, com o comando visto na seção 2. Com o ambiente pronto, podemos criar a DAG no Airflow para orquestrar o nosso pipeline de transformação de dados usando DBT.

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime
from cosmos import DbtTaskGroup, ProfileConfig, ProjectConfig
from pathlib import Path

default_args = {
"owner": "airflow",
"start_date": datetime(2022, 1, 1),
"retries": 1,
}

DBT_PATH = "/opt/airflow/dbt/dbt_athena_iceberg"
DBT_PROFILE = "dbt_athena_iceberg"
DBT_TARGETS = "dev"

# Profile enviroment
profile_config = ProfileConfig(
profile_name=DBT_PROFILE,
target_name=DBT_TARGETS,
profiles_yml_filepath=Path(f"{DBT_PATH}/profiles.yml"),
)

# Models dbt
project_config = ProjectConfig(
dbt_project_path=DBT_PATH,
models_relative_path="models"
)

dag = DAG(
"dag_dbt_athena_iceberg",
default_args=default_args,
schedule_interval="@daily",
catchup=False,
)

start_dag = DummyOperator(task_id="start_dag", dag=dag)

dbt_running_models = DbtTaskGroup(
group_id="dbt_running_models",
project_config=project_config,
profile_config=profile_config,
default_args={"retries": 2},
dag=dag,
)

end_dag = DummyOperator(task_id="end_dag", dag=dag)

start_dag >> dbt_running_models >> end_dag

Para nos auxiliar na orquestração exportamos três classes principais do cosmos, ProfileConfig, ProjectConfig e DbtTaskGroup.

  • ProfileConfig: Define as configurações do perfil do DBT que serão utilizadas para executar os modelos. O ProfileConfig especifica o nome do perfil (DBT_PROFILE) e o alvo (DBT_TARGETS), que são configurados no arquivo profiles.yml. Este arquivo contém informações sobre como conectar e configurar diferentes ambientes para execução dos modelos DBT.
  • ProjectConfig: Aqui é definido o caminho do projeto DBT (DBT_PATH) e o caminho relativo para os modelos dentro desse projeto (models). Isso é essencial para que o Airflow saiba onde encontrar os arquivos de configuração e os modelos que serão executados pelo DBT.
  • DbtTaskGroup: Utilizando as configurações do ProfileConfig e ProjectConfig,o DbtTaskGroup configura automaticamente as tarefas necessárias no Airflow para executar os modelos DBT. Isso inclui a definição de dependências entre as tarefas, garantindo que elas sejam executadas na ordem correta conforme especificado no DAG.

Quando executarmos a nossa DAG, vamos ter o seguinte resultado:

Execução da DAG

Conseguimos observar que o Airflow executou com sucesso todo o nosso pipeline de transformação de dados utilizando DBT, assegurando as dependências entre nossos modelos e realizando testes para validação.

5. Conclusão

Nesse post conseguimos ver a integração entre DBT e Airflow através do Cosmos da Astronomer, revelou-se uma solução poderosa para orquestrar e automatizar pipelines de transformação de dados de maneira eficiente. O Airflow proporcionou a robustez necessária para gerenciar a execução e o agendamento dos modelos DBT, enquanto o Cosmos simplificou a configuração e integração desses fluxos de trabalho complexos.

Todo o código desse artigo tá no GitHub. Espero que tenha sido útil para você esse artigo! Caso desejem enviar sugestões , queiram outros temas de post, feedbacks ou apenas trocar um papo legal podem falar comigo no meu Linkedin :) . Se gostou , curte e compartilha para chegar em mais pessoas :).

--

--

Ricardo Junior

Data Engineer - Sênior | Ml Engineer | Python | AWS | Azure | LinkedIn: shorturl.at/GPY35