Cheaf Docs
Data

Arquitectura de Pipelines de Datos

Sistema de Orquestación, Ingesta y Transformación de Datos

Introducción

El sistema de datos de Cheaf está diseñado para orquestar y ejecutar pipelines de ingesta y transformación de datos de forma automatizada. La arquitectura se compone de múltiples servicios de GCP que trabajan en conjunto para mover datos desde las fuentes transaccionales hacia un Data Warehouse analítico en BigQuery.

Arquitectura del Data Warehouse

El Data Warehouse de Cheaf está organizado en 3 capas que siguen las mejores prácticas de modelado de datos:

┌─────────────────────────────────────────────────────────────────────────────────────┐
│                                 FUENTES DE DATOS                                     │
├──────────────┬──────────────┬──────────────┬──────────────┬─────────────────────────┤
│  PostgreSQL  │   Cassandra  │     S3       │    Gmail     │       Pushwoosh         │
│ (Marketplace)│(Notificaciones)│  (Análisis) │   (Merma)   │   (Push Notifications)  │
└──────┬───────┴──────┬───────┴──────┬───────┴──────┬───────┴────────────┬────────────┘
       │              │              │              │                    │
       ▼              │              │              │                    ▼
┌──────────────┐      │              │              │           ┌─────────────────────┐
│  DataStream  │      │              │              │           │ Cloud Run + Eventarc│
│   (CDC)      │      │              │              │           │ + Cloud Function    │
└──────┬───────┘      │              │              │           └──────────┬──────────┘
       │              ▼              ▼              ▼                      │
       │        ┌─────────────────────────────────────────┐                │
       │        │              AIRFLOW                    │                │
       │        │  (Cassandra, S3, Gmail ingestion DAGs)  │                │
       │        └─────────────────────┬───────────────────┘                │
       │                              │                                    │
       ▼                              ▼                                    ▼
┌─────────────────────────────────────────────────────────────────────────────────────┐
│                                    BIGQUERY                                          │
├─────────────────────────────────────────────────────────────────────────────────────┤
│  ┌───────────────────────────────────────────────────────────────────────────────┐  │
│  │  STG (Staging Layer)                                                           │  │
│  │  - Datos crudos desde las fuentes                                             │  │
│  │  - Dataset: stg_cheaf_warehouse                                               │  │
│  │  - Tablas: stg_* (ej: stg_accounts_raw, stg_pushwoosh_messages)              │  │
│  └───────────────────────────────────┬───────────────────────────────────────────┘  │
│                                      │                                               │
│                                      ▼  (Dataform)                                  │
│  ┌───────────────────────────────────────────────────────────────────────────────┐  │
│  │  ODS (Operational Data Store)                                                  │  │
│  │  - Datos limpios y normalizados                                               │  │
│  │  - Modelos: ods_<domain>_<entity>                                             │  │
│  │  - Full refresh (tablas pequeñas) e Incremental (tablas grandes)              │  │
│  └───────────────────────────────────┬───────────────────────────────────────────┘  │
│                                      │                                               │
│                                      ▼  (Dataform)                                  │
│  ┌───────────────────────────────────────────────────────────────────────────────┐  │
│  │  GOLD (Analytical Layer)                                                       │  │
│  │  - Modelos analíticos y KPIs de negocio                                       │  │
│  │  - Modelos: gold_<business_metric>                                            │  │
│  │  - Optimizado para consultas de reporting                                     │  │
│  └───────────────────────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────────────────┘

Capas del Warehouse

CapaDescripciónNaming ConventionRefresh
STGDatos crudos desde las fuentes, sin transformacionesstg_*Continuo (DataStream) / Batch (Airflow)
ODSDatos limpios, normalizados y tipificadosods_<domain>_<entity>Full / Incremental
GOLDModelos analíticos, KPIs y métricas de negociogold_<business_metric>Daily

DataStream (Ingesta PostgreSQL)

La ingesta de datos transaccionales desde PostgreSQL hacia BigQuery se realiza mediante Google Cloud DataStream, un servicio de CDC (Change Data Capture) que replica los cambios en tiempo real.

Configuración del Stream

PropiedadValor
ID de flujodatastream
Perfil de origenmarketplace (PostgreSQL)
Perfil de destinodatastream-bq (BigQuery)
Proyecto destinobackends-399017
Dataset destinostg_cheaf_warehouse
Regiónus-west1 (Oregon)
Conexión PostgreSQL192.168.0.3:cheaf

Configuración de Replicación

