实现复原 Entity Framework Core SQL 连接

提示

此内容摘自电子书《适用于容器化 .NET 应用程序的 .NET 微服务体系结构》,可在 .NET 文档上获取,也可作为免费可下载的 PDF 脱机阅读。

.NET Microservices Architecture for Containerized .NET Applications eBook cover thumbnail.

对于 Azure SQL DB,Entity Framework Core (EF) 早已提供了内部数据库连接复原和重试逻辑。 但如果想要复原 EF Core 连接,则需要为每个 DbContext 连接启用 Entity Framework 执行策略。

例如,EF Core 连接级别的下列代码可启用复原 SQL 连接,此连接在连接失败时会重试。

// Program.cs from any ASP.NET Core Web API
// Other code ...
builder.Services.AddDbContext<CatalogContext>(options =>
    {
        options.UseSqlServer(builder.Configuration["ConnectionString"],
        sqlServerOptionsAction: sqlOptions =>
        {
            sqlOptions.EnableRetryOnFailure(
            maxRetryCount: 10,
            maxRetryDelay: TimeSpan.FromSeconds(30),
            errorNumbersToAdd: null);
        });
    });

使用 BeginTransaction 和多个 DbContext 的执行策略和显式事务

在 EF Core 连接中启用重试时,使用 EF Core 执行的每项操作都会成为其自己的可重试操作。 如果发生暂时性故障,每个查询和 SaveChanges 的每次调用都会作为一个单元进行重试。

但是,如果你的代码使用 BeginTransaction 启动事务,则自行定义一组需要作为一个单元来处理的操作。 如果发生故障,则必须回滚事务中的所有内容。

如果在使用 EF 执行策略(重试策略)时尝试执行该事务,并从多个 DbContext 调用 SaveChanges,则会收到与下列情况类似的异常:

System.InvalidOperationException:已配置的执行策略“SqlServerRetryingExecutionStrategy”不支持用户启动的事务。 使用由“DbContext.Database.CreateExecutionStrategy()”返回的执行策略执行事务(作为一个可回溯单元)中的所有操作。

该解决方案通过代表所有需要执行的委托来手动调用 EF 执行策略。 如果发生暂时性故障,执行策略会再次调用委托。 例如,以下代码演示了在更新产品时,如何使用两个 DbContext(_catalogContext 和 IntegrationEventLogContext)在 eShopOnContainers 中实现该操作,然后保存需要使用不同 DbContext 的 ProductPriceChangedIntegrationEvent 对象。

public async Task<IActionResult> UpdateProduct(
    [FromBody]CatalogItem productToUpdate)
{
    // Other code ...

    var oldPrice = catalogItem.Price;
    var raiseProductPriceChangedEvent = oldPrice != productToUpdate.Price;

    // Update current product
    catalogItem = productToUpdate;

    // Save product's data and publish integration event through the Event Bus
    // if price has changed
    if (raiseProductPriceChangedEvent)
    {
        //Create Integration Event to be published through the Event Bus
        var priceChangedEvent = new ProductPriceChangedIntegrationEvent(
          catalogItem.Id, productToUpdate.Price, oldPrice);

       // Achieving atomicity between original Catalog database operation and the
       // IntegrationEventLog thanks to a local transaction
       await _catalogIntegrationEventService.SaveEventAndCatalogContextChangesAsync(
           priceChangedEvent);

       // Publish through the Event Bus and mark the saved event as published
       await _catalogIntegrationEventService.PublishThroughEventBusAsync(
           priceChangedEvent);
    }
    // Just save the updated product because the Product's Price hasn't changed.
    else
    {
        await _catalogContext.SaveChangesAsync();
    }
}

第一个 DbContext_catalogContext,第二个 DbContext_catalogIntegrationEventService 对象中。 通过使用 EF 执行策略在所有 DbContext 对象之间执行“提交”操作。

若要实现此多个 DbContext 提交,SaveEventAndCatalogContextChangesAsync 要使用 ResilientTransaction 类,如以下代码所示:

public class CatalogIntegrationEventService : ICatalogIntegrationEventService
{
    //…
    public async Task SaveEventAndCatalogContextChangesAsync(
        IntegrationEvent evt)
    {
        // Use of an EF Core resiliency strategy when using multiple DbContexts
        // within an explicit BeginTransaction():
        // https://learn.microsoft.com/ef/core/miscellaneous/connection-resiliency
        await ResilientTransaction.New(_catalogContext).ExecuteAsync(async () =>
        {
            // Achieving atomicity between original catalog database
            // operation and the IntegrationEventLog thanks to a local transaction
            await _catalogContext.SaveChangesAsync();
            await _eventLogService.SaveEventAsync(evt,
                _catalogContext.Database.CurrentTransaction.GetDbTransaction());
        });
    }
}

基本上,ResilientTransaction.ExecuteAsync 方法从传递的 DbContext (_catalogContext) 开始一个事务,然后让 EventLogService 使用该事务保存来自 IntegrationEventLogContext 的更改,然后提交整个事务。

public class ResilientTransaction
{
    private DbContext _context;
    private ResilientTransaction(DbContext context) =>
        _context = context ?? throw new ArgumentNullException(nameof(context));

    public static ResilientTransaction New (DbContext context) =>
        new ResilientTransaction(context);

    public async Task ExecuteAsync(Func<Task> action)
    {
        // Use of an EF Core resiliency strategy when using multiple DbContexts
        // within an explicit BeginTransaction():
        // https://learn.microsoft.com/ef/core/miscellaneous/connection-resiliency
        var strategy = _context.Database.CreateExecutionStrategy();
        await strategy.ExecuteAsync(async () =>
        {
            await using var transaction = await _context.Database.BeginTransactionAsync();
            await action();
            await transaction.CommitAsync();
        });
    }
}

其他资源