اختبار الوحدة لدفاتر الملاحظات

يمكنك استخدام اختبار الوحدة للمساعدة في تحسين جودة التعليمات البرمجية لدفاتر الملاحظات واتساقها. اختبار الوحدة هو نهج لاختبار وحدات التعليمات البرمجية المكتفية ذاتيا، مثل الوظائف، في وقت مبكر وغالبا. يساعدك هذا في العثور على مشكلات في التعليمات البرمجية الخاصة بك بشكل أسرع، والكشف عن الافتراضات الخاطئة حول التعليمات البرمجية الخاصة بك في وقت أقرب، وتبسيط جهود الترميز الشاملة.

هذه المقالة هي مقدمة لاختبار الوحدة الأساسية مع الوظائف. المفاهيم المتقدمة مثل فئات اختبار الوحدة والواجهات، بالإضافة إلى استخدام الكعب والزخارف وتسخير الاختبار، بينما يتم دعمها أيضا عند اختبار الوحدة لدفاتر الملاحظات، خارج نطاق هذه المقالة. لا تغطي هذه المقالة أيضا أنواعا أخرى من أساليب الاختبار، مثل اختبار التكامل أو اختبار النظام أو اختبار القبول أو أساليب الاختبار غير الوظيفية مثل اختبار الأداء أو اختبار قابلية الاستخدام.

توضح هذه المقالة ما يلي:

  • كيفية تنظيم الوظائف واختبارات الوحدة الخاصة بها.
  • كيفية كتابة الدالات في Python وR وSc scala، بالإضافة إلى الوظائف المعرفة من قبل المستخدم في SQL، المصممة جيدا لاختبار الوحدة.
  • كيفية استدعاء هذه الدالات من دفاتر ملاحظات Python وR وSc scala وSQL.
  • كيفية كتابة اختبارات الوحدة في Python وR وScala باستخدام أطر الاختبار الشائعة pytest ل Python، واختبار R، وScalaTest ل Scala. أيضا كيفية كتابة SQL التي تختبر الوحدة وظائف SQL المعرفة من قبل المستخدم (SQL UDFs).
  • كيفية تشغيل اختبارات الوحدة هذه من دفاتر ملاحظات Python وR وSc scala وSQL.

تنظيم الدالات واختبارات الوحدة

هناك بعض الأساليب الشائعة لتنظيم وظائفك واختبارات الوحدة الخاصة بها باستخدام دفاتر الملاحظات. ولكل نهج فوائده وتحدياته.

بالنسبة لدفاتر ملاحظات Python وR وSc scala، تتضمن الأساليب الشائعة ما يلي:

بالنسبة لدفاتر ملاحظات Python وR، توصي Databricks بتخزين الوظائف واختبارات الوحدة الخاصة بها خارج دفاتر الملاحظات. بالنسبة لدفاتر ملاحظات Scala، توصي Databricks بتضمين الوظائف في دفتر ملاحظات واحد واختبارات الوحدة الخاصة بها في دفتر ملاحظات منفصل.

بالنسبة لدفاتر ملاحظات SQL، توصي Databricks بتخزين الوظائف كدالات يحددها المستخدم SQL (SQL UDFs) في مخططاتك (المعروفة أيضا باسم قواعد البيانات). يمكنك بعد ذلك استدعاء SQL UDFs هذه واختبارات الوحدة الخاصة بها من دفاتر ملاحظات SQL.

كتابة الدالات

يصف هذا القسم مجموعة بسيطة من وظائف المثال التي تحدد ما يلي:

  • ما إذا كان هناك جدول في قاعدة بيانات.
  • ما إذا كان هناك عمود في جدول.
  • عدد الصفوف الموجودة في عمود لقيمة داخل هذا العمود.

تهدف هذه الدالات إلى أن تكون بسيطة، بحيث يمكنك التركيز على تفاصيل اختبار الوحدة في هذه المقالة بدلا من التركيز على الوظائف نفسها.

