共用方式為


設定 Azure 事件中樞 和 Kafka 數據流端點

重要

此頁面包含使用 Kubernetes 部署指令清單來管理 Azure IoT Operations 元件的指示,其處於預覽狀態。 這項功能隨附 數個限制,不應用於生產工作負載。

請參閱 Microsoft Azure 預覽版增補使用規定,以了解適用於 Azure 功能 (搶鮮版 (Beta)、預覽版,或尚未正式發行的版本) 的法律條款。

若要設定 Azure IoT 作業與 Apache Kafka 訊息代理程式之間的雙向通訊,您可以設定資料流端點。 此組態可讓您指定端點、傳輸層安全性 (TLS)、驗證和其他設定。

必要條件

Azure 事件中樞

Azure 事件中樞 與 Kafka 通訊協定相容,而且可與數據流搭配使用,但有一些限制。

建立 Azure 事件中樞 命名空間和事件中樞

首先,建立已啟用 Kafka 的 Azure 事件中樞 命名空間

接下來, 在命名空間中建立事件中樞。 每個個別事件中樞都會對應至 Kafka 主題。 您可以在相同的命名空間中建立多個事件中樞來代表多個 Kafka 主題。

將許可權指派給受控識別

若要設定 Azure 事件中樞 的數據流端點,建議您使用使用者指派或系統指派的受控識別。 這種方法是安全的,而且不需要手動管理認證。

建立 Azure 事件中樞 命名空間和事件中樞之後,您必須將角色指派給 Azure IoT Operations 受控識別,以授與傳送或接收訊息給事件中樞的許可權。

如果使用系統指派的受控識別,請在 Azure 入口網站 中移至您的 Azure IoT 作業實例,然後選取 [概觀]。 複製 Azure IoT Operations Arc 延伸模組之後 所列的延伸模組名稱。 例如, azure-iot-operations-xxxx7。 您可以使用 Azure IoT Operations Arc 延伸模組的相同名稱,找到系統指派的受控識別。

然後,移至事件中樞命名空間>訪問控制 (IAM)>新增角色指派

  1. 在 [ 角色] 索引標籤上,選取適當的角色,例如 Azure Event Hubs Data SenderAzure Event Hubs Data Receiver。 這會為受控識別提供命名空間中所有事件中樞傳送或接收訊息的必要許可權。 若要深入瞭解,請參閱使用 Microsoft Entra ID 驗證應用程式以存取事件中樞資源
  2. 在 [ 成員] 索引標籤上:
    1. 如果使用系統指派的受控識別,針對 [指派存取權],選取 [使用者、群組或服務主體 ] 選項,然後選取 [+ 選取成員 ],然後搜尋 Azure IoT Operations Arc 擴充功能的名稱。
    2. 如果使用使用者指派的受控識別,針對 [指派存取權] 選取 [受控識別 ] 選項,然後選取 [+ 選取成員 ],然後搜尋針對 雲端連線設定的使用者指派受控識別。

建立 Azure 事件中樞 的數據流端點

