Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Dalam panduan cepat ini, Anda menggunakan driver mssql-python untuk menyalin data dalam jumlah besar antar database. Aplikasi mengunduh tabel dari skema database sumber ke file Parquet lokal menggunakan Apache Arrow, lalu mengunggahnya ke database tujuan menggunakan metode berkinerja bulkcopy tinggi. Anda dapat menggunakan pola ini untuk memigrasikan, mereplikasi, atau mengubah data antara SQL Server, Azure SQL Database, dan database SQL di Fabric.
Driver mssql-python tidak memerlukan dependensi eksternal apa pun pada komputer Windows. Driver menginstal semua yang dibutuhkan dengan satu pip instalasi, memungkinkan Anda menggunakan versi terbaru driver untuk skrip baru tanpa merusak skrip lain yang belum sempat Anda tingkatkan dan uji.
dokumentasi mssql-python
Prasyarat
Python 3
Jika Anda belum memiliki Python, instal runtime Python dan manajer paket pip dari python.org.
Tidak ingin menggunakan lingkungan Anda sendiri? Buka sebagai devcontainer menggunakan GitHub Codespaces.
Visual Studio Code dengan ekstensi berikut:
- Ekstensi Python untuk Visual Studio Code
Azure Command-Line Interface (CLI) untuk autentikasi tanpa kata sandi di macOS dan Linux.
Jika Anda belum memiliki
uv, ikuti instruksi penginstalan.Database sumber di SQL Server, Azure SQL Database, atau database SQL di Fabric dengan
AdventureWorks2025skema sampel dan string koneksi yang valid.Database tujuan di SQL Server, Azure SQL Database, atau database SQL di Fabric dengan string koneksi yang valid. Pengguna harus memiliki izin untuk membuat dan menulis ke tabel. Jika Anda tidak memiliki database kedua, Anda dapat mengubah string koneksi tujuan untuk menunjuk ke database yang sama dan menggunakan skema yang berbeda untuk tabel tujuan.
Instal prasyarat khusus sistem operasi satu kali.
Membuat basis data SQL
Mulai cepat ini memerlukan skema AdventureWorks2025 Lightweight sebagai database sumber.
Membuat proyek dan menjalankan kode
- Membuat proyek baru
- Menambahkan dependensi
- Luncurkan Visual Studio Code
- Memperbarui pyproject.toml
- Memperbarui main.py
- Menyimpan string koneksi
- Menggunakan uv run untuk menjalankan skrip
Membuat proyek baru
Buka jendela perintah di direktori pengembangan Anda. Jika Anda tidak memilikinya, buat direktori baru yang disebut
python, ,scriptsdll. Hindari folder di OneDrive Anda, sinkronisasi dapat mengganggu pengelolaan lingkungan virtual Anda.Buat proyek baru dengan
uv.uv init mssql-python-bcp-qs cd mssql-python-bcp-qs
Tambah dependensi
Di direktori yang sama, instal paket mssql-python, python-dotenv, dan pyarrow.
uv add mssql-python python-dotenv pyarrow
Luncurkan Visual Studio Code
Di direktori yang sama, jalankan perintah berikut.
code .
Memperbarui pyproject.toml
pyproject.toml berisi metadata untuk proyek Anda. Buka file di editor favorit Anda.
Tinjau isi dari file tersebut. Ini harus mirip dengan contoh ini. Perhatikan versi dan dependensi Python untuk
mssql-pythondigunakan>=untuk menentukan versi minimum. Jika Anda lebih suka versi yang tepat, ubah>=sebelum nomor versi menjadi==. Versi yang diselesaikan dari setiap paket kemudian disimpan di uv.lock. Lockfile memastikan bahwa pengembang yang mengerjakan proyek menggunakan versi paket yang konsisten. Ini juga memastikan bahwa set versi paket yang sama persis digunakan saat mendistribusikan paket Anda kepada pengguna akhir. Anda tidak boleh mengedituv.lockfile.[project] name = "mssql-python-bcp-qs" version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.11" dependencies = [ "mssql-python>=1.4.0", "python-dotenv>=1.1.1", "pyarrow>=19.0.0", ]Perbarui deskripsi agar lebih deskriptif.
description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"Simpan dan tutup file.
Memperbarui main.py
Buka file bernama
main.py. Ini harus mirip dengan contoh ini.def main(): print("Hello from mssql-python-bcp-qs!") if __name__ == "__main__": main()Ganti konten
main.pydengan blok kode berikut. Setiap blok dibangun pada blok sebelumnya dan harus ditempatkan secaramain.pyberurutan.Petunjuk / Saran
Jika Visual Studio Code mengalami masalah dalam menyelesaikan paket, Anda perlu memperbarui penerjemah untuk menggunakan lingkungan virtual.
Di bagian
main.pyatas , tambahkan impor dan konstanta. Skrip ini menggunakanmssql_pythonuntuk konektivitas database,pyarrowdanpyarrow.parquetuntuk penanganan data kolom dan I/O file Parquet,python-dotenvuntuk memuat string koneksi dari.envfile, dan pola regex yang dikompilasi yang memvalidasi pengidentifikasi SQL untuk mencegah injeksi."""Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema.""" import os import re import time from uuid import UUID import pyarrow as pa import pyarrow.parquet as pq from dotenv import load_dotenv import mssql_python BATCH_SIZE = 64_000 _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$") def _validate_ident(name: str) -> str: if not _SAFE_IDENT.match(name): raise ValueError(f"Unsafe SQL identifier: {name!r}") return nameDi bawah impor, tambahkan pemetaan jenis SQL-ke-Arrow. Kamus ini menerjemahkan tipe kolom SQL Server ke setara dengan Apache Arrow-nya sehingga integritas data dipertahankan saat menulis ke Parquet. Dua fungsi pembantu membangun string jenis SQL yang tepat (misalnya,
NVARCHAR(100)atauDECIMAL(18,2)) dari metadataINFORMATION_SCHEMAdan menentukan jenis Arrow yang cocok untuk setiap kolom._SQL_TO_ARROW = { "bit": pa.bool_(), "tinyint": pa.uint8(), "smallint": pa.int16(), "int": pa.int32(), "bigint": pa.int64(), "float": pa.float64(), "real": pa.float32(), "smallmoney": pa.decimal128(10, 4), "money": pa.decimal128(19, 4), "date": pa.date32(), "datetime": pa.timestamp("ms"), "datetime2": pa.timestamp("us"), "smalldatetime": pa.timestamp("s"), "uniqueidentifier": pa.string(), "xml": pa.string(), "image": pa.binary(), "binary": pa.binary(), "varbinary": pa.binary(), "timestamp": pa.binary(), } def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str: """Build the exact SQL type string from INFORMATION_SCHEMA metadata.""" dt = data_type.lower() if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"): length = "MAX" if max_length == -1 else str(max_length) return f"{dt.upper()}({length})" if dt in ("decimal", "numeric"): return f"{dt.upper()}({precision},{scale})" return dt.upper() def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType: sql_type = sql_type.lower() if sql_type in _SQL_TO_ARROW: return _SQL_TO_ARROW[sql_type] if sql_type in ("decimal", "numeric"): return pa.decimal128(precision, scale) if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"): return pa.string() return pa.string() def _convert_value(v): """Convert a SQL value to an Arrow-compatible Python type.""" if isinstance(v, UUID): return str(v) return vTambahkan introspeksi skema dan fungsi pembuatan DDL.
_get_arrow_schemakueriINFORMATION_SCHEMA.COLUMNSmenggunakan kueri berparameter, membangun skema Panah, dan menyimpan jenis SQL asli sebagai metadata bidang sehingga tabel tujuan dapat dibuat ulang dengan definisi kolom yang tepat._create_table_ddlmembaca metadata tersebut kembali untuk menghasilkanDROP/CREATE TABLEDDL. Jenistimestamp(rowversion) dipetakan ulang keVARBINARY(8)karena dibuat secara otomatis dan tidak dapat disisipkan secara langsung.def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema: """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS. Stores the original SQL type as field metadata so the round-trip CREATE TABLE can reproduce exact column definitions. """ cursor.execute( "SELECT COLUMN_NAME, DATA_TYPE, " "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), " "COALESCE(NUMERIC_PRECISION, 0), " "COALESCE(NUMERIC_SCALE, 0), " "IS_NULLABLE " "FROM INFORMATION_SCHEMA.COLUMNS " "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? " "ORDER BY ORDINAL_POSITION", (schema_name, table_name), ) rows = cursor.fetchall() if not rows: raise ValueError(f"No columns found for {schema_name}.{table_name}") fields = [] for col_name, data_type, max_len, precision, scale, nullable in rows: arrow_t = _arrow_type(data_type, precision, scale) sql_t = _sql_type_str(data_type, max_len, precision, scale) fields.append( pa.field( col_name, arrow_t, nullable=(nullable == "YES"), metadata={"sql_type": sql_t}, ) ) return pa.schema(fields) def _create_table_ddl(target: str, schema: pa.Schema) -> str: """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata.""" col_defs = [] for f in schema: sql_t = f.metadata[b"sql_type"].decode() # timestamp/rowversion is auto-generated and not insertable if sql_t == "TIMESTAMP": sql_t = "VARBINARY(8)" null = "" if f.nullable else " NOT NULL" col_defs.append(f"[{f.name}] {sql_t}{null}") col_defs_str = ",\n ".join(col_defs) return ( f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n" f"CREATE TABLE {target} (\n {col_defs_str}\n);" )Tambahkan fungsi unduhan.
download_tablemengalirkan baris dari tabel sumber dalam batch menggunakanfetchmany(), mengonversi setiap nilai menjadi jenis Python yang kompatibel dengan Arrow, dan menuliskan batch rekaman secara bertahap ke file Parquet lokal menggunakanParquetWriter. Ini menjaga memori tetap terikat terlepas dari ukuran tabel. Fungsi ini menggunakan dua kursor terpisah: satu untuk membaca metadata kolom, dan satu lagi untuk mengalirkan data.def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int: """Download a SQL table to a parquet file. Returns row count (0 if empty).""" _validate_ident(table_name) source = f"{schema_name}.[{table_name}]" with conn.cursor() as cursor: schema = _get_arrow_schema(cursor, schema_name, table_name) n_cols = len(schema) row_count = 0 t0 = time.perf_counter() with conn.cursor() as cursor: cursor.execute(f"SELECT * FROM {source}") writer = None try: while True: rows = cursor.fetchmany(BATCH_SIZE) if not rows: break columns = [[] for _ in range(n_cols)] for row in rows: for i in range(n_cols): columns[i].append(_convert_value(row[i])) arrays = [ pa.array(columns[i], type=schema.field(i).type) for i in range(n_cols) ] batch = pa.record_batch(arrays, schema=schema) if writer is None: writer = pq.ParquetWriter(parquet_file, schema) writer.write_batch(batch) row_count += len(rows) finally: if writer is not None: writer.close() if row_count == 0: return 0 elapsed = time.perf_counter() - t0 rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded " f"in {elapsed:.2f}s ({rate})" ) return row_countTambahkan kait pengayaan.
enrich_parquetadalah placeholder di mana Anda dapat menambahkan transformasi, kolom turunan, atau menggabungkan data sebelum diunggah. Dalam panduan memulai cepat ini, tidak ada operasi yang mengembalikan jalur file tersebut dalam kondisi yang tidak berubah.def enrich_parquet(parquet_file: str) -> str: """Enrich a parquet file before upload. Returns the (possibly new) file path.""" # TODO: add transformations, derived columns, joins, etc. print(f"Enriching {parquet_file} (no-op)") return parquet_fileTambahkan fungsi unggahan.
upload_parquetmembaca skema Arrow dari file Parquet, menghasilkan dan menjalankanDROP/CREATE TABLEDDL untuk menyiapkan tujuan, lalu membaca file dalam batch dan memanggilcursor.bulkcopy()untuk penyisipan massal berkinerja tinggi. Opsitable_lock=Truemeningkatkan throughput dengan meminimalkan ketidakcocokan kunci. Setelah unggahan selesai, fungsi menjalankanSELECT COUNT(*)untuk memverifikasi kecocokan jumlah baris.def upload_parquet(conn, parquet_file: str, target: str) -> int: """Upload a parquet file into a SQL table via BCP. Returns row count.""" # ── Create target table from parquet schema ── pf_schema = pq.read_schema(parquet_file) with conn.cursor() as cursor: cursor.execute(_create_table_ddl(target, pf_schema)) conn.commit() # ── Bulk insert ── uploaded = 0 t0 = time.perf_counter() with pq.ParquetFile(parquet_file) as pf: with conn.cursor() as cursor: for batch in pf.iter_batches(batch_size=BATCH_SIZE): rows = zip(*(col.to_pylist() for col in batch.columns)) cursor.bulkcopy( target, rows, batch_size=BATCH_SIZE, table_lock=True, timeout=3600, ) uploaded += batch.num_rows elapsed = time.perf_counter() - t0 # ── Verify ── with conn.cursor() as cursor: cursor.execute(f"SELECT COUNT(*) FROM {target}") count = cursor.fetchone()[0] rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{parquet_file} → {target}: {uploaded:,} rows uploaded " f"in {elapsed:.2f}s ({rate}) " f"| verified: {count:,}" ) return uploadedTambahkan fungsi orkestrasi.
transfer_tablesmengikat tiga fase bersama-sama. Ini terhubung ke database sumber, menemukan semua tabel dasar dalam skema yang diberikan melaluiINFORMATION_SCHEMA.TABLES, mengunduh masing-masing ke file Parquet lokal, menjalankan kait pengayaan, lalu terhubung ke database tujuan dan mengunggah setiap file.def transfer_tables( source_conn_str: str, dest_conn_str: str, source_schema: str, dest_schema: str, ) -> None: """Download all tables from source DB/schema to parquet, upload to dest DB/schema.""" _validate_ident(source_schema) _validate_ident(dest_schema) parquet_dir = source_schema os.makedirs(parquet_dir, exist_ok=True) # ── Download from source ── with mssql_python.connect(source_conn_str) as src_conn: with src_conn.cursor() as cursor: cursor.execute( "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES " "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' " "ORDER BY TABLE_NAME", (source_schema,), ) tables = [row[0] for row in cursor.fetchall()] print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n") parquet_files = [] for table_name in tables: parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet") row_count = download_table(src_conn, source_schema, table_name, parquet_file) if row_count == 0: print(f"{source_schema}.{table_name}: empty, skipping") else: parquet_files.append((table_name, parquet_file)) # ── Enrich parquet files ── enriched = [] for table_name, parquet_file in parquet_files: enriched.append((table_name, enrich_parquet(parquet_file))) # ── Upload to destination ── with mssql_python.connect(dest_conn_str) as dest_conn: for table_name, parquet_file in enriched: target = f"{dest_schema}.[{table_name}]" upload_parquet(dest_conn, parquet_file, target)Terakhir, tambahkan
maintitik masuk. Ini memuat.envfile, memanggiltransfer_tablesdengan string koneksi asal dan tujuan, dan mencetak waktu yang sudah berlalu.def main(): load_dotenv() t_start = time.perf_counter() transfer_tables( source_conn_str=os.environ["SOURCE_CONNECTION_STRING"], dest_conn_str=os.environ["DEST_CONNECTION_STRING"], source_schema="SalesLT", dest_schema="dbo", ) print(f"Total: {time.perf_counter() - t_start:.2f}s") if __name__ == "__main__": main()Simpan dan tutup
main.py.
Menyimpan string koneksi
.gitignoreBuka file dan tambahkan pengecualian untuk.envfile. File Anda harus mirip dengan contoh ini. Pastikan untuk menyimpan dan menutupnya setelah selesai.# Python-generated files __pycache__/ *.py[oc] build/ dist/ wheels/ *.egg-info # Virtual environments .venv # Connection strings and secrets .envDi direktori saat ini, buat file baru bernama
.env.Dalam file
.env, tambahkan entri untuk string koneksi sumber dan tujuan Anda. Ganti placeholder dengan nama server dan database Anda yang sesuai.SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive" DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"Petunjuk / Saran
String koneksi yang digunakan di sini sebagian besar tergantung pada jenis database SQL yang Anda sambungkan. Jika Anda menyambungkan ke Azure SQL Database atau database SQL di Fabric, gunakan string koneksi ODBC dari tab string koneksi. Anda mungkin perlu menyesuaikan jenis autentikasi tergantung pada skenario Anda. Untuk informasi selengkapnya tentang string koneksi dan sintaksnya, lihat referensi sintaks string koneksi.
Petunjuk / Saran
Di macOS, baik ActiveDirectoryInteractive dan ActiveDirectoryDefault berfungsi untuk autentikasi Microsoft Entra.
ActiveDirectoryInteractive meminta Anda untuk masuk setiap kali Anda menjalankan skrip. **
Untuk menghindari peringatan masuk berulang, masuk satu kali melalui Azure CLI dengan menjalankan az login, lalu gunakan ActiveDirectoryDefault untuk menggunakan kembali kredensial yang di-cache.
Menggunakan uv run untuk menjalankan skrip
Di jendela terminal dari sebelumnya, atau jendela terminal baru terbuka ke direktori yang sama, jalankan perintah berikut.
uv run main.pyBerikut adalah output yang diharapkan saat skrip selesai.
Found 12 SalesLT tables: Address, Customer, CustomerAddress, ... SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec) ... SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450 ... Total: 2.35sSambungkan ke database tujuan menggunakan SQL Server Management Studio (SSMS) atau ekstensi MSSQL untuk VISUAL Code dan verifikasi bahwa tabel dan data berhasil dibuat.
Untuk menyebarkan skrip Anda ke komputer lain, salin semua file kecuali folder
.venvke komputer lain. Lingkungan virtual dibuat ulang ketika pertama kali dijalankan.
Cara kerja kode
Aplikasi melakukan transfer data pulang-pergi penuh dalam tiga fase:
-
Unduh: Menyambungkan ke database sumber, membaca metadata kolom dari
INFORMATION_SCHEMA.COLUMNS, membangun skema Apache Arrow, lalu mengunduh setiap tabel ke dalam file Parquet lokal. -
Pengayaan (opsional): Menyediakan hook (
enrich_parquet) tempat Anda dapat menambahkan transformasi, kolom turunan, atau gabungan sebelum mengunggah. -
Unggah: Membaca setiap file Parquet dalam batch, membuat ulang tabel dalam database tujuan menggunakan DDL yang dihasilkan dari metadata skema Arrow, lalu menggunakan
cursor.bulkcopy()untuk sisipan massal berkinerja tinggi.
Langkah selanjutnya
mssql-python Kunjungi repositori GitHub driver untuk contoh lebih lanjut, untuk menyumbangkan ide atau melaporkan masalah.