Partilhar via


Início rápido: Cópia em massa com o driver mssql-python para Python

Neste guia de início rápido, utilizarás o driver mssql-python para copiar dados em massa entre bases de dados. A aplicação descarrega tabelas de um esquema de base de dados de origem para ficheiros Parquet locais usando o Apache Arrow, e depois carrega-as para uma base de dados de destino usando o método de alto desempenho bulkcopy . Pode usar este padrão para migrar, replicar ou transformar dados entre SQL Server, Azure SQL Database e SQL Database no Fabric.

O mssql-python driver não requer nenhuma dependência externa em máquinas Windows. O driver instala tudo o que precisa com uma única pip instalação, permitindo que você use a versão mais recente do driver para novos scripts sem quebrar outros scripts que você não tem tempo para atualizar e testar.

Documentação | Código-fonte | Pacote (PyPI) | UV

Pré-requisitos

  • Python 3

    • Se ainda não tens o Python, instala o runtime do Python e o gestor de pacotes pip a partir de python.org.

    • Não quer usar o seu próprio ambiente? Abra como um devcontainer usando o GitHub Codespaces.

  • Visual Studio Code com as seguintes extensões:

  • Azure Command-Line Interface (CLI) para autenticação sem palavra-passe no macOS e Linux.

  • Se ainda não tiver uv, siga as instruções de instalação.

  • Uma base de dados de origem no SQL Server, Azure SQL Database ou base de dados SQL em Fabric com o AdventureWorks2025 esquema de exemplo e uma cadeia de ligação válida.

  • Uma base de dados de destino no SQL Server, Azure SQL Database ou base de dados SQL em Fabric com uma string de ligação válida. O utilizador deve ter permissão para criar e escrever em tabelas. Se não tiveres uma segunda base de dados, podes alterar a cadeia de ligação de destino para apontar para a mesma base de dados e usar um esquema diferente para as tabelas de destino.

  • Instale pré-requisitos que devem ser instalados uma única vez específicos do sistema operacional.

    apk add libtool krb5-libs krb5-dev
    

Criar um banco de dados SQL

Este início rápido requer o esquema AdventureWorks2025 Lightweight como base de dados de origem.

Crie o projeto e execute o código

  1. Criar um novo projeto
  2. Adicionar dependências
  3. Iniciar o Visual Studio Code
  4. Atualizar pyproject.toml
  5. Atualizar main.py
  6. Guardar as cadeias de ligação
  7. Use uv run para executar o script

Criar um novo projeto

  1. Abra um prompt de comando no diretório de desenvolvimento. Se você não tiver um, crie um novo diretório chamado python, scripts, etc. Evite pastas no seu OneDrive, a sincronização pode interferir no gerenciamento do seu ambiente virtual.

  2. Crie um novo projeto com uv.

    uv init mssql-python-bcp-qs
    cd mssql-python-bcp-qs
    

Adicionar dependências

No mesmo diretório, instale os pacotes mssql-python, python-dotenv e pyarrow.

uv add mssql-python python-dotenv pyarrow

Abra o Visual Studio Code.

No mesmo diretório, execute o seguinte comando.

code .

Atualizar pyproject.toml

  1. O pyproject.toml contém os metadados para o seu projeto. Abra o arquivo em seu editor favorito.

  2. Revise o conteúdo do arquivo. Deve ser semelhante a este exemplo. Nota a versão do Python e a dependência para mssql-python usa >= para definir uma versão mínima. Se preferir uma versão exata, altere o >= para == antes do número da versão. As versões resolvidas de cada pacote são então armazenadas no uv.lock. O arquivo de bloqueio garante que os desenvolvedores que trabalham no projeto estejam usando versões de pacote consistentes. Ele também garante que o mesmo conjunto exato de versões de pacote seja usado ao distribuir seu pacote para usuários finais. Você não deve editar o uv.lock arquivo.

    [project]
    name = "mssql-python-bcp-qs"
    version = "0.1.0"
    description = "Add your description here"
    readme = "README.md"
    requires-python = ">=3.11"
    dependencies = [
        "mssql-python>=1.4.0",
        "python-dotenv>=1.1.1",
        "pyarrow>=19.0.0",
    ]
    
  3. Atualize a descrição para ser mais descritiva.

    description = "Bulk copies data between SQL databases using mssql-python and Apache Arrow"
    
  4. Salve e feche o arquivo.

