Modern Data Engineering Patterns
Data engineering has evolved dramatically with cloud-native technologies and streaming architectures. This guide covers modern patterns for building scalable, reliable data pipelines.
Streaming Architecture with Kafka
Producer Pattern
from kafka import KafkaProducer
import json

# JSON-serializing producer (kafka-python); keys are optional strings
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

def send_event(topic, key, data):
    try:
        # Block until the broker acknowledges the write (or 10 seconds elapse)
        future = producer.send(topic, key=key, value=data)
        record_metadata = future.get(timeout=10)
        print(f"Sent to {record_metadata.topic} partition {record_metadata.partition}")
    except Exception as e:
        print(f"Failed to send message: {e}")
Consumer Pattern
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='analytics-group',
    enable_auto_commit=True,
    auto_offset_reset='latest'
)

for message in consumer:
    try:
        process_event(message.value)
    except Exception as e:
        print(f"Error processing message: {e}")
        # Send to dead letter queue
        send_to_dlq(message)
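The loop above assumes process_event and send_to_dlq are defined elsewhere. One possible sketch of the dead-letter helper, reusing the producer configured in the Producer Pattern and a hypothetical 'user-events-dlq' topic:

def send_to_dlq(message):
    # Forward the failed record, plus enough metadata to trace it back,
    # to a dead-letter topic for later inspection (topic name is illustrative)
    producer.send('user-events-dlq', value={
        'original_topic': message.topic,
        'partition': message.partition,
        'offset': message.offset,
        'payload': message.value
    })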
Batch Processing with Spark
Data Transformation Pipeline
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, avg

spark = SparkSession.builder.appName("DataPipeline").getOrCreate()

def process_user_events(input_path, output_path):
    # Read data
    events_df = spark.read.parquet(input_path)

    # Data quality checks
    clean_events = events_df.filter(
        col("user_id").isNotNull() &
        col("event_type").isNotNull() &
        col("timestamp").isNotNull()
    )

    # Aggregations
    user_stats = clean_events.groupBy("user_id").agg(
        count("*").alias("event_count"),
        avg("session_duration").alias("avg_session_duration")
    )

    # Add derived columns
    user_stats_enhanced = user_stats.withColumn(
        "user_segment",
        when(col("event_count") > 100, "high_activity")
        .when(col("event_count") > 10, "medium_activity")
        .otherwise("low_activity")
    )

    # Write output
    user_stats_enhanced.write.mode("overwrite").parquet(output_path)
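A usage sketch; the storage paths are purely illustrative:

# Hypothetical locations; substitute your own storage paths
process_user_events(
    input_path="s3a://example-bucket/raw/user_events/",
    output_path="s3a://example-bucket/curated/user_stats/"
)
spark.stop()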
Data Quality Framework
Validation Rules
from typing import List, Dict, Any
import pandas as pd

class DataQualityRule:
    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description

    def validate(self, df: pd.DataFrame) -> Dict[str, Any]:
        raise NotImplementedError

class NonNullRule(DataQualityRule):
    def __init__(self, column: str):
        super().__init__(
            name=f"non_null_{column}",
            description=f"Column {column} should not contain null values"
        )
        self.column = column

    def validate(self, df: pd.DataFrame) -> Dict[str, Any]:
        null_count = df[self.column].isnull().sum()
        total_count = len(df)
        return {
            "rule_name": self.name,
            "passed": null_count == 0,
            "null_count": null_count,
            "total_count": total_count,
            # Guard against division by zero on an empty DataFrame
            "null_percentage": (null_count / total_count) * 100 if total_count else 0.0
        }

class DataQualityValidator:
    def __init__(self, rules: List[DataQualityRule]):
        self.rules = rules

    def validate_dataset(self, df: pd.DataFrame) -> List[Dict[str, Any]]:
        results = []
        for rule in self.rules:
            result = rule.validate(df)
            results.append(result)
        return results
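A usage sketch; the sample DataFrame is illustrative and includes a deliberate null to trip the rule:

df = pd.DataFrame({"user_id": [1, 2, None], "event_type": ["click", "view", "click"]})

validator = DataQualityValidator([NonNullRule("user_id"), NonNullRule("event_type")])
for result in validator.validate_dataset(df):
    print(result["rule_name"], "passed" if result["passed"] else "failed")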
Cloud-Native Data Architecture
AWS Data Pipeline
import boto3
from datetime import datetime

class S3DataPipeline:
    def __init__(self, bucket_name):
        self.s3 = boto3.client('s3')
        self.bucket_name = bucket_name

    def upload_data(self, data, key):
        """Upload data to S3 with date-based partitioning."""
        # Capture the timestamp once so year/month/day stay consistent
        now = datetime.now()
        partition_key = f"year={now.year}/month={now.month}/day={now.day}/{key}"
        try:
            self.s3.put_object(
                Bucket=self.bucket_name,
                Key=partition_key,
                Body=data
            )
            return partition_key
        except Exception as e:
            print(f"Upload failed: {e}")
            raise

    def trigger_processing(self, input_path, output_path):
        """Trigger Glue job for data processing."""
        glue = boto3.client('glue')
        response = glue.start_job_run(
            JobName='data-processing-job',
            Arguments={
                '--input_path': input_path,
                '--output_path': output_path
            }
        )
        return response['JobRunId']
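A usage sketch, assuming AWS credentials are configured; the bucket name, payload, and Glue job name (as in the class above) are placeholders for your own resources:

pipeline = S3DataPipeline("example-data-lake")
uploaded_key = pipeline.upload_data(b'{"user_id": 1, "event_type": "login"}', "events.json")

# Kick off downstream processing on the uploaded partition
job_run_id = pipeline.trigger_processing(
    f"s3://example-data-lake/{uploaded_key}",
    "s3://example-data-lake/processed/"
)
print(f"Started Glue job run {job_run_id}")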
Monitoring and Observability
Pipeline Monitoring
import time
import logging
from dataclasses import dataclass
from typing import Optional

@dataclass
class PipelineMetrics:
    pipeline_name: str
    start_time: float
    end_time: Optional[float] = None
    records_processed: int = 0
    errors_count: int = 0

    @property
    def duration(self) -> float:
        if self.end_time:
            return self.end_time - self.start_time
        return time.time() - self.start_time

class PipelineMonitor:
    def __init__(self, pipeline_name: str):
        self.metrics = PipelineMetrics(
            pipeline_name=pipeline_name,
            start_time=time.time()
        )
        self.logger = logging.getLogger(pipeline_name)

    def record_processed(self, count: int = 1):
        self.metrics.records_processed += count

    def record_error(self, error: Exception):
        self.metrics.errors_count += 1
        self.logger.error(f"Pipeline error: {error}")

    def finish(self):
        self.metrics.end_time = time.time()
        self.logger.info(
            f"Pipeline {self.metrics.pipeline_name} completed: "
            f"{self.metrics.records_processed} records processed "
            f"in {self.metrics.duration:.2f} seconds"
        )
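A usage sketch wrapping the monitor around a processing loop; the batch and the process_record handler are hypothetical:

logging.basicConfig(level=logging.INFO)

monitor = PipelineMonitor("user-events-pipeline")
for record in [{"user_id": 1}, {"user_id": 2}]:  # placeholder batch
    try:
        process_record(record)  # hypothetical per-record handler
        monitor.record_processed()
    except Exception as e:
        monitor.record_error(e)
monitor.finish()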
Best Practices
- Design for Failure: Implement retry logic and dead letter queues (see the sketch after this list)
- Monitor Data Quality: Continuously validate data integrity
- Use Schema Evolution: Design schemas that can evolve over time
- Implement Backpressure: Handle varying data volumes gracefully
- Optimize for Cost: Use appropriate storage and compute resources
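As referenced in the first bullet, a minimal retry-with-backoff sketch built around the Kafka producer from earlier; the attempt count, delays, and dead-letter topic naming are arbitrary assumptions, not recommendations from any specific library:

import time

def send_with_retry(topic, key, data, max_attempts=3, base_delay=1.0):
    # Retry transient send failures with exponential backoff,
    # then route the record to a dead-letter topic as a last resort
    for attempt in range(1, max_attempts + 1):
        try:
            future = producer.send(topic, key=key, value=data)
            return future.get(timeout=10)
        except Exception as e:
            print(f"Attempt {attempt} failed: {e}")
            if attempt == max_attempts:
                producer.send(f"{topic}-dlq", key=key, value=data)  # hypothetical DLQ topic
                raise
            time.sleep(base_delay * (2 ** (attempt - 1)))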
Modern data engineering requires understanding both batch and streaming patterns, implementing robust monitoring, and designing for scale and reliability.