Follow this accelerated timeline to prepare for the Databricks Data Engineer Professional certification in two weeks, with one focus area per day:
Day 1: Lakehouse Platform & Unity Catalog Fundamentals
Objectives:
- Understand Databricks Lakehouse Platform architecture
- Learn Unity Catalog fundamentals and data governance
- Review exam structure and domains
- Set up Databricks workspace and clusters
Resources:
- Databricks Platform Documentation
- Unity Catalog Guide
- Official Certification Portal
Practice Examples:
-- Unity Catalog setup and basic operations
CREATE CATALOG IF NOT EXISTS production;
USE CATALOG production;
CREATE SCHEMA IF NOT EXISTS bronze_data;
USE SCHEMA bronze_data;
-- Create managed table in Unity Catalog
CREATE TABLE sales_raw (
id INT,
transaction_date TIMESTAMP,
amount DECIMAL(10,2),
customer_id INT,
region STRING
) USING DELTA;
-- Grant permissions
GRANT USE CATALOG ON CATALOG production TO `data_engineers`;
GRANT USE SCHEMA ON SCHEMA bronze_data TO `data_engineers`;
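The workspace and cluster setup objective has no snippet above; a minimal sketch with the Databricks SDK for Python, where the cluster name, runtime version, and node type are placeholder values:
# Create an all-purpose cluster with the Databricks SDK (placeholder values)
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
cluster = w.clusters.create(
    cluster_name="study-cluster",
    spark_version="15.4.x-scala2.12",   # placeholder runtime version
    node_type_id="i3.xlarge",           # placeholder node type
    num_workers=1,
    autotermination_minutes=30
).result()
print(cluster.cluster_id)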
Day 2: PySpark & Spark SQL
Objectives:
- Master PySpark DataFrame API
- Learn Spark SQL optimizations
- Practice UDFs and Pandas UDFs
- Understand execution plans
Resources:
- PySpark Documentation
- Spark SQL Language Reference
- Databricks Notebook Examples
Practice Examples:
# PySpark DataFrame operations
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Create Spark session
spark = SparkSession.builder.appName("DataProcessing").getOrCreate()
# Read data with schema enforcement
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("salary", DoubleType(), True),
StructField("department", StringType(), True)
])
df = spark.read.schema(schema).csv("/mnt/data/employees.csv")
# Complex transformations
processed_df = (df
.filter(col("salary") > 50000)
.withColumn("bonus", col("salary") * 0.1)
.withColumn("total_comp", col("salary") + col("bonus"))
.groupBy("department")
.agg(
avg("total_comp").alias("avg_comp"),
sum("total_comp").alias("total_dept_comp"),
count("*").alias("employee_count")
)
)
# UDF example
@udf(returnType=StringType())
def salary_category(salary):
    if salary is None:
        return None
    if salary < 50000:
        return "Junior"
    elif salary < 100000:
        return "Mid"
    else:
        return "Senior"
df_with_category = df.withColumn("category", salary_category(col("salary")))
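The objectives also call for Pandas UDFs and execution plans; a brief sketch of a vectorized equivalent of salary_category, with an explain() call to inspect the plan (the pd.cut bucketing is an assumption mirroring the thresholds above):
# Vectorized Pandas UDF equivalent of salary_category, plus a look at the plan
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf(StringType())
def salary_category_vec(salary: pd.Series) -> pd.Series:
    return pd.cut(
        salary,
        bins=[float("-inf"), 50000, 100000, float("inf")],
        labels=["Junior", "Mid", "Senior"]
    ).astype(str)

df_categorized = df.withColumn("category", salary_category_vec(col("salary")))
df_categorized.explain(True)   # inspect the logical and physical plans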
Day 3: Data Ingestion with Auto Loader & Streaming
Objectives:
- Master Auto Loader for incremental ingestion
- Learn batch ingestion patterns
- Understand LakeFlow Connect connectors
- Practice streaming ingestion
Resources:
- Auto Loader Documentation
- LakeFlow Connect Guide
- Structured Streaming Examples
Practice Examples:
# Auto Loader for incremental file processing
# Auto Loader configuration for cloud files
streaming_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "/tmp/schema_location")
.option("cloudFiles.inferColumnTypes", "true")
.load("/mnt/raw-data/")
)
# Write stream to Delta table
(streaming_df.writeStream
.format("delta")
.option("checkpointLocation", "/tmp/checkpoint")
.table("bronze_data.raw_events")
)
-- Batch ingestion with COPY INTO for idempotent loads
COPY INTO bronze_data.sales_data
FROM '/mnt/external/sales/'
FILEFORMAT = PARQUET
PATTERN = '*.parquet'
COPY_OPTIONS ('mergeSchema' = 'true');
# Kafka streaming ingestion
kafka_df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "latest")
.load()
)
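The Kafka reader above stops at the raw stream; a hedged continuation that parses the value column and lands it in a bronze table (the event schema and target table name are assumptions):
# Parse the Kafka value column as JSON and write it to a bronze Delta table
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

event_schema = StructType([              # assumed payload shape
    StructField("event_id", StringType(), True),
    StructField("amount", DoubleType(), True)
])

parsed_df = (kafka_df
    .select(from_json(col("value").cast("string"), event_schema).alias("event"))
    .select("event.*")
)

(parsed_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/checkpoints/kafka_events")
    .table("bronze_data.kafka_events")
)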
Day 4: Delta Lake Operations
Objectives:
- Master Delta Lake ACID transactions
- Learn MERGE, UPDATE, DELETE operations
- Understand time travel and versioning
- Practice OPTIMIZE and ZORDER
Resources:
- Delta Lake Documentation
- Delta Lake API Reference
- Performance Tuning Guide
Practice Examples:
-- Delta Lake MERGE operation (UPSERT)
MERGE INTO silver_data.customers AS target
USING (
SELECT
customer_id,
customer_name,
email,
updated_at,
operation
FROM bronze_data.customers_staging
WHERE operation IN ('INSERT', 'UPDATE')
) AS source
ON target.customer_id = source.customer_id
WHEN MATCHED AND source.operation = 'UPDATE' THEN
UPDATE SET *
WHEN NOT MATCHED AND source.operation = 'INSERT' THEN
  INSERT *;
-- Time travel queries
SELECT * FROM silver_data.customers TIMESTAMP AS OF '2025-01-01';
SELECT * FROM silver_data.customers VERSION AS OF 12;
-- Delta table maintenance
OPTIMIZE silver_data.customers
ZORDER BY (customer_id, region);
-- VACUUM to remove files older than the retention window (168 hours = 7 days)
VACUUM silver_data.customers RETAIN 168 HOURS;
-- Change Data Feed (requires delta.enableChangeDataFeed = true on the table)
SELECT * FROM table_changes('silver_data.customers', 10, 15);
# Streaming from a Delta table with the Change Data Feed enabled (Python)
(spark.readStream
.format("delta")
.option("readChangeFeed", "true")
.table("silver_data.customers")
)
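UPDATE and DELETE appear in the objectives but not in the SQL above; a minimal sketch using the DeltaTable Python API (the predicates and the is_deleted column are illustrative):
# Point updates and deletes through the DeltaTable API
from delta.tables import DeltaTable

customers = DeltaTable.forName(spark, "silver_data.customers")

# Correct a region code for affected rows (illustrative predicate)
customers.update(
    condition="region = 'EMEA_OLD'",
    set={"region": "'EMEA'"}
)

# Remove rows flagged for deletion (assumes an is_deleted column)
customers.delete("is_deleted = true")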
Day 5: Data Quality & Transformations
Objectives:
- Implement data quality checks
- Learn data validation patterns
- Practice complex transformations
- Understand schema evolution
Resources:
- Data Quality Framework Docs
- Schema Evolution Guide
- Expectations Library
Practice Examples:
# Data quality checks with rule-based filtering
# (declarative expectations with @dlt.expect are covered on Day 11)
from pyspark.sql.functions import col, lower, trim, regexp_replace, when, lit

# Define quality rules as SQL expressions
quality_rules = {
    "valid_amount": "amount IS NOT NULL AND amount > 0",
    "valid_date": "transaction_date >= '2020-01-01'",
    "valid_id": "id IS NOT NULL AND id > 0"
}

# Split the data into valid rows and a quarantine set for inspection
combined_rule = " AND ".join(f"({r})" for r in quality_rules.values())
validated_df = df.filter(combined_rule)
quarantine_df = df.filter(f"NOT ({combined_rule})")
# Complex data cleansing
cleansed_df = (df
.filter(col("status") == "active")
.withColumn("email", lower(trim(col("email"))))
.withColumn("phone", regexp_replace(col("phone"), "[^0-9]", ""))
.withColumn("age",
when(col("age").isNull(), lit(0))
.when(col("age") < 0, lit(0))
.when(col("age") > 120, lit(120))
.otherwise(col("age"))
)
.dropDuplicates(["user_id", "transaction_date"])
)
# Schema evolution handling
evolved_df = (spark.read
.format("delta")
.option("mergeSchema", "true")
.load("/mnt/tables/customer_data")
)
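The read-side mergeSchema option above has a write-side counterpart; a short sketch, assuming new_batch_df is an incoming DataFrame that adds new columns:
# Schema evolution on write: append a batch whose schema adds new columns
(new_batch_df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable("silver_data.customer_profiles")
)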
Day 6: Medallion Architecture & Data Modeling
Objectives:
- Implement Bronze, Silver, Gold layers
- Learn Slowly Changing Dimensions (SCD)
- Practice star schema modeling
- Understand data partitioning strategies
Resources:
- Medallion Architecture Guide
- Data Modeling Best Practices
- SCD Type 2 Implementation
Practice Examples:
-- Bronze layer (raw data ingestion)
CREATE TABLE bronze_data.sales_raw
USING DELTA
AS SELECT * FROM read_files('/mnt/raw/sales/*.json');
-- Silver layer (cleansed and validated data)
CREATE TABLE silver_data.sales_cleansed
USING DELTA
AS SELECT
id,
customer_id,
product_id,
CAST(amount AS DECIMAL(10,2)) as amount,
CAST(transaction_date AS TIMESTAMP) as transaction_date,
region,
CURRENT_TIMESTAMP() as processed_at
FROM bronze_data.sales_raw
WHERE amount IS NOT NULL
AND transaction_date IS NOT NULL;
-- Gold layer (business aggregates)
CREATE TABLE gold_data.sales_aggregates
USING DELTA
PARTITIONED BY (region, year_month)
AS SELECT
region,
DATE_TRUNC('month', transaction_date) as year_month,
COUNT(*) as transaction_count,
SUM(amount) as total_sales,
AVG(amount) as avg_sale_amount,
COUNT(DISTINCT customer_id) as unique_customers
FROM silver_data.sales_cleansed
GROUP BY region, DATE_TRUNC('month', transaction_date);
-- SCD Type 2 implementation (this MERGE closes out changed rows and inserts new
-- customers; a follow-up INSERT adds the new version of each changed customer)
MERGE INTO gold_data.customers_dim AS target
USING (
SELECT
customer_id,
customer_name,
email,
address,
'2025-01-01' as valid_from,
'9999-12-31' as valid_to,
TRUE as is_current
FROM silver_data.customers_updated
) AS source
ON target.customer_id = source.customer_id AND target.is_current = TRUE
WHEN MATCHED AND target.email <> source.email THEN
UPDATE SET
valid_to = CURRENT_DATE(),
is_current = FALSE
WHEN NOT MATCHED THEN
INSERT (customer_id, customer_name, email, address, valid_from, valid_to, is_current)
VALUES (source.customer_id, source.customer_name, source.email, source.address,
source.valid_from, source.valid_to, source.is_current);
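To connect the star schema objective to the dimension table above, a hedged PySpark sketch that joins the fact table to the current dimension rows:
# Star schema query: fact table joined to the current SCD2 dimension rows
from pyspark.sql import functions as F

fact_df = spark.table("silver_data.sales_cleansed")
dim_df = spark.table("gold_data.customers_dim").filter("is_current = TRUE")

lifetime_value_df = (fact_df
    .join(dim_df, "customer_id")
    .groupBy("customer_name")
    .agg(F.sum("amount").alias("lifetime_value"))
)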
Day 7: Performance Optimization & Cost Management
Objectives:
- Learn query optimization techniques
- Master Spark UI and query profiling
- Practice partitioning and ZORDERing
- Understand cost management
Resources:
- Spark UI Documentation
- Performance Tuning Guide
- Cost Optimization Best Practices
Practice Examples:
-- Performance optimization techniques
-- Create optimized table with partitioning and ZORDER
CREATE TABLE gold_data.optimized_sales
USING DELTA
PARTITIONED BY (sale_year, sale_month)
TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true'
)
AS SELECT
*,
YEAR(transaction_date) as sale_year,
MONTH(transaction_date) as sale_month
FROM silver_data.sales_cleansed;
-- Optimize existing table
OPTIMIZE gold_data.optimized_sales
ZORDER BY (customer_id, product_id, region);
-- Analyze table statistics
ANALYZE TABLE gold_data.optimized_sales COMPUTE STATISTICS FOR ALL COLUMNS;
-- Show table details for optimization
DESCRIBE DETAIL gold_data.optimized_sales;
-- Review recent OPTIMIZE runs and their file-compaction metrics
DESCRIBE HISTORY gold_data.optimized_sales;
# PySpark performance configurations
# Enable adaptive query execution
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skew.enabled", "true")
# Enable Delta cache
spark.conf.set("spark.databricks.io.cache.enabled", "true")
# Optimize shuffle partitions
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1000")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
Day 8: Security & Governance
Objectives:
- Master Unity Catalog security
- Learn row and column level security
- Practice data masking and encryption
- Understand compliance frameworks
Resources:
- Unity Catalog Security Guide
- Data Governance Documentation
- Compliance Framework Docs
Practice Examples:
-- Unity Catalog security implementation
-- Principals (users, service principals, and account-level groups such as
-- data_engineers, data_analysts, data_scientists) are created in the account
-- console or via SCIM, not with CREATE ROLE in SQL
-- Grant catalog and schema permissions to groups
GRANT USE CATALOG ON CATALOG production TO `data_engineers`;
GRANT USE SCHEMA ON SCHEMA silver_data TO `data_engineers`;
GRANT SELECT ON SCHEMA silver_data TO `data_analysts`;
-- Row level security with dynamic views
-- (current_user_regions is a user-to-region mapping table maintained by the data team)
CREATE VIEW silver_data.sales_regional AS
SELECT *
FROM silver_data.sales_cleansed
WHERE region IN (
SELECT region
FROM current_user_regions
WHERE user_email = current_user()
);
-- Column level security with masking
CREATE VIEW silver_data.sales_masked AS
SELECT
id,
customer_id,
amount,
transaction_date,
region,
CASE
WHEN is_member('hr_team') THEN customer_name
ELSE '***MASKED***'
END as customer_name,
CASE
WHEN is_member('finance_team') THEN email
ELSE regexp_replace(email, '(.).*@', '***@')
END as email
FROM silver_data.sales_cleansed;
-- Data lineage tracking
-- Unity Catalog automatically tracks lineage for tables and views
-- Query lineage information
SELECT * FROM system.access.table_lineage;
Day 9: Monitoring, Alerting & Observability
Objectives:
- Implement pipeline monitoring
- Learn alert configuration
- Practice log analysis
- Understand observability patterns
Resources:
- Monitoring & Alerting Documentation
- Observability Best Practices
- Log Analysis Guide
Practice Examples:
-- Monitor Delta table changes and metrics
-- Check table history for changes
DESCRIBE HISTORY gold_data.sales_aggregates;
-- Monitor streaming progress in the Structured Streaming UI, or from Python via
-- StreamingQuery.status and lastProgress (see the Python sketch at the end of this day)
-- Check query history and performance
SELECT
  statement_text,
  executed_by,
  execution_status,
  total_duration_ms,
  read_bytes,
  start_time
FROM system.query.history
WHERE start_time > CURRENT_DATE() - INTERVAL 1 DAY
ORDER BY total_duration_ms DESC;
-- Data freshness check: attach a Databricks SQL alert to this query
-- (alerts are configured in the SQL Alerts UI or via the REST API, not with DDL)
SELECT MAX(transaction_date) AS latest_transaction
FROM gold_data.sales_aggregates
HAVING MAX(transaction_date) < CURRENT_DATE() - INTERVAL 1 DAY;
-- Monitor compute cost: DBUs per cluster from the billing system tables
SELECT
  usage_metadata.cluster_id AS cluster_id,
  SUM(usage_quantity) AS total_dbus
FROM system.billing.usage
WHERE usage_date > CURRENT_DATE() - INTERVAL 7 DAYS
  AND usage_metadata.cluster_id IS NOT NULL
GROUP BY usage_metadata.cluster_id;
# Python monitoring with custom metrics
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs
def monitor_pipeline_health():
    w = WorkspaceClient()
    # Get recent run details (job_id is a placeholder)
    runs = w.jobs.list_runs(
        job_id=12345,
        limit=10,
        expand_tasks=True
    )
    # Check for failed runs
    failed_runs = [
        r for r in runs
        if r.state and r.state.result_state == jobs.RunResultState.FAILED
    ]
    if failed_runs:
        # send_alert() is a placeholder notification hook
        send_alert(f"Pipeline failures detected: {len(failed_runs)}")
    return len(failed_runs)
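As noted above, streaming progress is easiest to check from Python; a minimal sketch, assuming a streaming DataFrame such as the Day 3 Auto Loader stream:
# Track a streaming query's health from its status and progress metrics
query = (streaming_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/checkpoints/sales_stream")
    .table("gold_data.sales_stream")
)

print(query.status)         # whether the trigger is currently active
print(query.lastProgress)   # input rows/sec, batch duration, state metrics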
Day 10: Workflows & Orchestration
Objectives:
- Master Workflows configuration
- Learn task dependencies and scheduling
- Practice error handling and retries
- Understand workflow patterns
Resources:
- Workflows Documentation
- Orchestration Best Practices
- Job Scheduling Guide
Practice Examples:
# Workflow configuration with Python API
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs
w = WorkspaceClient()
# Create a multi-task workflow
pipeline_job = w.jobs.create(
name="Medallion Architecture Pipeline",
tags={
"environment": "production",
"team": "data-engineering"
},
tasks=[
jobs.Task(
task_key="ingest_bronze",
description="Ingest raw data to bronze layer",
notebook_task=jobs.NotebookTask(
notebook_path="/Workspace/Users/engineer/bronze_ingestion",
base_parameters={"source_path": "/mnt/raw/sales/"}
),
timeout_seconds=3600,
email_notifications=jobs.TaskEmailNotifications(
on_failure=["engineer@company.com"]
)
),
jobs.Task(
task_key="transform_silver",
description="Transform bronze to silver layer",
notebook_task=jobs.NotebookTask(
notebook_path="/Workspace/Users/engineer/silver_transformation"
),
depends_on=[jobs.TaskDependency(task_key="ingest_bronze")],
timeout_seconds=7200
),
jobs.Task(
task_key="aggregate_gold",
description="Create gold layer aggregates",
notebook_task=jobs.NotebookTask(
notebook_path="/Workspace/Users/engineer/gold_aggregation"
),
depends_on=[jobs.TaskDependency(task_key="transform_silver")],
timeout_seconds=3600
)
],
schedule=jobs.CronSchedule(
quartz_cron_expression="0 0 6 * * ?", # Daily at 6 AM
timezone_id="America/New_York"
),
max_concurrent_runs=1
)
# Trigger workflow run programmatically
run = w.jobs.run_now(job_id=pipeline_job.job_id)
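The retry objective is not shown in the workflow above; a hedged sketch of per-task retry settings (values are illustrative):
# Automatic retries on a task
retry_task = jobs.Task(
    task_key="ingest_bronze_with_retries",
    notebook_task=jobs.NotebookTask(
        notebook_path="/Workspace/Users/engineer/bronze_ingestion"
    ),
    max_retries=3,                      # retry up to three times
    min_retry_interval_millis=60000,    # wait a minute between attempts
    retry_on_timeout=True
)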
Day 11: Delta Live Tables (DLT)
Objectives:
- Master DLT pipeline development
- Learn expectations and data quality
- Practice incremental processing
- Understand DLT orchestration
Resources:
- DLT Documentation
- DLT Examples and Patterns
- Data Quality with DLT
Practice Examples:
# Delta Live Tables pipeline example
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
@dlt.table(
comment="Raw sales data from source system",
table_properties={
"quality": "bronze",
"pipelines.autoOptimize.managed": "true"
}
)
def sales_bronze():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "/tmp/schema/sales_bronze")
        .load("/mnt/raw/sales/")
    )
@dlt.table(
comment="Cleansed and validated sales data",
table_properties={"quality": "silver"}
)
@dlt.expect("valid_amount", "amount > 0")
@dlt.expect_or_drop("valid_date", "transaction_date IS NOT NULL")
@dlt.expect_or_fail("valid_customer", "customer_id IS NOT NULL")
def sales_silver():
    return (
        dlt.read_stream("sales_bronze")
        .select(
            col("id").cast("integer"),
            col("customer_id").cast("integer"),
            col("product_id").cast("integer"),
            col("amount").cast("decimal(10,2)"),
            col("transaction_date").cast("timestamp"),
            col("region"),
            current_timestamp().alias("processed_at")
        )
        .dropDuplicates(["id", "transaction_date"])
    )
@dlt.table(
comment="Aggregated sales data by region and month",
table_properties={"quality": "gold"}
)
def sales_gold_aggregates():
    return (
        dlt.read("sales_silver")
        .groupBy(
            "region",
            date_trunc("month", "transaction_date").alias("year_month")
        )
        .agg(
            sum("amount").alias("total_sales"),
            avg("amount").alias("avg_sale_amount"),
            count("*").alias("transaction_count"),
            countDistinct("customer_id").alias("unique_customers")
        )
    )
@dlt.view(
comment="Latest sales data for reporting"
)
def sales_current_month():
    return (
        dlt.read("sales_silver")
        .filter(col("transaction_date") >= date_sub(current_date(), 30))
    )
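For incremental CDC processing, DLT can build SCD tables declaratively with APPLY CHANGES; a minimal sketch, assuming an upstream customers_cdc_feed view with an updated_at sequencing column:
# CDC into an SCD Type 2 table with APPLY CHANGES
dlt.create_streaming_table("customers_scd2")

dlt.apply_changes(
    target="customers_scd2",
    source="customers_cdc_feed",        # assumed upstream DLT view/table
    keys=["customer_id"],
    sequence_by=col("updated_at"),
    stored_as_scd_type=2
)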
Day 12: Databricks SQL Connector & APIs
Objectives:
- Master Databricks SQL Connector for Python
- Learn REST API integration
- Practice authentication methods
- Understand programmatic workflows
Resources:
- SQL Connector Documentation
- REST API Reference
- Authentication Guide
Practice Examples:
# Databricks SQL Connector for Python
from databricks import sql
import os
import pandas as pd
# Connection with personal access token
with sql.connect(
    server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"),
    http_path=os.getenv("DATABRICKS_HTTP_PATH"),
    access_token=os.getenv("DATABRICKS_TOKEN")
) as connection:
    with connection.cursor() as cursor:
        # Execute query with named parameters (safe from SQL injection)
        cursor.execute("""
            SELECT
                region,
                COUNT(*) AS transaction_count,
                SUM(amount) AS total_sales
            FROM gold_data.sales_aggregates
            WHERE year_month >= :start_month
            GROUP BY region
        """, {"start_month": "2025-01-01"})
        # Fetch results
        result = cursor.fetchall()
        # Convert to pandas DataFrame
        df = pd.DataFrame(result, columns=[desc[0] for desc in cursor.description])
        print(df)
# OAuth M2M authentication example
from databricks.sdk.core import Config, oauth_service_principal
def credential_provider():
    config = Config(
        host=f"https://{os.getenv('DATABRICKS_SERVER_HOSTNAME')}",
        client_id=os.getenv("DATABRICKS_CLIENT_ID"),
        client_secret=os.getenv("DATABRICKS_CLIENT_SECRET")
    )
    return oauth_service_principal(config)
# Query with OAuth M2M authentication and query tags for tracking
with sql.connect(
    server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"),
    http_path=os.getenv("DATABRICKS_HTTP_PATH"),
    credentials_provider=credential_provider,
    session_configuration={
        'query_tags': 'team:engineering,pipeline:sales_etl,env:prod'
    }
) as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT CURRENT_TIMESTAMP() AS executed_at")
        result = cursor.fetchone()
        print(f"Query executed at: {result[0]}")
Day 13: CI/CD with Asset Bundles & Git
Objectives:
- Master Databricks Asset Bundles
- Learn Git integration and best practices
- Practice deployment automation
- Understand testing strategies
Resources:
- Asset Bundles Documentation
- Git Integration Guide
- CI/CD Best Practices
Practice Examples:
# databricks.yml - Asset Bundle configuration
bundle:
  name: "sales-data-pipeline"
  git:
    origin_url: "https://github.com/company/sales-pipeline.git"

include:
  - "*.yml"

targets:
  development:
    mode: "development"
    default: true
    workspace:
      host: "https://adb-1234567890123456.7.azuredatabricks.net"
    git:
      branch: "main"
  production:
    mode: "production"
    workspace:
      host: "https://adb-9876543210987654.7.azuredatabricks.net"
    git:
      branch: "production"

resources:
  jobs:
    sales_medallion_pipeline:
      name: "Sales Medallion Pipeline"
      job_clusters:
        - job_cluster_key: "main_cluster"
          new_cluster:
            spark_version: "15.4.x-scala2.12"   # placeholder runtime version
            node_type_id: "i3.xlarge"           # placeholder node type
            num_workers: 2
      tasks:
        - task_key: "ingest_bronze"
          job_cluster_key: "main_cluster"
          notebook_task:
            notebook_path: "../src/bronze/ingest_sales.py"
  pipelines:
    sales_dlt_pipeline:
      name: "Sales DLT Pipeline"
      development: true
      continuous: false
      clusters:
        - label: "default"
          num_workers: 2
      libraries:
        - notebook:
            path: "../src/dlt/sales_pipeline.py"
# GitHub Actions workflow for CI/CD
name: Deploy to Databricks
on:
  push:
    branches: [ main, production ]
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Install Databricks CLI
        uses: databricks/setup-cli@main
      - name: Deploy bundle
        run: databricks bundle deploy --target ${{ github.ref == 'refs/heads/production' && 'production' || 'development' }}
        env:
          DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
          DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
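For the testing-strategies objective, a minimal pytest sketch that exercises a transformation rule on a local SparkSession:
# test_transformations.py - unit test for a filter rule used in the pipeline
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark():
    return SparkSession.builder.master("local[2]").appName("unit-tests").getOrCreate()

def test_drops_non_positive_amounts(spark):
    df = spark.createDataFrame([(1, 10.0), (2, -5.0), (3, 0.0)], ["id", "amount"])
    result = df.filter("amount > 0")
    assert result.count() == 1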
Day 14: Practice Exams & Final Review
Objectives:
- Complete comprehensive practice exams
- Review weak areas and domains
- Final documentation review
- Schedule certification exam
Resources:
- Free Practice Test (provided link)
- Official Exam Guide Review
- Domain-specific quick references
Final Checklist:
- ✓ Completed all exam domains across the 14-day plan
- ✓ Scored 80%+ on practice tests
- ✓ Hands-on practice with all key features
- ✓ Reviewed weak areas thoroughly
- ✓ Scheduled certification exam
Key Patterns Quick Reference:
-- Essential SQL patterns for exam
-- 1. Delta Lake MERGE pattern
MERGE INTO target_table t
USING source_table s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- 2. Incremental processing with Auto Loader
CREATE OR REFRESH STREAMING TABLE bronze_raw
AS SELECT * FROM STREAM read_files('/mnt/raw/', format => 'json');
-- 3. Data quality with expectations (DLT / Lakeflow Declarative Pipelines SQL)
CREATE OR REFRESH STREAMING TABLE silver_cleansed (
  CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW,
  CONSTRAINT positive_amount EXPECT (amount > 0)
) AS SELECT * FROM STREAM(bronze_raw);
-- 4. SCD Type 2 implementation
MERGE INTO dim_customers t
USING stg_customers s ON t.customer_id = s.customer_id AND t.is_current = true
WHEN MATCHED AND t.email <> s.email THEN
UPDATE SET valid_to = current_date(), is_current = false
WHEN NOT MATCHED THEN
INSERT (customer_id, email, valid_from, valid_to, is_current)
VALUES (s.customer_id, s.email, current_date(), '9999-12-31', true);
-- 5. Performance optimization
OPTIMIZE sales_table ZORDER BY (customer_id, region);
-- 6. Time travel and data lineage
SELECT * FROM sales_table TIMESTAMP AS OF '2025-01-01';
DESCRIBE HISTORY sales_table;
# Essential PySpark patterns
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Incremental DataFrame processing
incremental_df = (spark.read
.format("delta")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2025-01-01T00:00:00Z")
.table("source_table")
)
# Complex aggregation with windows
window_spec = Window.partitionBy("customer_id").orderBy("transaction_date")
result_df = (df
.withColumn("prev_amount", F.lag("amount").over(window_spec))
.withColumn("amount_change", F.col("amount") - F.col("prev_amount"))
.withColumn("row_num", F.row_number().over(window_spec))
)