للحصول على أفضل نتائج اختبار الوحدة، يجب أن ترجع الدالة نتيجة واحدة يمكن التنبؤ بها وأن تكون من نوع بيانات واحد. على سبيل المثال، للتحقق مما إذا كان هناك شيء ما، يجب أن ترجع الدالة قيمة منطقية صحيحة أو خاطئة. لإرجاع عدد الصفوف الموجودة، يجب أن ترجع الدالة عددا صحيحا غير سالب. لا ينبغي، في المثال الأول، إرجاع إما خطأ إذا لم يكن هناك شيء ما أو الشيء نفسه إذا كان موجودا. وبالمثل، بالنسبة للمثال الثاني، يجب ألا ترجع إما عدد الصفوف الموجودة أو الخاطئة إذا لم تكن هناك صفوف.

يمكنك إضافة هذه الدالات إلى مساحة عمل Azure Databricks موجودة كما يلي، في Python أو R أو Scala أو SQL.

Python

تفترض التعليمات البرمجية التالية أن لديك إعداد مجلدات Databricks Git (Repos) وإضافة مستودع وفتح المستودع في مساحة عمل Azure Databricks.

أنشئ ملفا باسم myfunctions.py داخل المستودع، وأضف المحتويات التالية إلى الملف. تتوقع أمثلة أخرى في هذه المقالة تسمية myfunctions.pyهذا الملف . يمكنك استخدام أسماء مختلفة لملفاتك الخاصة.

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Because this file is not a Databricks notebook, you
# must create a Spark session. Databricks notebooks
# create a Spark session for you by default.
spark = SparkSession.builder \
                    .appName('integrity-tests') \
                    .getOrCreate()

# Does the specified table exist in the specified database?
def tableExists(tableName, dbName):
  return spark.catalog.tableExists(f"{dbName}.{tableName}")

# Does the specified column exist in the given DataFrame?
def columnExists(dataFrame, columnName):
  if columnName in dataFrame.columns:
    return True
  else:
    return False

# How many rows are there for the specified value in the specified column
# in the given DataFrame?
def numRowsInColumnForValue(dataFrame, columnName, columnValue):
  df = dataFrame.filter(col(columnName) == columnValue)

  return df.count()

R

تفترض التعليمات البرمجية التالية أن لديك إعداد مجلدات Databricks Git (Repos) وإضافة مستودع وفتح المستودع في مساحة عمل Azure Databricks.

أنشئ ملفا باسم myfunctions.r داخل المستودع، وأضف المحتويات التالية إلى الملف. تتوقع أمثلة أخرى في هذه المقالة تسمية myfunctions.rهذا الملف . يمكنك استخدام أسماء مختلفة لملفاتك الخاصة.

library(SparkR)

# Does the specified table exist in the specified database?
table_exists <- function(table_name, db_name) {
  tableExists(paste(db_name, ".", table_name, sep = ""))
}

# Does the specified column exist in the given DataFrame?
column_exists <- function(dataframe, column_name) {
  column_name %in% colnames(dataframe)
}

# How many rows are there for the specified value in the specified column
# in the given DataFrame?
num_rows_in_column_for_value <- function(dataframe, column_name, column_value) {
  df = filter(dataframe, dataframe[[column_name]] == column_value)

  count(df)
}

Scala

إنشاء دفتر ملاحظات Scala باسم myfunctions بالمحتويات التالية. تتوقع أمثلة أخرى في هذه المقالة تسمية myfunctionsدفتر الملاحظات هذا . يمكنك استخدام أسماء مختلفة لدفاتر الملاحظات الخاصة بك.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Does the specified table exist in the specified database?
def tableExists(tableName: String, dbName: String) : Boolean = {
  return spark.catalog.tableExists(dbName + "." + tableName)
}

// Does the specified column exist in the given DataFrame?
def columnExists(dataFrame: DataFrame, columnName: String) : Boolean = {
  val nameOfColumn = null

  for(nameOfColumn <- dataFrame.columns) {
    if (nameOfColumn == columnName) {
      return true
    }
  }

  return false
}

// How many rows are there for the specified value in the specified column
// in the given DataFrame?
def numRowsInColumnForValue(dataFrame: DataFrame, columnName: String, columnValue: String) : Long = {
  val df = dataFrame.filter(col(columnName) === columnValue)

  return df.count()
}

