Adatok növekményes betöltése az Azure SQL Database-ből az Azure Blob Storage-ba a PowerShell használatával

A következőkre vonatkozik: Azure Data Factory Azure Synapse Analytics

Tipp.

Próbálja ki a Data Factoryt a Microsoft Fabricben, amely egy teljes körű elemzési megoldás a nagyvállalatok számára. A Microsoft Fabric az adattovábbítástól az adatelemzésig, a valós idejű elemzésig, az üzleti intelligenciáig és a jelentéskészítésig mindent lefed. Ismerje meg, hogyan indíthat új próbaverziót ingyenesen!

Ebben az oktatóanyagban az Azure Data Factory használatával hoz létre egy folyamatot, amely az Azure SQL Database egy táblájából betölti a deltaadatokat az Azure Blob Storage-ba.

Az oktatóanyagban az alábbi lépéseket fogja végrehajtani:

  • Az adatraktár előkészítése a küszöbértékek tárolására.
  • Adat-előállító létrehozása
  • Társított szolgáltatások létrehozása.
  • Forrás-, fogadó- és küszöbadatkészletek létrehozása.
  • Folyamat létrehozása.
  • A folyamat futtatása.
  • A folyamat futásának monitorozása.

Áttekintés

Itt látható a megoldás összefoglaló jellegű ábrája:

Incrementally load data

Az alábbiak a megoldás kialakításának leglényegesebb lépései:

  1. A küszöb oszlopának kiválasztása. Jelölje ki az adatforrás egyik oszlopát, amelynek alapján az új és a frissített rekordok minden egyes futtatáskor leválogathatók. Normális esetben az ebben a kiválasztott oszlopban (például: last_modify_time vagy ID) lévő adatok a sorok létrehozásával vagy frissítésével folyamatosan növekednek. Az ebben az oszlopban lévő legnagyobb érték szolgál a küszöbként.

  2. Egy adatraktár előkészítése a küszöbértékek tárolására.
    Ebben az oktatóanyagban a küszöbértékeket egy SQL-adatbázisban tároljuk.

  3. Egy folyamat létrehozása a következő munkafolyamattal:

    Ebben a megoldásban a folyamat a következő tevékenységeket tartalmazza:

    • Két keresési tevékenység létrehozása. Az első keresési tevékenység az utolsó küszöbértéket kéri le. A második keresési tevékenység az új küszöbértéket kéri le. Ezeket a küszöbértékeket a rendszer átadja a másolási tevékenységnek.
    • Hozzon létre egy Copy tevékenység, amely sorokat másol a forrásadattárból a régi vízjelnél nagyobb és az új vízjel értékénél kisebb vagy egyenlő vízjel oszlop értékével. Ezután a módosított adatokat a forrásadattárból új fájlként egy Blob Storage-tárolóba másolja.
    • Egy StoredProcedure tevékenység létrehozása, amely frissíti a küszöbértékeket a folyamat következő futtatásához.

Ha még nincs Azure-előfizetése, kezdés előtt hozzon létre egy ingyenes fiókot.

Előfeltételek

Megjegyzés:

We recommend that you use the Azure Az PowerShell module to interact with Azure. See Install Azure PowerShell to get started. To learn how to migrate to the Az PowerShell module, see Migrate Azure PowerShell from AzureRM to Az.

  • Azure SQL Database Ezt az adatbázist használjuk forrásadattárként. Ha nem rendelkezik adatbázissal az Azure SQL Database-ben, a létrehozás lépéseit az Adatbázis létrehozása az Azure SQL Database-ben című témakörben találja.
  • Azure Storage. A blobtárolót használjuk majd fogadóadattárként. Ha még nem rendelkezik tárfiókkal, tekintse meg a tárfiók létrehozásának lépéseit ismertető cikket. Hozzon létre egy tárolót adftutorial néven.
  • Azure PowerShell. Kövesse az Azure PowerShell telepítését és konfigurálását ismertető cikkben szereplő utasításokat.

Adatforrástábla létrehozása az SQL-adatbázisban

  1. Nyissa meg az SQL Server Management Studiót. A Kiszolgálókezelőben kattintson a jobb gombbal az adatbázisra, és válassza az Új lekérdezés elemet.

  2. Futtassa a következő SQL-parancsot az SQL-adatbázison egy tábla data_source_table néven, adatforrástárként történő létrehozásához:

    create table data_source_table
    (
        PersonID int,
        Name varchar(255),
        LastModifytime datetime
    );
    
    INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
    VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');
    

    Az oktatóanyagban a LastModifytime oszlopot használjuk küszöboszlopként. Az adatforrástár adatai az alábbi táblázatban láthatók:

    PersonID | Name | LastModifytime
    -------- | ---- | --------------
    1 | aaaa | 2017-09-01 00:56:00.000
    2 | bbbb | 2017-09-02 05:23:00.000
    3 | cccc | 2017-09-03 02:36:00.000
    4 | dddd | 2017-09-04 03:21:00.000
    5 | eeee | 2017-09-05 08:06:00.000
    

