Adatok növekményes betöltése Azure SQL Database-ból Azure Blob Storage-ba a PowerShell használatával

Vonatkozik: Azure Data Factory Azure Synapse Analytics

Tipp

Data Factory a Microsoft Fabric a Azure Data Factory következő generációja, egyszerűbb architektúrával, beépített AI-vel és új funkciókkal. Ha még nem ismerkedik az adatintegrációval, kezdje a Fabric Data Factoryvel. A meglévő ADF-számítási feladatok frissíthetők Fabric használatával, hogy elérjék az adatkutatás, a valós idejű elemzés és a jelentéskészítés új képességeit.

Ebben az oktatóanyagban az Azure Data Factory használatával létrehoz egy adatfolyamot, amely az Azure SQL Database táblából betölti a delta adatokat az Azure Blob Storage-ba.

Az oktatóanyagban az alábbi lépéseket fogja végrehajtani:

  • Az adatraktár előkészítése a küszöbértékek tárolására.
  • Adat-előállító létrehozása
  • Társított szolgáltatások létrehozása.
  • Forrás-, fogadó- és küszöbadatkészletek létrehozása.
  • Folyamat létrehozása.
  • A folyamat futtatása.
  • A csővezeték futásának monitorozása.

Áttekintés

Itt látható a megoldás összefoglaló jellegű ábrája:

Adatok növekményes betöltése

Az alábbiak a megoldás kialakításának leglényegesebb lépései:

  1. Válassza ki a vízjel oszlopot. Jelölje ki az adatforrás egyik oszlopát, amelynek alapján az új és a frissített rekordok minden egyes futtatáskor leválogathatók. Normális esetben az ebben a kiválasztott oszlopban (például: last_modify_time vagy ID) lévő adatok a sorok létrehozásával vagy frissítésével folyamatosan növekednek. Az ebben az oszlopban lévő legnagyobb érték szolgál a küszöbként.

  2. Egy adatraktár előkészítése a küszöbértékek tárolására.
    Ebben az oktatóanyagban a küszöbértékeket egy SQL-adatbázisban tároljuk.

  3. Hozzon létre egy pipeline-t a következő munkafolyamattal:

    Ebben a megoldásban a folyamat a következő tevékenységeket tartalmazza:

    • Két keresési tevékenység létrehozása. Az első lekérdezési tevékenység az utolsó vízjel értéket kéri le. Használja a második lekérdezési tevékenységet az új vízjel érték lekéréséhez. Ezeket a vízjelértékeket a Copy tevékenység-nek adják át.
    • Hozzon létre egy másolási műveletet, amely sorokat másol a forrásadattárból, ahol a vízjel oszlop értéke nagyobb, mint a régi vízjel értéke, és kisebb vagy egyenlő, mint az új vízjel értéke. Ezután a delta adatokat a forrásadattárból új fájlként egy Blob Storage tárhelyre másolja.
    • Egy StoredProcedure tevékenység létrehozása, amely frissíti a küszöbértékeket a folyamat következő futtatásához.

Ha nem rendelkezik Azure-előfizetéssel, a kezdés előtt hozzon létre egy free fiókot.

Előfeltételek

Megjegyzés

Javasoljuk, hogy az Azure Az PowerShell-modult használja a Azure használatához. Első lépésként lásd: Install Azure PowerShell. Az Az PowerShell-modulra való migrálásról az Migrate Azure PowerShell az AzureRM-ből az Az című témakörben olvashat.

  • Azure SQL Database. Ezt az adatbázist használjuk forrásadattárként. Ha nem rendelkezik adatbázissal Azure SQL Database, a létrehozás lépéseit a Adatbázis létrehozása a Azure SQL Database című témakörben találja.
  • Azure Storage. A blobtárolót használjuk majd fogadóadattárként. Ha még nem rendelkezik tárfiókkal, tekintse meg a tárfiók létrehozásának lépéseit ismertető cikket. Hozzon létre egy tárolót adftutorial néven.
  • Azure PowerShell. Kövesse az Installban található utasításokat, és konfigurálja a Azure PowerShell.

