Aracılığıyla paylaş


Hızlı Başlangıç: Python için mssql-python sürücüsüyle toplu kopyalama

Bu hızlı başlangıç kılavuzunda, veritabanları arasında verileri toplu şekilde kopyalamak için mssql-python sürücüsünü kullanacaksınız. Uygulama, Apache Arrow kullanarak bir kaynak veritabanı şemasından yerel Parquet dosyalarına tablo indirir, ardından yüksek performanslı bulkcopy yöntemi kullanarak bunları hedef veritabanına yükler. Sql Server, Azure SQL Veritabanı ve Doku'daki SQL veritabanı arasında verileri geçirmek, çoğaltmak veya dönüştürmek için bu düzeni kullanabilirsiniz.

Sürücü, mssql-python Windows makinelerinde dış bağımlılık gerektirmez. Sürücü, ihtiyaç duyduğu her şeyi tek pip bir yüklemeyle yükler ve yükseltmeye ve test etmeye vaktiniz olmayan diğer betikleri bozmadan yeni betikler için sürücünün en son sürümünü kullanmanıza olanak sağlar.

mssql-python belgeleri | mssql-python kaynak kodu | Paket (PyPI) | Uv

Önkoşullar

  • Python 3

    • Python'larınız yoksa, python.orgPython çalışma zamanını ve pip paket yöneticisini yükleyin.

    • Kendi ortamınızı kullanmak istemiyor musunuz? GitHub Codespaces kullanarak devcontainer olarak açın.

  • Visual Studio Code aşağıdaki uzantılarla:

  • macOS ve Linux'ta parolasız kimlik doğrulaması için Azure Command-Line Arabirimi (CLI).

  • henüz sahip uvdeğilseniz yükleme yönergelerini izleyin.

  • AdventureWorks2025 örnek şema ve geçerli bir bağlantı dizesiyle SQL Server, Azure SQL Veritabanı veya Fabric'teki SQL veritabanında bulunan bir kaynak veritabanı.

  • Geçerli bir bağlantı dizesine sahip olan SQL Server, Azure SQL Veritabanı veya Fabric içerisindeki SQL veritabanı üzerindeki hedef veritabanı. Kullanıcının tablo oluşturma ve tablolara yazma izni olmalıdır. İkinci bir veritabanınız yoksa, hedef bağlantı dizesini aynı veritabanına işaret etmek üzere değiştirebilir ve hedef tablolar için farklı bir şema kullanabilirsiniz.

  • Tek seferlik işletim sistemine özgü önkoşulları yükleyin.

    apk add libtool krb5-libs krb5-dev
    

SQL veritabanı oluşturma

Bu hızlı başlangıçta kaynak veritabanı olarak AdventureWorks2025 Lightweight şeması gerekir.

Projeyi oluşturma ve kodu çalıştırma

  1. Yeni proje oluşturma
  2. Bağımlılık ekleme
  3. Visual Studio Code'ı başlatma
  4. pyproject.toml güncelleştirme
  5. Main.py'yi Güncelle
  6. Bağlantı dizelerini kaydetme
  7. Betik dosyasını çalıştırmak için uv run kullanın

Yeni proje oluşturma

  1. Geliştirme dizininizde bir komut istemi açın. Bir tane yoksa, python, scripts gibi bir adla yeni bir dizin oluşturun. OneDrive'daki klasörleri kullanmaktan kaçının, çünkü eşitleme sanal ortamınızı yönetmenize engel olabilir.

  2. ile yeni bir uv oluşturun.

    uv init mssql-python-bcp-qs
    cd mssql-python-bcp-qs
    

Bağımlılık ekleme

Aynı dizinde , mssql-pythonve python-dotenv paketlerini yükleyinpyarrow.

uv add mssql-python python-dotenv pyarrow

Visual Studio Code'ı başlatma

Aynı dizinde aşağıdaki komutu çalıştırın.

code .

pyproject.toml güncelle

  1. pyproject.toml, projenizin meta verilerini içerir. Dosyayı sık kullandığınız düzenleyicide açın.

  2. Dosyanın içeriğini gözden geçirin. Bu örneğe benzer olmalıdır. Python sürümüne ve bağımlılığına dikkat edin; en düşük sürümü tanımlamak için mssql-python, >= kullanır. Tam sürümü tercih ediyorsanız, sürüm numarasının önündeki >='yi == ile değiştirin. Her paketin çözümlenen sürümleri daha sonra uv.lock dosyasında depolanır. Lockfile, proje üzerinde çalışan geliştiricilerin tutarlı paket sürümlerini kullanmasını sağlar. Ayrıca, paketinizi son kullanıcılara dağıtırken tam olarak aynı paket sürümleri kümesinin kullanılmasını sağlar. Dosyayı düzenlememelisiniz uv.lock .

    [project]
    name = "mssql-python-bcp-qs"
    version = "0.1.0"
    description = "Add your description here"
    readme = "README.md"
    requires-python = ">=3.11"
    dependencies = [
        "mssql-python>=1.4.0",
        "python-dotenv>=1.1.1",
        "pyarrow>=19.0.0",
    ]
    
  3. Açıklamayı daha açıklayıcı olacak şekilde güncelleştirin.

    description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"
    
  4. Dosyayı kaydedin ve kapatın.

Güncelle main.py

  1. adlı main.pydosyayı açın. Bu örneğe benzer olmalıdır.

    def main():
        print("Hello from mssql-python-bcp-qs!")
    
    if __name__ == "__main__":
        main()
    
  2. öğesinin main.py içeriğini aşağıdaki kod bloklarıyla değiştirin. Her blok bir öncekini oluşturur ve sırayla yerleştirilmelidir main.py .

    Tavsiye

    Visual Studio Code paketleri çözümleme konusunda sorun yaşıyorsa, yorumlayıcıyı sanal ortamı kullanacak şekilde güncelleştirmeniz gerekir.

  3. main.py dosyasının en üstüne içeri aktarmaları ve sabitleri ekleyin. Betik, veritabanı bağlantısı için mssql_python, sütunlu veri işleme ve Parquet dosyası G/Ç için pyarrow ve pyarrow.parquet, bir python-dotenv dosyasından bağlantı dizelerini yüklemek için .env, ve enjeksiyonu önlemek amacıyla SQL tanımlayıcılarını doğrulayan derlenmiş bir regex deseni kullanır.

    """Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema."""
    
    import os
    import re
    import time
    from uuid import UUID
    
    import pyarrow as pa
    import pyarrow.parquet as pq
    from dotenv import load_dotenv
    import mssql_python
    
    BATCH_SIZE = 64_000
    _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$")
    
    
    def _validate_ident(name: str) -> str:
        if not _SAFE_IDENT.match(name):
            raise ValueError(f"Unsafe SQL identifier: {name!r}")
        return name
    
  4. İçeri aktarmaların altına SQL-Arrow türü eşlemesini ekleyin. Bu sözlük, Sql Server sütun türlerini Apache Arrow eşdeğerlerine çevirir, böylece Parquet'e yazarken veri uygunluğu korunur. İki yardımcı işlev meta verilerden NVARCHAR(100) tam SQL türü dizeleri (örneğin, DECIMAL(18,2) veya INFORMATION_SCHEMA) oluşturur ve her sütun için eşleşen Ok türünü çözümler.

    _SQL_TO_ARROW = {
        "bit": pa.bool_(),
        "tinyint": pa.uint8(),
        "smallint": pa.int16(),
        "int": pa.int32(),
        "bigint": pa.int64(),
        "float": pa.float64(),
        "real": pa.float32(),
        "smallmoney": pa.decimal128(10, 4),
        "money": pa.decimal128(19, 4),
        "date": pa.date32(),
        "datetime": pa.timestamp("ms"),
        "datetime2": pa.timestamp("us"),
        "smalldatetime": pa.timestamp("s"),
        "uniqueidentifier": pa.string(),
        "xml": pa.string(),
        "image": pa.binary(),
        "binary": pa.binary(),
        "varbinary": pa.binary(),
        "timestamp": pa.binary(),
    }
    
    
    def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str:
        """Build the exact SQL type string from INFORMATION_SCHEMA metadata."""
        dt = data_type.lower()
        if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"):
            length = "MAX" if max_length == -1 else str(max_length)
            return f"{dt.upper()}({length})"
        if dt in ("decimal", "numeric"):
            return f"{dt.upper()}({precision},{scale})"
        return dt.upper()
    
    
    def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType:
        sql_type = sql_type.lower()
        if sql_type in _SQL_TO_ARROW:
            return _SQL_TO_ARROW[sql_type]
        if sql_type in ("decimal", "numeric"):
            return pa.decimal128(precision, scale)
        if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"):
            return pa.string()
        return pa.string()
    
    
    def _convert_value(v):
        """Convert a SQL value to an Arrow-compatible Python type."""
        if isinstance(v, UUID):
            return str(v)
        return v
    
  5. Şema introspection ve DDL oluşturma işlevlerini ekleyin. _get_arrow_schema parametreli sorgularla çalışır INFORMATION_SCHEMA.COLUMNS , bir Arrow şeması oluşturur ve hedef tablonun tam sütun tanımlarıyla yeniden oluşturulabilmesi için özgün SQL türünü alan meta verisi olarak depolar. _create_table_ddl DDL oluşturmak DROP/CREATE TABLE için bu meta verileri yeniden okur. timestamp (rowversion) türü, otomatik olarak oluşturulduğu ve doğrudan eklenemediği için VARBINARY(8)'a yeniden eşlenir.

    def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema:
        """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS.
    
        Stores the original SQL type as field metadata so the round-trip
        CREATE TABLE can reproduce exact column definitions.
        """
        cursor.execute(
            "SELECT COLUMN_NAME, DATA_TYPE, "
            "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), "
            "COALESCE(NUMERIC_PRECISION, 0), "
            "COALESCE(NUMERIC_SCALE, 0), "
            "IS_NULLABLE "
            "FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? "
            "ORDER BY ORDINAL_POSITION",
            (schema_name, table_name),
        )
        rows = cursor.fetchall()
        if not rows:
            raise ValueError(f"No columns found for {schema_name}.{table_name}")
        fields = []
        for col_name, data_type, max_len, precision, scale, nullable in rows:
            arrow_t = _arrow_type(data_type, precision, scale)
            sql_t = _sql_type_str(data_type, max_len, precision, scale)
            fields.append(
                pa.field(
                    col_name, arrow_t,
                    nullable=(nullable == "YES"),
                    metadata={"sql_type": sql_t},
                )
            )
        return pa.schema(fields)
    
    
    def _create_table_ddl(target: str, schema: pa.Schema) -> str:
        """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata."""
        col_defs = []
        for f in schema:
            sql_t = f.metadata[b"sql_type"].decode()
            # timestamp/rowversion is auto-generated and not insertable
            if sql_t == "TIMESTAMP":
                sql_t = "VARBINARY(8)"
            null = "" if f.nullable else " NOT NULL"
            col_defs.append(f"[{f.name}] {sql_t}{null}")
        col_defs_str = ",\n    ".join(col_defs)
        return (
            f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n"
            f"CREATE TABLE {target} (\n    {col_defs_str}\n);"
        )
    
  6. İndirme işlevini ekleyin. download_table kullanarak, fetchmany() bir kaynak tablodaki satırları toplu olarak akışla aktarır, her değeri Arrow uyumlu bir Python türüne dönüştürür ve ParquetWriter kullanarak kayıt toplu işlemlerini artımlı olarak yerel bir Parquet dosyasına yazar. Bu, tablo boyutundan bağımsız olarak belleği sınırlanmış durumda tutar. İşlev iki ayrı imleç kullanır: biri sütun meta verilerini okumak için, diğeri de verileri akışa almak için.

    def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int:
        """Download a SQL table to a parquet file. Returns row count (0 if empty)."""
        _validate_ident(table_name)
        source = f"{schema_name}.[{table_name}]"
    
        with conn.cursor() as cursor:
            schema = _get_arrow_schema(cursor, schema_name, table_name)
    
        n_cols = len(schema)
        row_count = 0
        t0 = time.perf_counter()
    
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT * FROM {source}")
            writer = None
            try:
                while True:
                    rows = cursor.fetchmany(BATCH_SIZE)
                    if not rows:
                        break
                    columns = [[] for _ in range(n_cols)]
                    for row in rows:
                        for i in range(n_cols):
                            columns[i].append(_convert_value(row[i]))
                    arrays = [
                        pa.array(columns[i], type=schema.field(i).type)
                        for i in range(n_cols)
                    ]
                    batch = pa.record_batch(arrays, schema=schema)
                    if writer is None:
                        writer = pq.ParquetWriter(parquet_file, schema)
                    writer.write_batch(batch)
                    row_count += len(rows)
            finally:
                if writer is not None:
                    writer.close()
    
        if row_count == 0:
            return 0
    
        elapsed = time.perf_counter() - t0
        rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded "
            f"in {elapsed:.2f}s ({rate})"
        )
        return row_count
    
  7. Zenginleştirme kancasını ekleyin. enrich_parquet, verilere karşıya yüklenmeden önce dönüştürmeler, türetilmiş sütunlar veya birleştirmeler ekleyebileceğiniz bir yer tutucudur. Bu hızlı başlangıçta, dosya yolunu değiştirmeden döndüren bir no-op.

    def enrich_parquet(parquet_file: str) -> str:
        """Enrich a parquet file before upload. Returns the (possibly new) file path."""
        # TODO: add transformations, derived columns, joins, etc.
        print(f"Enriching {parquet_file} (no-op)")
        return parquet_file
    
  8. Yükleme fonksiyonunu ekleyin. upload_parquet Parquet dosyasından Arrow şemasını okur, sonra hedefi hazırlamak için DROP/CREATE TABLE DDL oluşturur ve yürütür, ardından dosyayı parça parça okuyup yüksek performanslı toplu ekleme için cursor.bulkcopy() çağrısında bulunur. bu table_lock=True seçenek, kilit çekişmesi en aza indirilerek aktarım hızını artırır. Karşıya yükleme tamamlandıktan sonra işlev, satır sayısı eşleşmelerini doğrulamak için bir SELECT COUNT(*) çalıştırır.

    def upload_parquet(conn, parquet_file: str, target: str) -> int:
        """Upload a parquet file into a SQL table via BCP. Returns row count."""
        # ── Create target table from parquet schema ──
        pf_schema = pq.read_schema(parquet_file)
        with conn.cursor() as cursor:
            cursor.execute(_create_table_ddl(target, pf_schema))
        conn.commit()
    
        # ── Bulk insert ──
        uploaded = 0
        t0 = time.perf_counter()
        with pq.ParquetFile(parquet_file) as pf:
            with conn.cursor() as cursor:
                for batch in pf.iter_batches(batch_size=BATCH_SIZE):
                    rows = zip(*(col.to_pylist() for col in batch.columns))
                    cursor.bulkcopy(
                        target, rows, batch_size=BATCH_SIZE,
                        table_lock=True, timeout=3600,
                    )
                    uploaded += batch.num_rows
        elapsed = time.perf_counter() - t0
    
        # ── Verify ──
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT COUNT(*) FROM {target}")
            count = cursor.fetchone()[0]
    
        rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{parquet_file} → {target}: {uploaded:,} rows uploaded "
            f"in {elapsed:.2f}s ({rate}) "
            f"| verified: {count:,}"
        )
        return uploaded
    
  9. Düzenleme işlevini ekleyin. transfer_tables üç aşamayı birbirine bağlar. Kaynak veritabanına bağlanır, aracılığıyla INFORMATION_SCHEMA.TABLESverilen şemadaki tüm temel tabloları bulur, her birini yerel bir Parquet dosyasına indirir, zenginleştirme kancasını çalıştırır, ardından hedef veritabanına bağlanır ve her dosyayı karşıya yükler.

    def transfer_tables(
        source_conn_str: str,
        dest_conn_str: str,
        source_schema: str,
        dest_schema: str,
    ) -> None:
        """Download all tables from source DB/schema to parquet, upload to dest DB/schema."""
        _validate_ident(source_schema)
        _validate_ident(dest_schema)
    
        parquet_dir = source_schema
        os.makedirs(parquet_dir, exist_ok=True)
    
        # ── Download from source ──
        with mssql_python.connect(source_conn_str) as src_conn:
            with src_conn.cursor() as cursor:
                cursor.execute(
                    "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
                    "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' "
                    "ORDER BY TABLE_NAME",
                    (source_schema,),
                )
                tables = [row[0] for row in cursor.fetchall()]
    
            print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n")
    
            parquet_files = []
            for table_name in tables:
                parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet")
                row_count = download_table(src_conn, source_schema, table_name, parquet_file)
                if row_count == 0:
                    print(f"{source_schema}.{table_name}: empty, skipping")
                else:
                    parquet_files.append((table_name, parquet_file))
    
        # ── Enrich parquet files ──
        enriched = []
        for table_name, parquet_file in parquet_files:
            enriched.append((table_name, enrich_parquet(parquet_file)))
    
        # ── Upload to destination ──
        with mssql_python.connect(dest_conn_str) as dest_conn:
            for table_name, parquet_file in enriched:
                target = f"{dest_schema}.[{table_name}]"
                upload_parquet(dest_conn, parquet_file, target)
    
  10. Son olarak, giriş noktasını ekleyin main . Dosyayı yükler .env , kaynak ve hedef bağlantı dizeleriyle çağırır transfer_tables ve geçen toplam süreyi yazdırır.

    def main():
        load_dotenv()
        t_start = time.perf_counter()
    
        transfer_tables(
            source_conn_str=os.environ["SOURCE_CONNECTION_STRING"],
            dest_conn_str=os.environ["DEST_CONNECTION_STRING"],
            source_schema="SalesLT",
            dest_schema="dbo",
        )
    
        print(f"Total: {time.perf_counter() - t_start:.2f}s")
    
    
    if __name__ == "__main__":
        main()
    
  11. kaydedin ve kapatın main.py.

Bağlantı dizelerini kaydetme

  1. .gitignore dosyasını açın ve .env dosyaları için bir dışlama ekleyin. Dosyanız bu örneğe benzer olmalıdır. İşiniz bittiğinde kaydetmeyi ve kapatmayı unutmayın.

    # Python-generated files
    __pycache__/
    *.py[oc]
    build/
    dist/
    wheels/
    *.egg-info
    
    # Virtual environments
    .venv
    
    # Connection strings and secrets
    .env
    
  2. Geçerli dizinde adlı .envyeni bir dosya oluşturun.

  3. dosya içinde .env , kaynak ve hedef bağlantı dizeleriniz için girdiler ekleyin. Yer tutucu değerlerini gerçek sunucu ve veritabanı adlarınızla değiştirin.

    SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    

    Tavsiye

    Burada kullanılan bağlantı dizesi büyük ölçüde bağlandığınız SQL veritabanının türüne bağlıdır. Eğer bir Azure SQL Veritabanına veya Fabric'teki SQL veritabanına bağlanıyorsanız, bağlantı dizeleri sekmesindeki ODBC bağlantı dizesini kullanın. Senaryonuza bağlı olarak kimlik doğrulama türünü ayarlamanız gerekebilir. Bağlantı dizeleri ve söz dizimi hakkında daha fazla bilgi için bkz. bağlantı dizesi söz dizimi başvurusu.

