Gestione dei processi Apache Flink® in cluster HDInsight su AKS
Nota
Azure HDInsight su AKS verrà ritirato il 31 gennaio 2025. Prima del 31 gennaio 2025, sarà necessario eseguire la migrazione dei carichi di lavoro a Microsoft Fabric o a un prodotto Azure equivalente per evitare interruzioni improvvise dei carichi di lavoro. I cluster rimanenti nella sottoscrizione verranno arrestati e rimossi dall’host.
Solo il supporto di base sarà disponibile fino alla data di ritiro.
Importante
Questa funzionalità è attualmente disponibile solo in anteprima. Le Condizioni per l'utilizzo supplementari per le anteprime di Microsoft Azure includono termini legali aggiuntivi che si applicano a funzionalità di Azure in versione beta, in anteprima o in altro modo non ancora disponibili a livello generale. Per informazioni su questa anteprima specifica, vedere Informazioni sull'anteprima di Azure HDInsight nel servizio Azure Kubernetes. Per domande o suggerimenti sulle funzionalità, inviare una richiesta in AskHDInsight con i dettagli e seguire Microsoft per altri aggiornamenti nella Community di Azure HDInsight.
HDInsight su AKS offre una funzionalità per gestire e inviare processi Apache Flink® direttamente tramite il portale di Azure (interfaccia intuitiva) e le API REST ARM.
Questa funzionalità consente agli utenti di controllare e monitorare in modo efficiente i processi Apache Flink senza richiedere conoscenze approfondite a livello di cluster.
Vantaggi
Gestione semplificata dei processi: con l'integrazione nativa di Apache Flink nel portale di Azure, gli utenti non necessitano più di una conoscenza approfondita dei cluster Flink per inviare, gestire e monitorare i processi.
API REST intuitiva: HDInsight su AKS offre API REST ARM intuitive per inviare e gestire processi Flink. Gli utenti possono inviare processi Flink da qualsiasi servizio di Azure usando queste API REST.
Facilità di gestione degli aggiornamenti dei processi e della gestione dello stato: l'integrazione del portale di Azure nativo offre un'esperienza semplice per l'aggiornamento dei processi e il ripristino all'ultimo stato salvato (punto di salvataggio). Questa funzionalità garantisce la continuità e l'integrità dei dati nel ciclo di vita del processo.
Automazione del processo Flink con la pipeline di Azure: usando HDInsight su AKS, gli utenti Flink hanno accesso all'API REST ARM intuitiva, è possibile integrare facilmente le operazioni del processo Flink in Azure Pipeline. Sia che si avviino nuovi processi, si aggiornino processi in esecuzione o si eseguano varie operazioni di processo, questo approccio semplificato elimina i passaggi manuali. Consente di gestire il cluster Flink in modo efficiente.
Prerequisiti
Prima di inviare e gestire i processi dal portale o dalle API REST, sono necessari alcuni prerequisiti.
Creare una directory nell'account di archiviazione primario del cluster per eseguire il caricamento del file JAR del processo.
Se l'utente vuole selezionare i punti di salvataggio, occorre creare una directory nell'account di archiviazione per i punti di salvataggio dei processi.
Funzionalità e operazioni principali
Nuovo invio di processi: gli utenti possono inviare facilmente un nuovo collegamento Flink, eliminando la necessità di configurazioni complesse o strumenti esterni.
Arrestare e avviare i processi con punti di salvataggio: gli utenti possono arrestare e avviare normalmente i processi Flink dallo stato precedente (punto di salvataggio). I punti di salvataggio assicurano che lo stato di avanzamento del processo venga mantenuto, abilitando le riprese senza problemi.
Aggiornamenti dei processi: l'utente può aggiornare il processo in esecuzione dopo l'aggiornamento del file JAR nell'account di archiviazione. Questo aggiornamento accetta automaticamente il punto di salvataggio e avvia il processo con un nuovo file JAR.
Aggiornamenti senza stato: l'esecuzione di un nuovo riavvio per un processo viene semplificata tramite aggiornamenti senza stato. Questa funzionalità consente agli utenti di avviare un riavvio pulito usando il file JAR del processo aggiornato.
Gestione dei punti di salvataggio: in qualsiasi momento, gli utenti possono creare punti di salvataggio per i processi in esecuzione. Questi punti di salvataggio possono essere elencati e usati per riavviare il processo da un punto di controllo specifico in base alle esigenze.
Annulla: annulla il processo in modo permanente.
Elimina: elimina il record della cronologia dei processi.
Opzioni per gestire i processi in HDInsight su AKS
HDInsight su AKS offre modi per gestire i processi Flink.
Gestione processi dal portale di Azure
Per eseguire il processo Flink dal portale, passare a:
Portale --> HDInsight su AKS --> Cluster Flink --> Impostazioni --> Processi Flink
Nuovo processo: per inviare un nuovo processo, caricare i file JAR del processo nell'account di archiviazione e creare una directory dei punti di salvataggio. Completare il modello con le configurazioni necessarie e quindi inviare il processo.
Dettagli proprietà:
Proprietà Descrizione Valore predefinito Obbligatorio Nome processo Nome univoco per il processo. Questo viene visualizzato nel portale. Il nome del processo deve essere in lettere minuscole. Sì Percorso file JAR Archiviazione percorso per il file JAR del processo. Gli utenti devono creare le directory nell'archiviazione cluster e caricare il file JAR del processo. Sì Classe di immissione Classe di ingresso per il processo da cui viene avviata la relativa esecuzione. Sì Args Argomento per il programma principale del processo. Separare tutti gli argomenti con spazi. No parallelismo Parallelismo Flink del processo. 2 Sì savepoint.directory Directory del punto di salvataggio per il processo. È consigliabile che gli utenti creino una nuova directory per il punto di salvataggio dei processi nell'account di archiviazione. abfs://<container>@<account>/<deployment-ID>/savepoints
No Dopo l'avvio del processo, lo stato del processo nel portale è IN ESECUZIONE.
Arresta: l'arresto del processo non richiede alcun parametro, l'utente può arrestare il processo selezionando l'azione.
Dopo l'arresto del processo, lo stato del processo nel portale è ARRESTATO.
Avvia: questa azione avvia il processo dal punto di salvataggio. Per avviare il processo, selezionare il processo arrestato e avviarlo.
Compilare il modello di flusso con le opzioni necessarie e avviarlo. Gli utenti devono selezionare il punto di salvataggio da cui l'utente vuole avviare il processo. Per impostazione predefinita, accetta l'ultimo punto di salvataggio riuscito.
Dettagli proprietà:
Proprietà Descrizione Valore predefinito Obbligatorio Args Argomento per il programma principale del processo. Tutti gli argomenti devono essere separati da spazio. No Ultimo punto di salvataggio Ultima adozione del punto di salvataggio riuscita prima di arrestare il processo. Questa opzione verrà utilizzata per impostazione predefinita se non è selezionato il punto di salvataggio. Non modificabile Nome del punto di salvataggio Gli utenti possono elencare i punti di salvataggio disponibili per il processo e selezionare uno per avviare il processo. No Dopo l'avvio del processo, lo stato del processo nel portale sarà IN ESECUZIONE.
Aggiorna: l'aggiornamento consente di riavviare i processi con il codice del processo aggiornato. Gli utenti devono aggiornare il file JAR del processo più recente nella posizione di archiviazione e aggiornare il processo dal portale. Questo aggiornamento arresta il processo con il punto di salvataggio e lo avvia di nuovo con il file JAR più recente.
Modello per l'aggiornamento del processo.
Dopo aver aggiornato il processo, lo stato del processo nel portale è "IN ESECUZIONE".
Aggiornamento senza stato: questo processo è simile a un aggiornamento, ma comporta un nuovo riavvio del processo con il codice più recente.
Modello per l'aggiornamento del processo.
Dettagli proprietà:
Proprietà Descrizione Valore predefinito Obbligatorio Args Argomento per il programma principale del processo. Separare tutti gli argomenti con lo spazio. No Dopo aver aggiornato il processo, lo stato del processo nel portale è IN ESECUZIONE.
Punto di salvataggio: adotta il punto di salvataggio per il processo Flink.
Il punto di salvataggio è un processo lungo e richiede molto tempo. È possibile visualizzare lo stato dell'azione del processo come in corso.
Annulla: questo processo consente all'utente di terminare il processo.
Elimina: eliminare i dati del processo dal portale.
Visualizza dettagli processo: per visualizzare i dettagli del processo l'utente può fare clic sul nome del processo, che fornisce i dettagli sul processo e sull'ultimo risultato dell'azione.
Per qualsiasi azione non riuscita, questo file JSON del processo offre eccezioni dettagliate e motivi per l'errore.
Gestione dei processi con l'API REST
HDInsight su AKS supporta API REST ARM intuitive per inviare il processo e gestire il processo. Usando questa API REST Flink, è possibile integrare facilmente le operazioni del processo Flink in Azure Pipeline. Sia che si avviino nuovi processi, si aggiornino processi in esecuzione o si eseguano varie operazioni di processo, questo approccio semplificato elimina i passaggi manuali e consente di gestire il cluster Flink in modo efficiente.
Formato URL di base per l'API REST
Vedere l'URL seguente per l'API REST, gli utenti devono sostituire la sottoscrizione, il gruppo di risorse, il pool di cluster, il nome del cluster e la versione API HDInsight su AKS prima di usarla.
https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runjob?api-version={{API_VERSION}}
Usando questa API REST, gli utenti possono avviare nuovi processi, arrestare processi, avviare processi, creare punti di salvataggio, annullare processi ed eliminare processi. L'API_VERSION corrente è 2023-06-01-preview.
Autenticazione tramite l'API REST
Per autenticare gli utenti dell'API REST ARM Flink, è necessario ottenere il token di connessione o il token di accesso per la risorsa ARM. Per autenticare l'API REST di Azure ARM (Azure Resource Manager) usando un'entità servizio, è possibile seguire questa procedura generale:
Creare un'entità servizio.
az ad sp create-for-rbac --name <your-SP-name>
Concedere al proprietario l'autorizzazione SP per il cluster
flink
.Accedere con un'entità servizio.
az login --service-principal -u <client_id> -p <client_secret> --tenant <tenant_id>
Ottenere il token di accesso.
$token = az account get-access-token --resource=https://management.azure.com/ | ConvertFrom-Json
$tok = $token.accesstoken
Gli utenti possono usare il token nell'URL visualizzato.
$data = Invoke-RestMethod -Uri $restUri -Method GET -Headers @{ Authorization = "Bearer $tok" }
Autenticazione con identità gestita: gli utenti possono usare risorse che supportano l'identità gestita per effettuare chiamate all'API REST del processo. Per altri dettagli, vedere la documentazione sull'Identità gestita.
ELENCO di API e parametri
Nuovo processo: API REST per inviare un nuovo processo a Flink.
Opzione Valore metodo POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Intestazione Authorization = "Bearer $token" Corpo della richiesta:
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "NEW", "jobJarDirectory": "<JOB_JAR_STORAGE_PATH>", "jarName": "<JOB_JAR_NAME>", "entryClass": "<JOB_ENTRY_CLASS>", “args”: ”<JOB_JVM_ARGUMENT>” "flinkConfiguration": { "parallelism": "<JOB_PARALLELISM>", "savepoint.directory": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>" } } }
Dettagli delle proprietà per il corpo JSON:
Proprietà Descrizione Valore predefinito Obbligatorio jobType Tipo di processo. Deve essere "FlinkJob" Sì jobName Nome univoco per il processo. Questo viene visualizzato nel portale. Il nome del processo deve essere in lettere minuscole. Sì action Indica il tipo di operazione nel processo. In caso di nuovo avvio del processo, dovrebbe essere sempre di tipo "NUOVO". Sì jobJarDirectory Archiviazione percorso per la directory JAR del processo. Gli utenti devono creare le directory nell'archiviazione cluster e caricare il file JAR del processo. Sì jarName Nome del file JAR del processo. Sì entryClass Classe di ingresso per il processo da cui viene avviata la relativa esecuzione. Sì args Argomento per il programma principale del processo. Separare gli argomenti con lo spazio. No parallelismo Parallelismo Flink del processo. 2 Sì savepoint.directory Directory del punto di salvataggio per il processo. È consigliabile che gli utenti creino una nuova directory per il punto di salvataggio dei processi nell'account di archiviazione. abfs://<container>@<account>/<deployment-ID>/savepoints
No Esempio:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Arresta processo: API REST per arrestare il processo in esecuzione corrente.
Opzione Valore metodo POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Intestazione Authorization = "Bearer $token" Corpo della richiesta
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "STOP" } }
Dettagli delle proprietà per il corpo JSON:
Proprietà Descrizione Valore predefinito Obbligatorio jobType Tipo di processo. Deve essere "FlinkJob" Sì jobName Nome processo, usato per l'avvio del processo Sì action Deve essere "STOP" Sì Esempio:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Avvia processo: API REST per avviare il processo ARRESTATO.
Opzione Valore metodo POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Intestazione Authorization = "Bearer $token" Corpo della richiesta
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "START", "savePointName": "<SAVEPOINT_NAME>" } }
Dettagli delle proprietà per il corpo JSON:
Proprietà Descrizione Valore predefinito Obbligatorio jobType Tipo di processo. Deve essere "FlinkJob" Sì jobName Nome processo usato per avviare il processo. Sì action Deve essere "START" Sì savePointName Salvare il nome del punto per avviare il processo. È una proprietà facoltativa, per impostazione predefinita l'operazione di avvio accetta l'ultimo punto di salvataggio riuscito. No Esempio:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Aggiorna processo: API REST per l'aggiornamento del processo in esecuzione corrente.
Opzione Valore metodo POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Intestazione Authorization = "Bearer $token" Corpo della richiesta
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "UPDATE", “args” : “<JOB_JVM_ARGUMENT>”, "savePointName": "<SAVEPOINT_NAME>" } }
Dettagli delle proprietà per il corpo JSON:
Proprietà Descrizione Valore predefinito Obbligatorio jobType Tipo di processo. Deve essere "FlinkJob" Sì jobName Nome processo usato per avviare il processo. Sì action Deve essere sempre "UPDATE" per l'avvio di un nuovo processo. Sì args Argomenti JVM del processo No savePointName Salvare il nome del punto per avviare il processo. È una proprietà facoltativa, per impostazione predefinita l'operazione di avvio accetta l'ultimo punto di salvataggio riuscito. No Esempio:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Aggiornamento senza stato del processo: API REST per l'aggiornamento senza stato.
Opzione Valore metodo POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Intestazione Authorization = "Bearer $token" Corpo della richiesta
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "STATELESS_UPDATE", “args” : “<JOB_JVM_ARGUMENT>” } }
Dettagli delle proprietà per il corpo JSON:
Proprietà Descrizione Valore predefinito Obbligatorio jobType Tipo di processo. Deve essere "FlinkJob" Sì jobName Nome processo usato per avviare il processo. Sì action Deve essere sempre "STATELESS_UPDATE" per l'avvio di un nuovo processo. Sì args Argomenti JVM del processo No Esempio:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Punto di salvataggio: API PREST per attivare il punto di salvataggio per il processo.
Opzione Valore metodo POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Intestazione Authorization = "Bearer $token" Corpo della richiesta
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "SAVEPOINT" } }
Dettagli delle proprietà per il corpo JSON:
Proprietà Descrizione Valore predefinito Obbligatorio jobType Tipo di processo. Deve essere "FlinkJob" Sì jobName Nome processo usato per avviare il processo. Sì action Deve essere sempre "SAVEPOINT" per l'avvio di un nuovo processo. Sì Esempio:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Elenco punti di salvataggio: API REST per elencare tutti i punti di salvataggio dalla directory dei punti di salvataggio.
Opzione Valore metodo POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Intestazione Authorization = "Bearer $token" Corpo della richiesta
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "LIST_SAVEPOINT" } }
Dettagli delle proprietà per il corpo JSON:
Proprietà Descrizione Valore predefinito Obbligatorio jobType Tipo di processo. Deve essere "FlinkJob" Sì jobName Nome processo usato per l'avvio del processo Sì action Deve essere "LIST_SAVEPOINT" Sì Esempio:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Annulla: API REST per annullare il processo.
Opzione Valore metodo POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Intestazione Authorization = "Bearer $token" Corpo della richiesta
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "CANCEL" } }
Dettagli delle proprietà per il corpo JSON:
Proprietà Descrizione Valore predefinito Obbligatorio jobType Tipo di processo. Dovrebbe essere FlinkJob
Sì jobName Nome processo usato per avviare il processo. Sì action Deve essere "CANCEL". Sì Esempio:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Elimina: API REST per eliminare il processo.
Opzione Valore metodo POST URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/runJob?api-version={{API_VERSION}}
Intestazione Authorization = "Bearer $token" Corpo della richiesta
{ "properties": { "jobType": "FlinkJob", "jobName": "<JOB_NAME>", "action": "DELETE" } }
Dettagli delle proprietà per il corpo JSON:
Proprietà Descrizione Valore predefinito Obbligatorio jobType Tipo di processo. Deve essere "FlinkJob" Sì jobName Nome processo usato per avviare il processo. Sì action Deve essere "DELETE". Sì Esempio:
Invoke-RestMethod -Uri $restUri -Method POST -Headers @{ Authorization = "Bearer $tok" } -Body $jsonString -ContentType "application/json"
Elenca processi: API REST per elencare tutti i processi e lo stato dell'azione corrente.
Opzione Valore metodo GET URL https://management.azure.com/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/jobs?api-version={{API_VERSION}}
Intestazione Authorization = "Bearer $token" Output:
{ "value": [ { "id": "/subscriptions/{{USER_SUBSCRIPTION}}/resourceGroups/{{USER_RESOURCE_GROUP}}/providers/Microsoft.HDInsight/clusterpools/{{CLUSER_POOL}}/clusters/{{FLINK_CLUSTER}}/jobs/job1", "properties": { "jobType": "FlinkJob", "jobName": "job1", "jobJarDirectory": "<JOB_JAR_STORAGE_PATH>", "jarName": "<JOB_JAR_NAME>", "action": "STOP", "entryClass": "<JOB_ENTRY_CLASS>", "flinkConfiguration": { "parallelism": "2", "savepoint.directory": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s" }, "jobId": "20e9e907eb360b1c69510507f88cdb7b", "status": "STOPPED", "jobOutput": "Savepoint completed. Path: <JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s/savepoint-20e9e9-8a48c6b905e5", "actionResult": "SUCCESS", "lastSavePoint": "<JOB_SAVEPOINT_DIRECTORY_STORAGE_PATH>s/savepoint-20e9e9-8a48c6b905e5" } } ] }
Nota
Quando un'azione è in corso, actionResult lo indicherà con il valore "IN_PROGRESS". Al completamento, verrà visualizzato "SUCCESS" e, in caso di errore, verrà visualizzato "FAILED".
Riferimento
- Pianificazione processi Apache Flink
- Apache, Apache Flink, Flink e i nomi dei progetti open source associati sono marchi di Apache Software Foundation (ASF).