Dela via


Så här använder du Flink/Delta Connector

Viktig

Azure HDInsight på AKS drogs tillbaka den 31 januari 2025. Läs mer med det här meddelandet.

Du måste migrera dina arbetsbelastningar till Microsoft Fabric- eller en motsvarande Azure-produkt för att undvika plötsliga uppsägningar av dina arbetsbelastningar.

Viktig

Den här funktionen är för närvarande i förhandsversion. De kompletterande användningsvillkoren för Förhandsversioner av Microsoft Azure innehåller fler juridiska villkor som gäller för Azure-funktioner som är i betaversion, förhandsversion eller på annat sätt ännu inte har släppts till allmän tillgänglighet. Information om den här specifika förhandsversionen finns i Azure HDInsight på AKS-förhandsversionsinformation. För frågor eller funktionsförslag, vänligen skicka en begäran på AskHDInsight med detaljerna samt följ oss för fler uppdateringar från Azure HDInsight Community.

Genom att använda Apache Flink och Delta Lake tillsammans kan du skapa en tillförlitlig och skalbar data lakehouse-arkitektur. Med Flink/Delta Connector kan du skriva data till Delta-tabeller med ACID-transaktioner och exakt en gångs bearbetning. Det innebär att dina dataströmmar är konsekventa och felfria, även om du startar om Flink-pipelinen från en kontrollpunkt. Flink/Delta Connector säkerställer att dina data inte tappas bort eller dupliceras och att de matchar Flink-semantiken.

I den här artikeln får du lära dig hur du använder Flink-Delta-kontakten.

  • Läs data från deltatabellen.
  • Skriv data till en deltatabell.
  • Sök i Power BI.

Vad är Flink/Delta-koppling

Flink/Delta Connector är ett JVM-bibliotek för att läsa och skriva data från Apache Flink-program till Delta-tabeller med hjälp av Delta Standalone JVM-biblioteket. Anslutningen ger garantier för exakt en leverans.

Flink/Delta Connector innehåller:

DeltaSink för att skriva data från Apache Flink till en Delta-tabell. DeltaSource för att läsa Delta-tabeller med Apache Flink.

Apache Flink-Delta Connector innehåller:

Beroende på vilken version av anslutningen du har kan du använda den med följande versioner av Apache Flink:

Connector's version	    Flink's version
0.4.x (Sink Only)	    1.12.0 <= X <= 1.14.5
0.5.0	                1.13.0 <= X <= 1.13.6
0.6.0	                X >= 1.15.3 
0.7.0	                X >= 1.16.1         --- We use this in Flink 1.17.0

Förutsättningar

  • HDInsight Flink 1.17.0-kluster på AKS
  • Flink-Delta Connector 0.7.0
  • Använda MSI för att komma åt ADLS Gen2
  • IntelliJ för utveckling

Läser data från deltatabellen

Delta Source kan fungera i något av två lägen, vilket beskrivs på följande sätt.

  • Begränsat läge Lämpligt för batchjobb, där vi endast vill läsa innehållet i Delta-tabellen för specifik tabellversion. Skapa en källa för det här läget med hjälp av API:et DeltaSource.forBoundedRowData.

  • Kontinuerligt läge Lämpligt för direktuppspelningsjobb, där vi kontinuerligt vill kontrollera Delta-tabellen för nya ändringar och versioner. Skapa en källa för det här läget med hjälp av API:et DeltaSource.forContinuousRowData.

Exempel: Skapa källa för Delta-tabellen för att läsa alla kolumner i begränsat läge. Lämplig för batchjobb. Det här exemplet läser in den senaste tabellversionen.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

Skriva till Delta-sänk

Delta Sink exponerar för närvarande följande Flink-mått:

Skärmbild som visar tabellen för Flink-mått.

Skapande av sink för icke-partitionerade tabeller

I det här exemplet visar vi hur du skapar en DeltaSink och kopplar den till en befintlig org.apache.flink.streaming.api.datastream.DataStream.

import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

       createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }

Fullständig kod

Läs data från en deltatabell och skriv till en annan deltatabell.

package contoso.example;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class DeltaSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

        createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

        // Execute the Flink job
        env.execute("Delta datasource and sink Example");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

    public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }
}

Maven Pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>FlinkDeltaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop-version>3.3.4</hadoop-version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-standalone_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-flink</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  1. Ladda upp jar-filen till ABFS. Skärmbild som visar jar-filer i appläge.

  2. Skicka jobbinformationen i AppMode-klustret.

    Skärmbild som visar klusterkonfiguration.

    Not

    Aktivera alltid hadoop.classpath.enable när du läser/skriver till ADLS.

  3. Skicka klustret. Du bör kunna se jobbet i Flink-användargränssnittet.

    Skärmbild som visar Flink-instrumentpanelen.

  4. Hitta resultat i ADLS.

    Skärmbild som visar utdata.

Power BI-integrering

När data finns i delta-samlingen kan du köra sökfrågan i Power BI Desktop och skapa en rapport.

  1. Öppna Power BI Desktop för att hämta data med hjälp av ADLS Gen2-anslutningsappen.

    Skärmbild som visar Power BI Desktop.

    Skärmbild som visar ADLSGen 2-anslutningsappen.

  2. URL för lagringskontot.

    Skärmbild som visar URL:en för lagringskontot.

    Skärmbild som visar ADLS Gen2-information.

  3. Skapa M-fråga för källan och anropa funktionen, som frågar efter data från lagringskontot.

  4. När data är lättillgängliga kan du skapa rapporter.

    Skärmbild som visar hur du skapar rapporter.

Referenser