Compartilhar via


Início Rápido: Copiar em massa com o driver mssql-python para Python

Neste início rápido, você usará o mssql-python driver para copiar dados em massa entre bancos de dados. O aplicativo baixa tabelas de um esquema de banco de dados de origem para arquivos Parquet locais usando o Apache Arrow e, em seguida, carrega-as em um banco de dados de destino usando o método de alto desempenho bulkcopy . Você pode usar esse padrão para migrar, replicar ou transformar dados entre o SQL Server, o Banco de Dados SQL do Azure e o Banco de Dados SQL no Fabric.

O mssql-python driver não requer nenhuma dependência externa em computadores Windows. O driver instala tudo o que precisa com uma única pip instalação, permitindo que você use a versão mais recente do driver para novos scripts sem quebrar outros scripts que você não tem tempo para atualizar e testar.

Documentação do mssql-pythonCódigo-fonte mssql-pythonPacote (PyPI)uv

Pré-requisitos

  • Python 3

    • Se você ainda não tiver o Python, instale o runtime do Python e o gerenciador de pacotes pip de python.org.

    • Não quer usar seu próprio ambiente? Abra como um devcontainer usando codespaces do GitHub.

  • Visual Studio Code com as seguintes extensões:

  • CLI (Interface Command-Line) do Azure para autenticação sem senha no macOS e linux.

  • Se você ainda não tiver uv, siga as instruções de instalação.

  • Um banco de dados de origem no SQL Server, banco de dados SQL do Azure ou banco de dados SQL no Fabric com o AdventureWorks2025 esquema de exemplo e uma cadeia de conexão válida.

  • Um banco de dados de destino no SQL Server, no Banco de Dados SQL do Azure ou no Banco de Dados SQL no Fabric com uma cadeia de conexão válida. O usuário deve ter permissão para criar e gravar em tabelas. Se você não tiver um segundo banco de dados, poderá alterar a cadeia de conexão de destino para apontar para o mesmo banco de dados e usar um esquema diferente para as tabelas de destino.

  • Instale pré-requisitos específicos do sistema operacional uma vez.

    apk add libtool krb5-libs krb5-dev
    

Criar um Banco de Dados SQL

Este início rápido requer o esquema AdventureWorks2025 Lightweight como o banco de dados de origem.

Criar o projeto e executar o código

  1. Criar um novo projeto
  2. Adicionar dependências
  3. Iniciar Visual Studio Code
  4. Atualizar pyproject.toml
  5. Atualizar main.py
  6. Salvar as cadeias de conexão
  7. Use o comando uv run para executar o script

Crie um novo projeto

  1. Abra um prompt de comando no diretório de desenvolvimento. Se você não tiver um, crie um novo diretório chamado python, scriptsetc. Evite pastas em seu OneDrive, a sincronização pode interferir no gerenciamento do seu ambiente virtual.

  2. Crie um novo projeto com uv.

    uv init mssql-python-bcp-qs
    cd mssql-python-bcp-qs
    

Adicionar dependências

No mesmo diretório, instale os pacotes mssql-python, python-dotenv e pyarrow.

uv add mssql-python python-dotenv pyarrow

Iniciar o Visual Studio Code

No mesmo diretório, execute o comando a seguir.

code .

Atualizar pyproject.toml

  1. O pyproject.toml contém os metadados do projeto. Abra o arquivo em seu editor favorito.

  2. Examine o conteúdo do arquivo. Deve ser semelhante a este exemplo. Observe a versão e a dependência do Python para que mssql-python use >= para definir uma versão mínima. Se preferir uma versão exata, altere o >= para == antes do número da versão. As versões resolvidas de cada pacote são armazenadas no uv.lock. O arquivo de bloqueio garante que os desenvolvedores que trabalham no projeto estejam usando versões de pacote consistentes. Ele também garante que exatamente o mesmo conjunto de versões de pacote seja usado ao distribuir seu pacote para os usuários finais. Você não deve editar o uv.lock arquivo.

    [project]
    name = "mssql-python-bcp-qs"
    version = "0.1.0"
    description = "Add your description here"
    readme = "README.md"
    requires-python = ">=3.11"
    dependencies = [
        "mssql-python>=1.4.0",
        "python-dotenv>=1.1.1",
        "pyarrow>=19.0.0",
    ]
    
  3. Atualize a descrição para ser mais descritiva.

    description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"
    
  4. Salve e feche o arquivo.