Adatforrástábla létrehozása az SQL-adatbázisban

  1. Nyissa meg a SQL Server Management Studio. A Kiszolgálókezelőben kattintson a jobb gombbal az adatbázisra, és válassza az Új lekérdezés elemet.

  2. Futtassa a következő SQL-parancsot az SQL-adatbázison egy tábla data_source_table néven, adatforrástárként történő létrehozásához:

    create table data_source_table
    (
        PersonID int,
        Name varchar(255),
        LastModifytime datetime
    );
    
    INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
    VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');
    

    Az oktatóanyagban a LastModifytime oszlopot használjuk küszöboszlopként. Az adatforrástár adatai az alábbi táblázatban láthatók:

    PersonID | Name | LastModifytime
    -------- | ---- | --------------
    1 | aaaa | 2017-09-01 00:56:00.000
    2 | bbbb | 2017-09-02 05:23:00.000
    3 | cccc | 2017-09-03 02:36:00.000
    4 | dddd | 2017-09-04 03:21:00.000
    5 | eeee | 2017-09-05 08:06:00.000
    

Egy másik tábla létrehozása az SQL-adatbázisban a felső küszöbértékek tárolására

  1. Futtassa a következő SQL-parancsot az SQL-adatbázison egy watermarktable nevű, a küszöbértékek tárolására szolgáló tábla létrehozásához:

    create table watermarktable
    (
    
    TableName varchar(255),
    WatermarkValue datetime,
    );
    
  2. Állítsa be a felső küszöb alapértelmezett értékét a forrásadattár táblanevével. Ebben az oktatóanyagban a tábla neve a következő: data_source_table.

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. Tekintse át a következő tábla adatait: watermarktable.

    Select * from watermarktable
    

    Kimenet:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

Tárolt eljárás létrehozása az SQL-adatbázisban

Az alábbi parancs futtatásával hozzon létre egy tárolt eljárást az SQL-adatbázisban:

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Adat-előállító létrehozása

  1. Adjon meg egy olyan változót, amelyet később a PowerShell-parancsokban az erőforráscsoport neveként fog használni. Másolja a következő parancsszöveget a PowerShellbe, adja meg a Azure erőforráscsoport nevét dupla idézőjelek között, majd futtassa a parancsot. Például: "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    Ha az erőforráscsoport már létezik, előfordulhat, hogy nem kívánja felülírni. Rendeljen egy másik értéket a $resourceGroupName változóhoz, majd futtassa újra a parancsot.

  2. Adjon meg egy változót az adat-előállító helyéhez.

    $location = "East US"
    
  3. A Azure erőforráscsoport létrehozásához futtassa a következő parancsot:

    New-AzResourceGroup $resourceGroupName $location
    

    Ha az erőforráscsoport már létezik, előfordulhat, hogy nem kívánja felülírni. Rendeljen egy másik értéket a $resourceGroupName változóhoz, majd futtassa újra a parancsot.

  4. Adjon meg egy változót az adat-előállító nevéhez.

    Fontos

    Frissítse az adat-előállító nevét, hogy globálisan egyedi legyen. Például: ADFTutorialFactorySP1127.

    $dataFactoryName = "ADFIncCopyTutorialFactory";
    
  5. Az adat-előállító létrehozásához futtassa a következő Set-AzDataFactoryV2 parancsmagot:

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location "East US" -Name $dataFactoryName
    

Vegye figyelembe az alábbiakat:

  • Az adat-előállító nevének globálisan egyedinek kell lennie. Ha a következő hibaüzenetet kapja, módosítsa a nevet, majd próbálkozzon újra:

    The specified Data Factory name 'ADFv2QuickStartDataFactory' is already in use. Data Factory names must be globally unique.
    
  • Data Factory-példányok létrehozásához a Azure való bejelentkezéshez használt felhasználói fióknak közreműködői vagy tulajdonosi szerepkörök tagjának vagy a Azure-előfizetés rendszergazdájának kell lennie.

  • Azoknak a Azure régióknak a listájához, amelyekben a Data Factory jelenleg elérhető, jelölje ki az Önt érdeklő régiókat az alábbi lapon, majd bontsa ki a Analytics elemet a Data Factory: Régiók szerint elérhető termékek. Az adat-előállító által használt adattárak (Storage, SQL Database, Azure SQL Managed Instance stb.) és számításai (Azure HDInsight stb.) más régiókban is lehetnek.

Társított szolgáltatások létrehozása

Társított szolgáltatásokat hoz létre egy adat-előállítóban az adattárak és a számítási szolgáltatások adat-előállítóval történő társításához. Ebben a szakaszban társított szolgáltatásokat hoz létre a tárfiókhoz és az SQL Database-hez.

