The Databricks platform is built on a cloud-native architecture that separates control and data planes, providing both security and performance benefits.
Key Components:
The Databricks platform architecture consists of two primary planes:
Interaction Between Planes:
Key Exam Concepts:
Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark and big data workloads.
Core Delta Lake Components:
Transaction Log Deep Dive:
The transaction log is the backbone of Delta Lake and enables many of its key features:
/table_path/
├── _delta_log/
│   ├── 00000000000000000000.json
│   ├── 00000000000000000001.json
│   └── ...
├── part-00000-abc123.snappy.parquet
├── part-00001-def456.snappy.parquet
└── ...
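Each numbered JSON file in _delta_log/ is plain JSON and can be inspected directly; a minimal sketch using the path layout above:
# Peek at the actions recorded in the table's first commit
log_entry = spark.read.json("/table_path/_delta_log/00000000000000000000.json")
log_entry.printSchema()          # fields such as commitInfo, metaData, protocol, add, remove
log_entry.show(truncate=False)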
Each transaction log entry contains:
Key Exam Concepts:
Delta Lake brings full ACID (Atomicity, Consistency, Isolation, Durability) guarantees to data lake storage, something traditional data lakes built on plain Parquet files do not provide.
ACID Properties Explained:
Optimistic Concurrency Control:
Delta Lake uses optimistic concurrency control (OCC) rather than locking:
Which transactions might conflict?
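Conflicts arise when concurrent transactions read and modify the same files, for example two MERGE or DELETE operations touching the same partition; the losing writer receives a concurrency exception that it can catch and retry. A minimal retry sketch, assuming the exception classes exposed by the delta-spark Python package:
# Hypothetical retry loop around a write that may conflict with a concurrent transaction
from delta.exceptions import ConcurrentAppendException

for attempt in range(3):
    try:
        df.write.format("delta").mode("append").save("/path/to/delta-table")
        break
    except ConcurrentAppendException:
        # A concurrent commit added files this transaction depended on; try again
        print(f"Concurrent write detected, retrying (attempt {attempt + 1})")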
Key Exam Concepts:
Let’s walk through creating a Delta table and performing basic operations on it.
Creating a Delta Table:
# Creating a Delta table from a DataFrame
df = spark.read.csv("/path/to/data.csv", header=True, inferSchema=True)
df.write.format("delta").save("/path/to/delta-table")
# Creating a Delta table using SQL
spark.sql("""
CREATE TABLE my_delta_table
USING DELTA
LOCATION '/path/to/delta-table'
AS SELECT * FROM source_table
""")
Basic Operations:
# Reading a Delta table
df = spark.read.format("delta").load("/path/to/delta-table")
# Updating data
from delta.tables import DeltaTable
deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")
deltaTable.update(
condition = "id = 100",
set = { "value": "'new_value'" }
)
# Time travel query
df_previous_version = spark.read.format("delta").option("versionAsOf", 1).load("/path/to/delta-table")
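Time travel can also target a timestamp instead of a version number (the timestamp below is illustrative):
# Time travel query by timestamp
df_at_time = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-01 00:00:00") \
    .load("/path/to/delta-table")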
Delta Table History:
# View table history
deltaTable.history().show()
Delta Table Details:
# Detailed information about the table
deltaTable.detail().show()
Key Exam Concepts:
Delta Lake offers several optimization techniques to improve query performance and efficiency.
File Optimization: OPTIMIZE Command
The OPTIMIZE command compacts small files into larger ones:
-- Basic OPTIMIZE command
OPTIMIZE my_delta_table
-- OPTIMIZE with Z-ORDER
OPTIMIZE my_delta_table ZORDER BY (column1, column2)
Benefits:
Partitioning Delta Tables:
-- Creating a partitioned table
CREATE TABLE sales_data
USING DELTA
PARTITIONED BY (date)
LOCATION '/path/to/sales_data'
AS SELECT * FROM source_data
# Writing to a partitioned table from a DataFrame
df.write.format("delta") \
    .partitionBy("date") \
    .save("/path/to/sales_data")
Partitioning Best Practices:
Key Exam Concepts:
Z-ordering is a multi-dimensional clustering technique that helps co-locate related data.
Z-Ordering Example:
-- Apply Z-ORDER to specific columns
OPTIMIZE my_delta_table ZORDER BY (customer_id, product_id)
How Z-Ordering Works:
Z-Order vs. Partitioning:
Data Skipping:
Delta Lake automatically collects statistics on data files:
These statistics enable the query optimizer to skip files that can’t contain matching data.
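For example, a query with a selective filter (here on an assumed order_date column) only reads the files whose min/max statistics overlap the predicate:
-- Files whose min/max statistics for order_date fall outside this range are skipped entirely
SELECT COUNT(*)
FROM my_delta_table
WHERE order_date BETWEEN '2024-01-01' AND '2024-01-31'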
Key Exam Concepts:
Delta clone creates a copy of a Delta table with minimal data duplication.
Types of Clones:
CREATE TABLE clone_table
SHALLOW CLONE source_table
CREATE TABLE clone_table
DEEP CLONE source_table
Common Use Cases:
Key Exam Concepts:
Bloom Filters:
Bloom filters are a space-efficient probabilistic data structure used to test whether an element is a member of a set.
-- Create a bloom filter index
CREATE BLOOMFILTER INDEX
ON TABLE my_delta_table
FOR COLUMNS(customer_id OPTIONS (fpp=0.1, numItems=10000000))
Benefits:
File Size Optimization:
Optimizing file sizes is crucial for Spark performance:
Controlling File Size:
# Using repartition to control number of files
df.repartition(8).write.format("delta").save("/path/to/table")
# Using coalesce for fewer files
df.coalesce(4).write.format("delta").save("/path/to/table")
Auto Optimize:
-- Enable auto optimization for a table
ALTER TABLE my_delta_table SET TBLPROPERTIES(
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true'
)
Key Exam Concepts:
Let’s test your understanding of today’s material with a short quiz:
What happens if two users try to update the same Delta table simultaneously? a) The first operation succeeds, and the second fails with a conflict error b) Both operations succeed, with the second overwriting the first c) Both operations fail with a locking error d) Delta automatically merges both changes
Which Delta Lake feature allows you to query the table as it appeared at a specific point in time? a) Time series b) Time travel c) Snapshot isolation d) Version control
What is the primary purpose of the Delta transaction log? a) To store table schemas b) To record all changes to the table c) To optimize query performance d) To enforce security policies
Which optimization technique is better suited for high-cardinality columns? a) Partitioning b) Z-ordering c) Bloom filters d) B-tree indexing
What is the difference between a shallow clone and a deep clone? a) Shallow clone copies only the schema, deep clone copies schema and data b) Shallow clone references original data files, deep clone copies data files c) Shallow clone is read-only, deep clone is read-write d) There is no difference, they’re synonyms
Which command would you use to compact small files in a Delta table? a) COMPACT b) OPTIMIZE c) VACUUM d) MERGE
What is the purpose of data skipping in Delta Lake? a) To avoid processing deleted records b) To skip unnecessary files during queries c) To ignore NULL values during aggregations d) To bypass schema validation
In the Databricks architecture, where does customer data reside? a) In the Databricks control plane b) In the customer’s data plane within their cloud account c) Across both control and data planes d) In a shared multi-tenant storage system
Answers and Explanations:
We covered the foundational elements of Databricks and Delta Lake:
Batch processing is the foundation of data engineering in Databricks, enabling the processing of large volumes of data in defined intervals.
Core Batch Processing Concepts:
Batch vs. Streaming:
Apache Spark Execution Model:
Understanding Spark’s execution model is crucial for optimizing batch jobs:
Transformations and Actions:
Transformations: select(), filter(), groupBy()
Actions: count(), collect(), write()
Job → Stage → Task Hierarchy:
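A small sketch of how lazy evaluation plays out (column names are illustrative): transformations only build the plan, and a job runs when an action is invoked.
# Transformations are lazy: these lines only build a logical plan
filtered = df.select("customer_id", "amount").filter("amount > 100")
grouped = filtered.groupBy("customer_id").count()

# Actions trigger execution as one or more jobs
print(grouped.count())    # action: runs the plan and returns a number
rows = grouped.collect()  # action: brings results back to the driver
grouped.write.format("delta").mode("overwrite").save("/tmp/customer_counts")  # action: writes results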
Key Performance Metrics:
Key Exam Concepts:
Effective partitioning is critical for Spark performance optimization.
Partitioning Fundamentals:
Partitioning Strategies:
Ideal Partition Size:
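A commonly cited target is roughly 100-200 MB of data per partition. To see where a job currently stands, a quick check such as the following can help (the values shown in comments are Spark's defaults):
# Number of partitions the DataFrame currently has
print(df.rdd.getNumPartitions())

# Maximum bytes per partition when reading files (default: 128 MB)
print(spark.conf.get("spark.sql.files.maxPartitionBytes"))

# Number of partitions used after shuffles (default: 200)
print(spark.conf.get("spark.sql.shuffle.partitions"))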
Handling Data Skew:
Data skew occurs when certain partitions contain significantly more data than others:
Key Exam Concepts:
Spark provides several mechanisms to control partitioning during processing.
Repartition vs. Coalesce:
# Repartition to 200 partitions
df = df.repartition(200)
# Repartition by specific column(s)
df = df.repartition(200, "customer_id")
# Reduce to 50 partitions
df = df.coalesce(50)
Repartition by Range:
# Repartition by range of values
df = df.repartitionByRange(200, "timestamp")
Rebalance:
# Rebalance is applied as a query hint and relies on Adaptive Query Execution
df = df.hint("rebalance")
# SQL equivalent: SELECT /*+ REBALANCE */ * FROM my_table
When to Use Each Approach:
| Operation | When to Use | Cost | Effect on Skew |
|---|---|---|---|
| Repartition | Need specific partition count, even distribution | High | Resolves skew |
| Coalesce | Need to reduce partitions efficiently | Low | May worsen skew |
| RepartitionByRange | Working with ordered data | High | Good for ordered data |
| Rebalance | Need to correct skew adaptively | Medium | Resolves moderate skew |
Key Exam Concepts:
Let’s walk through practical techniques for optimizing batch processing workloads.
Scenario: Processing Customer Transaction Data
Consider a large dataset of customer transactions that needs to be processed daily:
# Read raw transaction data
from pyspark.sql import functions as F

transactions = spark.read.format("delta").load("/data/raw/transactions")
# Initial processing
processed = transactions.filter(transactions.status == "completed") \
    .join(customers, "customer_id") \
    .groupBy("customer_id", "date") \
    .agg(F.sum("amount").alias("daily_total"))
# Write results
processed.write.format("delta").mode("overwrite").save("/data/processed/daily_totals")
Step 1: Analyze the Current Job
Use the Spark UI to identify bottlenecks:
Step 2: Optimize Input
# Add partition pruning (process only the relevant date's partition)
transactions = spark.read.format("delta") \
    .load("/data/raw/transactions") \
    .filter(F.col("date") == current_date)  # current_date supplied as a job parameter
# Broadcast small tables
customers = spark.read.format("delta").load("/data/customers")
broadcast_customers = F.broadcast(customers)
Step 3: Optimize Processing
# Add appropriate partitioning
processed = transactions.filter(transactions.status == "completed") \
    .join(broadcast_customers, "customer_id") \
    .repartition(200, "customer_id") \
    .groupBy("customer_id", "date") \
    .agg(F.sum("amount").alias("daily_total"))
Step 4: Optimize Output
# Optimize output partitioning
processed.repartition(100) \
.write.format("delta") \
.partitionBy("date") \
.mode("overwrite") \
.option("dataChange", "false") \
.save("/data/processed/daily_totals")
Step 5: Implement Caching for Iterative Processing
# Cache intermediate results when reused
enriched_transactions = transactions.filter(transactions.status == "completed") \
.join(broadcast_customers, "customer_id")
enriched_transactions.cache()
# Use for multiple aggregations
daily_totals = enriched_transactions.groupBy("customer_id", "date") \
    .agg(F.sum("amount").alias("daily_total"))
product_analysis = enriched_transactions.groupBy("product_id") \
.count()
# Unpersist when done
enriched_transactions.unpersist()
Key Optimization Techniques:
Key Exam Concepts:
Structured Streaming brings real-time data processing capabilities to Spark’s DataFrame API.
Structured Streaming Fundamentals:
Core Concepts:
Stream Sources:
# Reading from a stream source
stream_df = spark.readStream.format("delta") \
.option("maxFilesPerTrigger", 10) \
.load("/data/source")
Common stream sources:
Processing Modes:
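Processing behaviour is controlled by the trigger set on the writer; a brief sketch of two common settings (paths are illustrative):
# Micro-batch triggered every minute
query = df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/example") \
    .trigger(processingTime="1 minute") \
    .start("/data/sink")

# Process everything available now, then stop (incremental batch style)
query = df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/checkpoints/example") \
    .trigger(availableNow=True) \
    .start("/data/sink")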
Output Modes:
# Only new rows are written to sink
query = df.writeStream.outputMode("append")...
# Full result table is written each time
query = df.writeStream.outputMode("complete")...
# Only updated rows are written
query = df.writeStream.outputMode("update")...
Watermarking:
# Setting a watermark for handling late data
df = df.withWatermark("event_time", "10 minutes") \
.groupBy(window("event_time", "5 minutes"), "device") \
.count()
Key Exam Concepts:
Joining streaming data with static (batch) data is a common pattern in data engineering.
Types of Stream Joins:
# Static DataFrame
static_df = spark.read.format("delta").load("/data/static")
# Streaming DataFrame
stream_df = spark.readStream.format("delta").load("/data/stream")
# Join
joined_df = stream_df.join(static_df, "join_key")
# Two streaming sources (assume each has its own key and event-time column)
from pyspark.sql.functions import expr

stream1 = spark.readStream.format("delta").load("/data/stream1")   # columns: key1, time1, ...
stream2 = spark.readStream.format("delta").load("/data/stream2")   # columns: key2, time2, ...
# Join with watermarks and an explicit time-bounded condition
joined = stream1.withWatermark("time1", "10 minutes") \
    .join(
        stream2.withWatermark("time2", "5 minutes"),
        expr("""
            key1 = key2 AND
            time1 >= time2 - interval 5 minutes AND
            time1 <= time2 + interval 5 minutes
        """),
        "inner"
    )
State Management:
Stream joins maintain state information between batches:
Optimizing Stream-Static Joins:
# Broadcast the static DataFrame
joined_df = stream_df.join(F.broadcast(static_df), "join_key")
# Configure state store
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
# Tight watermark for state cleanup
df = df.withWatermark("event_time", "30 minutes")
Key Exam Concepts:
Change Data Feed (CDF) captures row-level changes in Delta tables, enabling incremental processing patterns.
Change Data Feed Overview:
-- Enable CDF for a table
ALTER TABLE my_table SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
-- Create a new table with CDF enabled
CREATE TABLE my_table (id INT, name STRING)
TBLPROPERTIES (delta.enableChangeDataFeed = true)
Reading Change Data:
# Read changes since version 10
changes = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 10) \
    .table("my_table")
Each CDF record contains:
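In practice, each change row carries metadata columns alongside the data; a quick look using the changes DataFrame from above:
# CDF metadata columns accompany every changed row
changes.select("_change_type", "_commit_version", "_commit_timestamp").show()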
CDC vs. CDF:
APPLY CHANGES INTO:
The APPLY CHANGES INTO operation simplifies CDC/CDF processing:
-- Apply changes from source to target
APPLY CHANGES INTO target_table
FROM source_cdc_table
KEYS (id)
SEQUENCE BY sequence_num
Implementing CDC Pipeline:
# Read CDC data as a stream
cdc_df = spark.readStream.format("delta") \
    .option("readChangeFeed", "true") \
    .load("/data/source")
# Process by operation type
def process_cdc_batch(batch_df, batch_id):
    # Extract operation types (updates arrive as pre/post image pairs; keep the new values)
    inserts = batch_df.filter("_change_type = 'insert'") \
        .drop("_change_type", "_commit_version", "_commit_timestamp")
    updates = batch_df.filter("_change_type = 'update_postimage'") \
        .drop("_change_type", "_commit_version", "_commit_timestamp")
    deletes = batch_df.filter("_change_type = 'delete'") \
        .drop("_change_type", "_commit_version", "_commit_timestamp")

    # Apply to target table
    if not inserts.isEmpty():
        inserts.write.format("delta").mode("append").save("/data/target")

    if not updates.isEmpty() or not deletes.isEmpty():
        deltaTable = DeltaTable.forPath(spark, "/data/target")

        # Handle updates
        if not updates.isEmpty():
            deltaTable.alias("target").merge(
                updates.alias("updates"),
                "target.id = updates.id"
            ).whenMatchedUpdateAll().execute()

        # Handle deletes
        if not deletes.isEmpty():
            deltaTable.alias("target").merge(
                deletes.alias("deletes"),
                "target.id = deletes.id"
            ).whenMatchedDelete().execute()

# Apply foreachBatch function
query = cdc_df.writeStream \
    .foreachBatch(process_cdc_batch) \
    .option("checkpointLocation", "/checkpoints/cdc") \
    .start()
Key Exam Concepts:
Let’s test your understanding of today’s material with a short quiz:
Which of the following operations will trigger a shuffle in Spark? a) filter() b) select() c) groupBy() d) withColumn()
What is the primary difference between repartition() and coalesce()? a) repartition() can only increase partitions, coalesce() can only decrease them b) repartition() performs a full shuffle, coalesce() minimizes shuffling c) repartition() works on RDDs, coalesce() works on DataFrames d) repartition() is faster but uses more memory
In Structured Streaming, which output mode requires an aggregation operation? a) Append b) Update c) Complete d) Incremental
What is the purpose of watermarking in Structured Streaming? a) To filter out records with timestamps older than a threshold b) To handle late-arriving data and manage state cleanup c) To ensure exactly-once processing semantics d) To synchronize processing across multiple streams
Which join strategy is most efficient when joining a streaming DataFrame with a small static DataFrame? a) Shuffle join b) Broadcast join c) Sort-merge join d) Nested loop join
What does Change Data Feed (CDF) track in Delta Lake? a) Schema changes only b) Row-level changes (inserts, updates, deletes) c) Partition changes only d) Metadata changes only
Which operation is used to efficiently propagate changes from a source CDC table to a target Delta table? a) INSERT OVERWRITE b) MERGE INTO c) APPLY CHANGES INTO d) UPDATE SET
What is the main advantage of using foreachBatch in Structured Streaming over standard output sinks? a) It provides higher throughput b) It allows you to apply different operations to each batch c) It guarantees exactly-once processing d) It eliminates the need for checkpointing
Answers and Explanations:
c) groupBy() - This operation requires data with the same keys to be on the same partition, which necessitates a shuffle operation.
b) repartition() performs a full shuffle, coalesce() minimizes shuffling - repartition() redistributes data evenly through a full shuffle, while coalesce() combines existing partitions with minimal data movement.
c) Complete - Complete output mode writes the entire result table each time, which requires an aggregation to maintain the complete state.
b) To handle late-arriving data and manage state cleanup - Watermarking defines how late data can arrive and allows the system to clean up state for records older than the watermark.
b) Broadcast join - Broadcasting a small static DataFrame to all executors eliminates the need for shuffling in the streaming job.
b) Row-level changes (inserts, updates, deletes) - CDF tracks all row-level modifications to the table, including the operation type and the changed data.
c) APPLY CHANGES INTO - This operation is specifically designed to handle CDC data, automatically applying inserts, updates, and deletes based on the source data.
b) It allows you to apply different operations to each batch - foreachBatch gives you complete control over how each micro-batch is processed, enabling complex logic that isn’t possible with standard sinks.
We covered essential data processing techniques in Databricks:
The medallion architecture is a design pattern that organizes data in the Lakehouse into three distinct layers, each representing a different level of data refinement.
Core Layers of the Medallion Architecture:
Data Flow Through the Medallion Architecture:
Source Systems → Bronze → Silver → Gold → Consumption
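As a minimal sketch of that flow with plain Delta tables (paths and column names are illustrative):
# Bronze: land raw data as-is
raw = spark.read.json("/data/landing/orders")
raw.write.format("delta").mode("append").save("/lake/bronze/orders")

# Silver: cleanse and standardize
bronze = spark.read.format("delta").load("/lake/bronze/orders")
silver = bronze.dropDuplicates(["order_id"]).filter("order_id IS NOT NULL")
silver.write.format("delta").mode("overwrite").save("/lake/silver/orders")

# Gold: business-level aggregates
gold = silver.groupBy("customer_id").count()
gold.write.format("delta").mode("overwrite").save("/lake/gold/orders_by_customer")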
Implementation Considerations:
Key Exam Concepts:
Data quality is essential for reliable analytics and decision-making. Delta Lake provides several mechanisms for enforcing data quality.
Data Quality Enforcement Techniques:
# Schema enforcement in action
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("id", StringType(), False),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Apply the schema when reading, then write; Delta records it as the table schema
df = spark.read.schema(schema).csv("/path/to/source.csv", header=True)
df.write.format("delta").save("/path/to/table")

# An attempt to append data with an incompatible schema will fail
incompatible_df.write.format("delta").mode("append").save("/path/to/table")
-- Add NOT NULL constraint
ALTER TABLE customer_data ADD CONSTRAINT valid_id CHECK (id IS NOT NULL);
-- Add a non-empty ID constraint (CHECK constraints cannot enforce uniqueness)
ALTER TABLE customer_data ADD CONSTRAINT non_empty_id CHECK (id IS NOT NULL AND id <> '');
-- Add value range constraint
ALTER TABLE customer_data ADD CONSTRAINT valid_age CHECK (age > 0 AND age < 120);
Quality Expectations in Delta Live Tables:
import dlt
from pyspark.sql.functions import current_timestamp

# Bronze layer with minimal expectations
@dlt.table
def bronze_customer_data():
    return spark.readStream.format("cloudFiles") \
        .option("cloudFiles.format", "json") \
        .load("/data/raw/customers")

# Silver layer with quality constraints declared as expectations
@dlt.table
@dlt.expect_or_drop("valid_customer_record", "id IS NOT NULL AND age > 0")
def silver_customer_data():
    return dlt.read("bronze_customer_data") \
        .dropDuplicates(["id"]) \
        .withColumn("processed_timestamp", current_timestamp())
Enforcement Modes:
# Fail the entire update if any record violates the expectation
@dlt.table
@dlt.expect_or_fail("valid_metric", "metric_value > 0")
def validated_metrics():
    return dlt.read("bronze_metrics")  # source table name is a placeholder

# Drop records that violate the expectation
@dlt.table
@dlt.expect_or_drop("valid_customer", "id IS NOT NULL AND age > 0")
def validated_customers():
    return dlt.read("bronze_customers")  # source table name is a placeholder
# Quarantine records that violate constraints by splitting them into separate tables
@dlt.table
def silver_good_records():
    return dlt.read("bronze_table").where("id IS NOT NULL")

@dlt.table
def silver_bad_records():
    return dlt.read("bronze_table").where("id IS NULL")
Implementing a Data Quality Framework:
Key Exam Concepts:
Slowly Changing Dimensions (SCDs) are techniques for handling historical changes in dimensional data. Delta Lake provides powerful capabilities for implementing various SCD types.
SCD Type 0: Retain Original
# Simple overwrite with only new records (no updates to existing)
new_customers.write.format("delta").mode("append").saveAsTable("customers_type0")
SCD Type 1: Overwrite
# Using MERGE to implement Type 1 SCD (overwrite changed attributes in place)
from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, "customers_type1")
# Merge operation
deltaTable.alias("target").merge(
    source=updates_df.alias("updates"),
    condition="target.customer_id = updates.customer_id"
).whenMatchedUpdateAll().execute()
SCD Type 2: Add New Row
# Read the current dimension table
from pyspark.sql import functions as F

dim_customer = spark.read.table("dim_customer")

# Identify changed records:
# 1. Join new data with the current dimension
# 2. Keep new keys and rows whose tracked columns changed
changes = new_data.alias("new") \
    .join(dim_customer.alias("current"),
          F.col("new.customer_id") == F.col("current.customer_id"),
          "left_outer") \
    .where("current.customer_id IS NULL OR " +
           "new.name <> current.name OR " +
           "new.address <> current.address") \
    .select("new.*")

if not changes.isEmpty():
    deltaTable = DeltaTable.forName(spark, "dim_customer")

    # Expire the currently active versions of changed records,
    # then insert the new versions in the same MERGE
    deltaTable.alias("dim").merge(
        changes.alias("updates"),
        "dim.customer_id = updates.customer_id AND dim.is_current = true"
    ).whenMatchedUpdate(set={
        "is_current": "false",
        "end_date": "current_date()",
        "updated_timestamp": "current_timestamp()"
    }).whenNotMatchedInsert(values={
        "customer_id": "updates.customer_id",
        "name": "updates.name",
        "address": "updates.address",
        "is_current": "true",
        "start_date": "current_date()",
        "end_date": "null",
        "updated_timestamp": "current_timestamp()"
    }).execute()
SCD Type 3: Add New Columns
# Read the current dimension
dim_product = spark.read.table("dim_product")

# Identify rows whose category changed
changes = new_data.alias("new") \
    .join(dim_product.alias("current"),
          F.col("new.product_id") == F.col("current.product_id"),
          "inner") \
    .where("new.category <> current.category") \
    .selectExpr("new.*", "current.category AS previous_category")

# Apply Type 3 updates: keep the prior value in a dedicated column
if not changes.isEmpty():
    deltaTable = DeltaTable.forName(spark, "dim_product")
    deltaTable.alias("dim").merge(
        changes.alias("updates"),
        "dim.product_id = updates.product_id"
    ).whenMatchedUpdate(set={
        "previous_category": "dim.category",
        "category": "updates.category",
        "updated_timestamp": "current_timestamp()"
    }).execute()
SCD Type 6: Hybrid Approach (Combined Type 1 + 2 + 3)
# Complex implementation that combines Type 1, 2, and 3 approaches
# Not showing full code due to complexity, but key concepts include:
# 1. Identifying which attributes are Type 1 vs. Type 2
# 2. Maintaining version numbers for records
# 3. Using both effective dates and is_current flags
# 4. Selective updating based on attribute type
Key Exam Concepts:
Let’s walk through implementing a complete medallion architecture pipeline using Delta Lake and Delta Live Tables.
Scenario: Customer Order Processing Pipeline
We’ll build a pipeline that processes customer order data through the bronze, silver, and gold layers.
1. Bronze Layer: Raw Data Ingestion
# Using Delta Live Tables
import dlt
from pyspark.sql import functions as F

@dlt.table(
    name="bronze_orders",
    comment="Raw order data from JSON files"
)
def bronze_orders():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "/tmp/schema/orders")
        .load("/data/raw/orders")
    )

@dlt.table(
    name="bronze_customers",
    comment="Raw customer data from CSV files"
)
def bronze_customers():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", "/tmp/schema/customers")
        .option("header", "true")
        .load("/data/raw/customers")
    )
2. Silver Layer: Cleansed and Validated Data
@dlt.table(
    name="silver_customers",
    comment="Cleansed customer records"
)
@dlt.expect_or_drop("valid_customer_id", "customer_id IS NOT NULL")
def silver_customers():
    return (
        dlt.read("bronze_customers")
        .dropDuplicates(["customer_id"])
        .withColumn("processed_date", F.current_date())
        .withColumn("valid_email",
                    F.expr("email RLIKE '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,6}$'"))
    )

@dlt.table(
    name="silver_orders",
    comment="Cleansed order records with validated references"
)
@dlt.expect_or_drop("valid_order", "order_id IS NOT NULL AND customer_id IS NOT NULL AND amount > 0")
def silver_orders():
    # Join with customers to validate references
    customers = dlt.read("silver_customers").select("customer_id").distinct()
    return (
        dlt.read("bronze_orders")
        .dropDuplicates(["order_id"])
        .withColumn("order_date", F.to_date(F.col("order_timestamp")))
        .withColumn("processed_date", F.current_date())
        .join(customers, "customer_id", "left_semi")  # Only keep orders with valid customers
    )
3. Gold Layer: Business-Ready Aggregations
@dlt.table(
    name="gold_customer_orders",
    comment="Customer order summaries by day"
)
def gold_customer_orders():
    return (
        dlt.read("silver_orders")
        .groupBy("customer_id", "order_date")
        .agg(
            F.count("order_id").alias("order_count"),
            F.sum("amount").alias("total_amount"),
            F.avg("amount").alias("avg_order_value"),
            F.max("amount").alias("largest_order")
        )
    )

@dlt.table(
    name="gold_daily_sales",
    comment="Daily sales aggregations"
)
def gold_daily_sales():
    return (
        dlt.read("silver_orders")
        .groupBy("order_date")
        .agg(
            F.count("order_id").alias("total_orders"),
            F.sum("amount").alias("total_sales"),
            F.countDistinct("customer_id").alias("unique_customers")
        )
    )
4. Pipeline Configuration
{
"name": "orders_pipeline",
"configuration": {
"pipelines.autoOptimize.managed": "true",
"pipelines.autoOptimize.zOrderCols": {
"silver_orders": ["order_date", "customer_id"],
"silver_customers": ["customer_id"],
"gold_customer_orders": ["order_date", "customer_id"],
"gold_daily_sales": ["order_date"]
}
},
"clusters": [
{
"label": "default",
"autoscale": {
"min_workers": 1,
"max_workers": 5
}
}
],
"continuous": false,
"development": false
}
Key Implementation Concepts:
Key Exam Concepts:
Security and governance are critical aspects of enterprise data platforms, ensuring data protection, compliance, and appropriate access controls.
Core Security & Governance Areas:
Databricks Security Architecture:
Databricks implements security at multiple layers:
Databricks Unity Catalog:
Unity Catalog provides a unified governance solution for the Databricks Lakehouse:
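Unity Catalog objects live in a three-level namespace (catalog.schema.table), and access is granted on those securables; a brief sketch with illustrative names:
-- Three-level namespace: catalog.schema.table
CREATE CATALOG IF NOT EXISTS main_catalog;
CREATE SCHEMA IF NOT EXISTS main_catalog.sales;
CREATE TABLE IF NOT EXISTS main_catalog.sales.orders (order_id BIGINT, amount DOUBLE);

-- Grant privileges on the securable objects to a group
GRANT USE CATALOG ON CATALOG main_catalog TO `data_analysts`;
GRANT USE SCHEMA ON SCHEMA main_catalog.sales TO `data_analysts`;
GRANT SELECT ON TABLE main_catalog.sales.orders TO `data_analysts`;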
Key Exam Concepts:
Dynamic views provide a powerful mechanism for implementing data masking and access control in Databricks.
Dynamic Views Concept:
Creating a Dynamic View for Column Masking:
-- Create a view that masks PII based on user group
CREATE OR REPLACE VIEW masked_customer_data AS
SELECT
customer_id,
-- Full name only visible to admins
CASE WHEN is_member('admins') THEN full_name
ELSE 'REDACTED'
END AS full_name,
-- Last 4 digits of SSN for finance team, nothing for others
CASE WHEN is_member('admins') THEN ssn
WHEN is_member('finance') THEN CONCAT('XXX-XX-', RIGHT(ssn, 4))
ELSE 'XXX-XX-XXXX'
END AS ssn,
-- Email domain only for most users
CASE WHEN is_member('admins') THEN email
WHEN is_member('marketing') THEN email
ELSE CONCAT('user@', SPLIT(email, '@')[1])
END AS email,
-- Only show birth year for most users
CASE WHEN is_member('admins') THEN date_of_birth
ELSE CONCAT(YEAR(date_of_birth), '-01-01')
END AS date_of_birth,
-- Non-sensitive fields visible to all
address_city,
address_state,
customer_status
FROM customers
Row-Level Security with Dynamic Views:
-- Create a view with row-level filtering based on region access
CREATE OR REPLACE VIEW regional_sales AS
SELECT *
FROM sales
WHERE
-- Admins see all regions
is_member('admins') OR
-- Regional managers see only their region
(is_member('region_na') AND region = 'North America') OR
(is_member('region_emea') AND region = 'EMEA') OR
(is_member('region_apac') AND region = 'APAC')
Combining Row and Column Security:
-- View that implements both row and column level security
CREATE OR REPLACE VIEW secured_employee_data AS
SELECT
employee_id,
-- Column masking
CASE WHEN is_member('hr') OR is_member('admins') THEN salary
ELSE NULL
END AS salary,
CASE WHEN is_member('hr') OR is_member('admins') THEN personal_email
ELSE work_email
END AS contact_email,
department,
title,
hire_date
FROM employees
WHERE
-- Row filtering
is_member('admins') OR
-- Managers see their department
(is_member('managers') AND department IN
(SELECT department FROM employee_managers WHERE manager_id = current_user())) OR
-- Employees see only themselves
(is_member('employees') AND employee_id IN
(SELECT employee_id FROM employees WHERE email = current_user()))
Best Practices for Dynamic Views:
Key Exam Concepts:
Beyond dynamic views, Databricks provides additional mechanisms for implementing row and column-level security.
Unity Catalog Column-Level Security:
Unity Catalog allows direct control over column-level access:
-- Grant access to specific columns
GRANT SELECT ON TABLE customers(customer_id, name, city) TO role_analyst;
-- Deny access to sensitive columns
DENY SELECT ON TABLE customers(credit_card, ssn) TO role_analyst;
Row-Level Security with Row Filters:
-- Create a row filter function that returns TRUE for rows the caller may see
CREATE FUNCTION region_filter(region STRING)
RETURN
  CASE WHEN is_member('admins') THEN TRUE
       WHEN is_member('na_sales') AND region = 'North America' THEN TRUE
       WHEN is_member('emea_sales') AND region = 'EMEA' THEN TRUE
       ELSE FALSE
  END;
-- Apply the row filter to the table, binding it to the region column
ALTER TABLE sales SET ROW FILTER region_filter ON (region);
Column-Level Security with Column Masks:
-- Create a column mask function for PII
CREATE FUNCTION email_mask(email STRING)
RETURN
  CASE WHEN is_member('admins') THEN email
       WHEN is_member('marketing') THEN email
       ELSE CONCAT('user@', SPLIT(email, '@')[1])
  END;
-- Apply the mask to the column
ALTER TABLE customers ALTER COLUMN email SET MASK email_mask;
Data Access Patterns:
Different access patterns require different security approaches:
Key Exam Concepts:
Let’s test your understanding of today’s material with a short quiz:
In the medallion architecture, which layer typically contains data that has been cleansed and standardized but not yet aggregated? a) Bronze layer b) Silver layer c) Gold layer d) Platinum layer
Which SCD type maintains the complete history of dimension changes by creating new records when attributes change? a) SCD Type 0 b) SCD Type 1 c) SCD Type 2 d) SCD Type 3
When implementing data quality checks in Delta Live Tables, which constraint mode would you use to fail the entire pipeline if any record violates the constraint? a) expect_or_drop b) expect_or_fail c) expect_or_quarantine d) expect_all_or_none
Which of the following is NOT one of the four main areas of data governance? a) Data Security b) Data Lineage c) Data Privacy d) Data Transformation
What is the primary purpose of a dynamic view in Databricks? a) To improve query performance through materialization b) To automatically update when source data changes c) To implement row and column-level security based on user attributes d) To create temporary views of data that expire after a session
In Unity Catalog, which of the following represents the correct hierarchy (from highest to lowest)? a) Catalog → Schema → Table b) Database → Catalog → Table c) Workspace → Schema → Table d) Catalog → Table → Column
Which method provides the most efficient way to mask sensitive columns for different user groups? a) Creating separate tables for each user group b) Using dynamic views with conditional logic c) Creating column masks in Unity Catalog d) Encrypting sensitive columns in the base table
Which SCD implementation would be most appropriate for tracking customer address changes while maintaining historical records? a) SCD Type 0 b) SCD Type 1 c) SCD Type 2 d) SCD Type 3
Answers and Explanations:
b) Silver layer - The silver layer contains data that has been cleansed, validated, and standardized, but has not yet been aggregated or transformed for specific business use cases (which happens in the gold layer).
c) SCD Type 2 - Type 2 SCDs create new records when dimension attributes change, typically using start and end dates or current/historical flags to track the complete history.
b) expect_or_fail - The expect_or_fail constraint causes the entire pipeline to fail if any record violates the specified condition, which is useful for critical data quality checks.
d) Data Transformation - The four main areas of data governance are Data Security, Data Governance (metadata management), Compliance, and Privacy. Data Transformation is part of data processing, not governance.
c) To implement row and column-level security based on user attributes - Dynamic views in Databricks are primarily used to implement security controls that adjust what data is visible based on who is viewing it.
a) Catalog → Schema → Table - In Unity Catalog, the hierarchy is Catalog at the top, containing Schemas (also called Databases), which contain Tables and other objects.
c) Creating column masks in Unity Catalog - Column masks in Unity Catalog provide the most efficient and maintainable way to implement column-level security across multiple users and roles.
c) SCD Type 2 - SCD Type 2 is ideal for tracking customer address changes over time while maintaining the complete history, which is important for historical analysis and compliance.
We covered essential data modeling and security concepts in Databricks:
The Spark UI is the primary tool for analyzing and troubleshooting Spark application performance. Understanding how to interpret the information it provides is crucial for optimizing data pipelines.
Accessing Spark UI in Databricks:
The Spark UI is available in multiple ways:
Key Components of the Spark UI:
Jobs Tab: The Jobs tab shows high-level execution information for each job:
When analyzing the Jobs tab, look for:
Stages Tab: The Stages tab provides deeper insights into each stage:
Critical metrics to examine:
Executors Tab: The Executors tab shows resource utilization across the cluster:
Key indicators of problems:
Storage Tab: The Storage tab displays information about cached data:
Environment Tab: Contains configuration settings for the Spark application:
SQL Tab: For Spark SQL and DataFrame operations:
Common Performance Issues and Indicators:
1. Data Skew:
2. Shuffle Bottlenecks:
3. Memory Pressure:
4. Inefficient Query Plans:
Key Exam Concepts:
Streaming jobs require continuous monitoring to ensure they meet performance SLAs and operate reliably.
Key Structured Streaming Metrics:
Processing Rate Metrics:
Latency Metrics:
Operational Metrics:
Monitoring Streaming Jobs in Databricks:
1. Structured Streaming UI: Available through the Spark UI, provides:
2. Streaming Query Metrics:
# Get active streaming queries
active_streams = spark.streams.active
# For each active stream
for stream in active_streams:
    # Get recent progress metrics
    progress = stream.recentProgress
    # Get current status
    status = stream.status
    # Get specific metrics from the most recent batch
    latest = progress[-1] if progress else None
    if latest:
        print(f"Query: {stream.name}")
        print(f"Input rate: {latest['inputRowsPerSecond']}")
        print(f"Processing rate: {latest['processedRowsPerSecond']}")
        print(f"Batch duration: {latest['batchDuration']}")
3. Metrics Collection for Long-Term Analysis:
# Function to log streaming metrics to a Delta table
from datetime import datetime

def log_streaming_metrics(batch_df, batch_id):
    # Get all active streams
    active_streams = spark.streams.active
    # Collect metrics
    metrics = []
    for stream in active_streams:
        progress = stream.lastProgress
        if progress:
            metrics.append({
                "timestamp": datetime.now(),
                "query_id": str(stream.id),
                "query_name": stream.name,
                "input_rows_per_second": progress.get("inputRowsPerSecond", 0),
                "processed_rows_per_second": progress.get("processedRowsPerSecond", 0),
                "batch_duration_ms": progress.get("batchDuration", 0),
                "num_input_rows": progress.get("numInputRows", 0),
                "batch_id": batch_id
            })
    # Create a DataFrame from the metrics and append it to a Delta table
    if metrics:
        metrics_df = spark.createDataFrame(metrics)
        metrics_df.write.format("delta").mode("append").saveAsTable("streaming_metrics")
# Apply to streaming query
query = df.writeStream \
.foreachBatch(log_streaming_metrics) \
.outputMode("update") \
.option("checkpointLocation", "/path/to/checkpoint") \
.start()
SLA Monitoring and Alerting:
1. Define SLA Metrics:
2. Implement Monitoring Dashboards:
# Create dashboard aggregates for streaming metrics
from pyspark.sql.functions import col, window, min as min_, max as max_, avg

# Read metrics table
metrics = spark.read.table("streaming_metrics")
# Aggregations for dashboard
sla_metrics = metrics.groupBy(window("timestamp", "1 hour")) \
    .agg(
        min_("processed_rows_per_second").alias("min_rate"),
        max_("batch_duration_ms").alias("max_batch_duration"),
        avg("input_rows_per_second").alias("avg_input_rate")
    ) \
    .withColumn("sla_throughput_met", col("min_rate") > 1000) \
    .withColumn("sla_latency_met", col("max_batch_duration") < 60000)
# Write to dashboard table
sla_metrics.write.format("delta").mode("overwrite").saveAsTable("streaming_sla_dashboard")
3. Implement Alerting:
# Function to check SLAs and trigger alerts
def check_slas(batch_df, batch_id):
    # Get latest metrics
    active_streams = spark.streams.active
    for stream in active_streams:
        progress = stream.lastProgress
        if progress:
            # Check throughput SLA
            input_rate = progress.get("inputRowsPerSecond", 0)
            processed_rate = progress.get("processedRowsPerSecond", 0)
            if processed_rate < 1000:  # Example SLA threshold
                # Trigger throughput alert
                send_alert(f"Throughput SLA violated: {processed_rate} rows/sec")
            # Check latency SLA
            batch_duration = progress.get("batchDuration", 0)
            if batch_duration > 60000:  # Example SLA threshold (60 seconds)
                # Trigger latency alert
                send_alert(f"Latency SLA violated: {batch_duration} ms")

# Helper function to send alerts
def send_alert(message):
    # Integration with alerting system (email, Slack, etc.)
    print(f"ALERT: {message}")
    # In production, integrate with actual alerting systems
Key Exam Concepts:
Beyond the Spark UI, Databricks provides additional interfaces for monitoring cluster resources and overall performance.
Ganglia UI:
Ganglia is a distributed monitoring system for clusters that collects and visualizes key system metrics:
Cluster-Level Metrics:
Node-Level Metrics:
Interpreting Ganglia Metrics:
1. CPU Metrics:
2. Memory Metrics:
3. Network Metrics:
4. Disk Metrics:
Databricks Cluster UI:
The Databricks Cluster UI provides information specific to the Databricks environment:
Cluster Configuration:
Driver and Executor Logs:
Cluster Metrics:
Common Issues and Signs:
1. Resource Underutilization:
2. Resource Overutilization:
3. Imbalanced Cluster:
4. Unstable Cluster:
Key Exam Concepts:
Let’s walk through a practical approach to identifying and solving common performance issues in Databricks.
Scenario: Investigating a Slow-Running ETL Pipeline
You’ve received alerts that an important ETL pipeline is taking significantly longer than usual. Let’s go through a systematic approach to debug and optimize it.
Step 1: Identify Bottlenecks in Spark UI
First, examine the Jobs tab to find the slowest jobs:
Sample Job Analysis:
- Job 4: 45 minutes (normally takes 10 minutes)
- Stage 10: 40 minutes
- Stage 11: 5 minutes
Step 2: Analyze Slow Stages
For the slowest stage (Stage 10), examine:
Stage 10 Analysis:
- 1000 total tasks
- Task duration: min=5s, median=20s, max=35min
- Significant task skew detected
- Large shuffle write: 200GB
Step 3: Check Ganglia for Resource Utilization
Look at Ganglia metrics during the slow period:
Ganglia Findings:
- Memory utilization near maximum
- High disk I/O on several nodes
- GC activity increased
- Network traffic normal
Step 4: Examine Query Plan for Inefficiencies
For SQL operations, analyze the query plan:
# Examine the query plan
slow_query.explain(True)
# Sample output showing inefficient join:
# == Physical Plan ==
# *(5) SortMergeJoin [large_key], [large_key]
# :- *(2) Sort [large_key ASC], false, 0
# : +- Exchange hashpartitioning(large_key, 200)
# : +- *(1) Filter (isnotnull(large_key))
# : +- Scan Delta [large_table]
# +- *(4) Sort [large_key ASC], false, 0
# +- Exchange hashpartitioning(large_key, 200)
# +- *(3) Filter (isnotnull(large_key))
# +- Scan Delta [other_large_table]
Step 5: Implement Targeted Optimizations
Based on the analysis, implement specific optimizations:
For Data Skew:
# Add salting to the skewed key
from pyspark.sql.functions import rand, col, concat, lit, explode, array

# Instead of joining directly on skewed_key, append a random salt (0-9) on the skewed side
df = df.withColumn("salted_key",
                   concat(col("skewed_key"), (rand() * 10).cast("int").cast("string")))

# Replicate the other side with every salt value so all salted keys can match
other_salted = other_df \
    .withColumn("salt", explode(array([lit(str(i)) for i in range(10)]))) \
    .withColumn("salted_key", concat(col("skewed_key"), col("salt")))

# Join on the salted key
result = df.join(other_salted, "salted_key")
For Inefficient Joins:
# Use broadcast join for small tables
from pyspark.sql.functions import broadcast
# Before: Regular join causing shuffle
result = large_df.join(small_df, "key")
# After: Broadcast the small table
result = large_df.join(broadcast(small_df), "key")
For Memory Pressure:
# Repartition to better distribute data
df = df.repartition(500, "key")

# Control the caching strategy
from pyspark import StorageLevel

df.cache()  # Default caching
# or
df.persist(StorageLevel.MEMORY_AND_DISK)  # Explicitly allow spilling to disk
For Inefficient I/O:
# Add appropriate file size optimization
df.write.option("maxRecordsPerFile", 1000000).save("/path")
# Add partitioning for query efficiency
df.write.partitionBy("date", "region").save("/path")
Step 6: Monitor Improvements
After implementing optimizations:
Key Debugging Approaches:
Key Exam Concepts:
Databricks Jobs provide a way to schedule, monitor, and manage production workloads reliably.
Job Components:
1. Tasks:
2. Job Configuration:
3. Workflows:
Creating Multi-Task Jobs:
Simple Job with Sequential Tasks:
{
"name": "Customer Data Processing",
"tasks": [
{
"task_key": "ingest_data",
"notebook_task": {
"notebook_path": "/Shared/ETL/ingest_customer_data",
"base_parameters": {
"data_date": "2023-05-01"
}
},
"existing_cluster_id": "0923-164819-h73vpgf0"
},
{
"task_key": "transform_data",
"notebook_task": {
"notebook_path": "/Shared/ETL/transform_customer_data",
"base_parameters": {
"data_date": "2023-05-01"
}
},
"existing_cluster_id": "0923-164819-h73vpgf0",
"depends_on": [
{
"task_key": "ingest_data"
}
]
},
{
"task_key": "load_analytics",
"notebook_task": {
"notebook_path": "/Shared/ETL/load_analytics",
"base_parameters": {
"data_date": "2023-05-01"
}
},
"existing_cluster_id": "0923-164819-h73vpgf0",
"depends_on": [
{
"task_key": "transform_data"
}
]
}
]
}
Complex Job with Parallel Tasks:
{
"name": "Enterprise Data Pipeline",
"tasks": [
{
"task_key": "ingest_raw_data",
"notebook_task": {
"notebook_path": "/Shared/ETL/ingest_raw_data"
},
"new_cluster": {
"spark_version": "11.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 4
}
},
{
"task_key": "process_customer_data",
"notebook_task": {
"notebook_path": "/Shared/ETL/process_customer_data"
},
"new_cluster": {
"spark_version": "11.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 2
},
"depends_on": [
{
"task_key": "ingest_raw_data"
}
]
},
{
"task_key": "process_product_data",
"notebook_task": {
"notebook_path": "/Shared/ETL/process_product_data"
},
"new_cluster": {
"spark_version": "11.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 2
},
"depends_on": [
{
"task_key": "ingest_raw_data"
}
]
},
{
"task_key": "process_order_data",
"notebook_task": {
"notebook_path": "/Shared/ETL/process_order_data"
},
"new_cluster": {
"spark_version": "11.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 2
},
"depends_on": [
{
"task_key": "ingest_raw_data"
}
]
},
{
"task_key": "build_analytics",
"notebook_task": {
"notebook_path": "/Shared/ETL/build_analytics"
},
"new_cluster": {
"spark_version": "11.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 4
},
"depends_on": [
{
"task_key": "process_customer_data"
},
{
"task_key": "process_product_data"
},
{
"task_key": "process_order_data"
}
]
}
]
}
Job Scheduling Options:
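Schedules are expressed as a Quartz cron expression in the job settings; a small sketch of that block (values are illustrative):
{
  "schedule": {
    "quartz_cron_expression": "0 0 6 * * ?",
    "timezone_id": "UTC",
    "pause_status": "UNPAUSED"
  }
}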
Retry and Recovery Policies:
{
  "task_key": "critical_task",
  "notebook_task": {
    "notebook_path": "/Shared/ETL/critical_processing"
  },
  "existing_cluster_id": "0923-164819-h73vpgf0",
  "max_retries": 3,
  "min_retry_interval_millis": 60000,
  "retry_on_timeout": true
}
Parameter Passing Between Tasks:
# In a notebook task, set output parameters
dbutils.jobs.taskValues.set(key = "customer_count", value = customer_df.count())
dbutils.jobs.taskValues.set(key = "process_date", value = current_date)
# In a dependent task, retrieve values
customer_count = dbutils.jobs.taskValues.get(taskKey = "ingest_data", key = "customer_count")
process_date = dbutils.jobs.taskValues.get(taskKey = "ingest_data", key = "process_date")
Notification and Alerting:
{
"name": "Production ETL Pipeline",
"email_notifications": {
"on_start": ["etl-monitoring@company.com"],
"on_success": ["etl-monitoring@company.com"],
"on_failure": ["etl-alerts@company.com", "oncall@company.com"],
"no_alert_for_skipped_runs": false
},
"webhook_notifications": {
"on_failure": [
{
"id": "webhook-id"
}
]
},
"tasks": [
// Task definitions
]
}
Key Exam Concepts:
The Databricks CLI and REST API provide programmatic ways to interact with Databricks resources, enabling automation and integration with external systems.
Databricks CLI:
Installation and Configuration:
# Install Databricks CLI
pip install databricks-cli
# Configure authentication
databricks configure --token
# Enter host (e.g., https://your-workspace.cloud.databricks.com)
# Enter token
# Verify configuration
databricks workspace ls
Working with Workspace Assets:
# List workspace contents
databricks workspace ls /Shared
# Export notebook
databricks workspace export /Shared/MyNotebook -f JUPYTER -o my_notebook.ipynb
# Import notebook
databricks workspace import my_notebook.ipynb /Shared/ImportedNotebook -l PYTHON -f JUPYTER
# Create directory
databricks workspace mkdirs /Shared/NewDirectory
Managing Jobs:
# List jobs
databricks jobs list
# Get job details
databricks jobs get --job-id 123
# Run a job
databricks jobs run-now --job-id 123
# Create a job from JSON definition
databricks jobs create --json @job_definition.json
Cluster Management:
# List clusters
databricks clusters list
# Start/stop cluster
databricks clusters start --cluster-id 0923-164819-h73vpgf0
databricks clusters terminate --cluster-id 0923-164819-h73vpgf0
# Create a cluster
databricks clusters create --json @cluster_config.json
DBFS Operations:
# List files
databricks fs ls dbfs:/data/
# Copy files to DBFS
databricks fs cp local_file.csv dbfs:/data/
# Move files within DBFS
databricks fs mv dbfs:/data/old.csv dbfs:/data/new.csv
# Remove files
databricks fs rm dbfs:/data/old_data.csv
Databricks REST API:
The REST API enables the same operations with direct HTTP requests:
Authentication:
import requests
# Authentication
headers = {
"Authorization": f"Bearer {your_access_token}",
"Content-Type": "application/json"
}
# Base URL
base_url = "https://your-workspace.cloud.databricks.com/api/2.0"
Working with Jobs:
# List jobs
response = requests.get(
f"{base_url}/jobs/list",
headers=headers
)
jobs = response.json()
# Run a job
job_id = 123
response = requests.post(
f"{base_url}/jobs/run-now",
headers=headers,
json={"job_id": job_id}
)
run_id = response.json()["run_id"]
# Get run status
response = requests.get(
f"{base_url}/jobs/runs/get",
headers=headers,
params={"run_id": run_id}
)
run_status = response.json()["state"]["life_cycle_state"]
Creating a Job:
# Job definition
job_config = {
"name": "API Created Job",
"tasks": [
{
"task_key": "main_task",
"notebook_task": {
"notebook_path": "/Shared/MyNotebook",
"base_parameters": {
"param1": "value1"
}
},
"new_cluster": {
"spark_version": "11.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 2
}
}
],
"schedule": {
"quartz_cron_expression": "0 0 10 ? * MON-FRI",
"timezone_id": "America/Los_Angeles"
}
}
# Create job
response = requests.post(
f"{base_url}/jobs/create",
headers=headers,
json=job_config
)
new_job_id = response.json()["job_id"]
Automating Deployment with CI/CD:
import os
import requests
import json
# Function to deploy a notebook to production
def deploy_notebook(notebook_path, target_path):
    # Export from development workspace
    export_response = requests.get(
        f"{dev_workspace_url}/api/2.0/workspace/export",
        headers=dev_headers,
        params={
            "path": notebook_path,
            "format": "SOURCE"
        }
    )
    if export_response.status_code != 200:
        raise Exception(f"Failed to export: {export_response.text}")

    # Import to production workspace
    import_response = requests.post(
        f"{prod_workspace_url}/api/2.0/workspace/import",
        headers=prod_headers,
        json={
            "path": target_path,
            "format": "SOURCE",
            "content": export_response.json()["content"],
            "overwrite": True
        }
    )
    if import_response.status_code != 200:
        raise Exception(f"Failed to import: {import_response.text}")

    print(f"Successfully deployed {notebook_path} to {target_path}")

# Example usage in CI/CD pipeline
if __name__ == "__main__":
    # Get environment variables (set by CI/CD system)
    dev_workspace_url = os.environ["DEV_WORKSPACE_URL"]
    dev_token = os.environ["DEV_TOKEN"]
    prod_workspace_url = os.environ["PROD_WORKSPACE_URL"]
    prod_token = os.environ["PROD_TOKEN"]

    # Set up headers
    dev_headers = {
        "Authorization": f"Bearer {dev_token}",
        "Content-Type": "application/json"
    }
    prod_headers = {
        "Authorization": f"Bearer {prod_token}",
        "Content-Type": "application/json"
    }

    # Deploy notebooks
    deploy_notebook("/Dev/ETL/ingest_data", "/Prod/ETL/ingest_data")
    deploy_notebook("/Dev/ETL/transform_data", "/Prod/ETL/transform_data")

    # Update job configuration
    # ... similar code to update job definitions
Key Exam Concepts:
Implementing continuous integration and continuous deployment (CI/CD) for Databricks requires specific patterns and tools.
CI/CD Components for Databricks:
1. Version Control Integration:
2. Development Environments:
3. Automated Testing:
4. Deployment Automation:
Common CI/CD Patterns:
Pattern 1: Notebook-Based Development and Deployment
Implementation with Databricks Repos:
# Clone the repository into the workspace using the Repos UI (or the Repos API),
# then run tests directly from a notebook inside the repo

# Run tests within a notebook
def test_transformation():
    test_data = spark.createDataFrame([
        (1, "test"),
        (2, "test2")
    ], ["id", "name"])
    result = transform_function(test_data)
    # Assertions
    assert result.count() == 2
    assert "transformed_column" in result.columns

# Test execution
test_transformation()
Pattern 2: External CI/CD with API Integration
GitHub Actions Workflow Example:
name: Databricks CI/CD
on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: 3.8
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install pytest databricks-cli
      - name: Run unit tests
        run: pytest tests/

  deploy-to-test:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Deploy to Test
        env:
          DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
          DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
        run: |
          python deployment/deploy_to_databricks.py --environment test
      - name: Run Integration Tests
        env:
          DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
          DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
        run: |
          python tests/integration/run_integration_tests.py

  deploy-to-prod:
    needs: deploy-to-test
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Deploy to Production
        env:
          DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
          DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
        run: |
          python deployment/deploy_to_databricks.py --environment prod
Pattern 3: Delta Live Tables (DLT) CI/CD
DLT Pipeline Deployment:
# Python script to update a DLT pipeline configuration
import requests
import json
import os

def update_dlt_pipeline(pipeline_id, config_updates, workspace_url, token):
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    # Get current pipeline config
    get_response = requests.get(
        f"{workspace_url}/api/2.0/pipelines/{pipeline_id}",
        headers=headers
    )
    if get_response.status_code != 200:
        raise Exception(f"Failed to get pipeline: {get_response.text}")
    current_config = get_response.json()

    # Update config
    for key, value in config_updates.items():
        if key in current_config:
            current_config[key] = value

    # Update pipeline
    update_response = requests.put(
        f"{workspace_url}/api/2.0/pipelines/{pipeline_id}",
        headers=headers,
        json=current_config
    )
    if update_response.status_code != 200:
        raise Exception(f"Failed to update pipeline: {update_response.text}")

    print(f"Successfully updated pipeline {pipeline_id}")

# Example usage
if __name__ == "__main__":
    # Environment-specific configurations
    env = os.environ.get("DEPLOY_ENV", "dev")
    if env == "dev":
        target_dir = "/development"
        cluster_size = "Small"
    elif env == "test":
        target_dir = "/test"
        cluster_size = "Medium"
    elif env == "prod":
        target_dir = "/production"
        cluster_size = "Large"

    # Update pipeline
    update_dlt_pipeline(
        pipeline_id="12345",
        config_updates={
            "name": f"Customer Data Pipeline - {env.upper()}",
            "target": f"{target_dir}/customer_data",
            "configuration": {
                "pipeline.trigger.interval": "1 hour" if env == "prod" else "6 hours"
            },
            "clusters": [
                {
                    "label": "default",
                    "autoscale": {
                        "min_workers": 1,
                        "max_workers": 8 if env == "prod" else 4
                    }
                }
            ]
        },
        workspace_url=os.environ["DATABRICKS_HOST"],
        token=os.environ["DATABRICKS_TOKEN"]
    )
Key Exam Concepts:
Let’s test your understanding of today’s material with a short quiz:
In the Spark UI, which tab would you examine to identify data skew in a specific stage? a) Jobs b) Stages c) Executors d) SQL
Which metric is most important to monitor for streaming applications to ensure they are keeping up with incoming data? a) Processing rate vs. input rate b) Batch processing time c) Record count d) Total duration
When setting up a multi-task job in Databricks, how do you specify that Task B should only run after Task A completes successfully? a) Set “run_after” parameter in Task B b) Add Task A as a prerequisite in Task B c) Include “depends_on” with Task A’s task_key in Task B d) Configure Task A to trigger Task B
Which Databricks CLI command would you use to programmatically create a job from a JSON configuration file? a) databricks jobs create-from-file job_config.json b) databricks jobs create –json @job_config.json c) databricks jobs import –file job_config.json d) databricks jobs new –config job_config.json
What is a key advantage of using Databricks Repos for CI/CD compared to external Git integration? a) Built-in version control within the Databricks workspace b) Automatic deployment to production c) Better performance for notebook execution d) Support for more Git providers
In Ganglia UI, which metric would best indicate memory pressure on your cluster? a) CPU utilization b) Network bytes in/out c) Swap usage d) Disk I/O
Which REST API endpoint would you use to run an existing Databricks job? a) /api/2.0/jobs/start b) /api/2.0/jobs/execute c) /api/2.0/jobs/run-now d) /api/2.0/jobs/trigger
In a CI/CD pipeline for Databricks, which approach is recommended for handling environment-specific configurations? a) Hard-code configurations in notebooks for each environment b) Use different branches for different environments c) Parameterize notebooks and jobs with environment-specific values d) Duplicate workspaces for each environment
Answers and Explanations:
b) Stages - The Stages tab provides detailed information about task distribution and duration, which helps identify skew where some tasks take significantly longer than others.
a) Processing rate vs. input rate - Monitoring the relationship between processing rate and input rate is crucial to ensure the streaming application can keep up with incoming data; if input rate consistently exceeds processing rate, the application will fall behind.
c) Include “depends_on” with Task A’s task_key in Task B - In Databricks Jobs, task dependencies are specified using the “depends_on” parameter with the dependent task’s task_key.
b) databricks jobs create –json @job_config.json - This is the correct syntax for creating a job from a JSON configuration file using the Databricks CLI.
a) Built-in version control within the Databricks workspace - Databricks Repos provides native Git integration directly within the workspace, allowing developers to version control notebooks without leaving the Databricks environment.
c) Swap usage - High swap usage indicates that the system is under memory pressure and is using disk space as virtual memory, which significantly impacts performance.
c) /api/2.0/jobs/run-now - This is the correct endpoint for triggering an existing job to run via the Databricks REST API.
c) Parameterize notebooks and jobs with environment-specific values - This approach allows the same code to be deployed across environments while adapting configurations based on the target environment, which is a best practice for CI/CD.
We covered essential monitoring, testing, and deployment concepts in Databricks:
These concepts are crucial for managing, monitoring, and deploying production-grade data pipelines in Databricks. Tomorrow, we’ll review all the material covered so far and focus on comprehensive practice exams to prepare for the certification.
Let’s start by reviewing the essential concepts from each exam section:
Section 1: Databricks Tooling
Delta Lake Core Concepts:
Performance Optimization Techniques:
Delta Lake Operations:
Section 2: Data Processing
Batch Processing Concepts:
Streaming Processing:
Incremental Processing:
Section 3: Data Modeling
Medallion Architecture:
Data Quality:
Slowly Changing Dimensions:
Section 4: Security & Governance
Security Fundamentals:
Data Masking:
Section 5: Monitoring & Logging
Performance Analysis:
Streaming Monitoring:
Section 6: Testing & Deployment
Job Orchestration:
Automation:
Now, let’s test your knowledge with a half-length practice exam covering all sections proportionally.
Instructions:
Question 1: What is the primary purpose of the transaction log in Delta Lake?
A) To store table schema information
B) To track all changes made to the table
C) To optimize query performance
D) To manage access control permissions
Question 2: When implementing Z-ordering on a Delta table, which columns should you prioritize?
A) High-cardinality columns used in JOIN conditions
B) Low-cardinality columns used in filtering
C) Columns with the largest data sizes
D) Timestamp columns regardless of query patterns
Question 3: What happens when you execute a MERGE operation on a Delta table with a condition that matches multiple source records to the same target record?
A) The operation succeeds and applies all matches sequentially
B) The operation fails with a runtime error about duplicate matches
C) The operation succeeds but only applies the first match
D) The operation requires a “matchedBy” clause to resolve duplicates
Question 4: Which partition strategy would be most efficient for a table that is frequently queried with filters on both “date” (daily, high cardinality) and “region” (10 distinct values)?
A) Partition by date only
B) Partition by region only
C) Partition by both date and region
D) Do not partition but use Z-ORDER by date, region
Question 5: What is the main difference between repartition() and coalesce() in Spark?
A) repartition() can only decrease partitions, coalesce() can only increase them
B) repartition() performs a full shuffle, coalesce() avoids shuffling when reducing partitions
C) repartition() works with any data type, coalesce() only works with numeric columns
D) repartition() preserves ordering, coalesce() does not preserve ordering
Question 6: In Structured Streaming, which watermark configuration would allow processing of events that arrive up to 2 hours late?
A) .withWatermark("eventTime", "2 hours")
B) .withWatermark("eventTime", "120 minutes")
C) .option("watermarkDelay", "2 hours")
D) Both A and B are correct
Question 7: Which output mode in Structured Streaming requires an aggregation and updates the entire result table when triggered?
A) Append mode
B) Update mode
C) Complete mode
D) Overwrite mode
Question 8: When implementing a stream-static join, what is the best practice for the static DataFrame?
A) Cache it before joining
B) Broadcast it if it’s small enough
C) Repartition it to match the streaming DataFrame’s partitioning
D) Checkpoint it to ensure consistency
Question 9: In the medallion architecture, where should deduplication of records typically occur?
A) In the bronze layer
B) During transformation from bronze to silver
C) During transformation from silver to gold
D) In all layers independently
Question 10: Which constraint type in Delta Live Tables will drop records that don’t meet the condition rather than failing the entire batch?
A) expect
B) expect_or_drop
C) expect_or_fail
D) expect_or_null
Question 11: Which SCD type would you implement if you need to track the complete history of customer address changes, including when each address was valid?
A) SCD Type 0
B) SCD Type 1
C) SCD Type 2
D) SCD Type 3
Question 12: What type of join would be most efficient when joining a large fact table with a small dimension table in Spark?
A) Shuffle hash join
B) Broadcast join
C) Sort-merge join
D) Nested loop join
Question 13: Which approach is most efficient for handling late-arriving data in a streaming application that computes hourly aggregations?
A) Use a larger trigger interval to wait for late data
B) Implement watermarking with allowed lateness
C) Store raw events and recompute aggregations periodically
D) Use foreachBatch to manually handle late records
Question 14: In Delta Lake, what happens when you run VACUUM on a table?
A) It physically removes data files not referenced in the current table version
B) It optimizes the metadata for faster queries
C) It compacts small files into larger ones
D) It removes old versions from the transaction log
Question 15: Which feature of Delta Lake would you use to implement incremental processing based on changes to source data?
A) Time travel
B) MERGE operation
C) Change Data Feed
D) COPY INTO command
Question 16: Which approach provides the most secure method for masking sensitive columns for different user groups in Databricks?
A) Creating separate tables for each user group
B) Using dynamic views with conditional expressions
C) Creating column masks in Unity Catalog
D) Encrypting sensitive columns in the base table
Question 17: What is the primary benefit of using Unity Catalog in Databricks?
A) Improved query performance through optimization
B) Unified governance across workspaces and clouds
C) Automatic data quality enforcement
D) Built-in data lineage visualization
Question 18: In the Spark UI, which tab would you examine to identify if a stage has skewed task durations?
A) Jobs tab
B) Stages tab
C) Executors tab
D) Storage tab
Question 19: Which metric is most important to monitor in a streaming application to ensure it’s keeping up with incoming data?
A) Processing rate vs. input rate
B) Batch duration over time
C) Number of records processed
D) Memory usage of executors
Question 20: When designing a multi-task job in Databricks, what’s the best way to pass data between tasks?
A) Write temporary tables that subsequent tasks read
B) Use dbutils.jobs.taskValues API
C) Configure job parameters that all tasks access
D) Store data in DBFS for shared access
Question 21: Which Databricks CLI command would you use to export a notebook to the local file system?
A) databricks workspace export
B) databricks workspace get
C) databricks notebooks download
D) databricks fs cp
Question 22: What is a shallow clone in Delta Lake?
A) A copy of the table data with a reduced schema
B) A reference to the original table’s files with its own transaction log
C) A sampled subset of the original table
D) A copy with only the most recent version of the data
Question 23: Which storage level is most appropriate for a DataFrame that is reused multiple times but too large to fit entirely in memory?
A) MEMORY_ONLY
B) MEMORY_AND_DISK
C) DISK_ONLY
D) OFF_HEAP
Question 24: What is the primary purpose of using foreachBatch in Structured Streaming instead of standard output sinks?
A) It provides higher throughput for large batches
B) It allows applying different operations to each micro-batch
C) It eliminates the need for checkpointing
D) It guarantees exactly-once processing semantics
Question 25: When implementing CDC processing with Delta Lake, which operation simplifies applying inserts, updates, and deletes from a CDC source?
A) MERGE INTO
B) APPLY CHANGES INTO
C) UPDATE
D) UPSERT
Question 26: Which technique would you use to optimize a Delta table for queries that frequently filter on a high-cardinality column?
A) Partition by that column
B) Z-ORDER by that column
C) Create a bloom filter index on that column
D) Both B and C are recommended
Question 27: In a CI/CD pipeline for Databricks, what is the best approach for managing environment-specific configurations?
A) Hard-code configurations in notebooks for each environment
B) Use different Git branches for different environments
C) Parameterize notebooks and jobs with environment-specific values
D) Duplicate workspaces for each environment
Question 28: Which performance issue would be indicated by high shuffle read/write metrics in the Spark UI?
A) Inefficient partition pruning
B) Poor broadcast variable utilization
C) Excessive data movement across the cluster
D) Memory pressure on executors
Question 29: What does the “dataChange” flag in a Delta write operation control?
A) Whether schema validation is enforced
B) Whether the write triggers downstream processing
C) Whether statistics are computed for the new data
D) Whether partition discovery is performed
Question 30: Which approach is most efficient for updating a small number of records in a large Delta table?
A) Overwrite the entire table with updated data
B) Use MERGE with a targeted condition
C) DELETE old records and INSERT new versions
D) Create a new table with updated records and UNION with unchanged records
Answer Key and Explanations:
B - The transaction log records all changes (adds, removes, updates) to the table, enabling ACID properties and time travel.
A - Z-ordering is most effective on high-cardinality columns used in JOIN conditions, as it co-locates related data and enables efficient data skipping.
B - MERGE operations require that each source record matches at most one target record; multiple matches cause a runtime error.
C - For frequent filtering on both columns, partitioning by both date and region would create an efficient directory structure that enables partition pruning on either dimension.
B - repartition() performs a full shuffle to distribute data evenly, while coalesce() minimizes data movement when reducing partition count.
D - Both expressions are equivalent and configure a watermark that allows processing of events up to 2 hours late.
C - Complete mode requires an aggregation and outputs the entire result table each time, including all keys and updated values.
B - Broadcasting the static DataFrame (if small enough) eliminates the need for shuffling and improves join performance.
B - Deduplication typically occurs during the bronze to silver transformation, as silver tables should contain clean, validated, and deduplicated data.
B - expect_or_drop constraints in DLT will filter out records that don’t meet the condition without failing the entire pipeline.
C - SCD Type 2 tracks historical changes by creating new records with start and end dates or current/historical flags.
B - Broadcast join is most efficient when joining with a small dimension table, as it eliminates shuffling by broadcasting the small table to all executors.
B - Watermarking with allowed lateness allows the stream to process late data within the defined threshold while enabling state cleanup.
A - VACUUM physically removes data files that are not referenced in the current or recent versions of the table.
C - Change Data Feed tracks row-level changes and enables incremental processing based on what has changed in the source.
C - Column masks in Unity Catalog provide a centralized, efficient way to implement column-level security across users and roles.
B - Unity Catalog provides unified governance across workspaces and clouds with a consistent security model and metadata management.
B - The Stages tab shows task distribution and duration, making it easy to identify skewed tasks where some take significantly longer than others.
A - Comparing processing rate to input rate reveals whether the streaming application can keep up with incoming data or is falling behind.
B - The dbutils.jobs.taskValues API allows tasks to set and retrieve values, enabling direct data passing between dependent tasks.
A - databricks workspace export is the correct command to export a notebook from Databricks to the local file system.
B - A shallow clone creates a reference to the original table’s files but maintains its own transaction log, allowing independent modifications.
B - MEMORY_AND_DISK is appropriate for DataFrames that may not fit entirely in memory, as it will spill to disk as needed.
B - foreachBatch gives you complete control over how each micro-batch is processed, enabling complex logic that isn’t possible with standard sinks.
B - APPLY CHANGES INTO is specifically designed for CDC processing, automatically handling inserts, updates, and deletes based on operation type.
D - For high-cardinality columns, using both Z-ORDER and bloom filter indices provides optimal query performance through data co-location and filtering.
C - Parameterizing notebooks and jobs allows the same code to be deployed across environments while adapting configurations based on the target environment.
C - High shuffle read/write metrics indicate excessive data movement across the cluster, often caused by operations like join, groupBy, or repartition.
B - The dataChange flag indicates whether the write operation should trigger downstream processing like table notifications or streaming queries.
B - MERGE with a targeted condition efficiently updates specific records without rewriting the entire table or requiring multiple operations.
Now let’s review the practice exam results and focus on areas that need additional attention. For each section where questions were missed, we’ll provide targeted reinforcement.
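As one piece of targeted reinforcement for the answers above on watermarking, late-arriving data, and foreachBatch, here is a minimal sketch. The paths, table name, and column names are assumptions for illustration, and the target table hourly_readings is assumed to already exist.
# Hedged sketch: hourly event-time aggregation tolerating events up to 2 hours late,
# written out with foreachBatch via a MERGE into a Delta table.
from pyspark.sql import functions as F
from delta.tables import DeltaTable
events = spark.readStream.format("delta").load("/path/to/bronze_events")
hourly = (events
    .withWatermark("eventTime", "2 hours")                      # accept 2 hours of lateness, bound state
    .groupBy(F.window("eventTime", "1 hour"), "sensor_id")
    .agg(F.avg("reading").alias("avg_reading"))
    .select(F.col("window.start").alias("window_start"), "sensor_id", "avg_reading"))
def upsert_batch(batch_df, batch_id):
    # foreachBatch allows arbitrary per-micro-batch logic, here an idempotent MERGE
    (DeltaTable.forName(spark, "hourly_readings").alias("t")
        .merge(batch_df.alias("s"),
               "t.sensor_id = s.sensor_id AND t.window_start = s.window_start")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())
(hourly.writeStream
    .foreachBatch(upsert_batch)
    .outputMode("update")
    .option("checkpointLocation", "/path/to/checkpoints/hourly")
    .start())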
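The answer above about passing data between tasks with dbutils.jobs.taskValues can likewise be reduced to a short sketch; the task key and value below are illustrative only.
# In the upstream task (task_key "task_a"): publish a small value for downstream tasks
dbutils.jobs.taskValues.set(key="record_count", value=42)
# In a downstream task that lists task_a under depends_on: read it back
# (debugValue is used when the notebook is run interactively outside a job)
record_count = dbutils.jobs.taskValues.get(taskKey="task_a", key="record_count",
                                           default=0, debugValue=0)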
Common Challenge Areas:
Time Management:
The exam consists of 60 questions to be completed in 120 minutes, giving you 2 minutes per question on average.
Recommended approach:
Question Types and Strategies:
Scenario-based questions:
Code-based questions:
Best practice questions:
Troubleshooting questions:
Exam Psychology:
Let’s address some commonly confused concepts and provide final clarification:
Delta Lake Table Types:
Streaming vs. Batch Processing:
| Aspect | Streaming | Batch |
|---|---|---|
| Data scope | Unbounded | Bounded |
| Processing model | Continuous or micro-batch | Scheduled or triggered |
| State management | Required for aggregations | Generally stateless |
| Latency | Low (seconds to minutes) | High (minutes to hours) |
| Fault tolerance | Checkpointing | Job restart |
| Use cases | Real-time dashboards, alerts | Historical analysis, reporting |
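As a quick illustration of the first rows of this table, the same Delta path can be read as a bounded batch or an unbounded stream; the path is a placeholder.
# Bounded: reads the data available right now
batch_df = spark.read.format("delta").load("/path/to/events")
# Unbounded: continuously picks up new data; the write side needs a checkpoint for fault tolerance
stream_df = spark.readStream.format("delta").load("/path/to/events")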
SCD Implementation Patterns:
# SCD Type 1: upsert in place with MERGE (Python Delta Lake API)
# deltaTable points at the dimension table; 'updates' is a DataFrame of incoming changes
from delta.tables import DeltaTable
deltaTable = DeltaTable.forPath(spark, "/path/to/dimension-table")
(deltaTable.alias("target")
    .merge(updates.alias("updates"), "target.key = updates.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())
# SCD Type 2: a single MERGE that expires the current version of changed keys
# and inserts rows for keys not yet in the dimension. New versions of keys that
# already exist still need to be appended (commonly by staging the source with a
# NULL merge key so those rows fall into the "not matched" branch).
(deltaTable.alias("target")
    .merge(updates.alias("updates"),
           "target.key = updates.key AND target.is_current = true")
    .whenMatchedUpdate(set={
        "is_current": "false",
        "end_date": "current_date()"
    })
    .whenNotMatchedInsert(values={
        "key": "updates.key",
        "attribute": "updates.attribute",
        "is_current": "true",
        "start_date": "current_date()",
        "end_date": "null"
    })
    .execute())
Performance Optimization Decision Tree:
Security Implementation Best Practices:
Now it’s time for the full practice exam, which simulates the actual certification test experience.
Instructions:
All 60 questions are provided below, followed by an answer key with explanations, so you can simulate the full exam end to end.
Question 1: A data engineer needs to track all changes (inserts, updates, and deletes) made to a Delta table for audit purposes. Which feature should they enable?
A) Delta Lake versioning
B) Change Data Feed
C) Transaction log retention
D) Time travel
Question 2: Which Delta Lake operation would most efficiently handle a scenario where millions of new records need to be inserted while a few thousand existing records need to be updated based on a matching key?
A) MERGE INTO
B) INSERT OVERWRITE
C) COPY INTO with deduplication
D) DELETE followed by INSERT
Question 3: A streaming application is processing sensor data and computing aggregations with a 10-minute window. The team notices that some events arrive up to 15 minutes late but are being dropped. Which change would allow these late events to be included in the aggregations?
A) Increase the trigger interval to 15 minutes
B) Change the watermark delay to 20 minutes
C) Switch to complete output mode
D) Use a tumbling window instead of a sliding window
Question 4: A data engineer observes that a Spark job with 1000 tasks has 990 tasks completing in approximately 2 minutes, while 10 tasks take over 30 minutes. What is the most likely cause?
A) Insufficient executor memory
B) Inefficient UDF implementation
C) Data skew
D) Incorrect partition count
Question 5: Which approach would be most effective for optimizing a Delta table that is frequently queried with filters on both timestamp (range queries) and product_category (equality filters on 50 distinct values)?
A) Partition by date extracted from timestamp and Z-ORDER by product_category
B) Partition by product_category and Z-ORDER by timestamp
C) Partition by both date and product_category
D) Z-ORDER by both timestamp and product_category without partitioning
Question 6: A data engineering team needs to implement a Type 2 SCD for customer dimension data. Which columns would be necessary to add to the dimension table?
A) valid_from_date, valid_to_date, is_current
B) updated_timestamp, updated_by
C) version_number, source_system
D) hash_key, change_type
Question 7: Which storage level would be most appropriate for a large DataFrame that is reused in multiple operations but doesn’t fit entirely in memory?
A) MEMORY_ONLY
B) MEMORY_ONLY_SER
C) MEMORY_AND_DISK
D) DISK_ONLY
Question 8: In a multi-task Databricks job, what is the best way to pass data between tasks without writing intermediate tables?
A) Use broadcast variables
B) Use dbutils.jobs.taskValues API
C) Store values in widget parameters
D) Use dbutils.notebook.run with arguments
Question 9: A data engineer needs to create a dynamic view that shows different columns to different user groups. Which approach is correct?
A) Use CASE expressions based on current_user() or is_member() functions
B) Create multiple views with different permissions
C) Use row-level security with column masking
D) Implement a Python UDF that returns different columns based on the user
Question 10: Which Ganglia metric would best indicate that a cluster is experiencing memory pressure?
A) High CPU utilization
B) Increased network traffic
C) Elevated disk I/O
D) High swap usage
Question 11: What is the most efficient way to apply changes from a CDC source to a Delta table when the CDC data contains operation types (insert, update, delete) and sequence numbers?
A) Write custom merge logic for each operation type
B) Use APPLY CHANGES INTO with appropriate key and sequence columns
C) Use individual INSERT, UPDATE, and DELETE statements based on operation type
D) Use DLT’s CDC capabilities with SYNC table directive
Question 12: Which command would you use to verify if the size of a Delta table is growing excessively due to retention of old file versions?
A) DESCRIBE HISTORY delta_table
B) DESCRIBE DETAIL delta_table
C) VACUUM delta_table DRY RUN
D) OPTIMIZE delta_table
Question 13: A data engineer needs to capture schema changes in source data while loading it into a Delta table. Which approach is most appropriate?
A) Use schema enforcement to reject incompatible records
B) Use schema evolution with mergeSchema option
C) Manually update the table schema before loading new data
D) Create a new table for each schema version
Question 14: When using Delta Live Tables, which expectation mode should be used if data quality is critical and any violation should stop the pipeline?
A) expect_all
B) expect_or_drop
C) expect_or_fail
D) expect_or_warn
Question 15: Which Structured Streaming output mode would you use for an aggregation query where you only want to see updated aggregation results in the output?
A) Append
B) Update
C) Complete
D) Upsert
Question 16: A team is experiencing slow performance with complex SQL queries involving multiple joins. Which Spark UI tab would provide the most valuable information for optimizing these queries?
A) Jobs
B) Stages
C) Storage
D) SQL
Question 17: Which technique would most effectively handle a streaming join where one stream has events that arrive significantly later than the other stream?
A) Increase the watermark delay on both streams
B) Use a left outer join with the delayed stream on the right side
C) Implement a stateful foreachBatch function
D) Convert the delayed stream to a static DataFrame
Question 18: What is the primary benefit of shallow cloning a Delta table compared to deep cloning?
A) Reduced storage requirements
B) Faster query performance
C) Support for more concurrent readers
D) Better compatibility with external systems
Question 19: Which operation in Delta Lake is designed specifically for handling small file issues?
A) VACUUM
B) OPTIMIZE
C) COMPACT
D) REWRITE
Question 20: Which approach provides the best security for accessing secrets in a Databricks notebook?
A) Store secrets as environment variables
B) Store secrets in Databricks Secret Scope
C) Hardcode secrets but restrict notebook permissions
D) Pass secrets as notebook parameters
Question 21: When implementing streaming aggregations with event-time windows, how do you ensure state doesn’t grow unbounded?
A) Use stateless transformations only
B) Set a watermark on the event time column
C) Use complete output mode
D) Increase trigger interval
Question 22: Which parameter in a Databricks job configuration controls automatic termination of a cluster after completion?
A) terminate_after_execution
B) auto_terminate
C) terminate_on_success
D) cluster_termination
Question 23: What is the recommended approach to handle schema drift in a data pipeline that processes JSON data with evolving structure?
A) Set strict schema validation to prevent inconsistent data
B) Use schema inference with schema evolution enabled
C) Create a fixed schema with all possible fields
D) Process raw data as strings and parse in the application layer
Question 24: Which Delta Lake property would you modify to extend the retention period for historical versions beyond the default 30 days?
A) delta.logRetentionDuration
B) delta.retentionDurationCheck
C) delta.deletedFileRetentionDuration
D) delta.enableVersionTracking
Question 25: In a multi-hop architecture, which layer should contain business-level aggregations optimized for specific analytics use cases?
A) Bronze layer
B) Silver layer
C) Gold layer
D) Platinum layer
Question 26: Which join strategy would Spark choose when joining a very large table with a small table that has been broadcasted?
A) Broadcast hash join
B) Shuffle hash join
C) Sort-merge join
D) Nested loop join
Question 27: What is the primary benefit of using Auto Loader in Databricks compared to manually tracking and loading new files?
A) Higher throughput for batch processing
B) Support for more file formats
C) Automatic schema inference and evolution
D) Efficient incremental processing without file listings
Question 28: Which REST API endpoint would you use to retrieve the execution results of a completed Databricks job run?
A) /api/2.0/jobs/runs/get-output
B) /api/2.0/jobs/runs/get-result
C) /api/2.0/jobs/runs/export
D) /api/2.0/jobs/runs/result
Question 29: What does the Z in Z-ORDER represent in the context of Delta Lake?
A) Zero-latency indexing
B) Z-curve spatial filling indexing
C) Zig-zag processing pattern
D) Zone-based file organization
Question 30: Which mechanism in Delta Lake helps prevent concurrent writes from causing data corruption?
A) Write-ahead logging
B) Journaling
C) Optimistic concurrency control
D) Pessimistic locking
Question 31: A data pipeline needs to ensure that customer records always have valid email addresses. Which is the most efficient approach in Delta Live Tables?
A) Implement a WHERE clause in the SELECT statement
B) Use a CASE statement to replace invalid emails with NULL
C) Apply a constraint with expect_or_drop
D) Create a separate quality monitoring job
Question 32: Which storage format serves as the underlying file format for Delta Lake tables?
A) Avro
B) Parquet
C) ORC
D) JSON
Question 33: What is the key benefit of implementing table constraints in Delta Lake?
A) Improved query performance
B) Data quality enforcement
C) Reduced storage requirements
D) Simplified schema evolution
Question 34: In the context of Delta Lake, what does the VACUUM command do?
A) Optimizes table statistics for better query planning
B) Compacts small files into larger ones
C) Removes files no longer referenced by the table
D) Forces garbage collection of cached data
Question 35: Which feature would you use to efficiently propagate deletes from a source system to a target Delta table?
A) DELETE WHERE condition
B) MERGE with delete clause
C) Change Data Feed
D) APPLY CHANGES with sequence tracking
Question 36: What is the recommended approach for handling late-arriving data in a streaming application?
A) Increase the trigger interval
B) Configure a watermark with appropriate delay
C) Use foreachBatch to implement custom logic
D) Switch to complete output mode
Question 37: Which command allows you to update multiple columns in a Delta table based on matching a condition?
A) UPDATE
B) MERGE
C) ALTER
D) UPSERT
Question 38: What is the main advantage of using Unity Catalog for data governance?
A) Improved query performance
B) Enhanced security with fine-grained access controls
C) Reduced storage costs
D) Automatic data quality validation
Question 39: Which approach would you use to implement row-level security in a Delta table?
A) Create separate tables for each security level
B) Use dynamic views with filtering based on user attributes
C) Encrypt sensitive rows
D) Use column-level access controls
Question 40: When analyzing Spark performance issues, which metric indicates data skew most directly?
A) Task duration variance within a stage
B) Overall job completion time
C) Number of tasks per stage
D) Executor CPU utilization
Question 41: Which feature is most useful for implementing incremental ETL patterns with Delta Lake?
A) Time travel
B) Schema enforcement
C) COPY INTO
D) Change Data Feed
Question 42: In a Delta Live Tables pipeline, what does the LIVE keyword indicate?
A) The table is continuously updated with streaming data
B) The table is managed by DLT and follows lineage tracking
C) The table has constraints that are actively enforced
D) The table is exposed for querying while being updated
Question 43: Which approach is most efficient for updating a subset of columns in a large Delta table?
A) CREATE OR REPLACE TABLE
B) INSERT OVERWRITE
C) UPDATE SET
D) MERGE INTO
Question 44: What is the best practice for organizing Delta tables for optimal query performance?
A) Create many small tables with highly specific data
B) Partition by columns frequently used in WHERE clauses
C) Z-ORDER by all columns used in queries
D) Avoid partitioning to simplify file management
Question 45: Which method provides the most efficient way to access a small lookup table in a Spark join?
A) Cache the lookup table before joining
B) Broadcast the lookup table to all executors
C) Repartition both tables on the join key
D) Use a subquery instead of a join
Question 46: What is the primary purpose of the Databricks CLI?
A) To provide a SQL interface for querying data
B) To automate and script Databricks operations
C) To monitor cluster performance
D) To develop and test Spark code
Question 47: Which pattern is most appropriate for implementing a multi-environment CI/CD pipeline for Databricks?
A) Using separate workspaces for each environment
B) Using branches to represent different environments
C) Deploying all code to a single workspace with environment-specific parameters
D) Creating separate notebooks for each environment
Question 48: What is the recommended approach for passing credentials securely to a Databricks job?
A) Store credentials in environment variables
B) Use Databricks Secret Scopes
C) Include credentials in job parameters
D) Store credentials in DBFS with restricted permissions
Question 49: Which type of cluster is most appropriate for running scheduled production jobs?
A) All-purpose cluster
B) High Concurrency cluster
C) Single Node cluster
D) Job cluster
Question 50: What is the most efficient way to handle data quality issues in streaming data?
A) Apply filters before processing
B) Use watermarks to drop late data
C) Implement quality checks in foreachBatch
D) Configure expectations in Delta Live Tables
Question 51: Which approach provides the best performance for querying a subset of a very large Delta table partitioned by date?
A) Use a subquery to filter the partition first
B) Include the partition column in the WHERE clause
C) Cache the entire table before querying
D) Repartition the data before querying
Question 52: What happens when a constraint is added to an existing Delta table that contains records violating the constraint?
A) The constraint is added but not enforced retroactively
B) The constraint addition fails
C) Existing violating records are automatically deleted
D) The constraint is only applied to future write operations
Question 53: Which API would you use to programmatically manage Databricks jobs?
A) Compute API
B) Jobs API
C) Workflow API
D) Scheduler API
Question 54: What is the primary benefit of using partitioning in Delta tables?
A) Improved storage compression
B) Automatic data clustering
C) Efficient data pruning during queries
D) Simplified data management
Question 55: Which permission is required to view query history in Databricks SQL?
A) CAN VIEW
B) CAN RUN
C) CAN MANAGE
D) CAN MODIFY
Question 56: What feature in Delta Lake helps maintain table statistics for query optimization?
A) Data skipping
B) File statistics tracking
C) Bloom filters
D) Adaptive Query Execution
Question 57: Which approach is recommended for handling schema changes in a Delta table?
A) Create a new table with each schema change
B) Set mergeSchema = true when writing new data
C) Maintain schema versions in separate columns
D) Convert all columns to string type for flexibility
Question 58: What is the main purpose of time travel in Delta Lake?
A) To schedule future queries
B) To optimize query performance over time ranges
C) To query historical versions of a table
D) To track data access patterns over time
Question 59: Which command would you use to see the history of operations performed on a Delta table?
A) DESCRIBE HISTORY table_name
B) SHOW HISTORY table_name
C) SELECT * FROM HISTORY(table_name)
D) DESCRIBE DETAIL table_name
Question 60: What is the best approach for implementing a Type 2 SCD in Delta Lake?
A) Use MERGE with standard source-to-target matching
B) Use MERGE with conditional updates for current records and inserts for new versions
C) Use INSERT OVERWRITE with all records having new surrogate keys
D) Create a view that unions all historical versions
Answer Key and Explanations:
B - Change Data Feed tracks all row-level changes (inserts, updates, deletes) and is designed for audit and incremental processing scenarios.
A - MERGE INTO efficiently handles the combined scenario of inserting new records and updating existing ones in a single operation.
B - Increasing the watermark delay to 20 minutes would allow events arriving up to 15 minutes late to be included in the aggregations.
C - The pattern of a small number of tasks taking significantly longer than others is a classic indicator of data skew.
A - For this query pattern, partitioning by date (extracted from timestamp) provides efficient pruning for range queries, while Z-ORDER by product_category optimizes the equality filters within partitions.
A - A Type 2 SCD requires tracking the validity period (valid_from_date, valid_to_date) and current status (is_current) for each version of a dimension record.
C - MEMORY_AND_DISK is appropriate for large DataFrames that don’t fit entirely in memory, as it will store as much as possible in memory and spill the rest to disk.
B - The dbutils.jobs.taskValues API is specifically designed to pass values between tasks in a multi-task job.
A - Dynamic views implement column-level security using CASE expressions with current_user() or is_member() functions to return different values based on the user’s identity or role membership.
D - High swap usage indicates that the system is using disk space as virtual memory because physical memory is exhausted, which is a clear sign of memory pressure.
B - APPLY CHANGES INTO is specifically designed for CDC processing, automatically handling inserts, updates, and deletes based on operation type and sequence.
C - VACUUM with DRY RUN shows which files would be deleted based on the current retention period, helping identify if excessive versions are being retained.
B - Using schema evolution with mergeSchema option allows the Delta table to automatically adapt to changes in the source schema.
C - expect_or_fail in DLT causes the entire pipeline to fail if any records violate the constraint, appropriate when data quality is critical.
B - Update mode outputs only the rows that have been updated since the last trigger, making it ideal for updated aggregation results.
D - The SQL tab provides detailed information about query execution plans and optimization opportunities specific to SQL queries.
A - Increasing the watermark delay on both streams keeps join state available long enough for the later-arriving events to still be matched within the join.
A - Shallow clones reference the original data files (reducing storage requirements) while maintaining an independent transaction log.
B - OPTIMIZE is specifically designed to address small file issues by compacting them into larger, more efficiently sized files.
B - Databricks Secret Scope provides secure storage and retrieval of secrets without exposing them in code or logs.
B - Setting a watermark on the event time column allows the system to discard state for windows that are no longer needed, preventing unbounded growth.
A - The terminate_after_execution parameter controls whether a cluster automatically terminates after job completion.
B - Using schema inference with schema evolution enabled allows the pipeline to adapt to changes in the JSON structure over time.
A - delta.logRetentionDuration controls how long historical versions of a Delta table are retained.
C - The Gold layer in a multi-hop architecture contains business-level aggregations and metrics optimized for specific analytics use cases.
A - When joining with a broadcasted small table, Spark will choose a broadcast hash join, which eliminates the need for shuffling.
D - Auto Loader efficiently processes new files incrementally without expensive file listing operations, which is its primary benefit.
A - The /api/2.0/jobs/runs/get-output endpoint retrieves the execution results of a completed job run.
B - The Z in Z-ORDER refers to the Z-order (space-filling) curve, which maps multidimensional data onto one dimension while preserving locality.
C - Optimistic concurrency control prevents data corruption from concurrent writes by detecting conflicts during the commit phase.
C - Using expect_or_drop in DLT applies a constraint that automatically removes records with invalid email addresses during processing.
B - Parquet is the underlying file format used by Delta Lake tables, providing columnar storage and efficient compression.
B - Table constraints in Delta Lake enforce data quality rules, preventing invalid data from being written to the table.
C - VACUUM removes files that are no longer referenced by the table and are older than the retention threshold.
D - APPLY CHANGES with sequence tracking efficiently propagates deletes from a source system to a target Delta table.
B - Configuring an appropriate watermark delay is the recommended approach for handling late-arriving data in streaming applications.
A - The UPDATE command allows updating multiple columns based on a condition in a Delta table.
B - Unity Catalog’s main advantage is enhanced security with fine-grained access controls across workspaces and cloud providers.
B - Dynamic views with filtering based on user attributes is the recommended approach for implementing row-level security.
A - Task duration variance within a stage directly indicates data skew, where some tasks process significantly more data than others.
D - Change Data Feed is most useful for implementing incremental ETL patterns as it tracks row-level changes efficiently.
B - The LIVE keyword in DLT indicates that the table is managed by Delta Live Tables and participates in lineage tracking.
C - UPDATE SET is most efficient for updating a subset of columns in a large Delta table as it only modifies the specified columns.
B - Partitioning by columns frequently used in WHERE clauses provides the best query performance through partition pruning.
B - Broadcasting a small lookup table to all executors is the most efficient method for using it in Spark joins.
B - The primary purpose of the Databricks CLI is to automate and script Databricks operations programmatically.
A - Using separate workspaces for each environment (dev, test, prod) is the most appropriate pattern for implementing a multi-environment CI/CD pipeline.
B - Databricks Secret Scopes provide a secure way to manage and access credentials in jobs without exposing them.
D - Job clusters are specifically designed for running scheduled production jobs, automatically starting and terminating as needed.
D - Configuring expectations in Delta Live Tables provides the most efficient way to handle data quality issues in streaming data.
B - Including the partition column in the WHERE clause enables partition pruning, which significantly improves query performance.
B - Adding a CHECK or NOT NULL constraint to an existing Delta table validates the existing data first; if any records violate the constraint, the ALTER TABLE statement fails, so the offending data must be fixed before the constraint can be added.
B - The Jobs API is used to programmatically manage Databricks jobs, including creation, modification, and execution.
C - The primary benefit of partitioning is efficient data pruning during queries, as irrelevant partitions can be skipped entirely.
C - CAN MANAGE permission is required to view query history in Databricks SQL.
A - Data skipping relies on the per-file min/max statistics that Delta Lake collects automatically, enabling the query optimizer to skip files that cannot contain matching data.
B - Setting mergeSchema = true when writing new data is the recommended approach for handling schema changes in Delta tables.
C - The main purpose of time travel in Delta Lake is to query historical versions of a table at specific points in time.
A - DESCRIBE HISTORY table_name shows the history of operations performed on a Delta table.
B - For Type 2 SCDs, using MERGE with conditional logic to update current records (setting end dates and is_current flags) and insert new versions is the best approach.
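To reinforce the security answers above on dynamic views with current_user() and is_member(), a hedged sketch follows; the table, columns (including account_manager), and group names are assumptions for illustration only.
# Column masking plus a simple ownership-based row filter in one dynamic view
spark.sql("""
    CREATE OR REPLACE VIEW customers_secure AS
    SELECT
      customer_id,
      CASE WHEN is_member('pii_readers') THEN email ELSE 'REDACTED' END AS email,
      region,
      account_manager
    FROM customers
    WHERE is_member('admins') OR account_manager = current_user()
""")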
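And for the Change Data Feed answers, a sketch of enabling CDF on an assumed table and reading row-level changes for incremental processing; the table name and starting version are hypothetical.
# Enable the Change Data Feed on an existing table
spark.sql("ALTER TABLE customers SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
# Read changes from a chosen starting version; the _change_type column distinguishes
# insert, update_preimage, update_postimage, and delete rows
changes = (spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 5)   # hypothetical version number
    .table("customers"))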
Based on your performance on the practice exams, here are some final preparation strategies for success:
Focus on Areas of Uncertainty:
Practical Application:
Last-Minute Review:
Day-Before Preparation:
During the Exam: