Databricks is a unified data analytics platform that combines data engineering, data science, and business analytics into a single, integrated environment. Founded by the creators of Apache Spark, Databricks provides an enterprise-grade, cloud-based platform that enables organizations to process and analyze massive datasets efficiently.
Key Components of the Databricks Platform:
The Databricks platform consists of several integrated components:
The data lakehouse architecture represents a modern approach to data management that combines the best features of data warehouses and data lakes.
Data Lakehouse vs. Data Warehouse vs. Data Lake:
| Feature | Data Lake | Data Warehouse | Data Lakehouse |
|---|---|---|---|
| Data Structure | Unstructured/semi-structured | Highly structured | Supports all data types |
| Schema | Schema-on-read | Schema-on-write | Schema enforcement with flexibility |
| Data Quality | Limited | High | High |
| Performance | Variable | Optimized | Optimized |
| Cost | Lower storage costs | Higher costs | Balance of cost and performance |
| Use Cases | Data science, ML | BI reporting | Unified analytics |
The Medallion Architecture:
The medallion architecture is a data organization framework used in the lakehouse that organizes data into three tiers:
- Bronze: raw data as ingested from source systems
- Silver: cleaned and validated data
- Gold: business-ready, aggregated data
Benefits of the Lakehouse Approach:
Clusters are the computational resources that execute your code in Databricks.
Types of Clusters:
Databricks Runtime (DBR):
Cluster Termination:
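As a rough sketch of how these pieces fit together in an API-based cluster definition (all values below are illustrative placeholders, not recommendations), the Databricks Runtime version, node type, worker count, auto-termination window, and Spark configuration are specified in a single payload:
{
  "cluster_name": "training-cluster",
  "spark_version": "13.3.x-scala2.12",
  "node_type_id": "i3.xlarge",
  "num_workers": 2,
  "autotermination_minutes": 60,
  "spark_conf": {
    "spark.sql.shuffle.partitions": "8"
  }
}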
Databricks Repos enables Git-based version control directly within the Databricks workspace.
Key Features:
CI/CD Workflows with Repos:
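One common CI/CD step is to have your automation pull the latest commit into a Repo after a merge. A minimal sketch using the Repos REST API is shown below; the workspace URL, token, and repo ID are placeholders you would supply from your own environment:
# Sketch: update a Databricks Repo to the head of a branch from a CI/CD job.
# workspace_url, token, and repo_id are placeholders (hypothetical values).
import requests

workspace_url = "https://<your-workspace>.cloud.databricks.com"
token = "<personal-access-token>"
repo_id = 123  # obtain from GET /api/2.0/repos

response = requests.patch(
    f"{workspace_url}/api/2.0/repos/{repo_id}",
    headers={"Authorization": f"Bearer {token}"},
    json={"branch": "main"},  # check out the latest commit on main
)
response.raise_for_status()
print(response.json())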
Limitations Compared to Traditional Git:
Databricks notebooks support multiple programming languages, allowing different cell types within the same notebook.
Supported Languages:
Language Switching in Notebooks:
- `%python` for Python code
- `%sql` for SQL queries
- `%scala` for Scala code
- `%r` for R code

Notebook Workflows:
- The `%run` command runs another notebook inline, for example `%run /path/to/notebook`

Exercise 1: Creating a Cluster
Exercise 2: Creating and Using Notebooks
# Python cell
print("Hello from Python!")
-- SQL cell (use %sql magic command)
%sql
SELECT "Hello from SQL!" AS greeting
// Scala cell (use %scala magic command)
%scala
println("Hello from Scala!")
Exercise 3: Exploring the Medallion Architecture
Create a new notebook and implement a simple medallion architecture:
Knowledge check answers: the `%sql` magic command, the `%run` command, and `%scala`.

Let’s proceed with detailed hands-on exercises to reinforce your understanding of the Databricks Lakehouse Platform. These exercises will give you practical experience with the concepts we’ve covered.
spark.sql.shuffle.partitions 8
spark.sql.adaptive.enabled true
While your cluster starts, explore the workspace:
# Python cell
print("Exploring Databricks Runtime Environment")
# Display Spark version
print(f"Spark Version: {spark.version}")
# Display Databricks Runtime version (exposed via an environment variable)
import os
print(f"Databricks Runtime Version: {os.environ.get('DATABRICKS_RUNTIME_VERSION')}")
%sql
-- SQL cell
SELECT current_timestamp() AS current_time,
current_user() AS username,
'Databricks Lakehouse Platform' AS platform
%scala
// Scala cell
println("Scala is the native language of Apache Spark")
val sparkVersion = spark.version
println(s"Current Spark version: $sparkVersion")
# Create a simple DataFrame
data = [("Bronze", "Raw data"),
("Silver", "Cleaned data"),
("Gold", "Business-ready data")]
medallion_df = spark.createDataFrame(data, ["Layer", "Description"])
display(medallion_df)
Let’s create a new notebook to implement a basic medallion architecture:
# Generate sample sales data
from pyspark.sql import functions as F
import random
from datetime import datetime, timedelta
# Create a sample dataset with some inconsistencies and issues
data = []
start_date = datetime(2023, 1, 1)
# Generate 100 sample records
for i in range(100):
# Create some data quality issues randomly
if random.random() < 0.1:
# Some records have null values
customer_id = None
else:
customer_id = f"CUST-{random.randint(1, 20):04d}"
# Some dates are missing
if random.random() < 0.05:
date = None
else:
date = (start_date + timedelta(days=random.randint(0, 60))).strftime("%Y-%m-%d")
# Some product IDs have different formats
if random.random() < 0.15:
product_id = f"prod-{random.randint(1, 50)}"
else:
product_id = f"PROD-{random.randint(1, 50):03d}"
# Some quantities are invalid (negative)
if random.random() < 0.08:
quantity = -random.randint(1, 10)
else:
quantity = random.randint(1, 10)
# Some prices have incorrect decimal places or are zero
if random.random() < 0.07:
price = round(random.random() * 100, random.randint(0, 4))
else:
price = round(random.random() * 100, 2)
data.append((customer_id, date, product_id, quantity, price))
# Create a DataFrame
bronze_df = spark.createDataFrame(data, ["customer_id", "date", "product_id", "quantity", "price"])
# Save as a Delta table
bronze_df.write.format("delta").mode("overwrite").saveAsTable("bronze_sales")
# Display the bronze data
print("Bronze Layer Data (Raw):")
display(spark.table("bronze_sales"))
# Read from the bronze layer
bronze_data = spark.table("bronze_sales")
# Clean and validate the data
silver_data = (bronze_data
# Filter out records with null customer_id or date
.filter(F.col("customer_id").isNotNull() & F.col("date").isNotNull())
# Standardize product_id format
.withColumn("product_id",
F.when(F.col("product_id").startswith("prod-"),
F.concat(F.lit("PROD-"), F.lpad(F.regexp_extract(F.col("product_id"), r"prod-(\d+)", 1), 3, "0")))
.otherwise(F.col("product_id")))
# Ensure quantity is positive
.withColumn("quantity",
F.when(F.col("quantity") <= 0, None)
.otherwise(F.col("quantity")))
# Ensure price has exactly 2 decimal places and is positive
.withColumn("price",
F.when(F.col("price") <= 0, None)
.otherwise(F.round(F.col("price"), 2)))
# Convert date string to date type
.withColumn("date", F.to_date(F.col("date")))
# Add a month column for aggregation in gold layer
.withColumn("month", F.date_format(F.col("date"), "yyyy-MM"))
)
# Save as a Delta table
silver_data.write.format("delta").mode("overwrite").saveAsTable("silver_sales")
# Display the silver data
print("Silver Layer Data (Cleaned and Validated):")
display(spark.table("silver_sales"))
# Read from the silver layer
silver_data = spark.table("silver_sales")
# Create monthly sales summary by product
gold_monthly_product_sales = (silver_data
.groupBy("month", "product_id")
.agg(
F.count("*").alias("transaction_count"),
F.sum("quantity").alias("total_quantity"),
F.sum(F.col("quantity") * F.col("price")).alias("total_sales_amount"),
F.avg("price").alias("average_price")
)
.orderBy("month", "product_id")
)
# Save as a Delta table
gold_monthly_product_sales.write.format("delta").mode("overwrite").saveAsTable("gold_monthly_product_sales")
# Create customer spending summary
gold_customer_summary = (silver_data
.groupBy("customer_id")
.agg(
F.countDistinct("date").alias("active_days"),
F.count("*").alias("transaction_count"),
F.sum(F.col("quantity") * F.col("price")).alias("total_spend"),
F.max("date").alias("last_purchase_date")
)
.orderBy(F.col("total_spend").desc())
)
# Save as a Delta table
gold_customer_summary.write.format("delta").mode("overwrite").saveAsTable("gold_customer_summary")
# Display the gold layer tables
print("Gold Layer Data - Monthly Product Sales:")
display(spark.table("gold_monthly_product_sales"))
print("Gold Layer Data - Customer Summary:")
display(spark.table("gold_customer_summary"))
# Run the Medallion Architecture notebook
%run /DE-Certification/Medallion-Architecture-Demo
# Now we can work with the tables created in that notebook
print("Accessing Gold Layer tables from the master notebook:")
display(spark.sql("SELECT * FROM gold_monthly_product_sales LIMIT 5"))
If your Databricks workspace has Repos enabled:
Apache Spark provides versatile capabilities for extracting data from various sources. The Databricks platform enhances these capabilities with optimized readers and connectors.
Spark supports multiple file formats, each with specific readers:
# Reading CSV files
csv_df = spark.read.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("/path/to/file.csv")
# Reading Parquet files
parquet_df = spark.read.parquet("/path/to/directory")
# Reading JSON files
json_df = spark.read.json("/path/to/file.json")
# Reading from a directory of files
directory_df = spark.read.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("/path/to/directory/*.csv")
Connecting to relational databases:
jdbc_df = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://server:port/database") \
.option("dbtable", "schema.table") \
.option("user", "username") \
.option("password", "password") \
.load()
Spark SQL offers different ways to reference datasets:
# Creating a temporary view
df.createOrReplaceTempView("temp_view_name")
# Using the temporary view
result = spark.sql("SELECT * FROM temp_view_name WHERE column > 10")
Temporary views exist only within the current Spark session.
# Creating a global temporary view
df.createOrReplaceGlobalTempView("global_view_name")
# Accessing the global temporary view
result = spark.sql("SELECT * FROM global_temp.global_view_name")
Global temporary views exist across all sessions within the same Spark application.
CTEs provide a way to write auxiliary statements for use in a larger query:
-- Using a CTE
WITH revenue_data AS (
SELECT product_id, SUM(amount) as total_revenue
FROM sales
GROUP BY product_id
)
SELECT p.name, r.total_revenue
FROM products p
JOIN revenue_data r ON p.id = r.product_id
ORDER BY r.total_revenue DESC
Spark SQL provides a comprehensive set of functions for data manipulation:
# Filtering data
filtered_df = df.filter(df.age > 25)
# SQL equivalent
spark.sql("SELECT * FROM people WHERE age > 25")
# Selecting columns
selected_df = df.select("name", "age", "department")
# SQL equivalent
spark.sql("SELECT name, age, department FROM people")
# Adding new columns
enhanced_df = df.withColumn("age_group",
when(df.age < 18, "minor")
.when(df.age < 65, "adult")
.otherwise("senior"))
# SQL equivalent
spark.sql("""
SELECT *,
CASE
WHEN age < 18 THEN 'minor'
WHEN age < 65 THEN 'adult'
ELSE 'senior'
END as age_group
FROM people
""")
# Grouping and aggregating
agg_df = df.groupBy("department").agg(
avg("salary").alias("avg_salary"),
count("*").alias("employee_count"),
sum("salary").alias("total_salary")
)
# SQL equivalent
spark.sql("""
SELECT
department,
AVG(salary) as avg_salary,
COUNT(*) as employee_count,
SUM(salary) as total_salary
FROM people
GROUP BY department
""")
# Saving as a managed Delta table
df.write.format("delta").saveAsTable("database_name.table_name")
# Saving as an external Delta table
df.write.format("delta").option("path", "/path/to/data").saveAsTable("database_name.table_name")
# Overwriting data
df.write.format("delta").mode("overwrite").saveAsTable("database_name.table_name")
# Appending data
df.write.format("delta").mode("append").saveAsTable("database_name.table_name")
# Reading a Delta table
table_df = spark.read.table("database_name.table_name")
# Alternative syntax
table_df = spark.table("database_name.table_name")
# SQL equivalent
table_df = spark.sql("SELECT * FROM database_name.table_name")
Let’s create a basic ETL pipeline that extracts data from a CSV file, transforms it, and loads it into a Delta table:
# Create a sample dataset
from pyspark.sql import functions as F
# Generate sample customer data
data = [
(1, "John Smith", "1980-05-15", "New York", 35000),
(2, "Mary Johnson", "1992-07-22", "Los Angeles", 42000),
(3, "James Brown", "1975-11-03", "Chicago", 55000),
(4, "Patricia Davis", "1988-03-29", "Houston", 67000),
(5, "Robert Miller", None, "Philadelphia", 48000),
(6, "Linda Wilson", "1990-09-12", "Phoenix", None),
(7, "Michael Moore", "1982-04-08", "San Antonio", 51000),
(8, "Elizabeth Taylor", "1985-12-25", "San Diego", 44000),
(9, "William Anderson", "1978-02-17", "Dallas", 39000),
(10, "Jennifer Thomas", "1995-06-10", None, 61000)
]
# Create DataFrame and save as CSV
columns = ["customer_id", "name", "birthdate", "city", "annual_income"]
customer_df = spark.createDataFrame(data, columns)
customer_df.write.mode("overwrite").csv("/tmp/customer_data", header=True)
print("Sample data created successfully")
# EXTRACT: Read data from CSV
raw_df = spark.read.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("/tmp/customer_data")
print("Extracted data:")
display(raw_df)
# TRANSFORM: Clean and enhance the data
transformed_df = raw_df \
.withColumn("birthdate", F.to_date(F.col("birthdate"))) \
.withColumn("age", F.floor(F.months_between(F.current_date(), F.col("birthdate")) / 12)) \
.withColumn("city", F.when(F.col("city").isNull(), "Unknown").otherwise(F.col("city"))) \
.withColumn("annual_income", F.when(F.col("annual_income").isNull(), 0).otherwise(F.col("annual_income"))) \
.withColumn("income_bracket",
F.when(F.col("annual_income") < 40000, "Low")
.when(F.col("annual_income") < 60000, "Medium")
.otherwise("High"))
print("Transformed data:")
display(transformed_df)
# LOAD: Save as a Delta table
transformed_df.write.format("delta").mode("overwrite").saveAsTable("customer_data")
print("Data loaded into Delta table 'customer_data'")
# Verify the data
print("Data in Delta table:")
display(spark.table("customer_data"))
Duplicated data is a common issue in data engineering. Spark provides several methods for handling duplicates:
# Count duplicates
duplicate_counts = df.groupBy("id").count().filter("count > 1")
# SQL equivalent
spark.sql("""
SELECT id, COUNT(*) as count
FROM table_name
GROUP BY id
HAVING COUNT(*) > 1
""")
# Remove complete duplicates
deduplicated_df = df.distinct()
# SQL equivalent
spark.sql("SELECT DISTINCT * FROM table_name")
# Remove duplicates based on specific columns
deduplicated_df = df.dropDuplicates(["id", "transaction_date"])
# SQL equivalent
spark.sql("""
WITH ranked_data AS (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY id, transaction_date ORDER BY id) as rn
FROM table_name
)
SELECT * FROM ranked_data WHERE rn = 1
""")
Validating data quality is crucial in ETL processes:
# Check if id column contains unique values
id_counts = df.groupBy("id").count()
duplicate_ids = id_counts.filter("count > 1")
if duplicate_ids.count() > 0:
print("Primary key constraint violated")
display(duplicate_ids)
else:
print("Primary key constraint satisfied")
# Check if all product_ids exist in the products table
product_ids_in_orders = orders_df.select("product_id").distinct()
valid_product_ids = products_df.select("id").distinct()
invalid_product_ids = product_ids_in_orders.join(
valid_product_ids,
product_ids_in_orders["product_id"] == valid_product_ids["id"],
"left_anti"
)
if invalid_product_ids.count() > 0:
print("Foreign key constraint violated")
display(invalid_product_ids)
else:
print("Foreign key constraint satisfied")
# Converting string to timestamp
df = df.withColumn("event_time", F.to_timestamp(F.col("event_time_string")))
# Extracting components from timestamps
df = df.withColumn("year", F.year("event_time")) \
.withColumn("month", F.month("event_time")) \
.withColumn("day", F.dayofmonth("event_time")) \
.withColumn("hour", F.hour("event_time"))
# Calculating date differences
df = df.withColumn("days_since_purchase",
F.datediff(F.current_date(), F.col("purchase_date")))
# Parsing JSON string into struct
df = df.withColumn("json_data", F.from_json(F.col("json_string"), schema))
# Accessing nested fields
df = df.withColumn("customer_name", F.col("json_data.customer.name"))
# Exploding arrays into multiple rows
df = df.withColumn("item", F.explode("items"))
# JSON schema definition example
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType
schema = StructType([
StructField("customer", StructType([
StructField("name", StringType()),
StructField("email", StringType())
])),
StructField("items", ArrayType(StructType([
StructField("item_id", StringType()),
StructField("quantity", IntegerType())
])))
])
SQL UDFs allow for custom logic within SQL queries:
-- Creating a SQL UDF
CREATE OR REPLACE FUNCTION calculate_tax(amount DOUBLE)
RETURNS DOUBLE
RETURN amount * 0.07;
-- Using the UDF
SELECT item_name, price, calculate_tax(price) as tax_amount
FROM items;
The CASE WHEN statement provides conditional logic in Spark SQL:
-- Using CASE WHEN for conditional logic
SELECT
customer_id,
total_purchase,
CASE
WHEN total_purchase < 100 THEN 'Low Value'
WHEN total_purchase < 1000 THEN 'Medium Value'
ELSE 'High Value'
END as customer_segment
FROM customer_purchases;
Let’s work with a more complex scenario involving nested data and deduplication:
# Create sample order data with duplicates and nested structures
from pyspark.sql.types import *
import json
# Sample order data with JSON
order_data = [
(1, "2023-01-15", '{"customer": {"id": 101, "name": "John Doe"}, "items": [{"id": "A1", "qty": 2}, {"id": "B3", "qty": 1}]}'),
(2, "2023-01-16", '{"customer": {"id": 102, "name": "Jane Smith"}, "items": [{"id": "C2", "qty": 3}]}'),
(1, "2023-01-15", '{"customer": {"id": 101, "name": "John Doe"}, "items": [{"id": "A1", "qty": 2}, {"id": "B3", "qty": 1}]}'), # Duplicate
(3, "2023-01-17", '{"customer": {"id": 103, "name": "Bob Johnson"}, "items": [{"id": "A1", "qty": 1}, {"id": "D4", "qty": 4}]}'),
(4, "2023-01-18", '{"customer": {"id": 101, "name": "John Doe"}, "items": [{"id": "E5", "qty": 2}]}')
]
# Define the schema for the JSON data
json_schema = StructType([
StructField("customer", StructType([
StructField("id", IntegerType()),
StructField("name", StringType())
])),
StructField("items", ArrayType(StructType([
StructField("id", StringType()),
StructField("qty", IntegerType())
])))
])
# Create DataFrame
order_df = spark.createDataFrame(order_data, ["order_id", "order_date", "order_details"])
# Save as table
order_df.write.format("delta").mode("overwrite").saveAsTable("raw_orders")
print("Sample order data created:")
display(spark.table("raw_orders"))
# Now process the data with various transformations
processed_df = spark.table("raw_orders")
# 1. Remove duplicates
deduplicated_df = processed_df.dropDuplicates(["order_id", "order_date", "order_details"])
print(f"Removed {processed_df.count() - deduplicated_df.count()} duplicate orders")
# 2. Parse the JSON data
parsed_df = deduplicated_df.withColumn("order_details_parsed",
F.from_json(F.col("order_details"), json_schema))
# 3. Extract nested fields
extracted_df = parsed_df \
.withColumn("customer_id", F.col("order_details_parsed.customer.id")) \
.withColumn("customer_name", F.col("order_details_parsed.customer.name")) \
.withColumn("items", F.col("order_details_parsed.items"))
# 4. Convert order_date to date type
date_df = extracted_df \
.withColumn("order_date", F.to_date(F.col("order_date"))) \
.withColumn("order_day", F.dayofweek(F.col("order_date"))) \
.withColumn("order_month", F.month(F.col("order_date")))
print("After parsing JSON and extracting fields:")
display(date_df.select("order_id", "order_date", "customer_id", "customer_name", "items"))
# 5. Explode the items array into separate rows
exploded_df = date_df \
.withColumn("item", F.explode("items")) \
.withColumn("item_id", F.col("item.id")) \
.withColumn("quantity", F.col("item.qty")) \
.drop("items", "item", "order_details", "order_details_parsed")
print("After exploding items array:")
display(exploded_df)
# 6. Save the processed data
exploded_df.write.format("delta").mode("overwrite").saveAsTable("processed_orders")
print("Data saved to 'processed_orders' table")
Let’s also demonstrate working with pivots and user-defined functions:
# Create sample sales data
sales_data = [
("2023-01", "Electronics", 12500),
("2023-01", "Clothing", 8300),
("2023-01", "Home Goods", 5600),
("2023-02", "Electronics", 14200),
("2023-02", "Clothing", 9100),
("2023-02", "Home Goods", 6200),
("2023-03", "Electronics", 13800),
("2023-03", "Clothing", 8900),
("2023-03", "Home Goods", 7100)
]
sales_df = spark.createDataFrame(sales_data, ["month", "category", "revenue"])
sales_df.write.format("delta").mode("overwrite").saveAsTable("monthly_sales")
print("Monthly sales data:")
display(spark.table("monthly_sales"))
# Create a SQL UDF for calculating tax
spark.sql("""
CREATE OR REPLACE FUNCTION calculate_tax(amount DOUBLE)
RETURNS DOUBLE
RETURN amount * 0.08;
""")
# Use the UDF in a query
spark.sql("""
SELECT month, category, revenue, calculate_tax(revenue) AS tax
FROM monthly_sales
ORDER BY month, category
""").show()
# Create a pivot table
pivoted_sales = spark.sql("""
SELECT *
FROM monthly_sales
PIVOT (
SUM(revenue) FOR category IN ('Electronics', 'Clothing', 'Home Goods')
)
ORDER BY month
""")
print("Pivoted sales data:")
display(pivoted_sales)
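The same reshaping can also be done with the DataFrame API. A brief equivalent of the SQL PIVOT above, using the monthly_sales table already created, might look like this:
# Pivot with the DataFrame API: one row per month, one column per category
pivoted_df = (spark.table("monthly_sales")
    .groupBy("month")
    .pivot("category", ["Electronics", "Clothing", "Home Goods"])
    .sum("revenue")
    .orderBy("month"))
display(pivoted_df)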
Knowledge check: before continuing, make sure you can identify the CSV `header` read option, `dropDuplicates()`, `year(date_column)`, `from_json`, and the purpose of the `explode` function in Spark. The following query shows one way to keep only the most recent order per customer using `ROW_NUMBER()`:
WITH ranked_data AS (
SELECT *, ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY order_date DESC) as rn
FROM orders
)
SELECT * FROM ranked_data WHERE rn = 1
CREATE FUNCTION discount(price DOUBLE) RETURN price * 0.95CREATE OR REPLACE FUNCTION discount(price DOUBLE) RETURNS DOUBLE RETURN price * 0.95CREATE UDF discount(price DOUBLE) AS price * 0.95DEFINE FUNCTION discount(price DOUBLE) AS price * 0.95df.filter(df.column.isNull()).count() > 0df.select(df.column).isNull().any()df.where("column IS NULL").count() > 0SELECT * FROM table ROTATE (sum(value) FOR category IN ('A', 'B', 'C'))SELECT * FROM table PIVOT (sum(value) FOR category IN ('A', 'B', 'C'))SELECT * FROM table CROSS TAB (sum(value) FOR category IN ('A', 'B', 'C'))SELECT * FROM table TRANSFORM (sum(value) FOR category IN ('A', 'B', 'C'))headerdropDuplicates()year(date_column)from_jsonCREATE OR REPLACE FUNCTION discount(price DOUBLE) RETURNS DOUBLE RETURN price * 0.95SELECT * FROM table PIVOT (sum(value) FOR category IN ('A', 'B', 'C'))Delta Lake is an open-source storage layer that brings reliability to data lakes. As a key technology in the Databricks Lakehouse platform, Delta Lake provides critical enterprise features for data management.
Delta Lake is a storage layer that sits on top of your existing data lake, providing:
Delta Lake stores data as Parquet files but adds a transaction log that tracks all changes to the table. This architecture consists of:
- Data files stored in Parquet format
- A transaction log (the `_delta_log` directory) that records all operations performed on the table

When a Delta table is queried, the Delta Lake engine consults the transaction log to determine which data files to read, ensuring a consistent view of the data.
ACID transactions are a set of properties that guarantee reliability in database operations:
Delta Lake implements these properties through its transaction log mechanism:
# Example of an atomic operation
# This entire operation either succeeds or fails as a unit
df.write.format("delta").mode("overwrite").save("/path/to/delta-table")
In Databricks, metadata is stored in the metastore, which can be either the workspace's legacy Hive metastore or a Unity Catalog metastore (covered later in this course).
Databricks supports two types of Delta tables: managed tables, where Databricks manages both the metadata and the underlying data files, and external tables, where the data files live at a storage location you specify:
-- Creating a managed table
CREATE TABLE managed_table (id INT, name STRING);
-- Creating an external table
CREATE TABLE external_table (id INT, name STRING)
LOCATION '/path/to/external/storage';
To identify if a table is managed or external:
-- Check if a table is managed or external
DESCRIBE EXTENDED table_name;
Look for the Type property (managed or external) and the Location property.
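A quick way to pull just those two rows programmatically, shown here against the products_managed table created in the exercise below, is to filter the DESCRIBE EXTENDED output:
# Show only the Type and Location rows from DESCRIBE EXTENDED
table_info = spark.sql("DESCRIBE EXTENDED products_managed")
display(table_info.filter(table_info.col_name.isin("Type", "Location")))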
Let’s build a practical exercise to explore Delta Lake fundamentals:
# Create a sample dataset
data = [(1, "Product A", 10.5, "2023-01-01"),
(2, "Product B", 20.75, "2023-01-02"),
(3, "Product C", 15.0, "2023-01-03"),
(4, "Product D", 8.25, "2023-01-04"),
(5, "Product E", 12.99, "2023-01-05")]
columns = ["id", "product_name", "price", "created_date"]
df = spark.createDataFrame(data, columns)
# Save as a managed Delta table
df.write.format("delta").mode("overwrite").saveAsTable("products_managed")
# Save as an external Delta table
external_path = "/tmp/delta/products_external"
df.write.format("delta").mode("overwrite").option("path", external_path).saveAsTable("products_external")
print("Created both managed and external Delta tables")
# Examine table information
print("\nManaged Table Information:")
display(spark.sql("DESCRIBE EXTENDED products_managed"))
print("\nExternal Table Information:")
display(spark.sql("DESCRIBE EXTENDED products_external"))
# Inspect Delta log structure
print("\nDelta Log Structure:")
display(dbutils.fs.ls(external_path + "/_delta_log"))
# Make modifications to track history
# Add a new product
new_product = [(6, "Product F", 22.5, "2023-01-06")]
new_df = spark.createDataFrame(new_product, columns)
new_df.write.format("delta").mode("append").saveAsTable("products_managed")
# Update a product price
spark.sql("UPDATE products_managed SET price = 11.99 WHERE id = 1")
# View table history
print("\nTable History:")
display(spark.sql("DESCRIBE HISTORY products_managed"))
# Time travel query
print("\nData as of Version 0:")
display(spark.sql("SELECT * FROM products_managed VERSION AS OF 0"))
print("\nCurrent data (latest version):")
display(spark.sql("SELECT * FROM products_managed"))
Delta Lake’s transaction log enables powerful versioning capabilities:
-- View the history of a Delta table
DESCRIBE HISTORY delta_table;
-- Query a specific version of a table
SELECT * FROM delta_table VERSION AS OF 3;
-- Query a table as of a specific timestamp
SELECT * FROM delta_table TIMESTAMP AS OF '2023-01-15T00:00:00.000Z';
-- Restore a table to a previous version
RESTORE TABLE delta_table TO VERSION AS OF 3;
Delta Lake provides several operations to optimize table performance:
Z-ordering is a technique that co-locates related data in the same files, improving query performance by reducing the amount of data that needs to be read:
-- Z-order by one or more columns
OPTIMIZE delta_table
ZORDER BY (date_column, region_column);
Benefits of Z-Ordering:
The VACUUM command permanently removes files no longer needed by the Delta table:
-- Remove files not needed for versions older than 7 days (168 hours)
VACUUM delta_table RETAIN 168 HOURS;
By default, Delta Lake retains 30 days of history. This can be configured through table properties:
-- Set retention period to 90 days
ALTER TABLE delta_table
SET TBLPROPERTIES ('delta.logRetentionDuration' = '90 days');
Small files can degrade query performance. The OPTIMIZE command compacts small files into larger ones:
-- Compact small files without reordering data
OPTIMIZE delta_table;
Generated columns are automatically calculated from other columns:
-- Create a table with a generated column
CREATE TABLE sales (
id INT,
amount DOUBLE,
tax DOUBLE GENERATED ALWAYS AS (amount * 0.07)
);
Add documentation and configure behavior through comments and properties:
-- Add a comment to a table
COMMENT ON TABLE sales IS 'Daily sales transactions';
-- Add a comment to a column
COMMENT ON COLUMN sales.amount IS 'Sale amount in USD';
-- Set table properties
ALTER TABLE sales
SET TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true'
);
This operation replaces a table with a new definition:
-- Create or replace a table
CREATE OR REPLACE TABLE customer_data
AS SELECT * FROM customer_source WHERE region = 'Europe';
This operation replaces existing data while preserving the table schema and properties:
-- Overwrite all data in the table
INSERT OVERWRITE TABLE customer_data
SELECT * FROM customer_source WHERE region = 'North America';
Differences between these approaches:
- `CREATE OR REPLACE TABLE` recreates the entire table, potentially changing the schema
- `INSERT OVERWRITE` preserves the table structure and only replaces the data

The MERGE statement is a powerful tool for upserting data (insert, update, or delete in a single atomic operation):
-- Basic MERGE operation
MERGE INTO target_table
USING source_table
ON target_table.id = source_table.id
WHEN MATCHED THEN
UPDATE SET
target_table.column1 = source_table.column1,
target_table.column2 = source_table.column2
WHEN NOT MATCHED THEN
INSERT (id, column1, column2)
VALUES (source_table.id, source_table.column1, source_table.column2);
Benefits of MERGE:
The COPY INTO command is a simple, idempotent way to load data incrementally:
-- Load data incrementally from a directory
COPY INTO target_table
FROM '/path/to/source/files'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
Key features:
Delta Live Tables provides a declarative framework for building reliable data pipelines:
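As a rough illustration of the declarative style (a sketch only; the raw_events source table and its columns are hypothetical), a pipeline defines each table as a decorated function and lets DLT resolve the dependencies between them:
# Minimal DLT pipeline sketch; runs inside a Delta Live Tables pipeline
import dlt
from pyspark.sql import functions as F

@dlt.table(comment="Raw events ingested as-is (bronze)")
def events_bronze():
    # 'raw_events' is a hypothetical source table
    return spark.read.table("raw_events")

@dlt.table(comment="Cleaned events (silver)")
def events_silver():
    return (
        dlt.read("events_bronze")
        .filter(F.col("event_id").isNotNull())
        .withColumn("event_date", F.to_date("event_time"))
    )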
Auto Loader simplifies streaming ingestion from file sources:
# Using Auto Loader in Python
(spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/path/to/schema")
    .load("/path/to/source/files"))
Features:
DLT supports data quality validation through constraints:
# Table with constraints (the dlt module is available inside DLT pipelines)
import dlt

@dlt.table(
comment="Validated customer data",
table_properties={"quality": "silver"}
)
@dlt.expect_or_drop("valid_id", "id IS NOT NULL")
@dlt.expect_or_fail("valid_email", "email LIKE '%@%.%'")
def validated_customers():
return spark.table("raw_customers").filter("age > 0")
Constraint handling options:
- `@dlt.expect_or_drop`: Drop records that violate the constraint
- `@dlt.expect_or_fail`: Fail the pipeline if any record violates the constraint
- `@dlt.expect`: Track violations without affecting the pipeline

CDC processes data changes from source systems:
# Apply CDC changes with the DLT Python API
dlt.create_streaming_table("customers_target")

dlt.apply_changes(
    target = "customers_target",
    source = "customers_raw_stream",
    keys = ["customer_id"],
    sequence_by = "operation_timestamp",
    ignore_null_updates = False,
    apply_as_deletes = "operation = 'DELETE'"
)
Benefits:
Let’s implement a comprehensive incremental processing example:
# Set up sample data
from pyspark.sql import functions as F
from datetime import datetime, timedelta
# Create initial customer data
base_customers = [
(1, "John Doe", "john@example.com", "New York"),
(2, "Jane Smith", "jane@example.com", "Los Angeles"),
(3, "Mike Johnson", "mike@example.com", "Chicago"),
(4, "Lisa Brown", "lisa@example.com", "Houston"),
(5, "David Wilson", "david@example.com", "Phoenix")
]
# Save as initial table
customers_df = spark.createDataFrame(base_customers, ["id", "name", "email", "city"])
customers_df.write.format("delta").mode("overwrite").saveAsTable("customers")
print("Initial customer data created:")
display(spark.table("customers"))
# Create updates (some new, some modified records)
customer_updates = [
(3, "Michael Johnson", "michael@example.com", "Chicago"), # Modified
(4, "Lisa Brown", "lisa@example.com", "Dallas"), # Modified
(6, "Sarah Lee", "sarah@example.com", "Miami"), # New
(7, "Robert Chen", "robert@example.com", "Seattle") # New
]
updates_df = spark.createDataFrame(customer_updates, ["id", "name", "email", "city"])
updates_df.createOrReplaceTempView("customer_updates")
print("Customer updates:")
display(updates_df)
# Approach 1: Using MERGE
spark.sql("""
MERGE INTO customers target
USING customer_updates source
ON target.id = source.id
WHEN MATCHED THEN
UPDATE SET
target.name = source.name,
target.email = source.email,
target.city = source.city
WHEN NOT MATCHED THEN
INSERT (id, name, email, city)
VALUES (source.id, source.name, source.email, source.city)
""")
print("After MERGE operation:")
display(spark.table("customers"))
# Approach 2: CREATE OR REPLACE TABLE with partitioning
# Create a new dataset with dates for partitioning
customers_with_dates = []
start_date = datetime(2023, 1, 1)
for i in range(1, 8):
if i <= 5:
date = (start_date + timedelta(days=i)).strftime("%Y-%m-%d")
customers_with_dates.append((i, f"Customer {i}", f"customer{i}@example.com", f"City {i}", date))
# Create partitioned table
partitioned_df = spark.createDataFrame(
customers_with_dates,
["id", "name", "email", "city", "registration_date"]
)
# Save as partitioned table
partitioned_df.write.format("delta") \
.partitionBy("registration_date") \
.mode("overwrite") \
.saveAsTable("customers_partitioned")
print("Partitioned customer table:")
display(spark.table("customers_partitioned"))
# Show partitions
print("Table partitions:")
display(spark.sql("SHOW PARTITIONS customers_partitioned"))
# Create updates for specific partition
partition_updates = [
(3, "Customer 3 Updated", "customer3new@example.com", "New City 3", "2023-01-03"),
(8, "Customer 8 New", "customer8@example.com", "City 8", "2023-01-03")
]
partition_updates_df = spark.createDataFrame(
partition_updates,
["id", "name", "email", "city", "registration_date"]
)
partition_updates_df.createOrReplaceTempView("partition_updates")

# Overwrite a specific partition (static partition value, so the partition
# column is not included in the SELECT list)
spark.sql("""
INSERT OVERWRITE TABLE customers_partitioned
PARTITION (registration_date = '2023-01-03')
SELECT id, name, email, city
FROM partition_updates
""")
print("After partition overwrite:")
display(spark.table("customers_partitioned"))
# Approach 3: COPY INTO for idempotent loads
# Create files to be loaded
incremental_data = [
(9, "New Customer 9", "customer9@example.com", "Seattle"),
(10, "New Customer 10", "customer10@example.com", "Portland")
]
incremental_df = spark.createDataFrame(incremental_data, ["id", "name", "email", "city"])
incremental_df.write.format("csv").option("header", "true").mode("overwrite").save("/tmp/incremental_loads/batch1")
# Second batch with some overlap
incremental_data2 = [
(10, "New Customer 10", "customer10@example.com", "Portland"), # Duplicate
(11, "New Customer 11", "customer11@example.com", "Boston")
]
incremental_df2 = spark.createDataFrame(incremental_data2, ["id", "name", "email", "city"])
incremental_df2.write.format("csv").option("header", "true").mode("overwrite").save("/tmp/incremental_loads/batch2")
# Create target table
spark.sql("CREATE TABLE IF NOT EXISTS copy_into_target (id INT, name STRING, email STRING, city STRING)")
# Load data using COPY INTO
spark.sql("""
COPY INTO copy_into_target
FROM '/tmp/incremental_loads'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true')
PATTERN = 'batch*/*.csv'
""")
print("After COPY INTO operation:")
display(spark.table("copy_into_target"))
# Run second time to demonstrate idempotency
spark.sql("""
COPY INTO copy_into_target
FROM '/tmp/incremental_loads'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true')
PATTERN = 'batch*/*.csv'
""")
print("After second COPY INTO operation (should be idempotent):")
display(spark.table("copy_into_target"))
Knowledge check answers: `DESCRIBE HISTORY delta_table`, `VACUUM`, and "`CREATE OR REPLACE TABLE` can change the schema, `INSERT OVERWRITE` preserves it".

Databricks Jobs provide a way to orchestrate and schedule your data processing workflows in a production environment. They allow you to automate notebook execution, specify dependencies between tasks, and set up monitoring and alerting.
Jobs in Databricks consist of several components:
Modern data pipelines often consist of multiple interconnected steps that need to be executed in a specific order. Databricks Jobs support this through multi-task workflows.
# Pseudocode representation of a multi-task workflow
job = {
"name": "Sales Processing Pipeline",
"tasks": [
{
"task_key": "extract_data",
"notebook_task": {
"notebook_path": "/path/to/extract_notebook"
},
"job_cluster_key": "job_cluster"
},
{
"task_key": "transform_data",
"notebook_task": {
"notebook_path": "/path/to/transform_notebook"
},
"job_cluster_key": "job_cluster",
"depends_on": [
{"task_key": "extract_data"}
]
},
{
"task_key": "load_data",
"notebook_task": {
"notebook_path": "/path/to/load_notebook"
},
"job_cluster_key": "job_cluster",
"depends_on": [
{"task_key": "transform_data"}
]
}
]
}
The depends_on property specifies that a task should only run after its dependencies have completed successfully.
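To create such a job programmatically, the definition can be posted to the Jobs 2.1 API. The sketch below assumes the job dictionary from the pseudocode above; note that a real request would also need a job_clusters (or existing cluster) definition, and the workspace URL and token are placeholders:
# Sketch: create the multi-task job via the Jobs 2.1 REST API (placeholders used)
import requests

workspace_url = "https://<your-workspace>.cloud.databricks.com"
token = "<personal-access-token>"

response = requests.post(
    f"{workspace_url}/api/2.1/jobs/create",
    headers={"Authorization": f"Bearer {token}"},
    json=job,  # the job definition dict shown above (plus a job_clusters block)
)
print(response.json())  # returns {"job_id": ...} on success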
When a multi-task job runs:
Databricks jobs can be scheduled to run automatically using CRON expressions.
A CRON expression consists of 5-6 fields that define when a job should run:
┌────────── minute (0 - 59)
│ ┌──────── hour (0 - 23)
│ │ ┌────── day of month (1 - 31)
│ │ │ ┌──── month (1 - 12)
│ │ │ │ ┌── day of week (0 - 6) (Sunday = 0)
│ │ │ │ │
* * * * *
Common CRON patterns:
- `0 0 * * *` - Run at midnight every day
- `0 */6 * * *` - Run every 6 hours
- `0 8 * * 1-5` - Run at 8 AM on weekdays
- `0 0 1 * *` - Run at midnight on the first day of each month

When creating a job, you can specify a schedule using either a CRON expression or a simpler UI-based approach that generates the CRON expression for you.
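In the job definition itself, the schedule appears as its own block. A sketch is shown below; note that the Jobs API uses Quartz cron syntax, which adds a leading seconds field and uses `?` for the unused day field (this example corresponds to 8 AM on weekdays):
{
  "schedule": {
    "quartz_cron_expression": "0 0 8 ? * MON-FRI",
    "timezone_id": "UTC",
    "pause_status": "UNPAUSED"
  }
}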
Monitoring job execution and setting up alerts for failures are crucial for maintaining reliable data pipelines.
Databricks provides several ways to monitor jobs:
Databricks supports various notification mechanisms:
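For example, email notifications can be attached to a job or task definition with a block like the following (addresses are placeholders):
{
  "email_notifications": {
    "on_start": [],
    "on_success": ["data-team@example.com"],
    "on_failure": ["oncall@example.com"]
  }
}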
For transient failures, you can set up retry policies:
{
"task_key": "extract_data",
"notebook_task": {
"notebook_path": "/path/to/extract_notebook"
},
"retry_on_failure": {
"max_retries": 3,
"min_duration_between_retries_seconds": 60
}
}
This configuration will retry the task up to 3 times with a 60-second delay between retries if it fails.
Let’s build a complete production pipeline that performs the following steps:
For this exercise, we’ll create three notebooks and link them in a job:
# Notebook: extract_sales_data
# This notebook extracts daily sales data and performs initial processing
# Log the start of the process
import datetime
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"Starting sales data extraction at {current_time}")
# Create sample sales data (in a real scenario, you would extract from a source)
from pyspark.sql import functions as F
import random
from datetime import datetime, timedelta
# Generate sales data for yesterday
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
products = ["Product A", "Product B", "Product C", "Product D", "Product E"]
regions = ["North", "South", "East", "West", "Central"]
# Generate 100 random sales records
data = []
for i in range(100):
product = random.choice(products)
region = random.choice(regions)
quantity = random.randint(1, 10)
unit_price = round(random.uniform(10, 100), 2)
total = round(quantity * unit_price, 2)
data.append((yesterday, product, region, quantity, unit_price, total))
# Create DataFrame
columns = ["date", "product", "region", "quantity", "unit_price", "total_amount"]
sales_df = spark.createDataFrame(data, columns)
# Save as a Delta table, partitioned by date
sales_df.write.format("delta").partitionBy("date").mode("append").saveAsTable("daily_sales")
# Verify the data was written
count = spark.table("daily_sales").filter(F.col("date") == yesterday).count()
print(f"Successfully extracted {count} sales records for {yesterday}")
# Set a notebook parameter to pass to downstream tasks
dbutils.jobs.taskValues.set(key = "sales_date", value = yesterday)
dbutils.jobs.taskValues.set(key = "record_count", value = count)
print("Data extraction complete")
# Notebook: compute_sales_metrics
# This notebook computes key sales metrics based on the extracted data
from pyspark.sql import functions as F
# Get the sales date from the upstream task
sales_date = dbutils.jobs.taskValues.get(taskKey = "extract_sales_data", key = "sales_date")
record_count = dbutils.jobs.taskValues.get(taskKey = "extract_sales_data", key = "record_count")
print(f"Computing sales metrics for {sales_date} with {record_count} records")
# Read the daily sales data
daily_sales = spark.table("daily_sales").filter(f"date = '{sales_date}'")
# Compute product-level metrics
product_metrics = daily_sales.groupBy("product").agg(
F.sum("quantity").alias("total_quantity"),
F.sum("total_amount").alias("total_revenue"),
F.avg("unit_price").alias("average_price")
).orderBy(F.col("total_revenue").desc())
# Save product metrics
product_metrics.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("product_metrics_daily")
# Compute region-level metrics
region_metrics = daily_sales.groupBy("region").agg(
F.countDistinct("product").alias("product_count"),
F.sum("quantity").alias("total_quantity"),
F.sum("total_amount").alias("total_revenue")
).orderBy(F.col("total_revenue").desc())
# Save region metrics
region_metrics.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("region_metrics_daily")
# Compute overall metrics for dashboard
overall_metrics = {
"date": sales_date,
"total_sales": daily_sales.select(F.sum("total_amount")).first()[0],
"total_units": daily_sales.select(F.sum("quantity")).first()[0],
"avg_order_value": daily_sales.select(F.avg("total_amount")).first()[0],
"top_product": product_metrics.first()["product"],
"top_region": region_metrics.first()["region"]
}
# Convert to DataFrame and save
metrics_df = spark.createDataFrame([overall_metrics])
metrics_df.write.format("delta").mode("append").saveAsTable("sales_overall_metrics")
# Pass metrics to the next task
dbutils.jobs.taskValues.set(key = "total_sales", value = overall_metrics["total_sales"])
dbutils.jobs.taskValues.set(key = "top_product", value = overall_metrics["top_product"])
dbutils.jobs.taskValues.set(key = "top_region", value = overall_metrics["top_region"])
print("Metrics computation complete")
# Notebook: update_dashboard
# This notebook updates a dashboard and sends a notification
# Get values from upstream tasks
sales_date = dbutils.jobs.taskValues.get(taskKey = "extract_sales_data", key = "sales_date")
total_sales = dbutils.jobs.taskValues.get(taskKey = "compute_sales_metrics", key = "total_sales")
top_product = dbutils.jobs.taskValues.get(taskKey = "compute_sales_metrics", key = "top_product")
top_region = dbutils.jobs.taskValues.get(taskKey = "compute_sales_metrics", key = "top_region")
print(f"Updating dashboard for {sales_date}")
print(f"Total sales: ${total_sales:.2f}")
print(f"Top product: {top_product}")
print(f"Top region: {top_region}")
# In a real scenario, you would use the Databricks API to refresh a dashboard
# or update an external BI tool
# Create a summary for notification
summary = f"""
Sales Report for {sales_date}
---------------------------
Total Sales: ${float(total_sales):.2f}
Top Product: {top_product}
Top Region: {top_region}
The complete report is available in the Databricks dashboard.
"""
# In a real scenario, you would send an email notification
# For this exercise, we'll just print the notification
print("Notification would be sent with the following content:")
print(summary)
print("Dashboard update and notification complete")
To set up this workflow as a job:
This creates a complete ETL pipeline that runs daily, processes sales data, and updates dashboards with the results.
Data governance encompasses the policies, procedures, and standards that ensure data is managed as a valuable organizational asset. It focuses on data availability, usability, integrity, and security.
Unity Catalog is Databricks’ solution for unified data governance across clouds and regions. It provides a centralized way to manage access to data, ML models, and analytics objects.
Metastore:
Catalog:
Unity Catalog uses a three-level namespace to address objects:
catalog.schema.object
For example:
-- Querying a table using the three-level namespace
SELECT * FROM marketing.customer_data.transactions;
Where:
- `marketing` is the catalog
- `customer_data` is the schema (database)
- `transactions` is the table

Unity Catalog implements fine-grained access control to data and analytical assets.
Securables are objects to which permissions can be granted:
Unity Catalog follows these principles:
-- Grant permission to read data
GRANT SELECT ON TABLE catalog.schema.table TO user@example.com;
-- Grant permission to modify data
GRANT MODIFY ON TABLE catalog.schema.table TO group sales_analysts;
-- Grant permission to manage a schema
GRANT USAGE, CREATE ON SCHEMA catalog.schema TO group data_engineers;
-- Grant permission to access all tables in a schema
GRANT SELECT ON SCHEMA catalog.schema TO group analysts;
A service principal is an identity used by automated processes, applications, or services, rather than people.
Service principals are created in the Databricks account console and can be assigned permissions like human users:
-- Grant a service principal access to a table
GRANT SELECT ON TABLE catalog.schema.table TO `service-principal://etl-process`;
-- Use service principal for a connection
CREATE CONNECTION my_connection
TYPE MYSQL
PROPERTIES (
host = 'myhost.example.com',
port = '3306',
user = 'myuser',
password = 'supersecret'
)
WITH USER SERVICE PRINCIPAL `service-principal://etl-process`;
Unity Catalog works with specific cluster security modes:
Let’s implement a comprehensive data governance structure for a fictional organization:
# This is a demonstration of Unity Catalog concepts
# Note: Some commands require appropriate permissions to execute
# 1. Create a catalog structure for different departments
spark.sql("CREATE CATALOG IF NOT EXISTS marketing")
spark.sql("CREATE CATALOG IF NOT EXISTS finance")
spark.sql("CREATE CATALOG IF NOT EXISTS hr")
# 2. Create schemas within each catalog
spark.sql("CREATE SCHEMA IF NOT EXISTS marketing.customer_data")
spark.sql("CREATE SCHEMA IF NOT EXISTS marketing.campaign_analytics")
spark.sql("CREATE SCHEMA IF NOT EXISTS finance.transactions")
spark.sql("CREATE SCHEMA IF NOT EXISTS finance.reporting")
spark.sql("CREATE SCHEMA IF NOT EXISTS hr.employee_records")
# 3. Create sample tables with different security requirements
# Marketing customer data
spark.sql("""
CREATE TABLE IF NOT EXISTS marketing.customer_data.profiles (
customer_id INT,
name STRING,
email STRING,
signup_date DATE,
last_purchase_date DATE,
total_spend DOUBLE
)
COMMENT 'Customer profile information'
""")
# Finance transaction data (sensitive)
spark.sql("""
CREATE TABLE IF NOT EXISTS finance.transactions.orders (
order_id INT,
customer_id INT,
order_date DATE,
amount DOUBLE,
payment_method STRING,
status STRING
)
COMMENT 'Financial transaction records - Sensitive data'
""")
# HR employee data (highly sensitive)
spark.sql("""
CREATE TABLE IF NOT EXISTS hr.employee_records.employees (
employee_id INT,
name STRING,
hire_date DATE,
department STRING,
salary DOUBLE,
manager_id INT
)
COMMENT 'Employee records - Highly sensitive data'
""")
# 4. Insert some sample data
spark.sql("""
INSERT INTO marketing.customer_data.profiles VALUES
(1, 'John Doe', 'john@example.com', '2022-01-15', '2023-02-20', 1250.50),
(2, 'Jane Smith', 'jane@example.com', '2022-03-10', '2023-03-05', 876.25),
(3, 'Bob Johnson', 'bob@example.com', '2022-05-22', '2023-01-30', 543.75)
""")
spark.sql("""
INSERT INTO finance.transactions.orders VALUES
(101, 1, '2023-02-20', 250.50, 'CREDIT_CARD', 'COMPLETED'),
(102, 2, '2023-03-05', 876.25, 'PAYPAL', 'COMPLETED'),
(103, 3, '2023-01-30', 543.75, 'BANK_TRANSFER', 'COMPLETED'),
(104, 1, '2023-04-10', 125.00, 'CREDIT_CARD', 'PENDING')
""")
spark.sql("""
INSERT INTO hr.employee_records.employees VALUES
(1001, 'Sarah Wilson', '2020-05-10', 'Marketing', 85000.00, 1005),
(1002, 'Mark Davis', '2021-02-15', 'Finance', 92000.00, 1006),
(1003, 'Lisa Brown', '2019-11-20', 'HR', 78000.00, 1007)
""")
# 5. Create user groups (in real scenario, these would be created in account console)
# For demonstration purposes only
print("In a real deployment, you would create these groups in the account console:")
print("- marketing_team")
print("- finance_team")
print("- hr_team")
print("- data_analysts")
print("- data_engineers")
print("- executives")
# 6. Set up access control permissions
# Marketing team access
print("""
-- Grant marketing team access
GRANT USAGE ON CATALOG marketing TO GROUP marketing_team;
GRANT SELECT ON SCHEMA marketing.customer_data TO GROUP marketing_team;
GRANT SELECT, MODIFY ON SCHEMA marketing.campaign_analytics TO GROUP marketing_team;
""")
# Finance team access
print("""
-- Grant finance team access
GRANT USAGE ON CATALOG finance TO GROUP finance_team;
GRANT SELECT, MODIFY ON SCHEMA finance.transactions TO GROUP finance_team;
GRANT SELECT, MODIFY ON SCHEMA finance.reporting TO GROUP finance_team;
""")
# HR team access
print("""
-- Grant HR team access
GRANT USAGE ON CATALOG hr TO GROUP hr_team;
GRANT SELECT, MODIFY ON SCHEMA hr.employee_records TO GROUP hr_team;
""")
# Data analysts - limited access across departments
print("""
-- Grant data analysts limited access
GRANT USAGE ON CATALOG marketing TO GROUP data_analysts;
GRANT USAGE ON CATALOG finance TO GROUP data_analysts;
GRANT SELECT ON SCHEMA marketing.customer_data TO GROUP data_analysts;
GRANT SELECT ON TABLE finance.reporting.financial_metrics TO GROUP data_analysts;
-- Note: Explicitly deny access to sensitive HR data
DENY SELECT ON CATALOG hr TO GROUP data_analysts;
""")
# Data engineers - broader access
print("""
-- Grant data engineers broader access
GRANT USAGE ON CATALOG marketing, finance TO GROUP data_engineers;
GRANT SELECT, MODIFY ON SCHEMA marketing.customer_data TO GROUP data_engineers;
GRANT SELECT, MODIFY ON SCHEMA finance.reporting TO GROUP data_engineers;
GRANT SELECT ON SCHEMA finance.transactions TO GROUP data_engineers;
-- Limited HR access
GRANT USAGE ON CATALOG hr TO GROUP data_engineers;
GRANT SELECT ON SCHEMA hr.employee_records TO GROUP data_engineers;
""")
# Executives - read-only access to specific dashboards and reports
print("""
-- Grant executives read-only access
GRANT USAGE ON CATALOG marketing, finance, hr TO GROUP executives;
GRANT SELECT ON TABLE marketing.campaign_analytics.campaign_performance TO GROUP executives;
GRANT SELECT ON TABLE finance.reporting.financial_metrics TO GROUP executives;
GRANT SELECT ON TABLE hr.employee_records.headcount_summary TO GROUP executives;
""")
# 7. Create a service principal for automated ETL processes
print("""
-- In a real scenario, create a service principal in the account console
-- service-principal://etl-process
-- Grant appropriate permissions
GRANT USAGE ON CATALOG marketing, finance, hr TO `service-principal://etl-process`;
GRANT SELECT, MODIFY ON SCHEMA marketing.customer_data TO `service-principal://etl-process`;
GRANT SELECT, MODIFY ON SCHEMA finance.transactions TO `service-principal://etl-process`;
GRANT SELECT, MODIFY ON SCHEMA hr.employee_records TO `service-principal://etl-process`;
""")
# 8. Demonstrate querying with the three-level namespace
# Query marketing data
display(spark.sql("SELECT * FROM marketing.customer_data.profiles"))
# Query finance data
display(spark.sql("SELECT * FROM finance.transactions.orders"))
# Query HR data
display(spark.sql("SELECT * FROM hr.employee_records.employees"))
# 9. Create a row-level security policy on sensitive data
# This is a simplified example of how you might implement RLS
print("""
-- Create a view with row-level security for salary data
CREATE OR REPLACE VIEW hr.employee_records.employee_details AS
SELECT
employee_id,
name,
hire_date,
department,
-- Only show salary to HR team and the employee's manager
CASE
WHEN is_member('hr_team') OR current_user() IN (
SELECT DISTINCT manager_email
FROM hr.employee_records.manager_directory
WHERE employee_id = e.employee_id
)
THEN salary
ELSE NULL
END as salary,
manager_id
FROM hr.employee_records.employees e
""")
Knowledge check answers: `30 14 * * 1-5` (2:30 PM on weekdays) and the `dbutils.jobs.taskValues` API.

Databricks Jobs provide a robust framework for automating, scheduling, and orchestrating data workflows in production environments.
Multi-task workflows with dependencies enable complex data pipelines with clearly defined execution sequences.
CRON expressions allow flexible scheduling of jobs based on time, day, month, and other parameters.
Monitoring, alerting, and retry policies are essential components of production pipelines to ensure reliability.
Unity Catalog provides centralized data governance across cloud environments using a three-level namespace (catalog.schema.object).
Service principals should be used for automated processes rather than user accounts, improving security and auditability.
Access control should follow the principle of least privilege, granting users only the permissions they need to perform their roles.
Organizing data by business unit across different catalogs simplifies access control and improves manageability.
Key Concepts:
The Databricks Lakehouse Platform combines the best features of data warehouses and data lakes, providing a unified platform for all data workloads.
Lakehouse Architecture:
Medallion Architecture:
Databricks Environment Components:
Databricks Repos and Version Control:
Data Extraction:
Data References:
- `df.createOrReplaceTempView("view_name")`
- `df.createOrReplaceGlobalTempView("name")`
- `WITH` clause in SQL

Data Transformation Techniques:
- `df.filter()` or `WHERE` clause in SQL
- `df.select()` or `SELECT` in SQL
- `df.withColumn()` or expressions in SQL
- `df.groupBy().agg()` or `GROUP BY` in SQL

Data Deduplication:
- `df.distinct()` or `SELECT DISTINCT` in SQL
- `df.dropDuplicates(["col1", "col2"])`
- `ROW_NUMBER()` with `PARTITION BY`

Complex Data Types:
- `to_timestamp()`, `date_format()`
- `from_json()`, dot notation for accessing nested fields
- `explode()`, `array_contains()`, `flatten()`

SQL User-Defined Functions:
- `CREATE OR REPLACE FUNCTION function_name(param_type) RETURNS return_type RETURN expression`
- `SELECT function_name(column) FROM table`

Control Flow:
- `CASE WHEN` statements for conditional logic

Delta Lake Fundamentals:
Table Management:
Delta Operations:
- `DESCRIBE HISTORY table_name`
- `SELECT * FROM table_name VERSION AS OF version_number`
- `OPTIMIZE table ZORDER BY (column_name)`
- `VACUUM table_name RETAIN num_hours HOURS`

Incremental Data Loading Patterns:
- `CREATE OR REPLACE TABLE` or `INSERT OVERWRITE`
- `MERGE INTO target USING source ON condition`
- `COPY INTO target FROM source`

Delta Live Tables (DLT):
- `APPLY CHANGES INTO`

Databricks Jobs:
Data Governance with Unity Catalog:
Best Practices:
- `VACUUM` permanently removes files, not just marks them for deletion
- `OPTIMIZE` compacts small files but doesn’t affect data organization without `ZORDER BY`
- `RESTORE` to revert a table
- `MERGE` is for upserts (update + insert)
- `COPY INTO` is for idempotent new file ingestion
- `INSERT OVERWRITE` preserves schema but replaces all data
- `dbutils.jobs.taskValues` API

Now let’s start our first full-length practice exam to assess your knowledge. This exam contains 45 questions covering all sections of the certification. You have 90 minutes to complete it.
(The 45 exam questions are delivered interactively and cover the topics reviewed above, including the `%run` command, CSV header options, temporary views, CTEs, date functions, deduplication, `explode()`, `from_json()`, `PIVOT`, SQL UDFs, `CASE WHEN` null handling, `DESCRIBE HISTORY`, `VACUUM`, generated columns, `CREATE OR REPLACE TABLE` vs. `INSERT OVERWRITE`, `MERGE`, `COPY INTO`, DLT expectations, `APPLY CHANGES INTO`, CRON schedules, task values, retry policies, the Unity Catalog namespace, and `GRANT` statements.)

Now let’s review the answers to the first mock exam and provide explanations for each question:
- The `%run` command allows you to execute one notebook from within another notebook, making the variables and functions from the called notebook available in the calling notebook.
- Use `option("header", "true")` to tell Spark to treat the first row as column names.
- Temporary views created with `createOrReplaceTempView()` are only available within the current Spark session and are deleted when the session ends.
- Common table expressions defined with the `WITH` clause allow you to define auxiliary statements that can be referenced within a larger query.
- The `year()` function extracts the year component from a timestamp column in Spark SQL.
- The `dropDuplicates()` function removes duplicate records based on the specified columns.
- The query groups by `column_name` and finds groups with more than one record, which identifies duplicate values in that column.
- The `explode()` function transforms each element in an array column into a separate row, maintaining all other columns.
- The `from_json()` function parses a JSON string into a structured column based on a provided schema.
- The `PIVOT` keyword in Spark SQL converts data from a tall (normalized) format to a wide format.
- Null checks must use the `IS NULL` operator, not `= NULL`.
- The `DESCRIBE HISTORY` command shows the history of operations performed on a Delta table.
- The `VACUUM` command permanently removes files that are no longer needed by the table based on the retention period.
- `CREATE OR REPLACE TABLE` can create a table with a different schema, while `INSERT OVERWRITE` preserves the existing schema and only replaces the data.
- The `MERGE` command allows you to perform both update and insert operations in a single atomic transaction, commonly known as an upsert.
- The `COPY INTO` command tracks which files have been loaded and automatically skips them in subsequent runs, making it idempotent.
- The `@dlt.expect_or_drop` annotation drops individual records that violate the specified constraint.
- The `APPLY CHANGES INTO` operation processes Change Data Capture (CDC) data and applies the changes (inserts, updates, deletes) to a target table.
- Cron fields are ordered `minute hour day-of-month month day-of-week`, so `30 3 * * 1` means 3:30 AM every Monday (1 represents Monday).
- The `dbutils.jobs.taskValues` API allows tasks to share values with downstream tasks in a multi-task workflow.
- Unity Catalog references tables using the three-level namespace `catalog.schema.table`.
- The `GRANT SELECT` command gives a user permission to query (read) a table.
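Since `PIVOT` comes up in both practice exams, here is a small, self-contained sketch you can paste into a Python cell; the `quarterly_sales` view and its values are invented for illustration:

```python
# Hypothetical tall-format data: one row per (product, quarter) pair.
spark.sql("""
  CREATE OR REPLACE TEMP VIEW quarterly_sales AS
  SELECT * FROM VALUES
    ('Widget', 'Q1', 100), ('Widget', 'Q2', 150),
    ('Gadget', 'Q1', 80),  ('Gadget', 'Q2', 120)
  AS t(product, quarter, amount)
""")

# PIVOT turns the listed quarter values into columns (tall -> wide).
display(spark.sql("""
  SELECT * FROM quarterly_sales
  PIVOT (SUM(amount) FOR quarter IN ('Q1', 'Q2'))
"""))
```

The result has one row per product and one column per quarter.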
Now let's take a second full-length practice exam to further assess your knowledge and readiness for the certification.

Which feature lets you switch languages within a single notebook?
- `%sql` or `%python` magic commands
- `import` statement

Which command allows you to run one notebook from within another?
- `%run` command
- `%include` command
- `source()` function

How do you read a CSV file with a header row into a DataFrame?
- `spark.read.csv("path/to/file.csv", header=True)`
- `spark.read.format("csv").option("header", "true").load("path/to/file.csv")`
- `spark.read.csv("path/to/file.csv", firstRowAsHeader=True)`
- `spark.read.format("csv").option("firstRow", "header").load("path/to/file.csv")`

Which query identifies customers associated with more than one distinct email address?
- `SELECT customer_id, COUNT(DISTINCT email) FROM customers GROUP BY customer_id HAVING COUNT(DISTINCT email) > 1`
- `SELECT customer_id, email FROM customers WHERE email IS NOT NULL`
- `SELECT DISTINCT customer_id, email FROM customers`
- `SELECT customer_id FROM customers GROUP BY customer_id HAVING COUNT(*) > 1`

Which function converts a string column to a timestamp?
- `CAST(column AS TIMESTAMP)`
- `to_timestamp(column)`
- `timestamp(column)`
- `convert_to_timestamp(column)`

Which function extracts the day of the week from a date column?
- `day_of_week(date_column)`
- `dayofweek(date_column)`
- `extract(DOW FROM date_column)`
- `date_column.day`

Which function extracts a pattern from a string using a regular expression?
- `extract_pattern()`
- `regex_extract()`
- `regexp_extract()`
- `pattern_match()`

How do you access the `name` field nested inside a `customer` struct column?
- `SELECT customer.name FROM customers`
- `SELECT name FROM customers.customer`
- `SELECT customer->name FROM customers`
- `SELECT customers.customer.name FROM customers`

Which function checks whether an array column contains a specific value?
- `contains(array, value)`
- `array_contains(array, value)`
- `has_value(array, value)`
- `exists(array, value)`

Which function parses a JSON string column using a provided schema?
- `json_parse(json_column, schema)`
- `from_json(json_column, schema)`
- `parse_json(json_column, schema)`
- `to_struct(json_column, schema)`

What is the purpose of the `PIVOT` clause in SQL?

Which statement correctly creates a SQL user-defined function?
- `CREATE FUNCTION function_name(param type) RETURNS type AS expression`
- `CREATE OR REPLACE FUNCTION function_name(param type) RETURNS type RETURN expression`
- `DEFINE FUNCTION function_name(param type) AS expression`
- `CREATE UDF function_name(param type) RETURNS type USING expression`

How can you determine whether a table is managed or external?
- Running `DESCRIBE FORMATTED table_name` and checking the Type property
- `SELECT is_managed FROM table_name`

How do you query version 3 of a Delta table?
- `SELECT * FROM table_name.history(3)`
- `SELECT * FROM table_name VERSION AS OF 3`
- `SELECT * FROM table_name@v3`
- `SELECT * FROM table_name WHERE version = 3`

What does the `OPTIMIZE` command do in Delta Lake?

When would you use `INSERT OVERWRITE` instead of `MERGE`?

Why is the `COPY INTO` command useful for data ingestion?
- `MERGE` for large datasets

Which cron expression runs a job at 15 minutes past every hour?
- `15 * * * *`
- `* 15 * * *`
- `*/15 * * * *`
- `0 */15 * * *`

How can you view the history of operations on a Delta table?
- Using the `DESCRIBE HISTORY` command

How do you give users and groups access to data objects in Unity Catalog?
- Using the `GRANT` command to assign permissions to users and groups

Now let's review the answers to the second mock exam:
- The magic commands `%sql`, `%python`, `%scala`, and `%r` enable language switching within a single notebook.
- The `%run` command allows you to execute one notebook from within another, making variables and functions defined in the called notebook available in the calling notebook.
- `spark.read.format("csv").option("header", "true").load("path/to/file.csv")`
- `SELECT customer_id, COUNT(DISTINCT email) FROM customers GROUP BY customer_id HAVING COUNT(DISTINCT email) > 1`
- The `to_timestamp()` function is used to convert a string column to a timestamp datatype.
- The `dayofweek()` function extracts the day of the week from a date column.
- `regexp_extract()` is the correct function for extracting patterns from strings using regular expressions.
- `SELECT customer.name FROM customers`
- The `array_contains()` function checks whether an array column contains a specific value.
- The `from_json()` function parses a JSON string into a structured column based on the provided schema.
- `CREATE OR REPLACE FUNCTION function_name(param type) RETURNS type RETURN expression`
- Run `DESCRIBE FORMATTED table_name` and check the Type property; the `DESCRIBE FORMATTED` or `DESCRIBE EXTENDED` commands show detailed table information, including whether a table is managed or external.
- `SELECT * FROM table_name VERSION AS OF 3`
- `15 * * * *`
- Use the `GRANT` command to assign permissions to users and groups.
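As a final refresher on the string and date functions from this answer key, here is a short illustrative snippet; the sample log line, the `ORD-` pattern, and the column names are made up:

```python
from pyspark.sql import functions as F

# Hypothetical log line with an embedded order id and an event date.
logs = spark.createDataFrame(
    [("order id: ORD-1234 shipped", "2024-01-15")],
    ["message", "event_date"],
)

parsed = (
    logs
    # Capture group 1 of the regular expression becomes the extracted value
    .withColumn("order_id", F.regexp_extract("message", r"ORD-(\d+)", 1))
    # dayofweek() returns 1 = Sunday through 7 = Saturday
    .withColumn("weekday", F.dayofweek(F.to_date("event_date")))
)

display(parsed)
```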
✅ Review all exam sections, especially those where you scored lower on practice exams
✅ Memorize key commands and syntax for common operations
✅ Understand the core concepts behind each technology rather than just memorizing facts
✅ Practice time management to ensure you can complete all questions
✅ Get a good night’s sleep before the exam
✅ Arrive early if taking the exam at a testing center, or ensure your environment is ready for online proctoring