Gerenciamento de tarefas do Apache Flink® no HDInsight em clusters AKS
Nota
Vamos desativar o Azure HDInsight no AKS em 31 de janeiro de 2025. Antes de 31 de janeiro de 2025, você precisará migrar suas cargas de trabalho para o Microsoft Fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho. Os clusters restantes na sua subscrição serão interrompidos e removidos do anfitrião.
Apenas o apoio básico estará disponível até à data da reforma.
Importante
Esta funcionalidade está atualmente em pré-visualização. Os Termos de Utilização Suplementares para Pré-visualizações do Microsoft Azure incluem mais termos legais que se aplicam a funcionalidades do Azure que estão em versão beta, em pré-visualização ou ainda não disponibilizadas para disponibilidade geral. Para obter informações sobre essa visualização específica, consulte Informações de visualização do Azure HDInsight no AKS. Para perguntas ou sugestões de recursos, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para obter mais atualizações na Comunidade do Azure HDInsight.
O HDInsight no AKS fornece um recurso para gerenciar e enviar trabalhos do Apache Flink® diretamente por meio do portal do Azure (interface amigável) e das APIs ARM Rest.
Esse recurso permite que os usuários controlem e monitorem com eficiência seus trabalhos do Apache Flink sem exigir conhecimento profundo em nível de cluster.
Benefícios
Gerenciamento simplificado de tarefas: com a integração nativa do Apache Flink no portal do Azure, os usuários não precisam mais de amplo conhecimento dos clusters Flink para enviar, gerenciar e monitorar trabalhos.
API REST amigável: o HDInsight no AKS fornece APIs ARM Rest amigáveis para enviar e gerenciar trabalhos do Flink. Os usuários podem enviar trabalhos Flink de qualquer serviço do Azure usando essas APIs Rest.
Atualizações de trabalho e gerenciamento de estado sem esforço: a integração nativa do portal do Azure fornece uma experiência sem complicações para atualizar trabalhos e restaurá-los para seu último estado salvo (savepoint). Essa funcionalidade garante a continuidade e a integridade dos dados durante todo o ciclo de vida do trabalho.
Automatizando o trabalho do Flink usando o pipeline do Azure: usando o HDInsight no AKS, os usuários do Flink têm acesso à API ARM Rest amigável, você pode integrar perfeitamente as operações do trabalho Flink ao seu Pipeline do Azure. Não importa se você está lançando novos trabalhos, atualizando trabalhos em execução ou executando várias operações de trabalho, essa abordagem simplificada elimina etapas manuais. Ele permite que você gerencie seu cluster Flink de forma eficiente.
Pré-requisitos
Há alguns pré-requisitos antes de enviar e gerenciar trabalhos do portal ou APIs Rest.
Crie um diretório na conta de armazenamento principal do cluster para carregar o jar de trabalho.
Se o usuário quiser fazer savepoints, crie um diretório na conta de armazenamento para savepoints de trabalho.
Principais características e operações
Novo envio de trabalho: os usuários podem enviar facilmente um novo Flink, eliminando a necessidade de configurações complexas ou ferramentas externas.
Parar e iniciar trabalhos com savepoints: Os usuários podem parar e iniciar seus trabalhos Flink a partir de seu estado anterior (Savepoint). Os pontos de salvamento garantem que o progresso do trabalho seja preservado, permitindo retomadas perfeitas.
Atualizações de trabalho: o usuário pode atualizar o trabalho em execução depois de atualizar o jar na conta de armazenamento. Esta atualização pega automaticamente o savepoint e inicia o trabalho com um novo jar.
Atualizações sem estado: a execução de uma nova reinicialização para um trabalho é simplificada por meio de atualizações sem monitoração de estado. Esse recurso permite que os usuários iniciem uma reinicialização limpa usando o jar de trabalho atualizado.
Gerenciamento de pontos de salvamento: a qualquer momento, os usuários podem criar savepoints para seus trabalhos em execução. Esses pontos de salvamento podem ser listados e usados para reiniciar o trabalho a partir de um ponto de verificação específico, conforme necessário.
Cancelar: cancela o trabalho permanentemente.
Excluir: exclua o registro do histórico de trabalhos.
Opções para gerenciar trabalhos no HDInsight no AKS
O HDInsight no AKS fornece maneiras de gerenciar trabalhos do Flink.
Gestão de Tarefas a partir do portal do Azure
Para executar o trabalho Flink do portal, vá para:
Portal --> HDInsight no pool de clusters AKS --> Flink Cluster --> Configurações --> Flink Jobs
Novo trabalho: para enviar um novo trabalho, carregue os jars de trabalho para a conta de armazenamento e crie um diretório de savepoint. Preencha o modelo com as configurações necessárias e, em seguida, envie o trabalho.
Detalhes do imóvel:
Property Descrição Valor Predefinido Obrigatório Nome da tarefa Nome exclusivo para o trabalho. Isso é exibido no portal. O nome do trabalho deve estar em letras minúsculas. Sim Caminho do jarro Caminho de armazenamento para jar de trabalho. Os usuários devem criar diretório no armazenamento de cluster e carregar jar de tarefas. Sim Classe de entrada Classe de entrada para o trabalho a partir do qual a execução do trabalho é iniciada. Sim Args Argumento para o programa principal de trabalho. Separe todos os argumentos com espaços. Não paralelismo Paralelismo Job Flink. 2 Sim savepoint.directory Diretório Savepoint para trabalho. É recomendável que os usuários criem um novo diretório para o ponto de salvamento de tarefas na conta de armazenamento. abfs://<container>@<account>/<deployment-ID>/savepoints
Não Depois que o trabalho é iniciado, o status do trabalho no portal é RUNNING.
Parar: Parar trabalho não exigiu nenhum parâmetro, o usuário pode parar o trabalho selecionando a ação.
Quando o trabalho é interrompido, o status do trabalho no portal é INTERROMPIDO.
Iniciar: esta ação inicia o trabalho a partir do savepoint. Para iniciar o trabalho, selecione o trabalho interrompido e inicie-o.
Preencha o modelo de fluxo com as opções necessárias e inicie-o. Os usuários precisam selecionar o ponto de salvamento a partir do qual o usuário deseja iniciar o trabalho. Por padrão, ele leva o último ponto de salvamento bem-sucedido.
Detalhes do imóvel:
Property Descrição Valor Predefinido Obrigatório Args Argumento para o programa principal de trabalho. Todos os argumentos devem ser separados por espaço. Não Último savepoint Último savepoint bem-sucedido tomar antes de parar o trabalho. Isso será usado por padrão se não savepoint estiver selecionado. Não editável Salvar nome do ponto Os usuários podem listar o savepoint disponível para o trabalho e selecionar um para iniciar o trabalho. Não Assim que o trabalho for iniciado, o status do trabalho no portal será RUNNING.
Atualização: a atualização ajuda a reiniciar trabalhos com código de trabalho atualizado. Os usuários precisam atualizar o jar de trabalho mais recente no local de armazenamento e atualizar o trabalho do portal. Esta atualização interrompe o trabalho com savepoint e começa novamente com o jar mais recente.
Modelo para atualizar o trabalho.
Depois que o trabalho é atualizado, o status do trabalho no portal é "EM EXECUÇÃO".
Atualização sem estado: este trabalho é como uma atualização, mas envolve uma nova reinicialização do trabalho com o código mais recente.
Modelo para atualizar o trabalho.
Detalhes do imóvel:
Property Descrição Valor Predefinido Obrigatório Args Argumento para o programa principal de trabalho. Separe todos os argumentos com espaço. Não Depois que o trabalho for atualizado, o status do trabalho no portal será EXECUTADO.
Savepoint: Pegue o savepoint para o Flink Job.
Savepoint é um processo demorado e leva algum tempo. Você pode ver o status da ação de trabalho como em andamento.
Cancelar: este trabalho ajuda o usuário a encerrar o trabalho.
Excluir: exclua dados de trabalho do portal.
Exibir detalhes do trabalho: Para visualizar os detalhes do trabalho, o usuário pode clicar no nome do trabalho, ele fornece os detalhes sobre o trabalho e o resultado da última ação.
Para qualquer ação falhada, este trabalho json fornecer exceções detalhadas e razões para a falha.
Gerenciamento de tarefas usando a API Rest
O HDInsight no AKS suporta APIs ARM Rest fáceis de usar para enviar trabalhos e gerenciar trabalhos. Usando essa API REST do Flink, você pode integrar perfeitamente as operações de trabalho do Flink ao seu Pipeline do Azure. Não importa se você está lançando novos trabalhos, atualizando trabalhos em execução ou executando várias operações de trabalho, essa abordagem simplificada elimina etapas manuais e permite que você gerencie seu cluster Flink de forma eficiente.
Formato de URL base para a API Rest
Consulte a seguinte URL para a API rest, os usuários precisam substituir a assinatura, o grupo de recursos, o pool de clusters, o nome do cluster e o HDInsight na versão da API do AKS antes de usá-la.
https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runjob?api-version={{API_VERSION}}
Usando essa API REST, os usuários podem iniciar novos trabalhos, parar trabalhos, iniciar trabalhos, criar pontos de salvamento, cancelar trabalhos e excluir trabalhos. O API_VERSION atual é 2023-06-01-preview.
Autenticação da API Rest
Para autenticar os usuários da API Flink ARM Rest, é necessário obter o token de portador ou token de acesso para o recurso ARM. Para autenticar a API REST do Azure ARM (Azure Resource Manager) usando uma entidade de serviço, você pode seguir estas etapas gerais:
Crie uma entidade de serviço.
az ad sp create-for-rbac --name <your-SP-name>
Dê permissão de proprietário ao SP para
flink
cluster.Faça login com a entidade de serviço.
az login --service-principal -u <client_id> -p <client_secret> --tenant <tenant_id>
Obtenha o token de acesso.
$token = az account get-access-token --resource=https://management.azure.com/ | ConvertFrom-Json
$tok = $token.accesstoken
Os usuários podem usar o token no URL mostrado.
$data = Invoke-RestMethod -Uri $restUri -Method GET -Headers @{ Authorization = "Bearer $tok" }
Autenticação usando Identidade Gerenciada: os usuários podem utilizar recursos que oferecem suporte à Identidade Gerenciada para fazer chamadas para a API REST de Trabalho. Para obter mais detalhes, consulte a documentação do Managed Identity .
LISTA de APIs e parâmetros
Novo trabalho: API Rest para enviar novo trabalho para o Flink.
Opção Value Método POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "$token ao Portador" Órgão do pedido:
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "NEW", "jobJarDirectory": "<JOB_JAR_STORAGE_PATH>", "jarName": "<JOB_JAR_NAME>", "entryClass": "<JOB_ENTRY_CLASS>", “args”: ”<JOB_JVM_ARGUMENT>” "flinkConfiguration": { "parallelism": "<JOB_PARALLELISM>", "savepoint.directory": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>" } } }
Detalhes da propriedade para o corpo JSON:
Property Descrição Valor Predefinido Obrigatório Tipo de trabalho Tipo de trabalho. Deve ser "FlinkJob" Sim nome do trabalho Nome exclusivo para o trabalho. Isso é exibido no portal. O nome do trabalho deve estar em letras minúsculas. Sim action Ele indica o tipo de operação no trabalho. Deve ser "NOVO" sempre para lançamento de novas vagas. Sim jobJarDirectory Caminho de armazenamento para o diretório jar de trabalho. Os usuários devem criar diretório no armazenamento de cluster e carregar jar de tarefas. Sim jarName Nome do frasco de trabalho. Sim entryClasse Classe de entrada para o trabalho a partir do qual a execução do trabalho é iniciada. Sim args Argumento para o programa principal de trabalho. Separe os argumentos com espaço. Não paralelismo Paralelismo Job Flink. 2 Sim savepoint.directory Diretório Savepoint para trabalho. É recomendável que os usuários criem um novo diretório para o ponto de salvamento de tarefas na conta de armazenamento. abfs://<container>@<account>/<deployment-ID>/savepoints
Não Exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Parar trabalho: API de repouso para interromper o trabalho em execução atual.
Opção Value Método POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "$token ao Portador" Órgão do Pedido
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "STOP" } }
Detalhes da propriedade para o corpo JSON:
Property Descrição Valor Predefinido Obrigatório Tipo de trabalho Tipo de trabalho. Deve ser "FlinkJob" Sim nome do trabalho Nome do trabalho, que é usado para iniciar o trabalho Sim action Deve ser "STOP" Sim Exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Iniciar trabalho: API de repouso para iniciar o trabalho INTERROMPIDO.
Opção Value Método POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "$token ao Portador" Órgão do Pedido
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "START", "savePointName": "<SAVEPOINT_NAME>" } }
Detalhes da propriedade para o corpo JSON:
Property Descrição Valor Predefinido Obrigatório Tipo de trabalho Tipo de trabalho. Deve ser "FlinkJob" Sim nome do trabalho Nome do trabalho que é usado para iniciar o trabalho. Sim action Deve ser "START" Sim savePointName Salve o nome do ponto para iniciar o trabalho. É propriedade opcional, por padrão iniciar operação tomar último salvamento bem-sucedido. Não Exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Trabalho de atualização: API Rest para atualizar o trabalho em execução atual.
Opção Value Método POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "$token ao Portador" Órgão do Pedido
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "UPDATE", “args” : “<JOB_JVM_ARGUMENT>”, "savePointName": "<SAVEPOINT_NAME>" } }
Detalhes da propriedade para o corpo JSON:
Property Descrição Valor Predefinido Obrigatório Tipo de trabalho Tipo de trabalho. Deve ser "FlinkJob" Sim nome do trabalho Nome do trabalho que é usado para iniciar o trabalho. Sim action Deve ser "UPDATE" sempre para o lançamento de novas vagas. Sim args Argumentos da JVM de trabalho Não savePointName Salve o nome do ponto para iniciar o trabalho. É propriedade opcional, por padrão, a operação de início levará o último ponto de salvamento bem-sucedido. Não Exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Trabalho de atualização sem estado: API Rest para atualização sem monitoração de estado.
Opção Value Método POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "$token ao Portador" Órgão do Pedido
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "STATELESS_UPDATE", “args” : “<JOB_JVM_ARGUMENT>” } }
Detalhes da propriedade para o corpo JSON:
Property Descrição Valor Predefinido Obrigatório Tipo de trabalho Tipo de trabalho. Deve ser "FlinkJob" Sim nome do trabalho Nome do trabalho que é usado para iniciar o trabalho. Sim action Deve ser "STATELESS_UPDATE" sempre para o lançamento de novos empregos. Sim args Argumentos da JVM de trabalho Não Exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Savepoint: APIs de repouso para acionar o savepoint para o trabalho.
Opção Value Método POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "$token ao Portador" Órgão do Pedido
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "SAVEPOINT" } }
Detalhes da propriedade para o corpo JSON:
Property Descrição Valor Predefinido Obrigatório Tipo de trabalho Tipo de trabalho. Deve ser "FlinkJob" Sim nome do trabalho Nome do trabalho que é usado para iniciar o trabalho. Sim action Deve ser "SAVEPOINT" sempre para lançamento de novas vagas. Sim Exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
List savepoint: API Rest para listar todos os savepoint do diretório savepoint.
Opção Value Método POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "$token ao Portador" Órgão do Pedido
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "LIST_SAVEPOINT" } }
Detalhes da propriedade para o corpo JSON:
Property Descrição Valor Predefinido Obrigatório Tipo de trabalho Tipo de trabalho. Deve ser "FlinkJob" Sim nome do trabalho Nome do trabalho que é usado para iniciar o trabalho Sim action Deve ser "LIST_SAVEPOINT" Sim Exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Cancelar: API Rest para cancelar o trabalho.
Opção Value Método POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "$token ao Portador" Órgão do Pedido
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "CANCEL" } }
Detalhes da propriedade para o corpo JSON:
Property Descrição Valor Predefinido Obrigatório Tipo de trabalho Tipo de trabalho. Deve ser FlinkJob
Sim nome do trabalho Nome do trabalho que é usado para iniciar o trabalho. Sim action Deve ser CANCELAR. Sim Exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Excluir: API Rest para excluir trabalho.
Opção Value Método POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Cabeçalho Autorização = "$token ao Portador" Órgão do Pedido
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "DELETE" } }
Detalhes da propriedade para o corpo JSON:
Property Descrição Valor Predefinido Obrigatório Tipo de trabalho Tipo de trabalho. Deve ser "FlinkJob" Sim nome do trabalho Nome do trabalho que é usado para iniciar o trabalho. Sim action Deve ser DELETE. Sim Exemplo:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Listar Trabalhos: API Rest para listar todos os trabalhos e status da ação atual.
Opção Value Método GET URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/jobs?api-version={{API_VERSION}}
Cabeçalho Autorização = "$token ao Portador" Saída:
{ "value": [ { "id": "/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/jobs/job1", "properties": { "jobType": "FlinkJob", "jobName": "job1", "jobJarDirectory": "<JOB_JAR_STORAGE_PATH>", "jarName": "<JOB_JAR_NAME>", "action": "STOP", "entryClass": "<JOB_ENTRY_CLASS>", "flinkConfiguration": { "parallelism": "2", "savepoint.directory": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s" }, "jobId": "20e9e907eb360b1c69510507f88cdb7b", "status": "STOPPED", "jobOutput": "Savepoint completed. Path: <JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s/savepoint-20e9e9-8a48c6b905e5", "actionResult": "SUCCESS", "lastSavePoint": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s/savepoint-20e9e9-8a48c6b905e5" } } ] }
Nota
Quando qualquer ação estiver em andamento, actionResult a indicará com o valor 'IN_PROGRESS' Na conclusão bem-sucedida, ela mostrará 'SUCESSO' e, em caso de falha, será 'FALHOU'.
Referência
- Agendamento de trabalhos do Apache Flink
- Apache, Apache Flink, Flink e nomes de projetos de código aberto associados são marcas comerciais da Apache Software Foundation (ASF).