Nota
O acceso a esta páxina require autorización. Pode tentar iniciar sesión ou modificar os directorios.
O acceso a esta páxina require autorización. Pode tentar modificar os directorios.
En esta guía de inicio rápido, usará el controlador mssql-python para copiar datos de forma masiva entre bases de datos. La aplicación descarga tablas de un esquema de base de datos de origen en archivos Parquet locales mediante Apache Arrow y, a continuación, los carga en una base de datos de destino mediante el método de alto rendimiento bulkcopy . Puede usar este patrón para migrar, replicar o transformar datos entre SQL Server, Azure SQL Database y SQL Database en Fabric.
El mssql-python controlador no requiere ninguna dependencia externa en máquinas Windows. El controlador instala todo lo que necesita con una sola pip instalación, lo que le permite usar la versión más reciente del controlador para nuevos scripts sin interrumpir otros scripts que no tenga tiempo para actualizar y probar.
Documentación de mssql-python | Código fuente de mssql-python | Paquete (PyPI) | uv
Prerrequisitos
Python 3
Si aún no tiene Python, instale el entorno de ejecución de Python y el administrador de paquetes pip desde python.org.
¿No desea usar su propio entorno? Abra como un devcontainer mediante GitHub Codespaces.
Visual Studio Code con las siguientes extensiones:
Interfaz de Azure Command-Line (CLI) para la autenticación sin contraseña en macOS y Linux.
Si aún no tiene
uv, siga las instrucciones de instalación.Una base de datos de origen en SQL Server, Azure SQL Database o SQL Database en Fabric con el
AdventureWorks2025esquema de ejemplo y una cadena de conexión válida.Una base de datos de destino en SQL Server, Azure SQL Database o SQL Database en Fabric con una cadena de conexión válida. El usuario debe tener permiso para crear y escribir en tablas. Si no tiene una segunda base de datos, puede cambiar la cadena de conexión de destino para que apunte a la misma base de datos y use un esquema diferente para las tablas de destino.
Instale requisitos previos específicos del sistema operativo de un solo uso.
Creación de una base de datos SQL
Este inicio rápido requiere el esquema Ligero AdventureWorks2025 como base de datos de origen.
Creación del proyecto y ejecución del código
- Creación de un nuevo proyecto
- Agregar dependencias
- Inicio de Visual Studio Code
- Actualizar pyproject.toml
- Actualizar main.py
- Guardar las cadenas de conexión
- Usa uv run para ejecutar el script
Creación de un nuevo proyecto
Abra una ventana del terminal en el directorio de desarrollo. Si no tiene uno, cree un directorio denominado
python,scripts, etc. Evite carpetas en OneDrive, la sincronización puede interferir con la administración del entorno virtual.Cree un proyecto con
uv.uv init mssql-python-bcp-qs cd mssql-python-bcp-qs
Agregar dependencias
En el mismo directorio, instale los mssql-pythonpaquetes , python-dotenvy pyarrow .
uv add mssql-python python-dotenv pyarrow
Iniciar Visual Studio Code
En el mismo directorio, ejecute el siguiente comando.
code .
Actualizar pyproject.toml
Pyproject.toml contiene los metadatos del proyecto. Abra el archivo en su editor favorito.
Revise el contenido del archivo. Debe ser similar a este ejemplo. Tenga en cuenta la versión de Python y la dependencia para que
mssql-pythonuse>=para definir una versión mínima. Si prefiere una versión exacta, cambie el>=valor anterior al número de versión a==. Las versiones resueltas de cada paquete se almacenan en uv.lock. El archivo de bloqueo garantiza que los desarrolladores que trabajan en el proyecto usen versiones de paquetes coherentes. También garantiza que se use el mismo conjunto exacto de versiones de paquete al distribuir el paquete a los usuarios finales. No debe editar eluv.lockarchivo.[project] name = "mssql-python-bcp-qs" version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.11" dependencies = [ "mssql-python>=1.4.0", "python-dotenv>=1.1.1", "pyarrow>=19.0.0", ]Actualice la descripción para que sea más descriptivo.
description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"Guarde y cierre el archivo.
Actualizar main.py
Abra el archivo denominado
main.py. Debe ser similar a este ejemplo.def main(): print("Hello from mssql-python-bcp-qs!") if __name__ == "__main__": main()Reemplace el contenido de
main.pypor los siguientes bloques de código. Cada bloque se basa en el anterior y debe colocarse en el orden correcto.Sugerencia
Si Visual Studio Code tiene problemas para resolver los paquetes, debe actualizar el intérprete para usar el entorno virtual.
En la parte superior de
main.py, agregue las importaciones y constantes. El script usamssql_pythonpara la conectividad de base de datos ypyarrowpyarrow.parquetpara el control de datos en columnas y la E/S de archivos Parquet,python-dotenvpara cargar cadenas de conexión desde un.envarchivo y un patrón regex compilado que valida los identificadores SQL para evitar la inyección."""Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema.""" import os import re import time from uuid import UUID import pyarrow as pa import pyarrow.parquet as pq from dotenv import load_dotenv import mssql_python BATCH_SIZE = 64_000 _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$") def _validate_ident(name: str) -> str: if not _SAFE_IDENT.match(name): raise ValueError(f"Unsafe SQL identifier: {name!r}") return nameDebajo de las importaciones, agregue la asignación de tipos SQL a Arrow. Este diccionario traduce los tipos de columna de SQL Server a sus equivalentes de Apache Arrow para que se conserve la fidelidad de los datos al escribir en Parquet. Las dos funciones auxiliares crean cadenas de tipo SQL exactas (por ejemplo,
NVARCHAR(100)oDECIMAL(18,2)) a partir deINFORMATION_SCHEMAmetadatos y resuelven el tipo de flecha coincidente para cada columna._SQL_TO_ARROW = { "bit": pa.bool_(), "tinyint": pa.uint8(), "smallint": pa.int16(), "int": pa.int32(), "bigint": pa.int64(), "float": pa.float64(), "real": pa.float32(), "smallmoney": pa.decimal128(10, 4), "money": pa.decimal128(19, 4), "date": pa.date32(), "datetime": pa.timestamp("ms"), "datetime2": pa.timestamp("us"), "smalldatetime": pa.timestamp("s"), "uniqueidentifier": pa.string(), "xml": pa.string(), "image": pa.binary(), "binary": pa.binary(), "varbinary": pa.binary(), "timestamp": pa.binary(), } def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str: """Build the exact SQL type string from INFORMATION_SCHEMA metadata.""" dt = data_type.lower() if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"): length = "MAX" if max_length == -1 else str(max_length) return f"{dt.upper()}({length})" if dt in ("decimal", "numeric"): return f"{dt.upper()}({precision},{scale})" return dt.upper() def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType: sql_type = sql_type.lower() if sql_type in _SQL_TO_ARROW: return _SQL_TO_ARROW[sql_type] if sql_type in ("decimal", "numeric"): return pa.decimal128(precision, scale) if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"): return pa.string() return pa.string() def _convert_value(v): """Convert a SQL value to an Arrow-compatible Python type.""" if isinstance(v, UUID): return str(v) return vAgregue las funciones de introspección de esquema y generación de DDL.
_get_arrow_schemaconsultasINFORMATION_SCHEMA.COLUMNSque usan consultas parametrizadas, compila un esquema Arrow y almacena el tipo SQL original como metadatos de campo para que la tabla de destino se pueda recrear con definiciones de columna exactas._create_table_ddllee los metadatos para generarDROP/CREATE TABLEDDL. El tipotimestamp(rowversion) se remapea aVARBINARY(8)porque se genera automáticamente y no se puede insertar directamente.def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema: """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS. Stores the original SQL type as field metadata so the round-trip CREATE TABLE can reproduce exact column definitions. """ cursor.execute( "SELECT COLUMN_NAME, DATA_TYPE, " "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), " "COALESCE(NUMERIC_PRECISION, 0), " "COALESCE(NUMERIC_SCALE, 0), " "IS_NULLABLE " "FROM INFORMATION_SCHEMA.COLUMNS " "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? " "ORDER BY ORDINAL_POSITION", (schema_name, table_name), ) rows = cursor.fetchall() if not rows: raise ValueError(f"No columns found for {schema_name}.{table_name}") fields = [] for col_name, data_type, max_len, precision, scale, nullable in rows: arrow_t = _arrow_type(data_type, precision, scale) sql_t = _sql_type_str(data_type, max_len, precision, scale) fields.append( pa.field( col_name, arrow_t, nullable=(nullable == "YES"), metadata={"sql_type": sql_t}, ) ) return pa.schema(fields) def _create_table_ddl(target: str, schema: pa.Schema) -> str: """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata.""" col_defs = [] for f in schema: sql_t = f.metadata[b"sql_type"].decode() # timestamp/rowversion is auto-generated and not insertable if sql_t == "TIMESTAMP": sql_t = "VARBINARY(8)" null = "" if f.nullable else " NOT NULL" col_defs.append(f"[{f.name}] {sql_t}{null}") col_defs_str = ",\n ".join(col_defs) return ( f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n" f"CREATE TABLE {target} (\n {col_defs_str}\n);" )Agregue la función de descarga.
download_tabletransmite filas de una tabla de origen en lotes mediantefetchmany(), convierte cada valor en un tipo de Python compatible con Arrow y escribe lotes de registros de manera incremental en un archivo Parquet local medianteParquetWriter. Esto mantiene la memoria limitada independientemente del tamaño de la tabla. La función usa dos cursores independientes: uno para leer metadatos de columna y otro para transmitir los datos.def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int: """Download a SQL table to a parquet file. Returns row count (0 if empty).""" _validate_ident(table_name) source = f"{schema_name}.[{table_name}]" with conn.cursor() as cursor: schema = _get_arrow_schema(cursor, schema_name, table_name) n_cols = len(schema) row_count = 0 t0 = time.perf_counter() with conn.cursor() as cursor: cursor.execute(f"SELECT * FROM {source}") writer = None try: while True: rows = cursor.fetchmany(BATCH_SIZE) if not rows: break columns = [[] for _ in range(n_cols)] for row in rows: for i in range(n_cols): columns[i].append(_convert_value(row[i])) arrays = [ pa.array(columns[i], type=schema.field(i).type) for i in range(n_cols) ] batch = pa.record_batch(arrays, schema=schema) if writer is None: writer = pq.ParquetWriter(parquet_file, schema) writer.write_batch(batch) row_count += len(rows) finally: if writer is not None: writer.close() if row_count == 0: return 0 elapsed = time.perf_counter() - t0 rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded " f"in {elapsed:.2f}s ({rate})" ) return row_countAgregue el enlace de enriquecimiento.
enrich_parquetes un marcador de posición en el que puede agregar transformaciones, columnas derivadas o combinaciones a datos antes de cargarlos. En este inicio rápido, es un no-op que devuelve la ruta de acceso del archivo sin cambios.def enrich_parquet(parquet_file: str) -> str: """Enrich a parquet file before upload. Returns the (possibly new) file path.""" # TODO: add transformations, derived columns, joins, etc. print(f"Enriching {parquet_file} (no-op)") return parquet_fileAgregue la función upload.
upload_parquetlee el esquema de Arrow del archivo Parquet, genera y ejecutaDROP/CREATE TABLEDDL para preparar el destino y, a continuación, lee el archivo por lotes y llama acursor.bulkcopy()para la inserción masiva de alto rendimiento. La opcióntable_lock=Truemejora el rendimiento al minimizar la contención de bloqueos. Una vez completada la carga, la función ejecuta unSELECT COUNT(*)para comprobar que el recuento de filas coincide.def upload_parquet(conn, parquet_file: str, target: str) -> int: """Upload a parquet file into a SQL table via BCP. Returns row count.""" # ── Create target table from parquet schema ── pf_schema = pq.read_schema(parquet_file) with conn.cursor() as cursor: cursor.execute(_create_table_ddl(target, pf_schema)) conn.commit() # ── Bulk insert ── uploaded = 0 t0 = time.perf_counter() with pq.ParquetFile(parquet_file) as pf: with conn.cursor() as cursor: for batch in pf.iter_batches(batch_size=BATCH_SIZE): rows = zip(*(col.to_pylist() for col in batch.columns)) cursor.bulkcopy( target, rows, batch_size=BATCH_SIZE, table_lock=True, timeout=3600, ) uploaded += batch.num_rows elapsed = time.perf_counter() - t0 # ── Verify ── with conn.cursor() as cursor: cursor.execute(f"SELECT COUNT(*) FROM {target}") count = cursor.fetchone()[0] rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{parquet_file} → {target}: {uploaded:,} rows uploaded " f"in {elapsed:.2f}s ({rate}) " f"| verified: {count:,}" ) return uploadedAgregue la función de orquestación.
transfer_tablesvincula las tres fases juntas. Se conecta a la base de datos de origen, detecta todas las tablas base del esquema especificado a través deINFORMATION_SCHEMA.TABLES, descarga cada una en un archivo Parquet local, ejecuta la función de enriquecimiento y, a continuación, se conecta a la base de datos de destino y carga cada archivo.def transfer_tables( source_conn_str: str, dest_conn_str: str, source_schema: str, dest_schema: str, ) -> None: """Download all tables from source DB/schema to parquet, upload to dest DB/schema.""" _validate_ident(source_schema) _validate_ident(dest_schema) parquet_dir = source_schema os.makedirs(parquet_dir, exist_ok=True) # ── Download from source ── with mssql_python.connect(source_conn_str) as src_conn: with src_conn.cursor() as cursor: cursor.execute( "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES " "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' " "ORDER BY TABLE_NAME", (source_schema,), ) tables = [row[0] for row in cursor.fetchall()] print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n") parquet_files = [] for table_name in tables: parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet") row_count = download_table(src_conn, source_schema, table_name, parquet_file) if row_count == 0: print(f"{source_schema}.{table_name}: empty, skipping") else: parquet_files.append((table_name, parquet_file)) # ── Enrich parquet files ── enriched = [] for table_name, parquet_file in parquet_files: enriched.append((table_name, enrich_parquet(parquet_file))) # ── Upload to destination ── with mssql_python.connect(dest_conn_str) as dest_conn: for table_name, parquet_file in enriched: target = f"{dest_schema}.[{table_name}]" upload_parquet(dest_conn, parquet_file, target)Por último, agregue el punto de entrada
main. Carga el archivo.env, llama atransfer_tablescon las cadenas de conexión de origen y destino e imprime el tiempo total transcurrido.def main(): load_dotenv() t_start = time.perf_counter() transfer_tables( source_conn_str=os.environ["SOURCE_CONNECTION_STRING"], dest_conn_str=os.environ["DEST_CONNECTION_STRING"], source_schema="SalesLT", dest_schema="dbo", ) print(f"Total: {time.perf_counter() - t_start:.2f}s") if __name__ == "__main__": main()Guarde y cierre
main.py.
Guardar las cadenas de conexión
Abra el
.gitignorearchivo y agregue una exclusión para.envlos archivos. El archivo debe ser similar a este ejemplo. Asegúrese de guardarlo y cerrarlo cuando haya terminado.# Python-generated files __pycache__/ *.py[oc] build/ dist/ wheels/ *.egg-info # Virtual environments .venv # Connection strings and secrets .envEn el directorio actual, cree un nuevo archivo denominado
.env.En el
.envarchivo, agregue entradas para las cadenas de conexión de origen y destino. Reemplace los valores de marcador de posición por los nombres reales de servidor y base de datos.SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive" DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"Sugerencia
La cadena de conexión que se usa aquí depende en gran medida del tipo de base de datos SQL a la que se conecta. Si se conecta a una base de datos de Azure SQL o a una base de datos SQL en Fabric, use la cadena de conexión ODBC de la pestaña Cadenas de conexión. Es posible que tenga que ajustar el tipo de autenticación en función de su escenario. Para obtener más información sobre las cadenas de conexión y su sintaxis, consulte referencia de sintaxis de cadena de conexión.
Sugerencia
En macOS, tanto ActiveDirectoryInteractive como ActiveDirectoryDefault funcionan para la autenticación de Microsoft Entra.
ActiveDirectoryInteractive le pide que inicie sesión cada vez que ejecute el script. Para evitar mensajes de inicio de sesión repetidos, inicie sesión una vez a través de la CLI de Azure mediante la ejecución az loginde y, a continuación, use ActiveDirectoryDefault, que reutiliza la credencial almacenada en caché.
Usa uv run para ejecutar el script
En la ventana de terminal desde antes o una nueva ventana de terminal abierta en el mismo directorio, ejecute el siguiente comando.
uv run main.pyEsta es la salida esperada cuando se completa el script.
Found 12 SalesLT tables: Address, Customer, CustomerAddress, ... SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec) ... SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450 ... Total: 2.35sConéctese a la base de datos de destino mediante SQL Server Management Studio (SSMS) o la extensión MSSQL para VS Code y compruebe que las tablas y los datos se crearon correctamente.
Para implementar el script en otra máquina, copie todos los archivos excepto la
.venvcarpeta en la otra máquina. El entorno virtual se vuelve a crear con la primera ejecución.
Funcionamiento del código
La aplicación realiza una transferencia completa de datos de ida y vuelta en tres fases:
-
Descargar: se conecta a la base de datos de origen, lee los metadatos de columna de
INFORMATION_SCHEMA.COLUMNS, compila un esquema de Apache Arrow y, a continuación, descarga cada tabla en un archivo Parquet local. -
Enrich (opcional): proporciona un enlace (
enrich_parquet) donde puede agregar transformaciones, columnas derivadas o combinaciones antes de cargar. -
Cargar: lee cada archivo Parquet en lotes, vuelve a crear la tabla en la base de datos de destino mediante DDL generado a partir de metadatos de esquema de flecha y, a continuación, usa
cursor.bulkcopy()para la inserción masiva de alto rendimiento.
Paso siguiente
Visite el repositorio de mssql-python GitHub del controlador para obtener más ejemplos, para contribuir a ideas o notificar problemas.