Připojení Apache Flinku® ve službě HDInsight v AKS se službou Azure Event Hubs pro Apache Kafka®
Poznámka:
Azure HDInsight vyřadíme ze služby AKS 31. ledna 2025. Před 31. lednem 2025 budete muset migrovat úlohy do Microsoft Fabric nebo ekvivalentního produktu Azure, abyste se vyhnuli náhlému ukončení úloh. Zbývající clustery ve vašem předplatném se zastaví a odeberou z hostitele.
Do data vyřazení bude k dispozici pouze základní podpora.
Důležité
Tato funkce je aktuálně dostupná jako ukázková verze. Doplňkové podmínky použití pro Microsoft Azure Preview obsahují další právní podmínky, které se vztahují na funkce Azure, které jsou v beta verzi, ve verzi Preview nebo ještě nejsou vydány v obecné dostupnosti. Informace o této konkrétní verzi Preview najdete v tématu Azure HDInsight o službě AKS ve verzi Preview. Pokud máte dotazy nebo návrhy funkcí, odešlete prosím žádost na AskHDInsight s podrobnostmi a sledujte nás o dalších aktualizacích v komunitě Azure HDInsight.
Známý případ použití pro Apache Flink je stream analytics. Oblíbená volba mnoha uživatelů k používání datových proudů, které se ingestují pomocí Apache Kafka. Typické instalace Flinku a Kafka začínají tím, že streamy událostí se odsílají do Kafka, které můžou využívat úlohy Flink. Azure Event Hubs poskytuje koncový bod Apache Kafka v centru událostí, který umožňuje uživatelům připojit se k centru událostí pomocí protokolu Kafka.
V tomto článku se podíváme, jak připojit službu Azure Event Hubs k Apache Flinku ve službě HDInsight v AKS a probereme následující témata:
- Vytvoření oboru názvů služby Event Hubs
- Vytvoření HDInsightu v clusteru AKS pomocí Apache Flinku
- Spuštění producenta Flink
- Package Jar for Apache Flink
- Odeslání a ověření úlohy
Vytvoření oboru názvů služby Event Hubs a event Hubs
Pokud chcete vytvořit obor názvů služby Event Hubs a službu Event Hubs, podívejte se sem.
Nastavení clusteru Flink ve službě HDInsight v AKS
Pomocí existující služby HDInsight ve fondu clusterů AKS můžete vytvořit cluster Flink.
Spusťte producenta Flink, který přidává bootstrap.servers a
producer.config
informace.bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 client.id=FlinkExampleProducer sasl.mechanism=PLAIN security.protocol=SASL_SSL sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="$ConnectionString" \ password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
Nahraďte
{YOUR.EVENTHUBS.CONNECTION.STRING}
připojovací řetězec pro obor názvů služby Event Hubs. Pokyny k získání připojovací řetězec najdete v podrobnostech o tom, jak získat připojovací řetězec služby Event Hubs.Příklad:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Balení JAR pro Flink
com.example.app balení;
package contoso.example; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.io.FileReader; import java.util.Properties; public class AzureEventHubDemo { public static void main(String[] args) throws Exception { // 1. get stream execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); ParameterTool parameters = ParameterTool.fromArgs(args); String input = parameters.get("input"); Properties properties = new Properties(); properties.load(new FileReader(input)); // 2. generate stream input DataStream<String> stream = createStream(env); // 3. sink to eventhub KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build()) .build(); stream.sinkTo(sink); // 4. execute the stream env.execute("Produce message to Azure event hub"); } public static DataStream<String> createStream(StreamExecutionEnvironment env){ return env.generateSequence(0, 200) .map(new MapFunction<Long, String>() { @Override public String map(Long in) { return "FLINK PRODUCE " + in; } }); } }
Přidejte fragment kódu pro spuštění Flink Producer.
Po spuštění kódu se události ukládají v tématu "topic1" (téma1 ).
Reference
- Web Apache Flink
- Názvy apache, Apache Kafka, Kafka, Apache Flink, Flink a přidružených opensourcových projektů jsou ochranné známky Apache Software Foundation (ASF).