Esta arquitetura de referência mostra um pipeline de processamento de fluxo de ponta a ponta. Os quatro estágios desse pipeline são ingerir, processar, armazenar, analisar e relatar. Para essa arquitetura de referência, o pipeline ingere dados de duas fontes, executa uma junção em registros relacionados de cada fluxo, enriquece o resultado e calcula uma média em tempo real. Os resultados são então armazenados para análise posterior.
Uma implementação de referência para essa arquitetura está disponível no GitHub.
Arquitetura
Baixe um arquivo Visio desta arquitetura.
Fluxo de Trabalho
O seguinte fluxo de dados corresponde ao diagrama anterior:
Nessa arquitetura, existem duas fontes de dados que geram fluxos de dados em tempo real. O primeiro fluxo contém informações de viagem e o segundo fluxo contém informações sobre tarifas. A arquitetura de referência inclui um gerador de dados simulado que lê de um conjunto de arquivos estáticos e envia os dados por push para os Hubs de Eventos do Azure. As fontes de dados em uma aplicação real são dispositivos instalados nos táxis.
de Hubs de Eventos é um serviço de ingestão de eventos. Essa arquitetura usa duas instâncias de hub de eventos, uma para cada fonte de dados. Cada fonte de dados envia um fluxo de dados para o hub de eventos associado.
Azure Databricks é uma plataforma de análise baseada no Apache Spark otimizada para a plataforma de serviços de nuvem do Microsoft Azure. O Azure Databricks é usado para correlacionar os dados de corrida de táxi e tarifa e para enriquecer os dados correlacionados com dados de vizinhança armazenados no sistema de arquivos do Azure Databricks.
Azure Cosmos DB é um serviço de banco de dados totalmente gerenciado e de vários modelos. A saída de um trabalho do Azure Databricks é uma série de registros, que são gravados no Azure Cosmos DB para Apache Cassandra. O Azure Cosmos DB para Apache Cassandra é usado porque dá suporte à modelagem de dados de séries temporais.
Azure Synapse Link for Azure Cosmos DB permite executar análises quase em tempo real em dados operacionais no Azure Cosmos DB, sem quaisquer efeitos de desempenho ou custo na sua carga de trabalho transacional. Você pode obter esses resultados usando de pool SQL sem servidor e pools do Spark. Esses mecanismos de análise estão disponíveis em seu espaço de trabalho do Azure Synapse Analytics.
espelhamento do Azure Cosmos DB para NoSQL no Microsoft Fabric permite integrar dados do Azure Cosmos DB com o restante dos dados no Microsoft Fabric.
do Log Analytics é uma ferramenta dentro do Azure Monitor que permite consultar e analisar dados de log de várias fontes. Os dados de log do aplicativo que Azure Monitor coleta são armazenados em um espaço de trabalho do Log Analytics. Você pode usar consultas do Log Analytics para analisar e visualizar métricas e inspecionar mensagens de log para identificar problemas no aplicativo.
Detalhes do cenário
Uma empresa de táxis recolhe dados sobre cada viagem de táxi. Para esse cenário, assumimos que dois dispositivos separados enviam dados. O táxi tem um medidor que envia informações sobre cada corrida, incluindo a duração, distância e locais de embarque e desembarque. Um dispositivo separado aceita pagamentos de clientes e envia dados sobre tarifas. Para identificar as tendências de passageiros, a empresa de táxis quer calcular a gorjeta média por quilômetro percorrido para cada bairro, em tempo real.
Ingestão de dados
Para simular uma fonte de dados, essa arquitetura de referência usa o conjunto de dados de táxi Nova York1. Este conjunto de dados contém dados sobre viagens de táxi na cidade de Nova York de 2010 a 2013. Ele contém registros de dados de viagens e tarifas. Os dados da viagem incluem a duração da viagem, a distância da viagem e os locais de embarque e desembarque. Os dados da tarifa incluem valores de tarifas, impostos e gorjetas. Os campos em ambos os tipos de registro incluem número de medalhão, licença de hack e ID do fornecedor. A combinação destes três campos identifica exclusivamente um táxi e um motorista. Os dados são armazenados em formato CSV.
[1] Donovan, Brian; Work, Dan (2016): Dados da viagem de táxi de Nova Iorque (2010-2013). Universidade de Illinois em Urbana-Champaign. https://doi.org/10.13012/J8PN93H8
O gerador de dados é um aplicativo .NET Core que lê os registros e os envia para Hubs de Eventos. O gerador envia dados de viagem em formato JSON e dados de tarifa em formato CSV.
Os Hubs de Eventos usam partições para segmentar os dados. As partições permitem que um consumidor leia cada partição em paralelo. Ao enviar dados para Hubs de Eventos, você pode especificar a chave de partição diretamente. Caso contrário, os registros são atribuídos a partições de forma round-robin.
Nesse cenário, os dados de viagem e os dados de tarifa devem receber o mesmo ID de partição para um táxi específico. Essa atribuição permite que o Databricks aplique um grau de paralelismo quando correlaciona os dois fluxos. Por exemplo, um registro na partição n dos dados da viagem corresponde a um registro na partição n dos dados da tarifa.
Baixe um arquivo Visio desta arquitetura.
No gerador de dados, o modelo de dados comum para ambos os tipos de registro tem uma PartitionKey
propriedade que é a concatenação de Medallion
, HackLicense
e VendorId
.
public abstract class TaxiData
{
public TaxiData()
{
}
[JsonProperty]
public long Medallion { get; set; }
[JsonProperty]
public long HackLicense { get; set; }
[JsonProperty]
public string VendorId { get; set; }
[JsonProperty]
public DateTimeOffset PickupTime { get; set; }
[JsonIgnore]
public string PartitionKey
{
get => $"{Medallion}_{HackLicense}_{VendorId}";
}
Essa propriedade é usada para fornecer uma chave de partição explícita quando envia dados para Hubs de Eventos.
using (var client = pool.GetObject())
{
return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
t.GetData(dataFormat))), t.PartitionKey);
}
Hubs de Eventos
A capacidade de taxa de transferência dos Hubs de Eventos é medida em unidades de taxa de transferência. Você pode dimensionar automaticamente um hub de eventos habilitando de inflação automática. Esse recurso dimensiona automaticamente as unidades de taxa de transferência com base no tráfego, até um máximo configurado.
Processamento de fluxos
No Azure Databricks, um trabalho executa o processamento de dados. O trabalho é atribuído a um cluster e, em seguida, executado nele. O trabalho pode ser um código personalizado escrito em Java ou um notebook Spark .
Nesta arquitetura de referência, o trabalho é um arquivo Java que tem classes escritas em Java e Scala. Quando você especifica o arquivo Java para um trabalho Databricks, o cluster Databricks especifica a classe para operação. Aqui, o main
método da classe contém a com.microsoft.pnp.TaxiCabReader
lógica de processamento de dados.
Leia o fluxo das duas instâncias do hub de eventos
A lógica de processamento de dados usa o streaming estruturado do Spark para ler as duas instâncias do hub de eventos do Azure:
// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()
val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiRideConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
.format("eventhubs")
.options(rideEventHubOptions.toMap)
.load
val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiFareConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
.format("eventhubs")
.options(fareEventHubOptions.toMap)
.load
Enriqueça os dados com as informações do bairro
Os dados da viagem incluem as coordenadas de latitude e longitude dos locais de embarque e desembarque. Estas coordenadas são úteis, mas não facilmente consumidas para análise. Portanto, esses dados são enriquecidos com dados de vizinhança lidos de um shapefile .
O formato shapefile é binário e não é facilmente analisado. Mas a biblioteca GeoTools fornece ferramentas para dados geoespaciais que usam o formato shapefile. Essa biblioteca é usada na classe com.microsoft.pnp.GeoFinder
para determinar o nome do bairro com base nas coordenadas dos locais de embarque e desembarque.
val neighborhoodFinder = (lon: Double, lat: Double) => {
NeighborhoodFinder.getNeighborhood(lon, lat).get()
}
Junte-se aos dados da viagem e da tarifa
Primeiro, os dados de viagem e tarifa são transformados:
val rides = transformedRides
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedRides.add(1)
false
}
})
.select(
$"ride.*",
to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
.as("pickupNeighborhood"),
to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
.as("dropoffNeighborhood")
)
.withWatermark("pickupTime", conf.taxiRideWatermarkInterval())
val fares = transformedFares
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedFares.add(1)
false
}
})
.select(
$"fare.*",
$"pickupTime"
)
.withWatermark("pickupTime", conf.taxiFareWatermarkInterval())
Em seguida, os dados da viagem são unidos com os dados da tarifa:
val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))
Processar os dados e inseri-los no Azure Cosmos DB
O valor médio da tarifa para cada bairro é calculado para um intervalo de tempo específico:
val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
.groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
.agg(
count("*").as("rideCount"),
sum($"fareAmount").as("totalFareAmount"),
sum($"tipAmount").as("totalTipAmount"),
(sum($"fareAmount")/count("*")).as("averageFareAmount"),
(sum($"tipAmount")/count("*")).as("averageTipAmount")
)
.select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")
O valor médio da tarifa é então inserido no Azure Cosmos DB:
maxAvgFarePerNeighborhood
.writeStream
.queryName("maxAvgFarePerNeighborhood_cassandra_insert")
.outputMode(OutputMode.Append())
.foreach(new CassandraSinkForeach(connector))
.start()
.awaitTermination()
Considerações
Essas considerações implementam os pilares do Azure Well-Architected Framework, que é um conjunto de princípios orientadores que você pode usar para melhorar a qualidade de uma carga de trabalho. Para obter mais informações, consulte Microsoft Azure Well-Architected Framework.
Segurança
A segurança fornece garantias contra ataques deliberados e o uso indevido de seus valiosos dados e sistemas. Para obter mais informações, consulte Lista de verificação de revisão de design parade segurança .
O acesso ao espaço de trabalho do Azure Databricks é controlado usando o console do administrador do . O console do administrador inclui funcionalidade para adicionar usuários, gerenciar permissões de usuário e configurar o logon único. O controle de acesso para espaços de trabalho, clusters, trabalhos e tabelas também pode ser definido por meio do console do administrador.
Gerir segredos
O Azure Databricks inclui um de armazenamento secreto que é usado para armazenar credenciais e fazer referência a elas em blocos de anotações e trabalhos. Segredos de partição de escopos no repositório secreto do Azure Databricks:
databricks secrets create-scope --scope "azure-databricks-job"
Os segredos são adicionados no nível do escopo:
databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"
Nota
Use um de escopo apoiado pelo Cofre de Chaves do em vez do escopo nativo do Azure Databricks.
No código, os segredos são acessados por meio dos utilitários de segredos do Azure Databricks.
Otimização de Custos
A Otimização de Custos concentra-se em formas de reduzir despesas desnecessárias e melhorar a eficiência operacional. Para obter mais informações, consulte Lista de verificação de revisão de projeto para Otimização de custos.
Utilize a calculadora de preços do Azure para prever os custos. Considere os seguintes serviços usados nesta arquitetura de referência.
Considerações de custo dos Hubs de Eventos
Essa arquitetura de referência implanta Hubs de Eventos na camada Padrão. O modelo de preços é baseado em unidades de taxa de transferência, eventos de entrada e eventos de captura. Um evento de entrada é uma unidade de dados de 64 KB ou menos. As mensagens maiores são cobradas em múltiplos de 64 KB. Você especifica unidades de taxa de transferência por meio do portal do Azure ou das APIs de gerenciamento de Hubs de Eventos.
Se precisar de mais dias de retenção, considere a camada Dedicada. Essa camada fornece implantações de locatário único com requisitos rigorosos. Essa oferta cria um cluster baseado em unidades de capacidade e não dependente de unidades de taxa de transferência. A camada Standard também é cobrada com base em eventos de entrada e unidades de taxa de transferência.
Para obter mais informações, consulte preços dos Hubs de Eventos.
Considerações de custo do Azure Databricks
O Azure Databricks fornece a camada Standard e a camada Premium, ambas com suporte a três cargas de trabalho. Essa arquitetura de referência implanta um espaço de trabalho do Azure Databricks na camada Premium.
As cargas de trabalho de engenharia de dados devem ser executadas em um cluster de tarefas. Os engenheiros de dados usam clusters para criar e executar trabalhos. As cargas de trabalho de análise de dados devem ser executadas em um cluster multiuso e destinam-se a cientistas de dados para explorar, visualizar, manipular e compartilhar dados e insights de forma interativa.
O Azure Databricks fornece vários modelos de preços.
plano pré-pago
Você é cobrado por máquinas virtuais (VMs) provisionadas em clusters e unidades de dados (DBUs) do Azure Databricks com base na instância de VM escolhida. Uma DBU é uma unidade de capacidade de processamento que é cobrada pelo uso por segundo. O consumo de DBU depende do tamanho e do tipo de instância que é executada no Azure Databricks. O preço depende da carga de trabalho e do nível escolhidos.
Plano de pré-compra
Você se compromete com DBUs como unidades de confirmação do Azure Databricks por um ou três anos para reduzir o custo total de propriedade durante esse período de tempo em comparação com o modelo de pagamento conforme o uso.
Para obter mais informações, consulte de preços do Azure Databricks .
Considerações de custo do Azure Cosmos DB
Nessa arquitetura, o trabalho do Azure Databricks grava uma série de registros no Azure Cosmos DB. Você é cobrado pela capacidade reservada, que é medida em Unidades de solicitação por segundo (RU/s). Essa capacidade é usada para executar operações de inserção. A unidade para faturamento é de 100 RU/s por hora. Por exemplo, o custo de escrever itens de 100 KB é de 50 RU/s.
Para operações de gravação, provisione capacidade suficiente para suportar o número de gravações necessárias por segundo. Você pode aumentar a taxa de transferência provisionada usando o portal ou a CLI do Azure antes de executar operações de gravação e, em seguida, reduzir a taxa de transferência após a conclusão dessas operações. Sua taxa de transferência para o período de gravação é a soma da taxa de transferência mínima necessária para os dados específicos e a taxa de transferência necessária para a operação de inserção. Esse cálculo pressupõe que não há outra carga de trabalho em execução.
Exemplo de análise de custos
Suponha que você configure um valor de taxa de transferência de 1.000 RU/s em um contêiner. Ele é implantado por 24 horas por 30 dias, por um total de 720 horas.
O contêiner é cobrado em 10 unidades de 100 RU/s por hora para cada hora. Dez unidades a US$ 0,008 (por 100 RU/s por hora) são cobradas a US$ 0,08 por hora.
Por 720 horas ou 7.200 unidades (de 100 RUs), você é cobrado US $ 57,60 pelo mês.
O armazenamento também é cobrado por cada GB usado para os dados armazenados e o índice. Para obter mais informações, consulte Modelo de preços do Azure Cosmos DB.
Use o da calculadora de capacidade do Azure Cosmos DB para obter uma estimativa rápida do custo da carga de trabalho.
Excelência Operacional
A Excelência Operacional abrange os processos operacionais que implantam um aplicativo e o mantêm em execução na produção. Para obter mais informações, consulte Lista de verificação de revisão de projeto para o Operational Excellence.
Monitorização
O Azure Databricks é baseado no Apache Spark. O Azure Databricks e o Apache Spark usam Apache Log4j como a biblioteca padrão para registro em log. Além do log padrão que o Apache Spark fornece, você pode implementar o registro em log no Log Analytics. Para obter mais informações, consulte Monitorando o Azure Databricks.
Como a classe com.microsoft.pnp.TaxiCabReader
processa mensagens de viagem e tarifa, uma mensagem pode estar malformada e, portanto, não ser válida. Em um ambiente de produção, é importante analisar essas mensagens malformadas para identificar um problema com as fontes de dados para que ele possa ser corrigido rapidamente para evitar a perda de dados. A classe com.microsoft.pnp.TaxiCabReader
registra um Apache Spark Accumulator que rastreia o número de registros de tarifas malformados e registros de passeio:
@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)
O Apache Spark usa a biblioteca do Dropwizard para enviar métricas. Alguns dos campos de métricas nativos do Dropwizard são incompatíveis com o Log Analytics, e é por isso que essa arquitetura de referência inclui um coletor e um repórter personalizados do Dropwizard. Ele formata as métricas no formato esperado pelo Log Analytics. Quando o Apache Spark relata métricas, as métricas personalizadas para os dados de viagem e tarifa malformados também são enviadas.
Você pode usar as seguintes consultas de exemplo em seu espaço de trabalho do Log Analytics para monitorar a operação do trabalho de streaming. O argumento ago(1d)
em cada consulta retorna todos os registros que foram gerados no último dia. Você pode ajustar esse parâmetro para exibir um período de tempo diferente.
Exceções registradas durante a operação de consulta de fluxo
SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"
Acumulação de dados malformados sobre tarifas e viagens
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Operação do trabalho ao longo do tempo
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Organização e implantações de recursos
Crie grupos de recursos separados para ambientes de produção, desenvolvimento e teste. A utilização de grupos de recursos separados torna mais fácil gerir as implementações, eliminar as implementações de teste e atribuir direitos de acesso.
Use o modelo Azure Resource Manager para implantar os recursos do Azure de acordo com o processo de infraestrutura como código. Usando modelos, você pode automatizar facilmente implantações com de serviços de DevOps do Azure ou outras soluções de integração contínua e entrega contínua (CI/CD).
Coloque cada carga de trabalho em um modelo de implantação separado e armazene os recursos em sistemas de controle do código-fonte. Você pode implantar os modelos juntos ou individualmente como parte de um processo de CI/CD. Essa abordagem simplifica o processo de automação.
Nessa arquitetura, os Hubs de Eventos, o Log Analytics e o Azure Cosmos DB são identificados como uma única carga de trabalho. Esses recursos estão incluídos em um único modelo do Azure Resource Manager.
Considere preparar suas cargas de trabalho. Implante em vários estágios e execute verificações de validação em cada estágio antes de passar para o próximo estágio. Dessa forma, você pode controlar como enviar atualizações para seus ambientes de produção e minimizar problemas de implantação imprevistos.
Nessa arquitetura, há vários estágios de implantação. Considere criar um pipeline de DevOps do Azure e adicionar esses estágios. Você pode automatizar as seguintes etapas:
- Inicie um cluster Databricks.
- Configure a CLI do Databricks.
- Instale as ferramentas do Scala.
- Adicione os segredos do Databricks.
Considere escrever testes de integração automatizados para melhorar a qualidade e a confiabilidade do código Databricks e seu ciclo de vida.
Implementar este cenário
Para implantar e executar a implementação de referência, siga as etapas no Leiame do GitHub.