Condividi tramite


NotebookUtils esecuzione e orchestrazione di notebook

Usare le utilità del notebook per eseguire un notebook, eseguire più notebook in parallelo o chiudere un notebook restituendo un valore. Usare il comando seguente per ottenere una panoramica dei metodi disponibili:

notebookutils.notebook.help()

Nella tabella seguente sono elencati i metodi di esecuzione e orchestrazione del notebook disponibili:

metodo Firma Descrizione
run run(path: str, timeout_seconds: int = 90, arguments: dict = None, workspace: str = ""): str Esegue un notebook e ne restituisce il valore di uscita.
runMultiple runMultiple(dag: Any, config: dict = None): dict[str, dict[str, Any]] Esegue più notebook contemporaneamente con il supporto per le relazioni di dipendenza.
validateDAG validateDAG(dag: Any): bool Convalida se una definizione DIG è strutturata correttamente.
exit exit(value: str): None Esce dal notebook corrente restituendo un valore.

Per le operazioni CRUD del notebook (creazione, recupero, aggiornamento, eliminazione, elenco), vedere Gestire gli artefatti del notebook.

Annotazioni

Il config parametro in runMultiple() è disponibile solo in Python. Scala e R non supportano questo parametro.

Annotazioni

Le utilità notebook non sono applicabili per le definizioni di lavoro di Apache Spark (SJD).

Fare riferimento a un notebook

Il run() metodo fa riferimento a un notebook e restituisce il relativo valore di uscita. È possibile eseguire chiamate di funzione di annidamento in un notebook in modo interattivo o in una pipeline. Il notebook a cui si fa riferimento funziona sul pool di Spark del notebook che chiama questa funzione.

notebookutils.notebook.run("notebook name", <timeout_seconds>, <arguments>, <workspace>)

Per esempio:

notebookutils.notebook.run("Sample1", 90, {"input": 20 })

Valore restituito

Il metodo run() restituisce la stringa esatta passata a notebookutils.notebook.exit(value) nel notebook figlio. Se il exit() non viene chiamato nel notebook figlio, viene restituita una stringa vuota ("").

I notebook di Fabric supportano anche il riferimento ai notebook tra aree di lavoro specificando l'ID dell'area di lavoro.

notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

Aprire il collegamento dello snapshot nell'output della cella per ispezionare il run di riferimento. Lo snapshot acquisisce i risultati dell'esecuzione e consente di eseguire il debug del notebook di riferimento.

Screenshot del risultato dell'esecuzione di riferimento.

Screenshot di un esempio di snapshot.

Configurare i notebook secondari per ricevere i parametri

Quando crei un notebook figlio chiamato tramite run() o runMultiple(), configura una cella di parametri per consentire al notebook di ricevere argomenti dal notebook padre:

  1. Creare una cella di codice con valori di parametro predefiniti.
  2. Contrassegnare la cella come cella di parametro selezionando Contrassegna cella come parametri nell'interfaccia utente del notebook.
  3. Durante l'esecuzione, i valori delle celle dei parametri vengono sostituiti con gli argomenti passati dall'elemento padre.
# This cell should be marked as "parameters" cell
# Default values are overridden when the notebook is called
date = "2024-01-01"
region = "US"

Suggerimento

I valori di uscita sono sempre stringhe di caratteri. Se è necessario un valore numerico nel notebook padre, converta il risultato dopo il recupero, ad esempio int(result).

Considerazioni

  • Il notebook di riferimento tra aree di lavoro è supportato dal runtime versione 1.2 e successive.
  • Se usi i file in Risorsa Notebook, usa notebookutils.nbResPath nel notebook di riferimento per assicurarti che punti alla stessa cartella dell'esecuzione interattiva.
  • L'esecuzione di riferimento consente l'esecuzione dei notebook figli solo se usano la stessa lakehouse del genitore, ereditano la lakehouse del genitore o non ne definiscono una. L'esecuzione viene bloccata se il notebook figlio specifica un lakehouse diverso rispetto al notebook padre. Per ignorare questo controllo, impostare useRootDefaultLakehouse: True negli argomenti.
  • Non chiamare notebookutils.notebook.exit(value) dentro un blocco try-catch. La chiamata di uscita non avrà effetto quando viene gestita nella gestione delle eccezioni.

Consultare l'esecuzione in parallelo di più notebook.

Usare notebookutils.notebook.runMultiple() per eseguire più notebook in parallelo o in una struttura topologica predefinita. L'API usa un'implementazione multithread all'interno di una sessione Spark, il che significa che i notebook di riferimento condividono le risorse di calcolo.

Con notebookutils.notebook.runMultiple() è possibile:

  • Eseguire più notebook contemporaneamente senza attendere il completamento di ognuno.

  • Specificare le dipendenze e l'ordine di esecuzione per i notebook usando un formato JSON semplice.

  • Ottimizzare l'uso delle risorse di calcolo di Spark e ridurre i costi dei progetti Fabric.

  • Visualizzare gli snapshot di ogni record di esecuzione del notebook nell'output e effettuare comodamente il debug e il monitoraggio delle attività del notebook.

  • Ottieni il valore di uscita di ogni attività esecutiva e usalo nelle attività successive.

Eseguire notebookutils.notebook.help("runMultiple") per visualizzare altri esempi e dettagli sull'utilizzo.

Visualizzare una semplice lista di notebook

L'esempio seguente esegue un elenco di notebook in parallelo:

notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

Il risultato dell'esecuzione del notebook radice è il seguente:

Screenshot di un elenco di notebook di riferimento.

Valore restituito

Il runMultiple() metodo restituisce un dizionario in cui ogni chiave è il nome dell'attività e ogni valore è un dizionario con le chiavi seguenti:

  • exitVal: La stringa restituita dalla chiamata del notebook figlio exit(), o una stringa vuota se exit() non è stato chiamato.
  • exception: oggetto di errore se l'attività non è riuscita o None se ha avuto esito positivo.

Eseguire notebook con una struttura DAG

Nell'esempio seguente vengono eseguiti notebook in una struttura DAG usando notebookutils.notebook.runMultiple().

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "Process_1", # activity name, must be unique
            "path": "NotebookSimple", # notebook item name
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
            "workspace":"WorkspaceName" # both name and id are supported
        },
        {
            "name": "Process_2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200},
            "workspace":"id" # both name and id are supported
        },
        {
            "name": "Process_1.1",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["Process_1"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 12 # max number of notebooks to run concurrently, default to 3x CPU cores, 0 means unlimited
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

Il risultato dell'esecuzione del notebook radice è il seguente:

Screenshot di un elenco di notebook con parametri come riferimento.

Riferimento ai parametri DAG

La tabella seguente descrive ogni campo che è possibile usare nella definizione di DAG:

Campo livello Obbligatorio Descrizione
activities Radice Elenco di oggetti attività che definiscono i notebook da eseguire.
timeoutInSeconds Radice No Timeout massimo per l'intero DAG. Il valore predefinito è 43200 (12 ore).
concurrency Radice No Numero massimo di notebook da eseguire contemporaneamente. Il valore predefinito è 3 volte il numero di core CPU disponibile. Impostare questo valore in modo esplicito se è necessario un controllo più stretto o usare 0 per una concorrenza illimitata.
name Activity Nome univoco per l'attività. Usato per identificare i risultati e definire le dipendenze.
path Activity Nome o percorso dell'elemento del notebook da eseguire.
timeoutPerCellInSeconds Activity No Timeout massimo per ogni cella del notebook figlio. Il valore predefinito è 90 secondi.
args Activity No Un dizionario di parametri da passare al notebook figlio.
workspace Activity No Nome o ID dell'area di lavoro in cui risiede il notebook. Per impostazione predefinita, il notebook figlio viene eseguito nella stessa area di lavoro del chiamante.
retry Activity No Numero di tentativi se l'attività ha esito negativo. Il valore predefinito è 0.
retryIntervalInSeconds Activity No Tempo di attesa in secondi tra i tentativi di ripetizione. Il valore predefinito è 0.
dependencies Activity No L'elenco dei nomi delle attività che devono essere completate prima dell'avvio di questa attività.

Valori di uscita di riferimento tra le attività

È possibile fare riferimento al valore di uscita di un'attività di dipendenza nel args campo usando l'espressione @activity() . Questo schema consente di passare dati tra notebook in un DAG.

DAG = {
    "activities": [
        {
            "name": "Extract",
            "path": "ExtractData",
            "timeoutPerCellInSeconds": 120,
            "args": {"source": "prod_db"}
        },
        {
            "name": "Transform",
            "path": "TransformData",
            "timeoutPerCellInSeconds": 180,
            "args": {
                "data_path": "@activity('Extract').exitValue()"
            },
            "dependencies": ["Extract"]
        }
    ]
}

results = notebookutils.notebook.runMultiple(DAG)

Suggerimento

Usare l'espressione @activity('activity_name').exitValue() nel campo args per passare i risultati da un'attività args a un'altra all'interno di un DAG.

Costruire un DAG dinamico

È possibile generare strutture DAG a livello di codice per scenari come l'elaborazione fan-out tra più partizioni:

def create_fan_out_dag(partitions):
    activities = []

    for partition in partitions:
        activities.append({
            "name": f"Process_{partition}",
            "path": "ProcessPartition",
            "timeoutPerCellInSeconds": 180,
            "args": {"partition": partition}
        })

    activities.append({
        "name": "Aggregate",
        "path": "AggregateResults",
        "timeoutPerCellInSeconds": 120,
        "dependencies": [f"Process_{p}" for p in partitions]
    })

    return {"activities": activities, "concurrency": 25}

partitions = ["2024-01", "2024-02", "2024-03", "2024-04"]
dag = create_fan_out_dag(partitions)

results = notebookutils.notebook.runMultiple(dag)

Convalidare un DAG

Usare validateDAG() per verificare che la struttura DAG sia valida prima dell'esecuzione. Rileva problemi quali nomi di attività duplicati, dipendenze mancanti e riferimenti circolari.

notebookutils.notebook.validateDAG(DAG)

Valore restituito

Il validateDAG() metodo restituisce True se la struttura DAG è valida o genera un'eccezione se la convalida non riesce.

Suggerimento

Chiamare validateDAG() sempre prima runMultiple() nei flussi di lavoro di produzione per rilevare gli errori strutturali in anticipo.

Gestire gli errori runMultiple

Il runMultiple() metodo restituisce un dizionario in cui ogni chiave è il nome dell'attività e ogni valore contiene una exitVal (stringa) e un exception (oggetto error o None). È possibile esaminare i risultati parziali anche quando alcune attività hanno esito negativo:

from notebookutils.common.exceptions import RunMultipleFailedException

try:
    results = notebookutils.notebook.runMultiple(DAG)
except RunMultipleFailedException as ex:
    results = ex.result

for activity_name, result in results.items():
    if result["exception"]:
        print(f"{activity_name} failed: {result['exception']}")
    else:
        print(f"{activity_name} succeeded: {result['exitVal']}")

Considerazioni

  • Il grado di parallelismo dell'esecuzione di più notebook è limitato alla risorsa di calcolo totale disponibile di una sessione Spark.
  • Il numero predefinito di notebook simultanei è 3 volte il numero di core CPU disponibile. È possibile personalizzare questo valore, ma un parallelismo eccessivo potrebbe causare problemi di stabilità e prestazioni a causa dell'utilizzo elevato delle risorse di calcolo. In caso di problemi, è consigliabile separare i notebook in più chiamate di runMultiple o ridurre la concorrenza modificando il campo di concorrenza nel parametro DAG.
  • Il timeout predefinito per l'intero DAG è di 12 ore e il timeout predefinito per ogni cella in un notebook figlio è di 90 secondi. È possibile modificare il timeout impostando i campi timeoutInSeconds e timeoutPerCellInSeconds nel parametro DAG.
  • Configurare retry e retryIntervalInSeconds per le attività che potrebbero non riuscire a causa di problemi temporanei, ad esempio timeout di rete o indisponibilità temporanea del servizio.
  • I notebook paralleli condividono le risorse di calcolo all'interno di una singola sessione Spark. Monitorare l'utilizzo delle risorse per evitare la pressione della memoria e la contesa della CPU.

Uscire da un notebook

Il exit() metodo esce da un notebook con un valore . È possibile eseguire chiamate di funzione di annidamento in un notebook in modo interattivo o in una pipeline.

  • Quando si chiama una exit() funzione da un notebook in modo interattivo, il notebook di Fabric genera un'eccezione, ignora l'esecuzione di celle successive e mantiene attiva la sessione Spark.

  • Quando si orchestra un notebook in una pipeline che chiama una exit() funzione, l'attività del notebook restituisce un valore di uscita. In questo modo viene completata l'esecuzione della pipeline e viene arrestata la sessione Spark.

  • Quando si chiama una funzione exit() in un notebook a cui si fa riferimento, Fabric Spark arresta l'ulteriore esecuzione del notebook a cui si fa riferimento, e continua a eseguire le celle successive nel notebook principale che chiama una funzione run(). Ad esempio: Notebook1 ha tre celle e chiama una exit() funzione nella seconda cella. Notebook2 ha cinque celle e chiama run(notebook1) nella terza cella. Quando si esegue Notebook2, Notebook1 si arresta nella seconda cella quando incontra la funzione exit(). Notebook2 sta continuando a eseguire la quarta e la quinta cella.

notebookutils.notebook.exit("value string")

Comportamento di ritorno

Il exit() metodo non restituisce un valore. Termina il notebook corrente e passa la stringa fornita al notebook o alla pipeline chiamante.

Annotazioni

La exit() funzione sovrascrive l'output della cella corrente. Per evitare di perdere l'output di altre istruzioni di codice, chiama notebookutils.notebook.exit() in una cella separata.

Importante

Non chiamare notebookutils.notebook.exit() dentro un blocco try-catch. L'uscita non avrà effetto quando è incapsulata nella gestione delle eccezioni. La exit() chiamata deve essere al livello superiore del codice per funzionare correttamente.

Per esempio:

Il notebook Sample1 include le due celle seguenti:

  • La cella 1 definisce un parametro di input con il valore predefinito impostato su 10.

  • La cella 2 esce dal notebook con input come valore di uscita.

Screenshot che mostra un esempio di notebook con la funzione di uscita.

È possibile eseguire Sample1 in un altro notebook con i valori predefiniti:

exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)

Output:

10

È possibile eseguire Sample1 in un altro notebook e impostare il valore di input su 20:

exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Output:

20