Condividi tramite


Avvio rapido: Eseguire la copia bulk con il driver mssql-python per Python

In questa guida introduttiva, usi il driver mssql-python per copiare in blocco i dati tra i database. L'applicazione scarica le tabelle da uno schema del database di origine ai file Parquet locali usando Apache Arrow, quindi le carica in un database di destinazione usando il metodo ad alte prestazioni bulkcopy . È possibile usare questo modello per eseguire la migrazione, replicare o trasformare i dati tra SQL Server, database SQL di Azure e database SQL in Fabric.

Il mssql-python driver non richiede dipendenze esterne nei computer Windows. Il driver installa tutti gli elementi necessari con un'unica pip installazione, consentendo di usare la versione più recente del driver per i nuovi script senza interrompere altri script che non è necessario aggiornare e testare.

Documentazione di mssql-python | Codice sorgente di mssql-python | Pacchetto (PyPI) | uv

Prerequisiti

  • Python 3

    • Se Python non è già disponibile, installare il runtime Python e la gestione pacchetti pip da python.org.

    • Non si vuole usare il proprio ambiente? Apri come devcontainer utilizzando GitHub Codespaces.

  • Visual Studio Code con le seguenti estensioni:

    • Estensione di Python per Visual Studio Code
  • Interfaccia Command-Line di Azure per l'autenticazione senza password in macOS e Linux.

  • Se non hai già uv, segui le istruzioni di installazione.

  • Database di origine in SQL Server, database SQL di Azure o database SQL in Fabric con lo AdventureWorks2025 schema di esempio e una stringa di connessione valida.

  • Database di destinazione in SQL Server, database SQL di Azure o database SQL in Fabric con una stringa di connessione valida. L'utente deve disporre dell'autorizzazione per creare e scrivere nelle tabelle. Se non si dispone di un secondo database, è possibile modificare la stringa di connessione di destinazione in modo che punti allo stesso database e usare uno schema diverso per le tabelle di destinazione.

  • Installare prerequisiti specifici del sistema operativo monouso.

    apk add libtool krb5-libs krb5-dev
    

Creare un database SQL

Questa configurazione rapida richiede lo schema del database AdventureWorks2025 Lightweight come database di origine.

Creare il progetto ed eseguire il codice

  1. Crea un nuovo progetto
  2. Aggiungere dipendenze
  3. Avviare Visual Studio Code
  4. Aggiornare pyproject.toml
  5. Aggiornare main.py
  6. Salvare le stringhe di connessione
  7. Usare uv run per eseguire lo script

Creare un nuovo progetto

  1. Apri un prompt dei comandi nella directory di sviluppo. Se non è disponibile, creare una nuova directory denominata python, scriptse così via. Evitare cartelle in OneDrive, la sincronizzazione può interferire con la gestione dell'ambiente virtuale.

  2. Creare un nuovo progetto con uv.

    uv init mssql-python-bcp-qs
    cd mssql-python-bcp-qs
    

Aggiungere dipendenze

Nella stessa directory installare i pacchetti mssql-python, python-dotenv e pyarrow.

uv add mssql-python python-dotenv pyarrow

Avviare Visual Studio Code

Nella stessa directory eseguire il comando seguente.

code .

Aggiornare pyproject.toml

  1. Pyproject.toml contiene i metadati per il progetto. Aprire il file nell'editor preferito.

  2. Esaminare il contenuto del file. Dovrebbe essere simile a questo esempio. Si noti la versione di Python e la dipendenza per mssql-python, che usa >= per definire una versione minima. Se si preferisce una versione esatta, cambiare >= prima del numero di versione in ==. Le versioni risolte di ogni pacchetto vengono quindi memorizzate nel uv.lock. Il file di blocco garantisce che gli sviluppatori che lavorano sul progetto usino versioni coerenti del pacchetto. Garantisce inoltre che lo stesso set di versioni del pacchetto venga usato durante la distribuzione del pacchetto agli utenti finali. Non è consigliabile modificare il uv.lock file.

    [project]
    name = "mssql-python-bcp-qs"
    version = "0.1.0"
    description = "Add your description here"
    readme = "README.md"
    requires-python = ">=3.11"
    dependencies = [
        "mssql-python>=1.4.0",
        "python-dotenv>=1.1.1",
        "pyarrow>=19.0.0",
    ]
    
  3. Aggiornare la descrizione in modo che sia più descrittiva.

    description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"
    
  4. Salva e chiudi il file.

Aggiornare main.py

  1. Aprire il file denominato main.py. Dovrebbe essere simile a questo esempio.

    def main():
        print("Hello from mssql-python-bcp-qs!")
    
    if __name__ == "__main__":
        main()
    
  2. Sostituire il contenuto di main.py con i seguenti blocchi di codice. Ogni blocco si basa su quello precedente e deve essere inserito in main.py ordine.

    Suggerimento

    Se Visual Studio Code non riesce a risolvere i pacchetti, è necessario aggiornare l'interprete per usare l'ambiente virtuale.

  3. Nella parte superiore di main.pyaggiungere le importazioni e le costanti. Lo script usa mssql_python per la connettività pyarrow del database e pyarrow.parquet per la gestione dei dati a colonne e L/O dei file Parquet, python-dotenv per il caricamento di stringhe di connessione da un .env file e un modello regex compilato che convalida gli identificatori SQL per impedire l'inserimento.

    """Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema."""
    
    import os
    import re
    import time
    from uuid import UUID
    
    import pyarrow as pa
    import pyarrow.parquet as pq
    from dotenv import load_dotenv
    import mssql_python
    
    BATCH_SIZE = 64_000
    _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$")
    
    
    def _validate_ident(name: str) -> str:
        if not _SAFE_IDENT.match(name):
            raise ValueError(f"Unsafe SQL identifier: {name!r}")
        return name
    
  4. Sotto le importazioni, aggiungere il mapping del tipo da SQL a Arrow. Questo dizionario converte i tipi di colonna di SQL Server nei rispettivi equivalenti apache Arrow in modo che la fedeltà dei dati venga mantenuta durante la scrittura in Parquet. Le due funzioni helper compilano stringhe di tipo SQL esatte (ad esempio, NVARCHAR(100) o DECIMAL(18,2)) dai INFORMATION_SCHEMA metadati e risolvono il tipo freccia corrispondente per ogni colonna.

    _SQL_TO_ARROW = {
        "bit": pa.bool_(),
        "tinyint": pa.uint8(),
        "smallint": pa.int16(),
        "int": pa.int32(),
        "bigint": pa.int64(),
        "float": pa.float64(),
        "real": pa.float32(),
        "smallmoney": pa.decimal128(10, 4),
        "money": pa.decimal128(19, 4),
        "date": pa.date32(),
        "datetime": pa.timestamp("ms"),
        "datetime2": pa.timestamp("us"),
        "smalldatetime": pa.timestamp("s"),
        "uniqueidentifier": pa.string(),
        "xml": pa.string(),
        "image": pa.binary(),
        "binary": pa.binary(),
        "varbinary": pa.binary(),
        "timestamp": pa.binary(),
    }
    
    
    def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str:
        """Build the exact SQL type string from INFORMATION_SCHEMA metadata."""
        dt = data_type.lower()
        if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"):
            length = "MAX" if max_length == -1 else str(max_length)
            return f"{dt.upper()}({length})"
        if dt in ("decimal", "numeric"):
            return f"{dt.upper()}({precision},{scale})"
        return dt.upper()
    
    
    def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType:
        sql_type = sql_type.lower()
        if sql_type in _SQL_TO_ARROW:
            return _SQL_TO_ARROW[sql_type]
        if sql_type in ("decimal", "numeric"):
            return pa.decimal128(precision, scale)
        if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"):
            return pa.string()
        return pa.string()
    
    
    def _convert_value(v):
        """Convert a SQL value to an Arrow-compatible Python type."""
        if isinstance(v, UUID):
            return str(v)
        return v
    
  5. Aggiungere le funzioni di introspezione dello schema e generazione DDL. _get_arrow_schema esegue interrogazioni INFORMATION_SCHEMA.COLUMNS utilizzando query con parametri, costruisce uno schema Arrow e conserva il tipo SQL originale come metadati di campo, così da permettere la ricreazione della tabella di destinazione con definizioni di colonna esatte. _create_table_ddl legge di nuovo i metadati per generare DROP/CREATE TABLE DDL. Il tipo timestamp (rowversion) viene rimappato a VARBINARY(8) perché viene generato automaticamente e non direttamente inseribile.

    def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema:
        """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS.
    
        Stores the original SQL type as field metadata so the round-trip
        CREATE TABLE can reproduce exact column definitions.
        """
        cursor.execute(
            "SELECT COLUMN_NAME, DATA_TYPE, "
            "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), "
            "COALESCE(NUMERIC_PRECISION, 0), "
            "COALESCE(NUMERIC_SCALE, 0), "
            "IS_NULLABLE "
            "FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? "
            "ORDER BY ORDINAL_POSITION",
            (schema_name, table_name),
        )
        rows = cursor.fetchall()
        if not rows:
            raise ValueError(f"No columns found for {schema_name}.{table_name}")
        fields = []
        for col_name, data_type, max_len, precision, scale, nullable in rows:
            arrow_t = _arrow_type(data_type, precision, scale)
            sql_t = _sql_type_str(data_type, max_len, precision, scale)
            fields.append(
                pa.field(
                    col_name, arrow_t,
                    nullable=(nullable == "YES"),
                    metadata={"sql_type": sql_t},
                )
            )
        return pa.schema(fields)
    
    
    def _create_table_ddl(target: str, schema: pa.Schema) -> str:
        """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata."""
        col_defs = []
        for f in schema:
            sql_t = f.metadata[b"sql_type"].decode()
            # timestamp/rowversion is auto-generated and not insertable
            if sql_t == "TIMESTAMP":
                sql_t = "VARBINARY(8)"
            null = "" if f.nullable else " NOT NULL"
            col_defs.append(f"[{f.name}] {sql_t}{null}")
        col_defs_str = ",\n    ".join(col_defs)
        return (
            f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n"
            f"CREATE TABLE {target} (\n    {col_defs_str}\n);"
        )
    
  6. Aggiungere la funzione di download. download_table trasmette le righe da una tabella di origine in batch usando fetchmany(), converte ogni valore in un tipo Python compatibile con arrow e scrive i batch di record in modo incrementale in un file Parquet locale usando ParquetWriter. In questo modo la memoria viene limitata indipendentemente dalle dimensioni della tabella. La funzione usa due cursori separati: uno per leggere i metadati della colonna e un altro per trasmettere i dati.

    def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int:
        """Download a SQL table to a parquet file. Returns row count (0 if empty)."""
        _validate_ident(table_name)
        source = f"{schema_name}.[{table_name}]"
    
        with conn.cursor() as cursor:
            schema = _get_arrow_schema(cursor, schema_name, table_name)
    
        n_cols = len(schema)
        row_count = 0
        t0 = time.perf_counter()
    
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT * FROM {source}")
            writer = None
            try:
                while True:
                    rows = cursor.fetchmany(BATCH_SIZE)
                    if not rows:
                        break
                    columns = [[] for _ in range(n_cols)]
                    for row in rows:
                        for i in range(n_cols):
                            columns[i].append(_convert_value(row[i]))
                    arrays = [
                        pa.array(columns[i], type=schema.field(i).type)
                        for i in range(n_cols)
                    ]
                    batch = pa.record_batch(arrays, schema=schema)
                    if writer is None:
                        writer = pq.ParquetWriter(parquet_file, schema)
                    writer.write_batch(batch)
                    row_count += len(rows)
            finally:
                if writer is not None:
                    writer.close()
    
        if row_count == 0:
            return 0
    
        elapsed = time.perf_counter() - t0
        rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded "
            f"in {elapsed:.2f}s ({rate})"
        )
        return row_count
    
  7. Aggiungere il gancio di arricchimento. enrich_parquet è un segnaposto in cui è possibile aggiungere trasformazioni, colonne derivate o join ai dati prima del caricamento. In questa guida rapida, si tratta di un'operazione no-op che restituisce il percorso del file intatto.

    def enrich_parquet(parquet_file: str) -> str:
        """Enrich a parquet file before upload. Returns the (possibly new) file path."""
        # TODO: add transformations, derived columns, joins, etc.
        print(f"Enriching {parquet_file} (no-op)")
        return parquet_file
    
  8. Aggiungere la funzione di caricamento. upload_parquet legge lo schema Arrow dal file Parquet, genera ed esegue DROP/CREATE TABLE DDL per preparare la destinazione, quindi legge il file in batch e chiama cursor.bulkcopy() per l'inserimento in blocco ad alte prestazioni. L'opzione table_lock=True migliora la velocità effettiva riducendo al minimo i conflitti di blocco. Al termine del caricamento, la funzione esegue un SELECT COUNT(*) per verificare che il conteggio delle righe corrisponda.

    def upload_parquet(conn, parquet_file: str, target: str) -> int:
        """Upload a parquet file into a SQL table via BCP. Returns row count."""
        # ── Create target table from parquet schema ──
        pf_schema = pq.read_schema(parquet_file)
        with conn.cursor() as cursor:
            cursor.execute(_create_table_ddl(target, pf_schema))
        conn.commit()
    
        # ── Bulk insert ──
        uploaded = 0
        t0 = time.perf_counter()
        with pq.ParquetFile(parquet_file) as pf:
            with conn.cursor() as cursor:
                for batch in pf.iter_batches(batch_size=BATCH_SIZE):
                    rows = zip(*(col.to_pylist() for col in batch.columns))
                    cursor.bulkcopy(
                        target, rows, batch_size=BATCH_SIZE,
                        table_lock=True, timeout=3600,
                    )
                    uploaded += batch.num_rows
        elapsed = time.perf_counter() - t0
    
        # ── Verify ──
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT COUNT(*) FROM {target}")
            count = cursor.fetchone()[0]
    
        rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{parquet_file} → {target}: {uploaded:,} rows uploaded "
            f"in {elapsed:.2f}s ({rate}) "
            f"| verified: {count:,}"
        )
        return uploaded
    
  9. Aggiungere la funzione di orchestrazione. transfer_tables unisce le tre fasi. Si connette al database di origine, individua tutte le tabelle di base nello schema specificato tramite INFORMATION_SCHEMA.TABLES, ne scarica ognuna in un file Parquet locale, esegue l'hook di arricchimento, quindi si connette al database di destinazione e carica ogni file.

    def transfer_tables(
        source_conn_str: str,
        dest_conn_str: str,
        source_schema: str,
        dest_schema: str,
    ) -> None:
        """Download all tables from source DB/schema to parquet, upload to dest DB/schema."""
        _validate_ident(source_schema)
        _validate_ident(dest_schema)
    
        parquet_dir = source_schema
        os.makedirs(parquet_dir, exist_ok=True)
    
        # ── Download from source ──
        with mssql_python.connect(source_conn_str) as src_conn:
            with src_conn.cursor() as cursor:
                cursor.execute(
                    "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
                    "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' "
                    "ORDER BY TABLE_NAME",
                    (source_schema,),
                )
                tables = [row[0] for row in cursor.fetchall()]
    
            print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n")
    
            parquet_files = []
            for table_name in tables:
                parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet")
                row_count = download_table(src_conn, source_schema, table_name, parquet_file)
                if row_count == 0:
                    print(f"{source_schema}.{table_name}: empty, skipping")
                else:
                    parquet_files.append((table_name, parquet_file))
    
        # ── Enrich parquet files ──
        enriched = []
        for table_name, parquet_file in parquet_files:
            enriched.append((table_name, enrich_parquet(parquet_file)))
    
        # ── Upload to destination ──
        with mssql_python.connect(dest_conn_str) as dest_conn:
            for table_name, parquet_file in enriched:
                target = f"{dest_schema}.[{table_name}]"
                upload_parquet(dest_conn, parquet_file, target)
    
  10. Infine, aggiungere il main punto di ingresso. Carica il .env file, chiama transfer_tables con le stringhe di connessione di origine e di destinazione e stampa il tempo trascorso totale.

    def main():
        load_dotenv()
        t_start = time.perf_counter()
    
        transfer_tables(
            source_conn_str=os.environ["SOURCE_CONNECTION_STRING"],
            dest_conn_str=os.environ["DEST_CONNECTION_STRING"],
            source_schema="SalesLT",
            dest_schema="dbo",
        )
    
        print(f"Total: {time.perf_counter() - t_start:.2f}s")
    
    
    if __name__ == "__main__":
        main()
    
  11. Salvare e chiudere main.py.

Salvare le stringhe di connessione

  1. Aprire il .gitignore file e aggiungere un'esclusione per .env i file. Il file dovrebbe essere simile a questo esempio. Assicurati di salvarlo e chiuderlo quando hai finito.

    # Python-generated files
    __pycache__/
    *.py[oc]
    build/
    dist/
    wheels/
    *.egg-info
    
    # Virtual environments
    .venv
    
    # Connection strings and secrets
    .env
    
  2. Nella directory corrente creare un nuovo file denominato .env.

  3. All'interno del .env file aggiungere voci per le stringhe di connessione di origine e di destinazione. Sostituire i valori segnaposto con i nomi effettivi del server e del database.

    SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    

    Suggerimento

    La stringa di connessione usata qui dipende in gran parte dal tipo di database SQL a cui ci si connette. Se ci si connette a un database SQL di Azure o a un database SQL in Fabric, usare la stringa di connessione ODBC dalla scheda Stringhe di connessione. Potrebbe essere necessario modificare il tipo di autenticazione a seconda dello scenario. Per altre informazioni sulle stringhe di connessione e sulla relativa sintassi, vedere informazioni di riferimento sulla sintassi delle stringhe di connessione.

Suggerimento

In macOS, entrambi ActiveDirectoryInteractive e ActiveDirectoryDefault funzionano per l'autenticazione di Microsoft Entra. ActiveDirectoryInteractive richiede di accedere ogni volta che si esegue lo script. Per evitare richieste di accesso ripetute, accedere una sola volta tramite l'interfaccia della riga di comando di Azure eseguendo az login, quindi usare ActiveDirectoryDefault, che riutilizza le credenziali memorizzate nella cache.