Atualizar main.py

  1. Abra o arquivo chamado main.py. Deve ser semelhante a este exemplo.

    def main():
        print("Hello from mssql-python-bcp-qs!")
    
    if __name__ == "__main__":
        main()
    
  2. Substitua o conteúdo de main.py pelos seguintes blocos de código. Cada bloco constrói-se sobre o anterior e deve ser colocado main.py na ordem correta.

    Sugestão

    Se o Visual Studio Code estiver tendo problemas para resolver pacotes, você precisará atualizar o intérprete para usar o ambiente virtual.

  3. No topo de main.py, adicione as importações e as constantes. O script utiliza mssql_python para conectividade a bases de dados, pyarrow e pyarrow.parquet para o tratamento de dados colunares e operações de I/O de ficheiros Parquet, python-dotenv para carregar cadeias de ligação a partir de um ficheiro .env, e um padrão de expressão regular compilado que valida identificadores de SQL para evitar a injeção de SQL.

    """Round-trip: download tables from a source DB/schema to parquet, upload to a destination DB/schema."""
    
    import os
    import re
    import time
    from uuid import UUID
    
    import pyarrow as pa
    import pyarrow.parquet as pq
    from dotenv import load_dotenv
    import mssql_python
    
    BATCH_SIZE = 64_000
    _SAFE_IDENT = re.compile(r"^[A-Za-z0-9_]+$")
    
    
    def _validate_ident(name: str) -> str:
        if not _SAFE_IDENT.match(name):
            raise ValueError(f"Unsafe SQL identifier: {name!r}")
        return name
    
  4. Por baixo das importações, adicione o mapeamento do tipo SQL-para-Arrow. Este dicionário traduz os tipos de colunas do SQL Server para os seus equivalentes no Apache Arrow, de modo a preservar a fidelidade dos dados ao escrever no Parquet. As duas funções auxiliares constroem cadeias exatas de tipos SQL (por exemplo, NVARCHAR(100) ou DECIMAL(18,2)) a partir de INFORMATION_SCHEMA metadados e resolvem o tipo de seta correspondente para cada coluna.

    _SQL_TO_ARROW = {
        "bit": pa.bool_(),
        "tinyint": pa.uint8(),
        "smallint": pa.int16(),
        "int": pa.int32(),
        "bigint": pa.int64(),
        "float": pa.float64(),
        "real": pa.float32(),
        "smallmoney": pa.decimal128(10, 4),
        "money": pa.decimal128(19, 4),
        "date": pa.date32(),
        "datetime": pa.timestamp("ms"),
        "datetime2": pa.timestamp("us"),
        "smalldatetime": pa.timestamp("s"),
        "uniqueidentifier": pa.string(),
        "xml": pa.string(),
        "image": pa.binary(),
        "binary": pa.binary(),
        "varbinary": pa.binary(),
        "timestamp": pa.binary(),
    }
    
    
    def _sql_type_str(data_type: str, max_length: int, precision: int, scale: int) -> str:
        """Build the exact SQL type string from INFORMATION_SCHEMA metadata."""
        dt = data_type.lower()
        if dt in ("char", "varchar", "nchar", "nvarchar", "binary", "varbinary"):
            length = "MAX" if max_length == -1 else str(max_length)
            return f"{dt.upper()}({length})"
        if dt in ("decimal", "numeric"):
            return f"{dt.upper()}({precision},{scale})"
        return dt.upper()
    
    
    def _arrow_type(sql_type: str, precision: int, scale: int) -> pa.DataType:
        sql_type = sql_type.lower()
        if sql_type in _SQL_TO_ARROW:
            return _SQL_TO_ARROW[sql_type]
        if sql_type in ("decimal", "numeric"):
            return pa.decimal128(precision, scale)
        if sql_type in ("char", "varchar", "nchar", "nvarchar", "text", "ntext", "sysname"):
            return pa.string()
        return pa.string()
    
    
    def _convert_value(v):
        """Convert a SQL value to an Arrow-compatible Python type."""
        if isinstance(v, UUID):
            return str(v)
        return v
    
  5. Adiciona as funções de introspeção de esquemas e geração DDL. _get_arrow_schema consultas INFORMATION_SCHEMA.COLUMNS usando consultas parametrizadas, constrói um esquema Arrow e armazena o tipo SQL original como metadados de campo para que a tabela de destino possa ser recriada com definições exatas de colunas. _create_table_ddl lê esses metadados para gerar DROP/CREATE TABLE DDL. O tipo timestamp (rowversion) é remapeado para VARBINARY(8) porque é gerado automaticamente e não é inserido diretamente.

    def _get_arrow_schema(cursor, schema_name: str, table_name: str) -> pa.Schema:
        """Build an Arrow schema from INFORMATION_SCHEMA.COLUMNS.
    
        Stores the original SQL type as field metadata so the round-trip
        CREATE TABLE can reproduce exact column definitions.
        """
        cursor.execute(
            "SELECT COLUMN_NAME, DATA_TYPE, "
            "COALESCE(CHARACTER_MAXIMUM_LENGTH, 0), "
            "COALESCE(NUMERIC_PRECISION, 0), "
            "COALESCE(NUMERIC_SCALE, 0), "
            "IS_NULLABLE "
            "FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? "
            "ORDER BY ORDINAL_POSITION",
            (schema_name, table_name),
        )
        rows = cursor.fetchall()
        if not rows:
            raise ValueError(f"No columns found for {schema_name}.{table_name}")
        fields = []
        for col_name, data_type, max_len, precision, scale, nullable in rows:
            arrow_t = _arrow_type(data_type, precision, scale)
            sql_t = _sql_type_str(data_type, max_len, precision, scale)
            fields.append(
                pa.field(
                    col_name, arrow_t,
                    nullable=(nullable == "YES"),
                    metadata={"sql_type": sql_t},
                )
            )
        return pa.schema(fields)
    
    
    def _create_table_ddl(target: str, schema: pa.Schema) -> str:
        """Build DROP/CREATE TABLE DDL from Arrow schema with SQL type metadata."""
        col_defs = []
        for f in schema:
            sql_t = f.metadata[b"sql_type"].decode()
            # timestamp/rowversion is auto-generated and not insertable
            if sql_t == "TIMESTAMP":
                sql_t = "VARBINARY(8)"
            null = "" if f.nullable else " NOT NULL"
            col_defs.append(f"[{f.name}] {sql_t}{null}")
        col_defs_str = ",\n    ".join(col_defs)
        return (
            f"IF OBJECT_ID('{target}', 'U') IS NOT NULL DROP TABLE {target};\n"
            f"CREATE TABLE {target} (\n    {col_defs_str}\n);"
        )
    
  6. Adiciona a função de download. download_table transmite linhas de uma tabela de origem em lotes usando fetchmany(), converte cada valor para um tipo Python compatível com Arrow e escreve lotes de registos incrementalmente para um ficheiro Parquet local usando ParquetWriter. Isto mantém a memória limitada independentemente do tamanho da tabela. A função utiliza dois cursores separados: um para ler metadados das colunas e outro para transmitir os dados.

    def download_table(conn, schema_name: str, table_name: str, parquet_file: str) -> int:
        """Download a SQL table to a parquet file. Returns row count (0 if empty)."""
        _validate_ident(table_name)
        source = f"{schema_name}.[{table_name}]"
    
        with conn.cursor() as cursor:
            schema = _get_arrow_schema(cursor, schema_name, table_name)
    
        n_cols = len(schema)
        row_count = 0
        t0 = time.perf_counter()
    
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT * FROM {source}")
            writer = None
            try:
                while True:
                    rows = cursor.fetchmany(BATCH_SIZE)
                    if not rows:
                        break
                    columns = [[] for _ in range(n_cols)]
                    for row in rows:
                        for i in range(n_cols):
                            columns[i].append(_convert_value(row[i]))
                    arrays = [
                        pa.array(columns[i], type=schema.field(i).type)
                        for i in range(n_cols)
                    ]
                    batch = pa.record_batch(arrays, schema=schema)
                    if writer is None:
                        writer = pq.ParquetWriter(parquet_file, schema)
                    writer.write_batch(batch)
                    row_count += len(rows)
            finally:
                if writer is not None:
                    writer.close()
    
        if row_count == 0:
            return 0
    
        elapsed = time.perf_counter() - t0
        rate = f"{int(row_count / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{schema_name}.{table_name} → {parquet_file}: {row_count:,} rows downloaded "
            f"in {elapsed:.2f}s ({rate})"
        )
        return row_count
    
  7. Adicione o gancho de enriquecimento. enrich_parquet é um marcador onde podes adicionar transformações, colunas derivadas ou junções aos dados antes de serem carregados. Neste quickstart, consiste numa operação nula que devolve o caminho do ficheiro inalterado.

    def enrich_parquet(parquet_file: str) -> str:
        """Enrich a parquet file before upload. Returns the (possibly new) file path."""
        # TODO: add transformations, derived columns, joins, etc.
        print(f"Enriching {parquet_file} (no-op)")
        return parquet_file
    
  8. Adiciona a função de upload. upload_parquet lê o esquema Arrow do ficheiro Parquet, gera e executa DROP/CREATE TABLE DDL para preparar o destino, após lê o ficheiro em lotes e chama cursor.bulkcopy() para uma inserção em massa de alto desempenho. A table_lock=True opção melhora o rendimento ao minimizar a contenção de bloqueios. Depois de concluído o upload, a função é executada SELECT COUNT(*) para verificar se a contagem de linhas corresponde.

    def upload_parquet(conn, parquet_file: str, target: str) -> int:
        """Upload a parquet file into a SQL table via BCP. Returns row count."""
        # ── Create target table from parquet schema ──
        pf_schema = pq.read_schema(parquet_file)
        with conn.cursor() as cursor:
            cursor.execute(_create_table_ddl(target, pf_schema))
        conn.commit()
    
        # ── Bulk insert ──
        uploaded = 0
        t0 = time.perf_counter()
        with pq.ParquetFile(parquet_file) as pf:
            with conn.cursor() as cursor:
                for batch in pf.iter_batches(batch_size=BATCH_SIZE):
                    rows = zip(*(col.to_pylist() for col in batch.columns))
                    cursor.bulkcopy(
                        target, rows, batch_size=BATCH_SIZE,
                        table_lock=True, timeout=3600,
                    )
                    uploaded += batch.num_rows
        elapsed = time.perf_counter() - t0
    
        # ── Verify ──
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT COUNT(*) FROM {target}")
            count = cursor.fetchone()[0]
    
        rate = f"{int(uploaded / elapsed):,} rows/sec" if elapsed > 0 else "n/a"
        print(
            f"{parquet_file} → {target}: {uploaded:,} rows uploaded "
            f"in {elapsed:.2f}s ({rate}) "
            f"| verified: {count:,}"
        )
        return uploaded
    
  9. Adiciona a função de orquestração. transfer_tables Liga as três fases. Liga-se à base de dados de origem, descobre todas as tabelas base no esquema dado através de INFORMATION_SCHEMA.TABLES, descarrega cada uma para um ficheiro Parquet local, executa o gancho de enriquecimento, depois liga-se à base de dados de destino e carrega cada ficheiro.

    def transfer_tables(
        source_conn_str: str,
        dest_conn_str: str,
        source_schema: str,
        dest_schema: str,
    ) -> None:
        """Download all tables from source DB/schema to parquet, upload to dest DB/schema."""
        _validate_ident(source_schema)
        _validate_ident(dest_schema)
    
        parquet_dir = source_schema
        os.makedirs(parquet_dir, exist_ok=True)
    
        # ── Download from source ──
        with mssql_python.connect(source_conn_str) as src_conn:
            with src_conn.cursor() as cursor:
                cursor.execute(
                    "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES "
                    "WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE' "
                    "ORDER BY TABLE_NAME",
                    (source_schema,),
                )
                tables = [row[0] for row in cursor.fetchall()]
    
            print(f"Found {len(tables)} {source_schema} tables: {', '.join(tables)}\n")
    
            parquet_files = []
            for table_name in tables:
                parquet_file = os.path.join(parquet_dir, f"{table_name}.parquet")
                row_count = download_table(src_conn, source_schema, table_name, parquet_file)
                if row_count == 0:
                    print(f"{source_schema}.{table_name}: empty, skipping")
                else:
                    parquet_files.append((table_name, parquet_file))
    
        # ── Enrich parquet files ──
        enriched = []
        for table_name, parquet_file in parquet_files:
            enriched.append((table_name, enrich_parquet(parquet_file)))
    
        # ── Upload to destination ──
        with mssql_python.connect(dest_conn_str) as dest_conn:
            for table_name, parquet_file in enriched:
                target = f"{dest_schema}.[{table_name}]"
                upload_parquet(dest_conn, parquet_file, target)
    
  10. Por fim, adiciona o ponto de entrada main. Carrega o .env ficheiro, chama transfer_tables com as cadeias de ligação de origem e destino, e imprime o tempo total decorrido.

    def main():
        load_dotenv()
        t_start = time.perf_counter()
    
        transfer_tables(
            source_conn_str=os.environ["SOURCE_CONNECTION_STRING"],
            dest_conn_str=os.environ["DEST_CONNECTION_STRING"],
            source_schema="SalesLT",
            dest_schema="dbo",
        )
    
        print(f"Total: {time.perf_counter() - t_start:.2f}s")
    
    
    if __name__ == "__main__":
        main()
    
  11. Salve e feche main.py.

