Arquitectura de Pipelines de Datos
Sistema de Orquestación, Ingesta y Transformación de Datos
Introducción
El sistema de datos de Cheaf está diseñado para orquestar y ejecutar pipelines de ingesta y transformación de datos de forma automatizada. La arquitectura se compone de múltiples servicios de GCP que trabajan en conjunto para mover datos desde las fuentes transaccionales hacia un Data Warehouse analítico en BigQuery.
Arquitectura del Data Warehouse
El Data Warehouse de Cheaf está organizado en 3 capas que siguen las mejores prácticas de modelado de datos:
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ FUENTES DE DATOS │
├──────────────┬──────────────┬──────────────┬──────────────┬─────────────────────────┤
│ PostgreSQL │ Cassandra │ S3 │ Gmail │ Pushwoosh │
│ (Marketplace)│(Notificaciones)│ (Análisis) │ (Merma) │ (Push Notifications) │
└──────┬───────┴──────┬───────┴──────┬───────┴──────┬───────┴────────────┬────────────┘
│ │ │ │ │
▼ │ │ │ ▼
┌──────────────┐ │ │ │ ┌─────────────────────┐
│ DataStream │ │ │ │ │ Cloud Run + Eventarc│
│ (CDC) │ │ │ │ │ + Cloud Function │
└──────┬───────┘ │ │ │ └──────────┬──────────┘
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────┐ │
│ │ AIRFLOW │ │
│ │ (Cassandra, S3, Gmail ingestion DAGs) │ │
│ └─────────────────────┬───────────────────┘ │
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────────────────────────┐
│ BIGQUERY │
├─────────────────────────────────────────────────────────────────────────────────────┤
│ ┌───────────────────────────────────────────────────────────────────────────────┐ │
│ │ STG (Staging Layer) │ │
│ │ - Datos crudos desde las fuentes │ │
│ │ - Dataset: stg_cheaf_warehouse │ │
│ │ - Tablas: stg_* (ej: stg_accounts_raw, stg_pushwoosh_messages) │ │
│ └───────────────────────────────────┬───────────────────────────────────────────┘ │
│ │ │
│ ▼ (Dataform) │
│ ┌───────────────────────────────────────────────────────────────────────────────┐ │
│ │ ODS (Operational Data Store) │ │
│ │ - Datos limpios y normalizados │ │
│ │ - Modelos: ods_<domain>_<entity> │ │
│ │ - Full refresh (tablas pequeñas) e Incremental (tablas grandes) │ │
│ └───────────────────────────────────┬───────────────────────────────────────────┘ │
│ │ │
│ ▼ (Dataform) │
│ ┌───────────────────────────────────────────────────────────────────────────────┐ │
│ │ GOLD (Analytical Layer) │ │
│ │ - Modelos analíticos y KPIs de negocio │ │
│ │ - Modelos: gold_<business_metric> │ │
│ │ - Optimizado para consultas de reporting │ │
│ └───────────────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────────────┘Capas del Warehouse
| Capa | Descripción | Naming Convention | Refresh |
|---|---|---|---|
| STG | Datos crudos desde las fuentes, sin transformaciones | stg_* | Continuo (DataStream) / Batch (Airflow) |
| ODS | Datos limpios, normalizados y tipificados | ods_<domain>_<entity> | Full / Incremental |
| GOLD | Modelos analíticos, KPIs y métricas de negocio | gold_<business_metric> | Daily |
DataStream (Ingesta PostgreSQL)
La ingesta de datos transaccionales desde PostgreSQL hacia BigQuery se realiza mediante Google Cloud DataStream, un servicio de CDC (Change Data Capture) que replica los cambios en tiempo real.
Configuración del Stream
| Propiedad | Valor |
|---|---|
| ID de flujo | datastream |
| Perfil de origen | marketplace (PostgreSQL) |
| Perfil de destino | datastream-bq (BigQuery) |
| Proyecto destino | backends-399017 |
| Dataset destino | stg_cheaf_warehouse |
| Región | us-west1 (Oregon) |
| Conexión PostgreSQL | 192.168.0.3:cheaf |
Configuración de Replicación
| Propiedad | Valor |
|---|---|
| Slot de replicación | ds_slot_marketplace |
| Publicación | datastream_pub |
| Esquema | public |
| Tablas replicadas | 169 de 178 tablas |
| Tablas excluidas | 10 tablas |
| Modo de reabastecimiento | Automático |
| Conexiones máximas | 50 conexiones simultáneas |
| Límite de inactividad | 15 minutos |
Pushwoosh (Ingesta de Notificaciones Push)
La ingesta de datos de notificaciones push desde Pushwoosh hacia BigQuery se realiza mediante una arquitectura serverless basada en Cloud Run y Cloud Functions.
Arquitectura del Flujo
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Cloud Scheduler │────▶│ Cloud Run Job │────▶│ Cloud Storage │────▶│ Eventarc │
│ (5:00 AM MX) │ │ pushwoosh-job │ │ pushwoosh-data- │ │ Trigger │
│ │ │ │ │ transfer │ │ │
└─────────────────┘ └─────────────────┘ └─────────────────┘ └────────┬────────┘
│
▼
┌─────────────────┐ ┌─────────────────────────────────────────┐
│ BigQuery │◀────│ Cloud Function │
│ stg_pushwoosh_ │ │ (Parsea JSON → BigQuery) │
│ messages │ │ │
└─────────────────┘ └─────────────────────────────────────────┘Componentes
| Componente | Descripción |
|---|---|
| Script Go | Extrae datos de la API Pushwoosh, maneja paginación y sube JSONs a Cloud Storage |
| Docker Image | gcr.io/backends-399017/pushwoosh-job:latest |
| Cloud Run Job | Ejecuta el contenedor en modalidad serverless bajo demanda |
| Cloud Scheduler | Programa la ejecución diaria del job (5:00 AM México) |
| Cloud Storage | Bucket pushwoosh-data-transfer para almacenamiento temporal de JSONs |
| Eventarc Trigger | Detecta eventos finalized de archivos en el bucket |
| Cloud Function | Procesa los archivos JSON y carga los datos a BigQuery |
Flujo de Ejecución
- Programación: Cloud Scheduler dispara el Cloud Run Job
pushwoosh-jobdiariamente a las 5:00 AM (México) - Extracción: El contenedor accede a la API de Pushwoosh y descarga eventos del día anterior
- Paginación: Maneja automáticamente la paginación de resultados
- Carga a Storage: Sube cada página como archivo JSON al bucket
pushwoosh-data-transfer - Detección: Eventarc detecta cada archivo subido (
finalizedevent) - Procesamiento: Cloud Function parsea el JSON e inserta en BigQuery
Tabla Destino
Dataset: stg_cheaf_warehouse
Tabla: stg_pushwoosh_messages
| Columna | Tipo | Descripción |
|---|---|---|
timestamp | DATETIME | Fecha y hora del evento |
message_id | STRING | ID único del mensaje |
message_code | STRING | Código del mensaje |
campaign_code | STRING | Código de la campaña |
hwid | STRING | Hardware ID del dispositivo |
platform | STRING | Plataforma (iOS/Android) |
action | STRING | Acción realizada |
push_alerts_enabled | BOOL | Si las notificaciones están habilitadas |
status | STRING | Estado del envío |
error_reason | STRING | Razón del error (si aplica) |
user_id | STRING | ID del usuario |
application_code | STRING | Código de la aplicación |
Dataform (Transformaciones STG → ODS → GOLD)
Las transformaciones entre capas del warehouse se gestionan mediante Dataform, una herramienta de GCP para orquestación de SQL y modelado de datos.
Repositorio
GitHub: https://github.com/cheafdev/dataform-cheaf-models
Estructura del Proyecto
dataform-cheaf-models/
├── includes/ # Utilidades SQLX reutilizables (lógica de negocio, parsing de fechas)
├── definitions/models/
│ ├── ods/
│ │ ├── full/ # Modelos ODS full-refresh (tablas pequeñas/medianas)
│ │ └── incremental/ # Modelos ODS incrementales (tablas grandes/eventos)
│ └── gold/ # Capa Gold: modelos analíticos y de negocio
├── package.json # Configuración del proyecto
├── production_google # DER PostgreSQL
└── metadata_categorys_posgressql.json # Metadatos PostgreSQLConfiguración de Lanzamiento
- Release Configuration:
production_models - Rama:
master
Workflows Programados
| Workflow | Contenido | Schedule | Última Ejecución |
|---|---|---|---|
dimensiones_6h | dim_store | Diario 6:00 AM UTC | |
dimensiones_hourly | dim_user | Diario 6:00 AM UTC | |
fact_campaign_performance_daily | fact_campaign_performance_daily | Diario 7:00 AM UTC | |
gold_crm | 3 acciones | Diario 9:00 AM UTC | |
gold_cupones_co_wtd | gold_coupons_type_weekly_co | Martes 7:00 AM UTC |
Dominios de Negocio
- Users:
accounts_appuser,users_user,auth_user - Orders:
orders_order,orders_orderitem,orders_orderstatusinline,orders_orderalerts - Stores & Locations:
restaurants_store,places_place,restaurant_modules_* - Products & Categories:
product_product,products_categories - Promotions & Coupons:
promotions_coupon,orders_promotionscodeorder - Events: Pushwoosh (
stg_pushwoosh_messages)
Mejores Prácticas en Dataform
- DRY & Reusabilidad: Lógica centralizada en
includes/ - Documentación: Todos los modelos tienen descripción, tags y notas de transformación
- Orquestación: Diseñado para integrarse con Airflow (daily full ODS, hourly incremental, daily Gold)
- Validación: Checks de integridad referencial y conteo de registros
Prerequisitos y Dependencias
Infraestructura Requerida
- Airflow 2.8.1: Con Python 3.10+
- PostgreSQL: Base de datos principal (puerto 5432)
- Cassandra: Base de datos de notificaciones
- BigQuery: Data warehouse destino
- Gmail API: Para ingesta de archivos de merma
- Redis: Sistema de colas
- AWS S3: Almacenamiento de archivos de análisis de productos
Conexiones de Airflow Requeridas
Antes de ejecutar los DAGs, asegúrate de tener configuradas las siguientes conexiones en Airflow:
- cassandra_default: Conexión a Apache Cassandra
- google_cloud_default: Credenciales de GCP para BigQuery
- aws_default: Credenciales de AWS para acceso a S3
- postgres_default: Conexión a PostgreSQL
Configuraciones Críticas
- Smart Scheduler: Configurado en zona horaria Argentina (GMT-3)
- Limpieza automática: Umbral 85% de uso de disco
- Reintentos: 1 reintento por DAG con delay de 1 minuto
- Pausas: 2 minutos entre DAGs para estabilización
- Connection pooling: Máximo 10 conexiones concurrentes por fuente
- Batch processing: Lotes de 10,000 registros por transacción
Airflow: DAGs de Ingesta Activos
Airflow se utiliza para la ingesta de datos desde fuentes que no soportan CDC nativo o requieren lógica de transformación personalizada.
DAGs Activos en Producción
gmail_merma_to_bigquery: Ingesta archivos de merma desde Gmail hacia BigQuery.cassandra_crm_incremental: Ingesta datos de CRM desde Cassandra hacia BigQuery.segments_incremental_id: Ingesta datos de segmentos de usuarios.
DAGs de Ingesta Cassandra y S3
| DAG ID | Archivo | Schedule | Descripción |
|---|---|---|---|
cassandra_notifications_nightly_ingestion | notifications_cassandra_raw_detail_incremental.py | 0 2 * * * (23:00 ARG) | Ingesta nocturna de notificaciones desde Cassandra a BigQuery con validación y backfill automático |
cassandra_tracker_incremental_7h | cassandra_tracker_massive_ingestion.py | 0 */7 * * * | Ingesta incremental de tracker de notificaciones cada 7 horas |
s3_analysis_to_bigquery | s3_analysis_to_bq.py | 0 2 * * * | Ingesta de archivos de análisis de productos desde S3 a BigQuery |
Detalle de cada DAG:
-
cassandra_notifications_nightly_ingestion: Extrae datos denotifications_by_campaign_filter(Cassandra) y los carga enstg_notifications_cassandra_raw_detail(BigQuery). Incluye validación de conteos y backfill selectivo de schedulers faltantes. -
cassandra_tracker_incremental_7h: Procesa la tablanotifications_tracker_by_daydesde Cassandra hacia BigQuery con deduplicación y validación de datos existentes. -
s3_analysis_to_bigquery: Procesa archivos JSON de análisis de productos (original_datasetyleft_dataset) desde S3 hacia BigQuery en paralelo.
Integración con Sistemas Externos (Airflow)
El sistema de Airflow se integra con:
- BigQuery: Destino principal de los datos ingeridos
- Cassandra: Fuente de datos para notificaciones, tracker y segmentos
- Gmail: Fuente de datos para archivos de merma
- AWS S3: Fuente de archivos JSON de análisis de productos
Validaciones del Sistema
Validaciones de Integridad de Datos
- Verificación de esquemas: Cada DAG valida estructura antes de procesar
- Checkpoints de datos: Puntos de verificación en cada etapa del pipeline
- Validación de tipos: Verificación de tipos de datos en cada columna
- Detección de duplicados: Identificación y manejo de registros duplicados
- Validación de rangos: Verificación de valores dentro de rangos esperados
- Consistencia referencial: Validación de claves foráneas entre tablas
Validaciones de Recursos del Sistema
- Uso de memoria: Monitoreo continuo durante ejecución (límite: 4GB por DAG)
- Espacio en disco: Verificación antes de cada DAG (mínimo: 10GB libres)
- Conexiones de BD: Validación de conectividad antes de ingesta
- Timeouts configurables:
- Timeout por tarea: 30 minutos
- Timeout por DAG: 2 horas
- Timeout de conexión: 30 segundos
Validaciones de Calidad de Datos
- Completitud de datos: Verificación de campos obligatorios
- Precisión de datos: Validación contra reglas de negocio
- Consistencia temporal: Verificación de secuencias temporales
- Validación de formato: Verificación de formatos de fecha, email, etc.
Validaciones de Rollback y Recuperación
- Rollback automático: En caso de fallo crítico durante ingesta
- Puntos de recuperación: Snapshots cada 1000 registros procesados
- Validación post-ingesta: Verificación de integridad después de carga
- Alertas automáticas: Notificaciones en caso de anomalías detectadas
Estructura del Proyecto
DATA-PIPELINES-ORCHESTRATOR/
├── config/
│ └── incremental_config.py # Configuración de ingestas incrementales
│
├── dags/
│ ├── notifications_cassandra_raw_detail_incremental.py # Ingesta nocturna Cassandra → BigQuery
│ ├── cassandra_tracker_massive_ingestion.py # Ingesta masiva tracker Cassandra → BigQuery
│ └── s3_analysis_to_bq.py # Ingesta análisis S3 → BigQuery
│
├── json/
│ └── metadata_categorys.json # Metadatos por categoría de tablas
│
├── utils/
│ ├── common_imports.py # Importaciones compartidas
│ ├── credentials.py # Autenticación con Google Cloud
│ ├── data_preparation.py # Transformaciones y limpieza
│ ├── observability.py # Métricas, logs y validación
│ └── sql_utils.py # Utilidades SQL dinámicasUtilidades Comunes
Módulos en utils/
credentials.py: Manejo seguro de credenciales para GCP.data_preparation.py: Normalización, tipificación, y limpieza de datos.observability.py: Validación de integridad, conteo, logs y alertas.sql_utils.py: Construcción dinámica de queries SQL y utilidades para PostgreSQL.common_imports.py: Importaciones compartidas para todos los módulos.
Configuración
config/incremental_config.py: Define las reglas para cargas incrementales por tabla o categoría.
Metadatos
json/metadata_categorys.json: Clasificación y prioridad de tablas, clave para la segmentación de pipelines.
Funcionalidades Destacadas
- Ingesta en tiempo real de PostgreSQL via DataStream (CDC)
- Transformaciones SQL con Dataform (STG → ODS → GOLD)
- Ingesta batch desde múltiples fuentes: Cassandra, S3, Gmail
- Validación automatizada de conteos entre origen y destino
- Backfill automático de datos faltantes
- Deduplicación y manejo de errores robusto
- Procesamiento paralelo de datasets
Consideraciones Técnicas
- La ingesta de datos transaccionales (PostgreSQL) se realiza mediante DataStream con CDC en tiempo real.
- Las transformaciones entre capas (STG → ODS → GOLD) se gestionan con Dataform.
- Airflow se utiliza exclusivamente para fuentes que no soportan CDC nativo (Cassandra, S3, Gmail).
- El sistema utiliza Airflow 2.8.1 con Python 3.10 para los DAGs de ingesta batch.
- Los DAGs de ingesta utilizan patrones de diseño consistentes para facilitar mantenimiento.
- El sistema de reintentos automáticos garantiza la robustez de la ejecución.
Dependencias
GCP Services
- DataStream: Replicación CDC PostgreSQL → BigQuery
- BigQuery: Data Warehouse (STG, ODS, GOLD)
- Dataform: Transformaciones SQL y modelado de datos
Airflow (Ingesta Batch)
- Apache Airflow 2.8.1
- Python >= 3.10
- Apache Cassandra (conexión configurada en Airflow)
- AWS S3 (credenciales configuradas)
- Gmail API
- Dependencias Python:
pandas,google-cloud-bigquery,cassandra-driver,boto3