Modern Data Engineering Patterns

Data engineering has evolved dramatically with cloud-native technologies and streaming architectures. This guide covers modern patterns for building scalable, reliable data pipelines.

Streaming Architecture with Kafka

Producer Pattern

from kafka import KafkaProducer
import json

# Producer configured to JSON-encode message values and UTF-8 encode keys
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

def send_event(topic, key, data):
    try:
        # Block until the broker acknowledges the write (or 10 seconds elapse)
        future = producer.send(topic, key=key, value=data)
        record_metadata = future.get(timeout=10)
        print(f"Sent to {record_metadata.topic} partition {record_metadata.partition}")
    except Exception as e:
        print(f"Failed to send message: {e}")

Consumer Pattern

from kafka import KafkaConsumer
import json

# Consumer in the analytics-group consumer group; offsets are auto-committed
# and new group members start from the latest offset
consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='analytics-group',
    enable_auto_commit=True,
    auto_offset_reset='latest'
)

for message in consumer:
    try:
        process_event(message.value)
    except Exception as e:
        print(f"Error processing message: {e}")
        # Send to dead letter queue
        send_to_dlq(message)
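
The loop above assumes two application-specific helpers: process_event, which applies your business logic, and send_to_dlq, which routes failed messages to a dead letter queue. A minimal sketch of the DLQ side, assuming a hypothetical user-events-dlq topic:

from kafka import KafkaProducer
import json

# Separate producer dedicated to the dead letter queue (topic name is illustrative)
dlq_producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def send_to_dlq(message):
    """Forward a failed message to the DLQ together with its origin metadata."""
    dlq_producer.send('user-events-dlq', value={
        'original_topic': message.topic,
        'partition': message.partition,
        'offset': message.offset,
        'payload': message.value
    })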

Batch Processing with Spark

Data Transformation Pipeline

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, avg

spark = SparkSession.builder.appName("DataPipeline").getOrCreate()

def process_user_events(input_path, output_path):
    # Read data
    events_df = spark.read.parquet(input_path)

    # Data quality checks
    clean_events = events_df.filter(
        col("user_id").isNotNull() &
        col("event_type").isNotNull() &
        col("timestamp").isNotNull()
    )

    # Aggregations
    user_stats = clean_events.groupBy("user_id").agg(
        count("*").alias("event_count"),
        avg("session_duration").alias("avg_session_duration")
    )

    # Add derived columns
    user_stats_enhanced = user_stats.withColumn(
        "user_segment",
        when(col("event_count") > 100, "high_activity")
        .when(col("event_count") > 10, "medium_activity")
        .otherwise("low_activity")
    )

    # Write output
    user_stats_enhanced.write.mode("overwrite").parquet(output_path)
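
A typical invocation passes partitioned object-store paths; the bucket and prefixes below are placeholders:

# Placeholder paths; point these at your actual storage locations
process_user_events(
    input_path="s3a://example-bucket/raw/user_events/",
    output_path="s3a://example-bucket/curated/user_stats/"
)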

Data Quality Framework

Validation Rules

from typing import List, Dict, Any
import pandas as pd

class DataQualityRule:
    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description

    def validate(self, df: pd.DataFrame) -> Dict[str, Any]:
        raise NotImplementedError

class NonNullRule(DataQualityRule):
    def __init__(self, column: str):
        super().__init__(
            name=f"non_null_{column}",
            description=f"Column {column} should not contain null values"
        )
        self.column = column

    def validate(self, df: pd.DataFrame) -> Dict[str, Any]:
        null_count = int(df[self.column].isnull().sum())
        total_count = len(df)

        return {
            "rule_name": self.name,
            "passed": null_count == 0,
            "null_count": null_count,
            "total_count": total_count,
            # Guard against division by zero on an empty DataFrame
            "null_percentage": (null_count / total_count) * 100 if total_count else 0.0
        }

class DataQualityValidator:
    def __init__(self, rules: List[DataQualityRule]):
        self.rules = rules

    def validate_dataset(self, df: pd.DataFrame) -> List[Dict[str, Any]]:
        results = []
        for rule in self.rules:
            result = rule.validate(df)
            results.append(result)

        return results
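
A short usage sketch, with an illustrative DataFrame and two rules:

import pandas as pd

# Illustrative dataset; in practice this would be a loaded extract
events = pd.DataFrame({
    "user_id": [1, 2, None],
    "event_type": ["click", "view", "click"]
})

validator = DataQualityValidator(rules=[
    NonNullRule("user_id"),
    NonNullRule("event_type")
])

for result in validator.validate_dataset(events):
    status = "PASS" if result["passed"] else "FAIL"
    print(f"{status} {result['rule_name']}: {result['null_percentage']:.1f}% null")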

Cloud-Native Data Architecture

AWS Data Pipeline

import boto3
from datetime import datetime

class S3DataPipeline:
    def __init__(self, bucket_name):
        self.s3 = boto3.client('s3')
        self.bucket_name = bucket_name

    def upload_data(self, data, key):
        """Upload data to S3 under a date-based partition prefix"""
        # Capture the timestamp once and zero-pad month/day so prefixes sort correctly
        now = datetime.now()
        partition_key = f"year={now.year}/month={now.month:02d}/day={now.day:02d}/{key}"

        try:
            self.s3.put_object(
                Bucket=self.bucket_name,
                Key=partition_key,
                Body=data
            )
            return partition_key
        except Exception as e:
            print(f"Upload failed: {e}")
            raise

    def trigger_processing(self, input_path, output_path):
        """Trigger Glue job for data processing"""
        glue = boto3.client('glue')

        response = glue.start_job_run(
            JobName='data-processing-job',
            Arguments={
                '--input_path': input_path,
                '--output_path': output_path
            }
        )

        return response['JobRunId']
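
Example usage with a placeholder bucket; the Glue job name data-processing-job is assumed to be defined already:

import json

pipeline = S3DataPipeline(bucket_name="example-data-lake")

# Upload a small JSON payload, then trigger downstream processing
key = pipeline.upload_data(
    data=json.dumps({"user_id": 42, "event_type": "click"}),
    key="events.json"
)
job_run_id = pipeline.trigger_processing(
    input_path=f"s3://example-data-lake/{key}",
    output_path="s3://example-data-lake/processed/"
)
print(f"Started Glue job run {job_run_id}")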

Monitoring and Observability

Pipeline Monitoring

import time
import logging
from dataclasses import dataclass
from typing import Optional

@dataclass
class PipelineMetrics:
    pipeline_name: str
    start_time: float
    end_time: Optional[float] = None
    records_processed: int = 0
    errors_count: int = 0

    @property
    def duration(self) -> float:
        if self.end_time:
            return self.end_time - self.start_time
        return time.time() - self.start_time

class PipelineMonitor:
    def __init__(self, pipeline_name: str):
        self.metrics = PipelineMetrics(
            pipeline_name=pipeline_name,
            start_time=time.time()
        )
        self.logger = logging.getLogger(pipeline_name)

    def record_processed(self, count: int = 1):
        self.metrics.records_processed += count

    def record_error(self, error: Exception):
        self.metrics.errors_count += 1
        self.logger.error(f"Pipeline error: {error}")

    def finish(self):
        self.metrics.end_time = time.time()
        self.logger.info(
            f"Pipeline {self.metrics.pipeline_name} completed: "
            f"{self.metrics.records_processed} records processed "
            f"in {self.metrics.duration:.2f} seconds"
        )
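
A sketch of the monitor wrapping a processing loop; records and handle_record are placeholders for your input iterable and processing function:

import logging

logging.basicConfig(level=logging.INFO)

monitor = PipelineMonitor("user-events-batch")
for record in records:            # placeholder: any iterable of input rows
    try:
        handle_record(record)     # placeholder: real processing logic
        monitor.record_processed()
    except Exception as e:
        monitor.record_error(e)
monitor.finish()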

Best Practices

  1. Design for Failure: Implement retry logic and dead letter queues (see the retry sketch after this list)
  2. Monitor Data Quality: Continuously validate data integrity
  3. Use Schema Evolution: Design schemas that can evolve over time
  4. Implement Backpressure: Handle varying data volumes gracefully
  5. Optimize for Cost: Use appropriate storage and compute resources
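
To illustrate the first point, here is a minimal retry-with-backoff wrapper that falls back to a dead letter queue once attempts are exhausted (send_to_dlq as sketched in the consumer section):

import time

def process_with_retry(message, handler, max_attempts=3, base_delay=1.0):
    """Call handler with exponential backoff; route to the DLQ on final failure."""
    for attempt in range(1, max_attempts + 1):
        try:
            return handler(message)
        except Exception:
            if attempt == max_attempts:
                send_to_dlq(message)  # defined in the consumer section above
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))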

Modern data engineering requires understanding both batch and streaming patterns, implementing robust monitoring, and designing for scale and reliability.