Guardar as cadeias de ligação

  1. Abra o .gitignore arquivo e adicione uma exclusão para .env arquivos. Seu arquivo deve ser semelhante a este exemplo. Certifique-se de salvá-lo e fechá-lo quando terminar.

    # Python-generated files
    __pycache__/
    *.py[oc]
    build/
    dist/
    wheels/
    *.egg-info
    
    # Virtual environments
    .venv
    
    # Connection strings and secrets
    .env
    
  2. No diretório atual, crie um novo arquivo chamado .env.

  3. Dentro do .env ficheiro, adiciona entradas para as cadeias de ligação de origem e destino. Substitui os valores provisórios pelos nomes reais do teu servidor e base de dados.

    SOURCE_CONNECTION_STRING="Server=<source_server_name>;Database=<source_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    DEST_CONNECTION_STRING="Server=<dest_server_name>;Database=<dest_database_name>;Encrypt=yes;TrustServerCertificate=no;Authentication=ActiveDirectoryInteractive"
    

    Sugestão

    A cadeia de conexão usada aqui depende em grande parte do tipo de banco de dados SQL ao qual você está se conectando. Se você estiver se conectando a um Banco de Dados SQL do Azure ou a um banco de dados SQL na Malha, use a cadeia de conexão ODBC na guia Cadeias de conexão. Talvez seja necessário ajustar o tipo de autenticação dependendo do cenário. Para obter mais informações sobre cadeias de conexão e sua sintaxe, consulte Referência de sintaxe de cadeia de conexão.

