你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
适用于 Python 的Azure 事件网格客户端库 - 版本 4.17.0b1
Azure 事件网格是一种完全托管的智能事件路由服务,允许通过发布-订阅模型来一致地使用事件。
源代码 | 包 (PyPI) | 包 (Conda) | API 参考文档 | 产品文档 | 样品 | 更改日志
免责声明
这是 Azure EventGrid 的 EventGridClient
Beta 版本,该版本与 GA EventGridPublisherClient
一起提供。 EventGridClient
支持 publish_cloud_events
、 receive_cloud_events
、 acknowledge_cloud_events
、 release_cloud_events
、 reject_cloud_events
和 renew_cloud_event_locks
操作。 有关详细信息,请参阅 示例 。
入门
先决条件
- 使用此包需要 Python 3.7 或更高版本。
- 必须具有 Azure 订阅 和事件网格主题资源才能使用此包。 按照此分步教程注册事件网格资源提供程序并使用Azure 门户创建事件网格主题。 有一个 类似的教程 使用 Azure CLI。
安装包
使用 pip 安装适用于 Python 的 Azure 事件网格 客户端库:
pip install azure-eventgrid
如果使用 Azure CLI,请将 和 <resource-name>
替换为<resource-group-name>
自己的唯一名称。
创建事件网格主题
az eventgrid topic --create --location <location> --resource-group <resource-group-name> --name <resource-name>
创建事件网格域
az eventgrid domain --create --location <location> --resource-group <resource-group-name> --name <resource-name>
验证客户端
若要与事件网格服务交互,需要创建客户端的实例。 需要 终结点 和 凭据 才能实例化客户端对象。
使用 Azure Active Directory (AAD)
Azure 事件网格提供与 Azure Active Directory (Azure AD) 的集成,以便对请求进行基于标识的身份验证。 借助 Azure AD,可以使用基于角色的访问控制 (RBAC) 向用户、组或应用程序授予对Azure 事件网格资源的访问权限。
若要使用 TokenCredential
将事件发送到主题或域,应为经过身份验证的标识分配“EventGrid 数据发送者”角色。
借助该 azure-identity
包,可以在开发和生产环境中无缝授权请求。 若要详细了解 Azure Active Directory,请参阅 azure-identity
自述文件。
例如,可以使用 DefaultAzureCredential
构造使用 Azure Active Directory 进行身份验证的客户端:
from azure.identity import DefaultAzureCredential
from azure.eventgrid import EventGridPublisherClient, EventGridEvent
default_az_credential = DefaultAzureCredential()
endpoint = os.environ["EVENTGRID_TOPIC_ENDPOINT"]
client = EventGridPublisherClient(endpoint, default_az_credential)
查找终结点
可以在Azure 门户的事件网格主题资源中找到主题终结点。 这将如下所示: "https://<event-grid-topic-name>.<topic-location>.eventgrid.azure.net/api/events"
使用 AzureKeyCredential 创建客户端
若要使用 Access 密钥作为 credential
参数,请将密钥作为字符串传递到 AzureKeyCredential 实例中。
注意: 可以在 Azure 门户中的事件网格主题资源的“访问密钥”菜单中找到访问密钥。 还可以通过 azure CLI 或
azure-mgmt-eventgrid
库获取它们。 可 在此处找到获取访问密钥的指南。
import os
from azure.eventgrid import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential
topic_key = os.environ["EVENTGRID_TOPIC_KEY"]
endpoint = os.environ["EVENTGRID_TOPIC_ENDPOINT"]
credential_key = AzureKeyCredential(topic_key)
client = EventGridPublisherClient(endpoint, credential_key)
注意: 还可以使用
AzureSasCredential
通过 SAS 签名对客户端进行身份验证。 此处提供了 (async_version) 的示例。
注意: 方法
generate_sas
可用于生成共享访问签名。 此处显示了一个演示这一点的示例。
关键概念
主题
主题是 EventGrid 服务中用于发送事件的通道。 主题接受的事件架构在主题创建时决定。 如果将架构类型的事件发送到需要其他架构类型的主题,则会引发错误。
Domain
事件 域 是用于大量与同一应用程序相关的事件网格主题的管理工具。 它们可以将事件发布到数千个主题。 域还提供对每个主题的授权和身份验证控制。 有关详细信息,请访问 事件域概述。
创建事件域时,会提供此域的发布终结点。 此过程类似于创建事件网格主题。 唯一的区别在于,在发布到域时,必须在想要将事件传递到的域中指定主题。
事件架构
事件是完全描述系统中发生的事件的最小信息量。 创建自定义主题或域时,必须指定发布事件时将使用的架构。
事件网格支持多个架构来编码事件。
事件网格架构
虽然可以将主题配置为使用 自定义架构,但使用已定义的事件网格架构更为常见。 请参阅 此处的规范和要求。
CloudEvents v1.0 架构
另一个选项是使用 CloudEvents v1.0 架构。 CloudEvents 是一个云原生计算基础项目,它生成用于以通用方式描述事件数据的规范。 可 在此处找到 CloudEvents 的服务摘要。
EventGridPublisherClient
EventGridPublisherClient
提供将事件数据发送到客户端初始化期间指定的主题主机名的操作。
无论主题或域配置为使用哪种架构, EventGridPublisherClient
都将用于向主题或域发布事件。 send
使用 方法发布事件。
允许发送以下事件格式:
强类型 EventGridEvents 的列表或单个实例。
序列化的 EventGridEvent 对象的听写表示形式。
强类型 CloudEvents 的列表或单个实例。
序列化 CloudEvent 对象的听写表示形式。
任何自定义架构的听写表示形式。
请查看 示例 以获取详细示例。
注意: 发布之前,请务必了解主题是否支持 CloudEvents 或 EventGridEvents。 如果发送到不支持所发送事件的架构的主题,则 send () 将引发异常。
系统主题
事件网格中的系统主题表示 Azure 服务(例如 Azure 存储或Azure 事件中心)发布的一个或多个事件。 例如,系统主题可以表示所有 Blob 事件,或者仅表示为特定存储帐户发布的 Blob 创建和 Blob 删除事件。
发布到 Azure 事件网格 的系统事件的各种事件类型的名称在 中azure.eventgrid.SystemEventNames
可用。
有关可识别的系统主题的完整列表,请访问 系统主题。
有关事件网格中的关键概念的详细信息,请参阅Azure 事件网格中的概念。
具有 Azure Arc 的 Kubernetes 上的事件网格
具有 Azure Arc 的 Kubernetes 上的事件网格是一种产品/服务,可让你在自己的 Kubernetes 群集中运行事件网格。 此功能是通过使用已启用 Azure Arc 的 Kubernetes 启用的。 通过已启用 Azure Arc 的 Kubernetes,可将受支持的 Kubernetes 群集连接到 Azure。 连接后,可在其上安装事件网格。 你可在此处了解相关详细信息。
支持 CNCF 云事件
从 v4.7.0 开始,此包还支持从 https://pypi.org/project/cloudevents/发布 CNCF 云事件。 可以将 CloudEvent 对象从此库传递到 send
API。
from cloudevents.http import CloudEvent
event = CloudEvent(...)
client.send(event)
示例
以下部分提供了几个代码片段,涵盖了一些最常见的事件网格任务,包括:
发送事件网格事件
此示例发布事件网格事件。
import os
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid import EventGridPublisherClient, EventGridEvent
key = os.environ["EG_ACCESS_KEY"]
endpoint = os.environ["EG_TOPIC_HOSTNAME"]
event = EventGridEvent(
data={"team": "azure-sdk"},
subject="Door1",
event_type="Azure.Sdk.Demo",
data_version="2.0"
)
credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)
client.send(event)
发送云事件
此示例发布一个 Cloud 事件。
import os
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
from azure.eventgrid import EventGridPublisherClient
key = os.environ["CLOUD_ACCESS_KEY"]
endpoint = os.environ["CLOUD_TOPIC_HOSTNAME"]
event = CloudEvent(
type="Azure.Sdk.Sample",
source="https://egsample.dev/sampleevent",
data={"team": "azure-sdk"}
)
credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)
client.send(event)
发送多个事件
向主题或域发送多个事件时,可以批量发送事件。 此示例使用 send 方法发送 CloudEvent 列表。
警告: 一次发送多个事件的列表时,循环访问并发送每个事件不会获得最佳性能。 为了获得最佳性能,强烈建议发送事件列表。
import os
from azure.core.credentials import AzureKeyCredential
from azure.core.messaging import CloudEvent
from azure.eventgrid import EventGridPublisherClient
key = os.environ["CLOUD_ACCESS_KEY"]
endpoint = os.environ["CLOUD_TOPIC_HOSTNAME"]
event0 = CloudEvent(
type="Azure.Sdk.Sample",
source="https://egsample.dev/sampleevent",
data={"team": "azure-sdk"}
)
event1 = CloudEvent(
type="Azure.Sdk.Sample",
source="https://egsample.dev/sampleevent",
data={"team2": "azure-eventgrid"}
)
events = [event0, event1]
credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)
client.send(events)
将事件作为字典发送
除了强类型化对象之外,还可以使用相应序列化模型的 dict 表示形式发布 CloudEvent () 或 EventGridEvent () 。
使用类似于 dict 的表示形式发送到具有自定义架构的主题,如下所示。
import os
import uuid
import datetime as dt
from msrest.serialization import UTC
from azure.core.credentials import AzureKeyCredential
from azure.eventgrid import EventGridPublisherClient
key = os.environ["CUSTOM_SCHEMA_ACCESS_KEY"]
endpoint = os.environ["CUSTOM_SCHEMA_TOPIC_HOSTNAME"]
event = custom_schema_event = {
"customSubject": "sample",
"customEventType": "sample.event",
"customDataVersion": "2.0",
"customId": uuid.uuid4(),
"customEventTime": dt.datetime.now(UTC()).isoformat(),
"customData": "sample data"
}
credential = AzureKeyCredential(key)
client = EventGridPublisherClient(endpoint, credential)
client.send(event)
从存储队列使用
此示例使用从存储队列接收的消息,并将其反序列化为 CloudEvent 对象。
from azure.core.messaging import CloudEvent
from azure.storage.queue import QueueServiceClient, BinaryBase64DecodePolicy
import os
import json
# all types of CloudEvents below produce same DeserializedEvent
connection_str = os.environ['STORAGE_QUEUE_CONN_STR']
queue_name = os.environ['STORAGE_QUEUE_NAME']
with QueueServiceClient.from_connection_string(connection_str) as qsc:
payload = qsc.get_queue_client(
queue=queue_name,
message_decode_policy=BinaryBase64DecodePolicy()
).peek_messages()
## deserialize payload into a list of typed Events
events = [CloudEvent.from_dict(json.loads(msg.content)) for msg in payload]
从 servicebus 使用
此示例使用从 ServiceBus 接收的有效负载消息,并将其反序列化为 EventGridEvent 对象。
from azure.eventgrid import EventGridEvent
from azure.servicebus import ServiceBusClient
import os
import json
# all types of EventGridEvents below produce same DeserializedEvent
connection_str = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
with ServiceBusClient.from_connection_string(connection_str) as sb_client:
payload = sb_client.get_queue_receiver(queue_name).receive_messages()
## deserialize payload into a list of typed Events
events = [EventGridEvent.from_dict(json.loads(next(msg.body).decode('utf-8'))) for msg in payload]
使用 EventGrid 进行分布式跟踪
可以像往常一样将 OpenTelemetry for Python 与 EventGrid 配合使用,因为它与 azure 核心跟踪集成兼容。
下面是使用 OpenTelemetry 跟踪发送 CloudEvent 的示例。
首先,将 OpenTelemetry 设置为 EventGrid 的已启用跟踪插件。
from azure.core.settings import settings
from azure.core.tracing.ext.opentelemetry_span import OpenTelemetrySpan
settings.tracing_implementation = OpenTelemetrySpan
从此处定期打开遥测使用情况。 有关详细信息 ,请参阅 OpenTelemetry 。
此示例使用简单的控制台导出程序导出跟踪。 此处可以使用任何导出程序,包括 azure-monitor-opentelemetry-exporter
、 jaeger
zipkin
等。
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from opentelemetry.sdk.trace.export import SimpleSpanProcessor # this requires opentelemetry >= 1.0.0
# Simple console exporter
exporter = ConsoleSpanExporter()
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(exporter)
)
tracer
设置 和 exporter
后,请按照以下示例开始收集跟踪,同时使用 send
方法从 EventGridPublisherClient
发送 CloudEvent 对象。
import os
from azure.eventgrid import EventGridPublisherClient
from azure.core.messaging import CloudEvent
from azure.core.credentials import AzureKeyCredential
hostname = os.environ['CLOUD_TOPIC_HOSTNAME']
key = AzureKeyCredential(os.environ['CLOUD_ACCESS_KEY'])
cloud_event = CloudEvent(
source = 'demo',
type = 'sdk.demo',
data = {'test': 'hello'},
)
with tracer.start_as_current_span(name="MyApplication"):
client = EventGridPublisherClient(hostname, key)
client.send(cloud_event)
故障排除
- 启用
azure.eventgrid
记录器以从库收集跟踪。
常规
事件网格客户端库将引发 Azure Core 中定义的异常。
日志记录
此库使用标准 日志记录 库进行日志记录。 有关 HTTP 会话 (URL、标头等的基本信息,) 在 INFO 级别记录。
可选配置
可选关键字 (keyword) 参数可以在客户端和按操作级别传入。 azure-core 参考文档 介绍了重试、日志记录、传输协议等的可用配置。
后续步骤
以下部分提供了几个代码片段,说明了事件网格 Python API 中使用的常见模式。
更多示例代码
这些代码示例演示了使用 Azure 事件网格 客户端库的常见支持者方案操作。
生成共享访问签名: sample_generate_sas.py
对客户端进行身份验证: sample_authentication.py (async_version)
使用 SAS 将事件发布到主题: sample_publish_events_to_a_topic_using_sas_credential_async.py (async_version)
将事件网格事件发布到主题: sample_publish_eg_events_to_a_topic.py (async_version)
将 EventGrid 事件发布到域主题: sample_publish_eg_events_to_a_domain_topic.py (async_version)
发布云事件: sample_publish_events_using_cloud_events_1.0_schema.py (async_version)
发布自定义架构: sample_publish_custom_schema_to_a_topic.py (async_version)
以下示例介绍了发布和使用 dict
EventGridEvents 和 CloudEvents 的表示形式。
将 EventGridEvent 发布为 dict,如表示形式: sample_publish_eg_event_using_dict.py (async_version)
将 CloudEvent 发布为类似于表示形式的 dict: sample_publish_cloud_event_using_dict.py (async_version)
使用原始 cloudevent 数据的自定义有效负载: sample_consume_custom_payload.py
在此处可以找到更多示例。
其他文档
有关Azure 事件网格的更多文档,请参阅有关 docs.microsoft.com 的事件网格文档。
贡献
本项目欢迎贡献和建议。 大多数贡献要求你同意贡献者许可协议 (CLA),并声明你有权(并且确实有权)授予我们使用你的贡献的权利。 有关详细信息,请访问 cla.microsoft.com。
提交拉取请求时,CLA 机器人将自动确定你是否需要提供 CLA,并相应地修饰 PR(例如标签、注释)。 直接按机器人提供的说明操作。 只需使用 CLA 对所有存储库执行一次这样的操作。
此项目采用了 Microsoft 开放源代码行为准则。 有关详细信息,请参阅行为准则常见问题解答,或如果有任何其他问题或意见,请与 联系。