Jak używać łącznika Flink/Delta
Uwaga
Wycofamy usługę Azure HDInsight w usłudze AKS 31 stycznia 2025 r. Przed 31 stycznia 2025 r. należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure, aby uniknąć nagłego zakończenia obciążeń. Pozostałe klastry w ramach subskrypcji zostaną zatrzymane i usunięte z hosta.
Tylko podstawowa pomoc techniczna będzie dostępna do daty wycofania.
Ważne
Ta funkcja jest aktualnie dostępna jako funkcja podglądu. Dodatkowe warunki użytkowania dla wersji zapoznawczych platformy Microsoft Azure obejmują więcej warunków prawnych, które dotyczą funkcji platformy Azure, które znajdują się w wersji beta, w wersji zapoznawczej lub w inny sposób nie zostały jeszcze wydane w wersji ogólnodostępnej. Aby uzyskać informacje o tej konkretnej wersji zapoznawczej, zobacz Informacje o wersji zapoznawczej usługi Azure HDInsight w usłudze AKS. W przypadku pytań lub sugestii dotyczących funkcji prześlij żądanie w usłudze AskHDInsight , aby uzyskać szczegółowe informacje i postępuj zgodnie z nami, aby uzyskać więcej aktualizacji w społeczności usługi Azure HDInsight.
Korzystając ze sobą usług Apache Flink i Delta Lake, można utworzyć niezawodną i skalowalną architekturę usługi Data Lakehouse. Łącznik Flink/Delta umożliwia zapisywanie danych w tabelach delty przy użyciu transakcji ACID i dokładnie raz przetwarzania. Oznacza to, że strumienie danych są spójne i wolne od błędów, nawet jeśli ponownie uruchomisz potok Flink z punktu kontrolnego. Łącznik Flink/Delta zapewnia, że dane nie zostaną utracone ani zduplikowane oraz że są zgodne z semantykami Flink.
Z tego artykułu dowiesz się, jak używać łącznika Flink-Delta.
- Odczytywanie danych z tabeli delty.
- Zapisywanie danych w tabeli delty.
- Wykonaj zapytanie w usłudze Power BI.
Co to jest łącznik Flink/Delta
Flink/Delta Connector to biblioteka JVM do odczytywania i zapisywania danych z aplikacji Apache Flink do tabel delty korzystających z biblioteki Delta Standalone JVM. Łącznik zapewnia dokładnie jednokrotne gwarancje dostarczania.
Łącznik Flink/Delta obejmuje następujące elementy:
DeltaSink do zapisywania danych z platformy Apache Flink do tabeli delty. Usługa DeltaSource do odczytywania tabel delty przy użyciu narzędzia Apache Flink.
Łącznik apache Flink-Delta obejmuje:
W zależności od wersji łącznika można go używać z następującymi wersjami narzędzia Apache Flink:
Connector's version Flink's version
0.4.x (Sink Only) 1.12.0 <= X <= 1.14.5
0.5.0 1.13.0 <= X <= 1.13.6
0.6.0 X >= 1.15.3
0.7.0 X >= 1.16.1 --- We use this in Flink 1.17.0
Wymagania wstępne
- Klaster usługi HDInsight Flink 1.17.0 w usłudze AKS
- Łącznik Flink-Delta Connector 0.7.0
- Uzyskiwanie dostępu do usługi ADLS Gen2 przy użyciu tożsamości usługi ZARZĄDZANEj
- IntelliJ na potrzeby programowania
Odczytywanie danych z tabeli delty
Źródło różnicowe może działać w jednym z dwóch trybów opisanych w następujący sposób.
Tryb ograniczony odpowiedni dla zadań wsadowych, w którym chcemy odczytywać zawartość tabeli delty tylko dla określonej wersji tabeli. Utwórz źródło tego trybu przy użyciu interfejsu API DeltaSource.forBoundedRowData.
Tryb ciągły odpowiedni dla zadań przesyłania strumieniowego, w którym chcemy stale sprawdzać tabelę delty pod kątem nowych zmian i wersji. Utwórz źródło tego trybu przy użyciu interfejsu API DeltaSource.forContinuousRowData.
Przykład: Tworzenie źródła dla tabeli delty w celu odczytania wszystkich kolumn w trybie ograniczonym. Nadaje się do zadań wsadowych. W tym przykładzie ładuje najnowszą wersję tabeli.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the source Delta table path
String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";
// Create a bounded Delta source for all columns
DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new Path(deltaTablePath),
new Configuration())
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
Zapisywanie w ujściu delty
Ujście delty uwidacznia obecnie następujące metryki linku Flink:
Tworzenie ujścia dla tabel niepartycyjnych
W tym przykładzie pokazano, jak utworzyć obiekt DeltaSink i podłączyć go do istniejącego org.apache.flink.streaming.api.datastream.DataStream
elementu .
import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
// Define the sink Delta table path
String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";
// Define the source Delta table path
RowType rowType = RowType.of(
DataTypes.STRING().getLogicalType(), // Date
DataTypes.STRING().getLogicalType(), // Time
DataTypes.STRING().getLogicalType(), // TargetTemp
DataTypes.STRING().getLogicalType(), // ActualTemp
DataTypes.STRING().getLogicalType(), // System
DataTypes.STRING().getLogicalType(), // SystemAge
DataTypes.STRING().getLogicalType() // BuildingID
);
createDeltaSink(deltaStream, deltaTablePath_sink, rowType);
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
new Configuration(),
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
Pełny kod
Odczytywanie danych z tabeli delty i ujście do innej tabeli różnicowej.
package contoso.example;
import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
public class DeltaSourceExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the sink Delta table path
String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";
// Define the source Delta table path
String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";
// Define the source Delta table path
RowType rowType = RowType.of(
DataTypes.STRING().getLogicalType(), // Date
DataTypes.STRING().getLogicalType(), // Time
DataTypes.STRING().getLogicalType(), // TargetTemp
DataTypes.STRING().getLogicalType(), // ActualTemp
DataTypes.STRING().getLogicalType(), // System
DataTypes.STRING().getLogicalType(), // SystemAge
DataTypes.STRING().getLogicalType() // BuildingID
);
// Create a bounded Delta source for all columns
DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);
createDeltaSink(deltaStream, deltaTablePath_sink, rowType);
// Execute the Flink job
env.execute("Delta datasource and sink Example");
}
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new Path(deltaTablePath),
new Configuration())
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
new Configuration(),
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
}
Maven Pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>contoso.example</groupId>
<artifactId>FlinkDeltaDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<hadoop-version>3.3.4</hadoop-version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-flink</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Spakuj plik jar i prześlij go do klastra Flink w celu uruchomienia
Przekaż informacje o pliku jar zadania w klastrze AppMode.
Uwaga
Zawsze włączaj
hadoop.classpath.enable
podczas odczytu/zapisu w usłudze ADLS.Prześlij klaster. Powinno być możliwe wyświetlenie zadania w interfejsie użytkownika Flink.
Znajdź wyniki w usłudze ADLS.
Integracja usługi Power BI
Gdy dane są w ujściu różnicowym, możesz uruchomić zapytanie w programie Power BI Desktop i utworzyć raport.
Otwórz program Power BI Desktop, aby pobrać dane przy użyciu łącznika usługi ADLS Gen2.
Adres URL konta magazynu.
Utwórz zapytanie M dla źródła i wywołaj funkcję, która wysyła zapytania do danych z konta magazynu.
Gdy dane będą łatwo dostępne, możesz tworzyć raporty.
Informacje
- Nazwy projektów apache, Apache Flink, Flink i skojarzone z nimi są znakami towarowymi programu Apache Software Foundation (ASF).