Cheaf Docs
Data/Airflow

Arquitectura de Pipelines de Airflow

Sistema de Orquestación e Ingesta de Datos

Introducción

El sistema de pipelines de Airflow de Cheaf está diseñado para orquestar y ejecutar pipelines de ingesta de datos de forma incremental y automatizada. La arquitectura se centra en un sistema de orquestación inteligente que gestiona la ejecución secuencial de múltiples DAGs de ingesta, optimizando recursos y asegurando la integridad de los datos.

Prerequisitos y Dependencias

Infraestructura Requerida

  • Airflow 2.x: Con Python 3.10+
  • PostgreSQL: Base de datos principal (puerto 5432)
  • Cassandra: Base de datos de notificaciones
  • BigQuery: Data warehouse destino
  • Gmail API: Para ingesta de archivos de merma
  • Redis: Sistema de colas

Configuraciones Críticas

  • Smart Scheduler: Configurado en zona horaria Argentina (GMT-3)
  • Limpieza automática: Umbral 85% de uso de disco
  • Reintentos: 1 reintento por DAG con delay de 1 minuto
  • Pausas: 2 minutos entre DAGs para estabilización
  • Connection pooling: Máximo 10 conexiones concurrentes por fuente
  • Batch processing: Lotes de 10,000 registros por transacción

Modelos de Datos

DAGs de Ingesta Incremental

Los DAGs de ingesta están organizados por fuente de datos y utilizan un patrón de diseño consistente:

DAGs principales:

  • accounts_incremental_id: Ingesta de datos de cuentas de usuario
  • auth_incremental_id: Datos de autenticación y sesiones
  • banners_incremental_id: Información de banners y promociones visuales
  • news_incremental_id: Contenido de noticias y comunicaciones
  • orders_incremental_id: Datos de pedidos y transacciones
  • packages_payments_permissions_places_id: Información de paquetes, pagos y ubicaciones
  • products_incremental_id: Catálogo de productos
  • promotions_incremental_id: Datos de promociones y descuentos
  • restaurants_incremental_id: Información de restaurantes
  • several_incremental_id: Datos misceláneos de otras fuentes

Documentación Técnica del Sistema de Pipelines de Airflow

1. Visión General de la Arquitectura

El sistema está diseñado para gestionar la ingesta incremental de datos desde múltiples fuentes hacia BigQuery de forma automatizada y eficiente. La arquitectura se centra en dos componentes principales que trabajan en conjunto: orchestrator_incremental_id y orchestrator_auto_trigger.

El flujo es declarativo: el orchestrator_auto_trigger verifica el scheduling inteligente y dispara el orchestrator_incremental_id, que ejecuta secuencialmente todos los DAGs de ingesta. Posteriormente, un sistema de limpieza automática optimiza los recursos del sistema.

Modelos de Datos y sus Relaciones

La arquitectura se compone de DAGs de orquestación y DAGs de ingesta especializados.

DAGs Principales (Orquestación)

  • orchestrator_auto_trigger: Sistema de auto-trigger que ejecuta cada 15 minutos, verifica el scheduling inteligente y dispara el orquestador principal.

    • Relaciones: Dispara el orchestrator_incremental_id cuando es necesario.
  • orchestrator_incremental_id: DAG maestro que ejecuta secuencialmente todos los DAGs de ingesta incremental.

    • Relaciones: Orquesta la ejecución de todos los DAGs de ingesta en orden específico.

DAGs de Ingesta (Fuentes de Datos)

  • accounts_incremental_id: Ingesta datos de cuentas de usuario desde PostgreSQL hacia BigQuery.

    • Relaciones: Ejecutado por el orquestador principal.
  • orders_incremental_id: Ingesta datos de pedidos y transacciones.

    • Relaciones: Ejecutado por el orquestador principal.
  • restaurants_incremental_id: Ingesta información de restaurantes.

    • Relaciones: Ejecutado por el orquestador principal.
  • products_incremental_id: Ingesta catálogo de productos.

    • Relaciones: Ejecutado por el orquestador principal.
  • promotions_incremental_id: Ingesta datos de promociones y descuentos.

    • Relaciones: Ejecutado por el orquestador principal.

