Verbinden von Apache Flink® auf HDInsight auf AKS mit Azure Event Hubs für Apache Kafka®
Hinweis
Azure HDInsight on AKS wird am 31. Januar 2025 eingestellt. Vor dem 31. Januar 2025 müssen Sie Ihre Workloads zu Microsoft Fabric oder einem gleichwertigen Azure-Produkt migrieren, um eine abruptes Beendigung Ihrer Workloads zu vermeiden. Die verbleibenden Cluster in Ihrem Abonnement werden beendet und vom Host entfernt.
Bis zum Einstellungsdatum ist nur grundlegende Unterstützung verfügbar.
Wichtig
Diese Funktion steht derzeit als Vorschau zur Verfügung. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure-Vorschauen enthalten weitere rechtliche Bestimmungen, die für Azure-Features in Betaversionen, in Vorschauversionen oder anderen Versionen gelten, die noch nicht allgemein verfügbar gemacht wurden. Informationen zu dieser spezifischen Vorschau finden Sie unter Informationen zur Vorschau von Azure HDInsight on AKS. Bei Fragen oder Funktionsvorschlägen senden Sie eine Anfrage an AskHDInsight mit den entsprechenden Details, und folgen Sie uns für weitere Updates in der Azure HDInsight-Community.
Ein bekannter Anwendungsfall für Apache Flink ist Stream Analytics. Dies ist eine beliebte Wahl vieler Benutzer*innen, um die Datenströme zu verwenden, die mit Apache Kafka erfasst werden. Typische Installationen von Flink und Kafka beginnen mit Ereignisstreams, die an Kafka übertragen werden und von Flink-Aufträgen genutzt werden können. Azure Event Hubs stellt einen Apache Kafka-Endpunkt auf einem Event Hub bereit, mit dem Benutzer mithilfe des Kafka-Protokolls eine Verbindung mit dem Event Hub herstellen können.
In diesem Artikel erkunden wir, wie man Azure Event Hubs mit Apache Flink auf HDInsight auf AKS verbindet und es wird Folgendes behandelt
- Erstellen eines Event Hubs-Namespace
- Erstellen eines HDInsight on AKS-Clusters mit Apache Flink
- Ausführen des Flink-Producers
- Packen der JAR-Datei für Apache Flink
- Auftragsübermittlung und Überprüfung
Erstellen eines Azure Event Hubs-Namespace und einer Event Hubs-Instanz
Informationen zum Erstellen eines Event Hubs-Namespace und einer Event Hubs-Instanz finden Sie hier.
Einrichten eines Flink-Clusters in HDInsight on AKS
Mithilfe eines vorhandenen HDInsight on AKS-Clusterpools können Sie einen Flink-Cluster erstellen.
Führen Sie den Flink-Producer aus, der bootstrap.servers und die
producer.config
-Informationen hinzufügt.bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
Ersetzen Sie
{YOUR.EVENTHUBS.CONNECTION.STRING}
durch die Verbindungszeichenfolge für Ihren Event Hubs-Namespace. Detaillierte Anweisungen zum Abrufen der Verbindungszeichenfolge finden Sie unter Abrufen einer Event Hubs-Verbindungszeichenfolge.Ein auf ein Objekt angewendeter
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Packen der JAR-Datei für Flink
Packen Sie „com.example.app“:
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
Fügen Sie den Codeschnipsel hinzu, um den Flink-Producer auszuführen:
Nach dem Ausführen des Codes werden die Ereignisse im Thema topic1 gespeichert.
Verweis
- Apache Flink-Website
- Apache, Apache Kafka, Kafka, Apache Flink, Flink und zugehörige Open Source-Projektnamen sind Handelsmarken der Apache Software Foundation (ASF).