Egy másik tábla létrehozása az SQL-adatbázisban a felső küszöbértékek tárolására

  1. Futtassa a következő SQL-parancsot az SQL-adatbázison egy watermarktable nevű, a küszöbértékek tárolására szolgáló tábla létrehozásához:

    create table watermarktable
    (
    
    TableName varchar(255),
    WatermarkValue datetime,
    );
    
  2. Állítsa be a felső küszöb alapértelmezett értékét a forrásadattár táblanevével. Ebben az oktatóanyagban a tábla neve a következő: data_source_table.

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. Tekintse át a következő tábla adatait: watermarktable.

    Select * from watermarktable
    

    Output:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

Tárolt eljárás létrehozása az SQL-adatbázisban

Az alábbi parancs futtatásával hozzon létre egy tárolt eljárást az SQL-adatbázisban:

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Adat-előállító létrehozása

  1. Adjon meg egy olyan változót, amelyet később a PowerShell-parancsokban az erőforráscsoport neveként fog használni. Másolja az alábbi parancsszöveget a PowerShellbe, adja meg az Azure-erőforráscsoport nevét dupla idézőjelek között, majd futtassa a parancsot. Például: "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    Ha az erőforráscsoport már létezik, előfordulhat, hogy nem kívánja felülírni. Rendeljen egy másik értéket a $resourceGroupName változóhoz, majd futtassa újra a parancsot.

  2. Adjon meg egy változót az adat-előállító helyéhez.

    $location = "East US"
    
  3. Futtassa az alábbi parancsot az Azure-erőforráscsoport létrehozásához:

    New-AzResourceGroup $resourceGroupName $location
    

    Ha az erőforráscsoport már létezik, előfordulhat, hogy nem kívánja felülírni. Rendeljen egy másik értéket a $resourceGroupName változóhoz, majd futtassa újra a parancsot.

  4. Adjon meg egy változót az adat-előállító nevéhez.

    Fontos

    Frissítse az adat-előállító nevét, hogy globálisan egyedi legyen. Például: ADFTutorialFactorySP1127.

    $dataFactoryName = "ADFIncCopyTutorialFactory";
    
  5. Az adat-előállító létrehozásához futtassa a következő Set-AzDataFactoryV2 parancsmagot:

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location "East US" -Name $dataFactoryName
    

Vegye figyelembe a következő szempontokat:

  • Az adat-előállító nevének globálisan egyedinek kell lennie. Ha a következő hibaüzenetet kapja, módosítsa a nevet, majd próbálkozzon újra:

    The specified Data Factory name 'ADFv2QuickStartDataFactory' is already in use. Data Factory names must be globally unique.
    
  • Adatelőállító-példányok létrehozásához a felhasználói fióknak, amellyel bejelentkezik az Azure-ba, a közreműködő vagy tulajdonos szerepkörök tagjának, vagy az Azure-előfizetés rendszergazdájának kell lennie.

  • Azon Azure-régiók megtekintéséhez, amelyekben jelenleg elérhető a Data Factory, a következő lapon válassza ki az Önt érdeklő régiókat, majd bontsa ki az Elemzés részt, és keresse meg a Data Factory: Elérhető termékek régiók szerint szakaszt. Az adat-előállító által használt adattárak (Storage, SQL Database, Felügyelt Azure SQL-példány stb.) és számítások (Azure HDInsight stb.) más régiókban is lehetnek.

Társított szolgáltatások létrehozása

Társított szolgáltatásokat hoz létre egy adat-előállítóban az adattárak és a számítási szolgáltatások adat-előállítóval történő társításához. Ebben a szakaszban társított szolgáltatásokat hoz létre a tárfiókhoz és az SQL Database-hez.

