Compartir vía


Inicio rápido: Copia masiva con el controlador mssql-python para Python

En esta guía de inicio rápido, usará el controlador mssql-python para copiar datos de forma masiva entre bases de datos. La aplicación descarga tablas de un esquema de base de datos de origen en archivos Parquet locales mediante Apache Arrow y, a continuación, los carga en una base de datos de destino mediante el método de alto rendimiento bulkcopy . Puede usar este patrón para migrar, replicar o transformar datos entre SQL Server, Azure SQL Database y SQL Database en Fabric.

El mssql-python controlador no requiere ninguna dependencia externa en máquinas Windows. El controlador instala todo lo que necesita con una sola pip instalación, lo que le permite usar la versión más reciente del controlador para nuevos scripts sin interrumpir otros scripts que no tenga tiempo para actualizar y probar.

Documentación de mssql-python | Código fuente de mssql-python | Paquete (PyPI) | uv

Prerrequisitos

  • Python 3

    • Si aún no tiene Python, instale el entorno de ejecución de Python y el administrador de paquetes pip desde python.org.

    • ¿No desea usar su propio entorno? Abra como un devcontainer mediante GitHub Codespaces.

  • Visual Studio Code con las siguientes extensiones:

  • Interfaz de Azure Command-Line (CLI) para la autenticación sin contraseña en macOS y Linux.

  • Si aún no tiene uv, siga las instrucciones de instalación.

  • Una base de datos de origen en SQL Server, Azure SQL Database o SQL Database en Fabric con el AdventureWorks2025 esquema de ejemplo y una cadena de conexión válida.

  • Una base de datos de destino en SQL Server, Azure SQL Database o SQL Database en Fabric con una cadena de conexión válida. El usuario debe tener permiso para crear y escribir en tablas. Si no tiene una segunda base de datos, puede cambiar la cadena de conexión de destino para que apunte a la misma base de datos y use un esquema diferente para las tablas de destino.

  • Instale requisitos previos específicos del sistema operativo de un solo uso.

    apk add libtool krb5-libs krb5-dev
    

Creación de una base de datos SQL

Este inicio rápido requiere el esquema Ligero AdventureWorks2025 como base de datos de origen.

Creación del proyecto y ejecución del código

  1. Creación de un nuevo proyecto
  2. Agregar dependencias
  3. Inicio de Visual Studio Code
  4. Actualizar pyproject.toml
  5. Actualizar main.py
  6. Guardar las cadenas de conexión
  7. Usa uv run para ejecutar el script

Creación de un nuevo proyecto

  1. Abra una ventana del terminal en el directorio de desarrollo. Si no tiene uno, cree un directorio denominado python, scripts, etc. Evite carpetas en OneDrive, la sincronización puede interferir con la administración del entorno virtual.

  2. Cree un proyecto con uv.

    uv init mssql-python-bcp-qs
    cd mssql-python-bcp-qs
    

Agregar dependencias

En el mismo directorio, instale los mssql-pythonpaquetes , python-dotenvy pyarrow .

uv add mssql-python python-dotenv pyarrow

Iniciar Visual Studio Code

En el mismo directorio, ejecute el siguiente comando.

code .

Actualizar pyproject.toml

  1. Pyproject.toml contiene los metadatos del proyecto. Abra el archivo en su editor favorito.

  2. Revise el contenido del archivo. Debe ser similar a este ejemplo. Tenga en cuenta la versión de Python y la dependencia para que mssql-python use >= para definir una versión mínima. Si prefiere una versión exacta, cambie el >= valor anterior al número de versión a ==. Las versiones resueltas de cada paquete se almacenan en uv.lock. El archivo de bloqueo garantiza que los desarrolladores que trabajan en el proyecto usen versiones de paquetes coherentes. También garantiza que se use el mismo conjunto exacto de versiones de paquete al distribuir el paquete a los usuarios finales. No debe editar el uv.lock archivo.

    [project]
    name = "mssql-python-bcp-qs"
    version = "0.1.0"
    description = "Add your description here"
    readme = "README.md"
    requires-python = ">=3.11"
    dependencies = [
        "mssql-python>=1.4.0",
        "python-dotenv>=1.1.1",
        "pyarrow>=19.0.0",
    ]
    
  3. Actualice la descripción para que sea más descriptivo.

    description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"
    
  4. Guarde y cierre el archivo.

