Kopiowanie danych z obiektu blob Azure do Azure SQL Database przy użyciu Azure Data Factory

DOTYCZY: Azure Data Factory Azure Synapse Analytics

Napiwek

Data Factory w usłudze Microsoft Fabric jest następną generacją Azure Data Factory z prostszą architekturą, wbudowaną sztuczną inteligencją i nowymi funkcjami. Jeśli dopiero zaczynasz integrować dane, zacznij od Fabric Data Factory. Istniejące obciążenia ADF można zaktualizować do Fabric, aby uzyskać dostęp do nowych możliwości w zakresie nauki o danych, analiz w czasie rzeczywistym oraz raportowania.

W tym samouczku utworzysz potok usługi Data Factory, który kopiuje dane z Azure Blob Storage do Azure SQL Database. Wzorzec konfiguracji w tym samouczku ma zastosowanie do kopiowania danych z magazynu danych opartego na plikach do relacyjnego magazynu danych. Aby uzyskać listę magazynów danych obsługiwanych jako źródła i ujścia, zobacz obsługiwane magazyny danych i formaty.

W tym samouczku wykonasz następujące kroki:

  • Tworzenie fabryki danych.
  • Utwórz usługi połączone z Azure Storage i Azure SQL Database.
  • Utwórz zestawy danych Azure Blob i Azure SQL Database.
  • Utwórz potok, który zawiera zadanie kopiowania.
  • Uruchom potok.
  • Monitoruj pipeline i uruchomienia działań.

W tym samouczku używany jest .NET SDK. Możesz użyć innych mechanizmów do interakcji z Azure Data Factory; zapoznaj się z przykładami w Quickstarts.

Jeśli nie masz subskrypcji Azure, przed rozpoczęciem utwórz konto free Azure.

Wymagania wstępne

  • konto Azure Storage. Wykorzystujesz magazyn obiektów blob jako źródłowy magazyn danych. Jeśli nie masz konta magazynu Azure, zobacz Tworzenie konta magazynu ogólnego przeznaczenia.
  • Azure SQL Database. Baza danych jest używana jako magazyn danych ujścia. Jeśli nie masz bazy danych w Azure SQL Database, zobacz Utwórz bazę danych w Azure SQL Database.
  • Visual Studio. Instrukcja krok po kroku w artykule używa "Visual Studio 2019".
  • Azure SDK dla .NET.
  • aplikacja Microsoft Entra. Jeśli nie masz aplikacji Microsoft Entra, zapoznaj się z sekcją Tworzenie aplikacji Microsoft Entra w dokumencie Jak utworzyć aplikację Microsoft Entra. Skopiuj następujące wartości do użycia w kolejnych krokach: identyfikator aplikacji (klienta), klucz uwierzytelniania i identyfikator katalogu (dzierżawy). Przypisz aplikację do roli Współtwórca, postępując zgodnie z instrukcjami w tym samym artykule.

Utwórz obiekt blob i tabelę SQL

Teraz przygotuj Azure Blob Storage i Azure SQL Database dla samouczka, tworząc źródłowy obiekt blob i docelową tabelę SQL.

Utwórz źródłowy blob

Najpierw utwórz obiekt blob źródłowy, tworząc kontener i przesyłając do niego wejściowy plik tekstowy.

  1. Otwórz Notatnik. Skopiuj następujący tekst i zapisz go lokalnie w pliku o nazwie inputEmp.txt.

    John|Doe
    Jane|Doe
    
  2. Użyj narzędzia, takiego jak Azure Storage Explorer, aby utworzyć plik adfv2tutorial i przekazać plik inputEmp.txt do kontenera.

Tworzenie tabeli SQL docelowej

Następnie utwórz tabelę SQL ujścia:

  1. Użyj następującego skryptu SQL, aby utworzyć tabelę dbo.emp w Azure SQL Database.

    CREATE TABLE dbo.emp
    (
        ID int IDENTITY(1,1) NOT NULL,
        FirstName varchar(50),
        LastName varchar(50)
    )
    GO
    
    CREATE CLUSTERED INDEX IX_emp_ID ON dbo.emp (ID);
    
  2. Zezwalaj usługom Azure na dostęp do usługi SQL Database. Upewnij się, że zezwolisz na dostęp do usług Azure na serwerze, aby usługa Data Factory mogła zapisywać dane w usłudze SQL Database. W celu sprawdzenia i włączenia tego ustawienia wykonaj następujące kroki:

    1. Przejdź do portalu Azure aby zarządzać serwerem SQL. Wyszukaj i wybierz serwery SQL.

    2. Wybierz serwer.

    3. W obszarze zabezpieczeń menu programu SQL Server wybierz pozycję Zapory i sieci wirtualne.

    4. Na stronie Firewall i sieci wirtualne w sekcji Zezwalaj usługom i zasobom platformy Azure na dostęp do tego serwera wybierz WŁĄCZ.

