Freigeben über


NotebookUtils-Notizbuchausführung und -orchestrierung

Verwenden Sie die Notizbuchprogramme, um ein Notizbuch auszuführen, mehrere Notizbücher parallel auszuführen oder ein Notizbuch mit einem Wert zu beenden. Führen Sie den folgenden Befehl aus, um eine Übersicht über die verfügbaren Methoden zu erhalten:

notebookutils.notebook.help()

In der folgenden Tabelle sind die verfügbaren Notizbuchausführungs- und Orchestrierungsmethoden aufgeführt:

Methode Signature Beschreibung
run run(path: str, timeout_seconds: int = 90, arguments: dict = None, workspace: str = ""): str Führt ein Notebook aus und gibt dessen Exit-Wert zurück.
runMultiple runMultiple(dag: Any, config: dict = None): dict[str, dict[str, Any]] Führt mehrere Notizbücher gleichzeitig mit Unterstützung für Abhängigkeitsbeziehungen aus.
validateDAG validateDAG(dag: Any): bool Überprüft, ob eine DAG-Definition ordnungsgemäß strukturiert ist.
exit exit(value: str): None Beendet das aktuelle Notizbuch mit einem Wert.

Informationen zu Notizbuch-CRUD-Vorgängen (Erstellen, Abrufen, Aktualisieren, Löschen, Liste) finden Sie unter Verwalten von Notizbuchartefakten.

Hinweis

Der config Parameter in runMultiple() ist nur in Python verfügbar. Scala und R unterstützen diesen Parameter nicht.

Hinweis

Notebook-Dienstprogramme gelten nicht für Apache Spark-Auftragsdefinitionen (SJD).

Verweis auf ein Notizbuch

Die run() Methode verweist auf ein Notizbuch und gibt den Ausgangswert zurück. Sie können Verschachtelungsfunktionsaufrufe in einem Notebook interaktiv oder in einer Pipeline ausführen. Das referenzierte Notebook läuft im Spark-Pool des Notebooks, das diese Funktion aufruft.

notebookutils.notebook.run("notebook name", <timeout_seconds>, <arguments>, <workspace>)

Beispiel:

notebookutils.notebook.run("Sample1", 90, {"input": 20 })

Zurückgegebener Wert

Die run() Methode gibt die genaue Zeichenfolge zurück, die an notebookutils.notebook.exit(value) im untergeordneten Notizbuch übergeben wird. Wenn exit() im untergeordneten Notizbuch nicht aufgerufen wird, wird eine leere Zeichenfolge ("") zurückgegeben.

Fabric-Notizbücher unterstützen auch das Verweisen auf Notizbücher über Arbeitsbereiche hinweg, indem sie die Arbeitsbereichs-ID angeben.

notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

Öffnen Sie den Snapshot-Link im Zellausgabebereich, um den Referenzdurchlauf zu überprüfen. Der Schnappschuss erfasst die Ergebnisse und hilft Ihnen beim Debuggen des referenzierten Notizbuchs.

Screenshot: Ergebnis eines Referenzlaufs.

Screenshot: Momentaufnahmebeispiel.

Einrichten von untergeordneten Notizbüchern zum Empfangen von Parametern

Wenn Sie ein untergeordnetes Notizbuch erstellen, das über run() oder runMultiple()aufgerufen wird, richten Sie eine Parameterzelle ein, damit das Notizbuch Argumente vom übergeordneten Element empfangen kann:

  1. Erstellen Sie eine Codezelle mit Standardparameterwerten.
  2. Markieren Sie die Zelle als Parameterzelle, indem Sie "Zelle als Parameter markieren " in der Benutzeroberfläche des Notizbuchs auswählen.
  3. Während der Ausführung werden die Werte in den Parameterzellen durch die übergebenen Argumente des übergeordneten Elements ersetzt.
# This cell should be marked as "parameters" cell
# Default values are overridden when the notebook is called
date = "2024-01-01"
region = "US"

Tipp

Ausgangswerte sind immer Zeichenfolgen. Wenn Sie einen numerischen Wert im übergeordneten Notizbuch benötigen, konvertieren Sie das Ergebnis nach dem Abruf (z. B int(result). ).

Überlegungen

  • Das arbeitsbereichübergreifende Referenznotebook wird von Laufzeitversion 1.2 und höher unterstützt.
  • Wenn Sie die Dateien unter Notebook-Ressource verwenden, verwenden Sie notebookutils.nbResPath im referenzierten Notebook, um sicherzustellen, dass es auf denselben Ordner wie die interaktive Ausführung verweist.
  • Der Referenzlauf erlaubt es untergeordneten Notizbüchern nur dann ausgeführt zu werden, wenn sie dasselbe Lakehouse wie das übergeordnete Notizbuch verwenden, das Lakehouse des Elternnotizbuchs erben oder keines definieren. Die Ausführung wird blockiert, wenn das Kind ein anderes Seehaus als das übergeordnete Notizbuch angibt. Um diese Überprüfung zu umgehen, setzen Sie useRootDefaultLakehouse: True in den Argumenten.
  • Rufen Sie nicht notebookutils.notebook.exit(value) innerhalb eines try-catch-Blocks auf. Der Exit-Aufruf wird nicht wirksam, wenn er in der Ausnahmebehandlung eingeschlossen ist.

Paralleles Ausführen von Referenzläufen in mehreren Notebooks

Verwenden Sie notebookutils.notebook.runMultiple(), um mehrere Notizbücher parallel oder in einer vordefinierten topologischen Struktur auszuführen. Die API verwendet eine Multithread-Implementierung innerhalb einer Spark-Sitzung, was bedeutet, dass referenzierte Notebooks Rechnerressourcen gemeinsam nutzen.

Mit notebookutils.notebook.runMultiple() haben Sie folgende Möglichkeiten:

  • Führen Sie mehrere Notebooks gleichzeitig aus, ohne auf die Fertigstellung jedes einzelnen warten zu müssen.

  • Geben Sie die Abhängigkeiten und die Reihenfolge der Ausführung für Ihre Notebooks mithilfe eines einfachen JSON-Formats an.

  • Optimieren Sie die Verwendung von Spark Computeressourcen, und reduzieren Sie die Kosten Ihrer Fabric-Projekte.

  • Zeigen Sie die Snapshots der einzelnen Notebook-Ausführungsprotokolle in der Ausgabe an, und debuggen oder überwachen Sie Ihre Notebook-Aufgaben effizient.

  • Ermitteln Sie den Ergebniswert jeder einzelnen exekutiven Aktivität und verwenden Sie deren Werte in nachgelagerten Aufgaben.

Führen Sie notebookutils.notebook.help("runMultiple") aus, um weitere Beispiele und Nutzungsdetails anzuzeigen.

Einfacher Zugriff auf eine Liste von Notizbüchern

Im folgenden Beispiel wird eine Liste von Notizbüchern parallel ausgeführt:

notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

Das Ausführungsergebnis aus dem Stammnotebook lautet wie folgt:

Screenshot einer Liste von Notebooks als Referenz.

Zurückgegebener Wert

Die runMultiple() Methode gibt ein Wörterbuch zurück, in dem jeder Schlüssel der Aktivitätsname ist und jeder Wert ein Wörterbuch mit den folgenden Schlüsseln ist:

  • exitVal: Die vom Aufruf des untergeordneten Notizbuchs exit() zurückgegebene Zeichenfolge oder eine leere Zeichenfolge, wenn exit() sie nicht aufgerufen wurde.
  • exception: Ein Fehlerobjekt, wenn die Aktivität fehlgeschlagen ist oder None erfolgreich war.

Ausführen von Notizbüchern mit einer DAG-Struktur

Im folgenden Beispiel werden Notebooks mithilfe von notebookutils.notebook.runMultiple() in einer DAG-Struktur ausgeführt.

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "Process_1", # activity name, must be unique
            "path": "NotebookSimple", # notebook item name
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
            "workspace":"WorkspaceName" # both name and id are supported
        },
        {
            "name": "Process_2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200},
            "workspace":"id" # both name and id are supported
        },
        {
            "name": "Process_1.1",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["Process_1"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 12 # max number of notebooks to run concurrently, default to 3x CPU cores, 0 means unlimited
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

Das Ausführungsergebnis aus dem Stammnotebook lautet wie folgt:

Screenshot des Verweises auf eine Liste von Notebooks mit Parametern.

DAG-Parameterreferenz

Die folgende Tabelle beschreibt jedes Feld, das Sie in der DAG-Definition verwenden können:

Feld Grad Erforderlich Beschreibung
activities Wurzel Ja Eine Liste der Aktivitätsobjekte, die die auszuführenden Notizbücher definieren.
timeoutInSeconds Wurzel No Maximale Zeitüberschreitung für die gesamte DAG. Der Standardwert ist 43200 (12 Stunden).
concurrency Wurzel No Maximale Anzahl von Notizbüchern, die gleichzeitig ausgeführt werden sollen. Der Standardwert ist 3 Mal die verfügbare CPU-Kernanzahl. Legen Sie diesen Wert explizit fest, wenn Sie eine engere Kontrolle benötigen, oder verwenden Sie 0 für unbegrenzte Parallelität.
name Aktivität Ja Ein eindeutiger Name für die Aktivität. Wird verwendet, um Ergebnisse zu identifizieren und Abhängigkeiten zu definieren.
path Aktivität Ja Der Name oder Pfad des Notizbuchelements, das ausgeführt werden soll.
timeoutPerCellInSeconds Aktivität No Für jede Zelle im untergeordneten Notizbuch ist ein maximales Timeout festgelegt. Der Standardwert ist 90 Sekunden.
args Aktivität No Ein Wörterbuch mit Parametern, die an das untergeordnete Notizbuch übergeben werden sollen.
workspace Aktivität No Der Arbeitsbereichsname oder die ID, in der sich das Notizbuch befindet. Standardmäßig wird das untergeordnete Notizbuch im selben Arbeitsbereich wie der Anrufer ausgeführt.
retry Aktivität No Anzahl der Wiederholungsversuche, wenn die Aktivität fehlschlägt. Standard ist "0".
retryIntervalInSeconds Aktivität No Wartezeit in Sekunden zwischen Wiederholungsversuchen. Standard ist "0".
dependencies Aktivität No Eine Liste der Aktivitätsnamen, die abgeschlossen werden müssen, bevor diese Aktivität gestartet wird.

Verweisen auf Ausgangswerte zwischen Aktivitäten

Sie können unter Verwendung des Ausdrucks @activity() auf den Ausgangswert einer Abhängigkeitsaktivität im args-Feld verweisen. Mit diesem Muster können Sie Daten zwischen Notizbüchern in einer DAG übergeben.

DAG = {
    "activities": [
        {
            "name": "Extract",
            "path": "ExtractData",
            "timeoutPerCellInSeconds": 120,
            "args": {"source": "prod_db"}
        },
        {
            "name": "Transform",
            "path": "TransformData",
            "timeoutPerCellInSeconds": 180,
            "args": {
                "data_path": "@activity('Extract').exitValue()"
            },
            "dependencies": ["Extract"]
        }
    ]
}

results = notebookutils.notebook.runMultiple(DAG)

Tipp

Verwenden Sie den @activity('activity_name').exitValue() Ausdruck im args Feld, um Ergebnisse aus einer Aktivität innerhalb einer DAG an eine andere zu übergeben.

Erstellen einer dynamischen DAG

Sie können DAG-Strukturen programmgesteuert für Szenarien wie die Fan-Out-Verarbeitung auf mehreren Partitionen generieren.

def create_fan_out_dag(partitions):
    activities = []

    for partition in partitions:
        activities.append({
            "name": f"Process_{partition}",
            "path": "ProcessPartition",
            "timeoutPerCellInSeconds": 180,
            "args": {"partition": partition}
        })

    activities.append({
        "name": "Aggregate",
        "path": "AggregateResults",
        "timeoutPerCellInSeconds": 120,
        "dependencies": [f"Process_{p}" for p in partitions]
    })

    return {"activities": activities, "concurrency": 25}

partitions = ["2024-01", "2024-02", "2024-03", "2024-04"]
dag = create_fan_out_dag(partitions)

results = notebookutils.notebook.runMultiple(dag)

Überprüfen einer DAG

Überprüfen Sie, ob die DAG-Struktur validateDAG() vor der Ausführung gültig ist. Es erfasst Probleme wie doppelte Aktivitätsnamen, fehlende Abhängigkeiten und Zirkelbezüge.

notebookutils.notebook.validateDAG(DAG)

Zurückgegebener Wert

Die validateDAG() Methode gibt zurück True , wenn die DAG-Struktur gültig ist oder eine Ausnahme auslöst, wenn die Überprüfung fehlschlägt.

Tipp

Rufen Sie immer validateDAG() vor runMultiple() in Produktionsworkflows auf, um strukturelle Fehler frühzeitig abzufangen.

Behandeln von RunMultiple-Fehlern

Die runMultiple() Methode gibt ein Wörterbuch zurück, in dem jeder Schlüssel der Aktivitätsname ist und jeder Wert eine exitVal (Zeichenfolge) und ein exception (Fehlerobjekt oder None) enthält. Sie können Teilergebnisse auch dann überprüfen, wenn einige Aktivitäten fehlschlagen:

from notebookutils.common.exceptions import RunMultipleFailedException

try:
    results = notebookutils.notebook.runMultiple(DAG)
except RunMultipleFailedException as ex:
    results = ex.result

for activity_name, result in results.items():
    if result["exception"]:
        print(f"{activity_name} failed: {result['exception']}")
    else:
        print(f"{activity_name} succeeded: {result['exitVal']}")

Überlegungen

  • Der Parallelitätsgrad der Ausführung mehrerer Notebooks ist auf die gesamte verfügbare Computeressource einer Spark-Sitzung beschränkt.
  • Die Standardanzahl an gleichzeitig geöffneten Notizbüchern beträgt das 3-fache der verfügbaren CPU-Kernanzahl. Sie können diesen Wert anpassen, aber übermäßige Parallelität kann aufgrund einer hohen Rechenressourcennutzung zu Stabilitäts- und Leistungsproblemen führen. Wenn Probleme auftreten, sollten Sie Notizbücher in mehrere runMultiple Aufrufe trennen oder die Parallelität reduzieren, indem Sie das Parallelitätsfeld im DAG-Parameter anpassen.
  • Das Standardtimeout für die gesamte DAG beträgt 12 Stunden, und das Standardtimeout für jede Zelle in einem untergeordneten Notizbuch beträgt 90 Sekunden. Sie können die Zeitüberschreitung ändern, indem Sie die Felder timeoutInSeconds und timeoutPerCellInSeconds im DAG-Parameter konfigurieren.
  • Konfigurieren Sie retry und retryIntervalInSeconds für Aktivitäten, die aufgrund vorübergehender Probleme wie Netzwerkzeitschranken oder vorübergehender Dienstunverfügbarkeit fehlschlagen können.
  • Parallele Notizbücher teilen Computeressourcen innerhalb einer einzelnen Spark-Sitzung. Überwachen Sie die Ressourcenauslastung, um Arbeitsspeicherdruck und CPU-Überlastung zu vermeiden.

Notizbuch schließen

Die exit() Methode beendet ein Notizbuch mit einem Wert. Sie können Verschachtelungsfunktionsaufrufe in einem Notebook interaktiv oder in einer Pipeline ausführen.

  • Wenn Sie eine exit() Funktion aus einem Notizbuch interaktiv aufrufen, löst das Fabric-Notizbuch eine Ausnahme aus, überspringt nachfolgende Zellen und hält die Spark-Sitzung aktiv.

  • Wenn Sie ein Notizbuch in einer Pipeline koordinieren, die eine exit() Funktion aufruft, wird die Notizbuchaktivität mit einem Ausgangswert zurückgegeben. Dadurch wird die Pipelineausführung abgeschlossen und die Spark-Sitzung beendet.

  • Wenn Sie eine exit() Funktion in einem Notizbuch aufrufen, auf das verwiesen wird, beendet Fabric Spark die weitere Ausführung des referenzierten Notizbuchs und führt die nächsten Zellen im Hauptnotizbuch aus, die die run() Funktion aufruft. Ein Beispiel: Notebook1 hat drei Zellen und ruft in der zweiten Zelle eine exit() Funktion auf. Notebook2 verfügt über fünf Zellen und ruft run(notebook1) in der dritten Zelle auf. Wenn Sie "Notebook2" ausführen, stoppt "Notebook1" bei der zweiten Zelle beim Drücken der exit() Funktion. Notebook2 führt seine vierte und fünfte Zelle weiter aus.

notebookutils.notebook.exit("value string")

Rückgabeverhalten

Die exit() Methode gibt keinen Wert zurück. Es beendet das aktuelle Notizbuch und übergibt die bereitgestellte Zeichenfolge an das aufrufende Notizbuch oder die Pipeline.

Hinweis

Die exit() Funktion überschreibt die aktuelle Zellenausgabe. Um zu vermeiden, dass die Ausgabe anderer Codeanweisungen verloren geht, rufen Sie notebookutils.notebook.exit() in einer separaten Zelle auf.

Von Bedeutung

Rufen Sie nicht notebookutils.notebook.exit() innerhalb eines try-catch-Blocks auf. Das Verlassen wird beim Einschließen in die Ausnahmebehandlung nicht wirksam. Der exit() Aufruf muss auf der obersten Ebene des Codes stehen, damit er ordnungsgemäß funktioniert.

Beispiel:

Das Sample1-Notizbuch weist die folgenden beiden Zellen auf:

  • Zelle 1 definiert einen Eingabe-Parameter, dessen Standardwert auf 10 festgelegt ist.

  • Zelle 2 beendet das Notizbuch mit Input als Austrittswert.

Screenshot eines Beispielnotebooks mit einer Exit-Funktion.

Sie können das Beispiel1 in einem anderen Notizbuch mit Standardwerten ausführen:

exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)

Ausgabe:

10

Sie können das Beispiel1 in einem anderen Notizbuch ausführen und den Eingabewert als 20 festlegen:

exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Ausgabe:

20