Escritura de mensajes de eventos en Azure Data Lake Storage Gen2 con La API de DataStream de Apache Flink®
Importante
Azure HDInsight en AKS se retiró el 31 de enero de 2025. Descubre más con este anuncio.
Debe migrar las cargas de trabajo a microsoft Fabric o un producto equivalente de Azure para evitar la terminación repentina de las cargas de trabajo.
Importante
Esta característica está actualmente en versión preliminar. Los Términos de uso complementarios para las versiones preliminares de Microsoft Azure incluyen más términos legales que se aplican a las características de Azure que se encuentran en versión beta, en versión preliminar o, de lo contrario, aún no se han publicado en disponibilidad general. Para obtener información sobre esta versión preliminar específica, consulte información de la versión preliminar de Azure HDInsight en AKS. Para preguntas o sugerencias de características, envíe una solicitud en AskHDInsight con los detalles y síganos para obtener más actualizaciones sobre Comunidad de Azure HDInsight.
Apache Flink usa sistemas de archivos para consumir y almacenar datos de forma persistente, tanto para los resultados de las aplicaciones como para la tolerancia a errores y la recuperación. En este artículo, aprenderá a escribir mensajes de eventos en Azure Data Lake Storage Gen2 con DataStream API.
Prerrequisitos
- clúster de Apache Flink en HDInsight sobre AKS
-
clúster de Apache Kafka en HDInsight
- Debe asegurarse de que la configuración de red se ha gestionado como se describe en Uso de Apache Kafka en HDInsight. Asegúrese de que HDInsight en AKS y los clústeres de HDInsight estén en la misma red virtual.
- Uso de MSI para acceder a ADLS Gen2
- IntelliJ para el desarrollo en una máquina virtual de Azure en HDInsight en AKS Virtual Network
Conector de Apache Flink FileSystem
Este conector del sistema de archivos proporciona las mismas garantías tanto para BATCH como para STREAMING y está diseñado para ofrecer semántica de ejecución exactamente una vez en la ejecución de STREAMING. Para obtener más información, consulte Filesystem DataStream de Flink.
Conector de Apache Kafka
Flink proporciona un conector de Apache Kafka para leer datos de y escribir datos en tópicos de Kafka con garantías de exactamente una vez. Para más información, consulte Conector de Apache Kafka.
Compilación del proyecto para Apache Flink
pom.xml en IntelliJ IDEA
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<kafka.version>3.2.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Programa para receptores de ADLS Gen2
abfsGen2.java
Nota
Reemplace Apache Kafka en el clúster de HDInsight bootStrapServers por sus propios agentes para Kafka 3.2
package contoso.example;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;
public class KafkaSinkToGen2 {
public static void main(String[] args) throws Exception {
// 1. get stream execution env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration flinkConfig = new Configuration();
flinkConfig.setString("classloader.resolve-order", "parent-first");
env.getConfig().setGlobalJobParameters(flinkConfig);
// 2. read kafka message as stream input, update your broker ip's
String brokers = "<update-broker-ip>:9092,<update-broker-ip>:9092,<update-broker-ip>:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("click_events")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
stream.print();
// 3. sink to gen2, update container name and storage path
String outputPath = "abfs://<container-name>@<storage-path>.dfs.core.windows.net/flink/data/click_events";
final FileSink<String> sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(2))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
.build();
stream.sinkTo(sink);
// 4. run stream
env.execute("Kafka Sink To Gen2");
}
}
Empaquete el archivo jar y envíelo a Apache Flink.
Cargue el archivo jar en ABFS.
Pase la información del archivo jar del trabajo en la creación del clúster
AppMode
.Nota
Asegúrese de agregar classloader.resolve-order como "parent-first" y hadoop.classpath.enable como
true
Selecciona la agregación del registro de trabajos para almacenar los registros de trabajos en la cuenta de almacenamiento.
Puede ver el trabajo en ejecución.
Validar datos de streaming en ADLS Gen2
Estamos viendo la transmisión del click_events
en ADLS Gen2.
Puede especificar una política de rotación que gire el archivo de partes en curso bajo cualquiera de las tres condiciones siguientes:
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(5))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
Referencia
- Conector de Apache Kafka
- del sistema de archivos DataStream de Flink
- Sitio web de Apache Flink
- Apache, Apache Kafka, Kafka, Apache Flink, Flink y los nombres de proyecto de código abierto asociados son marcas comerciales de la Apache Software Foundation (ASF).