Tworzenie projektu Visual Studio

Za pomocą Visual Studio utwórz aplikację konsolową .NET języka C#.

  1. Otwórz Visual Studio.
  2. W oknie Start wybierz pozycję Utwórz nowy projekt.
  3. W oknie Utwórz nowy projekt wybierz wersję języka C# Console App (.NET Framework) z listy typów projektów. Następnie kliknij przycisk Dalej.
  4. W oknie Konfiguruj nowy projekt wprowadź nazwę projektuADFv2Tutorial. Dla Lokalizacja przejdź do katalogu i/lub utwórz nowy, aby zapisać projekt. Następnie wybierz Utwórz. Nowy projekt zostanie wyświetlony w Visual Studio IDE.

Instalowanie pakietów NuGet

Następnie zainstaluj wymagane pakiety biblioteki przy użyciu menedżera pakietów NuGet.

  1. Na pasku menu wybierz pozycję Tools>NuGet Package Manager>Package Manager Console.

  2. W okienku Package Manager Console uruchom następujące polecenia, aby zainstalować pakiety. Aby uzyskać informacje o pakiecie NuGet Azure Data Factory, zobacz Microsoft.Azure. Management.DataFactory.

    Install-Package Microsoft.Azure.Management.DataFactory
    Install-Package Microsoft.Azure.Management.ResourceManager -PreRelease
    Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
    

Utwórz klienta fabryki danych

Wykonaj następujące kroki, aby utworzyć klienta fabryki danych.

  1. Otwórz Program.cs, a następnie zastąp istniejące using instrukcje następującym kodem, aby dodać odwołania do przestrzeni nazw.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Microsoft.Rest;
    using Microsoft.Rest.Serialization;
    using Microsoft.Azure.Management.ResourceManager;
    using Microsoft.Azure.Management.DataFactory;
    using Microsoft.Azure.Management.DataFactory.Models;
    using Microsoft.IdentityModel.Clients.ActiveDirectory;
    
  2. Dodaj następujący kod do Main metody, która ustawia zmienne. Zastąp 14 miejsc wstawienia własnymi wartościami.

    Aby wyświetlić listę regionów Azure, w których usługa Data Factory jest obecnie dostępna, zobacz Produkty dostępne według regionów. Na liście rozwijanej Produkty wybierz Przeglądaj>Analizy>Data Factory. Następnie na liście rozwijanej Regiony wybierz interesujące Cię regiony. Zostanie wyświetlona siatka ze stanem dostępności produktów usługi Data Factory dla wybranych regionów.

    Uwaga

    Magazyny danych, takie jak Azure Storage i Azure SQL Database, oraz obliczenia, takie jak usługa HDInsight, mogą znajdować się w innych regionach niż wybrane dla usługi Data Factory.

    // Set variables
    string tenantID = "<your tenant ID>";
    string applicationId = "<your application ID>";
    string authenticationKey = "<your authentication key for the application>";
    string subscriptionId = "<your subscription ID to create the factory>";
    string resourceGroup = "<your resource group to create the factory>";
    
    string region = "<location to create the data factory in, such as East US>";
    string dataFactoryName = "<name of data factory to create (must be globally unique)>";
    
    // Specify the source Azure Blob information
    string storageAccount = "<your storage account name to copy data>";
    string storageKey = "<your storage account key>";
    string inputBlobPath = "adfv2tutorial/";
    string inputBlobName = "inputEmp.txt";
    
    // Specify the sink Azure SQL Database information
    string azureSqlConnString =
        "Server=tcp:<your server name>.database.windows.net,1433;" +
        "Database=<your database name>;" +
        "User ID=<your username>@<your server name>;" +
        "Password=<your password>;" +
        "Trusted_Connection=False;Encrypt=True;Connection Timeout=30";
    string azureSqlTableName = "dbo.emp";
    
    string storageLinkedServiceName = "AzureStorageLinkedService";
    string sqlDbLinkedServiceName = "AzureSqlDbLinkedService";
    string blobDatasetName = "BlobDataset";
    string sqlDatasetName = "SqlDataset";
    string pipelineName = "Adfv2TutorialBlobToSqlCopy";
    
  3. Dodaj następujący kod do Main metody, która tworzy wystąpienie DataFactoryManagementClient klasy. Ten obiekt jest wykorzystywany do tworzenia fabryki danych, powiązanej usługi, zestawów danych i potoku. Używasz tego obiektu, aby monitorować szczegóły dotyczące uruchomienia potoku.

    // Authenticate and create a data factory management client
    var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
    ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
    AuthenticationResult result = context.AcquireTokenAsync(
        "https://management.azure.com/", cc
    ).Result;
    ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
    var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
    

