Udostępnij za pośrednictwem


Szybki start: kopiowanie zbiorcze za pomocą sterownika mssql-python dla języka Python

W tym szybkim przewodniku użyjesz sterownika do zbiorczego kopiowania danych między bazami danych. Aplikacja pobiera tabele ze źródłowego schematu bazy danych do lokalnych plików Parquet przy użyciu narzędzia Apache Arrow, a następnie przekazuje je do docelowej bazy danych przy użyciu metody wysokiej wydajności bulkcopy . Za pomocą tego wzorca można migrować, replikować lub przekształcać dane między programem SQL Server, usługą Azure SQL Database i bazą danych SQL w usłudze Fabric.

Sterownik mssql-python nie wymaga żadnych zależności zewnętrznych na maszynach z systemem Windows. Sterownik instaluje wszystko, czego potrzebuje w ramach pojedynczej pip instalacji, co pozwala na użycie najnowszej wersji sterownika dla nowych skryptów bez przerywania innych skryptów, których nie masz czasu na uaktualnienie i przetestowanie.

Dokumentacja mssql-python Kod źródłowy mssql-python Pakiet (PyPI) uv

Wymagania wstępne

  • Python 3

    • Jeśli nie masz jeszcze języka Python, zainstaluj środowisko uruchomieniowe języka Python i menedżera pakietów z python.org.

    • Nie chcesz używać własnego środowiska? Otwórz jako devcontainer przy użyciu usługi GitHub Codespaces.

  • Visual Studio Code z następującymi rozszerzeniami:

    • Rozszerzenie języka Python dla programu Visual Studio Code
  • Interfejs azure Command-Line (CLI) na potrzeby uwierzytelniania bez hasła w systemach macOS i Linux.

  • Jeśli jeszcze nie masz uvprogramu , postępuj zgodnie z instrukcjami instalacji.

  • Źródłowa baza danych w programie SQL Server, usłudze Azure SQL Database lub SQL Database w środowisku Fabric z przykładowym schematem AdventureWorks2025 i prawidłowym ciągiem połączenia.

  • Docelowa baza danych na SQL Server, usługa Azure SQL Database lub baza danych SQL w Fabric z prawidłowymi parametrami połączenia. Użytkownik musi mieć uprawnienia do tworzenia i zapisywania w tabelach. Jeśli nie masz drugiej bazy danych, możesz zmienić docelowe parametry połączenia tak, aby wskazywały tę samą bazę danych i użyć innego schematu dla tabel docelowych.

  • Zainstaluj jednorazowe wymagania wstępne dotyczące systemu operacyjnego.

    apk add libtool krb5-libs krb5-dev
    

Tworzenie bazy danych SQL

Ten przewodnik szybkiego uruchamiania wymaga schematu AdventureWorks2025 Lightweight jako bazy danych źródłowej.

Tworzenie projektu i uruchamianie kodu

  1. Tworzenie nowego projektu
  2. Dodawanie zależności
  3. Uruchamianie programu Visual Studio Code
  4. Aktualizowanie pliku pyproject.toml
  5. Aktualizowanie main.py
  6. Zapisywanie parametrów połączenia
  7. Użyj narzędzia uv run, aby wykonać skrypt

Tworzenie nowego projektu

  1. Otwórz wiersz polecenia w katalogu deweloperów. Jeśli go nie masz, utwórz nowy katalog o nazwie python, scriptsitp. Unikaj folderów w usłudze OneDrive, synchronizacja może zakłócać zarządzanie środowiskiem wirtualnym.

  2. Utwórz nowy projekt za pomocą polecenia uv.

    uv init mssql-python-bcp-qs
    cd mssql-python-bcp-qs
    

Dodawanie zależności

W tym samym katalogu zainstaluj pakiety mssql-python, python-dotenv i pyarrow.

uv add mssql-python python-dotenv pyarrow

Uruchom program Visual Studio Code.

W tym samym katalogu uruchom następujące polecenie.

code .