PropiedadValor
Slot de replicaciónds_slot_marketplace
Publicacióndatastream_pub
Esquemapublic
Tablas replicadas169 de 178 tablas
Tablas excluidas10 tablas
Modo de reabastecimientoAutomático
Conexiones máximas50 conexiones simultáneas
Límite de inactividad15 minutos

Pushwoosh (Ingesta de Notificaciones Push)

La ingesta de datos de notificaciones push desde Pushwoosh hacia BigQuery se realiza mediante una arquitectura serverless basada en Cloud Run y Cloud Functions.

Arquitectura del Flujo

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│ Cloud Scheduler │────▶│  Cloud Run Job  │────▶│  Cloud Storage  │────▶│  Eventarc       │
│ (5:00 AM MX)    │     │ pushwoosh-job   │     │ pushwoosh-data- │     │  Trigger        │
│                 │     │                 │     │ transfer        │     │                 │
└─────────────────┘     └─────────────────┘     └─────────────────┘     └────────┬────────┘


                        ┌─────────────────┐     ┌─────────────────────────────────────────┐
                        │    BigQuery     │◀────│           Cloud Function                │
                        │ stg_pushwoosh_  │     │    (Parsea JSON → BigQuery)            │
                        │ messages        │     │                                         │
                        └─────────────────┘     └─────────────────────────────────────────┘

Componentes

ComponenteDescripción
Script GoExtrae datos de la API Pushwoosh, maneja paginación y sube JSONs a Cloud Storage
Docker Imagegcr.io/backends-399017/pushwoosh-job:latest
Cloud Run JobEjecuta el contenedor en modalidad serverless bajo demanda
Cloud SchedulerPrograma la ejecución diaria del job (5:00 AM México)
Cloud StorageBucket pushwoosh-data-transfer para almacenamiento temporal de JSONs
Eventarc TriggerDetecta eventos finalized de archivos en el bucket
Cloud FunctionProcesa los archivos JSON y carga los datos a BigQuery

Flujo de Ejecución

  1. Programación: Cloud Scheduler dispara el Cloud Run Job pushwoosh-job diariamente a las 5:00 AM (México)
  2. Extracción: El contenedor accede a la API de Pushwoosh y descarga eventos del día anterior
  3. Paginación: Maneja automáticamente la paginación de resultados
  4. Carga a Storage: Sube cada página como archivo JSON al bucket pushwoosh-data-transfer
  5. Detección: Eventarc detecta cada archivo subido (finalized event)
  6. Procesamiento: Cloud Function parsea el JSON e inserta en BigQuery

Tabla Destino

Dataset: stg_cheaf_warehouse Tabla: stg_pushwoosh_messages

ColumnaTipoDescripción
timestampDATETIMEFecha y hora del evento
message_idSTRINGID único del mensaje
message_codeSTRINGCódigo del mensaje
campaign_codeSTRINGCódigo de la campaña
hwidSTRINGHardware ID del dispositivo
platformSTRINGPlataforma (iOS/Android)
actionSTRINGAcción realizada
push_alerts_enabledBOOLSi las notificaciones están habilitadas
statusSTRINGEstado del envío
error_reasonSTRINGRazón del error (si aplica)
user_idSTRINGID del usuario
application_codeSTRINGCódigo de la aplicación

Dataform (Transformaciones STG → ODS → GOLD)

Las transformaciones entre capas del warehouse se gestionan mediante Dataform, una herramienta de GCP para orquestación de SQL y modelado de datos.

Repositorio

GitHub: https://github.com/cheafdev/dataform-cheaf-models

Estructura del Proyecto

dataform-cheaf-models/
├── includes/              # Utilidades SQLX reutilizables (lógica de negocio, parsing de fechas)
├── definitions/models/
│   ├── ods/
│   │   ├── full/          # Modelos ODS full-refresh (tablas pequeñas/medianas)
│   │   └── incremental/   # Modelos ODS incrementales (tablas grandes/eventos)
│   └── gold/              # Capa Gold: modelos analíticos y de negocio
├── package.json           # Configuración del proyecto
├── production_google      # DER PostgreSQL
└── metadata_categorys_posgressql.json  # Metadatos PostgreSQL

Configuración de Lanzamiento

  • Release Configuration: production_models
  • Rama: master

Workflows Programados

WorkflowContenidoScheduleÚltima Ejecución
dimensiones_6hdim_storeDiario 6:00 AM UTC
dimensiones_hourlydim_userDiario 6:00 AM UTC
fact_campaign_performance_dailyfact_campaign_performance_dailyDiario 7:00 AM UTC
gold_crm3 accionesDiario 9:00 AM UTC
gold_cupones_co_wtdgold_coupons_type_weekly_coMartes 7:00 AM UTC

