Copier et transformer des données vers et depuis SQL Server à l'aide d'Azure Data Factory ou d’Azure Synapse Analytics

S’APPLIQUE À : Azure Data Factory Azure Synapse Analytics

Conseil

Essayez Data Factory dans Microsoft Fabric, une solution d’analyse tout-en-un pour les entreprises. Microsoft Fabric couvre tous les aspects, du déplacement des données à la science des données, en passant par l’analyse en temps réel, l’aide à la décision et la création de rapports. Découvrez comment démarrer un nouvel essai gratuitement !

Cet article explique comment utiliser l'activité Copy dans les pipelines Azure Data Factory et Azure Synapse pour copier des données vers et depuis une base de données SQL Server, et utiliser Data Flow pour transformer les données dans la base de données SQL Server. Pour en savoir plus, lisez l’article d’introduction pour Azure Data Factory ou Azure Synapse Analytics.

Fonctionnalités prises en charge

Ce connecteur SQL Server est pris en charge pour les capacités suivantes :

Fonctionnalités prises en charge IR
Activité de copie (source/récepteur) ① ②
Mappage de flux de données (source/récepteur)
Activité de recherche ① ②
Activité GetMetadata ① ②
Activité de script ① ②
Activité de procédure stockée ① ②

① Runtime d’intégration Azure ② Runtime d’intégration auto-hébergé

Pour obtenir la liste des banques de données prises en charge en tant que sources ou récepteurs par l’activité de copie, consultez le tableau banques de données prises en charge.

Plus précisément, ce connecteur SQL Server prend en charge :

  • SQL Server version 2005 et versions ultérieures.
  • La copie des données à l'aide de l'authentification SQL ou Windows
  • En tant que source, la récupération de données à l’aide d’une requête SQL ou d’une procédure stockée. Vous pouvez également choisir de copier en parallèle à partir de la source SQL Server. Pour plus d’informations, consultez la section Copier en parallèle à partir de la base de données SQL.
  • En tant que récepteur, la création automatique de la table de destination si elle n’existe pas, en fonction du schéma source, l’ajout de données à une table ou l’appel d’une procédure stockée avec une logique personnalisée pendant la copie.

SQL Server Express LocalDB n’est pas pris en charge.

Important

La source de données doit prendre en charge le type de données NVARCHAR car il affecte l’encodage des données lorsqu’un codage non universel est appliqué aux données.

Prérequis

Si votre magasin de données se trouve dans un réseau local, un réseau virtuel Azure ou un cloud privé virtuel Amazon, vous devez configurer un runtime d’intégration auto-hébergé pour vous y connecter.

Si votre magasin de données est un service de données cloud managé, vous pouvez utiliser Azure Integration Runtime. Si l’accès est limité aux adresses IP qui sont approuvées dans les règles de pare-feu, vous pouvez ajouter les adresses IP Azure Integration Runtime dans la liste d’autorisation.

Vous pouvez également utiliser la fonctionnalité de runtime d’intégration de réseau virtuel managé dans Azure Data Factory pour accéder au réseau local sans installer et configurer un runtime d’intégration auto-hébergé.

Pour plus d’informations sur les mécanismes de sécurité réseau et les options pris en charge par Data Factory, consultez Stratégies d’accès aux données.

Bien démarrer

Pour effectuer l’activité Copie avec un pipeline, vous pouvez vous servir de l’un des outils ou kits SDK suivants :

Créer un service lié SQL Server à l’aide de l’interface utilisateur

Utilisez les étapes suivantes pour créer un service lié SQL Server dans l’interface utilisateur du portail Azure.

  1. Accédez à l’onglet Gérer dans votre espace de travail Azure Data Factory ou Synapse et sélectionnez Services liés, puis cliquez sur Nouveau :

  2. Recherchez SQL et sélectionnez le connecteur SQL Server.

    Capture d’écran du connecteur SQL Server.

  3. Configurez les informations du service, testez la connexion et créez le nouveau service lié.

    Capture d’écran de la configuration d’un service lié SQL Server.

Informations de configuration des connecteurs

Les sections suivantes fournissent des informations sur les propriétés utilisées pour définir des entités de pipelines Data Factory et Synapse propres au connecteur de base de données SQL Server.

Propriétés du service lié

Ce connecteur SQL server prend en charge les types d’authentification suivants. Pour plus d’informations, consultez les sections correspondantes.

Conseil

Si vous rencontrez une erreur avec le code d’erreur « UserErrorFailedToConnectToSqlServer » et un message tel que « La limite de session de la base de données XXX a été atteinte », ajoutez Pooling=false à votre chaîne de connexion, puis réessayez.

Authentification SQL

Pour utiliser l’authentification par SQL, les propriétés suivantes sont prises en charge :

Propriété Description Obligatoire
type La propriété type doit être définie sur SqlServer. Oui
connectionString Spécifiez les informations connectionStringnécessaires pour se connecter à la base de données SQL server. Spécifiez un nom de connexion comme nom d’utilisateur et vérifiez que la base de données que vous souhaitez connecter est mappée à cette connexion. Consultez les exemples suivants. Oui
mot de passe Si vous voulez définir un mot de passe dans Azure Key Vault, il vous faut extraire la configuration password de la chaîne de connexion. Pour plus d’informations, consultez l’exemple JSON figurant après le tableau et l’article Stocker des informations d’identification dans Azure Key Vault. Non
alwaysEncryptedSettings Spécifiez les informations alwaysencryptedsettings nécessaires pour permettre à Always Encrypted de protéger les données sensibles stockées dans SQL Server à l’aide d’une identité managée ou d’un principal de service. Pour plus d’informations, consultez l’exemple JSON figurant après le tableau et la section Utilisation d’Always Encrypted. S’il n’est pas spécifié, le paramètre par défaut Always Encrypted est désactivé. Non
connectVia Ce runtime d'intégration permet de se connecter au magasin de données. Pour plus d’informations, consultez la section Conditions préalables. À défaut de spécification, l’Azure Integration Runtime par défaut est utilisé. Non

Exemple : utilisez l’authentification SQL

