Azure Data Factory ve Azure Synapse Analytics'te ForEach etkinliği

UYGULANANLAR: Azure Data Factory Azure Synapse Analytics

Bahşiş

Kuruluşlar için hepsi bir arada analiz çözümü olan Microsoft Fabric'te Data Factory'yi deneyin. Microsoft Fabric , veri taşımadan veri bilimine, gerçek zamanlı analize, iş zekasına ve raporlamaya kadar her şeyi kapsar. Yeni bir deneme sürümünü ücretsiz olarak başlatmayı öğrenin!

ForEach Etkinliği, Azure Data Factory veya Synapse işlem hattında yinelenen bir denetim akışı tanımlar. Bu etkinlik bir koleksiyon üzerinde yinelemek için kullanılır ve bir döngüde belirtilen etkinlikleri yürütür. Bu etkinliğin döngü uygulaması, programlama dillerindeki Foreach döngü yapısına benzer.

Kullanıcı arabirimiyle ForEach etkinliği oluşturma

İşlem hattında ForEach etkinliği kullanmak için aşağıdaki adımları tamamlayın:

  1. ForEach etkinliğinizin girişi olarak herhangi bir dizi türü değişkenini veya diğer etkinliklerden gelen çıkışları kullanabilirsiniz. Dizi değişkeni oluşturmak için işlem hattı tuvalinin arka planını seçin ve ardından Değişkenler sekmesini seçerek aşağıda gösterildiği gibi bir dizi türü değişkeni ekleyin.

    Shows an empty pipeline canvas with an array type variable added to the pipeline.

  2. İşlem hattı Etkinlikleri bölmesinde ForEach'i arayın ve bir ForEach etkinliğini işlem hattı tuvaline sürükleyin.

  3. Tuvalde henüz seçili değilse yeni ForEach etkinliğini ve ayrıntılarını düzenlemek için Ayarlar sekmesini seçin.

    Shows the UI for a Filter activity.

  4. Öğeler alanını seçin ve dinamik içerik düzenleyicisi bölmesini açmak için Dinamik içerik ekle bağlantısını seçin.

    Shows the  Add dynamic content  link for the Items property.

  5. Dinamik içerik düzenleyicisinde filtrelenecek giriş dizinizi seçin. Bu örnekte, ilk adımda oluşturulan değişkeni seçiyoruz.

    Shows the dynamic content editor with the variable created in the first step selected

  6. Giriş Öğeleri dizisindeki her öğe için yürütülecek bir veya daha fazla etkinlik eklemek için ForEach etkinliğindeki Etkinlikler düzenleyicisini seçin.

    Shows the Activities editor button on the ForEach activity in the pipeline editor window.

  7. ForEach etkinliği içinde oluşturduğunuz tüm etkinliklerde, Öğeler listesinden ForEach etkinliğinin yinelemede olduğu geçerli öğeye başvurabilirsiniz. Bir özellik değeri belirtmek için dinamik ifadeyi kullanabileceğiniz her yerde geçerli öğeye başvurabilirsiniz. Geçerli öğeyi döndürmek için dinamik içerik düzenleyicisinde ForEach yineleyicisini seçin.

    Shows the dynamic content editor with the ForEach iterator selected.

Sözdizimi

Özellikler bu makalenin devamında açıklanmıştır. items özelliği koleksiyondur ve koleksiyondaki her öğeye aşağıdaki söz diziminde gösterildiği gibi kullanılarak @item() başvurulur:

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

Tür özellikleri

Özellik Açıklama İzin verilen değerler Zorunlu
name Her etkinlik için öğesinin adı. String Evet
type ForEach olarak ayarlanmalıdır String Evet
isSequential Döngünün sırayla mı yoksa paralel olarak mı yürütülmesi gerektiğini belirtir. Paralel olarak aynı anda en fazla 50 döngü yinelemesi yürütülebilir). Örneğin, 10 farklı kaynak ve isSequential değeri False olan havuz veri kümelerine sahip bir kopyalama etkinliği üzerinde yinelenen bir ForEach etkinliğiniz varsa, tüm kopyalar aynı anda yürütülür. Varsayılan değer False'tur.

"isSequential" False olarak ayarlandıysa, birden çok yürütülebilir dosyayı çalıştırmak için doğru bir yapılandırma olduğundan emin olun. Aksi takdirde, yazma çakışmalarının oluşmasını önlemek için bu özellik dikkatli kullanılmalıdır. Daha fazla bilgi için bkz . Paralel yürütme bölümü.
Boolean Hayır. Varsayılan değer False'tur.
batchCount Paralel yürütme sayısını denetlemek için kullanılacak toplu iş sayısı (isSequential false olarak ayarlandığında). Bu üst eşzamanlılık sınırıdır, ancak her etkinlik için her zaman bu sayıda yürütülmeyecek Tamsayı (en fazla 50) Hayır. Varsayılan değer 20'dir.
Items Yinelenecek bir JSON Dizisi döndüren ifade. İfade (JSON Dizisi döndüren) Evet
Aktiviteler Yürütülecek etkinlikler. Etkinlikler Listesi Evet

Paralel yürütme

isSequential false olarak ayarlanırsa, etkinlik en fazla 50 eşzamanlı yinelemeyle paralel olarak yinelenir. Bu ayar dikkatli kullanılmalıdır. Eşzamanlı yinelemeler aynı klasöre ancak farklı dosyalara yazıyorsa, bu yaklaşım uygundur. Eşzamanlı yinelemeler aynı dosyaya eşzamanlı olarak yazıyorsa, bu yaklaşım büyük olasılıkla bir hataya neden olur.

Yineleme ifade dili

ForEach etkinliğinde, özellik öğeleri için yinelenecek bir dizi sağlayın." ForEach etkinliğindeki tek bir numaralandırmayı yinelemek için kullanın @item() . Örneğin, öğeler bir diziyse: [1, 2, 3], ilk yinelemede 1, @item() ikinci yinelemede 2 ve üçüncü yinelemede 3 döndürür. 0 ile başlayan ve 9 ile biten on kez yinelemek için like ifadesini de kullanabilirsiniz @range(0,10) .

Tek bir etkinlik üzerinde yineleme

Senaryo: Azure Blob'daki aynı kaynak dosyadan Azure Blob'daki birden çok hedef dosyaya kopyalayın.

İşlem hattı tanımı

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

Blob veri kümesi tanımı

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

Parametre değerlerini çalıştırma

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

Birden çok etkinlikte yineleme

Bir ForEach etkinliğindeki birden çok etkinlik (örneğin, kopyalama ve web etkinlikleri) üzerinde yineleme yapmak mümkündür. Bu senaryoda, birden çok etkinliği ayrı bir işlem hattında soyutlamanızı öneririz. Ardından, birden çok etkinliğe sahip ayrı işlem hattını çağırmak için ForEach etkinliğiyle işlem hattındaki ExecutePipeline etkinliğini kullanabilirsiniz.

Sözdizimi

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

Örnek

Senaryo: İşlem Hattı Yürütme etkinliğiyle bir ForEach etkinliği içindeki bir InnerPipeline üzerinde yineleme yapın. İç işlem hattı, şema tanımları parametreli olarak kopyalanır.

Ana İşlem Hattı tanımı

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

İç işlem hattı tanımı

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

Kaynak veri kümesi tanımı

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Havuz veri kümesi tanımı

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Ana işlem hattı parametreleri

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

Çıkışları toplama

Foreach etkinliğinin çıkışlarını toplamak için değişkenleri ve Değişken Ekle etkinliğini kullanabilirsiniz.

İlk olarak, işlem hattında bir arraydeğişken bildirin. Ardından, her foreach döngüsünün içinde Değişken Ekle etkinliğini çağırabilirsiniz. Daha sonra, toplamayı dizinizden alabilirsiniz.

Sınırlamalar ve geçici çözümler

ForEach etkinliğinin bazı sınırlamaları ve önerilen geçici çözümler aşağıdadır.

Sınırlama Geçici çözüm
ForEach döngüsünü başka bir ForEach döngüsüne (veya Until döngüsüne) iç içe yerleştiremezsiniz. Dış ForEach döngüsüne sahip dış işlem hattının iç içe döngüye sahip bir iç işlem hattı üzerinde yinelendiği iki düzeyli bir işlem hattı tasarlayın.
ForEach etkinliğinin paralel işleme için en fazla batchCount 50, en fazla 100.000 öğesi vardır. ForEach etkinliğiyle dış işlem hattının bir iç işlem hattı üzerinde yinelendiği iki düzeyli bir işlem hattı tasarlayın.
SetVariable, paralel olarak çalışan bir ForEach etkinliğinin içinde kullanılamaz çünkü değişkenler tüm işlem hattının genelidir, bunların kapsamı forEach veya başka bir etkinlik olarak belirlenemez. Sıralı ForEach kullanmayı göz önünde bulundurun veya ForEach içinde İşlem Hattını Yürüt 'i kullanın (Alt İşlem Hattında işlenen Değişken/Parametre).

Desteklenen diğer denetim akışı etkinliklerine bakın: