Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
V tomto rychlém průvodci použijete mssql-python ovladač k hromadnému kopírování dat mezi databázemi. Aplikace stáhne tabulky ze zdrojového schématu databáze do místních souborů Parquet pomocí Apache Arrow a pak je nahraje do cílové databáze pomocí vysoce výkonné bulkcopy metody. Tento vzor můžete použít k migraci, replikaci nebo transformaci dat mezi SQL Serverem, Azure SQL Database a SQL databází v prostředí Fabric.
Ovladač mssql-python nevyžaduje žádné externí závislosti na počítačích s Windows. Ovladač nainstaluje všechno, co potřebuje, s jednou pip instalací, což vám umožní používat nejnovější verzi ovladače pro nové skripty bez přerušení jiných skriptů, které nemáte čas upgradovat a testovat.
Dokumentace mssql-python, zdrojový kód mssql-python, Balíček (PyPI) uv
Předpoklady
Python 3
Pokud ještě nemáte Python, nainstalujte Python runtime a správce balíčků pip z python.org.
Nechcete používat vlastní prostředí? Otevřete jako devcontainer pomocí GitHub Codespaces.
Visual Studio Code s následujícími rozšířeními:
- Pythonové rozšíření pro Visual Studio Code
Azure Command-Line Interface (CLI) pro ověřování bez hesla v systému macOS a Linux.
Pokud ještě
uvnemáte, postupujte podle pokynů k instalaci.Zdrojová databáze na SQL Serveru, Azure SQL Database nebo SQL databázi ve Fabric s
AdventureWorks2025ukázkovým schématem a platným připojovacím řetězcem.Cílová databáze na serveru SQL Server, Azure SQL Database nebo databázi SQL ve Fabric s platným připojovacím řetězcem. Uživatel musí mít oprávnění k vytváření a zápisu do tabulek. Pokud nemáte druhou databázi, můžete změnit cílový připojovací řetězec tak, aby odkazovat na stejnou databázi, a použít jiné schéma pro cílové tabulky.
Nainstalujte požadavky specifické pro jednorázový operační systém.
Vytvoření databáze SQL
Tento rychlý start vyžaduje jako zdrojovou databázi schéma AdventureWorks2025 Lightweight .
Vytvoření projektu a spuštění kódu
- Vytvoření nového projektu
- Přidání závislostí
- Spuštění editoru Visual Studio Code
- Aktualizace pyproject.toml
- Aktualizace main.py
- Uložte připojovací řetězce
- Použij uv run pro spuštění skriptu
Vytvoření nového projektu
Otevřete příkazový řádek ve vývojovém adresáři. Pokud ho nemáte, vytvořte nový adresář s názvem
python,scriptsatd. Vyhněte se složkám na OneDrivu, synchronizace může narušit správu vašeho virtuálního prostředí.Vytvořte nový projekt pomocí
uvpříkazu .uv init mssql-python-bcp-qs cd mssql-python-bcp-qs
Přidejte závislosti
Ve stejném adresáři nainstalujte balíčky mssql-python, python-dotenv a pyarrow.
uv add mssql-python python-dotenv pyarrow
Spusťte Visual Studio Code
Ve stejném adresáři spusťte následující příkaz.
code .
Aktualizace pyproject.toml
Soubor pyproject.toml obsahuje metadata pro váš projekt. Otevřete soubor v oblíbeném editoru.
Zkontrolujte obsah souboru. Měl by se podobat tomuto příkladu. Všimněte si verze Pythonu a závislosti pro
mssql-python, které používá>=k definování minimální verze. Pokud dáváte přednost přesné verzi, změňte>=před číslem verze na==. Vyřešené verze každého balíčku jsou pak uloženy v uv.lock. Soubor lockfile zajišťuje, aby vývojáři pracující na projektu používali konzistentní verze balíčků. Také zajišťuje, aby se při distribuci balíčku koncovým uživatelům používala úplně stejná sada verzí balíčků. Souboruv.lockbyste neměli upravovat.[project] name = "mssql-python-bcp-qs" version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.11" dependencies = [ "mssql-python>=1.4.0", "python-dotenv>=1.1.1", "pyarrow>=19.0.0", ]Aktualizujte popis, aby byl popisnější.
description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"Uložte a zavřete soubor.
Aktualizace main.py
Otevřete soubor s názvem
main.py. Měl by se podobat tomuto příkladu.def main(): print("Hello from mssql-python-bcp-qs!") if __name__ == "__main__": main()main.pyObsah nahraďte následujícími bloky kódu. Každý blok vychází z předchozího bloku a měl by být vmain.pydaném pořadí.Návod
Pokud má Visual Studio Code potíže s řešením balíčků, musíte interpreta aktualizovat tak, aby používal virtuální prostředí.
V horní části souboru
main.pypřidejte importy a konstanty. Skript používámssql_pythonpro připojení k databázi,pyarrowapyarrow.parquetpro zpracování sloupcových dat a vstupně-výstupní operace souborů Parquet,python-dotenvpro načítání připojovacích řetězců ze souboru.env, a zkompilovaný vzor regulárního výrazu, který ověřuje identifikátory SQL, aby se zabránilo injection."""Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema.""" import os import re import time from uuid import UUID import pyarrow as pa import pyarrow.parquet as pq from dotenv import load_dotenv import mssql_python BATCH_SIZE = 64_000 _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$") def _validate_ident(name: str) -> str: if not _SAFE_IDENT.match(name): raise ValueError(f"Unsafe SQL identifier: {name!r}") return namePod importy přidejte mapování typu SQL-to-Arrow. Tento slovník překládá typy sloupců SQL Serveru na jejich ekvivalenty Apache Arrow, aby se při zápisu do Parquet zachovala věrnost dat. Tyto dvě pomocné funkce vytvářejí přesné řetězce typu SQL (například
NVARCHAR(100)neboDECIMAL(18,2)) zINFORMATION_SCHEMAmetadat a určí odpovídající Arrow typ pro každý sloupec._SQL_TO_ARROW = { "bit": pa.bool_(), "tinyint": pa.uint8(), "smallint": pa.int16(), "int": pa.int32(), "bigint": pa.int64(), "float": pa.float64(), "real": pa.float32(), "smallmoney": pa.decimal128(10, 4), "money": pa.decimal128(19, 4), "date": pa.date32(), "datetime": pa.timestamp("ms"), "datetime2": pa.timestamp("us"), "smalldatetime": pa.timestamp("s"), "uniqueidentifier": pa.string(), "xml": pa.string(), "image": pa.binary(), "binary": pa.binary(), "varbinary": pa.binary(), "timestamp": pa.binary(), } def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str: """Build the exact SQL type string from INFORMATION_SCHEMA metadata.""" dt = data_type.lower() if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"): length = "MAX" if max_length == -1 else str(max_length) return f"{dt.upper()}({length})" if dt in ("decimal", "numeric"): return f"{dt.upper()}({precision},{scale})" return dt.upper() def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType: sql_type = sql_type.lower() if sql_type in _SQL_TO_ARROW: return _SQL_TO_ARROW[sql_type] if sql_type in ("decimal", "numeric"): return pa.decimal128(precision, scale) if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"): return pa.string() return pa.string() def _convert_value(v): """Convert a SQL value to an Arrow-compatible Python type.""" if isinstance(v, UUID): return str(v) return vPřidejte introspekci schématu a funkce generování DDL.
_get_arrow_schemaINFORMATION_SCHEMA.COLUMNSprovádí dotazy využívající parametrizované dotazy, sestaví Arrow schéma a uloží původní typ SQL jako metadata polí, aby bylo možné cílovou tabulku znovu vytvořit s přesnými definicemi sloupců._create_table_ddlčte zpět tato metadata k generováníDROP/CREATE TABLEDDL. Typtimestamp(rowversion) je namapován naVARBINARY(8), protože se automaticky generuje a není možné ho přímo vložit.def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema: """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS. Stores the original SQL type as field metadata so the round-trip CREATE TABLE can reproduce exact column definitions. """ cursor.execute( "SELECT COLUMN_NAME, DATA_TYPE, " "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), " "COALESCE(NUMERIC_PRECISION, 0), " "COALESCE(NUMERIC_SCALE, 0), " "IS_NULLABLE " "FROM INFORMATION_SCHEMA.COLUMNS " "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? " "ORDER BY ORDINAL_POSITION", (schema_name, table_name), ) rows = cursor.fetchall() if not rows: raise ValueError(f"No columns found for {schema_name}.{table_name}") fields = [] for col_name, data_type, max_len, precision, scale, nullable in rows: arrow_t = _arrow_type(data_type, precision, scale) sql_t = _sql_type_str(data_type, max_len, precision, scale) fields.append( pa.field( col_name, arrow_t, nullable=(nullable == "YES"), metadata={"sql_type": sql_t}, ) ) return pa.schema(fields) def _create_table_ddl(target: str, schema: pa.Schema) -> str: """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata.""" col_defs = [] for f in schema: sql_t = f.metadata[b"sql_type"].decode() # timestamp/rowversion is auto-generated and not insertable if sql_t == "TIMESTAMP": sql_t = "VARBINARY(8)" null = "" if f.nullable else " NOT NULL" col_defs.append(f"[{f.name}] {sql_t}{null}") col_defs_str = ",\n ".join(col_defs) return ( f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n" f"CREATE TABLE {target} (\n {col_defs_str}\n);" )Přidejte funkci pro stažení.
download_tablestreamuje řádky ze zdrojové tabulky v dávkách pomocífetchmany(), převede každou hodnotu na typ Python kompatibilní se šipkou a zapíše dávky záznamů přírůstkově do místního souboru Parquet pomocíParquetWriter. To udržuje paměť ohraničenou bez ohledu na velikost tabulky. Funkce používá dva samostatné kurzory: jeden ke čtení metadat sloupců a druhý pro streamování dat.def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int: """Download a SQL table to a parquet file. Returns row count (0 if empty).""" _validate_ident(table_name) source = f"{schema_name}.[{table_name}]" with conn.cursor() as cursor: schema = _get_arrow_schema(cursor, schema_name, table_name) n_cols = len(schema) row_count = 0 t0 = time.perf_counter() with conn.cursor() as cursor: cursor.execute(f"SELECT * FROM {source}") writer = None try: while True: rows = cursor.fetchmany(BATCH_SIZE) if not rows: break columns = [[] for _ in range(n_cols)] for row in rows: for i in range(n_cols): columns[i].append(_convert_value(row[i])) arrays = [ pa.array(columns[i], type=schema.field(i).type) for i in range(n_cols) ] batch = pa.record_batch(arrays, schema=schema) if writer is None: writer = pq.ParquetWriter(parquet_file, schema) writer.write_batch(batch) row_count += len(rows) finally: if writer is not None: writer.close() if row_count == 0: return 0 elapsed = time.perf_counter() - t0 rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded " f"in {elapsed:.2f}s ({rate})" ) return row_countPřidejte háček pro rozšiřování.
enrich_parquetje zástupný symbol, ve kterém můžete před nahráním přidat transformace, odvozené sloupce nebo spojení k datům. V tomto rychlém startu se jedná o no-op, který vrátí cestu k souboru beze změny.def enrich_parquet(parquet_file: str) -> str: """Enrich a parquet file before upload. Returns the (possibly new) file path.""" # TODO: add transformations, derived columns, joins, etc. print(f"Enriching {parquet_file} (no-op)") return parquet_filePřidejte funkci pro nahrání.
upload_parquetpřečte Arrow schema ze souboru Parquet, vygeneruje a spustíDROP/CREATE TABLEDDL pro přípravu cíle, poté načte soubor v dávkách a využijecursor.bulkcopy()pro vysoce výkonné hromadné vložení. Tatotable_lock=Truemožnost zlepšuje propustnost minimalizací kolizí zámků. Po dokončení nahrávání spustí funkceSELECT COUNT(*), která ověří, že se shoduje počet řádků.def upload_parquet(conn, parquet_file: str, target: str) -> int: """Upload a parquet file into a SQL table via BCP. Returns row count.""" # ── Create target table from parquet schema ── pf_schema = pq.read_schema(parquet_file) with conn.cursor() as cursor: cursor.execute(_create_table_ddl(target, pf_schema)) conn.commit() # ── Bulk insert ── uploaded = 0 t0 = time.perf_counter() with pq.ParquetFile(parquet_file) as pf: with conn.cursor() as cursor: for batch in pf.iter_batches(batch_size=BATCH_SIZE): rows = zip(*(col.to_pylist() for col in batch.columns)) cursor.bulkcopy( target, rows, batch_size=BATCH_SIZE, table_lock=True, timeout=3600, ) uploaded += batch.num_rows elapsed = time.perf_counter() - t0 # ── Verify ── with conn.cursor() as cursor: cursor.execute(f"SELECT COUNT(*) FROM {target}") count = cursor.fetchone()[0] rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{parquet_file} → {target}: {uploaded:,} rows uploaded " f"in {elapsed:.2f}s ({rate}) " f"| verified: {count:,}" ) return uploadedPřidejte funkci orchestrace.
transfer_tablesspojí tři fáze dohromady. Připojí se ke zdrojové databázi, zjistí všechny základní tabulky v daném schématu prostřednictvímINFORMATION_SCHEMA.TABLES, stáhne každý z nich do místního souboru Parquet, spustí háček rozšiřování, pak se připojí k cílové databázi a nahraje každý soubor.def transfer_tables( source_conn_str: str, dest_conn_str: str, source_schema: str, dest_schema: str, ) -> None: """Download all tables from source DB/schema to parquet, upload to dest DB/schema.""" _validate_ident(source_schema) _validate_ident(dest_schema) parquet_dir = source_schema os.makedirs(parquet_dir, exist_ok=True) # ── Download from source ── with mssql_python.connect(source_conn_str) as src_conn: with src_conn.cursor() as cursor: cursor.execute( "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES " "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' " "ORDER BY TABLE_NAME", (source_schema,), ) tables = [row[0] for row in cursor.fetchall()] print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n") parquet_files = [] for table_name in tables: parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet") row_count = download_table(src_conn, source_schema, table_name, parquet_file) if row_count == 0: print(f"{source_schema}.{table_name}: empty, skipping") else: parquet_files.append((table_name, parquet_file)) # ── Enrich parquet files ── enriched = [] for table_name, parquet_file in parquet_files: enriched.append((table_name, enrich_parquet(parquet_file))) # ── Upload to destination ── with mssql_python.connect(dest_conn_str) as dest_conn: for table_name, parquet_file in enriched: target = f"{dest_schema}.[{table_name}]" upload_parquet(dest_conn, parquet_file, target)Nakonec přidejte vstupní bod
main. Načte.envsoubor, volátransfer_tablesse zdrojovými a cílovými připojovacími řetězci a vytiskne celkový uplynulý čas.def main(): load_dotenv() t_start = time.perf_counter() transfer_tables( source_conn_str=os.environ["SOURCE_CONNECTION_STRING"], dest_conn_str=os.environ["DEST_CONNECTION_STRING"], source_schema="SalesLT", dest_schema="dbo", ) print(f"Total: {time.perf_counter() - t_start:.2f}s") if __name__ == "__main__": main()Uložte a zavřete
main.py.
Uložte připojovací řetězce
.gitignoreOtevřete soubor a přidejte vyloučení souborů.env. Soubor by měl být podobný tomuto příkladu. Až budete hotovi, nezapomeňte ho uložit a zavřít.# Python-generated files __pycache__/ *.py[oc] build/ dist/ wheels/ *.egg-info # Virtual environments .venv # Connection strings and secrets .envV aktuálním adresáři vytvořte nový soubor s názvem
.env..envV souboru přidejte položky pro zdrojové a cílové připojovací řetězce. Nahraďte zástupné hodnoty skutečnými názvy serverů a databází.SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive" DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"Návod
Připojovací řetězec použitý zde do značné míry závisí na typu databáze SQL, ke které se připojujete. Pokud se připojujete k Azure SQL Database nebo k SQL databázi ve Fabric, použijte připojovací řetězec ODBC z karty Připojovací řetězce. Možná budete muset upravit typ ověřování v závislosti na vašem scénáři. Další informace o připojovacích řetězcích a jejich syntaxi najdete v referenčních informacích k syntaxi připojovacího řetězce.
Návod
V systému macOS funguje ověřování pomocí Microsoft Entra jak s ActiveDirectoryInteractive, tak s ActiveDirectoryDefault.
ActiveDirectoryInteractive vás vyzve k přihlášení při každém spuštění skriptu. Pokud se chcete vyhnout opakovaným výzvám k přihlášení, přihlaste se jednou přes Azure CLI spuštěním az logina pak použijte ActiveDirectoryDefaultpříkaz , který znovu použije přihlašovací údaje uložené v mezipaměti.
Použijte 'uv run' ke spuštění skriptu
V okně terminálu před nebo v novém okně terminálu, které se otevře ve stejném adresáři, spusťte následující příkaz.
uv run main.pyTady je očekávaný výstup po dokončení skriptu.
Found 12 SalesLT tables: Address, Customer, CustomerAddress, ... SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec) ... SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450 ... Total: 2.35sPřipojte se k cílové databázi pomocí aplikace SQL Server Management Studio (SSMS) nebo rozšíření MSSQL pro VS Code a ověřte, že se tabulky a data úspěšně vytvořily.
Pokud chcete skript nasadit do jiného počítače, zkopírujte všechny soubory s výjimkou
.venvsložky na druhý počítač. Virtuální prostředí se znovu vytvoří při prvním spuštění.
Jak kód funguje
Aplikace provádí kompletní přenos dat během zpáteční cesty ve třech fázích:
-
Ke stažení: Připojí se ke zdrojové databázi, přečte metadata sloupců z
INFORMATION_SCHEMA.COLUMNS, sestaví schéma Apache Arrow a pak stáhne každou tabulku do místního souboru Parquet. -
Obohacení (volitelné): Poskytuje háček (
enrich_parquet), kde můžete před nahráním přidat transformace, odvozené sloupce nebo spojení. -
Nahrání: Načte každý soubor Parquet v dávkách, znovu vytvoří tabulku v cílové databázi pomocí DDL generovaného z metadat schématu Arrow a použije
cursor.bulkcopy()k výkonnému hromadnému vložení.
Další krok
Další příklady najdete v mssql-python úložišti GitHubu ovladače, kde můžete přispívat nápady nebo hlásit problémy.