非结构化检索 AI 代理工具

重要说明

此功能目前以公共预览版提供。

本文介绍如何使用马赛克 AI 代理框架为非结构化数据检索创建 AI 代理工具。 非结构化检索器使代理能够使用矢量搜索索引查询非结构化数据源,例如文档库。

若要详细了解代理工具,请参阅 AI 代理工具

使用 AI Bridge 在本地开发矢量搜索检索器工具

开始开发 Databricks 矢量搜索检索器工具的最简单方法是本地。 使用 Databricks AI Bridge 包(如 databricks-langchaindatabricks-openai)快速向代理添加检索功能,并使用查询参数进行试验。 此方法可在初始开发期间实现快速迭代。

本地工具准备就绪后,可以直接将其生产为代理代码的一部分,或将其迁移到 Unity 目录函数,该函数可提供更好的可发现性和治理性,但存在某些限制。 参见 具有 Unity Catalog 功能的矢量搜索检索器工具

LangChain/LangGraph

下面的代码样例展示了一个检索工具,并将其本地绑定到LLM,以便你可以通过与代理聊天来测试其工具调用行为。

安装最新版本的 databricks-langchain,其中包括 Databricks AI Bridge。

%pip install --upgrade databricks-langchain

注释

初始化 VectorSearchRetrieverTool 时,对于具有自托管嵌入的增量同步索引和直接矢量访问索引,需要 text_columnembedding 参数。 请参阅用于提供嵌入的选项

from databricks_langchain import VectorSearchRetrieverTool, ChatDatabricks

# Initialize the retriever tool
vs_tool = VectorSearchRetrieverTool(index_name="catalog.schema.my_index_name")

# Run a query against the vector search index locally for testing
vs_tool.invoke("Databricks Agent Framework?")

# Bind the retriever tool to your Langchain LLM of choice
llm = ChatDatabricks(endpoint="databricks-meta-llama-3-1-70b-instruct")
llm_with_tools = llm.bind_tools([vs_tool])

# Chat with your LLM to test the tool calling functionality
llm_with_tools.invoke("Based on the Databricks documentation, what is Databricks Agent Framework?")

若要自定义工具调用,请将其他参数传递给 VectorSearchRetrieverTool

from databricks_langchain import VectorSearchRetrieverTool

vs_tool = VectorSearchRetrieverTool(
  index_name, # Index name in the format 'catalog.schema.index'
  num_results, # Max number of documents to return
  columns, # List of columns to include in the search
  filters, # Filters to apply to the query
  query_type, # Query type ("ANN" or "HYBRID").
  tool_name, # Used by the LLM to understand the purpose of the tool
  tool_description, # Used by the LLM to understand the purpose of the tool
  text_column, # Specify text column for embeddings. Required for direct-access index or delta-sync index with self-managed embeddings.
  embedding # The embedding model. Required for direct-access index or delta-sync index with self-managed embeddings.
)

OpenAI

以下代码原型是矢量搜索检索器工具,并将其与 OpenAI 的 GPT 模型集成。

有关 OpenAI 对工具的建议的详细信息,请参阅 OpenAI 函数调用文档

安装最新版本的 databricks-openai,其中包括 Databricks AI Bridge。

%pip install --upgrade databricks-openai

注释

初始化 VectorSearchRetrieverTool 时,对于具有自托管嵌入的增量同步索引和直接矢量访问索引,需要 text_columnembedding 参数。 请参阅用于提供嵌入的选项

from databricks_openai import VectorSearchRetrieverTool
from openai import OpenAI
import json

# Initialize OpenAI client
client = OpenAI(api_key=<your_API_key>)

# Call model with VectorSearchRetrieverTool defined
dbvs_tool = VectorSearchRetrieverTool(index_name="catalog.schema.my_index_name")
messages = [
  {"role": "system", "content": "You are a helpful assistant."},
  {
    "role": "user",
    "content": "Using the Databricks documentation, answer what is Spark?"
  }
]
first_response = client.chat.completions.create(
  model="gpt-4o",
  messages=messages,
  tools=[dbvs_tool.tool]
)

# Execute function code and parse the model's response and handle function calls.
tool_call = first_response.choices[0].message.tool_calls[0]
args = json.loads(tool_call.function.arguments)
result = dbvs_tool.execute(query=args["query"])  # For self-managed embeddings, optionally pass in openai_client=client

# Supply model with results – so it can incorporate them into its final response.
messages.append(first_response.choices[0].message)
messages.append({
  "role": "tool",
  "tool_call_id": tool_call.id,
  "content": json.dumps(result)
})
second_response = client.chat.completions.create(
  model="gpt-4o",
  messages=messages,
  tools=[dbvs_tool.tool]
)

若要自定义工具调用,请将其他参数传递给 VectorSearchRetrieverTool

from databricks_openai import VectorSearchRetrieverTool

