筆記本的單元測試

您可以使用 單元測試 來協助改善筆記本程式代碼的品質和一致性。 單元測試是測試獨立式程式代碼單位的方法,例如函式的早期和經常。 這可協助您更快找到程式碼的問題、更快速地找出您程式代碼的錯誤假設,並簡化整體程式代碼撰寫工作。

本文是使用函式進行基本 單元測試 的簡介。 單元測試類別和介面,以及使用 存根模擬測試線束等進階概念,同時在筆記本的單元測試時也受到支援。本文範圍之外。 本文也未涵蓋其他類型的測試方法,例如 整合測試系統測試驗收測試非功能性測試 方法,例如 效能測試可用性測試

本文示範下列各項:

  • 如何組織函式及其單元測試。
  • 如何在 Python、R、Scala 以及 SQL 中撰寫使用者定義函式,這些函式是設計成經過單元測試的。
  • 如何從 Python、R、Scala 和 SQL 筆記本呼叫這些函式。
  • 如何使用適用於 Python 的熱門測試架構 pytestR 測試以及 ScalaTest for Scala,在 Python、R 和 Scala 中撰寫單元測試。 此外,如何撰寫 SQL,以單元測試 SQL 使用者定義函式 (SQL UDF)。
  • 如何從 Python、R、Scala 和 SQL 筆記本執行這些單元測試。

組織函式和單元測試

使用筆記本來組織函式及其單元測試有一些常見的方法。 每個方法都有其優點和挑戰。

針對 Python、R 和 Scala 筆記本,常見的方法包括下列各項:

針對 Python 和 R 筆記本,Databricks 建議將函式及其單元測試儲存在筆記本之外。 針對 Scala 筆記本,Databricks 建議在一個筆記本中包含函式,以及個別筆記本中的單元測試。

針對 SQL 筆記本,Databricks 建議您將函式儲存為架構中的 SQL 使用者定義函式 (SQL UDF)(也稱為資料庫)。 然後,您可以從 SQL 筆記本呼叫這些 SQL UDF 及其單元測試。

撰寫函式

本節描述一組簡單的範例函式,可決定下列各項:

  • 數據表是否存在於資料庫中。
  • 數據行是否存在於數據表中。
  • 該數據行內某個值的數據行中有多少個數據列。

這些函式的目的很簡單,因此您可以專注於本文中的單元測試詳細數據,而不是專注於函式本身。

若要取得最佳的單元測試結果,函式應該會傳回單一可預測結果,且為單一數據類型。 例如,若要檢查某個專案是否存在,函式應該傳回 true 或 false 的布爾值。 若要傳回存在的數據列數目,函式應該會傳回非負數的整數。 第一個範例中不應該傳回 false,如果某個專案不存在,則傳回 false,如果它確實存在,則傳回它本身。 同樣地,在第二個範例中,它不應該傳回存在的數據列數目,如果沒有數據列,則傳回 false。

您可以將這些函式新增至現有的 Azure Databricks 工作區,如下所示,在 Python、R、Scala 或 SQL 中。

Python

下列程式代碼假設您已 設定 Databricks Git 資料夾 (Repos)新增存放庫,並在您的 Azure Databricks 工作區中開啟存放庫。

在存放庫中建立名為 myfunctions.py 的檔案,並將下列內容新增至檔案。 本文中的其他範例預期此檔案會命名為 myfunctions.py。 您可以針對自己的檔案使用不同的名稱。

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Because this file is not a Databricks notebook, you
# must create a Spark session. Databricks notebooks
# create a Spark session for you by default.
spark = SparkSession.builder \
                    .appName('integrity-tests') \
                    .getOrCreate()

# Does the specified table exist in the specified database?
def tableExists(tableName, dbName):
  return spark.catalog.tableExists(f"{dbName}.{tableName}")

# Does the specified column exist in the given DataFrame?
def columnExists(dataFrame, columnName):
  if columnName in dataFrame.columns:
    return True
  else:
    return False

# How many rows are there for the specified value in the specified column
# in the given DataFrame?
def numRowsInColumnForValue(dataFrame, columnName, columnValue):
  df = dataFrame.filter(col(columnName) == columnValue)

  return df.count()

R

