ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

FreeSql接入CAP的实践

2020-11-18 09:33:25  阅读:608  来源: 互联网

标签:publisher transaction 接入 FreeSql ICapTransaction CAP Commit public


CAP

CAP 是一个基于 .NET Standard 的 C# 库,它是一种处理分布式事务的解决方案,同样具有 EventBus 的功能,它具有轻量级、易使用、高性能等特点。

ADO.NET事务

1.DotNetCore.CAP.MySql中引用 了如下类库.在Commit事务时,会调用 Flush方法推送消息​

<PackageReference Include="Microsoft.EntityFrameworkCore" Version="3.1.7" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="3.1.7" />
<PackageReference Include="MySqlConnector" Version="1.0.1" />
    public class MySqlCapTransaction : CapTransactionBase
    {
        public MySqlCapTransaction(
            IDispatcher dispatcher) : base(dispatcher)
        {
        }

        public override void Commit()
        {
            Debug.Assert(DbTransaction != null);

            switch (DbTransaction)
            {
                case IDbTransaction dbTransaction:
                    dbTransaction.Commit();
                    break;
                case IDbContextTransaction dbContextTransaction:
                    dbContextTransaction.Commit();
                    break;
            }
            Flush();
        }
    }

其中我们能看到,事务的提交,会调用父类CapTransactionBase中的方法Flush。他是protected类型的,并未开放出此接口。

       protected virtual void Flush()
        {
            while (!_bufferList.IsEmpty)
            {
                _bufferList.TryDequeue(out var message);

                _dispatcher.EnqueueToPublish(message);
            }
        }

我们来看一下集成 的demo调用

    [Route("~/adonet/transaction")]
    public IActionResult AdonetWithTransaction()
    {
        using (var connection = new MySqlConnection(AppDbContext.ConnectionString))
        {
            using (var transaction = connection.BeginTransaction(_capBus, true))
            {
                //your business code
                connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction);

                //for (int i = 0; i < 5; i++)
                //{
                _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
                //}
            }
        }

        return Ok();
    }

代码中通过扩展IDbConnection类,增加BeginTransaction方法,传递了注入的_capBus类,传了autoCommit

private readonly ICapPublisher _capBus;

public PublishController(ICapPublisher capPublisher)
{
    _capBus = capPublisher;
}
/// <summary>
/// Start the CAP transaction
/// </summary>
/// <param name="dbConnection">The <see cref="IDbConnection" />.</param>
/// <param name="publisher">The <see cref="ICapPublisher" />.</param>
/// <param name="autoCommit">Whether the transaction is automatically committed when the message is published</param>
/// <returns>The <see cref="ICapTransaction" /> object.</returns>
public static ICapTransaction BeginTransaction(this IDbConnection dbConnection,
    ICapPublisher publisher, bool autoCommit = false)
{
    if (dbConnection.State == ConnectionState.Closed)
    {
        dbConnection.Open();
    }

    var dbTransaction = dbConnection.BeginTransaction();
    publisher.Transaction.Value = publisher.ServiceProvider.GetService<ICapTransaction>();
    return publisher.Transaction.Value.Begin(dbTransaction, autoCommit);
}

autoCommit:false,(此属性会自动提交事务,集成其他ORM,不建议开启)因为,我们只要调用 了Publish,他会调用MySqlCapTransaction中的Commit(),并执行Flush,即消息 会发出去。
IDbContextTransaction

这段代码是非常 重要的。

    publisher.Transaction.Value = publisher.ServiceProvider.GetService<ICapTransaction>();

从CapPublisher中可以看出,事务是通过AsyncLocal实现状态共享的。

internal class CapPublisher : ICapPublisher
{
     public AsyncLocal<ICapTransaction> Transaction { get; }
}

publisher.Transaction.Value的类型实现上才是ICapTransaction ,

CapTransactionExtensions.cs还有一个扩展方法,调用Begin,相当于给当前控制器上注入的ICapPublisher设置了new MySqlConnection(AppDbContext.ConnectionString).BeginTransaction()的值。

      public static ICapTransaction Begin(this ICapTransaction transaction,
            IDbTransaction dbTransaction, bool autoCommit = false)
        {
            transaction.DbTransaction = dbTransaction;
            transaction.AutoCommit = autoCommit;

            return transaction;
        }

对于ADO.NET,我们只要传递此transaction,就能保证发送消息和操作DB是一个事务了。。

EF Core事务

同样,我们看扩展方法和使用方式

    public static IDbContextTransaction BeginTransaction(this DatabaseFacade database,
        ICapPublisher publisher, bool autoCommit = false)
    {
        var trans = database.BeginTransaction();
        publisher.Transaction.Value = publisher.ServiceProvider.GetService<ICapTransaction>();
        var capTrans = publisher.Transaction.Value.Begin(trans, autoCommit);
        return new CapEFDbTransaction(capTrans);
    }

dbContext.Database就是DatabaseFacade类型。直接能BeginTransaction事务。

[Route("~/ef/transaction")]
public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext)
{
    using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: false))
    {
        dbContext.Persons.Add(new Person() { Name = "ef.transaction" });

        for (int i = 0; i < 1; i++)
        {
            _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);
        }

        dbContext.SaveChanges();

        trans.Commit();
    }
    return Ok();
}

同样,还有一个Begin扩展方法,仅仅是给ICapTransaction赋下值。

