Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Bu hızlı başlangıç kılavuzunda, veritabanları arasında verileri toplu şekilde kopyalamak için mssql-python sürücüsünü kullanacaksınız. Uygulama, Apache Arrow kullanarak bir kaynak veritabanı şemasından yerel Parquet dosyalarına tablo indirir, ardından yüksek performanslı bulkcopy yöntemi kullanarak bunları hedef veritabanına yükler. Sql Server, Azure SQL Veritabanı ve Doku'daki SQL veritabanı arasında verileri geçirmek, çoğaltmak veya dönüştürmek için bu düzeni kullanabilirsiniz.
Sürücü, mssql-python Windows makinelerinde dış bağımlılık gerektirmez. Sürücü, ihtiyaç duyduğu her şeyi tek pip bir yüklemeyle yükler ve yükseltmeye ve test etmeye vaktiniz olmayan diğer betikleri bozmadan yeni betikler için sürücünün en son sürümünü kullanmanıza olanak sağlar.
mssql-python belgeleri | mssql-python kaynak kodu | Paket (PyPI) | Uv
Önkoşullar
Python 3
Python'larınız yoksa, python.orgPython çalışma zamanını ve pip paket yöneticisini yükleyin.
Kendi ortamınızı kullanmak istemiyor musunuz? GitHub Codespaces kullanarak devcontainer olarak açın.
Visual Studio Code aşağıdaki uzantılarla:
macOS ve Linux'ta parolasız kimlik doğrulaması için Azure Command-Line Arabirimi (CLI).
henüz sahip
uvdeğilseniz yükleme yönergelerini izleyin.AdventureWorks2025örnek şema ve geçerli bir bağlantı dizesiyle SQL Server, Azure SQL Veritabanı veya Fabric'teki SQL veritabanında bulunan bir kaynak veritabanı.Geçerli bir bağlantı dizesine sahip olan SQL Server, Azure SQL Veritabanı veya Fabric içerisindeki SQL veritabanı üzerindeki hedef veritabanı. Kullanıcının tablo oluşturma ve tablolara yazma izni olmalıdır. İkinci bir veritabanınız yoksa, hedef bağlantı dizesini aynı veritabanına işaret etmek üzere değiştirebilir ve hedef tablolar için farklı bir şema kullanabilirsiniz.
Tek seferlik işletim sistemine özgü önkoşulları yükleyin.
SQL veritabanı oluşturma
Bu hızlı başlangıçta kaynak veritabanı olarak AdventureWorks2025 Lightweight şeması gerekir.
Projeyi oluşturma ve kodu çalıştırma
- Yeni proje oluşturma
- Bağımlılık ekleme
- Visual Studio Code'ı başlatma
- pyproject.toml güncelleştirme
- Main.py'yi Güncelle
- Bağlantı dizelerini kaydetme
- Betik dosyasını çalıştırmak için uv run kullanın
Yeni proje oluşturma
Geliştirme dizininizde bir komut istemi açın. Bir tane yoksa,
python,scriptsgibi bir adla yeni bir dizin oluşturun. OneDrive'daki klasörleri kullanmaktan kaçının, çünkü eşitleme sanal ortamınızı yönetmenize engel olabilir.-
uv init mssql-python-bcp-qs cd mssql-python-bcp-qs
Bağımlılık ekleme
Aynı dizinde , mssql-pythonve python-dotenv paketlerini yükleyinpyarrow.
uv add mssql-python python-dotenv pyarrow
Visual Studio Code'ı başlatma
Aynı dizinde aşağıdaki komutu çalıştırın.
code .
pyproject.toml güncelle
pyproject.toml, projenizin meta verilerini içerir. Dosyayı sık kullandığınız düzenleyicide açın.
Dosyanın içeriğini gözden geçirin. Bu örneğe benzer olmalıdır. Python sürümüne ve bağımlılığına dikkat edin; en düşük sürümü tanımlamak için
mssql-python,>=kullanır. Tam sürümü tercih ediyorsanız, sürüm numarasının önündeki>='yi==ile değiştirin. Her paketin çözümlenen sürümleri daha sonra uv.lock dosyasında depolanır. Lockfile, proje üzerinde çalışan geliştiricilerin tutarlı paket sürümlerini kullanmasını sağlar. Ayrıca, paketinizi son kullanıcılara dağıtırken tam olarak aynı paket sürümleri kümesinin kullanılmasını sağlar. Dosyayı düzenlememelisinizuv.lock.[project] name = "mssql-python-bcp-qs" version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.11" dependencies = [ "mssql-python>=1.4.0", "python-dotenv>=1.1.1", "pyarrow>=19.0.0", ]Açıklamayı daha açıklayıcı olacak şekilde güncelleştirin.
description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"Dosyayı kaydedin ve kapatın.
Güncelle main.py
adlı
main.pydosyayı açın. Bu örneğe benzer olmalıdır.def main(): print("Hello from mssql-python-bcp-qs!") if __name__ == "__main__": main()öğesinin
main.pyiçeriğini aşağıdaki kod bloklarıyla değiştirin. Her blok bir öncekini oluşturur ve sırayla yerleştirilmelidirmain.py.Tavsiye
Visual Studio Code paketleri çözümleme konusunda sorun yaşıyorsa, yorumlayıcıyı sanal ortamı kullanacak şekilde güncelleştirmeniz gerekir.
main.pydosyasının en üstüne içeri aktarmaları ve sabitleri ekleyin. Betik, veritabanı bağlantısı içinmssql_python, sütunlu veri işleme ve Parquet dosyası G/Ç içinpyarrowvepyarrow.parquet, birpython-dotenvdosyasından bağlantı dizelerini yüklemek için.env, ve enjeksiyonu önlemek amacıyla SQL tanımlayıcılarını doğrulayan derlenmiş bir regex deseni kullanır."""Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema.""" import os import re import time from uuid import UUID import pyarrow as pa import pyarrow.parquet as pq from dotenv import load_dotenv import mssql_python BATCH_SIZE = 64_000 _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$") def _validate_ident(name: str) -> str: if not _SAFE_IDENT.match(name): raise ValueError(f"Unsafe SQL identifier: {name!r}") return nameİçeri aktarmaların altına SQL-Arrow türü eşlemesini ekleyin. Bu sözlük, Sql Server sütun türlerini Apache Arrow eşdeğerlerine çevirir, böylece Parquet'e yazarken veri uygunluğu korunur. İki yardımcı işlev meta verilerden
NVARCHAR(100)tam SQL türü dizeleri (örneğin,DECIMAL(18,2)veyaINFORMATION_SCHEMA) oluşturur ve her sütun için eşleşen Ok türünü çözümler._SQL_TO_ARROW = { "bit": pa.bool_(), "tinyint": pa.uint8(), "smallint": pa.int16(), "int": pa.int32(), "bigint": pa.int64(), "float": pa.float64(), "real": pa.float32(), "smallmoney": pa.decimal128(10, 4), "money": pa.decimal128(19, 4), "date": pa.date32(), "datetime": pa.timestamp("ms"), "datetime2": pa.timestamp("us"), "smalldatetime": pa.timestamp("s"), "uniqueidentifier": pa.string(), "xml": pa.string(), "image": pa.binary(), "binary": pa.binary(), "varbinary": pa.binary(), "timestamp": pa.binary(), } def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str: """Build the exact SQL type string from INFORMATION_SCHEMA metadata.""" dt = data_type.lower() if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"): length = "MAX" if max_length == -1 else str(max_length) return f"{dt.upper()}({length})" if dt in ("decimal", "numeric"): return f"{dt.upper()}({precision},{scale})" return dt.upper() def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType: sql_type = sql_type.lower() if sql_type in _SQL_TO_ARROW: return _SQL_TO_ARROW[sql_type] if sql_type in ("decimal", "numeric"): return pa.decimal128(precision, scale) if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"): return pa.string() return pa.string() def _convert_value(v): """Convert a SQL value to an Arrow-compatible Python type.""" if isinstance(v, UUID): return str(v) return vŞema introspection ve DDL oluşturma işlevlerini ekleyin.
_get_arrow_schemaparametreli sorgularla çalışırINFORMATION_SCHEMA.COLUMNS, bir Arrow şeması oluşturur ve hedef tablonun tam sütun tanımlarıyla yeniden oluşturulabilmesi için özgün SQL türünü alan meta verisi olarak depolar._create_table_ddlDDL oluşturmakDROP/CREATE TABLEiçin bu meta verileri yeniden okur.timestamp(rowversion) türü, otomatik olarak oluşturulduğu ve doğrudan eklenemediği içinVARBINARY(8)'a yeniden eşlenir.def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema: """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS. Stores the original SQL type as field metadata so the round-trip CREATE TABLE can reproduce exact column definitions. """ cursor.execute( "SELECT COLUMN_NAME, DATA_TYPE, " "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), " "COALESCE(NUMERIC_PRECISION, 0), " "COALESCE(NUMERIC_SCALE, 0), " "IS_NULLABLE " "FROM INFORMATION_SCHEMA.COLUMNS " "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? " "ORDER BY ORDINAL_POSITION", (schema_name, table_name), ) rows = cursor.fetchall() if not rows: raise ValueError(f"No columns found for {schema_name}.{table_name}") fields = [] for col_name, data_type, max_len, precision, scale, nullable in rows: arrow_t = _arrow_type(data_type, precision, scale) sql_t = _sql_type_str(data_type, max_len, precision, scale) fields.append( pa.field( col_name, arrow_t, nullable=(nullable == "YES"), metadata={"sql_type": sql_t}, ) ) return pa.schema(fields) def _create_table_ddl(target: str, schema: pa.Schema) -> str: """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata.""" col_defs = [] for f in schema: sql_t = f.metadata[b"sql_type"].decode() # timestamp/rowversion is auto-generated and not insertable if sql_t == "TIMESTAMP": sql_t = "VARBINARY(8)" null = "" if f.nullable else " NOT NULL" col_defs.append(f"[{f.name}] {sql_t}{null}") col_defs_str = ",\n ".join(col_defs) return ( f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n" f"CREATE TABLE {target} (\n {col_defs_str}\n);" )İndirme işlevini ekleyin.
download_tablekullanarak,fetchmany()bir kaynak tablodaki satırları toplu olarak akışla aktarır, her değeri Arrow uyumlu bir Python türüne dönüştürür veParquetWriterkullanarak kayıt toplu işlemlerini artımlı olarak yerel bir Parquet dosyasına yazar. Bu, tablo boyutundan bağımsız olarak belleği sınırlanmış durumda tutar. İşlev iki ayrı imleç kullanır: biri sütun meta verilerini okumak için, diğeri de verileri akışa almak için.def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int: """Download a SQL table to a parquet file. Returns row count (0 if empty).""" _validate_ident(table_name) source = f"{schema_name}.[{table_name}]" with conn.cursor() as cursor: schema = _get_arrow_schema(cursor, schema_name, table_name) n_cols = len(schema) row_count = 0 t0 = time.perf_counter() with conn.cursor() as cursor: cursor.execute(f"SELECT * FROM {source}") writer = None try: while True: rows = cursor.fetchmany(BATCH_SIZE) if not rows: break columns = [[] for _ in range(n_cols)] for row in rows: for i in range(n_cols): columns[i].append(_convert_value(row[i])) arrays = [ pa.array(columns[i], type=schema.field(i).type) for i in range(n_cols) ] batch = pa.record_batch(arrays, schema=schema) if writer is None: writer = pq.ParquetWriter(parquet_file, schema) writer.write_batch(batch) row_count += len(rows) finally: if writer is not None: writer.close() if row_count == 0: return 0 elapsed = time.perf_counter() - t0 rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded " f"in {elapsed:.2f}s ({rate})" ) return row_countZenginleştirme kancasını ekleyin.
enrich_parquet, verilere karşıya yüklenmeden önce dönüştürmeler, türetilmiş sütunlar veya birleştirmeler ekleyebileceğiniz bir yer tutucudur. Bu hızlı başlangıçta, dosya yolunu değiştirmeden döndüren bir no-op.def enrich_parquet(parquet_file: str) -> str: """Enrich a parquet file before upload. Returns the (possibly new) file path.""" # TODO: add transformations, derived columns, joins, etc. print(f"Enriching {parquet_file} (no-op)") return parquet_fileYükleme fonksiyonunu ekleyin.
upload_parquetParquet dosyasından Arrow şemasını okur, sonra hedefi hazırlamak içinDROP/CREATE TABLEDDL oluşturur ve yürütür, ardından dosyayı parça parça okuyup yüksek performanslı toplu ekleme içincursor.bulkcopy()çağrısında bulunur. butable_lock=Trueseçenek, kilit çekişmesi en aza indirilerek aktarım hızını artırır. Karşıya yükleme tamamlandıktan sonra işlev, satır sayısı eşleşmelerini doğrulamak için birSELECT COUNT(*)çalıştırır.def upload_parquet(conn, parquet_file: str, target: str) -> int: """Upload a parquet file into a SQL table via BCP. Returns row count.""" # ── Create target table from parquet schema ── pf_schema = pq.read_schema(parquet_file) with conn.cursor() as cursor: cursor.execute(_create_table_ddl(target, pf_schema)) conn.commit() # ── Bulk insert ── uploaded = 0 t0 = time.perf_counter() with pq.ParquetFile(parquet_file) as pf: with conn.cursor() as cursor: for batch in pf.iter_batches(batch_size=BATCH_SIZE): rows = zip(*(col.to_pylist() for col in batch.columns)) cursor.bulkcopy( target, rows, batch_size=BATCH_SIZE, table_lock=True, timeout=3600, ) uploaded += batch.num_rows elapsed = time.perf_counter() - t0 # ── Verify ── with conn.cursor() as cursor: cursor.execute(f"SELECT COUNT(*) FROM {target}") count = cursor.fetchone()[0] rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{parquet_file} → {target}: {uploaded:,} rows uploaded " f"in {elapsed:.2f}s ({rate}) " f"| verified: {count:,}" ) return uploadedDüzenleme işlevini ekleyin.
transfer_tablesüç aşamayı birbirine bağlar. Kaynak veritabanına bağlanır, aracılığıylaINFORMATION_SCHEMA.TABLESverilen şemadaki tüm temel tabloları bulur, her birini yerel bir Parquet dosyasına indirir, zenginleştirme kancasını çalıştırır, ardından hedef veritabanına bağlanır ve her dosyayı karşıya yükler.def transfer_tables( source_conn_str: str, dest_conn_str: str, source_schema: str, dest_schema: str, ) -> None: """Download all tables from source DB/schema to parquet, upload to dest DB/schema.""" _validate_ident(source_schema) _validate_ident(dest_schema) parquet_dir = source_schema os.makedirs(parquet_dir, exist_ok=True) # ── Download from source ── with mssql_python.connect(source_conn_str) as src_conn: with src_conn.cursor() as cursor: cursor.execute( "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES " "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' " "ORDER BY TABLE_NAME", (source_schema,), ) tables = [row[0] for row in cursor.fetchall()] print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n") parquet_files = [] for table_name in tables: parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet") row_count = download_table(src_conn, source_schema, table_name, parquet_file) if row_count == 0: print(f"{source_schema}.{table_name}: empty, skipping") else: parquet_files.append((table_name, parquet_file)) # ── Enrich parquet files ── enriched = [] for table_name, parquet_file in parquet_files: enriched.append((table_name, enrich_parquet(parquet_file))) # ── Upload to destination ── with mssql_python.connect(dest_conn_str) as dest_conn: for table_name, parquet_file in enriched: target = f"{dest_schema}.[{table_name}]" upload_parquet(dest_conn, parquet_file, target)Son olarak, giriş noktasını ekleyin
main. Dosyayı yükler.env, kaynak ve hedef bağlantı dizeleriyle çağırırtransfer_tablesve geçen toplam süreyi yazdırır.def main(): load_dotenv() t_start = time.perf_counter() transfer_tables( source_conn_str=os.environ["SOURCE_CONNECTION_STRING"], dest_conn_str=os.environ["DEST_CONNECTION_STRING"], source_schema="SalesLT", dest_schema="dbo", ) print(f"Total: {time.perf_counter() - t_start:.2f}s") if __name__ == "__main__": main()kaydedin ve kapatın
main.py.
Bağlantı dizelerini kaydetme
.gitignoredosyasını açın ve.envdosyaları için bir dışlama ekleyin. Dosyanız bu örneğe benzer olmalıdır. İşiniz bittiğinde kaydetmeyi ve kapatmayı unutmayın.# Python-generated files __pycache__/ *.py[oc] build/ dist/ wheels/ *.egg-info # Virtual environments .venv # Connection strings and secrets .envGeçerli dizinde adlı
.envyeni bir dosya oluşturun.dosya içinde
.env, kaynak ve hedef bağlantı dizeleriniz için girdiler ekleyin. Yer tutucu değerlerini gerçek sunucu ve veritabanı adlarınızla değiştirin.SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive" DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"Tavsiye
Burada kullanılan bağlantı dizesi büyük ölçüde bağlandığınız SQL veritabanının türüne bağlıdır. Eğer bir Azure SQL Veritabanına veya Fabric'teki SQL veritabanına bağlanıyorsanız, bağlantı dizeleri sekmesindeki ODBC bağlantı dizesini kullanın. Senaryonuza bağlı olarak kimlik doğrulama türünü ayarlamanız gerekebilir. Bağlantı dizeleri ve söz dizimi hakkında daha fazla bilgi için bkz. bağlantı dizesi söz dizimi başvurusu.
Tavsiye
macOS'ta hem ActiveDirectoryInteractive hem de ActiveDirectoryDefault Microsoft Entra kimlik doğrulaması için çalışır.
ActiveDirectoryInteractive betiği her çalıştırdığınızda oturum açmanızı ister. Yinelenen oturum açma istemlerini önlemek için Azure CLI aracılığıyla az login komutunu çalıştırarak bir kez oturum açın, ardından önbellekteki kimlik bilgilerini yeniden kullanan ActiveDirectoryDefault kullanın.
Betiği yürütmek için uv çalıştır komutunu kullan
Önceki terminal penceresinde veya aynı dizine açılan yeni bir terminal penceresinde aşağıdaki komutu çalıştırın.
uv run main.pyİşte betik tamamlandığında beklenen çıktı şöyledir.
Found 12 SalesLT tables: Address, Customer, CustomerAddress, ... SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec) ... SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450 ... Total: 2.35sSQL Server Management Studio (SSMS) veya VS Code için MSSQL uzantısını kullanarak hedef veritabanına bağlanın ve tabloların ve verilerin başarıyla oluşturulduğunu doğrulayın.
Betiğinizi başka bir makineye dağıtmak için
.venvklasör dışındaki tüm dosyaları diğer makineye kopyalayın. Sanal ortam ilk çalıştırmayla yeniden oluşturulur.
Kod nasıl çalışır?
Uygulama üç aşamada tam gidiş dönüş veri aktarımı gerçekleştirir:
-
İndir: Kaynak veritabanına bağlanır, öğesinden
INFORMATION_SCHEMA.COLUMNSsütun meta verilerini okur, bir Apache Arrow şeması oluşturur, ardından her tabloyu yerel bir Parquet dosyasına indirir. -
Zenginleştir (isteğe bağlı): Karşıya yüklemeden önce dönüştürmeler, türetilmiş sütunlar veya birleştirmeler ekleyebileceğiniz bir kanca (
enrich_parquet) sağlar. -
Karşıya yükleme: Her Parquet dosyasını toplu olarak okur, Ok şeması meta verilerinden oluşturulan DDL'yi kullanarak hedef veritabanında tabloyu yeniden oluşturur ve ardından yüksek performanslı toplu ekleme için kullanır
cursor.bulkcopy().
Sonraki adım
Fikirlere mssql-python katkıda bulunmak veya sorunları bildirmek için daha fazla örnek için sürücü GitHub deposunu ziyaret edin.