Εκτέλεση και ενορχήστρωση σημειωματαρίου NotebookUtils

Χρησιμοποιήστε τα βοηθητικά προγράμματα σημειωματαρίου για να εκτελέσετε ένα σημειωματάριο, να εκτελέσετε πολλά σημειωματάρια παράλληλα ή να κλείσετε ένα σημειωματάριο με μια τιμή. Εκτελέστε την ακόλουθη εντολή για να δείτε μια επισκόπηση των διαθέσιμων μεθόδων:

notebookutils.notebook.help()

Ο παρακάτω πίνακας παραθέτει τις διαθέσιμες μεθόδους εκτέλεσης και ενορχήστρωσης σημειωματαρίου:

Μέθοδος Υπογραφή Περιγραφή
run run(path: str, timeout_seconds: int = 90, arguments: dict = None, workspace: str = ""): str Εκτελεί ένα σημειωματάριο και επιστρέφει την τιμή εξόδου του.
runMultiple runMultiple(dag: Any, config: dict = None): dict[str, dict[str, Any]] Εκτελεί πολλά σημειωματάρια ταυτόχρονα με υποστήριξη για σχέσεις εξάρτησης.
validateDAG validateDAG(dag: Any): bool Επικυρώνει εάν ένας ορισμός DAG είναι σωστά δομημένος.
exit exit(value: str): None Εξέρχεται από τον τρέχοντα φορητό υπολογιστή με μια τιμή.

Για λειτουργίες CRUD σημειωματαρίου (δημιουργία, λήψη, ενημέρωση, διαγραφή, λίστα), ανατρέξτε στο θέμα Διαχείριση αντικειμένων σημειωματαρίου.

Σημείωμα

Η config παράμετρος in runMultiple() είναι διαθέσιμη μόνο στην Python. Η Scala και η R δεν υποστηρίζουν αυτήν την παράμετρο.

Σημείωμα

Τα βοηθητικά προγράμματα σημειωματάριου δεν ισχύουν για τους ορισμούς εργασίας Apache Spark (SJD).

Αναφορά σημειωματάριου

Η run() μέθοδος αναφέρεται σε ένα σημειωματάριο και επιστρέφει την τιμή εξόδου του. Μπορείτε να εκτελέσετε κλήσεις συνάρτησης ένθεσης σε ένα σημειωματάριο με αλληλεπιδραστικό τρόπο ή σε μια διοχέτευση. Το σημειωματάριο στο οποίο γίνεται αναφορά εκτελείται στον χώρο συγκέντρωσης Spark του σημειωματάριου που καλεί αυτήν τη συνάρτηση.

notebookutils.notebook.run("notebook name", <timeout_seconds>, <arguments>, <workspace>)

Για παράδειγμα:

notebookutils.notebook.run("Sample1", 90, {"input": 20 })

Επιστρεφόμενη αξία

Η run() μέθοδος επιστρέφει την ακριβή συμβολοσειρά που notebookutils.notebook.exit(value) μεταβιβάζεται στο θυγατρικό σημειωματάριο. Εάν exit() δεν καλείται στο θυγατρικό σημειωματάριο, επιστρέφεται μια κενή συμβολοσειρά ("").

Τα σημειωματάρια Fabric υποστηρίζουν επίσης την αναφορά σημειωματαρίων σε όλους τους χώρους εργασίας, καθορίζοντας το αναγνωριστικό χώρου εργασίας.

notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

Ανοίξτε τη σύνδεση στιγμιότυπου στην έξοδο κελιού για να επιθεωρήσετε την εκτέλεση αναφοράς. Το στιγμιότυπο καταγράφει τα αποτελέσματα εκτέλεσης και σας βοηθά να εντοπίσετε σφάλματα στο σημειωματάριο αναφοράς.

Στιγμιότυπο οθόνης του αποτελέσματος εκτέλεσης αναφοράς.

Στιγμιότυπο οθόνης ενός παραδείγματος στιγμιότυπου αναφοράς.

Ρύθμιση θυγατρικών σημειωματαρίων για λήψη παραμέτρων

Όταν δημιουργείτε ένα θυγατρικό σημειωματάριο που καλείται μέσω run() ή runMultiple(), ρυθμίστε ένα κελί παραμέτρων, έτσι ώστε το σημειωματάριο να μπορεί να λαμβάνει ορίσματα από το γονικό σημειωματάριο:

  1. Δημιουργήστε ένα κελί κώδικα με προεπιλεγμένες τιμές παραμέτρων.
  2. Επισημάνετε το κελί ως κελί παραμέτρων, επιλέγοντας Σήμανση κελιού ως παραμέτρων στο περιβάλλον εργασίας χρήστη του σημειωματαρίου.
  3. Κατά την εκτέλεση, οι τιμές των κελιών παραμέτρων αντικαθίστανται με τα ορίσματα που μεταβιβάζονται από τον γονέα.
# This cell should be marked as "parameters" cell
# Default values are overridden when the notebook is called
date = "2024-01-01"
region = "US"

Συμβουλή

Οι τιμές εξόδου είναι πάντα συμβολοσειρές. Εάν χρειάζεστε μια αριθμητική τιμή στο γονικό σημειωματάριο, μετατρέψτε το αποτέλεσμα μετά την ανάκτηση (για παράδειγμα, int(result)).

Εκτιμήσεις

  • Το σημειωματάριο αναφοράς μεταξύ χώρων εργασίας υποστηρίζεται από την έκδοση 1.2 του περιβάλλοντος εκτέλεσης και νεότερες εκδόσεις.
  • Εάν χρησιμοποιείτε τα αρχεία στην περιοχή Πόρος σημειωματαρίου, χρησιμοποιήστε notebookutils.nbResPath το στο αναφερόμενο σημειωματάριο για να βεβαιωθείτε ότι οδηγεί στον ίδιο φάκελο με την αλληλεπιδραστική εκτέλεση.
  • Η εκτέλεση αναφοράς επιτρέπει στα θυγατρικά σημειωματάρια να εκτελούνται μόνο εάν χρησιμοποιούν το ίδιο λιμναίο σπίτι με το γονικό, κληρονομούν το λιμναίο σπίτι του γονέα ή δεν ορίζουν κανένα. Η εκτέλεση αποκλείεται εάν το θυγατρικό στοιχείο καθορίσει ένα διαφορετικό lakehouse από το γονικό σημειωματάριο. Για να παρακάμψετε αυτόν τον έλεγχο, ορίστε useRootDefaultLakehouse: True τα ορίσματα.
  • Μην καλείτε notebookutils.notebook.exit(value) μέσα σε ένα try-catch μπλοκ. Η κλήση εξόδου δεν θα τεθεί σε ισχύ όταν είναι τυλιγμένη σε χειρισμό εξαιρέσεων.

Αναφορά εκτέλεσης πολλών σημειωματάριων παράλληλα

Χρησιμοποιείται notebookutils.notebook.runMultiple() για την εκτέλεση πολλών σημειωματάριων παράλληλα ή σε μια προκαθορισμένη τοπολογική δομή. Το API χρησιμοποιεί μια υλοποίηση πολλαπλών νημάτων σε μια περίοδο λειτουργίας Spark, πράγμα που σημαίνει ότι οι αναφερόμενες εκτελέσεις σημειωματαρίων μοιράζονται υπολογιστικούς πόρους.

Με notebookutils.notebook.runMultiple()το , μπορείτε να κάνετε τα εξής:

  • Εκτελέστε πολλά σημειωματάρια ταυτόχρονα, χωρίς να πρέπει να ολοκληρωθεί το καθένα.

  • Καθορίστε τις εξαρτήσεις και τη σειρά εκτέλεσης για τα σημειωματάριά σας, χρησιμοποιώντας μια απλή μορφή JSON.

  • Βελτιστοποιήστε τη χρήση των υπολογιστικών πόρων Spark και μειώστε το κόστος των έργων σας Fabric.

  • Προβάλετε τα Στιγμιότυπα κάθε εγγραφής εκτέλεσης σημειωματάριου στην έξοδο και εντοπίστε εύκολα τις εργασίες του σημειωματάριου/παρακολουθήστε τις εργασίες σας.

  • Λάβετε την αξία εξόδου κάθε εκτελεστικής δραστηριότητας και χρησιμοποιήστε τις σε εργασίες κατάντη.

Εκτελέστε notebookutils.notebook.help("runMultiple") για να δείτε περισσότερα παραδείγματα και λεπτομέρειες χρήσης.

Εκτελέστε μια απλή λίστα σημειωματαρίων

Το παρακάτω παράδειγμα εκτελεί παράλληλα μια λίστα σημειωματαρίων:

notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

Το αποτέλεσμα εκτέλεσης από το ριζικό σημειωματάριο είναι το εξής:

Στιγμιότυπο οθόνης αναφοράς μιας λίστας σημειωματάριων.

Επιστρεφόμενη αξία

