Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questa pagina descrive come testare il codice che usa il driver ODBC di Databricks.
Usare qualsiasi framework di test per i linguaggi compatibili con ODBC. Gli esempi seguenti usano pyodbc, pytest e unittest.mock per testare le connessioni del driver ODBC. Questo codice si basa sull'esempio in Connettere Python e pyodbc ad Azure Databricks.
Funzioni di supporto
Il helpers.py file contiene funzioni di utilità per l'uso delle connessioni ODBC:
-
connect_to_dsn: apre una connessione a una risorsa di calcolo di Azure Databricks. -
get_cursor_from_connection: ottiene un cursore per l'esecuzione di query. -
select_from_nyctaxi_trips: Interroga il numero specificato di righe dasamples.nyctaxi.trips. -
print_rows: stampa il contenuto del set di risultati nella console.
# helpers.py
from pyodbc import connect, Connection, Cursor
def connect_to_dsn(
connstring: str,
autocommit: bool
) -> Connection:
connection = connect(
connstring,
autocommit = autocommit
)
return connection
def get_cursor_from_connection(
connection: Connection
) -> Cursor:
cursor = connection.cursor()
return cursor
def select_from_nyctaxi_trips(
cursor: Cursor,
num_rows: int
) -> Cursor:
select_cursor = cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
return select_cursor
def print_rows(cursor: Cursor):
for row in cursor.fetchall():
print(row)
Classe principale
Il main.py file chiama le funzioni helper per connettersi ed eseguire query sui dati:
# main.py
from helpers import *
connection = connect_to_dsn(
connstring = "DSN=<your-dsn-name>",
autocommit = True
)
cursor = get_cursor_from_connection(
connection = connection)
select_cursor = select_from_nyctaxi_trips(
cursor = cursor,
num_rows = 2
)
print_rows(
cursor = select_cursor
)
Test unitari con simulazione
Il test_helpers.py file usa pytest e unittest.mock per testare la select_from_nyctaxi_trips funzione. La simulazione simula le connessioni di database senza usare risorse di calcolo effettive, quindi i test vengono eseguiti in pochi secondi senza influire sulle aree di lavoro di Azure Databricks.
# test_helpers.py
from pyodbc import SQL_DBMS_NAME
from helpers import *
from unittest.mock import patch
import datetime
@patch("helpers.connect_to_dsn")
def test_connect_to_dsn(mock_connection):
mock_connection.return_value.getinfo.return_value = "Spark SQL"
mock_connection = connect_to_dsn(
connstring = "DSN=<your-dsn-name>",
autocommit = True
)
assert mock_connection.getinfo(SQL_DBMS_NAME) == "Spark SQL"
@patch('helpers.get_cursor_from_connection')
def test_get_cursor_from_connection(mock_connection):
mock_cursor = mock_connection.return_value.cursor
mock_cursor.return_value.rowcount = -1
mock_connection = connect_to_dsn(
connstring = "DSN=<your-dsn-name>",
autocommit = True
)
mock_cursor = get_cursor_from_connection(
connection = mock_connection
)
assert mock_cursor.rowcount == -1
@patch('helpers.select_from_nyctaxi_trips')
def test_select_from_nyctaxi_trips(mock_connection):
mock_cursor = mock_connection.return_value.cursor
mock_get_cursor = mock_cursor.return_value.execute
mock_select_cursor = mock_get_cursor.return_value.arraysize = 1
mock_connection = connect_to_dsn(
connstring = "DSN=<your-dsn-name>",
autocommit = True
)
mock_get_cursor = get_cursor_from_connection(
connection = mock_connection
)
mock_select_cursor = select_from_nyctaxi_trips(
cursor = mock_get_cursor,
num_rows = 2
)
assert mock_select_cursor.arraysize == 1
@patch('helpers.print_rows')
def test_print_rows(mock_connection, capsys):
mock_cursor = mock_connection.return_value.cursor
mock_get_cursor = mock_cursor.return_value.execute
mock_select_cursor = mock_get_cursor.return_value.fetchall.return_value = [
(datetime.datetime(2016, 2, 14, 16, 52, 13), datetime.datetime(2016, 2, 14, 17, 16, 4), 4.94, 19.0, 10282, 10171),
(datetime.datetime(2016, 2, 4, 18, 44, 19), datetime.datetime(2016, 2, 4, 18, 46), 0.28, 3.5, 10110, 10110)
]
mock_connection = connect_to_dsn(
connstring = "DSN=<your-dsn-name>",
autocommit = True
)
mock_get_cursor = get_cursor_from_connection(
connection = mock_connection
)
mock_select_cursor = select_from_nyctaxi_trips(
cursor = mock_get_cursor,
num_rows = 2
)
print_rows(
cursor = mock_select_cursor
)
captured = capsys.readouterr()
assert captured.out == "(datetime.datetime(2016, 2, 14, 16, 52, 13), datetime.datetime(2016, 2, 14, 17, 16, 4), 4.94, 19.0, 10282, 10171)\n" \
"(datetime.datetime(2016, 2, 4, 18, 44, 19), datetime.datetime(2016, 2, 4, 18, 46), 0.28, 3.5, 10110, 10110)\n"
Poiché select_from_nyctaxi_trips viene eseguita solo un'istruzione SELECT, la creazione di mock non è strettamente necessaria qui. Tuttavia, la simulazione è particolarmente utile quando si testano funzioni che modificano i dati (INSERT INTO, UPDATE, DELETE FROM), perché è possibile eseguire i test ripetutamente senza influire sullo stato della tabella.