Bemærk
Adgang til denne side kræver godkendelse. Du kan prøve at logge på eller ændre mapper.
Adgang til denne side kræver godkendelse. Du kan prøve at ændre mapper.
I denne quickstart bruger du driveren mssql-python til at massekopiere data mellem SQL-databaser i Fabric. Applikationen downloader tabeller fra et kildedatabaseskema til lokale Parquet-filer ved hjælp af Apache Arrow og uploader dem derefter til en destinationsdatabase ved hjælp af højtydende bulkcopy metoden. Du kan bruge dette mønster til at migrere, replikere eller transformere data mellem SQL-databaser i Fabric.
Driveren mssql-python kræver ingen eksterne afhængigheder på Windows-maskiner. Driveren installerer alt, hvad den har brug for, med en enkelt pip installation, så du kan bruge den nyeste version af driveren til nye scripts uden at bryde andre scripts, som du ikke har tid til at opgradere og teste.
mssql-python dokumentation | mssql-python kildekode | Pakke (PyPI) | UV
Forudsætninger
Indlæs AdventureWorks-eksempeldata i din SQL-database som kildedatabase.
(Valgfrit) En anden SQL-database i Fabric til at bruge som destination. Brugeren skal have tilladelse til at oprette og skrive til tabeller. Hvis du ikke har en anden database, kan du ændre destinationsforbindelsesstrengen, så den peger på den samme database og bruge et andet skema til destinationstabellerne.
Python 3
Hvis du ikke allerede har Python, så installer Python runtime og pip package manager fra python.org.
Vil du ikke bruge dit eget miljø? Åbn som en devcontainer ved hjælp af GitHub Codespaces.
Visual Studio Code med følgende udvidelser:
Azure Command-Line Interface (CLI) - Påkrævet for adgangskodefri autentificering på macOS og Linux.
Hvis du ikke allerede har
uv, skal du installereuvved at følge instruktionerne fra https://docs.astral.sh/uv/getting-started/installation/.Installer engangsoperativsystemspecifikke forudsætninger.
Opret projektet, og kør koden
- Opret et nyt projekt
- Tilføj afhængigheder
- Start Visual Studio Code
- Opdater pyproject.toml
- Opdater main.py
- Gem forbindelsesstrengene
- Brug uv run til at udføre scriptet
Opret et nyt projekt
Åbn en kommandoprompt i din udviklingsmappe. Hvis du ikke har en, skal du oprette en ny mappe kaldet
python,scriptsosv. Undgå mapper på dit OneDrive, synkroniseringen kan forstyrre administrationen af dit virtuelle miljø.Opret et nyt projekt med
uv.uv init mssql-python-bcp-qs cd mssql-python-bcp-qs
Tilføj afhængigheder
I den samme mappe skal du installere pakkerne mssql-python, python-dotenv, og pyarrow .
uv add mssql-python python-dotenv pyarrow
Start Visual Studio Code
Kør følgende kommando i samme mappe.
code .
Opdater pyproject.toml
pyproject.toml indeholder metadataene for dit projekt. Åbn filen i din foretrukne editor.
Gennemgå indholdet af filen. Det burde ligne dette eksempel. Bemærk Python-versionen og afhængigheden for anvendelser
>=tilmssql-pythonat definere en minimumsversion. Hvis du foretrækker en nøjagtig version, så ændr versionsnummeret>===før til . De løste versioner af hver pakke gemmes derefter i uv.lock. Låsefilen sikrer, at udviklerne, der arbejder på projektet, bruger ensartede pakkeversioner. Det sikrer også, at det samme sæt pakkeversioner bruges, når din pakke distribueres til slutbrugerne. Du bør ikke redigere filenuv.lock.[project] name = "mssql-python-bcp-qs" version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.11" dependencies = [ "mssql-python>=1.4.0", "python-dotenv>=1.1.1", "pyarrow>=19.0.0", ]Opdater beskrivelsen, så den er mere beskrivende.
description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"Gem og luk filen.
Opdater main.py
Åbn filen med navnet
main.py. Det burde ligne dette eksempel.def main(): print("Hello from mssql-python-bcp-qs!") if __name__ == "__main__": main()Erstat indholdet af filen med følgende kodeblokke. Hver blok bygger videre på den forrige og bør tilføjes
main.pyi rækkefølge.Tips
Hvis Visual Studio Code har problemer med at løse pakker, skal du opdatere fortolkeren for at bruge det virtuelle miljø.
Øverst i
main.py, tilføj importerne og konstanterne. Scriptet brugermssql_pythontil databaseforbindelse,pyarrowtilpyarrow.parquethåndtering af kolonnedata og Parquet-fil-I/O tilpython-dotenvindlæsning af forbindelsesstrenge fra en.envfil, samt et kompileret regex-mønster, der validerer SQL-identifikatorer for at forhindre injektion."""Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema.""" import os import re import time from uuid import UUID import pyarrow as pa import pyarrow.parquet as pq from dotenv import load_dotenv import mssql_python BATCH_SIZE = 64_000 _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$") def _validate_ident(name: str) -> str: if not _SAFE_IDENT.match(name): raise ValueError(f"Unsafe SQL identifier: {name!r}") return nameUnder importene tilføjes SQL-til-Arrow-typemappingen. Denne ordbog oversætter SQL Server-kolonnetyper til deres Apache Arrow-ækvivalenter, så datanøjagtigheden bevares, når man skriver til Parquet. De to hjælpefunktioner bygger nøjagtige SQL-typestrenge (for eksempel
NVARCHAR(100)ellerDECIMAL(18,2)) ud fraINFORMATION_SCHEMAmetadata og løser den matchende Arrow-type for hver kolonne._SQL_TO_ARROW = { "bit": pa.bool_(), "tinyint": pa.uint8(), "smallint": pa.int16(), "int": pa.int32(), "bigint": pa.int64(), "float": pa.float64(), "real": pa.float32(), "smallmoney": pa.decimal128(10, 4), "money": pa.decimal128(19, 4), "date": pa.date32(), "datetime": pa.timestamp("ms"), "datetime2": pa.timestamp("us"), "smalldatetime": pa.timestamp("s"), "uniqueidentifier": pa.string(), "xml": pa.string(), "image": pa.binary(), "binary": pa.binary(), "varbinary": pa.binary(), "timestamp": pa.binary(), } def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str: """Build the exact SQL type string from INFORMATION_SCHEMA metadata.""" dt = data_type.lower() if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"): length = "MAX" if max_length == -1 else str(max_length) return f"{dt.upper()}({length})" if dt in ("decimal", "numeric"): return f"{dt.upper()}({precision},{scale})" return dt.upper() def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType: sql_type = sql_type.lower() if sql_type in _SQL_TO_ARROW: return _SQL_TO_ARROW[sql_type] if sql_type in ("decimal", "numeric"): return pa.decimal128(precision, scale) if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"): return pa.string() return pa.string() def _convert_value(v): """Convert a SQL value to an Arrow-compatible Python type.""" if isinstance(v, UUID): return str(v) return vTilføj funktionerne til skema-introspektion og DDL-generering.
_get_arrow_schemaforespørgslerINFORMATION_SCHEMA.COLUMNSved hjælp af parameteriserede forespørgsler, bygger et Arrow-skema og gemmer den oprindelige SQL-type som feltmetadata, så destinationstabellen kan genskabes med nøjagtige kolonnedefinitioner._create_table_ddllæser metadataene tilbage for at generereDROP/CREATE TABLEDDL. Typentimestamp(rowversion) ommappes tilVARBINARY(8), fordi den er auto-genereret og ikke direkte indsættbar.def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema: """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS. Stores the original SQL type as field metadata so the round-trip CREATE TABLE can reproduce exact column definitions. """ cursor.execute( "SELECT COLUMN_NAME, DATA_TYPE, " "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), " "COALESCE(NUMERIC_PRECISION, 0), " "COALESCE(NUMERIC_SCALE, 0), " "IS_NULLABLE " "FROM INFORMATION_SCHEMA.COLUMNS " "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? " "ORDER BY ORDINAL_POSITION", (schema_name, table_name), ) rows = cursor.fetchall() if not rows: raise ValueError(f"No columns found for {schema_name}.{table_name}") fields = [] for col_name, data_type, max_len, precision, scale, nullable in rows: arrow_t = _arrow_type(data_type, precision, scale) sql_t = _sql_type_str(data_type, max_len, precision, scale) fields.append( pa.field( col_name, arrow_t, nullable=(nullable == "YES"), metadata={"sql_type": sql_t}, ) ) return pa.schema(fields) def _create_table_ddl(target: str, schema: pa.Schema) -> str: """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata.""" col_defs = [] for f in schema: sql_t = f.metadata[b"sql_type"].decode() # timestamp/rowversion is auto-generated and not insertable if sql_t == "TIMESTAMP": sql_t = "VARBINARY(8)" null = "" if f.nullable else " NOT NULL" col_defs.append(f"[{f.name}] {sql_t}{null}") col_defs_str = ",\n ".join(col_defs) return ( f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n" f"CREATE TABLE {target} (\n {col_defs_str}\n);" )Tilføj downloadfunktionen.
download_tablelæser rækker fra en kildetabel i batches ved hjælp affetchmany, konverterer hver værdi til en Arrow-kompatibel Python-type, og skriver recordbatches inkrementelt til en Parquet-fil medpq.ParquetWriter. Denne tilgang undgår at indlæse hele tabellen i hukommelsen. Funktionen bruger to separate cursors: én til at læse kolonnemetadata og en anden til at streame dataene. Hvis bordet er tomt, vender det tilbage tidligt.def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int: """Download a SQL table to a parquet file. Returns row count (0 if empty).""" _validate_ident(table_name) source = f"{schema_name}.[{table_name}]" with conn.cursor() as cursor: schema = _get_arrow_schema(cursor, schema_name, table_name) n_cols = len(schema) row_count = 0 t0 = time.perf_counter() with conn.cursor() as cursor: cursor.execute(f"SELECT * FROM {source}") with pq.ParquetWriter(parquet_file, schema) as writer: while True: rows = cursor.fetchmany(BATCH_SIZE) if not rows: break columns = [[] for _ in range(n_cols)] for row in rows: for i in range(n_cols): columns[i].append(_convert_value(row[i])) arrays = [ pa.array(columns[i], type=schema.field(i).type) for i in range(n_cols) ] batch = pa.record_batch(arrays, schema=schema) writer.write_batch(batch) row_count += len(rows) if row_count == 0: os.remove(parquet_file) return 0 elapsed = time.perf_counter() - t0 rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded " f"in {elapsed:.2f}s ({rate})" ) return row_countTilføj berigelseskrogen.
enrich_parqueter en pladsholder, hvor du kan tilføje transformationer, afledte kolonner eller joins til data, før de uploades. I denne quickstart er det en no-op, der returnerer filstien uændret.def enrich_parquet(parquet_file: str) -> str: """Enrich a parquet file before upload. Returns the (possibly new) file path.""" # TODO: add transformations, derived columns, joins, etc. print(f"Enriching {parquet_file} (no-op)") return parquet_fileTilføj upload-funktionen.
upload_parquetlæser Arrow-skemaet fra Parquet-filen, genererer og udførerDROP/CREATE TABLEDDL for at forberede destinationen, læser derefter filen i batches og kaldercursor.bulkcopy()på højtydende bulkindsættelse. Mulighedentable_lock=Trueforbedrer gennemstrømningen ved at minimere låskonkurrence. Når uploaden er færdig, kører funktionen enSELECT COUNT(*)for at verificere, at rækkeantallet stemmer overens.def upload_parquet(conn, parquet_file: str, target: str) -> int: """Upload a parquet file into a SQL table via BCP. Returns row count.""" # ── Create target table from parquet schema ── pf_schema = pq.read_schema(parquet_file) with conn.cursor() as cursor: cursor.execute(_create_table_ddl(target, pf_schema)) conn.commit() # ── Bulk insert ── uploaded = 0 t0 = time.perf_counter() with pq.ParquetFile(parquet_file) as pf: with conn.cursor() as cursor: for batch in pf.iter_batches(batch_size=BATCH_SIZE): rows = zip(*(col.to_pylist() for col in batch.columns)) cursor.bulkcopy( target, rows, batch_size=BATCH_SIZE, table_lock=True, timeout=3600, ) uploaded += batch.num_rows conn.commit() elapsed = time.perf_counter() - t0 # ── Verify ── with conn.cursor() as cursor: cursor.execute(f"SELECT COUNT(*) FROM {target}") count = cursor.fetchone()[0] rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{parquet_file} → {target}: {uploaded:,} rows uploaded " f"in {elapsed:.2f}s ({rate}) " f"| verified: {count:,}" ) return uploadedTilføj orkestreringsfunktionen.
transfer_tablesbinder de tre faser sammen. Den forbinder til kildedatabasen, opdager alle basetabeller i det givne skema viaINFORMATION_SCHEMA.TABLES, downloader hver enkelt til en lokal Parquet-fil, kører berigelseskrogen, forbinder derefter til destinationsdatabasen og uploader hver fil.def transfer_tables( source_conn_str: str, dest_conn_str: str, source_schema: str, dest_schema: str, ) -> None: """Download all tables from source DB/schema to parquet, upload to dest DB/schema.""" _validate_ident(source_schema) _validate_ident(dest_schema) parquet_dir = source_schema os.makedirs(parquet_dir, exist_ok=True) # ── Download from source ── with mssql_python.connect(source_conn_str) as src_conn: with src_conn.cursor() as cursor: cursor.execute( "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES " "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' " "ORDER BY TABLE_NAME", (source_schema,), ) tables = [row[0] for row in cursor.fetchall()] print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n") parquet_files = [] for table_name in tables: parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet") row_count = download_table(src_conn, source_schema, table_name, parquet_file) if row_count == 0: print(f"{source_schema}.{table_name}: empty, skipping") else: parquet_files.append((table_name, parquet_file)) # ── Enrich parquet files ── enriched_files = [] for table_name, parquet_file in parquet_files: enriched_file = enrich_parquet(parquet_file) enriched_files.append((table_name, enriched_file)) # ── Upload to destination ── with mssql_python.connect(dest_conn_str) as dest_conn: for table_name, parquet_file in enriched_files: target = f"{dest_schema}.[{table_name}]" upload_parquet(dest_conn, parquet_file, target)Til sidst tilføj indgangspunktet
main. Den indlæser filen.env, kaldertransfer_tablesmed kilde- og destinationsforbindelsesstrengene og udskriver den samlede forløbne tid.def main(): load_dotenv() t_start = time.perf_counter() transfer_tables( source_conn_str=os.environ["SOURCE_CONNECTION_STRING"], dest_conn_str=os.environ["DEST_CONNECTION_STRING"], source_schema="SalesLT", dest_schema="dbo", ) print(f"Total: {time.perf_counter() - t_start:.2f}s") if __name__ == "__main__": main()Gem og luk
main.py.
Gem forbindelsesstrengene
Åbn filen,
.gitignoreog tilføj en udeladelse for.envfiler. Din fil skal ligne dette eksempel. Sørg for at gemme og lukke den, når du er færdig.# Python-generated files __pycache__/ *.py[oc] build/ dist/ wheels/ *.egg-info # Virtual environments .venv # Connection strings and secrets .envI den aktuelle mappe skal du oprette en ny fil med navnet
.env.Inden for
.envfilen tilføjes der poster for dine kilde- og destinationsforbindelser. Erstat pladsholderværdierne med dine faktiske server- og databasenavne.SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive" DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"Tips
For SQL-database i Fabric skal du bruge ODBC-forbindelsesstrengen fra fanen Forbindelsesstrenge uden DRIVER-oplysningerne . For mere information, se Connect to your SQL-database i Fabric.
Tips
På macOS fungerer begge ActiveDirectoryInteractive til ActiveDirectoryDefault Microsoft Entra-autentificering.
ActiveDirectoryInteractive opfordrer dig til at logge ind hver gang, du kører scriptet. For at undgå gentagne login-prompts, log ind én gang via Azure CLI ved at køre az login, og brug ActiveDirectoryDefaultderefter , som genbruger den cachede legitimationsoplysninger.
Brug uv run til at udføre scriptet
I terminalvinduet fra før, eller et nyt terminalvindue, der er åbent i den samme mappe, skal du køre følgende kommando.
uv run main.pyHer er det forventede output, når scriptet er færdigt.
Found 12 SalesLT tables: Address, Customer, CustomerAddress, ... SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec) ... SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450 ... Total: 2.35sForbind til destinationsdatabasen og verificér, at tabellerne og dataene er oprettet med succes. For flere muligheder for, hvordan du forbinder, se Connect to your SQL-database i Fabric.
For at deploye dit script til en anden maskine, kopier alle filer undtagen
.venvmappen til den anden maskine. Det virtuelle miljø genskabes ved første gennemspilning.
Hvordan koden fungerer
Applikationen udfører en fuld rundrejse dataoverførsel i tre faser:
-
Download: Forbinder til kildedatabasen, læser kolonnemetadata fra
INFORMATION_SCHEMA.COLUMNS, bygger et Apache Arrow-skema og downloader derefter hver tabel i batches til en lokal Parquet-fil ved hjælp afpq.ParquetWriter. -
Berik (valgfrit): Giver en hook (
enrich_parquet), hvor du kan tilføje transformationer, afledte kolonner eller joins før upload. -
Upload: Læser hver Parquet-fil i batches, genskaber tabellen i destinationsdatabasen ved hjælp af DDL genereret fra Arrow-skemametadata, og bruger
cursor.bulkcopy()derefter til højtydende bulk-insert.
Næste trin
Besøg driverens mssql-python GitHub-lager for at få flere eksempler for at bidrage med ideer eller rapportere problemer.