vs_tool = VectorSearchRetrieverTool(
    index_name, # Index name in the format 'catalog.schema.index'
    num_results, # Max number of documents to return
    columns, # List of columns to include in the search
    filters, # Filters to apply to the query
    query_type, # Query type ("ANN" or "HYBRID").
    tool_name, # Used by the LLM to understand the purpose of the tool
    tool_description, # Used by the LLM to understand the purpose of the tool
    text_column, # Specify text column for embeddings. Required for direct-access index or delta-sync index with self-managed embeddings.
    embedding_model_name # The embedding model. Required for direct-access index or delta-sync index with self-managed embeddings.
)

使用 Unity Catalog 功能的 矢量搜索检索工具

以下示例使用 Unity 目录函数创建检索器工具,以便从 马赛克 AI 矢量搜索索引查询数据。

Unity 目录函数 databricks_docs_vector_search 查询包含 Databricks 文档的假设矢量搜索索引。 此函数包装 Databricks SQL 函数 vector_search(),并将其输出与 MLflow 检索器架构对齐。 通过使用 page_contentmetadata 别名。

注释

若要符合 MLflow 检索器架构,必须使用 SQL 映射函数(而不是顶级输出键)将任何其他元数据列添加到 metadata 列。

在笔记本或 SQL 编辑器中运行以下代码以创建函数:

CREATE OR REPLACE FUNCTION main.default.databricks_docs_vector_search (
  -- The agent uses this comment to determine how to generate the query string parameter.
  query STRING
  COMMENT 'The query string for searching Databricks documentation.'
) RETURNS TABLE
-- The agent uses this comment to determine when to call this tool. It describes the types of documents and information contained within the index.
COMMENT 'Executes a search on Databricks documentation to retrieve text documents most relevant to the input query.' RETURN
SELECT
  chunked_text as page_content,
  map('doc_uri', url, 'chunk_id', chunk_id) as metadata
FROM
  vector_search(
    -- Specify your Vector Search index name here
    index => 'catalog.schema.databricks_docs_index',
    query => query,
    num_results => 5
  )

要在 AI 代理中使用此检索工具,请用 UCFunctionToolkit将其封装。 这将通过 MLflow 实现自动追踪。

MLflow 跟踪捕获生成式 AI 应用程序的详细执行信息。 它记录每个步骤的输入、输出和元数据,帮助你调试问题和分析性能。

使用 UCFunctionToolkit时,如果检索器的输出符合 MLflow 检索器架构,则检索器会在 MLflow 日志中自动生成 RETRIEVER 跨度类型。 请参阅 MLflow 跟踪架构

有关 UCFunctionToolkit 的详细信息,请参阅 Unity 目录文档

from unitycatalog.ai.langchain.toolkit import UCFunctionToolkit

toolkit = UCFunctionToolkit(
    function_names=[
        "main.default.databricks_docs_vector_search"
    ]
)
tools = toolkit.tools

此检索器工具具有以下注意事项:

  • SQL 客户端可能会限制返回的最大行数或字节数。 若要防止数据截断,应截断 UDF 返回的列值。 例如,可以使用 substring(chunked_text, 0, 8192) 来减小大型内容列的大小,并避免在执行过程中行截断。
  • 由于此工具是 vector_search() 函数的包装器,因此它受到与 vector_search() 函数相同的限制。 请参阅限制

如果此示例不适合你的用例,请改用自定义代理代码创建矢量搜索检索器工具。

使用代理代码的矢量搜索检索器 (PyFunc)

以下示例在代理代码中为 PyFunc 风格的代理创建矢量搜索检索器。

此示例使用 databricks-vectorsearch 创建一个基本检索器,该检索器使用筛选器 执行矢量搜索相似性搜索。 它使用 MLflow 修饰器启用代理跟踪

注释

为了符合 MLflow 检索器架构,检索器函数应返回 文档 类型,并使用 Document 类中的 metadata 字段向返回的文档添加其他属性,例如 like doc_urisimilarity_score.

在代理模块或代理笔记本中使用以下代码。

import mlflow
import json

from mlflow.entities import Document
from typing import List, Dict, Any
from dataclasses import asdict
from databricks.vector_search.client import VectorSearchClient

