Freigeben über


So verwenden Sie Flink/Delta-Connector

Hinweis

Azure HDInsight on AKS wird am 31. Januar 2025 eingestellt. Vor dem 31. Januar 2025 müssen Sie Ihre Workloads zu Microsoft Fabric oder einem gleichwertigen Azure-Produkt migrieren, um eine abruptes Beendigung Ihrer Workloads zu vermeiden. Die verbleibenden Cluster in Ihrem Abonnement werden beendet und vom Host entfernt.

Bis zum Einstellungsdatum ist nur grundlegende Unterstützung verfügbar.

Wichtig

Diese Funktion steht derzeit als Vorschau zur Verfügung. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure-Vorschauen enthalten weitere rechtliche Bestimmungen, die für Azure-Features in Betaversionen, in Vorschauversionen oder anderen Versionen gelten, die noch nicht allgemein verfügbar gemacht wurden. Informationen zu dieser spezifischen Vorschau finden Sie unter Informationen zur Vorschau von Azure HDInsight on AKS. Bei Fragen oder Funktionsvorschlägen senden Sie eine Anfrage an AskHDInsight mit den entsprechenden Details, und folgen Sie uns für weitere Updates in der Azure HDInsight-Community.

Mithilfe von Apache Flink und Delta Lake können Sie eine zuverlässige und skalierbare Data Lakehouse-Architektur erstellen. Mit dem Flink/Delta-Connector können Sie Daten mit ACID-Transaktionen (Atomicity, Consistency, Isolation, Durability, Atomarität, Konsistenz, Isolation, Dauerhaftigkeit) und Exactly-Once-Verarbeitung in Deltatabellen schreiben. Dies bedeutet, dass Ihre Datenströme konsistent und fehlerfrei sind, auch wenn Sie die Flink-Pipeline von einem Prüfpunkt neu starten. Der Flink/Delta-Connector stellt sicher, dass Ihre Daten nicht verloren gehen oder dupliziert werden und dass sie der Flink-Semantik entsprechen.

In diesem Artikel erfahren Sie, wie Sie den Flink/Delta-Connector verwenden.

  • Lesen der Daten aus der Deltatabelle
  • Schreiben der Daten in die Deltatabelle
  • Abfragen der Daten in Power BI

Was ist Flink/Delta-Connector

Der Flink/Delta-Connector ist eine JVM-Bibliothek zum Lesen und Schreiben von Daten aus Apache Flink-Anwendungen in Deltatabellen mithilfe der eigenständigen JVM-Delta-Bibliothek. Der Connector bietet eine Exactly-Once-Zustellungsgarantie.

Umfang des Flink/Delta-Connectors:

Deltasenke (DeltaSink) zum Schreiben von Daten aus Apache Flink in eine Deltatabelle Deltaquelle (DeltaSource) zum Lesen von Deltatabellen mit Apache Flink

Umfang des Apache Flink/Delta-Connectors:

Abhängig von der Version des Connectors können Sie ihn mit folgenden Apache Flink-Versionen verwenden:

Connector's version	    Flink's version
0.4.x (Sink Only)	    1.12.0 <= X <= 1.14.5
0.5.0	                1.13.0 <= X <= 1.13.6
0.6.0	                X >= 1.15.3 
0.7.0	                X >= 1.16.1         --- We use this in Flink 1.17.0

Voraussetzungen

  • HDInsight Flink 1.17.0-Cluster auf AKS
  • Flink-Delta Connector 0.7.0
  • Verwenden von MSI für den Zugriff auf ADLS Gen2
  • IntelliJ für die Entwicklung

Lesen von Daten aus einer Deltatabelle

Delta Source kann in einem von zwei Modi funktionieren, wie folgt beschrieben.

  • Gebundener Modus geeignet für Batchaufträge, bei denen der Inhalt der Delta-Tabelle nur für bestimmte Tabellenversion gelesen werden soll. Erstellen Sie eine Quelle dieses Modus mithilfe der DeltaSource.forBoundedRowData-API.

  • Kontinuierlicher Modus geeignet für Streamingaufträge, wo wir die Delta-Tabelle kontinuierlich auf neue Änderungen und Versionen überprüfen möchten. Erstellen Sie eine Quelle dieses Modus mithilfe der DeltaSource.forContinuousRowData-API.

Beispiel: Die Quellerstellung für die Delta-Tabelle, um alle Spalten im gebundenen Modus zu lesen. Geeignet für Batchaufträge. In diesem Beispiel wird die neueste Tabellenversion geladen.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

Schreiben in die Deltasenke

Delta Sink macht derzeit die folgenden Flink-Metriken verfügbar:

Screenshot der Tabelle für Flink-Metriken.

Senkenerstellung für nicht partitionierte Tabellen

In diesem Beispiel zeigen wir, wie eine DeltaSink erstellt und an eine vorhandene org.apache.flink.streaming.api.datastream.DataStream angebunden wird.

import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

       createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }

Vollständiger Code

Lesen sie Daten aus einer Delta-Tabelle und sinken Sie in eine andere Delta-Tabelle.

package contoso.example;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class DeltaSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

        createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

        // Execute the Flink job
        env.execute("Delta datasource and sink Example");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

    public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }
}

Maven Pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>FlinkDeltaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop-version>3.3.4</hadoop-version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-standalone_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-flink</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  1. Laden Sie die JAR-Datei in ABFS hoch. Screenshot der JAR-Dateien im App-Modus.

  2. Übergeben Sie die Informationen der Auftrags-JAR-Datei im AppMode-Cluster.

    Screenshot der Clusterkonfiguration.

    Hinweis

    Aktivieren Sie immer hadoop.classpath.enable beim Lesen/Schreiben in ADLS.

  3. Übermitteln Sie den Cluster. Sie sollten den Auftrag auf der Flink-Benutzeroberfläche sehen.

    Screenshot des Flink-Dashboards.

  4. Suchen Sie Ergebnisse in ADLS.

    Screenshot der Ausgabe.

Power BI-Integration

Sobald sich die Daten in der Deltasenke befinden, können Sie die Abfrage in Power BI Desktop ausführen und einen Bericht erstellen.

  1. Öffnen Sie Power BI Desktop, um die Daten mithilfe des ADLS Gen2-Connectors abzurufen.

    Screenshot zeigt Power BI-Desktop.

    Screenshot des ADLSGen 2-Connectors.

  2. URL des Speicherkontos.

    Screenshot der URL des Speicherkontos.

    Screenshot zeigt ADLS Gen2-Details.

  3. Erstellen Sie die M-Abfrage für die Quelle, und rufen Sie die Funktion auf, die die Daten aus dem Speicherkonto abfragt.

  4. Sobald die Daten verfügbar sind, können Sie Berichte erstellen.

    Screenshot der Erstellung eines neuen Berichts.

References