SQL

تفترض التعليمات البرمجية التالية أن لديك عينة من معينات مجموعة البيانات التابعة لجهة خارجية داخل مخطط مسمى default داخل كتالوج مسمى main يمكن الوصول إليه من مساحة عمل Azure Databricks. إذا كان الكتالوج أو المخطط الذي تريد استخدامه له اسم مختلف، فقم بتغيير إحدى العبارات التالية USE أو كليهما لمطابقتها.

إنشاء دفتر ملاحظات SQL وإضافة المحتويات التالية إلى دفتر الملاحظات الجديد هذا. ثم قم بإرفاق دفتر الملاحظات بمجموعة وتشغيلدفتر الملاحظات لإضافة SQL UDFs التالية إلى الكتالوج والمخطط المحددين.

إشعار

SQL UDFs table_exists والعمل column_exists فقط مع كتالوج Unity. يتوفر دعم SQL UDF ل Unity Catalog في المعاينة العامة.

USE CATALOG main;
USE SCHEMA default;

CREATE OR REPLACE FUNCTION table_exists(catalog_name STRING,
                                        db_name      STRING,
                                        table_name   STRING)
  RETURNS BOOLEAN
  RETURN if(
    (SELECT count(*) FROM system.information_schema.tables
     WHERE table_catalog = table_exists.catalog_name
       AND table_schema  = table_exists.db_name
       AND table_name    = table_exists.table_name) > 0,
    true,
    false
  );

CREATE OR REPLACE FUNCTION column_exists(catalog_name STRING,
                                         db_name      STRING,
                                         table_name   STRING,
                                         column_name  STRING)
  RETURNS BOOLEAN
  RETURN if(
    (SELECT count(*) FROM system.information_schema.columns
     WHERE table_catalog = column_exists.catalog_name
       AND table_schema  = column_exists.db_name
       AND table_name    = column_exists.table_name
       AND column_name   = column_exists.column_name) > 0,
    true,
    false
  );

CREATE OR REPLACE FUNCTION num_rows_for_clarity_in_diamonds(clarity_value STRING)
  RETURNS BIGINT
  RETURN SELECT count(*)
         FROM main.default.diamonds
         WHERE clarity = clarity_value

دالات الاستدعاء

يصف هذا القسم التعليمات البرمجية التي تستدعي الدالات السابقة. يمكنك استخدام هذه الدالات، على سبيل المثال، لحساب عدد الصفوف في الجدول حيث توجد قيمة محددة داخل عمود محدد. ومع ذلك، قد تحتاج إلى التحقق مما إذا كان الجدول موجودا بالفعل، وما إذا كان العمود موجودا بالفعل في هذا الجدول، قبل المتابعة. تتحقق التعليمات البرمجية التالية من هذه الشروط.

إذا أضفت الدالات من القسم السابق إلى مساحة عمل Azure Databricks، يمكنك استدعاء هذه الدالات من مساحة العمل الخاصة بك كما يلي.

Python

قم بإنشاء دفتر ملاحظات Python في نفس المجلد مثل الملف السابق myfunctions.py في المستودع الخاص بك، وأضف المحتويات التالية إلى دفتر الملاحظات. قم بتغيير قيم المتغيرات لاسم الجدول واسم المخطط (قاعدة البيانات) واسم العمود وقيمة العمود حسب الحاجة. ثم قم بإرفاق دفتر الملاحظات بمجموعة وتشغيلدفتر الملاحظات لمشاهدة النتائج.

from myfunctions import *

tableName   = "diamonds"
dbName      = "default"
columnName  = "clarity"
columnValue = "VVS2"

# If the table exists in the specified database...
if tableExists(tableName, dbName):

  df = spark.sql(f"SELECT * FROM {dbName}.{tableName}")

  # And the specified column exists in that table...
  if columnExists(df, columnName):
    # Then report the number of rows for the specified value in that column.
    numRows = numRowsInColumnForValue(df, columnName, columnValue)

    print(f"There are {numRows} rows in '{tableName}' where '{columnName}' equals '{columnValue}'.")
  else:
    print(f"Column '{columnName}' does not exist in table '{tableName}' in schema (database) '{dbName}'.")
