培训
Orleans 客户端
客户端允许非 grain 代码与 Orleans 群集交互。 客户端允许应用程序代码与群集中托管的 grain 和流通信。 根据客户端代码的托管位置,可通过两种方式获取客户端:在 silo 所在的同一进程中,或者在不同的进程中。 本文将讨论这两个选项,并从建议的选项开始:将客户端代码共同托管在 grain 代码所在的同一进程中。
如果客户端代码和 grain 代码托管在同一个进程中,则可以直接从宿主应用程序的依赖项注入容器中获取客户端。 在这种情况下,客户端直接与它所附加到的 silo 通信,并可以利用 silo 对群集的更多了解。
这样可以获得多种优势,包括降低网络和 CPU 开销,以及降低延迟和提高吞吐量与可靠性。 客户端利用 silo 对群集拓扑和状态的了解,且不需要使用单独的网关。 这可以避免使用网络跃点和往返执行序列化/反序列化。 因此,这可以提高可靠性,因为客户端和 grain 之间所需的节点数已减至最少。 如果 grain 是无状态工作线程 grain,或者恰好在托管客户端的 silo 上激活,则根本不需要执行序列化或网络通信,客户端可以获得额外的性能和可靠性增益。 通过消除部署和监视两个不同应用程序二进制文件的需要,共同托管客户端和 grain 代码还能简化部署和应用程序拓扑。
这种方法也存在不足之处,主要是 grain 代码不再与客户端进程隔离。 因此,客户端代码中的问题(例如阻塞 IO 或锁争用导致线程资源枯竭)可能会影响 grain 代码的性能。 即使不存在上述代码缺陷,只要让客户端代码与 grain 代码在同一处理器上执行,就会导致近邻干扰效应,这会给 CPU 缓存带来额外的压力,并且通常会加剧本地资源的争用。 此外,识别这些问题的根源会变得更加困难,因为监视系统无法在逻辑上区分客户端代码和 grain 代码。
尽管存在这些不足之处,但将客户端代码与 grain 代码共同托管在一起是流行的做法,并且对于大多数应用程序而言是建议的方法。 具体地说,上述不足之处在实践中极少遇到,原因如下:
- 客户端代码通常非常精简,例如,将传入的 HTTP 请求转换为 grain 调用,因此近邻干扰效应可忽略不计,并且在成本上与需要网关的方案相当。
- 如果出现性能问题,开发人员的典型工作流涉及到使用 CPU 探查器和调试器等工具,尽管客户端和 grain 代码在同一进程中执行,但这些工具仍能快速有效地识别问题的根源。 换而言之,指标变得更加粗略,不足以准确识别问题的根源,但更细致的工具仍然有效。
如果使用 .NET 通用主机进行托管,则客户端将自动在主机的依赖项注入容器中提供,并可以注入到 ASP.NET 控制器等服务或 IHostedService 实现中。
或者,可以从 ISiloHost 获取 IGrainFactory 或 IClusterClient 等客户端接口:
var client = host.Services.GetService<IClusterClient>();
await client.GetGrain<IMyGrain>(0).Ping();
客户端代码可以在托管 grain 代码的 Orleans 群集之外运行。 因此,外部客户端充当群集以及所有应用程序 grain 的连接器或管道。 通常,客户端在前端 Web 服务器上使用,以连接到充当中间层的 Orleans 群集,其中的 grain 执行业务逻辑。
在典型设置中,前端 Web 服务器:
- 接收 Web 请求。
- 执行必要的身份验证和授权验证。
- 确定哪些 grain 应处理请求。
- 使用 Microsoft.Orleans.Client NuGet 包对 grain 发出一个或多个方法调用。
- 处理 grain 调用的成功完成或失败状态以及任何返回值。
- 发送 Web 请求的响应。
在使用 grain 客户端对 Orleans 群集中托管的 grain 发出调用之前,需要配置、初始化该客户端并将其连接到群集。
配置是通过 UseOrleansClient 和多个补充选项类提供的,这些类包含配置属性的层次结构,用于以编程方式配置客户端。 有关详细信息,请参阅客户端配置。
请考虑以下客户端配置示例:
// Alternatively, call Host.CreateDefaultBuilder(args) if using the
// Microsoft.Extensions.Hosting NuGet package.
using IHost host = new HostBuilder()
.UseOrleansClient(clientBuilder =>
{
clientBuilder.Configure<ClusterOptions>(options =>
{
options.ClusterId = "my-first-cluster";
options.ServiceId = "MyOrleansService";
});
clientBuilder.UseAzureStorageClustering(
options => options.ConfigureTableServiceClient(connectionString))
})
.Build();
host
启动时,客户端将通过其构造的服务提供程序实例来配置和提供。
配置是通过 ClientBuilder 和多个补充选项类提供的,这些类包含配置属性的层次结构,用于以编程方式配置客户端。 有关详细信息,请参阅客户端配置。
客户端配置示例:
var client = new ClientBuilder()
.Configure<ClusterOptions>(options =>
{
options.ClusterId = "my-first-cluster";
options.ServiceId = "MyOrleansService";
})
.UseAzureStorageClustering(
options => options.ConnectionString = connectionString)
.ConfigureApplicationParts(
parts => parts.AddApplicationPart(typeof(IValueGrain).Assembly))
.Build();
最后,需要调用构造的客户端对象上的 Connect()
方法,使客户端连接到 Orleans 群集。 这是一个返回 Task
的异步方法。 因此需要使用 await
或 .Wait()
等待此方法完成。
await client.Connect();
从客户端对 grain 发出调用与从 grain 代码内部发出此类调用没有差别。 同一 IGrainFactory.GetGrain<TGrainInterface>(Type, Guid) 方法(其中 T
是目标 grain 接口)在两种情况下都用于获取 grain 引用。 区别在于 IGrainFactory.GetGrain 是通过哪个工厂对象调用的。 在客户端代码中,通过连接的客户端对象进行这种调用,如以下示例所示:
IPlayerGrain player = client.GetGrain<IPlayerGrain>(playerId);
Task joinGameTask = player.JoinGame(game)
await joinGameTask;
根据 grain 接口规则的要求,对 grain 方法的调用会返回 Task 或 Task<TResult>。 客户端可以使用 await
关键字来异步等待返回的 Task
而不会阻塞线程,或者在某些情况下,可以使用 Wait()
方法来阻塞当前的执行线程。
从客户端代码和从另一个 grain 内部对 grain 发出调用的主要差别在于 grain 的单线程执行模型。 Orleans 运行时将 grain 限制为单线程,而客户端可以是多线程的。 Orleans 在客户端不提供任何此类保证,因此客户端需负责使用适合其环境的任何同步构造(锁、事件和 Tasks
)来管理其并发性。
在某些情况下,简单的请求-响应模式是不够的,客户端需要接收异步通知。 例如,某个用户可能希望在其关注的人发布新消息时收到通知。
观察程序就是这样一种机制,使用它可将客户端对象公开为可供 grain 调用的类似于 grain 的目标。 对观察程序的调用不提供任何成功或失败指示,因为结果是按照“尽力而为”的原则作为单向消息发送的。 因此,应用程序代码需负责在必要时在观察程序之上构建更高级别的可靠性机制。
可用于向客户端传递异步消息的另一种机制是流。 流公开单个消息传递成功或失败指示,因此使用流能够可靠地与客户端通信。
在以下两种情况下,群集客户端可能会遇到连接问题:
- 客户端尝试连接到 silo 时。
- 对从连接的群集客户端获取的 grain 引用发出调用时。
在第一种情况下,客户端将尝试连接到 silo。 如果客户端无法连接到任何 silo,它将引发异常以指示出现了什么问题。 可以注册 IClientConnectionRetryFilter 来处理异常并决定是否重试。 如果未提供重试筛选器,或者重试筛选器返回 false
,则客户端将永远放弃。
using Orleans.Runtime;
internal sealed class ClientConnectRetryFilter : IClientConnectionRetryFilter
{
private int _retryCount = 0;
private const int MaxRetry = 5;
private const int Delay = 1_500;
public async Task<bool> ShouldRetryConnectionAttempt(
Exception exception,
CancellationToken cancellationToken)
{
if (_retryCount >= MaxRetry)
{
return false;
}
if (!cancellationToken.IsCancellationRequested &&
exception is SiloUnavailableException siloUnavailableException)
{
await Task.Delay(++ _retryCount * Delay, cancellationToken);
return true;
}
return false;
}
}
在以下两种情况下,群集客户端可能会遇到连接问题:
- 最初调用 IClusterClient.Connect() 方法时。
- 对从连接的群集客户端获取的 grain 引用发出调用时。
对于第一种情况,Connect
方法将引发异常以指明出现了什么问题。 这通常(但不一定)是 SiloUnavailableException。 如果发生这种情况,群集客户端实例将不可用,应被处置。 可以选择向 Connect
方法提供重试筛选器函数,例如,该函数可以等待指定的一段时间,然后再试一次。 如果未提供重试筛选器,或者重试筛选器返回 false
,则客户端将永远放弃。
如果 Connect
成功返回,则可以保证群集客户端在被处置之前可用。 这意味着,即使客户端遇到连接问题,它也会无限期地尝试恢复。 可以在 ClientBuilder 提供的 GatewayOptions 对象上配置确切的恢复行为,例如:
var client = new ClientBuilder()
// ...
.Configure<GatewayOptions>(
options => // Default is 1 min.
options.GatewayListRefreshPeriod = TimeSpan.FromMinutes(10))
.Build();
对于第二种情况,如果在 grain 调用期间发生连接问题,则会在客户端引发 SiloUnavailableException。 处理方式如下:
IPlayerGrain player = client.GetGrain<IPlayerGrain>(playerId);
try
{
await player.JoinGame(game);
}
catch (SiloUnavailableException)
{
// Lost connection to the cluster...
}
在这种情况下,grain 引用不会失效;当稍后重新建立连接时,可对同一引用重试调用。
在使用 .NET 通用主机的程序中创建外部客户端的建议方法是通过依赖项注入来注入 IClusterClient 单一实例,然后可将该实例作为托管服务、ASP.NET 控制器等中的构造函数参数接受。
备注
在要连接到 Orleans 接收器的同一进程中共同托管该接收器时,不需要手动创建客户端;Orleans 将自动提供一个客户端并适当地管理其生命周期。
当连接到不同进程中的群集时(在不同的计算机上),一种常用模式是创建如下所示的托管服务:
using Microsoft.Extensions.Hosting;
namespace Client;
public sealed class ClusterClientHostedService : IHostedService
{
private readonly IClusterClient _client;
public ClusterClientHostedService(IClusterClient client)
{
_client = client;
}
public Task StartAsync(CancellationToken cancellationToken)
{
// Use the _client to consume grains...
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
=> Task.CompletedTask;
}
public class ClusterClientHostedService : IHostedService
{
private readonly IClusterClient _client;
public ClusterClientHostedService(IClusterClient client)
{
_client = client;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
// A retry filter could be provided here.
await _client.Connect();
}
public async Task StopAsync(CancellationToken cancellationToken)
{
await _client.Close();
_client.Dispose();
}
}
然后如下所示注册该服务:
await Host.CreateDefaultBuilder(args)
.UseOrleansClient(builder =>
{
builder.UseLocalhostClustering();
})
.ConfigureServices(services =>
{
services.AddHostedService<ClusterClientHostedService>();
})
.RunConsoleAsync();
这是上面给出的客户端应用程序示例的扩展版本,该应用程序连接到 Orleans,查找玩家帐户,使用观察程序订阅玩家参与的游戏会话的更新内容,并在手动终止程序之前显示通知。
try
{
using IHost host = Host.CreateDefaultBuilder(args)
.UseOrleansClient((context, client) =>
{
client.Configure<ClusterOptions>(options =>
{
options.ClusterId = "my-first-cluster";
options.ServiceId = "MyOrleansService";
})
.UseAzureStorageClustering(
options => options.ConfigureTableServiceClient(
context.Configuration["ORLEANS_AZURE_STORAGE_CONNECTION_STRING"]));
})
.UseConsoleLifetime()
.Build();
await host.StartAsync();
IGrainFactory client = host.Services.GetRequiredService<IGrainFactory>();
// Hardcoded player ID
Guid playerId = new("{2349992C-860A-4EDA-9590-000000000006}");
IPlayerGrain player = client.GetGrain<IPlayerGrain>(playerId);
IGameGrain? game = null;
while (game is null)
{
Console.WriteLine(
$"Getting current game for player {playerId}...");
try
{
game = await player.GetCurrentGame();
if (game is null) // Wait until the player joins a game
{
await Task.Delay(TimeSpan.FromMilliseconds(5_000));
}
}
catch (Exception ex)
{
Console.WriteLine($"Exception: {ex.GetBaseException()}");
}
}
Console.WriteLine(
$"Subscribing to updates for game {game.GetPrimaryKey()}...");
// Subscribe for updates
var watcher = new GameObserver();
await game.ObserveGameUpdates(
client.CreateObjectReference<IGameObserver>(watcher));
Console.WriteLine(
"Subscribed successfully. Press <Enter> to stop.");
}
catch (Exception e)
{
Console.WriteLine(
$"Unexpected Error: {e.GetBaseException()}");
}
await RunWatcherAsync();
// Block the main thread so that the process doesn't exit.
// Updates arrive on thread pool threads.
Console.ReadLine();
static async Task RunWatcherAsync()
{
try
{
var client = new ClientBuilder()
.Configure<ClusterOptions>(options =>
{
options.ClusterId = "my-first-cluster";
options.ServiceId = "MyOrleansService";
})
.UseAzureStorageClustering(
options => options.ConnectionString = connectionString)
.ConfigureApplicationParts(
parts => parts.AddApplicationPart(typeof(IValueGrain).Assembly))
.Build();
// Hardcoded player ID
Guid playerId = new("{2349992C-860A-4EDA-9590-000000000006}");
IPlayerGrain player = client.GetGrain<IPlayerGrain>(playerId);
IGameGrain game = null;
while (game is null)
{
Console.WriteLine(
$"Getting current game for player {playerId}...");
try
{
game = await player.GetCurrentGame();
if (game is null) // Wait until the player joins a game
{
await Task.Delay(TimeSpan.FromMilliseconds(5_000));
}
}
catch (Exception ex)
{
Console.WriteLine($"Exception: {ex.GetBaseException()}");
}
}
Console.WriteLine(
$"Subscribing to updates for game {game.GetPrimaryKey()}...");
// Subscribe for updates
var watcher = new GameObserver();
await game.SubscribeForGameUpdates(
await client.CreateObjectReference<IGameObserver>(watcher));
Console.WriteLine(
"Subscribed successfully. Press <Enter> to stop.");
Console.ReadLine();
}
catch (Exception e)
{
Console.WriteLine(
$"Unexpected Error: {e.GetBaseException()}");
}
}
}
/// <summary>
/// Observer class that implements the observer interface.
/// Need to pass a grain reference to an instance of
/// this class to subscribe for updates.
/// </summary>
class GameObserver : IGameObserver
{
public void UpdateGameScore(string score)
{
Console.WriteLine("New game score: {0}", score);
}
}