Comparteix via


Prueba del controlador ODBC de Databricks (Simba)

En esta página se describe cómo probar el código que usa el controlador ODBC de Databricks.

Use cualquier marco de pruebas para lenguajes compatibles con ODBC. En los ejemplos siguientes se usa pyodbc, pytest y unittest.mock para probar las conexiones de controladores ODBC. Este código se basa en el ejemplo de Conexión de Python y pyodbc a Azure Databricks.

Funciones del asistente

El helpers.py archivo contiene funciones de utilidad para trabajar con conexiones ODBC:

  • connect_to_dsn: abre una conexión a un recurso de proceso de Azure Databricks.
  • get_cursor_from_connection: obtiene un cursor para ejecutar consultas.
  • select_from_nyctaxi_trips: consulta el número especificado de filas de samples.nyctaxi.trips.
  • print_rows: imprime el contenido del conjunto de resultados en la consola.
# helpers.py

from pyodbc import connect, Connection, Cursor

def connect_to_dsn(
  connstring: str,
  autocommit: bool
) -> Connection:

  connection = connect(
    connstring,
    autocommit = autocommit
  )

  return connection

def get_cursor_from_connection(
  connection: Connection
) -> Cursor:

  cursor = connection.cursor()
  return cursor

def select_from_nyctaxi_trips(
  cursor: Cursor,
  num_rows: int
) -> Cursor:

  select_cursor = cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
  return select_cursor

def print_rows(cursor: Cursor):
  for row in cursor.fetchall():
    print(row)

Clase Main

El main.py archivo llama a las funciones auxiliares para conectarse y consultar datos:

# main.py

from helpers import *

connection = connect_to_dsn(
  connstring = "DSN=<your-dsn-name>",
  autocommit = True
)

cursor = get_cursor_from_connection(
  connection = connection)

select_cursor = select_from_nyctaxi_trips(
  cursor = cursor,
  num_rows = 2
)

print_rows(
  cursor = select_cursor
)

Pruebas unitarias con simulación

El test_helpers.py archivo usa pytest y unittest.mock para probar la select_from_nyctaxi_trips función. Simulación simula conexiones de base de datos sin usar recursos de proceso reales, por lo que las pruebas se ejecutan en segundos sin afectar a las áreas de trabajo de Azure Databricks.

# test_helpers.py

from pyodbc import SQL_DBMS_NAME
from helpers import *
from unittest.mock import patch
import datetime

@patch("helpers.connect_to_dsn")
def test_connect_to_dsn(mock_connection):
  mock_connection.return_value.getinfo.return_value = "Spark SQL"

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  assert mock_connection.getinfo(SQL_DBMS_NAME) == "Spark SQL"

@patch('helpers.get_cursor_from_connection')
def test_get_cursor_from_connection(mock_connection):
  mock_cursor = mock_connection.return_value.cursor
  mock_cursor.return_value.rowcount = -1

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  mock_cursor = get_cursor_from_connection(
    connection = mock_connection
  )

  assert mock_cursor.rowcount == -1

@patch('helpers.select_from_nyctaxi_trips')
def test_select_from_nyctaxi_trips(mock_connection):
  mock_cursor = mock_connection.return_value.cursor
  mock_get_cursor = mock_cursor.return_value.execute
  mock_select_cursor = mock_get_cursor.return_value.arraysize = 1

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  mock_get_cursor = get_cursor_from_connection(
    connection = mock_connection
  )

  mock_select_cursor = select_from_nyctaxi_trips(
    cursor = mock_get_cursor,
    num_rows = 2
  )

  assert mock_select_cursor.arraysize == 1

@patch('helpers.print_rows')
def test_print_rows(mock_connection, capsys):
  mock_cursor = mock_connection.return_value.cursor
  mock_get_cursor = mock_cursor.return_value.execute
  mock_select_cursor = mock_get_cursor.return_value.fetchall.return_value = [
    (datetime.datetime(2016, 2, 14, 16, 52, 13), datetime.datetime(2016, 2, 14, 17, 16, 4), 4.94, 19.0, 10282, 10171),
    (datetime.datetime(2016, 2, 4, 18, 44, 19), datetime.datetime(2016, 2, 4, 18, 46), 0.28, 3.5, 10110, 10110)
  ]

  mock_connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
  )

  mock_get_cursor = get_cursor_from_connection(
    connection = mock_connection
  )

  mock_select_cursor = select_from_nyctaxi_trips(
    cursor = mock_get_cursor,
    num_rows = 2
  )

  print_rows(
    cursor = mock_select_cursor
  )

  captured = capsys.readouterr()
  assert captured.out == "(datetime.datetime(2016, 2, 14, 16, 52, 13), datetime.datetime(2016, 2, 14, 17, 16, 4), 4.94, 19.0, 10282, 10171)\n" \
                         "(datetime.datetime(2016, 2, 4, 18, 44, 19), datetime.datetime(2016, 2, 4, 18, 46), 0.28, 3.5, 10110, 10110)\n"

Dado que select_from_nyctaxi_trips solo ejecuta una SELECT declaración, los mocks no son estrictamente necesarios aquí. Sin embargo, la simulación es especialmente útil al probar funciones que modifican datos (INSERT INTO, UPDATE, DELETE FROM), ya que puede ejecutar pruebas repetidamente sin afectar al estado de la tabla.