Storage-beli társított szolgáltatás létrehozása

  1. Hozzon létre egy AzureStorageLinkedService.json nevű JSON-fájlt a C:\ADF mappában az alábbi tartalommal. (Hozza létre az ADF mappát, ha még nem létezik.) A fájl mentése előtt cserélje le <accountName> a <accountKey> tárfiók nevét és kulcsát.

    {
        "name": "AzureStorageLinkedService",
        "properties": {
            "type": "AzureStorage",
            "typeProperties": {
                "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>"
            }
        }
    }
    
  2. A PowerShellben lépjen az ADF mappára.

  3. Futtassa a Set-AzDataFactoryV2LinkedService parancsmagot az AzureStorageLinkedService társított szolgáltatás létrehozásához. A következő példában a ResourceGroupName és a DataFactoryName paraméter értékeit fogja megadni:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
    

    Itt látható a minta kimenete:

    LinkedServiceName : AzureStorageLinkedService
    ResourceGroupName : <resourceGroupName>
    DataFactoryName   : <dataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
    

SQL Database-beli társított szolgáltatás létrehozása

  1. Hozzon létre egy AzureSQLDatabaseLinkedService.json nevű JSON-fájlt a C:\ADF mappában az alábbi tartalommal. (Hozza létre az ADF mappát, ha még nem létezik.) A fájl mentése előtt cserélje le <a kiszolgálót>, <az adatbázist>, <a felhasználói azonosítót> és <a jelszót> a kiszolgáló nevére, az adatbázisra, a felhasználói azonosítóra és a jelszóra.

    {
        "name": "AzureSQLDatabaseLinkedService",
        "properties": {
            "type": "AzureSqlDatabase",
            "typeProperties": {
                "connectionString": "Server = tcp:<server>.database.windows.net,1433;Initial Catalog=<database>; Persist Security Info=False; User ID=<user> ; Password=<password>; MultipleActiveResultSets = False; Encrypt = True; TrustServerCertificate = False; Connection Timeout = 30;"
            }
        }
    }
    
  2. A PowerShellben lépjen az ADF mappára.

  3. Futtassa a Set-AzDataFactoryV2LinkedService parancsmagot az AzureSQLDatabaseLinkedService társított szolgáltatás létrehozásához.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Itt látható a minta kimenete:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    ProvisioningState :
    

Adatkészletek létrehozása

Ebben a lépésben adatkészleteket hoz létre, amelyek a forrás és a fogadó adatait jelölik.