設定 Azure 事件中樞 命名空間和事件中樞之後,您可以為已啟用 Kafka 的 Azure 事件中樞 命名空間建立數據流端點。

  1. 在作業體驗中,選取 [數據流端點] 索引標籤

  2. 在 [建立新的數據流端點] 下,選取 [Azure 事件中樞>[新增]。

    使用作業體驗建立 Azure 事件中樞 數據流端點的螢幕快照。

  3. 輸入端點的下列設定:

    設定 描述
    名稱 數據流端點的名稱。
    Host 格式為 <NAMEPSACE>.servicebus.windows.net:9093Kafka 訊息代理程式主機名。 在事件中樞的主機設定中包含埠號碼 9093
    驗證方法 用於驗證的方法。 我們建議您選擇 [系統指派的受控識別] 或 [使用者指派的受控識別]。
  4. 選取 [ 套用 ] 以布建端點。

注意

當您建立數據流時,會設定 Kafka 主題或個別事件中樞。 Kafka 主題是數據流訊息的目的地。

使用 連接字串 向事件中樞進行驗證

重要

若要使用作業體驗入口網站來管理秘密,必須先設定 Azure 金鑰保存庫 並啟用工作負載身分識別,以使用安全設定來啟用 Azure IoT 作業。 若要深入瞭解,請參閱 在 Azure IoT 作業部署中啟用安全設定。

在 [作業體驗數據流端點設定] 頁面中,選取 [基本] 索引標籤,然後選擇 [驗證方法>SASL]。

輸入端點的下列設定:

設定 描述
SASL 類型 選擇 Plain
同步的秘密名稱 輸入包含 連接字串 的 Kubernetes 秘密名稱。
用戶名稱參考或令牌秘密 用於SASL驗證的用戶名稱或令牌密碼參考。 請從 金鑰保存庫 清單中挑選它,或建立新的清單。 值必須是 $ConnectionString
令牌密碼的密碼參考 用於SASL驗證的密碼或令牌密碼參考。 請從 金鑰保存庫 清單中挑選它,或建立新的清單。 值的格式必須為 Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY>

選取 [新增參考] 之後,如果您選取 [新建],請輸入下列設定:

設定 描述
祕密名稱 Azure 金鑰保存庫 中的秘密名稱。 挑選容易記住的名稱,以便稍後從清單中選取秘密。
祕密值 針對使用者名稱,輸入 $ConnectionString。 針對密碼,以 格式Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY>輸入 連接字串。
設定啟用日期 如果開啟,則秘密變成作用中的日期。
設定到期日期 如果開啟,則為秘密到期的日期。

若要深入瞭解秘密,請參閱 在 Azure IoT Operations Preview 中建立和管理秘密。

限制

Azure 事件中樞 不支援 Kafka 支援的所有壓縮類型。 目前 Azure 事件中樞 進階和專用層僅支援 GZIP 壓縮。 使用其他壓縮類型可能會導致錯誤。

自訂 Kafka 訊息代理程式

若要為非事件中樞 Kafka 訊息代理程式設定數據流端點,請視需要設定主機、TLS、驗證和其他設定。

  1. 在作業體驗中,選取 [數據流端點] 索引標籤

  2. 在 [建立新的數據流端點] 下,選取 [自定義 Kafka Broker>新增]。

    使用作業體驗建立 Kafka 數據流端點的螢幕快照。

  3. 輸入端點的下列設定:

    設定 描述
    名稱 數據流端點的名稱。
    Host 格式為 <Kafa-broker-host>:xxxxKafka 訊息代理程式主機名。 在主機設定中包含埠號碼。
    驗證方法 用於驗證的方法。 選擇 [SASL]。
    SASL 類型 SASL 驗證的類型。 選擇 [純文本]、 [ScramSha256] 或 [ScramSha512]。 如果使用 SASL,則為必要項。
    同步的秘密名稱 秘密的名稱。 如果使用 SASL,則為必要項。
    令牌秘密的用戶名稱參考 SASL 令牌秘密中使用者名稱的參考。 如果使用 SASL,則為必要項。
  4. 選取 [ 套用 ] 以布建端點。

注意

目前,作業體驗不支援使用 Kafka 數據流端點作為來源。 您可以使用 Kubernetes 或 Bicep,建立具有來源 Kafka 數據流端點的數據流。

若要自定義端點設定,請使用下列各節以取得詳細資訊。

可用的驗證方法

下列驗證方法適用於 Kafka 訊息代理程式資料流端點。

系統指派的受控識別

設定數據流端點之前,請先將角色指派給 Azure IoT Operations 受控識別,以授與連線至 Kafka 訊息代理程式的許可權:

  1. 在 Azure 入口網站 中,移至您的 Azure IoT 作業實例,然後選取 [概觀]。
  2. 複製 Azure IoT Operations Arc 延伸模組之後 所列的延伸模組名稱。 例如, azure-iot-operations-xxxx7
  3. 移至您需要授與許可權的雲端資源。 例如,移至事件中樞命名空間>訪問控制 (IAM)>新增角色指派
  4. 在 [ 角色] 索引標籤上,選取適當的角色。
  5. 在 [成員] 索引標籤上,針對 [指派存取權],選取 [使用者、群組或服務主體] 選項,然後選取 [+ 選取成員],然後搜尋 Azure IoT Operations 受控識別。 例如, azure-iot-operations-xxxx7

然後,使用系統指派的受控識別設定來設定數據流端點。

在 [作業體驗數據流端點設定] 頁面中,選取 [基本] 索引標籤,然後選擇 [驗證方法>系統指派的受控識別]。

此組態會建立具有默認物件的受控識別,其格式 https://<NAMESPACE>.servicebus.windows.net與事件中樞命名空間主機值相同。 不過,如果您需要覆寫默認物件,您可以將字段設定 audience 為所需的值。

作業體驗中不支援。

使用者指派的受控識別

若要使用使用者指派的受控識別進行驗證,您必須先部署已啟用安全設定的 Azure IoT 作業。 然後, 您必須為雲端連線設定使用者指派的受控識別。 若要深入瞭解,請參閱 在 Azure IoT 作業部署中啟用安全設定。

設定資料流端點之前,請先將角色指派給使用者指派的受控識別,以授與連線至 Kafka 訊息代理程式的許可權:

  1. 在 Azure 入口網站 中,移至您需要授與許可權的雲端資源。 例如,移至事件方格命名空間>訪問控制 (IAM)>新增角色指派
  2. 在 [ 角色] 索引標籤上,選取適當的角色。
  3. 在 [成員] 索引標籤上,針對 [指派存取權],選取 [受控識別] 選項,然後選取 [+ 選取成員],然後搜尋使用者指派的受控識別。

然後,使用使用者指派的受控識別設定來設定數據流端點。

在 [作業體驗數據流端點設定] 頁面中,選取 [基本] 索引標籤,然後選擇 [驗證方法>使用者指派的受控識別]。

在這裡,範圍是受控識別的物件。 預設值與事件 https://<NAMESPACE>.servicebus.windows.net中樞命名空間主機值的格式相同。 不過,如果您需要覆寫默認物件,您可以使用 Bicep 或 Kubernetes,將範圍欄位設定為所需的值。

SASL

若要使用 SASL 進行驗證,請指定 SASL 驗證方法,並使用包含 SASL 令牌的秘密名稱來設定 SASL 類型和秘密參考。

在 [作業體驗數據流端點設定] 頁面中,選取 [基本] 索引標籤,然後選擇 [驗證方法>SASL]。

輸入端點的下列設定:

設定 描述
SASL 類型 要使用的SASL驗證類型。 支援的類型為 PlainScramSha256ScramSha512
同步的秘密名稱 包含SASL令牌的 Kubernetes 秘密名稱。
用戶名稱參考或令牌秘密 用於SASL驗證的用戶名稱或令牌密碼參考。
令牌密碼的密碼參考 用於SASL驗證的密碼或令牌密碼參考。

支援的 SASL 類型如下:

  • Plain
  • ScramSha256
  • ScramSha512

秘密必須位於與 Kafka 數據流端點相同的命名空間中。 秘密必須具有SASL令牌作為金鑰/值組。

匿名

若要使用匿名驗證,請更新 Kafka 設定的驗證區段以使用 Anonymous 方法。

在 [作業體驗數據流端點設定] 頁面中,選取 [基本] 索引標籤,然後選擇 [驗證方法>]。

進階設定

您可以設定 Kafka 數據流端點的進階設定,例如 TLS、受信任的 CA 憑證、Kafka 傳訊設定、批處理和 CloudEvents。 您可以在資料流端點 [進階 入口網站] 索引標籤或資料流端點資源內設定這些設定。

在作業體驗中,選取數據流端點的 [ 進階 ] 索引標籤。

使用作業體驗來設定 Kafka 數據流端點進階設定的螢幕快照。

TLS 設定

TLS 模式

若要啟用或停用 Kafka 端點的 TLS,請更新 mode TLS 設定中的設定。

在 [作業體驗數據流端點設定] 頁面中,選取 [ 進階 ] 索引標籤,然後使用啟用 TLS 模式旁 複選框。

TLS 模式可以設定為 EnabledDisabled。 如果模式設定為 Enabled,數據流會使用與 Kafka 訊息代理程式的安全連線。 如果模式設定為 Disabled,數據流會使用 Kafka 訊息代理程式不安全的連線。

受信任的 CA 憑證

設定 Kafka 端點的受信任 CA 憑證,以建立與 Kafka 訊息代理程式的安全連線。 如果 Kafka 訊息代理程式使用自我簽署憑證或由預設不信任的自定義 CA 所簽署的憑證,此設定就很重要。

在 [作業體驗數據流端點設定] 頁面中,選取 [ 進階 ] 索引卷標,然後使用 [受信任的 CA 憑證設定對應 ] 字段來指定包含受信任 CA 憑證的 ConfigMap。

此 ConfigMap 應包含 PEM 格式的 CA 憑證。 ConfigMap 必須與 Kafka 數據流資源位於相同的命名空間中。 例如:

kubectl create configmap client-ca-configmap --from-file root_ca.crt -n azure-iot-operations

提示

聯機到 Azure 事件中樞 時,不需要 CA 憑證,因為事件中樞服務會使用預設信任的公用 CA 所簽署的憑證。

取用者群組標識碼

取用者群組標識碼可用來識別數據流用來從 Kafka 主題讀取訊息的取用者群組。 取用者群組標識碼在 Kafka 訊息代理程式內必須是唯一的。

重要

當 Kafka 端點作為 來源使用時,需要取用者群組標識碼。 否則,數據流無法從 Kafka 主題讀取訊息,而且您收到錯誤「Kafka 類型來源端點必須已定義 consumerGroupId」。

在 [作業體驗數據流端點設定] 頁面中,選取 [ 進階 ] 索引標籤,然後使用 [取用者群組標識符 ] 字段來指定取用者群組標識符。

只有當端點作為來源使用時,此設定才會生效(也就是數據流是取用者)。

壓縮

壓縮欄位會啟用傳送至 Kafka 主題之訊息的壓縮。 壓縮有助於降低資料傳輸所需的網路頻寬和儲存空間。 不過,壓縮也會為流程增加一些額外負荷和延遲。 下表列出支援的壓縮類型。

Description
None 不會套用壓縮或批次處理。 如果沒有指定壓縮,則 None 是預設值。
Gzip 會套用 GZIP 壓縮和批次處理。 GZIP 是一般用途的壓縮演算法,可提供壓縮比例與速度之間的良好平衡。 目前 Azure 事件中樞 進階和專用層僅支援 GZIP 壓縮。
Snappy 會套用 Snappy 壓縮和批次處理。 Snappy 是一種快速壓縮演算法,可提供中度壓縮比例和速度。 Azure 事件中樞 不支援此壓縮模式。
Lz4 會套用 LZ4 壓縮和批次處理。 LZ4 是一種快速壓縮演算法,可提供低壓縮比例和高速。 Azure 事件中樞 不支援此壓縮模式。

若要設定壓縮:

在 [作業體驗數據流端點設定] 頁面中,選取 [ 進階 ] 索引卷標,然後使用 [壓縮 ] 字段來指定壓縮類型。

只有當端點作為數據流為生產者的目的地時,此設定才會生效。

批次處理

除了壓縮之外,您也可以在將訊息傳送至 Kafka 主題之前,先設定訊息的批次處理。 批次處理可讓您將多個訊息分組在一起,並將其壓縮為單一單位,以提升壓縮效率並降低網路負荷。

欄位 描述 必要
mode 可以是 EnabledDisabled。 預設值是因為 Enabled Kafka 沒有未批處理傳訊的概念。 如果設定為 Disabled,則批處理會最小化,以每次使用單一訊息建立批次。 No
latencyMs 訊息在傳送之前可以緩衝處理的最大時間間隔,以毫秒為單位。 如果達到此間隔,則所有緩衝的訊息都會以批次方式傳送,而不論其大小為何。 如果未設定,預設值為 5。 No
maxMessages 在傳送之前可以緩衝處理的訊息數目上限。 如果達到這個數位,則所有緩衝的訊息都會以批次方式傳送,無論其大小或緩衝時間長度為何。 如果未設定,預設值為 100000。 No
maxBytes 在傳送之前可以緩衝處理的位元組大小上限。 如果達到這個大小,則所有緩衝的訊息都會以批次方式傳送,而不論其緩衝的時間長度為何。 預設值為 1000000 (1 MB)。 No

例如,如果您將延遲Ms 設定為 1000、maxMessages 設定為 100,而 maxBytes 設定為 1024,則訊息會在緩衝區中有 100 則訊息傳送,或當緩衝區中有 1,024 個字節時,或自上次傳送之後經過 1,000 毫秒時,就會傳送訊息,以先傳送。

若要設定批次處理:

在 [作業體驗數據流端點設定] 頁面中,選取 [ 進階 ] 索引標籤,然後使用 [已啟用 批處理] 字段來啟用批處理。 使用 [批處理延遲]、[位元組上限] 和 [訊息計數] 字段來指定批處理設定。

只有當端點作為數據流為生產者的目的地時,此設定才會生效。

資料分割處理策略

分割區處理策略可控制將訊息傳送至 Kafka 主題時,如何將訊息指派給 Kafka 資料分割。 Kafka 分割區是 Kafka 主題的邏輯區段,可啟用平行處理和容錯。 Kafka 主題中的每個訊息都有一個分割區和位移,用來識別和排序訊息。

只有當端點作為數據流為生產者的目的地時,此設定才會生效。

根據預設,數據流會使用迴圈配置資源演算法,將訊息指派給隨機分割區。 不過,您可以使用不同的策略,根據某些準則將訊息指派給分割區,例如 MQTT 主題名稱或 MQTT 訊息屬性。 這可協助您達成更好的負載平衡、資料位置或訊息排序。

Description
Default 使用循環配置資源演算法,將訊息指派給隨機分割區。 如果未指定任何策略,則這是預設值。
Static 將訊息指派給衍生自數據流實例標識碼的固定分割區編號。 這表示每個數據流實例都會將訊息傳送至不同的分割區。 這有助於達到更好的負載平衡和資料位置。
Topic 使用數據流來源中的 MQTT 主題名稱作為數據分割的索引鍵。 這表示具有相同 MQTT 主題名稱的訊息會傳送至相同的分割區。 這有助於達到更好的訊息排序和資料位置。
Property 使用數據流來源的 MQTT 訊息屬性作為數據分割的索引鍵。 在 [partitionKeyProperty] 欄位中指定屬性的名稱。 這表示具有相同屬性值的訊息會傳送至相同的分割區。 這有助於根據自訂準則達成更佳的訊息排序和資料位置。

例如,如果您將分割區處理策略設定為 Property ,並將數據分割索引鍵屬性設定為 device-id,則具有相同屬性的 device-id 訊息會傳送至相同的分割區。

若要設定分割區處理策略:

在 [作業體驗數據流端點設定] 頁面中,選取 [ 進階 ] 索引標籤,然後使用 [數據分割處理策略 ] 字段來指定數據分割處理策略。 如果策略設定Property為 ,請使用 [資料分割索引鍵] 屬性字段來指定用於數據分割的屬性。

Kafka 通知

Kafka 通知 (acks) 可用來控制傳送至 Kafka 主題之訊息的持久性和一致性。 當產生者將訊息傳送至 Kafka 主題時,它可以向 Kafka 訊息代理程式要求不同層級的通知,以確保訊息已成功寫入主題,並跨 Kafka 叢集復寫。

只有當端點作為目的地使用時,此設定才會生效(也就是數據流是產生者)。

Description
None 數據流不會等候 Kafka 訊息代理程式的任何通知。 此設定是最快但最不持久的選項。
All 數據流會等候訊息寫入領導者數據分割和所有追蹤數據分割。 此設定是最慢但最持久的選項。 此設定也是預設選項
One 數據流會等候訊息寫入領導者數據分割,以及至少一個追蹤者分割區。
Zero 數據流會等候訊息寫入領導者數據分割,但不會等候任何來自追隨者的通知。 這比 更 One 快速,但較不耐久。

例如,如果您將 Kafka 通知設定為 All,資料流會等候訊息寫入領導者數據分割和所有追蹤數據分割,再傳送下一個訊息。

若要設定 Kafka 通知:

在 [作業體驗數據流端點設定] 頁面中,選取 [ 進階 ] 索引標籤,然後使用 [Kafka 通知 ] 字段來指定 Kafka 通知層級。

只有在端點作為數據流為生產者的目的地時,此設定才會生效。

複製 MQTT 屬性

根據預設,會啟用複製 MQTT 屬性設定。 這些使用者屬性包括 subject 之類的值,可儲存傳送訊息的資產名稱。

在 [作業體驗數據流端點設定] 頁面中,選取 [進階] 索引標籤,然後使用 [複製 MQTT 屬性] 字段旁的複選框來啟用或停用複製 MQTT 屬性。

下列各節說明 MQTT 屬性如何轉譯為 Kafka 用戶標頭,反之亦然,當啟用設定時。

Kafka 端點是目的地

當 Kafka 端點是數據流目的地時,所有 MQTT v5 規格定義屬性都會轉譯 Kafka 用戶標頭。 例如,將「內容類型」轉送至 Kafka 的 MQTT v5 訊息會轉譯為 Kafka 用戶標頭 "Content Type":{specifiedValue}。 類似的規則適用於下表中定義的其他內建 MQTT 屬性。

MQTT 屬性 已翻譯的行為
承載格式指標 機碼:「承載格式指標」
值:“0” (Payload is bytes) 或 “1” (Payload is UTF-8)
回應主題 索引鍵:「回應主題”
值:原始訊息的回應主題複本。
訊息到期間隔 機碼:“訊息到期間隔”
值:訊息到期前的UTF-8秒數表示法。 如需詳細資訊,請參閱 訊息到期間隔屬性
相互關聯資料: 索引鍵:「相互關聯數據」
值:從原始訊息複製相互關聯數據。 與UTF-8編碼的許多MQTT v5屬性不同,相互關聯數據可以是任意數據。
內容類型: 機碼:「內容類型”
值:從原始訊息複製內容類型。

MQTT v5 使用者屬性索引鍵值組會直接轉譯為 Kafka 用戶標頭。 如果訊息中的用戶標頭名稱與內建 MQTT 屬性相同(例如,名為「相互關聯數據」的用戶標頭),則無論是轉送 MQTT v5 規格屬性值還是未定義用戶屬性。

數據流永遠不會從 MQTT Broker 接收這些屬性。 因此,數據流永遠不會轉送它們:

  • 主題別名
  • 訂用帳戶標識碼
Message Expiry Interval 屬性

訊息 到期間隔 會指定在捨棄之前,訊息可以保留在 MQTT 訊息代理程式中的時間長度。

當數據流收到指定訊息到期間隔的 MQTT 訊息時,它會:

  • 記錄收到訊息的時間。
  • 在訊息發出至目的地之前,會從訊息中減去時間,從原始到期間隔時間排入佇列。
  • 如果訊息尚未過期(上述作業為 > 0),則訊息會發出至目的地,並包含更新的訊息到期時間。
  • 如果訊息已過期(上述作業為 <= 0),則目標不會發出訊息。

範例:

  • 數據流會收到訊息到期間隔 = 3600 秒的 MQTT 訊息。 對應的目的地會暫時中斷連線,但能夠重新連線。 在此 MQTT 訊息傳送至目標之前,經過 1,000 秒。 在此情況下,目的地的訊息會將其訊息到期間隔設定為 2600 (3600 - 1000) 秒。
  • 數據流會接收含有訊息到期間隔 = 3600 秒的 MQTT 訊息。 對應的目的地會暫時中斷連線,但能夠重新連線。 不過,在此情況下,重新連線需要 4,000 秒。 訊息已過期,數據流不會將此訊息轉送至目的地。

Kafka 端點是數據流來源

注意

當使用事件中樞端點做為數據流來源時,有一個已知問題,Kafka 標頭在轉譯為 MQTT 時會損毀。 只有在使用事件中樞時,才會發生此情況,而事件中樞用戶端會使用涵蓋下的AMQP。 例如“foo”=“bar”,會轉譯 “foo”,但值會變成“\xa1\x03bar”。

當 Kafka 端點是數據流來源時,Kafka 用戶標頭會轉譯為 MQTT v5 屬性。 下表描述 Kafka 用戶標頭如何轉譯為 MQTT v5 屬性。

Kafka 標頭 已翻譯的行為
機碼 機碼:“金鑰”
值:原始訊息中的金鑰複本。
時間戳記 機碼:“時間戳”
值:Kafka Timestamp 的 UTF-8 編碼,這是 Unix epoch 之後的毫秒數。

Kafka 用戶標頭索引鍵/值組 - 前提是它們全都以 UTF-8 編碼 - 會直接轉譯成 MQTT 使用者密鑰/值屬性。

UTF-8 / 二進位不符

MQTT v5 只能支援UTF-8型屬性。 如果數據流收到包含一或多個非 UTF-8 標頭的 Kafka 訊息,數據流將會:

  • 拿掉冒犯的屬性或屬性。
  • 遵循先前的規則,將訊息的其餘部分轉寄到 上。

在 Kafka Source 標頭中需要二進位傳輸的應用程式 => MQTT 目標屬性必須先將它們編碼為 UTF-8,例如透過 Base64。

>=64KB 屬性不符

MQTT v5 屬性必須小於 64 KB。 如果數據流收到包含一或多個標頭的 Kafka 訊息,也就是 >= 64KB,數據流將會:

  • 拿掉冒犯的屬性或屬性。
  • 遵循先前的規則,將訊息的其餘部分轉寄到 上。
使用使用AMQP的事件中樞和產生者時的屬性轉譯

如果您有用戶端轉送訊息,Kafka 資料流來源端點會執行下列任何動作:

  • 使用 Azure.Messaging.EventHubs 等 用戶端連結庫將訊息傳送至事件中樞
  • 直接使用AMQP

有要注意的屬性轉譯細微差別。

您應該執行下列其中一項動作:

  • 避免傳送屬性
  • 如果您必須傳送屬性,請傳送編碼為UTF-8的值。

當事件中樞將屬性從AMQP轉譯為Kafka時,它會在其訊息中包含基礎AMQP編碼類型。 如需行為的詳細資訊,請參閱 使用不同通訊協定在取用者和產生者之間交換事件。

在下列程式代碼範例中,當資料流端點收到 值 "foo":"bar"時,它會將 屬性接收為 <0xA1 0x03 "bar">

using global::Azure.Messaging.EventHubs;
using global::Azure.Messaging.EventHubs.Producer;

var propertyEventBody = new BinaryData("payload");

var propertyEventData = new EventData(propertyEventBody)
{
  Properties =
  {
    {"foo", "bar"},
  }
};

var propertyEventAdded = eventBatch.TryAdd(propertyEventData);
await producerClient.SendAsync(eventBatch);

數據流端點無法將承載屬性 <0xA1 0x03 "bar"> 轉送至 MQTT 訊息,因為數據不是 UTF-8。 不過,如果您指定UTF-8字串,數據流端點會在傳送至 MQTT 之前轉譯字串。 如果您使用UTF-8字串,MQTT訊息會有 "foo":"bar" 作為用戶屬性。

只會翻譯UTF-8標頭。 例如,假設下列案例會將 屬性設定為 float:

Properties = 
{
  {"float-value", 11.9 },
}

數據流端點會捨棄包含 欄位的 "float-value" 封包。

並非所有事件數據屬性,包括 propertyEventData.correlationId 都會轉送。 如需詳細資訊,請參閱 事件用戶屬性

CloudEvents

CloudEvents 是一種以常見方式描述事件數據的方法。 CloudEvents 設定可用來以 CloudEvents 格式傳送或接收訊息。 您可以將 CloudEvents 用於事件驅動架構,其中不同的服務需要在相同或不同的雲端提供者中彼此通訊。

選項 CloudEventAttributesPropagateCreateOrRemap

在 [作業體驗數據流端點設定] 頁面中,選取 [ 進階 ] 索引標籤,然後使用 [雲端事件屬性 ] 字段來指定 CloudEvents 設定。

下列各節說明如何傳播或建立及重新對應 CloudEvent 屬性。

傳播設定

CloudEvent 屬性會針對包含必要屬性的訊息傳遞。 如果訊息未包含必要的屬性,訊息會依目前方式傳遞。 如果必要屬性存在, ce_ 則會將前置詞新增至 CloudEvent 屬性名稱。

名稱 必要 範例值 輸出名稱 輸出值
specversion Yes 1.0 ce-specversion 以目前方式傳遞
type Yes ms.aio.telemetry ce-type 以目前方式傳遞
source Yes aio://mycluster/myoven ce-source 以目前方式傳遞
id Yes A234-1234-1234 ce-id 以目前方式傳遞
subject No aio/myoven/telemetry/temperature ce-subject 以目前方式傳遞
time No 2018-04-05T17:31:00Z ce-time 以目前方式傳遞。 它沒有安息。
datacontenttype No application/json ce-datacontenttype 在選擇性轉換階段之後,已變更為輸出數據內容類型。
dataschema No sr://fabrikam-schemas/123123123234234234234234#1.0.0 ce-dataschema 如果在轉換組態中指定輸出數據轉換架構, dataschema 則會變更為輸出架構。

CreateOrRemap 設定

CloudEvent 屬性會針對包含必要屬性的訊息傳遞。 如果訊息未包含必要的屬性,則會產生屬性。

名稱 必要 輸出名稱 遺漏時產生的值
specversion Yes ce-specversion 1.0
type .是 ce-type ms.aio-dataflow.telemetry
source .是 ce-source aio://<target-name>
id Yes ce-id 目標客戶端中產生的 UUID
subject No ce-subject 傳送訊息的輸出主題
time No ce-time 在目標客戶端中產生為 RFC 3339
datacontenttype No ce-datacontenttype 在選擇性轉換階段之後變更為輸出數據內容類型
dataschema No ce-dataschema 架構登錄中定義的架構

下一步

若要深入了解數據流,請參閱 建立數據流