Bagikan melalui


Mulai cepat: Menyalin secara massal dengan driver mssql-python untuk Python

Dalam panduan cepat ini, Anda menggunakan driver mssql-python untuk menyalin data dalam jumlah besar antar database. Aplikasi mengunduh tabel dari skema database sumber ke file Parquet lokal menggunakan Apache Arrow, lalu mengunggahnya ke database tujuan menggunakan metode berkinerja bulkcopy tinggi. Anda dapat menggunakan pola ini untuk memigrasikan, mereplikasi, atau mengubah data antara SQL Server, Azure SQL Database, dan database SQL di Fabric.

Driver mssql-python tidak memerlukan dependensi eksternal apa pun pada komputer Windows. Driver menginstal semua yang dibutuhkan dengan satu pip instalasi, memungkinkan Anda menggunakan versi terbaru driver untuk skrip baru tanpa merusak skrip lain yang belum sempat Anda tingkatkan dan uji.

dokumentasi mssql-pythonkode sumber mssql-pythonPaket (PyPI)uv

Prasyarat

  • Python 3

    • Jika Anda belum memiliki Python, instal runtime Python dan manajer paket pip dari python.org.

    • Tidak ingin menggunakan lingkungan Anda sendiri? Buka sebagai devcontainer menggunakan GitHub Codespaces.

  • Visual Studio Code dengan ekstensi berikut:

  • Azure Command-Line Interface (CLI) untuk autentikasi tanpa kata sandi di macOS dan Linux.

  • Jika Anda belum memiliki uv, ikuti instruksi penginstalan.

  • Database sumber di SQL Server, Azure SQL Database, atau database SQL di Fabric dengan AdventureWorks2025 skema sampel dan string koneksi yang valid.

  • Database tujuan di SQL Server, Azure SQL Database, atau database SQL di Fabric dengan string koneksi yang valid. Pengguna harus memiliki izin untuk membuat dan menulis ke tabel. Jika Anda tidak memiliki database kedua, Anda dapat mengubah string koneksi tujuan untuk menunjuk ke database yang sama dan menggunakan skema yang berbeda untuk tabel tujuan.

  • Instal prasyarat khusus sistem operasi satu kali.

    apk add libtool krb5-libs krb5-dev
    

Membuat basis data SQL

Mulai cepat ini memerlukan skema AdventureWorks2025 Lightweight sebagai database sumber.

Membuat proyek dan menjalankan kode

  1. Membuat proyek baru
  2. Menambahkan dependensi
  3. Luncurkan Visual Studio Code
  4. Memperbarui pyproject.toml
  5. Memperbarui main.py
  6. Menyimpan string koneksi
  7. Menggunakan uv run untuk menjalankan skrip

Membuat proyek baru

  1. Buka jendela perintah di direktori pengembangan Anda. Jika Anda tidak memilikinya, buat direktori baru yang disebut python, , scriptsdll. Hindari folder di OneDrive Anda, sinkronisasi dapat mengganggu pengelolaan lingkungan virtual Anda.

  2. Buat proyek baru dengan uv.

    uv init mssql-python-bcp-qs
    cd mssql-python-bcp-qs
    

Tambah dependensi

Di direktori yang sama, instal paket mssql-python, python-dotenv, dan pyarrow.

uv add mssql-python python-dotenv pyarrow

Luncurkan Visual Studio Code

Di direktori yang sama, jalankan perintah berikut.

code .

Memperbarui pyproject.toml

  1. pyproject.toml berisi metadata untuk proyek Anda. Buka file di editor favorit Anda.

  2. Tinjau isi dari file tersebut. Ini harus mirip dengan contoh ini. Perhatikan versi dan dependensi Python untuk mssql-python digunakan >= untuk menentukan versi minimum. Jika Anda lebih suka versi yang tepat, ubah >= sebelum nomor versi menjadi ==. Versi yang diselesaikan dari setiap paket kemudian disimpan di uv.lock. Lockfile memastikan bahwa pengembang yang mengerjakan proyek menggunakan versi paket yang konsisten. Ini juga memastikan bahwa set versi paket yang sama persis digunakan saat mendistribusikan paket Anda kepada pengguna akhir. Anda tidak boleh mengedit uv.lock file.

    [project]
    name = "mssql-python-bcp-qs"
    version = "0.1.0"
    description = "Add your description here"
    readme = "README.md"
    requires-python = ">=3.11"
    dependencies = [
        "mssql-python>=1.4.0",
        "python-dotenv>=1.1.1",
        "pyarrow>=19.0.0",
    ]
    
  3. Perbarui deskripsi agar lebih deskriptif.

    description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"
    
  4. Simpan dan tutup file.

