Łączenie Apache Flink® w usłudze HDInsight w AKS z Azure Event Hubs dla Apache Kafka®
Ważny
Usługa Azure HDInsight w usłudze AKS została wycofana 31 stycznia 2025 r. Dowiedz się więcej przy pomocy tego ogłoszenia.
Aby uniknąć nagłego kończenia obciążeń, należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure.
Ważny
Ta funkcja jest obecnie dostępna w wersji zapoznawczej. Dodatkowe Warunki Użytkowania dla zapoznawczych wersji Microsoft Azure zawierają dodatkowe warunki prawne mające zastosowanie do funkcji Azure, które są w wersji beta, zapoznawczej, lub które w inny sposób nie zostały jeszcze wydane jako ogólnodostępne. Aby uzyskać informacje na temat tej konkretnej wersji zapoznawczej, zobacz informacje o wersji zapoznawczej Azure HDInsight na AKS. W przypadku pytań lub sugestii dotyczących funkcji prześlij żądanie dotyczące AskHDInsight, aby uzyskać więcej informacji na temat społeczności usługi Azure HDInsight.
Dobrze znany przypadek użycia platformy Apache Flink to analiza strumienia. Popularny wybór wielu użytkowników to wykorzystanie strumieni danych pozyskiwanych przy użyciu Apache Kafka. Typowe instalacje Flink i Kafka zaczynają się od wysyłania strumieni zdarzeń do Kafka, które mogą być konsumowane przez zadania Flink. Usługa Azure Event Hubs udostępnia punkt końcowy platformy Apache Kafka w centrum zdarzeń, który umożliwia użytkownikom łączenie się z centrum zdarzeń przy użyciu protokołu Kafka.
W tym artykule omówimy, jak nawiązać połączenie Azure Event Hubs z usługą Apache Flink w usłudze HDInsight na AKS i omówimy następujące kwestie
- Tworzenie przestrzeni nazw usługi Event Hubs
- Tworzenie usługi HDInsight w klastrze usługi AKS za pomocą narzędzia Apache Flink
- Uruchom producenta Flink
- Pakiet Jar dla platformy Apache Flink
- Weryfikacja zgłoszenia zadania &
Tworzenie przestrzeni nazw usługi Event Hubs i usługi Event Hubs
Aby utworzyć przestrzeń nazw usługi Event Hubs i usługę Event Hubs, zobacz tutaj
Konfigurowanie klastra Flink w usłudze HDInsight w usłudze AKS
Korzystając z istniejącej puli klastrów HDInsight na klastrze AKS, można utworzyć klaster Flink .
Uruchom producenta Flink, dodając bootstrap.servers oraz informacje dotyczące
producer.config
.bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
Zastąp
{YOUR.EVENTHUBS.CONNECTION.STRING}
parametrami połączenia dla przestrzeni nazw usługi Event Hubs. Aby uzyskać instrukcje dotyczące pobierania parametrów połączenia, zobacz szczegółowe informacje na temat sposobu pobierania parametrów połączenia usługi Event Hubs.Na przykład
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Pakowanie pliku JAR dla Flink
Pakiet com.example.app;
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
Dodaj fragment kodu, aby uruchomić producenta Flink.
Po wykonaniu kodu zdarzenia są przechowywane w temacie "topic1"
Odniesienie
- Witryna internetowa Apache Flink
- Nazwy projektów typu apache, Apache Kafka, Apache Flink, Flink i skojarzone z nimi są znaki towaroweApache Software Foundation (ASF).