下列程式代碼假設您已 設定 Databricks Git 資料夾 (Repos)新增存放庫,並在您的 Azure Databricks 工作區中開啟存放庫。

在存放庫中建立名為 myfunctions.r 的檔案,並將下列內容新增至檔案。 本文中的其他範例預期此檔案會命名為 myfunctions.r。 您可以針對自己的檔案使用不同的名稱。

library(SparkR)

# Does the specified table exist in the specified database?
table_exists <- function(table_name, db_name) {
  tableExists(paste(db_name, ".", table_name, sep = ""))
}

# Does the specified column exist in the given DataFrame?
column_exists <- function(dataframe, column_name) {
  column_name %in% colnames(dataframe)
}

# How many rows are there for the specified value in the specified column
# in the given DataFrame?
num_rows_in_column_for_value <- function(dataframe, column_name, column_value) {
  df = filter(dataframe, dataframe[[column_name]] == column_value)

  count(df)
}

Scala

使用下列內容建立名為 myfunctions 的 Scala 筆記本。 本文中的其他範例預期此筆記本名稱為 myfunctions。 您可以針對自己的筆記本使用不同的名稱。

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Does the specified table exist in the specified database?
def tableExists(tableName: String, dbName: String) : Boolean = {
  return spark.catalog.tableExists(dbName + "." + tableName)
}

// Does the specified column exist in the given DataFrame?
def columnExists(dataFrame: DataFrame, columnName: String) : Boolean = {
  val nameOfColumn = null

  for(nameOfColumn <- dataFrame.columns) {
    if (nameOfColumn == columnName) {
      return true
    }
  }

  return false
}

// How many rows are there for the specified value in the specified column
// in the given DataFrame?
def numRowsInColumnForValue(dataFrame: DataFrame, columnName: String, columnValue: String) : Long = {
  val df = dataFrame.filter(col(columnName) === columnValue)

  return df.count()
}

Sql

下列程式代碼假設您在名為 maindefault 的目錄內,有第三方範例數據集鑽石,可從 Azure Databricks 工作區存取。 如果您想要使用的目錄或架構有不同的名稱,請變更下列 USE 其中一個或兩個語句以符合。

建立 SQL 筆記本 ,並將下列內容新增至這個新的筆記本。 然後將 筆記本附加 至叢集,然後 執行 筆記本,將下列 SQL UDF 新增至指定的目錄和架構。

注意

SQL UDF table_exists ,只能 column_exists 與 Unity 目錄搭配使用。 Unity 目錄的 SQL UDF 支援處於 公開預覽狀態

USE CATALOG main;
USE SCHEMA default;

CREATE OR REPLACE FUNCTION table_exists(catalog_name STRING,
                                        db_name      STRING,
                                        table_name   STRING)
  RETURNS BOOLEAN
  RETURN if(
    (SELECT count(*) FROM system.information_schema.tables
     WHERE table_catalog = table_exists.catalog_name
       AND table_schema  = table_exists.db_name
       AND table_name    = table_exists.table_name) > 0,
    true,
    false
  );

CREATE OR REPLACE FUNCTION column_exists(catalog_name STRING,
                                         db_name      STRING,
                                         table_name   STRING,
                                         column_name  STRING)
  RETURNS BOOLEAN
  RETURN if(
    (SELECT count(*) FROM system.information_schema.columns
     WHERE table_catalog = column_exists.catalog_name
       AND table_schema  = column_exists.db_name
       AND table_name    = column_exists.table_name
       AND column_name   = column_exists.column_name) > 0,
    true,
    false
  );

CREATE OR REPLACE FUNCTION num_rows_for_clarity_in_diamonds(clarity_value STRING)
  RETURNS BIGINT
  RETURN SELECT count(*)
         FROM main.default.diamonds
         WHERE clarity = clarity_value

呼叫函式

本節描述呼叫上述函式的程序代碼。 例如,您可以使用這些函式來計算資料表中指定值存在於指定資料行中的數據列數目。 不過,在繼續之前,您會想要檢查數據表是否確實存在,以及該數據表中是否確實存在數據行。 下列程式代碼會檢查這些條件。

如果您將上一節的函式新增至 Azure Databricks 工作區,您可以從工作區呼叫這些函式,如下所示。

Python

在與存放庫中上述myfunctions.py檔案相同的資料夾中建立 Python 筆記本,並將下列內容新增至筆記本。 視需要變更數據表名稱、架構(資料庫)名稱、數據行名稱和數據行值的變數值。 然後將 筆記本附加 至叢集,然後 執行 筆記本以查看結果。

from myfunctions import *

tableName   = "diamonds"
dbName      = "default"
columnName  = "clarity"
columnValue = "VVS2"

# If the table exists in the specified database...
if tableExists(tableName, dbName):

  df = spark.sql(f"SELECT * FROM {dbName}.{tableName}")

  # And the specified column exists in that table...
  if columnExists(df, columnName):
    # Then report the number of rows for the specified value in that column.
    numRows = numRowsInColumnForValue(df, columnName, columnValue)

    print(f"There are {numRows} rows in '{tableName}' where '{columnName}' equals '{columnValue}'.")
  else:
    print(f"Column '{columnName}' does not exist in table '{tableName}' in schema (database) '{dbName}'.")
else:
  print(f"Table '{tableName}' does not exist in schema (database) '{dbName}'.") 

R

在與存放庫中上myfunctions.r一個檔案相同的資料夾中建立 R 筆記本,並將下列內容新增至筆記本。 視需要變更數據表名稱、架構(資料庫)名稱、數據行名稱和數據行值的變數值。 然後將 筆記本附加 至叢集,然後 執行 筆記本以查看結果。

library(SparkR)
source("myfunctions.r")

table_name   <- "diamonds"
db_name      <- "default"
column_name  <- "clarity"
column_value <- "VVS2"

# If the table exists in the specified database...
if (table_exists(table_name, db_name)) {

  df = sql(paste("SELECT * FROM ", db_name, ".", table_name, sep = ""))

  # And the specified column exists in that table...
  if (column_exists(df, column_name)) {
    # Then report the number of rows for the specified value in that column.
    num_rows = num_rows_in_column_for_value(df, column_name, column_value)

    print(paste("There are ", num_rows, " rows in table '", table_name, "' where '", column_name, "' equals '", column_value, "'.", sep = "")) 
  } else {
    print(paste("Column '", column_name, "' does not exist in table '", table_name, "' in schema (database) '", db_name, "'.", sep = ""))
  }

} else {
  print(paste("Table '", table_name, "' does not exist in schema (database) '", db_name, "'.", sep = ""))
}

Scala

在與上述 myfunctions Scala 筆記本相同的資料夾中建立另一個 Scala Notebook,並將下列內容新增至這個新的筆記本。

在此新筆記本的第一個數據格中,新增下列程序代碼,其會呼叫 %run magic。 此魔術可讓新筆記本使用筆記本的內容 myfunctions

%run ./myfunctions

在這個新筆記本的第二個數據格中,新增下列程序代碼。 視需要變更數據表名稱、架構(資料庫)名稱、數據行名稱和數據行值的變數值。 然後將 筆記本附加 至叢集,然後 執行 筆記本以查看結果。

val tableName   = "diamonds"
val dbName      = "default"
val columnName  = "clarity"
val columnValue = "VVS2"

// If the table exists in the specified database...
if (tableExists(tableName, dbName)) {

  val df = spark.sql("SELECT * FROM " + dbName + "." + tableName)

  // And the specified column exists in that table...
  if (columnExists(df, columnName)) {
    // Then report the number of rows for the specified value in that column.
    val numRows = numRowsInColumnForValue(df, columnName, columnValue)

    println("There are " + numRows + " rows in '" + tableName + "' where '" + columnName + "' equals '" + columnValue + "'.")
  } else {
    println("Column '" + columnName + "' does not exist in table '" + tableName + "' in database '" + dbName + "'.")
  }

} else {
  println("Table '" + tableName + "' does not exist in database '" + dbName + "'.")
}

Sql

將下列程式代碼新增至上述筆記本中的新儲存格,或新增至個別筆記本中的儲存格。 視需要變更架構或目錄名稱以符合您的名稱,然後執行此數據格以查看結果。

SELECT CASE
-- If the table exists in the specified catalog and schema...
WHEN
  table_exists("main", "default", "diamonds")
