次の方法で共有


Databricks ODBC ドライバー (Simba) をテストする

このページでは、 Databricks ODBC ドライバーを使用するコードをテストする方法について説明します。

ODBC 互換言語用の任意のテスト フレームワークを使用します。 次の例では、 pyodbcpytestunittest.mock を使用して ODBC ドライバー接続をテストします。 このコードは、 Python と pyodbc を Azure Databricks に接続する例に基づいています。

補助関数

helpers.py ファイルには、ODBC 接続を操作するためのユーティリティ関数が含まれています。

  • connect_to_dsn: Azure Databricks コンピューティング リソースへの接続を開きます。
  • get_cursor_from_connection: クエリを実行するためのカーソルを取得します。
  • select_from_nyctaxi_trips: samples.nyctaxi.tripsから指定された行数を照会します。
  • print_rows: 結果セットの内容をコンソールに出力します。
# helpers.py

from pyodbc import connect, Connection, Cursor

def connect_to_dsn(
  connstring: str,
  autocommit: bool
) -> Connection:

  connection = connect(
    connstring,
    autocommit = autocommit
  )

  return connection

def get_cursor_from_connection(
  connection: Connection
) -> Cursor:

  cursor = connection.cursor()
  return cursor

def select_from_nyctaxi_trips(
  cursor: Cursor,
  num_rows: int
) -> Cursor:

  select_cursor = cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
  return select_cursor

def print_rows(cursor: Cursor):
  for row in cursor.fetchall():
    print(row)

Main クラス

main.py ファイルはヘルパー関数を呼び出して、データに接続してクエリを実行します。

# main.py

from helpers import *

connection = connect_to_dsn(
  connstring = "DSN=<your-dsn-name>",
  autocommit = True
)

cursor = get_cursor_from_connection(
  connection = connection)

select_cursor = select_from_nyctaxi_trips(
  cursor = cursor,
  num_rows = 2
)

print_rows(
  cursor = select_cursor
)

モックを使用した単体テスト

test_helpers.py ファイルでは、pytest と unittest.mock を使用して、select_from_nyctaxi_trips関数をテストします。 モックは、実際のコンピューティング リソースを使用せずにデータベース接続をシミュレートするため、テストは Azure Databricks ワークスペースに影響を与えずに数秒で実行されます。

# test_helpers.py

from pyodbc import SQL_DBMS_NAME
from helpers import *
from unittest.mock import patch
import datetime

@patch("helpers.connect_to_dsn")
def test_connect_to_dsn(mock_connection):
  mock_connection.return_value.getinfo.return_value = "Spark SQL"

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  assert mock_connection.getinfo(SQL_DBMS_NAME) == "Spark SQL"

@patch('helpers.get_cursor_from_connection')
def test_get_cursor_from_connection(mock_connection):
  mock_cursor = mock_connection.return_value.cursor
  mock_cursor.return_value.rowcount = -1

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  mock_cursor = get_cursor_from_connection(
    connection = mock_connection
  )

  assert mock_cursor.rowcount == -1

@patch('helpers.select_from_nyctaxi_trips')
def test_select_from_nyctaxi_trips(mock_connection):
  mock_cursor = mock_connection.return_value.cursor
  mock_get_cursor = mock_cursor.return_value.execute
  mock_select_cursor = mock_get_cursor.return_value.arraysize = 1

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  mock_get_cursor = get_cursor_from_connection(
    connection = mock_connection
  )

  mock_select_cursor = select_from_nyctaxi_trips(
    cursor = mock_get_cursor,
    num_rows = 2
  )

  assert mock_select_cursor.arraysize == 1

@patch('helpers.print_rows')
def test_print_rows(mock_connection, capsys):
  mock_cursor = mock_connection.return_value.cursor
  mock_get_cursor = mock_cursor.return_value.execute
  mock_select_cursor = mock_get_cursor.return_value.fetchall.return_value = [
    (datetime.datetime(2016, 2, 14, 16, 52, 13), datetime.datetime(2016, 2, 14, 17, 16, 4), 4.94, 19.0, 10282, 10171),
    (datetime.datetime(2016, 2, 4, 18, 44, 19), datetime.datetime(2016, 2, 4, 18, 46), 0.28, 3.5, 10110, 10110)
  ]

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  mock_get_cursor = get_cursor_from_connection(
    connection = mock_connection
  )

  mock_select_cursor = select_from_nyctaxi_trips(
    cursor = mock_get_cursor,
    num_rows = 2
  )

  print_rows(
    cursor = mock_select_cursor
  )

  captured = capsys.readouterr()
  assert captured.out == "(datetime.datetime(2016, 2, 14, 16, 52, 13), datetime.datetime(2016, 2, 14, 17, 16, 4), 4.94, 19.0, 10282, 10171)\n" \
                         "(datetime.datetime(2016, 2, 4, 18, 44, 19), datetime.datetime(2016, 2, 4, 18, 46), 0.28, 3.5, 10110, 10110)\n"

select_from_nyctaxi_tripsSELECTステートメントのみを実行するため、ここではモック作成は厳密には必要ありません。 ただし、モックは、テーブルの状態に影響を与えずにテストを繰り返し実行できるため、データを変更する関数 (INSERT INTOUPDATEDELETE FROM) をテストする場合に特に便利です。