Storage-beli társított szolgáltatás létrehozása

  1. Hozzon létre egy AzureStorageLinkedService.json nevű JSON-fájlt a C:\ADF mappában az alábbi tartalommal. (Hozza létre az ADF mappát, ha még nem létezik.) A fájl mentése előtt cserélje le <accountName> a <accountKey> tárfiók nevét és kulcsát.

    {
        "name": "AzureStorageLinkedService",
        "properties": {
            "type": "AzureStorage",
            "typeProperties": {
                "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>"
            }
        }
    }
    
  2. A PowerShellben lépjen az ADF mappára.

  3. Futtassa a Set-AzDataFactoryV2LinkedService parancsmagot az AzureStorageLinkedService társított szolgáltatás létrehozásához. A következő példában a ResourceGroupName és a DataFactoryName paraméter értékeit fogja megadni:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
    

    Itt látható a minta kimenete:

    LinkedServiceName : AzureStorageLinkedService
    ResourceGroupName : <resourceGroupName>
    DataFactoryName   : <dataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
    

SQL Database-beli társított szolgáltatás létrehozása

  1. Hozzon létre egy AzureSQLDatabaseLinkedService.json nevű JSON-fájlt a C:\ADF mappában az alábbi tartalommal. (Hozza létre az ADF mappát, ha még nem létezik.) A fájl mentése előtt cserélje le <a kiszolgáló nevét> és <az adatbázis nevét> a kiszolgáló és az adatbázis nevére. Az Azure SQL Servert úgy kell konfigurálnia, hogy hozzáférést adjon az adatgyár felügyelt identitásához.

    {
    "name": "AzureSqlDatabaseLinkedService",
    "properties": {
            "type": "AzureSqlDatabase",
            "typeProperties": {
                "connectionString": "Server=tcp:<your-server-name>.database.windows.net,1433;Database=<your-database-name>;"
            },
            "authenticationType": "ManagedIdentity",
            "annotations": []
        }
    }
    
  2. A PowerShellben lépjen az ADF mappára.

  3. Futtassa a Set-AzDataFactoryV2LinkedService parancsmagot az AzureSQLDatabaseLinkedService társított szolgáltatás létrehozásához.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Itt látható a minta kimenete:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    ProvisioningState :
    

Adatkészletek létrehozása

Ebben a lépésben adatkészleteket hoz létre, amelyek a forrás és a fogadó adatait jelölik.

