Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
Ebben a rövid útmutatóban az mssql-python illesztővel tömegesen másolhat adatokat az adatbázisok között. Az alkalmazás letölti a táblákat egy forrásadatbázis-sémából a helyi Parquet-fájlokba az Apache Arrow használatával, majd feltölti őket egy céladatbázisba a nagy teljesítményű bulkcopy módszerrel. Ezzel a mintával migrálhatja, replikálhatja vagy átalakíthatja az adatokat az SQL Server, az Azure SQL Database és az SQL Database között a Fabricben.
Az mssql-python illesztőprogram nem igényel külső függőségeket a Windows rendszerű gépeken. Az illesztőprogram egyetlen pip telepítéssel mindent telepít, amire szüksége van, így az illesztőprogram legújabb verzióját használhatja az új szkriptekhez anélkül, hogy megszegi azokat a szkripteket, amelyek frissítésére és tesztelésére nincs ideje.
az mssql-python dokumentációja | mssql-python forráskód | Csomag (PyPI) | Uv
Előfeltételek
Python 3
Ha még nem rendelkezik Pythonnal, telepítse a Python futtatókörnyezetet és a pip csomagkezelőta python.org.
Nem szeretné használni a saját környezetét? Nyissa meg devcontainerként a GitHub Codespaces használatával.
Visual Studio Code a következő bővítményekkel:
Az Azure Command-Line Interface (CLI) a jelszó nélküli hitelesítéshez macOS és Linux rendszeren.
Ha még nem tette meg
uv, kövesse a telepítési utasításokat.Forrásadatbázis az SQL Serveren, az Azure SQL Database-en vagy az SQL Database-ben a Fabricben a
AdventureWorks2025mintasémával és egy érvényes kapcsolati sztringgel.Egy céladatbázis az SQL Serveren, az Azure SQL-adatbázison vagy a Fabricben lévő SQL-adatbázison, érvényes kapcsolati karakterlánccal. A felhasználónak engedéllyel kell rendelkeznie a táblák létrehozásához és írásához. Ha nincs második adatbázisa, módosíthatja a célkapcsolati sztringet úgy, hogy ugyanarra az adatbázisra mutasson, és használjon másik sémát a céltáblákhoz.
Egyszeri operációs rendszerspecifikus előfeltételek telepítése.
SQL-adatbázis létrehozása
Ehhez a rövid útmutatóhoz az AdventureWorks2025 egyszerűsített sémája szükséges forrásadatbázisként.
A projekt létrehozása és a kód futtatása
- Új projekt létrehozása
- Függőségek hozzáadása
- A Visual Studio Code indítása
- Pyproject.toml frissítése
- Main.py frissítése
- A kapcsolati sztringek mentése
- A szkript végrehajtása uv-futtatás használatával
Új projekt létrehozása
Nyisson meg egy parancssort a fejlesztői címtárban. Ha nem rendelkezik ilyennel, hozzon létre egy új , stb. nevű
pythonscriptskönyvtárat. Kerülje a mappákat a OneDrive-on, a szinkronizálás megzavarhatja a virtuális környezet kezelését.Hozzon létre egy új projektet a
uvsegítségével.uv init mssql-python-bcp-qs cd mssql-python-bcp-qs
Függőségek hozzáadása
Ugyanabban a könyvtárban telepítse a mssql-python, python-dotenvés pyarrow a csomagokat.
uv add mssql-python python-dotenv pyarrow
Indítsa el a Visual Studio Code-ot
Ugyanabban a könyvtárban futtassa a következő parancsot.
code .
Pyproject.toml frissítése
A pyproject.toml a projekt metaadatait tartalmazza. Nyissa meg a fájlt a kedvenc szerkesztőjében.
Tekintse át a fájl tartalmát. Ennek hasonlónak kell lennie ehhez a példához. Jegyezze fel a Python-verziót és a függőséget egy minimális verzió meghatározásához
mssql-python>=. Ha pontos verziót szeretne, módosítsa a>=verziószám előtti értéket==. Az egyes csomagok feloldott verzióit ezután az uv.lock tárolja. A lockfile biztosítja, hogy a projekten dolgozó fejlesztők egységes csomagverziókat használjanak. Azt is biztosítja, hogy pontosan ugyanazt a csomagverziót használja a csomag végfelhasználóknak való terjesztésekor. Ne szerkessze auv.lockfájlt.[project] name = "mssql-python-bcp-qs" version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.11" dependencies = [ "mssql-python>=1.4.0", "python-dotenv>=1.1.1", "pyarrow>=19.0.0", ]Módosítsa a leírást részletesebbre.
description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"Mentse és zárja be a fájlt.
Main.py frissítése
Nyissa meg a nevű
main.pyfájlt. Ennek hasonlónak kell lennie ehhez a példához.def main(): print("Hello from mssql-python-bcp-qs!") if __name__ == "__main__": main()Cserélje le a következő kódblokkok tartalmát
main.py. Minden blokk az előzőre épül, ésmain.pysorrendben kell elhelyezni.Jótanács
Ha a Visual Studio Code-nak problémái vannak a csomagok megoldásával, frissítenie kell az értelmezőt a virtuális környezet használatára.
A felső részen
main.pyadja hozzá az importot és az állandókat. A szkript amssql_pythonaz adatbázis-kapcsolathoz, apyarrowéspyarrow.parquetaz oszlopos adatkezeléshez és a Parquet-fájl I/O-hoz, apython-dotenva.envfájlból való kapcsolati sztringek betöltéséhez, valamint egy lefordított regex minta, amely az SQL-azonosítók ellenőrzésére szolgál az injektálás megakadályozása érdekében."""Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema.""" import os import re import time from uuid import UUID import pyarrow as pa import pyarrow.parquet as pq from dotenv import load_dotenv import mssql_python BATCH_SIZE = 64_000 _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$") def _validate_ident(name: str) -> str: if not _SAFE_IDENT.match(name): raise ValueError(f"Unsafe SQL identifier: {name!r}") return nameAz importálás alatt adja hozzá az SQL-to-Arrow típusú leképezést. Ez a szótár lefordítja az SQL Server-oszloptípusokat az Apache Arrow megfelelőire, így az adatok hűsége megmarad a Parquetbe való íráskor. A két segédfüggvény pontos SQL-típusú sztringeket hoz létre (például
NVARCHAR(100)vagyDECIMAL(18,2)) aINFORMATION_SCHEMAmetaadatokból, és feloldja az egyes oszlopokhoz tartozó megfelelő Arrow-típust._SQL_TO_ARROW = { "bit": pa.bool_(), "tinyint": pa.uint8(), "smallint": pa.int16(), "int": pa.int32(), "bigint": pa.int64(), "float": pa.float64(), "real": pa.float32(), "smallmoney": pa.decimal128(10, 4), "money": pa.decimal128(19, 4), "date": pa.date32(), "datetime": pa.timestamp("ms"), "datetime2": pa.timestamp("us"), "smalldatetime": pa.timestamp("s"), "uniqueidentifier": pa.string(), "xml": pa.string(), "image": pa.binary(), "binary": pa.binary(), "varbinary": pa.binary(), "timestamp": pa.binary(), } def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str: """Build the exact SQL type string from INFORMATION_SCHEMA metadata.""" dt = data_type.lower() if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"): length = "MAX" if max_length == -1 else str(max_length) return f"{dt.upper()}({length})" if dt in ("decimal", "numeric"): return f"{dt.upper()}({precision},{scale})" return dt.upper() def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType: sql_type = sql_type.lower() if sql_type in _SQL_TO_ARROW: return _SQL_TO_ARROW[sql_type] if sql_type in ("decimal", "numeric"): return pa.decimal128(precision, scale) if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"): return pa.string() return pa.string() def _convert_value(v): """Convert a SQL value to an Arrow-compatible Python type.""" if isinstance(v, UUID): return str(v) return vAdja hozzá a sémabevezetési és DDL-létrehozási függvényeket.
_get_arrow_schemaparaméterezett lekérdezéseket futtatINFORMATION_SCHEMA.COLUMNS, Arrow sémát épít fel, és az eredeti SQL-típust mező metaadatként tárolja annak érdekében, hogy a céltábla pontos oszlopdefiníciókkal újra létrehozható legyen._create_table_ddlvisszaolvassa a metaadatokat a DDL létrehozásáhozDROP/CREATE TABLE. A(z)timestamp(rowversion) típus át van alakítvaVARBINARY(8)típusra, mivel automatikus módon generálódik, és közvetlenül nem szúrható be.def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema: """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS. Stores the original SQL type as field metadata so the round-trip CREATE TABLE can reproduce exact column definitions. """ cursor.execute( "SELECT COLUMN_NAME, DATA_TYPE, " "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), " "COALESCE(NUMERIC_PRECISION, 0), " "COALESCE(NUMERIC_SCALE, 0), " "IS_NULLABLE " "FROM INFORMATION_SCHEMA.COLUMNS " "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? " "ORDER BY ORDINAL_POSITION", (schema_name, table_name), ) rows = cursor.fetchall() if not rows: raise ValueError(f"No columns found for {schema_name}.{table_name}") fields = [] for col_name, data_type, max_len, precision, scale, nullable in rows: arrow_t = _arrow_type(data_type, precision, scale) sql_t = _sql_type_str(data_type, max_len, precision, scale) fields.append( pa.field( col_name, arrow_t, nullable=(nullable == "YES"), metadata={"sql_type": sql_t}, ) ) return pa.schema(fields) def _create_table_ddl(target: str, schema: pa.Schema) -> str: """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata.""" col_defs = [] for f in schema: sql_t = f.metadata[b"sql_type"].decode() # timestamp/rowversion is auto-generated and not insertable if sql_t == "TIMESTAMP": sql_t = "VARBINARY(8)" null = "" if f.nullable else " NOT NULL" col_defs.append(f"[{f.name}] {sql_t}{null}") col_defs_str = ",\n ".join(col_defs) return ( f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n" f"CREATE TABLE {target} (\n {col_defs_str}\n);" )Adja hozzá a letöltési funkciót.
download_tablesorokat streamel egy forrástáblából kötegekbenfetchmany(), az egyes értékeket Arrow-kompatibilis Python-típussá alakítja, és fokozatosan egy helyi Parquet-fájlbaParquetWriterírja a rekordokat. Így a memória a tábla méretétől függetlenül korlátozott marad. A függvény két külön kurzort használ: az egyik az oszlop metaadatainak olvasására, a másik pedig az adatok streamelésére.def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int: """Download a SQL table to a parquet file. Returns row count (0 if empty).""" _validate_ident(table_name) source = f"{schema_name}.[{table_name}]" with conn.cursor() as cursor: schema = _get_arrow_schema(cursor, schema_name, table_name) n_cols = len(schema) row_count = 0 t0 = time.perf_counter() with conn.cursor() as cursor: cursor.execute(f"SELECT * FROM {source}") writer = None try: while True: rows = cursor.fetchmany(BATCH_SIZE) if not rows: break columns = [[] for _ in range(n_cols)] for row in rows: for i in range(n_cols): columns[i].append(_convert_value(row[i])) arrays = [ pa.array(columns[i], type=schema.field(i).type) for i in range(n_cols) ] batch = pa.record_batch(arrays, schema=schema) if writer is None: writer = pq.ParquetWriter(parquet_file, schema) writer.write_batch(batch) row_count += len(rows) finally: if writer is not None: writer.close() if row_count == 0: return 0 elapsed = time.perf_counter() - t0 rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded " f"in {elapsed:.2f}s ({rate})" ) return row_countAdja hozzá a bővítőhorogot.
enrich_parquetegy helyőrző, ahol átalakításokat, származtatott oszlopokat vagy illesztéseket adhat hozzá az adatokhoz a feltöltés előtt. Ebben a gyorsindító útmutatóban ez egy művelet nélküli funkció, amely változatlanul adja vissza a fájl elérési útját.def enrich_parquet(parquet_file: str) -> str: """Enrich a parquet file before upload. Returns the (possibly new) file path.""" # TODO: add transformations, derived columns, joins, etc. print(f"Enriching {parquet_file} (no-op)") return parquet_fileAdja hozzá a feltöltési függvényt.
upload_parquetBeolvassa a Nyíl sémát a Parquet-fájlból, létrehozza és végrehajtjaDROP/CREATE TABLEa DDL-t a cél előkészítéséhez, majd kötegekben olvassa be a fájlt, és nagy teljesítményű tömeges beszúrást kér.cursor.bulkcopy()Atable_lock=Truebeállítás a zárolási versengés minimalizálásával javítja az átviteli sebességet. A feltöltés befejezése után a függvény futtat egySELECT COUNT(*)parancsot a sorok számának ellenőrzésére.def upload_parquet(conn, parquet_file: str, target: str) -> int: """Upload a parquet file into a SQL table via BCP. Returns row count.""" # ── Create target table from parquet schema ── pf_schema = pq.read_schema(parquet_file) with conn.cursor() as cursor: cursor.execute(_create_table_ddl(target, pf_schema)) conn.commit() # ── Bulk insert ── uploaded = 0 t0 = time.perf_counter() with pq.ParquetFile(parquet_file) as pf: with conn.cursor() as cursor: for batch in pf.iter_batches(batch_size=BATCH_SIZE): rows = zip(*(col.to_pylist() for col in batch.columns)) cursor.bulkcopy( target, rows, batch_size=BATCH_SIZE, table_lock=True, timeout=3600, ) uploaded += batch.num_rows elapsed = time.perf_counter() - t0 # ── Verify ── with conn.cursor() as cursor: cursor.execute(f"SELECT COUNT(*) FROM {target}") count = cursor.fetchone()[0] rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{parquet_file} → {target}: {uploaded:,} rows uploaded " f"in {elapsed:.2f}s ({rate}) " f"| verified: {count:,}" ) return uploadedAdjon hozzá egy orchestration funkciót.
transfer_tablesösszekapcsolja a három fázist. Csatlakozik a forrásadatbázishoz, felderíti az adott sémaINFORMATION_SCHEMA.TABLESösszes alaptábláját, letölti mindegyiket egy helyi Parquet-fájlba, futtatja a bővítési horgot, majd csatlakozik a céladatbázishoz, és feltölti az egyes fájlokat.def transfer_tables( source_conn_str: str, dest_conn_str: str, source_schema: str, dest_schema: str, ) -> None: """Download all tables from source DB/schema to parquet, upload to dest DB/schema.""" _validate_ident(source_schema) _validate_ident(dest_schema) parquet_dir = source_schema os.makedirs(parquet_dir, exist_ok=True) # ── Download from source ── with mssql_python.connect(source_conn_str) as src_conn: with src_conn.cursor() as cursor: cursor.execute( "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES " "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' " "ORDER BY TABLE_NAME", (source_schema,), ) tables = [row[0] for row in cursor.fetchall()] print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n") parquet_files = [] for table_name in tables: parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet") row_count = download_table(src_conn, source_schema, table_name, parquet_file) if row_count == 0: print(f"{source_schema}.{table_name}: empty, skipping") else: parquet_files.append((table_name, parquet_file)) # ── Enrich parquet files ── enriched = [] for table_name, parquet_file in parquet_files: enriched.append((table_name, enrich_parquet(parquet_file))) # ── Upload to destination ── with mssql_python.connect(dest_conn_str) as dest_conn: for table_name, parquet_file in enriched: target = f"{dest_schema}.[{table_name}]" upload_parquet(dest_conn, parquet_file, target)Végül adja hozzá a
mainbelépési pontot. Betölti a.envfájlt, meghívjatransfer_tablesa forrás- és célkapcsolati sztringeket, és kinyomtatja az eltelt időt.def main(): load_dotenv() t_start = time.perf_counter() transfer_tables( source_conn_str=os.environ["SOURCE_CONNECTION_STRING"], dest_conn_str=os.environ["DEST_CONNECTION_STRING"], source_schema="SalesLT", dest_schema="dbo", ) print(f"Total: {time.perf_counter() - t_start:.2f}s") if __name__ == "__main__": main()Mentés és bezárás
main.py.
A kapcsolati sztringek mentése
Nyissa meg a
.gitignorefájlt, és adjon hozzá kizárást a fájlokhoz.env. A fájlnak hasonlónak kell lennie ehhez a példához. Mindenképpen mentse és zárja be, ha elkészült.# Python-generated files __pycache__/ *.py[oc] build/ dist/ wheels/ *.egg-info # Virtual environments .venv # Connection strings and secrets .envAz aktuális könyvtárban hozzon létre egy új fájlt.
.envA fájlban
.envadjon hozzá bejegyzéseket a forrás- és célkapcsolati karakterláncokhoz. Cserélje le a helyőrző értékeket a tényleges kiszolgáló- és adatbázisnevekre.SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive" DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"Jótanács
Az itt használt kapcsolati sztring nagyban függ attól, hogy milyen TÍPUSÚ SQL-adatbázishoz csatlakozik. Ha Azure SQL Database-hez vagy SQL-adatbázishoz csatlakozik a Fabricben, használja az ODBC kapcsolati sztringet a kapcsolati sztringek lapról. Előfordulhat, hogy a forgatókönyvtől függően módosítania kell a hitelesítési típust. A kapcsolati sztringekről és azok szintaxisáról további információt a kapcsolati sztring szintaxisának hivatkozásában talál.
Jótanács
macOS rendszeren mind a ActiveDirectoryInteractive, mind a ActiveDirectoryDefault használható a Microsoft Entra-hitelesítéshez.
ActiveDirectoryInteractive minden alkalommal, amikor futtatja a szkriptet, arra kéri, hogy jelentkezzen be. Az ismétlődő bejelentkezési kérések elkerülése érdekében jelentkezzen be egyszer az Azure CLI segítségével a az login parancs végrehajtásával, majd használja a ActiveDirectoryDefault, amely újrahasználja a gyorsítótárazott hitelesítő adatokat.
Használja az "uv run" parancsot a szkript futtatásához
A korábban megnyitott terminálablakban vagy egy új terminálablakban futtassa a következő parancsot.
uv run main.pyA szkript befejeződésekor a következő várható kimenet jelenik meg.
Found 12 SalesLT tables: Address, Customer, CustomerAddress, ... SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec) ... SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450 ... Total: 2.35sCsatlakozzon a céladatbázishoz az SQL Server Management Studio (SSMS) vagy a VS Code MSSQL-bővítménye használatával, és ellenőrizze, hogy a táblák és adatok sikeresen létrejöttek-e.
Ha a szkriptet egy másik gépen szeretné üzembe helyezni, másolja az összes fájlt, kivéve a
.venvmappát a másik gépre. A virtuális környezet az első futtatáskor újra létre lesz hozva.
A kód működése
Az alkalmazás három fázisban hajtja végre a teljes oda-vissza adatátvitelt:
-
Letöltés: Csatlakozik a forrásadatbázishoz, beolvassa az oszlop metaadatait
INFORMATION_SCHEMA.COLUMNS, létrehoz egy Apache Arrow-sémát, majd letölti az egyes táblákat egy helyi Parquet-fájlba. -
Bővítés (nem kötelező): Olyan horogot (
enrich_parquet) biztosít, amelyben átalakításokat, származtatott oszlopokat vagy illesztéseket adhat hozzá feltöltés előtt. -
Feltöltés: Beolvassa az egyes Parquet-fájlokat kötegekben, újra létrehozza a táblát a céladatbázisban a Nyílséma metaadataiból létrehozott DDL használatával, majd nagy teljesítményű tömeges beszúráshoz használja
cursor.bulkcopy().
Következő lépés
Látogasson el az mssql-python illesztőprogram GitHub-adattárába további példákért, ötletek megosztásához, vagy problémák jelentéséhez.