Dominios de Negocio

  • Users: accounts_appuser, users_user, auth_user
  • Orders: orders_order, orders_orderitem, orders_orderstatusinline, orders_orderalerts
  • Stores & Locations: restaurants_store, places_place, restaurant_modules_*
  • Products & Categories: product_product, products_categories
  • Promotions & Coupons: promotions_coupon, orders_promotionscodeorder
  • Events: Pushwoosh (stg_pushwoosh_messages)

Mejores Prácticas en Dataform

  • DRY & Reusabilidad: Lógica centralizada en includes/
  • Documentación: Todos los modelos tienen descripción, tags y notas de transformación
  • Orquestación: Diseñado para integrarse con Airflow (daily full ODS, hourly incremental, daily Gold)
  • Validación: Checks de integridad referencial y conteo de registros

Prerequisitos y Dependencias

Infraestructura Requerida

  • Airflow 2.8.1: Con Python 3.10+
  • PostgreSQL: Base de datos principal (puerto 5432)
  • Cassandra: Base de datos de notificaciones
  • BigQuery: Data warehouse destino
  • Gmail API: Para ingesta de archivos de merma
  • Redis: Sistema de colas
  • AWS S3: Almacenamiento de archivos de análisis de productos

Conexiones de Airflow Requeridas

Antes de ejecutar los DAGs, asegúrate de tener configuradas las siguientes conexiones en Airflow:

  • cassandra_default: Conexión a Apache Cassandra
  • google_cloud_default: Credenciales de GCP para BigQuery
  • aws_default: Credenciales de AWS para acceso a S3
  • postgres_default: Conexión a PostgreSQL

Configuraciones Críticas

  • Smart Scheduler: Configurado en zona horaria Argentina (GMT-3)
  • Limpieza automática: Umbral 85% de uso de disco
  • Reintentos: 1 reintento por DAG con delay de 1 minuto
  • Pausas: 2 minutos entre DAGs para estabilización
  • Connection pooling: Máximo 10 conexiones concurrentes por fuente
  • Batch processing: Lotes de 10,000 registros por transacción

Airflow: DAGs de Ingesta Activos

Airflow se utiliza para la ingesta de datos desde fuentes que no soportan CDC nativo o requieren lógica de transformación personalizada.

DAGs Activos en Producción

  • gmail_merma_to_bigquery: Ingesta archivos de merma desde Gmail hacia BigQuery.
  • cassandra_crm_incremental: Ingesta datos de CRM desde Cassandra hacia BigQuery.
  • segments_incremental_id: Ingesta datos de segmentos de usuarios.

DAGs de Ingesta Cassandra y S3

DAG IDArchivoScheduleDescripción
cassandra_notifications_nightly_ingestionnotifications_cassandra_raw_detail_incremental.py0 2 * * * (23:00 ARG)Ingesta nocturna de notificaciones desde Cassandra a BigQuery con validación y backfill automático
cassandra_tracker_incremental_7hcassandra_tracker_massive_ingestion.py0 */7 * * *Ingesta incremental de tracker de notificaciones cada 7 horas
s3_analysis_to_bigquerys3_analysis_to_bq.py0 2 * * *Ingesta de archivos de análisis de productos desde S3 a BigQuery

Detalle de cada DAG:

  • cassandra_notifications_nightly_ingestion: Extrae datos de notifications_by_campaign_filter (Cassandra) y los carga en stg_notifications_cassandra_raw_detail (BigQuery). Incluye validación de conteos y backfill selectivo de schedulers faltantes.

  • cassandra_tracker_incremental_7h: Procesa la tabla notifications_tracker_by_day desde Cassandra hacia BigQuery con deduplicación y validación de datos existentes.

  • s3_analysis_to_bigquery: Procesa archivos JSON de análisis de productos (original_dataset y left_dataset) desde S3 hacia BigQuery en paralelo.

Integración con Sistemas Externos (Airflow)

El sistema de Airflow se integra con:

  • BigQuery: Destino principal de los datos ingeridos
  • Cassandra: Fuente de datos para notificaciones, tracker y segmentos
  • Gmail: Fuente de datos para archivos de merma
  • AWS S3: Fuente de archivos JSON de análisis de productos

Validaciones del Sistema

Validaciones de Integridad de Datos

  1. Verificación de esquemas: Cada DAG valida estructura antes de procesar
  2. Checkpoints de datos: Puntos de verificación en cada etapa del pipeline
  3. Validación de tipos: Verificación de tipos de datos en cada columna
  4. Detección de duplicados: Identificación y manejo de registros duplicados
  5. Validación de rangos: Verificación de valores dentro de rangos esperados
  6. Consistencia referencial: Validación de claves foráneas entre tablas