Forrásadatkészlet létrehozása

  1. Hozzon létre egy SourceDataset.json nevű JSON-fájlt ugyanebben a mappában az alábbi tartalommal:

    {
        "name": "SourceDataset",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "data_source_table"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
    
    

    Ebben az oktatóanyagban a tábla neve a következő: data_source_table. Ha más nevű táblát használ, cserélje le azt.

  2. Futtassa a Set-AzDataFactoryV2Dataset parancsmagot a SourceDataset adatkészlet létrehozásához.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Itt látható a parancsmag mintakimenete:

    DatasetName       : SourceDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Adatkimenet adatkészlet létrehozása

  1. Hozzon létre egy SinkDataset.json nevű JSON-fájlt ugyanebben a mappában az alábbi tartalommal:

    {
        "name": "SinkDataset",
        "properties": {
            "type": "AzureBlob",
            "typeProperties": {
                "folderPath": "adftutorial/incrementalcopy",
                "fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')",
                "format": {
                    "type": "TextFormat"
                }
            },
            "linkedServiceName": {
                "referenceName": "AzureStorageLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }   
    

    Fontos

    Ez a kódrészlet feltételezi, hogy van egy blobtartály, amelynek neve adftutorial, a blobtárolóban. Ha még nem létezik, hozza létre a tárolót, vagy állítsa be egy meglévő tároló nevét. Az incrementalcopy kimeneti mappa automatikusan létrejön, ha még nem létezik a tárolóban. Ebben az oktatóanyagban a fájl nevét dinamikusan hozzuk létre a következő kifejezés használatával: @CONCAT('Incremental-', pipeline().RunId, '.txt').

  2. Futtassa a Set-AzDataFactoryV2Dataset parancsmagot a SinkDataset adathalmaz létrehozásához.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Itt látható a parancsmag mintakimenete:

    DatasetName       : SinkDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset    
    

Adatkészlet létrehozása egy vízjelhez

Ebben a lépésben egy adatkészletet hozunk létre a felső küszöbértékek tárolására.

  1. Hozzon létre egy WatermarkDataset.json nevű JSON-fájlt ugyanebben a mappában az alábbi tartalommal:

    {
        "name": " WatermarkDataset ",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "watermarktable"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }    
    
  2. Futtassa a Set-AzDataFactoryV2Dataset parancsmagot a WatermarkDataset adathalmaz létrehozásához.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    Itt látható a parancsmag mintakimenete:

    DatasetName       : WatermarkDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset    
    

Folyamat létrehozása

Ebben az oktatóanyagban létrehoz egy pipeline-t két "Lookup" tevékenységgel, egy "Copy tevékenység"-vel és egy "StoredProcedure activity"-val, amelyek egy pipeline-ba vannak láncolva.

  1. Hozzon létre egy IncrementalCopyPipeline.json nevű JSON-fájlt ugyanebben a mappában az alábbi tartalommal:

    {
        "name": "IncrementalCopyPipeline",
        "properties": {
            "activities": [
                {
                    "name": "LookupOldWaterMarkActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                        "type": "SqlSource",
                        "sqlReaderQuery": "select * from watermarktable"
                        },
    
                        "dataset": {
                        "referenceName": "WatermarkDataset",
                        "type": "DatasetReference"
                        }
                    }
                },
                {
                    "name": "LookupNewWaterMarkActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select MAX(LastModifytime) as NewWatermarkvalue from data_source_table"
                        },
    
                        "dataset": {
                        "referenceName": "SourceDataset",
                        "type": "DatasetReference"
                        }
                    }
                },
    
                {
                    "name": "IncrementalCopyActivity",
                    "type": "Copy",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'"
                        },
                        "sink": {
                            "type": "BlobSink"
                        }
                    },
                    "dependsOn": [
                        {
                            "activity": "LookupNewWaterMarkActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        },
                        {
                            "activity": "LookupOldWaterMarkActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
    
                    "inputs": [
                        {
                            "referenceName": "SourceDataset",
                            "type": "DatasetReference"
                        }
                    ],
                    "outputs": [
                        {
                            "referenceName": "SinkDataset",
                            "type": "DatasetReference"
                        }
                    ]
                },
    
                {
                    "name": "StoredProceduretoWriteWatermarkActivity",
                    "type": "SqlServerStoredProcedure",
                    "typeProperties": {
    
                        "storedProcedureName": "usp_write_watermark",
                        "storedProcedureParameters": {
                            "LastModifiedtime": {"value": "@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type": "datetime" },
                            "TableName":  { "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"String"}
                        }
                    },
    
                    "linkedServiceName": {
                        "referenceName": "AzureSQLDatabaseLinkedService",
                        "type": "LinkedServiceReference"
                    },
    
                    "dependsOn": [
                        {
                            "activity": "IncrementalCopyActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ]
                }
            ]
    
        }
    }
    
  2. Futtassa a Set-AzDataFactoryV2Pipeline parancsmagot az "IncrementalCopyPipeline" kötet létrehozásához.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Itt látható a minta kimenete:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : ADF
     DataFactoryName   : incrementalloadingADF
     Activities        : {LookupOldWaterMarkActivity, LookupNewWaterMarkActivity, IncrementalCopyActivity, StoredProceduretoWriteWatermarkActivity}
     Parameters        :
    

A folyamat futtatása

  1. Futtassa az IncrementalCopyPipeline folyamatot az Invoke-AzDataFactoryV2Pipeline parancsmaggal. Cserélje le a helyőrzőket a saját erőforráscsoportja és adat-előállítója nevére.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
    
  2. Ellenőrizze a folyamat állapotát a Get-AzDataFactoryV2ActivityRun parancsmag futtatásával, amíg az összes tevékenység sikeresen le nem fut. Cserélje le a helyőrzőket az Ön megfelelő időértékeire a RunStartedAfter és a RunStartedBefore paramétereknél. Ebben az oktatóanyagban a -RunStartedAfter "2017/09/14" és a -RunStartedBefore "2017/09/15" paramétereket használja.

    Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
    

    Itt látható a minta kimenete:

    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupNewWaterMarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {NewWatermarkvalue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:42:42 AM
    ActivityRunEnd    : 9/14/2017 7:42:50 AM
    DurationInMs      : 7777
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupOldWaterMarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {TableName, WatermarkValue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:42:42 AM
    ActivityRunEnd    : 9/14/2017 7:43:07 AM
    DurationInMs      : 25437
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : IncrementalCopyActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, sink}
    Output            : {dataRead, dataWritten, rowsCopied, copyDuration...}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:43:10 AM
    ActivityRunEnd    : 9/14/2017 7:43:29 AM
    DurationInMs      : 19769
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : StoredProceduretoWriteWatermarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {storedProcedureName, storedProcedureParameters}
    Output            : {}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:43:32 AM
    ActivityRunEnd    : 9/14/2017 7:43:47 AM
    DurationInMs      : 14467
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    

Az eredmények áttekintése

  1. A blobtárolóban (fogadótároló) láthatja, hogy az adatokat a SinkDatasetben meghatározott fájlba másolták. Ebben az oktatóanyagban a fájl neve: Incremental- d4bf3ce2-5d60-43f3-9318-923155f61037.txt. Nyissa meg a fájlt, és láthatja, hogy a benne lévő rekordok egyeznek az SQL-adatbázisban lévő adatokkal.

    1,aaaa,2017-09-01 00:56:00.0000000
    2,bbbb,2017-09-02 05:23:00.0000000
    3,cccc,2017-09-03 02:36:00.0000000
    4,dddd,2017-09-04 03:21:00.0000000
    5,eeee,2017-09-05 08:06:00.0000000
    
  2. Ellenőrizze a watermarktable legutolsó értékét. Láthatja, hogy a vízjel értéke frissült.

    Select * from watermarktable
    

    Itt látható a minta kimenete:

    Táblanév WatermarkValue
    adat_forrás_tábla 2017-09-05 8:06:00.000

Adatok beszúrása az adatforrásba a delta adatbetöltés ellenőrzéséhez

  1. Szúrjon be új adatokat az SQL-adatbázisba (forrásadattár).

    INSERT INTO data_source_table
    VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
    
    INSERT INTO data_source_table
    VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
    

    Az SQL-adatbázis a következő frissített adatokat tartalmazza:

    PersonID | Name | LastModifytime
    -------- | ---- | --------------
    1 | aaaa | 2017-09-01 00:56:00.000
    2 | bbbb | 2017-09-02 05:23:00.000
    3 | cccc | 2017-09-03 02:36:00.000
    4 | dddd | 2017-09-04 03:21:00.000
    5 | eeee | 2017-09-05 08:06:00.000
    6 | newdata | 2017-09-06 02:23:00.000
    7 | newdata | 2017-09-07 09:01:00.000
    
  2. Futtassa újra az IncrementalCopyPipeline folyamatot az Invoke-AzDataFactoryV2Pipeline parancsmaggal. Cserélje le a helyőrzőket a saját erőforráscsoportja és adat-előállítója nevére.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
    
  3. Ellenőrizze a folyamat állapotát a Get-AzDataFactoryV2ActivityRun parancsmag futtatásával, amíg az összes tevékenység sikeresen le nem fut. Cserélje le a helyőrzőket az Ön megfelelő időértékeire a RunStartedAfter és a RunStartedBefore paramétereknél. Ebben az oktatóanyagban a -RunStartedAfter "2017/09/14" és a -RunStartedBefore "2017/09/15" paramétereket használja.

    Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
    

    Itt látható a minta kimenete:

    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupNewWaterMarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {NewWatermarkvalue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:52:26 AM
    ActivityRunEnd    : 9/14/2017 8:52:58 AM
    DurationInMs      : 31758
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupOldWaterMarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {TableName, WatermarkValue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:52:26 AM
    ActivityRunEnd    : 9/14/2017 8:52:52 AM
    DurationInMs      : 25497
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : IncrementalCopyActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, sink}
    Output            : {dataRead, dataWritten, rowsCopied, copyDuration...}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:53:00 AM
    ActivityRunEnd    : 9/14/2017 8:53:20 AM
    DurationInMs      : 20194
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : StoredProceduretoWriteWatermarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {storedProcedureName, storedProcedureParameters}
    Output            : {}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:53:23 AM
    ActivityRunEnd    : 9/14/2017 8:53:41 AM
    DurationInMs      : 18502
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    
  4. A blobtárolóban azt láthatja, hogy egy újabb fájl jött létre. Ebben az oktatóanyagban az új fájl neve a következő: Incremental-2fc90ab8-d42c-4583-aa64-755dba9925d7.txt. Nyissa meg ezt a fájlt, és láthatja, hogy két sornyi rekordot tartalmaz.

  5. Ellenőrizze a watermarktable legutolsó értékét. Láthatja, hogy a vízjel érték ismét frissült.

    Select * from watermarktable
    

    A kimenet példája:

    Táblanév WatermarkValue
    adat_forrás_tábla 2017-09-07 09:01:00.000

Az oktatóanyagban az alábbi lépéseket hajtotta végre:

  • Az adatraktár előkészítése a küszöbértékek tárolására.
  • Adat-előállító létrehozása
  • Társított szolgáltatások létrehozása.
  • Forrás-, fogadó- és küszöbadatkészletek létrehozása.
  • Folyamat létrehozása.
  • A folyamat futtatása.
  • A csővezeték futásának monitorozása.

Ebben az oktatóanyagban a folyamat egyetlen táblából másolt adatokat Azure SQL Database Blob Storage-ba. Az alábbi oktatóanyagban megtudhatja, hogyan másolhat adatokat egy SQL Server adatbázis több táblájából az SQL Database-be.