DAGs de Soporte (Otras Aplicaciones)

  • gmail_merma_to_bigquery: Ingesta archivos de merma desde Gmail hacia BigQuery.
  • cassandra_crm_incremental: Ingesta datos de CRM desde Cassandra hacia BigQuery.
  • segments_incremental_id: Ingesta datos de segmentos de usuarios.

Diagrama de Arquitectura Completo

El siguiente diagrama ilustra cómo todos los DAGs interactúan en el flujo de ejecución.

                                (Fase 1: Auto-Trigger - Cada 15 minutos)
+---------------------------+      +-----------------+      +--------------------+      +------------------+
| orchestrator_auto_trigger |----->| Smart Scheduler |----->| Resource Cleanup   |----->| Trigger Decision |
| (Cada 15 min)             |      | (Verifica tiempo|      | (Si disco > 85%)   |      | (Trigger/Wait)   |
|                           |      |  de ejecución)  |      |                    |      |                  |
+---------------------------+      +-----------------+      +--------------------+      +------------------+
             |                                                                                     |
             |                                                                                     |
             |                                                                                     v
             |                                                                              +------------------+
             +------------------------------------------------------------------------------------| SI es momento   |
                                                                                                  +------------------+
                                                                                                           |
                                                                                                           v
                                    +--------------------+
                                    | orchestrator_incremental_id|
                                    |                            |
                                    +--------------------+
                                            |
                                            v
                                    (Fase 2: Orquestación Secuencial)
                                            |
                    +--------------------+  |  +-----------------+      +-----------------+      +-----------------+
                    | accounts_incremental|---->| auth_incremental|----->| banners_incremental|----->| news_incremental |
                    | (5-10 min)         |     | (5-10 min)      |      | (5-10 min)      |      | (5-10 min)      |
                    +--------------------+     +-----------------+      +-----------------+      +-----------------+
                             |                         |                         |                         |
                             v                         v                         v                         v
                    +--------------------+     +-----------------+      +-----------------+      +-----------------+
                    | orders_incremental |---->| packages_incremental|->| products_incremental|->| promotions_incremental|
                    | (5-10 min)         |     | (5-10 min)      |      | (5-10 min)      |      | (5-10 min)      |
                    +--------------------+     +-----------------+      +-----------------+      +-----------------+
                             |                         |                         |                         |
                             v                         v                         v                         v
                    +--------------------+     +-----------------+      +-----------------+      +-----------------+
                    | restaurants_incremental|->| several_incremental|->| Completion Time  |----->| BigQuery         |
                    | (5-10 min)         |     | (5-10 min)      |      | (Registra fin)  |      | (Destino final)  |
                    +--------------------+     +-----------------+      +-----------------+      +-----------------+

                                    (Pausas de 2 minutos entre cada DAG)
                                    (Tiempo total: 45-60 minutos)

Flujo de Ejecución:

  1. Auto-Trigger (Cada 15 min): Verifica si es momento de ejecutar
  2. Limpieza de Recursos: Si disco > 85%, limpia logs antiguos
  3. Decisión de Trigger: Si es hora, dispara el orquestador
  4. Orquestación Secuencial: Ejecuta DAGs uno por uno con pausas
  5. Registro de Finalización: Guarda timestamp para próximo scheduling
  6. Destino Final: Todos los datos van a BigQuery

Características del Flujo:

  • Ejecución secuencial: Un DAG espera a que termine el anterior
  • Pausas de estabilización: 2 minutos entre cada DAG
  • Limpieza automática: Solo cuando es necesario (disco > 85%)
  • Scheduling inteligente: Basado en tiempo de finalización anterior

2. Flujo de Trabajo y Lógica de Negocio

El proceso se divide en dos fases principales: Auto-Trigger y Orquestación Secuencial.

Fase 1: Auto-Trigger (Scheduling Inteligente)

El DAG orchestrator_auto_trigger se ejecuta cada 15 minutos y es el punto de entrada para todo el sistema. No requiere argumentos y es idempotente.

