From 874c8718b1f08425ed04ad148969469ad1ecc68b Mon Sep 17 00:00:00 2001 From: hang <87227557@qq.com> Date: Fri, 4 Oct 2024 17:54:55 +0800 Subject: [PATCH] TestMasstransitMeditor --- IRaCIS.Core.API/Progranm.cs | 14 ++-- .../IRaCIS.Core.Application.csproj | 1 - .../IRaCIS.Core.Application.xml | 5 ++ .../NeedVerify/AddSubjectTriggerConsumer.cs | 48 ++++++++++- ...ediatorHttpContextScopeFilterExtensions.cs | 83 +++++++++++++++++++ IRaCIS.Core.Application/TestService.cs | 12 ++- .../AddSubjectTriggerCommand.cs | 8 ++ .../Interceptor/AuditEntityInterceptor.cs | 9 +- .../DispatchDomainEventsInterceptor.cs | 13 ++- 9 files changed, 176 insertions(+), 17 deletions(-) create mode 100644 IRaCIS.Core.Application/MassTransit/Extension/MediatorHttpContextScopeFilterExtensions.cs diff --git a/IRaCIS.Core.API/Progranm.cs b/IRaCIS.Core.API/Progranm.cs index 808a5edec..5403af7fe 100644 --- a/IRaCIS.Core.API/Progranm.cs +++ b/IRaCIS.Core.API/Progranm.cs @@ -134,7 +134,7 @@ builder.Services.AddDynamicWebApiSetup(); //AutoMapper builder.Services.AddAutoMapperSetup(); //EF ORM QueryWithNoLock -builder.Services.AddEFSetup(_configuration,enviromentName); +builder.Services.AddEFSetup(_configuration, enviromentName); //Http 响应压缩 builder.Services.AddResponseCompressionSetup(); //Swagger Api 文档 @@ -151,15 +151,17 @@ builder.Services.AddJWTAuthSetup(_configuration); builder.Services.AddMediator(cfg => { cfg.AddConsumer(); - cfg.AddConsumer(); cfg.AddConsumer(); + cfg.AddConsumer(); + //cfg.ConfigureMediator((context, cfg) => cfg.UseHttpContextScopeFilter(context)); }); -// 添加 MassTransit 和 InMemory 传输 +//添加 MassTransit 和 InMemory 传输 builder.Services.AddMassTransit(cfg => { // 注册消费者 - //cfg.AddConsumer(); // 替换为你的消费者类 + cfg.AddConsumer(); // 替换为你的消费者类 + cfg.AddConsumer(); // 使用 InMemory 作为消息传递机制 cfg.UsingInMemory((context, cfg) => @@ -167,9 +169,8 @@ builder.Services.AddMassTransit(cfg => // 这里可以进行额外的配置 cfg.ConfigureEndpoints(context); // 自动配置所有消费者的端点 }); - - }); + #endregion @@ -211,6 +212,7 @@ builder.Services.AddSingleton(new Searcher(CachePolicy.Content, Path. //builder.Services.AddExceptionHandler(); //builder.Services.AddProblemDetails(); + #region 历史废弃配置 //builder.Services.AddMemoryCache(); ////上传限制 配置 diff --git a/IRaCIS.Core.Application/IRaCIS.Core.Application.csproj b/IRaCIS.Core.Application/IRaCIS.Core.Application.csproj index d495031f6..3a509ace3 100644 --- a/IRaCIS.Core.Application/IRaCIS.Core.Application.csproj +++ b/IRaCIS.Core.Application/IRaCIS.Core.Application.csproj @@ -71,7 +71,6 @@ - diff --git a/IRaCIS.Core.Application/IRaCIS.Core.Application.xml b/IRaCIS.Core.Application/IRaCIS.Core.Application.xml index 0a897fe0a..6132acfb8 100644 --- a/IRaCIS.Core.Application/IRaCIS.Core.Application.xml +++ b/IRaCIS.Core.Application/IRaCIS.Core.Application.xml @@ -12676,6 +12676,11 @@ + + + 参考链接:https://github.com/MassTransit/MassTransit/discussions/2498 + + TaskAllocationRuleView 列表视图模型 diff --git a/IRaCIS.Core.Application/MassTransit/Consumer/NeedVerify/AddSubjectTriggerConsumer.cs b/IRaCIS.Core.Application/MassTransit/Consumer/NeedVerify/AddSubjectTriggerConsumer.cs index 67779cc06..f9875948e 100644 --- a/IRaCIS.Core.Application/MassTransit/Consumer/NeedVerify/AddSubjectTriggerConsumer.cs +++ b/IRaCIS.Core.Application/MassTransit/Consumer/NeedVerify/AddSubjectTriggerConsumer.cs @@ -3,10 +3,15 @@ using AutoMapper; using IRaCIS.Core.Domain; using MassTransit; +using Microsoft.AspNetCore.Http; using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; namespace IRaCIS.Core.Application.MassTransit.Consumer; + + + /// /// 添加Subject 触发添加访视 不能代替 Trigger,稽查BatchId 不一致 /// 因为消费者这里的数据库上下文 和消息发送者上下文不是同一个,相当于两个独立的事务 @@ -25,7 +30,7 @@ public class AddSubjectTriggerConsumer(IRepository _subjectVisitRe { var addSubjectEvent = context.Message; - + { Console.WriteLine(_visitStageRepository._dbContext.GetHashCode()); @@ -53,3 +58,44 @@ public class AddSubjectTriggerConsumer(IRepository _subjectVisitRe await _subjectVisitRepository.AddRangeAsync(svList); } } + +public class AddSubjectTriggerConsumer2(IRepository _subjectVisitRepository, + + IRepository _visitStageRepository, + IRepository _trialRepository, + IMapper _mapper) : IConsumer +{ + public async Task Consume(ConsumeContext context) + { + var addSubjectEvent = context.Message; + + + { + Console.WriteLine(_visitStageRepository._dbContext.GetHashCode()); + + Console.WriteLine("两个 DbContext 不是同一个实例"); + } + + + //添加受试者的时候,获取访视计划列表,添加到受试者访视表。 + var visitPlanList = await _visitStageRepository.Where(t => t.TrialId == addSubjectEvent.TrialId && t.IsConfirmed).ToListAsync(); + + var svList = _mapper.Map>(visitPlanList); + + var IsEnrollementQualificationConfirm = await _trialRepository.Where(t => t.Id == addSubjectEvent.TrialId).Select(u => u.IsEnrollementQualificationConfirm).FirstOrDefaultAsync(); + + svList.ForEach(t => + { + t.SubjectId = addSubjectEvent.SubjectId; + t.TrialId = addSubjectEvent.TrialId; + t.TrialSiteId = addSubjectEvent.TrialSiteId; + t.IsEnrollmentConfirm = t.IsBaseLine ? IsEnrollementQualificationConfirm : false; + t.Id = NewId.NextGuid(); + + }); + + await _subjectVisitRepository.AddRangeAsync(svList); + } +} + + diff --git a/IRaCIS.Core.Application/MassTransit/Extension/MediatorHttpContextScopeFilterExtensions.cs b/IRaCIS.Core.Application/MassTransit/Extension/MediatorHttpContextScopeFilterExtensions.cs new file mode 100644 index 000000000..7a1d2684b --- /dev/null +++ b/IRaCIS.Core.Application/MassTransit/Extension/MediatorHttpContextScopeFilterExtensions.cs @@ -0,0 +1,83 @@ + + +using IRaCIS.Core.Application.MassTransit.Consumer; +using MassTransit; +using Microsoft.AspNetCore.Http; +using Microsoft.Extensions.DependencyInjection; + +namespace IRaCIS.Core.Application.MassTransit.Consumer; + +/// +/// 参考链接:https://github.com/MassTransit/MassTransit/discussions/2498 +/// +public static class MediatorHttpContextScopeFilterExtensions +{ + public static void UseHttpContextScopeFilter(this IMediatorConfigurator configurator, IServiceProvider serviceProvider) + { + var filter = new HttpContextScopeFilter(serviceProvider.GetRequiredService()); + + configurator.ConfigurePublish(x => x.UseFilter(filter)); + configurator.ConfigureSend(x => x.UseFilter(filter)); + configurator.UseFilter(filter); + } +} + +public class HttpContextScopeFilter : + IFilter, + IFilter, + IFilter +{ + private readonly IHttpContextAccessor _httpContextAccessor; + + public HttpContextScopeFilter(IHttpContextAccessor httpContextAccessor) + { + _httpContextAccessor = httpContextAccessor; + } + + private void AddPayload(PipeContext context) + { + if (_httpContextAccessor.HttpContext == null) + return; + + var serviceProvider = _httpContextAccessor.HttpContext.RequestServices; + context.GetOrAddPayload(() => serviceProvider); + context.GetOrAddPayload(() => new NoopScope(serviceProvider)); + } + + public Task Send(PublishContext context, IPipe next) + { + AddPayload(context); + return next.Send(context); + } + + public Task Send(SendContext context, IPipe next) + { + AddPayload(context); + return next.Send(context); + } + + public Task Send(ConsumeContext context, IPipe next) + { + AddPayload(context); + return next.Send(context); + } + + public void Probe(ProbeContext context) + { + } + + private class NoopScope : + IServiceScope + { + public NoopScope(IServiceProvider serviceProvider) + { + ServiceProvider = serviceProvider; + } + + public void Dispose() + { + } + + public IServiceProvider ServiceProvider { get; } + } +} \ No newline at end of file diff --git a/IRaCIS.Core.Application/TestService.cs b/IRaCIS.Core.Application/TestService.cs index 998e3b66c..6127e3522 100644 --- a/IRaCIS.Core.Application/TestService.cs +++ b/IRaCIS.Core.Application/TestService.cs @@ -16,6 +16,7 @@ using Medallion.Threading; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Mvc; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using MiniExcelLibs; @@ -126,7 +127,11 @@ namespace IRaCIS.Core.Application.Service } //My project is a monolithic project,And the efcore context repository is scoped registered. - public async Task TestMasstransitMeditor([FromServices] IScopedMediator _mediator, [FromServices] IRepository _testLengthRepository) + public async Task TestMasstransitMeditor( + [FromServices] IScopedMediator _mediatorScoped, + [FromServices] IMediator _mediator, + [FromServices] IRepository _testLengthRepository + ) { var dbContext = _testLengthRepository._dbContext; @@ -143,12 +148,13 @@ namespace IRaCIS.Core.Application.Service } - // add 1 recored await _testLengthRepository.AddAsync(new TestLength() { Name = "xxxx" }); // The consumer method will inject the repository and add 3 pieces of data, but the savechanges method of the repository will not be called - await _mediator.Send(new AddSubjectTriggerCommand { SubjectId = Guid.Empty }); + await _mediatorScoped.Send(new AddSubjectTriggerCommand { SubjectId = Guid.Empty }); + + await _mediator.Send(new AddSubjectTriggerCommand2 { SubjectId = Guid.Empty }); // this will save 1 record not 4 record ,Why is the dbcontext different? Can it be in the same transaction? await _testLengthRepository.SaveChangesAsync(); diff --git a/IRaCIS.Core.Domain/_DomainCommand/AddSubjectTriggerCommand.cs b/IRaCIS.Core.Domain/_DomainCommand/AddSubjectTriggerCommand.cs index 6e4a2cf73..96545c03d 100644 --- a/IRaCIS.Core.Domain/_DomainCommand/AddSubjectTriggerCommand.cs +++ b/IRaCIS.Core.Domain/_DomainCommand/AddSubjectTriggerCommand.cs @@ -14,3 +14,11 @@ public class AddSubjectTriggerCommand : DomainCommand public Guid TrialSiteId { get; set; } } + +public class AddSubjectTriggerCommand2 +{ + public Guid SubjectId { get; set; } + public Guid TrialId { get; set; } + + public Guid TrialSiteId { get; set; } +} diff --git a/IRaCIS.Core.Infra.EFCore/Interceptor/AuditEntityInterceptor.cs b/IRaCIS.Core.Infra.EFCore/Interceptor/AuditEntityInterceptor.cs index a97b5a65d..32a2f49d9 100644 --- a/IRaCIS.Core.Infra.EFCore/Interceptor/AuditEntityInterceptor.cs +++ b/IRaCIS.Core.Infra.EFCore/Interceptor/AuditEntityInterceptor.cs @@ -11,7 +11,12 @@ using System.Data; namespace IRaCIS.Core.Infra.EFCore; - +/// +/// ef 官方文档 :https://learn.microsoft.com/zh-cn/ef/core/logging-events-diagnostics/interceptors +/// +/// +/// +/// public class AuditEntityInterceptor(IUserInfo _userInfo, ILogger _logger , IMediator _mediator @@ -28,7 +33,7 @@ public class AuditEntityInterceptor(IUserInfo _userInfo, public override ValueTask> SavingChangesAsync(DbContextEventData eventData, InterceptionResult result, CancellationToken cancellationToken = default) { - //////领域命令 (同一个事务提交的一些逻辑,类似Trigger 保存事务之前执行的一些逻辑) + //////领域命令 (同一个事务提交的一些逻辑,类似Trigger 保存事务之前执行的一些逻辑) IMediator 和autofac 有冲突,不在一个事务,废弃。 //eventData.Context.AddDomainCommands(); //await DispatchDomainCommands(eventData.Context); diff --git a/IRaCIS.Core.Infra.EFCore/Interceptor/DispatchDomainEventsInterceptor.cs b/IRaCIS.Core.Infra.EFCore/Interceptor/DispatchDomainEventsInterceptor.cs index 195d1c710..b854a0dc3 100644 --- a/IRaCIS.Core.Infra.EFCore/Interceptor/DispatchDomainEventsInterceptor.cs +++ b/IRaCIS.Core.Infra.EFCore/Interceptor/DispatchDomainEventsInterceptor.cs @@ -1,14 +1,19 @@ using IRaCIS.Core.Domain; using IRaCIS.Core.Domain.Models; using MassTransit; +using MassTransit.Mediator; using Microsoft.EntityFrameworkCore.Diagnostics; using System.Data; namespace IRaCIS.Core.Infra.EFCore.Interceptor { - // ISendEndpoint:提供了Send方法,用于发送命令。 - //IPublishEndpoint:提供了Publish方法,用于发布事件。 - public class DispatchDomainEventsInterceptor(IPublishEndpoint publishEndpoint) : SaveChangesInterceptor + // 命令(Command):是一种定向的消息,应该由一个特定的接收者处理。用 Send 发送。 + // 事件(Event):是一种广播式消息,可以有多个订阅者接收并处理。用 Publish 发布。 + // IPublishEndpoint:提供了Publish方法,用于发布事件,该接口仅负责发布消息,不会依赖于接收者的存在,发布者和消费者可以是完全分离的 + // IMediator 适用于单进程内的消息传递,不涉及外部消息中间件(如 RabbitMQ、Azure Service Bus 等) 使用本地的内存队列,而不是跨进程消息传递,适合轻量级、无中间件的场景 + // 发件箱模式参考: https://dev.to/antonmartyniuk/use-masstransit-to-implement-outbox-pattern-with-ef-core-and-mongodb-oep#:~:text=MongoDB%20replica%20set%20is%20required%20for%20both%20publisher%20and%20consumer + // 1、IPublishEndpoint 才会将事件存储到发件箱表中, 高级IBus接口时 - 消息不会存储在发件箱中,必须有savechanges 才会一起提交保存到数据库中 + public class DispatchDomainEventsInterceptor(IMediator _mediator) : SaveChangesInterceptor { //领域事件通常与数据变更密切相关。如果在 SaveChanges 之前发布事件,有可能事件发布时的数据状态还没有被持久化到数据库。这可能导致事件消费者看到的是一个不一致的状态 @@ -52,7 +57,7 @@ namespace IRaCIS.Core.Infra.EFCore.Interceptor //这种方式会导致消息没处理 //await publishEndpoint.Publish(domainEvent); - await publishEndpoint.Publish(domainEvent.GetType(), domainEvent); + await _mediator.Publish(domainEvent.GetType(), domainEvent); } }