Atualizar main.py

  1. Abra o arquivo chamado main.py. Deve ser semelhante a este exemplo.

    def main():
        print("Hello from mssql-python-bcp-qs!")
    
    if __name__ == "__main__":
        main()
    
  2. Substitua o conteúdo de main.py pelos seguintes blocos de código. Cada bloco se baseia no anterior e deve ser colocado na ordem correta em main.py.

    Dica

    Se o Visual Studio Code estiver tendo problemas para resolver pacotes, você precisará atualizar o interpretador para usar o ambiente virtual.

  3. Na parte superior, main.pyadicione as importações e as constantes. O script usa mssql_python para conectividade de banco de dados, pyarrow e pyarrow.parquet para processamento de dados em formato colunar e leitura/gravação de arquivos Parquet, python-dotenv para carregar strings de conexão de um arquivo .env, e um padrão regex compilado que valida identificadores SQL para evitar injeções.

    """Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema."""
    
    import os
    import re
    import time
    from uuid import UUID
    
    import pyarrow as pa
    import pyarrow.parquet as pq
    from dotenv import load_dotenv
    import mssql_python
    
    BATCH_SIZE = 64_000
    _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$")
    
    
    def _validate_ident(name: str) -> str:
        if not _SAFE_IDENT.match(name):
            raise ValueError(f"Unsafe SQL identifier: {name!r}")
        return name
    
  4. Abaixo das importações, adicione o mapeamento de tipos SQL para Arrow. Esse dicionário converte os tipos de coluna do SQL Server nos seus equivalentes Apache Arrow, de modo que a fidelidade dos dados seja preservada ao gravar no Parquet. As duas funções auxiliares criam cadeias de caracteres de tipo SQL exatas (por exemplo, NVARCHAR(100) ou DECIMAL(18,2)) de INFORMATION_SCHEMA metadados e resolvem o tipo de seta correspondente para cada coluna.

    _SQL_TO_ARROW = {
        "bit": pa.bool_(),
        "tinyint": pa.uint8(),
        "smallint": pa.int16(),
        "int": pa.int32(),
        "bigint": pa.int64(),
        "float": pa.float64(),
        "real": pa.float32(),
        "smallmoney": pa.decimal128(10, 4),
        "money": pa.decimal128(19, 4),
        "date": pa.date32(),
        "datetime": pa.timestamp("ms"),
        "datetime2": pa.timestamp("us"),
        "smalldatetime": pa.timestamp("s"),
        "uniqueidentifier": pa.string(),
        "xml": pa.string(),
        "image": pa.binary(),
        "binary": pa.binary(),
        "varbinary": pa.binary(),
        "timestamp": pa.binary(),
    }
    
    
    def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str:
        """Build the exact SQL type string from INFORMATION_SCHEMA metadata."""
        dt = data_type.lower()
        if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"):
            length = "MAX" if max_length == -1 else str(max_length)
            return f"{dt.upper()}({length})"
        if dt in ("decimal", "numeric"):
            return f"{dt.upper()}({precision},{scale})"
        return dt.upper()
    
    
    def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType:
        sql_type = sql_type.lower()
        if sql_type in _SQL_TO_ARROW:
            return _SQL_TO_ARROW[sql_type]
        if sql_type in ("decimal", "numeric"):
            return pa.decimal128(precision, scale)
        if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"):
            return pa.string()
        return pa.string()
    
    
    def _convert_value(v):
        """Convert a SQL value to an Arrow-compatible Python type."""
        if isinstance(v, UUID):
            return str(v)
        return v
    
  5. Adicione funções para introspecção de esquemas e geração de DDL. _get_arrow_schema consulta usando INFORMATION_SCHEMA.COLUMNS consultas parametrizadas, cria um esquema Arrow e armazena o tipo SQL original como metadados de campo, permitindo que a tabela de destino seja recriada com definições exatas de coluna. _create_table_ddl lê os metadados de volta para gerar DROP/CREATE TABLE DDL. O tipo timestamp (rowversion) é remapeado para VARBINARY(8) porque é gerado automaticamente e não pode ser inserido diretamente.

    def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema:
        """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS.
    
        Stores the original SQL type as field metadata so the round-trip
        CREATE TABLE can reproduce exact column definitions.
        """
        cursor.execute(
            "SELECT COLUMN_NAME, DATA_TYPE, "
            "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), "
            "COALESCE(NUMERIC_PRECISION, 0), "
            "COALESCE(NUMERIC_SCALE, 0), "
            "IS_NULLABLE "
            "FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? "
            "ORDER BY ORDINAL_POSITION",
            (schema_name, table_name),
        )
        rows = cursor.fetchall()
        if not rows:
            raise ValueError(f"No columns found for {schema_name}.{table_name}")
        fields = []
        for col_name, data_type, max_len, precision, scale, nullable in rows:
            arrow_t = _arrow_type(data_type, precision, scale)
            sql_t = _sql_type_str(data_type, max_len, precision, scale)
            fields.append(
                pa.field(
                    col_name, arrow_t,
                    nullable=(nullable == "YES"),
                    metadata={"sql_type": sql_t},
                )
            )
        return pa.schema(fields)
    
    
    def _create_table_ddl(target: str, schema: pa.Schema) -> str:
        """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata."""
        col_defs = []
        for f in schema:
            sql_t = f.metadata[b"sql_type"].decode()
            # timestamp/rowversion is auto-generated and not insertable
            if sql_t == "TIMESTAMP":
                sql_t = "VARBINARY(8)"
            null = "" if f.nullable else " NOT NULL"
            col_defs.append(f"[{f.name}] {sql_t}{null}")
        col_defs_str = ",\n    ".join(col_defs)
        return (
            f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n"
            f"CREATE TABLE {target} (\n    {col_defs_str}\n);"
        )
    
  6. Adicione a função de download. download_table transmite linhas de uma tabela de origem em batches usando fetchmany(), convertendo cada valor em um tipo Python compatível com Arrow e escreve batches de registros incrementalmente em um arquivo Parquet local usando ParquetWriter. Isso mantém a memória limitada, independentemente do tamanho da tabela. A função usa dois cursores separados: um para ler metadados de coluna e outro para transmitir os dados.

    def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int:
        """Download a SQL table to a parquet file. Returns row count (0 if empty)."""
        _validate_ident(table_name)
        source = f"{schema_name}.[{table_name}]"
    
        with conn.cursor() as cursor:
            schema = _get_arrow_schema(cursor, schema_name, table_name)
    
        n_cols = len(schema)
        row_count = 0
        t0 = time.perf_counter()
    
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT * FROM {source}")
            writer = None
            try:
                while True:
                    rows = cursor.fetchmany(BATCH_SIZE)
                    if not rows:
                        break
                    columns = [[] for _ in range(n_cols)]
                    for row in rows:
                        for i in range(n_cols):
                            columns[i].append(_convert_value(row[i]))
                    arrays = [
                        pa.array(columns[i], type=schema.field(i).type)
                        for i in range(n_cols)
                    ]
                    batch = pa.record_batch(arrays, schema=schema)
                    if writer is None:
                        writer = pq.ParquetWriter(parquet_file, schema)
                    writer.write_batch(batch)
                    row_count += len(rows)
            finally:
                if writer is not None:
                    writer.close()
    
        if row_count == 0:
            return 0
    
        elapsed = time.perf_counter() - t0
        rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded "
            f"in {elapsed:.2f}s ({rate})"
        )
        return row_count
    
  7. Adicione o gancho de enriquecimento. enrich_parquet é um espaço reservado em que você pode adicionar transformações, colunas derivadas ou junções aos dados antes de serem carregados. Neste início rápido, é um no-op que retorna o caminho do arquivo inalterado.

    def enrich_parquet(parquet_file: str) -> str:
        """Enrich a parquet file before upload. Returns the (possibly new) file path."""
        # TODO: add transformations, derived columns, joins, etc.
        print(f"Enriching {parquet_file} (no-op)")
        return parquet_file
    
  8. Adicione a função de upload. upload_parquet lê o esquema Arrow do arquivo Parquet, gera e executa DROP/CREATE TABLE DDL para preparar o destino, em seguida lê o arquivo em lotes e chama cursor.bulkcopy() para inserção em massa de alto desempenho. A opção table_lock=True melhora a taxa de transferência ao minimizar a contenção de bloqueio. Após a conclusão do upload, a função executa um SELECT COUNT(*) para verificar se a contagem de linhas corresponde.

    def upload_parquet(conn, parquet_file: str, target: str) -> int:
        """Upload a parquet file into a SQL table via BCP. Returns row count."""
        # ── Create target table from parquet schema ──
        pf_schema = pq.read_schema(parquet_file)
        with conn.cursor() as cursor:
            cursor.execute(_create_table_ddl(target, pf_schema))
        conn.commit()
    
        # ── Bulk insert ──
        uploaded = 0
        t0 = time.perf_counter()
        with pq.ParquetFile(parquet_file) as pf:
            with conn.cursor() as cursor:
                for batch in pf.iter_batches(batch_size=BATCH_SIZE):
                    rows = zip(*(col.to_pylist() for col in batch.columns))
                    cursor.bulkcopy(
                        target, rows, batch_size=BATCH_SIZE,
                        table_lock=True, timeout=3600,
                    )
                    uploaded += batch.num_rows
        elapsed = time.perf_counter() - t0
    
        # ── Verify ──
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT COUNT(*) FROM {target}")
            count = cursor.fetchone()[0]
    
        rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{parquet_file} → {target}: {uploaded:,} rows uploaded "
            f"in {elapsed:.2f}s ({rate}) "
            f"| verified: {count:,}"
        )
        return uploaded
    
  9. Adicione a função de orquestração. transfer_tables une as três fases. Ele se conecta ao banco de dados de origem, descobre todas as tabelas base no esquema especificado via INFORMATION_SCHEMA.TABLES, baixa cada uma delas em um arquivo Parquet local, executa o hook de enriquecimento e, em seguida, conecta-se ao banco de dados de destino e carrega cada arquivo.

    def transfer_tables(
        source_conn_str: str,
        dest_conn_str: str,
        source_schema: str,
        dest_schema: str,
    ) -> None:
        """Download all tables from source DB/schema to parquet, upload to dest DB/schema."""
        _validate_ident(source_schema)
        _validate_ident(dest_schema)
    
        parquet_dir = source_schema
        os.makedirs(parquet_dir, exist_ok=True)
    
        # ── Download from source ──
        with mssql_python.connect(source_conn_str) as src_conn:
            with src_conn.cursor() as cursor:
                cursor.execute(
                    "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
                    "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' "
                    "ORDER BY TABLE_NAME",
                    (source_schema,),
                )
                tables = [row[0] for row in cursor.fetchall()]
    
            print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n")
    
            parquet_files = []
            for table_name in tables:
                parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet")
                row_count = download_table(src_conn, source_schema, table_name, parquet_file)
                if row_count == 0:
                    print(f"{source_schema}.{table_name}: empty, skipping")
                else:
                    parquet_files.append((table_name, parquet_file))
    
        # ── Enrich parquet files ──
        enriched = []
        for table_name, parquet_file in parquet_files:
            enriched.append((table_name, enrich_parquet(parquet_file)))
    
        # ── Upload to destination ──
        with mssql_python.connect(dest_conn_str) as dest_conn:
            for table_name, parquet_file in enriched:
                target = f"{dest_schema}.[{table_name}]"
                upload_parquet(dest_conn, parquet_file, target)
    
  10. Por fim, adicione o ponto de entrada main. Ele carrega o arquivo .env, chama transfer_tables com as cadeias de conexão de origem e destino, e imprime o tempo total decorrido.

    def main():
        load_dotenv()
        t_start = time.perf_counter()
    
        transfer_tables(
            source_conn_str=os.environ["SOURCE_CONNECTION_STRING"],
            dest_conn_str=os.environ["DEST_CONNECTION_STRING"],
            source_schema="SalesLT",
            dest_schema="dbo",
        )
    
        print(f"Total: {time.perf_counter() - t_start:.2f}s")
    
    
    if __name__ == "__main__":
        main()
    
  11. Salve e feche main.py.