Actualizar main.py

  1. Abra el archivo denominado main.py. Debe ser similar a este ejemplo.

    def main():
        print("Hello from mssql-python-bcp-qs!")
    
    if __name__ == "__main__":
        main()
    
  2. Reemplace el contenido de main.py por los siguientes bloques de código. Cada bloque se basa en el anterior y debe colocarse en el orden correcto.

    Sugerencia

    Si Visual Studio Code tiene problemas para resolver los paquetes, debe actualizar el intérprete para usar el entorno virtual.

  3. En la parte superior de main.py, agregue las importaciones y constantes. El script usa mssql_python para la conectividad de base de datos y pyarrowpyarrow.parquet para el control de datos en columnas y la E/S de archivos Parquet, python-dotenv para cargar cadenas de conexión desde un .env archivo y un patrón regex compilado que valida los identificadores SQL para evitar la inyección.

    """Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema."""
    
    import os
    import re
    import time
    from uuid import UUID
    
    import pyarrow as pa
    import pyarrow.parquet as pq
    from dotenv import load_dotenv
    import mssql_python
    
    BATCH_SIZE = 64_000
    _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$")
    
    
    def _validate_ident(name: str) -> str:
        if not _SAFE_IDENT.match(name):
            raise ValueError(f"Unsafe SQL identifier: {name!r}")
        return name
    
  4. Debajo de las importaciones, agregue la asignación de tipos SQL a Arrow. Este diccionario traduce los tipos de columna de SQL Server a sus equivalentes de Apache Arrow para que se conserve la fidelidad de los datos al escribir en Parquet. Las dos funciones auxiliares crean cadenas de tipo SQL exactas (por ejemplo, NVARCHAR(100) o DECIMAL(18,2)) a partir de INFORMATION_SCHEMA metadatos y resuelven el tipo de flecha coincidente para cada columna.

    _SQL_TO_ARROW = {
        "bit": pa.bool_(),
        "tinyint": pa.uint8(),
        "smallint": pa.int16(),
        "int": pa.int32(),
        "bigint": pa.int64(),
        "float": pa.float64(),
        "real": pa.float32(),
        "smallmoney": pa.decimal128(10, 4),
        "money": pa.decimal128(19, 4),
        "date": pa.date32(),
        "datetime": pa.timestamp("ms"),
        "datetime2": pa.timestamp("us"),
        "smalldatetime": pa.timestamp("s"),
        "uniqueidentifier": pa.string(),
        "xml": pa.string(),
        "image": pa.binary(),
        "binary": pa.binary(),
        "varbinary": pa.binary(),
        "timestamp": pa.binary(),
    }
    
    
    def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str:
        """Build the exact SQL type string from INFORMATION_SCHEMA metadata."""
        dt = data_type.lower()
        if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"):
            length = "MAX" if max_length == -1 else str(max_length)
            return f"{dt.upper()}({length})"
        if dt in ("decimal", "numeric"):
            return f"{dt.upper()}({precision},{scale})"
        return dt.upper()
    
    
    def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType:
        sql_type = sql_type.lower()
        if sql_type in _SQL_TO_ARROW:
            return _SQL_TO_ARROW[sql_type]
        if sql_type in ("decimal", "numeric"):
            return pa.decimal128(precision, scale)
        if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"):
            return pa.string()
        return pa.string()
    
    
    def _convert_value(v):
        """Convert a SQL value to an Arrow-compatible Python type."""
        if isinstance(v, UUID):
            return str(v)
        return v
    
  5. Agregue las funciones de introspección de esquema y generación de DDL. _get_arrow_schema consultas INFORMATION_SCHEMA.COLUMNS que usan consultas parametrizadas, compila un esquema Arrow y almacena el tipo SQL original como metadatos de campo para que la tabla de destino se pueda recrear con definiciones de columna exactas. _create_table_ddl lee los metadatos para generar DROP/CREATE TABLE DDL. El tipo timestamp (rowversion) se remapea a VARBINARY(8) porque se genera automáticamente y no se puede insertar directamente.

    def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema:
        """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS.
    
        Stores the original SQL type as field metadata so the round-trip
        CREATE TABLE can reproduce exact column definitions.
        """
        cursor.execute(
            "SELECT COLUMN_NAME, DATA_TYPE, "
            "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), "
            "COALESCE(NUMERIC_PRECISION, 0), "
            "COALESCE(NUMERIC_SCALE, 0), "
            "IS_NULLABLE "
            "FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? "
            "ORDER BY ORDINAL_POSITION",
            (schema_name, table_name),
        )
        rows = cursor.fetchall()
        if not rows:
            raise ValueError(f"No columns found for {schema_name}.{table_name}")
        fields = []
        for col_name, data_type, max_len, precision, scale, nullable in rows:
            arrow_t = _arrow_type(data_type, precision, scale)
            sql_t = _sql_type_str(data_type, max_len, precision, scale)
            fields.append(
                pa.field(
                    col_name, arrow_t,
                    nullable=(nullable == "YES"),
                    metadata={"sql_type": sql_t},
                )
            )
        return pa.schema(fields)
    
    
    def _create_table_ddl(target: str, schema: pa.Schema) -> str:
        """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata."""
        col_defs = []
        for f in schema:
            sql_t = f.metadata[b"sql_type"].decode()
            # timestamp/rowversion is auto-generated and not insertable
            if sql_t == "TIMESTAMP":
                sql_t = "VARBINARY(8)"
            null = "" if f.nullable else " NOT NULL"
            col_defs.append(f"[{f.name}] {sql_t}{null}")
        col_defs_str = ",\n    ".join(col_defs)
        return (
            f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n"
            f"CREATE TABLE {target} (\n    {col_defs_str}\n);"
        )
    
  6. Agregue la función de descarga. download_table transmite filas de una tabla de origen en lotes mediante fetchmany(), convierte cada valor en un tipo de Python compatible con Arrow y escribe lotes de registros de manera incremental en un archivo Parquet local mediante ParquetWriter. Esto mantiene la memoria limitada independientemente del tamaño de la tabla. La función usa dos cursores independientes: uno para leer metadatos de columna y otro para transmitir los datos.

    def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int:
        """Download a SQL table to a parquet file. Returns row count (0 if empty)."""
        _validate_ident(table_name)
        source = f"{schema_name}.[{table_name}]"
    
        with conn.cursor() as cursor:
            schema = _get_arrow_schema(cursor, schema_name, table_name)
    
        n_cols = len(schema)
        row_count = 0
        t0 = time.perf_counter()
    
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT * FROM {source}")
            writer = None
            try:
                while True:
                    rows = cursor.fetchmany(BATCH_SIZE)
                    if not rows:
                        break
                    columns = [[] for _ in range(n_cols)]
                    for row in rows:
                        for i in range(n_cols):
                            columns[i].append(_convert_value(row[i]))
                    arrays = [
                        pa.array(columns[i], type=schema.field(i).type)
                        for i in range(n_cols)
                    ]
                    batch = pa.record_batch(arrays, schema=schema)
                    if writer is None:
                        writer = pq.ParquetWriter(parquet_file, schema)
                    writer.write_batch(batch)
                    row_count += len(rows)
            finally:
                if writer is not None:
                    writer.close()
    
        if row_count == 0:
            return 0
    
        elapsed = time.perf_counter() - t0
        rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded "
            f"in {elapsed:.2f}s ({rate})"
        )
        return row_count
    
  7. Agregue el enlace de enriquecimiento. enrich_parquet es un marcador de posición en el que puede agregar transformaciones, columnas derivadas o combinaciones a datos antes de cargarlos. En este inicio rápido, es un no-op que devuelve la ruta de acceso del archivo sin cambios.

    def enrich_parquet(parquet_file: str) -> str:
        """Enrich a parquet file before upload. Returns the (possibly new) file path."""
        # TODO: add transformations, derived columns, joins, etc.
        print(f"Enriching {parquet_file} (no-op)")
        return parquet_file
    
  8. Agregue la función upload. upload_parquet lee el esquema de Arrow del archivo Parquet, genera y ejecuta DROP/CREATE TABLE DDL para preparar el destino y, a continuación, lee el archivo por lotes y llama a cursor.bulkcopy() para la inserción masiva de alto rendimiento. La opción table_lock=True mejora el rendimiento al minimizar la contención de bloqueos. Una vez completada la carga, la función ejecuta un SELECT COUNT(*) para comprobar que el recuento de filas coincide.

    def upload_parquet(conn, parquet_file: str, target: str) -> int:
        """Upload a parquet file into a SQL table via BCP. Returns row count."""
        # ── Create target table from parquet schema ──
        pf_schema = pq.read_schema(parquet_file)
        with conn.cursor() as cursor:
            cursor.execute(_create_table_ddl(target, pf_schema))
        conn.commit()
    
        # ── Bulk insert ──
        uploaded = 0
        t0 = time.perf_counter()
        with pq.ParquetFile(parquet_file) as pf:
            with conn.cursor() as cursor:
                for batch in pf.iter_batches(batch_size=BATCH_SIZE):
                    rows = zip(*(col.to_pylist() for col in batch.columns))
                    cursor.bulkcopy(
                        target, rows, batch_size=BATCH_SIZE,
                        table_lock=True, timeout=3600,
                    )
                    uploaded += batch.num_rows
        elapsed = time.perf_counter() - t0
    
        # ── Verify ──
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT COUNT(*) FROM {target}")
            count = cursor.fetchone()[0]
    
        rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{parquet_file} → {target}: {uploaded:,} rows uploaded "
            f"in {elapsed:.2f}s ({rate}) "
            f"| verified: {count:,}"
        )
        return uploaded
    
  9. Agregue la función de orquestación. transfer_tables vincula las tres fases juntas. Se conecta a la base de datos de origen, detecta todas las tablas base del esquema especificado a través de INFORMATION_SCHEMA.TABLES, descarga cada una en un archivo Parquet local, ejecuta la función de enriquecimiento y, a continuación, se conecta a la base de datos de destino y carga cada archivo.

    def transfer_tables(
        source_conn_str: str,
        dest_conn_str: str,
        source_schema: str,
        dest_schema: str,
    ) -> None:
        """Download all tables from source DB/schema to parquet, upload to dest DB/schema."""
        _validate_ident(source_schema)
        _validate_ident(dest_schema)
    
        parquet_dir = source_schema
        os.makedirs(parquet_dir, exist_ok=True)
    
        # ── Download from source ──
        with mssql_python.connect(source_conn_str) as src_conn:
            with src_conn.cursor() as cursor:
                cursor.execute(
                    "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
                    "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' "
                    "ORDER BY TABLE_NAME",
                    (source_schema,),
                )
                tables = [row[0] for row in cursor.fetchall()]
    
            print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n")
    
            parquet_files = []
            for table_name in tables:
                parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet")
                row_count = download_table(src_conn, source_schema, table_name, parquet_file)
                if row_count == 0:
                    print(f"{source_schema}.{table_name}: empty, skipping")
                else:
                    parquet_files.append((table_name, parquet_file))
    
        # ── Enrich parquet files ──
        enriched = []
        for table_name, parquet_file in parquet_files:
            enriched.append((table_name, enrich_parquet(parquet_file)))
    
        # ── Upload to destination ──
        with mssql_python.connect(dest_conn_str) as dest_conn:
            for table_name, parquet_file in enriched:
                target = f"{dest_schema}.[{table_name}]"
                upload_parquet(dest_conn, parquet_file, target)
    
  10. Por último, agregue el punto de entrada main. Carga el archivo .env, llama a transfer_tables con las cadenas de conexión de origen y destino e imprime el tiempo total transcurrido.

    def main():
        load_dotenv()
        t_start = time.perf_counter()
    
        transfer_tables(
            source_conn_str=os.environ["SOURCE_CONNECTION_STRING"],
            dest_conn_str=os.environ["DEST_CONNECTION_STRING"],
            source_schema="SalesLT",
            dest_schema="dbo",
        )
    
        print(f"Total: {time.perf_counter() - t_start:.2f}s")
    
    
    if __name__ == "__main__":
        main()
    
  11. Guarde y cierre main.py.

Guardar las cadenas de conexión

  1. Abra el .gitignore archivo y agregue una exclusión para .env los archivos. El archivo debe ser similar a este ejemplo. Asegúrese de guardarlo y cerrarlo cuando haya terminado.

    # Python-generated files
    __pycache__/
    *.py[oc]
    build/
    dist/
    wheels/
    *.egg-info
    
    # Virtual environments
    .venv
    
    # Connection strings and secrets
    .env
    
  2. En el directorio actual, cree un nuevo archivo denominado .env.

  3. En el .env archivo, agregue entradas para las cadenas de conexión de origen y destino. Reemplace los valores de marcador de posición por los nombres reales de servidor y base de datos.

    SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    

    Sugerencia

    La cadena de conexión que se usa aquí depende en gran medida del tipo de base de datos SQL a la que se conecta. Si se conecta a una base de datos de Azure SQL o a una base de datos SQL en Fabric, use la cadena de conexión ODBC de la pestaña Cadenas de conexión. Es posible que tenga que ajustar el tipo de autenticación en función de su escenario. Para obtener más información sobre las cadenas de conexión y su sintaxis, consulte referencia de sintaxis de cadena de conexión.

Sugerencia

En macOS, tanto ActiveDirectoryInteractive como ActiveDirectoryDefault funcionan para la autenticación de Microsoft Entra. ActiveDirectoryInteractive le pide que inicie sesión cada vez que ejecute el script. Para evitar mensajes de inicio de sesión repetidos, inicie sesión una vez a través de la CLI de Azure mediante la ejecución az loginde y, a continuación, use ActiveDirectoryDefault, que reutiliza la credencial almacenada en caché.

Usa uv run para ejecutar el script

  1. En la ventana de terminal desde antes o una nueva ventana de terminal abierta en el mismo directorio, ejecute el siguiente comando.

     uv run main.py
    

    Esta es la salida esperada cuando se completa el script.

    Found 12 SalesLT tables: Address, Customer, CustomerAddress, ...
    
    SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec)
    ...
    SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450
    ...
    Total: 2.35s
    
  2. Conéctese a la base de datos de destino mediante SQL Server Management Studio (SSMS) o la extensión MSSQL para VS Code y compruebe que las tablas y los datos se crearon correctamente.

  3. Para implementar el script en otra máquina, copie todos los archivos excepto la .venv carpeta en la otra máquina. El entorno virtual se vuelve a crear con la primera ejecución.

Funcionamiento del código

La aplicación realiza una transferencia completa de datos de ida y vuelta en tres fases:

  1. Descargar: se conecta a la base de datos de origen, lee los metadatos de columna de INFORMATION_SCHEMA.COLUMNS, compila un esquema de Apache Arrow y, a continuación, descarga cada tabla en un archivo Parquet local.
  2. Enrich (opcional): proporciona un enlace (enrich_parquet) donde puede agregar transformaciones, columnas derivadas o combinaciones antes de cargar.
  3. Cargar: lee cada archivo Parquet en lotes, vuelve a crear la tabla en la base de datos de destino mediante DDL generado a partir de metadatos de esquema de flecha y, a continuación, usa cursor.bulkcopy() para la inserción masiva de alto rendimiento.

Paso siguiente

Visite el repositorio de mssql-python GitHub del controlador para obtener más ejemplos, para contribuir a ideas o notificar problemas.