class VectorSearchRetriever:
    """
    Class using Databricks Vector Search to retrieve relevant documents.
    """
    def __init__(self):
        self.vector_search_client = VectorSearchClient(disable_notice=True)
        # TODO: Replace this with the list of column names to return in the result when querying Vector Search
        self.columns = ["chunk_id", "text_column", "doc_uri"]
        self.vector_search_index = self.vector_search_client.get_index(
            index_name="catalog.schema.chunked_docs_index"
        )
        mlflow.models.set_retriever_schema(
            name="vector_search",
            primary_key="chunk_id",
            text_column="text_column",
            doc_uri="doc_uri"
        )

    @mlflow.trace(span_type="RETRIEVER", name="vector_search")
    def __call__(
        self,
        query: str,
        filters: Dict[Any, Any] = None,
        score_threshold = None
    ) -> List[Document]:
        """
        Performs vector search to retrieve relevant chunks.
        Args:
            query: Search query.
            filters: Optional filters to apply to the search. Filters must follow the Databricks Vector Search filter spec
            score_threshold: Score threshold to use for the query.

        Returns:
            List of retrieved Documents.
        """

        results = self.vector_search_index.similarity_search(
            query_text=query,
            columns=self.columns,
            filters=filters,
            num_results=5,
            query_type="ann"
        )

        documents = self.convert_vector_search_to_documents(
            results, score_threshold
        )
        return [asdict(doc) for doc in documents]

    @mlflow.trace(span_type="PARSER")
    def convert_vector_search_to_documents(
        self, vs_results, score_threshold
    ) -> List[Document]:

        docs = []
        column_names = [column["name"] for column in vs_results.get("manifest", {}).get("columns", [])]
        result_row_count = vs_results.get("result", {}).get("row_count", 0)

        if result_row_count > 0:
            for item in vs_results["result"]["data_array"]:
                metadata = {}
                score = item[-1]

                if score >= score_threshold:
                    metadata["similarity_score"] = score
                    for i, field in enumerate(item[:-1]):
                        metadata[column_names[i]] = field

                    page_content = metadata.pop("text_column", None)

                    if page_content:
                        doc = Document(
                            page_content=page_content,
                            metadata=metadata
                        )
                        docs.append(doc)

        return docs

若要运行检索器,请运行以下 Python 代码。 可以选择在请求中包含 矢量搜索筛选器 来筛选结果。

retriever = VectorSearchRetriever()
query = "What is Databricks?"
filters={"text_column LIKE": "Databricks"},
results = retriever(query, filters=filters, score_threshold=0.1)

设置检索器架构

若要确保检索器在下游应用程序中被正确跟踪并呈现,请在定义代理时调用 mlflow.models.set_retriever_schema。 使用 set_retriever_schema 将返回表中的列名映射到 MLflow 的预期字段,例如 primary_keytext_columndoc_uri

# Define the retriever's schema by providing your column names
mlflow.models.set_retriever_schema(
    name="vector_search",
    primary_key="chunk_id",
    text_column="text_column",
    doc_uri="doc_uri"
    # other_columns=["column1", "column2"],
)

还可以通过在 other_columns 字段中提供列名列表,在检索器的架构中指定其他列。

如果有多个检索器,则可以为每个检索器架构使用唯一名称来定义多个架构。

代理创建期间设置的检索器架构会影响下游应用程序和工作流,例如评审应用和评估集。 具体而言,doc_uri 列充当检索器返回的文档的主要标识符。

  • 审阅应用 显示 doc_uri,以帮助审阅者评估响应和跟踪文档来源。 请参阅评审应用 UI
  • 评估集 使用 doc_uri 将检索器结果与预定义评估数据集进行比较,以确定检索器的召回率和精度。 请参阅评估集

跟踪检索器

MLflow 跟踪通过捕获有关代理执行的详细信息来增加可观测性。 它提供了一种方法来记录与请求的每个中间步骤关联的输入、输出和元数据,使你能够快速查明 bug 和意外行为的来源。

此示例使用 @mlflow.trace 修饰器 为检索器和分析器创建跟踪。 有关设置跟踪方法的其他选项,请参阅代理的 MLflow 跟踪

修饰器创建一个 范围,该范围在调用函数时开始,并在函数返回时结束。 MLflow 会自动记录函数的输入和输出以及引发的任何异常。

注释

LangChain、LlamaIndex 和 OpenAI 库用户可使用 MLflow 自动日志记录,而不是使用修饰器手动定义跟踪。 请参阅使用自动日志记录将跟踪添加到代理

...
@mlflow.trace(span_type="RETRIEVER", name="vector_search")
def __call__(self, query: str) -> List[Document]:
  ...

要确保代理评估和 AI 操场等下游应用程序正确呈现检索器跟踪,请确保修饰器满足以下要求:

  • 使用 span_type="RETRIEVER" 并确保函数返回 List[Document] 对象。 请参阅检索器跨度
  • 跟踪名称和retriever_schema名称必须匹配才能正确配置跟踪。

筛选矢量搜索结果

可以使用矢量搜索筛选器将搜索范围限制为数据子集。

VectorSearchRetriever 中的 filters 参数使用 Databricks 矢量搜索筛选器规范定义筛选条件。

filters = {"text_column LIKE": "Databricks"}

__call__ 方法中,筛选器字典直接传递到 similarity_search 函数:

results = self.vector_search_index.similarity_search(
    query_text=query,
    columns=self.columns,
    filters=filters,
    num_results=5,
    query_type="ann"
)

初始筛选后,score_threshold 参数通过设置最小相似性分数来提供其他筛选。

if score >= score_threshold:
    metadata["similarity_score"] = score

最终结果包括满足 filtersscore_threshold 条件的文档。

后续步骤

创建 Unity 目录函数代理工具后,将该工具添加到 AI 代理。 请参阅将 Unity Catalog 工具添加到代理