else:
  print(f"Table '{tableName}' does not exist in schema (database) '{dbName}'.") 

R

قم بإنشاء دفتر ملاحظات R في نفس المجلد مثل الملف السابق myfunctions.r في المستودع الخاص بك، وأضف المحتويات التالية إلى دفتر الملاحظات. قم بتغيير قيم المتغيرات لاسم الجدول واسم المخطط (قاعدة البيانات) واسم العمود وقيمة العمود حسب الحاجة. ثم قم بإرفاق دفتر الملاحظات بمجموعة وتشغيلدفتر الملاحظات لمشاهدة النتائج.

library(SparkR)
source("myfunctions.r")

table_name   <- "diamonds"
db_name      <- "default"
column_name  <- "clarity"
column_value <- "VVS2"

# If the table exists in the specified database...
if (table_exists(table_name, db_name)) {

  df = sql(paste("SELECT * FROM ", db_name, ".", table_name, sep = ""))

  # And the specified column exists in that table...
  if (column_exists(df, column_name)) {
    # Then report the number of rows for the specified value in that column.
    num_rows = num_rows_in_column_for_value(df, column_name, column_value)

    print(paste("There are ", num_rows, " rows in table '", table_name, "' where '", column_name, "' equals '", column_value, "'.", sep = "")) 
  } else {
    print(paste("Column '", column_name, "' does not exist in table '", table_name, "' in schema (database) '", db_name, "'.", sep = ""))
  }

} else {
  print(paste("Table '", table_name, "' does not exist in schema (database) '", db_name, "'.", sep = ""))
}

Scala

أنشئ دفتر ملاحظات Scala آخر في نفس المجلد مثل دفتر ملاحظات Scala السابق myfunctions ، وأضف المحتويات التالية إلى دفتر الملاحظات الجديد هذا.

في الخلية الأولى لدفتر الملاحظات الجديد هذا، أضف التعليمات البرمجية التالية، التي تستدعي سحر ٪run . يجعل هذا السحر محتويات دفتر الملاحظات myfunctions متوفرة لدفتر الملاحظات الجديد.

%run ./myfunctions

في الخلية الثانية لدفتر الملاحظات الجديد، أضف التعليمات البرمجية التالية. قم بتغيير قيم المتغيرات لاسم الجدول واسم المخطط (قاعدة البيانات) واسم العمود وقيمة العمود حسب الحاجة. ثم قم بإرفاق دفتر الملاحظات بمجموعة وتشغيلدفتر الملاحظات لمشاهدة النتائج.

val tableName   = "diamonds"
val dbName      = "default"
val columnName  = "clarity"
val columnValue = "VVS2"

// If the table exists in the specified database...
if (tableExists(tableName, dbName)) {

  val df = spark.sql("SELECT * FROM " + dbName + "." + tableName)

  // And the specified column exists in that table...
  if (columnExists(df, columnName)) {
    // Then report the number of rows for the specified value in that column.
    val numRows = numRowsInColumnForValue(df, columnName, columnValue)

    println("There are " + numRows + " rows in '" + tableName + "' where '" + columnName + "' equals '" + columnValue + "'.")
  } else {
    println("Column '" + columnName + "' does not exist in table '" + tableName + "' in database '" + dbName + "'.")
  }

} else {
  println("Table '" + tableName + "' does not exist in database '" + dbName + "'.")
}

SQL

أضف التعليمات البرمجية التالية إلى خلية جديدة في دفتر الملاحظات السابق أو إلى خلية في دفتر ملاحظات منفصل. قم بتغيير أسماء المخطط أو الكتالوج إذا لزم الأمر لمطابقة أسماءك، ثم قم بتشغيل هذه الخلية لمشاهدة النتائج.

SELECT CASE
-- If the table exists in the specified catalog and schema...
WHEN
  table_exists("main", "default", "diamonds")
