Gravar mensagens de evento no Azure Data Lake Storage Gen2 com a API Apache Flink® DataStream
Importante
O Azure HDInsight no AKS foi desativado em 31 de janeiro de 2025. Saiba mais com este anúncio.
Você precisa migrar suas cargas de trabalho para Microsoft Fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho.
Importante
Esta funcionalidade está atualmente em pré-visualização. Os Termos de Utilização Suplementares para Pré-visualizações do Microsoft Azure incluem mais termos legais que se aplicam às funcionalidades do Azure que estão em versão beta, em pré-visualização ou ainda não lançadas para disponibilização geral. Para obter informações sobre esta visualização específica, consulte informações de pré-visualização do Azure HDInsight no AKS. Para perguntas ou sugestões de recursos, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para obter mais atualizações na Comunidade do Azure HDInsight .
O Apache Flink usa sistemas de arquivos para consumir e armazenar dados persistentemente, tanto para os resultados de aplicativos quanto para tolerância a falhas e recuperação. Neste artigo, saiba como gravar mensagens de evento no Azure Data Lake Storage Gen2 com a API DataStream.
Pré-requisitos
- Cluster do Apache Flink no HDInsight no AKS
-
cluster Apache Kafka no HDInsight
- É necessário garantir que as configurações de rede sejam atendidas conforme descrito em Usando o Apache Kafka no HDInsight. Certifique-se de que o HDInsight em AKS e os clusters HDInsight estão na mesma Rede Virtual.
- Use o MSI para acessar o ADLS Gen2
- IntelliJ para desenvolvimento em uma VM do Azure no HDInsight na Rede Virtual AKS
Conector Apache Flink FileSystem
Este conector de sistema de ficheiros fornece as mesmas garantias para BATCH e STREAMING e foi concebido para proporcionar exatamente uma vez a semântica para a execução de STREAMING. Para obter mais informações, consulte Flink DataStream Filesystem.
Conector Apache Kafka
Flink fornece conector Apache Kafka para ler e gravar dados em tópicos do Kafka com garantias de exatamente uma vez. Para obter mais informações, consulte Apache Kafka Connector.
Crie o projeto para o Apache Flink
pom.xml sobre o IntelliJ IDEA
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<kafka.version>3.2.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Programa para de lavatório ADLS Gen2
abfsGen2.java
Observação
Substitua Apache Kafka no cluster HDInsight bootStrapServers por os seus próprios brokers para o Kafka 3.2
package contoso.example;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;
public class KafkaSinkToGen2 {
public static void main(String[] args) throws Exception {
// 1. get stream execution env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration flinkConfig = new Configuration();
flinkConfig.setString("classloader.resolve-order", "parent-first");
env.getConfig().setGlobalJobParameters(flinkConfig);
// 2. read kafka message as stream input, update your broker ip's
String brokers = "<update-broker-ip>:9092,<update-broker-ip>:9092,<update-broker-ip>:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("click_events")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
stream.print();
// 3. sink to gen2, update container name and storage path
String outputPath = "abfs://<container-name>@<storage-path>.dfs.core.windows.net/flink/data/click_events";
final FileSink<String> sink = FileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(2))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
.build();
stream.sinkTo(sink);
// 4. run stream
env.execute("Kafka Sink To Gen2");
}
}
jar do pacote e envie para o Apache Flink.
Carregue o ficheiro jar para o ABFS.
Passe as informações do job jar durante a criação do cluster
AppMode
.Observação
Certifique-se de adicionar classloader.resolve-order como 'parent-first' e hadoop.classpath.enable como
true
Selecione Agregação de logs de tarefas para enviar logs de trabalho para a conta de armazenamento.
Você pode ver o trabalho em execução.
Validar dados de streaming no ADLS Gen2
Estamos vendo o click_events
fluindo para o ADLS Gen2.
Você pode especificar uma política de rotação que gira o ficheiro atualmente em progresso sob qualquer uma das 3 condições a seguir:
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(5))
.withInactivityInterval(Duration.ofMinutes(3))
.withMaxPartSize(MemorySize.ofMebiBytes(5))
.build())
Referência
- Apache Kafka Connector
- Sistema de Arquivos do Flink DataStream
- site Apache Flink
- Apache, Apache Kafka, Kafka, Apache Flink, Flink e nomes de projetos de código aberto associados são marcas comerciais da Apache Software Foundation (ASF).