Sugestão

No macOS, tanto ActiveDirectoryInteractive como ActiveDirectoryDefault funcionam para a autenticação do Microsoft Entra. ActiveDirectoryInteractive Solicita-te para iniciar sessão sempre que executas o script. Para evitar repetidos prompts de início de sessão, inicie sessão uma vez através do Azure CLI executando az login, depois use ActiveDirectoryDefault, que reutiliza a credencial em cache.

Use uv run para executar o script

  1. Na janela do terminal anterior ou em uma nova janela do terminal aberta no mesmo diretório, execute o seguinte comando.

     uv run main.py
    

    Aqui está a saída esperada quando o script for concluído.

    Found 12 SalesLT tables: Address, Customer, CustomerAddress, ...
    
    SalesLT.Address → SalesLT/Address.parquet: 450 rows downloaded in 0.15s (3,000 rows/sec)
    ...
    SalesLT/Address.parquet → dbo.[Address]: 450 rows uploaded in 0.10s (4,500 rows/sec) | verified: 450
    ...
    Total: 2.35s
    
  2. Ligue-se à base de dados de destino usando SQL Server Management Studio (SSMS) ou a extensão MSSQL para o VS Code e verifique se as tabelas e dados foram criados com sucesso.

  3. Para implantar o script em outra máquina, copie todos os arquivos, exceto a .venv pasta, para a outra máquina. O ambiente virtual é recriado com a primeira execução.

Como funciona o código

A aplicação realiza uma transferência completa de dados de ida e volta em três fases:

  1. Download: Liga-se à base de dados de origem, lê metadados das colunas de INFORMATION_SCHEMA.COLUMNS, constrói um esquema Apache Arrow e depois descarrega cada tabela para um ficheiro Parquet local.
  2. Enriquecer (opcional): Fornece um gancho (enrich_parquet) onde pode adicionar transformações, colunas derivadas ou junções antes de carregar.
  3. Upload: Lê cada ficheiro Parquet em lotes, recria a tabela na base de dados de destino usando DDL gerado a partir dos metadados do esquema Arrow, e depois usa cursor.bulkcopy() para inserção massiva de alto desempenho.

Próximo passo

Visite o mssql-python repositório GitHub do driver para obter mais exemplos, para contribuir com ideias ou relatar problemas.