THEN
  -- And the specified column exists in that table...
  (SELECT CASE
   WHEN
     column_exists("main", "default", "diamonds", "clarity")
   THEN
     -- Then report the number of rows for the specified value in that column.
     printf("There are %d rows in table 'main.default.diamonds' where 'clarity' equals 'VVS2'.",
            num_rows_for_clarity_in_diamonds("VVS2"))
   ELSE
     printf("Column 'clarity' does not exist in table 'main.default.diamonds'.")
   END)
ELSE
  printf("Table 'main.default.diamonds' does not exist.")
END

اختبارات وحدة الكتابة

يصف هذا القسم التعليمات البرمجية التي تختبر كل دالة من الدالات الموضحة في بداية هذه المقالة. إذا قمت بإجراء أي تغييرات على الدالات في المستقبل، يمكنك استخدام اختبارات الوحدة لتحديد ما إذا كانت هذه الدالات لا تزال تعمل كما تتوقعها.

إذا أضفت الدالات في بداية هذه المقالة إلى مساحة عمل Azure Databricks، يمكنك إضافة اختبارات الوحدة لهذه الدالات إلى مساحة العمل الخاصة بك كما يلي.

Python

قم بإنشاء ملف آخر باسم test_myfunctions.py في نفس المجلد مثل الملف السابق myfunctions.py في المستودع الخاص بك، وأضف المحتويات التالية إلى الملف. بشكل افتراضي، pytest يبحث عن .py الملفات التي تبدأ أسماؤها ب test_ (أو تنتهي ب _test) للاختبار. وبالمثل، بشكل افتراضي، pytest يبحث داخل هذه الملفات عن الوظائف التي تبدأ أسماؤها ب test_ للاختبار.

بشكل عام، من أفضل الممارسات عدم تشغيل اختبارات الوحدة مقابل الوظائف التي تعمل مع البيانات في الإنتاج. هذا مهم بشكل خاص للدالات التي تضيف البيانات أو تزيلها أو تغيرها بطريقة أخرى. لحماية بيانات الإنتاج من الاختراق بواسطة اختبارات الوحدة بطرق غير متوقعة، يجب تشغيل اختبارات الوحدة مقابل البيانات غير الإنتاجية. أحد النهج الشائعة هو إنشاء بيانات وهمية أقرب ما يمكن إلى بيانات الإنتاج. ينشئ مثال التعليمات البرمجية التالي بيانات مزيفة لاختبارات الوحدة لتشغيلها.

import pytest
import pyspark
from myfunctions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType

tableName    = "diamonds"
dbName       = "default"
columnName   = "clarity"
columnValue  = "SI2"

# Because this file is not a Databricks notebook, you
# must create a Spark session. Databricks notebooks
# create a Spark session for you by default.
spark = SparkSession.builder \
                    .appName('integrity-tests') \
                    .getOrCreate()

# Create fake data for the unit tests to run against.
# In general, it is a best practice to not run unit tests
# against functions that work with data in production.
schema = StructType([ \
  StructField("_c0",     IntegerType(), True), \
  StructField("carat",   FloatType(),   True), \
  StructField("cut",     StringType(),  True), \
  StructField("color",   StringType(),  True), \
  StructField("clarity", StringType(),  True), \
  StructField("depth",   FloatType(),   True), \
  StructField("table",   IntegerType(), True), \
  StructField("price",   IntegerType(), True), \
  StructField("x",       FloatType(),   True), \
  StructField("y",       FloatType(),   True), \
  StructField("z",       FloatType(),   True), \
])

data = [ (1, 0.23, "Ideal",   "E", "SI2", 61.5, 55, 326, 3.95, 3.98, 2.43 ), \
         (2, 0.21, "Premium", "E", "SI1", 59.8, 61, 326, 3.89, 3.84, 2.31 ) ]

df = spark.createDataFrame(data, schema)

# Does the table exist?
def test_tableExists():
  assert tableExists(tableName, dbName) is True

# Does the column exist?
def test_columnExists():
  assert columnExists(df, columnName) is True

# Is there at least one row for the value in the specified column?
def test_numRowsInColumnForValue():
  assert numRowsInColumnForValue(df, columnName, columnValue) > 0

R

