非结构化检索 AI 代理工具
重要说明
此功能目前以公共预览版提供。
本文介绍如何使用马赛克 AI 代理框架为非结构化数据检索创建 AI 代理工具。 非结构化检索器使代理能够使用矢量搜索索引查询非结构化数据源,例如文档库。
若要详细了解代理工具,请参阅 AI 代理工具。
使用 AI Bridge 在本地开发矢量搜索检索器工具
开始开发 Databricks 矢量搜索检索器工具的最简单方法是本地。 使用 Databricks AI Bridge 包(如 databricks-langchain
和 databricks-openai
)快速向代理添加检索功能,并使用查询参数进行试验。 此方法可在初始开发期间实现快速迭代。
本地工具准备就绪后,可以直接将其生产为代理代码的一部分,或将其迁移到 Unity 目录函数,该函数可提供更好的可发现性和治理性,但存在某些限制。 参见 具有 Unity Catalog 功能的矢量搜索检索器工具。
LangChain/LangGraph
下面的代码样例展示了一个检索工具,并将其本地绑定到LLM,以便你可以通过与代理聊天来测试其工具调用行为。
安装最新版本的 databricks-langchain
,其中包括 Databricks AI Bridge。
%pip install --upgrade databricks-langchain
注释
初始化 VectorSearchRetrieverTool
时,对于具有自托管嵌入的增量同步索引和直接矢量访问索引,需要 text_column
和 embedding
参数。 请参阅用于提供嵌入的选项。
from databricks_langchain import VectorSearchRetrieverTool, ChatDatabricks
# Initialize the retriever tool
vs_tool = VectorSearchRetrieverTool(index_name="catalog.schema.my_index_name")
# Run a query against the vector search index locally for testing
vs_tool.invoke("Databricks Agent Framework?")
# Bind the retriever tool to your Langchain LLM of choice
llm = ChatDatabricks(endpoint="databricks-meta-llama-3-1-70b-instruct")
llm_with_tools = llm.bind_tools([vs_tool])
# Chat with your LLM to test the tool calling functionality
llm_with_tools.invoke("Based on the Databricks documentation, what is Databricks Agent Framework?")
若要自定义工具调用,请将其他参数传递给 VectorSearchRetrieverTool
:
from databricks_langchain import VectorSearchRetrieverTool
vs_tool = VectorSearchRetrieverTool(
index_name, # Index name in the format 'catalog.schema.index'
num_results, # Max number of documents to return
columns, # List of columns to include in the search
filters, # Filters to apply to the query
query_type, # Query type ("ANN" or "HYBRID").
tool_name, # Used by the LLM to understand the purpose of the tool
tool_description, # Used by the LLM to understand the purpose of the tool
text_column, # Specify text column for embeddings. Required for direct-access index or delta-sync index with self-managed embeddings.
embedding # The embedding model. Required for direct-access index or delta-sync index with self-managed embeddings.
)
OpenAI
以下代码原型是矢量搜索检索器工具,并将其与 OpenAI 的 GPT 模型集成。
有关 OpenAI 对工具的建议的详细信息,请参阅 OpenAI 函数调用文档。
安装最新版本的 databricks-openai
,其中包括 Databricks AI Bridge。
%pip install --upgrade databricks-openai
注释
初始化 VectorSearchRetrieverTool
时,对于具有自托管嵌入的增量同步索引和直接矢量访问索引,需要 text_column
和 embedding
参数。 请参阅用于提供嵌入的选项。
from databricks_openai import VectorSearchRetrieverTool
from openai import OpenAI
import json
# Initialize OpenAI client
client = OpenAI(api_key=<your_API_key>)
# Call model with VectorSearchRetrieverTool defined
dbvs_tool = VectorSearchRetrieverTool(index_name="catalog.schema.my_index_name")
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{
"role": "user",
"content": "Using the Databricks documentation, answer what is Spark?"
}
]
first_response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
tools=[dbvs_tool.tool]
)
# Execute function code and parse the model's response and handle function calls.
tool_call = first_response.choices[0].message.tool_calls[0]
args = json.loads(tool_call.function.arguments)
result = dbvs_tool.execute(query=args["query"]) # For self-managed embeddings, optionally pass in openai_client=client
# Supply model with results – so it can incorporate them into its final response.
messages.append(first_response.choices[0].message)
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": json.dumps(result)
})
second_response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
tools=[dbvs_tool.tool]
)
若要自定义工具调用,请将其他参数传递给 VectorSearchRetrieverTool
:
from databricks_openai import VectorSearchRetrieverTool
vs_tool = VectorSearchRetrieverTool(
index_name, # Index name in the format 'catalog.schema.index'
num_results, # Max number of documents to return
columns, # List of columns to include in the search
filters, # Filters to apply to the query
query_type, # Query type ("ANN" or "HYBRID").
tool_name, # Used by the LLM to understand the purpose of the tool
tool_description, # Used by the LLM to understand the purpose of the tool
text_column, # Specify text column for embeddings. Required for direct-access index or delta-sync index with self-managed embeddings.
embedding_model_name # The embedding model. Required for direct-access index or delta-sync index with self-managed embeddings.
)
使用 Unity Catalog 功能的 矢量搜索检索工具
以下示例使用 Unity 目录函数创建检索器工具,以便从 马赛克 AI 矢量搜索索引查询数据。
Unity 目录函数 databricks_docs_vector_search
查询包含 Databricks 文档的假设矢量搜索索引。 此函数包装 Databricks SQL 函数 vector_search(),并将其输出与 MLflow 检索器架构对齐。 通过使用 page_content
和 metadata
别名。
注释
若要符合 MLflow 检索器架构,必须使用 SQL 映射函数(而不是顶级输出键)将任何其他元数据列添加到 metadata
列。
在笔记本或 SQL 编辑器中运行以下代码以创建函数:
CREATE OR REPLACE FUNCTION main.default.databricks_docs_vector_search (
-- The agent uses this comment to determine how to generate the query string parameter.
query STRING
COMMENT 'The query string for searching Databricks documentation.'
) RETURNS TABLE
-- The agent uses this comment to determine when to call this tool. It describes the types of documents and information contained within the index.
COMMENT 'Executes a search on Databricks documentation to retrieve text documents most relevant to the input query.' RETURN
SELECT
chunked_text as page_content,
map('doc_uri', url, 'chunk_id', chunk_id) as metadata
FROM
vector_search(
-- Specify your Vector Search index name here
index => 'catalog.schema.databricks_docs_index',
query => query,
num_results => 5
)
要在 AI 代理中使用此检索工具,请用 UCFunctionToolkit
将其封装。 这将通过 MLflow 实现自动追踪。
MLflow 跟踪捕获生成式 AI 应用程序的详细执行信息。 它记录每个步骤的输入、输出和元数据,帮助你调试问题和分析性能。
使用 UCFunctionToolkit
时,如果检索器的输出符合 MLflow 检索器架构,则检索器会在 MLflow 日志中自动生成 RETRIEVER
跨度类型。 请参阅 MLflow 跟踪架构。
有关 UCFunctionToolkit
的详细信息,请参阅 Unity 目录文档。
from unitycatalog.ai.langchain.toolkit import UCFunctionToolkit
toolkit = UCFunctionToolkit(
function_names=[
"main.default.databricks_docs_vector_search"
]
)
tools = toolkit.tools
此检索器工具具有以下注意事项:
- SQL 客户端可能会限制返回的最大行数或字节数。 若要防止数据截断,应截断 UDF 返回的列值。 例如,可以使用
substring(chunked_text, 0, 8192)
来减小大型内容列的大小,并避免在执行过程中行截断。 - 由于此工具是
vector_search()
函数的包装器,因此它受到与vector_search()
函数相同的限制。 请参阅限制。
如果此示例不适合你的用例,请改用自定义代理代码创建矢量搜索检索器工具。
使用代理代码的矢量搜索检索器 (PyFunc)
以下示例在代理代码中为 PyFunc 风格的代理创建矢量搜索检索器。
此示例使用 databricks-vectorsearch 创建一个基本检索器,该检索器使用筛选器 执行矢量搜索相似性搜索。 它使用 MLflow 修饰器启用代理跟踪。
注释
为了符合 MLflow 检索器架构,检索器函数应返回 文档 类型,并使用 Document 类中的 metadata
字段向返回的文档添加其他属性,例如 like doc_uri
和 similarity_score.
在代理模块或代理笔记本中使用以下代码。
import mlflow
import json
from mlflow.entities import Document
from typing import List, Dict, Any
from dataclasses import asdict
from databricks.vector_search.client import VectorSearchClient
class VectorSearchRetriever:
"""
Class using Databricks Vector Search to retrieve relevant documents.
"""
def __init__(self):
self.vector_search_client = VectorSearchClient(disable_notice=True)
# TODO: Replace this with the list of column names to return in the result when querying Vector Search
self.columns = ["chunk_id", "text_column", "doc_uri"]
self.vector_search_index = self.vector_search_client.get_index(
index_name="catalog.schema.chunked_docs_index"
)
mlflow.models.set_retriever_schema(
name="vector_search",
primary_key="chunk_id",
text_column="text_column",
doc_uri="doc_uri"
)
@mlflow.trace(span_type="RETRIEVER", name="vector_search")
def __call__(
self,
query: str,
filters: Dict[Any, Any] = None,
score_threshold = None
) -> List[Document]:
"""
Performs vector search to retrieve relevant chunks.
Args:
query: Search query.
filters: Optional filters to apply to the search. Filters must follow the Databricks Vector Search filter spec
score_threshold: Score threshold to use for the query.
Returns:
List of retrieved Documents.
"""
results = self.vector_search_index.similarity_search(
query_text=query,
columns=self.columns,
filters=filters,
num_results=5,
query_type="ann"
)
documents = self.convert_vector_search_to_documents(
results, score_threshold
)
return [asdict(doc) for doc in documents]
@mlflow.trace(span_type="PARSER")
def convert_vector_search_to_documents(
self, vs_results, score_threshold
) -> List[Document]:
docs = []
column_names = [column["name"] for column in vs_results.get("manifest", {}).get("columns", [])]
result_row_count = vs_results.get("result", {}).get("row_count", 0)
if result_row_count > 0:
for item in vs_results["result"]["data_array"]:
metadata = {}
score = item[-1]
if score >= score_threshold:
metadata["similarity_score"] = score
for i, field in enumerate(item[:-1]):
metadata[column_names[i]] = field
page_content = metadata.pop("text_column", None)
if page_content:
doc = Document(
page_content=page_content,
metadata=metadata
)
docs.append(doc)
return docs
若要运行检索器,请运行以下 Python 代码。 可以选择在请求中包含 矢量搜索筛选器 来筛选结果。
retriever = VectorSearchRetriever()
query = "What is Databricks?"
filters={"text_column LIKE": "Databricks"},
results = retriever(query, filters=filters, score_threshold=0.1)
设置检索器架构
若要确保检索器在下游应用程序中被正确跟踪并呈现,请在定义代理时调用 mlflow.models.set_retriever_schema。 使用 set_retriever_schema
将返回表中的列名映射到 MLflow 的预期字段,例如 primary_key
、text_column
和 doc_uri
。
# Define the retriever's schema by providing your column names
mlflow.models.set_retriever_schema(
name="vector_search",
primary_key="chunk_id",
text_column="text_column",
doc_uri="doc_uri"
# other_columns=["column1", "column2"],
)
还可以通过在 other_columns
字段中提供列名列表,在检索器的架构中指定其他列。
如果有多个检索器,则可以为每个检索器架构使用唯一名称来定义多个架构。
代理创建期间设置的检索器架构会影响下游应用程序和工作流,例如评审应用和评估集。 具体而言,doc_uri
列充当检索器返回的文档的主要标识符。
- 审阅应用 显示
doc_uri
,以帮助审阅者评估响应和跟踪文档来源。 请参阅评审应用 UI。 - 评估集 使用
doc_uri
将检索器结果与预定义评估数据集进行比较,以确定检索器的召回率和精度。 请参阅评估集。
跟踪检索器
MLflow 跟踪通过捕获有关代理执行的详细信息来增加可观测性。 它提供了一种方法来记录与请求的每个中间步骤关联的输入、输出和元数据,使你能够快速查明 bug 和意外行为的来源。
此示例使用 @mlflow.trace 修饰器 为检索器和分析器创建跟踪。 有关设置跟踪方法的其他选项,请参阅代理的 MLflow 跟踪。
修饰器创建一个 范围,该范围在调用函数时开始,并在函数返回时结束。 MLflow 会自动记录函数的输入和输出以及引发的任何异常。
注释
LangChain、LlamaIndex 和 OpenAI 库用户可使用 MLflow 自动日志记录,而不是使用修饰器手动定义跟踪。 请参阅使用自动日志记录将跟踪添加到代理。
...
@mlflow.trace(span_type="RETRIEVER", name="vector_search")
def __call__(self, query: str) -> List[Document]:
...
要确保代理评估和 AI 操场等下游应用程序正确呈现检索器跟踪,请确保修饰器满足以下要求:
- 使用
span_type="RETRIEVER"
并确保函数返回List[Document]
对象。 请参阅检索器跨度。 - 跟踪名称和retriever_schema名称必须匹配才能正确配置跟踪。
筛选矢量搜索结果
可以使用矢量搜索筛选器将搜索范围限制为数据子集。
VectorSearchRetriever
中的 filters
参数使用 Databricks 矢量搜索筛选器规范定义筛选条件。
filters = {"text_column LIKE": "Databricks"}
在 __call__
方法中,筛选器字典直接传递到 similarity_search
函数:
results = self.vector_search_index.similarity_search(
query_text=query,
columns=self.columns,
filters=filters,
num_results=5,
query_type="ann"
)
初始筛选后,score_threshold
参数通过设置最小相似性分数来提供其他筛选。
if score >= score_threshold:
metadata["similarity_score"] = score
最终结果包括满足 filters
和 score_threshold
条件的文档。
后续步骤
创建 Unity 目录函数代理工具后,将该工具添加到 AI 代理。 请参阅将 Unity Catalog 工具添加到代理。