使用 Apache Flink 將數據內嵌至 Azure 數據總管
Apache Flink 是一種架構和分散式處理引擎,可用於透過未繫結和已繫結資料流進行具狀態計算。
Flink 連接器是可在任何 Flink 叢集上執行的開放原始碼專案。 它會實作用於從 Flink 叢集移動資料的資料接收器。 使用 Apache Flink 的連接器,您可以建置以資料驅動案例為目標的快速且可調整的應用程式,例如機器學習服務 (ML)、擷取-轉換-載入 (ETL) 和 Log Analytics。
在本文中,您將瞭解如何使用 Flink 連接器將資料從 Flink 傳送至資料表。 您可以建立資料表和資料對應、直接 Flink 以將資料傳送至資料表,然後驗證結果。
必要條件
- Azure 資料總管叢集和資料庫。 在 Microsoft Fabric 即時智慧中建立叢集和資料庫或 KQL 資料庫。
- 資料庫中的目標資料表。 請參閱在 Azure 資料總管 中建立資料表或在即時智慧中建立資料表
- Apache Flink 叢集。 建立叢集。
- Maven 3.x
取得 Flink 連接器
對於使用 Maven 管理相依性的 Flink 專案,請將 Flink Connector Core Sink For Azure Data Explorer 新增為相依性來整合:
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>flink-connector-kusto</artifactId>
<version>1.0.0</version>
</dependency>
針對未使用 Maven 來管理相依性的專案,請複製 Azure Data Explorer Connector for Apache Flink 的存放庫,並在本機建置。 此方法可讓您使用命令 mvn clean install -DskipTests
,手動將連接器新增至本機 Maven 存放庫。
您可以從 Flink 進行驗證,以使用 Microsoft Entra ID 應用程式或受控識別。
此服務主體將是連接器用來在 Kusto 資料表中寫入您的資料的識別。 您稍後會授與此服務主體存取 Kusto 資源的權限。
透過 Azure CLI 登入您的 Azure 訂用帳戶。 然後在瀏覽器中進行驗證。
az login
選擇用來託管主體的訂用帳戶。 當您有多個訂用帳戶時,需要此步驟。
az account set --subscription YOUR_SUBSCRIPTION_GUID
建立服務主體。 在此範例中,服務主體稱為
my-service-principal
。az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
從傳回的 JSON 資料中,複製
appId
、password
和tenant
供日後使用。{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
您已建立您的 Microsoft Entra 應用程式和服務主體。
授與資料庫上的應用程式使用者權限:
// Grant database user permissions .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
授與應用程式資料表的擷取者或系統管理員權限。 所需的權限取決於所選的資料寫入方法。 擷取者權限足夠用於 SinkV2,而 WriteAndSink 則需要系統管理員權限。
// Grant table ingestor permissions (SinkV2) .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>') // Grant table admin permissions (WriteAheadSink) .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
如需授權的詳細資訊,請參閱 Kusto 角色型存取控制。
從 Flink 寫入資料
若要從 Flink 寫入資料:
匯入所需的選項:
import com.microsoft.azure.flink.config.KustoConnectionOptions; import com.microsoft.azure.flink.config.KustoWriteOptions;
使用您的應用程式或受控識別進行驗證。
針對應用程式驗證:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setAppId("<Application ID>") .setAppKey("<Application key>") .setTenantId("<Tenant ID>") .setClusterUrl("<Cluster URI>").build();
針對受控識別驗證:
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId("<Object ID>") .setClusterUrl("<Cluster URI>").build();
設定接收器參數,例如資料庫和資料表:
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder() .withDatabase("<Database name>").withTable("<Table name>").build();
您可以新增更多選項,如下表所述:
選項 描述 預設值 IngestionMappingRef 參考現有的擷取對應。 FlushImmediately 立即排清資料,並可能導致效能問題。 不建議使用此方法。 BatchIntervalMs 控制資料排清的頻率。 30 秒 BatchSize 設定排清前緩衝記錄的批次大小。 1,000 筆記錄 ClientBatchSizeLimit 指定擷取前的彙總資料大小 (MB)。 300 MB PollForIngestionStatus 如果為 true,連接器會在資料排清之後輪詢擷取狀態。 false DeliveryGuarantee 決定傳遞保證語意。 若要達到確切一次語意,請使用 WriteAheadSink。 AT_LEAST_ONCE 使用下列其中一種方法寫入串流資料:
- SinkV2:這是一種無狀態選項,可在檢查點上排清資料,確保至少一次一致性。 我們建議使用此選項來進行大量資料擷取。
- WriteAheadSink:此方法會將資料發出至 KustoSink。 它已與 Flink 的檢查點系統整合,並提供確切一次保證。 資料會儲存在 AbstractStateBackend 中,而且只有在檢查點完成之後才提交。
下列範例會使用 SinkV2。 若要使用 WriteAheadSink,請使用
buildWriteAheadSink
方法,而不是build
:KustoWriteSink.builder().setWriteOptions(kustoWriteOptions) .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/ , 2 /*Parallelism to use*/);
完整程式碼應該類似以下:
import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
.withDatabase("<Database name>").withTable("<Table name>").build();
KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
.setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
, 2 /*Parallelism to use*/);
驗證正在擷取資料
設定連線之後,資料就會傳送至您的資料表。 您可以執行 KQL 查詢來驗證資料是否已擷取。
執行下列查詢,以驗證資料已擷取至資料表:
<MyTable> | count
執行下列查詢以檢視資料:
<MyTable> | take 100