قم بإنشاء ملف آخر باسم test_myfunctions.r في نفس المجلد مثل الملف السابق myfunctions.r في المستودع الخاص بك، وأضف المحتويات التالية إلى الملف. بشكل افتراضي، testthat يبحث عن .r الملفات التي تبدأ test أسماؤها بالاختبار.

بشكل عام، من أفضل الممارسات عدم تشغيل اختبارات الوحدة مقابل الوظائف التي تعمل مع البيانات في الإنتاج. هذا مهم بشكل خاص للدالات التي تضيف البيانات أو تزيلها أو تغيرها بطريقة أخرى. لحماية بيانات الإنتاج من الاختراق بواسطة اختبارات الوحدة بطرق غير متوقعة، يجب تشغيل اختبارات الوحدة مقابل البيانات غير الإنتاجية. أحد النهج الشائعة هو إنشاء بيانات وهمية أقرب ما يمكن إلى بيانات الإنتاج. ينشئ مثال التعليمات البرمجية التالي بيانات مزيفة لاختبارات الوحدة لتشغيلها.

library(testthat)
source("myfunctions.r")

table_name   <- "diamonds"
db_name      <- "default"
column_name  <- "clarity"
column_value <- "SI2"

# Create fake data for the unit tests to run against.
# In general, it is a best practice to not run unit tests
# against functions that work with data in production.
schema <- structType(
  structField("_c0",     "integer"),
  structField("carat",   "float"),
  structField("cut",     "string"),
  structField("color",   "string"),
  structField("clarity", "string"),
  structField("depth",   "float"),
  structField("table",   "integer"),
  structField("price",   "integer"),
  structField("x",       "float"),
  structField("y",       "float"),
  structField("z",       "float"))

data <- list(list(as.integer(1), 0.23, "Ideal",   "E", "SI2", 61.5, as.integer(55), as.integer(326), 3.95, 3.98, 2.43),
             list(as.integer(2), 0.21, "Premium", "E", "SI1", 59.8, as.integer(61), as.integer(326), 3.89, 3.84, 2.31))

df <- createDataFrame(data, schema)

# Does the table exist?
test_that ("The table exists.", {
  expect_true(table_exists(table_name, db_name))
})

# Does the column exist?
test_that ("The column exists in the table.", {
  expect_true(column_exists(df, column_name))
})

# Is there at least one row for the value in the specified column?
test_that ("There is at least one row in the query result.", {
  expect_true(num_rows_in_column_for_value(df, column_name, column_value) > 0)
})

Scala

أنشئ دفتر ملاحظات Scala آخر في نفس المجلد مثل دفتر ملاحظات Scala السابق myfunctions ، وأضف المحتويات التالية إلى دفتر الملاحظات الجديد هذا.

في الخلية الأولى لدفتر الملاحظات الجديد، أضف التعليمات البرمجية التالية، والتي تستدعي %run السحر. يجعل هذا السحر محتويات دفتر الملاحظات myfunctions متوفرة لدفتر الملاحظات الجديد.

%run ./myfunctions

في الخلية الثانية، أضف التعليمات البرمجية التالية. تحدد هذه التعليمة البرمجية اختبارات الوحدة الخاصة بك وتحدد كيفية تشغيلها.

بشكل عام، من أفضل الممارسات عدم تشغيل اختبارات الوحدة مقابل الوظائف التي تعمل مع البيانات في الإنتاج. هذا مهم بشكل خاص للدالات التي تضيف البيانات أو تزيلها أو تغيرها بطريقة أخرى. لحماية بيانات الإنتاج من الاختراق بواسطة اختبارات الوحدة بطرق غير متوقعة، يجب تشغيل اختبارات الوحدة مقابل البيانات غير الإنتاجية. أحد النهج الشائعة هو إنشاء بيانات وهمية أقرب ما يمكن إلى بيانات الإنتاج. ينشئ مثال التعليمات البرمجية التالي بيانات مزيفة لاختبارات الوحدة لتشغيلها.

import org.scalatest._
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, FloatType, StringType}
import scala.collection.JavaConverters._

class DataTests extends AsyncFunSuite {

