Dela via


Så här använder du Flink/Delta Connector

Kommentar

Vi drar tillbaka Azure HDInsight på AKS den 31 januari 2025. Före den 31 januari 2025 måste du migrera dina arbetsbelastningar till Microsoft Fabric eller en motsvarande Azure-produkt för att undvika plötsliga uppsägningar av dina arbetsbelastningar. Återstående kluster i din prenumeration stoppas och tas bort från värden.

Endast grundläggande stöd kommer att vara tillgängligt fram till datumet för pensionering.

Viktigt!

Den här funktionen finns i förhandsgranskning. De kompletterande användningsvillkoren för Förhandsversioner av Microsoft Azure innehåller fler juridiska villkor som gäller för Azure-funktioner som är i betaversion, förhandsversion eller på annat sätt ännu inte har släppts i allmän tillgänglighet. Information om den här specifika förhandsversionen finns i Azure HDInsight på AKS-förhandsversionsinformation. Om du vill ha frågor eller funktionsförslag skickar du en begäran på AskHDInsight med informationen och följer oss för fler uppdateringar i Azure HDInsight Community.

Genom att använda Apache Flink och Delta Lake tillsammans kan du skapa en tillförlitlig och skalbar data lakehouse-arkitektur. Med Flink/Delta Connector kan du skriva data till Delta-tabeller med ACID-transaktioner och exakt en gång bearbetning. Det innebär att dina dataströmmar är konsekventa och felfria, även om du startar om Flink-pipelinen från en kontrollpunkt. Flink/Delta Connector säkerställer att dina data inte tappas bort eller dupliceras och att de matchar Flink-semantiken.

I den här artikeln får du lära dig hur du använder Flink-Delta-anslutningsappen.

  • Läs data från deltatabellen.
  • Skriv data till en deltatabell.
  • Fråga den i Power BI.

Vad är Flink/Delta-anslutningsprogram

Flink/Delta Connector är ett JVM-bibliotek för att läsa och skriva data från Apache Flink-program till Delta-tabeller med hjälp av Delta Standalone JVM-biblioteket. Anslutningsappen ger leveransgarantier exakt en gång.

Flink/Delta Connector innehåller:

DeltaSink för att skriva data från Apache Flink till en Delta-tabell. DeltaSource för att läsa Delta-tabeller med Apache Flink.

Apache Flink-Delta Connector innehåller:

Beroende på vilken version av anslutningsappen du kan använda med följande Apache Flink-versioner:

Connector's version	    Flink's version
0.4.x (Sink Only)	    1.12.0 <= X <= 1.14.5
0.5.0	                1.13.0 <= X <= 1.13.6
0.6.0	                X >= 1.15.3 
0.7.0	                X >= 1.16.1         --- We use this in Flink 1.17.0

Förutsättningar

  • HDInsight Flink 1.17.0-kluster på AKS
  • Flink-Delta Connector 0.7.0
  • Använda MSI för att komma åt ADLS Gen2
  • IntelliJ för utveckling

Läsa data från deltatabell

Delta Source kan fungera i något av två lägen, vilket beskrivs på följande sätt.

  • Begränsat läge Lämpligt för batchjobb, där vi endast vill läsa innehållet i Delta-tabellen för specifik tabellversion. Skapa en källa för det här läget med hjälp av API:et DeltaSource.forBoundedRowData.

  • Kontinuerligt läge Lämpligt för direktuppspelningsjobb, där vi kontinuerligt vill kontrollera Delta-tabellen för nya ändringar och versioner. Skapa en källa för det här läget med hjälp av API:et DeltaSource.forContinuousRowData.

Exempel: Skapa källa för Delta-tabellen för att läsa alla kolumner i begränsat läge. Lämplig för batchjobb. Det här exemplet läser in den senaste tabellversionen.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

Skriva till Delta-mottagare

Delta Sink exponerar för närvarande följande Flink-mått:

Skärmbild som visar tabellen för Flink-mått.

Skapa mottagare för icke-partitionerade tabeller

I det här exemplet visar vi hur du skapar en DeltaSink och ansluter den till en befintlig org.apache.flink.streaming.api.datastream.DataStream.

import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

       createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }

Fullständig kod

Läsa data från en deltatabell och mottagare till en annan deltatabell.

package contoso.example;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class DeltaSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

        createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

        // Execute the Flink job
        env.execute("Delta datasource and sink Example");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

    public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }
}

Maven Pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>FlinkDeltaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop-version>3.3.4</hadoop-version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-standalone_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-flink</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  1. Ladda upp jar-filen till ABFS. Skärmbild som visar jar-filer i appläge.

  2. Skicka information om jobbburken i AppMode-klustret.

    Skärmbild som visar klusterkonfiguration.

    Kommentar

    Aktivera hadoop.classpath.enable alltid när du läser/skriver till ADLS.

  3. Skicka klustret. Du bör kunna se jobbet i Flink-användargränssnittet.

    Skärmbild som visar Flink-instrumentpanelen.

  4. Hitta resultat i ADLS.

    Skärmbild som visar utdata.

Power BI-integrering

När data finns i deltamottagaren kan du köra frågan i Power BI Desktop och skapa en rapport.

  1. Öppna Power BI Desktop för att hämta data med hjälp av ADLS Gen2-anslutningsappen.

    Skärmbild som visar Power BI Desktop.

    Skärmbild som visar anslutningsappen ADLSGen 2.

  2. URL för lagringskontot.

    Skärmbild som visar URL:en för lagringskontot.

    Skärmbild som visar ADLS Gen2-information.

  3. Skapa M-fråga för källan och anropa funktionen, som frågar efter data från lagringskontot.

  4. När data är lättillgängliga kan du skapa rapporter.

    Skärmbild som visar hur du skapar rapporter.

Referenser

  • Apache, Apache Flink, Flink och associerade öppen källkod projektnamn är varumärken som tillhör Apache Software Foundation (ASF).