Memperbarui main.py

  1. Buka file bernama main.py. Ini harus mirip dengan contoh ini.

    def main():
        print("Hello from mssql-python-bcp-qs!")
    
    if __name__ == "__main__":
        main()
    
  2. Ganti konten main.py dengan blok kode berikut. Setiap blok dibangun pada blok sebelumnya dan harus ditempatkan secara main.py berurutan.

    Petunjuk / Saran

    Jika Visual Studio Code mengalami masalah dalam menyelesaikan paket, Anda perlu memperbarui penerjemah untuk menggunakan lingkungan virtual.

  3. Di bagian main.pyatas , tambahkan impor dan konstanta. Skrip ini menggunakan mssql_python untuk konektivitas database, pyarrow dan pyarrow.parquet untuk penanganan data kolom dan I/O file Parquet, python-dotenv untuk memuat string koneksi dari .env file, dan pola regex yang dikompilasi yang memvalidasi pengidentifikasi SQL untuk mencegah injeksi.

    """Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema."""
    
    import os
    import re
    import time
    from uuid import UUID
    
    import pyarrow as pa
    import pyarrow.parquet as pq
    from dotenv import load_dotenv
    import mssql_python
    
    BATCH_SIZE = 64_000
    _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$")
    
    
    def _validate_ident(name: str) -> str:
        if not _SAFE_IDENT.match(name):
            raise ValueError(f"Unsafe SQL identifier: {name!r}")
        return name
    
  4. Di bawah impor, tambahkan pemetaan jenis SQL-ke-Arrow. Kamus ini menerjemahkan tipe kolom SQL Server ke setara dengan Apache Arrow-nya sehingga integritas data dipertahankan saat menulis ke Parquet. Dua fungsi pembantu membangun string jenis SQL yang tepat (misalnya, NVARCHAR(100) atau DECIMAL(18,2)) dari metadata INFORMATION_SCHEMA dan menentukan jenis Arrow yang cocok untuk setiap kolom.

    _SQL_TO_ARROW = {
        "bit": pa.bool_(),
        "tinyint": pa.uint8(),
        "smallint": pa.int16(),
        "int": pa.int32(),
        "bigint": pa.int64(),
        "float": pa.float64(),
        "real": pa.float32(),
        "smallmoney": pa.decimal128(10, 4),
        "money": pa.decimal128(19, 4),
        "date": pa.date32(),
        "datetime": pa.timestamp("ms"),
        "datetime2": pa.timestamp("us"),
        "smalldatetime": pa.timestamp("s"),
        "uniqueidentifier": pa.string(),
        "xml": pa.string(),
        "image": pa.binary(),
        "binary": pa.binary(),
        "varbinary": pa.binary(),
        "timestamp": pa.binary(),
    }
    
    
    def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str:
        """Build the exact SQL type string from INFORMATION_SCHEMA metadata."""
        dt = data_type.lower()
        if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"):
            length = "MAX" if max_length == -1 else str(max_length)
            return f"{dt.upper()}({length})"
        if dt in ("decimal", "numeric"):
            return f"{dt.upper()}({precision},{scale})"
        return dt.upper()
    
    
    def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType:
        sql_type = sql_type.lower()
        if sql_type in _SQL_TO_ARROW:
            return _SQL_TO_ARROW[sql_type]
        if sql_type in ("decimal", "numeric"):
            return pa.decimal128(precision, scale)
        if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"):
            return pa.string()
        return pa.string()
    
    
    def _convert_value(v):
        """Convert a SQL value to an Arrow-compatible Python type."""
        if isinstance(v, UUID):
            return str(v)
        return v
    
  5. Tambahkan introspeksi skema dan fungsi pembuatan DDL. _get_arrow_schema kueri INFORMATION_SCHEMA.COLUMNS menggunakan kueri berparameter, membangun skema Panah, dan menyimpan jenis SQL asli sebagai metadata bidang sehingga tabel tujuan dapat dibuat ulang dengan definisi kolom yang tepat. _create_table_ddl membaca metadata tersebut kembali untuk menghasilkan DROP/CREATE TABLE DDL. Jenis timestamp (rowversion) dipetakan ulang ke VARBINARY(8) karena dibuat secara otomatis dan tidak dapat disisipkan secara langsung.

    def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema:
        """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS.
    
        Stores the original SQL type as field metadata so the round-trip
        CREATE TABLE can reproduce exact column definitions.
        """
        cursor.execute(
            "SELECT COLUMN_NAME, DATA_TYPE, "
            "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), "
            "COALESCE(NUMERIC_PRECISION, 0), "
            "COALESCE(NUMERIC_SCALE, 0), "
            "IS_NULLABLE "
            "FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? "
            "ORDER BY ORDINAL_POSITION",
            (schema_name, table_name),
        )
        rows = cursor.fetchall()
        if not rows:
            raise ValueError(f"No columns found for {schema_name}.{table_name}")
        fields = []
        for col_name, data_type, max_len, precision, scale, nullable in rows:
            arrow_t = _arrow_type(data_type, precision, scale)
            sql_t = _sql_type_str(data_type, max_len, precision, scale)
            fields.append(
                pa.field(
                    col_name, arrow_t,
                    nullable=(nullable == "YES"),
                    metadata={"sql_type": sql_t},
                )
            )
        return pa.schema(fields)
    
    
    def _create_table_ddl(target: str, schema: pa.Schema) -> str:
        """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata."""
        col_defs = []
        for f in schema:
            sql_t = f.metadata[b"sql_type"].decode()
            # timestamp/rowversion is auto-generated and not insertable
            if sql_t == "TIMESTAMP":
                sql_t = "VARBINARY(8)"
            null = "" if f.nullable else " NOT NULL"
            col_defs.append(f"[{f.name}] {sql_t}{null}")
        col_defs_str = ",\n    ".join(col_defs)
        return (
            f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n"
            f"CREATE TABLE {target} (\n    {col_defs_str}\n);"
        )
    
  6. Tambahkan fungsi unduhan. download_table mengalirkan baris dari tabel sumber dalam batch menggunakan fetchmany(), mengonversi setiap nilai menjadi jenis Python yang kompatibel dengan Arrow, dan menuliskan batch rekaman secara bertahap ke file Parquet lokal menggunakan ParquetWriter. Ini menjaga memori tetap terikat terlepas dari ukuran tabel. Fungsi ini menggunakan dua kursor terpisah: satu untuk membaca metadata kolom, dan satu lagi untuk mengalirkan data.

    def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int:
        """Download a SQL table to a parquet file. Returns row count (0 if empty)."""
        _validate_ident(table_name)
        source = f"{schema_name}.[{table_name}]"
    
        with conn.cursor() as cursor:
            schema = _get_arrow_schema(cursor, schema_name, table_name)
    
        n_cols = len(schema)
        row_count = 0
        t0 = time.perf_counter()
    
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT * FROM {source}")
            writer = None
            try:
                while True:
                    rows = cursor.fetchmany(BATCH_SIZE)
                    if not rows:
                        break
                    columns = [[] for _ in range(n_cols)]
                    for row in rows:
                        for i in range(n_cols):
                            columns[i].append(_convert_value(row[i]))
                    arrays = [
                        pa.array(columns[i], type=schema.field(i).type)
                        for i in range(n_cols)
                    ]
                    batch = pa.record_batch(arrays, schema=schema)
                    if writer is None:
                        writer = pq.ParquetWriter(parquet_file, schema)
                    writer.write_batch(batch)
                    row_count += len(rows)
            finally:
                if writer is not None:
                    writer.close()
    
        if row_count == 0:
            return 0
    
        elapsed = time.perf_counter() - t0
        rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded "
            f"in {elapsed:.2f}s ({rate})"
        )
        return row_count
    
  7. Tambahkan kait pengayaan. enrich_parquet adalah placeholder di mana Anda dapat menambahkan transformasi, kolom turunan, atau menggabungkan data sebelum diunggah. Dalam panduan memulai cepat ini, tidak ada operasi yang mengembalikan jalur file tersebut dalam kondisi yang tidak berubah.

    def enrich_parquet(parquet_file: str) -> str:
        """Enrich a parquet file before upload. Returns the (possibly new) file path."""
        # TODO: add transformations, derived columns, joins, etc.
        print(f"Enriching {parquet_file} (no-op)")
        return parquet_file
    
  8. Tambahkan fungsi unggahan. upload_parquet membaca skema Arrow dari file Parquet, menghasilkan dan menjalankan DROP/CREATE TABLE DDL untuk menyiapkan tujuan, lalu membaca file dalam batch dan memanggil cursor.bulkcopy() untuk penyisipan massal berkinerja tinggi. Opsi table_lock=True meningkatkan throughput dengan meminimalkan ketidakcocokan kunci. Setelah unggahan selesai, fungsi menjalankan SELECT COUNT(*) untuk memverifikasi kecocokan jumlah baris.

    def upload_parquet(conn, parquet_file: str, target: str) -> int:
        """Upload a parquet file into a SQL table via BCP. Returns row count."""
        # ── Create target table from parquet schema ──
        pf_schema = pq.read_schema(parquet_file)
        with conn.cursor() as cursor:
            cursor.execute(_create_table_ddl(target, pf_schema))
        conn.commit()
    
        # ── Bulk insert ──
        uploaded = 0
        t0 = time.perf_counter()
        with pq.ParquetFile(parquet_file) as pf:
            with conn.cursor() as cursor:
                for batch in pf.iter_batches(batch_size=BATCH_SIZE):
                    rows = zip(*(col.to_pylist() for col in batch.columns))
                    cursor.bulkcopy(
                        target, rows, batch_size=BATCH_SIZE,
                        table_lock=True, timeout=3600,
                    )
                    uploaded += batch.num_rows
        elapsed = time.perf_counter() - t0
    
        # ── Verify ──
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT COUNT(*) FROM {target}")
            count = cursor.fetchone()[0]
    
        rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{parquet_file} → {target}: {uploaded:,} rows uploaded "
            f"in {elapsed:.2f}s ({rate}) "
            f"| verified: {count:,}"
        )
        return uploaded
    
  9. Tambahkan fungsi orkestrasi. transfer_tables mengikat tiga fase bersama-sama. Ini terhubung ke database sumber, menemukan semua tabel dasar dalam skema yang diberikan melalui INFORMATION_SCHEMA.TABLES, mengunduh masing-masing ke file Parquet lokal, menjalankan kait pengayaan, lalu terhubung ke database tujuan dan mengunggah setiap file.

    def transfer_tables(
        source_conn_str: str,
        dest_conn_str: str,
        source_schema: str,
        dest_schema: str,
    ) -> None:
        """Download all tables from source DB/schema to parquet, upload to dest DB/schema."""
        _validate_ident(source_schema)
        _validate_ident(dest_schema)
    
        parquet_dir = source_schema
        os.makedirs(parquet_dir, exist_ok=True)
    
        # ── Download from source ──
        with mssql_python.connect(source_conn_str) as src_conn:
            with src_conn.cursor() as cursor:
                cursor.execute(
                    "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
                    "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' "
                    "ORDER BY TABLE_NAME",
                    (source_schema,),
                )
                tables = [row[0] for row in cursor.fetchall()]
    
            print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n")
    
            parquet_files = []
            for table_name in tables:
                parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet")
                row_count = download_table(src_conn, source_schema, table_name, parquet_file)
                if row_count == 0:
                    print(f"{source_schema}.{table_name}: empty, skipping")
                else:
                    parquet_files.append((table_name, parquet_file))
    
        # ── Enrich parquet files ──
        enriched = []
        for table_name, parquet_file in parquet_files:
            enriched.append((table_name, enrich_parquet(parquet_file)))
    
        # ── Upload to destination ──
        with mssql_python.connect(dest_conn_str) as dest_conn:
            for table_name, parquet_file in enriched:
                target = f"{dest_schema}.[{table_name}]"
                upload_parquet(dest_conn, parquet_file, target)
    
  10. Terakhir, tambahkan main titik masuk. Ini memuat .env file, memanggil transfer_tables dengan string koneksi asal dan tujuan, dan mencetak waktu yang sudah berlalu.

    def main():
        load_dotenv()
        t_start = time.perf_counter()
    
        transfer_tables(
            source_conn_str=os.environ["SOURCE_CONNECTION_STRING"],
            dest_conn_str=os.environ["DEST_CONNECTION_STRING"],
            source_schema="SalesLT",
            dest_schema="dbo",
        )
    
        print(f"Total: {time.perf_counter() - t_start:.2f}s")
    
    
    if __name__ == "__main__":
        main()
    
  11. Simpan dan tutup main.py.

Menyimpan string koneksi

  1. .gitignore Buka file dan tambahkan pengecualian untuk .env file. File Anda harus mirip dengan contoh ini. Pastikan untuk menyimpan dan menutupnya setelah selesai.

    # Python-generated files
    __pycache__/
    *.py[oc]
    build/
    dist/
    wheels/
    *.egg-info
    
    # Virtual environments
    .venv
    
    # Connection strings and secrets
    .env
    
  2. Di direktori saat ini, buat file baru bernama .env.

  3. Dalam file .env, tambahkan entri untuk string koneksi sumber dan tujuan Anda. Ganti placeholder dengan nama server dan database Anda yang sesuai.

    SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    

    Petunjuk / Saran

    String koneksi yang digunakan di sini sebagian besar tergantung pada jenis database SQL yang Anda sambungkan. Jika Anda menyambungkan ke Azure SQL Database atau database SQL di Fabric, gunakan string koneksi ODBC dari tab string koneksi. Anda mungkin perlu menyesuaikan jenis autentikasi tergantung pada skenario Anda. Untuk informasi selengkapnya tentang string koneksi dan sintaksnya, lihat referensi sintaks string koneksi.

Petunjuk / Saran

Di macOS, baik ActiveDirectoryInteractive dan ActiveDirectoryDefault berfungsi untuk autentikasi Microsoft Entra. ActiveDirectoryInteractive meminta Anda untuk masuk setiap kali Anda menjalankan skrip. ** Untuk menghindari peringatan masuk berulang, masuk satu kali melalui Azure CLI dengan menjalankan az login, lalu gunakan ActiveDirectoryDefault untuk menggunakan kembali kredensial yang di-cache.

