Připojení Apache Flinku® ve službě HDInsight v AKS se službou Azure Event Hubs pro Apache Kafka®
Důležitý
Azure HDInsight v AKS byl vyřazen 31. ledna 2025. Zjistěte více prostřednictvím tohoto oznámení.
Abyste se vyhnuli náhlému ukončení úloh, musíte migrovat úlohy do Microsoft Fabric nebo ekvivalentního produktu Azure.
Důležitý
Tato funkce je aktuálně ve verzi Preview. Doplňkové podmínky použití pro verze preview Microsoft Azure obsahují další právní podmínky, které se vztahují na funkce Azure, které jsou v beta, ve verzi preview, nebo ještě nebyly obecně dostupné. Informace o této konkrétní verzi preview najdete v tématu Azure HDInsight na AKS ve verzi preview. Pokud máte dotazy nebo návrhy funkcí, odešlete prosím žádost na AskHDInsight s podrobnostmi a sledujte nás pro další aktualizace na komunitě Azure HDInsight.
Známý případ použití pro Apache Flink je stream analytics. Oblíbená volba mnoha uživatelů k používání datových proudů, které se ingestují pomocí Apache Kafka. Typické instalace Flinku a Kafka začínají tím, že streamy událostí se odsílají do Kafka, které můžou využívat úlohy Flink. Azure Event Hubs poskytuje koncový bod Apache Kafka v centru událostí, který umožňuje uživatelům připojit se k centru událostí pomocí protokolu Kafka.
V tomto článku se dozvíte, jak připojit Azure Event Hubs k Apache Flink na HDInsight na AKS a probereme následující témata:
- Vytvořte obor názvů Event Hubs
- Vytvoření HDInsightu v clusteru AKS pomocí Apache Flinku
- Spuštění producenta Flink
- Balíček JAR pro Apache Flink
- Ověření odeslání úlohy &
Vytvoření oboru názvů služby Event Hubs a Event Hubs
Pokud chcete vytvořit obor názvů a centrum událostí Event Hubs, podívejte se na zde
Nastavení clusteru Flink ve službě HDInsight v AKS
Pomocí existující služby HDInsight ve fondu clusterů AKS můžete vytvořit cluster Flink
Spusťte Flink producenta, který přidá bootstrap.servers a
producer.config
informace.bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
Nahraďte
{YOUR.EVENTHUBS.CONNECTION.STRING}
připojovacím řetězcem pro obor názvů Event Hubs. Pokyny k získání připojovacího řetězce najdete v podrobnostech o tom, jak získat připojovací řetězec služby Event Hubs.Například
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Sestavení JAR pro Flink
Package com.example.app
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
Přidejte fragment kódu pro spuštění Flink Producer.
Po spuštění kódu se události ukládají v tématu "topic1"
Odkaz
- webu Apache Flink
- Názvy projektů Apache, Apache Kafka, Kafka, Apache Flink, Flink a přidružených open-source projektů jsou ochranné známkyApache Software Foundation (ASF).