  val tableName   = "diamonds"
  val dbName      = "default"
  val columnName  = "clarity"
  val columnValue = "SI2"

  // Create fake data for the unit tests to run against.
  // In general, it is a best practice to not run unit tests
  // against functions that work with data in production.
  val schema = StructType(Array(
                 StructField("_c0",     IntegerType),
                 StructField("carat",   FloatType),
                 StructField("cut",     StringType),
                 StructField("color",   StringType),
                 StructField("clarity", StringType),
                 StructField("depth",   FloatType),
                 StructField("table",   IntegerType),
                 StructField("price",   IntegerType),
                 StructField("x",       FloatType),
                 StructField("y",       FloatType),
                 StructField("z",       FloatType)
               ))

  val data = Seq(
                  Row(1, 0.23, "Ideal",   "E", "SI2", 61.5, 55, 326, 3.95, 3.98, 2.43),
                  Row(2, 0.21, "Premium", "E", "SI1", 59.8, 61, 326, 3.89, 3.84, 2.31)
                ).asJava

  val df = spark.createDataFrame(data, schema)

  // Does the table exist?
  test("The table exists") {
    assert(tableExists(tableName, dbName) == true)
  }

  // Does the column exist?
  test("The column exists") {
    assert(columnExists(df, columnName) == true)
  }

  // Is there at least one row for the value in the specified column?
  test("There is at least one matching row") {
    assert(numRowsInColumnForValue(df, columnName, columnValue) > 0)
  }
}

nocolor.nodurations.nostacks.stats.run(new DataTests)

إشعار

يستخدم مثال التعليمات البرمجية FunSuite هذا نمط الاختبار في ScalaTest. للحصول على أنماط الاختبار الأخرى المتوفرة، راجع تحديد أنماط الاختبار لمشروعك.

SQL

قبل إضافة اختبارات الوحدة، يجب أن تدرك أنه بشكل عام، من أفضل الممارسات عدم تشغيل اختبارات الوحدة مقابل الوظائف التي تعمل مع البيانات في الإنتاج. هذا مهم بشكل خاص للدالات التي تضيف البيانات أو تزيلها أو تغيرها بطريقة أخرى. لحماية بيانات الإنتاج من الاختراق بواسطة اختبارات الوحدة بطرق غير متوقعة، يجب تشغيل اختبارات الوحدة مقابل البيانات غير الإنتاجية. أحد النهج الشائعة هو تشغيل اختبارات الوحدة مقابل طرق العرض بدلا من الجداول.

لإنشاء طريقة عرض، يمكنك استدعاء الأمر CREATE VIEW من خلية جديدة في دفتر الملاحظات السابق أو دفتر ملاحظات منفصل. يفترض المثال التالي أن لديك جدولا موجودا يسمى diamonds ضمن مخطط (قاعدة بيانات) يسمى default داخل كتالوج يسمى main. قم بتغيير هذه الأسماء لمطابقة الأسماء الخاصة بك حسب الحاجة، ثم قم بتشغيل تلك الخلية فقط.

USE CATALOG main;
USE SCHEMA default;

CREATE VIEW view_diamonds AS
SELECT * FROM diamonds;

بعد إنشاء طريقة العرض، أضف كل من العبارات التالية SELECT إلى الخلية الجديدة الخاصة بها في دفتر الملاحظات السابق أو إلى الخلية الجديدة الخاصة بها في دفتر ملاحظات منفصل. قم بتغيير الأسماء لمطابقة الأسماء الخاصة بك حسب الحاجة.

SELECT if(table_exists("main", "default", "view_diamonds"),
          printf("PASS: The table 'main.default.view_diamonds' exists."),
          printf("FAIL: The table 'main.default.view_diamonds' does not exist."));

SELECT if(column_exists("main", "default", "view_diamonds", "clarity"),
          printf("PASS: The column 'clarity' exists in the table 'main.default.view_diamonds'."),
          printf("FAIL: The column 'clarity' does not exists in the table 'main.default.view_diamonds'."));

