Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Esta página descreve como testar o código que usa o Driver ODBC do Databricks.
Use qualquer estrutura de teste para idiomas compatíveis com ODBC. Os exemplos a seguir usam pyodbc, pytest e unittest.mock para testar conexões de driver ODBC. Esse código é baseado no exemplo em Conectar Python e pyodbc ao Azure Databricks.
Funções auxiliares
O helpers.py arquivo contém funções de utilitário para trabalhar com conexões ODBC:
-
connect_to_dsn: abre uma conexão com um recurso de computação do Azure Databricks. -
get_cursor_from_connection: Obtém um cursor para a execução de consultas. -
select_from_nyctaxi_trips: consulta o número especificado de linhas desamples.nyctaxi.trips. -
print_rows: imprime o conteúdo do conjunto de resultados no console.
# helpers.py
from pyodbc import connect, Connection, Cursor
def connect_to_dsn(
connstring: str,
autocommit: bool
) -> Connection:
connection = connect(
connstring,
autocommit = autocommit
)
return connection
def get_cursor_from_connection(
connection: Connection
) -> Cursor:
cursor = connection.cursor()
return cursor
def select_from_nyctaxi_trips(
cursor: Cursor,
num_rows: int
) -> Cursor:
select_cursor = cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
return select_cursor
def print_rows(cursor: Cursor):
for row in cursor.fetchall():
print(row)
Classe Main
O main.py arquivo chama as funções auxiliares para conectar e consultar dados:
# main.py
from helpers import *
connection = connect_to_dsn(
connstring = "DSN=<your-dsn-name>",
autocommit = True
)
cursor = get_cursor_from_connection(
connection = connection)
select_cursor = select_from_nyctaxi_trips(
cursor = cursor,
num_rows = 2
)
print_rows(
cursor = select_cursor
)
Testes de unidade com zombaria
O test_helpers.py arquivo usa pytest e unittest.mock para testar a select_from_nyctaxi_trips função. Executar uma simulação das conexões de banco de dados sem utilizar recursos de computação reais assegura que os testes sejam realizados em segundos, não afetando os "workspaces" do Azure Databricks.
# test_helpers.py
from pyodbc import SQL_DBMS_NAME
from helpers import *
from unittest.mock import patch
import datetime
@patch("helpers.connect_to_dsn")
def test_connect_to_dsn(mock_connection):
mock_connection.return_value.getinfo.return_value = "Spark SQL"
mock_connection = connect_to_dsn(
connstring = "DSN=<your-dsn-name>",
autocommit = True
)
assert mock_connection.getinfo(SQL_DBMS_NAME) == "Spark SQL"
@patch('helpers.get_cursor_from_connection')
def test_get_cursor_from_connection(mock_connection):
mock_cursor = mock_connection.return_value.cursor
mock_cursor.return_value.rowcount = -1
mock_connection = connect_to_dsn(
connstring = "DSN=<your-dsn-name>",
autocommit = True
)
mock_cursor = get_cursor_from_connection(
connection = mock_connection
)
assert mock_cursor.rowcount == -1
@patch('helpers.select_from_nyctaxi_trips')
def test_select_from_nyctaxi_trips(mock_connection):
mock_cursor = mock_connection.return_value.cursor
mock_get_cursor = mock_cursor.return_value.execute
mock_select_cursor = mock_get_cursor.return_value.arraysize = 1
mock_connection = connect_to_dsn(
connstring = "DSN=<your-dsn-name>",
autocommit = True
)
mock_get_cursor = get_cursor_from_connection(
connection = mock_connection
)
mock_select_cursor = select_from_nyctaxi_trips(
cursor = mock_get_cursor,
num_rows = 2
)
assert mock_select_cursor.arraysize == 1
@patch('helpers.print_rows')
def test_print_rows(mock_connection, capsys):
mock_cursor = mock_connection.return_value.cursor
mock_get_cursor = mock_cursor.return_value.execute
mock_select_cursor = mock_get_cursor.return_value.fetchall.return_value = [
(datetime.datetime(2016, 2, 14, 16, 52, 13), datetime.datetime(2016, 2, 14, 17, 16, 4), 4.94, 19.0, 10282, 10171),
(datetime.datetime(2016, 2, 4, 18, 44, 19), datetime.datetime(2016, 2, 4, 18, 46), 0.28, 3.5, 10110, 10110)
]
mock_connection = connect_to_dsn(
connstring = "DSN=<your-dsn-name>",
autocommit = True
)
mock_get_cursor = get_cursor_from_connection(
connection = mock_connection
)
mock_select_cursor = select_from_nyctaxi_trips(
cursor = mock_get_cursor,
num_rows = 2
)
print_rows(
cursor = mock_select_cursor
)
captured = capsys.readouterr()
assert captured.out == "(datetime.datetime(2016, 2, 14, 16, 52, 13), datetime.datetime(2016, 2, 14, 17, 16, 4), 4.94, 19.0, 10282, 10171)\n" \
"(datetime.datetime(2016, 2, 4, 18, 44, 19), datetime.datetime(2016, 2, 4, 18, 46), 0.28, 3.5, 10110, 10110)\n"
Porque select_from_nyctaxi_trips só executa uma SELECT instrução, mocking não é estritamente necessário aqui. No entanto, o uso de mocks é especialmente útil ao testar funções que modificam dados (INSERT INTO, UPDATE, DELETE FROM), pois você pode executar testes repetidamente sem afetar o estado da tabela.