Del via


NotebookUtils notebook run og orkestrering

Brug notebook-værktøjerne til at køre en notesbog, kør flere notesbøger parallelt, eller luk en notesbog med en værdi. Kør følgende kommando for at få en oversigt over de tilgængelige metoder:

notebookutils.notebook.help()

Følgende tabel viser de tilgængelige notebook-kørsels- og orkestreringsmetoder:

Metode Signatur Beskrivelse
run run(path: str, timeout_seconds: int = 90, arguments: dict = None, workspace: str = ""): str Kører en notebook og returnerer dens exit-værdi.
runMultiple runMultiple(dag: Any, config: dict = None): dict[str, dict[str, Any]] Kører flere notesbøger samtidig med understøttelse af afhængighedsforhold.
validateDAG validateDAG(dag: Any): bool Validerer, om en DAG-definition er korrekt struktureret.
exit exit(value: str): None Afslutter den nuværende notesbog med en værdi.

For notebook CRUD-operationer (oprette, hent, opdatere, slette, liste), se Administrer notebook-artefakter.

Bemærkning

Parameteren config i runMultiple() er kun tilgængelig i Python. Scala og R understøtter ikke denne parameter.

Bemærkning

Hjælpeprogrammer til notesbøger er ikke relevante for Apache Spark-jobdefinitioner (SJD).

Reference til en notesbog

Metoden run() refererer til en notesbog og returnerer dens exit-værdi. Du kan køre indlejrede funktionskald i en notesbog interaktivt eller i en pipeline. Den notesbog, der refereres til, kører på Spark-gruppen for den notesbog, der kalder denne funktion.

notebookutils.notebook.run("notebook name", <timeout_seconds>, <arguments>, <workspace>)

Eksempel:

notebookutils.notebook.run("Sample1", 90, {"input": 20 })

Returværdi

Metoden run() returnerer den præcise streng, der sendes til notebookutils.notebook.exit(value) i børnenotesbogen. Hvis ikke exit() kaldes i underordnet notesbog, returneres en tom streng ("") igen.

Fabric-notebooks understøtter også reference til notebooks på tværs af arbejdsområder ved at angive workspace-ID'et.

notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

Åbn snapshot-linket i celleudgangen for at inspicere referencekørslen. Snapshot'en fanger resultaterne og hjælper dig med at fejlfinde den refererede notesbog.

Skærmbillede af kørselsresultatet for referencen.

Skærmbillede af et snapshoteksempel.

Opsæt børnenotesbøger til at modtage parametre

Når du opretter en underordnet notesbog, der kaldes via run() eller runMultiple(), opsæt en parametercelle, så notesbogen kan modtage argumenter fra forældrebogen:

  1. Opret en kodecelle med standardparametreværdier.
  2. Markér cellen som en parametercelle ved at vælge Mark celle som parametre i notesbogens brugerflade.
  3. Under udførelsen erstattes parametercelleværdierne med de argumenter, der sendes fra forælderen.
# This cell should be marked as "parameters" cell
# Default values are overridden when the notebook is called
date = "2024-01-01"
region = "US"

Tips

Exit-værdier er altid strenge. Hvis du har brug for en numerisk værdi i forældrenotesbogen, konverter resultatet efter hentning (for eksempel int(result)).

Overvejelser

  • Referencenotesbogen på tværs af arbejdsområder understøttes af runtime version 1.2 og nyere.
  • Hvis du bruger filerne under Notebook-ressource, skal du bruge notebookutils.nbResPath den notesbog, der refereres til, i den notesbog, der refereres til, for at sikre, at den peger på den samme mappe som den interaktive kørsel.
  • Referencekørsel gør det kun muligt for underordnede notesbøger at køre, hvis de bruger det samme søhus som det overordnede, arver det overordnede søhus, eller ingen af dem definerer et. Udførelsen blokeres, hvis barnet angiver et andet lakehouse end forældrenotesbogen. For at omgå denne kontrol, sæt useRootDefaultLakehouse: True argumenterne ind.
  • Ring notebookutils.notebook.exit(value) ikke inde i en try-catch blok. Exit-kaldet træder ikke i kraft, når det er pakket ind i undtagelseshåndtering.

Referencen kører flere notesbøger parallelt

Brug notebookutils.notebook.runMultiple() den til at køre flere notebooks parallelt eller i en foruddefineret topologisk struktur. API'et bruger en multitrådet implementering inden for en Spark-session, hvilket betyder, at refererede notebook-kørsler deler beregningsressourcer.

Med notebookutils.notebook.runMultiple()kan du:

  • Udfør flere notesbøger samtidigt uden at vente på, at hver enkelt afsluttes.

  • Angiv afhængighederne og rækkefølgen af udførelsen af dine notesbøger ved hjælp af et simpelt JSON-format.

  • Optimer brugen af Spark-beregningsressourcer, og reducer omkostningerne ved dine Fabric-projekter.

  • Få vist snapshots af hver notesbogs kørselspost i outputtet, og foretag en nem fejlfinding/overvågning af dine opgaver i notesbogen.

  • Hent afslutningsværdien for hver lederaktivitet, og brug dem i downstream-opgaver.

Kør notebookutils.notebook.help("runMultiple") for at se flere eksempler og brugsdetaljer.

Kør en simpel liste over notesbøger

Følgende eksempel kører en liste over notesbøger parallelt:

notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

Udførelsesresultatet fra rodnotesbogen er som følger:

Skærmbillede af reference til en liste over notesbøger.

Returværdi

Metoden runMultiple() returnerer en ordbog, hvor hver nøgle er aktivitetsnavnet, og hver værdi er en ordbog med følgende nøgler:

  • exitVal: Strengen, der returneres af børnenotesbogens exit() kald, eller en tom streng, hvis exit() den ikke blev kaldt.
  • exception: Et fejlobjekt, hvis aktiviteten fejlede, eller None hvis den lykkedes.

Kør notebooks med en DAG-struktur

Følgende eksempel kører notesbøger i en DAG-struktur ved at bruge notebookutils.notebook.runMultiple().

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "Process_1", # activity name, must be unique
            "path": "NotebookSimple", # notebook item name
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
            "workspace":"WorkspaceName" # both name and id are supported
        },
        {
            "name": "Process_2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200},
            "workspace":"id" # both name and id are supported
        },
        {
            "name": "Process_1.1",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["Process_1"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 12 # max number of notebooks to run concurrently, default to 3x CPU cores, 0 means unlimited
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

Udførelsesresultatet fra rodnotesbogen er som følger:

Skærmbillede af reference til en liste over notesbøger med parametre.

DAG-parameterreference

Følgende tabel beskriver hvert felt, du kan bruge i DAG-definitionen:

Felt Niveau Påkrævet Beskrivelse
activities Root Ja En liste over aktivitetsobjekter, der definerer de notesbøger, der skal køres.
timeoutInSeconds Root Nej Maksimal timeout for hele DAG. Standard er 43200 (12 timer).
concurrency Root Nej Maksimalt antal notebooks til at køre samtidig. Standard er 3 gange antallet af tilgængelige CPU-kerner. Indstil denne værdi eksplicit, hvis du har brug for strammere kontrol, eller brug 0 den til ubegrænset samtidighed.
name Aktivitet Ja Et unikt navn for aktiviteten. Bruges til at identificere resultater og definere afhængigheder.
path Aktivitet Ja Notebookens itemnavn eller sti, der skal eksekveres.
timeoutPerCellInSeconds Aktivitet Nej Maksimal timeout for hver celle i børnenotesbogen. Standard er 90 sekunder.
args Aktivitet Nej En ordbog over parametre, der skal sendes til børnenotesbogen.
workspace Aktivitet Nej Arbejdsområdets navn eller ID, hvor notesbogen ligger. Som standard kører underordnet notesbog i samme arbejdsområde som opkalderen.
retry Aktivitet Nej Antal forsøg på at prøve igen, hvis aktiviteten fejler. Standard er 0.
retryIntervalInSeconds Aktivitet Nej Ventetid i sekunder mellem forsøg på genoptagelse. Standard er 0.
dependencies Aktivitet Nej En liste over aktivitetsnavne, der skal gennemføres, før denne aktivitet starter.

Reference-udgangsværdier mellem aktiviteter

Du kan referere til udgangsværdien for en afhængighedsaktivitet i args feltet ved at bruge udtrykket @activity() . Dette mønster lader dig sende data mellem notesbøger i en DAG.

DAG = {
    "activities": [
        {
            "name": "Extract",
            "path": "ExtractData",
            "timeoutPerCellInSeconds": 120,
            "args": {"source": "prod_db"}
        },
        {
            "name": "Transform",
            "path": "TransformData",
            "timeoutPerCellInSeconds": 180,
            "args": {
                "data_path": "@activity('Extract').exitValue()"
            },
            "dependencies": ["Extract"]
        }
    ]
}

results = notebookutils.notebook.runMultiple(DAG)

Tips

Brug udtrykket @activity('activity_name').exitValue() i args feltet til at sende resultater fra én aktivitet til en anden inden for en DAG.

Byg en dynamisk DAG

Du kan generere DAG-strukturer programmatisk til scenarier som fan-out behandling på tværs af flere partitioner:

def create_fan_out_dag(partitions):
    activities = []

    for partition in partitions:
        activities.append({
            "name": f"Process_{partition}",
            "path": "ProcessPartition",
            "timeoutPerCellInSeconds": 180,
            "args": {"partition": partition}
        })

    activities.append({
        "name": "Aggregate",
        "path": "AggregateResults",
        "timeoutPerCellInSeconds": 120,
        "dependencies": [f"Process_{p}" for p in partitions]
    })

    return {"activities": activities, "concurrency": 25}

partitions = ["2024-01", "2024-02", "2024-03", "2024-04"]
dag = create_fan_out_dag(partitions)

results = notebookutils.notebook.runMultiple(dag)

Valider en DAG

Brug validateDAG() den til at verificere, at din DAG-struktur er gyldig før udførelse. Den fanger problemer som dublerede aktivitetsnavne, manglende afhængigheder og cirkulære referencer.

notebookutils.notebook.validateDAG(DAG)

Returværdi

Metoden validateDAG() returnerer True , hvis DAG-strukturen er gyldig, eller udløser en undtagelse, hvis valideringen fejler.

Tips

Ring validateDAG() altid før runMultiple() i produktionsarbejdsgange for at opdage strukturelle fejl tidligt.

Håndter kørFlere fejl

Metoden runMultiple() returnerer en ordbog, hvor hver nøgle er aktivitetsnavnet, og hver værdi indeholder en exitVal (streng) og en exception (fejlobjekt eller None). Du kan inspicere delvise resultater, selv når nogle aktiviteter fejler:

from notebookutils.common.exceptions import RunMultipleFailedException

try:
    results = notebookutils.notebook.runMultiple(DAG)
except RunMultipleFailedException as ex:
    results = ex.result

for activity_name, result in results.items():
    if result["exception"]:
        print(f"{activity_name} failed: {result['exception']}")
    else:
        print(f"{activity_name} succeeded: {result['exitVal']}")

Overvejelser

  • Parallelitetsgraden for kørsel af flere notesbøger er begrænset til den samlede tilgængelige beregningsressource for en Spark-session.
  • Standardantallet af samtidige notebooks er 3 gange antallet af tilgængelige CPU-kerner. Du kan tilpasse denne værdi, men overdreven parallelisme kan føre til stabilitets- og ydelsesproblemer på grund af højt forbrug af beregningsressourcer. Hvis der opstår problemer, kan du overveje at opdele notesbøger i flere runMultiple kald eller reducere samtidigheden ved at justere samtidighedsfeltet i parameteren DAG.
  • Standard timeout for hele DAG er 12 timer, og standard timeout for hver celle i en child notebook er 90 sekunder. Du kan ændre timeout ved at angive felterne timeoutInSeconds og timeoutPerCellInSeconds i DAG-parameteren.
  • Konfigurér retry og retryIntervalInSeconds for aktiviteter, der kan fejle på grund af midlertidige problemer som netværkstimeouts eller midlertidig utilgængelighed af tjenester.
  • Parallelle notebooks deler beregningsressourcer inden for en enkelt Spark-session. Overvåg ressourceudnyttelsen for at undgå hukommelsestryk og CPU-konkurrence.

Afslut en notesbog

Metoden exit() forlader en notesbog med en værdi. Du kan køre indlejrede funktionskald i en notesbog interaktivt eller i en pipeline.

  • Når du kalder en exit() funktion fra en notesbog interaktivt, kaster Fabric-notebooken en undtagelse, springer efterfølgende celler over og holder Spark-sessionen i live.

  • Når du orkestrerar en notesbog i en pipeline, der kalder en exit() funktion, returnerer notesnotesbog aktiviteten med en exit-værdi. Dette fuldfører pipeline-kørslen og stopper Spark-sessionen.

  • Når du kalder en exit() funktion i en notesbog, der refereres, stopper Fabric Spark den videre udførelse af den refererede notesbog og fortsætter med at køre de næste celler i hovednotesbogen, der kalder run() funktionen. For eksempel: Notebook1 har tre celler og kalder en exit() funktion i den anden celle. Notebook2 har fem celler og ringer run(notebook1) i den tredje celle. Når du kører Notebook2, stopper Notebook1 ved den anden celle, når funktionen aktiveres exit() . Notebook2 fortsætter med at køre den fjerde celle og femte celle.

notebookutils.notebook.exit("value string")

Returadfærd

Metoden exit() returnerer ikke en værdi. Den afslutter den aktuelle notebook og sender den medfølgende streng videre til den kaldende notebook eller pipeline.

Bemærkning

Funktionen exit() overskriver den aktuelle celleoutput. Hvis du vil undgå at miste outputtet fra andre kodesætninger, skal du kalde notebookutils.notebook.exit() i en separat celle.

Vigtigt!

Ring notebookutils.notebook.exit() ikke inde i en try-catch blok. Exit'en træder ikke i kraft, når den er pakket ind i undtagelseshåndtering. Opkaldet exit() skal være på det øverste niveau af din kode for at fungere korrekt.

Eksempel:

Sample1-notesbogen har følgende to celler:

  • Celle 1 definerer en inputparameter med standardværdien indstillet til 10.

  • Celle 2 afslutter notesbogen med input som udgangsværdi.

Skærmbillede, der viser en eksempelnotesbog med afslutningsfunktionen.

Du kan køre Sample1 i en anden notesbog med standardværdier:

exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)

Output:

10

Du kan køre Sample1 i en anden notesbog og angive inputværdien som 20:

exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Output:

20