Komponententests für Notebooks
Sie können Komponententests verwenden, um die Qualität und Konsistenz des Codes Ihres Notebooks zu verbessern. Komponententests sind ein Ansatz zum Testen eigenständiger Codeeinheiten, z. B. Funktionen, frühzeitig und häufig. Dies hilft Ihnen, Probleme mit Ihrem Code schneller zu finden, fehlerhafte Annahmen über Ihren Code früher aufzudecken und Ihre allgemeinen Codierungsbemühungen zu optimieren.
Dieser Artikel enthält eine Einführung in grundlegende Komponententests mit Funktionen. Fortgeschrittene Konzepte wie Komponententestklassen und -schnittstellen sowie die Verwendung von Stubs, Mocks und Testumgebungen werden zwar ebenfalls bei Komponententests für Notebooks unterstützt, liegen jedoch außerhalb des Rahmens dieses Artikels. In diesem Artikel werden auch keine anderen Arten von Testmethoden wie Integrationstests, Systemtests, Akzeptanztestsoder nicht funktionalen Tests Methoden wie Leistungstests oder Benutzerfreundlichkeitstestsbehandelt.
In diesem Artikel wird Folgendes veranschaulicht:
- Wie man Funktionen und ihre Unit-Tests organisiert
- Wie man Funktionen in Python, R, Scala sowie benutzerdefinierte Funktionen in SQL schreibt, die gut für Unit-Tests konzipiert sind.
- Aufrufen dieser Funktionen aus Python-, R-, Scala- und SQL-Notizbüchern
- Schreiben von Komponententests in Python, R und Scala mithilfe der beliebten Testframeworks pytest für Python, Testthat für R und ScalaTest für Scala. Darüber hinaus wird das Schreiben von SQL-Code erläutert, der Komponententests für benutzerdefinierte SQL-Funktionen (SQL User-Defined Functions, SQL UDFs) ausführt.
- So führen Sie diese Komponententests aus Python-, R-, Scala- und SQL-Notizbüchern aus.
Organisieren von Funktionen und Komponententests
Es gibt einige gängige Ansätze zum Organisieren Ihrer Funktionen und deren Komponententests mit Notizbüchern. Jeder Ansatz hat seine Vorteile und Herausforderungen.
Für Python-, R- und Scala-Notizbücher sind die folgenden gängigen Ansätze enthalten:
- Speichern von Funktionen und deren Komponententests außerhalb von Notebooks
- Vorteile: Sie können diese Funktionen mit und außerhalb von Notizbüchern aufrufen. Testframeworks sind besser darauf ausgelegt, Tests außerhalb von Notizbüchern auszuführen.
- Herausforderungen: Dieser Ansatz wird für Scala-Notizbücher nicht unterstützt. Dieser Ansatz erhöht auch die Anzahl der Dateien, die nachverfolgt und verwaltet werden sollen.
- Speichern von Funktionen in einem Notebook und deren Komponententests in einem separaten Notebook
- Vorteile: Diese Funktionen sind in verschiedenen Notizbüchern einfacher wiederzuverwenden.
- Herausforderungen: Die Anzahl der Notizbücher, die nachverfolgt und verwaltet werden sollen, erhöht sich. Diese Funktionen können nicht außerhalb von Notizbüchern verwendet werden. Diese Funktionen können auch schwieriger außerhalb von Notizbüchern getestet werden.
- Speichern von Funktionen und deren Komponententests innerhalb desselben Notebooks
- Vorteile: Funktionen und deren Komponententests werden in einem einzigen Notizbuch gespeichert, um die Nachverfolgung und Wartung zu erleichtern.
- Herausforderungen: Diese Funktionen können in Notizbüchern schwieriger wiederverwendet werden. Diese Funktionen können nicht außerhalb von Notizbüchern verwendet werden. Diese Funktionen können auch schwieriger außerhalb von Notizbüchern getestet werden.
Bei Python- und R-Notizbüchern empfiehlt Databricks, Funktionen und deren Komponententests außerhalb von Notizbüchern zu speichern. Für Scala-Notizbücher empfiehlt Databricks, Funktionen in einem Notizbuch und deren Komponententests in einem separaten Notizbuch einzuschließen.
Für SQL-Notizbücher empfiehlt Databricks, Funktionen als benutzerdefinierte SQL-Funktionen (SQL UDFs) in Ihren Schemas (auch als Datenbanken bezeichnet) zu speichern. Anschließend können Sie diese SQL-UDFs und deren Komponententests aus SQL-Notizbüchern aufrufen.
Funktionen schreiben
In diesem Abschnitt wird ein einfacher Satz von Beispielfunktionen beschrieben, die Folgendes bestimmen:
- Gibt an, ob eine Tabelle in einer Datenbank vorhanden ist.
- Gibt an, ob eine Spalte in einer Tabelle vorhanden ist.
- Gibt an, wie viele Zeilen in einer Spalte für einen Wert in dieser Spalte vorhanden sind.
Diese Funktionen sollen einfach sein, damit Sie sich auf die Komponententestdetails in diesem Artikel konzentrieren können, anstatt sich auf die Funktionen selbst zu konzentrieren.
Um die besten Komponententests zu erzielen, sollte eine Funktion ein einzelnes vorhersagbares Ergebnis zurückgeben und einen einzelnen Datentyp aufweisen. Um beispielsweise zu überprüfen, ob etwas vorhanden ist, sollte die Funktion einen booleschen Wert von "true" oder "false" zurückgeben. Um die Anzahl der vorhandenen Zeilen zurückzugeben, sollte die Funktion eine nicht negative ganze Zahl zurückgeben. Sie sollte im ersten Beispiel nicht FALSE zurückgeben, wenn kein Element vorhanden ist, oder das Element selbst zurückgeben, wenn es vorhanden ist. Ebenso sollte sie für das zweite Beispiel weder die Anzahl der vorhandenen Zeilen noch "false" zurückgeben, wenn keine Zeilen vorhanden sind.
Sie können diese Funktionen einem vorhandenen Azure Databricks-Arbeitsbereich wie folgt in Python, R, Scala oder SQL hinzufügen.
Python
Im folgenden Code wird davon ausgegangen, dass Sie Set up Databricks Git folders (Repos)haben, ein Repositoryhinzugefügt haben und das Repository in Ihrem Azure Databricks-Arbeitsbereich geöffnet haben.
Erstellen Sie eine Datei namens myfunctions.py
innerhalb des Repositorys, und fügen Sie der Datei den folgenden Inhalt hinzu. Andere Beispiele in diesem Artikel erwarten, dass diese Datei myfunctions.py
benannt wird. Sie können unterschiedliche Namen für Ihre eigenen Dateien verwenden.
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Because this file is not a Databricks notebook, you
# must create a Spark session. Databricks notebooks
# create a Spark session for you by default.
spark = SparkSession.builder \
.appName('integrity-tests') \
.getOrCreate()
# Does the specified table exist in the specified database?
def tableExists(tableName, dbName):
return spark.catalog.tableExists(f"{dbName}.{tableName}")
# Does the specified column exist in the given DataFrame?
def columnExists(dataFrame, columnName):
if columnName in dataFrame.columns:
return True
else:
return False
# How many rows are there for the specified value in the specified column
# in the given DataFrame?
def numRowsInColumnForValue(dataFrame, columnName, columnValue):
df = dataFrame.filter(col(columnName) == columnValue)
return df.count()
R
Im folgenden Code wird davon ausgegangen, dass Sie Set up Databricks Git folders (Repos)haben, ein Repositoryhinzugefügt haben und das Repository in Ihrem Azure Databricks-Arbeitsbereich geöffnet haben.
Erstellen Sie eine Datei namens myfunctions.r
innerhalb des Repositorys, und fügen Sie der Datei den folgenden Inhalt hinzu. Andere Beispiele in diesem Artikel erwarten, dass diese Datei myfunctions.r
benannt wird. Sie können unterschiedliche Namen für Ihre eigenen Dateien verwenden.
library(SparkR)
# Does the specified table exist in the specified database?
table_exists <- function(table_name, db_name) {
tableExists(paste(db_name, ".", table_name, sep = ""))
}
# Does the specified column exist in the given DataFrame?
column_exists <- function(dataframe, column_name) {
column_name %in% colnames(dataframe)
}
# How many rows are there for the specified value in the specified column
# in the given DataFrame?
num_rows_in_column_for_value <- function(dataframe, column_name, column_value) {
df = filter(dataframe, dataframe[[column_name]] == column_value)
count(df)
}
Scala
Erstellen Sie ein Scala-Notizbuch mit dem Namen myfunctions
mit dem folgenden Inhalt. Andere Beispiele in diesem Artikel erwarten, dass dieses Notizbuch myfunctions
benannt wird. Sie können unterschiedliche Namen für Ihre eigenen Notizbücher verwenden.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
// Does the specified table exist in the specified database?
def tableExists(tableName: String, dbName: String) : Boolean = {
return spark.catalog.tableExists(dbName + "." + tableName)
}
// Does the specified column exist in the given DataFrame?
def columnExists(dataFrame: DataFrame, columnName: String) : Boolean = {
val nameOfColumn = null
for(nameOfColumn <- dataFrame.columns) {
if (nameOfColumn == columnName) {
return true
}
}
return false
}
// How many rows are there for the specified value in the specified column
// in the given DataFrame?
def numRowsInColumnForValue(dataFrame: DataFrame, columnName: String, columnValue: String) : Long = {
val df = dataFrame.filter(col(columnName) === columnValue)
return df.count()
}
SQL
Im folgenden Code wird davon ausgegangen, dass Sie über das Drittanbieter-Beispieldatenset Diamanten innerhalb eines Schemas mit dem Namen default
in einem Katalog mit dem Namen main
verfügen, auf den Über Ihren Azure Databricks-Arbeitsbereich zugegriffen werden kann. Wenn der katalog oder das schema, den Sie verwenden möchten, einen anderen Namen aufweist, ändern Sie eine oder beide der folgenden USE
Anweisungen so, dass sie übereinstimmen.
Erstellen Sie ein SQL-Notizbuch, und fügen Sie diesem neuen Notizbuch den folgenden Inhalt hinzu. Fügen Sie dann das Notizbuch an einen Cluster an, und das Notizbuch ausführen, um dem angegebenen Katalog und Schema die folgenden SQL-UDFs hinzuzufügen.
Anmerkung
Die SQL-UDFs table_exists
und column_exists
funktionieren nur mit dem Unity Catalog. Die SQL-UDF-Unterstützung für Unity-Katalog befindet sich in Public Preview.
USE CATALOG main;
USE SCHEMA default;
CREATE OR REPLACE FUNCTION table_exists(catalog_name STRING,
db_name STRING,
table_name STRING)
RETURNS BOOLEAN
RETURN if(
(SELECT count(*) FROM system.information_schema.tables
WHERE table_catalog = table_exists.catalog_name
AND table_schema = table_exists.db_name
AND table_name = table_exists.table_name) > 0,
true,
false
);
CREATE OR REPLACE FUNCTION column_exists(catalog_name STRING,
db_name STRING,
table_name STRING,
column_name STRING)
RETURNS BOOLEAN
RETURN if(
(SELECT count(*) FROM system.information_schema.columns
WHERE table_catalog = column_exists.catalog_name
AND table_schema = column_exists.db_name
AND table_name = column_exists.table_name
AND column_name = column_exists.column_name) > 0,
true,
false
);
CREATE OR REPLACE FUNCTION num_rows_for_clarity_in_diamonds(clarity_value STRING)
RETURNS BIGINT
RETURN SELECT count(*)
FROM main.default.diamonds
WHERE clarity = clarity_value
Aufrufen von Funktionen
In diesem Abschnitt werden Code beschrieben, der die vorherigen Funktionen aufruft. Sie können diese Funktionen beispielsweise verwenden, um die Anzahl der Zeilen in der Tabelle zu zählen, in denen ein angegebener Wert in einer angegebenen Spalte vorhanden ist. Sie möchten jedoch überprüfen, ob die Tabelle tatsächlich vorhanden ist und ob die Spalte tatsächlich in dieser Tabelle vorhanden ist, bevor Sie fortfahren. Der folgende Code überprüft diese Bedingungen.
Wenn Sie die Funktionen aus dem vorherigen Abschnitt zu Ihrem Azure Databricks-Arbeitsbereich hinzugefügt haben, können Sie diese Funktionen wie folgt aus Ihrem Arbeitsbereich aufrufen.
Python
Erstellen sie ein Python-Notizbuch im selben Ordner wie die vorherige myfunctions.py
Datei in Ihrem Repository, und fügen Sie dem Notizbuch den folgenden Inhalt hinzu. Ändern Sie die Variablenwerte für den Tabellennamen, den Schemanamen (Datenbankname), den Spaltennamen und den Spaltenwert nach Bedarf. Fügen Sie dann das Notizbuch an einen Cluster an, und das Notizbuch ausführen, um die Ergebnisse anzuzeigen.
from myfunctions import *
tableName = "diamonds"
dbName = "default"
columnName = "clarity"
columnValue = "VVS2"
# If the table exists in the specified database...
if tableExists(tableName, dbName):
df = spark.sql(f"SELECT * FROM {dbName}.{tableName}")
# And the specified column exists in that table...
if columnExists(df, columnName):
# Then report the number of rows for the specified value in that column.
numRows = numRowsInColumnForValue(df, columnName, columnValue)
print(f"There are {numRows} rows in '{tableName}' where '{columnName}' equals '{columnValue}'.")
else:
print(f"Column '{columnName}' does not exist in table '{tableName}' in schema (database) '{dbName}'.")
else:
print(f"Table '{tableName}' does not exist in schema (database) '{dbName}'.")
R
Erstellen Sie ein R-Notizbuch im selben Ordner wie die vorherige myfunctions.r
Datei in Ihrem Repository, und fügen Sie dem Notizbuch den folgenden Inhalt hinzu. Ändern Sie die Variablenwerte für den Tabellennamen, den Schemanamen (Datenbankname), den Spaltennamen und den Spaltenwert nach Bedarf. Fügen Sie dann das Notizbuch an einen Cluster an, und das Notizbuch ausführen, um die Ergebnisse anzuzeigen.
library(SparkR)
source("myfunctions.r")
table_name <- "diamonds"
db_name <- "default"
column_name <- "clarity"
column_value <- "VVS2"
# If the table exists in the specified database...
if (table_exists(table_name, db_name)) {
df = sql(paste("SELECT * FROM ", db_name, ".", table_name, sep = ""))
# And the specified column exists in that table...
if (column_exists(df, column_name)) {
# Then report the number of rows for the specified value in that column.
num_rows = num_rows_in_column_for_value(df, column_name, column_value)
print(paste("There are ", num_rows, " rows in table '", table_name, "' where '", column_name, "' equals '", column_value, "'.", sep = ""))
} else {
print(paste("Column '", column_name, "' does not exist in table '", table_name, "' in schema (database) '", db_name, "'.", sep = ""))
}
} else {
print(paste("Table '", table_name, "' does not exist in schema (database) '", db_name, "'.", sep = ""))
}
Scala
Erstellen Sie ein weiteres Scala-Notizbuch im selben Ordner wie das vorherige myfunctions
Scala-Notizbuch, und fügen Sie diesem neuen Notizbuch den folgenden Inhalt hinzu.
Fügen Sie in der ersten Zelle dieses neuen Notizbuchs den folgenden Code hinzu, der die %run Magie aufruft. Diese Magie macht den Inhalt des myfunctions
Notizbuchs für Ihr neues Notizbuch verfügbar.
%run ./myfunctions
Fügen Sie in der zweiten Zelle dieses neuen Notizbuchs den folgenden Code hinzu. Ändern Sie die Variablenwerte für den Tabellennamen, den Schemanamen (Datenbankname), den Spaltennamen und den Spaltenwert nach Bedarf. Fügen Sie dann das Notizbuch an einen Cluster an, und das Notizbuch ausführen, um die Ergebnisse anzuzeigen.
val tableName = "diamonds"
val dbName = "default"
val columnName = "clarity"
val columnValue = "VVS2"
// If the table exists in the specified database...
if (tableExists(tableName, dbName)) {
val df = spark.sql("SELECT * FROM " + dbName + "." + tableName)
// And the specified column exists in that table...
if (columnExists(df, columnName)) {
// Then report the number of rows for the specified value in that column.
val numRows = numRowsInColumnForValue(df, columnName, columnValue)
println("There are " + numRows + " rows in '" + tableName + "' where '" + columnName + "' equals '" + columnValue + "'.")
} else {
println("Column '" + columnName + "' does not exist in table '" + tableName + "' in database '" + dbName + "'.")
}
} else {
println("Table '" + tableName + "' does not exist in database '" + dbName + "'.")
}
SQL
Fügen Sie einer neuen Zelle im vorherigen Notizbuch oder einer Zelle in einem separaten Notizbuch den folgenden Code hinzu. Ändern Sie die Schema- oder Katalognamen, falls erforderlich, damit sie ihren entsprechen, und führen Sie dann diese Zelle aus, um die Ergebnisse anzuzeigen.
SELECT CASE
-- If the table exists in the specified catalog and schema...
WHEN
table_exists("main", "default", "diamonds")
THEN
-- And the specified column exists in that table...
(SELECT CASE
WHEN
column_exists("main", "default", "diamonds", "clarity")
THEN
-- Then report the number of rows for the specified value in that column.
printf("There are %d rows in table 'main.default.diamonds' where 'clarity' equals 'VVS2'.",
num_rows_for_clarity_in_diamonds("VVS2"))
ELSE
printf("Column 'clarity' does not exist in table 'main.default.diamonds'.")
END)
ELSE
printf("Table 'main.default.diamonds' does not exist.")
END
Schreiben von Komponententests
In diesem Abschnitt werden Code beschrieben, der die einzelnen Funktionen testet, die am Anfang dieses Artikels beschrieben werden. Wenn Sie zukünftig Änderungen an Funktionen vornehmen, können Sie mithilfe von Komponententests ermitteln, ob diese Funktionen weiterhin wie erwartet funktionieren.
Wenn Sie die Funktionen am Anfang dieses Artikels zu Ihrem Azure Databricks-Arbeitsbereich hinzugefügt haben, können Sie Komponententests für diese Funktionen wie folgt zu Ihrem Arbeitsbereich hinzufügen.
Python
Erstellen Sie eine weitere Datei namens test_myfunctions.py
im selben Ordner wie die vorherige myfunctions.py
Datei in Ihrem Repository, und fügen Sie der Datei den folgenden Inhalt hinzu. Standardmäßig sucht pytest
nach .py
Dateien, deren Namen mit test_
(oder mit _test
enden) beginnen, um zu testen. In ähnlicher Weise sucht pytest
standardmäßig in diesen Dateien nach Funktionen, deren Namen mit test_
beginnen, zu testen.
Im Allgemeinen ist es eine bewährte Praxis, keine Einheitstests für Funktionen durchzuführen, die mit Daten in der Produktion arbeiten. Dies ist besonders wichtig für Funktionen, die Daten hinzufügen, entfernen oder anderweitig ändern. Um Ihre Produktionsdaten vor unerwarteten Kompromittierungen durch Komponententests zu schützen, sollten Sie Komponententests für Nichtproduktionsdaten ausführen. Ein gemeinsamer Ansatz besteht darin, gefälschte Daten zu erstellen, die den Produktionsdaten so nah wie möglich sind. Im folgenden Codebeispiel werden Fakedaten erstellt, mit denen die Komponententests ausgeführt werden sollen.
import pytest
import pyspark
from myfunctions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType
tableName = "diamonds"
dbName = "default"
columnName = "clarity"
columnValue = "SI2"
# Because this file is not a Databricks notebook, you
# must create a Spark session. Databricks notebooks
# create a Spark session for you by default.
spark = SparkSession.builder \
.appName('integrity-tests') \
.getOrCreate()
# Create fake data for the unit tests to run against.
# In general, it is a best practice to not run unit tests
# against functions that work with data in production.
schema = StructType([ \
StructField("_c0", IntegerType(), True), \
StructField("carat", FloatType(), True), \
StructField("cut", StringType(), True), \
StructField("color", StringType(), True), \
StructField("clarity", StringType(), True), \
StructField("depth", FloatType(), True), \
StructField("table", IntegerType(), True), \
StructField("price", IntegerType(), True), \
StructField("x", FloatType(), True), \
StructField("y", FloatType(), True), \
StructField("z", FloatType(), True), \
])
data = [ (1, 0.23, "Ideal", "E", "SI2", 61.5, 55, 326, 3.95, 3.98, 2.43 ), \
(2, 0.21, "Premium", "E", "SI1", 59.8, 61, 326, 3.89, 3.84, 2.31 ) ]
df = spark.createDataFrame(data, schema)
# Does the table exist?
def test_tableExists():
assert tableExists(tableName, dbName) is True
# Does the column exist?
def test_columnExists():
assert columnExists(df, columnName) is True
# Is there at least one row for the value in the specified column?
def test_numRowsInColumnForValue():
assert numRowsInColumnForValue(df, columnName, columnValue) > 0
R
Erstellen Sie eine weitere Datei namens test_myfunctions.r
im selben Ordner wie die vorherige myfunctions.r
Datei in Ihrem Repository, und fügen Sie der Datei den folgenden Inhalt hinzu. Standardmäßig sucht testthat
nach .r
Dateien, deren Namen mit test
zum Testen beginnen.
Im Allgemeinen gilt es als Best Practices, keine Komponententests für Funktionen auszuführen, die mit Daten in der Produktion arbeiten. Dies ist besonders wichtig für Funktionen, die Daten hinzufügen, entfernen oder anderweitig ändern. Um zu verhindern, dass Ihre Produktionsdaten in unerwarteter Weise durch Unit-Tests beeinträchtigt werden, sollten Sie Unit-Tests mit Nicht-Produktionsdaten ausführen. Ein gemeinsamer Ansatz besteht darin, gefälschte Daten zu erstellen, die den Produktionsdaten so nah wie möglich sind. Im folgenden Codebeispiel werden Fakedaten erstellt, mit denen die Komponententests ausgeführt werden sollen.
library(testthat)
source("myfunctions.r")
table_name <- "diamonds"
db_name <- "default"
column_name <- "clarity"
column_value <- "SI2"
# Create fake data for the unit tests to run against.
# In general, it is a best practice to not run unit tests
# against functions that work with data in production.
schema <- structType(
structField("_c0", "integer"),
structField("carat", "float"),
structField("cut", "string"),
structField("color", "string"),
structField("clarity", "string"),
structField("depth", "float"),
structField("table", "integer"),
structField("price", "integer"),
structField("x", "float"),
structField("y", "float"),
structField("z", "float"))
data <- list(list(as.integer(1), 0.23, "Ideal", "E", "SI2", 61.5, as.integer(55), as.integer(326), 3.95, 3.98, 2.43),
list(as.integer(2), 0.21, "Premium", "E", "SI1", 59.8, as.integer(61), as.integer(326), 3.89, 3.84, 2.31))
df <- createDataFrame(data, schema)
# Does the table exist?
test_that ("The table exists.", {
expect_true(table_exists(table_name, db_name))
})
# Does the column exist?
test_that ("The column exists in the table.", {
expect_true(column_exists(df, column_name))
})
# Is there at least one row for the value in the specified column?
test_that ("There is at least one row in the query result.", {
expect_true(num_rows_in_column_for_value(df, column_name, column_value) > 0)
})
Scala
Erstellen Sie ein weiteres Scala-Notizbuch im selben Ordner wie das vorherige myfunctions
Scala-Notizbuch, und fügen Sie diesem neuen Notizbuch den folgenden Inhalt hinzu.
Fügen Sie in der ersten Zelle des neuen Notizbuchs den folgenden Code hinzu, der die %run
Magie aufruft. Diese Magie macht den Inhalt des myfunctions
Notizbuchs für Ihr neues Notizbuch verfügbar.
%run ./myfunctions
Fügen Sie in der zweiten Zelle den folgenden Code hinzu. Dieser Code definiert Die Komponententests und gibt an, wie sie ausgeführt werden.
Im Allgemeinen gilt es als Best Practices, keine Komponententests für Funktionen auszuführen, die mit Daten in der Produktion arbeiten. Dies ist besonders wichtig für Funktionen, die Daten hinzufügen, entfernen oder anderweitig ändern. Um Ihre Produktionsdaten vor unerwarteten Beeinträchtigungen durch Ihre Komponententests zu schützen, sollten Sie Komponententests mit Nicht-Produktionsdaten ausführen. Ein gemeinsamer Ansatz besteht darin, gefälschte Daten zu erstellen, die den Produktionsdaten so nah wie möglich sind. Im folgenden Codebeispiel werden Fakedaten erstellt, mit denen die Komponententests ausgeführt werden sollen.
import org.scalatest._
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, FloatType, StringType}
import scala.collection.JavaConverters._
class DataTests extends AsyncFunSuite {
val tableName = "diamonds"
val dbName = "default"
val columnName = "clarity"
val columnValue = "SI2"
// Create fake data for the unit tests to run against.
// In general, it is a best practice to not run unit tests
// against functions that work with data in production.
val schema = StructType(Array(
StructField("_c0", IntegerType),
StructField("carat", FloatType),
StructField("cut", StringType),
StructField("color", StringType),
StructField("clarity", StringType),
StructField("depth", FloatType),
StructField("table", IntegerType),
StructField("price", IntegerType),
StructField("x", FloatType),
StructField("y", FloatType),
StructField("z", FloatType)
))
val data = Seq(
Row(1, 0.23, "Ideal", "E", "SI2", 61.5, 55, 326, 3.95, 3.98, 2.43),
Row(2, 0.21, "Premium", "E", "SI1", 59.8, 61, 326, 3.89, 3.84, 2.31)
).asJava
val df = spark.createDataFrame(data, schema)
// Does the table exist?
test("The table exists") {
assert(tableExists(tableName, dbName) == true)
}
// Does the column exist?
test("The column exists") {
assert(columnExists(df, columnName) == true)
}
// Is there at least one row for the value in the specified column?
test("There is at least one matching row") {
assert(numRowsInColumnForValue(df, columnName, columnValue) > 0)
}
}
nocolor.nodurations.nostacks.stats.run(new DataTests)
Anmerkung
In diesem Codebeispiel wird der FunSuite
Teststil in ScalaTest verwendet. Weitere verfügbare Teststile finden Sie unter Auswahl von Teststilen für Ihr Projekt.
SQL
Bevor Sie Komponententests hinzufügen, sollte Ihnen bewusst sein, dass es als Best Practices gilt, keine Komponententests für Funktionen auszuführen, die mit Daten in der Produktion arbeiten. Dies ist besonders wichtig für Funktionen, die Daten hinzufügen, entfernen oder anderweitig ändern. Um Ihre Produktionsdaten davor zu schützen, auf unerwartete Weise durch Unit-Tests beeinträchtigt zu werden, sollten Sie Unit-Tests mit Nicht-Produktionsdaten durchführen. Ein gängiger Ansatz besteht darin, Komponententests für Ansichten anstelle von Tabellen auszuführen.
Zum Erstellen einer Ansicht können Sie den Befehl CREATE VIEW aus einer neuen Zelle entweder im vorherigen Notizbuch oder in einem separaten Notizbuch aufrufen. Im folgenden Beispiel wird davon ausgegangen, dass Sie eine vorhandene Tabelle mit dem Namen diamonds
in einem Schema (Datenbank) mit dem Namen default
in einem Katalog mit dem Namen main
haben. Ändern Sie diese Namen nach Bedarf so, damit diese Ihren eigenen entsprechen, und führen Sie dann lediglich diese Zelle aus.
USE CATALOG main;
USE SCHEMA default;
CREATE VIEW view_diamonds AS
SELECT * FROM diamonds;
Fügen Sie nach dem Erstellen der Ansicht jede der folgenden SELECT
-Anweisungen einer eigenen neuen Zelle im vorherigen Notebook oder ihren eigenen neuen Zellen in einem separaten Notebook hinzu. Ändern Sie die Namen bei Bedarf so, dass sie Ihren eigenen entsprechen.
SELECT if(table_exists("main", "default", "view_diamonds"),
printf("PASS: The table 'main.default.view_diamonds' exists."),
printf("FAIL: The table 'main.default.view_diamonds' does not exist."));
SELECT if(column_exists("main", "default", "view_diamonds", "clarity"),
printf("PASS: The column 'clarity' exists in the table 'main.default.view_diamonds'."),
printf("FAIL: The column 'clarity' does not exists in the table 'main.default.view_diamonds'."));
SELECT if(num_rows_for_clarity_in_diamonds("VVS2") > 0,
printf("PASS: The table 'main.default.view_diamonds' has at least one row where the column 'clarity' equals 'VVS2'."),
printf("FAIL: The table 'main.default.view_diamonds' does not have at least one row where the column 'clarity' equals 'VVS2'."));
Ausführen von Komponententests
In diesem Abschnitt wird beschrieben, wie Sie die Komponententests ausführen, die Sie im vorherigen Abschnitt codiert haben. Wenn Sie die Komponententests ausführen, erhalten Sie Ergebnisse, die zeigen, welche Komponententests bestanden und fehlgeschlagen sind.
Wenn Sie die Komponententests aus dem vorherigen Abschnitt zu Ihrem Azure Databricks-Arbeitsbereich hinzugefügt haben, können Sie diese Komponententests aus Ihrem Arbeitsbereich ausführen. Sie können diese Komponententests manuell oder nach einem Zeitplan ausführen.
Python
Erstellen Sie ein Python-Notizbuch im selben Ordner wie die vorherige test_myfunctions.py
Datei in Ihrem Repository, und fügen Sie den folgenden Inhalt hinzu.
Fügen Sie in der ersten Zelle des neuen Notebooks den folgenden Code hinzu, und führen Sie anschließend die Zelle aus, die den Magic-Befehl %pip
aufruft. Durch diesen Magic-Befehl wird pytest
installiert.
%pip install pytest
Fügen Sie in der zweiten Zelle den folgenden Code hinzu, und führen Sie dann die Zelle aus. Die Ergebnisse zeigen, welche Komponententests bestanden und fehlgeschlagen sind.
import pytest
import sys
# Skip writing pyc files on a readonly filesystem.
sys.dont_write_bytecode = True
# Run pytest.
retcode = pytest.main([".", "-v", "-p", "no:cacheprovider"])
# Fail the cell execution if there are any test failures.
assert retcode == 0, "The pytest invocation failed. See the log for details."
R
Erstellen Sie ein R-Notizbuch im selben Ordner wie die vorherige test_myfunctions.r
Datei in Ihrem Repository, und fügen Sie den folgenden Inhalt hinzu.
Fügen Sie in der ersten Zelle den folgenden Code hinzu, und führen Sie dann die Zelle aus, die die funktion install.packages
aufruft. Durch diese Funktion wird testthat
installiert.
install.packages("testthat")
Fügen Sie in der zweiten Zelle den folgenden Code hinzu, und führen Sie dann die Zelle aus. Die Ergebnisse zeigen, welche Komponententests bestanden und fehlgeschlagen sind.
library(testthat)
source("myfunctions.r")
test_dir(".", reporter = "tap")
Scala
Führen Sie die erste und anschließend zweite Zelle im Notebook aus dem vorherigen Abschnitt aus. Die Ergebnisse zeigen, welche Komponententests bestanden und fehlgeschlagen sind.
SQL
Führen Sie jede der drei Zellen im Notebook aus dem vorherigen Abschnitt aus. Die Ergebnisse zeigen, ob jeder Komponententest bestanden oder fehlgeschlagen ist.
Wenn Sie die Ansicht nach dem Ausführen der Komponententests nicht mehr benötigen, können Sie die Ansicht löschen. Um diese Ansicht zu löschen, können Sie einer neuen Zelle innerhalb eines der vorherigen Notizbücher den folgenden Code hinzufügen und dann nur diese Zelle ausführen.
DROP VIEW view_diamonds;
Tipp
Sie können die Ergebnisse Ihrer Notizbuchausführungen (einschließlich Komponententestergebnissen) in den Treiberprotokollen Ihres Clustersanzeigen. Sie können ebenfalls einen Speicherort für die Protokollübermittlung Ihres Clusters angeben.
Sie können ein kontinuierliches Integrations- und Kontinuierliches Bereitstellungssystem (CI/CD) einrichten, z. B. GitHub-Aktionen, um Ihre Komponententests automatisch auszuführen, wenn sich der Code ändert. Ein Beispiel finden Sie in der Abdeckung von GitHub Actions unter Best Practices für die Softwareentwicklung für Notebooks.
Weitere Ressourcen
pytest
- pytest Startseite
- Pytest-Anleitungen
- Pytest-Referenzhandbücher
- Best Practices für die Softwareentwicklung für Notebooks