Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Neste início rápido, você usará o mssql-python driver para copiar dados em massa entre bancos de dados. O aplicativo baixa tabelas de um esquema de banco de dados de origem para arquivos Parquet locais usando o Apache Arrow e, em seguida, carrega-as em um banco de dados de destino usando o método de alto desempenho bulkcopy . Você pode usar esse padrão para migrar, replicar ou transformar dados entre o SQL Server, o Banco de Dados SQL do Azure e o Banco de Dados SQL no Fabric.
O mssql-python driver não requer nenhuma dependência externa em computadores Windows. O driver instala tudo o que precisa com uma única pip instalação, permitindo que você use a versão mais recente do driver para novos scripts sem quebrar outros scripts que você não tem tempo para atualizar e testar.
Pré-requisitos
Python 3
Se você ainda não tiver o Python, instale o runtime do Python e o gerenciador de pacotes pip de python.org.
Não quer usar seu próprio ambiente? Abra como um devcontainer usando codespaces do GitHub.
Visual Studio Code com as seguintes extensões:
- extensão Python para Visual Studio Code
CLI (Interface Command-Line) do Azure para autenticação sem senha no macOS e linux.
Se você ainda não tiver
uv, siga as instruções de instalação.Um banco de dados de origem no SQL Server, banco de dados SQL do Azure ou banco de dados SQL no Fabric com o
AdventureWorks2025esquema de exemplo e uma cadeia de conexão válida.Um banco de dados de destino no SQL Server, no Banco de Dados SQL do Azure ou no Banco de Dados SQL no Fabric com uma cadeia de conexão válida. O usuário deve ter permissão para criar e gravar em tabelas. Se você não tiver um segundo banco de dados, poderá alterar a cadeia de conexão de destino para apontar para o mesmo banco de dados e usar um esquema diferente para as tabelas de destino.
Instale pré-requisitos específicos do sistema operacional uma vez.
Criar um Banco de Dados SQL
Este início rápido requer o esquema AdventureWorks2025 Lightweight como o banco de dados de origem.
Criar o projeto e executar o código
- Criar um novo projeto
- Adicionar dependências
- Iniciar Visual Studio Code
- Atualizar pyproject.toml
- Atualizar main.py
- Salvar as cadeias de conexão
- Use o comando uv run para executar o script
Crie um novo projeto
Abra um prompt de comando no diretório de desenvolvimento. Se você não tiver um, crie um novo diretório chamado
python,scriptsetc. Evite pastas em seu OneDrive, a sincronização pode interferir no gerenciamento do seu ambiente virtual.Crie um novo projeto com
uv.uv init mssql-python-bcp-qs cd mssql-python-bcp-qs
Adicionar dependências
No mesmo diretório, instale os pacotes mssql-python, python-dotenv e pyarrow.
uv add mssql-python python-dotenv pyarrow
Iniciar o Visual Studio Code
No mesmo diretório, execute o comando a seguir.
code .
Atualizar pyproject.toml
O pyproject.toml contém os metadados do projeto. Abra o arquivo em seu editor favorito.
Examine o conteúdo do arquivo. Deve ser semelhante a este exemplo. Observe a versão e a dependência do Python para que
mssql-pythonuse>=para definir uma versão mínima. Se preferir uma versão exata, altere o>=para==antes do número da versão. As versões resolvidas de cada pacote são armazenadas no uv.lock. O arquivo de bloqueio garante que os desenvolvedores que trabalham no projeto estejam usando versões de pacote consistentes. Ele também garante que exatamente o mesmo conjunto de versões de pacote seja usado ao distribuir seu pacote para os usuários finais. Você não deve editar ouv.lockarquivo.[project] name = "mssql-python-bcp-qs" version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.11" dependencies = [ "mssql-python>=1.4.0", "python-dotenv>=1.1.1", "pyarrow>=19.0.0", ]Atualize a descrição para ser mais descritiva.
description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"Salve e feche o arquivo.
Atualizar main.py
Abra o arquivo chamado
main.py. Deve ser semelhante a este exemplo.def main(): print("Hello from mssql-python-bcp-qs!") if __name__ == "__main__": main()Substitua o conteúdo de
main.pypelos seguintes blocos de código. Cada bloco se baseia no anterior e deve ser colocado na ordem correta emmain.py.Dica
Se o Visual Studio Code estiver tendo problemas para resolver pacotes, você precisará atualizar o interpretador para usar o ambiente virtual.
Na parte superior,
main.pyadicione as importações e as constantes. O script usamssql_pythonpara conectividade de banco de dados,pyarrowepyarrow.parquetpara processamento de dados em formato colunar e leitura/gravação de arquivos Parquet,python-dotenvpara carregar strings de conexão de um arquivo.env, e um padrão regex compilado que valida identificadores SQL para evitar injeções."""Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema.""" import os import re import time from uuid import UUID import pyarrow as pa import pyarrow.parquet as pq from dotenv import load_dotenv import mssql_python BATCH_SIZE = 64_000 _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$") def _validate_ident(name: str) -> str: if not _SAFE_IDENT.match(name): raise ValueError(f"Unsafe SQL identifier: {name!r}") return nameAbaixo das importações, adicione o mapeamento de tipos SQL para Arrow. Esse dicionário converte os tipos de coluna do SQL Server nos seus equivalentes Apache Arrow, de modo que a fidelidade dos dados seja preservada ao gravar no Parquet. As duas funções auxiliares criam cadeias de caracteres de tipo SQL exatas (por exemplo,
NVARCHAR(100)ouDECIMAL(18,2)) deINFORMATION_SCHEMAmetadados e resolvem o tipo de seta correspondente para cada coluna._SQL_TO_ARROW = { "bit": pa.bool_(), "tinyint": pa.uint8(), "smallint": pa.int16(), "int": pa.int32(), "bigint": pa.int64(), "float": pa.float64(), "real": pa.float32(), "smallmoney": pa.decimal128(10, 4), "money": pa.decimal128(19, 4), "date": pa.date32(), "datetime": pa.timestamp("ms"), "datetime2": pa.timestamp("us"), "smalldatetime": pa.timestamp("s"), "uniqueidentifier": pa.string(), "xml": pa.string(), "image": pa.binary(), "binary": pa.binary(), "varbinary": pa.binary(), "timestamp": pa.binary(), } def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str: """Build the exact SQL type string from INFORMATION_SCHEMA metadata.""" dt = data_type.lower() if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"): length = "MAX" if max_length == -1 else str(max_length) return f"{dt.upper()}({length})" if dt in ("decimal", "numeric"): return f"{dt.upper()}({precision},{scale})" return dt.upper() def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType: sql_type = sql_type.lower() if sql_type in _SQL_TO_ARROW: return _SQL_TO_ARROW[sql_type] if sql_type in ("decimal", "numeric"): return pa.decimal128(precision, scale) if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"): return pa.string() return pa.string() def _convert_value(v): """Convert a SQL value to an Arrow-compatible Python type.""" if isinstance(v, UUID): return str(v) return vAdicione funções para introspecção de esquemas e geração de DDL.
_get_arrow_schemaconsulta usandoINFORMATION_SCHEMA.COLUMNSconsultas parametrizadas, cria um esquema Arrow e armazena o tipo SQL original como metadados de campo, permitindo que a tabela de destino seja recriada com definições exatas de coluna._create_table_ddllê os metadados de volta para gerarDROP/CREATE TABLEDDL. O tipotimestamp(rowversion) é remapeado paraVARBINARY(8)porque é gerado automaticamente e não pode ser inserido diretamente.def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema: """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS. Stores the original SQL type as field metadata so the round-trip CREATE TABLE can reproduce exact column definitions. """ cursor.execute( "SELECT COLUMN_NAME, DATA_TYPE, " "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), " "COALESCE(NUMERIC_PRECISION, 0), " "COALESCE(NUMERIC_SCALE, 0), " "IS_NULLABLE " "FROM INFORMATION_SCHEMA.COLUMNS " "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? " "ORDER BY ORDINAL_POSITION", (schema_name, table_name), ) rows = cursor.fetchall() if not rows: raise ValueError(f"No columns found for {schema_name}.{table_name}") fields = [] for col_name, data_type, max_len, precision, scale, nullable in rows: arrow_t = _arrow_type(data_type, precision, scale) sql_t = _sql_type_str(data_type, max_len, precision, scale) fields.append( pa.field( col_name, arrow_t, nullable=(nullable == "YES"), metadata={"sql_type": sql_t}, ) ) return pa.schema(fields) def _create_table_ddl(target: str, schema: pa.Schema) -> str: """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata.""" col_defs = [] for f in schema: sql_t = f.metadata[b"sql_type"].decode() # timestamp/rowversion is auto-generated and not insertable if sql_t == "TIMESTAMP": sql_t = "VARBINARY(8)" null = "" if f.nullable else " NOT NULL" col_defs.append(f"[{f.name}] {sql_t}{null}") col_defs_str = ",\n ".join(col_defs) return ( f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n" f"CREATE TABLE {target} (\n {col_defs_str}\n);" )Adicione a função de download.
download_tabletransmite linhas de uma tabela de origem em batches usandofetchmany(), convertendo cada valor em um tipo Python compatível com Arrow e escreve batches de registros incrementalmente em um arquivo Parquet local usandoParquetWriter. Isso mantém a memória limitada, independentemente do tamanho da tabela. A função usa dois cursores separados: um para ler metadados de coluna e outro para transmitir os dados.def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int: """Download a SQL table to a parquet file. Returns row count (0 if empty).""" _validate_ident(table_name) source = f"{schema_name}.[{table_name}]" with conn.cursor() as cursor: schema = _get_arrow_schema(cursor, schema_name, table_name) n_cols = len(schema) row_count = 0 t0 = time.perf_counter() with conn.cursor() as cursor: cursor.execute(f"SELECT * FROM {source}") writer = None try: while True: rows = cursor.fetchmany(BATCH_SIZE) if not rows: break columns = [[] for _ in range(n_cols)] for row in rows: for i in range(n_cols): columns[i].append(_convert_value(row[i])) arrays = [ pa.array(columns[i], type=schema.field(i).type) for i in range(n_cols) ] batch = pa.record_batch(arrays, schema=schema) if writer is None: writer = pq.ParquetWriter(parquet_file, schema) writer.write_batch(batch) row_count += len(rows) finally: if writer is not None: writer.close() if row_count == 0: return 0 elapsed = time.perf_counter() - t0 rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded " f"in {elapsed:.2f}s ({rate})" ) return row_countAdicione o gancho de enriquecimento.
enrich_parqueté um espaço reservado em que você pode adicionar transformações, colunas derivadas ou junções aos dados antes de serem carregados. Neste início rápido, é um no-op que retorna o caminho do arquivo inalterado.def enrich_parquet(parquet_file: str) -> str: """Enrich a parquet file before upload. Returns the (possibly new) file path.""" # TODO: add transformations, derived columns, joins, etc. print(f"Enriching {parquet_file} (no-op)") return parquet_fileAdicione a função de upload.
upload_parquetlê o esquema Arrow do arquivo Parquet, gera e executaDROP/CREATE TABLEDDL para preparar o destino, em seguida lê o arquivo em lotes e chamacursor.bulkcopy()para inserção em massa de alto desempenho. A opçãotable_lock=Truemelhora a taxa de transferência ao minimizar a contenção de bloqueio. Após a conclusão do upload, a função executa umSELECT COUNT(*)para verificar se a contagem de linhas corresponde.def upload_parquet(conn, parquet_file: str, target: str) -> int: """Upload a parquet file into a SQL table via BCP. Returns row count.""" # ── Create target table from parquet schema ── pf_schema = pq.read_schema(parquet_file) with conn.cursor() as cursor: cursor.execute(_create_table_ddl(target, pf_schema)) conn.commit() # ── Bulk insert ── uploaded = 0 t0 = time.perf_counter() with pq.ParquetFile(parquet_file) as pf: with conn.cursor() as cursor: for batch in pf.iter_batches(batch_size=BATCH_SIZE): rows = zip(*(col.to_pylist() for col in batch.columns)) cursor.bulkcopy( target, rows, batch_size=BATCH_SIZE, table_lock=True, timeout=3600, ) uploaded += batch.num_rows elapsed = time.perf_counter() - t0 # ── Verify ── with conn.cursor() as cursor: cursor.execute(f"SELECT COUNT(*) FROM {target}") count = cursor.fetchone()[0] rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a" print( f"{parquet_file} → {target}: {uploaded:,} rows uploaded " f"in {elapsed:.2f}s ({rate}) " f"| verified: {count:,}" ) return uploadedAdicione a função de orquestração.
transfer_tablesune as três fases. Ele se conecta ao banco de dados de origem, descobre todas as tabelas base no esquema especificado viaINFORMATION_SCHEMA.TABLES, baixa cada uma delas em um arquivo Parquet local, executa o hook de enriquecimento e, em seguida, conecta-se ao banco de dados de destino e carrega cada arquivo.def transfer_tables( source_conn_str: str, dest_conn_str: str, source_schema: str, dest_schema: str, ) -> None: """Download all tables from source DB/schema to parquet, upload to dest DB/schema.""" _validate_ident(source_schema) _validate_ident(dest_schema) parquet_dir = source_schema os.makedirs(parquet_dir, exist_ok=True) # ── Download from source ── with mssql_python.connect(source_conn_str) as src_conn: with src_conn.cursor() as cursor: cursor.execute( "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES " "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' " "ORDER BY TABLE_NAME", (source_schema,), ) tables = [row[0] for row in cursor.fetchall()] print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n") parquet_files = [] for table_name in tables: parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet") row_count = download_table(src_conn, source_schema, table_name, parquet_file) if row_count == 0: print(f"{source_schema}.{table_name}: empty, skipping") else: parquet_files.append((table_name, parquet_file)) # ── Enrich parquet files ── enriched = [] for table_name, parquet_file in parquet_files: enriched.append((table_name, enrich_parquet(parquet_file))) # ── Upload to destination ── with mssql_python.connect(dest_conn_str) as dest_conn: for table_name, parquet_file in enriched: target = f"{dest_schema}.[{table_name}]" upload_parquet(dest_conn, parquet_file, target)Por fim, adicione o ponto de entrada
main. Ele carrega o arquivo.env, chamatransfer_tablescom as cadeias de conexão de origem e destino, e imprime o tempo total decorrido.def main(): load_dotenv() t_start = time.perf_counter() transfer_tables( source_conn_str=os.environ["SOURCE_CONNECTION_STRING"], dest_conn_str=os.environ["DEST_CONNECTION_STRING"], source_schema="SalesLT", dest_schema="dbo", ) print(f"Total: {time.perf_counter() - t_start:.2f}s") if __name__ == "__main__": main()Salve e feche
main.py.
Salvar as strings de conexão
Abra o
.gitignorearquivo e adicione uma exclusão para.envarquivos. Seu arquivo deve ser semelhante a este exemplo. Salve-o e feche-o quando terminar.# Python-generated files __pycache__/ *.py[oc] build/ dist/ wheels/ *.egg-info # Virtual environments .venv # Connection strings and secrets .envNo diretório atual, crie um novo arquivo chamado
.env.No arquivo
.env, adicione entradas para as strings de conexão de origem e destino. Substitua os valores de placeholders pelos nomes reais do servidor e banco de dados.SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive" DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"Dica
A cadeia de conexão usada aqui depende em grande parte do tipo de banco de dados SQL ao qual você está se conectando. Se você estiver se conectando a um Banco de Dados SQL do Azure ou a um banco de dados SQL no Fabric, use a cadeia de conexão ODBC na guia cadeias de conexão. Talvez seja necessário ajustar o tipo de autenticação dependendo do seu cenário. Para obter mais informações sobre cadeias de conexão e sua sintaxe, consulte a referência de sintaxe da cadeia de conexão.
Dica
No macOS, tanto ActiveDirectoryInteractive quanto ActiveDirectoryDefault funcionam para autenticação do Microsoft Entra.
ActiveDirectoryInteractive solicita que você faça login toda vez que executar o script. Para evitar prompts de entrada repetidos, entre uma vez por meio da CLI do Azure executando az login, em seguida, use ActiveDirectoryDefault, que reutiliza a credencial armazenada em cache.
Use o comando uv run para executar o script
Na janela do terminal de antes ou em uma nova janela de terminal aberta para o mesmo diretório, execute o comando a seguir.
uv run main.pyEsta é a saída esperada quando o script é concluído.
Found 12 SalesLT tables: Address, Customer, CustomerAddress, ... SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec) ... SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450 ... Total: 2.35sConecte-se ao banco de dados de destino usando o SSMS (SQL Server Management Studio) ou a extensão MSSQL para VS Code e verifique se as tabelas e os dados foram criados com êxito.
Para implantar seu script em outro computador, copie todos os arquivos, exceto a
.venvpasta para o outro computador. O ambiente virtual é recriado com a primeira execução.
Como o código funciona
O aplicativo executa uma transferência completa de dados de ida e volta em três fases:
-
Download: Conecta-se ao banco de dados de origem, lê metadados da coluna de
INFORMATION_SCHEMA.COLUMNS, cria um esquema de Apache Arrow e baixa cada tabela em um arquivo Parquet local. -
Enriquecer (opcional): fornece um gancho (
enrich_parquet) em que você pode adicionar transformações, colunas derivadas ou junções antes de carregar. -
Upload: lê cada arquivo Parquet em blocos, recria a tabela no banco de dados de destino usando DDL gerado a partir dos metadados de esquema do Arrow e, em seguida, utiliza
cursor.bulkcopy()para uma inserção em massa de alto desempenho.
Próxima etapa
Visite o repositório GitHub do mssql-python driver para obter mais exemplos, contribuir com ideias ou relatar problemas.