Unstructured retrieval AI agent tools

Important

This feature is in Public Preview.

This article shows how to create AI agent tools for unstructured data retrieval using the Mosaic AI Agent Framework. Unstructured retrievers enable agents to query unstructured data sources, such as a document corpus, using vector search indexes.

To learn more about agent tools, see Create AI agent tools.

Vector Search retriever tool with Unity Catalog functions

The following example creates a Unity Catalog function for a retriever tool that can query data from a Mosaic AI Vector Search index.

The Unity Catalog function databricks_docs_vector_search queries a hypothetical Vector Search index containing Databricks documentation. It wraps the Databricks SQL function vector_search() and uses the aliases page_content and metadata to match its output to the MLflow retriever schema.

Note

To conform to the MLflow retriever schema, any additional metadata columns must be added to the metadata column using the SQL map function, rather than as top-level output keys.

Run the following code in a notebook or SQL editor.

CREATE OR REPLACE FUNCTION main.default.databricks_docs_vector_search (
  -- The agent uses this comment to determine how to generate the query string parameter.
  query STRING
  COMMENT 'The query string for searching Databricks documentation.'
) RETURNS TABLE
-- The agent uses this comment to determine when to call this tool. It describes the types of documents and information contained within the index.
COMMENT 'Executes a search on Databricks documentation to retrieve text documents most relevant to the input query.' RETURN
SELECT
  chunked_text as page_content,
  map('doc_uri', url, 'chunk_id', chunk_id) as metadata
FROM
  vector_search(
    -- Specify your Vector Search index name here
    index => 'catalog.schema.databricks_docs_index',
    query => query,
    num_results => 5
  )

This retriever tool has the following caveats:

  • MLflow traces this Unity Catalog function as a TOOL span type rather than a RETRIEVER span type. As a result, downstream Agent Framework applications like the agent review app and AI Playground will not show retriever-specific details such as links to chunks. For more information on span types, see MLflow Tracing Schema.
  • SQL clients may limit the maximum number of rows or bytes returned. To keep results within those limits, truncate the column values returned by the UDF. For example, use substring(chunked_text, 0, 8192) to cap the size of large content columns and avoid row truncation during execution.
  • Since this tool is a wrapper for the vector_search() function, it is subject to the same limitations as the vector_search() function. See Limitations.

If this example is unsuitable for your use case, create a vector search retriever tool using custom agent code instead.

Vector Search retriever with agent code (PyFunc)

The following example creates a Vector Search retriever for a PyFunc-flavored agent in agent code.

This example uses databricks-vectorsearch to create a basic retriever that performs a Vector Search similarity search with filters. It uses MLflow decorators to enable agent tracing.

Note

To conform to the MLflow retriever schema, the retriever function should return a Document type and use the metadata field in the Document class to add additional attributes to the returned document, like doc_uri and similarity_score.
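As a rough illustration of that shape, the following sketch uses a simplified stand-in for mlflow.entities.Document so that it runs without MLflow installed. The stand-in class, the helper name to_document, and the sample row are assumptions made for illustration, not part of the agent code in this article.

```python
from dataclasses import dataclass, field, asdict
from typing import Any, Dict


@dataclass
class Document:
    """Simplified stand-in for mlflow.entities.Document (illustration only)."""
    page_content: str
    metadata: Dict[str, Any] = field(default_factory=dict)


def to_document(row: Dict[str, Any], text_column: str = "text_column") -> Document:
    """Hypothetical helper: the text goes to page_content, everything else to metadata."""
    metadata = {key: value for key, value in row.items() if key != text_column}
    return Document(page_content=row[text_column], metadata=metadata)


# Hypothetical row, using the column names from this article's example.
row = {
    "chunk_id": "42",
    "text_column": "Databricks is a data and AI platform.",
    "doc_uri": "https://docs.example.com/intro",
    "similarity_score": 0.87,
}
doc = to_document(row)

# Extra attributes live under "metadata", not as top-level keys.
print(asdict(doc))
```

In this sketch, doc_uri and similarity_score end up inside metadata, which is the layout the note above describes.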

Use the following code in the agent module or agent notebook.

import mlflow
import json

from mlflow.entities import Document
from typing import List, Dict, Any
from dataclasses import asdict
from databricks.vector_search.client import VectorSearchClient

class VectorSearchRetriever:
    """
    Class using Databricks Vector Search to retrieve relevant documents.
    """
    def __init__(self):
        self.vector_search_client = VectorSearchClient(disable_notice=True)
        # TODO: Replace this with the list of column names to return in the result when querying Vector Search
        self.columns = ["chunk_id", "text_column", "doc_uri"]
        self.vector_search_index = self.vector_search_client.get_index(
            index_name="catalog.schema.chunked_docs_index"
        )
        mlflow.models.set_retriever_schema(
            name="vector_search",
            primary_key="chunk_id",
            text_column="text_column",
            doc_uri="doc_uri"
        )

    @mlflow.trace(span_type="RETRIEVER", name="vector_search")
    def __call__(
        self,
        query: str,
        filters: Dict[Any, Any] = None,
        score_threshold: float = 0.0
    ) -> List[Document]:
        """
        Performs vector search to retrieve relevant chunks.

        Args:
            query: Search query.
            filters: Optional filters to apply to the search. Filters must follow the Databricks Vector Search filter spec.
            score_threshold: Minimum similarity score a result must reach to be returned. Defaults to 0.0, because comparing a score against None raises a TypeError.

        Returns:
            List of retrieved Documents.
        """

        results = self.vector_search_index.similarity_search(
            query_text=query,
            columns=self.columns,
            filters=filters,
            num_results=5,
            query_type="ann"
        )

        documents = self.convert_vector_search_to_documents(
            results, score_threshold
        )
        return [asdict(doc) for doc in documents]

    @mlflow.trace(span_type="PARSER")
    def convert_vector_search_to_documents(
        self, vs_results, score_threshold
    ) -> List[Document]:

        docs = []
        column_names = [column["name"] for column in vs_results.get("manifest", {}).get("columns", [])]
        result_row_count = vs_results.get("result", {}).get("row_count", 0)

        if result_row_count > 0:
            for item in vs_results["result"]["data_array"]:
                metadata = {}
                score = item[-1]

                if score >= score_threshold:
                    metadata["similarity_score"] = score
                    for i, field in enumerate(item[:-1]):
                        metadata[column_names[i]] = field

                    page_content = metadata.pop("text_column", None)

                    if page_content:
                        doc = Document(
                            page_content=page_content,
                            metadata=metadata
                        )
                        docs.append(doc)

        return docs

To run the retriever, run the following Python code. You can optionally include Vector Search filters in the request to filter results.

retriever = VectorSearchRetriever()
query = "What is Databricks?"
filters = {"text_column LIKE": "Databricks"}
results = retriever(query, filters=filters, score_threshold=0.1)

Set retriever schema

To ensure that retrievers are traced properly and render correctly in downstream applications, call mlflow.models.set_retriever_schema when you define your agent. Use set_retriever_schema to map the column names in the returned table to MLflow’s expected fields such as primary_key, text_column, and doc_uri.

# Define the retriever's schema by providing your column names
mlflow.models.set_retriever_schema(
    name="vector_search",
    primary_key="chunk_id",
    text_column="text_column",
    doc_uri="doc_uri"
    # other_columns=["column1", "column2"],
)

You can also specify additional columns in your retriever’s schema by providing a list of column names with the other_columns field.

If you have multiple retrievers, you can define multiple schemas by using unique names for each retriever schema.

The retriever schema set during agent creation affects downstream applications and workflows, such as the review app and evaluation sets. Specifically, the doc_uri column serves as the primary identifier for documents returned by the retriever.

  • The review app displays the doc_uri to help reviewers assess responses and trace document origins. See Review App UI.
  • Evaluation sets use doc_uri to compare retriever results against predefined evaluation datasets to determine the retriever’s recall and precision. See Evaluation sets.
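To make the role of doc_uri in evaluation more concrete, the following minimal sketch computes precision and recall by comparing retrieved doc_uri values against an expected set. It is not how Agent Evaluation is implemented. The function name and the sample URIs are hypothetical.

```python
from typing import Iterable, Tuple


def doc_uri_precision_recall(
    retrieved_uris: Iterable[str], expected_uris: Iterable[str]
) -> Tuple[float, float]:
    """Compare retrieved doc_uri values against the expected ones (illustration only)."""
    retrieved = set(retrieved_uris)
    expected = set(expected_uris)
    hits = retrieved & expected
    # Precision: share of retrieved documents that were expected.
    precision = len(hits) / len(retrieved) if retrieved else 0.0
    # Recall: share of expected documents that were retrieved.
    recall = len(hits) / len(expected) if expected else 0.0
    return precision, recall


precision, recall = doc_uri_precision_recall(
    retrieved_uris=["docs/a", "docs/b", "docs/c", "docs/d"],
    expected_uris=["docs/a", "docs/c", "docs/e"],
)
print(f"precision={precision:.2f} recall={recall:.2f}")  # precision=0.50 recall=0.67
```

Because the comparison is keyed on doc_uri, a retriever that returns inconsistent or missing doc_uri values would score poorly here even if the chunk text were relevant.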

Trace the retriever

MLflow tracing adds observability by capturing detailed information about your agent’s execution. It provides a way to record the inputs, outputs, and metadata associated with each intermediate step of a request, enabling you to pinpoint the source of bugs and unexpected behaviors quickly.

This example uses the @mlflow.trace decorator to create a trace for the retriever and parser. For other options for setting up trace methods, see MLflow Tracing for agents.

The decorator creates a span that starts when the function is invoked and ends when it returns. MLflow automatically records the function’s input and output and any exceptions raised.
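The span lifecycle described above can be pictured with a toy decorator. This is not MLflow's implementation. The names toy_trace and recorded_spans are hypothetical, and the sketch only mimics the behavior stated here: the span starts on invocation, ends on return, and records inputs, outputs, and exceptions.

```python
import functools
import time
from typing import Any, Callable, Dict, List

# In-memory list standing in for a tracing backend (illustration only).
recorded_spans: List[Dict[str, Any]] = []


def toy_trace(span_type: str, name: str) -> Callable:
    """Toy decorator that mimics a span's lifecycle; not MLflow's implementation."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # The span starts when the function is invoked.
            span: Dict[str, Any] = {
                "name": name,
                "span_type": span_type,
                "inputs": {"args": args, "kwargs": kwargs},
                "start": time.perf_counter(),
            }
            try:
                span["outputs"] = func(*args, **kwargs)
                return span["outputs"]
            except Exception as exc:
                # Exceptions are recorded and then re-raised unchanged.
                span["exception"] = repr(exc)
                raise
            finally:
                # The span ends when the function returns or raises.
                span["end"] = time.perf_counter()
                recorded_spans.append(span)
        return wrapper
    return decorator


@toy_trace(span_type="RETRIEVER", name="vector_search")
def retrieve(query: str) -> List[str]:
    if not query:
        raise ValueError("query must not be empty")
    return [f"chunk about {query}"]


retrieve("Databricks")
try:
    retrieve("")
except ValueError:
    pass

for span in recorded_spans:
    print(span["name"], span.get("outputs"), span.get("exception"))
```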

Note

LangChain, LlamaIndex, and OpenAI library users can use MLflow auto logging instead of manually defining traces with the decorator. See Use autologging to add traces to your agents.

...
@mlflow.trace(span_type="RETRIEVER", name="vector_search")
def __call__(self, query: str) -> List[Document]:
  ...

To ensure downstream applications such as Agent Evaluation and the AI Playground render the retriever trace correctly, make sure the decorator meets the following requirements:

  • Use span_type="RETRIEVER" and ensure the function returns a List[Document] object. See Retriever spans.
  • The trace name and the retriever_schema name must match to configure the trace correctly.

Filter Vector Search results

You can limit the search scope to a subset of data using a Vector Search filter.

The filters parameter in VectorSearchRetriever defines the filter conditions using the Databricks Vector Search filter specification.

filters = {"text_column LIKE": "Databricks"}

Inside the __call__ method, the filters dictionary is passed directly to the similarity_search function:

results = self.vector_search_index.similarity_search(
    query_text=query,
    columns=self.columns,
    filters=filters,
    num_results=5,
    query_type="ann"
)

After initial filtering, the score_threshold parameter provides additional filtering by setting a minimum similarity score.

if score >= score_threshold:
    metadata["similarity_score"] = score

The final result includes documents that meet the filters and score_threshold conditions.
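The score threshold step can be tried without a live index. The following sketch runs the same kind of parsing on a hypothetical response dictionary, shaped like the one the parser in this article reads: column names under manifest.columns, rows under result.data_array, and the score as the last value in each row. The sample data and the function name filter_by_score are assumptions. The filters condition is not reproduced here, because it is applied by similarity_search before results are returned.

```python
from typing import Any, Dict, List

# Hypothetical response; real responses may carry additional fields.
sample_response: Dict[str, Any] = {
    "manifest": {
        "columns": [
            {"name": "chunk_id"},
            {"name": "text_column"},
            {"name": "doc_uri"},
            {"name": "score"},
        ]
    },
    "result": {
        "row_count": 3,
        "data_array": [
            ["1", "Databricks is a data platform.", "docs/intro", 0.82],
            ["2", "Databricks supports notebooks.", "docs/notebooks", 0.41],
            ["3", "Databricks release notes.", "docs/releases", 0.05],
        ],
    },
}


def filter_by_score(
    response: Dict[str, Any], score_threshold: float
) -> List[Dict[str, Any]]:
    """Keep rows whose score (last value in the row) meets the threshold."""
    columns = response.get("manifest", {}).get("columns", [])
    column_names = [column["name"] for column in columns]
    kept = []
    for row in response.get("result", {}).get("data_array", []):
        score = row[-1]
        if score < score_threshold:
            continue
        # Pair every value except the score with its column name.
        metadata = dict(zip(column_names[:-1], row[:-1]))
        metadata["similarity_score"] = score
        page_content = metadata.pop("text_column", None)
        if page_content:
            kept.append({"page_content": page_content, "metadata": metadata})
    return kept


docs = filter_by_score(sample_response, score_threshold=0.1)
print([doc["metadata"]["chunk_id"] for doc in docs])  # ['1', '2']
```

With a threshold of 0.1, the third row (score 0.05) is dropped and the first two are kept.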

Next steps

After you create a Unity Catalog function agent tool, add the tool to an AI agent. See Add Unity Catalog tools to agents.