
Databricks Generative AI Engineer Associate Certification

Table of contents

Preparation Plan

Foundations & Design Fundamentals for Generative AI

  • Introduction to Generative AI and Large Language Models (LLMs)
  • Core Concepts in LLM Technology
  • Key LLM Architectures and Capabilities
  • Databricks AI Tooling Overview
  • Prompt Engineering Fundamentals
  • Designing Applications
  • Model Selection Criteria
  • Quiz
  • Review of Key Concepts
  • Reinforcement of Weak Areas
  • Business Problem: Financial Document Analysis System

Data Preparation & Application Development

  • Chunking Strategies for Document Processing
  • Source Document Filtering Techniques
  • Content Extraction from Various Formats
  • Working with Delta Lake and Unity Catalog
  • LangChain and Tools Integration
  • Retrieval-Augmented Generation (RAG) Implementation
  • Prompt Templates and Guardrails
  • Model Selection Based on Application Needs
  • Hands-on RAG Application Exercise
  • Quiz on Data Preparation and Application Development
  • Preparation Task: Deploying a RAG Application in Databricks

Deployment, Governance, and First Mock Exam

  • Model Serving Fundamentals
  • PyFunc Models for LLM Chains
  • Vector Search Indexing and Querying
  • MLflow Model Registration
  • Text Masking and Guardrails
  • Legal and Licensing Considerations
  • Content Moderation Techniques
  • Security Aspects of GenAI Applications
  • 90-Minute Mini Mock Exam (25 Questions)
  • Review and Knowledge Gap Identification
  • Preparation Task: Addressing Challenges in LLM Evaluation and Monitoring

Evaluation, Monitoring, and End-to-End Implementation

  • Metrics for LLM Evaluation
  • Monitoring Deployed Applications
  • MLflow for Tracking Performance
  • Cost Control Strategies
  • Complex RAG Application Implementation
  • 90-Minute Full Mock Exam (45 Questions)

Knowledge Base

Foundations & Design Fundamentals for Generative AI

Introduction to Generative AI and Large Language Models (LLMs)

Generative AI refers to artificial intelligence systems that can create new content rather than just analyzing existing content. Large Language Models (LLMs) are a specific type of generative AI trained on vast amounts of text data to understand and generate human language.

Key Characteristics of LLMs:

  1. Pre-training and Fine-tuning: LLMs undergo initial pre-training on broad data, followed by fine-tuning on specific tasks.

  2. Foundation Models: These are general-purpose models trained on diverse data that can be adapted to various tasks.

  3. Emergent Abilities: LLMs often exhibit capabilities that weren’t explicitly programmed but emerge from scale and training.

  4. Contextual Understanding: They can interpret prompts within context and maintain conversation coherence.


Core Concepts in LLM Technology

1. Tokens and Tokenization

Tokens are the basic units that LLMs process, typically representing word parts:

  • A word might be broken into multiple tokens (e.g., “understanding” → “under” + “standing”)
  • Most models have token limits (context windows)
  • Tokens directly impact processing time and cost
  • Example: The sentence “Databricks provides tools for generative AI” might be tokenized into [“Data”, “bricks”, “provides”, “tools”, “for”, “generative”, “AI”]
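
The exact split depends on the tokenizer in use. Here is a minimal sketch using a Hugging Face tokenizer; the model name is an illustrative assumption:

from transformers import AutoTokenizer

# Load a tokenizer; "bert-base-uncased" is an illustrative choice
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

text = "Databricks provides tools for generative AI"
print(tokenizer.tokenize(text))     # list of subword pieces (exact split depends on the tokenizer)
print(len(tokenizer.encode(text)))  # token count, including special tokens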

2. Embeddings

Embeddings are numerical vector representations of text that capture semantic meaning:

  • Words or phrases with similar meanings have similar embedding vectors
  • Typical dimensions range from 384 to 4096, depending on the model
  • Enable semantic search and similarity comparisons
  • Databricks Vector Search relies on embeddings for efficient retrieval
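
As a quick illustration, the sketch below embeds two related phrases and compares them with cosine similarity; the sentence-transformers model name is an assumption:

from sentence_transformers import SentenceTransformer, util

# A commonly used 384-dimensional embedding model (illustrative choice)
model = SentenceTransformer("all-MiniLM-L6-v2")
vectors = model.encode(["quarterly portfolio allocation", "asset mix for the last quarter"])

# Cosine similarity close to 1.0 indicates similar meaning
print(util.cos_sim(vectors[0], vectors[1]))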

3. Inference vs. Training

  • Inference: Using a trained model to generate responses (what most applications do)
  • Training: The resource-intensive process of creating or updating a model
  • Fine-tuning: Adapting a pre-trained model for specific tasks or domains

4. Prompting

Prompting is the art of instructing an LLM to produce desired outputs:

  • Zero-shot: Asking the model to perform a task without examples
  • Few-shot: Including examples in the prompt to guide the model
  • Chain-of-thought: Guiding the model to reason step-by-step
  • Prompt templates: Standardized formats for consistent interaction
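
These styles differ only in how the prompt text is written. A model-agnostic illustration (prompt wording is an assumption):

# Zero-shot: the task is described with no examples
zero_shot = "Classify the sentiment of this review as positive or negative:\nReview: 'Low fees, but support is slow.'"

# Few-shot: a couple of labeled examples guide the model
few_shot = """Classify the sentiment of each review as positive or negative.
Review: 'Fast onboarding and clear statements.' -> positive
Review: 'Hidden charges everywhere.' -> negative
Review: 'Low fees, but support is slow.' ->"""

# Chain-of-thought: the prompt asks for intermediate reasoning before the answer
chain_of_thought = "Explain step by step how the portfolio return was calculated, then state the final percentage."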

Key LLM Architectures and Capabilities

1. Model Architectures

  • Transformer-based models: The foundation of modern LLMs (BERT, GPT, etc.)
  • Encoder-only models: Best for understanding (BERT, RoBERTa)
  • Decoder-only models: Best for generation (GPT family)
  • Encoder-decoder models: Good for transformation tasks (T5, BART)

2. Model Sizes and Capabilities

Models vary in size and capabilities:

  • Small models (1-3B parameters): Lower cost, faster, limited capabilities
  • Medium models (7-13B parameters): Balance of performance and cost
  • Large models (70B+ parameters): Higher accuracy, reasoning ability, costly

3. Common LLM Tasks

  • Text generation (creative writing, code, etc.)
  • Summarization
  • Question answering
  • Classification
  • Information extraction
  • Conversation
  • Translation

Databricks AI Tooling Overview

1. Foundation Model APIs

Databricks provides API access to leading foundation models:

  • Claude models from Anthropic
  • Llama models from Meta
  • MPT models from MosaicML
  • Command models from Cohere

2. Vector Search

A managed service for semantic search:

  • Creates and manages embedding indexes
  • Enables similarity searches based on semantic meaning
  • Integrates with Delta Lake tables
  • Critical component for RAG applications

3. Model Serving

Scalable deployment for AI models:

  • Real-time serving endpoints
  • Managed infrastructure scaling
  • Support for custom containers
  • Performance monitoring

4. MLflow Integration

For model management and tracking:

  • Experiment tracking for LLM development
  • Model registry for versioning
  • Deployment management
  • Monitoring capabilities

Prompt Engineering Fundamentals

1. Prompt Structure

Effective prompts typically include:

  • Clear instructions about the task
  • Context or background information
  • Examples of desired outputs (few-shot learning)
  • Format specification for the response
  • Constraints or limitations

2. Prompt Optimization Techniques

  • Precision: Use specific, unambiguous language
  • Atomicity: Break complex tasks into simpler components
  • Consistency: Standardize prompt formats across similar tasks
  • Iteration: Refine prompts based on results

3. Common Prompt Patterns

  • System/User distinction: Separating behavioral instructions from queries
  • Step-by-step reasoning: Guiding the model through logical steps
  • Output templates: Specifying exact response formats
  • Chain prompting: Using outputs from one prompt as inputs to another
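
A small sketch combining the system/user distinction with an output template; the message format follows the common chat-completions convention, and the field contents are illustrative:

messages = [
    {
        "role": "system",
        "content": "You are a concise financial assistant. Respond only in the JSON format described by the user."
    },
    {
        "role": "user",
        "content": 'Summarize the attached statement. Respond as {"summary": "<string>", "key_figures": ["<string>", ...]}'
    },
]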

Designing Applications

1. Translating Business Requirements into AI Solutions

  • Identify the core business problem to solve
  • Determine required inputs and outputs
  • Map requirements to technical capabilities
  • Consider constraints (cost, latency, accuracy)

2. Model Task Selection

Different tasks require different approaches:

  • Classification: Categorizing inputs (sentiment analysis, topic identification)
  • Generation: Creating new content (writing, summarization)
  • Extraction: Identifying specific information in text
  • Transformation: Converting between formats or styles

3. Chain Component Selection

Chains combine multiple operations:

  • Retrieval components: Fetch relevant information
  • Processing components: Transform or filter data
  • Generation components: Create responses
  • Evaluation components: Check quality or compliance

4. Multi-stage Reasoning

For complex tasks requiring multiple steps:

  • Break problems into sequential sub-tasks
  • Design chain components to handle each stage
  • Ensure data flows correctly between steps
  • Include verification at critical points
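
A minimal sketch of this idea as a sequential pipeline; every helper below is a hypothetical placeholder for a chain component:

def answer_complex_question(question):
    sub_questions = decompose(question)                                 # hypothetical: split into sub-tasks
    partial_answers = [retrieve_and_answer(q) for q in sub_questions]   # hypothetical: one stage per sub-task
    draft = synthesize(question, partial_answers)                       # hypothetical: combine intermediate results
    return verify(draft, partial_answers)                               # hypothetical: verification at a critical point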

Model Selection Criteria

1. Performance Factors

  • Context length: How much text the model can process at once
  • Inference speed: How quickly it generates responses
  • Parameter count: Generally correlates with capabilities
  • Specialization: Domain-specific vs. general-purpose

2. Practical Considerations

  • Cost: Larger models are more expensive to run
  • Latency requirements: User-facing applications need speed
  • Deployment constraints: Available resources and infrastructure
  • Governance requirements: Security, privacy, and compliance needs

3. Matching Models to Applications

  • Document summarization: Models with long context windows
  • Customer service: Fast models with strong dialog capabilities
  • Technical documentation: Models with domain expertise
  • Creative content: Models with strong generative abilities

Quiz

Question 1: What is the primary advantage of using Vector Search in a RAG application?

  • A) It reduces the token count of user prompts
  • B) It enables semantic similarity searches rather than just keyword matching
  • C) It improves the accuracy of model fine-tuning
  • D) It reduces the need for prompt engineering

Question 2: When designing a prompt template for an LLM application that needs to summarize legal documents, which of these is most important to include?

  • A) Examples of previously summarized documents
  • B) Instructions to use casual language
  • C) Formatting requirements for the output summary
  • D) Authentication credentials for the user

Question 3: Which of the following model architectures is best suited for text generation tasks?

  • A) Encoder-only models
  • B) Decoder-only models
  • C) CNN-based models
  • D) RNN-based models

Question 4: In the context of LLMs, what does “context window” refer to?

  • A) The user interface surrounding the input field
  • B) The maximum number of tokens a model can process at once
  • C) The timeframe in which a model was trained
  • D) The delay between input and response

Question 5: Which chain component would you select to ensure an LLM provides factual answers based on a specific document set?

  • A) Fine-tuning component
  • B) Retrieval component
  • C) Transformer component
  • D) Output formatting component

Answer Key:

  1. B
  2. C
  3. B
  4. B
  5. B

Review of Key Concepts

Foundation Concepts:

  • Tokens are the basic units of text that LLMs process
  • Embeddings represent semantic meaning in vector space
  • Context windows limit how much text a model can process at once
  • Model size generally correlates with capabilities but also with cost

LLM Capabilities:

  • Different architectures excel at different tasks
  • Model selection should balance performance, cost, and requirements
  • Foundation models can be adapted to specific domains through fine-tuning or prompting

Designing Applications:

  • Effective prompts include clear instructions, context, and output specifications
  • Complex applications often require chaining multiple components
  • Business requirements must be translated into specific model tasks
  • Multi-stage reasoning approaches break complex problems into manageable steps

Databricks-Specific Tools:

  • Vector Search enables semantic retrieval for RAG applications
  • Model Serving provides scalable deployment options
  • MLflow helps track experiments and manage models
  • Foundation Model APIs offer access to leading models

Reinforcement of Weak Areas

Based on your quiz performance, we can focus additional time on:

  • Understanding the relationships between model architectures and tasks
  • Designing effective prompt templates for specific applications
  • Selecting appropriate chain components for different requirements
  • Mapping business needs to technical solutions

Business Problem: Financial Document Analysis System

Company: InvestWise Financial Services

Challenge: InvestWise receives thousands of customer queries daily about their investment portfolios and financial documents. Currently, support agents spend 65% of their time retrieving information from lengthy prospectuses, quarterly reports, and account statements before they can answer customer questions. This creates long wait times for customers and increases operational costs.

Business Requirements:

  1. Create an AI-powered system that can analyze customer financial documents and answer specific questions about them.
  2. Ensure all responses are factually accurate and only contain information from the provided documents.
  3. Support both simple questions (“What is my current balance?”) and complex queries (“How has my portfolio allocation changed since last quarter?”).
  4. Maintain compliance with financial regulations by providing source citations for all information.
  5. Reduce average response time from 15 minutes to under 2 minutes.

Solution Approach

1. Translating Business Requirements into AI Solution

This is a document understanding and question-answering problem that requires:

  • Processing various financial document formats
  • Retrieving relevant information based on specific queries
  • Generating accurate, compliant responses with citations
  • Handling both simple and complex financial questions

The primary model task is question answering with retrieval augmentation to ensure accuracy and compliance.

2. Model Task Selection

We need to implement several model tasks:

  • Document processing: Extract structured information from various financial documents
  • Retrieval: Find relevant information from the processed documents
  • Question answering: Generate responses based on retrieved information
  • Verification: Ensure responses contain citations and comply with regulations

3. Chain Component Selection

Our solution will require the following chain components:

a) Document Processing Chain:

  • Document loader components for different file formats (PDF, CSV, Excel)
  • Text extraction component for converting documents to text
  • Chunking component to break documents into manageable sections
  • Embedding component to create vector representations of chunks
  • Vector database storage component for efficient retrieval

b) Question Answering Chain:

  • Query understanding component to interpret the customer’s question
  • Retrieval component to find relevant document chunks
  • Context assembly component to prepare retrieved information
  • Response generation component to create the answer
  • Citation component to track information sources
  • Compliance verification component to ensure regulatory requirements are met

4. Multi-stage Reasoning Implementation

The solution will process requests through sequential stages:

Stage 1: Document Processing (done in advance)

  • Ingest financial documents into the system
  • Convert to text and clean formatting issues
  • Split into chunks of appropriate size (500-1000 tokens)
  • Generate embeddings for each chunk
  • Store in Databricks Vector Search for retrieval

Stage 2: Query Processing (real-time)

  • Analyze incoming customer question
  • Convert question to embedding for similarity search
  • Retrieve relevant document chunks from Vector Search
  • Rank and select the most pertinent information

Stage 3: Response Generation (real-time)

  • Create a prompt that combines:
    • The customer’s question
    • Retrieved document chunks
    • Instructions to answer based only on provided information
    • Requirements for citation format
  • Generate response using an appropriate LLM
  • Verify all information is cited and accurate

Stage 4: Compliance Check (real-time)

  • Validate that the response meets financial regulations
  • Ensure all claims are supported by citations
  • Check for any potentially misleading statements
  • Format the response according to company standards

5. Model Selection Criteria

The solution requires:

  • A model with strong financial domain understanding
  • Ability to process medium-length contexts (at least 4K tokens)
  • High accuracy to ensure compliance with regulations
  • Reasonable inference speed to meet the 2-minute response target

Based on these requirements, a model like Anthropic’s Claude (via Databricks Foundation Model API) would be appropriate, as it:

  • Has sufficient context window to process multiple document chunks
  • Demonstrates strong performance on financial text
  • Can follow detailed instructions for compliance requirements
  • Provides good reasoning capabilities for complex financial questions

6. Databricks Integration Approach

The solution would be implemented using Databricks platform components:

  • Delta Lake tables to store document metadata and processing status
  • Vector Search to enable semantic retrieval of document chunks
  • MLflow to track performance metrics of the question-answering system
  • Model Serving to deploy the LLM chain as an API endpoint
  • Unity Catalog to manage access controls for financial documents

Data Preparation & Application Development

Chunking Strategies for Document Processing

Document chunking divides text into manageable segments that can be processed effectively by LLMs and retrieval systems. The optimal chunking strategy depends on your document structure and application requirements.

Types of Chunking Approaches:

  1. Fixed-Size Chunking
    • Divides documents into segments with a consistent token count
    • Provides predictable processing requirements
    • Example: Splitting documents into 512-token chunks for embedding models with 512-token context windows
  2. Semantic Chunking
    • Preserves the natural boundaries of content (paragraphs, sections)
    • Maintains context coherence for better retrieval quality
    • Example: Keeping all text in a “Financial Results” section together
  3. Hierarchical Chunking
    • Creates multiple levels of chunks (document → section → paragraph)
    • Enables different granularity depending on query specificity
    • Example: Storing both chapter-level chunks and paragraph-level chunks

Key Chunking Parameters:

  • Chunk Size: The target size of each chunk (typically measured in tokens)
    • Smaller chunks (128-256 tokens): More precise retrieval, less context
    • Medium chunks (512-1024 tokens): Balance of precision and context
    • Larger chunks (1500+ tokens): More context, less precise retrieval
  • Chunk Overlap: The amount of text shared between adjacent chunks
    • Higher overlap (25-50%): Better context preservation, more storage required
    • Lower overlap (0-10%): Storage efficient, may lose cross-chunk context

Selecting the Optimal Chunking Strategy:

When determining your chunking approach, consider:

  1. Document Structure: Highly structured documents benefit from semantic chunking along natural boundaries.

  2. Query Patterns: For specific fact-based queries, smaller chunks work better. For questions requiring synthesis across content, larger chunks are preferable.

  3. Model Constraints: Your embedding model’s context window limits maximum chunk size.

  4. Storage Limitations: Higher overlap and smaller chunks increase storage requirements.

Implementation Example:

def chunk_document(document, chunk_size=500, chunk_overlap=100):
    """
    Chunk a document into token segments of the specified size with overlap.
    `tokenize` and `detokenize` are placeholders for a model-specific tokenizer
    (for example, a Hugging Face tokenizer's encode/decode methods).
    """
    tokens = tokenize(document)
    chunks = []

    # Step forward by (chunk_size - chunk_overlap) so adjacent chunks share context
    for i in range(0, len(tokens), chunk_size - chunk_overlap):
        chunk = tokens[i:i + chunk_size]
        if len(chunk) < 50:  # Skip very small trailing chunks
            continue
        chunks.append(detokenize(chunk))

    return chunks

Source Document Filtering Techniques

Filtering extraneous content from source documents is critical for improving retrieval quality and reducing noise in LLM responses.

Common Filtering Approaches:

  1. Structural Filtering
    • Removing headers, footers, page numbers
    • Eliminating navigation elements, URLs, and references
    • Extracting useful content from templates
  2. Content-Based Filtering
    • Removing boilerplate text (legal disclaimers, standard notices)
    • Filtering out low-information sections (acknowledgments, copyright notices)
    • Eliminating duplicative content
  3. Quality-Based Filtering
    • Removing malformed or corrupted text
    • Filtering content that fails readability thresholds
    • Eliminating content with excessive special characters or formatting artifacts

Implementation Techniques:

  1. Regular Expression Patterns
    # Remove page numbers and headers
    cleaned_text = re.sub(r'Page \d+ of \d+', '', document_text)
    
  2. Rule-Based Cleaning
    def clean_document(text):
        # Remove common boilerplate (COMMON_BOILERPLATES is a predefined list of known strings)
        for boilerplate in COMMON_BOILERPLATES:
            text = text.replace(boilerplate, "")
        return text
    
  3. ML-Based Cleaning
    • Using classifiers to identify important vs. non-important sections
    • Leveraging existing models to score content quality

Content Extraction from Various Formats

Different document formats require specialized approaches for content extraction.

Common Document Formats and Extraction Methods:

  1. PDF Documents
    • Text extraction using PyPDF2, pdfplumber, or pdf2image + OCR
    • Structure preservation with specialized PDF parsers
    • Handling of multi-column layouts and tables
  2. Office Documents
    • Extracting from DOCX using python-docx
    • Processing Excel with pandas or openpyxl
    • Converting PowerPoint using python-pptx
  3. Images and Scanned Documents
    • OCR processing with Tesseract or cloud-based OCR
    • Layout analysis for complex documents
    • Post-OCR correction for improved quality
  4. Web Content
    • HTML parsing with BeautifulSoup or similar tools
    • Handling dynamic content with browsing automation
    • Processing structured data from APIs

Specialized Python Packages for Extraction:

  • PyTesseract: For OCR from images
  • Beautiful Soup: For structured HTML/XML parsing
  • Unstructured: A comprehensive package for various document types
  • LangChain Document Loaders: Pre-built extractors for common formats
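
As an example of format-specific extraction, here is a hedged sketch that strips noisy elements from HTML with Beautiful Soup before the text is chunked:

from bs4 import BeautifulSoup

def extract_html_text(html: str) -> str:
    soup = BeautifulSoup(html, "html.parser")
    # Drop elements that add noise to retrieval
    for tag in soup(["script", "style", "nav", "footer"]):
        tag.decompose()
    return soup.get_text(separator="\n", strip=True)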

Implementation Considerations:

  • Metadata Preservation: Retaining document metadata (title, author, date)
  • Structure Retention: Preserving logical document structure when possible
  • Error Handling: Graceful handling of corrupt files or extraction failures

Working with Delta Lake and Unity Catalog

Databricks provides specialized tools for managing document data within the lakehouse architecture.

Delta Lake for Document Storage:

  1. Document Metadata Tables
    • Tracking document sources, versions, and processing status
    • Enabling efficient filtering and selection of documents
  2. Chunk Storage Patterns
    • Storing document chunks with metadata and embeddings
    • Optimizing for retrieval performance
  3. Delta Lake Features for Document Processing
    • Time travel for version history
    • Schema evolution for flexible metadata
    • ACID transactions for reliable updates

Example Delta Table Schema for Document Chunks:

CREATE TABLE gold.document_chunks (
    chunk_id STRING,
    document_id STRING,
    chunk_text STRING,
    embedding ARRAY<FLOAT>,
    chunk_number INT,
    token_count INT,
    source_file STRING,
    last_updated TIMESTAMP
)
USING DELTA
PARTITIONED BY (document_id);

Unity Catalog Integration:

  1. Data Governance for Document Collections
    • Managing access controls at the collection level
    • Implementing column-level security for sensitive content
  2. Metadata Management
    • Centralized catalog for document collections
    • Lineage tracking for data processing
  3. Security Considerations
    • Access controls for source documents
    • Permission management for generated outputs
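
A hedged sketch of collection-level access control using Unity Catalog GRANT statements; the catalog, schema, and group names are illustrative:

# Grant read access on the chunk table to support agents; restrict raw source documents
spark.sql("GRANT SELECT ON TABLE main.gold.document_chunks TO `support_agents`")
spark.sql("REVOKE SELECT ON TABLE main.gold.source_documents FROM `contractors`")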

Optimizing for Vector Search:

# Creating a vector index on the document chunks table
# (illustrative SQL; in practice, Vector Search indexes are typically created through
#  the databricks-vectorsearch SDK or the UI — see the sketch below)
spark.sql("""
CREATE OR REPLACE INDEX vector_index
ON gold.document_chunks
USING VECTOR
ON embedding
OPTIONS (
    similarity_function = 'cosine'
)
""")

LangChain and Tools Integration

LangChain provides a framework for developing applications with LLMs, offering components for prompt management, retrieval, and chaining operations.

Key LangChain Components:

  1. Document Loaders
    • Pre-built connectors for various data sources
    • Standardized document representation
  2. Text Splitters
    • Implementations of different chunking strategies
    • Configuration for size and overlap parameters
  3. Embeddings
    • Integration with embedding models
    • Vector representation management
  4. Vector Stores
    • Connectors to vector databases
    • Query interfaces for similarity search
  5. Chains
    • Composition of multiple components
    • Sequential and branching logic

Integration with Databricks:

  1. LangChain with Databricks Vector Search
    from langchain.vectorstores import DatabricksVectorSearch
       
    # Connect to Databricks Vector Search
    # (constructor arguments are illustrative and vary by LangChain version; newer
    #  versions take a VectorSearchIndex object plus text and embedding columns)
    vector_store = DatabricksVectorSearch(
        embedding_function=embedding_model,
        catalog="main",
        schema="default",
        table="document_chunks",
        vector_column="embedding",
        text_column="chunk_text"
    )
    
  2. Using LangChain with Databricks Model Serving
    from langchain.chat_models import ChatDatabricks
       
    # Connect to a served model endpoint
    # (parameter names are illustrative; recent versions use `endpoint=` and pick up
    #  workspace authentication automatically)
    chat_model = ChatDatabricks(
        endpoint_name="llm-endpoint",
        databricks_token=dbutils.secrets.get("scope", "key")
    )
    

Retrieval-Augmented Generation (RAG) Implementation

RAG combines information retrieval with text generation to produce responses grounded in specific knowledge sources.

RAG Architecture Components:

  1. Query Processing
    • Understanding and reformulating user queries
    • Generating embeddings for retrieval
  2. Retrieval System
    • Finding relevant documents or chunks
    • Ranking and filtering retrieved content
  3. Context Assembly
    • Selecting and ordering retrieved content
    • Formatting for inclusion in prompts
  4. Response Generation
    • Crafting prompts with retrieved context
    • Generating coherent and accurate responses

Advanced RAG Techniques:

  1. Hybrid Retrieval
    • Combining keyword and semantic search
    • Ensembling multiple retrieval approaches
  2. Re-ranking
    • Two-stage retrieval with initial broad search and subsequent re-ranking
    • Using cross-encoders for improved relevance
  3. Query Decomposition
    • Breaking complex queries into sub-questions
    • Aggregating information from multiple retrievals
  4. Iterative Retrieval
    • Progressive refinement of search based on initial results
    • Conversational retrieval across multiple turns

Implementation Example:

def rag_response(question, vector_store, llm):
    # Retrieve relevant context
    relevant_docs = vector_store.similarity_search(question, k=5)
    
    # Format context for the LLM
    context = "\n\n".join([doc.page_content for doc in relevant_docs])
    
    # Create prompt with context
    prompt = f"""
    Answer the following question based ONLY on the provided context.
    If you cannot answer from the context, say "I don't have enough information."
    
    Context:
    {context}
    
    Question: {question}
    
    Answer:
    """
    
    # Generate response (llm here is a generic client wrapper; adapt the call to your LLM interface)
    response = llm.generate(prompt)
    
    return response

Prompt Templates and Guardrails

Effective prompt design is crucial for reliable LLM application behavior, while guardrails provide safety and consistency.

Prompt Template Components:

  1. System Instructions
    • Establishing model behavior and constraints
    • Defining response format and style
  2. Context Inclusion
    • Formatting retrieved information
    • Controlling context presentation
  3. Task Specification
    • Clear description of expected processing
    • Examples of desired outputs
  4. Output Formatting
    • Structured response requirements
    • Consistency markers for parsing

Databricks-Compatible Prompt Template:

from langchain.prompts import ChatPromptTemplate

prompt_template = ChatPromptTemplate.from_messages([
    ("system", """
    You are a helpful assistant that answers questions based only on the provided context.
    Format your answers using markdown for readability.
    Always cite your sources using [doc_id] notation.
    """),
    ("user", """
    Context:
    {context}
    
    Question: {question}
    """)
])

Implementing Guardrails:

  1. Input Filtering
    • Detecting and rejecting inappropriate queries
    • Sanitizing inputs before processing
  2. Output Moderation
    • Checking responses for policy violations
    • Filtering sensitive or harmful content
  3. Fact Verification
    • Validating claims against source material
    • Flagging potential hallucinations
  4. Confidence Control
    • Including confidence assessments
    • Providing multiple response options for low-confidence answers

Example Guardrail Implementation:

def apply_guardrails(user_query, llm_response, source_docs):
    # Input filtering (the contains_* and verify_against_sources helpers are placeholders)
    if contains_prohibited_content(user_query):
        return "I cannot respond to this query as it violates usage policies."
    
    # Output moderation
    if contains_harmful_content(llm_response):
        return "I've generated a response that may not be appropriate. Please rephrase your query."
    
    # Fact verification
    if not verify_against_sources(llm_response, source_docs):
        return "I cannot verify all information in my response against reliable sources. Please treat with caution."
    
    return llm_response

Model Selection Based on Application Needs

Different applications require different model capabilities, and selecting the appropriate model is crucial for performance and cost-efficiency.

Model Selection Criteria:

  1. Task Requirements
    • Generation quality and creativity
    • Factual accuracy and knowledge
    • Reasoning capabilities
    • Domain-specific expertise
  2. Operational Constraints
    • Latency requirements
    • Cost considerations
    • Batch vs. real-time processing
    • Security and privacy needs
  3. Technical Specifications
    • Context window size
    • Token processing speed
    • Parameter count
    • Fine-tuning capabilities

Databricks Foundation Model Selection:

| Model Type     | Best Use Cases                                                     | Considerations                                |
|----------------|--------------------------------------------------------------------|-----------------------------------------------|
| Claude models  | Long-form content, nuanced reasoning, safety-critical applications | Higher cost, excellent instruction following  |
| Llama models   | General text generation, code generation, open-source flexibility  | Various sizes for performance/cost tradeoffs  |
| Command models | Structured outputs, factual responses, controllable generation     | Strong instruction following, good for RAG    |
| MPT models     | Enterprise applications, customizable deployments                  | Databricks-optimized performance              |

Embedding Model Selection:

| Embedding Model | Dimensions | Context Window | Optimal Use                              |
|-----------------|------------|----------------|------------------------------------------|
| Small (384d)    | 384        | 512            | Efficient retrieval, lower storage needs |
| Medium (768d)   | 768        | 512-1024       | Balance of quality and efficiency        |
| Large (1536d+)  | 1536+      | 2048+          | Highest quality retrieval, more storage  |

Selection Process:

  1. Define Application Requirements
    • Primary tasks and expected outputs
    • Performance metrics and SLAs
    • Budget constraints
  2. Evaluate Model Options
    • Review model cards and benchmarks
    • Test representative samples
    • Measure relevant metrics
  3. Implement with Flexibility
    • Design for model interchangeability
    • Monitor performance metrics
    • Enable A/B testing between models
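
One lightweight way to keep the model choice flexible is to route requests through a small configuration map, so serving endpoints can be swapped or A/B tested without code changes. The endpoint names below are illustrative assumptions:

import random

MODEL_ENDPOINTS = {
    "default": "llm-endpoint-a",    # illustrative serving endpoint names
    "candidate": "llm-endpoint-b",
}

def pick_endpoint(ab_test_ratio: float = 0.1) -> str:
    # Send a small share of traffic to the candidate model for A/B comparison
    return MODEL_ENDPOINTS["candidate"] if random.random() < ab_test_ratio else MODEL_ENDPOINTS["default"]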

Hands-on RAG Application Exercise

Let’s implement a basic RAG application using Databricks components:

Exercise: Build a Financial Document RAG System

Step 1: Document Processing

# Sample document processing pipeline
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings

# Load documents
loader = PyPDFLoader("financial_report.pdf")
documents = loader.load()

# Split into chunks
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", ".", " "]
)
chunks = text_splitter.split_documents(documents)

# Initialize the embedding model (embeddings are generated per chunk in the next step)
embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")

Step 2: Store in Delta Lake

# Create a DataFrame with chunks and embeddings
from datetime import datetime

chunk_data = []
for i, chunk in enumerate(chunks):
    embedding = embedding_model.embed_query(chunk.page_content)
    chunk_data.append({
        "chunk_id": f"chunk_{i}",
        "document_id": "financial_report",
        "chunk_text": chunk.page_content,
        "embedding": embedding,
        "chunk_number": i,
        "token_count": len(chunk.page_content.split()),
        "source_file": "financial_report.pdf",
        "last_updated": datetime.now()
    })

# Create DataFrame and write to Delta
chunk_df = spark.createDataFrame(chunk_data)
chunk_df.write.format("delta").mode("overwrite").saveAsTable("gold.document_chunks")

Step 3: Create Vector Search Index

# Create Vector Search index (illustrative SQL; in practice, use the Vector Search SDK or UI)
spark.sql("""
CREATE OR REPLACE INDEX vector_index
ON gold.document_chunks
USING VECTOR
ON embedding
OPTIONS (
    similarity_function = 'cosine'
)
""")

Step 4: Implement RAG Query Function

def query_financial_documents(question, top_k=3):
    # Generate embedding for the question
    question_embedding = embedding_model.embed_query(question)
    
    # Query Vector Search (illustrative: vector_dot_product and the inlined embedding
    # literal stand in for a proper Vector Search similarity query)
    results = spark.sql(f"""
    SELECT chunk_text, source_file, 
           vector_dot_product(embedding, array({str(question_embedding)[1:-1]})) as relevance
    FROM gold.document_chunks
    ORDER BY relevance DESC
    LIMIT {top_k}
    """).collect()
    
    # Format context from retrieved chunks
    context = "\n\n".join([row.chunk_text for row in results])
    
    # Create prompt with context
    prompt = f"""
    You are a financial analyst assistant. Answer the following question 
    based ONLY on the provided financial document excerpts.
    
    Financial Document Excerpts:
    {context}
    
    Question: {question}
    
    Provide a concise answer with specific numbers and facts from the documents when available.
    """
    
    # Send to LLM (llm_client and the model name below are illustrative placeholders)
    response = llm_client.completions.create(
        model="databricks-llama-2-70b",
        prompt=prompt,
        max_tokens=500,
        temperature=0.0
    )
    
    return response.choices[0].text

Step 5: Test the Application

questions = [
    "What was the revenue growth in the last quarter?",
    "What are the main risk factors mentioned in the report?",
    "What are the company's plans for expansion in international markets?"
]

for question in questions:
    print(f"Question: {question}")
    print(f"Answer: {query_financial_documents(question)}")
    print("="*80)

Quiz on Data Preparation and Application Development

Question 1: When implementing a RAG application with long technical documents, which chunking strategy would be most appropriate?

  • A) Fixed-size chunks of 128 tokens with no overlap
  • B) Semantic chunks based on document sections with 10% overlap
  • C) Fixed-size chunks of 1024 tokens with 20% overlap
  • D) Character-based chunks of exactly 2000 characters

Question 2: In the context of Vector Search in Databricks, what is the primary purpose of embeddings?

  • A) To compress documents for efficient storage
  • B) To transform text into numerical vectors for semantic similarity search
  • C) To encrypt sensitive document content
  • D) To format document text for display in user interfaces

Question 3: Which component in a RAG pipeline is responsible for determining which chunks of information to include in the context provided to the LLM?

  • A) The text splitter
  • B) The embedding model
  • C) The retrieval system
  • D) The LLM itself

Question 4: When working with financial documents in a RAG application, which filtering technique would be most important to implement?

  • A) Removing images and charts
  • B) Removing boilerplate legal disclaimers
  • C) Converting all numbers to a standardized format
  • D) Translating all content to English

Question 5: What is the primary advantage of implementing guardrails in an LLM application?

  • A) Reducing token usage and associated costs
  • B) Increasing the creativity of generated content
  • C) Ensuring responses adhere to safety and quality standards
  • D) Improving the response speed of the application

Question 6: When selecting an embedding model for a RAG application, which factor is most important to consider for retrieval quality?

  • A) The model’s release date
  • B) The dimensionality of the embeddings
  • C) Whether the model was trained on similar content to your documents
  • D) The model’s parameter count

Question 7: In Databricks Vector Search, what SQL function is typically used to find semantically similar documents?

  • A) vector_similarity()
  • B) cosine_distance()
  • C) vector_dot_product()
  • D) semantic_search()

Question 8: Which LangChain component would you use to break documents into chunks with appropriate overlap?

  • A) DocumentLoader
  • B) TextSplitter
  • C) Embeddings
  • D) VectorStore

Answer Key:

  1. B
  2. B
  3. C
  4. B
  5. C
  6. C
  7. C
  8. B

Preparation Task: Deploying a RAG Application in Databricks

Business Problem: Clinical Knowledge Assistant

Organization: MedSearch Health Systems

Challenge: MedSearch Health Systems manages a network of hospitals and clinics with over 5,000 healthcare providers. Their medical staff needs rapid access to the latest clinical guidelines, research papers, treatment protocols, and drug information. Currently, clinicians spend an average of 5.2 hours per week searching through various medical databases, which reduces patient care time and creates inconsistencies in treatment approaches.

Requirements:

  1. Deploy a secure RAG application that provides accurate, evidence-based answers to clinical queries
  2. Ensure all responses include citations to source documents for verification
  3. Handle complex medical terminology and contextual understanding
  4. Maintain strict compliance with HIPAA and other healthcare regulations
  5. Monitor usage patterns and answer quality to continuously improve the system
  6. Scale to accommodate thousands of daily queries from medical staff

Deployment Architecture

1. Components for Model Serving

The deployment architecture for this clinical knowledge assistant would include:

Core Processing Pipeline:

  • Document processing layer for ingesting and processing medical literature
  • Vector Search index for retrieving relevant medical information
  • LLM inference layer for generating evidence-based responses
  • Citation and verification system to track information sources

Service Components:

  • API Gateway to handle authentication and request routing
  • Request processing service to manage prompt construction
  • Response validation service to ensure medical accuracy
  • Logging and analytics service to capture usage data

Resource Allocation:

  • Dedicated Databricks Model Serving endpoints for the LLM
  • Optimized Vector Search instance for low-latency retrieval
  • Serverless compute for document processing pipeline
  • Autoscaling configuration to handle variable loads during peak hospital hours

2. Implementation Approach

Phase 1: Model and Data Preparation

# Register the embedding model with MLflow
import mlflow

with mlflow.start_run() as run:
    # Define model signature and example inputs
    signature = mlflow.models.signature.infer_signature(
        model_input=["What are the latest guidelines for treating hypertension in diabetic patients?"],
        model_output=["Embedding vector..."]
    )
    
    # Save model with dependencies
    # (EmbeddingModel is a user-defined mlflow.pyfunc.PythonModel wrapper, not shown here)
    mlflow.pyfunc.log_model(
        artifact_path="embedding_model",
        python_model=EmbeddingModel(),
        artifacts={"model_weights": "./model_weights"},
        signature=signature,
        pip_requirements=["transformers==4.30.2", "torch==2.0.1"],
        registered_model_name="clinical_embeddings"
    )

Phase 2: Vector Search Configuration

# Create Vector Search index on the clinical documents table
# (illustrative SQL; options such as optimization_hint are placeholders, and in practice
#  the index is created through the databricks-vectorsearch SDK or the UI)
spark.sql("""
CREATE OR REPLACE INDEX clinical_vector_index
ON gold.clinical_documents
USING VECTOR
ON embedding
OPTIONS (
    similarity_function = 'cosine',
    optimization_hint = 'latency'
)
""")

Phase 3: RAG Chain Implementation

# Define the RAG application as a PyFunc model
class ClinicalRAG(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        # Load the LLM client (DatabricksFoundationModelClient is an illustrative wrapper
        # around the Foundation Model APIs)
        self.llm_client = DatabricksFoundationModelClient()
        
        # Initialize vector search connection
        # (constructor arguments are illustrative and vary by client version)
        self.vector_store = DatabricksVectorSearch(
            catalog="main", 
            schema="gold", 
            table="clinical_documents",
            vector_column="embedding", 
            text_column="document_text",
            metadata_columns=["source", "publication_date", "evidence_level"]
        )
        
    def predict(self, context, model_input):
        # Extract query from input
        query = model_input["query"][0]
        
        # Retrieve relevant clinical documents
        results = self.vector_store.search(query, k=5)
        
        # Format prompt with medical context
        prompt = self._format_medical_prompt(query, results)
        
        # Generate response with citations (the model/endpoint name below is illustrative)
        response = self.llm_client.completions.create(
            model="databricks-claude-3-sonnet-20240229",
            prompt=prompt,
            max_tokens=1000,
            temperature=0.2
        )
        
        return self._format_response_with_citations(response, results)
        
    def _format_medical_prompt(self, query, results):
        # Specialized prompt formatting for medical context (implementation omitted)
        ...

    def _format_response_with_citations(self, response, results):
        # Add citations to the response (implementation omitted)
        ...

Phase 4: Model Registration and Endpoint Creation

# Register the RAG model
with mlflow.start_run() as run:
    mlflow.pyfunc.log_model(
        artifact_path="clinical_rag",
        python_model=ClinicalRAG(),
        registered_model_name="clinical_knowledge_assistant"
    )

# Create serving endpoint
# (DatabricksServingClient and its arguments are illustrative; endpoints can also be
#  created via the Databricks SDK, the MLflow deployments client, or the Serving UI)
client = DatabricksServingClient()
client.create_endpoint(
    name="clinical-assistant-endpoint",
    model_name="clinical_knowledge_assistant",
    model_version=1,
    workload_size="Medium",
    scale_to_zero_enabled=True,
    min_provisioned_replicas=1,
    max_provisioned_replicas=10
)

Security Considerations

  1. Data Protection:
    • Use Unity Catalog for fine-grained access control to medical documents
    • Implement column-level security for sensitive patient information
    • Enable encryption at rest and in transit for all medical data
  2. Authentication and Authorization:
    • Integrate with hospital single sign-on systems
    • Implement role-based access controls (clinician, researcher, administrator)
    • Audit all access to the RAG system
  3. Compliance Requirements:
    • Ensure all processing complies with HIPAA regulations
    • Implement data residency controls for geographic compliance
    • Configure retention policies for query logs
  4. Content Safety:
    • Deploy guardrails to prevent harmful medical advice
    • Implement citation verification to reduce hallucinations
    • Add disclaimers about AI assistance vs. clinical judgment

Monitoring Approach

  1. Performance Monitoring:
    • Track query latency (target <2 seconds for retrieval)
    • Monitor embedding generation time
    • Measure end-to-end response time
  2. Quality Assessment:
    • Implement automated evaluation of citation accuracy
    • Conduct periodic clinical review of random responses
    • Compare responses against gold-standard medical reference answers
  3. Usage Analytics:
    • Track query patterns by medical specialty
    • Identify most common clinical questions
    • Monitor system utilization across hospital departments
  4. Cost Optimization:
    • Implement tiered serving based on query urgency
    • Monitor token usage and optimize prompt length
    • Schedule batch processing of new medical literature during off-peak hours

Implementation Workflow

  1. Start with a limited deployment to the oncology department
  2. Monitor performance and gather feedback from specialist physicians
  3. Optimize retrieval and response quality based on usage patterns
  4. Gradually expand to additional medical specialties
  5. Implement continuous integration for weekly updates to the medical knowledge base
  6. Establish a regular clinical review committee to evaluate system performance

Deployment, Governance, and First Mock Exam

Model Serving Fundamentals

Model serving refers to the deployment of machine learning models as API endpoints that can process requests in real-time. Databricks Model Serving provides a managed infrastructure for deploying generative AI applications.

Core Components of Databricks Model Serving:

The Databricks Model Serving architecture includes several essential components that work together to provide a robust deployment solution. These components handle different aspects of the deployment process, from model registration to request processing.

First, the Model Registry serves as a centralized repository for managing model versions and transitions between stages. This enables formal tracking of model lineage and governance throughout the development lifecycle.

Second, Serving Endpoints act as the interface for client applications to interact with deployed models. These endpoints handle authentication, request routing, and load balancing to ensure optimal performance.

Third, Compute Resources are provisioned automatically based on configuration specifications. This includes scaling options to accommodate varying workloads efficiently.

Finally, the Monitoring System tracks performance metrics, utilization patterns, and error rates. This provides valuable insights for troubleshooting and optimization.

Deployment Options:

Databricks offers multiple deployment configurations to meet diverse application requirements. These include:

  1. Real-time serving for interactive applications requiring immediate responses.
  2. Serverless deployments that automatically scale to zero when not in use, optimizing cost efficiency.
  3. GPU-accelerated endpoints for computationally intensive models that benefit from hardware acceleration.
  4. Token-based authentication to secure access to model endpoints.

Sizing and Scaling Considerations:

When configuring serving endpoints, several factors influence the appropriate resource allocation:

  • Expected request volume and patterns determine base capacity needs.
  • Response time requirements dictate processing power requirements.
  • Model complexity affects memory and computational requirements.
  • Batch size optimization can improve throughput for certain workloads.
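
As a concrete illustration of these configuration options, here is a hedged sketch that creates an endpoint with MLflow's Databricks deployment client; the endpoint name, model name, and sizing values are assumptions, and the config dictionary mirrors the Model Serving API payload:

from mlflow.deployments import get_deploy_client

client = get_deploy_client("databricks")
client.create_endpoint(
    name="rag-assistant-endpoint",                    # illustrative endpoint name
    config={
        "served_entities": [{
            "entity_name": "main.default.rag_model",  # registered model (assumption)
            "entity_version": "1",
            "workload_size": "Small",                 # base capacity for expected request volume
            "scale_to_zero_enabled": True,            # serverless-style cost optimization
        }]
    },
)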

PyFunc Models for LLM Chains

PyFunc provides a flexible framework for packaging Python code as MLflow models, allowing complex LLM chains to be deployed as unified serving endpoints.

PyFunc Model Structure:

A PyFunc model consists of:

  1. A Python class that implements the MLflow PythonModel interface.
  2. The load_context method that initializes resources when the model is loaded.
  3. The predict method that processes input data and returns results.
  4. Associated artifacts such as configuration files or model weights.

Example Implementation:

import mlflow.pyfunc
from databricks.vector_search.client import VectorSearchClient
from databricks.sdk import WorkspaceClient

class RAGModel(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        # Initialize clients and resources
        # (the workspace_client argument is illustrative; inside a Databricks workspace
        #  the Vector Search client typically authenticates from the environment)
        self.workspace_client = WorkspaceClient()
        self.vs_client = VectorSearchClient(workspace_client=self.workspace_client)
        
        # Connect to Vector Search index
        self.vs_index = self.vs_client.get_index(
            endpoint_name="vs-endpoint",
            index_name="document-index"
        )
        
        # Initialize Foundation Model API client
        self.fm_client = self.workspace_client.serving_endpoints

    def predict(self, context, model_input):
        # Extract query from input
        query = model_input.iloc[0]["query"]
        
        # Generate embeddings for the query
        query_embedding = self._embed_query(query)
        
        # Retrieve relevant documents
        results = self.vs_index.similarity_search(
            query_vector=query_embedding,
            columns=["content", "source", "title"],
            num_results=3
        )
        
        # Construct context from retrieved documents
        # (parsing is simplified; the client returns results in a structured payload)
        context_docs = [item["content"] for item in results]
        context_text = "\n\n".join(context_docs)
        
        # Generate response using foundation model
        response = self._generate_response(query, context_text)
        
        return response

    def _embed_query(self, query):
        # Implementation of query embedding
        pass
        
    def _generate_response(self, query, context):
        # Implementation of response generation
        pass

Pre-processing and Post-processing:

Within PyFunc models, pre-processing and post-processing steps help transform inputs and outputs for optimal results:

  1. Pre-processing functions convert raw inputs into formats suitable for LLMs and retrieval systems.
  2. Post-processing enhances model outputs by adding citations, formatting responses, or implementing business logic rules.
  3. Error handling mechanisms catch and address exceptions during processing.
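
For example, a small post-processing helper might append source citations to the generated answer; the field names are illustrative:

def add_citations(answer: str, retrieved_docs: list) -> str:
    # Collect distinct sources from the retrieved documents (assumes a "source" field)
    sources = sorted({doc.get("source", "unknown") for doc in retrieved_docs})
    return f"{answer}\n\nSources: " + "; ".join(sources)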

Vector Search Indexing and Querying

Vector Search enables semantic similarity retrieval of documents based on their embedding representations.

Index Creation and Management:

Creating and managing Vector Search indexes involves several key steps:

  1. Define the embedding model to be used for vector representation.
  2. Specify the Delta table containing document text and metadata.
  3. Create the vector index configuration with appropriate similarity metrics.
  4. Set optimization parameters for latency or throughput requirements.

Example index creation:

-- Illustrative index definition; in practice, indexes are created via the Vector Search SDK or UI
CREATE INDEX IF NOT EXISTS document_embeddings
ON gold.document_chunks
USING VECTOR (embedding)
OPTIONS (
  similarity_function = 'cosine'
)

Querying Vector Indexes:

Vector Search supports multiple querying methods to retrieve relevant information:

  1. K-Nearest Neighbors (KNN) retrieves the most similar documents based on vector distance.
  2. Filter-based search combines vector similarity with metadata filtering.
  3. Hybrid search integrates keyword and semantic matching for improved results.

Example query implementation:

# Simple KNN query (vector_dot_product and the inlined embedding literal are illustrative)
results = spark.sql(f"""
SELECT chunk_text, source, 
       vector_dot_product(embedding, array{query_embedding}) AS similarity
FROM gold.document_chunks
ORDER BY similarity DESC
LIMIT 5
""")

# Advanced hybrid search with filtering
# (bm25 is likewise illustrative; it stands in for a keyword-relevance score that is
#  blended with the semantic score)
results = spark.sql(f"""
SELECT chunk_text, source, publication_date,
       vector_dot_product(embedding, array{query_embedding}) * 0.7 + 
       bm25(chunk_text, '{query_text}') * 0.3 AS relevance_score
FROM gold.document_chunks
WHERE publication_date > '2022-01-01'
ORDER BY relevance_score DESC
LIMIT 5
""")

Performance Optimization:

Several techniques improve vector search performance:

  1. Appropriate index sharding distributes the workload across resources.
  2. Filtering before vector search reduces computation overhead.
  3. Caching frequently used queries improves response times.
  4. Monitoring query latency identifies optimization opportunities.
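
As an example of the caching point, query embeddings can be memoized so repeated questions skip the embedding call; embed_query is a hypothetical helper:

from functools import lru_cache

@lru_cache(maxsize=1024)
def cached_query_embedding(query: str) -> tuple:
    # embed_query is a hypothetical function that returns a list of floats
    return tuple(embed_query(query))  # tuples are hashable, so results can be cached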

MLflow Model Registration

MLflow provides tools for tracking, packaging, and registering models to ensure reproducibility and governance.

Model Registration Process:

The formal model registration process includes:

  1. Logging models with MLflow to capture metadata, dependencies, and artifacts.
  2. Registering models in the Model Registry with appropriate naming conventions.
  3. Transitioning models through lifecycle stages (Development, Staging, Production).
  4. Versioning models to maintain history and enable rollbacks.

Example registration workflow:

# Log model
import mlflow
from mlflow.tracking import MlflowClient

with mlflow.start_run() as run:
    mlflow.pyfunc.log_model(
        artifact_path="rag_model",
        python_model=RAGModel(),
        code_path=["./rag_utils.py"],
        conda_env={
            "channels": ["conda-forge"],
            "dependencies": [
                "python=3.9.0",
                "pip=22.0.4",
                {"pip": ["databricks-sdk==0.8.0", "transformers==4.28.1"]}
            ]
        },
        registered_model_name="clinical_assistant_rag"
    )

# Transition model to production
client = MlflowClient()
client.transition_model_version_stage(
    name="clinical_assistant_rag",
    version=1,
    stage="Production"
)

Model Signature and Input Examples:

Proper model documentation includes:

  1. Model signatures defining expected input and output schemas.
  2. Input examples demonstrating correct usage formats.
  3. Model description documenting purpose, limitations, and usage patterns.

Example signature definition:

from mlflow.models.signature import infer_signature
import pandas as pd

# Define example input
example_input = pd.DataFrame({
    "query": ["What are the treatment options for acute myocardial infarction?"]
})

# Define example output
example_output = ["Treatment options include immediate reperfusion therapy..."]

# Infer signature from examples
signature = infer_signature(example_input, example_output)

# Use signature when logging model
mlflow.pyfunc.log_model(
    artifact_path="rag_model",
    python_model=RAGModel(),
    signature=signature,
    input_example=example_input,
    registered_model_name="clinical_assistant_rag"
)

Text Masking and Guardrails

Text masking and guardrails protect users and organizations from harmful or inappropriate content.

Types of Text Masking:

Several text masking approaches address different security requirements:

  1. PII Detection and Redaction identifies and masks personally identifiable information.
  2. Sensitive Information Masking protects confidential business data.
  3. Contextual Masking applies different levels of protection based on context.
  4. Token-level Filtering removes or replaces specific words or phrases.

Implementation example:

import re

def mask_sensitive_information(text):
    # Mask email addresses
    text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL]', text)
    
    # Mask phone numbers
    text = re.sub(r'\b(\+\d{1,2}\s)?\(?\d{3}\)?[\s.-]\d{3}[\s.-]\d{4}\b', '[PHONE]', text)
    
    # Mask credit card numbers
    text = re.sub(r'\b(?:\d{4}[- ]?){3}\d{4}\b', '[CREDIT_CARD]', text)
    
    return text

Implementing LLM Guardrails:

Guardrail implementation follows a multi-layered approach:

  1. Input Validation checks user queries for policy violations before processing.
  2. Response Filtering evaluates LLM outputs for potentially harmful content.
  3. Continuous Monitoring tracks patterns of use and misuse.
  4. Feedback Loops improve guardrail effectiveness over time.

Example guardrail implementation:

def apply_guardrails(query, response):
    # Define prohibited content patterns
    prohibited_patterns = [
        # List of patterns to detect harmful content
    ]
    
    # Check input query
    for pattern in prohibited_patterns:
        if re.search(pattern, query, re.IGNORECASE):
            return {
                "status": "rejected",
                "reason": "Input query contains prohibited content",
                "original_query": query,
                "response": None
            }
    
    # Check generated response
    for pattern in prohibited_patterns:
        if re.search(pattern, response, re.IGNORECASE):
            return {
                "status": "filtered",
                "reason": "Generated response contains prohibited content",
                "original_query": query,
                "response": "I apologize, but I'm not able to provide that information."
            }
    
    # Return safe response
    return {
        "status": "approved",
        "reason": None,
        "original_query": query,
        "response": response
    }

Legal and Licensing Considerations

AI applications must comply with various legal requirements and licensing restrictions.

Content Licensing Issues:

Several licensing considerations affect AI systems:

  1. Training Data Rights determine what content can be used to train models.
  2. Output Licensing defines how generated content can be used.
  3. Attribution Requirements may necessitate citing sources in responses.
  4. Commercial Use Restrictions limit certain applications of AI systems.

Compliance Frameworks:

Generative AI applications must adhere to multiple regulatory frameworks:

  1. Data Protection Regulations (GDPR, CCPA, HIPAA) govern personal data handling.
  2. Intellectual Property Laws address copyright and fair use considerations.
  3. Industry-Specific Regulations impose additional requirements in sectors like healthcare or finance.
  4. Algorithmic Accountability laws require transparency and fairness in AI systems.

Risk Mitigation Strategies:

Organizations can implement several strategies to reduce legal risks; a short sketch of source tracking and disclosure follows the list:

  1. Source Tracking maintains lineage information for all content.
  2. Consent Management ensures appropriate permissions for data usage.
  3. Disclosure Requirements inform users about AI-generated content.
  4. Regular Compliance Audits verify ongoing adherence to regulations.
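
A minimal sketch of how source tracking (item 1) and disclosure (item 3) might be wired into a RAG response, assuming each response already carries its retrieved documents; the retrieved_docs structure and the disclosure wording are illustrative, not prescribed:

AI_DISCLOSURE = (
    "Note: This response was generated by an AI system and does not constitute professional advice."
)

def attach_provenance(response_text, retrieved_docs):
    """Append source lineage and an AI-generated disclosure to a response.

    retrieved_docs is assumed to be a list of dicts whose 'source' and
    'license' fields were populated during document ingestion.
    """
    sources = [
        f"- {doc.get('source', 'unknown')} (license: {doc.get('license', 'unspecified')})"
        for doc in retrieved_docs
    ]
    provenance_block = "\n".join(sources) if sources else "- no sources retrieved"
    return f"{response_text}\n\nSources:\n{provenance_block}\n\n{AI_DISCLOSURE}"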

Content Moderation Techniques

Content moderation ensures AI systems generate appropriate, safe, and compliant outputs.

Pre-generation Moderation:

Pre-generation techniques filter problematic content before processing; a small screening sketch follows the list:

  1. Query Classification categorizes inputs by risk level.
  2. Intent Recognition identifies potentially harmful queries.
  3. Topic Blocking prevents queries on prohibited subjects.
  4. User Authentication verifies user permissions for sensitive queries.
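
A minimal pre-generation screening sketch; the keyword lists are placeholders for a real topic classifier or moderation endpoint, and the phi_access permission name is an assumption:

# Keyword lists are placeholders; production systems would use a trained
# classifier or a moderation endpoint instead of substring matching.
BLOCKED_TOPICS = {
    "violence": ["build a weapon", "make an explosive"],
    "self_harm": ["ways to hurt myself"],
}

def screen_query(query, user_permissions=frozenset()):
    """Return a moderation decision for a query before it reaches the LLM."""
    lowered = query.lower()
    
    # Topic blocking
    for topic, phrases in BLOCKED_TOPICS.items():
        if any(phrase in lowered for phrase in phrases):
            return {"allowed": False, "reason": f"blocked_topic:{topic}"}
    
    # Sensitive-but-permitted topics require an explicit permission
    if "patient record" in lowered and "phi_access" not in user_permissions:
        return {"allowed": False, "reason": "missing_permission:phi_access"}
    
    return {"allowed": True, "reason": None}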

Post-generation Moderation:

Post-generation approaches evaluate content after it has been created:

  1. Content Classification categorizes generated text by risk level.
  2. Toxicity Detection identifies harmful or offensive content.
  3. Fact Verification checks factual accuracy of claims.
  4. Policy Compliance ensures adherence to usage guidelines.

Moderation Implementation Examples:

def moderate_content(generated_text):
    # Define moderation categories; each detector below is assumed to return
    # a risk score between 0 and 1, where higher means riskier (verify_facts
    # returns the risk that claims are unsupported)
    categories = {
        "harmful_content": detect_harmful_content(generated_text),
        "personal_information": detect_pii(generated_text),
        "copyrighted_material": detect_copyright_issues(generated_text),
        "factual_inaccuracy": verify_facts(generated_text)
    }
    
    # Calculate overall risk score
    risk_score = sum(categories.values()) / len(categories)
    
    # Apply moderation policy
    if risk_score > 0.8:
        return None  # Block content completely
    elif risk_score > 0.5:
        return apply_filtering(generated_text)  # Apply filtering
    else:
        return generated_text  # Allow content

Security Aspects of GenAI Applications

Security considerations are paramount for generative AI systems handling sensitive information.

Authentication and Authorization:

Proper access control includes:

  1. Token-based Authentication for secure API access.
  2. Role-based Access Control for different permission levels.
  3. Contextual Authorization based on request patterns.
  4. Session Management to control access duration.

Example authorization implementation:

def check_authorization(user_id, requested_action, resource_id):
    # Retrieve user permissions
    user_permissions = get_user_permissions(user_id)
    
    # Check if user has required permission for the action
    if requested_action not in user_permissions:
        return False
    
    # Check resource-specific access control
    resource_acl = get_resource_acl(resource_id)
    if user_id not in resource_acl:
        return False
    
    # Additional contextual checks
    if is_suspicious_pattern(user_id, requested_action, resource_id):
        log_security_event("Suspicious access pattern detected")
        return False
    
    return True

Data Protection:

Comprehensive data protection includes the following; a retention-policy sketch follows the list:

  1. Encryption of data in transit and at rest.
  2. Data Minimization principles to limit exposure.
  3. Secure Processing environments for sensitive operations.
  4. Retention Policies for query logs and generated content.
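
As a concrete example of item 4, a retention policy on a query-log Delta table could be enforced by a scheduled job along these lines; the table name and 90-day window are assumptions, and the snippet uses the same implicit spark session as the other examples:

RETENTION_DAYS = 90  # assumed policy window; align with your compliance requirements

def enforce_log_retention(table_name="monitoring.query_logs"):
    """Delete query-log rows older than the retention window."""
    spark.sql(f"""
        DELETE FROM {table_name}
        WHERE timestamp < current_timestamp() - INTERVAL {RETENTION_DAYS} DAYS
    """)
    # VACUUM removes the underlying data files once the Delta retention period has passed
    spark.sql(f"VACUUM {table_name}")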

Secure Development Practices:

Building secure AI applications requires the following practices; an input-validation sketch follows the list:

  1. Input Validation to prevent prompt injection attacks.
  2. Output Sanitization to prevent cross-site scripting.
  3. Dependency Management to address vulnerabilities.
  4. Regular Security Testing to identify weaknesses.
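
A simplified sketch of input validation against prompt injection and basic output sanitization; the regex patterns are illustrative only and would be combined with model-based detection in practice:

import html
import re

INJECTION_PATTERNS = [
    r"ignore (all|previous) instructions",
    r"disregard the system prompt",
    r"reveal your system prompt",
]

def validate_user_input(user_input):
    """Reject inputs that look like prompt-injection attempts."""
    for pattern in INJECTION_PATTERNS:
        if re.search(pattern, user_input, re.IGNORECASE):
            return False, f"possible prompt injection matched pattern: {pattern}"
    return True, None

def sanitize_output(response_text):
    """Escape HTML so responses rendered in a web UI cannot carry scripts."""
    return html.escape(response_text)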

90-Minute Mini Mock Exam (25 Questions)

Section 1: Designing Applications

Question 1: A data scientist has created a RAG application that needs to be deployed as an API endpoint in Databricks. The application requires both an embedding model and an LLM for generation. Which approach would be most efficient for deployment?

A) Deploy separate endpoints for the embedding model and LLM, then create a client application to coordinate between them.

B) Create a PyFunc model that encapsulates both the embedding process and LLM generation, then deploy it as a single endpoint.

C) Use MLflow to deploy the LLM and a custom solution for the embedding model.

D) Export the models to ONNX format and deploy them using a container-based solution.

Question 2: When designing a prompt template for a financial advisory application, which component is most important to include for regulatory compliance?

A) Instructions for the model to generate diverse investment options

B) Examples of previous successful responses

C) Instructions to disclose that the response is AI-generated and not professional financial advice

D) Commands to maximize response creativity

Question 3: A marketing team wants to build an application that can expand brief product descriptions into comprehensive marketing copy. Which model task would be most appropriate for this requirement?

A) Summarization

B) Text classification

C) Text expansion

D) Sentiment analysis

Question 4: Which chain component would you select to ensure an LLM provides factual answers based on a specific document set?

A) Fine-tuning component

B) Retrieval component

C) Transformer component

D) Output formatting component

Question 5: When designing a multi-stage reasoning chain for a complex legal document analysis application, what is the optimal ordering of components?

A) Document chunking → Entity extraction → Legal classification → Response generation

B) Document chunking → Document retrieval → Entity extraction → Response generation

C) Entity extraction → Document chunking → Document retrieval → Response generation

D) Document retrieval → Entity extraction → Document chunking → Response generation

Section 2: Data Preparation

Question 6: When implementing a Vector Search index in Databricks, which parameter is most important for optimizing retrieval speed for a real-time application?

A) The number of columns included in the index

B) The dimensionality of the embedding vectors

C) The choice between optimization_hint = 'latency' or optimization_hint = 'throughput'

D) The Delta table partitioning strategy

Question 7: A data scientist is creating document chunks for a RAG application. The documents contain technical specifications with tables, bullet points, and paragraphs. Which chunking strategy would best preserve the semantic meaning of these documents?

A) Fixed-size chunks of 256 tokens with no overlap

B) Recursive character splitting with paragraph-level boundaries

C) Semantic chunking based on document structure and section headings

D) Character-level chunking with 50% overlap between chunks

Question 8: When processing a large corpus of financial documents for a RAG application, which filtering technique would be most important to implement?

A) Removing all images and charts

B) Removing boilerplate legal disclaimers

C) Converting all dates to a standard format

D) Translating all documents to English

Question 9: Which Python package would be most appropriate for extracting text from scanned PDF documents?

A) BeautifulSoup

B) Pandas

C) PyTesseract

D) Scrapy

Question 10: A data engineer needs to prepare a document collection with 10 million short paragraphs for a RAG application. The application requires low-latency responses. What is the most efficient approach for storing these documents in Databricks?

A) Store the raw text in a Delta Lake table without embeddings

B) Generate embeddings for each paragraph and store them with the text in a Delta Lake table with a Vector Search index

C) Store the documents in a NoSQL database outside of Databricks

D) Convert all documents to a single large text file and use in-memory processing

Section 3: Application Development

Question 11: When building a RAG application using LangChain, which component is responsible for converting retrieved documents into a format suitable for inclusion in the LLM prompt?

A) Document loader

B) Text splitter

C) Prompt template

D) Chain

Question 12: Which embedding model characteristic is most important when selecting a model for a retrieval system that needs to understand technical medical terminology?

A) The model’s dimensionality

B) The model’s training domain and corpus

C) The model’s parameter count

D) The model’s inference speed

Question 13: A generative AI application built with Databricks is producing inconsistent outputs for similar queries. Which technique would most effectively improve consistency?

A) Increasing the temperature parameter of the LLM

B) Using a structured prompt template with clear instructions

C) Reducing the number of retrieved documents

D) Switching to a larger model

Question 14: Which approach would be most effective for minimizing hallucinations in a customer support RAG application?

A) Using the highest temperature setting

B) Including instructions to cite specific documents in the prompt

C) Removing all context from the prompt

D) Using the largest available model regardless of other factors

Question 15: A data scientist is building a RAG application that needs to understand both text and tabular data. Which LangChain component would best enable this capability?

A) LLMChain

B) MultiModalRetriever

C) SQLDatabaseChain

D) ConversationalRetrievalChain

Section 4: Deploying Applications

Question 16: A generative AI application is being deployed to handle sensitive customer information. Which two techniques should be implemented to ensure data privacy? (Select two)

A) Implementing PII detection and masking in document preprocessing

B) Using the largest available LLM to ensure accuracy

C) Maintaining audit logs of all queries and responses

D) Storing all generated content indefinitely for quality control

E) Implementing user authentication and role-based access controls

Question 17: When preparing a RAG model for production deployment using MLflow, which component is essential to include to ensure the model can be properly served?

A) A graphic visualization of the model architecture

B) The model signature defining input and output schemas

C) The raw training data used to fine-tune the model

D) A separate endpoint for monitoring model drift

Question 18: Which code structure is required when implementing a PyFunc model for deploying an LLM chain in Databricks?

A) A class that extends mlflow.pyfunc.PythonModel with load_context and predict methods

B) A series of SQL commands that define the model behavior

C) A YAML configuration file that defines the model architecture

D) A Python dictionary mapping inputs to outputs

Question 19: When creating a Vector Search index in Databricks, which SQL command correctly creates an index optimized for low-latency queries?

A) CREATE INDEX vector_index ON table USING VECTOR (embedding_column) WITH (similarity_function = 'cosine')

B) CREATE INDEX vector_index ON table (embedding_column) USING VECTOR WITH (similarity_function = 'cosine', optimization_hint = 'latency')

C) CREATE OR REPLACE INDEX vector_index ON table USING VECTOR (embedding_column) OPTIONS (similarity_function = 'cosine', optimization_hint = 'latency')

D) CREATE VECTOR INDEX vector_index ON table (embedding_column) WITH OPTIONS (function = 'cosine', optimization = 'latency')

Question 20: When deploying a foundation model-based application in Databricks, which serving configuration would be most appropriate for an application with highly variable traffic patterns?

A) Fixed-size serving endpoints with dedicated compute

B) Serverless endpoints with auto-scaling enabled

C) Single-node endpoints with maximum resources

D) Multi-region endpoint deployment with global load balancing

Section 5: Governance

Question 21: A generative AI application is being developed to process sensitive healthcare information. Which two techniques should be implemented to ensure HIPAA compliance? (Select two)

A) Implementing PII detection and masking in document preprocessing

B) Using the largest available LLM to ensure accuracy

C) Maintaining audit logs of all queries and responses

D) Storing all generated content indefinitely for quality control

E) Implementing user authentication and role-based access controls

Question 22: When implementing text masking in a RAG application, which approach provides the most comprehensive protection for sensitive information?

A) Simple regular expression matching for common patterns like email addresses and phone numbers

B) Multi-layered approach combining pattern matching, named entity recognition, and contextual analysis

C) Manual review of all documents before ingestion

D) Using only public domain documents in the knowledge base

Question 23: Which guardrail implementation would be most effective for preventing harmful outputs in a public-facing generative AI application?

A) Implementing only input filtering to block problematic queries

B) Implementing only output filtering to block harmful responses

C) Implementing both input and output filtering with continuous monitoring

D) Relying solely on the built-in safety features of the foundation model

Section 6: Evaluation and Monitoring

Question 24: When evaluating the retrieval performance of a RAG application, which metric would be most valuable for understanding if the system is retrieving relevant documents?

A) The total number of tokens in retrieved documents

B) The publication date of retrieved documents

C) The relevance score between query and retrieved documents

D) The processing time for retrieval operations

Question 25: Which approach would be most effective for monitoring and controlling the cost of a production RAG application deployed on Databricks?

A) Manually reviewing logs to identify expensive queries

B) Implementing token tracking, query caching, and automated alerts for unusual usage patterns

C) Restricting the application to a fixed number of queries per day

D) Using only the smallest available models regardless of performance requirements

Answer Key

  1. B
  2. C
  3. C
  4. B
  5. A
  6. C
  7. C
  8. B
  9. C
  10. B
  11. C
  12. B
  13. B
  14. B
  15. C
  16. A, E
  17. B
  18. A
  19. C
  20. B
  21. A, C
  22. B
  23. C
  24. C
  25. B

Review and Knowledge Gap Identification

After completing the mock exam, we’ll review your answers and identify areas for further study. This process includes:

  1. Analysis of incorrect answers to understand the underlying concepts that need reinforcement.
  2. Pattern identification to detect systematic knowledge gaps.
  3. Topic prioritization for focused review in the remaining days.
  4. Practice strategy adjustment based on time management and question approach analysis.

Common Knowledge Gaps:

Based on experience with Databricks certification candidates, these areas often require additional focus:

  1. Vector Search optimization parameters and their performance implications.
  2. MLflow model serving configurations for different workload requirements.
  3. Security and governance implementation for specific compliance scenarios.
  4. Embedding model selection criteria for different document types.
  5. Troubleshooting strategies for RAG application performance issues.

Preparation Task: Addressing Challenges in LLM Evaluation and Monitoring

Scenario: HealthGuide AI Performance Assessment

Company: HealthGuide Technologies

Challenge: You’ve deployed a RAG-based clinical assistant on Databricks that helps healthcare providers quickly access treatment guidelines and medication information. The system has been in production for two weeks, and you’re now tasked with evaluating its performance and implementing a comprehensive monitoring strategy. The executive team has expressed concerns about three specific issues:

  1. Some responses contain outdated treatment recommendations despite having recent medical literature in the knowledge base
  2. Response times are inconsistent, with occasional delays exceeding 10 seconds
  3. The cost of running the system is higher than initially projected

As the lead engineer responsible for this system, you need to develop an evaluation and monitoring plan that addresses these concerns.

Solution Approach

1. Evaluating Response Accuracy and Relevance

To address the issue of outdated treatment recommendations, I would implement a multi-faceted evaluation strategy:

First, I would establish a ground truth dataset by creating a test set of clinical questions paired with verified answers from recent medical guidelines. This provides a baseline for automated evaluation.

Next, I would implement reference-based evaluation metrics that compare model outputs against these ground truth answers:

def evaluate_clinical_accuracy(test_queries, model_responses, reference_answers):
    results = []
    for query, response, reference in zip(test_queries, model_responses, reference_answers):
        # Calculate semantic similarity using embeddings
        response_embedding = embedding_model.embed_text(response)
        reference_embedding = embedding_model.embed_text(reference)
        similarity_score = cosine_similarity(response_embedding, reference_embedding)
        
        # Check for presence of key medical concepts
        key_concepts = extract_medical_entities(reference)
        concept_coverage = sum(1 for concept in key_concepts if concept in response) / len(key_concepts)
        
        # Log evaluation results to MLflow
        with mlflow.start_run(nested=True):
            mlflow.log_metric("semantic_similarity", similarity_score)
            mlflow.log_metric("concept_coverage", concept_coverage)
            mlflow.log_text(response, "model_response.txt")
            mlflow.log_text(reference, "reference_answer.txt")
        
        results.append({
            "query": query,
            "similarity_score": similarity_score,
            "concept_coverage": concept_coverage,
            "model_response": response,
            "reference_answer": reference
        })
    
    return pd.DataFrame(results)

I would also implement retrieval evaluation to assess whether the system is accessing the most current information:

def evaluate_retrieval_quality(queries, retrieved_documents):
    results = []
    for query, documents in zip(queries, retrieved_documents):
        # Check publication dates of retrieved documents
        recent_docs_ratio = sum(1 for doc in documents if doc["publication_date"] > "2022-01-01") / len(documents)
        
        # Evaluate relevance of retrieved documents
        query_embedding = embedding_model.embed_text(query)
        relevance_scores = [cosine_similarity(query_embedding, doc["embedding"]) for doc in documents]
        avg_relevance = sum(relevance_scores) / len(relevance_scores)
        
        # Log to MLflow
        with mlflow.start_run(nested=True):
            mlflow.log_metric("recent_docs_ratio", recent_docs_ratio)
            mlflow.log_metric("avg_relevance", avg_relevance)
        
        results.append({
            "query": query,
            "recent_docs_ratio": recent_docs_ratio,
            "avg_relevance": avg_relevance,
            "top_documents": [doc["title"] for doc in documents[:3]]
        })
    
    return pd.DataFrame(results)

Finally, I would implement a human-in-the-loop evaluation process where medical experts periodically review a sample of responses for clinical accuracy and currency.

2. Performance Monitoring and Optimization

To address inconsistent response times, I would implement comprehensive performance monitoring:

First, I would set up end-to-end latency tracking that breaks down response time by component:

def track_response_latency(query_id, query_text):
    metrics = {}
    
    # Track embedding generation time
    start_time = time.time()
    query_embedding = embedding_model.embed_text(query_text)
    embedding_time = time.time() - start_time
    metrics["embedding_time"] = embedding_time
    
    # Track retrieval time
    start_time = time.time()
    retrieved_docs = vector_store.similarity_search(query_embedding, k=5)
    retrieval_time = time.time() - start_time
    metrics["retrieval_time"] = retrieval_time
    
    # Track LLM generation time
    start_time = time.time()
    prompt = create_prompt(query_text, retrieved_docs)
    response = llm_client.completions.create(model="clinical-model", prompt=prompt)
    generation_time = time.time() - start_time
    metrics["generation_time"] = generation_time
    
    # Calculate total latency
    total_latency = embedding_time + retrieval_time + generation_time
    metrics["total_latency"] = total_latency
    
    # Log metrics to MLflow
    with mlflow.start_run():
        for metric_name, metric_value in metrics.items():
            mlflow.log_metric(metric_name, metric_value)
        mlflow.log_param("query_id", query_id)
        mlflow.log_param("query_length", len(query_text))
    
    return metrics

I would then set up Databricks dashboard alerts for latency spikes:

# SQL query for latency monitoring dashboard
"""
SELECT 
  date_trunc('hour', timestamp) as hour,
  avg(total_latency) as avg_latency,
  percentile(total_latency, 0.95) as p95_latency,
  percentile(total_latency, 0.99) as p99_latency,
  count(*) as request_count
FROM ml_monitoring.response_metrics
GROUP BY 1
ORDER BY 1 DESC
"""

# Alert definition for excessive latency
"""
SELECT count(*) 
FROM ml_monitoring.response_metrics 
WHERE total_latency > 10.0 AND timestamp > now() - interval 15 minutes
HAVING count(*) > 5
"""

Based on the monitoring data, I would implement targeted optimizations:

  1. For retrieval latency issues: Adjust Vector Search index parameters with an optimization hint for latency
  2. For LLM generation issues: Implement response caching for common queries or reduce the maximum token generation limit
  3. For embedding generation issues: Consider a smaller, faster embedding model if accuracy isn’t compromised

3. Cost Monitoring and Optimization

To address the higher-than-expected costs, I would implement a comprehensive cost tracking and optimization strategy:

First, I would set up detailed token usage tracking:

def track_token_usage(query_id, prompt, response):
    # Calculate token counts
    prompt_tokens = count_tokens(prompt)
    completion_tokens = count_tokens(response)
    total_tokens = prompt_tokens + completion_tokens
    
    # Estimate cost based on current pricing
    prompt_cost = prompt_tokens * PROMPT_TOKEN_COST
    completion_cost = completion_tokens * COMPLETION_TOKEN_COST
    total_cost = prompt_cost + completion_cost
    
    # Log to MLflow
    with mlflow.start_run():
        mlflow.log_metric("prompt_tokens", prompt_tokens)
        mlflow.log_metric("completion_tokens", completion_tokens)
        mlflow.log_metric("total_tokens", total_tokens)
        mlflow.log_metric("estimated_cost", total_cost)
        mlflow.log_param("query_id", query_id)
    
    # Store in Delta table for analysis
    spark.createDataFrame([{
        "query_id": query_id,
        "timestamp": datetime.now(),
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": total_tokens,
        "estimated_cost": total_cost
    }]).write.format("delta").mode("append").saveAsTable("ml_monitoring.token_usage")
    
    return {
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": total_tokens,
        "estimated_cost": total_cost
    }

I would then implement cost-saving strategies based on usage patterns:

  1. Optimize prompt templates to reduce token count while maintaining quality:
# Before optimization
original_prompt = f"""
You are a clinical assistant providing evidence-based information to healthcare providers.
Given the following patient situation and medical question, provide an answer based on the medical literature.
Use only the information from the provided references. If the information isn't in the references, say you don't know.

Patient situation: {patient_situation}

Medical question: {question}

References:
{references}

Your response should include specific recommendations with dosages when applicable, and cite the specific reference.
"""

# After optimization
optimized_prompt = f"""
Answer based only on these references:
{references}

Question: {question}
"""

  2. Implement caching for common queries to reduce redundant LLM calls:
def get_response_with_caching(query, reference_docs):
    # Generate cache key
    cache_key = generate_hash(query + "".join(doc["id"] for doc in reference_docs))
    
    # Check cache
    cached_response = spark.sql(f"SELECT response FROM response_cache WHERE cache_key = '{cache_key}'").collect()
    
    if cached_response:
        return cached_response[0]["response"]
    
    # Generate new response if not cached
    prompt = create_prompt(query, reference_docs)
    response = llm_client.completions.create(model="clinical-model", prompt=prompt).choices[0].text
    
    # Store in cache
    spark.createDataFrame([{
        "cache_key": cache_key,
        "query": query,
        "response": response,
        "timestamp": datetime.now(),
        "doc_ids": [doc["id"] for doc in reference_docs]
    }]).write.format("delta").mode("append").saveAsTable("response_cache")
    
    return response

  3. Use analytics to identify cost outliers and optimization opportunities:
# SQL query to identify expensive queries
"""
SELECT 
  query_text,
  avg(total_tokens) as avg_tokens,
  avg(estimated_cost) as avg_cost,
  count(*) as query_count,
  sum(estimated_cost) as total_cost
FROM ml_monitoring.token_usage
JOIN ml_monitoring.queries USING (query_id)
GROUP BY 1
ORDER BY 5 DESC
LIMIT 20
"""
4. Comprehensive Monitoring Dashboard

Finally, I would create a comprehensive monitoring dashboard that combines all these metrics:

# Dashboard SQL query
"""
SELECT
  date_trunc('hour', t.timestamp) as hour,
  count(*) as request_count,
  avg(rm.total_latency) as avg_latency,
  percentile(rm.total_latency, 0.95) as p95_latency,
  avg(t.total_tokens) as avg_tokens,
  sum(t.estimated_cost) as hourly_cost,
  avg(e.semantic_similarity) as avg_accuracy,
  avg(e.concept_coverage) as avg_coverage
FROM ml_monitoring.token_usage t
JOIN ml_monitoring.response_metrics rm USING (query_id)
LEFT JOIN ml_monitoring.evaluation_results e USING (query_id)
GROUP BY 1
ORDER BY 1 DESC
"""

This comprehensive approach addresses all three concerns by:

  1. Systematically evaluating response accuracy with attention to recency of information
  2. Monitoring and optimizing each component of the response pipeline to ensure consistent performance
  3. Tracking and optimizing costs through data-driven analysis and targeted improvements

The implementation leverages Databricks’ MLflow for experiment tracking, Delta Lake for storage, and SQL Analytics for monitoring dashboards, creating an integrated solution within the Databricks ecosystem.


Evaluation, Monitoring, and End-to-End Implementation

Metrics for LLM Evaluation

Evaluating large language models requires specialized approaches beyond traditional ML metrics. Comprehensive evaluation addresses model capabilities, reliability, and alignment with business needs.

Automated Evaluation Metrics

Automated metrics provide quantitative assessment of model performance:

  1. Relevance Metrics: These measure how well responses address the query.
    • BLEU/ROUGE/METEOR: Text similarity between generated responses and references
    • BERTScore: Contextual similarity using embedding models
    • Embedding Similarity: Cosine similarity between response and reference embeddings
  2. Factuality Metrics: These assess the accuracy of information in responses.
    • Fact Verification Score: Comparing extracted facts against trusted sources
    • Hallucination Detection: Identifying statements not supported by context
    • Source Attribution Accuracy: Checking if citations correctly reference source material
  3. Coherence and Fluency Metrics: These evaluate response quality.
    • Perplexity: Measuring how well the model predicts text
    • Self-BLEU: Measuring diversity across generated responses (lower scores indicate greater diversity)
    • Grammaticality Scores: Assessing linguistic correctness

Implementation example for embedding-based evaluation:

def evaluate_response_relevance(queries, generated_responses, reference_responses, embedding_model):
    results = []
    
    for query, generated, reference in zip(queries, generated_responses, reference_responses):
        # Generate embeddings
        query_emb = embedding_model.embed_text(query)
        gen_emb = embedding_model.embed_text(generated)
        ref_emb = embedding_model.embed_text(reference)
        
        # Calculate similarities
        query_gen_sim = cosine_similarity(query_emb, gen_emb)
        gen_ref_sim = cosine_similarity(gen_emb, ref_emb)
        
        # Log results
        result = {
            "query": query,
            "query_response_similarity": query_gen_sim,
            "reference_similarity": gen_ref_sim,
            "generated_response": generated,
            "reference_response": reference
        }
        results.append(result)
    
    return pd.DataFrame(results)
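
For the hallucination detection metric listed above, a lightweight sketch is to check whether each response sentence has sufficient support in the retrieved context; this reuses the embedding_model and cosine_similarity helpers from the example above, and the 0.7 support threshold is an assumption to tune:

def estimate_hallucination_rate(response, context_chunks, embedding_model, threshold=0.7):
    """Fraction of response sentences with no sufficiently similar context chunk."""
    sentences = [s.strip() for s in response.split(".") if s.strip()]
    if not sentences or not context_chunks:
        return 0.0
    
    chunk_embeddings = [embedding_model.embed_text(chunk) for chunk in context_chunks]
    
    unsupported = 0
    for sentence in sentences:
        sentence_emb = embedding_model.embed_text(sentence)
        best_support = max(cosine_similarity(sentence_emb, emb) for emb in chunk_embeddings)
        if best_support < threshold:
            unsupported += 1
    
    return unsupported / len(sentences)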

Human Evaluation Framework

Human evaluation remains essential for assessing subjective aspects of LLM performance:

  1. Evaluation Criteria Definition:
    • Relevance: How well the response addresses the query
    • Accuracy: Factual correctness of the information
    • Helpfulness: Practical utility of the response
    • Safety: Absence of harmful or inappropriate content
  2. Evaluation Scale Design:
    • Likert Scales: 1-5 ratings across dimensions
    • Comparative Judgments: A/B testing between model versions
    • Error Categorization: Classifying response issues
  3. Evaluator Guidelines:
    • Clear instructions for consistent assessment
    • Rubrics with examples of different rating levels
    • Calibration exercises to align evaluator understanding

Example human evaluation form:

def create_evaluation_form(query, response, system_name):
    form = f"""
    Query: {query}
    
    Response from {system_name}:
    {response}
    
    Please rate on a scale of 1-5 (1=Poor, 5=Excellent):
    
    Relevance (How directly does the response address the query?): [___]
    
    Accuracy (Are the facts correct and properly sourced?): [___]
    
    Helpfulness (How useful is this response for the user?): [___]
    
    Safety (Is the response free from harmful/inappropriate content?): [___]
    
    General comments (optional):
    ______________________________________________________
    ______________________________________________________
    """
    return form

Specialized RAG Evaluation

RAG applications require additional evaluation focused on retrieval effectiveness:

  1. Retrieval Quality Assessment:
    • Precision@K: Relevance of top K retrieved documents
    • Recall@K: Proportion of relevant documents retrieved in top K
    • Mean Reciprocal Rank (MRR): Position of first relevant document
    • Normalized Discounted Cumulative Gain (nDCG): Relevance accounting for position
  2. RAG-specific Metrics:
    • Context Relevance: Assessing if retrieved context is helpful
    • Context Utilization: Measuring how effectively the model uses context
    • Knowledge Grounding: Evaluating if responses are based on retrieved information

Implementation example for retrieval evaluation:

def evaluate_retrieval_quality(query_ids, retrieved_docs, relevance_judgments):
    results = []
    
    for query_id, docs in zip(query_ids, retrieved_docs):
        # Get relevant document IDs for this query
        relevant_doc_ids = relevance_judgments[query_id]
        
        # Calculate precision@k
        k = min(5, len(docs))
        retrieved_doc_ids = [doc["id"] for doc in docs[:k]]
        relevant_retrieved = [doc_id for doc_id in retrieved_doc_ids if doc_id in relevant_doc_ids]
        precision_k = len(relevant_retrieved) / k if k > 0 else 0
        
        # Calculate recall@k
        recall_k = len(relevant_retrieved) / len(relevant_doc_ids) if len(relevant_doc_ids) > 0 else 0
        
        # Calculate MRR
        ranks = [i+1 for i, doc_id in enumerate(retrieved_doc_ids) if doc_id in relevant_doc_ids]
        mrr = 1 / min(ranks) if ranks else 0
        
        results.append({
            "query_id": query_id,
            "precision@k": precision_k,
            "recall@k": recall_k,
            "mrr": mrr,
            "retrieved_docs": retrieved_doc_ids,
            "relevant_docs": relevant_doc_ids
        })
    
    return pd.DataFrame(results)
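
The nDCG metric listed above is not computed in this example; with binary relevance judgments it can be added with a small helper like this sketch:

import math

def ndcg_at_k(retrieved_doc_ids, relevant_doc_ids, k=5):
    """Normalized discounted cumulative gain with binary relevance."""
    retrieved = retrieved_doc_ids[:k]
    dcg = sum(
        1.0 / math.log2(rank + 2)  # rank is 0-based, so position i contributes 1/log2(i+1)
        for rank, doc_id in enumerate(retrieved)
        if doc_id in relevant_doc_ids
    )
    ideal_hits = min(len(relevant_doc_ids), k)
    idcg = sum(1.0 / math.log2(rank + 2) for rank in range(ideal_hits))
    return dcg / idcg if idcg > 0 else 0.0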

Monitoring Deployed Applications

Comprehensive monitoring ensures reliable operation and helps identify improvement opportunities.

Performance Monitoring

Performance monitoring tracks system efficiency and reliability:

  1. Latency Monitoring:
    • End-to-end Response Time: Total time from request to response
    • Component-specific Latency: Time for embedding, retrieval, and generation
    • P95/P99 Latencies: Capturing worst-case performance
  2. Throughput Monitoring:
    • Requests per Second: System load over time
    • Concurrent Users: Peak usage patterns
    • Queue Depth: Backlog of pending requests
  3. Reliability Monitoring:
    • Success Rate: Proportion of successful responses
    • Error Rates by Type: Categorized system failures
    • Timeout Frequency: Requests exceeding time limits

Implementation example for latency monitoring:

def monitor_component_latency(query_id, timing_dict):
    """
    Log component-specific timing information to MLflow and Delta table
    """
    # Log to MLflow
    with mlflow.start_run(run_name=f"query_{query_id}_performance"):
        for component, latency in timing_dict.items():
            mlflow.log_metric(f"{component}_latency_ms", latency)
        mlflow.log_metric("total_latency_ms", sum(timing_dict.values()))
    
    # Log to Delta table for long-term analysis
    spark.createDataFrame([{
        "query_id": query_id,
        "timestamp": datetime.now(),
        **{f"{component}_latency_ms": latency for component, latency in timing_dict.items()},
        "total_latency_ms": sum(timing_dict.values())
    }]).write.format("delta").mode("append").saveAsTable("monitoring.latency_metrics")

Quality Monitoring

Quality monitoring ensures responses meet required standards:

  1. Response Quality Metrics:
    • Confidence Scores: Model certainty in responses
    • Toxicity Detection: Identifying problematic content
    • Response Length Distribution: Tracking verbosity patterns
  2. User Feedback Integration:
    • Explicit Ratings: Direct user evaluations
    • Implicit Signals: User follow-up questions or refinements
    • Feedback Categorization: Classifying types of issues
  3. Business Impact Metrics:
    • Task Completion Rate: Whether users achieved their goals
    • Time-to-Resolution: Efficiency in resolving user queries
    • User Satisfaction Scores: Overall experience quality

Example implementation for quality monitoring:

def log_response_quality(query_id, query, response, quality_metrics=None):
    """
    Track response quality metrics
    """
    # Calculate metrics if not provided
    if not quality_metrics:
        quality_metrics = {}
        # Calculate response length
        quality_metrics["response_length"] = len(response.split())
        # Calculate response entropy (diversity)
        quality_metrics["response_entropy"] = calculate_entropy(response)
        # Check for citation presence
        quality_metrics["has_citations"] = 1 if re.search(r'\[\d+\]', response) else 0
    
    # Log to MLflow
    with mlflow.start_run(run_name=f"query_{query_id}_quality"):
        for metric_name, metric_value in quality_metrics.items():
            mlflow.log_metric(metric_name, metric_value)
        mlflow.log_text(query, "query.txt")
        mlflow.log_text(response, "response.txt")
    
    # Log to Delta table
    spark.createDataFrame([{
        "query_id": query_id,
        "timestamp": datetime.now(),
        "query": query,
        "response": response,
        **quality_metrics
    }]).write.format("delta").mode("append").saveAsTable("monitoring.quality_metrics")
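
For the toxicity detection metric listed above, one option is a pretrained classifier; the sketch below assumes the open-source detoxify package is installed and would be swapped for your preferred moderation model or endpoint:

# Assumes the `detoxify` package is available in the serving environment.
from detoxify import Detoxify

_toxicity_model = Detoxify("original")  # loads a pretrained toxicity classifier

def compute_toxicity_metrics(response):
    """Return toxicity-related quality metrics for a generated response."""
    scores = _toxicity_model.predict(response)  # dict of category -> probability
    return {
        "toxicity_score": float(scores["toxicity"]),
        "is_toxic": 1 if scores["toxicity"] > 0.5 else 0,
    }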

Usage and Cost Monitoring

Tracking usage patterns and costs enables optimization:

  1. Token Usage Metrics:
    • Tokens per Request: Input and output token counts
    • Cost per Query: Financial impact of each request
    • Total Daily/Weekly Costs: Aggregate spending patterns
  2. Usage Patterns:
    • Query Type Distribution: Categories of user requests
    • Peak Usage Times: Time-based load patterns
    • User Segment Analysis: Usage across different groups
  3. Optimization Opportunities:
    • Costly Query Identification: Detecting inefficient patterns
    • Caching Effectiveness: Measuring cache hit rates
    • Prompt Efficiency: Token usage relative to value

Example implementation for cost monitoring:

def track_token_usage(query_id, prompt, response, model_name):
    """
    Track token usage and associated costs
    """
    # Calculate token counts
    prompt_tokens = count_tokens(prompt)
    completion_tokens = count_tokens(response)
    total_tokens = prompt_tokens + completion_tokens
    
    # Calculate costs based on model pricing
    model_rates = get_model_rates(model_name)
    prompt_cost = prompt_tokens * model_rates["prompt_rate"]
    completion_cost = completion_tokens * model_rates["completion_rate"]
    total_cost = prompt_cost + completion_cost
    
    # Log to MLflow
    with mlflow.start_run(run_name=f"query_{query_id}_cost"):
        mlflow.log_metric("prompt_tokens", prompt_tokens)
        mlflow.log_metric("completion_tokens", completion_tokens)
        mlflow.log_metric("total_tokens", total_tokens)
        mlflow.log_metric("total_cost_usd", total_cost)
    
    # Log to Delta table
    spark.createDataFrame([{
        "query_id": query_id,
        "timestamp": datetime.now(),
        "model_name": model_name,
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": total_tokens,
        "prompt_cost_usd": prompt_cost,
        "completion_cost_usd": completion_cost,
        "total_cost_usd": total_cost
    }]).write.format("delta").mode("append").saveAsTable("monitoring.token_usage")

MLflow for Tracking Performance

MLflow provides comprehensive tools for tracking experiments, models, and performance.

Experiment Tracking

MLflow experiments organize and compare different approaches:

  1. Experiment Organization:
    • Hierarchical Structure: Grouping related experiments
    • Tagging System: Categorizing experiments by purpose
    • Versioning: Tracking iterations of approaches
  2. Parameter Tracking:
    • Model Configuration: Recording model settings
    • Prompt Templates: Documenting prompt structures
    • Retrieval Settings: Tracking search configurations
  3. Metric Logging:
    • Automated Metrics: Recording quantitative measures
    • Custom Metrics: Tracking application-specific performance
    • Visual Artifacts: Storing graphical representations

Example experiment tracking implementation:

def run_rag_experiment(experiment_name, model_config, prompt_template, retrieval_config, test_queries):
    """
    Run and track a RAG experiment with MLflow
    """
    # Create or get experiment
    experiment = mlflow.get_experiment_by_name(experiment_name)
    if not experiment:
        experiment_id = mlflow.create_experiment(experiment_name)
    else:
        experiment_id = experiment.experiment_id
    
    # Start run
    with mlflow.start_run(experiment_id=experiment_id) as run:
        # Log configurations
        mlflow.log_params(model_config)
        mlflow.log_params(retrieval_config)
        mlflow.log_text(prompt_template, "prompt_template.txt")
        
        # Initialize components
        embedding_model = initialize_embedding_model(model_config["embedding_model"])
        llm = initialize_llm(model_config["llm_model"])
        retriever = initialize_retriever(retrieval_config, embedding_model)
        
        # Run evaluation
        results = []
        for query in test_queries:
            # Track timing
            start_time = time.time()
            documents = retriever.retrieve(query)
            retrieval_time = time.time() - start_time
            
            prompt = format_prompt(prompt_template, query, documents)
            
            start_time = time.time()
            response = llm.generate(prompt)
            generation_time = time.time() - start_time
            
            # Calculate metrics
            metrics = calculate_response_metrics(query, response, documents)
            metrics["retrieval_time"] = retrieval_time
            metrics["generation_time"] = generation_time
            
            # Log individual query results
            for metric_name, metric_value in metrics.items():
                mlflow.log_metric(f"query_{len(results)}_{metric_name}", metric_value)
            
            results.append({
                "query": query,
                "response": response,
                "retrieved_docs": documents,
                **metrics
            })
        
        # Log aggregate metrics
        for metric_name in results[0].keys():
            if isinstance(results[0][metric_name], (int, float)):
                avg_value = sum(r[metric_name] for r in results) / len(results)
                mlflow.log_metric(f"avg_{metric_name}", avg_value)
        
        # Save detailed results
        results_df = pd.DataFrame(results)
        mlflow.log_table(data=results_df, artifact_file="detailed_results.json")
        
        return results_df, run.info.run_id

Model Registry Integration

The MLflow Model Registry manages model lifecycle:

  1. Model Registration:
    • Versioning: Tracking iterations of models
    • Metadata Annotation: Adding descriptive information
    • Artifact Management: Storing model files and dependencies
  2. Stage Transitions:
    • Development → Staging → Production: Formal lifecycle management
    • Approval Workflows: Governance for stage changes
    • Rollback Capabilities: Reverting to previous versions
  3. Deployment Integration:
    • Serving Endpoint Association: Linking models to endpoints
    • Configuration Management: Tracking deployment settings
    • Version Control: Managing production releases

Example model registry workflow:

def register_rag_model(run_id, model_name, description):
    """
    Register a RAG model with the MLflow Model Registry
    """
    # Load the model from the run
    model_uri = f"runs:/{run_id}/model"
    
    # Register the model
    registered_model = mlflow.register_model(
        model_uri=model_uri,
        name=model_name,
        await_registration_for=600
    )
    
    # Add description
    client = MlflowClient()
    client.update_registered_model(
        name=model_name,
        description=description
    )
    
    # Add additional metadata
    client.set_registered_model_tag(
        name=model_name,
        key="model_type",
        value="rag_application"
    )
    client.set_registered_model_tag(
        name=model_name,
        key="created_by",
        value=current_user()
    )
    
    return registered_model.version
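
The stage transitions described above can also be scripted; below is a sketch using the classic stage API (newer MLflow releases favor model aliases such as client.set_registered_model_alias over stages):

from mlflow.tracking import MlflowClient

def promote_model_to_staging(model_name, version):
    """Move a registered model version into the Staging stage."""
    client = MlflowClient()
    client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage="Staging",
        archive_existing_versions=False
    )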

Performance Monitoring Integration

MLflow enables ongoing monitoring of deployed models:

  1. Metric Streaming:
    • Real-time Logging: Continuous performance tracking
    • Dashboard Integration: Visualizing current metrics
    • Alert Configuration: Notifications for metric thresholds
  2. Model Comparison:
    • A/B Testing: Comparing production versions
    • Regression Detection: Identifying performance degradation
    • Impact Analysis: Measuring business metrics changes
  3. Continuous Improvement:
    • Feedback Loop Integration: Incorporating user signals
    • Automated Retraining: Triggering updates based on criteria
    • Performance Evolution: Tracking long-term trends

Example monitoring integration:

def log_production_metrics(model_name, model_version, query_id, metrics):
    """
    Log production metrics for a deployed model
    """
    # Create run in production monitoring experiment
    experiment_name = f"{model_name}_production_monitoring"
    experiment = mlflow.get_experiment_by_name(experiment_name)
    if not experiment:
        experiment_id = mlflow.create_experiment(experiment_name)
    else:
        experiment_id = experiment.experiment_id
    
    # Start run with appropriate naming
    with mlflow.start_run(
        experiment_id=experiment_id,
        run_name=f"v{model_version}_{query_id}"
    ) as run:
        # Log all metrics
        for metric_name, metric_value in metrics.items():
            mlflow.log_metric(metric_name, metric_value)
        
        # Log model info as params
        mlflow.log_param("model_name", model_name)
        mlflow.log_param("model_version", model_version)
        mlflow.log_param("query_id", query_id)
        mlflow.log_param("timestamp", datetime.now().isoformat())
    
    return run.info.run_id

Cost Control Strategies

Effective cost management ensures efficient resource utilization while maintaining performance.

Architectural Optimization

Architectural choices significantly impact costs:

  1. Model Selection Strategies:
    • Right-sizing Models: Using appropriate model sizes for tasks
    • Model Specialization: Task-specific models for efficiency
    • Embedding Model Optimization: Balancing dimensions and accuracy
  2. Caching Implementations:
    • Response Caching: Storing results for common queries
    • Embedding Caching: Reusing vectors for frequent texts
    • Context Caching: Maintaining retrieved documents for similar queries
  3. Batching and Pooling:
    • Request Batching: Combining similar requests
    • Compute Pooling: Sharing infrastructure across applications
    • Asynchronous Processing: Non-blocking request handling

Example caching implementation:

import hashlib
import time

class CachedRAGSystem:
    def __init__(self, vector_store, embedding_model, llm, cache_ttl=3600):
        self.vector_store = vector_store
        self.embedding_model = embedding_model
        self.llm = llm
        self.embedding_cache = {}
        self.response_cache = {}
        self.cache_ttl = cache_ttl
    
    def _get_cache_key(self, text):
        return hashlib.md5(text.encode()).hexdigest()
    
    def get_embedding(self, text):
        """Get embedding with caching"""
        cache_key = self._get_cache_key(text)
        
        # Check cache
        if cache_key in self.embedding_cache:
            cache_time, embedding = self.embedding_cache[cache_key]
            if time.time() - cache_time < self.cache_ttl:
                return embedding
        
        # Generate new embedding
        embedding = self.embedding_model.embed_text(text)
        
        # Update cache
        self.embedding_cache[cache_key] = (time.time(), embedding)
        
        return embedding
    
    def get_response(self, query):
        """Get RAG response with caching"""
        cache_key = self._get_cache_key(query)
        
        # Check cache
        if cache_key in self.response_cache:
            cache_time, response, documents = self.response_cache[cache_key]
            if time.time() - cache_time < self.cache_ttl:
                return response, documents
        
        # Generate new response
        query_embedding = self.get_embedding(query)
        documents = self.vector_store.similarity_search_by_vector(query_embedding)
        
        prompt = self._format_prompt(query, documents)
        response = self.llm.generate(prompt)
        
        # Update cache
        self.response_cache[cache_key] = (time.time(), response, documents)
        
        return response, documents
    
    def _format_prompt(self, query, documents):
        # Format prompt with retrieved documents
        context = "\n\n".join([doc.page_content for doc in documents])
        return f"Answer the following question based on this context:\n\nContext: {context}\n\nQuestion: {query}\n\nAnswer:"

Token Optimization

Reducing token usage directly impacts costs:

  1. Prompt Engineering:
    • Concise Instructions: Minimizing directive text
    • Efficient Few-shot Examples: Using minimal examples
    • Controlled Output Format: Specifying compact responses
  2. Retrieval Efficiency:
    • Precision-focused Retrieval: Fetching only most relevant documents
    • Chunk Size Optimization: Balancing context and token count
    • Content Filtering: Removing boilerplate before inclusion
  3. Response Management:
    • Length Constraints: Setting appropriate maximum lengths
    • Progressive Generation: Generating additional content only when needed
    • Format Optimization: Using compact representation formats

Example token optimization:

def optimize_prompt_tokens(prompt_template, context_docs, query, max_tokens=3000):
    """
    Optimize a prompt to fit within token constraints
    """
    # Calculate tokens in fixed parts
    template_tokens = count_tokens(prompt_template.replace("{context}", "").replace("{query}", ""))
    query_tokens = count_tokens(query)
    
    # Calculate available tokens for context
    available_context_tokens = max_tokens - template_tokens - query_tokens - 100  # Buffer
    
    # Prioritize and truncate context
    prioritized_docs = rank_documents_by_relevance(context_docs, query)
    
    optimized_context = ""
    current_tokens = 0
    
    for doc in prioritized_docs:
        doc_tokens = count_tokens(doc)
        if current_tokens + doc_tokens <= available_context_tokens:
            optimized_context += doc + "\n\n"
            current_tokens += doc_tokens
        else:
            # If we can fit a truncated version, add that
            truncated_doc = truncate_document(doc, available_context_tokens - current_tokens)
            if truncated_doc:
                optimized_context += truncated_doc + "\n\n"
            break
    
    # Format final prompt
    final_prompt = prompt_template.replace("{context}", optimized_context).replace("{query}", query)
    
    return final_prompt, count_tokens(final_prompt)

Operational Efficiency

Operational practices help control ongoing costs:

  1. Monitoring and Alerting:
    • Cost Thresholds: Alerts for unusual spending
    • Usage Quotas: Limiting resource consumption
    • Trend Analysis: Identifying cost pattern changes
  2. Deployment Optimization:
    • Serverless Scaling: Paying only for actual usage
    • Resource Right-sizing: Matching capacity to demand
    • Autoscaling Policies: Adjusting resources to load
  3. Continuous Optimization:
    • Cost Attribution: Tracking expenses by use case
    • Regular Reviews: Scheduled efficiency assessments
    • Benchmark Comparisons: Evaluating against best practices

Example cost monitoring dashboard:

def create_cost_monitoring_dashboard():
    """
    Create a SQL query for a cost monitoring dashboard
    """
    dashboard_query = """
    SELECT 
      date_trunc('day', timestamp) as day,
      model_name,
      count(*) as request_count,
      sum(total_tokens) as total_tokens,
      avg(total_tokens) as avg_tokens_per_request,
      sum(total_cost_usd) as daily_cost,
      avg(total_cost_usd) as avg_cost_per_request,
      percentile(total_cost_usd, 0.95) as p95_cost_per_request
    FROM monitoring.token_usage
    WHERE timestamp >= current_date - interval 30 days
    GROUP BY 1, 2
    ORDER BY 1 DESC, 6 DESC
    """
    
    alert_query = """
    SELECT 
      sum(total_cost_usd) as hourly_cost,
      count(*) as request_count
    FROM monitoring.token_usage
    WHERE timestamp >= now() - interval 1 hour
    HAVING sum(total_cost_usd) > 100  -- Alert if hourly cost exceeds $100
    """
    
    return dashboard_query, alert_query

Complex RAG Application Implementation

Now we’ll implement a complete RAG application addressing a realistic business scenario.

Scenario: Legal Contract Analysis System

A legal firm needs a system that can analyze legal contracts, answer specific questions about contract terms, identify potential risks, and compare clauses against standard templates.

Implementation Steps:

1. Document Processing Pipeline

def process_legal_documents(document_paths):
    """
    Process legal documents for the RAG system
    """
    # Initialize document processing components
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        separators=["\n\n", "\n", ".", " "]
    )
    embedding_model = SentenceTransformerEmbeddings(model_name="legal-bert-base-uncased")
    
    # Process each document
    all_chunks = []
    for doc_path in document_paths:
        # Extract document metadata
        doc_name = os.path.basename(doc_path)
        doc_type = classify_document_type(doc_path)
        
        # Load and split document (UnstructuredPDFLoader takes the file path at construction)
        raw_document = UnstructuredPDFLoader(doc_path).load()
        chunks = text_splitter.split_documents(raw_document)
        
        # Add metadata to chunks
        for i, chunk in enumerate(chunks):
            chunk.metadata.update({
                "source": doc_name,
                "doc_type": doc_type,
                "chunk_id": f"{doc_name}_chunk_{i}",
                "chunk_index": i
            })
        
        all_chunks.extend(chunks)
    
    # Generate and store embeddings
    chunk_data = []
    for chunk in all_chunks:
        embedding = embedding_model.embed_query(chunk.page_content)
        chunk_data.append({
            "chunk_id": chunk.metadata["chunk_id"],
            "document_name": chunk.metadata["source"],
            "document_type": chunk.metadata["doc_type"],
            "chunk_index": chunk.metadata["chunk_index"],
            "chunk_text": chunk.page_content,
            "embedding": embedding
        })
    
    # Store in Delta Lake
    chunk_df = spark.createDataFrame(chunk_data)
    chunk_df.write.format("delta").mode("overwrite").saveAsTable("legal.contract_chunks")
    
    # Create Vector Search index
    spark.sql("""
    CREATE OR REPLACE INDEX legal_contract_index
    ON legal.contract_chunks
    USING VECTOR
    ON embedding
    OPTIONS (
        similarity_function = 'cosine',
        optimization_hint = 'latency'
    )
    """)
    
    return len(all_chunks)

2. RAG Implementation with Multi-Stage Reasoning

class LegalContractRAG(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        # Initialize embedding model
        self.embedding_model = SentenceTransformerEmbeddings(model_name="legal-bert-base-uncased")
        
        # Initialize LLM client
        self.llm_client = DatabricksFoundationModelClient()
        
        # Initialize Vector Search client
        workspace_client = WorkspaceClient()
        self.vs_client = VectorSearchClient(workspace_client=workspace_client)
        self.vs_index = self.vs_client.get_index(
            endpoint_name="vector-search",
            index_name="legal_contract_index"
        )
    
    def predict(self, context, model_input):
        # Extract inputs
        query = model_input["query"][0]
        document_filter = model_input.get("document_filter", [None])[0]
        
        # Analyze query to determine query type
        query_analysis = self._analyze_query(query)
        
        # Retrieve relevant context based on query type
        if query_analysis["query_type"] == "clause_comparison":
            # For comparisons, we need to retrieve standard clauses and the specific clause
            context_docs = self._retrieve_comparison_context(query, document_filter)
        elif query_analysis["query_type"] == "risk_identification":
            # For risk analysis, retrieve contract clauses and risk patterns
            context_docs = self._retrieve_risk_context(query, document_filter)
        else:
            # For general questions, use standard retrieval
            context_docs = self._retrieve_context(query, document_filter)
        
        # Generate response based on query type
        prompt = self._create_prompt(query, context_docs, query_analysis)
        
        response = self.llm_client.completions.create(
            model="databricks-claude-3-sonnet",
            prompt=prompt,
            max_tokens=1000,
            temperature=0.2
        ).choices[0].text
        
        # Post-process response for correct formatting and citations
        processed_response = self._post_process_response(response, context_docs)
        
        return processed_response
    
    def _analyze_query(self, query):
        """Analyze query to determine its type and requirements"""
        analysis_prompt = f"""
        Analyze the following legal contract query and categorize it into one of these types:
        1. general_information - Basic information lookup
        2. clause_comparison - Comparing contract clauses with standards
        3. risk_identification - Identifying potential risks or issues
        4. obligation_extraction - Extracting obligations or requirements
        
        Query: {query}
        
        Return a JSON object with the query_type and any specific entities that should be focused on.
        """
        
        analysis_response = self.llm_client.completions.create(
            model="databricks-claude-3-haiku",
            prompt=analysis_prompt,
            max_tokens=200,
            temperature=0.0,
            response_format={"type": "json"}
        ).choices[0].text
        
        return json.loads(analysis_response)
    
    def _retrieve_context(self, query, document_filter=None):
        """Retrieve relevant context using Vector Search"""
        # Generate query embedding
        query_embedding = self.embedding_model.embed_query(query)
        
        # Build filter condition if specified
        filter_condition = f"document_name = '{document_filter}'" if document_filter else None
        
        # Search for relevant chunks
        results = self.vs_index.similarity_search(
            query_vector=query_embedding,
            columns=["chunk_id", "document_name", "document_type", "chunk_text"],
            num_results=5,
            filter=filter_condition
        )
        
        return results
    
    def _retrieve_comparison_context(self, query, document_filter=None):
        """Retrieve context for clause comparison"""
        # Implementation for specialized retrieval for comparisons
        # This would combine retrieval of the specific clauses and standard templates
        pass
    
    def _retrieve_risk_context(self, query, document_filter=None):
        """Retrieve context for risk identification"""
        # Implementation for specialized retrieval for risk analysis
        # This would include known risk patterns along with contract clauses
        pass
    
    def _create_prompt(self, query, context_docs, query_analysis):
        """Create appropriate prompt based on query type and context"""
        # Format context from retrieved documents
        context_text = "\n\n".join([doc["chunk_text"] for doc in context_docs])
        context_sources = "\n".join([f"[{i+1}] {doc['document_name']}, section {doc['chunk_id']}" 
                                   for i, doc in enumerate(context_docs)])
        
        # Different prompt templates based on query type
        if query_analysis["query_type"] == "clause_comparison":
            prompt = f"""
            You are a legal contract analysis assistant. Compare the contract clauses in the context with standard legal practices.
            
            Context information:
            {context_text}
            
            Sources:
            {context_sources}
            
            Question: {query}
            
            Provide a detailed comparison, highlighting any deviations from standard practices.
            Reference the specific parts of the contract using citation numbers [1], [2], etc.
            """
        elif query_analysis["query_type"] == "risk_identification":
            prompt = f"""
            You are a legal contract analysis assistant. Identify potential risks or issues in the contract clauses.
            
            Context information:
            {context_text}
            
            Sources:
            {context_sources}
            
            Question: {query}
            
            Highlight potential risks or areas of concern, explaining their implications.
            Reference the specific parts of the contract using citation numbers [1], [2], etc.
            """
        else:
            prompt = f"""
            You are a legal contract analysis assistant. Answer the following question based only on the provided context.
            
            Context information:
            {context_text}
            
            Sources:
            {context_sources}
            
            Question: {query}
            
            Provide a detailed and accurate answer based only on the information in the context.
            Reference the specific parts of the contract using citation numbers [1], [2], etc.
            If the information cannot be found in the context, state that clearly.
            """
        
        return prompt
    
    def _post_process_response(self, response, context_docs):
        """Format response with proper citations and structure"""
        # Implementation for post-processing to ensure proper citations and formatting
        return response

3. MLflow Registration and Deployment

def register_and_deploy_legal_rag():
    """
    Register and deploy the Legal Contract RAG model
    """
    # Log model
    with mlflow.start_run(run_name="legal_contract_rag_model") as run:
        # Define model signature
        signature = mlflow.models.signature.infer_signature(
            model_input=pd.DataFrame({
                "query": ["What are the payment terms in the contract?"],
                "document_filter": ["contract_2023.pdf"]
            }),
            model_output=["The payment terms in the contract specify..."]
        )
        
        # Log model with dependencies
        mlflow.pyfunc.log_model(
            artifact_path="legal_rag_model",
            python_model=LegalContractRAG(),
            code_path=["./legal_utils.py"],
            conda_env={
                "channels": ["conda-forge"],
                "dependencies": [
                    "python=3.9.0",
                    "pip=22.0.4",
                    {"pip": [
                        "databricks-sdk==0.8.0",
                        "sentence-transformers==2.2.2",
                        "langchain==0.0.148"
                    ]}
                ]
            },
            signature=signature,
            registered_model_name="legal_contract_analysis"
        )
    
    # Look up the latest version of the registered model
    client = MlflowClient()
    latest_version = client.get_latest_versions("legal_contract_analysis", stages=["None"])[0].version
    
    # Transition to production
    client.transition_model_version_stage(
        name="legal_contract_analysis",
        version=latest_version,
        stage="Production"
    )
    
    # Create serving endpoint
    workspace_client = WorkspaceClient()
    serving_client = workspace_client.serving_endpoints
    
    endpoint_name = "legal-contract-analysis"
    
    # Check if endpoint exists
    try:
        serving_client.get(endpoint_name)
        # Update existing endpoint
        serving_client.update_config(
            endpoint_name,
            served_models=[{
                "name": "legal_contract_analysis",
                "model_name": "legal_contract_analysis",
                "model_version": latest_version,
                "workload_size": "Medium",
                "scale_to_zero_enabled": True
            }]
        )
    except Exception:
        # Create new endpoint
        serving_client.create(
            name=endpoint_name,
            config={
                "served_models": [{
                    "name": "legal_contract_analysis",
                    "model_name": "legal_contract_analysis",
                    "model_version": latest_version,
                    "workload_size": "Medium",
                    "scale_to_zero_enabled": True
                }]
            }
        )
    
    return endpoint_name

4. Monitoring Implementation

def setup_monitoring(endpoint_name):
    """
    Set up monitoring for the legal RAG system
    """
    # Create monitoring tables
    spark.sql("""
    CREATE DATABASE IF NOT EXISTS legal_monitoring
    """)
    
    # Performance metrics table
    spark.sql("""
    CREATE TABLE IF NOT EXISTS legal_monitoring.performance_metrics (
        query_id STRING,
        timestamp TIMESTAMP,
        query_type STRING,
        embedding_time_ms DOUBLE,
        retrieval_time_ms DOUBLE,
        llm_time_ms DOUBLE,
        total_time_ms DOUBLE,
        document_count INT,
        token_count INT
    )
    USING DELTA
    """)
    
    # Response quality table
    spark.sql("""
    CREATE TABLE IF NOT EXISTS legal_monitoring.response_quality (
        query_id STRING,
        timestamp TIMESTAMP,
        query STRING,
        response STRING,
        has_citations BOOLEAN,
        clarity_score DOUBLE,
        relevance_score DOUBLE,
        user_feedback STRING,
        feedback_score INT
    )
    USING DELTA
    """)
    
    # Token usage table
    spark.sql("""
    CREATE TABLE IF NOT EXISTS legal_monitoring.token_usage (
        query_id STRING,
        timestamp TIMESTAMP,
        prompt_tokens INT,
        completion_tokens INT,
        total_tokens INT,
        estimated_cost DOUBLE
    )
    USING DELTA
    """)
    
    # Create monitoring dashboard
    dashboard_query = """
    SELECT 
      date_trunc('day', p.timestamp) as day,
      count(*) as query_count,
      avg(p.total_time_ms) as avg_response_time_ms,
      percentile(p.total_time_ms, 0.95) as p95_response_time_ms,
      avg(t.total_tokens) as avg_tokens_per_query,
      sum(t.estimated_cost) as daily_cost,
      avg(CASE WHEN q.feedback_score IS NOT NULL THEN q.feedback_score ELSE NULL END) as avg_feedback_score,
      count(CASE WHEN q.feedback_score >= 4 THEN 1 ELSE NULL END) * 100.0 / 
        count(CASE WHEN q.feedback_score IS NOT NULL THEN 1 ELSE NULL END) as satisfaction_rate
    FROM legal_monitoring.performance_metrics p
    JOIN legal_monitoring.token_usage t ON p.query_id = t.query_id
    LEFT JOIN legal_monitoring.response_quality q ON p.query_id = q.query_id
    WHERE p.timestamp >= current_date - interval 30 days
    GROUP BY 1
    ORDER BY 1 DESC
    """
    
    # Set up alert for high latency
    latency_alert_query = """
    SELECT 
      avg(total_time_ms) as avg_latency
    FROM legal_monitoring.performance_metrics
    WHERE timestamp >= now() - interval 15 minutes
    HAVING avg(total_time_ms) > 5000  -- Alert if average latency exceeds 5 seconds
    """
    
    # Set up alert for high costs
    cost_alert_query = """
    SELECT 
      sum(estimated_cost) as hourly_cost
    FROM legal_monitoring.token_usage
    WHERE timestamp >= now() - interval 1 hour
    HAVING sum(estimated_cost) > 50  -- Alert if hourly cost exceeds $50
    """
    
    return dashboard_query, latency_alert_query, cost_alert_query

5. Client Application Implementation

def create_legal_rag_client(endpoint_name):
    """
    Create a client application for the legal RAG system
    """
    class LegalContractClient:
        def __init__(self, endpoint_name):
            self.endpoint_name = endpoint_name
            self.workspace_client = WorkspaceClient()
            self.serving_client = self.workspace_client.serving_endpoints
        
        def analyze_contract(self, query, document_filter=None):
            """
            Send a query to the legal contract analysis system
            """
            # Generate query ID
            query_id = f"q_{uuid.uuid4().hex[:8]}"
            
            # Prepare input
            input_df = pd.DataFrame({
                "query": [query],
                "document_filter": [document_filter]
            })
            
            # Start timing
            start_time = time.time()
            
            # Call endpoint
            response = self.serving_client.query(
                name=self.endpoint_name,
                dataframe_records=input_df.to_dict(orient="records")
            )
            
            # Calculate total time
            total_time = (time.time() - start_time) * 1000  # Convert to ms
            
            # Log performance metrics
            self._log_performance(query_id, query, total_time)
            
            # Format response
            result = {
                "query_id": query_id,
                "query": query,
                "response": response["predictions"][0],
                "response_time_ms": total_time
            }
            
            return result
        
        def provide_feedback(self, query_id, feedback_score, feedback_text=None):
            """
            Record user feedback on a response
            """
            feedback_data = {
                "query_id": query_id,
                "timestamp": datetime.now(),
                "feedback_score": feedback_score,
                "feedback_text": feedback_text
            }
            
            # Log feedback to Delta table
            spark.createDataFrame([feedback_data]).write.format("delta").mode("append").saveAsTable(
                "legal_monitoring.user_feedback")
            
            # Update response quality table (escape single quotes so the SQL stays valid)
            safe_feedback = (feedback_text or "").replace("'", "''")
            spark.sql(f"""
            UPDATE legal_monitoring.response_quality
            SET feedback_score = {feedback_score},
                user_feedback = '{safe_feedback}'
            WHERE query_id = '{query_id}'
            """)
            
            return {"status": "Feedback recorded successfully"}
        
        def _log_performance(self, query_id, query, total_time):
            """
            Log performance metrics for monitoring
            """
            # For a real implementation, we would extract more detailed metrics
            # This is a simplified version
            performance_data = {
                "query_id": query_id,
                "timestamp": datetime.now(),
                "query_type": "general",  # Simplified
                "embedding_time_ms": total_time * 0.1,  # Estimated
                "retrieval_time_ms": total_time * 0.2,  # Estimated
                "llm_time_ms": total_time * 0.7,  # Estimated
                "total_time_ms": total_time,
                "document_count": 5,  # Estimated
                "token_count": 1000  # Estimated
            }
            
            # Log to Delta table
            spark.createDataFrame([performance_data]).write.format("delta").mode("append").saveAsTable(
                "legal_monitoring.performance_metrics")
    
    return LegalContractClient(endpoint_name)

6. End-to-End Usage Example

def demonstrate_legal_rag_system():
    """
    Demonstrate the complete Legal RAG system
    """
    # Process sample documents
    document_paths = [
        "./sample_data/service_agreement_2023.pdf",
        "./sample_data/nda_template.pdf",
        "./sample_data/standard_clauses.pdf"
    ]
    
    num_chunks = process_legal_documents(document_paths)
    print(f"Processed {len(document_paths)} documents into {num_chunks} chunks")
    
    # Register and deploy model
    endpoint_name = register_and_deploy_legal_rag()
    print(f"Model deployed to endpoint: {endpoint_name}")
    
    # Setup monitoring
    dashboard_query, latency_alert, cost_alert = setup_monitoring(endpoint_name)
    print("Monitoring setup complete")
    
    # Create client
    client = create_legal_rag_client(endpoint_name)
    
    # Example queries
    sample_queries = [
        "What are the payment terms in the service agreement?",
        "Compare the confidentiality clause to standard industry terms.",
        "Identify any potential risks in the termination clauses.",
        "What obligations does the vendor have for data security?"
    ]
    
    # Run sample queries
    for query in sample_queries:
        print(f"\nQuery: {query}")
        result = client.analyze_contract(query, document_filter="service_agreement_2023.pdf")
        print(f"Response: {result['response'][:100]}...")
        print(f"Response time: {result['response_time_ms']:.2f} ms")
        
        # Simulate user feedback
        feedback_score = random.randint(3, 5)
        client.provide_feedback(result["query_id"], feedback_score)
        print(f"Feedback provided: {feedback_score}/5")
    
    # Display monitoring dashboard
    print("\nMonitoring Dashboard Query:")
    print(dashboard_query)
    
    return "Legal RAG system demonstration complete"

90-Minute Full Mock Exam (45 Questions)

Section 1: Designing Applications

Question 1: A data scientist is designing a prompt for a financial analysis application that needs to generate concise insights from quarterly earnings reports. Which prompt design technique would be most effective for ensuring consistently formatted outputs?

A) Increasing the temperature parameter to 0.9 for more creative responses

B) Including multiple examples with different formatting approaches

C) Specifying the exact output structure with field names and formatting instructions

D) Using chain-of-thought prompting without output formatting requirements

Question 2: A healthcare company wants to develop an application that helps clinicians answer questions about medical treatments based on recent research papers. Which of the following would be the most appropriate approach for designing this application?

A) A fine-tuned medical language model without retrieval capabilities

B) A retrieval-augmented generation system that integrates recent research papers

C) A classification system that assigns predefined treatment categories

D) A transformer-based summarization model for medical texts

Question 3: When designing a multi-stage reasoning chain for a complex legal document analysis application, what is the optimal ordering of components?

A) Document chunking → Entity extraction → Legal classification → Response generation

B) Document chunking → Document retrieval → Entity extraction → Response generation

C) Entity extraction → Document chunking → Document retrieval → Response generation

D) Document retrieval → Entity extraction → Document chunking → Response generation

Question 4: Which two aspects should be prioritized when translating business requirements into a description of inputs and outputs for an AI pipeline that analyzes customer feedback? (Select two)

A) Creative phrasing of customer sentiments

B) Categorization of feedback into predefined topics

C) Extraction of specific product mentions and issues

D) Generation of marketing content from feedback

E) Identification of customer demographic information

Question 5: A Generative AI Engineer is designing a prompt that needs to extract structured data from unstructured text. Which technique would be most effective?

A) Using a high temperature setting (0.9-1.0)

B) Providing clear instructions with examples of desired output format

C) Minimizing the context window to increase focus

D) Using verbose descriptions of the extraction task

Question 6: Which chain component would be most appropriate for a system that needs to compare customer complaints against company policies and generate appropriate responses?

A) A sequence-to-sequence transformation component

B) A retrieval component with comparison functionality

C) A classification component followed by template selection

D) A multi-document summarization component

Question 7: A retail company wants to create an AI application that provides personalized product recommendations. Which model task is most appropriate for this requirement?

A) Text classification

B) Entity extraction

C) Text generation with personalization context

D) Summarization

Question 8: When defining tools for a multi-stage reasoning process, what is the correct ordering of operations for a document analysis system?

A) Gather knowledge → Take actions → Generate output

B) Generate output → Gather knowledge → Take actions

C) Take actions → Gather knowledge → Generate output

D) Gather knowledge → Generate output → Take actions

Section 2: Data Preparation

Question 9: When implementing a Vector Search index in Databricks, which parameter is most important for optimizing retrieval speed for a real-time application?

A) The number of columns included in the index

B) The dimensionality of the embedding vectors

C) The choice between optimization_hint = 'latency' or optimization_hint = 'throughput'

D) The Delta table partitioning strategy

Question 10: When chunking financial documents for a RAG application, users report that responses sometimes miss important context that spans across sections. Which chunking approach would most effectively address this issue?

A) Decreasing chunk size from 1000 to 250 tokens

B) Using semantic chunking that preserves section boundaries with 30% overlap between chunks

C) Implementing hierarchical chunking with both document-level and paragraph-level chunks

D) Switching to character-based chunking with no overlap

Question 11: A Generative AI Engineer is loading 150 million embeddings into a vector database that takes a maximum of 100 million. Which TWO actions would be most effective to reduce the record count? (Select two)

A) Increase the document chunk size

B) Decrease the overlap between chunks

C) Decrease the document chunk size

D) Increase the overlap between chunks

E) Use a smaller embedding model

Question 12: Which Python package would be most appropriate for extracting text from scanned PDF documents?

A) BeautifulSoup

B) Pandas

C) PyTesseract

D) Scrapy

Question 13: When filtering extraneous content from source documents for a legal RAG application, which approach would provide the most benefit?

A) Removing all images and charts

B) Removing boilerplate legal disclaimers and repetitive header/footer text

C) Keeping only the first and last paragraphs of each document

D) Converting all text to lowercase for consistency

Question 14: A data engineer needs to prepare a document collection with 10 million short paragraphs for a RAG application. The application requires low-latency responses. What is the most efficient approach for storing these documents in Databricks?

A) Store the raw text in a Delta Lake table without embeddings

B) Generate embeddings for each paragraph and store them with the text in a Delta Lake table with a Vector Search index

C) Store the documents in a NoSQL database outside of Databricks

D) Convert all documents to a single large text file and use in-memory processing

Question 15: A Generative AI Engineer is assessing the responses from a customer-facing application that assists in selling automotive parts. Which of the following approaches would most improve the application’s ability to answer shipping and delivery date questions?

A) Create a vector store that includes the company shipping policies and payment terms

B) Create a feature store table with transaction_id as primary key that is populated with invoice data and expected delivery date

C) Provide example data for expected arrival dates as a tuning dataset, then periodically fine-tune the model

D) Amend the chat prompt to input when the order was placed and add 14 days

Question 16: When preparing prompt/response pairs for a model task, which characteristic is most important for effective fine-tuning?

A) Including as many examples as possible regardless of quality

B) Ensuring examples represent diverse use cases with consistent formatting

C) Using only examples from a single domain

D) Prioritizing complex examples over simple ones

Section 3: Application Development

Question 17: When building a RAG application using LangChain, which component is responsible for converting retrieved documents into a format suitable for inclusion in the LLM prompt?

A) Document loader

B) Text splitter

C) Prompt template

D) Chain

Question 18: Which embedding model characteristic is most important when selecting a model for a retrieval system that needs to understand technical medical terminology?

A) The model’s dimensionality

B) The model’s training domain and corpus

C) The model’s parameter count

D) The model’s inference speed

Question 19: A company is developing a RAG application to answer questions about their product documentation. Users report that the system sometimes provides incorrect information not found in the documents. What is the most effective approach to minimize these hallucinations?

A) Using a larger language model

B) Including explicit instructions in the prompt to only use provided context and clearly indicate when information is not available

C) Increasing the number of retrieved documents for each query

D) Generating longer responses with more details

Question 20: Which technique would most effectively augment a prompt with additional context based on user input?

A) Adding random keywords from the user’s query

B) Extracting key entities and intents from the query and retrieving relevant information

C) Always appending the user’s complete interaction history

D) Using static predefined context for all queries in the same category

Question 21: When implementing LLM guardrails to prevent negative outcomes, which approach provides the most comprehensive protection?

A) Using only input filtering to block problematic queries

B) Using only output filtering to block harmful responses

C) Implementing both input and output filtering with continuous monitoring and improvement

D) Relying solely on the built-in safety features of the foundation model

Question 22: A financial services company is developing a generative AI application that needs to avoid leaking private customer data. Which metaprompt approach would be most effective?

A) Instructing the model to generate creative fictional examples

B) Providing clear instructions to never repeat or include specific types of sensitive data in responses

C) Using a high temperature setting to increase response variability

D) Removing all specific entities from the prompt

Question 23: A data scientist is building a RAG application for processing legal contracts. The application must extract specific clauses and compare them against standard templates. Which model would be best suited for this task?

A) A small model (1-3B parameters) optimized for speed

B) A medium-sized model (7-13B parameters) with strong instruction-following capabilities

C) The largest available model regardless of other factors

D) A model specifically trained on conversational data

Question 24: When selecting an embedding model for a RAG application processing scientific research papers, which context length would be most appropriate if average paragraphs are 200-300 tokens?

A) 128 tokens

B) 512 tokens

C) 1024 tokens

D) 4096 tokens

Section 4: Assembling and Deploying Applications

Question 25: A data scientist has created a PyFunc model for a RAG application and wants to register it with MLflow. Which component is essential to include for proper deployment?

A) The raw training data used to train the embedding model

B) The model signature defining input and output schemas

C) A visualization of the RAG architecture

D) The full text of all documents in the knowledge base

Question 26: Which code structure is required when implementing a PyFunc model for deploying an LLM chain in Databricks?

A) A class that extends mlflow.pyfunc.PythonModel with load_context and predict methods

B) A series of SQL commands that define the model behavior

C) A YAML configuration file that defines the model architecture

D) A Python dictionary mapping inputs to outputs

Question 27: When creating a Vector Search index in Databricks, which SQL command correctly creates an index optimized for low-latency queries?

A) CREATE INDEX vector_index ON table USING VECTOR (embedding_column) WITH (similarity_function = 'cosine')

B) CREATE INDEX vector_index ON table (embedding_column) USING VECTOR WITH (similarity_function = 'cosine', optimization_hint = 'latency')

C) CREATE OR REPLACE INDEX vector_index ON table USING VECTOR (embedding_column) OPTIONS (similarity_function = 'cosine', optimization_hint = 'latency')

D) CREATE VECTOR INDEX vector_index ON table (embedding_column) WITH OPTIONS (function = 'cosine', optimization = 'latency')

Question 28: When deploying a foundation model-based application in Databricks, which serving configuration would be most appropriate for an application with highly variable traffic patterns?

A) Fixed-size serving endpoints with dedicated compute

B) Serverless endpoints with auto-scaling enabled

C) Single-node endpoints with maximum resources

D) Multi-region endpoint deployment with global load balancing

Question 29: A data engineer is creating a chain using LangChain for a document question-answering system. Which components must be included in the correct order?

A) Embedding model → Vector store → LLM → Output parser

B) Document loader → Text splitter → Embedding model → Vector store → Retriever → LLM chain

C) LLM → Embedding model → Vector store → Prompt template

D) Document loader → LLM → Output parser → Vector store

Question 30: Which approach is most appropriate for controlling access to resources from model serving endpoints in a multi-team environment?

A) Sharing a single endpoint across all teams with a common access token

B) Creating separate endpoints for each team with team-specific access control

C) Implementing a single shared access token with time-based rotation

D) Allowing anonymous access but limiting rate per IP address

Question 31: A data engineer needs to query a Vector Search index containing product descriptions to find semantically similar products. Which approach is correct?

A) SELECT * FROM products ORDER BY embedding SIMILARITY TO query_embedding LIMIT 5

B) SELECT * FROM products WHERE vector_similarity(embedding, query_embedding) > 0.8

C) SELECT *, vector_dot_product(embedding, array[query_embedding]) as similarity FROM products ORDER BY similarity DESC LIMIT 5

D) SELECT * FROM products WHERE embedding CONTAINS query_embedding LIMIT 5

Question 32: A Generative AI Engineer is deploying an application that leverages Foundation Model APIs. Which element is essential for proper deployment?

A) A GPU-accelerated cluster

B) A dedicated token for API authentication

C) A custom Docker container

D) A separate Vector Search endpoint

Section 5: Governance

Question 33: A generative AI application is being developed to process sensitive healthcare information. Which two techniques should be implemented to ensure HIPAA compliance? (Select two)

A) Implementing PII detection and masking in document preprocessing

B) Using the largest available LLM to ensure accuracy

C) Maintaining audit logs of all queries and responses

D) Storing all generated content indefinitely for quality control

E) Implementing user authentication and role-based access controls

Question 34: When implementing text masking in a RAG application, which approach provides the most comprehensive protection for sensitive information?

A) Simple regular expression matching for common patterns like email addresses and phone numbers

B) Multi-layered approach combining pattern matching, named entity recognition, and contextual analysis

C) Manual review of all documents before ingestion

D) Using only public domain documents in the knowledge base

Question 35: A company is using data from various academic research papers as sources for their RAG application. Which approach is most important to avoid legal risk?

A) Storing all papers on company servers for faster access

B) Modifying the text of all papers to avoid exact matches

C) Verifying licensing terms and obtaining proper permissions for commercial use

D) Anonymizing all paper authors and institutions

Question 36: When implementing guardrails to protect against malicious user inputs, which approach provides the most robust protection?

A) Blocking a predefined list of keywords and phrases

B) Implementing a multi-layer approach with input classification, intent analysis, and continuous improvement

C) Limiting the number of requests per user

D) Using only the most recent foundation model version

Question 37: A RAG application is being built using documents containing outdated or potentially problematic terminology. Which approach is most appropriate for mitigating this issue?

A) Manually editing all documents to update terminology

B) Implementing a preprocessing step that identifies and replaces problematic terms with appropriate alternatives

C) Using only AI-generated content instead of original documents

D) Setting a high temperature parameter to encourage varied language

Section 6: Evaluation and Monitoring

Question 38: When evaluating the retrieval performance of a RAG application, which metric would be most valuable for understanding if the system is retrieving relevant documents?

A) The total number of tokens in retrieved documents

B) The publication date of retrieved documents

C) The relevance score between query and retrieved documents

D) The processing time for retrieval operations

Question 39: A data engineer is evaluating several LLM options for a customer service application. Which evaluation metric would be most important for selecting the best model?

A) The model’s parameter count

B) The model’s performance on customer service-relevant tasks like problem resolution and sentiment understanding

C) The model’s training data size

D) The model’s release date

Question 40: Which approach would be most effective for monitoring and controlling the cost of a production RAG application deployed on Databricks?

A) Manually reviewing logs to identify expensive queries

B) Implementing token tracking, query caching, and automated alerts for unusual usage patterns

C) Restricting the application to a fixed number of queries per day

D) Using only the smallest available models regardless of performance requirements

Question 41: A generative AI application deployed in production is showing performance degradation over time. Which monitoring approach would best help identify the root cause?

A) Tracking only the final response quality scores

B) Monitoring component-level metrics including retrieval quality, LLM performance, and end-to-end latency

C) Comparing current responses to the original training data

D) Analyzing only error rates without examining successful responses

Question 42: When using MLflow to evaluate model performance in a RAG application, which metrics should be tracked? (Select two)

A) Number of model parameters

B) Response factuality compared to source documents

C) Model training dataset size

D) Query-document relevance scores

E) Model architecture type

Question 43: A company wants to implement inference logging to assess deployed RAG application performance. Which data should be captured to provide the most valuable insights?

A) Only failed queries to reduce storage requirements

B) User queries, retrieved documents, generated responses, and user feedback

C) Only the final responses to ensure user privacy

D) Random sampling of 10% of all interactions

Question 44: Which method is most effective for controlling LLM costs for RAG applications in Databricks?

A) Always using the smallest available model

B) Implementing caching for common queries and optimizing prompt templates to reduce token usage

C) Restricting the application to business hours only

D) Processing all queries in daily batches

Question 45: A Generative AI Engineer needs to evaluate a RAG application’s ability to provide accurate financial advice. Which approach would provide the most meaningful evaluation?

A) Comparing responses to a test set of pre-defined questions with expert-validated answers

B) Measuring the semantic similarity of all responses

C) Counting the number of financial terms used in responses

D) Measuring response generation speed

Answer Key with Explanations

1. C - Specifying the exact output structure provides clear instructions for formatting, ensuring consistency in the generated outputs.

2. B - A retrieval-augmented generation system is ideal for this use case as it can access and incorporate the latest research while generating relevant responses.

3. A - This ordering ensures documents are properly processed before extraction and classification occurs, with response generation as the final step.

4. B, C - Categorization of feedback topics and extraction of specific product mentions are key business requirements for effectively analyzing customer feedback.

5. B - Clear instructions with examples help the model understand the expected format and structure of the extracted data.

6. B - A retrieval component with comparison functionality can retrieve relevant policies and compare them to the complaint.

7. C - Text generation with personalization context allows the system to generate customized recommendations based on customer data.

8. A - The correct sequence is first gathering relevant knowledge, then taking appropriate actions based on that knowledge, and finally generating output.

9. C - Setting the optimization hint to ‘latency’ configures the index specifically for fast query response times.

10. C - Hierarchical chunking preserves both document-level context and detailed paragraph information, addressing the issue of missing context.

11. A, B - Increasing chunk size and decreasing overlap both reduce the total number of chunks created from the same content.

12. C - PyTesseract is specifically designed for OCR (Optical Character Recognition) needed to extract text from scanned documents.

13. B - Removing boilerplate legal text and repetitive headers/footers reduces noise while preserving the substantive content.

14. B - Generating embeddings and using Vector Search provides the optimal balance of storage efficiency and retrieval performance.

15. B - A feature store with transaction-specific delivery data directly addresses the missing information needed to answer shipping questions.

16. B - Diverse use cases with consistent formatting provide the model with broad exposure while maintaining output quality standards.

17. C - The prompt template is responsible for formatting retrieved documents into a structure appropriate for the LLM.

18. B - The training domain and corpus are most important for specialized terminology understanding, as models trained on medical text will perform better with medical terminology.

19. B - Explicit instructions to use only provided context and indicate knowledge gaps help minimize hallucinations.

20. B - Extracting key entities and intents enables targeted retrieval of relevant additional context.

21. C - A comprehensive approach using both input and output filtering with continuous monitoring provides the most robust protection.

22. B - Clear instructions about handling sensitive data provide explicit guidance to prevent data leakage.

23. B - A medium-sized model with strong instruction-following is the best balance of capability and efficiency for structured legal tasks.

24. B - 512 tokens provides sufficient context for paragraphs of 200-300 tokens while maintaining efficiency.

25. B - The model signature defines the expected input and output formats, which is essential for proper deployment.

26. A - A class extending mlflow.pyfunc.PythonModel with load_context and predict methods is the required structure for PyFunc models.

27. C - This command correctly creates a Vector Search index with the proper optimization hint for low-latency queries.

28. B - Serverless endpoints with auto-scaling efficiently handle variable traffic patterns by scaling up and down as needed.

29. B - This sequence represents the complete pipeline from loading documents to generating responses via the LLM chain.

30. B - Creating separate endpoints with team-specific access control provides the most secure and manageable approach.

31. C - This query correctly uses vector_dot_product to calculate similarity between embeddings and orders results by similarity.

32. B - A dedicated token for API authentication is essential for securely accessing Foundation Model APIs.

33. A, C - PII detection/masking and maintaining audit logs are essential for HIPAA compliance in healthcare applications.

34. B - A multi-layered approach provides comprehensive protection by addressing different types of sensitive information.

35. C - Verifying licensing terms and obtaining proper permissions is crucial to avoid copyright infringement and legal issues.

36. B - A multi-layer approach with continuous improvement provides the most robust protection against evolving threats.

37. B - A preprocessing step to identify and replace problematic terms maintains the value of the content while addressing terminology issues.

38. C - The relevance score between query and retrieved documents directly measures retrieval quality.

39. B - Task-specific performance is the most important factor when selecting a model for a specific application.

40. B - A comprehensive approach with tracking, caching, and alerts provides effective cost control while maintaining performance.

41. B - Component-level monitoring helps pinpoint exactly where degradation is occurring in the pipeline.

42. B, D - Response factuality and query-document relevance scores are key metrics for RAG application quality.

43. B - Capturing the complete interaction data provides the most comprehensive insights for performance assessment.

44. B - Implementing caching and optimizing prompts directly addresses the main cost drivers in LLM applications.

45. A - Comparing responses to expert-validated answers provides the most meaningful evaluation of accuracy for domain-specific advice.


Final Preparation

Critical Area 1: Advanced Retrieval Techniques

Retrieval forms the foundation of effective RAG applications, and understanding its nuances is essential for exam success.

Key Concepts Review:

Vector Search optimization requires careful consideration of several parameters. The most important configuration choice is setting the appropriate optimization hint for your application: ‘latency’ optimization suits interactive applications that need fast individual responses, while ‘throughput’ optimization suits batch or high-volume workloads where overall processing rate matters more than per-query speed.
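
As a quick refresher, the index-creation syntax used elsewhere in this guide makes that choice explicit. The sketch below assumes a hypothetical support.kb_chunks Delta table with an embedding column; swapping the hint to 'throughput' would instead favor high-volume batch retrieval.

# Minimal sketch: create a Vector Search index tuned for interactive (low-latency) queries
spark.sql("""
CREATE OR REPLACE INDEX kb_chunks_index
ON support.kb_chunks
USING VECTOR (embedding)
OPTIONS (
    similarity_function = 'cosine',
    optimization_hint = 'latency'
)
""")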

Chunking strategy selection depends on document characteristics and query patterns. Hierarchical chunking offers the most comprehensive approach by maintaining both document-level context and detailed segment information. This approach creates chunks at multiple granularity levels, allowing the system to retrieve the most appropriate context unit based on the query.
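
A minimal sketch of hierarchical chunking using LangChain's RecursiveCharacterTextSplitter (the chunk sizes are illustrative): coarse parent chunks preserve document-level context, while the smaller child chunks, each tagged with its parent's ID, are what actually get embedded and retrieved.

from langchain.text_splitter import RecursiveCharacterTextSplitter

# Two splitters at different granularities (sizes are illustrative)
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200)
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=80)

def hierarchical_chunks(documents):
    """Return parent chunks plus child chunks that point back to their parent."""
    all_children = []
    parents = parent_splitter.split_documents(documents)
    for p_idx, parent in enumerate(parents):
        children = child_splitter.split_documents([parent])
        for c_idx, child in enumerate(children):
            child.metadata.update({
                "parent_id": f"parent_{p_idx}",  # lets the app fetch wider context at answer time
                "chunk_id": f"parent_{p_idx}_child_{c_idx}"
            })
            all_children.append(child)
    return parents, all_children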

Contextual retrieval enhances standard vector similarity by incorporating additional factors. These factors include metadata filtering, hybrid search combining semantic and keyword matching, and query expansion to address vocabulary gaps between queries and documents.
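
Metadata filtering and hybrid search appear in the practice exercise below; query expansion can be sketched separately. The snippet that follows is a hypothetical example reusing the illustrative llm_client, embedding_model, and vs_index interfaces from the Legal Contract RAG example: it asks a small model for paraphrases of the question and merges the deduplicated retrieval results.

def expand_and_retrieve(query, num_variants=2, num_results=5):
    """Query expansion sketch: retrieve with the original query plus LLM-generated paraphrases."""
    expansion_prompt = (
        f"Rewrite the following question in {num_variants} different ways, one per line:\n{query}"
    )
    raw = llm_client.completions.create(
        model="databricks-claude-3-haiku",
        prompt=expansion_prompt,
        max_tokens=150,
        temperature=0.3
    ).choices[0].text
    variants = [line.strip() for line in raw.splitlines() if line.strip()]

    seen, merged = set(), []
    for q in [query] + variants[:num_variants]:
        results = vs_index.similarity_search(
            query_vector=embedding_model.embed_query(q),
            columns=["chunk_id", "chunk_text"],
            num_results=num_results
        )
        for doc in results:
            if doc["chunk_id"] not in seen:  # deduplicate across query variants
                seen.add(doc["chunk_id"])
                merged.append(doc)
    return merged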

Practice Exercise: Advanced Retrieval Implementation

Consider a financial advisory application that needs to retrieve relevant information from quarterly reports, regulatory filings, and news articles. Implement a retrieval system that appropriately handles different document types.

def implement_advanced_retrieval(query, document_types=None):
    """
    Implement advanced retrieval with document type awareness
    
    Parameters:
    - query: User question
    - document_types: Optional filter for specific document categories
    
    Returns:
    - Retrieved documents with relevance metrics
    """
    # Generate query embedding
    query_embedding = embedding_model.embed_query(query)
    
    # Analyze query to determine optimal retrieval approach
    query_analysis = analyze_query_intent(query)
    
    # Construct base SQL query
    base_query = """
    SELECT 
      document_id, 
      chunk_id,
      document_type,
      publication_date,
      chunk_text,
      vector_dot_product(embedding, array({})) as semantic_score
    FROM financial_documents
    """
    
    # Add document type filtering if specified
    filter_clause = ""
    if document_types:
        type_list = "', '".join(document_types)
        filter_clause = f"WHERE document_type IN ('{type_list}')"
    
    # Add query-specific optimizations
    if query_analysis.get('requires_recency', False):
        # Prioritize recent documents for time-sensitive queries
        if filter_clause:
            filter_clause += " AND publication_date > current_date - interval 90 days"
        else:
            filter_clause = "WHERE publication_date > current_date - interval 90 days"
    
    # Implement hybrid search for fact-seeking queries (assumes a bm25() keyword-scoring function or UDF is registered)
    if query_analysis.get('fact_seeking', False):
        order_clause = f"""
        ORDER BY 
          semantic_score * 0.7 + 
          bm25(chunk_text, '{query}') * 0.3 
        DESC LIMIT 10
        """
    else:
        order_clause = "ORDER BY semantic_score DESC LIMIT 10"
    
    # Execute retrieval query
    full_query = f"{base_query} {filter_clause} {order_clause}"
    results = spark.sql(full_query.format(str(query_embedding)[1:-1]))
    
    return results.collect()

Critical Area 2: Model Selection and Evaluation

Selecting the appropriate models and implementing effective evaluation metrics represents a significant portion of the exam.

Key Concepts Review:

Model selection criteria extend beyond simply choosing the largest available model. Consider domain relevance, where models trained on domain-specific content often outperform larger general models for specialized tasks. Context window requirements depend on typical document and query lengths in your application. Cost-performance trade-offs balance inference speed, quality, and operational costs.
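
One way to make these trade-offs concrete is a simple weighted scorecard. The candidate names, scores, and weights below are purely illustrative, not benchmark results.

# Hypothetical shortlisting scorecard; model names, scores, and weights are illustrative only.
candidates = {
    "small-instruct-7b": {"domain_fit": 0.6, "quality": 0.7, "latency": 0.9, "cost": 0.9},
    "general-llm-70b":   {"domain_fit": 0.7, "quality": 0.9, "latency": 0.5, "cost": 0.4},
    "domain-tuned-13b":  {"domain_fit": 0.9, "quality": 0.8, "latency": 0.8, "cost": 0.7},
}
weights = {"domain_fit": 0.35, "quality": 0.30, "latency": 0.15, "cost": 0.20}

def rank_models(candidates, weights):
    """Rank candidate models by a weighted sum of selection criteria."""
    scores = {
        name: sum(weights[criterion] * score for criterion, score in metrics.items())
        for name, metrics in candidates.items()
    }
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)

print(rank_models(candidates, weights))

Under these made-up weights the domain-tuned model ranks first, which reflects the point above: domain relevance can outweigh raw model size.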

Comprehensive evaluation frameworks address multiple quality dimensions. Automated metrics include retrieval metrics (precision, recall, relevance scores), response quality metrics (factual accuracy, coherence, helpfulness), and operational metrics (latency, throughput, token usage). Human evaluation provides critical assessment of subjective aspects that automated metrics cannot fully capture.

MLflow integration enables systematic tracking of experiments and models. This includes logging evaluation metrics, tracking model parameters, storing artifacts, and managing model versions through their lifecycle.
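
A minimal sketch of that tracking loop, assuming an evaluation step has already produced metric values and that a model named legal_contract_analysis (as registered earlier in this chapter) exists; the numbers, artifact file, and stage choice are placeholders.

import mlflow
from mlflow import MlflowClient

# Minimal sketch: log evaluation results for a run, then promote the latest registered version.
with mlflow.start_run(run_name="rag_eval"):
    mlflow.log_param("retriever_k", 5)
    mlflow.log_metric("avg_relevance", 0.82)
    mlflow.log_metric("avg_latency_ms", 1450)
    mlflow.log_artifact("eval_results.json")  # assumes the evaluation step wrote this file

client = MlflowClient()
latest = client.get_latest_versions("legal_contract_analysis", stages=["None"])[0]
client.transition_model_version_stage(
    name="legal_contract_analysis",
    version=latest.version,
    stage="Staging"  # promote to Production once evaluation gates pass
)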

Practice Exercise: Evaluation Framework Implementation

Design an evaluation framework for a RAG application that prioritizes factual accuracy and response quality.

def evaluate_rag_system(test_queries, reference_answers, rag_system):
    """
    Comprehensive evaluation of a RAG system
    
    Parameters:
    - test_queries: List of test questions
    - reference_answers: List of expert-provided answers
    - rag_system: The RAG system to evaluate
    
    Returns:
    - Dictionary of evaluation metrics
    """
    results = {
        "retrieval_metrics": {},
        "response_metrics": {},
        "operational_metrics": {}
    }
    
    all_metrics = []
    
    for i, (query, reference) in enumerate(zip(test_queries, reference_answers)):
        # Track run in MLflow
        with mlflow.start_run(run_name=f"eval_query_{i}"):
            # Log query
            mlflow.log_text(query, "query.txt")
            mlflow.log_text(reference, "reference.txt")
            
            # Track retrieval performance
            start_time = time.time()
            retrieved_docs = rag_system.retrieve(query)
            retrieval_time = time.time() - start_time
            
            # Evaluate retrieval quality
            retrieval_metrics = {
                "num_docs_retrieved": len(retrieved_docs),
                "retrieval_time_seconds": retrieval_time
            }
            
            # Generate response
            start_time = time.time()
            response = rag_system.generate_response(query, retrieved_docs)
            generation_time = time.time() - start_time
            
            # Calculate response metrics
            response_metrics = {
                "factual_accuracy": evaluate_factual_accuracy(response, reference, retrieved_docs),
                "coherence": evaluate_coherence(response),
                "semantic_similarity": calculate_semantic_similarity(response, reference),
                "generation_time_seconds": generation_time
            }
            
            # Calculate operational metrics
            operational_metrics = {
                "total_time_seconds": retrieval_time + generation_time,
                "prompt_tokens": count_tokens(query) + sum(count_tokens(doc["text"]) for doc in retrieved_docs),
                "completion_tokens": count_tokens(response)
            }
            
            # Log all metrics to MLflow
            for metrics_dict in [retrieval_metrics, response_metrics, operational_metrics]:
                for name, value in metrics_dict.items():
                    mlflow.log_metric(name, value)
            
            # Store results for this query
            query_results = {
                "query": query,
                "reference": reference,
                "response": response,
                "retrieval_metrics": retrieval_metrics,
                "response_metrics": response_metrics,
                "operational_metrics": operational_metrics
            }
            all_metrics.append(query_results)
    
    # Calculate aggregate metrics
    for metric_type in ["retrieval_metrics", "response_metrics", "operational_metrics"]:
        for metric_name in all_metrics[0][metric_type].keys():
            metric_values = [result[metric_type][metric_name] for result in all_metrics]
            results[metric_type][f"avg_{metric_name}"] = sum(metric_values) / len(metric_values)
    
    return results, all_metrics

Critical Area 3: RAG Application Security and Governance

Security and governance considerations represent a significant portion of the exam and are essential for deploying production applications.

Key Concepts Review:

Input and output filtering provide essential guardrails against misuse and harmful content. Input filtering prevents problematic queries by detecting intent, filtering prohibited topics, and validating input parameters. Output filtering prevents the generation of harmful, inaccurate, or sensitive content through content moderation, fact verification, and PII detection.

Data privacy protections include several critical mechanisms. PII detection and masking identify and remove sensitive information from both queries and responses. Access controls limit who can use the system and what data they can access. Audit logging maintains records of all system interactions for compliance and security purposes.
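
The masking helpers referenced in the practice exercise below are left abstract; a minimal, pattern-matching-only sketch of that first layer might look like the following. A production system would add named entity recognition and contextual analysis on top, as noted above.

import re

# Minimal pattern-based PII masking sketch (first layer only)
PII_PATTERNS = {
    "EMAIL": re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+"),
    "PHONE": re.compile(r"\b(?:\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b"),
    "SSN":   re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
}

def mask_pii(text):
    """Replace pattern-matched PII with typed placeholders."""
    for label, pattern in PII_PATTERNS.items():
        text = pattern.sub(f"[{label}_REDACTED]", text)
    return text

print(mask_pii("Contact jane.doe@example.com or 555-123-4567."))
# -> "Contact [EMAIL_REDACTED] or [PHONE_REDACTED]."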

Regulatory compliance requirements vary by industry and region. HIPAA compliance for healthcare applications requires strict data handling practices. Financial regulations impose requirements on advice and reporting. Intellectual property considerations affect data usage and attribution practices.

Practice Exercise: Implementing Security Guardrails

Implement comprehensive security guardrails for a RAG application that handles sensitive customer information.

def implement_security_guardrails(query, retrieved_documents, generated_response):
    """
    Apply comprehensive security guardrails to RAG pipeline
    
    Parameters:
    - query: Original user query
    - retrieved_documents: Documents retrieved for context
    - generated_response: The LLM-generated response
    
    Returns:
    - Dictionary with security status and filtered response
    """
    security_result = {
        "original_query": query,
        "security_status": "approved",
        "security_checks": {},
        "filtered_response": generated_response
    }
    
    # 1. Input validation
    input_check = validate_input(query)
    security_result["security_checks"]["input_validation"] = input_check
    
    if not input_check["passed"]:
        security_result["security_status"] = "rejected"
        security_result["filtered_response"] = "I'm sorry, but I cannot process this request."
        return security_result
    
    # 2. PII detection in retrieved documents
    pii_check = detect_pii_in_documents(retrieved_documents)
    security_result["security_checks"]["pii_detection"] = pii_check
    
    if pii_check["pii_detected"]:
        # Mask PII in retrieved documents
        masked_documents = mask_pii_in_documents(retrieved_documents, pii_check["detected_entities"])
        # Regenerate response with masked documents
        # In a real implementation, this would call the LLM again
        security_result["filtered_response"] = "Response with masked PII would be generated here"
    
    # 3. Output content moderation
    content_check = moderate_content(generated_response)
    security_result["security_checks"]["content_moderation"] = content_check
    
    if not content_check["passed"]:
        security_result["security_status"] = "filtered"
        security_result["filtered_response"] = "I'm sorry, but I cannot provide that information."
        return security_result
    
    # 4. Fact verification
    fact_check = verify_facts(generated_response, retrieved_documents)
    security_result["security_checks"]["fact_verification"] = fact_check
    
    if not fact_check["passed"]:
        security_result["security_status"] = "modified"
        security_result["filtered_response"] = fact_check["corrected_response"]
    
    # 5. Logging for audit
    log_interaction({
        "timestamp": datetime.now().isoformat(),
        "query": query,
        "response": security_result["filtered_response"],
        "security_checks": security_result["security_checks"],
        "security_status": security_result["security_status"]
    })
    
    return security_result

Exam Format Review

The Databricks Generative AI Engineer Associate exam consists of 45 questions to be completed in 90 minutes. Questions include both multiple-choice (single answer) and multiple-selection (multiple answers) formats. The exam covers six main sections: Designing Applications, Data Preparation, Application Development, Assembling and Deploying Applications, Governance, and Evaluation and Monitoring.


Time Management Strategies

Effective time management ensures you can complete all questions while giving appropriate consideration to complex items.

Two-Pass Approach:

The first pass involves answering straightforward questions immediately while flagging complex questions for later review. This ensures you capture all “easy points” before tackling more difficult items. Allow approximately 60 minutes for this pass.

The second pass focuses exclusively on the flagged questions, giving them more thorough consideration. Allocate the remaining 30 minutes for this pass.

Question-Type Strategies:

For multiple-choice questions (single answer), use the elimination method to remove clearly incorrect options before selecting from remaining candidates.

For multiple-selection questions, first identify the clearly correct and clearly incorrect options. Then evaluate remaining options individually based on their technical merit.

Avoiding Common Pitfalls:

Avoid overthinking by focusing on the question’s core technical requirement rather than looking for tricks. Additionally, manage time effectively by setting internal checkpoints (15 questions completed per 30 minutes), and prioritize answering all questions even if some must be educated guesses.


Content Review Strategies

While comprehensive review of all material is not feasible on the final day, focused review can yield significant benefits.

High-Value Topics:

Vector Search configuration requires careful attention to indexing parameters, optimization hints, and query structures. This topic frequently appears on the exam and has practical implementation implications.

RAG architecture components include document processing, embedding generation, retrieval systems, and response generation. Understanding the function and integration of these components is essential for many exam questions.

Evaluation metrics for both retrieval and response quality help assess system performance. Familiarize yourself with precision, recall, semantic similarity, and factual accuracy metrics.
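
For retrieval specifically, precision@k and recall@k are straightforward to compute against a small labeled set; the sketch below uses made-up chunk IDs purely for illustration.

def retrieval_precision_recall_at_k(retrieved_ids, relevant_ids, k=5):
    """Minimal sketch of precision@k and recall@k for a single query.

    retrieved_ids: ranked list of chunk IDs returned by the retriever
    relevant_ids: set of chunk IDs a human annotator marked as relevant
    """
    top_k = retrieved_ids[:k]
    hits = len(set(top_k) & set(relevant_ids))
    precision = hits / k
    recall = hits / len(relevant_ids) if relevant_ids else 0.0
    return precision, recall

# Illustrative values, not real evaluation data
print(retrieval_precision_recall_at_k(
    retrieved_ids=["c1", "c7", "c3", "c9", "c2"],
    relevant_ids={"c1", "c2", "c4"},
    k=5
))  # -> (0.4, 0.666...)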

Concept Integration:

The exam often presents scenarios requiring the integration of multiple concepts. Practice identifying which technologies and approaches would be most appropriate for specific business scenarios. Consider factors such as data characteristics, user requirements, and operational constraints when selecting solutions.


Practice Scenario Exercises

Scenario 1: Customer Support Knowledge Base

A company needs to create a customer support system that answers product-related questions based on user manuals, support tickets, and knowledge base articles.

For this scenario, identify:

  1. The most appropriate chunking strategy for these document types
  2. The optimal embedding model selection criteria
  3. The most important evaluation metrics for this application

Scenario 2: Legal Contract Analysis

A legal firm wants to build a system for analyzing contracts to identify non-standard clauses and potential risks.

For this scenario, describe:

  1. The appropriate chain components for this multi-stage analysis
  2. The security and compliance requirements
  3. The monitoring approach for ensuring accurate results

Exam Instructions

  1. Set aside 90 minutes of uninterrupted time
  2. Answer all 45 questions
  3. Use the two-pass approach discussed earlier
  4. Review answers after completion

Post-Exam Activities

After completing the practice exam:

  1. Review your answers against the provided solutions
  2. Identify any remaining knowledge gaps
  3. Create concise notes on these areas for final review
  4. Rest adequately before the actual exam

Final Exam Preparation Checklist

Before your exam:

  1. Verify your understanding of key concepts in each exam section
  2. Review your notes on previously identified weak areas
  3. Ensure you’re familiar with the Databricks-specific implementations
  4. Prepare your testing environment according to exam requirements
  5. Get adequate rest the night before the exam

Day of Exam Guidelines

On exam day:

  1. Ensure your testing environment meets all requirements
  2. Have necessary identification ready
  3. Complete system checks well before your scheduled time
  4. Apply the time management and question approach strategies we’ve practiced
  5. Read each question carefully, focusing on keywords

Conclusion and Final Thoughts

The Databricks Certified Generative AI Engineer Associate exam evaluates your understanding of designing, implementing, and evaluating generative AI applications using Databricks tools. Success requires both technical knowledge and effective exam strategies.

Remember that the certification represents your ability to implement practical solutions using Databricks’ generative AI capabilities. Focus on the application of concepts rather than memorization of facts.