Salvar as strings de conexão

  1. Abra o .gitignore arquivo e adicione uma exclusão para .env arquivos. Seu arquivo deve ser semelhante a este exemplo. Salve-o e feche-o quando terminar.

    # Python-generated files
    __pycache__/
    *.py[oc]
    build/
    dist/
    wheels/
    *.egg-info
    
    # Virtual environments
    .venv
    
    # Connection strings and secrets
    .env
    
  2. No diretório atual, crie um novo arquivo chamado .env.

  3. No arquivo .env, adicione entradas para as strings de conexão de origem e destino. Substitua os valores de placeholders pelos nomes reais do servidor e banco de dados.

    SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    

    Dica

    A cadeia de conexão usada aqui depende em grande parte do tipo de banco de dados SQL ao qual você está se conectando. Se você estiver se conectando a um Banco de Dados SQL do Azure ou a um banco de dados SQL no Fabric, use a cadeia de conexão ODBC na guia cadeias de conexão. Talvez seja necessário ajustar o tipo de autenticação dependendo do seu cenário. Para obter mais informações sobre cadeias de conexão e sua sintaxe, consulte a referência de sintaxe da cadeia de conexão.

Dica

No macOS, tanto ActiveDirectoryInteractive quanto ActiveDirectoryDefault funcionam para autenticação do Microsoft Entra. ActiveDirectoryInteractive solicita que você faça login toda vez que executar o script. Para evitar prompts de entrada repetidos, entre uma vez por meio da CLI do Azure executando az login, em seguida, use ActiveDirectoryDefault, que reutiliza a credencial armazenada em cache.

