Een Databricks-notebook uitvoeren vanuit een ander notebook

Belangrijk

Gebruik Databricks-taken voor indeling van notitieblokken. Gebruik werkruimtebestanden voor code modularisatiescenario's. Gebruik alleen de technieken die in dit artikel worden beschreven wanneer uw use-case niet kan worden geïmplementeerd met behulp van een Databricks-taak, zoals voor het herhalen van notebooks via een dynamische set parameters of als u geen toegang hebt tot werkruimtebestanden. Zie Databricks-taken en code delen voor meer informatie.

Vergelijking van %run en dbutils.notebook.run()

Met de %run opdracht kunt u een ander notitieblok opnemen in een notitieblok. U kunt bijvoorbeeld uw %run code modulariseren door ondersteunende functies in een afzonderlijk notebook te plaatsen. U kunt deze ook gebruiken om notebooks samen te voegen die de stappen in een analyse implementeren. Wanneer u het notitieblok gebruikt %run, wordt het aangeroepen notebook onmiddellijk uitgevoerd en worden de functies en variabelen die erin zijn gedefinieerd, beschikbaar in het aanroepende notebook.

De dbutils.notebook API is een aanvulling op %run omdat u hiermee parameters kunt doorgeven aan en waarden kunt retourneren uit een notebook. Hiermee kunt u complexe werkstromen en pijplijnen bouwen met afhankelijkheden. U kunt bijvoorbeeld een lijst met bestanden in een map ophalen en de namen doorgeven aan een ander notitieblok, wat niet mogelijk is met %run. U kunt ook if-then-else-werkstromen maken op basis van retourwaarden of andere notebooks aanroepen met behulp van relatieve paden.

In tegenstelling tot %run, wordt met de dbutils.notebook.run() methode een nieuwe taak gestart om het notebook uit te voeren.

Deze methoden, zoals alle dbutils API's, zijn alleen beschikbaar in Python en Scala. U kunt echter ook dbutils.notebook.run() een R-notebook aanroepen.

Gebruiken %run om een notebook te importeren

In dit voorbeeld definieert het eerste notebook een functie, reversedie beschikbaar is in het tweede notebook nadat u de %run magie hebt gebruikt om uit te voeren shared-code-notebook.

Gedeeld codenotitieblok

Voorbeeld van het importeren van notitieblokken

Omdat beide notebooks zich in dezelfde map in de werkruimte bevinden, gebruikt u het voorvoegsel ./ om ./shared-code-notebook aan te geven dat het pad moet worden omgezet ten opzichte van het huidige actieve notebook. U kunt notitieblokken ordenen in mappen, zoals %run ./dir/notebook, of een absoluut pad gebruiken, zoals %run /Users/username@organization.com/directory/notebook.

Notitie

  • %runmoet op zichzelf in een cel staan, omdat het hele notebook inline wordt uitgevoerd.
  • U kunt een Python-bestand en import de entiteiten die in dat bestand zijn gedefinieerd, niet uitvoeren %run in een notebook. Als u wilt importeren uit een Python-bestand, raadpleegt u Uw code modulariseren met behulp van bestanden. U kunt het bestand ook inpakken in een Python-bibliotheek, een Azure Databricks-bibliotheek maken vanuit die Python-bibliotheek en de bibliotheek installeren in het cluster dat u gebruikt om uw notebook uit te voeren.
  • Wanneer u %run een notebook met widgets uitvoert, wordt het opgegeven notitieblok standaard uitgevoerd met de standaardwaarden van de widget. U kunt ook waarden doorgeven aan widgets; zie Databricks-widgets gebruiken met %run.

dbutils.notebook API

De methoden die beschikbaar zijn in de dbutils.notebook API zijn run en exit. Zowel parameters als retourwaarden moeten tekenreeksen zijn.

run(path: String, timeout_seconds: int, arguments: Map): String

Voer een notebook uit en retourneer de afsluitwaarde. De methode start een tijdelijke taak die onmiddellijk wordt uitgevoerd.

De timeout_seconds parameter bepaalt de time-out van de uitvoering (0 betekent geen time-out): de aanroep om een uitzondering te run genereren als deze niet binnen de opgegeven tijd wordt voltooid. Als Azure Databricks langer dan 10 minuten uitvalt, mislukt de uitvoering van timeout_secondshet notebook, ongeacht.

Met arguments de parameter worden widgetwaarden van het doelnotitieblok ingesteld. Als het notebook dat u uitvoert een widget heeft met de naam Aen u een sleutel-waardepaar ("A": "B") doorgeeft als onderdeel van de parameter argumenten aan de run() aanroep, wordt het ophalen van de waarde van de widget A geretourneerd "B". U vindt de instructies voor het maken en werken met widgets in het artikel databricks-widgets .

Notitie

  • De arguments parameter accepteert alleen Latijnse tekens (ASCII-tekenset). Als u niet-ASCII-tekens gebruikt, wordt een fout geretourneerd.
  • Taken die zijn gemaakt met behulp van de dbutils.notebook API, moeten binnen 30 dagen of minder worden voltooid.

run Gebruik

Python

dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})

Scala

dbutils.notebook.run("notebook-name", 60, Map("argument" -> "data", "argument2" -> "data2", ...))

run Voorbeeld

Stel dat u een notitieblok hebt met de naam workflowsfoo van een widget waarmee de waarde van de widget wordt afgedrukt:

dbutils.widgets.text("foo", "fooDefault", "fooEmptyLabel")
print(dbutils.widgets.get("foo"))

Het uitvoeren dbutils.notebook.run("workflows", 60, {"foo": "bar"}) produceert het volgende resultaat:

Notitieblok met widget

De widget had de waarde die u hebt doorgegeven met behulp van dbutils.notebook.run(), "bar"in plaats van de standaardwaarde.

exit(value: String): void Sluit een notitieblok af met een waarde. Als u een notebook aanroept met behulp van de run methode, is dit de geretourneerde waarde.

dbutils.notebook.exit("returnValue")

Als u een taak aanroept dbutils.notebook.exit , wordt het notitieblok voltooid. Als u wilt dat de taak mislukt, genereert u een uitzondering.

Voorbeeld

In het volgende voorbeeld geeft u argumenten door aan DataImportNotebook en voert u verschillende notebooks (DataCleaningNotebook of ErrorHandlingNotebook) uit op basis van het resultaat van DataImportNotebook.

if-else-voorbeeld

Wanneer de code wordt uitgevoerd, ziet u een koppeling naar het actieve notebook:

Koppeling naar actieve notebook

Als u de details van de uitvoering wilt weergeven, klikt u op de notebookkoppelingstaak #xxxx.

Resultaat van kortstondige notebookuitvoering

Geef gestructureerde gegevens door

In deze sectie ziet u hoe u gestructureerde gegevens tussen notebooks doorgeeft.

Python

# Example 1 - returning data through temporary views.
# You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
# return a name referencing data stored in a temporary view.

## In callee notebook
spark.range(5).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

# Example 2 - returning data through DBFS.
# For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

## In callee notebook
dbutils.fs.rm("/tmp/results/my_data", recurse=True)
spark.range(5).toDF("value").write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(spark.read.format("parquet").load(returned_table))

# Example 3 - returning JSON data.
# To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

## In callee notebook
import json
dbutils.notebook.exit(json.dumps({
  "status": "OK",
  "table": "my_data"
}))

## In caller notebook
import json

result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
print(json.loads(result))

Scala

// Example 1 - returning data through temporary views.
// You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
// return a name referencing data stored in a temporary view.

/** In callee notebook */
sc.parallelize(1 to 5).toDF().createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
val global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

// Example 2 - returning data through DBFS.
// For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

/** In callee notebook */
dbutils.fs.rm("/tmp/results/my_data", recurse=true)
sc.parallelize(1 to 5).toDF().write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.format("parquet").load(returned_table))

// Example 3 - returning JSON data.
// To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

/** In callee notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

// Exit with json
dbutils.notebook.exit(jsonMapper.writeValueAsString(Map("status" -> "OK", "table" -> "my_data")))

/** In caller notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

val result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
println(jsonMapper.readValue[Map[String, String]](result))

Afhandeling van fouten

In deze sectie ziet u hoe u fouten kunt afhandelen.

Python

# Errors throw a WorkflowException.

def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
  num_retries = 0
  while True:
    try:
      return dbutils.notebook.run(notebook, timeout, args)
    except Exception as e:
      if num_retries > max_retries:
        raise e
      else:
        print("Retrying error", e)
        num_retries += 1

run_with_retry("LOCATION_OF_CALLEE_NOTEBOOK", 60, max_retries = 5)

Scala

// Errors throw a WorkflowException.

import com.databricks.WorkflowException

// Since dbutils.notebook.run() is just a function call, you can retry failures using standard Scala try-catch
// control flow. Here we show an example of retrying a notebook a number of times.
def runRetry(notebook: String, timeout: Int, args: Map[String, String] = Map.empty, maxTries: Int = 3): String = {
  var numTries = 0
  while (true) {
    try {
      return dbutils.notebook.run(notebook, timeout, args)
    } catch {
      case e: WorkflowException if numTries < maxTries =>
        println("Error, retrying: " + e)
    }
    numTries += 1
  }
  "" // not reached
}

runRetry("LOCATION_OF_CALLEE_NOTEBOOK", timeout = 60, maxTries = 5)

Gelijktijdig meerdere notebooks uitvoeren

U kunt meerdere notebooks tegelijk uitvoeren met behulp van standaard-Scala- en Python-constructies zoals Threads (Scala, Python) en Futures (Scala, Python). In de voorbeeldnotebooks wordt gedemonstreerd hoe u deze constructies gebruikt.

  1. Download de volgende 4 notebooks. De notebooks worden geschreven in Scala.
  2. Importeer de notebooks in één map in de werkruimte.
  3. Voer het notebook Gelijktijdig uitvoeren uit.

Gelijktijdig uitvoeren van notebook

Notebook downloaden

Parallel notebook uitvoeren

Notebook downloaden

Notebook testen

Notebook downloaden

Notebook testen 2

Notebook downloaden