Udostępnij za pośrednictwem


Łączenie Apache Flink® w usłudze HDInsight w AKS z Azure Event Hubs dla Apache Kafka®

Ważny

Usługa Azure HDInsight w usłudze AKS została wycofana 31 stycznia 2025 r. Dowiedz się więcej przy pomocy tego ogłoszenia.

Aby uniknąć nagłego kończenia obciążeń, należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure.

Ważny

Ta funkcja jest obecnie dostępna w wersji zapoznawczej. Dodatkowe Warunki Użytkowania dla zapoznawczych wersji Microsoft Azure zawierają dodatkowe warunki prawne mające zastosowanie do funkcji Azure, które są w wersji beta, zapoznawczej, lub które w inny sposób nie zostały jeszcze wydane jako ogólnodostępne. Aby uzyskać informacje na temat tej konkretnej wersji zapoznawczej, zobacz informacje o wersji zapoznawczej Azure HDInsight na AKS. W przypadku pytań lub sugestii dotyczących funkcji prześlij żądanie dotyczące AskHDInsight, aby uzyskać więcej informacji na temat społeczności usługi Azure HDInsight.

Dobrze znany przypadek użycia platformy Apache Flink to analiza strumienia. Popularny wybór wielu użytkowników to wykorzystanie strumieni danych pozyskiwanych przy użyciu Apache Kafka. Typowe instalacje Flink i Kafka zaczynają się od wysyłania strumieni zdarzeń do Kafka, które mogą być konsumowane przez zadania Flink. Usługa Azure Event Hubs udostępnia punkt końcowy platformy Apache Kafka w centrum zdarzeń, który umożliwia użytkownikom łączenie się z centrum zdarzeń przy użyciu protokołu Kafka.

W tym artykule omówimy, jak nawiązać połączenie Azure Event Hubs z usługą Apache Flink w usłudze HDInsight na AKS i omówimy następujące kwestie

  • Tworzenie przestrzeni nazw usługi Event Hubs
  • Tworzenie usługi HDInsight w klastrze usługi AKS za pomocą narzędzia Apache Flink
  • Uruchom producenta Flink
  • Pakiet Jar dla platformy Apache Flink
  • Weryfikacja zgłoszenia zadania &

Tworzenie przestrzeni nazw usługi Event Hubs i usługi Event Hubs

  1. Aby utworzyć przestrzeń nazw usługi Event Hubs i usługę Event Hubs, zobacz tutaj

    Zrzut ekranu przedstawiający konfigurację usługi Event Hubs.

  1. Korzystając z istniejącej puli klastrów HDInsight na klastrze AKS, można utworzyć klaster Flink .

  2. Uruchom producenta Flink, dodając bootstrap.servers oraz informacje dotyczące producer.config.

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. Zastąp {YOUR.EVENTHUBS.CONNECTION.STRING} parametrami połączenia dla przestrzeni nazw usługi Event Hubs. Aby uzyskać instrukcje dotyczące pobierania parametrów połączenia, zobacz szczegółowe informacje na temat sposobu pobierania parametrów połączenia usługi Event Hubs.

    Na przykład

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. Pakiet com.example.app;

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. Dodaj fragment kodu, aby uruchomić producenta Flink.

    zrzut ekranu przedstawiający sposób testowania Flinka w usłudze Event Hubs.

  3. Po wykonaniu kodu zdarzenia są przechowywane w temacie "topic1"

    Zrzut ekranu przedstawiający usługę Event Hubs przechowywaną w temacie.

Odniesienie