Tworzenie fabryki danych

Dodaj następujący kod do Main metody, która tworzy fabrykę danych.

// Create a data factory
Console.WriteLine("Creating a data factory " + dataFactoryName + "...");
Factory dataFactory = new Factory
{
    Location = region,
    Identity = new FactoryIdentity()
};

client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, dataFactory);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(dataFactory, client.SerializationSettings)
);

while (
    client.Factories.Get(
        resourceGroup, dataFactoryName
    ).ProvisioningState == "PendingCreation"
)
{
    System.Threading.Thread.Sleep(1000);
}

Tworzenie połączonych usług

W tym samouczku utworzysz dwie usługi połączeń, najpierw dla źródła, a następnie dla ujścia.

Tworzenie połączonej usługi Azure Storage

Dodaj następujący kod do metody Main, która tworzy połączoną usługę Azure Storage. Aby uzyskać informacje o obsługiwanych właściwościach i szczegółach, zobacz właściwości połączonej usługi Azure Blob.

// Create an Azure Storage linked service
Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");

LinkedServiceResource storageLinkedService = new LinkedServiceResource(
    new AzureStorageLinkedService
    {
        ConnectionString = new SecureString(
            "DefaultEndpointsProtocol=https;AccountName=" + storageAccount +
            ";AccountKey=" + storageKey
        )
    }
);

client.LinkedServices.CreateOrUpdate(
    resourceGroup, dataFactoryName, storageLinkedServiceName, storageLinkedService
);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings)
);

Tworzenie połączonej usługi Azure SQL Database

Dodaj następujący kod do metody Main, która tworzy połączoną usługę Azure SQL Database. Aby uzyskać informacje o obsługiwanych właściwościach i szczegółach, zobacz właściwości połączonej usługi Azure SQL Database.

// Create an Azure SQL Database linked service
Console.WriteLine("Creating linked service " + sqlDbLinkedServiceName + "...");

LinkedServiceResource sqlDbLinkedService = new LinkedServiceResource(
    new AzureSqlDatabaseLinkedService
    {
        ConnectionString = new SecureString(azureSqlConnString)
    }
);

client.LinkedServices.CreateOrUpdate(
    resourceGroup, dataFactoryName, sqlDbLinkedServiceName, sqlDbLinkedService
);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(sqlDbLinkedService, client.SerializationSettings)
);

Tworzenie zestawów danych

W tej sekcji utworzysz dwa zestawy danych: jeden dla źródła, drugi dla ujścia.

Tworzenie zestawu danych dla źródłowego obiektu blob Azure

Dodaj następujący kod do metody , która tworzy zestaw danych Azure blob . Aby uzyskać informacje o obsługiwanych właściwościach i szczegółach, zobacz Właściwości zestawu danych Azure Blob.

Definiujesz zestaw danych, który reprezentuje dane źródłowe w Azure Blob. Ten zestaw danych Blob odnosi się do połączonej usługi Azure Storage, którą utworzyłeś w poprzednim kroku i opisuje:

  • Lokalizacja obiektu blob do skopiowania z: FolderPath i FileName
  • Format obiektu blob wskazujący sposób analizowania zawartości: TextFormat i jego ustawienia, takich jak ogranicznik kolumn
  • Struktura danych, w tym nazwy kolumn i typy danych, które mapują się w tym przykładzie na docelową tabelę SQL.
