The Databricks workspace is a web-based interface that provides:
Databricks Runtime for Machine Learning (Databricks Runtime ML) is a specialized runtime that includes:
To create a cluster with Databricks Runtime ML:
Key Differences from Standard Runtime:
When configuring a Databricks cluster for ML workloads:
- `spark.databricks.io.cache.enabled`: Set to true for improved data access
- `spark.sql.shuffle.partitions`: Adjust based on dataset size

Databricks Repos allows you to:
Setting up Git integration:
Working with Repos:
Databricks notebooks provide:
Key Notebook Features:
Magic Commands:

- `%sql`: Run SQL queries
- `%md`: Write markdown
- `%run`: Execute another notebook
- `%pip`: Install Python packages
- `dbutils.widgets` commands for notebook parameters

Spark Architecture:
Core Abstractions:
spark = SparkSession.builder.appName("ML Example").getOrCreate()
df = spark.read.format("csv").option("header", "true").load("/path/to/data.csv")
- Transformations (e.g., `select()`, `filter()`) are lazy
- Actions (e.g., `count()`, `collect()`) trigger execution

# Repartition to 8 partitions
df_repartitioned = df.repartition(8)
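A minimal sketch of lazy evaluation (the column names here are illustrative): the transformation below only builds a query plan, and nothing executes until the action at the end.

# Transformations are recorded in the plan, not executed
adults = df.filter(df["age"] > 30).select("name", "age")
# The action triggers execution of the whole plan across the cluster
print(adults.count())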
Which Databricks runtime should you choose for machine learning workloads? a) Standard runtime b) Machine Learning runtime c) Genomics runtime d) Photon runtime
What is the primary purpose of Databricks Repos? a) Storing ML models b) Storing datasets c) Git-based version control d) User access management
Which command would you use to execute SQL code in a Python notebook?
a) execute_sql()
b) %sql
c) spark.sql()
d) Both b and c
Which of the following is NOT a supported language in Databricks notebooks? a) Python b) R c) JavaScript d) Scala
In Spark, which of the following is an example of an action?
a) select()
b) filter()
c) count()
d) orderBy()
AutoML automates the machine learning process, including:
Key Benefits:
AutoML Workflow:
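The workflow can also be launched from the AutoML Python API; a minimal sketch, assuming the table and target column names (illustrative here) exist in your workspace:

from databricks import automl

# Point AutoML at a Spark DataFrame and a target column
df = spark.table("default.customer_data")  # illustrative table
summary = automl.classify(
    dataset=df,
    target_col="churned",   # illustrative target column
    timeout_minutes=30      # stop the experiment after this time budget
)
# The summary links back to the generated notebooks and the best MLflow run
print(summary.best_trial.mlflow_run_id)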
Interpreting AutoML Results:
Feature Store provides:
Key Components:
Creating a Feature Table:
from databricks.feature_store import FeatureStoreClient
fs = FeatureStoreClient()
# Create feature dataframe
features_df = (
    spark.table("customer_data")
    .select("customer_id", "recency", "frequency", "monetary")
)
# Create feature table
fs.create_table(
name="customer_features",
primary_keys=["customer_id"],
df=features_df,
description="Customer RFM features"
)
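To refresh the table later, the client's write_table can upsert new rows on the primary key; a short sketch, assuming updated_features_df is a DataFrame with the same schema:

fs.write_table(
    name="customer_features",
    df=updated_features_df,  # assumed: new feature rows, same schema
    mode="merge"             # upsert rows matched on customer_id
)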
Feature Lookup for Training:
from databricks.feature_store import FeatureLookup
# Define feature lookup
feature_lookups = [
FeatureLookup(
table_name="customer_features",
feature_names=["recency", "frequency", "monetary"],
lookup_key=["customer_id"]
)
]
# Create training dataset with features
training_set = fs.create_training_set(
df=training_df,
feature_lookups=feature_lookups,
label="churned"
)
# Get dataframe with features
training_df = training_set.load_df()
Batch Scoring with Feature Store:
predictions = fs.score_batch(
model_uri="models:/customer_churn/production",
df=batch_df
)
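score_batch can resolve the features automatically only when the model was logged through the Feature Store client, which attaches the feature metadata to the model; a hedged sketch of that logging step (model and training_set come from the earlier steps):

import mlflow.sklearn

fs.log_model(
    model,                          # trained model object
    artifact_path="model",
    flavor=mlflow.sklearn,          # MLflow flavor matching the model type
    training_set=training_set,      # binds the feature lookups to the model
    registered_model_name="customer_churn"
)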
MLflow Tracking records:
Tracking Experiments:
import mlflow
# Start a run
with mlflow.start_run(run_name="gradient_boost"):
    # Log parameters
    mlflow.log_param("learning_rate", 0.1)
    mlflow.log_param("max_depth", 5)
    # Train model
    model = train_model(learning_rate=0.1, max_depth=5)
    # Log metrics
    mlflow.log_metric("accuracy", 0.85)
    mlflow.log_metric("f1_score", 0.82)
    # Log model
    mlflow.sklearn.log_model(model, "model")
Viewing Experiments:
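Besides the Experiments UI, runs can be queried programmatically; a small sketch using mlflow.search_runs, which returns a pandas DataFrame (the experiment ID is a placeholder, and the metric/param columns exist only if they were logged):

import mlflow

runs = mlflow.search_runs(
    experiment_ids=["<experiment_id>"],   # placeholder: your experiment's ID
    order_by=["metrics.accuracy DESC"]
)
display(runs[["run_id", "metrics.accuracy", "params.max_depth"]])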
The Model Registry provides:
Registering a Model:
# Register model from a run
model_uri = f"runs:/{run_id}/model"
registered_model = mlflow.register_model(model_uri, "customer_churn")
# Register model from a local path
registered_model = mlflow.register_model("file:///path/to/model", "customer_churn")
Managing Model Lifecycle:
from mlflow.tracking import MlflowClient
client = MlflowClient()
# Transition model to staging
client.transition_model_version_stage(
name="customer_churn",
version=1,
stage="Staging"
)
# Add description
client.update_model_version(
name="customer_churn",
version=1,
description="Gradient boosting model with hyperparameter tuning"
)
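Once a version sits in a stage, it can be loaded through a stage-based URI; a brief sketch (input_df is assumed to be a pandas DataFrame of features):

import mlflow.pyfunc

# "models:/<name>/<stage>" resolves to the latest version in that stage
model = mlflow.pyfunc.load_model("models:/customer_churn/Staging")
predictions = model.predict(input_df)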
Objective: Create an AutoML experiment for a classification problem and explore the results.
Steps:
# Load sample dataset
df = spark.read.table("default.diabetes")
# Display data overview
display(df.limit(5))
print(f"Dataset has {df.count()} rows and {len(df.columns)} columns")
# Check for missing values (note: isnan applies only to numeric columns;
# use isNull alone for string columns)
from pyspark.sql.functions import col, count, isnan, when
missing_values = df.select([count(when(col(c).isNull() | isnan(col(c)), c)).alias(c) for c in df.columns])
display(missing_values)
# Find the best run ID from AutoML experiment
from mlflow.tracking import MlflowClient
client = MlflowClient()
best_run = client.search_runs(
experiment_ids=["experiment_id"], # Replace with your experiment ID
filter_string="",
order_by=["metrics.accuracy DESC"],
max_results=1
)[0]
# Register the model
best_run_id = best_run.info.run_id
model_uri = f"runs:/{best_run_id}/model"
registered_model = mlflow.register_model(model_uri, "diabetes_predictor")
Which of the following is NOT a step in the AutoML workflow? a) Data exploration b) Feature engineering c) Manual hyperparameter tuning d) Model evaluation
What is the primary purpose of the Databricks Feature Store? a) To store raw data files b) To store ML model artifacts c) To manage and reuse feature transformations d) To provide version control for notebooks
When using MLflow, what is logged with mlflow.log_param()?
a) Model metrics like accuracy or RMSE
b) Configuration values like learning rate or max depth
c) Artifacts like plots or model files
d) Tags for organizing experiments
Which MLflow component allows you to manage models through different lifecycle stages? a) MLflow Tracking b) MLflow Projects c) MLflow Models d) MLflow Model Registry
In Feature Store, what is the purpose of a primary key in a feature table? a) To join features with the training data b) To encrypt sensitive feature data c) To sort features by importance d) To track feature versioning
Which statement about AutoML in Databricks is TRUE? a) AutoML only supports regression problems b) AutoML generates source code that can be modified c) AutoML requires GPU clusters to run d) AutoML automatically deploys models to production
What is a key benefit of using Databricks Runtime for ML over standard runtime? a) Lower cost per compute hour b) Pre-installed ML libraries and integrations c) Faster cluster startup time d) Support for SQL queries
Which method is used to create a training dataset with features from Feature Store?
a) fs.create_training_set()
b) fs.lookup_features()
c) fs.get_training_data()
d) fs.extract_features()
How can you transition a model in the Model Registry from Staging to Production?
a) Using the UI in the Models section
b) Using client.transition_model_version_stage()
c) Using mlflow.promote_model()
d) Both a and b
What does the Feature Store’s “feature lookup” functionality provide? a) A way to search for features by name b) A mechanism to join features to training data c) A method to calculate feature importance d) A tool to detect duplicate features
Exploratory Data Analysis (EDA) is the critical first step in any machine learning workflow, helping you:
Basic DataFrame Exploration:
# Load data
df = spark.table("customer_data")
# Overview of dataset
print(f"Number of rows: {df.count()}")
print(f"Number of columns: {len(df.columns)}")
display(df.limit(5))
# Schema information
df.printSchema()
# Summary statistics
display(df.summary())
Using dbutils for Data Summarization:
# Comprehensive data profile
display(dbutils.data.summarize(df))
Statistical Analysis with Spark:
# Descriptive statistics
display(df.describe())
# Correlation analysis
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
numeric_cols = [field.name for field in df.schema.fields if field.dataType.typeName() in ["double", "integer", "float"]]
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
df_vector = assembler.transform(df).select("features")
matrix = Correlation.corr(df_vector, "features").collect()[0][0]
import pandas as pd
display(pd.DataFrame(matrix.toArray(), index=numeric_cols, columns=numeric_cols))
Visualization with Databricks Display Functions:
# Histogram of numeric column
display(df.select("age"))
# Bar chart for categorical data
display(df.groupBy("category").count())
# Scatter plot between two variables
display(df.select("income", "spending"))
Advanced Visualizations with Python Libraries:
# Convert to pandas for visualization (small datasets only)
pandas_df = df.limit(10000).toPandas()
# Matplotlib example
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 6))
plt.hist(pandas_df['age'], bins=20)
plt.title('Age Distribution')
plt.xlabel('Age')
plt.ylabel('Count')
display(plt.gcf())
# Seaborn example
import seaborn as sns
plt.figure(figsize=(12, 8))
sns.heatmap(pandas_df[numeric_cols].corr(), annot=True, cmap='coolwarm')
plt.title('Correlation Matrix')
display(plt.gcf())
Check for Missing Values:
from pyspark.sql.functions import col, isnan, when, count
# Count missing values for each column
missing_values = df.select([count(when(col(c).isNull() | isnan(col(c)), c)).alias(c) for c in df.columns])
display(missing_values)
# Calculate percentage of missing values
total_rows = df.count()
for column in df.columns:
    missing_count = df.filter(col(column).isNull() | isnan(col(column))).count()
    missing_percentage = (missing_count / total_rows) * 100
    print(f"{column}: {missing_percentage:.2f}% missing")
Identify Duplicate Records:
# Count total duplicates
duplicate_count = df.count() - df.dropDuplicates().count()
print(f"Number of duplicate records: {duplicate_count}")
# Find examples of duplicates
duplicate_rows = df.exceptAll(df.dropDuplicates())
display(duplicate_rows.limit(5))
Check for Inconsistent Values:
# Check for inconsistent categories
display(df.groupBy("category").count().orderBy("count"))
# Check for outliers using box plot
display(df.select("income"))
Handle Duplicates:
# Remove all duplicate rows
df_clean = df.dropDuplicates()
# Remove duplicates based on specific columns
df_clean = df.dropDuplicates(["customer_id", "transaction_date"])
Filter Irrelevant Data:
# Remove rows with specific conditions
df_clean = df.filter(col("age") > 0)
df_clean = df_clean.filter(col("income").isNotNull())
# Filter out rows from before a certain date
from pyspark.sql.functions import to_date
df_clean = df_clean.filter(to_date(col("transaction_date"), "yyyy-MM-dd") >= "2020-01-01")
Basic Column Operations:
# Select specific columns
df_clean = df.select("customer_id", "age", "income", "spending")
# Rename columns
df_clean = df_clean.withColumnRenamed("spending", "monthly_spending")
# Add a new column
from pyspark.sql.functions import expr
df_clean = df_clean.withColumn("spending_ratio", expr("monthly_spending / income"))
# Drop columns
df_clean = df_clean.drop("temp_column", "internal_id")
Feature engineering is the process of transforming raw data into features that better represent the underlying problem, resulting in improved model performance. Good features:
Numeric Transformations:
from pyspark.sql.functions import log, sqrt, pow, round, when, col
# Log transformation (for skewed data)
df = df.withColumn("log_income", log(col("income")))
# Square root transformation
df = df.withColumn("sqrt_distance", sqrt(col("distance")))
# Polynomial features
df = df.withColumn("age_squared", pow(col("age"), 2))
# Binning numeric values
df = df.withColumn("income_group",
when(col("income") < 30000, "low")
.when(col("income") < 70000, "medium")
.otherwise("high"))
Date and Time Features:
from pyspark.sql.functions import year, month, dayofweek, hour, datediff, current_date
# Extract components from timestamp
df = df.withColumn("purchase_year", year(col("purchase_date")))
df = df.withColumn("purchase_month", month(col("purchase_date")))
df = df.withColumn("purchase_dayofweek", dayofweek(col("purchase_date")))
df = df.withColumn("purchase_hour", hour(col("purchase_time")))
# Calculate time differences
df = df.withColumn("days_since_purchase", datediff(current_date(), col("purchase_date")))
# Cyclical encoding for month (to capture seasonality)
from pyspark.sql.functions import sin, cos, lit, radians
df = df.withColumn("month_sin", sin(col("purchase_month") * 2 * 3.14159 / 12))
df = df.withColumn("month_cos", cos(col("purchase_month") * 2 * 3.14159 / 12))
Text Features:
from pyspark.sql.functions import length, split, size, lower, regexp_replace
# Basic text features
df = df.withColumn("text_length", length(col("description")))
df = df.withColumn("word_count", size(split(col("description"), " ")))
# Text cleaning
df = df.withColumn("clean_text",
regexp_replace(lower(col("description")), "[^a-zA-Z0-9\\s]", ""))
Aggregation Features:
# Group and aggregate per customer
from pyspark.sql.functions import count, sum, avg, max
agg_df = df.groupBy("customer_id").agg(
count("transaction_id").alias("transaction_count"),
sum("amount").alias("total_spent"),
avg("amount").alias("avg_transaction"),
max("amount").alias("max_transaction")
)
# Join back to main dataset
df = df.join(agg_df, "customer_id")
Analyzing Missing Patterns:
# Visualize missing value patterns
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
# Convert to pandas for visualization (small datasets only)
pandas_df = df.limit(10000).toPandas()
# Create missing value map
plt.figure(figsize=(10, 6))
sns.heatmap(pandas_df.isnull(), cbar=False, yticklabels=False, cmap='viridis')
plt.title('Missing Value Pattern')
display(plt.gcf())
Dropping Missing Values:
# Drop rows with any missing values (use cautiously)
df_clean = df.na.drop()
# Drop rows where specific columns have missing values
df_clean = df.na.drop(subset=["income", "age"])
# Drop columns with high percentage of missing values
missing_percentages = {}
total_rows = df.count()
for column in df.columns:
    missing_count = df.filter(col(column).isNull()).count()
    missing_percentages[column] = (missing_count / total_rows) * 100
columns_to_drop = [c for c, pct in missing_percentages.items() if pct > 50]
df_clean = df.drop(*columns_to_drop)
Imputing Missing Values:
from pyspark.ml.feature import Imputer
# Create an imputer for numeric columns
numeric_cols = ["age", "income", "spending"]
imputer = Imputer(
inputCols=numeric_cols,
outputCols=[f"{c}_imputed" for c in numeric_cols]
).setStrategy("mean") # Options: "mean", "median", "mode"
# Fit and transform
imputer_model = imputer.fit(df)
df_imputed = imputer_model.transform(df)
# Manual imputation for categorical columns
from pyspark.sql.functions import lit
most_common_category = df.groupBy("category").count().orderBy("count", ascending=False).first()["category"]
df_imputed = df_imputed.withColumn(
"category",
when(col("category").isNull(), most_common_category).otherwise(col("category"))
)
Statistical Detection:
# Z-score method
from pyspark.sql.functions import avg, stddev
# Calculate z-scores
income_stats = df.select(
avg("income").alias("avg_income"),
stddev("income").alias("stddev_income")
).collect()[0]
df = df.withColumn("income_zscore",
(col("income") - income_stats["avg_income"]) / income_stats["stddev_income"])
# Filter outliers (Python's builtin abs does not work on Columns; use pyspark's)
from pyspark.sql.functions import abs as ps_abs
df_no_outliers = df.filter(ps_abs(col("income_zscore")) < 3)
IQR Method:
# Calculate Q1, Q3 and IQR
quantiles = df.stat.approxQuantile("income", [0.25, 0.75], 0.05)
Q1 = quantiles[0]
Q3 = quantiles[1]
IQR = Q3 - Q1
# Define bounds
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
# Filter outliers
df_no_outliers = df.filter((col("income") >= lower_bound) & (col("income") <= upper_bound))
Capping Outliers:
# Cap values at lower and upper bounds instead of removing
df_capped = df.withColumn(
"income_capped",
when(col("income") < lower_bound, lower_bound)
.when(col("income") > upper_bound, upper_bound)
.otherwise(col("income"))
)
Which of the following is the recommended first step when starting a machine learning project in Databricks? a) Feature engineering b) Model selection c) Exploratory data analysis d) Hyperparameter tuning
Which function is most appropriate for getting a quick overview of summary statistics for all numeric columns in a DataFrame?
a) df.describe()
b) df.summary()
c) dbutils.data.summarize(df)
d) df.stats()
When using the Imputer in Spark ML, which of these is NOT a valid strategy? a) “mean” b) “median” c) “mode” d) “zero”
What is the primary risk of using df.na.drop() without parameters?
a) It could drop all columns with any null values
b) It could drop all rows with any null values
c) It could drop the wrong column types
d) It only works on numeric columns
Which method would be most appropriate for handling outliers in a variable that follows a skewed distribution? a) Z-score method b) Log transformation before outlier detection c) Remove all values above the 95th percentile d) Simple capping at fixed values
Many ML algorithms are sensitive to the scale of numeric features. Scaling ensures all features contribute equally to model training.
Techniques using Spark ML:
from pyspark.ml.feature import StandardScaler, MinMaxScaler, MaxAbsScaler
from pyspark.ml.feature import VectorAssembler
# Prepare vector of features to scale
numeric_cols = ["age", "income", "spending", "tenure"]
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features_vector")
df_vector = assembler.transform(df)
# StandardScaler: (value - mean) / stddev
scaler = StandardScaler(
inputCol="features_vector",
outputCol="scaled_features",
withMean=True,
withStd=True
)
scaler_model = scaler.fit(df_vector)
df_scaled = scaler_model.transform(df_vector)
# MinMaxScaler: (value - min) / (max - min)
min_max_scaler = MinMaxScaler(
inputCol="features_vector",
outputCol="scaled_features"
)
min_max_model = min_max_scaler.fit(df_vector)
df_min_max = min_max_model.transform(df_vector)
# MaxAbsScaler: value / max(abs(value))
max_abs_scaler = MaxAbsScaler(
inputCol="features_vector",
outputCol="scaled_features"
)
max_abs_model = max_abs_scaler.fit(df_vector)
df_max_abs = max_abs_model.transform(df_vector)
When to Use Each Scaling Method:
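As rough guidance: StandardScaler suits approximately Gaussian features and models that assume centered inputs (linear models, SVMs); MinMaxScaler suits features that must fall within a fixed range such as [0, 1]; MaxAbsScaler never shifts the data, so it preserves sparsity and is a sensible default for sparse feature vectors.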
One-Hot Encoding:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
# Step 1: Convert strings to indices
indexer = StringIndexer(
inputCol="category",
outputCol="category_index",
handleInvalid="keep" # Options: "error", "skip", "keep"
)
df_indexed = indexer.fit(df).transform(df)
# Step 2: Apply one-hot encoding
encoder = OneHotEncoder(
inputCol="category_index",
outputCol="category_encoded"
)
df_encoded = encoder.fit(df_indexed).transform(df_indexed)
Handling High-Cardinality Categorical Features:
For features with many unique values (high cardinality), one-hot encoding can lead to too many dimensions. Alternatives include:
# 1. Target encoding (average of target by category)
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col
# For binary classification
window_spec = Window.partitionBy("category")
df = df.withColumn(
"category_target_encoded",
avg("target").over(window_spec)
)
# 2. Frequency encoding (replace category with its frequency)
category_freq = df.groupBy("category").count()
df = df.join(category_freq, "category")
df = df.withColumn("category_freq_encoded", col("count") / df.count())
df = df.drop("count")
# 3. Hash encoding (for extremely high cardinality)
from pyspark.ml.feature import FeatureHasher
hasher = FeatureHasher(
inputCols=["category"],
outputCol="category_hashed",
numFeatures=50 # Number of buckets
)
df_hashed = hasher.transform(df)
Feature selection reduces dimensionality by selecting the most important features:
from pyspark.ml.feature import VectorSlicer, ChiSqSelector
from pyspark.ml.regression import LinearRegression
# Method 1: Using model-based feature importance
lr = LinearRegression(featuresCol="features_vector", labelCol="target")
lr_model = lr.fit(df_vector)
# Extract feature importance
import pandas as pd
feature_importance = pd.DataFrame({
'Feature': numeric_cols,
'Importance': [abs(coef) for coef in lr_model.coefficients]
})
feature_importance = feature_importance.sort_values('Importance', ascending=False)
display(feature_importance)
# Method 2: Chi-square selector (for classification)
selector = ChiSqSelector(
numTopFeatures=5,
featuresCol="features_vector",
outputCol="selected_features",
labelCol="target"
)
selector_model = selector.fit(df_vector)
df_selected = selector_model.transform(df_vector)
Pipelines chain multiple transformations and models into a single workflow, ensuring:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
# Define stages
# 1. Index categorical columns
categoricalCols = ["state", "gender", "occupation"]
indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_index", handleInvalid="keep")
for c in categoricalCols]
# 2. One-hot encode indexed categorical columns
encoders = [OneHotEncoder(inputCol=f"{c}_index", outputCol=f"{c}_encoded")
for c in categoricalCols]
# 3. Assemble all features into a vector
numericCols = ["age", "income", "credit_score"]
assembler_inputs = [f"{c}_encoded" for c in categoricalCols] + numericCols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features_unscaled")
# 4. Scale features
scaler = StandardScaler(inputCol="features_unscaled", outputCol="features")
# 5. Define the model
lr = LogisticRegression(featuresCol="features", labelCol="target")
# Create the pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler, lr])
# Split data
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# Fit the pipeline
pipeline_model = pipeline.fit(train_df)
# Apply the fitted pipeline to test data
predictions = pipeline_model.transform(test_df)
# Evaluate results
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="target")
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")
# Save the pipeline model
pipeline_model.write().overwrite().save("/dbfs/ml/models/customer_churn_pipeline")
# Load the saved pipeline
from pyspark.ml import PipelineModel
loaded_pipeline = PipelineModel.load("/dbfs/ml/models/customer_churn_pipeline")
# Use the loaded pipeline
new_predictions = loaded_pipeline.transform(new_data)
Cross-validation helps estimate model performance more reliably by:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Create evaluator
evaluator = BinaryClassificationEvaluator(
labelCol="target",
metricName="areaUnderROC" # Options: "areaUnderROC", "areaUnderPR"
)
# Create parameter grid for hyperparameter tuning
paramGrid = ParamGridBuilder() \
.addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
.build()
# Create k-fold cross-validator
cv = CrossValidator(
estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=evaluator,
numFolds=5, # Number of folds
seed=42
)
# Execute cross-validation and choose the best model
cv_model = cv.fit(train_df)
# Access the best model
best_model = cv_model.bestModel
# Make predictions using the best model
predictions = best_model.transform(test_df)
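To inspect how each parameter combination performed, the fitted cross-validator exposes one averaged metric per grid entry; a short sketch:

# avgMetrics is aligned with the order of the parameter grid
for params, metric in zip(cv.getEstimatorParamMaps(), cv_model.avgMetrics):
    print(params, "->", metric)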
Grid Search:
# Exhaustive search over specified parameter values
# Already demonstrated in the cross-validation example
Random Search:
In Spark ML, random search isn’t built-in, but can be implemented by randomly sampling from the parameter space:
import random
from pyspark.ml.tuning import TrainValidationSplit
# Generate random parameter combinations
num_combinations = 10
reg_params = [random.uniform(0.001, 1.0) for _ in range(num_combinations)]
elastic_net_params = [random.uniform(0.0, 1.0) for _ in range(num_combinations)]
# Create parameter grid with random combinations
random_param_grid = [
{lr.regParam: reg_param, lr.elasticNetParam: elastic_param}
for reg_param, elastic_param in zip(reg_params, elastic_net_params)
]
# Use TrainValidationSplit with random parameter grid
tvs = TrainValidationSplit(
estimator=pipeline,
estimatorParamMaps=random_param_grid,
evaluator=evaluator,
trainRatio=0.8
)
tvs_model = tvs.fit(train_df)
Bayesian Optimization:
For Bayesian optimization, Databricks recommends using Hyperopt, which we’ll cover in more detail on Day 3.
Objective: Build a complete ML pipeline that performs data preprocessing, feature engineering, and model training for a customer churn prediction problem.
Steps:
# Load the dataset
df = spark.table("default.telco_customer_churn")
# Explore data
display(df.limit(5))
print(f"Total rows: {df.count()}")
print(f"Total columns: {len(df.columns)}")
# Check schema
df.printSchema()
# Summary statistics
display(df.summary())
# Check for missing values
from pyspark.sql.functions import col, when, count, isnan
missing_values = df.select([count(when(col(c).isNull() | isnan(col(c)), c)).alias(c) for c in df.columns])
display(missing_values)
# Check target distribution
display(df.groupBy("Churn").count())
from pyspark.sql.functions import col, when
# Convert string values to appropriate types
df = df.withColumn("SeniorCitizen", col("SeniorCitizen").cast("double"))
df = df.withColumn("tenure", col("tenure").cast("double"))
df = df.withColumn("MonthlyCharges", col("MonthlyCharges").cast("double"))
df = df.withColumn("TotalCharges", col("TotalCharges").cast("double"))
# Handle missing values in TotalCharges
df = df.withColumn(
"TotalCharges",
when(col("TotalCharges").isNull(), col("MonthlyCharges") * col("tenure"))
.otherwise(col("TotalCharges"))
)
# Convert target to numeric
df = df.withColumn("label", when(col("Churn") == "Yes", 1.0).otherwise(0.0))
# Split into train and test
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
# Identify categorical and numeric columns
categorical_cols = [
"gender", "Partner", "Dependents", "PhoneService", "MultipleLines",
"InternetService", "OnlineSecurity", "OnlineBackup", "DeviceProtection",
"TechSupport", "StreamingTV", "StreamingMovies", "Contract",
"PaperlessBilling", "PaymentMethod"
]
numeric_cols = ["SeniorCitizen", "tenure", "MonthlyCharges", "TotalCharges"]
# Create stages for the pipeline
# 1. String indexing for categorical features
indexers = [
StringIndexer(inputCol=col, outputCol=f"{col}_index", handleInvalid="keep")
for col in categorical_cols
]
# 2. One-hot encoding for indexed categorical features
encoders = [
OneHotEncoder(inputCol=f"{col}_index", outputCol=f"{col}_encoded")
for col in categorical_cols
]
# 3. Assemble features into a single vector
assembler_inputs = [f"{col}_encoded" for col in categorical_cols] + numeric_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features_unscaled")
# 4. Scale features
scaler = StandardScaler(inputCol="features_unscaled", outputCol="features")
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Create the logistic regression model
lr = LogisticRegression(featuresCol="features", labelCol="label")
# Create the pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler, lr])
# Create parameter grid
paramGrid = ParamGridBuilder() \
.addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
.build()
# Create evaluator
evaluator = BinaryClassificationEvaluator(
labelCol="label",
metricName="areaUnderROC"
)
# Create cross-validator
cv = CrossValidator(
estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=evaluator,
numFolds=3
)
# Train the model using cross-validation
print("Training model with cross-validation...")
cv_model = cv.fit(train_df)
# Get the best model
best_pipeline_model = cv_model.bestModel
# Get the best parameters
best_lr = best_pipeline_model.stages[-1]
print(f"Best regParam: {best_lr.getRegParam()}")
print(f"Best elasticNetParam: {best_lr.getElasticNetParam()}")
# Make predictions on test data
predictions = best_pipeline_model.transform(test_df)
# Evaluate model
auc = evaluator.evaluate(predictions)
print(f"AUC on test data: {auc}")
# Show metrics like precision, recall, f1-score
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
metrics = ["accuracy", "weightedPrecision", "weightedRecall", "f1"]
for metric in metrics:
    result = multi_evaluator.setMetricName(metric).evaluate(predictions)
    print(f"{metric}: {result}")
# Confusion matrix
from pyspark.sql.functions import expr
confusion_matrix = predictions.groupBy("label").pivot("prediction").count().fillna(0)
display(confusion_matrix)
# Save the cross-validation model
cv_model.write().overwrite().save("/dbfs/ml/models/telco_churn_cv_model")
# Save the best pipeline model
best_pipeline_model.write().overwrite().save("/dbfs/ml/models/telco_churn_best_model")
# How to load the model later
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("/dbfs/ml/models/telco_churn_best_model")
Which of the following scaling methods preserves the shape of the distribution while centering it around zero with a unit standard deviation? a) MinMaxScaler b) StandardScaler c) MaxAbsScaler d) Normalizer
When using StringIndexer in Spark ML, what does the “handleInvalid” parameter set to “keep” do? a) Keeps the original values unchanged b) Keeps the missing values as NA c) Assigns unknown categories to a special additional index d) Throws an error when new categories are encountered
In a Spark ML Pipeline, what is the difference between an Estimator and a Transformer? a) Estimators produce Transformers when fitted on data; Transformers directly transform data b) Estimators are used for regression; Transformers are used for classification c) Estimators handle categorical data; Transformers handle numeric data d) Estimators are used in training; Transformers are used in testing
When using CrossValidator with a ParamGrid containing 3 values for regParam and 2 values for maxIter, and setting numFolds to 5, how many models will be trained in total? a) 5 b) 6 c) 30 d) 60
Which method is most appropriate for encoding a categorical feature with thousands of unique values? a) One-Hot Encoding b) Feature Hashing c) StringIndexer d) Bucketizer
In Spark ML, what is the purpose of the VectorAssembler? a) It combines multiple columns into a single feature vector b) It calculates the principal components from a feature vector c) It splits a feature vector into multiple columns d) It standardizes features by removing the mean and scaling to unit variance
When scaling features, which of the following is TRUE? a) You should fit the scaler on the entire dataset before splitting b) You should fit the scaler on the test set and apply to the training set c) You should fit the scaler on the training set and apply to both training and test sets d) Scaling is only necessary for linear models, not tree-based models
Which hyperparameter tuning approach evaluates all possible combinations of the specified parameter values? a) Random search b) Grid search c) Bayesian optimization d) Genetic algorithms
What is the purpose of stratification when splitting data into training and testing sets? a) To ensure equal sizes of training and test sets b) To ensure similar distributions of the target variable in both sets c) To optimize the selection of features d) To minimize the impact of outliers
In a Spark ML Pipeline, what happens if a stage in the pipeline fails to transform the data? a) The pipeline continues with the next stage b) The pipeline returns the original dataset c) The pipeline throws an exception d) The pipeline retries the failed stage
Apache Spark ML is a unified machine learning library designed for distributed computing. It provides a high-level API built on top of DataFrames that helps you create and tune practical machine learning pipelines. Spark ML offers several advantages over traditional single-node ML frameworks:
The Spark ML architecture consists of several key components that work together:
DataFrame-Based API: Unlike the older RDD-based MLlib, Spark ML operates on DataFrames, which provide a more intuitive structure for ML workflows.
Pipeline API: Enables connecting multiple algorithms and data transformations into a single workflow that can be treated as a unit.
Parameter Handling: Consistent approach to algorithm parameterization using ParamMaps.
Persistence Layer: Allows saving and loading models and pipelines.
Distributed Matrix Operations: Optimized for large-scale linear algebra.
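Parameter handling in practice: every algorithm exposes Params that can be inspected, and a ParamMap (a dict of Param to value) can override them for a single fit. A minimal sketch, assuming train_df has "features" and "label" columns:

from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=10)
print(lr.explainParams())  # documents every Param with its current value
# Override parameters for this one fit via a ParamMap
model = lr.fit(train_df, {lr.maxIter: 20, lr.regParam: 0.1})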
The Spark ML library (org.apache.spark.ml in Scala, pyspark.ml in Python) is organized into several subpackages:
pyspark.ml
├── classification (LogisticRegression, RandomForestClassifier, etc.)
├── clustering (KMeans, LDA, etc.)
├── evaluation (BinaryClassificationEvaluator, RegressionEvaluator, etc.)
├── feature (StandardScaler, PCA, StringIndexer, etc.)
├── recommendation (ALS)
├── regression (LinearRegression, GBTRegressor, etc.)
├── tuning (CrossValidator, TrainValidationSplit, etc.)
└── Pipeline, PipelineModel, Estimator, Transformer, etc.
Transformers implement a transform() method that converts one DataFrame into another by appending one or more columns.
Key Characteristics:
Common Examples:
- `Tokenizer`: Splits text into words
- `VectorAssembler`: Combines multiple columns into a vector
- `SQLTransformer`: Applies SQL transformations

Example Usage:
from pyspark.ml.feature import Tokenizer
# Create a transformer
tokenizer = Tokenizer(inputCol="text", outputCol="words")
# Apply the transformer
transformed_df = tokenizer.transform(df)
Estimators implement a fit() method that trains a model on data and produces a Transformer (the model).
Key Characteristics:
Common Examples:
- `LogisticRegression`: Produces a logistic regression model
- `StringIndexer`: Learns string-to-index mappings from data
- `StandardScaler`: Learns mean and standard deviation from data

Example Usage:
from pyspark.ml.feature import StandardScaler
# Create an estimator
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
# Fit the estimator to produce a transformer (model)
scaler_model = scaler.fit(df)
# Use the transformer
scaled_df = scaler_model.transform(df)
| | Estimator | Transformer |
|---|---|---|
| Main Method | `fit()` | `transform()` |
| Returns | A Transformer (model) | A transformed DataFrame |
| Stateful | Yes (learns from data) | No |
| Example | `LogisticRegression()` | `LogisticRegressionModel` (result of fitting) |
Conceptual Relationship:
Estimator.fit(DataFrame) → Transformer
Transformer.transform(DataFrame) → DataFrame
Binary Classification: Predicting one of two classes (e.g., spam/not spam)
Multiclass Classification: Predicting one of multiple classes (e.g., digit recognition)
Multilabel Classification: Assigning multiple labels to each instance (e.g., tagging an article with multiple topics)
Logistic Regression:
from pyspark.ml.classification import LogisticRegression
# Create the model
lr = LogisticRegression(
featuresCol="features",
labelCol="label",
maxIter=10,
regParam=0.01,
elasticNetParam=0.8
)
# Train the model
lr_model = lr.fit(train_df)
# Make predictions
predictions = lr_model.transform(test_df)
# Examine coefficients
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))
Decision Tree:
from pyspark.ml.classification import DecisionTreeClassifier
# Create the model
dt = DecisionTreeClassifier(
featuresCol="features",
labelCol="label",
maxDepth=5
)
# Train the model
dt_model = dt.fit(train_df)
# Make predictions
predictions = dt_model.transform(test_df)
# Examine feature importance
print("Feature Importances: " + str(dt_model.featureImportances))
Random Forest:
from pyspark.ml.classification import RandomForestClassifier
# Create the model
rf = RandomForestClassifier(
featuresCol="features",
labelCol="label",
numTrees=100,
maxDepth=5,
seed=42
)
# Train the model
rf_model = rf.fit(train_df)
# Make predictions
predictions = rf_model.transform(test_df)
Gradient-Boosted Trees:
from pyspark.ml.classification import GBTClassifier
# Create the model
gbt = GBTClassifier(
featuresCol="features",
labelCol="label",
maxIter=10,
maxDepth=5
)
# Train the model
gbt_model = gbt.fit(train_df)
# Make predictions
predictions = gbt_model.transform(test_df)
Binary Classification Evaluation:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Create evaluator for area under ROC
evaluator = BinaryClassificationEvaluator(
labelCol="label",
rawPredictionCol="rawPrediction",
metricName="areaUnderROC" # or "areaUnderPR"
)
# Calculate AUC
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")
# Confusion matrix
from pyspark.sql.functions import col
confusion_matrix = predictions.select("label", "prediction").groupBy("label").pivot("prediction").count().fillna(0)
display(confusion_matrix)
Multiclass Classification Evaluation:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Create evaluator
evaluator = MulticlassClassificationEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="accuracy" # Options: "f1", "weightedPrecision", "weightedRecall", "accuracy"
)
# Calculate metrics
accuracy = evaluator.evaluate(predictions)
evaluator.setMetricName("f1")
f1 = evaluator.evaluate(predictions)
evaluator.setMetricName("weightedPrecision")
precision = evaluator.evaluate(predictions)
evaluator.setMetricName("weightedRecall")
recall = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")
print(f"F1 Score: {f1}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
Linear Regression: Modeling linear relationship between features and target.
Nonlinear Regression: Capturing more complex patterns in the data.
Time Series Regression: Predicting values based on time patterns.
Linear Regression:
from pyspark.ml.regression import LinearRegression
# Create the model
lr = LinearRegression(
featuresCol="features",
labelCol="label",
maxIter=10,
regParam=0.1,
elasticNetParam=0.8
)
# Train the model
lr_model = lr.fit(train_df)
# Make predictions
predictions = lr_model.transform(test_df)
# Display model coefficients
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))
# Summary statistics
train_summary = lr_model.summary
print(f"RMSE: {train_summary.rootMeanSquaredError}")
print(f"R²: {train_summary.r2}")
Decision Tree Regression:
from pyspark.ml.regression import DecisionTreeRegressor
# Create the model
dt = DecisionTreeRegressor(
featuresCol="features",
labelCol="label",
maxDepth=5
)
# Train the model
dt_model = dt.fit(train_df)
# Make predictions
predictions = dt_model.transform(test_df)
Random Forest Regression:
from pyspark.ml.regression import RandomForestRegressor
# Create the model
rf = RandomForestRegressor(
featuresCol="features",
labelCol="label",
numTrees=100,
maxDepth=5
)
# Train the model
rf_model = rf.fit(train_df)
# Make predictions
predictions = rf_model.transform(test_df)
Gradient-Boosted Tree Regression:
from pyspark.ml.regression import GBTRegressor
# Create the model
gbt = GBTRegressor(
featuresCol="features",
labelCol="label",
maxIter=10,
maxDepth=5
)
# Train the model
gbt_model = gbt.fit(train_df)
# Make predictions
predictions = gbt_model.transform(test_df)
from pyspark.ml.evaluation import RegressionEvaluator
# Create evaluator
evaluator = RegressionEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="rmse" # Options: "rmse", "mse", "mae", "r2"
)
# Calculate metrics
rmse = evaluator.evaluate(predictions)
evaluator.setMetricName("mse")
mse = evaluator.evaluate(predictions)
evaluator.setMetricName("mae")
mae = evaluator.evaluate(predictions)
evaluator.setMetricName("r2")
r2 = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")
print(f"MSE: {mse}")
print(f"MAE: {mae}")
print(f"R²: {r2}")
Which of the following best describes the relationship between Estimators and Transformers in Spark ML? a) Estimators transform data, while Transformers estimate parameters b) Estimators produce Transformers when fitted on data c) Estimators perform feature extraction, while Transformers perform prediction d) Estimators work with classification; Transformers work with regression
In Spark ML, which of the following is NOT an Estimator? a) LogisticRegression b) RandomForestClassifier c) VectorAssembler d) KMeans
Which evaluation metric is commonly used to assess the performance of a regression model? a) Area Under ROC Curve b) Precision c) Recall d) Root Mean Squared Error (RMSE)
When using the BinaryClassificationEvaluator, which metricName should you use to calculate the area under the Precision-Recall curve? a) “areaUnderROC” b) “areaUnderPR” c) “precision” d) “recall”
What determines the final representation of data at each step in a Spark ML Pipeline? a) The transform() method of each Transformer b) The fit() method of each Estimator c) Both fit() and transform() methods of each stage d) The evaluate() method of each Evaluator
Hyperopt is a Python library for hyperparameter optimization that is well-integrated with Databricks. It provides more advanced tuning capabilities than Spark ML’s built-in grid search:
import mlflow
import numpy as np
from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK, Trials
# Define the objective function
# (assumes X_train and y_train are in-memory numpy arrays or pandas objects)
def objective(params):
    # 1. Extract parameters
    max_depth = int(params["max_depth"])
    n_estimators = int(params["n_estimators"])
    learning_rate = params["learning_rate"]

    # 2. Build the model with these parameters
    from sklearn.ensemble import GradientBoostingRegressor
    model = GradientBoostingRegressor(
        max_depth=max_depth,
        n_estimators=n_estimators,
        learning_rate=learning_rate,
        random_state=42
    )

    # 3. Train and evaluate the model (using cross-validation)
    from sklearn.model_selection import cross_val_score
    scores = cross_val_score(model, X_train, y_train, cv=3, scoring="neg_mean_squared_error")
    rmse = np.sqrt(-scores.mean())

    # 4. Log parameters and the metric with MLflow
    with mlflow.start_run(nested=True):
        mlflow.log_params({
            "max_depth": max_depth,
            "n_estimators": n_estimators,
            "learning_rate": learning_rate
        })
        mlflow.log_metric("rmse", rmse)

    # 5. Return results (Hyperopt minimizes the returned value)
    return {"loss": rmse, "status": STATUS_OK}
Hyperopt allows you to define complex search spaces for your hyperparameters:
# Define the search space
search_space = {
"max_depth": hp.quniform("max_depth", 3, 10, 1), # Integer values between 3 and 10
"n_estimators": hp.quniform("n_estimators", 50, 500, 10), # Integer values between 50 and 500, step 10
"learning_rate": hp.loguniform("learning_rate", np.log(0.01), np.log(0.3)) # Log-uniform between 0.01 and 0.3
}
# Run the optimization with SparkTrials for distributed training
with mlflow.start_run(run_name="hyperopt_gbr"):
    spark_trials = SparkTrials(parallelism=4)  # Tune 4 models in parallel
    best_params = fmin(
        fn=objective,
        space=search_space,
        algo=tpe.suggest,  # Bayesian optimization using the Tree-structured Parzen Estimator
        max_evals=20,      # Maximum number of evaluations
        trials=spark_trials
    )
    # Log the best parameters
    mlflow.log_params(best_params)

# Convert parameters to proper types
best_params["max_depth"] = int(best_params["max_depth"])
best_params["n_estimators"] = int(best_params["n_estimators"])
print("Best parameters:", best_params)
Random Search:
from hyperopt import rand
# Use random search instead of TPE
best_params = fmin(
fn=objective,
space=search_space,
algo=rand.suggest, # Random search
max_evals=20,
trials=spark_trials
)
Sequential vs. Parallel Tuning:
# Sequential tuning with Trials
trials = Trials()
best_params = fmin(
fn=objective,
space=search_space,
algo=tpe.suggest,
max_evals=20,
trials=trials
)
# Parallel tuning with SparkTrials
spark_trials = SparkTrials(parallelism=4)
best_params = fmin(
fn=objective,
space=search_space,
algo=tpe.suggest,
max_evals=20,
trials=spark_trials
)
Pandas API on Spark (previously known as Koalas) provides a Pandas-like API for PySpark DataFrame operations. It allows you to leverage the familiar Pandas syntax while working with distributed data.
Key advantages:
import pyspark.pandas as ps
# From a Spark DataFrame
spark_df = spark.table("customer_data")
psdf = spark_df.pandas_api()
# From pandas DataFrame
import pandas as pd
pandas_df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
psdf = ps.from_pandas(pandas_df)
# Directly from data
psdf = ps.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
# From CSV
psdf = ps.read_csv("/path/to/file.csv")
# Basic operations similar to pandas
psdf.head()
psdf.describe()
psdf.info()
# Selection and filtering
filtered = psdf[psdf["A"] > 1]
selected = psdf[["A", "B"]]
# Aggregations
grouped = psdf.groupby("A").agg({"B": "mean"})
# Plotting (creates pandas DataFrame for plotting)
psdf["A"].plot.hist()
# Pandas on Spark DataFrame to Spark DataFrame
spark_df = psdf.to_spark()
# Spark DataFrame to Pandas on Spark DataFrame
psdf = spark_df.pandas_api()
# Pandas on Spark DataFrame to pandas DataFrame (caution: collects data to driver)
pandas_df = psdf.to_pandas()
# Set default index type to distributed
ps.set_option("compute.default_index_type", "distributed")
# Cache intermediate results for multiple operations
psdf = psdf.spark.cache()
# Use ps.sql for complex operations ({psdf} is substituted via the keyword argument)
result = ps.sql("SELECT A, sum(B) FROM {psdf} GROUP BY A", psdf=psdf)
Pandas UDFs (User-Defined Functions) allow you to apply pandas functions to Spark DataFrames, providing a bridge between pandas and PySpark. They’re particularly useful for:
Pandas UDFs use Apache Arrow for efficient data transfer between JVM and Python processes.
Scalar Pandas UDF:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
import pandas as pd
# Define a scalar pandas UDF
# Note: each Series passed in is one Arrow batch, so s.mean() and s.std() are
# batch-level statistics rather than global ones; fine for a demo, but use Spark
# aggregations when you need true standardization
@pandas_udf(DoubleType())
def scaled_values(s: pd.Series) -> pd.Series:
    return (s - s.mean()) / s.std()

# Apply to a numeric column
df = df.withColumn("scaled_features", scaled_values(df["features"]))
Grouped Map Function:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType
import pandas as pd
from sklearn.ensemble import RandomForestRegressor

# Define the output schema
schema = StructType([
    StructField("id", IntegerType()),
    StructField("prediction", DoubleType())
])

# Define a grouped map function as a plain Python function (used with applyInPandas)
def predict_group(group_pdf: pd.DataFrame) -> pd.DataFrame:
    # Train a model on this group
    X = group_pdf[['feature1', 'feature2']]
    y = group_pdf['target']
    model = RandomForestRegressor(n_estimators=10)
    model.fit(X, y)
    # Make predictions
    group_pdf['prediction'] = model.predict(X)
    return group_pdf[['id', 'prediction']]

# Apply to grouped data
result = df.groupBy("category").applyInPandas(predict_group, schema=schema)
Iterator of Pandas UDFs (mapInPandas):
from pyspark.sql.functions import pandas_udf
import pandas as pd
def predict_batch(batch_iter):
    for batch in batch_iter:
        # Prepare features
        features = batch[['feature1', 'feature2', 'feature3']]
        # Apply model (pre-trained)
        batch['prediction'] = model.predict(features)
        # Return the batch with predictions
        yield batch[['id', 'prediction']]
# Apply function to DataFrame
predictions = df.mapInPandas(predict_batch, schema="id long, prediction double")
import mlflow
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
import numpy as np
# Load a pre-trained model
loaded_model = mlflow.sklearn.load_model("runs:/my-run-id/model")
# Broadcast the model to all workers
broadcast_model = sc.broadcast(loaded_model)
# Define UDF for batch prediction
@pandas_udf(DoubleType())
def predict_udf(features_series: pd.Series) -> pd.Series:
    # Access the broadcasted model
    model = broadcast_model.value
    # Convert each row from Vector to numpy array
    features_array = np.stack(features_series.map(lambda x: x.toArray()))
    # Make predictions
    predictions = model.predict(features_array)
    return pd.Series(predictions)
# Apply the UDF to make predictions
predictions = df.withColumn("prediction", predict_udf(df["features"]))
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
# Define schema for output
schema = StructType([
StructField("group", StringType()),
StructField("rmse", DoubleType()),
StructField("r2", DoubleType()),
StructField("model_info", StringType())
])
# Define a function to train a model per group (used with applyInPandas)
def train_group_model(group_pdf: pd.DataFrame) -> pd.DataFrame:
    # Extract group name
    group_name = group_pdf["group"].iloc[0]
    # Prepare features and target
    X = group_pdf[['feature1', 'feature2', 'feature3']]
    y = group_pdf['target']
    # Train model for this group
    model = RandomForestRegressor(n_estimators=100)
    model.fit(X, y)
    # Evaluate on the training data
    from sklearn.metrics import mean_squared_error, r2_score
    y_pred = model.predict(X)
    rmse = np.sqrt(mean_squared_error(y, y_pred))
    r2 = r2_score(y, y_pred)
    # Create output DataFrame with one row of metrics for this group
    result = pd.DataFrame({
        "group": [group_name],
        "rmse": [rmse],
        "r2": [r2],
        "model_info": [str(model.get_params())]
    })
    return result

# Apply to data grouped by 'group' column
model_metrics = df.groupBy("group").applyInPandas(train_group_model, schema=schema)
Objective: Build a machine learning model with hyperparameter tuning using Hyperopt and Spark ML.
Steps:
# Load dataset
df = spark.table("default.airbnb_listings")
# Inspect data
display(df.limit(5))
# Select features and target
from pyspark.sql.functions import col
# Features to use
categorical_cols = ["room_type", "property_type", "bed_type"]
numeric_cols = ["accommodates", "bathrooms", "bedrooms", "beds", "minimum_nights"]
# Target column
target_col = "price"
# Prepare data
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
# 1. Convert categorical columns to indices
indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_index", handleInvalid="keep")
for c in categorical_cols]
# 2. One-hot encode indexed columns
encoders = [OneHotEncoder(inputCol=f"{c}_index", outputCol=f"{c}_encoded")
for c in categorical_cols]
# 3. Assemble features
assembler_inputs = [f"{c}_encoded" for c in categorical_cols] + numeric_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features_unscaled")
# 4. Scale features
scaler = StandardScaler(inputCol="features_unscaled", outputCol="features")
# Split data into train, validation, and test
train_df, val_df, test_df = df.randomSplit([0.7, 0.15, 0.15], seed=42)
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
# Create the base regressor
gbr = GBTRegressor(featuresCol="features", labelCol=target_col)
# Create the pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler, gbr])
import mlflow
import numpy as np
from hyperopt import STATUS_OK
from pyspark.ml.evaluation import RegressionEvaluator
def objective(params):
    # 1. Extract parameters
    max_depth = int(params["max_depth"])
    max_iter = int(params["max_iter"])
    step_size = params["step_size"]

    # 2. Set parameters on the model
    gbr.setMaxDepth(max_depth)
    gbr.setMaxIter(max_iter)
    gbr.setStepSize(step_size)

    # 3. Train and evaluate the model
    with mlflow.start_run(nested=True):
        # Log parameters
        mlflow.log_params({
            "max_depth": max_depth,
            "max_iter": max_iter,
            "step_size": step_size
        })
        # Fit the pipeline
        pipeline_model = pipeline.fit(train_df)
        # Make predictions on validation set
        predictions = pipeline_model.transform(val_df)
        # Evaluate the model
        evaluator = RegressionEvaluator(labelCol=target_col, predictionCol="prediction", metricName="rmse")
        rmse = evaluator.evaluate(predictions)
        # Log metrics
        mlflow.log_metric("rmse", rmse)

    # Return only the loss; avoid putting large fitted models in the result dict
    return {"loss": rmse, "status": STATUS_OK}
from hyperopt import hp
search_space = {
"max_depth": hp.quniform("max_depth", 2, 10, 1),
"max_iter": hp.quniform("max_iter", 10, 100, 10),
"step_size": hp.loguniform("step_size", np.log(0.01), np.log(0.3))
}
from hyperopt import fmin, tpe, Trials

# Spark ML training is already distributed across the cluster, so use the default
# Trials class here; SparkTrials is intended for single-node libraries such as scikit-learn
trials = Trials()

# Run the hyperparameter search
with mlflow.start_run(run_name="gbr_hyperopt"):
    # Perform the search
    best_result = fmin(
        fn=objective,
        space=search_space,
        algo=tpe.suggest,
        max_evals=20,
        trials=trials
    )
    # Log the best parameters
    mlflow.log_params({
        "best_max_depth": int(best_result["max_depth"]),
        "best_max_iter": int(best_result["max_iter"]),
        "best_step_size": best_result["step_size"]
    })
# Print the best parameters
print("Best parameters:")
print(f"max_depth: {int(best_result['max_depth'])}")
print(f"max_iter: {int(best_result['max_iter'])}")
print(f"step_size: {best_result['step_size']}")
# Set the best parameters on the model
gbr.setMaxDepth(int(best_result["max_depth"]))
gbr.setMaxIter(int(best_result["max_iter"]))
gbr.setStepSize(best_result["step_size"])
# Train the final model on combined train+validation data
train_val_df = train_df.union(val_df)
final_model = pipeline.fit(train_val_df)
# Make predictions on test data
test_predictions = final_model.transform(test_df)
# Evaluate the final model
evaluator = RegressionEvaluator(labelCol=target_col, predictionCol="prediction", metricName="rmse")
test_rmse = evaluator.evaluate(test_predictions)
evaluator.setMetricName("r2")
test_r2 = evaluator.evaluate(test_predictions)
print(f"Test RMSE: {test_rmse}")
print(f"Test R²: {test_r2}")
# Save the model with MLflow
with mlflow.start_run(run_name="final_gbr_model"):
    mlflow.log_params({
        "max_depth": int(best_result["max_depth"]),
        "max_iter": int(best_result["max_iter"]),
        "step_size": best_result["step_size"]
    })
    mlflow.log_metrics({
        "test_rmse": test_rmse,
        "test_r2": test_r2
    })
    mlflow.spark.log_model(final_model, "spark-model")
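To reuse the logged pipeline later, it can be loaded back by its run URI; a short sketch (replace the run ID placeholder):

# Load the Spark pipeline model logged above
loaded_model = mlflow.spark.load_model("runs:/<run_id>/spark-model")
new_predictions = loaded_model.transform(test_df)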
When using Hyperopt, which algorithm is used for Bayesian optimization? a) random.suggest b) grid.suggest c) tpe.suggest d) bayes.suggest
In Hyperopt, which of the following search space definitions would create an integer parameter with values 1, 2, 3, 4, or 5? a) hp.choice(“param”, [1, 2, 3, 4, 5]) b) hp.uniformint(“param”, 1, 5) c) hp.quniform(“param”, 1, 5, 1) d) hp.randint(“param”, 1, 6)
What is the primary advantage of using SparkTrials in Hyperopt? a) It allows for parallel hyperparameter tuning b) It provides better search algorithms c) It integrates with MLflow automatically d) It supports more complex parameter spaces
What is the correct way to convert a Spark DataFrame to a Pandas on Spark DataFrame? a) ps.from_spark(spark_df) b) ps.DataFrame(spark_df) c) spark_df.to_pandas_on_spark() d) spark_df.pandas_api()
When using Pandas UDFs, what is the purpose of the Apache Arrow framework? a) It provides the machine learning algorithms b) It optimizes data transfer between JVM and Python processes c) It manages cluster resources for distributed computing d) It handles data persistence and caching
Which type of Pandas UDF would you use to apply a pre-trained machine learning model to make predictions on batches of data? a) Scalar Pandas UDF b) Grouped Map Pandas UDF c) Iterator of Pandas UDFs (mapInPandas) d) Any of the above could be used
In Spark ML, what is the key difference between CrossValidator and TrainValidationSplit? a) CrossValidator uses multiple train-test splits, while TrainValidationSplit uses a single split b) CrossValidator works only with classification, while TrainValidationSplit works with regression c) CrossValidator requires more parameters, while TrainValidationSplit is simpler d) CrossValidator is distributed, while TrainValidationSplit runs on a single node
When building a regression model in Spark ML, which of the following parameters is NOT typically used to control overfitting? a) maxDepth in a decision tree b) regParam in linear/logistic regression c) numTrees in random forest d) predictionCol in the evaluator
What happens when using an early stopping criterion like maxIter in a Gradient Boosted Tree Classifier? a) Training stops when the maximum number of iterations is reached b) Training stops when the validation error starts increasing c) Training stops when tree depth reaches the maximum d) Training stops when feature importance stabilizes
When using a StringIndexer followed by a OneHotEncoder in a pipeline, what is the correct order of operations? a) StringIndexer first, then OneHotEncoder b) OneHotEncoder first, then StringIndexer c) They can be used in either order d) They should not be used together in a pipeline
Machine learning at scale presents unique challenges that traditional single-node approaches cannot efficiently address. As data volumes grow, distributed processing becomes essential for several reasons:
Performance Limitations: When datasets exceed memory capacity or computational capabilities of a single machine, distributed processing allows us to divide work across multiple nodes. This is particularly important for iterative algorithms common in machine learning.
Training Time Constraints: Training complex models on large datasets can take days or weeks on a single machine. Distributed processing can significantly reduce training time, enabling faster experimentation and deployment.
Real-time Requirements: Many ML applications need to process streaming data and make predictions in real-time, requiring horizontal scaling across multiple machines to handle high throughput.
Data Partitioning: Distributing data across nodes requires careful consideration to ensure balanced workloads and minimize data transfer. Poor partitioning can lead to skewed processing times and inefficient resource utilization.
# Example of controlling partitioning in Spark
from pyspark.sql import functions as F
# Repartition by a key column for better distribution
df_balanced = df.repartition(200, "customer_id")
# Repartition by range for better locality with sorted data
df_range = df.repartitionByRange(200, "timestamp")
Communication Overhead: ML algorithms often require information exchange between nodes, creating network overhead that can become a bottleneck; a broadcast-based mitigation is sketched after these challenges.
Algorithm Parallelization: Not all algorithms parallelize easily. Some require substantial redesign to work efficiently in a distributed environment.
Synchronization: Coordinating updates across multiple nodes introduces complexity, especially for iterative algorithms that require global state.
Resource Management: Efficiently allocating CPU, memory, and network resources across a cluster requires careful tuning and monitoring.
Data Parallelism: The same model is replicated across multiple nodes, each processing a different subset of data. Results are then combined.
# Data parallelism example in Spark ML
from pyspark.ml.classification import LogisticRegression
# Spark ML automatically distributes data processing
lr = LogisticRegression(maxIter=10, regParam=0.1)
model = lr.fit(distributed_data) # Data is distributed across the cluster
Model Parallelism: Different parts of the model are distributed across nodes, with each node responsible for a portion of the calculations.
Parameter Server Architecture: A central server maintains global parameters while worker nodes compute gradients based on data subsets. The server aggregates gradients and updates parameters.
Ensemble Methods: Multiple models are trained independently and their predictions are combined, providing a naturally parallel approach.
Spark ML uses data parallelism as its primary distribution strategy. This approach works well for many algorithms but has important implementation differences based on the algorithm type.
Distributed Optimization: For iterative optimization algorithms like linear and logistic regression, Spark employs techniques to distribute work while maintaining model coherence:
# Linear regression in Spark ML - distributed by default
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(maxIter=20, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(training_data)
For linear models (LinearRegression, LogisticRegression), Spark implements distributed training through:
Stochastic Gradient Descent (SGD): Spark can use mini-batch SGD where gradients are computed on data subsets and aggregated.
L-BFGS Optimization: For faster convergence, Spark implements the Limited-memory Broyden–Fletcher–Goldfarb–Shanno (L-BFGS) algorithm in a distributed manner.
These implementations automatically handle data partitioning, distributed gradient computation, and aggregation of partial results across the cluster.
Tree-based models like Decision Trees, Random Forests, and Gradient-Boosted Trees use different distribution strategies:
Decision Trees: Spark distributes split-finding across the cluster: candidate split statistics are computed locally on each partition, aggregated, and the best split is chosen centrally. Continuous features are discretized into bins (controlled by maxBins) to keep these statistics compact.
# RandomForest with explicit configuration for distributed training
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(
numTrees=100,
maxDepth=10,
maxBins=32, # Controls feature binning - important for distributed performance
minInstancesPerNode=10 # Prevents overfitting on distributed data
)
rf_model = rf.fit(training_data)
Ensemble Models: For ensemble methods like Random Forest and Gradient-Boosted Trees, Spark adds another layer of parallelism: independent trees in a Random Forest can be trained concurrently, while Gradient-Boosted Trees parallelize the work within each sequential boosting iteration.
For clustering algorithms like K-means, Spark uses an iterative approach: each iteration assigns points to the nearest centroid in parallel across partitions, then aggregates partial sums to recompute the centroids:
# K-means with explicit iterations and tolerance settings
from pyspark.ml.clustering import KMeans
kmeans = KMeans(k=5, maxIter=20, tol=1e-4)
model = kmeans.fit(features_df)
Ensemble methods combine multiple models to achieve better predictive performance. They are particularly well-suited for distributed environments because individual models can often be trained independently.
Bagging (Bootstrap Aggregating): Multiple models are trained on different random subsets of the training data, with replacement. Predictions are combined through averaging (for regression) or voting (for classification).
# RandomForest in Spark ML (an example of bagging)
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(
numTrees=100, # Number of trees in the forest
featureSubsetStrategy="auto", # Number of features to consider for each split
subsamplingRate=0.8 # Fraction of data used for training each tree
)
model = rf.fit(training_data)
Boosting: Models are trained sequentially, with each model trying to correct errors made by the previous ones. Examples include Gradient Boosting and AdaBoost.
# GBTClassifier in Spark ML (an example of boosting)
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(
maxIter=100, # Number of boosting iterations
maxDepth=5, # Maximum depth of each tree
stepSize=0.1 # Learning rate
)
model = gbt.fit(training_data)
Stacking: Multiple models are combined using another model (meta-learner) that is trained on their outputs.
Parallel Training: For methods like Random Forest, multiple trees can be trained in parallel on different nodes:
# Configure Spark to utilize parallelism for RandomForest
spark.conf.set("spark.sql.shuffle.partitions", 200) # Control parallelism level
rf = RandomForestClassifier(
numTrees=500, # More trees can leverage more parallelism
maxDepth=10
)
Sequential Training with Parallelized Steps: For methods like Gradient Boosting that are inherently sequential, Spark parallelizes the work within each boosting iteration, such as computing residuals and split statistics across partitions.
Hyperparameter Tuning of Ensembles: When tuning ensemble models, we can parallelize across hyperparameter combinations:
# Distributed hyperparameter tuning of RandomForest
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol="label")  # assumes a binary label column
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100, 200]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .build()
cv = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    parallelism=9  # Tune up to 9 parameter combinations in parallel
)
Memory Usage: Ensemble methods can be memory-intensive since multiple models are maintained:
# Control memory usage for large ensembles
rf = RandomForestClassifier(
numTrees=100,
maxMemoryInMB=256, # Limit memory per node
cacheNodeIds=False # Reduce memory usage
)
Communication Overhead: Trees in Random Forest can be trained independently with minimal communication. In contrast, boosting methods require more synchronization between iterations.
Prediction Latency: Ensembles require evaluating multiple models, which can increase prediction time:
# For faster predictions with RandomForest
rf = RandomForestClassifier(
numTrees=100,
maxDepth=8 # Shallower trees for faster prediction
)
Efficient Storage Formats: Choose appropriate file formats for large-scale ML:
# Save data in Parquet format for efficient access
df.write.format("parquet").save("/path/to/data")
# Read efficiently with partitioning
df = spark.read.parquet("/path/to/data")
Partitioning Strategies: Properly partition data to enable efficient parallel processing and minimize data shuffling:
# Partition data by a frequently filtered column
df.write.partitionBy("date").format("parquet").save("/path/to/partitioned_data")
# Read specific partitions
df_subset = spark.read.parquet("/path/to/partitioned_data/date=2023-01-*")
Caching: Strategically cache datasets that are used repeatedly:
# Cache processed features for multiple iterations
processed_features = pipeline.fit(raw_data).transform(raw_data)
processed_features.cache()
# Use the cached data for multiple models
model1 = RandomForestClassifier().fit(processed_features)
model2 = GBTClassifier().fit(processed_features)
# Release memory when done
processed_features.unpersist()
Feature Selection and Dimensionality Reduction: Reduce computational load by working with fewer features:
from pyspark.ml.feature import PCA
# Reduce dimensions with PCA
pca = PCA(k=20, inputCol="features", outputCol="pca_features")
reduced_data = pca.fit(data).transform(data)
MiniBatch Processing: For algorithms that support it, use mini-batch processing to reduce the cost of each optimization step. Note that the DataFrame-based spark.ml LogisticRegression trains with full-batch L-BFGS and has no mini-batch setting; the older RDD-based MLlib API exposes one for some models, for example:
from pyspark.mllib.classification import SVMWithSGD
# Mini-batch SGD: each iteration computes gradients on a 10% sample of the data
model = SVMWithSGD.train(
    labeled_points_rdd,  # an RDD of LabeledPoint
    iterations=100,
    miniBatchFraction=0.1
)
Checkpoint Intermediate Results: For iterative algorithms, checkpointing prevents recomputation and reduces memory pressure by truncating a DataFrame's lineage:
# Enable checkpointing (the directory should be on fault-tolerant storage)
spark.sparkContext.setCheckpointDir("/path/to/checkpoint/dir")
# checkpoint() returns a new DataFrame with truncated lineage
intermediate_result = heavy_computation(input_data).checkpoint()
result = further_processing(intermediate_result)
Right-sizing Clusters: Scale your cluster based on the workload requirements:
# For Databricks clusters, you can configure autoscaling
# through the UI or programmatically with the Clusters API
Worker Configuration: Optimize worker configuration for ML workloads. Executor memory and cores are startup settings: configure them in the cluster's Spark config when the cluster is created, since spark.conf.set cannot modify them at runtime:
# In the cluster's Spark config (applied at cluster startup):
# spark.executor.memory 16g
# spark.executor.cores 4
# Shuffle parallelism can be changed at runtime
spark.conf.set("spark.sql.shuffle.partitions", "400")
Memory Management: Adjust memory allocation to prevent out-of-memory errors. These are also startup settings for the cluster's Spark config:
# Fraction of heap used for execution and storage (unified memory)
# spark.memory.fraction 0.8
# A lower storage fraction leaves more unified memory for execution (e.g., aggregations)
# spark.memory.storageFraction 0.3
Track Performance Metrics: Monitor cluster and job performance to identify bottlenecks:
# Use Spark UI to monitor job stages and task metrics
# In Databricks, this is available through the Spark UI tab
Understand Data Skew: Identify and address data skew that can cause certain partitions to process disproportionately large amounts of data:
# Inspect partition sizes
from pyspark.sql import functions as F
df.groupBy(F.spark_partition_id()).count().orderBy("count", ascending=False).show()
# Repartition to address skew
df_balanced = df.repartition(200, "key_column")
Which of the following is NOT a common challenge in distributed machine learning? a) Communication overhead between nodes b) Data partitioning strategies c) Algorithm convergence guarantees d) User interface design
In Spark ML, how are tree-based models like Random Forest trained in a distributed manner? a) Each tree is trained on a single node b) Each node trains a complete Random Forest model c) Different trees are distributed across nodes, and results are combined d) Each node builds a portion of each tree in parallel
What is the primary distribution strategy used by Spark ML? a) Model parallelism b) Data parallelism c) Hybrid parallelism d) Parameter server architecture
When training a Gradient Boosted Tree model in Spark, which operation is performed in parallel? a) Sequential building of trees (since each tree depends on the previous one) b) Finding the best split for nodes at the same level c) Building all trees simultaneously d) Computing predictions across all partitions
Which storage format is most efficient for large-scale machine learning in Spark? a) CSV b) JSON c) Parquet d) Text files
An end-to-end machine learning workflow in a production environment involves multiple stages, from data ingestion to model serving. Let’s explore how to implement such a workflow in Databricks.
The first step is to reliably ingest data and validate its quality:
# Load data from various sources
from pyspark.sql.functions import col, isnan, when, count
# Load data
raw_data = spark.read.format("delta").table("bronze.user_interactions")
# Validate data quality
data_quality_report = {
"row_count": raw_data.count(),
"column_count": len(raw_data.columns),
"missing_values": {}
}
# Check for missing values (isnan applies only to numeric columns)
from pyspark.sql.types import NumericType
for column in raw_data.columns:
    cond = col(column).isNull()
    if isinstance(raw_data.schema[column].dataType, NumericType):
        cond = cond | isnan(col(column))
    missing_count = raw_data.filter(cond).count()
    missing_percentage = (missing_count / data_quality_report["row_count"]) * 100
    data_quality_report["missing_values"][column] = f"{missing_percentage:.2f}%"
# Validate data types against the schema using a simple Spark-to-Python type map
from pyspark.sql.types import (StringType, IntegerType, LongType,
                               DoubleType, FloatType, BooleanType)
python_types = {
    StringType: str, IntegerType: int, LongType: int,
    DoubleType: float, FloatType: float, BooleanType: bool,
}
schema_issues = []
for field in raw_data.schema.fields:
    expected_type = python_types.get(type(field.dataType))
    if expected_type is None:
        continue  # Skip types outside the simple mapping
    sample_rows = raw_data.filter(col(field.name).isNotNull()).limit(100).collect()
    for row in sample_rows:
        value = row[field.name]
        if value is not None and not isinstance(value, expected_type):
            schema_issues.append(
                f"Type mismatch in {field.name}: expected {field.dataType}, got {type(value)}"
            )
            break
Create a robust feature engineering pipeline that can be reused for both training and inference:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, Imputer
# Define feature engineering steps
# 1. Impute missing values
numeric_cols = ["age", "tenure", "balance"]
categorical_cols = ["job", "marital", "education", "default", "housing", "loan", "contact", "month", "poutcome"]
imputer = Imputer(
inputCols=numeric_cols,
outputCols=[f"{c}_imputed" for c in numeric_cols],
strategy="mean"
)
# 2. Index categorical features
indexers = [
StringIndexer(inputCol=c, outputCol=f"{c}_indexed", handleInvalid="keep")
for c in categorical_cols
]
# 3. One-hot encode categorical features
encoders = [
OneHotEncoder(inputCol=f"{c}_indexed", outputCol=f"{c}_encoded")
for c in categorical_cols
]
# 4. Assemble features into a single vector
imputed_cols = [f"{c}_imputed" for c in numeric_cols]
encoded_cols = [f"{c}_encoded" for c in categorical_cols]
assembler = VectorAssembler(
inputCols=imputed_cols + encoded_cols,
outputCol="assembled_features"
)
# 5. Scale features
scaler = StandardScaler(
inputCol="assembled_features",
outputCol="features",
withMean=True,
withStd=True
)
# Create a pipeline for feature engineering
feature_pipeline = Pipeline(stages=[
imputer,
*indexers,
*encoders,
assembler,
scaler
])
Train models while tracking experiments with MLflow:
import mlflow
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# Split data into training and validation sets
train_data, val_data = validated_data.randomSplit([0.8, 0.2], seed=42)
# Define evaluators
binary_evaluator = BinaryClassificationEvaluator(
labelCol="label",
rawPredictionCol="rawPrediction",
metricName="areaUnderROC"
)
multi_evaluator = MulticlassClassificationEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="accuracy"
)
# Set up experiment tracking
mlflow.set_experiment("/Users/your_user/banking_classification")
# Train multiple models with MLflow tracking
with mlflow.start_run(run_name="model_comparison"):
# 1. Fit feature pipeline
feature_pipeline_model = feature_pipeline.fit(train_data)
train_features = feature_pipeline_model.transform(train_data)
val_features = feature_pipeline_model.transform(val_data)
# Log feature importance if available
mlflow.log_param("num_features", len(train_features.select("features").first()[0]))
# 2. Train Logistic Regression
with mlflow.start_run(run_name="logistic_regression", nested=True):
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)
lr_model = lr.fit(train_features)
# Make predictions
lr_predictions = lr_model.transform(val_features)
# Evaluate model
lr_auc = binary_evaluator.evaluate(lr_predictions)
lr_accuracy = multi_evaluator.evaluate(lr_predictions)
# Log parameters and metrics
mlflow.log_params({
"model_type": "LogisticRegression",
"maxIter": lr.getMaxIter(),
"regParam": lr.getRegParam()
})
mlflow.log_metrics({
"auc": lr_auc,
"accuracy": lr_accuracy
})
# Log model
mlflow.spark.log_model(lr_model, "model")
# 3. Train Random Forest
with mlflow.start_run(run_name="random_forest", nested=True):
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=100)
rf_model = rf.fit(train_features)
# Make predictions
rf_predictions = rf_model.transform(val_features)
# Evaluate model
rf_auc = binary_evaluator.evaluate(rf_predictions)
rf_accuracy = multi_evaluator.evaluate(rf_predictions)
# Log parameters and metrics
mlflow.log_params({
"model_type": "RandomForest",
"numTrees": rf.getNumTrees(),
"maxDepth": rf.getMaxDepth()
})
mlflow.log_metrics({
"auc": rf_auc,
"accuracy": rf_accuracy
})
# Log model
mlflow.spark.log_model(rf_model, "model")
Perform hyperparameter tuning to optimize model performance:
# Hyperparameter tuning for the best model (Random Forest)
with mlflow.start_run(run_name="rf_hyperparameter_tuning"):
# Define parameter grid
paramGrid = ParamGridBuilder() \
.addGrid(rf.numTrees, [50, 100, 200]) \
.addGrid(rf.maxDepth, [5, 10, 20]) \
.addGrid(rf.maxBins, [32, 64]) \
.build()
# Create cross-validator
cv = CrossValidator(
estimator=rf,
estimatorParamMaps=paramGrid,
evaluator=binary_evaluator,
numFolds=3,
parallelism=4
)
# Run cross-validation on the preprocessed training data
cv_model = cv.fit(train_features)
# Get the best model
best_rf_model = cv_model.bestModel
# Evaluate on validation set
best_predictions = best_rf_model.transform(val_features)
best_auc = binary_evaluator.evaluate(best_predictions)
best_accuracy = multi_evaluator.evaluate(best_predictions)
# Log best parameters and metrics
mlflow.log_params({
"best_numTrees": best_rf_model.getNumTrees(),
"best_maxDepth": best_rf_model.getMaxDepth(),
"best_maxBins": best_rf_model.getMaxBins()
})
mlflow.log_metrics({
"best_auc": best_auc,
"best_accuracy": best_accuracy
})
# Log best model
mlflow.spark.log_model(
best_rf_model,
"best_model",
registered_model_name="banking_classification"
)
Register the best model and prepare it for deployment:
from mlflow.tracking import MlflowClient
client = MlflowClient()
# Get latest model version
model_name = "banking_classification"
latest_version = max([int(mv.version) for mv in client.search_model_versions(f"name='{model_name}'")])
# Transition model to staging
client.transition_model_version_stage(
name=model_name,
version=latest_version,
stage="Staging"
)
# Add model description
client.update_model_version(
name=model_name,
version=latest_version,
description="Random Forest model with tuned hyperparameters"
)
# Create the complete pipeline including feature engineering
complete_pipeline = Pipeline(stages=[
*feature_pipeline.getStages(),
best_rf_model
])
# Fit the complete pipeline on the full dataset
complete_model = complete_pipeline.fit(validated_data)
# Save the complete pipeline model for batch inference
# Use a dbfs:/ URI with Spark writers; /dbfs/... is the local FUSE mount for non-Spark code
complete_model.write().overwrite().save("dbfs:/models/banking_classification_pipeline")
# Register the complete pipeline model
with mlflow.start_run(run_name="register_complete_pipeline"):
mlflow.spark.log_model(
complete_model,
"complete_pipeline",
registered_model_name="banking_classification_pipeline"
)
Implement batch inference for generating predictions on new data:
# Load the complete pipeline model
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("dbfs:/models/banking_classification_pipeline")
# Load new data for batch inference
new_data = spark.table("bronze.new_customer_interactions")
# Generate predictions
predictions = loaded_model.transform(new_data)
# Select relevant columns
results = predictions.select(
"customer_id",
"interaction_date",
"prediction",
"probability"
)
# Save results to a table
results.write.format("delta").mode("overwrite").saveAsTable("silver.customer_predictions")
Set up monitoring to track model performance over time:
# Create a function to log model performance metrics
def log_model_performance(prediction_table, actual_label_table, run_date):
"""Log model performance metrics to a tracking table."""
# Join predictions with actual labels
performance_data = spark.sql(f"""
SELECT
p.customer_id,
p.prediction,
p.probability,
a.actual_label,
'{run_date}' as evaluation_date
FROM
{prediction_table} p
JOIN
{actual_label_table} a
ON
p.customer_id = a.customer_id
""")
# Calculate metrics
from pyspark.sql.functions import when, col, sum, count
metrics = performance_data.select(
count("*").alias("total_predictions"),
sum(when(col("prediction") == col("actual_label"), 1).otherwise(0)).alias("correct_predictions"),
sum(when((col("prediction") == 1.0) & (col("actual_label") == 1.0), 1).otherwise(0)).alias("true_positives"),
sum(when((col("prediction") == 1.0) & (col("actual_label") == 0.0), 1).otherwise(0)).alias("false_positives"),
sum(when((col("prediction") == 0.0) & (col("actual_label") == 1.0), 1).otherwise(0)).alias("false_negatives"),
sum(when((col("prediction") == 0.0) & (col("actual_label") == 0.0), 1).otherwise(0)).alias("true_negatives")
).first()
# Calculate derived metrics
accuracy = metrics.correct_predictions / metrics.total_predictions
if (metrics.true_positives + metrics.false_positives) > 0:
precision = metrics.true_positives / (metrics.true_positives + metrics.false_positives)
else:
precision = 0
if (metrics.true_positives + metrics.false_negatives) > 0:
recall = metrics.true_positives / (metrics.true_positives + metrics.false_negatives)
else:
recall = 0
if (precision + recall) > 0:
f1_score = 2 * (precision * recall) / (precision + recall)
else:
f1_score = 0
# Log to performance tracking table
performance_metrics = spark.createDataFrame([
(run_date, metrics.total_predictions, accuracy, precision, recall, f1_score)
], ["evaluation_date", "total_predictions", "accuracy", "precision", "recall", "f1_score"])
performance_metrics.write.format("delta").mode("append").saveAsTable("gold.model_performance_tracking")
# Report current performance
print(f"Performance metrics for {run_date}:")
print(f" Accuracy: {accuracy:.4f}")
print(f" Precision: {precision:.4f}")
print(f" Recall: {recall:.4f}")
print(f" F1 Score: {f1_score:.4f}")
return {
"accuracy": accuracy,
"precision": precision,
"recall": recall,
"f1_score": f1_score
}
Machine learning pipelines in production environments often encounter various issues. Understanding how to identify and resolve these problems is crucial for maintaining reliable systems.
Symptom: Out of Memory Errors
Diagnosis:
# Check the size of the DataFrame
print(f"Number of partitions: {df.rdd.getNumPartitions()}")
print(f"Number of rows: {df.count()}")
print(f"Number of columns: {len(df.columns)}")
# Estimate DataFrame size
from pyspark.sql.functions import col
import sys
def estimate_size(df, n_sample=1000):
sample_df = df.limit(n_sample)
sample_size = sys.getsizeof(sample_df.toPandas())
full_size_estimate = sample_size * (df.count() / n_sample)
return full_size_estimate / (1024 * 1024) # MB
print(f"Estimated DataFrame size: {estimate_size(df):.2f} MB")
Solutions:
# 1. Increase memory allocation (startup settings: set them in the cluster's
#    Spark config; spark.conf.set cannot change them at runtime)
# spark.driver.memory 16g
# spark.executor.memory 16g
# 2. Optimize storage with proper data types
from pyspark.sql.types import IntegerType, DoubleType, StringType
df = df.withColumn("int_col", col("int_col").cast(IntegerType()))
df = df.withColumn("double_col", col("double_col").cast(DoubleType()))
# 3. Repartition to better distribute data
df = df.repartition(100)
# 4. Process data in batches (DataFrame.offset requires Spark 3.4+; apply
#    offset before limit, and sort first if deterministic batches matter)
def process_in_batches(df, batch_size=100000, process_fn=None):
    total_rows = df.count()
    num_batches = (total_rows + batch_size - 1) // batch_size
    results = []
    for i in range(num_batches):
        batch = df.offset(i * batch_size).limit(batch_size)
        result = process_fn(batch) if process_fn else batch
        results.append(result)
    return results
Symptom: Slow-running Tasks or Uneven Task Distribution
Diagnosis:
# Check partition sizes
from pyspark.sql.functions import spark_partition_id
partition_counts = df.groupBy(spark_partition_id()).count().orderBy("count", ascending=False)
display(partition_counts)
# Check key distribution for a specific column
key_distribution = df.groupBy("join_key").count().orderBy("count", ascending=False)
display(key_distribution.limit(20))
Solutions:
# 1. Salting keys for better distribution
from pyspark.sql.functions import rand, concat, lit
# Add a salt to skewed keys
num_salts = 10
df_salted = df.withColumn("salted_key",
concat(col("join_key"),
lit("_"),
(rand() * num_salts).cast("int").cast("string")))
# 2. Broadcast small tables in joins
from pyspark.sql.functions import broadcast
result = df.join(broadcast(small_df), "join_key")
# 3. Repartition by range for better distribution
df_balanced = df.repartitionByRange(200, "join_key")
Symptom: Pipeline Fit or Transform Fails
Diagnosis:
# Check for missing values in critical columns
missing_values = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
display(missing_values)
# Check for data type issues
for field in df.schema.fields:
print(f"Column: {field.name}, Type: {field.dataType}")
# Verify data constraints
from pyspark.sql.functions import min, max
numeric_stats = df.select([
min(col("num_col")).alias("min_value"),
max(col("num_col")).alias("max_value")
])
display(numeric_stats)
Solutions:
# 1. Add explicit error handling in pipeline stages
from pyspark.ml.feature import Imputer
# Handle missing values before problematic stages
imputer = Imputer(
inputCols=["feature1", "feature2"],
outputCols=["feature1_imputed", "feature2_imputed"]
)
# 2. Set handleInvalid parameter for relevant stages
indexer = StringIndexer(
inputCol="category",
outputCol="category_index",
handleInvalid="keep" # Options: "error", "skip", "keep"
)
# 3. Implement custom validation steps
def validate_pipeline_input(df, required_cols, numeric_cols, categorical_cols):
"""Validate dataframe before pipeline processing."""
validation_errors = []
# Check for required columns
missing_cols = [col for col in required_cols if col not in df.columns]
if missing_cols:
validation_errors.append(f"Missing required columns: {', '.join(missing_cols)}")
# Check for null values in required columns
for col_name in required_cols:
if col_name in df.columns:
null_count = df.filter(col(col_name).isNull()).count()
if null_count > 0:
validation_errors.append(f"Column {col_name} contains {null_count} null values")
# Check data types
for col_name in numeric_cols:
if col_name in df.columns:
try:
df.select(col(col_name).cast("double")).limit(1).collect()
except Exception:
validation_errors.append(f"Column {col_name} cannot be cast to numeric")
return validation_errors
Symptom: Slow Training or Inference
Diagnosis:
# Time different stages of the pipeline
import time
def time_operation(operation_name, operation_fn):
start_time = time.time()
result = operation_fn()
end_time = time.time()
duration = end_time - start_time
print(f"{operation_name} took {duration:.2f} seconds")
return result
# Time feature engineering
feature_data = time_operation("Feature Engineering", lambda: feature_pipeline.fit(train_data).transform(train_data))
# Time model training
model = time_operation("Model Training", lambda: rf.fit(feature_data))
# Time inference
predictions = time_operation("Inference", lambda: model.transform(feature_data))
Solutions:
# 1. Cache intermediate results
processed_features = feature_pipeline.fit(train_data).transform(train_data)
processed_features.cache()
# Train multiple models on cached features
model1 = time_operation("Model 1 Training", lambda: lr.fit(processed_features))
model2 = time_operation("Model 2 Training", lambda: rf.fit(processed_features))
processed_features.unpersist()
# 2. Optimize feature transformations
# Use more efficient transformations
from pyspark.ml.feature import Bucketizer
# Replace fine-grained binning with bucketizer
bucketizer = Bucketizer(
splits=[-float("inf"), 0, 10, 20, 30, float("inf")],
inputCol="raw_feature",
outputCol="bucketized_feature"
)
# 3. Use appropriate algorithms for the dataset size
# For large datasets, use distributed algorithms
# For smaller datasets, consider single-node algorithms with broadcast variables
Model Registry Best Practices:
import mlflow
from mlflow.tracking import MlflowClient
client = MlflowClient()
# 1. Use semantic versioning for models
model_name = "customer_churn_predictor"
model_version = 1
model_stage = "Development" # "Development", "Staging", "Production", "Archived"
# 2. Document models thoroughly
client.update_registered_model(
name=model_name,
description="Customer churn prediction model using gradient boosting"
)
client.update_model_version(
name=model_name,
version=model_version,
description="Trained on data from Jan-Mar 2023, includes new features for customer support interactions"
)
# 3. Tag models with relevant metadata
client.set_registered_model_tag(model_name, "team", "customer_analytics")
client.set_registered_model_tag(model_name, "use_case", "churn_prediction")
client.set_model_version_tag(model_name, model_version, "training_dataset", "customer_data_q1_2023")
client.set_model_version_tag(model_name, model_version, "feature_count", "42")
Model Testing and Validation:
# Define model validation criteria
def validate_model(model, validation_data, baseline_metrics):
"""Validate model against defined criteria before promotion."""
# Generate predictions
predictions = model.transform(validation_data)
# Calculate metrics
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
accuracy = multi_evaluator.setMetricName("accuracy").evaluate(predictions)
f1 = multi_evaluator.setMetricName("f1").evaluate(predictions)
# Check against thresholds
validation_results = {
"passed": True,
"metrics": {
"auc": auc,
"accuracy": accuracy,
"f1": f1
},
"warnings": []
}
# Validation rules
if auc < baseline_metrics["min_auc"]:
validation_results["passed"] = False
validation_results["warnings"].append(f"AUC {auc:.4f} below threshold {baseline_metrics['min_auc']:.4f}")
if accuracy < baseline_metrics["min_accuracy"]:
validation_results["passed"] = False
validation_results["warnings"].append(f"Accuracy {accuracy:.4f} below threshold {baseline_metrics['min_accuracy']:.4f}")
if f1 < baseline_metrics["min_f1"]:
validation_results["passed"] = False
validation_results["warnings"].append(f"F1 {f1:.4f} below threshold {baseline_metrics['min_f1']:.4f}")
return validation_results
Model Promotion Workflow:
def promote_model(model_name, version, target_stage, validation_data, baseline_metrics):
"""Promote a model through stages with validation."""
client = MlflowClient()
# 1. Load the model version as a Spark model (validate_model calls transform(),
#    which the generic pyfunc wrapper does not provide)
model_uri = f"models:/{model_name}/{version}"
model = mlflow.spark.load_model(model_uri)
# 2. Validate the model
validation_results = validate_model(model, validation_data, baseline_metrics)
# 3. Promote if validation passes
if validation_results["passed"]:
client.transition_model_version_stage(
name=model_name,
version=version,
stage=target_stage
)
# Log promotion
print(f"Model {model_name} version {version} promoted to {target_stage}")
print(f"Metrics: {validation_results['metrics']}")
return True, validation_results
else:
# Log validation failure
print(f"Model {model_name} version {version} failed validation for {target_stage}")
print(f"Metrics: {validation_results['metrics']}")
print(f"Warnings: {validation_results['warnings']}")
return False, validation_results
Automated Retraining Pipeline:
def automated_retraining(
current_model_name,
current_model_version,
training_data_table,
feature_columns,
label_column,
evaluation_criteria
):
"""Automated retraining pipeline that evaluates if a new model should replace current model."""
# 1. Fetch current model metrics
client = MlflowClient()
current_model_info = client.get_model_version(current_model_name, current_model_version)
current_run_id = current_model_info.run_id
current_metrics = client.get_run(current_run_id).data.metrics
# 2. Load and prepare new training data
new_data = spark.table(training_data_table)
train_data, val_data = new_data.randomSplit([0.8, 0.2], seed=42)
# 3. Train new model
with mlflow.start_run(run_name="automated_retraining") as run:
# Feature engineering
feature_pipeline_model = feature_pipeline.fit(train_data)
train_features = feature_pipeline_model.transform(train_data)
val_features = feature_pipeline_model.transform(val_data)
# Train model
model = RandomForestClassifier(
featuresCol="features",
labelCol=label_column,
numTrees=100
)
new_model = model.fit(train_features)
# Evaluate new model
predictions = new_model.transform(val_features)
evaluator = BinaryClassificationEvaluator(labelCol=label_column, metricName="areaUnderROC")
new_auc = evaluator.evaluate(predictions)
# Log metrics
mlflow.log_metric("auc", new_auc)
# Compare with current model
improvement_threshold = evaluation_criteria.get("improvement_threshold", 0.01)
current_auc = current_metrics.get("auc", 0)
if new_auc > (current_auc + improvement_threshold):
# Register new model
mlflow.spark.log_model(
new_model,
"model",
registered_model_name=current_model_name
)
# Get the new version
latest_version = max([int(mv.version) for mv in
client.search_model_versions(f"name='{current_model_name}'")])
# Promote to staging
client.transition_model_version_stage(
name=current_model_name,
version=latest_version,
stage="Staging"
)
return {
"retrained": True,
"new_version": latest_version,
"improvement": new_auc - current_auc,
"metrics": {"auc": new_auc}
}
else:
return {
"retrained": False,
"current_version": current_model_version,
"improvement": new_auc - current_auc,
"metrics": {"auc": new_auc}
}
Drift Detection and Monitoring:
from scipy.stats import ks_2samp
import numpy as np
def detect_data_drift(reference_data, current_data, columns_to_monitor, threshold=0.1):
"""Detect data drift between reference and current datasets."""
drift_results = {}
# Convert to pandas for statistical testing
reference_pdf = reference_data.select(columns_to_monitor).toPandas()
current_pdf = current_data.select(columns_to_monitor).toPandas()
for column in columns_to_monitor:
# Skip if column has categorical data
if reference_pdf[column].dtype == 'object' or current_pdf[column].dtype == 'object':
continue
# Calculate Kolmogorov-Smirnov statistic
ks_result = ks_2samp(
reference_pdf[column].dropna(),
current_pdf[column].dropna()
)
# Store results
drift_detected = ks_result.pvalue < threshold
drift_results[column] = {
"statistic": float(ks_result.statistic),
"p_value": float(ks_result.pvalue),
"drift_detected": drift_detected
}
# Overall drift status
any_drift = any(result["drift_detected"] for result in drift_results.values())
return {
"overall_drift_detected": any_drift,
"column_drift": drift_results
}
Performance Monitoring Dashboard:
def create_performance_monitoring_dashboard():
"""Create a SQL query for model performance monitoring dashboard."""
query = """
WITH recent_metrics AS (
SELECT
evaluation_date,
accuracy,
precision,
recall,
f1_score
FROM
gold.model_performance_tracking
WHERE
evaluation_date >= date_sub(current_date(), 90)
ORDER BY
evaluation_date
),
alerts AS (
SELECT
evaluation_date,
f1_score < 0.7 as performance_alert,
abs(f1_score - lag(f1_score) OVER (ORDER BY evaluation_date)) > 0.05 as drift_alert
FROM
recent_metrics
)
SELECT
m.evaluation_date,
m.accuracy,
m.precision,
m.recall,
m.f1_score,
a.performance_alert,
a.drift_alert
FROM
recent_metrics m
JOIN
alerts a
ON
m.evaluation_date = a.evaluation_date
ORDER BY
m.evaluation_date
"""
return query
Objective: Build an end-to-end scalable machine learning solution for a large dataset that includes data processing, model training with distributed algorithms, hyperparameter tuning, and model deployment.
Dataset: We’ll use a large customer transaction dataset to predict customer churn.
Steps:
# Configure Spark for ML workloads
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# Set up MLflow experiment
import mlflow
mlflow.set_experiment("/Users/your_user/scalable_churn_prediction")
# Load data
transactions = spark.read.table("bronze.customer_transactions")
profiles = spark.read.table("bronze.customer_profiles")
# Display data info
print(f"Transactions: {transactions.count()} rows, {len(transactions.columns)} columns")
print(f"Profiles: {profiles.count()} rows, {len(profiles.columns)} columns")
# Sample data
display(transactions.limit(5))
display(profiles.limit(5))
# Check for missing values
from pyspark.sql.functions import col, count, when, isnan
from pyspark.sql.types import NumericType
def check_missing(df, name):
    # isnan() is only valid on numeric columns, so apply it selectively
    print(f"Missing values in {name}:")
    numeric = {f.name for f in df.schema.fields if isinstance(f.dataType, NumericType)}
    missing = df.select([count(when(col(c).isNull() | isnan(col(c)), c)).alias(c) if c in numeric
                         else count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
    return missing
display(check_missing(transactions, "transactions"))
display(check_missing(profiles, "profiles"))
from pyspark.sql.functions import sum, avg, count, max, datediff, current_date
# Aggregate transaction features per customer (sum/count/max shadow Python builtins here)
agg_features = transactions.groupBy("customer_id").agg(
    count("transaction_id").alias("transaction_count"),
    sum("amount").alias("total_spend"),
    avg("amount").alias("avg_transaction_value"),
    datediff(current_date(), max("transaction_date")).alias("days_since_last_transaction")
)
# Join aggregated features
customer_features = profiles.join(agg_features, "customer_id", "left")
# Handle missing values
from pyspark.sql.functions import lit, coalesce
# Fill missing aggregated values with appropriate defaults
customer_features = customer_features.withColumn(
"transaction_count",
coalesce(col("transaction_count"), lit(0))
).withColumn(
"total_spend",
coalesce(col("total_spend"), lit(0))
).withColumn(
"avg_transaction_value",
coalesce(col("avg_transaction_value"), lit(0))
).withColumn(
"days_since_last_transaction",
coalesce(col("days_since_last_transaction"), lit(365))
)
# Feature derivation
customer_features = customer_features.withColumn(
"is_active",
when(col("days_since_last_transaction") <= 90, 1).otherwise(0)
)
# Cache the feature dataframe to speed up subsequent operations
customer_features.cache()
# Create label column for churn prediction (churn = no transaction in the last 90 days)
customer_features = customer_features.withColumn(
"churn",
when(col("days_since_last_transaction") > 90, 1.0).otherwise(0.0)
)
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
# Define categorical and numeric columns
categorical_cols = ["gender", "state", "subscription_type"]
numeric_cols = ["age", "tenure_months", "transaction_count", "total_spend",
"avg_transaction_value", "days_since_last_transaction"]
# Set up pipeline stages
# 1. Index categorical columns
indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")
for c in categorical_cols]
# 2. One-hot encode indexed columns
encoders = [OneHotEncoder(inputCol=f"{c}_idx", outputCol=f"{c}_enc")
for c in categorical_cols]
# 3. Assemble features
encoded_cols = [f"{c}_enc" for c in categorical_cols]
assembler = VectorAssembler(inputCols=encoded_cols + numeric_cols, outputCol="features_raw")
# 4. Scale features
scaler = StandardScaler(inputCol="features_raw", outputCol="features")
# Create a preprocessing pipeline
preprocessing_stages = indexers + encoders + [assembler, scaler]
preprocessing = Pipeline(stages=preprocessing_stages)
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier
# Split data
train, val, test = customer_features.randomSplit([0.7, 0.15, 0.15], seed=42)
# Fit preprocessing pipeline
preprocessing_model = preprocessing.fit(train)
# Apply preprocessing
train_processed = preprocessing_model.transform(train)
val_processed = preprocessing_model.transform(val)
# Train Random Forest model
rf = RandomForestClassifier(
featuresCol="features",
labelCol="churn",
numTrees=100,
maxDepth=10,
maxBins=64,
seed=42
)
with mlflow.start_run(run_name="rf_initial"):
# Log parameters
mlflow.log_params({
"numTrees": rf.getNumTrees(),
"maxDepth": rf.getMaxDepth(),
"maxBins": rf.getMaxBins()
})
# Train model
rf_model = rf.fit(train_processed)
# Evaluate
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
# Make predictions
val_predictions = rf_model.transform(val_processed)
# Evaluate with different metrics
binary_evaluator = BinaryClassificationEvaluator(labelCol="churn", metricName="areaUnderROC")
multi_evaluator = MulticlassClassificationEvaluator(labelCol="churn", predictionCol="prediction")
auc = binary_evaluator.evaluate(val_predictions)
accuracy = multi_evaluator.setMetricName("accuracy").evaluate(val_predictions)
f1 = multi_evaluator.setMetricName("f1").evaluate(val_predictions)
# Log metrics
mlflow.log_metrics({
"auc": auc,
"accuracy": accuracy,
"f1": f1
})
# Log model
mlflow.spark.log_model(rf_model, "model")
import mlflow
from hyperopt import fmin, tpe, hp, Trials, STATUS_OK
# Define the objective function
def objective(params):
# Extract parameters
max_depth = int(params["max_depth"])
num_trees = int(params["num_trees"])
# Create model with these parameters
rf = RandomForestClassifier(
featuresCol="features",
labelCol="churn",
numTrees=num_trees,
maxDepth=max_depth,
seed=42
)
# Train model
model = rf.fit(train_processed)
# Make predictions
predictions = model.transform(val_processed)
# Evaluate
auc = binary_evaluator.evaluate(predictions)
# Define the loss (negative AUC to minimize)
loss = -auc
# Log with MLflow
with mlflow.start_run(nested=True):
mlflow.log_params({
"max_depth": max_depth,
"num_trees": num_trees
})
mlflow.log_metrics({
"auc": auc,
"loss": loss
})
return {"loss": loss, "status": STATUS_OK}
# Define search space
search_space = {
"max_depth": hp.quniform("max_depth", 5, 15, 1),
"num_trees": hp.quniform("num_trees", 50, 200, 50)
}
# Run hyperparameter tuning
# Note: SparkTrials distributes trials across workers and is meant for single-node
# libraries (e.g., scikit-learn). Spark ML training is itself distributed, so the
# objective above cannot run inside SparkTrials; use the default Trials instead.
with mlflow.start_run(run_name="rf_hyperopt"):
    trials = Trials()
    # Run the optimization
    best_params = fmin(
        fn=objective,
        space=search_space,
        algo=tpe.suggest,
        max_evals=20,
        trials=trials
    )
# Convert to proper types
best_params["max_depth"] = int(best_params["max_depth"])
best_params["num_trees"] = int(best_params["num_trees"])
# Log best parameters
mlflow.log_params({
"best_max_depth": best_params["max_depth"],
"best_num_trees": best_params["num_trees"]
})
print("Best parameters:", best_params)
# Train final model with best parameters
final_rf = RandomForestClassifier(
featuresCol="features",
labelCol="churn",
numTrees=best_params["num_trees"],
maxDepth=best_params["max_depth"],
seed=42
)
# Combine train and validation for final training
train_val = train.union(val)
train_val_processed = preprocessing_model.transform(train_val)
with mlflow.start_run(run_name="rf_final"):
# Log parameters
mlflow.log_params({
"numTrees": best_params["num_trees"],
"maxDepth": best_params["max_depth"]
})
# Train final model
final_model = final_rf.fit(train_val_processed)
# Evaluate on test set
test_processed = preprocessing_model.transform(test)
test_predictions = final_model.transform(test_processed)
# Calculate metrics
test_auc = binary_evaluator.evaluate(test_predictions)
test_accuracy = multi_evaluator.setMetricName("accuracy").evaluate(test_predictions)
test_f1 = multi_evaluator.setMetricName("f1").evaluate(test_predictions)
# Log metrics
mlflow.log_metrics({
"test_auc": test_auc,
"test_accuracy": test_accuracy,
"test_f1": test_f1
})
# Log model
mlflow.spark.log_model(
final_model,
"model",
registered_model_name="customer_churn_predictor"
)
print(f"Test AUC: {test_auc:.4f}")
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test F1 Score: {test_f1:.4f}")
# Create a full pipeline that includes preprocessing and model
full_pipeline_stages = preprocessing_stages + [final_model]
full_pipeline = Pipeline(stages=full_pipeline_stages)
# Train the full pipeline on the entire dataset
full_model = full_pipeline.fit(customer_features)
# Save the model for batch inference (dbfs:/ URI for the Spark writer)
full_model.write().overwrite().save("dbfs:/models/customer_churn_full_pipeline")
# Register the model with MLflow
with mlflow.start_run(run_name="full_pipeline_model"):
mlflow.spark.log_model(
full_model,
"full_pipeline",
registered_model_name="customer_churn_full_pipeline"
)
from pyspark.ml import PipelineModel
# Load the full pipeline model
loaded_model = PipelineModel.load("dbfs:/models/customer_churn_full_pipeline")
# Function for batch predictions
def predict_churn_batch(input_table, output_table):
"""Generate churn predictions for a batch of customers."""
# Load input data
input_data = spark.table(input_table)
# Generate predictions
predictions = loaded_model.transform(input_data)
# Select relevant columns
results = predictions.select(
"customer_id",
"prediction",
"probability",
current_date().alias("prediction_date")
)
# Write results
results.write.format("delta").mode("overwrite").saveAsTable(output_table)
print(f"Generated predictions for {results.count()} customers")
return results
# Create a function to monitor model performance
def monitor_model_performance(predictions_table, actuals_table):
"""Compare predictions with actual outcomes and log metrics."""
# Join predictions with actual outcomes
comparison = spark.sql(f"""
SELECT
p.customer_id,
p.prediction,
a.actual_churn,
p.prediction_date
FROM
{predictions_table} p
JOIN
{actuals_table} a
ON
p.customer_id = a.customer_id
""")
# Calculate metrics
from pyspark.sql.functions import when
metrics = comparison.select(
count("*").alias("total"),
sum(when(col("prediction") == col("actual_churn"), 1).otherwise(0)).alias("correct"),
sum(when((col("prediction") == 1.0) & (col("actual_churn") == 1.0), 1).otherwise(0)).alias("true_positives"),
sum(when((col("prediction") == 1.0) & (col("actual_churn") == 0.0), 1).otherwise(0)).alias("false_positives"),
sum(when((col("prediction") == 0.0) & (col("actual_churn") == 1.0), 1).otherwise(0)).alias("false_negatives"),
sum(when((col("prediction") == 0.0) & (col("actual_churn") == 0.0), 1).otherwise(0)).alias("true_negatives"),
).first()
# Calculate derived metrics
accuracy = metrics.correct / metrics.total
if (metrics.true_positives + metrics.false_positives) > 0:
precision = metrics.true_positives / (metrics.true_positives + metrics.false_positives)
else:
precision = 0
if (metrics.true_positives + metrics.false_negatives) > 0:
recall = metrics.true_positives / (metrics.true_positives + metrics.false_negatives)
else:
recall = 0
if (precision + recall) > 0:
f1 = 2 * (precision * recall) / (precision + recall)
else:
f1 = 0
# Log metrics (use a Python date for literal values; current_date() returns a Column)
from datetime import date
performance = spark.createDataFrame([
    (date.today(), metrics.total, accuracy, precision, recall, f1)
], ["date", "sample_size", "accuracy", "precision", "recall", "f1"])
performance.write.format("delta").mode("append").saveAsTable("gold.churn_model_performance")
print(f"Performance metrics:")
print(f" Accuracy: {accuracy:.4f}")
print(f" Precision: {precision:.4f}")
print(f" Recall: {recall:.4f}")
print(f" F1 Score: {f1:.4f}")
return {
"accuracy": accuracy,
"precision": precision,
"recall": recall,
"f1": f1
}
When scaling a machine learning pipeline to handle large datasets, which of the following is most critical to configure? a) The number of features used in the model b) The number of partitions in the DataFrame c) The depth of the decision trees d) The batch size for gradient descent
What is data skew in the context of distributed processing, and how can it affect machine learning workloads? a) It refers to imbalanced classes in the target variable b) It refers to uneven distribution of data across partitions, causing some tasks to run much longer than others c) It refers to features having different scales d) It refers to having too many categorical features compared to numerical features
When using feature hashing in Spark ML, what is the primary advantage? a) It improves model accuracy by creating more informative features b) It reduces dimensionality for high-cardinality categorical features without maintaining a dictionary c) It eliminates the need for feature scaling d) It allows for better interpretability of the model
In a production ML system, what is the purpose of model monitoring? a) To ensure that the model training code executes without errors b) To detect changes in data distribution or model performance over time c) To compare different model architectures d) To optimize hyperparameters regularly
Which of the following is NOT a best practice for scaling ML pipelines in Spark? a) Broadcasting small lookup tables b) Caching intermediate results c) Using a single partition for all data to avoid communication overhead d) Balancing partition sizes to avoid data skew
When using MLflow for experiment tracking in distributed environments, what is important to consider? a) Each executor should log to a separate MLflow experiment b) Only the driver should log parameters and metrics to avoid conflicts c) Artifacts should be stored in HDFS instead of the MLflow server d) Each model type should have its own project
In distributed hyperparameter tuning with Hyperopt and SparkTrials, what does the parallelism parameter control?
a) The number of executors allocated to each trial
b) The number of trials that can be evaluated concurrently
c) The number of cross-validation folds to run in parallel
d) The number of features to consider in parallel
Which of the following would be a sign that your Spark ML pipeline is encountering memory issues? a) Training accuracy is lower than expected b) ExecutorLostFailure errors in the Spark UI c) The model has high bias d) Data preprocessing takes longer than model training
When registering models in MLflow’s model registry, what is the purpose of model stages (Development, Staging, Production)? a) To track the physical location where the model is deployed b) To manage the model’s lifecycle and deployment workflow c) To assign different computing resources based on the stage d) To limit access to the model based on user roles
Which operation would be most difficult to distribute effectively in a Spark ML workload? a) Feature scaling b) Training independent trees in a Random Forest c) Hyperparameter tuning of multiple models d) Sequential model training where each iteration depends on the previous one
Databricks Workspace & Runtime for ML
Git Integration and Collaboration
AutoML
Feature Store
MLflow
Exploratory Data Analysis
df.summary(), dbutils.data.summarize(), visualization libraries
Data Cleaning
Feature Transformation
Building ML Pipelines
Cross-Validation and Hyperparameter Tuning
Estimators and Transformers
fit() method to train models and produce Transformers
transform() method to convert one DataFrame into another
Classification Models
Regression Models
Advanced Tuning with Hyperopt
Pandas API on Spark and UDFs
Distributed ML Concepts
Model Distribution in Spark
Ensemble Methods
Scaling Considerations
End-to-End ML Pipeline Implementation
Best Practices for ML in Production
Key Focus Areas:
Time Management
Question Analysis
Handling Difficult Questions
Which of the following is NOT a valid strategy for handling missing values in Spark ML?
a) Drop rows with any missing values using df.na.drop()
b) Impute with mean using the Imputer transformer
c) Replace with zero using df.na.fill(0)
d) Use the missingValue parameter in RandomForestClassifier
When using MLflow for experiment tracking, which method is used to log model parameters?
a) mlflow.log_params()
b) mlflow.record_params()
c) mlflow.track_params()
d) mlflow.write_params()
In the Feature Store, what is the purpose of the primary_keys parameter when creating a feature table?
a) To define the access permissions for the table
b) To uniquely identify each record in the feature table
c) To specify which features are most important
d) To determine the storage location for the table
Which Spark ML component correctly represents the sequence of operations when building a pipeline? a) Transformer → Estimator → Pipeline → Model b) Estimator → Transformer → Pipeline → Model c) Pipeline → Estimator → Transformer → Model d) Pipeline → Estimator → Model → Transform
When using Hyperopt for hyperparameter tuning, what does the SparkTrials class provide?
a) More advanced search algorithms than standard Hyperopt
b) Parallel evaluation of hyperparameter combinations across a Spark cluster
c) Automatic integration with MLflow tracking
d) More complex parameter search spaces
Now let’s take a comprehensive mock exam that covers all the topics from the certification.
Instructions:
Which of the following is the recommended Databricks runtime type for machine learning workloads? a) Standard b) ML c) Genomics d) Photon
A data scientist needs to install a Python package that will be available for all notebooks attached to a cluster. What is the most appropriate approach?
a) Use %pip install in a notebook cell
b) Add the library through the cluster UI Libraries tab
c) Use %sh pip install in a notebook cell
d) Set up an init script with the installation command
What happens when you run AutoML for a classification problem in Databricks? a) It automatically deploys the best model to production b) It generates code notebooks with data exploration, model training, and model explanation c) It requires GPU acceleration to run effectively d) It only works with structured, tabular data
When using the Feature Store to train a model, what method creates a training dataset with features?
a) fs.create_training_data()
b) fs.create_training_set()
c) fs.get_training_features()
d) fs.generate_training_set()
In MLflow, what is the primary purpose of the Model Registry? a) To track experiment metrics during model training b) To manage model versions and lifecycle stages c) To store model artifacts and dependencies d) To optimize model performance
A data scientist wants to transition a model version from Staging to Production in the MLflow Model Registry. Which method can be used?
a) client.update_model_stage()
b) client.transition_model_version_stage()
c) client.promote_model_version()
d) client.change_stage()
Which of the following statements about Databricks Repos is TRUE? a) Repos only support integration with GitHub b) Repos enable Git-based version control for notebooks and other files c) Repos automatically sync changes between Databricks and external Git providers d) Repos can only be created by workspace administrators
What is the purpose of the handleInvalid parameter in StringIndexer?
a) To determine how to handle invalid entries in the schema
b) To specify what happens when new categorical values appear during transformation
c) To handle invalid column names
d) To set a policy for negative index values
Which scaling method normalizes features to have zero mean and unit standard deviation? a) MinMaxScaler b) StandardScaler c) MaxAbsScaler d) Normalizer
What does OneHotEncoder do in a Spark ML pipeline?
a) Converts categorical variables to numeric indices
b) Converts numeric indices to binary vector representations
c) Reduces the dimensionality of feature vectors
d) Normalizes feature values between 0 and 1
In Spark ML, what is the purpose of a Pipeline? a) To distribute data processing across a cluster b) To chain multiple algorithms and data transformations into a single workflow c) To optimize data storage for machine learning d) To validate the quality of input data
When implementing cross-validation in Spark ML, which parameter determines the number of data splits?
a) k
b) numFolds
c) splits
d) crossValidations
Which of the following evaluation metrics is most appropriate for a highly imbalanced binary classification problem? a) Accuracy b) Area under ROC curve c) Area under PR curve d) RMSE
A data scientist is using RandomForestClassifier in Spark ML. Which parameter controls the maximum number of features considered for splitting at each tree node?
a) maxFeatures
b) featureSubsetStrategy
c) numFeatures
d) featureSplit
What is the main difference between Estimators and Transformers in Spark ML? a) Estimators are for classification problems; Transformers are for regression problems b) Estimators produce models when fit on data; Transformers directly transform data c) Estimators run on the driver node; Transformers run on executor nodes d) Estimators require labeled data; Transformers work with unlabeled data
How many models are trained in total when using CrossValidator with 5 folds and a parameter grid with 3 values for maxDepth and 2 values for numTrees? a) 5 b) 6 c) 30 d) 60
What is the purpose of the VectorAssembler in Spark ML?
a) To create a dense vector from sparse input
b) To combine multiple columns into a single vector column
c) To assemble a vector by sampling from input data
d) To create a distributed vector representation
When using Hyperopt with SparkTrials, what does the parallelism parameter control?
a) The number of models trained in parallel
b) The number of hyperparameter combinations evaluated concurrently
c) The maximum number of trials to run
d) The degree of distributed computing for each trial
What is the main advantage of using Pandas API on Spark compared to standard PySpark DataFrames? a) Pandas API on Spark is always faster b) Pandas API on Spark provides a familiar pandas-like syntax while working with distributed data c) Pandas API on Spark automatically optimizes memory usage d) Pandas API on Spark enables more complex machine learning algorithms
Which type of Pandas UDF would be most appropriate for applying a pre-trained scikit-learn model to make predictions on batches of data? a) Scalar Pandas UDF b) Grouped Map Pandas UDF c) Iterator Pandas UDF d) Pandas Function API
What is the primary distribution strategy used by Spark ML for training models? a) Model parallelism b) Data parallelism c) Hybrid parallelism d) Parameter server architecture
For tree-based ensemble methods in Spark ML, which of the following represents how parallelization occurs? a) Each tree is trained sequentially, with parallelism only at the node level b) All trees are built simultaneously in parallel c) In Random Forest, different trees can be built independently in parallel d) Trees are distributed across nodes with each node building part of every tree
What is the main advantage of using the Parquet format for storing large datasets in machine learning pipelines? a) It supports streaming data sources b) It has efficient columnar storage and compression c) It is human-readable for debugging d) It stores metadata about machine learning models
When handling a large dataset that exceeds memory capacity, which technique would NOT help in processing the data? a) Increasing the number of partitions b) Caching intermediate results c) Using a lower precision data type when appropriate d) Loading the entire dataset into a pandas DataFrame
What is data skew in the context of distributed computing? a) The imbalance between positive and negative examples in classification b) The uneven distribution of data across partitions c) The difference in scales between features d) The tendency of certain features to be more important than others
In production ML pipelines, what is the primary purpose of monitoring model performance over time? a) To compare different model architectures b) To detect data drift and concept drift c) To optimize cluster resource utilization d) To track training time for different models
When implementing automated retraining for a production model, which of the following is a best practice? a) Retrain the model daily regardless of performance b) Only retrain when model performance drops below a threshold c) Always replace the production model with the newly trained model d) Disable monitoring during the retraining process
Which of the following is NOT typically a component of an end-to-end ML workflow in Databricks? a) Data ingestion and validation b) Manual hyperparameter tuning for each model version c) Model registration and deployment d) Performance monitoring
What is the purpose of the DBFS (Databricks File System) in ML workflows?
a) To manage database connections for ML models
b) To provide a distributed file system for storing data and models
c) To optimize disk I/O for machine learning operations
d) To secure model artifacts with encryption
When using MLflow for experiment tracking, what is the correct way to start a nested run?
a) mlflow.create_nested_run()
b) mlflow.start_run(nested=True)
c) mlflow.child_run()
d) mlflow.start_run().nest()
Which of the following is a key benefit of using Feature Store for feature management? a) Automatic feature selection for models b) Consistent feature transformations between training and inference c) Built-in hyperparameter tuning d) Automatic documentation generation
A data scientist discovers that a classification model is suffering from overfitting. Which of the following approaches would NOT help address this issue? a) Increase the regularization parameter b) Collect more training data c) Reduce model complexity (e.g., decrease tree depth) d) Increase the learning rate
In Spark ML, what is the purpose of the StringIndexer transformer?
a) To encode categorical string columns as numeric indices
b) To create an index for faster string searches
c) To generate a mapping of string values to their frequencies
d) To compress string data for more efficient storage
Which MLflow component is used to package code, data, and environment for reproducible runs? a) MLflow Tracking b) MLflow Projects c) MLflow Models d) MLflow Registry
When using the CrossValidator in Spark ML, what determines the model that is finally returned?
a) The model with the lowest training error
b) The model with the lowest validation error averaged across all folds
c) The model trained on the entire dataset with the best hyperparameters
d) The ensemble of all models trained during cross-validation
What does the maxBins parameter control in tree-based models in Spark ML?
a) The maximum number of trees in the ensemble
b) The maximum number of features used in each tree
c) The maximum number of bins used for discretizing continuous features
d) The maximum number of leaf nodes in each tree
Which of the following is TRUE about Spark ML’s Pipeline and PipelineModel?
a) A Pipeline is a Transformer, while a PipelineModel is an Estimator
b) A Pipeline is an Estimator, while a PipelineModel is a Transformer
c) Both Pipeline and PipelineModel are Estimators
d) Both Pipeline and PipelineModel are Transformers
When using Feature Store, what is the purpose of a feature lookup? a) To search for available features in the feature registry b) To join training data with features from feature tables c) To look up feature documentation d) To preview feature values before creating a table
Which scaling method would be most appropriate when features contain many zero values and sparsity should be maintained? a) StandardScaler b) MinMaxScaler c) MaxAbsScaler d) Normalizer
In the context of distributed ML with Spark, what is the primary benefit of caching a DataFrame? a) It speeds up all operations regardless of the workflow b) It enables data sharing across different users c) It avoids recomputation of the same data transformation for iterative algorithms d) It guarantees data consistency in a multi-user environment
When using a Random Forest model in Spark ML, which parameter does NOT help control overfitting? a) maxDepth b) maxBins c) minInstancesPerNode d) numTrees
Which of the following is the correct sequence for a typical machine learning workflow in Databricks? a) Model selection → Data preparation → Feature engineering → Model evaluation b) Data preparation → Feature engineering → Model selection → Model evaluation c) Feature engineering → Data preparation → Model selection → Model evaluation d) Model evaluation → Data preparation → Feature engineering → Model selection
When using Hyperopt, which function is used to minimize the objective function?
a) minimize()
b) optimize()
c) fmin()
d) find_minimum()
What is the primary advantage of using Delta Lake format for ML datasets? a) It provides ACID transactions and time travel capabilities b) It is optimized specifically for image data c) It automatically selects the best features for models d) It compresses data better than any other format
Which of the following would be the most efficient approach for handling high-cardinality categorical features in Spark ML? a) One-hot encoding all categories b) Using feature hashing to reduce dimensionality c) Creating a binary column for each unique value d) Using the raw string values directly in tree-based models
Key answers from the code-based questions above:
- `fs.create_training_set()` — creates a training dataset with Feature Store features
- `client.transition_model_version_stage()` — moves a model version between lifecycle stages
- `numFolds` — sets the number of cross-validation splits
- `featureSubsetStrategy` — controls the features considered for splitting at each tree node
- `mlflow.start_run(nested=True)` — starts a nested MLflow run
- `fmin()` — Hyperopt's minimization function

Based on the mock exam, let's identify any areas that need further review:
Databricks Environment and Tools
Feature Engineering and Data Processing
Model Training and Evaluation
MLflow and Model Management
Production ML Workflows
Let’s focus on reviewing the areas where you might need additional clarification:
Library Management
```bash
# Best practice for persistent library installation:
# 1. Add the package from PyPI in the Libraries tab of the cluster
# 2. For custom packages, use an init script
# Example init script content (dbfs:/databricks/scripts/install_libs.sh):
#!/bin/bash
/databricks/python/bin/pip install some-package==1.0.0
# Reference this script in the cluster configuration
```
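For ad hoc experimentation, a notebook-scoped install can substitute for cluster-level configuration (a minimal sketch; the package name and version are placeholders mirroring the init-script example above):

```python
# Notebook-scoped install: applies only to the current notebook's Python session
%pip install some-package==1.0.0
```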
Repos Functionality
```python
# Key Repos operations (all available through the UI):
# 1. Clone a repository: UI → Repos → Add Repo
# 2. Create a new branch: click the branch dropdown → Create new branch
# 3. Commit changes: click the Git icon in the notebook → add a commit message → Commit
# 4. Pull latest changes: click the Git icon → Pull
```
Scaling Methods Comparison
| Scaler | Transformation | Use Case |
|---|---|---|
| StandardScaler | (x − mean) / std | Features with a Gaussian distribution |
| MinMaxScaler | (x − min) / (max − min) | When a specific range is needed (0–1) |
| MaxAbsScaler | x / max(\|x\|) | Sparse data; preserves zero values |
| Normalizer | x / ‖x‖ | When only direction (not magnitude) matters |
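To make the comparison concrete, here is a minimal sketch of applying StandardScaler in Spark ML (assuming a DataFrame `df` with numeric columns `recency` and `frequency`). Every scaler in the table operates on a vector column, so a VectorAssembler step comes first:

```python
from pyspark.ml.feature import VectorAssembler, StandardScaler

# All Spark ML scalers consume a Vector column, so assemble one first
assembler = VectorAssembler(inputCols=["recency", "frequency"], outputCol="features_raw")
assembled_df = assembler.transform(df)

# StandardScaler is an Estimator: fit() computes column means and standard deviations
scaler = StandardScaler(inputCol="features_raw", outputCol="features_scaled",
                        withMean=True, withStd=True)
scaled_df = scaler.fit(assembled_df).transform(assembled_df)
```

Note that `withMean=True` densifies sparse vectors; for sparse data, MaxAbsScaler is usually the safer choice, as the table indicates.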
High-Cardinality Features
```python
# Option 1: Feature hashing
from pyspark.ml.feature import FeatureHasher

# numFeatures caps output dimensionality; smaller values risk more hash collisions
hasher = FeatureHasher(inputCols=["category"], outputCol="category_hashed", numFeatures=50)
df_hashed = hasher.transform(df)  # FeatureHasher is a Transformer: no fit() needed

# Option 2: Target encoding for supervised learning
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

# Compute the encoding on training data only, to avoid target leakage
window_spec = Window.partitionBy("category")
df = df.withColumn("category_encoded", avg("target").over(window_spec))
```
Nested Runs
```python
import mlflow

# Parent run with nested children
with mlflow.start_run(run_name="parent_run") as parent_run:
    mlflow.log_param("parent_param", "value")

    # First child run
    with mlflow.start_run(run_name="child_run_1", nested=True) as child_run_1:
        mlflow.log_param("child_1_param", "value")
        mlflow.log_metric("child_1_metric", 0.95)

    # Second child run
    with mlflow.start_run(run_name="child_run_2", nested=True) as child_run_2:
        mlflow.log_param("child_2_param", "value")
        mlflow.log_metric("child_2_metric", 0.98)
```
Model Registry Workflow
```python
import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register a model from a run (run_id comes from the tracked training run)
model_uri = f"runs:/{run_id}/model"
registered_model = mlflow.register_model(model_uri, "model_name")

# Transition stages
client.transition_model_version_stage(
    name="model_name",
    version=1,
    stage="Staging",
)

# Add a description
client.update_model_version(
    name="model_name",
    version=1,
    description="Model trained with hyperparameter tuning",
)

# Set tags
client.set_model_version_tag(
    name="model_name",
    version=1,
    key="dataset",
    value="Q1_2023_data",
)
```
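Once a version holds a stage, consumers can load it by stage instead of pinning a version number, so promotions in the registry take effect without code changes (a sketch; `input_df` is an assumed pandas DataFrame of features):

```python
# Load whichever version currently holds the Staging stage
staging_model = mlflow.pyfunc.load_model("models:/model_name/Staging")
predictions = staging_model.predict(input_df)
```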
Drift Detection
```python
from scipy.stats import ks_2samp

def detect_feature_drift(reference_data, current_data, feature_col, threshold=0.05):
    """Detect drift in a feature using the Kolmogorov-Smirnov test."""
    # Convert to pandas for the statistical test
    ref_values = reference_data.select(feature_col).toPandas()[feature_col]
    cur_values = current_data.select(feature_col).toPandas()[feature_col]
    # Run the KS test
    ks_result = ks_2samp(ref_values, cur_values)
    # Flag significant drift
    drift_detected = ks_result.pvalue < threshold
    return {
        "feature": feature_col,
        "statistic": float(ks_result.statistic),
        "p_value": float(ks_result.pvalue),
        "drift_detected": drift_detected,
    }
```
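A usage sketch (assuming Spark DataFrames `reference_df` and `current_df`, and the RFM feature names used earlier): run the check per feature and collect the flagged columns:

```python
monitored_features = ["recency", "frequency", "monetary"]
drift_report = [
    detect_feature_drift(reference_df, current_df, feature)
    for feature in monitored_features
]
drifted = [r["feature"] for r in drift_report if r["drift_detected"]]
print(f"Features with detected drift: {drifted}")
```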
Automated Retraining
```python
def should_retrain_model(current_metrics, new_data, min_improvement=0.01):
    """Determine whether the model should be retrained based on validation performance."""
    # Split the new data for evaluation
    train_data, eval_data = new_data.randomSplit([0.8, 0.2], seed=42)
    # Train a candidate model on the new data
    # (train_model and evaluate_model are user-defined helpers)
    new_model = train_model(train_data)
    # Evaluate on the hold-out split
    new_metrics = evaluate_model(new_model, eval_data)
    # Compare against the current production model
    improvement = new_metrics["auc"] - current_metrics["auc"]
    return {
        "should_retrain": improvement > min_improvement,
        "improvement": improvement,
        "new_metrics": new_metrics,
    }
```
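Tying it together (a sketch; `current_metrics` and `new_data` are assumed to come from your monitoring and ingestion jobs), the returned decision can gate the retraining step rather than retraining on a fixed schedule:

```python
decision = should_retrain_model(current_metrics, new_data)
if decision["should_retrain"]:
    print(f"Retrain recommended: AUC improved by {decision['improvement']:.4f}")
else:
    print("Keeping the current production model.")
```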
Now let’s take a second comprehensive mock exam that focuses more on the areas that need reinforcement.
Instructions:
A data scientist wants to use a custom Python package in their Databricks notebooks. The package is not available on PyPI but is hosted in a private Git repository. What is the best way to install this package on a cluster?
a) Include the Git repository URL in the notebook requirements
b) Use %pip install git+https://your-repo-url.git in a notebook
c) Upload the package wheel file and install it through the cluster’s Libraries tab
d) Clone the repository using Databricks Repos and install it locally
When using StringIndexer in Spark ML, what will happen if new categories appear in the test data that weren’t present in the training data? a) An error will be thrown during transformation b) New categories will be assigned the index value of the most frequent category c) New categories will be assigned an index of -1 d) The behavior depends on the handleInvalid parameter
In the Feature Store, what is the difference between offline and online feature tables? a) Offline tables are used for training, while online tables are used for inference b) Offline tables contain historical data, while online tables contain only recent data c) Offline tables are stored in Delta format, while online tables are stored in a low-latency database d) Offline tables are batch-updated, while online tables are stream-updated
A data scientist is creating cross-validation folds with imbalanced classification data. Which parameter in CrossValidator helps ensure each fold has a similar class distribution?
a) stratify
b) balanceFolds
c) classBalance
d) There is no built-in parameter for this purpose
When using MLflow to track experiments, which method can be used to record the libraries and dependencies used in the experiment?
a) mlflow.log_requirements()
b) mlflow.log_artifacts()
c) mlflow.set_environment()
d) mlflow.set_tags()
Which of the following is NOT a valid metricName for BinaryClassificationEvaluator in Spark ML? a) “areaUnderROC” b) “areaUnderPR” c) “accuracy” d) “f1Score”
In Spark ML pipelines, what is returned when you call Pipeline.fit() on a training dataset?
a) A transformed DataFrame
b) A PipelineModel
c) An array of fitted models
d) A Pipeline with updated parameters
When using AutoML in Databricks, which of the following is NOT automatically generated? a) Data exploration notebook b) Best model notebook c) Feature importance analysis d) Production deployment configuration
What is the purpose of the vectorCol parameter in Spark ML’s Word2Vec model?
a) To specify the column containing text documents
b) To specify the column containing arrays of words
c) To specify the name of the output column for word vectors
d) To specify a column with pre-trained word vectors
When registering a model with MLflow, what is the significance of the alias in Unity Catalog? a) It specifies the model’s stage (Development, Staging, Production) b) It provides a stable reference to a specific model version c) It determines the model’s visibility to different users d) It defines the location where the model is stored
Which of the following would NOT be considered a best practice when using the Feature Store? a) Creating feature documentation and tracking feature lineage b) Rebuilding feature transformation logic for online serving c) Grouping related features into feature tables d) Reusing features across multiple projects
A data scientist notices that a regression model is performing poorly on the validation set but very well on the training set. Which of the following would be the LEAST effective solution? a) Adding more data b) Increasing regularization c) Reducing model complexity d) Increasing the learning rate
Which sampling method in Spark is most appropriate for handling class imbalance in a large classification dataset?
a) df.sample(fraction=0.1, seed=42)
b) df.sample(withReplacement=True, fraction=0.5, seed=42)
c) df.sampleBy("class", fractions={"0": 0.2, "1": 1.0}, seed=42)
d) df.randomSplit([0.8, 0.2], seed=42)
In Spark ML, what is the purpose of the featureSubsetStrategy parameter in RandomForestClassifier?
a) To select a subset of the training data for each tree
b) To determine how many features to consider for splitting at each tree node
c) To choose which features to include in the final model
d) To specify how features should be normalized before training
When using Hyperopt for hyperparameter tuning, which search algorithm uses the Tree-structured Parzen Estimator (TPE) for Bayesian optimization?
a) hyperopt.rand.suggest
b) hyperopt.tpe.suggest
c) hyperopt.bayes.suggest
d) hyperopt.atpe.suggest
What is the main advantage of using Apache Arrow in Pandas UDFs? a) It provides automatic parallelization of pandas operations b) It enables efficient data transfer between JVM and Python processes c) It optimizes pandas memory usage d) It allows direct execution of SQL on pandas DataFrames
Which of the following statements about caching in Spark is FALSE?
a) Caching can improve performance for iterative algorithms
b) Cached DataFrames are automatically uncached when no longer needed
c) The cache() method is equivalent to persist(StorageLevel.MEMORY_AND_DISK)
d) Caching is particularly useful when the same data is accessed multiple times
When deploying a model for batch inference in Databricks, which approach provides the best scalability for very large datasets? a) Loading the model with MLflow and applying it to a pandas DataFrame b) Using Spark UDFs with the model to distribute prediction c) Creating a REST endpoint and calling it for each record d) Exporting predictions to a CSV file for each batch
What does the VectorIndexer transformer do in Spark ML? a) It creates an index mapping for feature vectors b) It automatically identifies categorical features in a vector and indexes them c) It maps vector indices to their original feature names d) It creates a sparse vector representation from dense vectors
A data scientist is developing a machine learning pipeline in Databricks and wants to ensure code quality and collaboration. Which feature would be MOST helpful for this purpose? a) Delta Lake time travel b) Databricks Repos c) Databricks SQL d) Databricks Connect
When using CrossValidator in Spark ML, how are the final parameters chosen for the model?
a) By selecting the parameters that result in the lowest training error
b) By selecting the parameters that result in the lowest average validation error across folds
c) By selecting the parameters that result in the lowest validation error on the last fold
d) By averaging the parameters from each fold
What is the advantage of using the Feature Store’s point-in-time lookup functionality? a) It enables real-time feature computation b) It prevents feature leakage by ensuring historical feature values match the training data timestamp c) It reduces the storage requirements for features d) It accelerates feature computation by using approximate values
Which of the following can help mitigate data skew issues in distributed processing? a) Using more executors with less memory each b) Disabling adaptive query execution c) Increasing the number of shuffle partitions d) Using StringIndexer instead of OneHotEncoder
In Spark ML’s Random Forest implementation, which parameter has the greatest impact on preventing overfitting?
a) numTrees
b) maxDepth
c) impurity
d) subsamplingRate
When implementing a production ML workflow, which of the following is the MOST important for detecting model degradation over time? a) Regular retraining on the latest data b) A/B testing of model versions c) Monitoring feature and prediction distributions d) Using versioned feature engineering code
Which Spark storage level would be most appropriate for a DataFrame that is reused multiple times but is too large to fit entirely in memory?
a) MEMORY_ONLY
b) MEMORY_AND_DISK
c) DISK_ONLY
d) OFF_HEAP
What is the purpose of the seed parameter in machine learning algorithms?
a) To initialize the model with pre-trained weights
b) To ensure reproducibility of results
c) To speed up convergence
d) To enable transfer learning
In MLflow, what is the difference between logging a model with mlflow.spark.log_model versus mlflow.sklearn.log_model?
a) The flavor of the model that is logged, affecting how it can be loaded and used
b) The storage location of the model artifacts
c) The metrics that are automatically tracked
d) The versioning system used for the model
Which of the following is TRUE about StringIndexer and IndexToString in Spark ML? a) StringIndexer converts numeric values to strings; IndexToString does the opposite b) StringIndexer and IndexToString are both Estimators c) StringIndexer assigns indices based on frequency; IndexToString can convert these indices back to original strings d) IndexToString must be fit on data before it can transform indexed values
When using Pandas API on Spark, what is the correct way to convert a PySpark DataFrame to a Pandas on Spark DataFrame?
a) ps.from_spark(spark_df)
b) spark_df.to_pandas_on_spark()
c) spark_df.pandas_api()
d) ps.DataFrame(spark_df)
What is the primary benefit of using Delta Lake for machine learning datasets? a) It automatically tunes model hyperparameters b) It provides ACID transactions and time travel capabilities c) It performs automatic feature selection d) It enables real-time model serving
Which Spark ML Evaluator would be appropriate for assessing the performance of a k-means clustering model? a) ClusteringEvaluator b) MulticlassClassificationEvaluator c) RegressionEvaluator d) BinaryClassificationEvaluator
In the context of Spark ML, what is feature hashing used for? a) Securing sensitive feature data b) Reducing dimensionality for high-cardinality categorical features c) Creating unique identifiers for each training example d) Detecting duplicate features in the dataset
When using a Pipeline in Spark ML, what happens if one of the stages in the pipeline fails during fit or transform? a) Only the successful stages are executed, and a warning is issued b) An exception is thrown, and the pipeline execution stops c) The pipeline tries alternative approaches automatically d) The failing stage is skipped, and execution continues
What is the purpose of setting the family parameter in LogisticRegression to “multinomial”?
a) To handle missing values in multinomial features
b) To perform multiclass classification instead of binary classification
c) To enable multi-task learning
d) To use a multinomial probability distribution for regularization
When implementing cross-validation in Spark ML, what is the relationship between the evaluator’s isLargerBetter property and the objective function being optimized? a) They must be the same for correct optimization b) They should be opposite to ensure proper minimization c) The isLargerBetter property determines the direction of optimization d) They are unrelated parameters
Which of the following parameters in GBTClassifier directly affects the trade-off between bias and variance?
a) maxIter
b) stepSize
c) maxDepth
d) impurity
A data scientist notices that their Random Forest model’s performance improves as they increase the number of trees up to 100, but then plateaus with no significant improvement when using 500 or 1000 trees. What could explain this behavior? a) The model is suffering from high bias b) The dataset is too small to benefit from more trees c) Random Forests have a natural performance ceiling due to tree correlation d) Feature selection should be performed before adding more trees
When using Hyperopt with MLflow, what is the correct way to log the results of each trial?
a) Use mlflow.log_metrics() inside the objective function
b) Hyperopt automatically logs to MLflow when initialized
c) Use hyperopt.log_to_mlflow() after optimization
d) MLflow cannot track individual Hyperopt trials
In Spark ML, what is the purpose of the handleInvalid parameter in OneHotEncoder?
a) To specify how to handle out-of-range indices
b) To define behavior for missing values
c) To control how new categories are processed
d) To manage invalid parameter configurations
A data scientist is using MLflow to track experiments and has multiple related runs they want to group together. What is the most appropriate way to organize these runs? a) Use the same run name for all related runs b) Use tags to categorize related runs c) Create a parent run with nested child runs d) Store all metrics in a single run with different metric names
Which of the following is NOT a valid approach for handling class imbalance in classification problems? a) Oversampling the minority class b) Undersampling the majority class c) Using class weights in the loss function d) Increasing the learning rate for the minority class
When using the Feature Store, what method is used to write data to a feature table?
a) fs.write_table()
b) fs.create_feature_data()
c) fs.update_features()
d) fs.insert_features()
What problem can arise when using standard cross-validation for time series data? a) High variance in fold performance b) Excessive memory usage c) Data leakage from future to past d) Slow convergence of the optimization algorithm
In a distributed ML context, which of the following statements about the broadcast function is TRUE?
a) It broadcasts large datasets across the cluster
b) It makes small, read-only data available to all nodes efficiently
c) It automatically distributes model training across nodes
d) It sends real-time updates to all worker nodes
Key answers from the code-based questions above:
- `mlflow.log_artifacts()` — records libraries and dependencies for an experiment
- `df.sampleBy("class", fractions={"0": 0.2, "1": 1.0}, seed=42)` — stratified sampling for class imbalance
- `hyperopt.tpe.suggest` — TPE-based Bayesian optimization
- `maxDepth` — the parameter with the greatest impact on Random Forest overfitting
- `MEMORY_AND_DISK` — storage level for data too large to fit entirely in memory
- `spark_df.pandas_api()` — converts a PySpark DataFrame to a Pandas on Spark DataFrame
- `maxDepth` — GBTClassifier parameter directly affecting the bias/variance trade-off
- `mlflow.log_metrics()` inside the objective function — logging individual Hyperopt trials
- `fs.write_table()` — writes data to a feature table

A data scientist needs to implement cross-validation for a machine learning model in Spark ML. Which parameters of CrossValidator determine how many models will be trained? a) numFolds and estimator b) numFolds and estimatorParamMaps c) evaluator and estimator d) evaluator and estimatorParamMaps
What is the main purpose of the Feature Store in Databricks? a) To store training datasets for machine learning b) To centralize feature definitions and promote feature reuse c) To automate feature selection for models d) To generate synthetic features for improving model performance
When using MLflow to track experiments in Databricks, which of the following is NOT automatically captured? a) Parameters of the model b) Source code used in the run c) The name of the user running the experiment d) The feature importance of the trained model
A data scientist has created a feature engineering pipeline that includes StringIndexer, OneHotEncoder, and VectorAssembler stages. What is the correct order for applying these stages? a) OneHotEncoder → StringIndexer → VectorAssembler b) StringIndexer → OneHotEncoder → VectorAssembler c) VectorAssembler → StringIndexer → OneHotEncoder d) StringIndexer → VectorAssembler → OneHotEncoder
In Spark ML, what is the purpose of the Imputer transformer? a) To remove rows with missing values b) To replace missing values with mean, median, or mode c) To identify columns with a high percentage of missing values d) To create indicator variables for missing values
When using AutoML in Databricks, what can be found in the “All Trials” notebook? a) Detailed logs of all experiments run by AutoML b) Information about all models that were trained, including hyperparameters and performance c) Code to reproduce all data preprocessing steps d) Comparison of the best model against baseline models
Which of the following statements about Spark ML’s VectorAssembler is TRUE? a) It requires all input columns to be of the same type b) It can only combine numeric columns c) It converts a set of columns into a single vector column d) It automatically handles missing values in the input columns
In Databricks, which runtime should be selected for machine learning workloads that require deep learning libraries? a) Standard b) Machine Learning c) Genomics d) Photon
A data scientist wants to search across multiple hyperparameters for a Random Forest model using CrossValidator. How should they create the parameter grid? a) Using an array of parameter maps b) Using ParamGridBuilder c) Using a dictionary of parameter values d) Using the hyperparameter search function
What is the primary advantage of using Pandas API on Spark over standard PySpark DataFrames? a) Pandas API on Spark automatically optimizes memory usage b) Pandas API on Spark provides a familiar pandas-like syntax while working with distributed data c) Pandas API on Spark executes all operations in-memory for better performance d) Pandas API on Spark supports more machine learning algorithms
When registering a model in MLflow Model Registry, what does the stage “Archived” indicate? a) The model is no longer in use and should not be considered for deployment b) The model is backed up in cloud storage c) The model is in read-only mode d) The model is pending deletion
What is the purpose of the handleInvalid parameter in OneHotEncoder?
a) To specify how to handle new categories not seen during fitting
b) To define behavior for missing values
c) To handle invalid encoding configurations
d) To manage out-of-bounds array indices
Which of the following is NOT a valid strategy for the Imputer transformer in Spark ML? a) “mean” b) “median” c) “mode” d) “zero”
In the Feature Store, what is the primary purpose of the primary_keys parameter when creating a feature table?
a) To restrict access to certain features
b) To uniquely identify records in the feature table
c) To define the most important features
d) To specify which features can be used for lookups
Which of these evaluation metrics is NOT suitable for assessing the performance of a regression model? a) Mean Squared Error (MSE) b) Root Mean Squared Error (RMSE) c) R² d) Area Under ROC Curve (AUC)
A data scientist wants to train models on multiple hyperparameter combinations in parallel using Hyperopt. Which object should they use? a) Trials b) SparkTrials c) ParallelTrials d) DistributedTrials
What is the correct way to register a model with MLflow after training?
a) mlflow.register_model(model, "model_name")
b) mlflow.models.register(model, "model_name")
c) mlflow.sklearn.register_model(model, "model_name")
d) mlflow.register_model("runs:/<run_id>/model", "model_name")
When using RandomForestClassifier in Spark ML, what does increasing the numTrees parameter typically do?
a) Increases model complexity and risk of overfitting
b) Reduces variance but may increase training time
c) Increases bias and reduces variance
d) Reduces both bias and variance
What is the main difference between using persist() and cache() on a Spark DataFrame?
a) persist() allows specifying a storage level, while cache() uses the default level
b) cache() stores data in memory, while persist() stores on disk
c) persist() is permanent, while cache() is temporary
d) cache() works with RDDs, while persist() works with DataFrames
Which of the following is a key benefit of using Delta Lake format for machine learning datasets? a) Automatic feature extraction b) Built-in model versioning c) ACID transactions and time travel capabilities d) Faster model training algorithms
In the context of Pandas UDFs, what does the Apache Arrow framework provide? a) Faster pandas operations through GPU acceleration b) Efficient data transfer between JVM and Python processes c) Automatic parallelization of pandas code d) Native integration with scikit-learn models
When using StringIndexer in Spark ML, what is the default order for assigning indices to categories? a) Alphabetical order b) Order of appearance in the dataset c) Frequency-based (most frequent gets index 0) d) Random assignment
Which Spark ML component can be used to extract features from text data? a) Word2Vec b) StringIndexer c) VectorSlicer d) FeatureHasher
A data scientist is using CrossValidator for hyperparameter tuning. How is the best model determined? a) The model with the highest accuracy on the training data b) The model with the best performance metric on the validation folds c) The model with the lowest loss on the validation folds d) The model with the most stable performance across all folds
What happens when you call transform() on a StringIndexer that has already been fit?
a) It re-fits the indexer on the new data and then transforms it
b) It applies the existing category-to-index mapping to the new data
c) It throws an error if new categories are encountered
d) It automatically updates the mapping to include new categories
In Databricks, what is the purpose of the Runtime for Machine Learning? a) It provides a distributed computing environment optimized for ML workloads b) It includes pre-installed libraries and optimizations for ML c) It offers automatic model selection and tuning d) It provides specialized hardware for deep learning
When using Feature Store, what method is used to retrieve features for model training?
a) fs.get_features()
b) fs.create_training_set()
c) fs.fetch_features()
d) fs.lookup_features()
Which of the following is NOT a typical stage in a machine learning pipeline in Spark ML? a) Data preprocessing and cleaning b) Feature extraction and transformation c) Model training and tuning d) Model deployment and serving
In a Spark ML Pipeline, what is returned when you call the transform() method on a fitted PipelineModel?
a) A new Pipeline with updated parameters
b) A new PipelineModel with refined stages
c) A DataFrame with transformed data
d) A summary of transformation statistics
When using AutoML in Databricks, which of the following problem types is supported? a) Reinforcement learning b) Semi-supervised learning c) Classification d) Unsupervised learning
A data scientist notices that their Random Forest model has high variance. Which of the following would MOST likely help reduce this issue? a) Increasing the maximum depth of the trees b) Reducing the number of trees c) Decreasing the maximum depth of the trees d) Increasing the minimum samples per leaf
What is the primary purpose of the MLflow Model Registry? a) To track experiment metrics during training b) To manage model versions and transitions between lifecycle stages c) To automate model deployment to production d) To optimize model performance
Which of the following is TRUE about caching in Spark?
a) Cached data is automatically persisted across sessions
b) Caching is always beneficial regardless of the workflow
c) Cached DataFrames must be explicitly uncached with unpersist()
d) Caching automatically distributes data optimally across the cluster
In the context of machine learning with Spark, what is data skew? a) A dataset with an imbalanced target variable b) When certain feature values are much more common than others c) When data is unevenly distributed across partitions d) When features have different scales
What does the vectorSize parameter control in Word2Vec?
a) The maximum number of words to process
b) The size of the sliding window
c) The dimensionality of the word embeddings
d) The minimum word frequency threshold
A data scientist wants to use cross-validation to tune hyperparameters for a GBTClassifier. What would be the most appropriate way to evaluate model performance for a binary classification problem? a) RegressionEvaluator with “rmse” metric b) BinaryClassificationEvaluator with “areaUnderROC” metric c) MulticlassClassificationEvaluator with “accuracy” metric d) ClusteringEvaluator with “silhouette” metric
In MLflow, what is the purpose of autologging? a) To automatically log all experiment parameters without manual tracking b) To create automated reports of experiment results c) To schedule periodic retraining of models d) To generate documentation for ML projects
What is the best way to handle categorical variables with high cardinality (many unique values) in Spark ML? a) Use OneHotEncoder for all categories b) Use StringIndexer without OneHotEncoder c) Use feature hashing to reduce dimensionality d) Drop these variables from the model
Which of the following statements about Pandas UDFs is FALSE? a) They use Apache Arrow for efficient data transfer b) They can be used to apply models in parallel c) They require the data to fit in the driver’s memory d) They support vectorized operations
When using Hyperopt for hyperparameter tuning, what does the algo parameter specify?
a) The type of model to tune
b) The search algorithm to use (e.g., random, TPE)
c) The objective function to optimize
d) The evaluation metric to use
In a production ML system, what is the primary purpose of model monitoring? a) To track compute resource usage b) To detect data drift and performance degradation c) To optimize model training speed d) To ensure user access control
What is the correct order of operations when using MLflow to manage models in production? a) Train → Log → Register → Deploy b) Register → Train → Log → Deploy c) Log → Train → Register → Deploy d) Train → Register → Log → Deploy
Which component in Spark ML automatically identifies categorical features in a feature vector and converts them to one-hot encoded features? a) OneHotEncoder b) VectorIndexer c) StringIndexer d) Bucketizer
A data scientist wants to build a machine learning pipeline that includes data validation checks. What would be the most effective approach in Spark ML? a) Use the built-in Validator class b) Add custom Transformers that check data quality c) Use SQL queries to filter invalid data before the pipeline d) Add validation logic to the pipeline’s fit method
When using Hyperopt with SparkTrials for distributed hyperparameter tuning, what determines the maximum parallelism that can be achieved? a) The number of available executor cores b) The parallelism parameter specified in SparkTrials c) The number of hyperparameter combinations d) The cluster’s driver node memory
mlflow.register_model("runs:/<run_id>/model", "model_name")persist() allows specifying a storage level, while cache() uses the default levelfs.create_training_set()unpersist()Based on your performance across the practice exams, let’s assess your readiness for the certification exam.
Strengths:
Areas for Improvement:
- Feature Store API: `create_table()`, `create_training_set()`, `write_table()`
- MLflow API: `start_run()`, `log_param()`, `log_metric()`, `register_model()`
- Spark ML pipeline classes: `Pipeline`, `CrossValidator`, `ParamGridBuilder`
- Common column parameters: `featuresCol`, `labelCol`, `predictionCol`
- `handleInvalid` options ("keep", "skip", "error")
- Evaluator metric names: `areaUnderROC`, `areaUnderPR`, `rmse`, `r2`

Before the Exam:
During the Exam:
Time Management: