Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In dieser Schnellstartanleitung verwenden Sie den mssql-python Treiber zum Massenkopien von Daten zwischen Datenbanken. Die Anwendung lädt Tabellen aus einem Quelldatenbankschema in lokale Parkettdateien mithilfe von Apache Arrow herunter und lädt sie dann mithilfe der hochleistungsorientierten bulkcopy Methode in eine Zieldatenbank hoch. Sie können dieses Muster verwenden, um Daten zwischen SQL Server, Azure SQL-Datenbank und SQL-Datenbank in Fabric zu migrieren, zu replizieren oder zu transformieren.
Der mssql-python Treiber erfordert keine externen Abhängigkeiten von Windows-Computern. Der Treiber installiert alles, was er mit einer einzigen pip Installation benötigt, sodass Sie die neueste Version des Treibers für neue Skripts verwenden können, ohne andere Skripts zu unterbrechen, die Sie nicht zum Upgrade und Test benötigen.
mssql-python-Dokumentation | mssql-python-Quellcode | Paket (PyPI) | Uv
Voraussetzungen
Python 3
Wenn Sie noch nicht über Python verfügen, installieren Sie den Python-Runtime - und Pip-Paket-Manager aus python.org.
Möchten Sie Ihre eigene Umgebung nicht verwenden? Öffnen Sie als Devcontainer mit GitHub Codespaces.
Visual Studio Code mit den folgenden Erweiterungen:
Azure Command-Line Interface (CLI) für die kennwortlose Authentifizierung unter macOS und Linux.
Wenn Sie
uvnoch nicht haben, befolgen Sie die Installationsanleitung.Eine Quelldatenbank in SQL Server, Azure SQL-Datenbank oder SQL-Datenbank in Fabric mit dem
AdventureWorks2025Beispielschema und einer gültigen Verbindungszeichenfolge.Eine Zieldatenbank in SQL Server, Azure SQL-Datenbank oder SQL-Datenbank in Fabric mit einer gültigen Verbindungszeichenfolge. Der Benutzer muss über die Berechtigung zum Erstellen und Schreiben in Tabellen verfügen. Wenn Sie nicht über eine zweite Datenbank verfügen, können Sie die Zielverbindungszeichenfolge so ändern, dass sie auf dieselbe Datenbank verweist und ein anderes Schema für die Zieltabellen verwendet.
Installieren Sie die einmaligen betriebsystem-spezifischen Voraussetzungen.
Erstellen einer SQL-Datenbank
Für diese Schnellstartanleitung ist das AdventureWorks2025 Lightweight-Schema als Quelldatenbank erforderlich.
Erstellen Sie das Projekt, und führen Sie den Code aus.
- Ein neues Projekt erstellen
- Hinzufügen von Abhängigkeiten
- Starten von Visual Studio Code
- Pyproject.toml aktualisieren
- Aktualisieren von main.py
- Speichern der Verbindungszeichenfolgen
- Verwenden Sie 'uv run', um das Skript auszuführen
Neues Projekt erstellen
Öffnen Sie eine Eingabeaufforderung in Ihrem Entwicklungsverzeichnis. Wenn Sie kein Verzeichnis haben, erstellen Sie ein neues Verzeichnis namens
python,scriptsusw. Vermeiden Sie Ordner auf Ihrem OneDrive, die Synchronisierung kann die Verwaltung Ihrer virtuellen Umgebung beeinträchtigen.Erstellen Sie ein neues Projekt mit
uv.uv init mssql-python-bcp-qs cd mssql-python-bcp-qs
Hinzufügen von Abhängigkeiten
Im selben Verzeichnis installieren Sie die Pakete mssql-python, python-dotenv und pyarrow.
uv add mssql-python python-dotenv pyarrow
Starten Sie Visual Studio Code.
Führen Sie im selben Verzeichnis den folgenden Befehl aus.
code .
Pyproject.toml aktualisieren
Das pyproject.toml enthält die Metadaten für Ihr Projekt. Öffnen Sie die Datei in Ihrem bevorzugten Editor.
Überprüfen Sie den Inhalt der Datei. Es sollte mit diesem Beispiel vergleichbar sein. Beachten Sie die Python-Version und Abhängigkeit, denn
mssql-pythonverwendet>=, um eine Mindestversion zu definieren. Wenn Sie eine genaue Version bevorzugen, ändern Sie die>=Vorversionsnummer in==. Die aufgelösten Versionen der einzelnen Pakete werden dann im uv.lock gespeichert. Die Sperrdatei stellt sicher, dass Entwickler, die am Projekt arbeiten, konsistente Paketversionen verwenden. Außerdem wird sichergestellt, dass derselbe Satz von Paketversionen beim Verteilen des Pakets an Endbenutzer verwendet wird. Sie sollten dieuv.lockDatei nicht bearbeiten.[project] name = "mssql-python-bcp-qs" version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.11" dependencies = [ "mssql-python>=1.4.0", "python-dotenv>=1.1.1", "pyarrow>=19.0.0", ]Aktualisieren Sie die Beschreibung so, dass sie aussagekräftiger ist.
description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"Speichern und schließen Sie die Datei.
Aktualisieren von main.py
Öffnen Sie die Datei mit dem Namen
main.py. Es sollte mit diesem Beispiel vergleichbar sein.def main(): print("Hello from mssql-python-bcp-qs!") if __name__ == "__main__": main()Ersetzen Sie den Inhalt von
main.pymit den folgenden Codeblöcken. Jeder Block baut auf dem vorherigen auf und sollte inmain.pyder Reihenfolge platziert werden.Tipp
Wenn Visual Studio Code Probleme beim Beheben von Paketen hat, müssen Sie den Interpreter aktualisieren, um die virtuelle Umgebung zu verwenden.
Fügen Sie am Anfang von
main.pydie Importe und Konstanten hinzu. Das Skript verwendetmssql_pythonfür die Datenbankkonnektivität,pyarrowundpyarrow.parquetfür die Verarbeitung von Spaltendaten und die Parquet-Datei-E/A,python-dotenvzum Laden von Verbindungszeichenfolgen aus einer.env-Datei, und ein kompiliertes Regex-Muster, das SQL-Bezeichner überprüft, um Injektionen zu verhindern."""Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema.""" import os import re import time from uuid import UUID import pyarrow as pa import pyarrow.parquet as pq from dotenv import load_dotenv import mssql_python BATCH_SIZE = 64_000 _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$") def _validate_ident(name: str) -> str: if not _SAFE_IDENT.match(name): raise ValueError(f"Unsafe SQL identifier: {name!r}") return nameFügen Sie unterhalb der Importe die SQL-zu-Arrow-Typzuordnung hinzu. Dieses Wörterbuch übersetzt SQL Server-Spaltentypen in die entsprechenden Apache Arrow-Typen, sodass die Datentreue beim Schreiben in Parquet beibehalten wird. Die beiden Hilfsfunktionen erstellen präzise SQL-Typ-Zeichenfolgen (z. B.
NVARCHAR(100)oderDECIMAL(18,2)) ausINFORMATION_SCHEMAMetadaten und ermitteln den passenden Arrow-Typ für jede Spalte._SQL_TO_ARROW = { "bit": pa.bool_(), "tinyint": pa.uint8(), "smallint": pa.int16(), "int": pa.int32(), "bigint": pa.int64(), "float": pa.float64(), "real": pa.float32(), "smallmoney": pa.decimal128(10, 4), "money": pa.decimal128(19, 4), "date": pa.date32(), "datetime": pa.timestamp("ms"), "datetime2": pa.timestamp("us"), "smalldatetime": pa.timestamp("s"), "uniqueidentifier": pa.string(), "xml": pa.string(), "image": pa.binary(), "binary": pa.binary(), "varbinary": pa.binary(), "timestamp": pa.binary(), } def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str: """Build the exact SQL type string from INFORMATION_SCHEMA metadata.""" dt = data_type.lower() if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"): length = "MAX" if max_length == -1 else str(max_length) return f"{dt.upper()}({length})" if dt in ("decimal", "numeric"): return f"{dt.upper()}({precision},{scale})" return dt.upper() def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType: sql_type = sql_type.lower() if sql_type in _SQL_TO_ARROW: return _SQL_TO_ARROW[sql_type] if sql_type in ("decimal", "numeric"): return pa.decimal128(precision, scale) if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"): return pa.string() return pa.string() def _convert_value(v): """Convert a SQL value to an Arrow-compatible Python type.""" if isinstance(v, UUID): return str(v) return vFügen Sie die Schemaintrospection- und DDL-Generierungsfunktionen hinzu.
_get_arrow_schemafragtINFORMATION_SCHEMA.COLUMNSmithilfe von parameterisierten Abfragen ab, erstellt ein Pfeilschema und speichert den ursprünglichen SQL-Typ als Feldmetadaten, damit die Zieltabelle mit exakten Spaltendefinitionen neu erstellt werden kann._create_table_ddlliest diese Metadaten zurück, um DDL zu generierenDROP/CREATE TABLE. Der Typtimestamp(Rowversion) wird zuVARBINARY(8)neu zugeordnet, weil er automatisch generiert wird und nicht direkt eingefügt werden kann.def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema: """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS. Stores the original SQL type as field metadata so the round-trip CREATE TABLE can reproduce exact column definitions. """ cursor.execute( "SELECT COLUMN_NAME, DATA_TYPE, " "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), " "COALESCE(NUMERIC_PRECISION, 0), " "COALESCE(NUMERIC_SCALE, 0), " "IS_NULLABLE " "FROM INFORMATION_SCHEMA.COLUMNS " "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? " "ORDER BY ORDINAL_POSITION", (schema_name, table_name), ) rows = cursor.fetchall() if not rows: raise ValueError(f"No columns found for {schema_name}.{table_name}") fields = [] for col_name, data_type, max_len, precision, scale, nullable in rows: arrow_t = _arrow_type(data_type, precision, scale) sql_t = _sql_type_str(data_type, max_len, precision, scale) fields.append( pa.field( col_name, arrow_t, nullable=(nullable == "YES"), metadata={"sql_type": sql_t}, ) ) return pa.schema(fields) def _create_table_ddl(target: str, schema: pa.Schema) -> str: """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata.""" col_defs = [] for f in schema: sql_t = f.metadata[b"sql_type"].decode() # timestamp/rowversion is auto-generated and not insertable if sql_t == "TIMESTAMP": sql_t = "VARBINARY(8)" null = "" if f.nullable else " NOT NULL" col_defs.append(f"[{f.name}] {sql_t}{null}") col_defs_str = ",\n ".join(col_defs) return ( f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n" f"CREATE TABLE {target} (\n {col_defs_str}\n);" )Fügen Sie die Downloadfunktion hinzu.
download_tableStreamt in Batches Zeilen aus einer Quelltabelle mithilfe vonfetchmany(), konvertiert jeden Wert in einen Arrow-kompatiblen Python-Typ und schreibt Datensatzbatches inkrementell in eine lokale Parquet-Datei mithilfe vonParquetWriter. Dadurch bleibt der Speicher unabhängig von der Tabellengröße begrenzt. Die Funktion verwendet zwei separate Cursor: einen zum Lesen von Spaltenmetadaten und einen zum Streamen der Daten.def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int: """Download a SQL table to a parquet file. Returns row count (0 if empty).""" _validate_ident(table_name) source = f"{schema_name}.[{table_name}]" with conn.cursor() as cursor: schema = _get_arrow_schema(cursor, schema_name, table_name) n_cols = len(schema) row_count = 0 t0 = time.perf_counter() with conn.cursor() as cursor: cursor.execute(f"SELECT * FROM {source}") writer = None try: while True: rows = cursor.fetchmany(BATCH_SIZE) if not rows: break columns = [[] for _ in range(n_cols)] for row in rows: for i in range(n_cols): columns[i].append(_convert_value(row[i])) arrays = [ pa.array(columns[i], type=schema.field(i).type) for i in range(n_cols) ] batch = pa.record_batch(arrays, schema=schema) if writer is None: writer = pq.ParquetWriter(parquet_file, schema) writer.write_batch(batch) row_count += len(rows) finally: if writer is not None: writer.close() if row_count == 0: return 0 elapsed = time.perf_counter() - t0 rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded " f"in {elapsed:.2f}s ({rate})" ) return row_countFügen Sie den Anreicherungshaken hinzu.
enrich_parquetist ein Platzhalter, in dem Sie Transformationen, abgeleitete Spalten oder Verknüpfungen zu Daten hinzufügen können, bevor sie hochgeladen werden. In dieser Schnellstartanleitung handelt es sich um eine no-op, die den Dateipfad unverändert zurückgibt.def enrich_parquet(parquet_file: str) -> str: """Enrich a parquet file before upload. Returns the (possibly new) file path.""" # TODO: add transformations, derived columns, joins, etc. print(f"Enriching {parquet_file} (no-op)") return parquet_fileFügen Sie die Upload-Funktion hinzu.
upload_parquetliest das Arrow-Schema aus der Parquet-Datei, generiert und führtDROP/CREATE TABLEDDL aus, um das Ziel vorzubereiten, liest dann die Datei in Batches und ruftcursor.bulkcopy()für hochleistungsfähige Masseneinfügungen auf. Dietable_lock=True-Option verbessert den Durchsatz, indem die Sperrkonkurrenz minimiert wird. Nach Abschluss des Uploads wird eineSELECT COUNT(*)Funktion ausgeführt, um die Übereinstimmungen der Zeilenanzahl zu überprüfen.def upload_parquet(conn, parquet_file: str, target: str) -> int: """Upload a parquet file into a SQL table via BCP. Returns row count.""" # ── Create target table from parquet schema ── pf_schema = pq.read_schema(parquet_file) with conn.cursor() as cursor: cursor.execute(_create_table_ddl(target, pf_schema)) conn.commit() # ── Bulk insert ── uploaded = 0 t0 = time.perf_counter() with pq.ParquetFile(parquet_file) as pf: with conn.cursor() as cursor: for batch in pf.iter_batches(batch_size=BATCH_SIZE): rows = zip(*(col.to_pylist() for col in batch.columns)) cursor.bulkcopy( target, rows, batch_size=BATCH_SIZE, table_lock=True, timeout=3600, ) uploaded += batch.num_rows elapsed = time.perf_counter() - t0 # ── Verify ── with conn.cursor() as cursor: cursor.execute(f"SELECT COUNT(*) FROM {target}") count = cursor.fetchone()[0] rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{parquet_file} → {target}: {uploaded:,} rows uploaded " f"in {elapsed:.2f}s ({rate}) " f"| verified: {count:,}" ) return uploadedFügen Sie die Orchestrierungsfunktion hinzu.
transfer_tablesverbindet die drei Phasen zusammen. Sie stellt eine Verbindung mit der Quelldatenbank her, ermittelt alle Basistabellen im angegebenen Schema überINFORMATION_SCHEMA.TABLES, lädt jede in eine lokale Parquet-Datei herunter, führt den Enrichment-Hook aus, stellt dann eine Verbindung mit der Zieldatenbank her und lädt jede Datei hoch.def transfer_tables( source_conn_str: str, dest_conn_str: str, source_schema: str, dest_schema: str, ) -> None: """Download all tables from source DB/schema to parquet, upload to dest DB/schema.""" _validate_ident(source_schema) _validate_ident(dest_schema) parquet_dir = source_schema os.makedirs(parquet_dir, exist_ok=True) # ── Download from source ── with mssql_python.connect(source_conn_str) as src_conn: with src_conn.cursor() as cursor: cursor.execute( "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES " "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' " "ORDER BY TABLE_NAME", (source_schema,), ) tables = [row[0] for row in cursor.fetchall()] print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n") parquet_files = [] for table_name in tables: parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet") row_count = download_table(src_conn, source_schema, table_name, parquet_file) if row_count == 0: print(f"{source_schema}.{table_name}: empty, skipping") else: parquet_files.append((table_name, parquet_file)) # ── Enrich parquet files ── enriched = [] for table_name, parquet_file in parquet_files: enriched.append((table_name, enrich_parquet(parquet_file))) # ── Upload to destination ── with mssql_python.connect(dest_conn_str) as dest_conn: for table_name, parquet_file in enriched: target = f"{dest_schema}.[{table_name}]" upload_parquet(dest_conn, parquet_file, target)Fügen Sie schließlich den
mainEinstiegspunkt hinzu. Sie lädt die.envDatei, rufttransfer_tablesdie Quell- und Zielverbindungszeichenfolgen auf und druckt die gesamte verstrichene Zeit.def main(): load_dotenv() t_start = time.perf_counter() transfer_tables( source_conn_str=os.environ["SOURCE_CONNECTION_STRING"], dest_conn_str=os.environ["DEST_CONNECTION_STRING"], source_schema="SalesLT", dest_schema="dbo", ) print(f"Total: {time.perf_counter() - t_start:.2f}s") if __name__ == "__main__": main()Speichern Sie und schließen Sie
main.py.
Speichern der Verbindungszeichenfolgen
Öffnen Sie die
.gitignoreDatei, und fügen Sie einen Ausschluss für Dateien hinzu.env. Ihre Datei sollte mit diesem Beispiel vergleichbar sein. Achten Sie darauf, sie zu speichern und zu schließen, wenn Sie fertig sind.# Python-generated files __pycache__/ *.py[oc] build/ dist/ wheels/ *.egg-info # Virtual environments .venv # Connection strings and secrets .envErstellen Sie im aktuellen Verzeichnis eine neue Datei mit dem Namen
.env.Fügen Sie in der
.envDatei Einträge für die Quell- und Zielverbindungsstrings hinzu. Ersetzen Sie die Platzhalterwerte durch die tatsächlichen Server- und Datenbanknamen.SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive" DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"Tipp
Die hier verwendete Verbindungszeichenfolge hängt weitgehend vom Typ der SQL-Datenbank ab, mit der Sie eine Verbindung herstellen. Wenn Sie eine Verbindung mit einer Azure SQL-Datenbank oder einer SQL-Datenbank in Fabric herstellen, verwenden Sie die ODBC-Verbindungszeichenfolge auf der Registerkarte "Verbindungszeichenfolgen". Möglicherweise müssen Sie den Authentifizierungstyp je nach Szenario anpassen. Weitere Informationen zu Verbindungszeichenfolgen und deren Syntax finden Sie in der Referenz zur Verbindungszeichenfolgensyntax.
Tipp
Unter macOS funktionieren sowohl ActiveDirectoryInteractive als auch ActiveDirectoryDefault für die Microsoft Entra-Authentifizierung.
ActiveDirectoryInteractive fordert Sie auf, sich jedes Mal anzumelden, wenn Sie das Skript ausführen. Um wiederholte Anmeldeaufforderungen zu vermeiden, melden Sie sich einmal über die Azure CLI an, indem Sie den Befehl az login ausführen, und verwenden Sie dann ActiveDirectoryDefault, die die zwischengespeicherten Anmeldeinformationen nutzt.
Verwenden Sie "uv run", um das Skript auszuführen.
Führen Sie im Terminalfenster vor oder in einem neuen Terminalfenster, das im selben Verzeichnis geöffnet ist, den folgenden Befehl aus.
uv run main.pyDies ist die erwartete Ausgabe, wenn das Skript abgeschlossen ist.
Found 12 SalesLT tables: Address, Customer, CustomerAddress, ... SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec) ... SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450 ... Total: 2.35sStellen Sie eine Verbindung mit der Zieldatenbank mithilfe von SQL Server Management Studio (SSMS) oder der MSSQL-Erweiterung für VS Code her, und stellen Sie sicher, dass die Tabellen und Daten erfolgreich erstellt wurden.
Um Ihr Skript auf einem anderen Computer bereitzustellen, kopieren Sie alle Dateien mit Ausnahme des
.venvOrdners auf den anderen Computer. Die virtuelle Umgebung wird mit der ersten Ausführung neu erstellt.
Funktionsweise des Codes
Die Anwendung führt eine vollständige Roundtrip-Datenübertragung in drei Phasen durch:
-
Download: Stellt eine Verbindung mit der Quelldatenbank her, liest Spaltenmetadaten aus
INFORMATION_SCHEMA.COLUMNS, erstellt ein Apache Arrow-Schema und lädt dann jede Tabelle in eine lokale Parkettdatei herunter. -
Anreicher (optional): Stellt einen Hook (
enrich_parquet) bereit, an dem Sie Transformationen, abgeleitete Spalten oder Verknüpfungen vor dem Hochladen hinzufügen können. -
Upload: Liest jede Parquet-Datei in Batches, erstellt die Tabelle in der Zieldatenbank mithilfe von DDL, die aus Arrow-Schemametadaten generiert wurde, und verwendet dann
cursor.bulkcopy()für leistungsstarke Masseneinfügungen.
Nächster Schritt
Besuchen Sie das mssql-python GitHub-Treiberrepository für weitere Beispiele, um Ideen beizutragen oder Probleme zu melden.