Sdílet prostřednictvím


Připojení Apache Flinku® ve službě HDInsight v AKS se službou Azure Event Hubs pro Apache Kafka®

Důležitý

Azure HDInsight v AKS byl vyřazen 31. ledna 2025. Zjistěte více prostřednictvím tohoto oznámení.

Abyste se vyhnuli náhlému ukončení úloh, musíte migrovat úlohy do Microsoft Fabric nebo ekvivalentního produktu Azure.

Důležitý

Tato funkce je aktuálně ve verzi Preview. Doplňkové podmínky použití pro verze preview Microsoft Azure obsahují další právní podmínky, které se vztahují na funkce Azure, které jsou v beta, ve verzi preview, nebo ještě nebyly obecně dostupné. Informace o této konkrétní verzi preview najdete v tématu Azure HDInsight na AKS ve verzi preview. Pokud máte dotazy nebo návrhy funkcí, odešlete prosím žádost na AskHDInsight s podrobnostmi a sledujte nás pro další aktualizace na komunitě Azure HDInsight.

Známý případ použití pro Apache Flink je stream analytics. Oblíbená volba mnoha uživatelů k používání datových proudů, které se ingestují pomocí Apache Kafka. Typické instalace Flinku a Kafka začínají tím, že streamy událostí se odsílají do Kafka, které můžou využívat úlohy Flink. Azure Event Hubs poskytuje koncový bod Apache Kafka v centru událostí, který umožňuje uživatelům připojit se k centru událostí pomocí protokolu Kafka.

V tomto článku se dozvíte, jak připojit Azure Event Hubs k Apache Flink na HDInsight na AKS a probereme následující témata:

  • Vytvořte obor názvů Event Hubs
  • Vytvoření HDInsightu v clusteru AKS pomocí Apache Flinku
  • Spuštění producenta Flink
  • Balíček JAR pro Apache Flink
  • Ověření odeslání úlohy &

Vytvoření oboru názvů služby Event Hubs a Event Hubs

  1. Pokud chcete vytvořit obor názvů a centrum událostí Event Hubs, podívejte se na zde

    snímek obrazovky s nastavením služby Event Hubs

  1. Pomocí existující služby HDInsight ve fondu clusterů AKS můžete vytvořit cluster Flink

  2. Spusťte Flink producenta, který přidá bootstrap.servers a producer.config informace.

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. Nahraďte {YOUR.EVENTHUBS.CONNECTION.STRING} připojovacím řetězcem pro obor názvů Event Hubs. Pokyny k získání připojovacího řetězce najdete v podrobnostech o tom, jak získat připojovací řetězec služby Event Hubs.

    Například

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. Package com.example.app

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. Přidejte fragment kódu pro spuštění Flink Producer.

    snímek obrazovky ukazující, jak otestovat Flink ve službě Event Hubs

  3. Po spuštění kódu se události ukládají v tématu "topic1"

    snímek obrazovky zobrazující službu Event Hubs uloženou v tématu

Odkaz