Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
In questa guida introduttiva, usi il driver mssql-python per copiare in blocco i dati tra i database. L'applicazione scarica le tabelle da uno schema del database di origine ai file Parquet locali usando Apache Arrow, quindi le carica in un database di destinazione usando il metodo ad alte prestazioni bulkcopy . È possibile usare questo modello per eseguire la migrazione, replicare o trasformare i dati tra SQL Server, database SQL di Azure e database SQL in Fabric.
Il mssql-python driver non richiede dipendenze esterne nei computer Windows. Il driver installa tutti gli elementi necessari con un'unica pip installazione, consentendo di usare la versione più recente del driver per i nuovi script senza interrompere altri script che non è necessario aggiornare e testare.
Documentazione di mssql-python | Codice sorgente di mssql-python | Pacchetto (PyPI) | uv
Prerequisiti
Python 3
Se Python non è già disponibile, installare il runtime Python e la gestione pacchetti pip da python.org.
Non si vuole usare il proprio ambiente? Apri come devcontainer utilizzando GitHub Codespaces.
Visual Studio Code con le seguenti estensioni:
- Estensione di Python per Visual Studio Code
Interfaccia Command-Line di Azure per l'autenticazione senza password in macOS e Linux.
Se non hai già
uv, segui le istruzioni di installazione.Database di origine in SQL Server, database SQL di Azure o database SQL in Fabric con lo
AdventureWorks2025schema di esempio e una stringa di connessione valida.Database di destinazione in SQL Server, database SQL di Azure o database SQL in Fabric con una stringa di connessione valida. L'utente deve disporre dell'autorizzazione per creare e scrivere nelle tabelle. Se non si dispone di un secondo database, è possibile modificare la stringa di connessione di destinazione in modo che punti allo stesso database e usare uno schema diverso per le tabelle di destinazione.
Installare prerequisiti specifici del sistema operativo monouso.
Creare un database SQL
Questa configurazione rapida richiede lo schema del database AdventureWorks2025 Lightweight come database di origine.
Creare il progetto ed eseguire il codice
- Crea un nuovo progetto
- Aggiungere dipendenze
- Avviare Visual Studio Code
- Aggiornare pyproject.toml
- Aggiornare main.py
- Salvare le stringhe di connessione
- Usare uv run per eseguire lo script
Creare un nuovo progetto
Apri un prompt dei comandi nella directory di sviluppo. Se non è disponibile, creare una nuova directory denominata
python,scriptse così via. Evitare cartelle in OneDrive, la sincronizzazione può interferire con la gestione dell'ambiente virtuale.Creare un nuovo progetto con
uv.uv init mssql-python-bcp-qs cd mssql-python-bcp-qs
Aggiungere dipendenze
Nella stessa directory installare i pacchetti mssql-python, python-dotenv e pyarrow.
uv add mssql-python python-dotenv pyarrow
Avviare Visual Studio Code
Nella stessa directory eseguire il comando seguente.
code .
Aggiornare pyproject.toml
Pyproject.toml contiene i metadati per il progetto. Aprire il file nell'editor preferito.
Esaminare il contenuto del file. Dovrebbe essere simile a questo esempio. Si noti la versione di Python e la dipendenza per
mssql-python, che usa>=per definire una versione minima. Se si preferisce una versione esatta, cambiare>=prima del numero di versione in==. Le versioni risolte di ogni pacchetto vengono quindi memorizzate nel uv.lock. Il file di blocco garantisce che gli sviluppatori che lavorano sul progetto usino versioni coerenti del pacchetto. Garantisce inoltre che lo stesso set di versioni del pacchetto venga usato durante la distribuzione del pacchetto agli utenti finali. Non è consigliabile modificare iluv.lockfile.[project] name = "mssql-python-bcp-qs" version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.11" dependencies = [ "mssql-python>=1.4.0", "python-dotenv>=1.1.1", "pyarrow>=19.0.0", ]Aggiornare la descrizione in modo che sia più descrittiva.
description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"Salva e chiudi il file.
Aggiornare main.py
Aprire il file denominato
main.py. Dovrebbe essere simile a questo esempio.def main(): print("Hello from mssql-python-bcp-qs!") if __name__ == "__main__": main()Sostituire il contenuto di
main.pycon i seguenti blocchi di codice. Ogni blocco si basa su quello precedente e deve essere inserito inmain.pyordine.Suggerimento
Se Visual Studio Code non riesce a risolvere i pacchetti, è necessario aggiornare l'interprete per usare l'ambiente virtuale.
Nella parte superiore di
main.pyaggiungere le importazioni e le costanti. Lo script usamssql_pythonper la connettivitàpyarrowdel database epyarrow.parquetper la gestione dei dati a colonne e L/O dei file Parquet,python-dotenvper il caricamento di stringhe di connessione da un.envfile e un modello regex compilato che convalida gli identificatori SQL per impedire l'inserimento."""Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema.""" import os import re import time from uuid import UUID import pyarrow as pa import pyarrow.parquet as pq from dotenv import load_dotenv import mssql_python BATCH_SIZE = 64_000 _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$") def _validate_ident(name: str) -> str: if not _SAFE_IDENT.match(name): raise ValueError(f"Unsafe SQL identifier: {name!r}") return nameSotto le importazioni, aggiungere il mapping del tipo da SQL a Arrow. Questo dizionario converte i tipi di colonna di SQL Server nei rispettivi equivalenti apache Arrow in modo che la fedeltà dei dati venga mantenuta durante la scrittura in Parquet. Le due funzioni helper compilano stringhe di tipo SQL esatte (ad esempio,
NVARCHAR(100)oDECIMAL(18,2)) daiINFORMATION_SCHEMAmetadati e risolvono il tipo freccia corrispondente per ogni colonna._SQL_TO_ARROW = { "bit": pa.bool_(), "tinyint": pa.uint8(), "smallint": pa.int16(), "int": pa.int32(), "bigint": pa.int64(), "float": pa.float64(), "real": pa.float32(), "smallmoney": pa.decimal128(10, 4), "money": pa.decimal128(19, 4), "date": pa.date32(), "datetime": pa.timestamp("ms"), "datetime2": pa.timestamp("us"), "smalldatetime": pa.timestamp("s"), "uniqueidentifier": pa.string(), "xml": pa.string(), "image": pa.binary(), "binary": pa.binary(), "varbinary": pa.binary(), "timestamp": pa.binary(), } def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str: """Build the exact SQL type string from INFORMATION_SCHEMA metadata.""" dt = data_type.lower() if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"): length = "MAX" if max_length == -1 else str(max_length) return f"{dt.upper()}({length})" if dt in ("decimal", "numeric"): return f"{dt.upper()}({precision},{scale})" return dt.upper() def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType: sql_type = sql_type.lower() if sql_type in _SQL_TO_ARROW: return _SQL_TO_ARROW[sql_type] if sql_type in ("decimal", "numeric"): return pa.decimal128(precision, scale) if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"): return pa.string() return pa.string() def _convert_value(v): """Convert a SQL value to an Arrow-compatible Python type.""" if isinstance(v, UUID): return str(v) return vAggiungere le funzioni di introspezione dello schema e generazione DDL.
_get_arrow_schemaesegue interrogazioniINFORMATION_SCHEMA.COLUMNSutilizzando query con parametri, costruisce uno schema Arrow e conserva il tipo SQL originale come metadati di campo, così da permettere la ricreazione della tabella di destinazione con definizioni di colonna esatte._create_table_ddllegge di nuovo i metadati per generareDROP/CREATE TABLEDDL. Il tipotimestamp(rowversion) viene rimappato aVARBINARY(8)perché viene generato automaticamente e non direttamente inseribile.def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema: """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS. Stores the original SQL type as field metadata so the round-trip CREATE TABLE can reproduce exact column definitions. """ cursor.execute( "SELECT COLUMN_NAME, DATA_TYPE, " "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), " "COALESCE(NUMERIC_PRECISION, 0), " "COALESCE(NUMERIC_SCALE, 0), " "IS_NULLABLE " "FROM INFORMATION_SCHEMA.COLUMNS " "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? " "ORDER BY ORDINAL_POSITION", (schema_name, table_name), ) rows = cursor.fetchall() if not rows: raise ValueError(f"No columns found for {schema_name}.{table_name}") fields = [] for col_name, data_type, max_len, precision, scale, nullable in rows: arrow_t = _arrow_type(data_type, precision, scale) sql_t = _sql_type_str(data_type, max_len, precision, scale) fields.append( pa.field( col_name, arrow_t, nullable=(nullable == "YES"), metadata={"sql_type": sql_t}, ) ) return pa.schema(fields) def _create_table_ddl(target: str, schema: pa.Schema) -> str: """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata.""" col_defs = [] for f in schema: sql_t = f.metadata[b"sql_type"].decode() # timestamp/rowversion is auto-generated and not insertable if sql_t == "TIMESTAMP": sql_t = "VARBINARY(8)" null = "" if f.nullable else " NOT NULL" col_defs.append(f"[{f.name}] {sql_t}{null}") col_defs_str = ",\n ".join(col_defs) return ( f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n" f"CREATE TABLE {target} (\n {col_defs_str}\n);" )Aggiungere la funzione di download.
download_tabletrasmette le righe da una tabella di origine in batch usandofetchmany(), converte ogni valore in un tipo Python compatibile con arrow e scrive i batch di record in modo incrementale in un file Parquet locale usandoParquetWriter. In questo modo la memoria viene limitata indipendentemente dalle dimensioni della tabella. La funzione usa due cursori separati: uno per leggere i metadati della colonna e un altro per trasmettere i dati.def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int: """Download a SQL table to a parquet file. Returns row count (0 if empty).""" _validate_ident(table_name) source = f"{schema_name}.[{table_name}]" with conn.cursor() as cursor: schema = _get_arrow_schema(cursor, schema_name, table_name) n_cols = len(schema) row_count = 0 t0 = time.perf_counter() with conn.cursor() as cursor: cursor.execute(f"SELECT * FROM {source}") writer = None try: while True: rows = cursor.fetchmany(BATCH_SIZE) if not rows: break columns = [[] for _ in range(n_cols)] for row in rows: for i in range(n_cols): columns[i].append(_convert_value(row[i])) arrays = [ pa.array(columns[i], type=schema.field(i).type) for i in range(n_cols) ] batch = pa.record_batch(arrays, schema=schema) if writer is None: writer = pq.ParquetWriter(parquet_file, schema) writer.write_batch(batch) row_count += len(rows) finally: if writer is not None: writer.close() if row_count == 0: return 0 elapsed = time.perf_counter() - t0 rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded " f"in {elapsed:.2f}s ({rate})" ) return row_countAggiungere il gancio di arricchimento.
enrich_parquetè un segnaposto in cui è possibile aggiungere trasformazioni, colonne derivate o join ai dati prima del caricamento. In questa guida rapida, si tratta di un'operazione no-op che restituisce il percorso del file intatto.def enrich_parquet(parquet_file: str) -> str: """Enrich a parquet file before upload. Returns the (possibly new) file path.""" # TODO: add transformations, derived columns, joins, etc. print(f"Enriching {parquet_file} (no-op)") return parquet_fileAggiungere la funzione di caricamento.
upload_parquetlegge lo schema Arrow dal file Parquet, genera ed esegueDROP/CREATE TABLEDDL per preparare la destinazione, quindi legge il file in batch e chiamacursor.bulkcopy()per l'inserimento in blocco ad alte prestazioni. L'opzionetable_lock=Truemigliora la velocità effettiva riducendo al minimo i conflitti di blocco. Al termine del caricamento, la funzione esegue unSELECT COUNT(*)per verificare che il conteggio delle righe corrisponda.def upload_parquet(conn, parquet_file: str, target: str) -> int: """Upload a parquet file into a SQL table via BCP. Returns row count.""" # ── Create target table from parquet schema ── pf_schema = pq.read_schema(parquet_file) with conn.cursor() as cursor: cursor.execute(_create_table_ddl(target, pf_schema)) conn.commit() # ── Bulk insert ── uploaded = 0 t0 = time.perf_counter() with pq.ParquetFile(parquet_file) as pf: with conn.cursor() as cursor: for batch in pf.iter_batches(batch_size=BATCH_SIZE): rows = zip(*(col.to_pylist() for col in batch.columns)) cursor.bulkcopy( target, rows, batch_size=BATCH_SIZE, table_lock=True, timeout=3600, ) uploaded += batch.num_rows elapsed = time.perf_counter() - t0 # ── Verify ── with conn.cursor() as cursor: cursor.execute(f"SELECT COUNT(*) FROM {target}") count = cursor.fetchone()[0] rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{parquet_file} → {target}: {uploaded:,} rows uploaded " f"in {elapsed:.2f}s ({rate}) " f"| verified: {count:,}" ) return uploadedAggiungere la funzione di orchestrazione.
transfer_tablesunisce le tre fasi. Si connette al database di origine, individua tutte le tabelle di base nello schema specificato tramiteINFORMATION_SCHEMA.TABLES, ne scarica ognuna in un file Parquet locale, esegue l'hook di arricchimento, quindi si connette al database di destinazione e carica ogni file.def transfer_tables( source_conn_str: str, dest_conn_str: str, source_schema: str, dest_schema: str, ) -> None: """Download all tables from source DB/schema to parquet, upload to dest DB/schema.""" _validate_ident(source_schema) _validate_ident(dest_schema) parquet_dir = source_schema os.makedirs(parquet_dir, exist_ok=True) # ── Download from source ── with mssql_python.connect(source_conn_str) as src_conn: with src_conn.cursor() as cursor: cursor.execute( "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES " "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' " "ORDER BY TABLE_NAME", (source_schema,), ) tables = [row[0] for row in cursor.fetchall()] print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n") parquet_files = [] for table_name in tables: parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet") row_count = download_table(src_conn, source_schema, table_name, parquet_file) if row_count == 0: print(f"{source_schema}.{table_name}: empty, skipping") else: parquet_files.append((table_name, parquet_file)) # ── Enrich parquet files ── enriched = [] for table_name, parquet_file in parquet_files: enriched.append((table_name, enrich_parquet(parquet_file))) # ── Upload to destination ── with mssql_python.connect(dest_conn_str) as dest_conn: for table_name, parquet_file in enriched: target = f"{dest_schema}.[{table_name}]" upload_parquet(dest_conn, parquet_file, target)Infine, aggiungere il
mainpunto di ingresso. Carica il.envfile, chiamatransfer_tablescon le stringhe di connessione di origine e di destinazione e stampa il tempo trascorso totale.def main(): load_dotenv() t_start = time.perf_counter() transfer_tables( source_conn_str=os.environ["SOURCE_CONNECTION_STRING"], dest_conn_str=os.environ["DEST_CONNECTION_STRING"], source_schema="SalesLT", dest_schema="dbo", ) print(f"Total: {time.perf_counter() - t_start:.2f}s") if __name__ == "__main__": main()Salvare e chiudere
main.py.
Salvare le stringhe di connessione
Aprire il
.gitignorefile e aggiungere un'esclusione per.envi file. Il file dovrebbe essere simile a questo esempio. Assicurati di salvarlo e chiuderlo quando hai finito.# Python-generated files __pycache__/ *.py[oc] build/ dist/ wheels/ *.egg-info # Virtual environments .venv # Connection strings and secrets .envNella directory corrente creare un nuovo file denominato
.env.All'interno del
.envfile aggiungere voci per le stringhe di connessione di origine e di destinazione. Sostituire i valori segnaposto con i nomi effettivi del server e del database.SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive" DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"Suggerimento
La stringa di connessione usata qui dipende in gran parte dal tipo di database SQL a cui ci si connette. Se ci si connette a un database SQL di Azure o a un database SQL in Fabric, usare la stringa di connessione ODBC dalla scheda Stringhe di connessione. Potrebbe essere necessario modificare il tipo di autenticazione a seconda dello scenario. Per altre informazioni sulle stringhe di connessione e sulla relativa sintassi, vedere informazioni di riferimento sulla sintassi delle stringhe di connessione.
Suggerimento
In macOS, entrambi ActiveDirectoryInteractive e ActiveDirectoryDefault funzionano per l'autenticazione di Microsoft Entra.
ActiveDirectoryInteractive richiede di accedere ogni volta che si esegue lo script. Per evitare richieste di accesso ripetute, accedere una sola volta tramite l'interfaccia della riga di comando di Azure eseguendo az login, quindi usare ActiveDirectoryDefault, che riutilizza le credenziali memorizzate nella cache.
Usare uv run per eseguire lo script
Nella finestra del terminale precedente o in una nuova finestra del terminale aperta nella stessa directory eseguire il comando seguente.
uv run main.pyEcco l'output previsto al termine dello script.
Found 12 SalesLT tables: Address, Customer, CustomerAddress, ... SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec) ... SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450 ... Total: 2.35sConnettersi al database di destinazione usando SQL Server Management Studio (SSMS) o l'estensione MSSQL per VS Code e verificare che le tabelle e i dati siano stati creati correttamente.
Per distribuire lo script in un altro computer, copiare tutti i file ad eccezione della
.venvcartella nell'altro computer. L'ambiente virtuale viene ricreato con la prima esecuzione.
Funzionamento del codice
L'applicazione esegue un trasferimento completo dei dati di andata e ritorno in tre fasi:
-
Download: si connette al database di origine, legge i metadati delle colonne da
INFORMATION_SCHEMA.COLUMNS, compila uno schema Apache Arrow, quindi scarica ogni tabella in un file Parquet locale. -
Arricchimento (facoltativo): fornisce un hook (
enrich_parquet) in cui è possibile aggiungere trasformazioni, colonne derivate o join prima del caricamento. -
Caricamento: legge ogni file Parquet in batch, ricrea la tabella nel database di destinazione usando DDL generato dai metadati dello schema Arrow, quindi usa
cursor.bulkcopy()per l'inserimento in blocco ad alte prestazioni.
Passo successivo
Per altri esempi, visitare il mssql-python repository GitHub del driver per contribuire a idee o segnalare problemi.