Pasos que ejecuta el auto-trigger:

  1. Limpieza Automática de Recursos: Verifica el uso de disco y limpia logs antiguos si supera el 85%.
  2. Verificación de Concurrencia: Verifica si el orquestador ya está en ejecución para evitar ejecuciones simultáneas.
  3. Scheduling Inteligente: Calcula la próxima ejecución basada en el tiempo de finalización anterior.
  4. Decisión de Trigger: Determina si es momento de ejecutar el orquestador o esperar.

Lógica de Limpieza Automática:

  • Umbral de limpieza: 85% de uso de disco
  • Logs del scheduler: Limpia logs regenerables del scheduler
  • Logs del processor: Limpia logs regenerables del dag_processor_manager
  • Runs de DAGs: Mantiene solo las 2 ejecuciones más recientes de cada DAG
  • Archivos temporales: Elimina archivos temporales antiguos (>1 hora)

Lógica de Scheduling Inteligente:

  • Zona horaria: Argentina (GMT-3)
  • Cálculo de próxima ejecución: Basado en el timestamp de finalización anterior
  • Verificación de tiempo: Compara hora actual con próxima ejecución calculada

Fase 2: Orquestación Secuencial (Ejecución de DAGs)

El DAG orchestrator_incremental_id ejecuta los DAGs de ingesta en el siguiente orden:

  1. accounts_incremental_id
  2. auth_incremental_id
  3. banners_incremental_id
  4. news_incremental_id
  5. orders_incremental_id
  6. packages_payments_permissions_places_id
  7. products_incremental_id
  8. promotions_incremental_id
  9. restaurants_incremental_id
  10. several_incremental_id

Características del flujo secuencial:

  • Ejecución secuencial: Cada DAG espera a que el anterior termine completamente
  • Pausa de estabilización: 2 minutos entre cada DAG para estabilizar recursos
  • Manejo de errores: Reintentos automáticos (1 reintento con delay de 1 minuto)
  • Registro de finalización: Al completar toda la secuencia, registra el timestamp para el próximo scheduling

Configuración de Recursos y Optimización

Parámetros de configuración:

  • Umbral de limpieza: 85% de uso de disco
  • Runs mantenidos: 2 ejecuciones más recientes por DAG
  • Pausa entre DAGs: 2 minutos
  • Reintentos: 1 reintento por DAG con delay de 1 minuto
  • Intervalo de verificación: 60 segundos entre verificaciones de estado

Optimizaciones implementadas:

  • Limpieza inteligente: Solo limpia cuando es necesario
  • Pausas de estabilización: Evita sobrecarga de recursos
  • Ejecución secuencial: Garantiza integridad de datos
  • Scheduling inteligente: Optimiza tiempos de ejecución

3. Ejemplo Práctico Completo

Escenario: Ejecución completa del sistema de orquestación

  1. 15:00: orchestrator_auto_trigger se ejecuta cada 15 minutos
  2. 15:00: Verifica uso de disco (ej. 80% - no requiere limpieza)
  3. 15:00: Verifica si orchestrator_incremental_id ya está ejecutándose (no)
  4. 15:00: Calcula próxima ejecución basada en timestamp anterior
  5. 15:00: Si es momento, dispara orchestrator_incremental_id
  6. 15:01: Orquestador inicia con accounts_incremental_id
  7. 15:05: accounts_incremental_id completa, pausa de 2 minutos
  8. 15:07: Inicia auth_incremental_id
  9. 15:10: Continúa secuencialmente con todos los DAGs
  10. 15:45: Todos los DAGs completados, registra timestamp de finalización
  11. 15:46: Orquestador termina exitosamente

Caso de limpieza automática:

  1. 15:00: orchestrator_auto_trigger verifica uso de disco (ej. 90% - requiere limpieza)
  2. 15:00: Limpia logs del scheduler y processor
  3. 15:00: Mantiene solo 2 runs más recientes de cada DAG
  4. 15:00: Elimina archivos temporales antiguos
  5. 15:00: Verifica uso de disco después de limpieza (ej. 75%)
  6. 15:00: Continúa con lógica de scheduling normal