Η runMultiple() μέθοδος επιστρέφει ένα λεξικό όπου κάθε κλειδί είναι το όνομα της δραστηριότητας και κάθε τιμή είναι ένα λεξικό με τα ακόλουθα κλειδιά:

  • exitVal: Η συμβολοσειρά που επιστρέφεται από την κλήση του θυγατρικού exit() σημειωματαρίου ή μια κενή συμβολοσειρά, εάν exit() δεν κλήθηκε.
  • exception: Ένα αντικείμενο σφάλματος εάν η δραστηριότητα απέτυχε ή None εάν ήταν επιτυχής.

Εκτελέστε σημειωματάρια με δομή DAG

Το παρακάτω παράδειγμα εκτελεί σημειωματάρια σε μια δομή DAG χρησιμοποιώντας notebookutils.notebook.runMultiple().

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "Process_1", # activity name, must be unique
            "path": "NotebookSimple", # notebook item name
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
            "workspace":"WorkspaceName" # both name and id are supported
        },
        {
            "name": "Process_2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200},
            "workspace":"id" # both name and id are supported
        },
        {
            "name": "Process_1.1",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["Process_1"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 12 # max number of notebooks to run concurrently, default to 3x CPU cores, 0 means unlimited
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

Το αποτέλεσμα εκτέλεσης από το ριζικό σημειωματάριο είναι το εξής:

Στιγμιότυπο οθόνης αναφοράς μιας λίστας σημειωματάριων με παραμέτρους.

Αναφορά παραμέτρου DAG

Ο παρακάτω πίνακας περιγράφει κάθε πεδίο που μπορείτε να χρησιμοποιήσετε στον ορισμό της ΕΣΟ:

Πεδίο Επίπεδο Υποχρεωτικό Περιγραφή
activities Ρίζα Ναι Μια λίστα αντικειμένων δραστηριότητας που ορίζουν τα σημειωματάρια που θα εκτελεστούν.
timeoutInSeconds Ρίζα Όχι Μέγιστο χρονικό όριο για ολόκληρο το DAG. Η προεπιλογή είναι 43200 (12 ώρες).
concurrency Ρίζα Όχι Μέγιστος αριθμός φορητών υπολογιστών για ταυτόχρονη εκτέλεση. Η προεπιλογή είναι 3 φορές ο διαθέσιμος αριθμός πυρήνων CPU. Ορίστε αυτήν την τιμή ρητά εάν χρειάζεστε αυστηρότερο έλεγχο ή χρησιμοποιήστε την 0 για απεριόριστο συγχρονισμό.
name Δραστηριότητα Ναι Ένα μοναδικό όνομα για τη δραστηριότητα. Χρησιμοποιείται για τον εντοπισμό αποτελεσμάτων και τον καθορισμό εξαρτήσεων.
path Δραστηριότητα Ναι Το όνομα του στοιχείου σημειωματάριου ή η διαδρομή προς εκτέλεση.
timeoutPerCellInSeconds Δραστηριότητα Όχι Μέγιστο χρονικό όριο για κάθε κελί στο θυγατρικό σημειωματάριο. Η προεπιλογή είναι 90 δευτερόλεπτα.
args Δραστηριότητα Όχι Ένα λεξικό παραμέτρων για μεταβίβαση στο θυγατρικό σημειωματάριο.
workspace Δραστηριότητα Όχι Το όνομα ή το αναγνωριστικό του χώρου εργασίας όπου βρίσκεται το σημειωματάριο. Από προεπιλογή, το θυγατρικό σημειωματάριο εκτελείται στον ίδιο χώρο εργασίας με τον καλούντα.
retry Δραστηριότητα Όχι Αριθμός προσπαθειών επανάληψης εάν η δραστηριότητα αποτύχει. Η προεπιλογή είναι 0.
retryIntervalInSeconds Δραστηριότητα Όχι Περιμένετε χρόνο σε δευτερόλεπτα μεταξύ των προσπαθειών επανάληψης. Η προεπιλογή είναι 0.
dependencies Δραστηριότητα Όχι Μια λίστα με ονόματα δραστηριοτήτων που πρέπει να ολοκληρωθούν πριν από την έναρξη αυτής της δραστηριότητας.

Τιμές εξόδου αναφοράς μεταξύ δραστηριοτήτων

Μπορείτε να αναφέρετε την τιμή εξόδου μιας δραστηριότητας εξάρτησης στο args πεδίο χρησιμοποιώντας την @activity() παράσταση. Αυτό το μοτίβο σάς επιτρέπει να μεταβιβάζετε δεδομένα μεταξύ σημειωματαρίων σε ένα DAG.

DAG = {
    "activities": [
        {
            "name": "Extract",
            "path": "ExtractData",
            "timeoutPerCellInSeconds": 120,
            "args": {"source": "prod_db"}
        },
        {
            "name": "Transform",
            "path": "TransformData",
            "timeoutPerCellInSeconds": 180,
            "args": {
                "data_path": "@activity('Extract').exitValue()"
            },
            "dependencies": ["Extract"]
        }
    ]
}

results = notebookutils.notebook.runMultiple(DAG)

Συμβουλή

Χρησιμοποιήστε την @activity('activity_name').exitValue() έκφραση στο args πεδίο για να μεταβιβάσετε αποτελέσματα από μια δραστηριότητα σε μια άλλη μέσα σε μια ΕΣΟ.

Δημιουργήστε ένα δυναμικό DAG

Μπορείτε να δημιουργήσετε δομές DAG μέσω προγραμματισμού για σενάρια όπως η επεξεργασία με ανεμιστήρα σε πολλαπλές κατατμήσεις:

def create_fan_out_dag(partitions):
    activities = []

    for partition in partitions:
        activities.append({
            "name": f"Process_{partition}",
            "path": "ProcessPartition",
            "timeoutPerCellInSeconds": 180,
            "args": {"partition": partition}
        })

    activities.append({
        "name": "Aggregate",
        "path": "AggregateResults",
        "timeoutPerCellInSeconds": 120,
        "dependencies": [f"Process_{p}" for p in partitions]
    })

    return {"activities": activities, "concurrency": 25}

partitions = ["2024-01", "2024-02", "2024-03", "2024-04"]
dag = create_fan_out_dag(partitions)

results = notebookutils.notebook.runMultiple(dag)

Επικύρωση ΕΣΟ

Χρησιμοποιήστε το validateDAG() για να επαληθεύσετε ότι η δομή DAG σας είναι έγκυρη πριν από την εκτέλεση. Εντοπίζει ζητήματα όπως διπλότυπα ονόματα δραστηριοτήτων, εξαρτήσεις που λείπουν και κυκλικές αναφορές.

notebookutils.notebook.validateDAG(DAG)

Επιστρεφόμενη αξία

Η validateDAG() μέθοδος επιστρέφει True εάν η δομή DAG είναι έγκυρη ή δημιουργεί μια εξαίρεση εάν αποτύχει η επικύρωση.

Συμβουλή

Πάντα να καλείτε validateDAG() πριν στις runMultiple() ροές εργασιών παραγωγής για να εντοπίσετε έγκαιρα τα δομικά σφάλματα.

Χειρισμός εκτέλεσηςΠολλαπλές αποτυχίες

Η runMultiple() μέθοδος επιστρέφει ένα λεξικό όπου κάθε κλειδί είναι το όνομα της δραστηριότητας και κάθε τιμή περιέχει ένα exitVal (string) και ένα exception (error object or None). Μπορείτε να επιθεωρήσετε μερικά αποτελέσματα ακόμα και όταν ορισμένες δραστηριότητες αποτυγχάνουν:

from notebookutils.common.exceptions import RunMultipleFailedException

try:
    results = notebookutils.notebook.runMultiple(DAG)
except RunMultipleFailedException as ex:
    results = ex.result

for activity_name, result in results.items():
    if result["exception"]:
        print(f"{activity_name} failed: {result['exception']}")
    else:
        print(f"{activity_name} succeeded: {result['exitVal']}")

Εκτιμήσεις

  • Ο βαθμός παραλληλισμού της εκτέλεσης πολλαπλού σημειωματάριου περιορίζεται στον συνολικό διαθέσιμο υπολογιστικό πόρο μιας περιόδου λειτουργίας Spark.
  • Ο προεπιλεγμένος αριθμός ταυτόχρονων φορητών υπολογιστών είναι 3 φορές μεγαλύτερος από τον διαθέσιμο αριθμό πυρήνων CPU. Μπορείτε να προσαρμόσετε αυτήν την τιμή, αλλά ο υπερβολικός παραλληλισμός μπορεί να οδηγήσει σε προβλήματα σταθερότητας και απόδοσης λόγω της υψηλής χρήσης υπολογιστικών πόρων. Εάν προκύψουν προβλήματα, εξετάστε το ενδεχόμενο διαχωρισμού σημειωματαρίων σε πολλές runMultiple κλήσεις ή μείωσης του ταυτοχρονισμού προσαρμόζοντας το πεδίο ταυτοχρονισμού στην παράμετρο DAG.
  • Το προεπιλεγμένο χρονικό όριο για ολόκληρο το DAG είναι 12 ώρες και το προεπιλεγμένο χρονικό όριο για κάθε κελί σε ένα θυγατρικό σημειωματάριο είναι 90 δευτερόλεπτα. Μπορείτε να αλλάξετε το χρονικό όριο ρυθμίζοντας τα πεδία timeoutInSeconds και timeoutPerCellInSeconds στην παράμετρο DAG.
  • Ρυθμίστε τις παραμέτρους retry και retryIntervalInSeconds για δραστηριότητες που ενδέχεται να αποτύχουν λόγω προσωρινών ζητημάτων, όπως χρονικά όρια δικτύου ή προσωρινή μη διαθεσιμότητα υπηρεσίας.
  • Τα παράλληλα σημειωματάρια μοιράζονται υπολογιστικούς πόρους σε μία μόνο περίοδο λειτουργίας Spark. Παρακολουθήστε τη χρήση των πόρων για να αποφύγετε την πίεση της μνήμης και τη διαμάχη της CPU.

Έξοδος από ένα σημειωματάριο

Η exit() μέθοδος εξέρχεται από ένα σημειωματάριο με μια τιμή. Μπορείτε να εκτελέσετε κλήσεις συνάρτησης ένθεσης σε ένα σημειωματάριο με αλληλεπιδραστικό τρόπο ή σε μια διοχέτευση.

  • Όταν καλείτε μια exit() συνάρτηση από ένα σημειωματάριο αλληλεπιδραστικά, το σημειωματάριο Fabric δημιουργεί μια εξαίρεση, παραλείπει την εκτέλεση των επόμενων κελιών και διατηρεί ενεργή την περίοδο λειτουργίας Spark.

  • Όταν ενορχηστρώνετε ένα σημειωματάριο σε μια διοχέτευση που καλεί μια exit() συνάρτηση, η δραστηριότητα σημειωματαρίου επιστρέφει με μια τιμή εξόδου. Αυτό ολοκληρώνει την εκτέλεση της διοχέτευσης και διακόπτει την περίοδο λειτουργίας Spark.

  • Όταν καλείτε μια exit() συνάρτηση σε ένα σημειωματάριο στο οποίο γίνεται αναφορά, το Fabric Spark διακόπτει την περαιτέρω εκτέλεση του σημειωματαρίου αναφοράς και συνεχίζει να εκτελεί τα επόμενα κελιά στο κύριο σημειωματάριο που καλεί τη run() συνάρτηση. Για παράδειγμα: Το Notebook1 έχει τρία κελιά και καλεί μια exit() συνάρτηση στο δεύτερο κελί. Το Notebook2 έχει πέντε κελιά και καλεί run(notebook1) στο τρίτο κελί. Όταν εκτελείτε το Notebook2, το Notebook1 σταματά στο δεύτερο κελί όταν πατάτε τη exit() συνάρτηση. Το Notebook2 συνεχίζει να εκτελεί το τέταρτο κελί και το πέμπτο κελί του.

notebookutils.notebook.exit("value string")

Συμπεριφορά επιστροφής

Η exit() μέθοδος δεν επιστρέφει τιμή. Τερματίζει το τρέχον σημειωματάριο και μεταβιβάζει την παρεχόμενη συμβολοσειρά στο σημειωματάριο ή τη διοχέτευση κλήσης.

Σημείωμα

Η exit() συνάρτηση αντικαθιστά την τρέχουσα έξοδο κελιού. Για να αποφύγετε την απώλεια της εξόδου άλλων δηλώσεων κώδικα, καλέστε notebookutils.notebook.exit() σε ξεχωριστό κελί.

Σημαντικό

Μην καλείτε notebookutils.notebook.exit() μέσα σε ένα try-catch μπλοκ. Η έξοδος δεν θα τεθεί σε ισχύ όταν είναι τυλιγμένη σε χειρισμό εξαιρέσεων. Η exit() κλήση πρέπει να βρίσκεται στο ανώτερο επίπεδο του κώδικά σας για να λειτουργήσει σωστά.

Για παράδειγμα:

Το σημειωματάριο Sample1 έχει τα ακόλουθα δύο κελιά:

  • Το κελί 1 ορίζει μια παράμετρο εισόδου με προεπιλεγμένη τιμή 10.

  • Το κελί 2 εξέρχεται από το σημειωματάριο με είσοδο ως τιμή εξόδου.

Στιγμιότυπο οθόνης που εμφανίζει ένα δείγμα σημειωματάριου της συνάρτησης εξόδου.

Μπορείτε να εκτελέσετε το Δείγμα1 σε άλλο σημειωματάριο με προεπιλεγμένες τιμές:

exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)

Output:

10

Μπορείτε να εκτελέσετε το Δείγμα1 σε άλλο σημειωματάριο και να ορίσετε την τιμή εισόδου ως 20:

exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Output:

20