共用方式為


適用于 Python 的 Azure EventHubs 檢查點存放區用戶端程式庫 - 1.1.4 版

使用儲存體 Blob

Azure EventHubs 檢查點存放區用於儲存檢查點,同時處理來自Azure 事件中樞的事件。 此檢查點存放區套件可作為 的 EventHubConsumerClient 外掛程式套件。 它會使用 Azure 儲存體 Blob 作為持續性存放區,以維護檢查點和分割區擁有權資訊。

請注意,這是非同步程式庫,適用于 Azure EventHubs Checkpoint Store 用戶端程式庫的同步版本,請參閱 azure-eventhub-checkpointstoreblob

| 原始程式碼套件 (PyPi) | API 參考檔 | Azure Eventhubs 檔 | Azure 儲存體檔

開始使用

Prerequisites

  • Python 3.6 或更新版本。

  • Microsoft Azure 訂用帳戶:若要使用 Azure 服務,包括Azure 事件中樞,您需要訂用帳戶。 如果您沒有現有的 Azure 帳戶,您可以在 建立帳戶時註冊免費試用或使用 MSDN 訂閱者權益。

  • 具有事件中樞的事件中樞命名空間:若要與Azure 事件中樞互動,您也必須提供命名空間和事件中樞。 如果您不熟悉建立 Azure 資源,建議您遵循使用 Azure 入口網站 建立事件中樞的逐步指南。 您也可以在該處找到使用 Azure CLI、Azure PowerShell或 Azure Resource Manager (ARM) 範本來建立事件中樞的詳細指示。

  • Azure 儲存體帳戶:您必須擁有 Azure 儲存體帳戶,並建立Azure Blob 儲存體區塊容器,以使用 Blob 來儲存檢查點資料。 您可以遵循建立 Azure 區塊 Blob 儲存體帳戶的指南。

安裝套件

$ pip install azure-eventhub-checkpointstoreblob-aio

重要概念

檢查點檢查

設立檢查點 是讀取器在資料分割事件序列中標記或認可其位置的程序。 設立檢查點是取用者的責任,這項作業會在取用者群組內依照資料分割來進行。 此責任表示在每個取用者群組內,每個資料分割讀取器必須追蹤自己目前在事件串流中的位置,而當它們認為資料串流完成時,可以通知服務。 如果讀取器與資料分割中斷連線,當它重新連線時,會從該取用者群組中該資料分割最後一個讀取器先前提交的檢查點開始讀取。 當讀取器連線時,它會將此位移傳遞給事件中樞,以指定要開始讀取的位置。 如此一來,檢查點能成為下游應用程式標記事件「完成」的方法,也能針對在不同機器上執行的讀取器提供容錯移轉發生時的恢復功能。 從這個檢查點處理程序中指定較低的位移,即可回到較舊的資料。 透過這項機制,檢查點可提供容錯移轉彈性,並支援事件串流重播。

&位移序號

這兩個位移 & 序號都會參考資料分割內事件的位置。 您可以將它們視為用戶端資料指標。 位移是事件的位元組編號。 位移/序號可讓事件取用者 (讀取器) 指定事件資料流程中要開始讀取事件的點。 您可以指定時間戳記,以便只在指定的時間戳記之後才會排入佇列的事件。 取用者需負責將自己的位移值儲存在事件中樞服務之外。 在資料分割內,每個事件都包含位移、序號和排入佇列時的時間戳記。

範例

建立 EventHubConsumerClient

建立 EventHubConsumerClient 最簡單的方法是使用連接字串。

from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")

如需建立 EventHubConsumerClient 的其他方式,請參閱 EventHubs 程式庫 以取得詳細資料。

使用 來取用 BlobCheckpointStore 事件來執行檢查點

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'

async def on_event(partition_context, event):
    # Put your code here.
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_connection_str,
        container_name
    )
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,
    )

    async with client:
        await client.receive(on_event)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

搭配不同版本的 Azure 儲存體服務 API 使用 BlobCheckpointStore

有些環境有不同的 Azure 儲存體服務 API 版本。 BlobCheckpointStore 根據預設,會使用儲存體服務 API 版本 2019-07-07。 若要針對不同的版本使用它,請在建立 BlobCheckpointStore 物件時指定 api_version

疑難排解

一般

啟用記錄有助於進行疑難排解。

記錄

  • 啟用 azure.eventhub.extensions.checkpointstoreblobaio 記錄器從程式庫收集追蹤。
  • 啟用 azure.eventhub 記錄器從主要 azure-eventhub 程式庫收集追蹤。
  • 啟用 azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage 記錄器從 Azure 儲存體 Blob 程式庫收集追蹤。
  • 啟用 uamqp 記錄器從基礎 uAMQP 程式庫收集追蹤。
  • 在建立用戶端時設定 logging_enable=True ,以啟用 AMQP 畫面層級追蹤。

下一步

更多的程式碼範例

開始使用 我們的 EventHubs 檢查點存放區非同步範例

文件

這裡 提供參考檔。

提供意見反應

如果您遇到任何錯誤或有建議,請在專案的 [ 問題 ] 區段中提出問題。

參與

此專案歡迎參與和提供建議。 大部分的參與都要求您同意「參與者授權合約 (CLA)」,宣告您有權且確實授與我們使用投稿的權利。 如需詳細資料,請前往 https://cla.microsoft.com

當您提交提取要求時,CLA Bot 會自動判斷您是否需要提供 CLA,並適當地裝飾 PR (例如標籤、註解)。 請遵循 bot 提供的指示。 您只需要使用我們的 CLA 在所有存放庫上執行此動作一次。

此專案採用 Microsoft Open Source Code of Conduct (Microsoft 開放原始碼管理辦法)。 如需詳細資訊,請參閱管理辦法常見問題集,如有任何其他問題或意見請連絡 opencode@microsoft.com

曝光數