// Create an Azure Blob dataset
Console.WriteLine("Creating dataset " + blobDatasetName + "...");
DatasetResource blobDataset = new DatasetResource(
    new AzureBlobDataset
    {
        LinkedServiceName = new LinkedServiceReference {
            ReferenceName = storageLinkedServiceName
        },
        FolderPath = inputBlobPath,
        FileName = inputBlobName,
        Format = new TextFormat { ColumnDelimiter = "|" },
        Structure = new List<DatasetDataElement>
        {
            new DatasetDataElement { Name = "FirstName", Type = "String" },
            new DatasetDataElement { Name = "LastName", Type = "String" }
        }
    }
);

client.Datasets.CreateOrUpdate(
    resourceGroup, dataFactoryName, blobDatasetName, blobDataset
);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings)
);

Tworzenie zestawu danych dla Azure SQL Database ujścia

Dodaj następujący kod do metody Main, która tworzy zestaw danych Azure SQL Database. Aby uzyskać informacje o obsługiwanych właściwościach i szczegółach, zobacz Azure SQL Database właściwości zestawu danych.

Zdefiniuj zestaw danych reprezentujący dane ujścia w Azure SQL Database. Ten zbiór danych jest powiązany z powiązaną usługą Azure SQL Database, którą utworzyłeś w poprzednim kroku. Określa on również tabelę SQL, która przechowuje skopiowane dane.

// Create an Azure SQL Database dataset
Console.WriteLine("Creating dataset " + sqlDatasetName + "...");
DatasetResource sqlDataset = new DatasetResource(
    new AzureSqlTableDataset
    {
        LinkedServiceName = new LinkedServiceReference
        {
            ReferenceName = sqlDbLinkedServiceName
        },
        TableName = azureSqlTableName
    }
);

client.Datasets.CreateOrUpdate(
    resourceGroup, dataFactoryName, sqlDatasetName, sqlDataset
);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(sqlDataset, client.SerializationSettings)
);

Stwórz pipeline

Dodaj następujący kod do Main metody, która tworzy potok z aktywnością kopiowania. W tym samouczku ten pipeline zawiera jedno działanie: CopyActivity, które przyjmuje zbiór danych typu Blob jako źródło i zbiór danych SQL jako ujście. Aby uzyskać informacje na temat szczegółów Copy activity, zobacz Copy activity w Azure Data Factory.

// Create a pipeline with copy activity
Console.WriteLine("Creating pipeline " + pipelineName + "...");
PipelineResource pipeline = new PipelineResource
{
    Activities = new List<Activity>
    {
        new CopyActivity
        {
            Name = "CopyFromBlobToSQL",
            Inputs = new List<DatasetReference>
            {
                new DatasetReference() { ReferenceName = blobDatasetName }
            },
            Outputs = new List<DatasetReference>
            {
                new DatasetReference { ReferenceName = sqlDatasetName }
            },
            Source = new BlobSource { },
            Sink = new SqlSink { }
        }
    }
};

client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, pipeline);
Console.WriteLine(
    SafeJsonConvert.SerializeObject(pipeline, client.SerializationSettings)
);

Uruchom potok

Dodaj następujący kod do Main metody, która uruchamia pipeline.

// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(
    resourceGroup, dataFactoryName, pipelineName
).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

Monitorowanie przebiegu potoku

Teraz wstaw kod, aby sprawdzić stany uruchomienia pipeline'u i uzyskać szczegóły dotyczące uruchomienia copy activity.

  1. Dodaj następujący kod do metody w Main celu ciągłego sprawdzania stanu przebiegu potoku do momentu zakończenia kopiowania danych.

    // Monitor the pipeline run
    Console.WriteLine("Checking pipeline run status...");
    PipelineRun pipelineRun;
    while (true)
    {
        pipelineRun = client.PipelineRuns.Get(
            resourceGroup, dataFactoryName, runResponse.RunId
        );
        Console.WriteLine("Status: " + pipelineRun.Status);
        if (pipelineRun.Status == "InProgress")
            System.Threading.Thread.Sleep(15000);
        else
            break;
    }
    
  2. Dodaj następujący kod do Main metody, która pobiera szczegóły przebiegu działania kopiowania, takie jak rozmiar odczytanych lub zapisanych danych.

    // Check the copy activity run details
    Console.WriteLine("Checking copy activity run details...");
    
    RunFilterParameters filterParams = new RunFilterParameters(
        DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)
    );
    
    ActivityRunsQueryResponse queryResponse = client.ActivityRuns.QueryByPipelineRun(
        resourceGroup, dataFactoryName, runResponse.RunId, filterParams
    );
    
    if (pipelineRun.Status == "Succeeded")
    {
        Console.WriteLine(queryResponse.Value.First().Output);
    }
    else
        Console.WriteLine(queryResponse.Value.First().Error);
    
    Console.WriteLine("\nPress any key to exit...");
    Console.ReadKey();
    

