您可以使用 單元測試 來協助改善筆記本程式代碼的品質和一致性。 單元測試是一種測試獨立程式碼單元的方法,例如函式,並且強調早期和頻繁地進行測試。 這可協助您更快找到程式碼的問題、更快速地找出您程式代碼的錯誤假設,並簡化整體程式代碼撰寫工作。
本文簡介使用函式進行的基本單元測試。 單元測試類別和介面等進階概念,以及使用 存根、模擬和 測試工具,雖然在筆記本單元測試時也受到支援,但這些概念不在本文的討論範圍之內。 本文也不涵蓋其他類型的測試方法,例如 整合測試、系統測試、驗收測試或 非功能性測試 方法,例如 效能測試 或 可用性測試。
本文示範下列各項:
- 如何組織函式及其單元測試。
- 如何在 Python、R、Scala 以及 SQL 中撰寫使用者定義函式,這些函式是設計成經過單元測試的。
- 如何從 Python、R、Scala 和 SQL 筆記本呼叫這些函式。
- 如何使用適用於 Python 的熱門測試框架 pytest、適用於 R 的 testthat 和適用於 Scala 的 ScalaTest,來撰寫 Python、R 和 Scala 的單元測試。 此外,如何撰寫 SQL,以單元測試 SQL 使用者定義函式 (SQL UDF)。
- 如何從 Python、R、Scala 和 SQL 筆記本執行這些單元測試。
注意
Azure Databricks 建議在筆記本中撰寫和執行單元測試。 雖然您可以在 Web 終端機中執行某些命令,但 Web 終端機有更多限制,例如缺少 Spark 的支援。 請參閱 Azure Databricks 網頁終端機中的 執行殼層命令。
整理功能和單元測試
使用筆記本組織函式及其單元測試有幾種常見的方法。 每個方法都有其優點和挑戰。
對於 Python、R 和 Scala 的筆記本,常見的方法包括以下幾項:
-
將函式及其單元測試儲存在筆記本外。。
- 優點:您可以在筆記本內外呼叫這些函式。 測試架構的設計較適合用來在筆記本外部執行測試。
- 挑戰:Scala 筆記本不支援此方法。 此方法也會增加要追蹤和維護的檔案數目。
-
將函式儲存在一個筆記本中,單元測試儲存在另一個筆記本中。。
- 優點:這些函式更容易跨筆記本重複使用。
- 挑戰:需要追蹤和維護的筆記本數目增加。 這些函式無法在筆記本外部使用。 這些函式在筆記本外部測試也比較困難。
-
在相同的筆記本中保存函式及其相應的單元測試。。
- 優點:函式及其單元測試會儲存在單一筆記本中,以方便追蹤和維護。
- 挑戰:這些函式可能更難跨筆記本重複使用。 這些函式無法在筆記本外部使用。 這些函式在筆記本外部測試也比較困難。
針對 Python 和 R 筆記本,Databricks 建議將函式及其單元測試儲存在筆記本之外。 針對 Scala 筆記本,Databricks 建議將函式放在一個筆記本中,並在另一個筆記本中進行其單元測試。
針對 SQL 筆記本,Databricks 建議您將函式儲存為 SQL 使用者定義函式 (SQL UDF),並存放在您的架構中(也稱為資料庫)。 然後,您可以從 SQL 筆記本呼叫這些 SQL UDF 及其單元測試。
撰寫函式
本節描述一組簡單的範例函式,可決定下列各項:
- 數據表是否存在於資料庫中。
- 欄是否存在於資料表中。
- 一列中某個值的行數是多少。
這些函式的目的很簡單,因此您可以專注於本文中的單元測試詳細數據,而不是專注於函式本身。
若要取得最佳的單元測試結果,函式應該會傳回單一可預測結果,且為單一數據類型。 例如,若要檢查某個專案是否存在,函式應該傳回 true 或 false 的布爾值。 若要傳回存在的數據列數目,函式應該會傳回非負數的整數。 第一個範例不應該傳回 false,也不應該傳回某個專案本身,無論該專案是否存在。 同樣地,在第二個範例中,它不應該傳回存在的數據列數目,如果沒有數據列,則傳回 false。
您可以將這些函式新增至現有的 Azure Databricks 工作區,如下所示,在 Python、R、Scala 或 SQL 中。
Python
下列程式代碼假設您已 設定 Databricks Git 資料夾、 新增存放庫,並在 Azure Databricks 工作區中開啟存放庫。
檔案,並將下列內容新增至檔案。 本文中的其他範例預期此檔案會命名為 myfunctions.py。 您可以針對自己的檔案使用不同的名稱。
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Because this file is not a Databricks notebook, you
# must create a Spark session. Databricks notebooks
# create a Spark session for you by default.
spark = SparkSession.builder \
.appName('integrity-tests') \
.getOrCreate()
# Does the specified table exist in the specified database?
def tableExists(tableName, dbName):
return spark.catalog.tableExists(f"{dbName}.{tableName}")
# Does the specified column exist in the given DataFrame?
def columnExists(dataFrame, columnName):
if columnName in dataFrame.columns:
return True
else:
return False
# How many rows are there for the specified value in the specified column
# in the given DataFrame?
def numRowsInColumnForValue(dataFrame, columnName, columnValue):
df = dataFrame.filter(col(columnName) == columnValue)
return df.count()
R
下列程式代碼假設您已 設定 Databricks Git 資料夾、 新增存放庫,並在 Azure Databricks 工作區中開啟存放庫。
檔案,並將下列內容新增至檔案。 本文中的其他範例預期此檔案會命名為 myfunctions.r。 您可以針對自己的檔案使用不同的名稱。
library(SparkR)
# Does the specified table exist in the specified database?
table_exists <- function(table_name, db_name) {
tableExists(paste(db_name, ".", table_name, sep = ""))
}
# Does the specified column exist in the given DataFrame?
column_exists <- function(dataframe, column_name) {
column_name %in% colnames(dataframe)
}
# How many rows are there for the specified value in the specified column
# in the given DataFrame?
num_rows_in_column_for_value <- function(dataframe, column_name, column_value) {
df = filter(dataframe, dataframe[[column_name]] == column_value)
count(df)
}
程式語言 Scala
使用下列內容建立名為 的 myfunctions。 本文中的其他範例預期此筆記本命名為 myfunctions。 您可以針對自己的筆記本使用不同的名稱。
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
// Does the specified table exist in the specified database?
def tableExists(tableName: String, dbName: String) : Boolean = {
return spark.catalog.tableExists(dbName + "." + tableName)
}
// Does the specified column exist in the given DataFrame?
def columnExists(dataFrame: DataFrame, columnName: String) : Boolean = {
val nameOfColumn = null
for(nameOfColumn <- dataFrame.columns) {
if (nameOfColumn == columnName) {
return true
}
}
return false
}
// How many rows are there for the specified value in the specified column
// in the given DataFrame?
def numRowsInColumnForValue(dataFrame: DataFrame, columnName: String, columnValue: String) : Long = {
val df = dataFrame.filter(col(columnName) === columnValue)
return df.count()
}
SQL
在下列程式碼中,假設您在 Azure Databricks 工作區中可以存取的資料目錄中,有一個名為 的目錄,其下的名為 default 的架構內有第三方範例數據集 main。 如果您想要使用的目錄或架構有不同的名稱,請變更下列其中一個或兩個 USE 語句以相符。
建立 SQL 筆記本,並將下列內容新增至這個新的筆記本。 然後 將筆記本 連結至叢集,執行 筆記本,將下列 SQL UDF 新增至指定的目錄和架構。
注意
SQL UDF table_exists 和 column_exists 僅適用於 Unity 目錄。 Unity Catalog 的 SQL UDF 支援目前處於公開預覽階段。
USE CATALOG main;
USE SCHEMA default;
CREATE OR REPLACE FUNCTION table_exists(catalog_name STRING,
db_name STRING,
table_name STRING)
RETURNS BOOLEAN
RETURN if(
(SELECT count(*) FROM system.information_schema.tables
WHERE table_catalog = table_exists.catalog_name
AND table_schema = table_exists.db_name
AND table_name = table_exists.table_name) > 0,
true,
false
);
CREATE OR REPLACE FUNCTION column_exists(catalog_name STRING,
db_name STRING,
table_name STRING,
column_name STRING)
RETURNS BOOLEAN
RETURN if(
(SELECT count(*) FROM system.information_schema.columns
WHERE table_catalog = column_exists.catalog_name
AND table_schema = column_exists.db_name
AND table_name = column_exists.table_name
AND column_name = column_exists.column_name) > 0,
true,
false
);
CREATE OR REPLACE FUNCTION num_rows_for_clarity_in_diamonds(clarity_value STRING)
RETURNS BIGINT
RETURN SELECT count(*)
FROM main.default.diamonds
WHERE clarity = clarity_value
呼叫函式
本節描述呼叫上述函式的程序代碼。 例如,您可以使用這些函式來計算資料表中指定值存在於指定資料行中的數據列數目。 不過,在繼續之前,您會想要檢查數據表是否確實存在,以及該數據表中是否確實存在數據行。 下列程式代碼會檢查這些條件。
如果您將上一節的函式新增至 Azure Databricks 工作區,您可以從工作區呼叫這些函式,如下所示。
Python
,並將下列內容新增至筆記本。 視需要變更數據表名稱、架構(資料庫)名稱、數據行名稱和數據行值的變數值。 然後 將筆記本 連結至叢集, 筆記本執行以查看結果。
from myfunctions import *
tableName = "diamonds"
dbName = "default"
columnName = "clarity"
columnValue = "VVS2"
# If the table exists in the specified database...
if tableExists(tableName, dbName):
df = spark.sql(f"SELECT * FROM {dbName}.{tableName}")
# And the specified column exists in that table...
if columnExists(df, columnName):
# Then report the number of rows for the specified value in that column.
numRows = numRowsInColumnForValue(df, columnName, columnValue)
print(f"There are {numRows} rows in '{tableName}' where '{columnName}' equals '{columnValue}'.")
else:
print(f"Column '{columnName}' does not exist in table '{tableName}' in schema (database) '{dbName}'.")
else:
print(f"Table '{tableName}' does not exist in schema (database) '{dbName}'.")
R
,並將下列內容新增至筆記本。 視需要變更數據表名稱、架構(資料庫)名稱、數據行名稱和數據行值的變數值。 然後 將筆記本 連結至叢集, 筆記本執行以查看結果。
library(SparkR)
source("myfunctions.r")
table_name <- "diamonds"
db_name <- "default"
column_name <- "clarity"
column_value <- "VVS2"
# If the table exists in the specified database...
if (table_exists(table_name, db_name)) {
df = sql(paste("SELECT * FROM ", db_name, ".", table_name, sep = ""))
# And the specified column exists in that table...
if (column_exists(df, column_name)) {
# Then report the number of rows for the specified value in that column.
num_rows = num_rows_in_column_for_value(df, column_name, column_value)
print(paste("There are ", num_rows, " rows in table '", table_name, "' where '", column_name, "' equals '", column_value, "'.", sep = ""))
} else {
print(paste("Column '", column_name, "' does not exist in table '", table_name, "' in schema (database) '", db_name, "'.", sep = ""))
}
} else {
print(paste("Table '", table_name, "' does not exist in schema (database) '", db_name, "'.", sep = ""))
}
程式語言 Scala
在與上述 myfunctions Scala 筆記本相同的資料夾中建立另一個 Scala 筆記本,並將下列內容新增至這個新的筆記本。
在這個新筆記本的第一個單元格中,新增下列程式碼,這段程式碼會呼叫 %run magic 命令。 此魔術可將 myfunctions 筆記本的內容提供給您的新筆記本使用。
%run ./myfunctions
在這個新筆記本的第二個儲存格中,新增下列代碼。 視需要變更數據表名稱、架構(資料庫)名稱、數據行名稱和數據行值的變數值。 然後 將筆記本 連結至叢集, 筆記本執行以查看結果。
val tableName = "diamonds"
val dbName = "default"
val columnName = "clarity"
val columnValue = "VVS2"
// If the table exists in the specified database...
if (tableExists(tableName, dbName)) {
val df = spark.sql("SELECT * FROM " + dbName + "." + tableName)
// And the specified column exists in that table...
if (columnExists(df, columnName)) {
// Then report the number of rows for the specified value in that column.
val numRows = numRowsInColumnForValue(df, columnName, columnValue)
println("There are " + numRows + " rows in '" + tableName + "' where '" + columnName + "' equals '" + columnValue + "'.")
} else {
println("Column '" + columnName + "' does not exist in table '" + tableName + "' in database '" + dbName + "'.")
}
} else {
println("Table '" + tableName + "' does not exist in database '" + dbName + "'.")
}
SQL
將下列程式代碼新增至上述筆記本中的新儲存格,或新增至個別筆記本中的儲存格。 視需要變更資料庫結構或目錄名稱以符合您的名稱,然後執行此程式區塊以查看結果。
SELECT CASE
-- If the table exists in the specified catalog and schema...
WHEN
table_exists("main", "default", "diamonds")
THEN
-- And the specified column exists in that table...
(SELECT CASE
WHEN
column_exists("main", "default", "diamonds", "clarity")
THEN
-- Then report the number of rows for the specified value in that column.
printf("There are %d rows in table 'main.default.diamonds' where 'clarity' equals 'VVS2'.",
num_rows_for_clarity_in_diamonds("VVS2"))
ELSE
printf("Column 'clarity' does not exist in table 'main.default.diamonds'.")
END)
ELSE
printf("Table 'main.default.diamonds' does not exist.")
END
撰寫單元測試
本節說明測試本文開頭所描述之每個函式的程序代碼。 如果您未來對函式進行任何變更,您可以使用單元測試來判斷這些函式是否仍如預期般運作。
如果您將函式新增至本文開頭的 Azure Databricks 工作區,您可以將這些函式的單元測試新增至工作區,如下所示。
Python
在與存放庫中先前 test_myfunctions.py 檔案相同的資料夾中,建立名為 myfunctions.py 的另一個檔案,並將下列內容新增至檔案。 根據預設,pytest 會尋找名稱開頭為 .py 的 test_ 檔案(或以 _test結尾) 進行測試。 同樣地,根據預設,pytest 會在這些檔案中尋找名稱開頭為 test_ 來測試的函式。
一般而言,最好
import pytest
import pyspark
from myfunctions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType
tableName = "diamonds"
dbName = "default"
columnName = "clarity"
columnValue = "SI2"
# Because this file is not a Databricks notebook, you
# must create a Spark session. Databricks notebooks
# create a Spark session for you by default.
spark = SparkSession.builder \
.appName('integrity-tests') \
.getOrCreate()
# Create fake data for the unit tests to run against.
# In general, it is a best practice to not run unit tests
# against functions that work with data in production.
schema = StructType([ \
StructField("_c0", IntegerType(), True), \
StructField("carat", FloatType(), True), \
StructField("cut", StringType(), True), \
StructField("color", StringType(), True), \
StructField("clarity", StringType(), True), \
StructField("depth", FloatType(), True), \
StructField("table", IntegerType(), True), \
StructField("price", IntegerType(), True), \
StructField("x", FloatType(), True), \
StructField("y", FloatType(), True), \
StructField("z", FloatType(), True), \
])
data = [ (1, 0.23, "Ideal", "E", "SI2", 61.5, 55, 326, 3.95, 3.98, 2.43 ), \
(2, 0.21, "Premium", "E", "SI1", 59.8, 61, 326, 3.89, 3.84, 2.31 ) ]
df = spark.createDataFrame(data, schema)
# Does the table exist?
def test_tableExists():
assert tableExists(tableName, dbName) is True
# Does the column exist?
def test_columnExists():
assert columnExists(df, columnName) is True
# Is there at least one row for the value in the specified column?
def test_numRowsInColumnForValue():
assert numRowsInColumnForValue(df, columnName, columnValue) > 0
R
在與存放庫中先前 test_myfunctions.r 檔案相同的資料夾中,建立名為 myfunctions.r 的另一個檔案,並將下列內容新增至檔案。 根據預設,testthat 會尋找名稱開頭為 .r 來測試的 test 檔案。
一般而言,最好
library(testthat)
source("myfunctions.r")
table_name <- "diamonds"
db_name <- "default"
column_name <- "clarity"
column_value <- "SI2"
# Create fake data for the unit tests to run against.
# In general, it is a best practice to not run unit tests
# against functions that work with data in production.
schema <- structType(
structField("_c0", "integer"),
structField("carat", "float"),
structField("cut", "string"),
structField("color", "string"),
structField("clarity", "string"),
structField("depth", "float"),
structField("table", "integer"),
structField("price", "integer"),
structField("x", "float"),
structField("y", "float"),
structField("z", "float"))
data <- list(list(as.integer(1), 0.23, "Ideal", "E", "SI2", 61.5, as.integer(55), as.integer(326), 3.95, 3.98, 2.43),
list(as.integer(2), 0.21, "Premium", "E", "SI1", 59.8, as.integer(61), as.integer(326), 3.89, 3.84, 2.31))
df <- createDataFrame(data, schema)
# Does the table exist?
test_that ("The table exists.", {
expect_true(table_exists(table_name, db_name))
})
# Does the column exist?
test_that ("The column exists in the table.", {
expect_true(column_exists(df, column_name))
})
# Is there at least one row for the value in the specified column?
test_that ("There is at least one row in the query result.", {
expect_true(num_rows_in_column_for_value(df, column_name, column_value) > 0)
})
程式語言 Scala
在與上述 myfunctions Scala 筆記本相同的資料夾中建立另一個 Scala 筆記本,並將下列內容新增至這個新的筆記本。
在新筆記本的第一個單元格中,新增下列代碼,會呼叫 %run magic。 此魔術可將 myfunctions 筆記本的內容提供給您的新筆記本使用。
%run ./myfunctions
在第二個儲存格中,新增下列程式代碼。 此程式代碼會定義單元測試,並指定如何執行它們。
一般而言,最好
import org.scalatest._
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, FloatType, StringType}
import scala.collection.JavaConverters._
class DataTests extends AsyncFunSuite {
val tableName = "diamonds"
val dbName = "default"
val columnName = "clarity"
val columnValue = "SI2"
// Create fake data for the unit tests to run against.
// In general, it is a best practice to not run unit tests
// against functions that work with data in production.
val schema = StructType(Array(
StructField("_c0", IntegerType),
StructField("carat", FloatType),
StructField("cut", StringType),
StructField("color", StringType),
StructField("clarity", StringType),
StructField("depth", FloatType),
StructField("table", IntegerType),
StructField("price", IntegerType),
StructField("x", FloatType),
StructField("y", FloatType),
StructField("z", FloatType)
))
val data = Seq(
Row(1, 0.23, "Ideal", "E", "SI2", 61.5, 55, 326, 3.95, 3.98, 2.43),
Row(2, 0.21, "Premium", "E", "SI1", 59.8, 61, 326, 3.89, 3.84, 2.31)
).asJava
val df = spark.createDataFrame(data, schema)
// Does the table exist?
test("The table exists") {
assert(tableExists(tableName, dbName) == true)
}
// Does the column exist?
test("The column exists") {
assert(columnExists(df, columnName) == true)
}
// Is there at least one row for the value in the specified column?
test("There is at least one matching row") {
assert(numRowsInColumnForValue(df, columnName, columnValue) > 0)
}
}
nocolor.nodurations.nostacks.stats.run(new DataTests)
注意
此程式碼範例使用 ScalaTest 中的 FunSuite 測試樣式。 如需其他可用的測試樣式,請參閱 選取項目的測試樣式。
SQL
新增單元測試之前,您應該注意,一般而言,最好 不要 對生產環境中使用數據的函式執行單元測試。 對於新增、移除或其他變更數據的函式來說,這特別重要。 若要以非預期的方式保護生產數據不受單元測試危害,您應該對非生產數據執行單元測試。 其中一個常見方法是針對 視圖而非數據表執行單元測試。
若要建立檢視,您可以從上述筆記本或個別筆記本中的新單元格呼叫 CREATE VIEW 命令。 下列範例假設您在名為 diamonds的目錄中、名為 default 的架構內,擁有一個名為 main 的現有資料表。 視需要變更這些名稱以符合您自己的名稱,然後只執行該儲存格。
USE CATALOG main;
USE SCHEMA default;
CREATE VIEW view_diamonds AS
SELECT * FROM diamonds;
建立檢視後,將以下每一個 SELECT 語句新增到前一個筆記本或另一個筆記本中的新單元格。 視需要將名稱改成您自己的。
SELECT if(table_exists("main", "default", "view_diamonds"),
printf("PASS: The table 'main.default.view_diamonds' exists."),
printf("FAIL: The table 'main.default.view_diamonds' does not exist."));
SELECT if(column_exists("main", "default", "view_diamonds", "clarity"),
printf("PASS: The column 'clarity' exists in the table 'main.default.view_diamonds'."),
printf("FAIL: The column 'clarity' does not exists in the table 'main.default.view_diamonds'."));
SELECT if(num_rows_for_clarity_in_diamonds("VVS2") > 0,
printf("PASS: The table 'main.default.view_diamonds' has at least one row where the column 'clarity' equals 'VVS2'."),
printf("FAIL: The table 'main.default.view_diamonds' does not have at least one row where the column 'clarity' equals 'VVS2'."));
執行單元測試
本節說明如何執行您在上一節中編碼的單元測試。 當您執行單元測試時,會取得結果,其中顯示哪些單元測試通過且失敗。
如果您將上一節的單元測試新增至 Azure Databricks 工作區,您可以從工作區執行這些單元測試。 您可以手動或定期執行這些單元測試。
Python
在與存放庫中上述 test_myfunctions.py 檔案相同的資料夾中建立 Python 筆記本,並新增下列內容。
在新筆記本的第一個儲存格中,新增下列程式代碼,然後執行會呼叫magic的 %pip 單元格。 此 magic 會安裝 pytest。
%pip install pytest
在第二個儲存格中,新增下列程式代碼,然後執行儲存格。 結果顯示哪些單元測試通過且失敗。
import pytest
import sys
# Skip writing pyc files on a readonly filesystem.
sys.dont_write_bytecode = True
# Run pytest.
retcode = pytest.main([".", "-v", "-p", "no:cacheprovider"])
# Fail the cell execution if there are any test failures.
assert retcode == 0, "The pytest invocation failed. See the log for details."
R
在與存放庫中上述 test_myfunctions.r 檔案相同的資料夾中建立 R Notebook,並新增下列內容。
請在第一個儲存格中添加以下程式碼,然後執行呼叫 install.packages 函式的儲存格。 這個函式會安裝 testthat。
install.packages("testthat")
在第二個儲存格中,新增下列程式代碼,然後執行儲存格。 結果顯示哪些單元測試通過且失敗。
library(testthat)
source("myfunctions.r")
test_dir(".", reporter = "tap")
程式語言 Scala
從上一節的筆記本中依次執行第一個儲存格,然後執行第二個儲存格。 結果顯示哪些單元測試通過且失敗。
SQL
逐一執行前一節筆記本中的三個儲存格。 結果會顯示每個單元測試都通過或失敗。
如果您在執行單元測試之後不再需要檢視,您可以刪除檢視。 若要刪除此檢視,您可以將下列程式代碼新增至上述其中一個筆記本內的新單元格,然後只執行該儲存格。
DROP VIEW view_diamonds;
提示
您可以在叢集的 驅動程式記錄中檢視筆記本執行的結果(包括單元測試結果)。 您也可以指定叢集 記錄傳遞的位置。
您可以設定持續整合和持續傳遞或部署 (CI/CD) 系統,例如 GitHub Actions,以在程式碼變更時自動執行單元測試。 如需範例,請參閱 軟體工程最佳做法中的筆記本的 GitHub Actions 涵蓋範圍。