Gerenciamento de tarefas do Apache Flink® no HDInsight em clusters AKS
Importante
O Azure HDInsight no AKS foi desativado em 31 de janeiro de 2025. Saiba mais com este anúncio.
Você precisa migrar suas cargas de trabalho para Microsoft Fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho.
Importante
Esta funcionalidade está atualmente em pré-visualização. Os Termos de Utilização Suplementares para Pré-visualizações do Microsoft Azure incluem mais termos legais que se aplicam a funcionalidades do Azure que estão em versão beta, em pré-visualização ou ainda não disponibilizadas para o público geral. Para obter informações sobre essa visualização específica, consulte Azure HDInsight no AKS informações de visualização. Para perguntas ou sugestões de funcionalidades, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para obter mais atualizações na Comunidade do Azure HDInsight .
O HDInsight no AKS fornece um recurso para gerenciar e enviar trabalhos do Apache Flink® diretamente por meio do portal do Azure (interface amigável) e das APIs ARM Rest.
Esse recurso permite que os usuários controlem e monitorem com eficiência seus trabalhos do Apache Flink sem exigir conhecimento profundo em nível de cluster.
Benefícios
Gerenciamento simplificado de tarefas: Com a integração nativa do Apache Flink no portal do Azure, os usuários não precisam mais de conhecimento extensivo de clusters Flink para enviar, gerenciar e monitorar trabalhos.
User-Friendly API REST: O HDInsight no AKS oferece APIs REST ARM fáceis de usar para submeter e gerir trabalhos Flink. Os utilizadores podem enviar trabalhos Flink de qualquer serviço Azure usando estas APIs REST.
Atualizações de trabalho sem esforço e gerenciamento de estado: A integração nativa do portal do Azure fornece uma experiência sem complicações para atualizar trabalhos e restaurá-los para seu último estado salvo. Essa funcionalidade garante a continuidade e a integridade dos dados durante todo o ciclo de vida do trabalho.
Automatizando o trabalho do Flink usando o pipeline do Azure: Usando o HDInsight no AKS, os usuários do Flink têm acesso à API ARM Rest amigável, você pode integrar perfeitamente as operações do trabalho Flink ao seu Pipeline do Azure. Não importa se você está lançando novos trabalhos, atualizando trabalhos em execução ou executando várias operações de trabalho, essa abordagem simplificada elimina etapas manuais. Ele permite que você gerencie seu cluster Flink de forma eficiente.
Pré-requisitos
Há alguns pré-requisitos antes de enviar e gerir trabalhos a partir do portal ou das APIs Rest.
Crie um diretório na conta de armazenamento principal do cluster para fazer upload do job jar.
Se o utilizador quiser criar pontos de salvaguarda, deve-se criar um diretório na conta de armazenamento para pontos de salvaguarda do trabalho.
Principais características e operações
Nova submissão de tarefa: Os utilizadores podem submeter facilmente um novo Flink, eliminando a necessidade de configurações complexas ou ferramentas externas.
Pare e inicie trabalhos com savepoints: Os usuários podem parar e iniciar seus trabalhos Flink a partir de seu estado anterior (Savepoint). Os pontos de salvaguarda garantem que o progresso do trabalho seja preservado, permitindo retomadas sem interrupções.
Atualizações de Tarefas: O utilizador pode atualizar a tarefa em execução depois de atualizar o arquivo jar na conta de armazenamento. Esta atualização pega automaticamente o savepoint e inicia o trabalho com um novo jar.
Atualizações sem estado: A execução de uma nova reinicialização para um trabalho é simplificada por meio de atualizações sem estado. Esse recurso permite que os usuários iniciem uma reinicialização limpa usando o jar de trabalho atualizado.
Gestão de savepoints: A qualquer momento, os utilizadores podem criar savepoints para os seus trabalhos em execução. Esses pontos de salvamento podem ser listados e usados para reiniciar o trabalho a partir de um ponto de verificação específico, conforme necessário.
Cancelar: Isso cancela o trabalho permanentemente.
Excluir: Excluir registro de histórico de trabalho.
Opções para gerenciar trabalhos no HDInsight no AKS
O HDInsight no AKS fornece maneiras de gerir tarefas do Flink.
Gestão de Tarefas a partir do portal do Azure
Para executar o trabalho Flink do portal, vá para:
Portal --> HDInsight no Pool de Clusters AKS --> Flink Cluster -- Configurações de> --> Flink Jobs
Novo trabalho: Para submeter um novo trabalho, carregue os ficheiros JAR para a conta de armazenamento e crie um diretório de savepoint. Preencha o modelo com as configurações necessárias e, em seguida, envie o trabalho.
Detalhes do imóvel:
Propriedade Descrição Valor padrão Obrigatório Nome do trabalho Nome exclusivo para o trabalho. Isso é exibido no portal. O nome do trabalho deve estar em letras minúsculas. Sim Caminho do ficheiro JAR Caminho de armazenamento para arquivos jar de tarefas. Os utilizadores devem criar um diretório no armazenamento de cluster e fazer o upload do job jar. Sim Classe de entrada Classe de entrada para o trabalho a partir do qual a execução do trabalho é iniciada. Sim Arguments Argumento para o programa principal da tarefa. Separe todos os argumentos com espaços. Não paralelismo Paralelismo do Job Flink. 2 Sim savepoint.directory Diretório Savepoint para trabalho. É recomendável que os usuários criem um novo diretório para o ponto de salvamento de tarefas na conta de armazenamento. abfs://<container>@<account>/<deployment-ID>/savepoints
Não Depois que o trabalho é iniciado, o status do trabalho no portal é EXECUTANDO.
Parar: Parar o trabalho não exige nenhum parâmetro, o utilizador pode pará-lo ao selecionar a ação.
Quando o trabalho é interrompido, o estado do trabalho no portal é PARADO.
Iniciar: Esta ação inicia o trabalho a partir do savepoint. Para iniciar o trabalho, selecione o trabalho interrompido e inicie-o.
Preencha o modelo de fluxo com as opções necessárias e inicie-o. Os usuários precisam selecionar o ponto de salvamento a partir do qual o usuário deseja iniciar o trabalho. Por padrão, ele leva o último ponto de salvamento bem-sucedido.
Detalhes do imóvel:
Propriedade Descrição Valor padrão Obrigatório Argumentos Argumento para o programa principal de trabalho. Todos os argumentos devem ser separados por espaço. Não Último ponto de salvaguarda Último ponto de salvamento bem-sucedido antes de parar a tarefa. Isso será usado por padrão se nenhum ponto de salvamento estiver selecionado. Não editável Salvar nome do ponto Os usuários podem listar o savepoint disponível para o trabalho e selecionar um para iniciar o trabalho. Não Assim que o trabalho for iniciado, o status do trabalho no portal será EXECUTANDO.
Update: Update ajuda a reiniciar trabalhos com código de trabalho atualizado. Os utilizadores têm de atualizar o ficheiro jar de trabalho mais recente na localização de armazenamento e gerir o trabalho a partir do portal. Esta atualização interrompe o trabalho com savepoint e começa novamente com o jar mais recente.
Modelo para atualizar o trabalho.
Depois que a tarefa é atualizada, o estado da tarefa no portal é "EM EXECUÇÃO".
Atualização sem estado: Este trabalho é como uma atualização, mas envolve uma nova reinicialização do trabalho com o código mais recente.
Modelo para atualizar o trabalho.
Detalhes do imóvel:
Propriedade Descrição Valor padrão Obrigatório Argumentos Argumento para o programa principal de trabalho. Separe todos os argumentos com espaço. Não Assim que o trabalho for atualizado, o status do trabalho no portal será EM EXECUÇÃO.
Savepoint: Faça o savepoint para a tarefa Flink.
Savepoint é um processo demorado e leva algum tempo. Você pode ver o status da tarefa em andamento.
Cancelar: Esta tarefa ajuda o utilizador a encerrar a tarefa.
Excluir: Excluir dados de trabalho do portal.
Exibir detalhes do trabalho: Para visualizar os detalhes do trabalho, o usuário pode clicar no nome do trabalho, ele fornece os detalhes sobre o trabalho e o resultado da última ação.
Para qualquer ação falhada, o json deste trabalho fornece exceções detalhadas e razões para a falha.
gerenciamento de tarefas usando a API Rest
O HDInsight no AKS suporta APIs ARM Rest fáceis de usar para enviar trabalhos e gerenciar trabalhos. Usando essa API REST do Flink, você pode integrar perfeitamente as operações de trabalho do Flink ao seu Pipeline do Azure. Não importa se você está lançando novos trabalhos, atualizando trabalhos em execução ou executando várias operações de trabalho, essa abordagem simplificada elimina etapas manuais e permite que você gerencie seu cluster Flink de forma eficiente.
Formato de URL base para a API Rest
Consulte a seguinte URL para a API REST; os utilizadores precisam substituir a assinatura, o grupo de recursos, o pool de clusters, o nome do cluster e o HDInsight na versão da API do AKS antes de o utilizar.
https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runjob?api-version={{API_VERSION}}
Usando essa API REST, os usuários podem iniciar novos trabalhos, parar trabalhos, iniciar trabalhos, criar pontos de salvamento, cancelar trabalhos e excluir trabalhos. O API_VERSION atual é 2023-06-01-preview.
Autenticação da API Rest
Para autenticar os usuários da API Flink ARM Rest, é necessário obter o token de portador ou token de acesso para o recurso ARM. Para autenticar a API REST do Azure ARM (Azure Resource Manager) usando uma entidade de serviço, você pode seguir estas etapas gerais:
Crie um principal de serviço.
az ad sp create-for-rbac --name <your-SP-name>
Dê permissão de proprietário ao SP para
flink
cluster.Inicie sessão com a entidade de serviço.
az login --service-principal -u <client_id> -p <client_secret> --tenant <tenant_id>
Obtenha o token de acesso.
$token = az account get-access-token --resource=https://management.azure.com/ | ConvertFrom-Json
$tok = $token.accesstoken
Os usuários podem usar o token no URL mostrado.
$data = Invoke-RestMethod -Uri $restUri -Method GET -Headers @{ Authorization = "Bearer $tok" }
Autenticação usando Identidade Gerenciada: Os usuários podem utilizar recursos que oferecem suporte à Identidade Gerenciada para fazer chamadas para a API REST de trabalho. Para obter mais detalhes, consulte a documentação do Managed Identity.
LISTA de APIs e parâmetros
Novo trabalho: API Rest para enviar um novo trabalho para o Flink.
Opção Valor Método PUBLICAR URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Authorization = "Bearer $token" Órgão do pedido:
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "NEW", "jobJarDirectory": "<JOB_JAR_STORAGE_PATH>", "jarName": "<JOB_JAR_NAME>", "entryClass": "<JOB_ENTRY_CLASS>", “args”: ”<JOB_JVM_ARGUMENT>” "flinkConfiguration": { "parallelism": "<JOB_PARALLELISM>", "savepoint.directory": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>" } } }
Detalhes da propriedade para o corpo JSON:
Propriedade Descrição Valor padrão Obrigatório Tipo de trabalho Tipo de trabalho. Deve ser "FlinkJob" Sim nome do trabalho Nome exclusivo para o trabalho. Isso é exibido no portal. O nome do trabalho deve estar em letras minúsculas. Sim ação Ele indica o tipo de operação no trabalho. Deve ser sempre "NOVO" para a criação de novas posições de trabalho. Sim DiretórioJobJar Caminho de armazenamento para o diretório jar de trabalho. Os utilizadores devem criar um diretório no armazenamento de cluster e carregar o ficheiro jar do trabalho. Sim jarName Nome da lista de tarefas. Sim entryClasse Classe de entrada para o trabalho a partir do qual a execução do trabalho é iniciada. Sim Args Argumento para o programa principal de trabalho. Separe os argumentos com espaço. Não paralelismo Paralelismo Job Flink. 2 Sim savepoint.directory Diretório Savepoint para trabalho. É recomendável que os usuários criem um novo diretório para o ponto de salvamento de tarefas na conta de armazenamento. abfs://<container>@<account>/<deployment-ID>/savepoints
Não Exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Parar tarefa: API Rest para interromper a tarefa em execução.
Opção Valor Método PUBLICAR URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Authorization = "Bearer $token" Órgão do Pedido
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "STOP" } }
Detalhes da propriedade para o corpo JSON:
Propriedade Descrição Valor padrão Obrigatório Tipo de trabalho Tipo de trabalho. Deve ser "FlinkJob" Sim nome do trabalho Nome do trabalho, que é usado para iniciar o trabalho Sim ação Deve ser "STOP" Sim Exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Iniciar trabalho: API REST para iniciar o trabalho PARADO.
Opção Valor Método PUBLICAR URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "Bearer $token" Órgão do Pedido
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "START", "savePointName": "<SAVEPOINT_NAME>" } }
Detalhes da propriedade para o corpo JSON:
Propriedade Descrição Valor padrão Obrigatório Tipo de trabalho Tipo de trabalho. Deve ser "FlinkJob" Sim nome do trabalho Nome do trabalho que é usado para iniciar o trabalho. Sim ação Deve ser "START" Sim savePointName Salve o nome do ponto para iniciar o trabalho. É uma propriedade opcional; por padrão, a operação de início utiliza o último savepoint bem-sucedido. Não Exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Trabalho de atualização: API Rest para atualizar o trabalho em execução atual.
Opção Valor Método PUBLICAR URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "Bearer $token" Órgão do Pedido
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "UPDATE", “args” : “<JOB_JVM_ARGUMENT>”, "savePointName": "<SAVEPOINT_NAME>" } }
Detalhes da propriedade para o corpo JSON:
Propriedade Descrição Valor padrão Obrigatório Tipo de trabalho Tipo de trabalho. Deve ser "FlinkJob" Sim nome do trabalho Nome do trabalho que é usado para iniciar o trabalho. Sim ação Deve ser "UPDATE" sempre para o lançamento de novas vagas. Sim Args Argumentos da JVM de trabalho Não nomeDoPontoDeSalvamento Salve o nome do ponto para iniciar o trabalho. É uma propriedade opcional; por defeito, a operação de inicialização utilizará o último ponto de salvaguarda bem-sucedido. Não Exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Trabalho de atualização sem estado: REST API para atualização sem estado.
Opção Valor Método PUBLICAR URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "Bearer $token" Órgão do Pedido
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "STATELESS_UPDATE", “args” : “<JOB_JVM_ARGUMENT>” } }
Detalhes da propriedade para o corpo JSON:
Propriedade Descrição Valor padrão Obrigatório Tipo de trabalho Tipo de trabalho. Deve ser "FlinkJob" Sim nome do trabalho Nome do trabalho que é usado para iniciar o trabalho. Sim ação Deve ser "STATELESS_UPDATE" sempre para o lançamento de novas tarefas. Sim Args Argumentos da JVM de trabalho Não Exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Savepoint: APIs REST para acionar o savepoint para a tarefa.
Opção Valor Método PUBLICAR URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Authorization = "Bearer $token" Órgão do Pedido
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "SAVEPOINT" } }
Detalhes da propriedade para o corpo JSON:
Propriedade Descrição Valor padrão Obrigatório Tipo de trabalho Tipo de trabalho. Deve ser "FlinkJob" Sim nome do trabalho Nome do trabalho que é usado para iniciar o trabalho. Sim ação Deve ser "SAVEPOINT" sempre para lançamento de novas vagas. Sim Exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Listar savepoints: API Rest para listar todos os savepoints do diretório de savepoints.
Opção Valor Método PUBLICAR URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "Bearer $token" Órgão do Pedido
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "LIST_SAVEPOINT" } }
Detalhes da propriedade para o corpo JSON:
Propriedade Descrição Valor padrão Obrigatório Tipo de trabalho Tipo de trabalho. Deve ser "FlinkJob" Sim nome do trabalho Nome do trabalho que é usado para iniciar o trabalho Sim ação Deve ser "LIST_SAVEPOINT" Sim Exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Cancelar: API Rest para cancelar a tarefa.
Opção Valor Método PUBLICAR URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "Bearer $token" Órgão do Pedido
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "CANCEL" } }
Detalhes da propriedade para o corpo JSON:
Propriedade Descrição Valor padrão Obrigatório Tipo de trabalho Tipo de trabalho. Deve ser FlinkJob
Sim nome do trabalho Nome do trabalho que é usado para iniciar o trabalho. Sim ação Deve ser CANCELAR. Sim Exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Excluir: API REST para excluir a tarefa.
Opção Valor Método PUBLICAR URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "$token ao Portador" Órgão do Pedido
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "DELETE" } }
Detalhes da propriedade para o corpo JSON:
Propriedade Descrição Valor padrão Obrigatório Tipo de trabalho Tipo de trabalho. Deve ser "FlinkJob" Sim nome do trabalho Nome do trabalho que é usado para iniciar a tarefa. Sim ação Deve ser "DELETE". Sim Exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Listar trabalhos: API Rest para listar todos os trabalhos e status da ação atual.
Opção Valor Método OBTER URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/jobs?api-version={{API_VERSION}}
Cabeçalho Autorização = "Bearer $token" Saída:
{ "value": [ { "id": "/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/jobs/job1", "properties": { "jobType": "FlinkJob", "jobName": "job1", "jobJarDirectory": "<JOB_JAR_STORAGE_PATH>", "jarName": "<JOB_JAR_NAME>", "action": "STOP", "entryClass": "<JOB_ENTRY_CLASS>", "flinkConfiguration": { "parallelism": "2", "savepoint.directory": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s" }, "jobId": "20e9e907eb360b1c69510507f88cdb7b", "status": "STOPPED", "jobOutput": "Savepoint completed. Path: <JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s/savepoint-20e9e9-8a48c6b905e5", "actionResult": "SUCCESS", "lastSavePoint": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s/savepoint-20e9e9-8a48c6b905e5" } } ] }
Observação
Quando qualquer ação estiver em andamento, actionResult a indicará com o valor 'IN_PROGRESS'. Na conclusão bem-sucedida, ela mostrará 'SUCESSO' e, em caso de falha, será 'FALHA'.
Referência
- Agendamento de Trabalhos Apache Flink
- Apache, Apache Flink, Flink e nomes de projetos de código aberto associados são marcas comerciais da Apache Software Foundation (ASF).