Tavsiye

macOS'ta hem ActiveDirectoryInteractive hem de ActiveDirectoryDefault Microsoft Entra kimlik doğrulaması için çalışır. ActiveDirectoryInteractive betiği her çalıştırdığınızda oturum açmanızı ister. Yinelenen oturum açma istemlerini önlemek için Azure CLI aracılığıyla az login komutunu çalıştırarak bir kez oturum açın, ardından önbellekteki kimlik bilgilerini yeniden kullanan ActiveDirectoryDefault kullanın.

Betiği yürütmek için uv çalıştır komutunu kullan

  1. Önceki terminal penceresinde veya aynı dizine açılan yeni bir terminal penceresinde aşağıdaki komutu çalıştırın.

     uv run main.py
    

    İşte betik tamamlandığında beklenen çıktı şöyledir.

    Found 12 SalesLT tables: Address, Customer, CustomerAddress, ...
    
    SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec)
    ...
    SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450
    ...
    Total: 2.35s
    
  2. SQL Server Management Studio (SSMS) veya VS Code için MSSQL uzantısını kullanarak hedef veritabanına bağlanın ve tabloların ve verilerin başarıyla oluşturulduğunu doğrulayın.

  3. Betiğinizi başka bir makineye dağıtmak için .venv klasör dışındaki tüm dosyaları diğer makineye kopyalayın. Sanal ortam ilk çalıştırmayla yeniden oluşturulur.

Kod nasıl çalışır?

Uygulama üç aşamada tam gidiş dönüş veri aktarımı gerçekleştirir:

  1. İndir: Kaynak veritabanına bağlanır, öğesinden INFORMATION_SCHEMA.COLUMNSsütun meta verilerini okur, bir Apache Arrow şeması oluşturur, ardından her tabloyu yerel bir Parquet dosyasına indirir.
  2. Zenginleştir (isteğe bağlı): Karşıya yüklemeden önce dönüştürmeler, türetilmiş sütunlar veya birleştirmeler ekleyebileceğiniz bir kanca (enrich_parquet) sağlar.
  3. Karşıya yükleme: Her Parquet dosyasını toplu olarak okur, Ok şeması meta verilerinden oluşturulan DDL'yi kullanarak hedef veritabanında tabloyu yeniden oluşturur ve ardından yüksek performanslı toplu ekleme için kullanır cursor.bulkcopy() .

Sonraki adım

Fikirlere mssql-python katkıda bulunmak veya sorunları bildirmek için daha fazla örnek için sürücü GitHub deposunu ziyaret edin.