Partager via


Utilisation du connecteur Flink/Delta

Important

Azure HDInsight sur AKS a été mis hors service le 31 janvier 2025. En savoir plus grâce à cette annonce.

Vous devez migrer vos charges de travail vers Microsoft Fabric ou un produit Azure équivalent pour éviter l’arrêt brusque de vos charges de travail.

Important

Cette fonctionnalité est actuellement en préversion. Les Conditions d’utilisation supplémentaires pour les préversions Microsoft Azure incluent des termes juridiques supplémentaires qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou qui ne sont pas encore publiées en disponibilité générale. Pour plus d’informations sur cette préversion spécifique, consultez informations sur Azure HDInsight sur AKS en préversion. Pour des questions ou des suggestions de fonctionnalités, envoyez une demande sur AskHDInsight avec les détails et suivez-nous pour plus de mises à jour sur Communauté Azure HDInsight.

En utilisant Apache Flink et Delta Lake ensemble, vous pouvez créer une architecture data lakehouse fiable et évolutive. Le connecteur Flink/Delta vous permet d’écrire des données dans des tables Delta avec les transactions ACID et un traitement exactement une fois. Cela signifie que vos flux de données sont cohérents et sans erreur, même si vous redémarrez votre pipeline Flink à partir d’un point de contrôle. Le connecteur Flink/Delta garantit que vos données ne sont pas perdues ou dupliquées, et qu’elles correspondent à la sémantique Flink.

Dans cet article, vous allez apprendre à utiliser le connecteur Flink-Delta.

  • Lisez les données de la table delta.
  • Écrivez les données dans une table delta.
  • Interrogez-le dans Power BI.

Présentation du connecteur Flink/Delta

Flink/Delta Connector est une bibliothèque JVM pour lire et écrire des données à partir d’applications Apache Flink vers des tables Delta utilisant la bibliothèque JVM autonome Delta. Le connecteur fournit exactement une fois les garanties de remise.

Flink/Delta Connector inclut :

DeltaSink pour écrire des données d’Apache Flink vers une table Delta. DeltaSource pour lire des tables Delta à l’aide d’Apache Flink.

Apache Flink-Delta Connector inclut :

Selon la version du connecteur, vous pouvez l’utiliser avec les versions Apache Flink suivantes :

Connector's version	    Flink's version
0.4.x (Sink Only)	    1.12.0 <= X <= 1.14.5
0.5.0	                1.13.0 <= X <= 1.13.6
0.6.0	                X >= 1.15.3 
0.7.0	                X >= 1.16.1         --- We use this in Flink 1.17.0

Conditions préalables

  • Cluster HDInsight Flink 1.17.0 sur AKS
  • Connecteur Flink-Delta 0.7.0
  • Utiliser MSI pour accéder à ADLS Gen2
  • IntelliJ pour le développement

Lire des données à partir d’une table delta

La source Delta peut fonctionner dans l’un des deux modes décrits comme suit.

  • Mode limité adapté aux travaux par lots, où nous voulons lire le contenu de la table Delta uniquement pour une version spécifique de la table. Créez une source de ce mode à l’aide de l’API DeltaSource.forBoundedRowData.

  • Mode continu adapté aux travaux de diffusion en continu, où nous voulons vérifier en permanence la table Delta pour les nouvelles modifications et versions. Créez une source de ce mode à l’aide de l’API DeltaSource.forContinuousRowData.

Exemple : Création de source pour la table Delta, pour lire toutes les colonnes en mode limité. Convient pour les travaux par lots. Cet exemple charge la dernière version de la table.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

Écriture dans le puits Delta

Delta Sink expose actuellement les métriques Flink suivantes :

Capture d’écran montrant le tableau des métriques Flink.

Création d’un récepteur pour les tables nonpartitionées

Dans cet exemple, nous montrons comment créer un DeltaSink et le brancher à un org.apache.flink.streaming.api.datastream.DataStreamexistant.

import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

       createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }

Code complet

Lecture des données d'une table delta et écriture vers une autre table delta.

package contoso.example;

import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;

public class DeltaSourceExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the sink Delta table path
        String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";

        // Define the source Delta table path
        String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";

        // Define the source Delta table path
        RowType rowType = RowType.of(
                DataTypes.STRING().getLogicalType(),  // Date
                DataTypes.STRING().getLogicalType(),  // Time
                DataTypes.STRING().getLogicalType(),  // TargetTemp
                DataTypes.STRING().getLogicalType(),  // ActualTemp
                DataTypes.STRING().getLogicalType(),  // System
                DataTypes.STRING().getLogicalType(),  // SystemAge
                DataTypes.STRING().getLogicalType()   // BuildingID
        );

        // Create a bounded Delta source for all columns
        DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);

        createDeltaSink(deltaStream, deltaTablePath_sink, rowType);

        // Execute the Flink job
        env.execute("Delta datasource and sink Example");
    }

    public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
            StreamExecutionEnvironment env,
            String deltaTablePath) {

        DeltaSource<RowData> deltaSource = DeltaSource
                .forBoundedRowData(
                        new Path(deltaTablePath),
                        new Configuration())
                .build();

        return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
    }

    public static DataStream<RowData> createDeltaSink(
            DataStream<RowData> stream,
            String deltaTablePath,
            RowType rowType) {
        DeltaSink<RowData> deltaSink = DeltaSink
                .forRowData(
                        new Path(deltaTablePath),
                        new Configuration(),
                        rowType)
                .build();
        stream.sinkTo(deltaSink);
        return stream;
    }
}

Maven Pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>contoso.example</groupId>
    <artifactId>FlinkDeltaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop-version>3.3.4</hadoop-version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-standalone_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-flink</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  1. Chargez le fichier jar dans ABFS. Capture d’écran montrant les fichiers jar en mode Application.

  2. Transmettez les informations du fichier jar de la tâche dans le cluster AppMode.

    capture d’écran montrant la configuration du cluster.

    Remarque

    Activez toujours hadoop.classpath.enable lors de la lecture/écriture dans ADLS.

  3. Soumettez le cluster, et vous devriez être en mesure de voir la tâche dans l’interface utilisateur Flink.

    Capture d’écran montrant le tableau de bord Flink.

  4. Recherchez les résultats dans ADLS.

    Capture d’écran montrant la sortie.

Intégration de Power BI

Une fois que les données se trouvent dans le récepteur delta, vous pouvez exécuter la requête dans Power BI Desktop et créer un rapport.

  1. Ouvrez Power BI Desktop pour obtenir les données à l’aide du connecteur ADLS Gen2.

    Capture d’écran montrant Power BI Desktop.

    Capture d’écran montrant le connecteur ADLSGen 2.

  2. URL du compte de stockage.

    Capture d’écran montrant l’URL du compte de stockage.

    Capture d’écran montrant les détails d’ADLS Gen2.

  3. Créez une requête M pour la source et appelez la fonction, qui interroge les données à partir du compte de stockage.

  4. Une fois les données facilement disponibles, vous pouvez créer des rapports.

    Capture d’écran montrant comment créer des rapports.

Références