Usare uv run per eseguire lo script

  1. Nella finestra del terminale precedente o in una nuova finestra del terminale aperta nella stessa directory eseguire il comando seguente.

     uv run main.py
    

    Ecco l'output previsto al termine dello script.

    Found 12 SalesLT tables: Address, Customer, CustomerAddress, ...
    
    SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec)
    ...
    SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450
    ...
    Total: 2.35s
    
  2. Connettersi al database di destinazione usando SQL Server Management Studio (SSMS) o l'estensione MSSQL per VS Code e verificare che le tabelle e i dati siano stati creati correttamente.

  3. Per distribuire lo script in un altro computer, copiare tutti i file ad eccezione della .venv cartella nell'altro computer. L'ambiente virtuale viene ricreato con la prima esecuzione.

Funzionamento del codice

L'applicazione esegue un trasferimento completo dei dati di andata e ritorno in tre fasi:

  1. Download: si connette al database di origine, legge i metadati delle colonne da INFORMATION_SCHEMA.COLUMNS, compila uno schema Apache Arrow, quindi scarica ogni tabella in un file Parquet locale.
  2. Arricchimento (facoltativo): fornisce un hook (enrich_parquet) in cui è possibile aggiungere trasformazioni, colonne derivate o join prima del caricamento.
  3. Caricamento: legge ogni file Parquet in batch, ricrea la tabella nel database di destinazione usando DDL generato dai metadati dello schema Arrow, quindi usa cursor.bulkcopy() per l'inserimento in blocco ad alte prestazioni.

Passo successivo

Per altri esempi, visitare il mssql-python repository GitHub del driver per contribuire a idee o segnalare problemi.