So verwenden Sie Flink/Delta-Connector
Hinweis
Azure HDInsight on AKS wird am 31. Januar 2025 eingestellt. Vor dem 31. Januar 2025 müssen Sie Ihre Workloads zu Microsoft Fabric oder einem gleichwertigen Azure-Produkt migrieren, um eine abruptes Beendigung Ihrer Workloads zu vermeiden. Die verbleibenden Cluster in Ihrem Abonnement werden beendet und vom Host entfernt.
Bis zum Einstellungsdatum ist nur grundlegende Unterstützung verfügbar.
Wichtig
Diese Funktion steht derzeit als Vorschau zur Verfügung. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure-Vorschauen enthalten weitere rechtliche Bestimmungen, die für Azure-Features in Betaversionen, in Vorschauversionen oder anderen Versionen gelten, die noch nicht allgemein verfügbar gemacht wurden. Informationen zu dieser spezifischen Vorschau finden Sie unter Informationen zur Vorschau von Azure HDInsight on AKS. Bei Fragen oder Funktionsvorschlägen senden Sie eine Anfrage an AskHDInsight mit den entsprechenden Details, und folgen Sie uns für weitere Updates in der Azure HDInsight-Community.
Mithilfe von Apache Flink und Delta Lake können Sie eine zuverlässige und skalierbare Data Lakehouse-Architektur erstellen. Mit dem Flink/Delta-Connector können Sie Daten mit ACID-Transaktionen (Atomicity, Consistency, Isolation, Durability, Atomarität, Konsistenz, Isolation, Dauerhaftigkeit) und Exactly-Once-Verarbeitung in Deltatabellen schreiben. Dies bedeutet, dass Ihre Datenströme konsistent und fehlerfrei sind, auch wenn Sie die Flink-Pipeline von einem Prüfpunkt neu starten. Der Flink/Delta-Connector stellt sicher, dass Ihre Daten nicht verloren gehen oder dupliziert werden und dass sie der Flink-Semantik entsprechen.
In diesem Artikel erfahren Sie, wie Sie den Flink/Delta-Connector verwenden.
- Lesen der Daten aus der Deltatabelle
- Schreiben der Daten in die Deltatabelle
- Abfragen der Daten in Power BI
Was ist Flink/Delta-Connector
Der Flink/Delta-Connector ist eine JVM-Bibliothek zum Lesen und Schreiben von Daten aus Apache Flink-Anwendungen in Deltatabellen mithilfe der eigenständigen JVM-Delta-Bibliothek. Der Connector bietet eine Exactly-Once-Zustellungsgarantie.
Umfang des Flink/Delta-Connectors:
Deltasenke (DeltaSink) zum Schreiben von Daten aus Apache Flink in eine Deltatabelle Deltaquelle (DeltaSource) zum Lesen von Deltatabellen mit Apache Flink
Umfang des Apache Flink/Delta-Connectors:
Abhängig von der Version des Connectors können Sie ihn mit folgenden Apache Flink-Versionen verwenden:
Connector's version Flink's version
0.4.x (Sink Only) 1.12.0 <= X <= 1.14.5
0.5.0 1.13.0 <= X <= 1.13.6
0.6.0 X >= 1.15.3
0.7.0 X >= 1.16.1 --- We use this in Flink 1.17.0
Voraussetzungen
- HDInsight Flink 1.17.0-Cluster auf AKS
- Flink-Delta Connector 0.7.0
- Verwenden von MSI für den Zugriff auf ADLS Gen2
- IntelliJ für die Entwicklung
Lesen von Daten aus einer Deltatabelle
Delta Source kann in einem von zwei Modi funktionieren, wie folgt beschrieben.
Gebundener Modus geeignet für Batchaufträge, bei denen der Inhalt der Delta-Tabelle nur für bestimmte Tabellenversion gelesen werden soll. Erstellen Sie eine Quelle dieses Modus mithilfe der DeltaSource.forBoundedRowData-API.
Kontinuierlicher Modus geeignet für Streamingaufträge, wo wir die Delta-Tabelle kontinuierlich auf neue Änderungen und Versionen überprüfen möchten. Erstellen Sie eine Quelle dieses Modus mithilfe der DeltaSource.forContinuousRowData-API.
Beispiel: Die Quellerstellung für die Delta-Tabelle, um alle Spalten im gebundenen Modus zu lesen. Geeignet für Batchaufträge. In diesem Beispiel wird die neueste Tabellenversion geladen.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the source Delta table path
String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";
// Create a bounded Delta source for all columns
DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new Path(deltaTablePath),
new Configuration())
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
Schreiben in die Deltasenke
Delta Sink macht derzeit die folgenden Flink-Metriken verfügbar:
Senkenerstellung für nicht partitionierte Tabellen
In diesem Beispiel zeigen wir, wie eine DeltaSink erstellt und an eine vorhandene org.apache.flink.streaming.api.datastream.DataStream
angebunden wird.
import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
// Define the sink Delta table path
String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";
// Define the source Delta table path
RowType rowType = RowType.of(
DataTypes.STRING().getLogicalType(), // Date
DataTypes.STRING().getLogicalType(), // Time
DataTypes.STRING().getLogicalType(), // TargetTemp
DataTypes.STRING().getLogicalType(), // ActualTemp
DataTypes.STRING().getLogicalType(), // System
DataTypes.STRING().getLogicalType(), // SystemAge
DataTypes.STRING().getLogicalType() // BuildingID
);
createDeltaSink(deltaStream, deltaTablePath_sink, rowType);
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
new Configuration(),
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
Vollständiger Code
Lesen sie Daten aus einer Delta-Tabelle und sinken Sie in eine andere Delta-Tabelle.
package contoso.example;
import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
public class DeltaSourceExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the sink Delta table path
String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";
// Define the source Delta table path
String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";
// Define the source Delta table path
RowType rowType = RowType.of(
DataTypes.STRING().getLogicalType(), // Date
DataTypes.STRING().getLogicalType(), // Time
DataTypes.STRING().getLogicalType(), // TargetTemp
DataTypes.STRING().getLogicalType(), // ActualTemp
DataTypes.STRING().getLogicalType(), // System
DataTypes.STRING().getLogicalType(), // SystemAge
DataTypes.STRING().getLogicalType() // BuildingID
);
// Create a bounded Delta source for all columns
DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);
createDeltaSink(deltaStream, deltaTablePath_sink, rowType);
// Execute the Flink job
env.execute("Delta datasource and sink Example");
}
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new Path(deltaTablePath),
new Configuration())
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
new Configuration(),
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
}
Maven Pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>contoso.example</groupId>
<artifactId>FlinkDeltaDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<hadoop-version>3.3.4</hadoop-version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-flink</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Packen Sie das Jar und übermitteln Sie es an das Flink-Cluster, um es auszuführen
Übergeben Sie die Informationen der Auftrags-JAR-Datei im AppMode-Cluster.
Hinweis
Aktivieren Sie immer
hadoop.classpath.enable
beim Lesen/Schreiben in ADLS.Übermitteln Sie den Cluster. Sie sollten den Auftrag auf der Flink-Benutzeroberfläche sehen.
Suchen Sie Ergebnisse in ADLS.
Power BI-Integration
Sobald sich die Daten in der Deltasenke befinden, können Sie die Abfrage in Power BI Desktop ausführen und einen Bericht erstellen.
Öffnen Sie Power BI Desktop, um die Daten mithilfe des ADLS Gen2-Connectors abzurufen.
URL des Speicherkontos.
Erstellen Sie die M-Abfrage für die Quelle, und rufen Sie die Funktion auf, die die Daten aus dem Speicherkonto abfragt.
Sobald die Daten verfügbar sind, können Sie Berichte erstellen.
References
- Apache, Apache Flink, Flink und zugehörige Open Source-Projektnamen sind Handelsmarken der Apache Software Foundation (ASF).