public static ICapTransaction Begin(this ICapTransaction transaction,
    IDbContextTransaction dbTransaction, bool autoCommit = false)
{
    transaction.DbTransaction = dbTransaction;
    transaction.AutoCommit = autoCommit;

    return transaction;
}

在这个demo,上,,autoCommit是false,因为dbContext有自己的SaveChanges(),如果发送不太合适。SaveChanges()要做好些操作,具体不太情况是什么,但要在Commit事务前的吧。。具体不详细研究。

但我们可以看下CapTransactionBase源码,DbTransaction是Object类型。

EF Core中的事务类型是IDbContextTransaction​

ADO.NET实际是IDbTransaction类型。

 public object DbTransaction { get; set; }

所以在最开始的那段代码,判断DbTransaction,是哪种类型,然后调用自身内部使用的事务进行Commit()。如果要集成其他ORM,但又想去掉EFCore的依赖,然后增加其他ORM,如下类似的处理,就是关键,比如CommitAsync,Commit,Roolback()

    public override void Commit()
    {
        Debug.Assert(DbTransaction != null);

        switch (DbTransaction)
        {
            case IDbTransaction dbTransaction:
                dbTransaction.Commit();
                break;
            case IDbContextTransaction dbContextTransaction:
                dbContextTransaction.Commit();
                break;
        }
        Flush();
    }

还有MySqlDataStorage.cs

判断dbTransaction的类型,然后获取当前事务,引用其他ORM,记得修改此处。

    var dbTrans = dbTransaction as IDbTransaction;
    if (dbTrans == null && dbTransaction is IDbContextTransaction dbContextTrans)
    {
        dbTrans = dbContextTrans.GetDbTransaction();
    }

参考项目(不打算维护)

FreeSql接入CAP(最简单的方式)

关于此问题的想法

我们还是引用各自官方的库

Install-Package DotNetCore.CAP.Dashboard
Install-Package DotNetCore.CAP.MySql
Install-Package DotNetCore.CAP.RabbitMQ
Install-Package FreeSql
Install-Package FreeSql.DbContext
Install-Package FreeSql.Provider.MySqlConnector

关于CAP集成的方式,配置项,这里不做详情,官方地址有中文: http://cap.dotnetcore.xyz/

重写扩展方法,BeginTransaction。是基于IUnitOfWork的扩展。

提交事务调用Commit(IUnitOfWork)时,内部再通过反射调用 ICapTransaction中protected类型的方法Flush。

  public static class CapUnitOfWorkExtensions
    {

        public static void Flush(this ICapTransaction capTransaction)
        {
            capTransaction?.GetType().GetMethod("Flush", BindingFlags.Instance | BindingFlags.NonPublic)?.Invoke(capTransaction, null);
        }

       
        public static ICapTransaction BeginTransaction(this IUnitOfWork unitOfWork, ICapPublisher publisher, bool autoCommit = false)
        {
            publisher.Transaction.Value = (ICapTransaction)publisher.ServiceProvider.GetService(typeof(ICapTransaction));
            return publisher.Transaction.Value.Begin(unitOfWork.GetOrBeginTransaction(), autoCommit);
        }

        public static void Commit(this ICapTransaction capTransaction, IUnitOfWork unitOfWork)
        {
            unitOfWork.Commit();
            capTransaction.Flush();
        }
    }

注入我们的FreeSql

public void ConfigureServices(IServiceCollection services)
 {
    IConfigurationSection configurationSection = Configuration.GetSection($"ConnectionStrings:MySql");
    IFreeSql fsql = new FreeSqlBuilder()
           .UseConnectionString(DataType.MySql, configurationSection.Value);
           .UseNameConvert(NameConvertType.PascalCaseToUnderscoreWithLower)
           .UseAutoSyncStructure(true)
           .UseNoneCommandParameter(true)
           .UseMonitorCommand(cmd =>
           {
               Trace.WriteLine(cmd.CommandText + ";");
           }
           )
           .Build();


    services.AddSingleton(fsql);
    services.AddFreeRepository();
    services.AddScoped<UnitOfWorkManager>();
}

示例

    [HttpGet("~/freesql/unitofwork/{id}")]
    public DateTime UnitOfWorkManagerTransaction(int id, [FromServices] IBaseRepository<Book> repo)
    {
        DateTime now = DateTime.Now;
        using (IUnitOfWork uow = _unitOfWorkManager.Begin())
        {
            ICapTransaction trans = _unitOfWorkManager.Current.BeginTransaction(_capBus, false);
            repo.Insert(new Book()
            {
                Author = "luoyunchong",
                Summary = "2",
                Title = "122"
            });

            _capBus.Publish("freesql.time", now);
            trans.Commit(uow);
        }
        return now;
    }
    
    [NonAction]
    [CapSubscribe("freesql.time")]
    public void GetTime(DateTime time)
    {
        Console.WriteLine($"time:{time}");
    }

注意trans不需要using,freesql内部会释放资源。,也可using,但请更新到最新的freesql版本。

ICapTransaction trans = _unitOfWorkManager.Current.BeginTransaction(_capBus, false);

提交事务,也请调用扩展方法,否则事务无法正常。

trans.Commit(uow);

源码位置

标签:publisher,transaction,接入,FreeSql,ICapTransaction,CAP,Commit,public
来源: https://www.cnblogs.com/igeekfan/p/cap_freesql_flush.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有