Menggunakan uv run untuk menjalankan skrip

  1. Di jendela terminal dari sebelumnya, atau jendela terminal baru terbuka ke direktori yang sama, jalankan perintah berikut.

     uv run main.py
    

    Berikut adalah output yang diharapkan saat skrip selesai.

    Found 12 SalesLT tables: Address, Customer, CustomerAddress, ...
    
    SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec)
    ...
    SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450
    ...
    Total: 2.35s
    
  2. Sambungkan ke database tujuan menggunakan SQL Server Management Studio (SSMS) atau ekstensi MSSQL untuk VISUAL Code dan verifikasi bahwa tabel dan data berhasil dibuat.

  3. Untuk menyebarkan skrip Anda ke komputer lain, salin semua file kecuali folder .venv ke komputer lain. Lingkungan virtual dibuat ulang ketika pertama kali dijalankan.

Cara kerja kode

Aplikasi melakukan transfer data pulang-pergi penuh dalam tiga fase:

  1. Unduh: Menyambungkan ke database sumber, membaca metadata kolom dari INFORMATION_SCHEMA.COLUMNS, membangun skema Apache Arrow, lalu mengunduh setiap tabel ke dalam file Parquet lokal.
  2. Pengayaan (opsional): Menyediakan hook (enrich_parquet) tempat Anda dapat menambahkan transformasi, kolom turunan, atau gabungan sebelum mengunggah.
  3. Unggah: Membaca setiap file Parquet dalam batch, membuat ulang tabel dalam database tujuan menggunakan DDL yang dihasilkan dari metadata skema Arrow, lalu menggunakan cursor.bulkcopy() untuk sisipan massal berkinerja tinggi.

Langkah selanjutnya

mssql-python Kunjungi repositori GitHub driver untuk contoh lebih lanjut, untuk menyumbangkan ide atau melaporkan masalah.