Python 插件
适用于:✅Microsoft Fabric✅Azure 数据资源管理器
Python 插件使用 Python 脚本运行用户定义函数 (UDF)。 此 Python 脚本获取表格数据作为其输入,并生成表格输出。 插件的运行时托管在沙盒中,运行在群集的节点上。
语法
T |
evaluate
[hint.distribution
=
(single
| per_node
)] [hint.remote
=
(auto
| local
)] python(
output_schema,
script [,
script_parameters] [,
external_artifacts][,
spill_to_disk])
详细了解语法约定。
参数
客户 | 类型 | 必需 | 说明 |
---|---|---|---|
output_schema | string |
✔️ | type 文本,定义由 Python 代码返回的表格数据的输出模式。 格式为:typeof( ColumnName: ColumnType[, ...]) 。例如,typeof(col1:string, col2:long) 。 若要扩展输入架构,请使用以下语法:typeof(*, col1:string, col2:long) 。 |
脚本 | string |
✔️ | 要执行的有效 Python 脚本。 若要生成多行字符串,请参阅用法提示。 |
script_parameters | dynamic |
一个名称/值对的属性包,将作为保留的 kargs 字典传递给 Python 脚本。 有关详细信息,请参阅保留 Python 变量。 |
|
hint.distribution | string |
一个提示,用于在多个群集节点上分布插件的执行。 默认值为 single 。 single 意味着脚本的单个实例将针对整个查询数据运行。 per_node 意味着,如果在分发 Python 块之前执行查询,则脚本的一个实例将在每个节点上对其包含的数据运行。 |
|
hint.remote | string |
此提示仅适用于跨群集查询。 默认值为 auto 。 auto 意味着服务器会自动决定执行 Python 代码的群集。 将值设置为 local 会强制在本地群集上执行 Python 代码。 在远程群集上禁用了 Python 插件时使用它。 |
|
external_artifacts | dynamic |
名称和 URL 对的属性包,用于可从云存储访问的项目。 有关详细信息,请参阅使用外部项目。 | |
spill_to_disk | bool |
指定一个用于将输入表序列化为 Python 沙盒的替代方法。 若要序列化大表,请将其设置为 true 以加速序列化,并大大减少沙盒内存消耗。 默认值为 true 。 |
保留 Python 变量
以下变量是为 Kusto 查询语言和 Python 代码之间的交互而保留的。
df
:作为pandas
数据帧的输入表格数据(上述T
的值)。kargs
:作为 Python 字典的 script_parameters 参数的值。result
:由 Python 脚本创建的pandas
数据帧,其值成为发送到插件后面的 Kusto 查询操作符的表格数据。
启用插件
默认禁用该插件。 在开始之前,请查看先决条件列表。 若要启用插件并选择 Python 映像的版本,请参阅在群集上启用语言扩展。
Python 沙盒映像
要将 Python 映像的版本更改为不同的托管映像或自定义映像,请参阅更改群集上的 Python 语言扩展映像。
若要查看不同 Python 映像的包列表,请参阅 Python 包参考。
注意
- 默认情况下,插件将 numpy 作为 np 导入,将 pandas 作为 pd 导入。 (可选)可以根据需要导入其他模块。
- 有一些包可能与运行插件的沙盒所实施的限制不兼容。
使用来自查询和更新策略的引入
- 在以下查询中使用插件:
- 不能在定义为更新策略一部分的查询中使用插件,该策略的源表是使用流式处理引入引入的。
示例
range x from 1 to 360 step 1
| evaluate python(
//
typeof(*, fx:double), // Output schema: append a new fx column to original table
```
result = df
n = df.shape[0]
g = kargs["gain"]
f = kargs["cycles"]
result["fx"] = g * np.sin(df["x"]/n*2*np.pi*f)
```
, bag_pack('gain', 100, 'cycles', 4) // dictionary of parameters
)
| render linechart
print "This is an example for using 'external_artifacts'"
| evaluate python(
typeof(File:string, Size:string), ```if 1:
import os
result = pd.DataFrame(columns=['File','Size'])
sizes = []
path = '.\\\\Temp'
files = os.listdir(path)
result['File']=files
for file in files:
sizes.append(os.path.getsize(path + '\\\\' + file))
result['Size'] = sizes
```,
external_artifacts =
dynamic({"this_is_my_first_file":"https://kustoscriptsamples.blob.core.windows.net/samples/R/sample_script.r",
"this_is_a_script":"https://kustoscriptsamples.blob.core.windows.net/samples/python/sample_script.py"})
)
文件 | 大小 |
---|---|
this_is_a_script | 120 |
this_is_my_first_file | 105 |
性能提示
- 将插件的输入数据集减少到所需的最小量(列/行)。
- 在可能的情况下,通过 Kusto 的查询语言对源数据集使用筛选器。
- 若要对源列的子集进行计算,请在调用插件之前仅投影这些列。
- 如果脚本中的逻辑可分发,请使用
hint.distribution = per_node
。- 还可以使用分区运算符对输入数据集进行分区。
- 尽可能使用 Kusto 的查询语言来实现 Python 脚本的逻辑。
使用提示
若要在查询编辑器中生成包含 Python 脚本的多行字符串,请从常用的 Python 编辑器(Jupyter、Visual Studio Code、PyCharm 等)复制 Python 脚本,将其粘贴到查询编辑器中,然后在包含三个连续反引号的行之间括起完整的脚本。 例如:
```
python code
```
使用
externaldata
运算符获取存储在外部位置(例如 Azure Blob 存储)中的脚本内容。
示例
let script =
externaldata(script:string)
[h'https://kustoscriptsamples.blob.core.windows.net/samples/python/sample_script.py']
with(format = raw);
range x from 1 to 360 step 1
| evaluate python(
typeof(*, fx:double),
toscalar(script),
bag_pack('gain', 100, 'cycles', 4))
| render linechart
使用外部项目
云存储中的外部项目可用于脚本并在运行时使用。
外部项目属性引用的 URL 必须是:
注意
使用托管标识对外部项目进行身份验证时,必须在群集级别托管标识策略上定义 SandboxArtifacts
用法。
这些项目可供脚本从本地临时目录 .\Temp
使用。 属性包中提供的名称用作本地文件名。 请参阅示例。
有关引用外部包的信息,请参阅安装 Python 插件的包。
刷新外部项目缓存
查询中使用的外部项目文件缓存在群集上。 如果对云存储中的文件进行更新,并要求立即与群集同步,可以使用 .clear cluster cache external-artifacts 命令。 此命令会清除缓存的文件,并确保后续查询使用最新版本的项目运行。
安装 Python 插件的包
在大多数用例下,你可能更愿意创建自定义映像。
出于以下原因,你可能想自己安装包:
- 你没有创建自定义映像的权限。
- 包是专用的。
- 你更想要创建用于测试的临时包安装,而不想创建自定义映像的开销。
安装包,如下所示:
先决条件
创建 blob 容器来承载包,最好与群集位于同一个位置。 例如,
https://artifactswestus.blob.core.windows.net/python
假设群集位于美国西部。更改群集的标注策略以允许访问该位置。
此更改需要 AllDatabasesAdmin 权限。
例如,若要启用对位于
https://artifactswestus.blob.core.windows.net/python
中的 blob 的访问,请运行以下命令:
.alter-merge cluster policy callout @'[ { "CalloutType": "sandbox_artifacts", "CalloutUriRegex": "artifactswestus\\.blob\\.core\\.windows\\.net/python/","CanCall": true } ]'
安装包
对于 PyPi 或其他通道中的公共包,请下载包及其依赖项。
- 从本地 Windows Python 环境中的 cmd 窗口运行:
pip wheel [-w download-dir] package-name.
创建一个 zip 文件,其中包含所需的包及其依赖项。
- 对于专用包:压缩包的文件夹及其依赖项的文件夹。
- 对于公共包,对在上一步中下载的文件进行压缩。
注意
- 确保下载与 Python 引擎和沙盒运行时(当前在 Windows 上为 3.6.5 版本)平台兼容的包
- 请确保压缩
.whl
文件本身,而不是其父文件夹。 - 对于基本沙箱映像中已经存在的具有相同版本的包,你可以跳过
.whl
文件。
将压缩文件上传到项目位置中的 blob(从步骤 1 开始)。
调用
python
插件。- 使用一个包含名称和对 zip 文件(blob 的 URL,包括一个 SAS 令牌)的引用的属性包指定
external_artifacts
参数。 - 在内联 python 代码中,从
sandbox_utils
导入Zipackage
,并使用 zip 文件的名称调用其install()
方法。
- 使用一个包含名称和对 zip 文件(blob 的 URL,包括一个 SAS 令牌)的引用的属性包指定
示例
安装生成伪数据的 Faker 包。
range ID from 1 to 3 step 1
| extend Name=''
| evaluate python(typeof(*), ```if 1:
from sandbox_utils import Zipackage
Zipackage.install("Faker.zip")
from faker import Faker
fake = Faker()
result = df
for i in range(df.shape[0]):
result.loc[i, "Name"] = fake.name()
```,
external_artifacts=bag_pack('faker.zip', 'https://artifacts.blob.core.windows.net/Faker.zip?*** REPLACE WITH YOUR SAS TOKEN ***'))
ID | 名称 |
---|---|
1 | Gary Tapia |
2 | Emma Evans |
3 | Ashley Bowen |
相关内容
有关使用 Python 插件的 UDF 函数的更多示例,请参阅函数库。
Python 插件使用 Python 脚本运行用户定义函数 (UDF)。 此 Python 脚本获取表格数据作为其输入,并生成表格输出。
语法
T |
evaluate
[hint.distribution
=
(single
| per_node
)] [hint.remote
=
(auto
| local
)] python(
output_schema,
script [,
script_parameters] [,
spill_to_disk])
详细了解语法约定。
参数
客户 | 类型 | 必需 | 说明 |
---|---|---|---|
output_schema | string |
✔️ | type 文本,定义由 Python 代码返回的表格数据的输出模式。 格式为:typeof( ColumnName: ColumnType[, ...]) 。例如,typeof(col1:string, col2:long) 。 若要扩展输入架构,请使用以下语法:typeof(*, col1:string, col2:long) 。 |
脚本 | string |
✔️ | 要执行的有效 Python 脚本。 若要生成多行字符串,请参阅用法提示。 |
script_parameters | dynamic |
一个名称/值对的属性包,将作为保留的 kargs 字典传递给 Python 脚本。 有关详细信息,请参阅保留 Python 变量。 |
|
hint.distribution | string |
一个提示,用于在多个群集节点上分布插件的执行。 默认值为 single 。 single 意味着脚本的单个实例将针对整个查询数据运行。 per_node 意味着,如果在分发 Python 块之前执行查询,则脚本的一个实例将在每个节点上对其包含的数据运行。 |
|
hint.remote | string |
此提示仅适用于跨群集查询。 默认值为 auto 。 auto 意味着服务器会自动决定执行 Python 代码的群集。 将值设置为 local 会强制在本地群集上执行 Python 代码。 在远程群集上禁用了 Python 插件时使用它。 |
|
spill_to_disk | bool |
指定一个用于将输入表序列化为 Python 沙盒的替代方法。 若要序列化大表,请将其设置为 true 以加速序列化,并大大减少沙盒内存消耗。 默认值为 true 。 |
保留 Python 变量
以下变量是为 Kusto 查询语言和 Python 代码之间的交互而保留的。
df
:作为pandas
数据帧的输入表格数据(上述T
的值)。kargs
:作为 Python 字典的 script_parameters 参数的值。result
:由 Python 脚本创建的pandas
数据帧,其值成为发送到插件后面的 Kusto 查询操作符的表格数据。
启用插件
默认禁用该插件。 在开始之前,请在 KQL 数据库中启用 Python 插件。
Python 沙盒映像
若要查看不同 Python 映像的包列表,请参阅 Python 包参考。
注意
- 默认情况下,插件将 numpy 作为 np 导入,将 pandas 作为 pd 导入。 (可选)可以根据需要导入其他模块。
- 有一些包可能与运行插件的沙盒所实施的限制不兼容。
使用来自查询和更新策略的引入
- 在以下查询中使用插件:
- 不能在定义为更新策略一部分的查询中使用插件,该策略的源表是使用流式处理引入引入的。
示例
range x from 1 to 360 step 1
| evaluate python(
//
typeof(*, fx:double), // Output schema: append a new fx column to original table
```
result = df
n = df.shape[0]
g = kargs["gain"]
f = kargs["cycles"]
result["fx"] = g * np.sin(df["x"]/n*2*np.pi*f)
```
, bag_pack('gain', 100, 'cycles', 4) // dictionary of parameters
)
| render linechart
性能提示
- 将插件的输入数据集减少到所需的最小量(列/行)。
- 在可能的情况下,通过 Kusto 的查询语言对源数据集使用筛选器。
- 若要对源列的子集进行计算,请在调用插件之前仅投影这些列。
- 如果脚本中的逻辑可分发,请使用
hint.distribution = per_node
。- 还可以使用分区运算符对输入数据集进行分区。
- 尽可能使用 Kusto 的查询语言来实现 Python 脚本的逻辑。
使用提示
若要在查询编辑器中生成包含 Python 脚本的多行字符串,请从常用的 Python 编辑器(Jupyter、Visual Studio Code、PyCharm 等)复制 Python 脚本,将其粘贴到查询编辑器中,然后在包含三个连续反引号的行之间括起完整的脚本。 例如:
```
python code
```
使用
externaldata
运算符获取存储在外部位置(例如 Azure Blob 存储)中的脚本内容。
示例
let script =
externaldata(script:string)
[h'https://kustoscriptsamples.blob.core.windows.net/samples/python/sample_script.py']
with(format = raw);
range x from 1 to 360 step 1
| evaluate python(
typeof(*, fx:double),
toscalar(script),
bag_pack('gain', 100, 'cycles', 4))
| render linechart
相关内容
有关使用 Python 插件的 UDF 函数的更多示例,请参阅函数库。