Integración con Sistemas Externos

El sistema de Airflow se integra con:

  • BigQuery: Destino principal de los datos ingeridos
  • PostgreSQL: Fuente de datos para DAGs de ingesta incremental
  • Cassandra: Fuente de datos para notificaciones y segmentos
  • Gmail: Fuente de datos para archivos de merma

Performance y Optimización

Métricas de Rendimiento

  • Throughput promedio: 50,000 registros/minuto por DAG
  • Latencia de ingesta: <5 minutos desde fuente a BigQuery
  • Eficiencia de recursos: 80% utilización CPU promedio
  • Tasa de compresión: 60% reducción en transferencia de datos
  • Tiempo de ejecución total: 45-60 minutos para ciclo completo
  • Paralelización: Hasta 5 tareas concurrentes por DAG

Optimizaciones Implementadas

  • Paralelización: Tareas independientes ejecutan en paralelo
  • Batch processing: Procesamiento en lotes de 10,000 registros
  • Connection pooling: Reutilización de conexiones de BD
  • Caching inteligente: Cache de metadatos y configuraciones
  • Compresión de datos: Algoritmos de compresión en transferencias
  • Índices optimizados: Índices específicos para consultas incrementales

Benchmarks por DAG

DAGRegistros/minTiempo promedioMemoria pico
accounts_incremental_id45,0008 min2.1 GB
orders_incremental_id35,00012 min3.2 GB
restaurants_incremental_id25,0006 min1.8 GB
products_incremental_id40,0009 min2.5 GB
promotions_incremental_id30,0007 min1.9 GB

Validaciones del Sistema

Validaciones de Integridad de Datos

  1. Verificación de esquemas: Cada DAG valida estructura antes de procesar
  2. Checkpoints de datos: Puntos de verificación en cada etapa del pipeline
  3. Validación de tipos: Verificación de tipos de datos en cada columna
  4. Detección de duplicados: Identificación y manejo de registros duplicados
  5. Validación de rangos: Verificación de valores dentro de rangos esperados
  6. Consistencia referencial: Validación de claves foráneas entre tablas

Validaciones de Recursos del Sistema

  1. Uso de memoria: Monitoreo continuo durante ejecución (límite: 4GB por DAG)
  2. Espacio en disco: Verificación antes de cada DAG (mínimo: 10GB libres)
  3. Conexiones de BD: Validación de conectividad antes de ingesta
  4. Timeouts configurables:
    • Timeout por tarea: 30 minutos
    • Timeout por DAG: 2 horas
    • Timeout de conexión: 30 segundos

Validaciones de Calidad de Datos

  1. Completitud de datos: Verificación de campos obligatorios
  2. Precisión de datos: Validación contra reglas de negocio
  3. Consistencia temporal: Verificación de secuencias temporales
  4. Validación de formato: Verificación de formatos de fecha, email, etc.

Validaciones de Rollback y Recuperación

  1. Rollback automático: En caso de fallo crítico durante ingesta
  2. Puntos de recuperación: Snapshots cada 1000 registros procesados
  3. Validación post-ingesta: Verificación de integridad después de carga
  4. Alertas automáticas: Notificaciones en caso de anomalías detectadas

Consideraciones Técnicas

  1. El sistema utiliza Airflow 2.8 con Python 3.10.
  2. Los DAGs de ingesta utilizan patrones de diseño consistentes para facilitar mantenimiento.
  3. La distribución de recursos se optimiza mediante pausas entre DAGs y limpieza automática.
  4. El sistema permite la integración con múltiples fuentes de datos a través de conectores especializados.
  5. El scheduling inteligente garantiza ejecuciones eficientes basadas en tiempos de finalización anteriores.
  6. La limpieza automática de recursos previene problemas de espacio en disco.
  7. El sistema de reintentos automáticos garantiza la robustez de la ejecución.
  8. La ejecución secuencial asegura la integridad y consistencia de los datos.