Freigeben über


Verbindung von Apache Flink® auf HDInsight auf AKS mit Azure Event Hubs für Apache Kafka®

Wichtig

Azure HDInsight auf AKS wurde am 31. Januar 2025 eingestellt. Erfahren Sie mehr über mit dieser Ankündigung.

Sie müssen Ihre Workloads zu Microsoft Fabric oder ein gleichwertiges Azure-Produkt migrieren, um eine abrupte Beendigung Ihrer Workloads zu vermeiden.

Wichtig

Dieses Feature befindet sich derzeit in der Vorschau. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure Previews weitere rechtliche Bestimmungen enthalten, die für Azure-Features gelten, die in der Betaversion, in der Vorschau oder auf andere Weise noch nicht in die allgemeine Verfügbarkeit veröffentlicht werden. Informationen zu dieser spezifischen Vorschau finden Sie unter Azure HDInsight auf AKS-Vorschauinformationen. Für Fragen oder Featurevorschläge senden Sie bitte eine Anfrage auf AskHDInsight mit den Details und folgen Sie uns, um weitere Updates von Azure HDInsight Communityzu erhalten.

Ein bekannter Anwendungsfall für Apache Flink ist Stream Analytics. Die beliebte Wahl vieler Benutzer, die Datenströme zu verwenden, die mit Apache Kafka aufgenommen werden. Typische Installationen von Flink und Kafka beginnen mit Ereignisströmen, die an Kafka gesendet werden und von Flink-Aufträgen genutzt werden können. Azure Event Hubs stellt einen Apache Kafka-Endpunkt auf einem Event Hub bereit, mit dem Benutzer mithilfe des Kafka-Protokolls eine Verbindung mit dem Event Hub herstellen können.

In diesem Artikel wird erläutert, wie Sie Azure Event Hubs mit Apache Flink auf HDInsight auf AKS verbinden und die folgenden Themen abdecken:

  • Erstellen eines Event Hubs-Namespaces
  • Erstellen Sie ein HDInsight auf einem AKS-Cluster mit Apache Flink.
  • Flink-Produzent ausführen
  • Package Jar für Apache Flink
  • Validierung der Auftragsabgabe &

Erstellen von Event Hubs-Namespaces und Event Hubs

  1. Um Event Hubs-Namespaces und Event Hubs zu erstellen, siehe hier

    Screenshot mit dem Setup von Event Hubs.

  1. Mit vorhandenem HDInsight im AKS-Clusterpool können Sie einen Flink-Cluster erstellen

  2. Führen Sie den Flink-Produzenten aus, der die bootstrap.servers und die producer.config Informationen hinzufügt.

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. Ersetzen Sie {YOUR.EVENTHUBS.CONNECTION.STRING} durch die Verbindungszeichenfolge für den Event Hubs-Namespace. Eine Anleitung zum Abrufen der Verbindungszeichenfolge finden Sie unter den Details zum Abrufen einer Event Hubs-Verbindungszeichenfolge.

    Zum Beispiel

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. Package com.example.app

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. Fügen Sie den Codeausschnitt hinzu, um den Flink Producer auszuführen.

    Screenshot, der zeigt, wie Flink in Event Hubs getestet wird.

  3. Sobald der Code ausgeführt wird, werden die Ereignisse im Topic "topic1" gespeichert.

    Screenshot mit im Thema abgelegten Event Hubs.

Referenz