Use o comando uv run para executar o script

  1. Na janela do terminal de antes ou em uma nova janela de terminal aberta para o mesmo diretório, execute o comando a seguir.

     uv run main.py
    

    Esta é a saída esperada quando o script é concluído.

    Found 12 SalesLT tables: Address, Customer, CustomerAddress, ...
    
    SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec)
    ...
    SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450
    ...
    Total: 2.35s
    
  2. Conecte-se ao banco de dados de destino usando o SSMS (SQL Server Management Studio) ou a extensão MSSQL para VS Code e verifique se as tabelas e os dados foram criados com êxito.

  3. Para implantar seu script em outro computador, copie todos os arquivos, exceto a .venv pasta para o outro computador. O ambiente virtual é recriado com a primeira execução.

Como o código funciona

O aplicativo executa uma transferência completa de dados de ida e volta em três fases:

  1. Download: Conecta-se ao banco de dados de origem, lê metadados da coluna de INFORMATION_SCHEMA.COLUMNS, cria um esquema de Apache Arrow e baixa cada tabela em um arquivo Parquet local.
  2. Enriquecer (opcional): fornece um gancho (enrich_parquet) em que você pode adicionar transformações, colunas derivadas ou junções antes de carregar.
  3. Upload: lê cada arquivo Parquet em blocos, recria a tabela no banco de dados de destino usando DDL gerado a partir dos metadados de esquema do Arrow e, em seguida, utiliza cursor.bulkcopy() para uma inserção em massa de alto desempenho.

Próxima etapa

Visite o repositório GitHub do mssql-python driver para obter mais exemplos, contribuir com ideias ou relatar problemas.