Validaciones de Recursos del Sistema

  1. Uso de memoria: Monitoreo continuo durante ejecución (límite: 4GB por DAG)
  2. Espacio en disco: Verificación antes de cada DAG (mínimo: 10GB libres)
  3. Conexiones de BD: Validación de conectividad antes de ingesta
  4. Timeouts configurables:
    • Timeout por tarea: 30 minutos
    • Timeout por DAG: 2 horas
    • Timeout de conexión: 30 segundos

Validaciones de Calidad de Datos

  1. Completitud de datos: Verificación de campos obligatorios
  2. Precisión de datos: Validación contra reglas de negocio
  3. Consistencia temporal: Verificación de secuencias temporales
  4. Validación de formato: Verificación de formatos de fecha, email, etc.

Validaciones de Rollback y Recuperación

  1. Rollback automático: En caso de fallo crítico durante ingesta
  2. Puntos de recuperación: Snapshots cada 1000 registros procesados
  3. Validación post-ingesta: Verificación de integridad después de carga
  4. Alertas automáticas: Notificaciones en caso de anomalías detectadas

Estructura del Proyecto

DATA-PIPELINES-ORCHESTRATOR/
├── config/
│   └── incremental_config.py                              # Configuración de ingestas incrementales

├── dags/
│   ├── notifications_cassandra_raw_detail_incremental.py  # Ingesta nocturna Cassandra → BigQuery
│   ├── cassandra_tracker_massive_ingestion.py             # Ingesta masiva tracker Cassandra → BigQuery
│   └── s3_analysis_to_bq.py                               # Ingesta análisis S3 → BigQuery

├── json/
│   └── metadata_categorys.json                            # Metadatos por categoría de tablas

├── utils/
│   ├── common_imports.py                                  # Importaciones compartidas
│   ├── credentials.py                                     # Autenticación con Google Cloud
│   ├── data_preparation.py                                # Transformaciones y limpieza
│   ├── observability.py                                   # Métricas, logs y validación
│   └── sql_utils.py                                       # Utilidades SQL dinámicas

Utilidades Comunes

Módulos en utils/

  • credentials.py: Manejo seguro de credenciales para GCP.
  • data_preparation.py: Normalización, tipificación, y limpieza de datos.
  • observability.py: Validación de integridad, conteo, logs y alertas.
  • sql_utils.py: Construcción dinámica de queries SQL y utilidades para PostgreSQL.
  • common_imports.py: Importaciones compartidas para todos los módulos.

Configuración

  • config/incremental_config.py: Define las reglas para cargas incrementales por tabla o categoría.

Metadatos

  • json/metadata_categorys.json: Clasificación y prioridad de tablas, clave para la segmentación de pipelines.

Funcionalidades Destacadas

  • Ingesta en tiempo real de PostgreSQL via DataStream (CDC)
  • Transformaciones SQL con Dataform (STG → ODS → GOLD)
  • Ingesta batch desde múltiples fuentes: Cassandra, S3, Gmail
  • Validación automatizada de conteos entre origen y destino
  • Backfill automático de datos faltantes
  • Deduplicación y manejo de errores robusto
  • Procesamiento paralelo de datasets

Consideraciones Técnicas

  1. La ingesta de datos transaccionales (PostgreSQL) se realiza mediante DataStream con CDC en tiempo real.
  2. Las transformaciones entre capas (STG → ODS → GOLD) se gestionan con Dataform.
  3. Airflow se utiliza exclusivamente para fuentes que no soportan CDC nativo (Cassandra, S3, Gmail).
  4. El sistema utiliza Airflow 2.8.1 con Python 3.10 para los DAGs de ingesta batch.
  5. Los DAGs de ingesta utilizan patrones de diseño consistentes para facilitar mantenimiento.
  6. El sistema de reintentos automáticos garantiza la robustez de la ejecución.

Dependencias

GCP Services

  • DataStream: Replicación CDC PostgreSQL → BigQuery
  • BigQuery: Data Warehouse (STG, ODS, GOLD)
  • Dataform: Transformaciones SQL y modelado de datos

Airflow (Ingesta Batch)

  • Apache Airflow 2.8.1
  • Python >= 3.10
  • Apache Cassandra (conexión configurada en Airflow)
  • AWS S3 (credenciales configuradas)
  • Gmail API
  • Dependencias Python: pandas, google-cloud-bigquery, cassandra-driver, boto3