Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
In deze quickstart gebruikt u het mssql-python stuurprogramma om gegevens bulksgewijs tussen databases te kopiëren. De toepassing downloadt tabellen van een brondatabaseschema naar lokale Parquet-bestanden met behulp van Apache Arrow en uploadt ze vervolgens naar een doeldatabase met behulp van de krachtige bulkcopy methode. U kunt dit patroon gebruiken om gegevens te migreren, repliceren of transformeren tussen SQL Server, Azure SQL Database en SQL Database in Fabric.
Het mssql-python stuurprogramma vereist geen externe afhankelijkheden op Windows-computers. Het stuurprogramma installeert alles wat het nodig heeft met één pip installatie, zodat u de nieuwste versie van het stuurprogramma voor nieuwe scripts kunt gebruiken zonder dat andere scripts die u niet hoeft te upgraden en te testen, worden onderbroken.
documentatie | mssql-python-broncode | Pakket (PyPI) | Uv
Vereiste voorwaarden
Python 3
Als u Nog geen Python hebt, installeert u de Python-runtime en pip-pakketbeheer vanuit python.org.
Wilt u niet uw eigen omgeving gebruiken? Open als een devcontainer met GitHub Codespaces.
Visual Studio Code met de volgende extensies:
Azure Command-Line Interface (CLI) voor verificatie zonder wachtwoord in macOS en Linux.
Als u dat nog niet hebt
uv, volgt u de installatie-instructies.Een brondatabase op SQL Server, Azure SQL Database of SQL-database in Fabric met het
AdventureWorks2025voorbeeldschema en een geldige verbindingsreeks.Een doeldatabase in SQL Server, Azure SQL Database of SQL Database in Fabric met een geldige verbindingsreeks. De gebruiker moet gemachtigd zijn om tabellen te maken en naar tabellen te schrijven. Als u geen tweede database hebt, kunt u de doelverbindingsreeks wijzigen zodat deze verwijst naar dezelfde database en een ander schema voor de doeltabellen gebruikt.
Installeer eenmalige vereisten voor het besturingssysteem.
Een SQL-database maken
Voor deze quickstart is het Lightweight-schema AdventureWorks2025 vereist als brondatabase.
Het project maken en de code uitvoeren
- Een nieuw project maken
- Afhankelijkheden toevoegen
- Visual Studio Code starten
- Pyproject.toml bijwerken
- Main.py bijwerken
- De verbindingsreeksen opslaan
- Uv-uitvoering gebruiken om het script uit te voeren
Een nieuw project maken
Open een opdrachtprompt in uw ontwikkelingsmap. Als u geen map hebt, maakt u een nieuwe map met de naam
python,scriptsenzovoort. Vermijd mappen in uw OneDrive, dan kan de synchronisatie het beheer van uw virtuele omgeving verstoren.Maak een nieuw project met
uv.uv init mssql-python-bcp-qs cd mssql-python-bcp-qs
Afhankelijkheden toevoegen
Installeer in dezelfde map de mssql-python, python-dotenven pyarrow pakketten.
uv add mssql-python python-dotenv pyarrow
Visual Studio Code starten
Voer in dezelfde map de volgende opdracht uit.
code .
Pyproject.toml bijwerken
Het pyproject.toml bevat de metagegevens voor uw project. Open het bestand in uw favoriete editor.
Controleer de inhoud van het bestand. Deze moet vergelijkbaar zijn met dit voorbeeld. Noteer de Python-versie en -afhankelijkheid voor
mssql-pythongebruik>=om een minimale versie te definiëren. Als u de voorkeur geeft aan een exacte versie, wijzigt u de>=voor het versienummer in==. De opgeloste versies van elk pakket worden vervolgens opgeslagen in de uv.lock. Het lockfile zorgt ervoor dat ontwikkelaars die aan het project werken, consistente pakketversies gebruiken. Het zorgt er ook voor dat dezelfde set pakketversies wordt gebruikt bij het distribueren van uw pakket naar eindgebruikers. U moet hetuv.lockbestand niet bewerken.[project] name = "mssql-python-bcp-qs" version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.11" dependencies = [ "mssql-python>=1.4.0", "python-dotenv>=1.1.1", "pyarrow>=19.0.0", ]Werk de beschrijving bij zodat deze meer beschrijvend is.
description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"Sla het bestand op en sluit het.
Main.py bijwerken
Open het bestand met de naam
main.py. Deze moet vergelijkbaar zijn met dit voorbeeld.def main(): print("Hello from mssql-python-bcp-qs!") if __name__ == "__main__": main()Vervang de inhoud van
main.pymet de volgende codeblokken. Elk blok bouwt voort op de vorige en moet opmain.pyvolgorde worden geplaatst.Aanbeveling
Als Visual Studio Code problemen ondervindt bij het oplossen van pakketten, moet u de interpreter bijwerken om de virtuele omgeving te gebruiken.
Voeg bovenaan
main.pyde import- en constanten toe. Het script gebruiktmssql_pythonvoor databaseconnectiviteit,pyarrowenpyarrow.parquetvoor het verwerken van kolomgegevens en Parquet-bestands-I/O, énpython-dotenvvoor het laden van verbindingsreeksen uit een.envbestand, en een gecompileerd regex-patroon dat SQL-identifiers (id's) valideert om injectie te voorkomen."""Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema.""" import os import re import time from uuid import UUID import pyarrow as pa import pyarrow.parquet as pq from dotenv import load_dotenv import mssql_python BATCH_SIZE = 64_000 _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$") def _validate_ident(name: str) -> str: if not _SAFE_IDENT.match(name): raise ValueError(f"Unsafe SQL identifier: {name!r}") return nameVoeg onder de imports de SQL-naar-Arrow type mapping toe. Deze woordenlijst vertaalt SQL Server-kolomtypen naar hun Apache Arrow-equivalenten, zodat de betrouwbaarheid van gegevens behouden blijft bij het schrijven naar Parquet. De twee helperfuncties bouwen exacte SQL-typetekenreeksen (bijvoorbeeld
NVARCHAR(100)ofDECIMAL(18,2)) op basis vanINFORMATION_SCHEMAmetagegevens en lossen het overeenkomende pijltype voor elke kolom op._SQL_TO_ARROW = { "bit": pa.bool_(), "tinyint": pa.uint8(), "smallint": pa.int16(), "int": pa.int32(), "bigint": pa.int64(), "float": pa.float64(), "real": pa.float32(), "smallmoney": pa.decimal128(10, 4), "money": pa.decimal128(19, 4), "date": pa.date32(), "datetime": pa.timestamp("ms"), "datetime2": pa.timestamp("us"), "smalldatetime": pa.timestamp("s"), "uniqueidentifier": pa.string(), "xml": pa.string(), "image": pa.binary(), "binary": pa.binary(), "varbinary": pa.binary(), "timestamp": pa.binary(), } def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str: """Build the exact SQL type string from INFORMATION_SCHEMA metadata.""" dt = data_type.lower() if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"): length = "MAX" if max_length == -1 else str(max_length) return f"{dt.upper()}({length})" if dt in ("decimal", "numeric"): return f"{dt.upper()}({precision},{scale})" return dt.upper() def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType: sql_type = sql_type.lower() if sql_type in _SQL_TO_ARROW: return _SQL_TO_ARROW[sql_type] if sql_type in ("decimal", "numeric"): return pa.decimal128(precision, scale) if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"): return pa.string() return pa.string() def _convert_value(v): """Convert a SQL value to an Arrow-compatible Python type.""" if isinstance(v, UUID): return str(v) return vVoeg de functies schema-introspectie en DDL-generatie toe.
_get_arrow_schemaINFORMATION_SCHEMA.COLUMNS-query’s met behulp van geparameteriseerde query's, bouwt een Arrow schema en slaat het oorspronkelijke SQL-type op als veldmetagegevens, zodat de doeltabel opnieuw kan worden gemaakt met exacte kolomdefinities._create_table_ddlleest die metagegevens terug om DDL te genererenDROP/CREATE TABLE. Hettimestamp-type (rowversion) wordt opnieuw gemapt naarVARBINARY(8)omdat het automatisch wordt gegenereerd en niet kan worden ingevoegd.def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema: """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS. Stores the original SQL type as field metadata so the round-trip CREATE TABLE can reproduce exact column definitions. """ cursor.execute( "SELECT COLUMN_NAME, DATA_TYPE, " "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), " "COALESCE(NUMERIC_PRECISION, 0), " "COALESCE(NUMERIC_SCALE, 0), " "IS_NULLABLE " "FROM INFORMATION_SCHEMA.COLUMNS " "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? " "ORDER BY ORDINAL_POSITION", (schema_name, table_name), ) rows = cursor.fetchall() if not rows: raise ValueError(f"No columns found for {schema_name}.{table_name}") fields = [] for col_name, data_type, max_len, precision, scale, nullable in rows: arrow_t = _arrow_type(data_type, precision, scale) sql_t = _sql_type_str(data_type, max_len, precision, scale) fields.append( pa.field( col_name, arrow_t, nullable=(nullable == "YES"), metadata={"sql_type": sql_t}, ) ) return pa.schema(fields) def _create_table_ddl(target: str, schema: pa.Schema) -> str: """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata.""" col_defs = [] for f in schema: sql_t = f.metadata[b"sql_type"].decode() # timestamp/rowversion is auto-generated and not insertable if sql_t == "TIMESTAMP": sql_t = "VARBINARY(8)" null = "" if f.nullable else " NOT NULL" col_defs.append(f"[{f.name}] {sql_t}{null}") col_defs_str = ",\n ".join(col_defs) return ( f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n" f"CREATE TABLE {target} (\n {col_defs_str}\n);" )Voeg de downloadfunctie toe.
download_tablestreamt rijen uit een brontabel in batches met behulp vanfetchmany(), converteert elke waarde naar een python-type dat compatibel is met pijlen en schrijft recordbatches incrementeel naar een lokaal Parquet-bestand met behulp vanParquetWriter. Hierdoor blijft geheugen gebonden, ongeacht de tabelgrootte. De functie maakt gebruik van twee afzonderlijke cursors: een voor het lezen van kolommetagegevens en een andere om de gegevens te streamen.def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int: """Download a SQL table to a parquet file. Returns row count (0 if empty).""" _validate_ident(table_name) source = f"{schema_name}.[{table_name}]" with conn.cursor() as cursor: schema = _get_arrow_schema(cursor, schema_name, table_name) n_cols = len(schema) row_count = 0 t0 = time.perf_counter() with conn.cursor() as cursor: cursor.execute(f"SELECT * FROM {source}") writer = None try: while True: rows = cursor.fetchmany(BATCH_SIZE) if not rows: break columns = [[] for _ in range(n_cols)] for row in rows: for i in range(n_cols): columns[i].append(_convert_value(row[i])) arrays = [ pa.array(columns[i], type=schema.field(i).type) for i in range(n_cols) ] batch = pa.record_batch(arrays, schema=schema) if writer is None: writer = pq.ParquetWriter(parquet_file, schema) writer.write_batch(batch) row_count += len(rows) finally: if writer is not None: writer.close() if row_count == 0: return 0 elapsed = time.perf_counter() - t0 rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded " f"in {elapsed:.2f}s ({rate})" ) return row_countVoeg de verrijkingshook toe.
enrich_parquetis een tijdelijke aanduiding waar u transformaties, afgeleide kolommen of joins kunt toevoegen aan gegevens voordat deze worden geüpload. In deze snelstartgids is er sprake van een no-op die het bestandspad ongewijzigd teruggeeft.def enrich_parquet(parquet_file: str) -> str: """Enrich a parquet file before upload. Returns the (possibly new) file path.""" # TODO: add transformations, derived columns, joins, etc. print(f"Enriching {parquet_file} (no-op)") return parquet_fileVoeg de uploadfunctie toe.
upload_parquetleest het Arrow-schema uit het Parquet-bestand, genereert en voertDROP/CREATE TABLEDDL uit om de bestemming voor te bereiden. Vervolgens wordt het bestand in batches gelezen en wordtcursor.bulkcopy()aangeroepen voor een bulkinvoer met hoge prestaties. Detable_lock=Trueoptie verbetert de doorvoer door vergrendelingsconflicten te minimaliseren. Nadat het uploaden is voltooid, wordt eenSELECT COUNT(*)functie uitgevoerd om te controleren of het aantal rijen overeenkomt.def upload_parquet(conn, parquet_file: str, target: str) -> int: """Upload a parquet file into a SQL table via BCP. Returns row count.""" # ── Create target table from parquet schema ── pf_schema = pq.read_schema(parquet_file) with conn.cursor() as cursor: cursor.execute(_create_table_ddl(target, pf_schema)) conn.commit() # ── Bulk insert ── uploaded = 0 t0 = time.perf_counter() with pq.ParquetFile(parquet_file) as pf: with conn.cursor() as cursor: for batch in pf.iter_batches(batch_size=BATCH_SIZE): rows = zip(*(col.to_pylist() for col in batch.columns)) cursor.bulkcopy( target, rows, batch_size=BATCH_SIZE, table_lock=True, timeout=3600, ) uploaded += batch.num_rows elapsed = time.perf_counter() - t0 # ── Verify ── with conn.cursor() as cursor: cursor.execute(f"SELECT COUNT(*) FROM {target}") count = cursor.fetchone()[0] rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{parquet_file} → {target}: {uploaded:,} rows uploaded " f"in {elapsed:.2f}s ({rate}) " f"| verified: {count:,}" ) return uploadedVoeg de orchestratiefunctie toe.
transfer_tableskoppelt de drie fasen aan elkaar. Het maakt verbinding met de brondatabase, detecteert alle basistabellen in het opgegeven schema viaINFORMATION_SCHEMA.TABLES, downloadt elke tabel naar een lokaal Parquet-bestand, voert de verrijkingshook uit, maakt vervolgens verbinding met de doeldatabase en uploadt elk bestand.def transfer_tables( source_conn_str: str, dest_conn_str: str, source_schema: str, dest_schema: str, ) -> None: """Download all tables from source DB/schema to parquet, upload to dest DB/schema.""" _validate_ident(source_schema) _validate_ident(dest_schema) parquet_dir = source_schema os.makedirs(parquet_dir, exist_ok=True) # ── Download from source ── with mssql_python.connect(source_conn_str) as src_conn: with src_conn.cursor() as cursor: cursor.execute( "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES " "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' " "ORDER BY TABLE_NAME", (source_schema,), ) tables = [row[0] for row in cursor.fetchall()] print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n") parquet_files = [] for table_name in tables: parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet") row_count = download_table(src_conn, source_schema, table_name, parquet_file) if row_count == 0: print(f"{source_schema}.{table_name}: empty, skipping") else: parquet_files.append((table_name, parquet_file)) # ── Enrich parquet files ── enriched = [] for table_name, parquet_file in parquet_files: enriched.append((table_name, enrich_parquet(parquet_file))) # ── Upload to destination ── with mssql_python.connect(dest_conn_str) as dest_conn: for table_name, parquet_file in enriched: target = f"{dest_schema}.[{table_name}]" upload_parquet(dest_conn, parquet_file, target)Voeg tot slot het
maintoegangspunt toe. Het laadt het bestand.env, roepttransfer_tablesaan met de verbindingsreeksen voor de bron en het doel, en drukt de totale verstreken tijd af.def main(): load_dotenv() t_start = time.perf_counter() transfer_tables( source_conn_str=os.environ["SOURCE_CONNECTION_STRING"], dest_conn_str=os.environ["DEST_CONNECTION_STRING"], source_schema="SalesLT", dest_schema="dbo", ) print(f"Total: {time.perf_counter() - t_start:.2f}s") if __name__ == "__main__": main()Opslaan en sluiten
main.py.
De verbindingsreeksen opslaan
Open het
.gitignorebestand en voeg een uitsluiting toe voor.envbestanden. Het bestand moet er ongeveer uitzien als in dit voorbeeld. Zorg ervoor dat u deze opslaat en sluit wanneer u klaar bent.# Python-generated files __pycache__/ *.py[oc] build/ dist/ wheels/ *.egg-info # Virtual environments .venv # Connection strings and secrets .envMaak in de huidige map een nieuw bestand met de naam
.env.Voeg in het
.envbestand vermeldingen toe voor uw bron- en doelverbindingsreeksen. Vervang de tijdelijke aanduidingen door de werkelijke server- en databasenamen.SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive" DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"Aanbeveling
De hier gebruikte verbindingsreeks is grotendeels afhankelijk van het type SQL-database waarmee u verbinding maakt. Als u verbinding maakt met een Azure SQL Database of een SQL-database in Fabric, gebruikt u de ODBC-verbindingsreeks op het tabblad Verbindingsreeksen. Mogelijk moet u het verificatietype aanpassen, afhankelijk van uw scenario. Zie de naslaginformatie over de syntaxis van de verbindingsreeks voor meer informatie over verbindingsreeksen en de bijbehorende syntaxis.
Aanbeveling
In macOS werken beide ActiveDirectoryInteractive en ActiveDirectoryDefault voor Microsoft Entra-verificatie.
ActiveDirectoryInteractive u wordt gevraagd u aan te melden telkens wanneer u het script uitvoert. Als u herhaalde aanmeldingsprompts wilt voorkomen, meldt u zich eenmaal aan via de Azure CLI door uit te voeren az loginen vervolgens te gebruiken ActiveDirectoryDefault, waarmee de referenties in de cache opnieuw worden gebruikt.
Gebruik uv run om het script uit te voeren
Voer in het terminalvenster van vóór, of een nieuw terminalvenster dat is geopend in dezelfde map, de volgende opdracht uit.
uv run main.pyDit is de verwachte uitvoer wanneer het script is voltooid.
Found 12 SalesLT tables: Address, Customer, CustomerAddress, ... SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec) ... SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450 ... Total: 2.35sMaak verbinding met de doeldatabase met behulp van SQL Server Management Studio (SSMS) of de MSSQL-extensie voor VS Code en controleer of de tabellen en gegevens zijn gemaakt.
Als u uw script op een andere computer wilt implementeren, kopieert u alle bestanden, met uitzondering van de
.venvmap naar de andere computer. De virtuele omgeving wordt opnieuw gemaakt met de eerste uitvoering.
Hoe de code werkt
De toepassing voert een volledige retourgegevensoverdracht uit in drie fasen:
-
Download: Maakt verbinding met de brondatabase, leest kolommetagegevens van
INFORMATION_SCHEMA.COLUMNS, bouwt een Apache Arrow-schema en downloadt vervolgens elke tabel in een lokaal Parquet-bestand. -
Verrijken (optioneel): Biedt een haak (
enrich_parquet) waar u transformaties, afgeleide kolommen of joins kunt toevoegen voordat u uploadt. -
Uploaden: leest elk Parquet-bestand in batches, maakt de tabel opnieuw in de doeldatabase met behulp van DDL die is gegenereerd op basis van metagegevens van het pijlschema en gebruikt
cursor.bulkcopy()vervolgens voor bulksgewijs invoegen met hoge prestaties.
Volgende stap
Ga naar de GitHub-opslagplaats van het mssql-python stuurprogramma voor meer voorbeelden, om ideeën bij te dragen of problemen te rapporteren.