Udostępnij za pośrednictwem


Zapisywanie komunikatów zdarzeń w usłudze Azure Data Lake Storage Gen2 przy użyciu interfejsu API Apache Flink® DataStream

Ważny

Usługa Azure HDInsight w usłudze AKS została wycofana 31 stycznia 2025 r. Dowiedz się więcej dzięki temu ogłoszeniu.

Aby uniknąć nagłego kończenia obciążeń, należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure.

Ważny

Ta funkcja jest obecnie dostępna w wersji zapoznawczej. Dodatkowe warunki użytkowania dla wersji zapoznawczych Microsoft Azure obejmują więcej warunków prawnych, które dotyczą funkcji platformy Azure w wersji beta, w wersji zapoznawczej lub w inny sposób nie są jeszcze ogólnodostępne. Aby uzyskać informacje na temat tej konkretnej wersji zapoznawczej, zobacz Azure HDInsight w usłudze AKS w wersji zapoznawczej informacji. W przypadku pytań lub sugestii dotyczących funkcji, prześlij żądanie na AskHDInsight ze szczegółami i śledź nas, aby otrzymywać więcej aktualizacji na społeczności Azure HDInsight.

Narzędzie Apache Flink używa systemów plików do korzystania z danych i ich trwałego przechowywania, zarówno w przypadku wyników aplikacji, jak i odporności na uszkodzenia i odzyskiwania. Z tego artykułu dowiesz się, jak zapisywać komunikaty o zdarzeniach w usłudze Azure Data Lake Storage Gen2 przy użyciu interfejsu API datastream.

Warunki wstępne

Ten łącznik systemu plików zapewnia te same gwarancje zarówno dla usługi BATCH, jak i PRZESYŁANIA STRUMIENIOWEgo, i jest przeznaczony do zapewnienia dokładnie raz semantyki na potrzeby wykonywania przesyłania strumieniowego. Aby uzyskać więcej informacji, zobacz Flink DataStream Filesystem.

Łącznik Apache Kafka

Flink udostępnia łącznik platformy Apache Kafka do odczytywania danych z tematów platformy Kafka i zapisywania ich w tematach platformy Kafka z dokładnie jednokrotnymi gwarancjami. Aby uzyskać więcej informacji, zobacz łącznik Apache Kafka.

pom.xml w środowisku IntelliJ IDEA

<properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kafka.version>3.2.0</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

program dla ujścia usługi ADLS Gen2

abfsGen2.java

Notatka

Zastąp Apache Kafka w klastrze HDInsight bootStrapServers własnymi brokerami dla Kafka 3.2

package contoso.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

public class KafkaSinkToGen2 {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         
        Configuration flinkConfig = new Configuration(); 

         flinkConfig.setString("classloader.resolve-order", "parent-first"); 

         env.getConfig().setGlobalJobParameters(flinkConfig);  

        // 2. read kafka message as stream input, update your broker ip's
        String brokers = "<update-broker-ip>:9092,<update-broker-ip>:9092,<update-broker-ip>:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("click_events")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        stream.print();

        // 3. sink to gen2, update container name and storage path
        String outputPath  = "abfs://<container-name>@<storage-path>.dfs.core.windows.net/flink/data/click_events";
        final FileSink<String> sink = FileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(2))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())
                .build();

        stream.sinkTo(sink);

        // 4. run stream
        env.execute("Kafka Sink To Gen2");
    }
}

Zapakuj plik jar pakietu i prześlij go do platformy Apache Flink.

  1. Przekaż plik jar do systemu ABFS.

    Zrzut ekranu przedstawiający ekran trybu aplikacji Flink.

  2. Przekaż informacje o pliku jar zadania podczas tworzenia klastra AppMode.

    Zrzut ekranu przedstawiający tryb tworzenia aplikacji.

    Notatka

    Upewnij się, że dodano classloader.resolve-order jako „parent-first” oraz hadoop.classpath.enable jako true

  3. Wybierz pozycję Agregacja dziennika operacji, aby przesyłać dzienniki zadań do konta magazynowego.

    Zrzut ekranu przedstawiający sposób włączania dziennika zadań.

  4. Możesz zobaczyć, jak zadanie jest uruchomione.

    zrzut ekranu przedstawiający interfejs użytkownika Flink.

Weryfikuj dane przesyłane strumieniowo w usłudze ADLS Gen2

Obserwujemy, jak strumień click_events dociera do ADLS Gen2.

zrzut ekranu przedstawiający dane wyjściowe usługi ADLS Gen2. Zrzut ekranu przedstawiający dane wyjściowe zdarzenia kliknięcia Flink.

Możesz określić zasady rotacyjną, która przekształca plik częściowy będący w toku przy spełnieniu jednego z następujących trzech warunków:

.withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(5))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())

Odniesienie