Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
W tym szybkim przewodniku użyjesz sterownika do zbiorczego kopiowania danych między bazami danych. Aplikacja pobiera tabele ze źródłowego schematu bazy danych do lokalnych plików Parquet przy użyciu narzędzia Apache Arrow, a następnie przekazuje je do docelowej bazy danych przy użyciu metody wysokiej wydajności bulkcopy . Za pomocą tego wzorca można migrować, replikować lub przekształcać dane między programem SQL Server, usługą Azure SQL Database i bazą danych SQL w usłudze Fabric.
Sterownik mssql-python nie wymaga żadnych zależności zewnętrznych na maszynach z systemem Windows. Sterownik instaluje wszystko, czego potrzebuje w ramach pojedynczej pip instalacji, co pozwala na użycie najnowszej wersji sterownika dla nowych skryptów bez przerywania innych skryptów, których nie masz czasu na uaktualnienie i przetestowanie.
Dokumentacja mssql-python
Wymagania wstępne
Python 3
Jeśli nie masz jeszcze języka Python, zainstaluj środowisko uruchomieniowe języka Python i menedżera pakietów z python.org.
Nie chcesz używać własnego środowiska? Otwórz jako devcontainer przy użyciu usługi GitHub Codespaces.
Visual Studio Code z następującymi rozszerzeniami:
- Rozszerzenie języka Python dla programu Visual Studio Code
Interfejs azure Command-Line (CLI) na potrzeby uwierzytelniania bez hasła w systemach macOS i Linux.
Jeśli jeszcze nie masz
uvprogramu , postępuj zgodnie z instrukcjami instalacji.Źródłowa baza danych w programie SQL Server, usłudze Azure SQL Database lub SQL Database w środowisku Fabric z przykładowym schematem
AdventureWorks2025i prawidłowym ciągiem połączenia.Docelowa baza danych na SQL Server, usługa Azure SQL Database lub baza danych SQL w Fabric z prawidłowymi parametrami połączenia. Użytkownik musi mieć uprawnienia do tworzenia i zapisywania w tabelach. Jeśli nie masz drugiej bazy danych, możesz zmienić docelowe parametry połączenia tak, aby wskazywały tę samą bazę danych i użyć innego schematu dla tabel docelowych.
Zainstaluj jednorazowe wymagania wstępne dotyczące systemu operacyjnego.
Tworzenie bazy danych SQL
Ten przewodnik szybkiego uruchamiania wymaga schematu AdventureWorks2025 Lightweight jako bazy danych źródłowej.
Tworzenie projektu i uruchamianie kodu
- Tworzenie nowego projektu
- Dodawanie zależności
- Uruchamianie programu Visual Studio Code
- Aktualizowanie pliku pyproject.toml
- Aktualizowanie main.py
- Zapisywanie parametrów połączenia
- Użyj narzędzia uv run, aby wykonać skrypt
Tworzenie nowego projektu
Otwórz wiersz polecenia w katalogu deweloperów. Jeśli go nie masz, utwórz nowy katalog o nazwie
python,scriptsitp. Unikaj folderów w usłudze OneDrive, synchronizacja może zakłócać zarządzanie środowiskiem wirtualnym.Utwórz nowy projekt za pomocą polecenia
uv.uv init mssql-python-bcp-qs cd mssql-python-bcp-qs
Dodawanie zależności
W tym samym katalogu zainstaluj pakiety mssql-python, python-dotenv i pyarrow.
uv add mssql-python python-dotenv pyarrow
Uruchom program Visual Studio Code.
W tym samym katalogu uruchom następujące polecenie.
code .
Aktualizowanie pliku pyproject.toml
Plik pyproject.toml zawiera metadane projektu. Otwórz plik w ulubionym edytorze.
Przejrzyj zawartość pliku. Powinien być podobny do tego przykładu. Zanotuj wersję Pythona i zależność dla
mssql-python, które używają>=do określenia minimalnej wersji. Jeśli wolisz dokładną wersję, zmień>=wartość przed numerem wersji na==. Rozwiązane wersje każdego pakietu są następnie przechowywane w uv.lock. Plik lockfile zapewnia, że deweloperzy pracujący nad projektem korzystają ze spójnych wersji pakietów. Gwarantuje również, że podczas dystrybucji pakietu dla użytkowników końcowych jest używany dokładnie ten sam zestaw wersji pakietów. Nie należy edytowaćuv.lockpliku.[project] name = "mssql-python-bcp-qs" version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.11" dependencies = [ "mssql-python>=1.4.0", "python-dotenv>=1.1.1", "pyarrow>=19.0.0", ]Zaktualizuj opis, aby był bardziej opisowy.
description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"Zapisz i zamknij plik.
Aktualizowanie main.py
Otwórz plik o nazwie
main.py. Powinien być podobny do tego przykładu.def main(): print("Hello from mssql-python-bcp-qs!") if __name__ == "__main__": main()Zastąp zawartość
main.pynastępującymi blokami kodu. Każdy blok opiera się na poprzednim i powinien zostać umieszczony wmain.pyw odpowiedniej kolejności.Wskazówka
Jeśli program Visual Studio Code ma problemy z rozwiązaniem problemów z pakietami, należy zaktualizować interpreter, aby używał środowiska wirtualnego.
W górnej części elementu
main.pydodaj importy i stałe. Skrypt używamssql_pythondo łączności z bazą danych,pyarrowipyarrow.parquetdo obsługi danych kolumnowych i operacji we/wy plików Parquet,python-dotenvdo ładowania parametrów połączenia z pliku.env, oraz skompilowanego wzorca regex, który weryfikuje identyfikatory SQL, aby zapobiec iniekcji."""Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema.""" import os import re import time from uuid import UUID import pyarrow as pa import pyarrow.parquet as pq from dotenv import load_dotenv import mssql_python BATCH_SIZE = 64_000 _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$") def _validate_ident(name: str) -> str: if not _SAFE_IDENT.match(name): raise ValueError(f"Unsafe SQL identifier: {name!r}") return namePoniżej importów dodaj mapowanie typu SQL-to-Arrow. W tym słowniku typy kolumn programu SQL Server są tłumaczone na ich odpowiedniki apache Arrow, aby podczas pisania tekstu w pliku Parquet zachowano wierność danych. Dwie funkcje pomocnicze tworzą dokładne ciągi typu SQL (na przykład
NVARCHAR(100)lubDECIMAL(18,2)) zINFORMATION_SCHEMAmetadanych i rozpoznają pasujący typ strzałki dla każdej kolumny._SQL_TO_ARROW = { "bit": pa.bool_(), "tinyint": pa.uint8(), "smallint": pa.int16(), "int": pa.int32(), "bigint": pa.int64(), "float": pa.float64(), "real": pa.float32(), "smallmoney": pa.decimal128(10, 4), "money": pa.decimal128(19, 4), "date": pa.date32(), "datetime": pa.timestamp("ms"), "datetime2": pa.timestamp("us"), "smalldatetime": pa.timestamp("s"), "uniqueidentifier": pa.string(), "xml": pa.string(), "image": pa.binary(), "binary": pa.binary(), "varbinary": pa.binary(), "timestamp": pa.binary(), } def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str: """Build the exact SQL type string from INFORMATION_SCHEMA metadata.""" dt = data_type.lower() if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"): length = "MAX" if max_length == -1 else str(max_length) return f"{dt.upper()}({length})" if dt in ("decimal", "numeric"): return f"{dt.upper()}({precision},{scale})" return dt.upper() def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType: sql_type = sql_type.lower() if sql_type in _SQL_TO_ARROW: return _SQL_TO_ARROW[sql_type] if sql_type in ("decimal", "numeric"): return pa.decimal128(precision, scale) if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"): return pa.string() return pa.string() def _convert_value(v): """Convert a SQL value to an Arrow-compatible Python type.""" if isinstance(v, UUID): return str(v) return vDodaj funkcje introspekcji schematu i generowania DDL.
_get_arrow_schemazapytaniaINFORMATION_SCHEMA.COLUMNSkorzystające z zapytań sparametryzowanych, tworzy schemat strzałki i przechowuje oryginalny typ SQL jako metadane pola, dzięki czemu tabela docelowa może zostać ponownie utworzona z dokładnymi definicjami kolumn._create_table_ddlodczytuje te metadane z powrotem w celu wygenerowaniaDROP/CREATE TABLEDDL. Typtimestamp(rowversion) jest mapowany ponownie naVARBINARY(8), ponieważ jest generowany automatycznie i nie można go bezpośrednio wstawiać.def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema: """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS. Stores the original SQL type as field metadata so the round-trip CREATE TABLE can reproduce exact column definitions. """ cursor.execute( "SELECT COLUMN_NAME, DATA_TYPE, " "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), " "COALESCE(NUMERIC_PRECISION, 0), " "COALESCE(NUMERIC_SCALE, 0), " "IS_NULLABLE " "FROM INFORMATION_SCHEMA.COLUMNS " "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? " "ORDER BY ORDINAL_POSITION", (schema_name, table_name), ) rows = cursor.fetchall() if not rows: raise ValueError(f"No columns found for {schema_name}.{table_name}") fields = [] for col_name, data_type, max_len, precision, scale, nullable in rows: arrow_t = _arrow_type(data_type, precision, scale) sql_t = _sql_type_str(data_type, max_len, precision, scale) fields.append( pa.field( col_name, arrow_t, nullable=(nullable == "YES"), metadata={"sql_type": sql_t}, ) ) return pa.schema(fields) def _create_table_ddl(target: str, schema: pa.Schema) -> str: """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata.""" col_defs = [] for f in schema: sql_t = f.metadata[b"sql_type"].decode() # timestamp/rowversion is auto-generated and not insertable if sql_t == "TIMESTAMP": sql_t = "VARBINARY(8)" null = "" if f.nullable else " NOT NULL" col_defs.append(f"[{f.name}] {sql_t}{null}") col_defs_str = ",\n ".join(col_defs) return ( f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n" f"CREATE TABLE {target} (\n {col_defs_str}\n);" )Dodaj funkcję pobierania.
download_tableprzesyła strumieniowo wiersze ze źródłowej tabeli w partiach za pomocąfetchmany(), konwertuje każdą wartość na typ języka Python zgodny z Apache Arrow i zapisuje partie rekordów przyrostowo do lokalnego pliku Parquet za pomocąParquetWriter. Dzięki temu pamięć jest ograniczona niezależnie od rozmiaru tabeli. Funkcja używa dwóch oddzielnych kursorów: jeden do odczytywania metadanych kolumny, a drugi do strumieniowego przesyłania danych.def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int: """Download a SQL table to a parquet file. Returns row count (0 if empty).""" _validate_ident(table_name) source = f"{schema_name}.[{table_name}]" with conn.cursor() as cursor: schema = _get_arrow_schema(cursor, schema_name, table_name) n_cols = len(schema) row_count = 0 t0 = time.perf_counter() with conn.cursor() as cursor: cursor.execute(f"SELECT * FROM {source}") writer = None try: while True: rows = cursor.fetchmany(BATCH_SIZE) if not rows: break columns = [[] for _ in range(n_cols)] for row in rows: for i in range(n_cols): columns[i].append(_convert_value(row[i])) arrays = [ pa.array(columns[i], type=schema.field(i).type) for i in range(n_cols) ] batch = pa.record_batch(arrays, schema=schema) if writer is None: writer = pq.ParquetWriter(parquet_file, schema) writer.write_batch(batch) row_count += len(rows) finally: if writer is not None: writer.close() if row_count == 0: return 0 elapsed = time.perf_counter() - t0 rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded " f"in {elapsed:.2f}s ({rate})" ) return row_countDodaj hak wzbogacający.
enrich_parquetjest symbolem zastępczym, za pomocą którego można dodawać przekształcenia, kolumny pochodne lub łączenia do danych przed ich przekazaniem. W tym przewodniku 'quickstart' jest to operacja, która nic nie robi i zwraca ścieżkę pliku bez zmian.def enrich_parquet(parquet_file: str) -> str: """Enrich a parquet file before upload. Returns the (possibly new) file path.""" # TODO: add transformations, derived columns, joins, etc. print(f"Enriching {parquet_file} (no-op)") return parquet_fileDodaj funkcję upload.
upload_parquetOdczytuje schemat Arrow z pliku Parquet, generuje i wykonujeDROP/CREATE TABLEDDL, aby przygotować miejsce docelowe, a następnie odczytuje plik w partiach i wywołujecursor.bulkcopy()wysokowydajne operacje wstawiania zbiorczego. Opcjatable_lock=Truezwiększa przepływność, minimalizując rywalizację o blokadę. Po zakończeniu przesyłania funkcja uruchamiaSELECT COUNT(*), aby sprawdzić, czy liczba wierszy jest zgodna.def upload_parquet(conn, parquet_file: str, target: str) -> int: """Upload a parquet file into a SQL table via BCP. Returns row count.""" # ── Create target table from parquet schema ── pf_schema = pq.read_schema(parquet_file) with conn.cursor() as cursor: cursor.execute(_create_table_ddl(target, pf_schema)) conn.commit() # ── Bulk insert ── uploaded = 0 t0 = time.perf_counter() with pq.ParquetFile(parquet_file) as pf: with conn.cursor() as cursor: for batch in pf.iter_batches(batch_size=BATCH_SIZE): rows = zip(*(col.to_pylist() for col in batch.columns)) cursor.bulkcopy( target, rows, batch_size=BATCH_SIZE, table_lock=True, timeout=3600, ) uploaded += batch.num_rows elapsed = time.perf_counter() - t0 # ── Verify ── with conn.cursor() as cursor: cursor.execute(f"SELECT COUNT(*) FROM {target}") count = cursor.fetchone()[0] rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{parquet_file} → {target}: {uploaded:,} rows uploaded " f"in {elapsed:.2f}s ({rate}) " f"| verified: {count:,}" ) return uploadedDodaj funkcję aranżacji.
transfer_tablesłączy ze sobą trzy fazy. Łączy się ze źródłową bazą danych, odnajduje wszystkie tabele podstawowe w danym schemacie za pośrednictwemINFORMATION_SCHEMA.TABLES, pobiera każdą z nich do lokalnego pliku Parquet, uruchamia mechanizm wzbogacania, a następnie łączy się z docelową bazą danych i przesyła każdy plik.def transfer_tables( source_conn_str: str, dest_conn_str: str, source_schema: str, dest_schema: str, ) -> None: """Download all tables from source DB/schema to parquet, upload to dest DB/schema.""" _validate_ident(source_schema) _validate_ident(dest_schema) parquet_dir = source_schema os.makedirs(parquet_dir, exist_ok=True) # ── Download from source ── with mssql_python.connect(source_conn_str) as src_conn: with src_conn.cursor() as cursor: cursor.execute( "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES " "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' " "ORDER BY TABLE_NAME", (source_schema,), ) tables = [row[0] for row in cursor.fetchall()] print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n") parquet_files = [] for table_name in tables: parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet") row_count = download_table(src_conn, source_schema, table_name, parquet_file) if row_count == 0: print(f"{source_schema}.{table_name}: empty, skipping") else: parquet_files.append((table_name, parquet_file)) # ── Enrich parquet files ── enriched = [] for table_name, parquet_file in parquet_files: enriched.append((table_name, enrich_parquet(parquet_file))) # ── Upload to destination ── with mssql_python.connect(dest_conn_str) as dest_conn: for table_name, parquet_file in enriched: target = f"{dest_schema}.[{table_name}]" upload_parquet(dest_conn, parquet_file, target)Na koniec dodaj
mainpunkt wejścia. Ładuje.envplik, wywołujetransfer_tablesze źródłowymi i docelowymi parametrami połączenia i wyświetla całkowity czas, który upłynął.def main(): load_dotenv() t_start = time.perf_counter() transfer_tables( source_conn_str=os.environ["SOURCE_CONNECTION_STRING"], dest_conn_str=os.environ["DEST_CONNECTION_STRING"], source_schema="SalesLT", dest_schema="dbo", ) print(f"Total: {time.perf_counter() - t_start:.2f}s") if __name__ == "__main__": main()Zapisz i zamknij plik
main.py.
Zapisywanie parametrów połączenia
.gitignoreOtwórz plik i dodaj wykluczenie dla.envplików. Plik powinien być podobny do tego przykładu. Pamiętaj, aby zapisać i zamknąć go po zakończeniu.# Python-generated files __pycache__/ *.py[oc] build/ dist/ wheels/ *.egg-info # Virtual environments .venv # Connection strings and secrets .envW bieżącym katalogu utwórz nowy plik o nazwie
.env.W pliku
.envdodaj wpisy dla ciągów połączenia źródłowego i docelowego. Zastąp wartości symboli zastępczych rzeczywistymi nazwami serwera i bazy danych.SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive" DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"Wskazówka
Parametry połączenia używane w tym miejscu w dużej mierze zależą od typu bazy danych SQL, z którą nawiązujesz połączenie. Jeśli nawiązujesz połączenie z usługą Azure SQL Database lub bazą danych SQL w sieci szkieletowej, użyj parametrów połączenia ODBC z karty parametry połączenia. W zależności od scenariusza może być konieczne dostosowanie typu uwierzytelniania. Aby uzyskać więcej informacji na temat parametrów połączenia i ich składni, zobacz dokumentację składni parametrów połączenia.
Wskazówka
Zarówno ActiveDirectoryInteractive, jak i ActiveDirectoryDefault działają na systemie macOS do uwierzytelniania Microsoft Entra.
ActiveDirectoryInteractive monituje o zalogowanie się przy każdym uruchomieniu skryptu. Aby uniknąć powtarzających się monitów logowania, zaloguj się raz za pośrednictwem Azure CLI, uruchamiając az login, a następnie użyj ActiveDirectoryDefault, który wykorzystuje buforowane poświadczenie.
Użyj narzędzia uv run, aby wykonać skrypt
W wcześniejszym oknie terminalu lub w nowym oknie terminalu, które jest otwarte w tym samym katalogu, wykonaj następujące polecenie.
uv run main.pyOto oczekiwane dane wyjściowe po zakończeniu działania skryptu.
Found 12 SalesLT tables: Address, Customer, CustomerAddress, ... SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec) ... SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450 ... Total: 2.35sPołącz się z docelową bazą danych przy użyciu programu SQL Server Management Studio (SSMS) lub rozszerzenia MSSQL dla programu VS Code i sprawdź, czy tabele i dane zostały utworzone pomyślnie.
Aby wdrożyć skrypt na innej maszynie, skopiuj wszystkie pliki z wyjątkiem
.venvfolderu na inną maszynę. Środowisko wirtualne jest odtwarzane podczas pierwszego uruchomienia.
Jak działa kod
Aplikacja wykonuje pełny transfer danych w obie strony w trzech fazach:
-
Pobieranie: łączy się ze źródłową bazą danych, odczytuje metadane kolumn z
INFORMATION_SCHEMA.COLUMNS, tworzy schemat Apache Arrow, a następnie pobiera każdą tabelę do lokalnego pliku Parquet. -
Wzbogacanie (opcjonalnie): zapewnia punkt zaczepienia (
enrich_parquet), w którym można dodawać przekształcenia, kolumny pochodne lub sprzężenia przed przekazaniem. -
Przesyłanie: odczytuje każdy plik Parquet w partiach, ponownie tworzy tabelę w docelowej bazie danych przy użyciu języka DDL wygenerowanego na podstawie metadanych schematu Arrow, a następnie wykorzystuje
cursor.bulkcopy()do wydajnego zbiorczego wstawiania.
Następne kroki
Odwiedź repozytorium GitHub sterowników mssql-python, aby uzyskać więcej przykładów, wnieść pomysły lub zgłaszać problemy.