SELECT if(num_rows_for_clarity_in_diamonds("VVS2") > 0,
          printf("PASS: The table 'main.default.view_diamonds' has at least one row where the column 'clarity' equals 'VVS2'."),
          printf("FAIL: The table 'main.default.view_diamonds' does not have at least one row where the column 'clarity' equals 'VVS2'."));

إجراء اختبارات الوحدة

يصف هذا القسم كيفية تشغيل اختبارات الوحدة التي قمت بتكوينها في القسم السابق. عند تشغيل اختبارات الوحدة، تحصل على نتائج توضح اختبارات الوحدة التي تم اجتيازها وفشلها.

إذا أضفت اختبارات الوحدة من القسم السابق إلى مساحة عمل Azure Databricks، يمكنك تشغيل اختبارات الوحدة هذه من مساحة العمل الخاصة بك. يمكنك تشغيل اختبارات الوحدة هذه إما يدويا أو حسب جدول زمني.

Python

قم بإنشاء دفتر ملاحظات Python في نفس المجلد مثل الملف السابق test_myfunctions.py في المستودع الخاص بك، وأضف المحتويات التالية.

في الخلية الأولى لدفتر الملاحظات الجديد، أضف التعليمات البرمجية التالية، ثم قم بتشغيل الخلية، التي تستدعي %pip السحر. هذا السحر يثبت pytest.

%pip install pytest

في الخلية الثانية، أضف التعليمات البرمجية التالية ثم قم بتشغيل الخلية. تظهر النتائج اختبارات الوحدة التي تم اجتيازها وفشلها.

import pytest
import sys

# Skip writing pyc files on a readonly filesystem.
sys.dont_write_bytecode = True

# Run pytest.
retcode = pytest.main([".", "-v", "-p", "no:cacheprovider"])

# Fail the cell execution if there are any test failures.
assert retcode == 0, "The pytest invocation failed. See the log for details."

R

أنشئ دفتر ملاحظات R في نفس المجلد مثل الملف السابق test_myfunctions.r في المستودع الخاص بك، وأضف المحتويات التالية.

في الخلية الأولى، أضف التعليمات البرمجية التالية، ثم قم بتشغيل الخلية، التي تستدعي الدالة install.packages . تقوم هذه الدالة testthatبتثبيت .

install.packages("testthat")

في الخلية الثانية، أضف التعليمات البرمجية التالية، ثم قم بتشغيل الخلية. تظهر النتائج اختبارات الوحدة التي تم اجتيازها وفشلها.

library(testthat)
source("myfunctions.r")

test_dir(".", reporter = "tap")

Scala

قم بتشغيل الخلية الأولى ثم الثانية في دفتر الملاحظات من المقطع السابق. تظهر النتائج اختبارات الوحدة التي تم اجتيازها وفشلها.

SQL

قم بتشغيل كل خلية من الخلايا الثلاث في دفتر الملاحظات من المقطع السابق. تظهر النتائج ما إذا كان كل اختبار وحدة قد تم اجتيازه أو فشله.

إذا لم تعد بحاجة إلى طريقة العرض بعد تشغيل اختبارات الوحدة، يمكنك حذف طريقة العرض. لحذف طريقة العرض هذه، يمكنك إضافة التعليمات البرمجية التالية إلى خلية جديدة داخل أحد دفاتر الملاحظات السابقة ثم تشغيل تلك الخلية فقط.

DROP VIEW view_diamonds;

تلميح

يمكنك عرض نتائج عمليات تشغيل دفتر الملاحظات (بما في ذلك نتائج اختبار الوحدة) في سجلات برامج تشغيل نظام المجموعة. يمكنك أيضا تحديد موقع لتسليم سجل نظام المجموعة الخاص بك.

يمكنك إعداد نظام تكامل مستمر وتسليم مستمر أو نشر (CI/CD)، مثل GitHub Actions، لتشغيل اختبارات الوحدة تلقائيا كلما تغيرت التعليمات البرمجية الخاصة بك. على سبيل المثال، راجع تغطية GitHub Actions في أفضل ممارسات هندسة البرامج لدفاتر الملاحظات.

الموارد الإضافية

pytest

اختبار الذي

ScalaTest

SQL