Uruchamianie kodu

Skompiluj aplikację, wybierając Kompiluj>Kompiluj rozwiązanie. Następnie uruchom aplikację, wybierając pozycję Debuguj>Rozpocznij debugowanie, i sprawdź wykonanie potoku.

Konsola wypisuje postęp tworzenia fabryki danych, połączonej usługi, zestawów danych, potoku i działania potoku. Następnie sprawdza stan uruchomienia potoku. Poczekaj, aż zobaczysz szczegóły uruchomienia działania kopiowania z rozmiarem odczytu/zapisu danych. Następnie przy użyciu narzędzi, takich jak SQL Server Management Studio (SSMS) lub Visual Studio, możesz nawiązać połączenie z docelowym Azure SQL Database i sprawdzić, czy określona tabela docelowa zawiera skopiowane dane.

Przykładowe dane wyjściowe

Creating a data factory AdfV2Tutorial...
{
  "identity": {
    "type": "SystemAssigned"
  },
  "location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
  "properties": {
    "type": "AzureStorage",
    "typeProperties": {
      "connectionString": {
        "type": "SecureString",
        "value": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>"
      }
    }
  }
}
Creating linked service AzureSqlDbLinkedService...
{
  "properties": {
    "type": "AzureSqlDatabase",
    "typeProperties": {
      "connectionString": {
        "type": "SecureString",
        "value": "Server=tcp:<servername>.database.windows.net,1433;Database=<databasename>;User ID=<username>@<servername>;Password=<password>;Trusted_Connection=False;Encrypt=True;Connection Timeout=30"
      }
    }
  }
}
Creating dataset BlobDataset...
{
  "properties": {
    "type": "AzureBlob",
    "typeProperties": {
      "folderPath": "adfv2tutorial/",
      "fileName": "inputEmp.txt",
      "format": {
        "type": "TextFormat",
        "columnDelimiter": "|"
      }
    },
    "structure": [
      {
        "name": "FirstName",
        "type": "String"
      },
      {
        "name": "LastName",
        "type": "String"
      }
    ],
    "linkedServiceName": {
      "type": "LinkedServiceReference",
      "referenceName": "AzureStorageLinkedService"
    }
  }
}
Creating dataset SqlDataset...
{
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": "dbo.emp"
    },
    "linkedServiceName": {
      "type": "LinkedServiceReference",
      "referenceName": "AzureSqlDbLinkedService"
    }
  }
}
Creating pipeline Adfv2TutorialBlobToSqlCopy...
{
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "inputs": [
          {
            "type": "DatasetReference",
            "referenceName": "BlobDataset"
          }
        ],
        "outputs": [
          {
            "type": "DatasetReference",
            "referenceName": "SqlDataset"
          }
        ],
        "name": "CopyFromBlobToSQL"
      }
    ]
  }
}
Creating pipeline run...
Pipeline run ID: 1cd03653-88a0-4c90-aabc-ae12d843e252
Checking pipeline run status...
Status: InProgress
Status: InProgress
Status: Succeeded
Checking copy activity run details...
{
  "dataRead": 18,
  "dataWritten": 28,
  "rowsCopied": 2,
  "copyDuration": 2,
  "throughput": 0.01,
  "errors": [],
  "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)",
  "usedDataIntegrationUnits": 2,
  "billedDuration": 2
}

Press any key to exit...

Przepływ pracy w tym przykładzie kopiuje dane z jednej lokalizacji do drugiej w magazynie obiektów blob Azure. Nauczyłeś się jak:

  • Tworzenie fabryki danych.
  • Utwórz usługi połączone z Azure Storage i Azure SQL Database.
  • Utwórz zestawy danych Azure Blob i Azure SQL Database.
  • Utwórz potok zawierający aktywność kopiowania.
  • Uruchom potok.
  • Monitoruj pipeline i uruchomienia działań.

Przejdź do następującego samouczka, aby dowiedzieć się więcej o kopiowaniu danych lokalnych do chmury: