Obtención de datos de Apache Flink
Apache Flink es un marco y un motor de procesamiento distribuido para cálculos con estado mediante flujos de datos enlazados y no enlazados.
El conector Flink es un proyecto de código abierto que se puede ejecutar en cualquier clúster de Flink. Implementa el receptor de datos para mover datos desde un clúster de Flink. Con el conector de Apache Flink, puede crear aplicaciones rápidas y escalables orientadas a escenarios controlados por datos, como, por ejemplo, el aprendizaje automático (ML), la extracción, transformación y carga de datos (ETL), y el análisis de registros.
En este artículo, aprenderá a usar el conector de Flink para enviar datos de Flink a la tabla. Creará una tabla y una asignación de datos, hará que Flink envíe los datos a la tabla y, a continuación, validará los resultados.
Requisitos previos
- Un clúster y la base de datos de Azure Data Explorer. Cree un clúster y una base de datos o una base de datos KQL en inteligencia de tiempo real en Microsoft Fabric.
- Una tabla de destino en la base de datos. Consulte Creación de una tabla en Azure Data Explorer o Creación de una tabla en Inteligencia en tiempo real.
- Un clúster de Apache Flink. Cree un clúster.
- Maven 3.x
Obtención del conector de Flink
Para los proyectos de Flink que usan Maven para administrar las dependencias, integre el receptor de núcleo del conector de Flink para Azure Data Explorer agregándolo como dependencia:
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>flink-connector-kusto</artifactId>
<version>1.0.0</version>
</dependency>
En el caso de los proyectos que no usan Maven para administrar las dependencias, clone el repositorio del conector de Azure Data Explorer para Apache Flink y créelo localmente. Este enfoque le permite agregar manualmente el conector al repositorio local de Maven mediante el comando mvn clean install -DskipTests
.
Authenticate
Puede autenticarse desde Flink mediante una aplicación de Microsoft Entra ID.
Esta entidad de servicio será la identidad utilizada por el conector para escribir datos en la tabla de Kusto. Más adelante concederemos permisos para que esta entidad de servicio acceda a los recursos de Kusto.
Inicie sesión en su suscripción de Azure a través de la CLI de Azure. A continuación, realice la autenticación en el explorador.
az login
Elija la suscripción para hospedar la entidad de servicio. Este paso es necesario si tiene varias suscripciones.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Cree la entidad de servicio. En este ejemplo, la entidad de servicio se llama
my-service-principal
.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
En los datos JSON devueltos, copie los valores
appId
,password
ytenant
para usarlos posteriormente.{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
Ha creado una aplicación de Microsoft Entra y una entidad de servicio.
Conceda permisos de usuario a la aplicación en la base de datos:
// Grant database user permissions .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
Conceda a la aplicación permisos de ingesta o de administrador en la tabla. Los permisos necesarios dependen del método de escritura de datos elegido. Los permisos de ingesta son suficientes para SinkV2, mientras que WriteAndSink requiere permisos de administrador.
// Grant table ingestor permissions (SinkV2) .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>') // Grant table admin permissions (WriteAheadSink) .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
Para obtener más información sobre la autorización, consulte Control de acceso basado en roles de Kusto.
Escritura de datos desde Flink
Para escribir datos desde Flink:
Importe las opciones necesarias:
import com.microsoft.azure.flink.config.KustoConnectionOptions; import com.microsoft.azure.flink.config.KustoWriteOptions;
Use la aplicación para autenticarse.
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setAppId("<Application ID>") .setAppKey("<Application key>") .setTenantId("<Tenant ID>") .setClusterUrl("<Cluster URI>").build();
Configure los parámetros del receptor, como la base de datos y la tabla:
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder() .withDatabase("<Database name>").withTable("<Table name>").build();
Puede agregar más opciones, que se describen en la tabla siguiente:
Opción Descripción Valor predeterminado IngestionMappingRef Hace referencia a una asignación de ingesta existente. FlushImmediately Vacía los datos inmediatamente y puede causar problemas de rendimiento. No se recomienda este método. BatchIntervalMs Controla la frecuencia con la que se vacían los datos. 30 segundos BatchSize Establece el tamaño del lote para los registros de almacenamiento en búfer antes de vaciarlos. 1000 registros ClientBatchSizeLimit Especifica el tamaño en MB de datos agregados antes de la ingesta. 300 MB PollForIngestionStatus Si es true, el conector sondea el estado de ingesta después del vaciado de datos. false DeliveryGuarantee Determina la semántica de garantía de entrega. Para lograr exactamente una semántica, use WriteAheadSink. AT_LEAST_ONCE Escriba datos de streaming con uno de los métodos siguientes:
- SinkV2: esta es una opción sin estado que vacía los datos en el punto de control, lo que garantiza al menos una coherencia. Se recomienda esta opción para la ingesta de datos de gran volumen.
- WriteAheadSink: este método emite datos a KustoSink. Se integra con el sistema de puntos de comprobación de Flink y ofrece exactamente una garantía. Los datos se almacenan en un AbstractStateBackend y se confirman solo después de completar un punto de control.
En el ejemplo siguiente se usa SinkV2. Para usar WriteAheadSink, use el método
buildWriteAheadSink
en lugar debuild
:KustoWriteSink.builder().setWriteOptions(kustoWriteOptions) .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/ , 2 /*Parallelism to use*/);
El código de finalización debería tener este aspecto:
import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();
KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
.withDatabase("<Database name>").withTable("<Table name>").build();
KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
.setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
, 2 /*Parallelism to use*/);
Compruebe la ingesta de los datos
Una vez configurada la conexión, los datos se envían a la tabla. Puede comprobar que los datos se ingieren ejecutando una consulta KQL.
Ejecute la consulta siguiente para comprobar que los datos se ingieren en la tabla:
<MyTable> | count
Ejecute la siguiente consulta para ver los datos:
<MyTable> | take 100