Fan-out/fan-in , birden çok işlevi eşzamanlı olarak yürütme ve ardından sonuçlar üzerinde bazı toplamalar gerçekleştirme desenini ifade eder. Bu makalede, bir fan-in/fan-out senaryosu uygulamak için Dayanıklı İşlevler kullanan bir örnek açıklanmaktadır. Örnek, bir uygulamanın site içeriğinin tamamını veya bir kısmını Azure Depolama'ya yedekleyen dayanıklı bir işlevdir.
Not
Azure İşlevleri için Node.js programlama modelinin 4. sürümü genel olarak kullanılabilir. Yeni v4 modeli, JavaScript ve TypeScript geliştiricileri için daha esnek ve sezgisel bir deneyime sahip olacak şekilde tasarlanmıştır.
Geçiş kılavuzunda v3 ile v4 arasındaki farklar hakkında daha fazla bilgi edinin.
Aşağıdaki kod parçacıklarında JavaScript (PM4), yeni deneyim olan programlama modeli V4'i belirtir.
Bu örnekte, işlevler belirtilen bir dizin altındaki tüm dosyaları yinelenen olarak blob depolamaya yükler. Ayrıca karşıya yüklenen toplam bayt sayısını da sayarlar.
Her şeyi halleden tek bir işlev yazmak mümkündür. Karşılaşacağınız temel sorun ölçeklenebilirliktir. Tek bir işlev yürütmesi yalnızca tek bir sanal makinede çalıştırılabilir, bu nedenle aktarım hızı bu tek VM'nin aktarım hızıyla sınırlandırılır. Bir diğer sorun da güvenilirliktir. İşlemin ortasında bir hata varsa veya işlemin tamamı 5 dakikadan uzun sürüyorsa yedekleme kısmen tamamlanmış durumda başarısız olabilir. Daha sonra yeniden başlatılması gerekir.
İki normal işlev yazmak daha sağlam bir yaklaşım olacaktır: biri dosyaları numaralandırır, dosya adlarını kuyruğa ekler ve diğeri kuyruktan okuyup dosyaları blob depolamaya yükler. Bu yaklaşım aktarım hızı ve güvenilirlik açısından daha iyidir, ancak kuyruk sağlamanızı ve yönetmenizi gerektirir. Daha da önemlisi, karşıya yüklenen toplam bayt sayısını raporlama gibi daha fazlasını yapmak istiyorsanız , durum yönetimi ve koordinasyon açısından önemli bir karmaşıklık ortaya konur.
Dayanıklı İşlevler bir yaklaşım, çok düşük ek yük ile bahsedilen avantajların tümünü sunar.
İşlevler
Bu makalede örnek uygulamada aşağıdaki işlevler açıklanmaktadır:
E2_BackupSiteContent: Yedeklene dosyaların listesini almak için çağıran E2_GetFileList ve ardından her dosyayı yedeklemeye çağıran E2_CopyFileToBlobbir düzenleyici işlevi.
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
Çizgiye await Task.WhenAll(tasks); dikkat edin. İşleve E2_CopyFileToBlob yapılan tek tek çağrılar beklenmedi ve bu da paralel olarak çalışmalarına olanak tanır. Bu görev dizisini 'ye Task.WhenAllgeçirdiğimizde , tüm kopyalama işlemleri tamamlanana kadar tamamlanmayacak bir görevi geri alacağız. .NET'teki Görev Paralel Kitaplığı'nı (TPL) biliyorsanız, bu sizin için yeni değildir. Aradaki fark, bu görevlerin aynı anda birden çok sanal makinede çalışıyor olması ve Dayanıklı İşlevler uzantısının uçtan uca yürütmenin geri dönüşümü işlemeye dayanıklı olmasını sağlamasıdır.
'den Task.WhenAllbekledikten sonra, tüm işlev çağrılarının tamamlandığını ve değerleri bize geri döndürdüğünü biliyoruz. her çağrısı E2_CopyFileToBlob karşıya yüklenen bayt sayısını döndürür, bu nedenle toplam bayt sayısını hesaplamak, tüm bu dönüş değerlerinin bir araya eklenmesiyle ilgili bir konudur.
işlevi orchestrator işlevleri için standard function.json işlevini kullanır.
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
Çizgiye yield context.df.Task.all(tasks); dikkat edin. İşleve E2_CopyFileToBlob yapılan tek tek çağrılar verimlenmemiştir ve bu da paralel olarak çalışmalarına olanak tanır. Bu görev dizisini 'ye context.df.Task.allgeçirdiğimizde , tüm kopyalama işlemleri tamamlanana kadar tamamlanmayacak bir görevi geri alacağız. JavaScript hakkında Promise.all bilgi sahibiyseniz, bu sizin için yeni değildir. Aradaki fark, bu görevlerin aynı anda birden çok sanal makinede çalışıyor olması ve Dayanıklı İşlevler uzantısının uçtan uca yürütmenin geri dönüşümü işlemeye dayanıklı olmasını sağlamasıdır.
Not
Görevler kavramsal olarak JavaScript vaatlerine benzer olsa da, orchestrator işlevleri görev paralelleştirmesini Promise.all yönetmek için ve context.df.Task.any yerine ve Promise.race kullanmalıdırcontext.df.Task.all.
'den context.df.Task.allverdikten sonra, tüm işlev çağrılarının tamamlandığını ve değerleri bize geri döndürdüğünü biliyoruz. her çağrısı E2_CopyFileToBlob karşıya yüklenen bayt sayısını döndürür, bu nedenle toplam bayt sayısını hesaplamak, tüm bu dönüş değerlerinin bir araya eklenmesiyle ilgili bir konudur.
Orchestrator işlevini uygulayan kod aşağıdadır:
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace("\\", "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
Çizgiye yield context.df.Task.all(tasks); dikkat edin. İşleve copyFileToBlob yapılan tek tek çağrılar verimlenmemiştir ve bu da paralel olarak çalışmalarına olanak tanır. Bu görev dizisini 'ye context.df.Task.allgeçirdiğimizde , tüm kopyalama işlemleri tamamlanana kadar tamamlanmayacak bir görevi geri alacağız. JavaScript hakkında Promise.all bilgi sahibiyseniz, bu sizin için yeni değildir. Aradaki fark, bu görevlerin aynı anda birden çok sanal makinede çalışıyor olması ve Dayanıklı İşlevler uzantısının uçtan uca yürütmenin geri dönüşümü işlemeye dayanıklı olmasını sağlamasıdır.
Not
Görevler kavramsal olarak JavaScript vaatlerine benzer olsa da, düzenleyici işlevleri görev paralelleştirmesi Promise.all için ve context.df.Task.any yerine ve Promise.race kullanmalıdırcontext.df.Task.all.
'den context.df.Task.allverdikten sonra, tüm işlev çağrılarının tamamlandığını ve değerleri bize geri döndürdüğünü biliyoruz. her çağrısı copyFileToBlob karşıya yüklenen bayt sayısını döndürür, bu nedenle toplam bayt sayısını hesaplamak, tüm bu dönüş değerlerinin bir araya eklenmesiyle ilgili bir konudur.
işlevi orchestrator işlevleri için standard function.json işlevini kullanır.
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
Çizgiye yield context.task_all(tasks); dikkat edin. İşleve E2_CopyFileToBlob yapılan tek tek çağrılar verimlenmemiştir ve bu da paralel olarak çalışmalarına olanak tanır. Bu görev dizisini 'ye context.task_allgeçirdiğimizde , tüm kopyalama işlemleri tamamlanana kadar tamamlanmayacak bir görevi geri alacağız. Python hakkında bilgi asyncio.gather sahibiyseniz bu sizin için yeni değildir. Aradaki fark, bu görevlerin aynı anda birden çok sanal makinede çalışıyor olması ve Dayanıklı İşlevler uzantısının uçtan uca yürütmenin geri dönüşümü işlemeye dayanıklı olmasını sağlamasıdır.
Not
Görevler kavramsal olarak Python tarafından beklenebilir öğelere benzer olsa da, orchestrator işlevlerinin context.task_allcontext.task_any ve API'lerini kullanarak yield görev paralelleştirmesini yönetmesi gerekir.
'den context.task_allverdikten sonra, tüm işlev çağrılarının tamamlandığını ve değerleri bize geri döndürdüğünü biliyoruz.
E2_CopyFileToBlob Çağrısının her biri karşıya yüklenen bayt sayısını döndürür, böylece tüm dönüş değerlerini bir araya ekleyerek toplam bayt sayısını hesaplayabiliriz.
Yardımcı etkinlik işlevleri
Yardımcı etkinlik işlevleri, diğer örneklerde olduğu gibi yalnızca tetikleyici bağlamasını activityTrigger kullanan normal işlevlerdir.
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# We walk the file system
for path, _, files in os.walk(rootDirectory):
# We copy the code for activities and orchestrators
if "E2_" in path:
# For each file, we add their full-path to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
Not
Bu kodu neden doğrudan orchestrator işlevine koyamadığınızı merak ediyor olabilirsiniz. Bunu yapabilirsiniz, ancak bu, düzenleyici işlevlerinin temel kurallarından birini bozar. Bu, yerel dosya sistemi erişimi de dahil olmak üzere hiçbir zaman G/Ç yapmaması gerektiğidir. Daha fazla bilgi için bkz . Orchestrator işlev kodu kısıtlamaları.
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
Not
Örnek kodu çalıştırmak için NuGet paketini yüklemeniz Microsoft.Azure.WebJobs.Extensions.Storage gerekir.
işlevi, Azure İşlevleri bağlamalarının bazı gelişmiş özelliklerini (parametrenin Binder kullanımı) kullanır, ancak bu kılavuzun amacı için bu ayrıntılar hakkında endişelenmeniz gerekmez.
için E2_CopyFileToBlobfunction.json dosyası benzer şekilde basittir:
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
Uygulama, dosyayı diskten yükler ve içeriği zaman uyumsuz olarak "yedeklemeler" kapsayıcısında aynı ada sahip bir bloba akışla aktarır. Dönüş değeri, depolama alanına kopyalanan bayt sayısıdır ve daha sonra düzenleyici işlevi tarafından toplam toplamı hesaplamak için kullanılır.
Not
Bu, G/Ç işlemlerini bir işleve taşımanın mükemmel bir activityTrigger örneğidir. Çalışma yalnızca birçok farklı makineye dağıtılamaz, aynı zamanda ilerleme durumunu kontrol etme avantajından da yararlanırsınız. Konak işlemi herhangi bir nedenle sonlandırılırsa, hangi yüklemelerin zaten tamamlandığını biliyorsunuz.
Örneği çalıştırma
Windows'da aşağıdaki HTTP POST isteğini göndererek düzenlemeyi başlatabilirsiniz.
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
Alternatif olarak, bir Linux İşlev Uygulamasında (Python şu anda yalnızca App Service için Linux üzerinde çalıştırılır) düzenlemeyi şu şekilde başlatabilirsiniz:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
Not
HttpStart Çağırdığınız işlev yalnızca JSON biçimli içerikle çalışır. Bu nedenle üst Content-Type: application/json bilgi gereklidir ve dizin yolu bir JSON dizesi olarak kodlanmıştır. Ayrıca HTTP kod parçacığı, dosyada tüm HTTP tetikleyici işlevleri URL'lerinden host.json varsayılan api/ ön eki kaldıran bir giriş olduğunu varsayar. Bu yapılandırma için işaretlemeyi örneklerdeki host.json dosyada bulabilirsiniz.
Bu HTTP isteği orchestrator'ı E2_BackupSiteContent tetikler ve dizeyi D:\home\LogFiles parametre olarak geçirir. Yanıt, yedekleme işleminin durumunu almak için bir bağlantı sağlar:
İşlev uygulamanızda kaç günlük dosyası bulunduğuna bağlı olarak, bu işlemin tamamlanması birkaç dakika sürebilir. Önceki HTTP 202 yanıtının üst bilgisindeki URL'yi Location sorgulayarak en son durumu alabilirsiniz.
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
Bu durumda işlev çalışmaya devam eder. Düzenleyici durumuna kaydedilen girişi ve son güncelleştirme zamanını görebilirsiniz. Tamamlanmasını sorgulamak için üst bilgi değerlerini kullanmaya Location devam edebilirsiniz. Durum "Tamamlandı" olduğunda, aşağıdakine benzer bir HTTP yanıt değeri görürsünüz:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
Artık düzenlemenin tamamlandığını ve tamamlanmasının yaklaşık ne kadar sürdüğünü görebilirsiniz. Ayrıca, alan için output yaklaşık 450 KB günlüklerin karşıya yüklendiğini gösteren bir değer görürsünüz.
Sonraki adımlar
Bu örnek, fan-out/fan-in deseninin nasıl uygulandığını göstermiştir. Sonraki örnekte , dayanıklı zamanlayıcılar kullanılarak izleyici deseninin nasıl uygulanacakları gösterilmektedir.
Çok yakında: 2024 boyunca, içerik için geri bildirim mekanizması olarak GitHub Sorunları’nı kullanımdan kaldıracak ve yeni bir geri bildirim sistemiyle değiştireceğiz. Daha fazla bilgi için bkz. https://aka.ms/ContentUserFeedback.