Aktualizowanie pliku pyproject.toml

  1. Plik pyproject.toml zawiera metadane projektu. Otwórz plik w ulubionym edytorze.

  2. Przejrzyj zawartość pliku. Powinien być podobny do tego przykładu. Zanotuj wersję Pythona i zależność dla mssql-python, które używają >= do określenia minimalnej wersji. Jeśli wolisz dokładną wersję, zmień >= wartość przed numerem wersji na ==. Rozwiązane wersje każdego pakietu są następnie przechowywane w uv.lock. Plik lockfile zapewnia, że deweloperzy pracujący nad projektem korzystają ze spójnych wersji pakietów. Gwarantuje również, że podczas dystrybucji pakietu dla użytkowników końcowych jest używany dokładnie ten sam zestaw wersji pakietów. Nie należy edytować uv.lock pliku.

    [project]
    name = "mssql-python-bcp-qs"
    version = "0.1.0"
    description = "Add your description here"
    readme = "README.md"
    requires-python = ">=3.11"
    dependencies = [
        "mssql-python>=1.4.0",
        "python-dotenv>=1.1.1",
        "pyarrow>=19.0.0",
    ]
    
  3. Zaktualizuj opis, aby był bardziej opisowy.

    description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"
    
  4. Zapisz i zamknij plik.