THEN
  -- And the specified column exists in that table...
  (SELECT CASE
   WHEN
     column_exists("main", "default", "diamonds", "clarity")
   THEN
     -- Then report the number of rows for the specified value in that column.
     printf("There are %d rows in table 'main.default.diamonds' where 'clarity' equals 'VVS2'.",
            num_rows_for_clarity_in_diamonds("VVS2"))
   ELSE
     printf("Column 'clarity' does not exist in table 'main.default.diamonds'.")
   END)
ELSE
  printf("Table 'main.default.diamonds' does not exist.")
END

撰寫單元測試

本節說明測試本文開頭所描述之每個函式的程序代碼。 如果您未來對函式進行任何變更,您可以使用單元測試來判斷這些函式是否仍如預期般運作。

如果您將函式新增至本文開頭的 Azure Databricks 工作區,您可以將這些函式的單元測試新增至工作區,如下所示。

Python

在與存放庫中先前myfunctions.py檔案相同的資料夾中建立名為 test_myfunctions.py 的另一個檔案,並將下列內容新增至檔案。 根據預設, pytest 尋找 .py 名稱開頭為 test_ (或結尾 _test為 ) 的檔案,以進行測試。 同樣地,根據預設, pytest 這些檔案會尋找名稱開頭 test_ 為 要測試的函式。

一般而言, 最好不要 對生產環境中使用數據的函式執行單元測試。 對於新增、移除或其他變更數據的函式來說,這特別重要。 若要以非預期的方式保護生產數據不受單元測試危害,您應該對非生產數據執行單元測試。 其中一個常見方法是建立盡可能接近生產數據的假數據。 下列程式代碼範例會建立要針對單元測試執行的假數據。

import pytest
import pyspark
from myfunctions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType

tableName    = "diamonds"
dbName       = "default"
columnName   = "clarity"
columnValue  = "SI2"

# Because this file is not a Databricks notebook, you
# must create a Spark session. Databricks notebooks
# create a Spark session for you by default.
spark = SparkSession.builder \
                    .appName('integrity-tests') \
                    .getOrCreate()

# Create fake data for the unit tests to run against.
# In general, it is a best practice to not run unit tests
# against functions that work with data in production.
schema = StructType([ \
  StructField("_c0",     IntegerType(), True), \
  StructField("carat",   FloatType(),   True), \
  StructField("cut",     StringType(),  True), \
  StructField("color",   StringType(),  True), \
  StructField("clarity", StringType(),  True), \
  StructField("depth",   FloatType(),   True), \
  StructField("table",   IntegerType(), True), \
  StructField("price",   IntegerType(), True), \
  StructField("x",       FloatType(),   True), \
  StructField("y",       FloatType(),   True), \
  StructField("z",       FloatType(),   True), \
])

data = [ (1, 0.23, "Ideal",   "E", "SI2", 61.5, 55, 326, 3.95, 3.98, 2.43 ), \
         (2, 0.21, "Premium", "E", "SI1", 59.8, 61, 326, 3.89, 3.84, 2.31 ) ]

df = spark.createDataFrame(data, schema)

# Does the table exist?
def test_tableExists():
  assert tableExists(tableName, dbName) is True

# Does the column exist?
def test_columnExists():
  assert columnExists(df, columnName) is True

# Is there at least one row for the value in the specified column?
def test_numRowsInColumnForValue():
  assert numRowsInColumnForValue(df, columnName, columnValue) > 0

R

在與存放庫中先前myfunctions.r檔案相同的資料夾中建立名為 test_myfunctions.r 的另一個檔案,並將下列內容新增至檔案。 根據預設, testthat 尋找 .r 名稱開頭 test 為 要測試的檔案。

一般而言, 最好不要 對生產環境中使用數據的函式執行單元測試。 對於新增、移除或其他變更數據的函式來說,這特別重要。 若要以非預期的方式保護生產數據不受單元測試危害,您應該對非生產數據執行單元測試。 其中一個常見方法是建立盡可能接近生產數據的假數據。 下列程式代碼範例會建立要針對單元測試執行的假數據。

library(testthat)
source("myfunctions.r")

table_name   <- "diamonds"
db_name      <- "default"
column_name  <- "clarity"
column_value <- "SI2"

