Delen via


Apache Flink® on HDInsight in AKS verbinden met Azure Event Hubs voor Apache Kafka®

Belangrijk

Azure HDInsight op AKS is op 31 januari 2025 buiten gebruik gesteld. Meer informatie met deze aankondiging.

U moet uw workloads migreren naar Microsoft Fabric- of een gelijkwaardig Azure-product om plotselinge beëindiging van uw workloads te voorkomen.

Belangrijk

Deze functie is momenteel beschikbaar als preview-versie. De aanvullende gebruiksvoorwaarden voor Microsoft Azure Previews meer juridische voorwaarden bevatten die van toepassing zijn op Azure-functies die bèta, in preview of anderszins nog niet in algemene beschikbaarheid zijn vrijgegeven. Zie Azure HDInsight in AKS preview-informatievoor meer informatie over deze specifieke preview. Voor vragen of suggesties voor functies dient u een aanvraag in op AskHDInsight- met de details en volgt u ons voor meer updates over Azure HDInsight Community-.

Een bekende use case voor Apache Flink is stream analytics. De populaire keuze van veel gebruikers om de gegevensstromen te gebruiken, die worden verwerkt met behulp van Apache Kafka. Typische installaties van Flink en Kafka beginnen met gebeurtenisstromen die naar Kafka worden verzonden, die kunnen worden verwerkt door Flink-taken. Azure Event Hubs biedt een Apache Kafka-eindpunt op een Event Hub, waarmee gebruikers verbinding kunnen maken met de Event Hub met behulp van het Kafka-protocol.

In dit artikel verkennen we hoe u Azure Event Hubs- kunt verbinden met Apache Flink in HDInsight op AKS- en het volgende behandelen

  • Een Event Hubs-naamruimte maken
  • Een HDInsight op AKS-cluster maken met Apache Flink
  • Voer Flink producer uit
  • JAR-pakket voor Apache Flink
  • Validatie van taakindiening &

Event Hubs-naamruimte en Event Hubs maken

  1. Als u Event Hubs-naamruimte en Event Hubs wilt maken, raadpleegt u hier

    Screenshot van de setup van Event Hubs.

  1. Met behulp van bestaande HDInsight in AKS-clusterpool kunt u een Flink-cluster maken

  2. Voer de Flink-producent uit door de bootstrap.servers en de producer.config info toe te voegen

    bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
    client.id=FlinkExampleProducer
    sasl.mechanism=PLAIN
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    
  3. Vervang {YOUR.EVENTHUBS.CONNECTION.STRING} door de verbindingsreeks voor uw Event Hubs-naamruimte. Raadpleeg voor instructies over het verkrijgen van de verbindingsreeks de gedetailleerde uitleg over het verkrijgen van een Event Hubs-verbindingsreeks.

    Bijvoorbeeld

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString"
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
    
  1. Package com.example.app;

       package contoso.example;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.FileReader;
    import java.util.Properties;
    
    public class AzureEventHubDemo {
    
       public static void main(String[] args) throws Exception {
           // 1. get stream execution environment
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
           ParameterTool parameters = ParameterTool.fromArgs(args);
           String input = parameters.get("input");
           Properties properties = new Properties();
           properties.load(new FileReader(input));
    
           // 2. generate stream input
           DataStream<String> stream = createStream(env);
    
           // 3. sink to eventhub
           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
                  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                          .setTopic("topic1")
                          .setValueSerializationSchema(new SimpleStringSchema())
                           .build())
                   .build();
    
           stream.sinkTo(sink);
    
           // 4. execute the stream
           env.execute("Produce message to Azure event hub");
       }
    
       public static DataStream<String> createStream(StreamExecutionEnvironment env){
           return env.generateSequence(0, 200)
                   .map(new MapFunction<Long, String>() {
                       @Override
                       public String map(Long in) {
                           return "FLINK PRODUCE " + in;
                       }
                   });
       }
    } 
    
  2. Voeg het fragment toe om de Flink Producer uit te voeren.

    Schermopname van het testen van Flink in Event Hubs.

  3. Zodra de code is uitgevoerd, worden de gebeurtenissen opgeslagen in het onderwerp onderwerp1

    screenshot van Event Hubs opgeslagen in onderwerp.

Referentie