{
    "name": "SqlServerLinkedService",
    "properties": {
        "type": "SqlServer",
        "typeProperties": {
            "connectionString": "Data Source=<servername>\\<instance name if using named instance>;Initial Catalog=<databasename>;Integrated Security=False;User ID=<username>;Password=<password>;"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Exemple : utilisez l’authentification SQL avec un mot de passe dans Azure Key Vault

{
    "name": "SqlServerLinkedService",
    "properties": {
        "type": "SqlServer",
        "typeProperties": {
            "connectionString": "Data Source=<servername>\\<instance name if using named instance>;Initial Catalog=<databasename>;Integrated Security=False;User ID=<username>;",
            "password": { 
                "type": "AzureKeyVaultSecret", 
                "store": { 
                    "referenceName": "<Azure Key Vault linked service name>", 
                    "type": "LinkedServiceReference" 
                }, 
                "secretName": "<secretName>" 
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Exemple : utiliser Always Encrypted

{
    "name": "SqlServerLinkedService",
    "properties": {
        "type": "SqlServer",
        "typeProperties": {
            "connectionString": "Data Source=<servername>\\<instance name if using named instance>;Initial Catalog=<databasename>;Integrated Security=False;User ID=<username>;Password=<password>;"
        },
        "alwaysEncryptedSettings": {
            "alwaysEncryptedAkvAuthType": "ServicePrincipal",
            "servicePrincipalId": "<service principal id>",
            "servicePrincipalKey": {
                "type": "SecureString",
                "value": "<service principal key>"
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Authentification Windows

Pour utiliser l’authentification Windows, les propriétés suivantes sont prises en charge :

Propriété Description Obligatoire
type La propriété type doit être définie sur SqlServer. Oui
connectionString Spécifiez les informations connectionStringnécessaires pour se connecter à la base de données SQL server. Consultez les exemples suivants.
userName Indique un nom d'utilisateur. Exemple : nom-domaine\nom-utilisateur. Oui
mot de passe Spécifiez un mot de passe pour le compte d’utilisateur que vous avez spécifié pour le nom d’utilisateur. Marquez ce champ comme SecureString pour le stocker en toute sécurité. Vous pouvez également référencer un secret stocké dans Azure Key Vault. Oui
alwaysEncryptedSettings Spécifiez les informations alwaysencryptedsettings nécessaires pour permettre à Always Encrypted de protéger les données sensibles stockées dans SQL Server à l’aide d’une identité managée ou d’un principal de service. Pour plus d’informations, consultez la section Utilisation d’Always Encrypted. S’il n’est pas spécifié, le paramètre par défaut Always Encrypted est désactivé. Non
connectVia Ce runtime d'intégration permet de se connecter au magasin de données. Pour plus d’informations, consultez la section Conditions préalables. À défaut de spécification, l’Azure Integration Runtime par défaut est utilisé. Non

Notes

L’authentification Windows n’est pas prise en charge dans le flux de données.

Exemple : utiliser l’authentification Windows

{
    "name": "SqlServerLinkedService",
    "properties": {
        "type": "SqlServer",
        "typeProperties": {
            "connectionString": "Data Source=<servername>\\<instance name if using named instance>;Initial Catalog=<databasename>;Integrated Security=True;",
            "userName": "<domain\\username>",
            "password": {
                "type": "SecureString",
                "value": "<password>"
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Exemple : utilisez l’authentification Windows avec un mot de passe dans Azure Key Vault

{
    "name": "SqlServerLinkedService",
    "properties": {
        "annotations": [],
        "type": "SqlServer",
        "typeProperties": {
            "connectionString": "Data Source=<servername>\\<instance name if using named instance>;Initial Catalog=<databasename>;Integrated Security=True;",
            "userName": "<domain\\username>",
            "password": {
                "type": "AzureKeyVaultSecret",
                "store": {
                    "referenceName": "<Azure Key Vault linked service name>",
                    "type": "LinkedServiceReference"
                },
                "secretName": "<secretName>"
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Propriétés du jeu de données

Pour obtenir la liste complète des sections et propriétés disponibles pour la définition de jeux de données, consultez l’article sur les jeux de données. Cette section fournit une liste des propriétés prises en charge par le jeu de données SQL Server.

Pour copier des données depuis et vers la base de données SQL Server, les propriétés suivantes sont prises en charge :

Propriété Description Obligatoire
type La propriété type du jeu de données doit être définie sur SqlServerTable. Oui
schéma Nom du schéma. Non pour Source, Oui pour Récepteur
table Nom de la table/vue. Non pour Source, Oui pour Récepteur
tableName Nom de la table/vue avec schéma. Cette propriété est prise en charge pour la compatibilité descendante. Pour les nouvelles charges de travail, utilisez schema et table. Non pour Source, Oui pour Récepteur

Exemple

{
    "name": "SQLServerDataset",
    "properties":
    {
        "type": "SqlServerTable",
        "linkedServiceName": {
            "referenceName": "<SQL Server linked service name>",
            "type": "LinkedServiceReference"
        },
        "schema": [ < physical schema, optional, retrievable during authoring > ],
        "typeProperties": {
            "schema": "<schema_name>",
            "table": "<table_name>"
        }
    }
}

Propriétés de l’activité de copie

Pour obtenir la liste complète des sections et des propriétés disponibles pour la définition des activités, consultez l'article Pipelines. Cette section fournit une liste de propriétés prises en charge par la source et le récepteur SQL Server.

SQL Server en tant que source

Conseil

Pour savoir comment charger efficacement des données à partir de SQL Server à l’aide du partitionnement, consultez Copier en parallèle à partir de SQL Server.

Pour copier des données à partir de SQL Server, définissez SqlSource comme type source dans l’activité de copie. Les propriétés prises en charge dans la section source de l'activité de copie sont les suivantes :

Propriété Description Obligatoire
type La propriété type de la source de l'activité de copie doit être définie sur SqlSource. Oui
sqlReaderQuery Utiliser la requête SQL personnalisée pour lire les données. par exemple select * from MyTable. Non
sqlReaderStoredProcedureName Cette propriété est le nom de la procédure stockée qui lit les données dans la table source. La dernière instruction SQL doit être une instruction SELECT dans la procédure stockée. Non
storedProcedureParameters Ces paramètres concernent la procédure stockée.
Les valeurs autorisées sont des paires de noms ou de valeurs. Les noms et la casse des paramètres doivent correspondre aux noms et à la casse des paramètres de procédure stockée.
Non
isolationLevel Spécifie le comportement de verrouillage des transactions pour la source SQL. Les valeurs autorisées sont les suivantes : ReadCommitted, ReadUncommitted, RepeatableRead, Serializable, Snapshot. S’il n’est pas spécifié, le niveau d’isolation par défaut de la base de données est utilisé. Pour plus d’informations, consultez ce document. Non
partitionOptions Spécifie les options de partitionnement des données utilisées pour charger des données à partir de SQL Server.
Les valeurs autorisées sont les suivantes : None (valeur par défaut), PhysicalPartitionsOfTable et DynamicRange.
Lorsqu’une option de partition est activée (donc, autre que None), le degré de parallélisme pour charger simultanément des données à partir de SQL Server est contrôlé par le paramètre parallelCopies sur l’activité de copie.
Non
partitionSettings Spécifiez le groupe de paramètres pour le partitionnement des données.
S’applique lorsque l’option de partitionnement n’est pas None.
Non
Sous partitionSettings :
partitionColumnName Spécifiez le nom de la colonne source en type entier ou date/DateHeure (int, smallint, bigint, date, smalldatetime, datetime, datetime2 ou datetimeoffset) qu’utilisera le partitionnement par plages de valeurs pour la copie en parallèle. S’il n’est pas spécifié, l’index ou la clé primaire de la table seront automatiquement détectés et utilisés en tant que colonne de partition.
S’applique lorsque l’option de partitionnement est DynamicRange. Si vous utilisez une requête pour récupérer des données sources, utilisez ?AdfDynamicRangePartitionCondition dans la clause WHERE. Pour obtenir un exemple, consultez la section Copier en parallèle à partir de la base de données SQL.
Non
partitionUpperBound Valeur maximale de la colonne de partition pour le fractionnement de la plage de partition. Cette valeur est utilisée pour décider du stride de la partition, et non pour filtrer les lignes de la table. Toutes les lignes de la table ou du résultat de la requête seront partitionnées et copiées. Si la valeur n’est pas spécifiée, l’activité de copie la détecte automatiquement.
S’applique lorsque l’option de partitionnement est DynamicRange. Pour obtenir un exemple, consultez la section Copier en parallèle à partir de la base de données SQL.
Non
partitionLowerBound Valeur minimale de la colonne de partition pour le fractionnement de la plage de partition. Cette valeur est utilisée pour décider du stride de la partition, et non pour filtrer les lignes de la table. Toutes les lignes de la table ou du résultat de la requête seront partitionnées et copiées. Si la valeur n’est pas spécifiée, l’activité de copie la détecte automatiquement.
S’applique lorsque l’option de partitionnement est DynamicRange. Pour obtenir un exemple, consultez la section Copier en parallèle à partir de la base de données SQL.
Non

Notez les points suivants :

  • Si sqlReaderQuery est spécifié pour SqlSource, l’activité de copie exécute cette requête sur la source SQL Server pour obtenir les données. Vous pouvez également spécifier une procédure stockée en spécifiant sqlReaderStoredProcedureName et storedProcedureParameters si la procédure stockée accepte des paramètres.
  • Lorsque vous utilisez une procédure stockée dans la source pour récupérer des données, sachez que si votre procédure stockée est conçue pour renvoyer un schéma différent quand une valeur de paramètre différente est entrée, vous risquez d’échouer ou d’obtenir un résultat inattendu lors de l’importation d’un schéma à partir de l’interface utilisateur ou lors de la copie de données dans la base de données SQL avec création de table automatique.

Exemple : Utiliser une requête SQL

"activities":[
    {
        "name": "CopyFromSQLServer",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<SQL Server input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SqlSource",
                "sqlReaderQuery": "SELECT * FROM MyTable"
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

Exemple : Utilisation d'une procédure stockée

"activities":[
    {
        "name": "CopyFromSQLServer",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<SQL Server input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SqlSource",
                "sqlReaderStoredProcedureName": "CopyTestSrcStoredProcedureWithParameters",
                "storedProcedureParameters": {
                    "stringData": { "value": "str3" },
                    "identifier": { "value": "$$Text.Format('{0:yyyy}', <datetime parameter>)", "type": "Int"}
                }
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

Définition de la procédure stockée

CREATE PROCEDURE CopyTestSrcStoredProcedureWithParameters
(
    @stringData varchar(20),
    @identifier int
)
AS
SET NOCOUNT ON;
BEGIN
    select *
    from dbo.UnitTestSrcTable
    where dbo.UnitTestSrcTable.stringData != stringData
    and dbo.UnitTestSrcTable.identifier != identifier
END
GO

SQL Server en tant que récepteur

Conseil

Pour en savoir plus sur les bonnes pratiques, les configurations et les comportements d’écriture pris en charge, consultez l’article Bonnes pratiques de chargement de données dans SQL Server.

Pour copier des données vers SQL Server, définissez SqlSink comme type de récepteur dans l’activité de copie. Les propriétés prises en charge dans la section récepteur de l'activité de copie sont les suivantes :

Propriété Description Obligatoire
type La propriété type du récepteur de l'activité de copie doit être définie sur SqlSink. Oui
preCopyScript Cette propriété spécifie une requête SQL que l’activité de copie doit exécuter avant l’écriture de données dans SQL Server. Elle n'est appelée qu'une seule fois par copie. Vous pouvez utiliser cette propriété pour nettoyer des données préchargées. Non
tableOption Spécifie si la table du récepteur doit être créée automatiquement si elle n’existe pas en fonction du schéma source. La création automatique de la table n’est pas prise en charge quand le récepteur spécifie une procédure stockée. Les valeurs autorisées sont none (par défaut) et autoCreate. Non
sqlWriterStoredProcedureName Nom de la procédure stockée qui définit comment appliquer des données sources dans une table cible.
Cette procédure stockée est appelée par lot. Pour les opérations qui ne s’exécutent qu’une seule fois et qui n’ont rien à voir avec les données sources (par exemple, supprimer ou tronquer), utilisez la propriété preCopyScript.
Voir l’exemple dans la section Appel d’une procédure stockée à partir d’un récepteur SQL.
Non
storedProcedureTableTypeParameterName Nom du paramètre du type de table spécifié dans la procédure stockée. Non
sqlWriterTableType Nom du type de table à utiliser dans la procédure stockée. L'activité de copie rend les données déplacées disponibles dans une table temporaire avec ce type de table. Le code de procédure stockée peut ensuite fusionner les données copiées avec les données existantes. Non
storedProcedureParameters Paramètres de la procédure stockée.
Les valeurs autorisées sont des paires de noms et de valeurs. Les noms et la casse des paramètres doivent correspondre aux noms et à la casse des paramètres de la procédure stockée.
Non
writeBatchSize Nombre de lignes à insérer dans la table SQL par lot.
Les valeurs autorisées sont des entiers pour le nombre de lignes. Par défaut, le service détermine de façon dynamique la taille de lot appropriée selon la taille de ligne.
Non
writeBatchTimeout Le temps d’attente pour que l’opération d’insertion, d’insertion ascendante et de procédure stockée se termine avant l’expiration du délai.
Les valeurs autorisées sont celles qui expriment un intervalle de temps. Exemple : « 00:30:00 » pour 30 minutes. Si aucune valeur n’est spécifiée, le délai d’expiration est par défaut « 00:30:00 ».
Non
 maxConcurrentConnections La limite supérieure de connexions simultanées établies au magasin de données pendant l’exécution de l’activité. Spécifiez une valeur uniquement lorsque vous souhaitez limiter les connexions simultanées.  Aucune
WriteBehavior Spécifiez le comportement d’écriture pour l’activité de copie afin de charger des données dans une base de données SQL Server.
La valeur autorisée est Insert ou Upsert. Par défaut, le service utilise Insert pour charger des données.
Non
upsertSettings Spécifiez le groupe de paramètres pour le comportement d’écriture.
S’applique quand l’option WriteBehavior a la valeur Upsert.
Non
Sous upsertSettings :
useTempDB Indiquez si vous voulez utiliser une table temporaire globale ou une table physique en tant que table temporaire pour faire un upsert.
Par défaut, le service utilise une table temporaire globale en tant que table intermédiaire. La valeur est true.
Non
interimSchemaName Spécifiez le schéma intermédiaire utilisé pour créer la table intermédiaire en cas d’utilisation d’une table physique. Remarque : L’utilisateur a besoin d’une autorisation de créer et supprimer une table. Par défaut, la table intermédiaire partage le même schéma que la table du récepteur.
S’applique quand l’option useTempDB a la valeur False.
Non
clés Spécifiez les noms de colonne à des fins d’identification unique des lignes. Vous pouvez utiliser une seule clé ou une série de clés. Si la valeur n’est pas spécifiée, la clé primaire est utilisée. Non

Exemple 1 : Ajout de données

"activities":[
    {
        "name": "CopyToSQLServer",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<SQL Server output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SqlSink",
                "tableOption": "autoCreate",
                "writeBatchSize": 100000
            }
        }
    }
]

Exemple 2 : Appeler une procédure stockée pendant la copie

Pour en savoir plus, consultez Appel d'une procédure stockée à partir d'un récepteur SQL.

"activities":[
    {
        "name": "CopyToSQLServer",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<SQL Server output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SqlSink",
                "sqlWriterStoredProcedureName": "CopyTestStoredProcedureWithParameters",
                "storedProcedureTableTypeParameterName": "MyTable",
                "sqlWriterTableType": "MyTableType",
                "storedProcedureParameters": {
                    "identifier": { "value": "1", "type": "Int" },
                    "stringData": { "value": "str1" }
                }
            }
        }
    }
]

Exemple 3 : Faire un upsert des données

"activities":[
    {
        "name": "CopyToSQLServer",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<SQL Server output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SqlSink",
                "tableOption": "autoCreate",
                "writeBehavior": "upsert",
                "upsertSettings": {
                    "useTempDB": true,
                    "keys": [
                        "<column name>"
                    ]
                },
            }
        }
    }
]

Copier en parallèle à partir de la base de données SQL

Le connecteur SQL Server dans l’activité de copie propose un partitionnement de données intégré pour copier des données en parallèle. Vous trouverez des options de partitionnement de données dans l’onglet Source de l’activité de copie.

Capture d’écran représentant les options de partition

Lorsque vous activez la copie partitionnée, l’activité de copie exécute des requêtes en parallèle sur votre source SQL Server pour charger des données par partitions. Le degré de parallélisme est contrôlé via le paramètre parallelCopies sur l’activité de copie. Par exemple, si vous définissez parallelCopies sur la valeur quatre, le service génère et exécute simultanément quatre requêtes selon l’option de partition et les paramètres que vous avez spécifiés, chacune récupérant une partie des données de votre serveur SQL.

Il vous est recommandé d’activer la copie en parallèle avec partitionnement des données notamment lorsque vous chargez une grande quantité de données à partir de votre serveur SQL. Voici quelques suggestions de configurations pour différents scénarios. Lors de la copie de données dans un magasin de données basé sur un fichier, il est recommandé d’écrire les données dans un dossier sous la forme de plusieurs fichiers (spécifiez uniquement le nom du dossier). Les performances seront meilleures qu’avec l’écriture de données dans un seul fichier.

Scénario Paramètres suggérés
Chargement complet à partir d’une table volumineuse, avec des partitions physiques. Option de partition : Partitions physiques de la table.

Pendant l’exécution, le service détecte automatiquement les partitions physiques et copie les données par partition.

Pour vérifier si votre table possède, ou non, une partition physique, vous pouvez vous reporter à cette requête.
Chargement complet d’une table volumineuse, sans partitions physiques, avec une colonne d’entiers ou DateHeure pour le partitionnement des données. Options de partition : Partition dynamique par spécification de plages de valeurs.
Colonne de partition (facultatif) : Spécifiez la colonne utilisée pour partitionner les données. Si la valeur n’est pas spécifiée, la colonne de la clé primaire est utilisée.
Limite supérieure de partition et limite inférieure de partition (facultatif) : Spécifiez si vous souhaitez déterminer le stride de la partition. Cela ne permet pas de filtrer les lignes de la table ; toutes les lignes de la table sont partitionnées et copiées. S’il n’est pas spécifié, l’activité Copy détecte automatiquement les valeurs et peut prendre beaucoup de temps en fonction des valeurs MIN et MAX. Il est recommandé de fournir une limite supérieure et une limite inférieure.

Par exemple, si les valeurs de la colonne de partition « ID » sont comprises entre 1 et 100, et que vous définissez la limite inférieure à 20 et la limite supérieure à 80, avec la copie parallèle à 4, le service récupère des données en fonction de 4 partitions, (ID des plages <=20, [21, 50], [51, 80] et >=81, respectivement).
Chargement d’une grande quantité de données à l’aide d’une requête personnalisée, sans partitions physiques, et avec une colonne d’entiers ou de date/DateHeure pour le partitionnement des données. Options de partition : Partition dynamique par spécification de plages de valeurs.
Requête: SELECT * FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition AND <your_additional_where_clause>.
Colonne de partition : Spécifiez la colonne utilisée pour partitionner les données.
Limite supérieure de partition et limite inférieure de partition (facultatif) : Spécifiez si vous souhaitez déterminer le stride de la partition. Cela ne permet pas de filtrer les lignes de la table ; toutes les lignes du résultat de la requête sont partitionnées et copiées. Si la valeur n’est pas spécifiée, l’activité de copie la détecte automatiquement.

Lors de l’exécution, le service remplace ?AdfRangePartitionColumnName par le nom réel de la colonne et les plages de valeurs de chaque partition et les envoie à SQL Server.
Par exemple, si les valeurs de la colonne de partition « ID » sont comprises entre 1 et 100, et que vous définissez la limite inférieure à 20 et la limite supérieure à 80, avec la copie parallèle à 4, le service récupère des données en fonction de 4 partitions (ID des plages <=20, [21, 50], [51, 80] et >=81, respectivement).

Voici d’autres exemples de requêtes pour différents scénarios :
1. Interroger l’ensemble de la table :
SELECT * FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition
2. Interroger une table avec une sélection de colonnes et des filtres de la clause WHERE supplémentaires :
SELECT <column_list> FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition AND <your_additional_where_clause>
3. Effectuer une requête avec des sous-requêtes :
SELECT <column_list> FROM (<your_sub_query>) AS T WHERE ?AdfDynamicRangePartitionCondition AND <your_additional_where_clause>
4. Effectuer une requête avec une partition dans une sous-requête :
SELECT <column_list> FROM (SELECT <your_sub_query_column_list> FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition) AS T

Meilleures pratiques pour charger des données avec l’option de partition :

  1. Choisissez une colonne distinctive comme colonne de partition (p. ex. : clé primaire ou clé unique) pour éviter l’asymétrie des données.
  2. Si la table possède une partition intégrée, utilisez l’option de partition « Partitions physiques de la table » pour obtenir de meilleures performances.
  3. Si vous utilisez Azure Integration Runtime pour copier des données, vous pouvez définir des « unités d’intégration de données (DIU) » plus grandes (>4) pour utiliser davantage de ressources de calcul. Vérifiez les scénarios applicables ici.
  4. Le « degré de parallélisme de copie » contrôle le nombre de partitions : un nombre trop élevé nuit parfois aux performances. Il est recommandé de définir ce nombre selon (DIU ou nombre de nœuds d'IR auto-hébergé) * (2 à 4).

Exemple : chargement complet à partir d’une table volumineuse, avec des partitions physiques

"source": {
    "type": "SqlSource",
    "partitionOption": "PhysicalPartitionsOfTable"
}

Exemple : requête avec partition dynamique par spécification de plages de valeurs

"source": {
    "type": "SqlSource",
    "query": "SELECT * FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition AND <your_additional_where_clause>",
    "partitionOption": "DynamicRange",
    "partitionSettings": {
        "partitionColumnName": "<partition_column_name>",
        "partitionUpperBound": "<upper_value_of_partition_column (optional) to decide the partition stride, not as data filter>",
        "partitionLowerBound": "<lower_value_of_partition_column (optional) to decide the partition stride, not as data filter>"
    }
}

Exemple de requête pour vérifier une partition physique

SELECT DISTINCT s.name AS SchemaName, t.name AS TableName, pf.name AS PartitionFunctionName, c.name AS ColumnName, iif(pf.name is null, 'no', 'yes') AS HasPartition
FROM sys.tables AS t
LEFT JOIN sys.objects AS o ON t.object_id = o.object_id
LEFT JOIN sys.schemas AS s ON o.schema_id = s.schema_id
LEFT JOIN sys.indexes AS i ON t.object_id = i.object_id 
LEFT JOIN sys.index_columns AS ic ON ic.partition_ordinal > 0 AND ic.index_id = i.index_id AND ic.object_id = t.object_id 
LEFT JOIN sys.columns AS c ON c.object_id = ic.object_id AND c.column_id = ic.column_id 
LEFT JOIN sys.partition_schemes ps ON i.data_space_id = ps.data_space_id 
LEFT JOIN sys.partition_functions pf ON pf.function_id = ps.function_id 
WHERE s.name='[your schema]' AND t.name = '[your table name]'

Si la table a une partition physique, vous voyez « HasPartition » avec la valeur « yes » (oui) comme suit.

Résultat d’une requête SQL

Bonnes pratiques de chargement de données dans SQL Server

Lors de la copie de données dans SQL Server, vous pouvez exiger un comportement d’écriture différent :

  • Append: Mes données sources contiennent uniquement de nouveaux enregistrements.
  • Upsert : Mes données sources contiennent à la fois des insertions et des mises à jour.
  • Remplacer : Je veux recharger la totalité de la table de dimension à chaque fois.
  • Écrire avec une logique personnalisée : J’ai besoin d’un traitement supplémentaire avant l’insertion finale dans la table de destination.

Consultez les sections correspondantes pour savoir comment effectuer des configurations et connaître les bonnes pratiques associées.

Ajout de données

L’ajout de données est le comportement par défaut de ce connecteur de récepteur SQL Server. Le service effectue une insertion en bloc pour écrire efficacement dans votre table. Vous pouvez configurer la source et le récepteur en conséquence dans l’activité de copie.

Effectuer un upsert de données

L’activité de copie prend maintenant en charge le chargement en mode natif des données dans une table temporaire de base de données, puis met à jour les données dans la table du récepteur s’il existe une clé ou insère les nouvelles données dans le cas contraire. Pour en savoir plus sur les paramètres upsert dans les activités de copie, consultez SQL Server en tant que récepteur.

Remplacer l’intégralité de la table

Vous pouvez configurer la propriété preCopyScript dans un récepteur d’activité de copie. Dans le cas présent, pour chaque activité Copy qui s’exécute, le service exécute d’abord le script. Il exécute ensuite la copie pour insérer les données. Par exemple, pour remplacer l’intégralité de la table par les données les plus récentes, spécifiez un script pour d’abord supprimer tous les enregistrements avant de charger en bloc les nouvelles données à partir de la source.

Écrire des données avec une logique personnalisée

Les étapes permettant d’écrire des données à l’aide d’une logique personnalisée sont semblables à celles décrites dans la section Effectuer un upsert de données. Quand vous devez appliquer un traitement supplémentaire avant l’insertion finale des données sources dans la table de destination, vous pouvez effectuer le chargement vers une table de mise en lots, puis appeler une activité de procédure stockée ou appeler une procédure stockée dans le récepteur de l’activité de copie pour appliquer les données.

Appel d'une procédure stockée à partir d'un récepteur SQL

Quand vous copiez des données dans une base de données SQL Server, vous pouvez également configurer et appeler une procédure stockée spécifiée par l’utilisateur avec des paramètres supplémentaires sur chaque lot de la table source. La fonction de procédure stockée tire parti des paramètres table. Notez que le service encapsule automatiquement la procédure stockée dans sa propre transaction. Par conséquent, toute transaction créée à l’intérieur de la procédure stockée devient une transaction imbriquée, ce qui peut avoir des implications pour la gestion des exceptions.

Vous pouvez utiliser une procédure stockée à la place des mécanismes de copie intégrée. Par exemple, quand vous souhaitez appliquer un traitement supplémentaire avant l’insertion finale de données sources dans la table de destination. Fusionner des colonnes, rechercher des valeurs supplémentaires et effectuer des insertions dans plusieurs tables sont des exemples de traitement supplémentaire.

L’exemple suivant montre comment utiliser une procédure stockée pour effectuer une opération upsert simple dans une table de la base de données SQL Server. Supposons que les données d’entrée et la table réceptrice Marketing ont trois colonnes : ProfileID, State et Category. Effectuez l’opération upsert basée sur la colonne ProfileID et appliquez-la uniquement à une catégorie spécifique appelée « ProductA ».

  1. Dans votre base de données, définissez le type de table avec le même nom que sqlWriterTableType. Le schéma du type de table doit être identique au schéma retourné par vos données d'entrée.

    CREATE TYPE [dbo].[MarketingType] AS TABLE(
        [ProfileID] [varchar](256) NOT NULL,
        [State] [varchar](256) NOT NULL,
        [Category] [varchar](256) NOT NULL
    )
    
  2. Dans votre base de données, définissez la procédure stockée portant le même nom que sqlWriterStoredProcedureName. Elle gère les données d’entrée à partir de la source que vous avez spécifiée et les fusionne dans la table de sortie. Le nom de paramètre du type de table de la procédure stockée doit être identique au tableName défini dans le jeu de données.

    CREATE PROCEDURE spOverwriteMarketing @Marketing [dbo].[MarketingType] READONLY, @category varchar(256)
    AS
    BEGIN
    MERGE [dbo].[Marketing] AS target
    USING @Marketing AS source
    ON (target.ProfileID = source.ProfileID and target.Category = @category)
    WHEN MATCHED THEN
        UPDATE SET State = source.State
    WHEN NOT MATCHED THEN
        INSERT (ProfileID, State, Category)
        VALUES (source.ProfileID, source.State, source.Category);
    END
    
  3. Définissez la section Récepteur SQL de l’activité de copie comme suit :

    "sink": {
        "type": "SqlSink",
        "sqlWriterStoredProcedureName": "spOverwriteMarketing",
        "storedProcedureTableTypeParameterName": "Marketing",
        "sqlWriterTableType": "MarketingType",
        "storedProcedureParameters": {
            "category": {
                "value": "ProductA"
            }
        }
    }
    

Propriétés du mappage de flux de données

Lors de la transformation de données dans le flux de données de mappage, vous pouvez lire et écrire dans les tables de la base de données SQL Server. Pour plus d’informations, consultez la transformation de la source et la transformation du récepteur dans le flux de données de mappage.

Notes

Pour accéder à une instance SQL Server locale, vous devez utiliser le réseau virtuel managé de l’espace de travail Azure Data Factory ou Synapse via un point de terminaison privé. Pour connaître les étapes détaillées, reportez-vous à ce tutoriel.

Transformation de la source

Le tableau ci-dessous répertorie les propriétés prises en charge par la source SQL Server. Vous pouvez modifier ces propriétés sous l’onglet Options de la source.

Nom Description Obligatoire Valeurs autorisées Propriété du script de flux de données
Table de charge de travail Si vous sélectionnez Table comme entrée, le flux de données extrait toutes les données de la table spécifiée dans le jeu de données. Non - -
Requête Si vous sélectionnez Requête comme entrée, spécifiez une requête SQL pour extraire des données de la source, qui remplace toute table que vous spécifiez dans le jeu de données. L’utilisation de requêtes est un excellent moyen de réduire le nombre de lignes pour les tests ou les recherches.

La clause Order By n’est pas prise en charge, mais vous pouvez définir une instruction SELECT FROM complète. Vous pouvez également utiliser des fonctions de table définies par l’utilisateur. select * from udfGetData() est une fonction UDF dans SQL qui retourne une table que vous pouvez utiliser dans le flux de données.
Exemple de requête : Select * from MyTable where customerId > 1000 and customerId < 2000.
Non String query
Taille du lot Spécifiez la taille de lot que doivent avoir les lectures créées à partir d’un large volume de données. Non Integer batchSize
Niveau d’isolation Choisissez l’un des niveaux d’isolation suivants :
– Lecture validée.
– Lecture non validée (par défaut).
– Lecture renouvelable.
– Sérialisable.
– Aucun (ignorer le niveau d’isolation).
Non READ_COMMITTED
READ_UNCOMMITTED
REPEATABLE_READ
SERIALIZABLE
NONE
isolationLevel
Activer l’extraction incrémentielle Utilisez cette option pour indiquer à ADF de traiter seulement les lignes qui ont changé depuis la dernière exécution du pipeline. Non - -
Colonne de date incrémentielle Quand vous utilisez la fonctionnalité d’extraction incrémentielle, vous devez choisir la colonne date/heure que vous voulez utiliser comme filigrane dans votre table source. Non - -
Activer la capture native des changements de données (préversion) Utilisez cette option pour indiquer à ADF de traiter seulement les données delta capturées par la technologie de capture des changements de données SQL depuis la dernière exécution du pipeline. Avec cette option, les données delta, y compris l’insertion, la mise à jour et la suppression de lignes, sont chargées automatiquement sans utiliser de colonne de date incrémentielle. Vous devez activer la capture des changements de données sur SQL Server avant d’utiliser cette option dans ADF. Pour plus d’informations sur cette option dans ADF, consultez la capture native des changements de données. Non - -
Commencer la lecture depuis le début La définition de cette option avec l’extraction incrémentielle indique à ADF de lire toutes les lignes lors de la première exécution d’un pipeline qui a l’extraction incrémentielle activée. Non - -

Conseil

L’expression de table commune (CTE) dans SQL n’est pas prise en charge dans le mode Requête du flux de données de mappage, car le prérequis pour utiliser ce mode est que les requêtes puissent être utilisées dans la clause FROM de requête SQL, ce que ne peuvent pas faire les expressions CTE. Pour utiliser des expressions CTE, vous devez créer une procédure stockée avec la requête suivante :

CREATE PROC CTESP @query nvarchar(max)
AS
BEGIN
EXECUTE sp_executesql @query;
END

Utilisez ensuite le mode Procédure stockée dans la transformation source du flux de données de mappage et définissez @query comme dans l’exemple with CTE as (select 'test' as a) select * from CTE. Vous pouvez ensuite utiliser les expressions CTE comme vous le voulez.

Exemple de script source SQL Server

Lorsque vous utilisez le type de source SQL Server, le script de flux de données associé est le suivant :

source(allowSchemaDrift: true,
    validateSchema: false,
    isolationLevel: 'READ_UNCOMMITTED',
    query: 'select * from MYTABLE',
    format: 'query') ~> SQLSource

Transformation du récepteur

Le tableau ci-dessous répertorie les propriétés prises en charge par le récepteur SQL Server. Vous pouvez modifier ces propriétés sous l’onglet Options du récepteur.

Name Description Obligatoire Valeurs autorisées Propriété du script de flux de données
Mettre à jour la méthode Spécifiez les opérations autorisées sur la destination de votre base de données. Par défaut, seules les insertions sont autorisées.
Pour mettre à jour, effectuer un upsert ou supprimer des lignes, une transformation de modification de ligne est requise afin de baliser les lignes relatives à ces actions.
Oui true ou false deletable
insertable
updateable
upsertable
Colonnes clés Pour les mises à jour, les opérations upsert et les suppressions, une ou plusieurs colonnes clés doivent être définies afin de déterminer la ligne à modifier.
Le nom de colonne que vous choisissez comme clé sera utilisé dans le cadre des opérations suivantes de mise à jour, d’upsert et de suppression. Vous devez donc choisir une colonne qui existe dans le mappage du récepteur.
Non Array clés
Ignorer l’écriture des colonnes clés Si vous ne souhaitez pas écrire la valeur dans la colonne clé, sélectionnez « Ignorer l’écriture des colonnes clés ». Non true ou false skipKeyWrites
Action table Détermine si toutes les lignes de la table de destination doivent être recréées ou supprimées avant l’écriture.
- Aucun : Aucune action ne sera effectuée sur la table.
- Recréer : La table sera supprimée et recréée. Obligatoire en cas de création dynamique d’une nouvelle table.
- Tronquer : Toutes les lignes de la table cible seront supprimées.
Non true ou false recreate
truncate
Taille du lot Spécifiez le nombre de lignes écrites dans chaque lot. Les plus grandes tailles de lot améliorent la compression et l’optimisation de la mémoire, mais risquent de lever des exceptions de type mémoire insuffisante lors de la mise en cache des données. Non Integer batchSize
Pré et post-scripts SQL Spécifiez des scripts SQL multilignes qui s’exécutent avant (prétraitement) et après (post-traitement) l’écriture de données dans votre base de données de réception. Non String preSQLs
postSQLs

Conseil

  1. Il est recommandé de diviser les scripts de commandes par lot uniques contenant plusieurs commandes en plusieurs lots.
  2. Seules des instructions DDL (Data Definition Language, langage de définition de données) et DML (Data Manipulation Language, langage de manipulation de données) qui retournent un seul nombre de mises à jour peuvent être exécutées dans un lot. Pour en savoir plus, consultez Exécution d’opérations par lot

Exemples de script de récepteur SQL Server

Lorsque vous utilisez le type de récepteur SQL Server, le script de flux de données associé est le suivant :

IncomingStream sink(allowSchemaDrift: true,
    validateSchema: false,
    deletable:false,
    insertable:true,
    updateable:true,
    upsertable:true,
    keys:['keyColumn'],
    format: 'table',
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> SQLSink

Mappage de type de données pour SQL Server

Quand vous copiez des données depuis et vers SQL Server, les mappages suivants sont utilisés de types de données SQL Server en types de données intermédiaires Azure Data Factory. Les pipelines Synapse, qui implémentent Data Factory, utilisent les mêmes mappages. Pour découvrir comment l’activité de copie mappe le schéma et le type de données la source au récepteur, consultez Mappage de schéma dans l’activité de copie.

Type de données SQL Server Type de données intermédiaires d’Azure Data Factory
bigint Int64
binary Byte[]
bit Boolean
char String, Char[]
Date DateTime
Datetime DateTime
datetime2 DateTime
Datetimeoffset DateTimeOffset
Decimal Decimal
FILESTREAM attribute (varbinary(max)) Byte[]
Float Double
image Byte[]
int Int32
money Decimal
NCHAR String, Char[]
ntext String, Char[]
numeric Decimal
NVARCHAR String, Char[]
real Unique
rowversion Byte[]
smalldatetime DateTime
SMALLINT Int16
SMALLMONEY Decimal
sql_variant Object
text String, Char[]
time TimeSpan
timestamp Byte[]
TINYINT Int16
UNIQUEIDENTIFIER Guid
varbinary Byte[]
varchar String, Char[]
Xml String

Notes

Pour les types de données mappés avec le type intermédiaire Decimal,l’activité de copie prend actuellement en charge une précision maximale de 28. Si vous disposez de données qui nécessitent une précision supérieure à 28, pensez à les convertir en chaîne dans une requête SQL.

Quand vous copiez des données depuis SQL Server en utilisant Azure Data Factory, le type de données bit est mappé au type de données intermédiaire booléen. Si vous avez des données qui doivent être conservées comme type de données bit, utilisez des requêtes avec une fonction T-SQL CAST ou CONVERT.

Propriétés de l’activité Lookup

Pour en savoir plus sur les propriétés, consultez Activité Lookup.

Propriétés de l’activité GetMetadata

Pour en savoir plus sur les propriétés, consultez Activité GetMetadata.

Utilisation d’Always Encrypted

Quand vous copiez des données depuis/vers SQL Server avec Always Encrypted, effectuez les étapes suivantes :

  1. Stockez la valeur Clé principale de colonne (CMK) dans un coffre Azure Key Vault. En savoir plus sur la configuration d’Always Encrypted à l’aide d’Azure Key Vault

  2. Vérifiez que le coffre de clés où la valeur Clé principale de colonne (CMK) est stockée est totalement accessible. Consultez cet article pour connaître les autorisations requises.

  3. Créez un service lié pour vous connecter à votre base de données SQL et activez la fonction « Always Encrypted » en utilisant une identité managée ou un principal de service.

Notes

SQL Server Always Encrypted prend en charge les scénarios suivants :

  1. Les magasins de données sources ou récepteurs utilisent l’identité managée ou le principal de service comme type d’authentification du fournisseur de clés.
  2. Les magasins de données sources et récepteurs utilisent l’identité managée comme type d’authentification du fournisseur de clés.
  3. Les magasins de données sources et récepteurs utilisent le même principal de service que le type d’authentification du fournisseur de clés.

Notes

Actuellement, SQL Server Always Encrypted est pris en charge uniquement pour la transformation de la source dans les flux de données de mappage.

Capture native des changements de données

Azure Data Factory peut prendre en charge les fonctionnalités de capture native des changements de données pour SQL Server, Azure SQL DB et Azure SQL MI. Les changements de données, notamment l’insertion, la mise à jour et la suppression de lignes dans les magasins SQL, peuvent être détectés et extraits automatiquement par le flux de données de mappage ADF. Avec l’expérience sans code du flux de données de mappage, les utilisateurs peuvent facilement réaliser un scénario de réplication de données à partir de magasins SQL en ajoutant une base de données comme magasin de destination. De plus, les utilisateurs peuvent également composer une logique de transformation de données intermédiaire pour obtenir un scénario ETL incrémentiel à partir de magasins SQL.

Veillez à ne pas changer le nom du pipeline et de l’activité, afin que le point de contrôle puisse être enregistré par ADF pour que vous puissiez obtenir automatiquement les données modifiées à partir de la dernière exécution. Si vous changez le nom de votre pipeline ou de votre activité, le point de contrôle est réinitialisé, ce qui vous oblige à recommencer depuis le début ou à récupérer les prochaines modifications lors de la prochaine exécution. Pour changer le nom du pipeline ou le nom de l’activité en gardant le point de contrôle afin d’obtenir automatiquement les changements de données de la dernière exécution, utilisez votre propre clé de point de contrôle dans l’activité de flux de données.

Lorsque vous déboguez le pipeline, cette fonctionnalité fonctionne de la même façon. Sachez que le point de contrôle est réinitialisé lorsque vous actualisez votre navigateur lors de l’exécution du débogage. Une fois que vous êtes satisfait du résultat du pipeline de l’exécution du débogage, vous pouvez publier et déclencher le pipeline. Au moment où vous déclenchez pour la première fois votre pipeline publié, il redémarre automatiquement à partir du début ou obtient les modifications à partir de ce moment-là.

Dans la section de monitoring, vous avez toujours la possibilité de réexécuter un pipeline. Dans ce cas, les données modifiées sont toujours capturées à partir du point de contrôle précédent de votre exécution de pipeline sélectionnée.

Exemple 1 :

Quand vous chaînez directement une transformation de source référencée dans un jeu de données utilisant CDC SQL à une transformation de récepteur référencée dans une base de données dans un flux de données de mappage, les changements qui se produisent dans la source SQL sont automatiquement appliqués à la base de données cible, pour obtenir facilement un scénario de réplication de données entre les bases de données. Vous pouvez utiliser la méthode de mise à jour dans la transformation du récepteur pour sélectionner si vous voulez autoriser l’insertion, la mise à jour ou la suppression sur la base de données cible. L’exemple de script dans le flux de données de mappage est décrit ci-dessous.

source(output(
		id as integer,
		name as string
	),
	allowSchemaDrift: true,
	validateSchema: false,
	enableNativeCdc: true,
	netChanges: true,
	skipInitialLoad: false,
	isolationLevel: 'READ_UNCOMMITTED',
	format: 'table') ~> source1
source1 sink(allowSchemaDrift: true,
	validateSchema: false,
	deletable:true,
	insertable:true,
	updateable:true,
	upsertable:true,
	keys:['id'],
	format: 'table',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true,
	errorHandlingOption: 'stopOnFirstError') ~> sink1

Exemple 2 :

Si vous voulez activer le scénario ETL au lieu de la réplication de données entre bases de données via CDC SQL, vous pouvez utiliser des expressions dans le flux de données de mappage, notamment isInsert(1), isUpdate(1) et isDelete(1) pour différencier les lignes qui ont différents types d’opération. Voici un exemple de script de flux de données de mappage qui dérive une colonne avec la valeur : 1 pour indiquer des lignes insérées, 2 pour indiquer des lignes mises à jour et 3 pour indiquer des lignes supprimées dans les transformations en aval afin de traiter les données delta.

source(output(
		id as integer,
		name as string
	),
	allowSchemaDrift: true,
	validateSchema: false,
	enableNativeCdc: true,
	netChanges: true,
	skipInitialLoad: false,
	isolationLevel: 'READ_UNCOMMITTED',
	format: 'table') ~> source1
source1 derive(operationType = iif(isInsert(1), 1, iif(isUpdate(1), 2, 3))) ~> derivedColumn1
derivedColumn1 sink(allowSchemaDrift: true,
	validateSchema: false,
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> sink1

Limitation connue :

Résoudre les problèmes de connexion

  1. Configurez votre instance SQL Server pour qu’elle accepte les connexions à distance. Démarrez SQL Server Management Studio, cliquez avec le bouton droit sur Serveur, puis sélectionnez Propriétés. Sélectionnez Connexions dans la liste, puis cochez la case Autoriser les accès distants à ce serveur.

    Activer des connexions à distance

    Pour obtenir des instructions détaillées, consultez Configurer l’option de configuration du serveur d’accès à distance.

  2. Démarrez le Gestionnaire de configuration SQL Server. Développez Configuration du réseau SQL Server pour l’instance souhaitée, puis sélectionnez Protocoles pour MSSQLSERVER. Des protocoles s’affichent dans le volet droit. Activez TCP/IP en cliquant avec le bouton droit sur TCP/IP, puis en sélectionnant Activer.

    Activer TCP/IP

    Pour obtenir des informations supplémentaires et découvrir d’autres façons d’activer le protocole TCP/IP, consultez Activer ou désactiver un protocole réseau de serveur.

  3. Dans la même fenêtre, double-cliquez sur TCP/IP pour lancer la fenêtre des propriétés TCP/IP.

  4. Allez sous l’onglet Adresses IP . Faites défiler l’écran vers le bas jusqu’à la section IPAll. Notez le port TCP. La valeur par défaut est 1433.

  5. Créez une règle de Pare-feu Windows sur l’ordinateur pour autoriser le trafic à entrer par ce port.

  6. Vérifiez la connexion : Pour vous connecter à SQL Server en utilisant un nom complet, utilisez SQL Server Management Studio à partir d’un autre ordinateur. par exemple "<machine>.<domain>.corp.<company>.com,1433".

Consultez les magasins de données pris en charge pour obtenir la liste des sources et magasins de données pris en charge en tant que récepteurs par l’activité de copie.