Aktualizowanie main.py

  1. Otwórz plik o nazwie main.py. Powinien być podobny do tego przykładu.

    def main():
        print("Hello from mssql-python-bcp-qs!")
    
    if __name__ == "__main__":
        main()
    
  2. Zastąp zawartość main.py następującymi blokami kodu. Każdy blok opiera się na poprzednim i powinien zostać umieszczony w main.py w odpowiedniej kolejności.

    Wskazówka

    Jeśli program Visual Studio Code ma problemy z rozwiązaniem problemów z pakietami, należy zaktualizować interpreter, aby używał środowiska wirtualnego.

  3. W górnej części elementu main.py dodaj importy i stałe. Skrypt używa mssql_python do łączności z bazą danych, pyarrow i pyarrow.parquet do obsługi danych kolumnowych i operacji we/wy plików Parquet, python-dotenv do ładowania parametrów połączenia z pliku .env, oraz skompilowanego wzorca regex, który weryfikuje identyfikatory SQL, aby zapobiec iniekcji.

    """Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema."""
    
    import os
    import re
    import time
    from uuid import UUID
    
    import pyarrow as pa
    import pyarrow.parquet as pq
    from dotenv import load_dotenv
    import mssql_python
    
    BATCH_SIZE = 64_000
    _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$")
    
    
    def _validate_ident(name: str) -> str:
        if not _SAFE_IDENT.match(name):
            raise ValueError(f"Unsafe SQL identifier: {name!r}")
        return name
    
  4. Poniżej importów dodaj mapowanie typu SQL-to-Arrow. W tym słowniku typy kolumn programu SQL Server są tłumaczone na ich odpowiedniki apache Arrow, aby podczas pisania tekstu w pliku Parquet zachowano wierność danych. Dwie funkcje pomocnicze tworzą dokładne ciągi typu SQL (na przykład NVARCHAR(100) lub DECIMAL(18,2)) z INFORMATION_SCHEMA metadanych i rozpoznają pasujący typ strzałki dla każdej kolumny.

    _SQL_TO_ARROW = {
        "bit": pa.bool_(),
        "tinyint": pa.uint8(),
        "smallint": pa.int16(),
        "int": pa.int32(),
        "bigint": pa.int64(),
        "float": pa.float64(),
        "real": pa.float32(),
        "smallmoney": pa.decimal128(10, 4),
        "money": pa.decimal128(19, 4),
        "date": pa.date32(),
        "datetime": pa.timestamp("ms"),
        "datetime2": pa.timestamp("us"),
        "smalldatetime": pa.timestamp("s"),
        "uniqueidentifier": pa.string(),
        "xml": pa.string(),
        "image": pa.binary(),
        "binary": pa.binary(),
        "varbinary": pa.binary(),
        "timestamp": pa.binary(),
    }
    
    
    def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str:
        """Build the exact SQL type string from INFORMATION_SCHEMA metadata."""
        dt = data_type.lower()
        if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"):
            length = "MAX" if max_length == -1 else str(max_length)
            return f"{dt.upper()}({length})"
        if dt in ("decimal", "numeric"):
            return f"{dt.upper()}({precision},{scale})"
        return dt.upper()
    
    
    def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType:
        sql_type = sql_type.lower()
        if sql_type in _SQL_TO_ARROW:
            return _SQL_TO_ARROW[sql_type]
        if sql_type in ("decimal", "numeric"):
            return pa.decimal128(precision, scale)
        if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"):
            return pa.string()
        return pa.string()
    
    
    def _convert_value(v):
        """Convert a SQL value to an Arrow-compatible Python type."""
        if isinstance(v, UUID):
            return str(v)
        return v
    
  5. Dodaj funkcje introspekcji schematu i generowania DDL. _get_arrow_schema zapytania INFORMATION_SCHEMA.COLUMNS korzystające z zapytań sparametryzowanych, tworzy schemat strzałki i przechowuje oryginalny typ SQL jako metadane pola, dzięki czemu tabela docelowa może zostać ponownie utworzona z dokładnymi definicjami kolumn. _create_table_ddl odczytuje te metadane z powrotem w celu wygenerowania DROP/CREATE TABLE DDL. Typ timestamp (rowversion) jest mapowany ponownie na VARBINARY(8), ponieważ jest generowany automatycznie i nie można go bezpośrednio wstawiać.

    def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema:
        """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS.
    
        Stores the original SQL type as field metadata so the round-trip
        CREATE TABLE can reproduce exact column definitions.
        """
        cursor.execute(
            "SELECT COLUMN_NAME, DATA_TYPE, "
            "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), "
            "COALESCE(NUMERIC_PRECISION, 0), "
            "COALESCE(NUMERIC_SCALE, 0), "
            "IS_NULLABLE "
            "FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? "
            "ORDER BY ORDINAL_POSITION",
            (schema_name, table_name),
        )
        rows = cursor.fetchall()
        if not rows:
            raise ValueError(f"No columns found for {schema_name}.{table_name}")
        fields = []
        for col_name, data_type, max_len, precision, scale, nullable in rows:
            arrow_t = _arrow_type(data_type, precision, scale)
            sql_t = _sql_type_str(data_type, max_len, precision, scale)
            fields.append(
                pa.field(
                    col_name, arrow_t,
                    nullable=(nullable == "YES"),
                    metadata={"sql_type": sql_t},
                )
            )
        return pa.schema(fields)
    
    
    def _create_table_ddl(target: str, schema: pa.Schema) -> str:
        """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata."""
        col_defs = []
        for f in schema:
            sql_t = f.metadata[b"sql_type"].decode()
            # timestamp/rowversion is auto-generated and not insertable
            if sql_t == "TIMESTAMP":
                sql_t = "VARBINARY(8)"
            null = "" if f.nullable else " NOT NULL"
            col_defs.append(f"[{f.name}] {sql_t}{null}")
        col_defs_str = ",\n    ".join(col_defs)
        return (
            f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n"
            f"CREATE TABLE {target} (\n    {col_defs_str}\n);"
        )
    
  6. Dodaj funkcję pobierania. download_table przesyła strumieniowo wiersze ze źródłowej tabeli w partiach za pomocą fetchmany(), konwertuje każdą wartość na typ języka Python zgodny z Apache Arrow i zapisuje partie rekordów przyrostowo do lokalnego pliku Parquet za pomocą ParquetWriter. Dzięki temu pamięć jest ograniczona niezależnie od rozmiaru tabeli. Funkcja używa dwóch oddzielnych kursorów: jeden do odczytywania metadanych kolumny, a drugi do strumieniowego przesyłania danych.

    def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int:
        """Download a SQL table to a parquet file. Returns row count (0 if empty)."""
        _validate_ident(table_name)
        source = f"{schema_name}.[{table_name}]"
    
        with conn.cursor() as cursor:
            schema = _get_arrow_schema(cursor, schema_name, table_name)
    
        n_cols = len(schema)
        row_count = 0
        t0 = time.perf_counter()
    
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT * FROM {source}")
            writer = None
            try:
                while True:
                    rows = cursor.fetchmany(BATCH_SIZE)
                    if not rows:
                        break
                    columns = [[] for _ in range(n_cols)]
                    for row in rows:
                        for i in range(n_cols):
                            columns[i].append(_convert_value(row[i]))
                    arrays = [
                        pa.array(columns[i], type=schema.field(i).type)
                        for i in range(n_cols)
                    ]
                    batch = pa.record_batch(arrays, schema=schema)
                    if writer is None:
                        writer = pq.ParquetWriter(parquet_file, schema)
                    writer.write_batch(batch)
                    row_count += len(rows)
            finally:
                if writer is not None:
                    writer.close()
    
        if row_count == 0:
            return 0
    
        elapsed = time.perf_counter() - t0
        rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded "
            f"in {elapsed:.2f}s ({rate})"
        )
        return row_count
    
  7. Dodaj hak wzbogacający. enrich_parquet jest symbolem zastępczym, za pomocą którego można dodawać przekształcenia, kolumny pochodne lub łączenia do danych przed ich przekazaniem. W tym przewodniku 'quickstart' jest to operacja, która nic nie robi i zwraca ścieżkę pliku bez zmian.

    def enrich_parquet(parquet_file: str) -> str:
        """Enrich a parquet file before upload. Returns the (possibly new) file path."""
        # TODO: add transformations, derived columns, joins, etc.
        print(f"Enriching {parquet_file} (no-op)")
        return parquet_file
    
  8. Dodaj funkcję upload. upload_parquet Odczytuje schemat Arrow z pliku Parquet, generuje i wykonuje DROP/CREATE TABLE DDL, aby przygotować miejsce docelowe, a następnie odczytuje plik w partiach i wywołuje cursor.bulkcopy() wysokowydajne operacje wstawiania zbiorczego. Opcja table_lock=True zwiększa przepływność, minimalizując rywalizację o blokadę. Po zakończeniu przesyłania funkcja uruchamia SELECT COUNT(*), aby sprawdzić, czy liczba wierszy jest zgodna.

    def upload_parquet(conn, parquet_file: str, target: str) -> int:
        """Upload a parquet file into a SQL table via BCP. Returns row count."""
        # ── Create target table from parquet schema ──
        pf_schema = pq.read_schema(parquet_file)
        with conn.cursor() as cursor:
            cursor.execute(_create_table_ddl(target, pf_schema))
        conn.commit()
    
        # ── Bulk insert ──
        uploaded = 0
        t0 = time.perf_counter()
        with pq.ParquetFile(parquet_file) as pf:
            with conn.cursor() as cursor:
                for batch in pf.iter_batches(batch_size=BATCH_SIZE):
                    rows = zip(*(col.to_pylist() for col in batch.columns))
                    cursor.bulkcopy(
                        target, rows, batch_size=BATCH_SIZE,
                        table_lock=True, timeout=3600,
                    )
                    uploaded += batch.num_rows
        elapsed = time.perf_counter() - t0
    
        # ── Verify ──
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT COUNT(*) FROM {target}")
            count = cursor.fetchone()[0]
    
        rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{parquet_file} → {target}: {uploaded:,} rows uploaded "
            f"in {elapsed:.2f}s ({rate}) "
            f"| verified: {count:,}"
        )
        return uploaded
    
  9. Dodaj funkcję aranżacji. transfer_tables łączy ze sobą trzy fazy. Łączy się ze źródłową bazą danych, odnajduje wszystkie tabele podstawowe w danym schemacie za pośrednictwem INFORMATION_SCHEMA.TABLES, pobiera każdą z nich do lokalnego pliku Parquet, uruchamia mechanizm wzbogacania, a następnie łączy się z docelową bazą danych i przesyła każdy plik.

    def transfer_tables(
        source_conn_str: str,
        dest_conn_str: str,
        source_schema: str,
        dest_schema: str,
    ) -> None:
        """Download all tables from source DB/schema to parquet, upload to dest DB/schema."""
        _validate_ident(source_schema)
        _validate_ident(dest_schema)
    
        parquet_dir = source_schema
        os.makedirs(parquet_dir, exist_ok=True)
    
        # ── Download from source ──
        with mssql_python.connect(source_conn_str) as src_conn:
            with src_conn.cursor() as cursor:
                cursor.execute(
                    "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
                    "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' "
                    "ORDER BY TABLE_NAME",
                    (source_schema,),
                )
                tables = [row[0] for row in cursor.fetchall()]
    
            print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n")
    
            parquet_files = []
            for table_name in tables:
                parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet")
                row_count = download_table(src_conn, source_schema, table_name, parquet_file)
                if row_count == 0:
                    print(f"{source_schema}.{table_name}: empty, skipping")
                else:
                    parquet_files.append((table_name, parquet_file))
    
        # ── Enrich parquet files ──
        enriched = []
        for table_name, parquet_file in parquet_files:
            enriched.append((table_name, enrich_parquet(parquet_file)))
    
        # ── Upload to destination ──
        with mssql_python.connect(dest_conn_str) as dest_conn:
            for table_name, parquet_file in enriched:
                target = f"{dest_schema}.[{table_name}]"
                upload_parquet(dest_conn, parquet_file, target)
    
  10. Na koniec dodaj main punkt wejścia. Ładuje .env plik, wywołuje transfer_tables ze źródłowymi i docelowymi parametrami połączenia i wyświetla całkowity czas, który upłynął.

    def main():
        load_dotenv()
        t_start = time.perf_counter()
    
        transfer_tables(
            source_conn_str=os.environ["SOURCE_CONNECTION_STRING"],
            dest_conn_str=os.environ["DEST_CONNECTION_STRING"],
            source_schema="SalesLT",
            dest_schema="dbo",
        )
    
        print(f"Total: {time.perf_counter() - t_start:.2f}s")
    
    
    if __name__ == "__main__":
        main()
    
  11. Zapisz i zamknij plik main.py.

Zapisywanie parametrów połączenia

  1. .gitignore Otwórz plik i dodaj wykluczenie dla .env plików. Plik powinien być podobny do tego przykładu. Pamiętaj, aby zapisać i zamknąć go po zakończeniu.

    # Python-generated files
    __pycache__/
    *.py[oc]
    build/
    dist/
    wheels/
    *.egg-info
    
    # Virtual environments
    .venv
    
    # Connection strings and secrets
    .env
    
  2. W bieżącym katalogu utwórz nowy plik o nazwie .env.

  3. W pliku .env dodaj wpisy dla ciągów połączenia źródłowego i docelowego. Zastąp wartości symboli zastępczych rzeczywistymi nazwami serwera i bazy danych.

    SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    

    Wskazówka

    Parametry połączenia używane w tym miejscu w dużej mierze zależą od typu bazy danych SQL, z którą nawiązujesz połączenie. Jeśli nawiązujesz połączenie z usługą Azure SQL Database lub bazą danych SQL w sieci szkieletowej, użyj parametrów połączenia ODBC z karty parametry połączenia. W zależności od scenariusza może być konieczne dostosowanie typu uwierzytelniania. Aby uzyskać więcej informacji na temat parametrów połączenia i ich składni, zobacz dokumentację składni parametrów połączenia.

Wskazówka

Zarówno ActiveDirectoryInteractive, jak i ActiveDirectoryDefault działają na systemie macOS do uwierzytelniania Microsoft Entra. ActiveDirectoryInteractive monituje o zalogowanie się przy każdym uruchomieniu skryptu. Aby uniknąć powtarzających się monitów logowania, zaloguj się raz za pośrednictwem Azure CLI, uruchamiając az login, a następnie użyj ActiveDirectoryDefault, który wykorzystuje buforowane poświadczenie.

Użyj narzędzia uv run, aby wykonać skrypt

  1. W wcześniejszym oknie terminalu lub w nowym oknie terminalu, które jest otwarte w tym samym katalogu, wykonaj następujące polecenie.

     uv run main.py
    

    Oto oczekiwane dane wyjściowe po zakończeniu działania skryptu.

    Found 12 SalesLT tables: Address, Customer, CustomerAddress, ...
    
    SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec)
    ...
    SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450
    ...
    Total: 2.35s
    
  2. Połącz się z docelową bazą danych przy użyciu programu SQL Server Management Studio (SSMS) lub rozszerzenia MSSQL dla programu VS Code i sprawdź, czy tabele i dane zostały utworzone pomyślnie.

  3. Aby wdrożyć skrypt na innej maszynie, skopiuj wszystkie pliki z wyjątkiem .venv folderu na inną maszynę. Środowisko wirtualne jest odtwarzane podczas pierwszego uruchomienia.

Jak działa kod

Aplikacja wykonuje pełny transfer danych w obie strony w trzech fazach:

  1. Pobieranie: łączy się ze źródłową bazą danych, odczytuje metadane kolumn z INFORMATION_SCHEMA.COLUMNS, tworzy schemat Apache Arrow, a następnie pobiera każdą tabelę do lokalnego pliku Parquet.
  2. Wzbogacanie (opcjonalnie): zapewnia punkt zaczepienia (enrich_parquet), w którym można dodawać przekształcenia, kolumny pochodne lub sprzężenia przed przekazaniem.
  3. Przesyłanie: odczytuje każdy plik Parquet w partiach, ponownie tworzy tabelę w docelowej bazie danych przy użyciu języka DDL wygenerowanego na podstawie metadanych schematu Arrow, a następnie wykorzystuje cursor.bulkcopy() do wydajnego zbiorczego wstawiania.

Następne kroki

Odwiedź repozytorium GitHub sterowników mssql-python, aby uzyskać więcej przykładów, wnieść pomysły lub zgłaszać problemy.