Tutorial: Utilizar a Transmissão em Fluxo Estruturada do Apache Spark com o Apache Kafka no HDInsight
Este tutorial demonstra como usar o Apache Spark Structured Streaming para ler e gravar dados com o Apache Kafka no Azure HDInsight.
O Spark Structured Streaming é um mecanismo de processamento de fluxo construído no Spark SQL. Permite-lhe expressar computações de transmissão em fluxo, tal como a computação em lotes o faz em dados estáticos.
Neste tutorial, irá aprender a:
- Usar um modelo do Azure Resource Manager para criar clusters
- Usar o Spark Structured Streaming com Kafka
Quando terminar as etapas neste documento, lembre-se de excluir os clusters para evitar cobranças excessivas.
Pré-requisitos
jq, um processador JSON de linha de comando. Consulte https://stedolan.github.io/jq/.
Familiaridade com o uso de Notebooks Jupyter com o Spark no HDInsight. Para obter mais informações, consulte o documento Carregar dados e executar consultas com o Apache Spark no HDInsight .
Familiaridade com a linguagem de programação Scala. O código utilizado neste tutorial está escrito em Scala.
Familiaridade com a criação de tópicos do Kafka. Para obter mais informações, consulte o documento de início rápido do Apache Kafka no HDInsight.
Importante
Os passos neste documento requerem um grupo de recursos do Azure que contém um cluster do Spark no HDInsight e um cluster do Kafka no HDInsight. Estes dois clusters estão localizados numa Rede Virtual do Azure, o que permite que o cluster do Spark comunique diretamente com o cluster do Kafka.
Para sua comodidade, este documento tem uma ligação para o modelo que pode criar todos os recursos do Azure necessários.
Para obter mais informações sobre como usar o HDInsight em uma rede virtual, consulte o documento Planejar uma rede virtual para o HDInsight .
Streaming estruturado com Apache Kafka
A transmissão em Fluxo Estruturada do Spark é um motor de processamento de fluxos incorporado no motor SQL do Spark. Ao usar o Structured Streaming, você pode escrever consultas de streaming da mesma forma que escreve consultas em lote.
Os fragmentos de código seguintes demonstram a leitura a partir do Kafka e o armazenamento num ficheiro. A primeira é uma operação em lote, enquanto que a segunda é uma operação de transmissão em fluxo:
// Read a batch from Kafka
val kafkaDF = spark.read.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "earliest")
.load()
// Select data and write to file
kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip")
.write
.format("parquet")
.option("path","/example/batchtripdata")
.option("checkpointLocation", "/batchcheckpoint")
.save()
// Stream from Kafka
val kafkaStreamDF = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "earliest")
.load()
// Select data from the stream and write to file
kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip")
.writeStream
.format("parquet")
.option("path","/example/streamingtripdata")
.option("checkpointLocation", "/streamcheckpoint")
.start.awaitTermination(30000)
Em ambos os fragmentos, os dados são lidos a partir do Kafka e escritos num ficheiro. As diferenças entre os exemplos são:
Batch | Transmissão |
---|---|
read |
readStream |
write |
writeStream |
save |
start |
A operação de streaming também usa awaitTermination(30000)
, que para o fluxo após 30.000 ms.
Para utilizar a Transmissão em Fluxo Estruturada com o Kafka, o projeto tem de ter uma dependência no pacote org.apache.spark : spark-sql-kafka-0-10_2.11
. A versão deste pacote deve corresponder à versão do Spark no HDInsight. Para o Spark 2.4 (disponível no HDInsight 4.0), você pode encontrar as informações de dependência para diferentes tipos de projeto em https://search.maven.org/#artifactdetails%7Corg.apache.spark%7Cspark-sql-kafka-0-10_2.11%7C2.2.0%7Cjar.
Para o Jupyter Notebook usado com este tutorial, a célula a seguir carrega essa dependência de pacote:
%%configure -f
{
"conf": {
"spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0",
"spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11"
}
}
Criar os clusters
O Apache Kafka no HDInsight não fornece acesso aos corretores Kafka pela internet pública. Tudo o que utilize Kafka tem de estar na mesma rede virtual do Azure. Neste tutorial, os clusters do Kafka e do Spark estão localizados na mesma rede virtual do Azure.
O diagrama seguinte mostra como a comunicação flui entre o Spark e o Kafka:
Nota
O serviço Kafka está limitado à comunicação na rede virtual. Outros serviços em cluster, como SSH e Ambari, podem ser acedidos através da Internet. Para obter mais informações sobre as portas públicas disponíveis com o HDInsight, veja Portas e URIs utilizados pelo HDInsight.
Para criar uma Rede Virtual do Azure e, em seguida, criar os clusters do Kafka e do Spark na mesma, utilize os passos abaixo:
Utilize o botão seguinte para iniciar sessão no Azure e abrir o modelo no Portal do Azure.
O modelo do Azure Resource Manager está localizado em https://raw.githubusercontent.com/Azure-Samples/hdinsight-spark-kafka-structured-streaming/master/azuredeploy.json.
Este modelo cria os seguintes recursos:
Um Kafka no cluster HDInsight 4.0 ou 5.0.
Um Spark 2.4 ou 3.1 no cluster HDInsight 4.0 ou 5.0.
Uma Rede Virtual do Azure, que contém os clusters do HDInsight.
Importante
O notebook de streaming estruturado usado neste tutorial requer o Spark 2.4 ou 3.1 no HDInsight 4.0 ou 5.0. Se utilizar uma versão anterior do Spark no HDInsight, irá receber mensagens de erro ao utilizar o bloco de notas.
Utilize as seguintes informações para preencher as entradas da secção Modelo personalizado:
Definição Value Subscrição a subscrição do Azure Grupo de recursos O grupo de recursos que contém os recursos. Localização A região do Azure na qual os recursos são criados. Nome de Cluster do Spark O nome do cluster do Spark. Os primeiros seis carateres devem ser diferentes do nome do cluster do Kafka. Nome do Cluster do Kafka O nome do cluster do Kafka. Os primeiros seis carateres devem ser diferentes do nome do cluster do Spark. Nome de Utilizador de Início de Sessão do Cluster O nome de utilizador administrador para os clusters. Palavra-passe de Início de Sessão do Cluster A palavra-passe de utilizador administrador para os clusters. Nome de Utilizador SSH O nome de utilizador SSH para os clusters. Palavra-passe do SSH A palavra-passe do utilizador SSH. Leia os Termos e Condições e, em seguida, selecione Concordo com os termos e condições mencionados acima.
Selecione Comprar.
Nota
A criação dos clusters pode demorar até 20 minutos.
Usar o Spark Structured Streaming
Este exemplo demonstra como usar o Spark Structured Streaming com Kafka no HDInsight. Ele usa dados sobre viagens de táxi, que são fornecidos pela cidade de Nova York. O conjunto de dados utilizado por este caderno é de 2016 Green Taxi Trip Data.
Reúna informações do host. Use os comandos curl e jq abaixo para obter suas informações sobre o Kafka ZooKeeper e os hosts do corretor. Os comandos são projetados para um prompt de comando do Windows, pequenas variações serão necessárias para outros ambientes. Substitua
KafkaCluster
pelo nome do cluster Kafka eKafkaPassword
pela senha de login do cluster. Além disso, substituaC:\HDI\jq-win64.exe
pelo caminho real para sua instalação jq. Insira os comandos em um prompt de comando do Windows e salve a saída para uso em etapas posteriores.REM Enter cluster name in lowercase set CLUSTERNAME=KafkaCluster set PASSWORD=KafkaPassword curl -u admin:%PASSWORD% -G "https://%CLUSTERNAME%.azurehdinsight.net/api/v1/clusters/%CLUSTERNAME%/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | C:\HDI\jq-win64.exe -r "["""\(.host_components[].HostRoles.host_name):2181"""] | join(""",""")" curl -u admin:%PASSWORD% -G "https://%CLUSTERNAME%.azurehdinsight.net/api/v1/clusters/%CLUSTERNAME%/services/KAFKA/components/KAFKA_BROKER" | C:\HDI\jq-win64.exe -r "["""\(.host_components[].HostRoles.host_name):9092"""] | join(""",""")"
Em um navegador da Web, navegue até
https://CLUSTERNAME.azurehdinsight.net/jupyter
, ondeCLUSTERNAME
é o nome do cluster. Quando lhe for pedido, introduza o início de sessão do cluster (admin) e a palavra-passe utilizada quando criou o cluster.Selecione Novo > Spark para criar um bloco de anotações.
O streaming do Spark tem microbatching, o que significa que os dados vêm como lotes e executores executados nos lotes de dados. Se o executor tiver tempo limite ocioso menor do que o tempo necessário para processar o lote, os executores serão constantemente adicionados e removidos. Se o tempo limite de inatividade dos executores for maior do que a duração do lote, o executor nunca será removido. Portanto, recomendamos que você desative a alocação dinâmica definindo spark.dynamicAllocation.enabled como false ao executar aplicativos de streaming.
Carregue os pacotes usados pelo Bloco de Anotações inserindo as seguintes informações em uma célula do Bloco de Anotações. Execute o comando usando CTRL + ENTER.
%%configure -f { "conf": { "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0", "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11", "spark.dynamicAllocation.enabled": false } }
Crie o tópico Kafka. Edite o comando abaixo substituindo
YOUR_ZOOKEEPER_HOSTS
pelas informações do host do Zookeeper extraídas na primeira etapa. Digite o comando editado no seu Jupyter Notebook para criar otripdata
tópico.%%bash export KafkaZookeepers="YOUR_ZOOKEEPER_HOSTS" /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic tripdata --zookeeper $KafkaZookeepers
Recupere dados sobre viagens de táxi. Digite o comando na próxima célula para carregar dados sobre viagens de táxi em Nova York. Os dados são carregados em um dataframe e, em seguida, o dataframe é exibido como a saída da célula.
import spark.implicits._ // Load the data from the New York City Taxi data REST API for 2016 Green Taxi Trip Data val url="https://data.cityofnewyork.us/resource/pqfs-mqru.json" val result = scala.io.Source.fromURL(url).mkString // Create a dataframe from the JSON data val taxiDF = spark.read.json(Seq(result).toDS) // Display the dataframe containing trip data taxiDF.show()
Defina as informações de hosts do corretor Kafka. Substitua
YOUR_KAFKA_BROKER_HOSTS
pelo broker hospeda as informações extraídas na etapa 1. Insira o comando editado na próxima célula do Jupyter Notebook.// The Kafka broker hosts and topic used to write to Kafka val kafkaBrokers="YOUR_KAFKA_BROKER_HOSTS" val kafkaTopic="tripdata" println("Finished setting Kafka broker and topic configuration.")
Envie os dados para Kafka. No comando a seguir, o
vendorid
campo é usado como o valor-chave para a mensagem Kafka. A chave é usada por Kafka ao particionar dados. Todos os campos são armazenados na mensagem Kafka como um valor de cadeia de caracteres JSON. Digite o seguinte comando no Jupyter para salvar os dados no Kafka usando uma consulta em lote.// Select the vendorid as the key and save the JSON string as the value. val query = taxiDF.selectExpr("CAST(vendorid AS STRING) as key", "to_JSON(struct(*)) AS value").write.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("topic", kafkaTopic).save() println("Data sent to Kafka")
Declare um esquema. O comando a seguir demonstra como usar um esquema ao ler dados JSON do kafka. Digite o comando na próxima célula do Jupyter.
// Import bits used for declaring schemas and working with JSON data import org.apache.spark.sql._ import org.apache.spark.sql.types._ import org.apache.spark.sql.functions._ // Define a schema for the data val schema = (new StructType).add("dropoff_latitude", StringType).add("dropoff_longitude", StringType).add("extra", StringType).add("fare_amount", StringType).add("improvement_surcharge", StringType).add("lpep_dropoff_datetime", StringType).add("lpep_pickup_datetime", StringType).add("mta_tax", StringType).add("passenger_count", StringType).add("payment_type", StringType).add("pickup_latitude", StringType).add("pickup_longitude", StringType).add("ratecodeid", StringType).add("store_and_fwd_flag", StringType).add("tip_amount", StringType).add("tolls_amount", StringType).add("total_amount", StringType).add("trip_distance", StringType).add("trip_type", StringType).add("vendorid", StringType) // Reproduced here for readability //val schema = (new StructType) // .add("dropoff_latitude", StringType) // .add("dropoff_longitude", StringType) // .add("extra", StringType) // .add("fare_amount", StringType) // .add("improvement_surcharge", StringType) // .add("lpep_dropoff_datetime", StringType) // .add("lpep_pickup_datetime", StringType) // .add("mta_tax", StringType) // .add("passenger_count", StringType) // .add("payment_type", StringType) // .add("pickup_latitude", StringType) // .add("pickup_longitude", StringType) // .add("ratecodeid", StringType) // .add("store_and_fwd_flag", StringType) // .add("tip_amount", StringType) // .add("tolls_amount", StringType) // .add("total_amount", StringType) // .add("trip_distance", StringType) // .add("trip_type", StringType) // .add("vendorid", StringType) println("Schema declared")
Selecione os dados e inicie o fluxo. O comando a seguir demonstra como recuperar dados do Kafka usando uma consulta em lotes. E, em seguida, escreva os resultados no HDFS no cluster do Spark. Neste exemplo, o
select
recupera a mensagem (campo de valor) de Kafka e aplica o esquema a ela. Os dados são então gravados no HDFS (WASB ou ADL) em formato parquet. Digite o comando na próxima célula do Jupyter.// Read a batch from Kafka val kafkaDF = spark.read.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load() // Select data and write to file val query = kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip").write.format("parquet").option("path","/example/batchtripdata").option("checkpointLocation", "/batchcheckpoint").save() println("Wrote data to file")
Você pode verificar se os arquivos foram criados digitando o comando em sua próxima célula Jupyter. Ele lista os
/example/batchtripdata
arquivos no diretório.%%bash hdfs dfs -ls /example/batchtripdata
Enquanto o exemplo anterior usava uma consulta em lotes, o comando a seguir demonstra como fazer a mesma coisa usando uma consulta de streaming. Digite o comando na próxima célula do Jupyter.
// Stream from Kafka val kafkaStreamDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load() // Select data from the stream and write to file kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip").writeStream.format("parquet").option("path","/example/streamingtripdata").option("checkpointLocation", "/streamcheckpoint").start.awaitTermination(30000) println("Wrote data to file")
Execute a célula a seguir para verificar se os arquivos foram gravados pela consulta de streaming.
%%bash hdfs dfs -ls /example/streamingtripdata
Clean up resources (Limpar recursos)
Para limpar os recursos criados por este tutorial, pode eliminar o grupo de recursos. A exclusão do grupo de recursos também exclui o cluster HDInsight associado. E quaisquer outros recursos associados ao grupo de recursos.
Para remover o grupo de recursos através do Portal do Azure:
- No portal do Azure, expanda o menu no lado esquerdo para abrir o menu de serviços e escolha Grupos de Recursos para exibir a lista de seus grupos de recursos.
- Encontre o grupo de recursos a eliminar e, em seguida, clique com o botão direito do rato em Mais (...) no lado direito da lista.
- Selecione Eliminar grupo de recursos e, em seguida, confirme.
Aviso
A faturação do cluster do HDInsight tem início quando o cluster é criado e termina quando é eliminado. A faturação é rateada por minuto, pelo que deve sempre eliminar o cluster quando deixar de ser utilizado.
Eliminar um cluster do Kafka no HDInsight elimina quaisquer dados armazenados no Kafka.