NotebookUtils not defteri çalıştırma ve düzenleme

Not defterini çalıştırmak, birden çok not defterini paralel olarak çalıştırmak veya not defterinden bir değerle çıkmak için not defteri yardımcı programlarını kullanın. Kullanılabilir yöntemlere genel bir bakış elde etmek için aşağıdaki komutu çalıştırın:

notebookutils.notebook.help()

Aşağıdaki tabloda, kullanılabilir not defteri çalıştırma ve düzenleme yöntemleri listelenir:

Yöntem Signature Açıklama
run run(path: str, timeout_seconds: int = 90, arguments: dict = None, workspace: str = ""): str Not defterini çalıştırır ve çıkış değerini döndürür.
runMultiple runMultiple(dag: Any, config: dict = None): dict[str, dict[str, Any]] Bağımlılık ilişkileri desteğiyle aynı anda birden çok not defteri çalıştırır.
validateDAG validateDAG(dag: Any): bool BIR DAG tanımının doğru yapılandırılıp yapılandırılmadığını doğrular.
exit exit(value: str): None Geçerli not defterini bir değerle kapatır.

Not defteri CRUD işlemleri (oluşturma, alma, güncelleştirme, silme, listeleme) için bkz. Not defteri yapıtlarını yönetme.

Uyarı

config içindeki runMultiple() parametresi yalnızca Python'da kullanılabilir. Scala ve R bu parametreyi desteklemez.

Uyarı

Not defteri yardımcı programları Apache Spark iş tanımları (SJD) için geçerli değildir.

Bir deftere başvur.

run() yöntemi bir not defterine başvurur ve çıkış değerini döndürür. İç içe işlev çağrılarını, etkileşimli olarak bir not defterinde veya bir işlem hattında çalıştırabilirsiniz. Atıfta bulunulan not defteri, bu işlevi çağıran not defterinin Spark havuzunda çalışır.

notebookutils.notebook.run("notebook name", <timeout_seconds>, <arguments>, <workspace>)

Örneğin:

notebookutils.notebook.run("Sample1", 90, {"input": 20 })

Dönüş değeri

run() yöntemi, notebookutils.notebook.exit(value) bağlı not defterinde tam olarak iletilen diziyi döndürür. Eğer exit() alt defterde çağrılmazsa, boş bir dize ("") döndürülür.

Kumaş defterleri, çalışma alanı kimliğini belirterek çalışma alanları arasında not defterlerine erişimi de destekler.

notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

Başvuru çalıştırmasını incelemek için hücre çıkışında anlık görüntü bağlantısını açın. Snapshot, çalıştırma sonuçlarını görselleştirir ve atıfta bulunulan not defterinde hata ayıklamaya yardımcı olur.

Referans çalıştırma sonucunun ekran görüntüsü.

Anlık görüntü örneğinin ekran görüntüsü.

Çocuk not defterlerini parametreleri alacak şekilde ayarlayın

veya run()aracılığıyla runMultiple() çağrılan bir alt not defteri oluşturduğunuzda, not defterinin üst öğeden bağımsız değişkenler alabilmesi için bir parametre hücresi ayarlayın:

  1. Varsayılan parametre değerlerine sahip bir kod hücresi oluşturun.
  2. Not defteri kullanıcı arabiriminde Hücreyi parametre olarak işaretle seçeneğini belirleyerek hücreyi parametre hücresi olarak işaretleyin.
  3. Yürütme sırasında, parametre hücre değerleri ana öğeden geçirilen bağımsız değişkenlerle değiştirilir.
# This cell should be marked as "parameters" cell
# Default values are overridden when the notebook is called
date = "2024-01-01"
region = "US"

Tavsiye

Çıkış değerleri her zaman string ifadesidir. Üst not defterinde sayısal bir değere ihtiyacınız varsa, alma işleminin ardından sonucu dönüştürün (örneğin, int(result)).

Değerlendirmeler

  • Çalışma alanları arasında referans alınan not defteri, 1.2 ve daha üstü çalışma zamanı sürümleriyle desteklenir.
  • Not Defteri Kaynağı içerisinde bulunan dosyaları kullanıyorsanız, etkileşimli çalıştırma ile aynı klasöre işaret ettiklerinden emin olmak için başvurulan not defterinde notebookutils.nbResPath kullanın.
  • Referans çalıştırması, alt not defterlerinin yalnızca ebeveynle aynı göl evini kullanmaları, ebeveynin göl evini devralmaları veya hiçbiri bir tanım yapmaması durumunda çalıştırılmasına olanak tanır. Eğer alt not defteri, ana not defterinden farklı bir lakehouse tanımlıyorsa yürütme engellenir. Bu denetimi atlamak için useRootDefaultLakehouse: True bağımsız değişkenlerde ayarlayın.
  • notebookutils.notebook.exit(value)'yi bir try-catch bloğu içinde çağrısı yapmayın. Çıkış çağrısı, özel durum işlemeye sarmalandığında geçerli olmaz.

Birden çok not defterini paralel olarak çalıştırma işlemi

Birden çok not defterini paralel veya önceden tanımlanmış bir topolojik yapıda çalıştırmak için kullanın notebookutils.notebook.runMultiple() . API, bir Spark oturumu içinde çok iş parçacıklı bir yöntem kullanır; bu da referans verilen not defteri çalıştırmalarının işlem kaynaklarını paylaştığı anlamına gelir.

ile notebookutils.notebook.runMultiple()şunları yapabilirsiniz:

  • Her birinin tamamlanmasını beklemeden birden çok not defterini aynı anda çalıştırın.

  • Basit bir JSON biçimi kullanarak not defterleriniz için bağımlılıkları ve yürütme sırasını belirtin.

  • Spark işlem kaynaklarının kullanımını iyileştirin ve Doku projelerinizin maliyetini azaltın.

  • Çıktıdaki her not defteri çalıştırma kaydının anlık görüntülerini görüntüleyin ve not defteri görevlerinizi rahatça izleyin/ayıklayın.

  • Her yönetici etkinliğinin çıkış değerini alın ve bunları aşağı akış görevlerinde kullanın.

Daha fazla örnek ve kullanım ayrıntılarını görüntülemek için komutunu çalıştırın notebookutils.notebook.help("runMultiple") .

Basit bir not defteri listesi çalıştırma

Aşağıdaki örnek, paralel olarak bir not defteri listesi çalıştırır:

notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

Kök not defterinin yürütme sonucu aşağıdaki gibidir:

Bir not defteri listesinin referans ekran görüntüsü.

Dönüş değeri

runMultiple() yöntemi, her anahtarın etkinlik adı ve her değerin aşağıdaki anahtarlara sahip olduğu bir sözlük döndürür:

  • exitVal: Alt not defterinin exit() çağrısı tarafından döndürülen dize veya çağrılmadıysa exit() boş bir dize.
  • exception: Etkinliğin başarısız olması veya None başarılı olması durumunda bir hata nesnesi.

Not defterlerini DAG yapısıyla çalıştırma