# Create fake data for the unit tests to run against.
# In general, it is a best practice to not run unit tests
# against functions that work with data in production.
schema <- structType(
  structField("_c0",     "integer"),
  structField("carat",   "float"),
  structField("cut",     "string"),
  structField("color",   "string"),
  structField("clarity", "string"),
  structField("depth",   "float"),
  structField("table",   "integer"),
  structField("price",   "integer"),
  structField("x",       "float"),
  structField("y",       "float"),
  structField("z",       "float"))

data <- list(list(as.integer(1), 0.23, "Ideal",   "E", "SI2", 61.5, as.integer(55), as.integer(326), 3.95, 3.98, 2.43),
             list(as.integer(2), 0.21, "Premium", "E", "SI1", 59.8, as.integer(61), as.integer(326), 3.89, 3.84, 2.31))

df <- createDataFrame(data, schema)

# Does the table exist?
test_that ("The table exists.", {
  expect_true(table_exists(table_name, db_name))
})

# Does the column exist?
test_that ("The column exists in the table.", {
  expect_true(column_exists(df, column_name))
})

# Is there at least one row for the value in the specified column?
test_that ("There is at least one row in the query result.", {
  expect_true(num_rows_in_column_for_value(df, column_name, column_value) > 0)
})

Scala

在與上述 myfunctions Scala 筆記本相同的資料夾中建立另一個 Scala Notebook,並將下列內容新增至這個新的筆記本。

在新筆記本的第一個數據格中,新增下列程序代碼,其會呼叫 %run magic。 此魔術可讓新筆記本使用筆記本的內容 myfunctions

%run ./myfunctions

在第二個儲存格中,新增下列程式代碼。 此程式代碼會定義單元測試,並指定如何執行它們。

一般而言, 最好不要 對生產環境中使用數據的函式執行單元測試。 對於新增、移除或其他變更數據的函式來說,這特別重要。 若要以非預期的方式保護生產數據不受單元測試危害,您應該對非生產數據執行單元測試。 其中一個常見方法是建立盡可能接近生產數據的假數據。 下列程式代碼範例會建立要針對單元測試執行的假數據。

import org.scalatest._
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, FloatType, StringType}
import scala.collection.JavaConverters._

class DataTests extends AsyncFunSuite {

  val tableName   = "diamonds"
  val dbName      = "default"
  val columnName  = "clarity"
  val columnValue = "SI2"

  // Create fake data for the unit tests to run against.
  // In general, it is a best practice to not run unit tests
  // against functions that work with data in production.
  val schema = StructType(Array(
                 StructField("_c0",     IntegerType),
                 StructField("carat",   FloatType),
                 StructField("cut",     StringType),
                 StructField("color",   StringType),
                 StructField("clarity", StringType),
                 StructField("depth",   FloatType),
                 StructField("table",   IntegerType),
                 StructField("price",   IntegerType),
                 StructField("x",       FloatType),
                 StructField("y",       FloatType),
                 StructField("z",       FloatType)
               ))

  val data = Seq(
                  Row(1, 0.23, "Ideal",   "E", "SI2", 61.5, 55, 326, 3.95, 3.98, 2.43),
                  Row(2, 0.21, "Premium", "E", "SI1", 59.8, 61, 326, 3.89, 3.84, 2.31)
                ).asJava

  val df = spark.createDataFrame(data, schema)

  // Does the table exist?
  test("The table exists") {
    assert(tableExists(tableName, dbName) == true)
  }

  // Does the column exist?
  test("The column exists") {
    assert(columnExists(df, columnName) == true)
  }

  // Is there at least one row for the value in the specified column?
  test("There is at least one matching row") {
    assert(numRowsInColumnForValue(df, columnName, columnValue) > 0)
  }
}

nocolor.nodurations.nostacks.stats.run(new DataTests)

注意

此程式代碼範例會 FunSuite 使用 ScalaTest 中的測試樣式。 如需其他可用的測試樣式,請參閱 為您的項目選取測試樣式。

Sql

新增單元測試之前,您應該注意一般而言, 最好不要 對生產環境中使用數據的函式執行單元測試。 對於新增、移除或其他變更數據的函式來說,這特別重要。 若要以非預期的方式保護生產數據不受單元測試危害,您應該對非生產數據執行單元測試。 其中一個常見方法是對檢視執行單元測試,而不是對數據表執行。

若要建立檢視,您可以從上述筆記本或個別筆記本中的新單元格呼叫 CREATE VIEW 命令。 下列範例假設您在名為 main的目錄內具有名為 的架構 (database) default 內名為 diamonds 的現有數據表。 視需要變更這些名稱以符合您自己的名稱,然後只執行該儲存格。

USE CATALOG main;
USE SCHEMA default;

CREATE VIEW view_diamonds AS
SELECT * FROM diamonds;

建立檢視之後,請將下列 SELECT 每一個語句新增至先前筆記本中自己的新單元格,或新增至個別筆記本中自己的新單元格。 視需要變更名稱以符合您自己的名稱。

SELECT if(table_exists("main", "default", "view_diamonds"),
          printf("PASS: The table 'main.default.view_diamonds' exists."),
          printf("FAIL: The table 'main.default.view_diamonds' does not exist."));

SELECT if(column_exists("main", "default", "view_diamonds", "clarity"),
          printf("PASS: The column 'clarity' exists in the table 'main.default.view_diamonds'."),
          printf("FAIL: The column 'clarity' does not exists in the table 'main.default.view_diamonds'."));

SELECT if(num_rows_for_clarity_in_diamonds("VVS2") > 0,
          printf("PASS: The table 'main.default.view_diamonds' has at least one row where the column 'clarity' equals 'VVS2'."),
          printf("FAIL: The table 'main.default.view_diamonds' does not have at least one row where the column 'clarity' equals 'VVS2'."));

執行單元測試

本節說明如何執行您在上一節中編碼的單元測試。 當您執行單元測試時,會取得結果,其中顯示哪些單元測試通過且失敗。

如果您將上一節的單元測試新增至 Azure Databricks 工作區,您可以從工作區執行這些單元測試。 您可以手動依排程執行這些單元測試。

Python

在與存放庫中上 test_myfunctions.py 一個檔案相同的資料夾中建立 Python 筆記本,並新增下列內容。

在新筆記本的第一個儲存格中,新增下列程式代碼,然後執行會呼叫magic的 %pip 單元格。 此 magic 會 pytest安裝 。

%pip install pytest

在第二個儲存格中,新增下列程式代碼,然後執行儲存格。 結果顯示哪些單元測試通過且失敗。

import pytest
import sys

# Skip writing pyc files on a readonly filesystem.
sys.dont_write_bytecode = True

# Run pytest.
retcode = pytest.main([".", "-v", "-p", "no:cacheprovider"])

# Fail the cell execution if there are any test failures.
assert retcode == 0, "The pytest invocation failed. See the log for details."

R

在與存放庫中上 test_myfunctions.r 一個檔案相同的資料夾中建立 R 筆記本,並新增下列內容。

在第一個儲存格中,新增下列程式代碼,然後執行會呼叫 函式的 install.packages 儲存格。 這個函式會 testthat安裝 。

install.packages("testthat")

在第二個儲存格中,新增下列程式代碼,然後執行儲存格。 結果顯示哪些單元測試通過且失敗。

library(testthat)
source("myfunctions.r")

test_dir(".", reporter = "tap")

Scala

從上一節執行筆記本中的第一個和第二個單元格。 結果顯示哪些單元測試通過且失敗。

Sql

從上一節執行筆記本中三個儲存格中的每一個。 結果會顯示每個單元測試都通過或失敗。

如果您在執行單元測試之後不再需要檢視,您可以刪除檢視。 若要刪除此檢視,您可以將下列程式代碼新增至上述其中一個筆記本內的新單元格,然後只執行該儲存格。

DROP VIEW view_diamonds;

提示

您可以在叢集的 驅動程式記錄中檢視筆記本執行的結果(包括單元測試結果)。 您也可以指定叢集 記錄傳遞的位置。

您可以設定持續整合和持續傳遞或部署 (CI/CD) 系統,例如 GitHub Actions,以在程式碼變更時自動執行單元測試。 如需範例,請參閱筆記本軟體工程最佳做法中的 GitHub Actions 涵蓋範圍。

其他資源

pytest

testthat

ScalaTest

SQL