Gerenciamento de tarefas do Apache Flink® no HDInsight em clusters do AKS
Importante
O Azure HDInsight no AKS se aposentou em 31 de janeiro de 2025. Saiba mais com este comunicado.
Você precisa migrar suas cargas de trabalho para microsoft fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho.
Importante
Esse recurso está atualmente em versão prévia. Os termos de uso complementares para o Microsoft Azure Previews incluem mais termos legais que se aplicam aos recursos do Azure que estão em versão beta, em versão prévia ou ainda não lançados em disponibilidade geral. Para obter informações sobre essa versão prévia específica, consulte as informações de visualização de Azure HDInsight no AKS . Para questões ou sugestões de funcionalidades, envie um pedido no AskHDInsight com os detalhes e siga-nos para obter mais atualizações da Comunidade do Azure HDInsight .
O HDInsight no AKS fornece um recurso para gerenciar e enviar trabalhos do Apache Flink® diretamente por meio do portal do Azure (interface amigável) e das APIs REST do ARM.
Esse recurso capacita os usuários a controlar e monitorar com eficiência seus trabalhos do Apache Flink sem exigir conhecimento profundo em nível de cluster.
Benefícios
Gerenciamento simplificado de tarefas: Com a integração nativa do Apache Flink no portal do Azure, os usuários não necessitam mais de amplo conhecimento dos clusters Flink para enviar, gerenciar e monitorar tarefas.
User-Friendly API REST: O HDInsight no AKS oferece APIs REST do ARM fáceis de usar para enviar e gerenciar jobs do Flink. Os usuários podem enviar trabalhos Flink de qualquer serviço do Azure usando essas APIs REST.
Atualizações de trabalho sem complicações egerenciamento de estado: a integração nativa do portal Azure fornece uma experiência sem problemas para atualizar trabalhos e restaurá-los ao último estado salvo (savepoint). Essa funcionalidade garante a continuidade e a integridade dos dados em todo o ciclo de vida do trabalho.
Automatizando a tarefa do Flink usando o pipeline do Azure: Usando o HDInsight no AKS, os usuários do Flink têm acesso a uma API Rest ARM intuitiva, permitindo integrar de forma harmoniosa as operações de tarefas do Flink ao pipeline do Azure. Se você estiver iniciando novos trabalhos, atualizando trabalhos em execução ou executando várias operações de trabalho, essa abordagem simplificada elimina as etapas manuais. Ele permite que você gerencie seu cluster Flink com eficiência.
Pré-requisitos
Há alguns pré-requisitos antes de enviar e gerenciar trabalhos a partir do portal ou de APIs REST.
Crie um diretório na conta de armazenamento primário do cluster para fazer upload do arquivo JAR do job.
Se o usuário quiser utilizar savepoints, crie um diretório na conta de armazenamento para os savepoints do trabalho.
Principais recursos e operações
Novode envio de trabalho: os usuários podem enviar sem esforço um novo Flink, eliminando a necessidade de configurações complexas ou ferramentas externas.
Parar e iniciar trabalhos com savepoints: Os usuários podem parar e reiniciar de forma tranquila seus trabalhos do Flink a partir do estado anterior (savepoint). Os pontos de salvamento garantem que o progresso do trabalho seja preservado, permitindo retomadas sem interrupção.
Atualizações de tarefa: O usuário pode atualizar a tarefa em execução após atualizar o jar na conta de armazenamento. Essa atualização captura automaticamente o ponto de salvamento e inicia o trabalho com um novo arquivo jar.
atualizações sem estado: realizar uma nova reinicialização de um trabalho é mais simples por meio de atualizações sem estado. Esse recurso permite que os usuários iniciem uma reinicialização limpa usando o jar de trabalho atualizado.
de gerenciamento do Savepoint: a qualquer momento, os usuários podem criar pontos de salvamento para seus trabalhos em execução. Esses pontos de salvamento podem ser listados e usados para reiniciar o trabalho de um ponto de verificação específico, conforme necessário.
Cancelar: isso cancela o trabalho permanentemente.
Excluir: Excluir o registro do histórico de trabalho.
Opções para gerenciar trabalhos no HDInsight no AKS
O HDInsight em AKS oferece formas de gerenciar tarefas do Flink.
Gerenciamento de Tarefas do portal do Azure
Para executar o trabalho Flink no portal, acesse:
Portal --> HDInsight no Pool de Clusters do AKS --> Cluster Flink --> Configurações --> Trabalhos Flink
Novo trabalho: Para enviar um novo trabalho, carregue os arquivos JAR na conta de armazenamento e crie um diretório de ponto de verificação. Conclua o modelo com as configurações necessárias e envie o trabalho.
detalhes da propriedade:
Propriedade Descrição Valor Padrão Obrigatório Nome do trabalho Nome exclusivo para o trabalho. Isso é exibido no portal. O nome do trabalho deve estar em letras minúsculas. Sim Caminho do JAR Caminho de armazenamento para o jar do trabalho. Os usuários devem criar um diretório no armazenamento de cluster e fazer upload do arquivo jar do trabalho. Sim Classe de entrada Classe de entrada para o trabalho do qual a execução do trabalho é iniciada. Sim Argumentos Argumento para o programa principal da tarefa. Separe todos os argumentos com espaços. Não paralelismo Paralelismo de Flink de Trabalho. 2 Sim savepoint.directory Diretório do Savepoint para trabalho. É recomendável que os usuários criem um novo diretório para o ponto de salvamento de trabalho na conta de armazenamento. abfs://<container>@<account>/<deployment-ID>/savepoints
Não Depois que o trabalho é iniciado, o status do trabalho no portal é RUNNING.
Parar: A parada do trabalho não exigia nenhum parâmetro; o usuário pode interromper o trabalho selecionando a ação.
Depois que o trabalho é interrompido, o status do trabalho no portal é PARADO.
Iniciar: Esta ação inicia o trabalho do savepoint. Para iniciar o trabalho, selecione o trabalho interrompido e inicie-o.
Preencha o modelo de fluxo com as opções necessárias e inicie-o. Os usuários precisam selecionar o ponto de salvamento do qual desejam iniciar o trabalho. Por padrão, ele usa o último ponto de salvamento bem-sucedido.
detalhes da propriedade :
Propriedade Descrição Valor Padrão Obrigatório Argumentos Argumento do programa principal da tarefa. Todos os argumentos devem ser separados por espaço. Não Último ponto de salvamento Última tomada de ponto de salvamento bem-sucedida antes de parar o trabalho. Isso será usado por padrão se o ponto de salvamento não estiver selecionado. Não editável Salvar nome do ponto Os usuários podem listar o ponto de salvamento disponível para o trabalho e selecionar um para iniciar o trabalho. Não Depois que o trabalho for iniciado, o status do trabalho no portal será EXECUTANDO.
Atualização: Atualização ajuda a reiniciar tarefas com código de tarefa atualizado. Os usuários precisam atualizar o arquivo jar mais recente no local de armazenamento e atualizar a tarefa pelo portal. Essa atualização pausa o trabalho com o ponto de salvamento e reinicia com o jar mais recente.
Modelo para atualização do trabalho.
Depois que o trabalho é atualizado, o status do trabalho no portal é "EXECUTANDO".
Atualização sem estado: Esta tarefa é como uma atualização, mas envolve um reinício completo da tarefa com o código mais recente.
Modelo para atualização do trabalho.
Detalhes da propriedade :
Propriedade Descrição Valor Padrão Obrigatório Argumentos Argumento para o programa principal do trabalho. Separe todos os argumentos com espaço. Não Depois que o trabalho é atualizado, o status do trabalho no portal é RUNNING.
Savepoint: Pegue o ponto de salvamento para o Flink Job.
O savepoint é um processo demorado e leva algum tempo. Você pode ver o status da tarefa como em andamento.
Cancelar: Esta tarefa ajuda o usuário a finalizar a tarefa.
Excluir: Excluir dados de trabalho do portal.
Exibir detalhes do trabalho: Para exibir o detalhe do trabalho, o usuário pode clicar no nome do trabalho, ele fornece os detalhes sobre o trabalho e o resultado da última ação.
Para qualquer ação com falha, este arquivo JSON do trabalho fornece exceções detalhadas e os motivos da falha.
Gerenciamento de Jobs usando a API Rest
O HDInsight no AKS oferece suporte a APIs REST do ARM amigáveis para enviar e gerenciar trabalhos. Usando essa API REST do Flink, você pode integrar perfeitamente as operações de trabalho do Flink ao pipeline do Azure. Se você estiver iniciando novos trabalhos, atualizando trabalhos em execução ou executando várias operações de trabalho, essa abordagem simplificada elimina as etapas manuais e capacita você a gerenciar seu cluster Flink com eficiência.
Formato de URL base para a API Rest
Consulte a URL a seguir para a API rest, os usuários precisam substituir a assinatura, o grupo de recursos, o pool de clusters, o nome do cluster e o HDInsight na versão da API do AKS antes de usá-la.
https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runjob?api-version={{API_VERSION}}
Usando essa API REST, os usuários podem iniciar novos trabalhos, interromper trabalhos, iniciar trabalhos, criar pontos de salvamento, cancelar trabalhos e excluir trabalhos. O API_VERSION atual é 2023-06-01-preview.
Autenticação de API Rest
Para autenticar usuários da API Rest ARM do Flink, é necessário obter o bearer token ou o token de acesso para o recurso ARM. Para autenticar a API REST do Azure ARM (Azure Resource Manager) usando uma entidade de serviço, siga estas etapas gerais:
Crie um Principal de Serviço.
az ad sp create-for-rbac --name <your-SP-name>
Conceda permissão de dono ao SP para o cluster
flink
.Faça o login com o serviço principal.
az login --service-principal -u <client_id> -p <client_secret> --tenant <tenant_id>
Obter token de acesso.
$token = az account get-access-token --resource=https://management.azure.com/ | ConvertFrom-Json
$tok = $token.accesstoken
Os usuários podem usar o token na URL mostrada.
$data = Invoke-RestMethod -Uri $restUri -Method GET -Headers @{ Authorization = "Bearer $tok" }
Autenticação usando Identidade Gerenciada: os usuários podem utilizar recursos que dão suporte à Identidade Gerenciada para fazer chamadas à API REST Job. Para obter mais detalhes, consulte a documentação do Managed Identity.
LISTA de APIs e parâmetros
Nova Tarefa: REST API para submeter nova tarefa ao Flink.
Opção Valor Método POSTAR URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "Bearer $token" Corpo da solicitação :
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "NEW", "jobJarDirectory": "<JOB_JAR_STORAGE_PATH>", "jarName": "<JOB_JAR_NAME>", "entryClass": "<JOB_ENTRY_CLASS>", “args”: ”<JOB_JVM_ARGUMENT>” "flinkConfiguration": { "parallelism": "<JOB_PARALLELISM>", "savepoint.directory": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>" } } }
detalhes da propriedade para o corpo JSON:
Propriedade Descrição Valor Padrão Obrigatório tipoDeEmprego Tipo de trabalho. Deve ser "FlinkJob" Sim Nome do Trabalho Nome exclusivo para o trabalho. Isso é exibido no portal. O nome da tarefa deve estar em letras minúsculas. Sim ação Indica o tipo de operação no trabalho. Deve ser "NEW" sempre para o lançamento do novo trabalho. Sim jobJarDirectory Caminho de armazenamento para o diretório jar do job. Os usuários devem criar um diretório no armazenamento de cluster e carregar o arquivo jar do trabalho. Sim jarName Nome do jar do trabalho. Sim entryClass Classe de entrada para o trabalho a partir do qual a execução se inicia. Sim args Argumento para o programa principal do trabalho. Separar argumentos com espaço. Não paralelismo Paralelismo de Flink de Trabalho. 2 Sim savepoint.directory Diretório do Savepoint para trabalho. É recomendável que os usuários criem um novo diretório para o ponto de salvamento de trabalho na conta de armazenamento. abfs://<container>@<account>/<deployment-ID>/savepoints
Não Exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Finalizar tarefa: API REST para interromper a tarefa em execução atual.
Opção Valor Método POSTAR URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "Bearer $token" corpo da solicitação
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "STOP" } }
Detalhes da propriedade para a estrutura JSON.
Propriedade Descrição Valor Padrão Obrigatório tipoDeTrabalho Tipo de trabalho. Deve ser "FlinkJob" Sim nome do trabalho Nome do trabalho, que é usado para iniciar o trabalho Sim ação Deve ser "STOP" Sim Exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Trabalho inicial: API Rest para iniciar o trabalho PARADO.
Opção Valor Método POSTAR URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "Bearer $token" corpo da solicitação
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "START", "savePointName": "<SAVEPOINT_NAME>" } }
detalhes da propriedade para o corpo JSON:
Propriedade Descrição Valor Padrão Obrigatório tipoDeTrabalho Tipo de trabalho. Deve ser "FlinkJob" Sim Nome do Trabalho Nome da tarefa usado para iniciar a tarefa. Sim ação Deve ser "START" Sim nomeDoPontoDeSalvamento Salve o nome do ponto para iniciar o trabalho. É uma propriedade opcional; por padrão, a operação de inicialização utiliza o último ponto de salvamento bem-sucedido. Não exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Trabalho de atualização: REST API para atualizar o trabalho que está em execução.
Opção Valor Método POSTAR URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "Bearer $token" corpo da solicitação
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "UPDATE", “args” : “<JOB_JVM_ARGUMENT>”, "savePointName": "<SAVEPOINT_NAME>" } }
Para o corpo JSON, detalhes da propriedade :
Propriedade Descrição Valor Padrão Obrigatório tipoDeTrabalho Tipo de trabalho. Deve ser "FlinkJob" Sim Nome do trabalho Nome da tarefa usado para iniciar o trabalho. Sim ação Deve ser "UPDATE" sempre para a nova inicialização do trabalho. Sim args Argumentos JVM da tarefa Não nomeDoPontoDeSalvar Salve o nome do ponto para iniciar o trabalho. É uma propriedade opcional, por padrão, a operação de início usará o último ponto de salvamento bem-sucedido. Não Exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
tarefa de atualização sem estado: API REST para atualização sem estado.
Opção Valor Método POSTAR URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "Bearer $token" corpo da solicitação
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "STATELESS_UPDATE", “args” : “<JOB_JVM_ARGUMENT>” } }
detalhes da propriedade para o corpo JSON:
Propriedade Descrição Valor Padrão Obrigatório tipoDeTrabalho Tipo de trabalho. Deve ser "FlinkJob" Sim Jobname Nome da tarefa usado para iniciar o trabalho. Sim ação Deve ser "STATELESS_UPDATE" sempre para o lançamento de novos trabalhos. Sim args Argumentos JVM de trabalho Não exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Savepoint: APIs REST para acionar o ponto de salvamento para a tarefa.
Opção Valor Método POSTAR URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "Bearer $token" corpo da solicitação
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "SAVEPOINT" } }
Informações da propriedade para o corpo JSON:
Propriedade Descrição Valor Padrão Obrigatório tipo de trabalho Tipo de trabalho. Deve ser "FlinkJob" Sim nomeDoTrabalho Nome da tarefa usado para iniciar o trabalho. Sim ação Deve ser "SAVEPOINT" sempre para o início de um novo trabalho. Sim exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Lista de savepoints: API Rest para listar todos os savepoints do diretório de savepoints.
Opção Valor Método POSTAR URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "Bearer $token" corpo da solicitação
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "LIST_SAVEPOINT" } }
detalhes da propriedade para o corpo JSON:
Propriedade Descrição Valor Padrão Obrigatório tipoDeTrabalho Tipo de trabalho. Deve ser "FlinkJob" Sim Jobname Nome do trabalho que é usado para iniciar o trabalho Sim ação Deve ser "LIST_SAVEPOINT" Sim exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Cancelar: API REST para cancelar o trabalho.
Opção Valor Método POSTAR URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "Bearer $token" corpo da solicitação
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "CANCEL" } }
detalhes da propriedade para o corpo JSON:
Propriedade Descrição Valor Padrão Obrigatório tipoDeTrabalho Tipo de trabalho. Deve ser FlinkJob
Sim NomeDoTrabalho Nome do trabalho usado para iniciar o trabalho. Sim ação Deve ser CANCELAR. Sim exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Excluir: API Rest para excluir a tarefa.
Opção Valor Método POSTAR URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "Bearer $token" corpo da solicitação
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "DELETE" } }
detalhes da propriedade para o corpo JSON:
Propriedade Descrição Valor Padrão Obrigatório tipoDeTrabalho Tipo de trabalho. Deve ser "FlinkJob" Sim nomeDoTrabalho Nome do trabalho usado para iniciar o trabalho. Sim ação Deve ser DELETE. Sim exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Trabalhos de Lista: API Rest para listar todos os trabalhos e o status da ação atual.
Opção Valor Método OBTER URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/jobs?api-version={{API_VERSION}}
Cabeçalho Autorização = "Bearer $token" Saída :
{ "value": [ { "id": "/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/jobs/job1", "properties": { "jobType": "FlinkJob", "jobName": "job1", "jobJarDirectory": "<JOB_JAR_STORAGE_PATH>", "jarName": "<JOB_JAR_NAME>", "action": "STOP", "entryClass": "<JOB_ENTRY_CLASS>", "flinkConfiguration": { "parallelism": "2", "savepoint.directory": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s" }, "jobId": "20e9e907eb360b1c69510507f88cdb7b", "status": "STOPPED", "jobOutput": "Savepoint completed. Path: <JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s/savepoint-20e9e9-8a48c6b905e5", "actionResult": "SUCCESS", "lastSavePoint": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s/savepoint-20e9e9-8a48c6b905e5" } } ] }
Nota
Quando qualquer ação estiver em andamento, actionResult a indicará com o valor 'IN_PROGRESS' Na conclusão bem-sucedida, ela mostrará 'SUCCESS' e, em caso de falha, será 'FAILED'.
Referência
- Agendamento de Trabalho do Apache Flink
- Apache, Apache Flink, Flink e nomes de projeto de software livre associados são marcas comerciais do ASF (Apache Software Foundation).