Forrásadatkészlet létrehozása

  1. Hozzon létre egy SourceDataset.json nevű JSON-fájlt ugyanebben a mappában az alábbi tartalommal:

    {
        "name": "SourceDataset",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "data_source_table"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
    
    

    Ebben az oktatóanyagban a tábla neve a következő: data_source_table. Ha más nevű táblát használ, cserélje le a nevet.

  2. Futtassa a Set-AzDataFactoryV2Dataset parancsmagot a SourceDataset adatkészlet létrehozásához.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Itt látható a parancsmag mintakimenete:

    DatasetName       : SourceDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Fogadó adatkészlet létrehozása

  1. Hozzon létre egy SinkDataset.json nevű JSON-fájlt ugyanebben a mappában az alábbi tartalommal:

    {
        "name": "SinkDataset",
        "properties": {
            "type": "AzureBlob",
            "typeProperties": {
                "folderPath": "adftutorial/incrementalcopy",
                "fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')",
                "format": {
                    "type": "TextFormat"
                }
            },
            "linkedServiceName": {
                "referenceName": "AzureStorageLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }   
    

    Fontos

    Ez a kódrészlet feltételezi, hogy van egy blobtároló neve adftutorial a blobtárolóban. Ha még nem létezik, hozza létre a tárolót, vagy állítsa be egy meglévő tároló nevét. Az incrementalcopy kimeneti mappa automatikusan létrejön, ha még nem létezik a tárolóban. Ebben az oktatóanyagban a fájl nevét dinamikusan hozzuk létre a következő kifejezés használatával: @CONCAT('Incremental-', pipeline().RunId, '.txt').

  2. Futtassa a Set-AzDataFactoryV2Dataset parancsmagot a SinkDataset adathalmaz létrehozásához.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Itt látható a parancsmag mintakimenete:

    DatasetName       : SinkDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset    
    

Adatkészlet létrehozása a küszöbhöz

Ebben a lépésben egy adatkészletet hozunk létre a felső küszöbértékek tárolására.

  1. Hozzon létre egy WatermarkDataset.json nevű JSON-fájlt ugyanebben a mappában az alábbi tartalommal:

    {
        "name": " WatermarkDataset ",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "watermarktable"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }    
    
  2. Futtassa a Set-AzDataFactoryV2Dataset parancsmagot a WatermarkDataset adathalmaz létrehozásához.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    Itt látható a parancsmag mintakimenete:

    DatasetName       : WatermarkDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset    
    

Folyamat létrehozása

Az oktatóanyag során egy olyan folyamatot fogunk létrehozni, amelyben két keresési, egy másolási és egy StoredProcedure (tárolt eljárás) tevékenység van összefűzve.

  1. Hozzon létre egy IncrementalCopyPipeline.json nevű JSON-fájlt ugyanebben a mappában az alábbi tartalommal:

    {
        "name": "IncrementalCopyPipeline",
        "properties": {
            "activities": [
                {
                    "name": "LookupOldWaterMarkActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                        "type": "SqlSource",
                        "sqlReaderQuery": "select * from watermarktable"
                        },
    
                        "dataset": {
                        "referenceName": "WatermarkDataset",
                        "type": "DatasetReference"
                        }
                    }
                },
                {
                    "name": "LookupNewWaterMarkActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select MAX(LastModifytime) as NewWatermarkvalue from data_source_table"
                        },
    
                        "dataset": {
                        "referenceName": "SourceDataset",
                        "type": "DatasetReference"
                        }
                    }
                },
    
                {
                    "name": "IncrementalCopyActivity",
                    "type": "Copy",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'"
                        },
                        "sink": {
                            "type": "BlobSink"
                        }
                    },
                    "dependsOn": [
                        {
                            "activity": "LookupNewWaterMarkActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        },
                        {
                            "activity": "LookupOldWaterMarkActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
    
                    "inputs": [
                        {
                            "referenceName": "SourceDataset",
                            "type": "DatasetReference"
                        }
                    ],
                    "outputs": [
                        {
                            "referenceName": "SinkDataset",
                            "type": "DatasetReference"
                        }
                    ]
                },
    
                {
                    "name": "StoredProceduretoWriteWatermarkActivity",
                    "type": "SqlServerStoredProcedure",
                    "typeProperties": {
    
                        "storedProcedureName": "usp_write_watermark",
                        "storedProcedureParameters": {
                            "LastModifiedtime": {"value": "@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type": "datetime" },
                            "TableName":  { "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"String"}
                        }
                    },
    
                    "linkedServiceName": {
                        "referenceName": "AzureSQLDatabaseLinkedService",
                        "type": "LinkedServiceReference"
                    },
    
                    "dependsOn": [
                        {
                            "activity": "IncrementalCopyActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ]
                }
            ]
    
        }
    }
    
  2. Futtassa a Set-AzDataFactoryV2Pipeline parancsmagot a folyamat növekményesCopyPipeline létrehozásához.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Itt látható a minta kimenete:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : ADF
     DataFactoryName   : incrementalloadingADF
     Activities        : {LookupOldWaterMarkActivity, LookupNewWaterMarkActivity, IncrementalCopyActivity, StoredProceduretoWriteWatermarkActivity}
     Parameters        :
    

A folyamat futtatása

  1. Futtassa a növekményesCopyPipeline folyamatot az Invoke-AzDataFactoryV2Pipeline parancsmaggal. Cserélje le a helyőrzőket a saját erőforráscsoportja és adat-előállítója nevére.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
    
  2. Ellenőrizze a folyamat állapotát a Get-AzDataFactoryV2ActivityRun parancsmag futtatásával, amíg az összes tevékenység sikeresen le nem fut. Cserélje le a helyőrzőket a saját megfelelő időértékeire a RunStartedAfter és a RunStartedBefore paraméternél. Ebben az oktatóanyagban a -RunStartedAfter "2017/09/14" és a -RunStartedBefore "2017/09/15" paramétereket használja.

    Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
    

    Itt látható a minta kimenete:

    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupNewWaterMarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {NewWatermarkvalue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:42:42 AM
    ActivityRunEnd    : 9/14/2017 7:42:50 AM
    DurationInMs      : 7777
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupOldWaterMarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {TableName, WatermarkValue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:42:42 AM
    ActivityRunEnd    : 9/14/2017 7:43:07 AM
    DurationInMs      : 25437
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : IncrementalCopyActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, sink}
    Output            : {dataRead, dataWritten, rowsCopied, copyDuration...}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:43:10 AM
    ActivityRunEnd    : 9/14/2017 7:43:29 AM
    DurationInMs      : 19769
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : StoredProceduretoWriteWatermarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {storedProcedureName, storedProcedureParameters}
    Output            : {}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:43:32 AM
    ActivityRunEnd    : 9/14/2017 7:43:47 AM
    DurationInMs      : 14467
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    

Az eredmények áttekintése

  1. A blobtárolóban (fogadótároló) azt kell látnia, hogy az adatok át lettek másolva a SinkDataset által meghatározott fájlba. Ebben az oktatóanyagban a fájl neve: Incremental- d4bf3ce2-5d60-43f3-9318-923155f61037.txt. Nyissa meg a fájlt, és láthatja, hogy a benne lévő rekordok egyeznek az SQL-adatbázisban lévő adatokkal.

    1,aaaa,2017-09-01 00:56:00.0000000
    2,bbbb,2017-09-02 05:23:00.0000000
    3,cccc,2017-09-03 02:36:00.0000000
    4,dddd,2017-09-04 03:21:00.0000000
    5,eeee,2017-09-05 08:06:00.0000000
    
  2. Ellenőrizze a watermarktable legutolsó értékét. Láthatja, hogy a küszöbérték frissült.

    Select * from watermarktable
    

    Itt látható a minta kimenete:

    TableName WatermarkValue
    data_source_table 2017-09-05 8:06:00.000

Adatok beszúrása a forrásadattárba a változásadatok betöltésének ellenőrzéséhez

  1. Szúrjon be új adatokat az SQL-adatbázisba (forrásadattár).

    INSERT INTO data_source_table
    VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
    
    INSERT INTO data_source_table
    VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
    

    Az SQL-adatbázis a következő frissített adatokat tartalmazza:

    PersonID | Name | LastModifytime
    -------- | ---- | --------------
    1 | aaaa | 2017-09-01 00:56:00.000
    2 | bbbb | 2017-09-02 05:23:00.000
    3 | cccc | 2017-09-03 02:36:00.000
    4 | dddd | 2017-09-04 03:21:00.000
    5 | eeee | 2017-09-05 08:06:00.000
    6 | newdata | 2017-09-06 02:23:00.000
    7 | newdata | 2017-09-07 09:01:00.000
    
  2. Futtassa újra a növekményesCopyPipeline folyamatot az Invoke-AzDataFactoryV2Pipeline parancsmaggal. Cserélje le a helyőrzőket a saját erőforráscsoportja és adat-előállítója nevére.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
    
  3. Ellenőrizze a folyamat állapotát a Get-AzDataFactoryV2ActivityRun parancsmag futtatásával, amíg az összes tevékenység sikeresen le nem fut. Cserélje le a helyőrzőket a saját megfelelő időértékeire a RunStartedAfter és a RunStartedBefore paraméternél. Ebben az oktatóanyagban a -RunStartedAfter "2017/09/14" és a -RunStartedBefore "2017/09/15" paramétereket használja.

    Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
    

    Itt látható a minta kimenete:

    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupNewWaterMarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {NewWatermarkvalue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:52:26 AM
    ActivityRunEnd    : 9/14/2017 8:52:58 AM
    DurationInMs      : 31758
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupOldWaterMarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {TableName, WatermarkValue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:52:26 AM
    ActivityRunEnd    : 9/14/2017 8:52:52 AM
    DurationInMs      : 25497
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : IncrementalCopyActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, sink}
    Output            : {dataRead, dataWritten, rowsCopied, copyDuration...}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:53:00 AM
    ActivityRunEnd    : 9/14/2017 8:53:20 AM
    DurationInMs      : 20194
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : StoredProceduretoWriteWatermarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {storedProcedureName, storedProcedureParameters}
    Output            : {}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:53:23 AM
    ActivityRunEnd    : 9/14/2017 8:53:41 AM
    DurationInMs      : 18502
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    
  4. A blobtárolóban azt láthatja, hogy egy újabb fájl jött létre. Ebben az oktatóanyagban az új fájl neve a következő: Incremental-2fc90ab8-d42c-4583-aa64-755dba9925d7.txt. Nyissa meg ezt a fájlt, és láthatja, hogy két sornyi rekordot tartalmaz.

  5. Ellenőrizze a watermarktable legutolsó értékét. Láthatja, hogy a küszöbérték ismét frissült.

    Select * from watermarktable
    

    Példa a kimenetre:

    TableName WatermarkValue
    data_source_table 2017-09-07 09:01:00.000

Az oktatóanyagban az alábbi lépéseket hajtotta végre:

  • Az adatraktár előkészítése a küszöbértékek tárolására.
  • Adat-előállító létrehozása
  • Társított szolgáltatások létrehozása.
  • Forrás-, fogadó- és küszöbadatkészletek létrehozása.
  • Folyamat létrehozása.
  • A folyamat futtatása.
  • A folyamat futásának monitorozása.

Ebben az oktatóanyagban a folyamat adatokat másolt az Azure SQL Database egyetlen táblájából a Blob Storage-ba. Az alábbi oktatóanyagból megtudhatja, hogyan másolhat adatokat egy SQL Server-adatbázis több táblájából az SQL Database-be.