Komponententests für Notebooks

Sie können Komponententests verwenden, um die Qualität und Konsistenz des Codes Ihres Notebooks zu verbessern. Komponententests sind ein Ansatz zum frühzeitigen und häufigen Testen eigenständiger Codeeinheiten (z. B. Funktionen). Dies hilft Ihnen, Probleme im Code schneller zu finden, falsche Annahmen zu Ihrem Code früher aufzudecken und Ihre allgemeinen Codierungsbemühungen zu optimieren.

Dieser Artikel ist eine Einführung in grundlegende Komponententests mit Funktionen. Fortgeschrittene Konzepte wie Komponententestklassen und -schnittstellen sowie die Verwendung von Stubs, Mocks und Testumgebungen werden zwar auch bei Komponententests für Notebooks unterstützt, liegen aber außerhalb des Rahmens dieses Artikels. Dieser Artikel behandelt auch keine anderen Arten von Testmethoden, wie z. B. Integrationstests, Systemtests, Akzeptanztests, oder nicht funktionale Testmethoden, wie Leistungstests oder Benutzerfreundlichkeitstests.

In diesem Artikel wird Folgendes veranschaulicht:

  • Organisieren von Funktionen und ihren Komponententests.
  • Schreiben von Funktionen in Python, R, Scala sowie benutzerdefinierter Funktionen in SQL, die gut für Komponententests geeignet sind.
  • Aufrufen dieser Funktionen aus Python-, R-, Skala- und SQL-Notebooks.
  • Schreiben von Komponententests in Python, R und Scala mithilfe der beliebten Testframeworks pytest für Python, testthat für R und ScalaTest für Scala. Schreiben von SQL-Code, der Komponententests für benutzerdefinierte SQL-Funktionen (SQL-UDFs) ausführt.
  • Ausführen dieser Komponententests aus Python-, R-, Skala- und SQL-Notebooks.

Organisieren von Funktionen und Komponententests

Es gibt einige gängige Ansätze zum Organisieren Ihrer Funktionen und deren Komponententests mit Notebooks. Jeder Ansatz hat seine eigenen Vor- und Nachteile.

Für Python-, R- und Skala-Notebooks werden die folgenden Ansätze häufig verwendet:

  • Speichern von Funktionen und deren Komponententests außerhalb von Notebooks..
    • Vorteile: Sie können diese Funktionen mit und außerhalb von Notebooks aufrufen. Testframeworks sind besser für die Ausführung von Tests außerhalb von Notebooks ausgelegt.
    • Herausforderungen: Dieser Ansatz wird für Skala-Notebooks nicht unterstützt. Dieser Ansatz erhöht auch die Anzahl der Dateien, die nachverfolgt und verwaltet werden müssen.
  • Speichern von Funktionen in einem Notebook und deren Komponententests in einem separaten Notebook..
    • Vorteile: Die Wiederverwendung dieser Funktionen in Notebooks ist einfacher.
    • Herausforderungen: Die Anzahl der nachzuverfolgenden und zu verwaltenden Notebooks nimmt zu. Diese Funktionen können nicht außerhalb von Notebooks verwendet werden. Das Testen dieser Funktionen kann auch außerhalb von Notebooks schwieriger sein.
  • Speichern von Funktionen und deren Komponententests innerhalb desselben Notebooks..
    • Vorteil: Funktionen und deren Komponententests werden in einem einzigen Notizbuch gespeichert, um die Nachverfolgung und Wartung zu erleichtern.
    • Herausforderungen: Die Wiederverwendung dieser Funktionen kann für Notebooks schwieriger sein. Diese Funktionen können nicht außerhalb von Notebooks verwendet werden. Das Testen dieser Funktionen kann auch außerhalb von Notebooks schwieriger sein.

Für Python- und R-Notebooks empfiehlt Databricks das Speichern von Funktionen und deren Komponententests außerhalb von Notebooks. Für Skala-Notebooks empfiehlt Databricks, Funktionen in ein Notebook und deren Komponententests in ein separates Notebook einzuschließen.

Für SQL-Notebooks empfiehlt Databricks, Funktionen als benutzerdefinierte SQL-Funktionen (SQL-UDFs) in Ihren Schemas (auch als Datenbanken bezeichnet) zu speichern. Anschließend können Sie diese SQL-UDFs und deren Komponententests aus SQL-Notebooks aufrufen.

Schreiben Funktionen

In diesem Abschnitt wird ein einfacher Satz von Beispielfunktionen beschrieben, die Folgendes bestimmen:

  • Ob eine Tabelle in einer Datenbank vorhanden ist.
  • Ob eine Spalte in einer Tabelle vorhanden ist.
  • Wie viele Zeilen in einer Spalte für einen Wert in dieser Spalte vorhanden sind.

Diese Funktionen sollen einfach sein, damit Sie sich auf die Details der Komponententests in diesem Artikel und nicht auf die Funktionen selbst konzentrieren können.

Um die besten Komponententests zu erhalten, sollte eine Funktion ein einzelnes vorhersagbares Ergebnis zurückgeben und einen einzelnen Datentyp aufweisen. Um beispielsweise zu überprüfen, ob etwas vorhanden ist, sollte die Funktion den booleschen Wert TRUE oder FALSE zurückgeben. Um die Anzahl der vorhandenen Zeilen zurückzugeben, sollte die Funktion eine nicht negative ganze Zahl zurückgeben. Sie sollte im ersten Beispiel nicht entweder FALSE zurückgeben, wenn etwas nicht vorhanden ist, oder das Element selbst, wenn es vorhanden ist. Ebenso sollte sie für das zweite Beispiel weder die Anzahl der Zeilen zurückgeben, die vorhanden sind, noch FALSE, wenn keine Zeilen vorhanden sind.

Sie können diese Funktionen in Python, R, Scala oder SQL wie folgt einem vorhandenen Azure Databricks-Arbeitsbereich hinzufügen.

Python

Im folgenden Code wird davon ausgegangen, dass Sie die Databricks Git-Ordner (Repositorys) eingerichtet, ein Repository hinzugefügt und das Repository in Ihrem Azure Databricks-Arbeitsbereich geöffnet haben.

Erstellen Sie eine Datei namens myfunctions.py innerhalb des Repositorys, und fügen Sie der Datei den folgenden Inhalt hinzu. Weitere Beispiele in diesem Artikel erwarten, dass diese Datei den Namen myfunctions.py trägt. Sie können andere Namen für Ihre eigenen Dateien verwenden.

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Because this file is not a Databricks notebook, you
# must create a Spark session. Databricks notebooks
# create a Spark session for you by default.
spark = SparkSession.builder \
                    .appName('integrity-tests') \
                    .getOrCreate()

# Does the specified table exist in the specified database?
def tableExists(tableName, dbName):
  return spark.catalog.tableExists(f"{dbName}.{tableName}")

# Does the specified column exist in the given DataFrame?
def columnExists(dataFrame, columnName):
  if columnName in dataFrame.columns:
    return True
  else:
    return False

# How many rows are there for the specified value in the specified column
# in the given DataFrame?
def numRowsInColumnForValue(dataFrame, columnName, columnValue):
  df = dataFrame.filter(col(columnName) == columnValue)

  return df.count()

R

Im folgenden Code wird davon ausgegangen, dass Sie die Databricks Git-Ordner (Repositorys) eingerichtet, ein Repository hinzugefügt und das Repository in Ihrem Azure Databricks-Arbeitsbereich geöffnet haben.

Erstellen Sie eine Datei namens myfunctions.r innerhalb des Repositorys, und fügen Sie der Datei den folgenden Inhalt hinzu. Weitere Beispiele in diesem Artikel erwarten, dass diese Datei den Namen myfunctions.r trägt. Sie können andere Namen für Ihre eigenen Dateien verwenden.

library(SparkR)

# Does the specified table exist in the specified database?
table_exists <- function(table_name, db_name) {
  tableExists(paste(db_name, ".", table_name, sep = ""))
}

# Does the specified column exist in the given DataFrame?
column_exists <- function(dataframe, column_name) {
  column_name %in% colnames(dataframe)
}

# How many rows are there for the specified value in the specified column
# in the given DataFrame?
num_rows_in_column_for_value <- function(dataframe, column_name, column_value) {
  df = filter(dataframe, dataframe[[column_name]] == column_value)

  count(df)
}

Scala

Erstellen Sie ein Scala-Notebook mit dem Namen myfunctions und dem folgenden Inhalt. Weitere Beispiele in diesem Artikel erwarten, dass dieses Notebook den Namen myfunctions trägt. Sie können andere Namen für Ihre eigenen Notebooks verwenden.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Does the specified table exist in the specified database?
def tableExists(tableName: String, dbName: String) : Boolean = {
  return spark.catalog.tableExists(dbName + "." + tableName)
}

// Does the specified column exist in the given DataFrame?
def columnExists(dataFrame: DataFrame, columnName: String) : Boolean = {
  val nameOfColumn = null

  for(nameOfColumn <- dataFrame.columns) {
    if (nameOfColumn == columnName) {
      return true
    }
  }

  return false
}

// How many rows are there for the specified value in the specified column
// in the given DataFrame?
def numRowsInColumnForValue(dataFrame: DataFrame, columnName: String, columnValue: String) : Long = {
  val df = dataFrame.filter(col(columnName) === columnValue)

  return df.count()
}

Sql

Im folgenden Code wird davon ausgegangen, dass Sie über das Drittanbieter-Beispieldataset diamonds innerhalb eines Schemas namens default innerhalb eines Katalogs namens main verfügen, auf das über Ihren Azure Databricks-Arbeitsbereich zugegriffen werden kann. Wenn der Katalog oder das Schema, den bzw. das Sie verwenden möchten, einen anderen Namen trägt, ändern Sie eine oder beide der folgenden USE-Anweisungen entsprechend.

Erstellen Sie ein SQL-Notebook, und fügen Sie diesem neuen Notebook den folgenden Inhalt hinzu. Fügen Sie das Notebook dann an einen Cluster an, und führen Sie das Notebook aus, um dem angegebenen Katalog und Schema die folgenden SQL-UDFs hinzuzufügen.

Hinweis

Die SQL-UDFs table_exists und column_exists funktionieren nur mit Unity Catalog. Die SQL-UDF-Unterstützung für Unity Catalog befindet sich in der öffentlichen Vorschau.

USE CATALOG main;
USE SCHEMA default;

CREATE OR REPLACE FUNCTION table_exists(catalog_name STRING,
                                        db_name      STRING,
                                        table_name   STRING)
  RETURNS BOOLEAN
  RETURN if(
    (SELECT count(*) FROM system.information_schema.tables
     WHERE table_catalog = table_exists.catalog_name
       AND table_schema  = table_exists.db_name
       AND table_name    = table_exists.table_name) > 0,
    true,
    false
  );

CREATE OR REPLACE FUNCTION column_exists(catalog_name STRING,
                                         db_name      STRING,
                                         table_name   STRING,
                                         column_name  STRING)
  RETURNS BOOLEAN
  RETURN if(
    (SELECT count(*) FROM system.information_schema.columns
     WHERE table_catalog = column_exists.catalog_name
       AND table_schema  = column_exists.db_name
       AND table_name    = column_exists.table_name
       AND column_name   = column_exists.column_name) > 0,
    true,
    false
  );

CREATE OR REPLACE FUNCTION num_rows_for_clarity_in_diamonds(clarity_value STRING)
  RETURNS BIGINT
  RETURN SELECT count(*)
         FROM main.default.diamonds
         WHERE clarity = clarity_value

Aufrufen von Funktionen

In diesem Abschnitt wird Code beschrieben, der die vorherigen Funktionen aufruft. Mithilfe dieser Funktionen können Sie z. B. die Anzahl der Zeilen in der Tabelle zählen, in der ein angegebener Wert in einer angegebenen Spalte vorhanden ist. Sie möchten jedoch überprüfen, ob die Tabelle tatsächlich vorhanden ist, und ob die Spalte tatsächlich in dieser Tabelle vorhanden ist, bevor Sie fortfahren. Der folgende Code überprüft diese Bedingungen.

Wenn Sie die Funktionen aus dem vorherigen Abschnitt Ihrem Azure Databricks-Arbeitsbereich hinzugefügt haben, können Sie diese Funktionen aus Ihrem Arbeitsbereich wie folgt aufrufen.

Python

Erstellen Sie ein Python-Notebook im gleichen Ordner wie die vorherige Datei myfunctions.py in Ihrem Repository, und fügen Sie dem Notebook den folgenden Inhalt hinzu. Ändern Sie die Variablenwerte für den Tabellennamen, den Schemanamen (Datenbanknamen), den Spaltennamen und den Spaltenwert nach Bedarf. Fügen Sie das Notebook dann an einen Cluster an, und führen Sie das Notebook aus, um die Ergebnisse anzuzeigen.

from myfunctions import *

tableName   = "diamonds"
dbName      = "default"
columnName  = "clarity"
columnValue = "VVS2"

# If the table exists in the specified database...
if tableExists(tableName, dbName):

  df = spark.sql(f"SELECT * FROM {dbName}.{tableName}")

  # And the specified column exists in that table...
  if columnExists(df, columnName):
    # Then report the number of rows for the specified value in that column.
    numRows = numRowsInColumnForValue(df, columnName, columnValue)

    print(f"There are {numRows} rows in '{tableName}' where '{columnName}' equals '{columnValue}'.")
  else:
    print(f"Column '{columnName}' does not exist in table '{tableName}' in schema (database) '{dbName}'.")
else:
  print(f"Table '{tableName}' does not exist in schema (database) '{dbName}'.") 

R

Erstellen Sie ein R-Notebook im gleichen Ordner wie die vorherige Datei myfunctions.r in Ihrem Repository, und fügen Sie dem Notebook den folgenden Inhalt hinzu. Ändern Sie die Variablenwerte für den Tabellennamen, den Schemanamen (Datenbanknamen), den Spaltennamen und den Spaltenwert nach Bedarf. Fügen Sie das Notebook dann an einen Cluster an, und führen Sie das Notebook aus, um die Ergebnisse anzuzeigen.

library(SparkR)
source("myfunctions.r")

table_name   <- "diamonds"
db_name      <- "default"
column_name  <- "clarity"
column_value <- "VVS2"

# If the table exists in the specified database...
if (table_exists(table_name, db_name)) {

  df = sql(paste("SELECT * FROM ", db_name, ".", table_name, sep = ""))

  # And the specified column exists in that table...
  if (column_exists(df, column_name)) {
    # Then report the number of rows for the specified value in that column.
    num_rows = num_rows_in_column_for_value(df, column_name, column_value)

    print(paste("There are ", num_rows, " rows in table '", table_name, "' where '", column_name, "' equals '", column_value, "'.", sep = "")) 
  } else {
    print(paste("Column '", column_name, "' does not exist in table '", table_name, "' in schema (database) '", db_name, "'.", sep = ""))
  }

} else {
  print(paste("Table '", table_name, "' does not exist in schema (database) '", db_name, "'.", sep = ""))
}

Scala

Erstellen Sie ein Scala-Notebook im gleichen Ordner wie das vorherige myfunctions-Scala-Notebook, und fügen Sie diesem neuen Notebook den folgenden Inhalt hinzu.

Fügen Sie in der ersten Zelle dieses neuen Notebooks den folgenden Code hinzu, der den Magic-Befehl %run aufruft. Durch diesen Magic-Befehl wird der Inhalt des myfunctions-Notebooks für Ihr neues Notebook verfügbar.

%run ./myfunctions

Fügen Sie in der zweiten Zelle dieses neuen Notebooks den folgenden Code hinzu. Ändern Sie die Variablenwerte für den Tabellennamen, den Schemanamen (Datenbanknamen), den Spaltennamen und den Spaltenwert nach Bedarf. Fügen Sie das Notebook dann an einen Cluster an, und führen Sie das Notebook aus, um die Ergebnisse anzuzeigen.

val tableName   = "diamonds"
val dbName      = "default"
val columnName  = "clarity"
val columnValue = "VVS2"

// If the table exists in the specified database...
if (tableExists(tableName, dbName)) {

  val df = spark.sql("SELECT * FROM " + dbName + "." + tableName)

  // And the specified column exists in that table...
  if (columnExists(df, columnName)) {
    // Then report the number of rows for the specified value in that column.
    val numRows = numRowsInColumnForValue(df, columnName, columnValue)

    println("There are " + numRows + " rows in '" + tableName + "' where '" + columnName + "' equals '" + columnValue + "'.")
  } else {
    println("Column '" + columnName + "' does not exist in table '" + tableName + "' in database '" + dbName + "'.")
  }

} else {
  println("Table '" + tableName + "' does not exist in database '" + dbName + "'.")
}

Sql

Fügen Sie den folgenden Code in eine neue Zelle des vorherigen Notebooks oder in eine Zelle eines separaten Notebooks ein. Ändern Sie die Schema- oder Katalognamen, falls erforderlich, damit sie mit Ihren Namen übereinstimmen, und führen Sie dann diese Zelle aus, um die Ergebnisse anzuzeigen.

SELECT CASE
-- If the table exists in the specified catalog and schema...
WHEN
  table_exists("main", "default", "diamonds")
THEN
  -- And the specified column exists in that table...
  (SELECT CASE
   WHEN
     column_exists("main", "default", "diamonds", "clarity")
   THEN
     -- Then report the number of rows for the specified value in that column.
     printf("There are %d rows in table 'main.default.diamonds' where 'clarity' equals 'VVS2'.",
            num_rows_for_clarity_in_diamonds("VVS2"))
   ELSE
     printf("Column 'clarity' does not exist in table 'main.default.diamonds'.")
   END)
ELSE
  printf("Table 'main.default.diamonds' does not exist.")
END

Schreiben von Komponententests

In diesem Abschnitt wird Code beschrieben, der jede der Funktionen testet, die am Anfang dieses Artikels beschrieben werden. Wenn Sie zukünftig Änderungen an Funktionen vornehmen, können Sie Komponententests verwenden, um zu ermitteln, ob diese Funktionen weiterhin wie erwartet funktionieren.

Wenn Sie die Funktionen am Anfang dieses Artikels zu Ihrem Azure Databricks-Arbeitsbereich hinzugefügt haben, können Sie Ihrem Arbeitsbereich Komponententests für diese Funktionen wie folgt hinzufügen.

Python

Erstellen Sie eine weitere Datei namens test_myfunctions.py im gleichen Ordner wie die vorherige Datei myfunctions.py in Ihrem Repository, und fügen Sie der Datei den folgenden Inhalt hinzu. pytest sucht zum Testen standardmäßig nach .py-Dateien, deren Namen mit test_ beginnen (oder mit _test enden). pytest sucht außerdem standardmäßig zum Testen in diesen Dateien nach Funktionen, deren Namen mit test_ beginnen.

Im Allgemeinen ist es eine bewährte Methode, keine Komponententests für Funktionen auszuführen, die mit Daten in der Produktion arbeiten. Dies ist besonders wichtig für Funktionen, die Daten hinzufügen, entfernen oder anderweitig ändern. Um Ihre Produktionsdaten vor unerwarteten Kompromittierungen durch Komponententests zu schützen, sollten Sie Komponententests für Nichtproduktionsdaten ausführen. Ein gängiger Ansatz besteht darin, Fake-Daten zu erstellen, die möglichst nah an den Produktionsdaten liegen. Im folgenden Codebeispiel werden Fake-Daten erstellt, mit denen die Komponententests ausgeführt werden sollen.

import pytest
import pyspark
from myfunctions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType

tableName    = "diamonds"
dbName       = "default"
columnName   = "clarity"
columnValue  = "SI2"

# Because this file is not a Databricks notebook, you
# must create a Spark session. Databricks notebooks
# create a Spark session for you by default.
spark = SparkSession.builder \
                    .appName('integrity-tests') \
                    .getOrCreate()

# Create fake data for the unit tests to run against.
# In general, it is a best practice to not run unit tests
# against functions that work with data in production.
schema = StructType([ \
  StructField("_c0",     IntegerType(), True), \
  StructField("carat",   FloatType(),   True), \
  StructField("cut",     StringType(),  True), \
  StructField("color",   StringType(),  True), \
  StructField("clarity", StringType(),  True), \
  StructField("depth",   FloatType(),   True), \
  StructField("table",   IntegerType(), True), \
  StructField("price",   IntegerType(), True), \
  StructField("x",       FloatType(),   True), \
  StructField("y",       FloatType(),   True), \
  StructField("z",       FloatType(),   True), \
])

data = [ (1, 0.23, "Ideal",   "E", "SI2", 61.5, 55, 326, 3.95, 3.98, 2.43 ), \
         (2, 0.21, "Premium", "E", "SI1", 59.8, 61, 326, 3.89, 3.84, 2.31 ) ]

df = spark.createDataFrame(data, schema)

# Does the table exist?
def test_tableExists():
  assert tableExists(tableName, dbName) is True

# Does the column exist?
def test_columnExists():
  assert columnExists(df, columnName) is True

# Is there at least one row for the value in the specified column?
def test_numRowsInColumnForValue():
  assert numRowsInColumnForValue(df, columnName, columnValue) > 0

R

Erstellen Sie eine weitere Datei namens test_myfunctions.r im gleichen Ordner wie die vorherige Datei myfunctions.r in Ihrem Repository, und fügen Sie der Datei den folgenden Inhalt hinzu. testthat sucht standardmäßig zum Testen nach .r-Dateien, deren Namen mit test beginnen.

Im Allgemeinen ist es eine bewährte Methode, keine Komponententests für Funktionen auszuführen, die mit Daten in der Produktion arbeiten. Dies ist besonders wichtig für Funktionen, die Daten hinzufügen, entfernen oder anderweitig ändern. Um Ihre Produktionsdaten vor unerwarteten Kompromittierungen durch Komponententests zu schützen, sollten Sie Komponententests für Nichtproduktionsdaten ausführen. Ein gängiger Ansatz besteht darin, Fake-Daten zu erstellen, die möglichst nah an den Produktionsdaten liegen. Im folgenden Codebeispiel werden Fake-Daten erstellt, mit denen die Komponententests ausgeführt werden sollen.

library(testthat)
source("myfunctions.r")

table_name   <- "diamonds"
db_name      <- "default"
column_name  <- "clarity"
column_value <- "SI2"

# Create fake data for the unit tests to run against.
# In general, it is a best practice to not run unit tests
# against functions that work with data in production.
schema <- structType(
  structField("_c0",     "integer"),
  structField("carat",   "float"),
  structField("cut",     "string"),
  structField("color",   "string"),
  structField("clarity", "string"),
  structField("depth",   "float"),
  structField("table",   "integer"),
  structField("price",   "integer"),
  structField("x",       "float"),
  structField("y",       "float"),
  structField("z",       "float"))

data <- list(list(as.integer(1), 0.23, "Ideal",   "E", "SI2", 61.5, as.integer(55), as.integer(326), 3.95, 3.98, 2.43),
             list(as.integer(2), 0.21, "Premium", "E", "SI1", 59.8, as.integer(61), as.integer(326), 3.89, 3.84, 2.31))

df <- createDataFrame(data, schema)

# Does the table exist?
test_that ("The table exists.", {
  expect_true(table_exists(table_name, db_name))
})

# Does the column exist?
test_that ("The column exists in the table.", {
  expect_true(column_exists(df, column_name))
})

# Is there at least one row for the value in the specified column?
test_that ("There is at least one row in the query result.", {
  expect_true(num_rows_in_column_for_value(df, column_name, column_value) > 0)
})

Scala

Erstellen Sie ein Scala-Notebook im gleichen Ordner wie das vorherige myfunctions-Scala-Notebook, und fügen Sie diesem neuen Notebook den folgenden Inhalt hinzu.

Fügen Sie in der ersten Zelle des neuen Notebooks den folgenden Code hinzu, der den Magic-Befehl %run aufruft. Durch diesen Magic-Befehl wird der Inhalt des myfunctions-Notebooks für Ihr neues Notebook verfügbar.

%run ./myfunctions

Fügen Sie in der zweiten Zelle den folgenden Code hinzu. Dieser Code definiert Ihre Komponententests und gibt an, wie sie ausgeführt werden.

Im Allgemeinen ist es eine bewährte Methode, keine Komponententests für Funktionen auszuführen, die mit Daten in der Produktion arbeiten. Dies ist besonders wichtig für Funktionen, die Daten hinzufügen, entfernen oder anderweitig ändern. Um Ihre Produktionsdaten vor unerwarteten Kompromittierungen durch Komponententests zu schützen, sollten Sie Komponententests für Nichtproduktionsdaten ausführen. Ein gängiger Ansatz besteht darin, Fake-Daten zu erstellen, die möglichst nah an den Produktionsdaten liegen. Im folgenden Codebeispiel werden Fake-Daten erstellt, mit denen die Komponententests ausgeführt werden sollen.

import org.scalatest._
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, FloatType, StringType}
import scala.collection.JavaConverters._

class DataTests extends AsyncFunSuite {

  val tableName   = "diamonds"
  val dbName      = "default"
  val columnName  = "clarity"
  val columnValue = "SI2"

  // Create fake data for the unit tests to run against.
  // In general, it is a best practice to not run unit tests
  // against functions that work with data in production.
  val schema = StructType(Array(
                 StructField("_c0",     IntegerType),
                 StructField("carat",   FloatType),
                 StructField("cut",     StringType),
                 StructField("color",   StringType),
                 StructField("clarity", StringType),
                 StructField("depth",   FloatType),
                 StructField("table",   IntegerType),
                 StructField("price",   IntegerType),
                 StructField("x",       FloatType),
                 StructField("y",       FloatType),
                 StructField("z",       FloatType)
               ))

  val data = Seq(
                  Row(1, 0.23, "Ideal",   "E", "SI2", 61.5, 55, 326, 3.95, 3.98, 2.43),
                  Row(2, 0.21, "Premium", "E", "SI1", 59.8, 61, 326, 3.89, 3.84, 2.31)
                ).asJava

  val df = spark.createDataFrame(data, schema)

  // Does the table exist?
  test("The table exists") {
    assert(tableExists(tableName, dbName) == true)
  }

  // Does the column exist?
  test("The column exists") {
    assert(columnExists(df, columnName) == true)
  }

  // Is there at least one row for the value in the specified column?
  test("There is at least one matching row") {
    assert(numRowsInColumnForValue(df, columnName, columnValue) > 0)
  }
}

nocolor.nodurations.nostacks.stats.run(new DataTests)

Hinweis

In diesem Codebeispiel wird die FunSuite-Testformatvorlagen in ScalaTest verwendet. Weitere verfügbare Testformatvorlagen finden Sie unter Auswählen von Testformatvorlagen für Ihr Projekt.

Sql

Bevor Sie Komponententests hinzufügen, sollte Ihnen bewusst sein, dass es eine bewährte Methode ist, keine Komponententests für Funktionen auszuführen, die mit Daten in der Produktion arbeiten. Dies ist besonders wichtig für Funktionen, die Daten hinzufügen, entfernen oder anderweitig ändern. Um Ihre Produktionsdaten vor unerwarteten Kompromittierungen durch Komponententests zu schützen, sollten Sie Komponententests für Nichtproduktionsdaten ausführen. Ein gängiger Ansatz besteht darin, Komponententests für Ansichten und nicht für Tabellen auszuführen.

Um eine Ansicht zu erstellen, können Sie den Befehl CREATE VIEW aus einer neuen Zelle im vorherigen Notebook oder einem separaten Notebook ausführen. Im folgenden Beispiel wird davon ausgegangen, dass Sie über eine vorhandene Tabelle mit dem Namen diamonds innerhalb eines Schemas (Datenbank) mit dem Namen default innerhalb eines Katalogs mit dem Namen main verfügen. Ändern Sie diese Namen nach Bedarf so, dass sie Ihren eigenen entsprechen, und führen Sie dann nur diese Zelle aus.

USE CATALOG main;
USE SCHEMA default;

CREATE VIEW view_diamonds AS
SELECT * FROM diamonds;

Fügen Sie nach dem Erstellen der Ansicht jede der folgenden SELECT-Anweisungen einer eigenen neuen Zelle im vorherigen Notebook oder ihren eigenen neuen Zellen in einem separaten Notebook hinzu. Ändern Sie die Namen nach Bedarf so, dass sie Ihren eigenen entsprechen.

SELECT if(table_exists("main", "default", "view_diamonds"),
          printf("PASS: The table 'main.default.view_diamonds' exists."),
          printf("FAIL: The table 'main.default.view_diamonds' does not exist."));

SELECT if(column_exists("main", "default", "view_diamonds", "clarity"),
          printf("PASS: The column 'clarity' exists in the table 'main.default.view_diamonds'."),
          printf("FAIL: The column 'clarity' does not exists in the table 'main.default.view_diamonds'."));

SELECT if(num_rows_for_clarity_in_diamonds("VVS2") > 0,
          printf("PASS: The table 'main.default.view_diamonds' has at least one row where the column 'clarity' equals 'VVS2'."),
          printf("FAIL: The table 'main.default.view_diamonds' does not have at least one row where the column 'clarity' equals 'VVS2'."));

Durchführen von Komponententests

In diesem Abschnitt wird beschrieben, wie Sie die Komponententests ausführen, die Sie im vorherigen Abschnitt erstellt haben. Wenn Sie die Komponententests ausführen, erhalten Sie Ergebnisse, die zeigen, welche Komponententests bestanden wurden bzw. fehlgeschlagen sind.

Wenn Sie die Komponententests aus dem vorherigen Abschnitt Ihrem Azure Databricks-Arbeitsbereich hinzugefügt haben, können Sie diese Komponententests von Ihrem Arbeitsbereich aus ausführen. Sie können diese Komponententests entweder manuell oder nach einem Zeitplan ausführen.

Python

Erstellen Sie ein Python-Notebook im gleichen Ordner wie die vorherige Datei test_myfunctions.py in Ihrem Repository, und fügen Sie den folgenden Inhalt hinzu.

Fügen Sie in der ersten Zelle des neuen Notebooks den folgenden Code hinzu, und führen Sie dann die Zelle aus, die den Magic-Befehl %pip aufruft. Mit diesem Magic-Befehl wird pytest installiert.

%pip install pytest

Fügen Sie in der zweiten Zelle den folgenden Code hinzu, und führen Sie die Zelle dann aus. Die Ergebnisse zeigen, welche Komponententests bestanden wurden bzw. fehlgeschlagen sind.

import pytest
import sys

# Skip writing pyc files on a readonly filesystem.
sys.dont_write_bytecode = True

# Run pytest.
retcode = pytest.main([".", "-v", "-p", "no:cacheprovider"])

# Fail the cell execution if there are any test failures.
assert retcode == 0, "The pytest invocation failed. See the log for details."

R

Erstellen Sie ein R-Notebook im gleichen Ordner wie die vorherige Datei test_myfunctions.r in Ihrem Repository, und fügen Sie den folgenden Inhalt hinzu.

Fügen Sie in der ersten Zelle den folgenden Code hinzu, und führen Sie dann die Zelle aus, die die Funktion install.packages aufruft. Mit dieser Funktion wird testthat installiert.

install.packages("testthat")

Fügen Sie in der zweiten Zelle den folgenden Code hinzu, und führen Sie die Zelle dann aus. Die Ergebnisse zeigen, welche Komponententests bestanden wurden bzw. fehlgeschlagen sind.

library(testthat)
source("myfunctions.r")

test_dir(".", reporter = "tap")

Scala

Führen Sie die erste und dann zweite Zelle im Notebook aus dem vorherigen Abschnitt aus. Die Ergebnisse zeigen, welche Komponententests bestanden wurden bzw. fehlgeschlagen sind.

Sql

Führen Sie jede der drei Zellen im Notebook aus dem vorherigen Abschnitt aus. Die Ergebnisse zeigen, ob die einzelnen Komponententests bestanden wurden bzw. fehlgeschlagen sind.

Wenn Sie die Ansicht nach dem Ausführen der Komponententests nicht mehr benötigen, können Sie die Ansicht löschen. Um diese Ansicht zu löschen, können Sie den folgenden Code einer neuen Zelle in einem der vorherigen Notebooks hinzufügen und dann nur diese Zelle ausführen.

DROP VIEW view_diamonds;

Tipp

Sie können die Ergebnisse Ihrer Notebookausführungen (einschließlich der Komponententestergebnisse) in den Treiberprotokollen Ihres Clusters anzeigen. Sie können auch einen Speicherort für die Protokollübermittlung Ihres Clusters angeben.

Sie können ein CI/CD-System (Continuous Integration und Continuous Delivery) einrichten, z. B. GitHub Actions, um Ihre Komponententests bei jeder Codeänderung automatisch auszuführen. Ein Beispiel finden Sie in der Abdeckung von GitHub Actions unter Bewährte Methoden für die Softwareentwicklung für Notebooks.

Zusätzliche Ressourcen

pytest

testthat

ScalaTest

SQL