Aşağıdaki örnek, notebookutils.notebook.runMultiple() kullanarak defterleri bir DAG yapısında çalıştırır.

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "Process_1", # activity name, must be unique
            "path": "NotebookSimple", # notebook item name
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
            "workspace":"WorkspaceName" # both name and id are supported
        },
        {
            "name": "Process_2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200},
            "workspace":"id" # both name and id are supported
        },
        {
            "name": "Process_1.1",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["Process_1"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 12 # max number of notebooks to run concurrently, default to 3x CPU cores, 0 means unlimited
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

Kök not defterinin yürütme sonucu aşağıdaki gibidir:

Parametreleri olan not defterlerinin bir listesinin ekran görüntüsü.

DAG parametre referansı

Aşağıdaki tabloda DAG tanımında kullanabileceğiniz her alan açıklanmaktadır:

Veri Alanı Seviye Zorunlu Açıklama
activities Kök Evet Çalıştırılacak not defterlerini tanımlayan etkinlik nesnelerinin listesi.
timeoutInSeconds Kök Hayır Tüm DAG için maksimum zaman aşımı. Varsayılan değer 43200'dür (12 saat).
concurrency Kök Hayır Eşzamanlı olarak çalıştırılacak en fazla not defteri sayısı. Varsayılan değer, kullanılabilir CPU çekirdek sayısının 3 katıdır. Daha sıkı denetime ihtiyacınız varsa bu değeri açıkça ayarlayın veya sınırsız eşzamanlılık için kullanın 0 .
name Activity Evet Etkinlik için benzersiz bir ad. Sonuçları tanımlamak ve bağımlılıkları tanımlamak için kullanılır.
path Activity Evet Yürütülecek not defteri öğesi adı veya yolu.
timeoutPerCellInSeconds Activity Hayır Çocuk not defterindeki her bir hücre için azami zaman aşımı. Varsayılan değer 90 saniyedir.
args Activity Hayır Çocuk deftere geçirilecek parametrelerin sözlüğü.
workspace Activity Hayır Not defterinin bulunduğu çalışma alanı adı veya kimliği. Varsayılan olarak, alt not defteri arayanla aynı çalışma alanında çalışır.
retry Activity Hayır Etkinlik başarısız olursa yeniden deneme denemelerinin sayısı. Varsayılan değer 0'dır.
retryIntervalInSeconds Activity Hayır Yeniden deneme girişimleri arasında saniye cinsinden bekleme süresi. Varsayılan değer 0'dır.
dependencies Activity Hayır Bu etkinlik başlamadan önce tamamlanması gereken etkinlik adlarının listesi.

Etkinlikler arasında çıkış değerlerine referans verme

ifadesini kullanarak args alandaki bir bağımlılık etkinliğinin @activity() çıkış değerine başvurabilirsiniz. Bu düzen, bir DAG'deki not defterleri arasında veri geçirmenizi sağlar.

DAG = {
    "activities": [
        {
            "name": "Extract",
            "path": "ExtractData",
            "timeoutPerCellInSeconds": 120,
            "args": {"source": "prod_db"}
        },
        {
            "name": "Transform",
            "path": "TransformData",
            "timeoutPerCellInSeconds": 180,
            "args": {
                "data_path": "@activity('Extract').exitValue()"
            },
            "dependencies": ["Extract"]
        }
    ]
}

results = notebookutils.notebook.runMultiple(DAG)

Tavsiye

Bir DAG içindeki @activity('activity_name').exitValue() alanında ifadeyi, bir etkinlikten diğerine sonuçları geçirmek için kullanın.

Dinamik DAG oluşturma

Birden çok bölüm arasında genişletilmiş işleme gibi senaryolar için programatik olarak DAG yapıları oluşturabilirsiniz.

def create_fan_out_dag(partitions):
    activities = []

    for partition in partitions:
        activities.append({
            "name": f"Process_{partition}",
            "path": "ProcessPartition",
            "timeoutPerCellInSeconds": 180,
            "args": {"partition": partition}
        })

    activities.append({
        "name": "Aggregate",
        "path": "AggregateResults",
        "timeoutPerCellInSeconds": 120,
        "dependencies": [f"Process_{p}" for p in partitions]
    })

    return {"activities": activities, "concurrency": 25}

partitions = ["2024-01", "2024-02", "2024-03", "2024-04"]
dag = create_fan_out_dag(partitions)

results = notebookutils.notebook.runMultiple(dag)

DAG'yi doğrula

Yürütmeden önce DAG yapınızın geçerli olduğunu doğrulamak için kullanın validateDAG() . Yinelenen etkinlik adları, eksik bağımlılıklar ve döngüsel başvurular gibi sorunları tespit eder.

notebookutils.notebook.validateDAG(DAG)

Dönüş değeri

validateDAG() yöntemi, DAG yapısı geçerliyse True, doğrulama başarısız olursa bir özel durum oluşturur.

Tavsiye

Üretim iş akışlarında yapısal hataları erken yakalamak için her zaman validateDAG()'den önce runMultiple() çağrısı yapın.

RunMultiple hatalarını işleme

runMultiple() yöntemi, her anahtarın etkinlik adı olduğu ve her değerin bir exitVal (dize) ve bir exception (hata nesnesi veya None) içerdiği bir sözlük döndürür. Bazı etkinlikler başarısız olsa bile kısmi sonuçları inceleyebilirsiniz:

from notebookutils.common.exceptions import RunMultipleFailedException

try:
    results = notebookutils.notebook.runMultiple(DAG)
except RunMultipleFailedException as ex:
    results = ex.result

for activity_name, result in results.items():
    if result["exception"]:
        print(f"{activity_name} failed: {result['exception']}")
    else:
        print(f"{activity_name} succeeded: {result['exitVal']}")

Değerlendirmeler

  • Birden çok not defteri çalıştırmasının paralellik derecesi, Spark oturumunun toplam kullanılabilir işlem kaynağıyla sınırlıdır.
  • Varsayılan eşzamanlı not defteri sayısı , kullanılabilir CPU çekirdek sayısının 3 katıdır. Bu değeri özelleştirebilirsiniz, ancak aşırı paralellik yüksek işlem kaynağı kullanımı nedeniyle kararlılık ve performans sorunlarına yol açabilir. Sorun oluşursa, DAG parametresindeki eşzamanlılık alanını ayarlayarak not defterlerini birden çok runMultiple çağrıya ayırmayı veya eşzamanlılığı azaltmayı göz önünde bulundurun.
  • DAG'nin tamamı için varsayılan zaman aşımı 12 saattir ve alt not defterindeki her hücre için varsayılan zaman aşımı 90 saniyedir. DAG parametresindeki timeoutInSeconds ve timeoutPerCellInSeconds alanlarını ayarlayarak zaman aşımını değiştirebilirsiniz.
  • retry ve retryIntervalInSeconds, ağ zaman aşımı veya geçici hizmet kullanılamaması gibi geçici sorunlar nedeniyle başarısız olabilecek faaliyetler için yapılandırın.
  • Paralel not defterleri işlem kaynaklarını tek bir Spark oturumunda paylaşır. Bellek baskısı ve CPU çekişmesi önlemek için kaynak kullanımını izleyin.

Not defterinden çıkış yap

exit() yöntemi, bir not defterinden bir değerle çıkar. İç içe işlev çağrılarını, etkileşimli olarak bir not defterinde veya bir işlem hattında çalıştırabilirsiniz.

  • Bir not defterinden etkileşimli olarak exit() işlevini çağırdığınızda, Fabric not defteri bir hata üretir, sonraki hücreleri çalıştırmayı atlar ve Spark oturumunu canlı tutar.

  • İşlev çağıran exit() bir işlem hattında bir not defterini düzenlerseniz, not defteri etkinliği bir çıkış değeriyle birlikte döner. Bu işlem hattı çalıştırmasını tamamlar ve Spark oturumunu durdurur.

  • Başvurulan bir not defterindeki bir exit() işlevini çağırdığınızda, Doku Spark, başvurulan not defterinin daha fazla çalışmasını durdurur ve ana not defterinde bu işlevi çağıran sonraki hücreleri çalıştırmaya devam eder. Örneğin: Notebook1 üç hücreye sahiptir ve ikinci hücredeki bir exit() işlevi çağırır. Notebook2 beş hücreye sahiptir ve üçüncü hücrede run(notebook1) çağrısı yapar. Notebook2'yi çalıştırdığınızda, Notebook1 exit() işlevine ulaşıldığında ikinci hücrede durur. Not Defteri2, dördüncü hücresini ve beşinci hücresini çalıştırmaya devam eder.

notebookutils.notebook.exit("value string")

Dönüş davranışı

exit() yöntemi bir değer döndürmez. Geçerli not defterini sonlandırır ve sağlanan dizeyi çağıran not defterine veya işlem hattına geçirir.

Uyarı

exit() işlevi mevcut hücre çıkışını değiştirir. Diğer kod deyimlerinin çıkışını kaybetmemek için ayrı bir hücrede notebookutils.notebook.exit() çağırın.

Önemli

notebookutils.notebook.exit()'yi bir try-catch bloğu içinde çağrısı yapmayın. Çıkış, özel durum işlemeye sarmalandığında geçerli olmaz. Doğru exit() çalışması için çağrı kodunuzun en üst düzeyinde olmalıdır.

Örneğin:

Sample1 not defteri aşağıdaki iki hücreye sahiptir:

  • Hücre 1, varsayılan değeri 10 olarak ayarlanmış bir giriş parametresi tanımlar.

  • 2. hücre, çıkış değeri olarak input ile not defterinden çıkar.

Çıkış işlevinin örnek not defterini gösteren ekran görüntüsü.

Sample1'i varsayılan değerlerle başka bir not defterinde çalıştırabilirsiniz:

exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)

Çıktı:

10

Sample1'i başka bir not defterinde çalıştırabilir ve giriş değerini 20 olarak ayarlayabilirsiniz:

exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Çıktı:

20