TestMasstransitMeditor
continuous-integration/drone/push Build is passing
Details
continuous-integration/drone/push Build is passing
Details
parent
fea009bf0c
commit
874c8718b1
|
@ -134,7 +134,7 @@ builder.Services.AddDynamicWebApiSetup();
|
||||||
//AutoMapper
|
//AutoMapper
|
||||||
builder.Services.AddAutoMapperSetup();
|
builder.Services.AddAutoMapperSetup();
|
||||||
//EF ORM QueryWithNoLock
|
//EF ORM QueryWithNoLock
|
||||||
builder.Services.AddEFSetup(_configuration,enviromentName);
|
builder.Services.AddEFSetup(_configuration, enviromentName);
|
||||||
//Http 响应压缩
|
//Http 响应压缩
|
||||||
builder.Services.AddResponseCompressionSetup();
|
builder.Services.AddResponseCompressionSetup();
|
||||||
//Swagger Api 文档
|
//Swagger Api 文档
|
||||||
|
@ -151,15 +151,17 @@ builder.Services.AddJWTAuthSetup(_configuration);
|
||||||
builder.Services.AddMediator(cfg =>
|
builder.Services.AddMediator(cfg =>
|
||||||
{
|
{
|
||||||
cfg.AddConsumer<ConsistencyCheckConsumer>();
|
cfg.AddConsumer<ConsistencyCheckConsumer>();
|
||||||
cfg.AddConsumer<AddUserLogTriggerConsumer>();
|
|
||||||
cfg.AddConsumer<AddSubjectTriggerConsumer>();
|
cfg.AddConsumer<AddSubjectTriggerConsumer>();
|
||||||
|
cfg.AddConsumer<AddSubjectTriggerConsumer2>();
|
||||||
|
//cfg.ConfigureMediator((context, cfg) => cfg.UseHttpContextScopeFilter(context));
|
||||||
});
|
});
|
||||||
|
|
||||||
// 添加 MassTransit 和 InMemory 传输
|
//添加 MassTransit 和 InMemory 传输
|
||||||
builder.Services.AddMassTransit(cfg =>
|
builder.Services.AddMassTransit(cfg =>
|
||||||
{
|
{
|
||||||
// 注册消费者
|
// 注册消费者
|
||||||
//cfg.AddConsumer<AddSubjectTriggerConsumer>(); // 替换为你的消费者类
|
cfg.AddConsumer<AddSubjectTriggerConsumer>(); // 替换为你的消费者类
|
||||||
|
cfg.AddConsumer<AddSubjectTriggerConsumer2>();
|
||||||
|
|
||||||
// 使用 InMemory 作为消息传递机制
|
// 使用 InMemory 作为消息传递机制
|
||||||
cfg.UsingInMemory((context, cfg) =>
|
cfg.UsingInMemory((context, cfg) =>
|
||||||
|
@ -167,9 +169,8 @@ builder.Services.AddMassTransit(cfg =>
|
||||||
// 这里可以进行额外的配置
|
// 这里可以进行额外的配置
|
||||||
cfg.ConfigureEndpoints(context); // 自动配置所有消费者的端点
|
cfg.ConfigureEndpoints(context); // 自动配置所有消费者的端点
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
|
|
||||||
|
@ -211,6 +212,7 @@ builder.Services.AddSingleton<ISearcher>(new Searcher(CachePolicy.Content, Path.
|
||||||
//builder.Services.AddExceptionHandler<GlobalExceptionHandler>();
|
//builder.Services.AddExceptionHandler<GlobalExceptionHandler>();
|
||||||
//builder.Services.AddProblemDetails();
|
//builder.Services.AddProblemDetails();
|
||||||
|
|
||||||
|
|
||||||
#region 历史废弃配置
|
#region 历史废弃配置
|
||||||
//builder.Services.AddMemoryCache();
|
//builder.Services.AddMemoryCache();
|
||||||
////上传限制 配置
|
////上传限制 配置
|
||||||
|
|
|
@ -71,7 +71,6 @@
|
||||||
<PackageReference Include="RestSharp" Version="112.0.0" />
|
<PackageReference Include="RestSharp" Version="112.0.0" />
|
||||||
<PackageReference Include="SixLabors.ImageSharp" Version="3.1.5" />
|
<PackageReference Include="SixLabors.ImageSharp" Version="3.1.5" />
|
||||||
<PackageReference Include="Swashbuckle.AspNetCore.Filters" Version="8.0.2" />
|
<PackageReference Include="Swashbuckle.AspNetCore.Filters" Version="8.0.2" />
|
||||||
<PackageReference Include="MassTransit.AspNetCore" Version="7.3.1" />
|
|
||||||
<PackageReference Include="ZiggyCreatures.FusionCache" Version="1.3.0" />
|
<PackageReference Include="ZiggyCreatures.FusionCache" Version="1.3.0" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
|
|
|
@ -12676,6 +12676,11 @@
|
||||||
<param name="_trialRepository"></param>
|
<param name="_trialRepository"></param>
|
||||||
<param name="_mapper"></param>
|
<param name="_mapper"></param>
|
||||||
</member>
|
</member>
|
||||||
|
<member name="T:IRaCIS.Core.Application.MassTransit.Consumer.MediatorHttpContextScopeFilterExtensions">
|
||||||
|
<summary>
|
||||||
|
参考链接:https://github.com/MassTransit/MassTransit/discussions/2498
|
||||||
|
</summary>
|
||||||
|
</member>
|
||||||
<member name="T:IRaCIS.Core.Application.ViewModel.TaskAllocationRuleView">
|
<member name="T:IRaCIS.Core.Application.ViewModel.TaskAllocationRuleView">
|
||||||
<summary> TaskAllocationRuleView 列表视图模型 </summary>
|
<summary> TaskAllocationRuleView 列表视图模型 </summary>
|
||||||
</member>
|
</member>
|
||||||
|
|
|
@ -3,10 +3,15 @@
|
||||||
using AutoMapper;
|
using AutoMapper;
|
||||||
using IRaCIS.Core.Domain;
|
using IRaCIS.Core.Domain;
|
||||||
using MassTransit;
|
using MassTransit;
|
||||||
|
using Microsoft.AspNetCore.Http;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
|
||||||
namespace IRaCIS.Core.Application.MassTransit.Consumer;
|
namespace IRaCIS.Core.Application.MassTransit.Consumer;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 添加Subject 触发添加访视 不能代替 Trigger,稽查BatchId 不一致
|
/// 添加Subject 触发添加访视 不能代替 Trigger,稽查BatchId 不一致
|
||||||
/// 因为消费者这里的数据库上下文 和消息发送者上下文不是同一个,相当于两个独立的事务
|
/// 因为消费者这里的数据库上下文 和消息发送者上下文不是同一个,相当于两个独立的事务
|
||||||
|
@ -25,7 +30,7 @@ public class AddSubjectTriggerConsumer(IRepository<SubjectVisit> _subjectVisitRe
|
||||||
{
|
{
|
||||||
var addSubjectEvent = context.Message;
|
var addSubjectEvent = context.Message;
|
||||||
|
|
||||||
|
|
||||||
{
|
{
|
||||||
Console.WriteLine(_visitStageRepository._dbContext.GetHashCode());
|
Console.WriteLine(_visitStageRepository._dbContext.GetHashCode());
|
||||||
|
|
||||||
|
@ -53,3 +58,44 @@ public class AddSubjectTriggerConsumer(IRepository<SubjectVisit> _subjectVisitRe
|
||||||
await _subjectVisitRepository.AddRangeAsync(svList);
|
await _subjectVisitRepository.AddRangeAsync(svList);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public class AddSubjectTriggerConsumer2(IRepository<SubjectVisit> _subjectVisitRepository,
|
||||||
|
|
||||||
|
IRepository<VisitStage> _visitStageRepository,
|
||||||
|
IRepository<Trial> _trialRepository,
|
||||||
|
IMapper _mapper) : IConsumer<AddSubjectTriggerCommand2>
|
||||||
|
{
|
||||||
|
public async Task Consume(ConsumeContext<AddSubjectTriggerCommand2> context)
|
||||||
|
{
|
||||||
|
var addSubjectEvent = context.Message;
|
||||||
|
|
||||||
|
|
||||||
|
{
|
||||||
|
Console.WriteLine(_visitStageRepository._dbContext.GetHashCode());
|
||||||
|
|
||||||
|
Console.WriteLine("两个 DbContext 不是同一个实例");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
//添加受试者的时候,获取访视计划列表,添加到受试者访视表。
|
||||||
|
var visitPlanList = await _visitStageRepository.Where(t => t.TrialId == addSubjectEvent.TrialId && t.IsConfirmed).ToListAsync();
|
||||||
|
|
||||||
|
var svList = _mapper.Map<List<SubjectVisit>>(visitPlanList);
|
||||||
|
|
||||||
|
var IsEnrollementQualificationConfirm = await _trialRepository.Where(t => t.Id == addSubjectEvent.TrialId).Select(u => u.IsEnrollementQualificationConfirm).FirstOrDefaultAsync();
|
||||||
|
|
||||||
|
svList.ForEach(t =>
|
||||||
|
{
|
||||||
|
t.SubjectId = addSubjectEvent.SubjectId;
|
||||||
|
t.TrialId = addSubjectEvent.TrialId;
|
||||||
|
t.TrialSiteId = addSubjectEvent.TrialSiteId;
|
||||||
|
t.IsEnrollmentConfirm = t.IsBaseLine ? IsEnrollementQualificationConfirm : false;
|
||||||
|
t.Id = NewId.NextGuid();
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
await _subjectVisitRepository.AddRangeAsync(svList);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,83 @@
|
||||||
|
|
||||||
|
|
||||||
|
using IRaCIS.Core.Application.MassTransit.Consumer;
|
||||||
|
using MassTransit;
|
||||||
|
using Microsoft.AspNetCore.Http;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
|
||||||
|
namespace IRaCIS.Core.Application.MassTransit.Consumer;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 参考链接:https://github.com/MassTransit/MassTransit/discussions/2498
|
||||||
|
/// </summary>
|
||||||
|
public static class MediatorHttpContextScopeFilterExtensions
|
||||||
|
{
|
||||||
|
public static void UseHttpContextScopeFilter(this IMediatorConfigurator configurator, IServiceProvider serviceProvider)
|
||||||
|
{
|
||||||
|
var filter = new HttpContextScopeFilter(serviceProvider.GetRequiredService<IHttpContextAccessor>());
|
||||||
|
|
||||||
|
configurator.ConfigurePublish(x => x.UseFilter(filter));
|
||||||
|
configurator.ConfigureSend(x => x.UseFilter(filter));
|
||||||
|
configurator.UseFilter(filter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public class HttpContextScopeFilter :
|
||||||
|
IFilter<PublishContext>,
|
||||||
|
IFilter<SendContext>,
|
||||||
|
IFilter<ConsumeContext>
|
||||||
|
{
|
||||||
|
private readonly IHttpContextAccessor _httpContextAccessor;
|
||||||
|
|
||||||
|
public HttpContextScopeFilter(IHttpContextAccessor httpContextAccessor)
|
||||||
|
{
|
||||||
|
_httpContextAccessor = httpContextAccessor;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void AddPayload(PipeContext context)
|
||||||
|
{
|
||||||
|
if (_httpContextAccessor.HttpContext == null)
|
||||||
|
return;
|
||||||
|
|
||||||
|
var serviceProvider = _httpContextAccessor.HttpContext.RequestServices;
|
||||||
|
context.GetOrAddPayload(() => serviceProvider);
|
||||||
|
context.GetOrAddPayload<IServiceScope>(() => new NoopScope(serviceProvider));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task Send(PublishContext context, IPipe<PublishContext> next)
|
||||||
|
{
|
||||||
|
AddPayload(context);
|
||||||
|
return next.Send(context);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task Send(SendContext context, IPipe<SendContext> next)
|
||||||
|
{
|
||||||
|
AddPayload(context);
|
||||||
|
return next.Send(context);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task Send(ConsumeContext context, IPipe<ConsumeContext> next)
|
||||||
|
{
|
||||||
|
AddPayload(context);
|
||||||
|
return next.Send(context);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Probe(ProbeContext context)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
private class NoopScope :
|
||||||
|
IServiceScope
|
||||||
|
{
|
||||||
|
public NoopScope(IServiceProvider serviceProvider)
|
||||||
|
{
|
||||||
|
ServiceProvider = serviceProvider;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public IServiceProvider ServiceProvider { get; }
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,6 +16,7 @@ using Medallion.Threading;
|
||||||
using Microsoft.AspNetCore.Authorization;
|
using Microsoft.AspNetCore.Authorization;
|
||||||
using Microsoft.AspNetCore.Hosting;
|
using Microsoft.AspNetCore.Hosting;
|
||||||
using Microsoft.AspNetCore.Mvc;
|
using Microsoft.AspNetCore.Mvc;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
using MiniExcelLibs;
|
using MiniExcelLibs;
|
||||||
|
@ -126,7 +127,11 @@ namespace IRaCIS.Core.Application.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
//My project is a monolithic project,And the efcore context repository is scoped registered.
|
//My project is a monolithic project,And the efcore context repository is scoped registered.
|
||||||
public async Task<IResponseOutput> TestMasstransitMeditor([FromServices] IScopedMediator _mediator, [FromServices] IRepository<TestLength> _testLengthRepository)
|
public async Task<IResponseOutput> TestMasstransitMeditor(
|
||||||
|
[FromServices] IScopedMediator _mediatorScoped,
|
||||||
|
[FromServices] IMediator _mediator,
|
||||||
|
[FromServices] IRepository<TestLength> _testLengthRepository
|
||||||
|
)
|
||||||
{
|
{
|
||||||
|
|
||||||
var dbContext = _testLengthRepository._dbContext;
|
var dbContext = _testLengthRepository._dbContext;
|
||||||
|
@ -143,12 +148,13 @@ namespace IRaCIS.Core.Application.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// add 1 recored
|
// add 1 recored
|
||||||
await _testLengthRepository.AddAsync(new TestLength() { Name = "xxxx" });
|
await _testLengthRepository.AddAsync(new TestLength() { Name = "xxxx" });
|
||||||
|
|
||||||
// The consumer method will inject the repository and add 3 pieces of data, but the savechanges method of the repository will not be called
|
// The consumer method will inject the repository and add 3 pieces of data, but the savechanges method of the repository will not be called
|
||||||
await _mediator.Send(new AddSubjectTriggerCommand { SubjectId = Guid.Empty });
|
await _mediatorScoped.Send(new AddSubjectTriggerCommand { SubjectId = Guid.Empty });
|
||||||
|
|
||||||
|
await _mediator.Send(new AddSubjectTriggerCommand2 { SubjectId = Guid.Empty });
|
||||||
|
|
||||||
// this will save 1 record not 4 record ,Why is the dbcontext different? Can it be in the same transaction?
|
// this will save 1 record not 4 record ,Why is the dbcontext different? Can it be in the same transaction?
|
||||||
await _testLengthRepository.SaveChangesAsync();
|
await _testLengthRepository.SaveChangesAsync();
|
||||||
|
|
|
@ -14,3 +14,11 @@ public class AddSubjectTriggerCommand : DomainCommand
|
||||||
public Guid TrialSiteId { get; set; }
|
public Guid TrialSiteId { get; set; }
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public class AddSubjectTriggerCommand2
|
||||||
|
{
|
||||||
|
public Guid SubjectId { get; set; }
|
||||||
|
public Guid TrialId { get; set; }
|
||||||
|
|
||||||
|
public Guid TrialSiteId { get; set; }
|
||||||
|
}
|
||||||
|
|
|
@ -11,7 +11,12 @@ using System.Data;
|
||||||
|
|
||||||
|
|
||||||
namespace IRaCIS.Core.Infra.EFCore;
|
namespace IRaCIS.Core.Infra.EFCore;
|
||||||
|
/// <summary>
|
||||||
|
/// ef 官方文档 :https://learn.microsoft.com/zh-cn/ef/core/logging-events-diagnostics/interceptors
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="_userInfo"></param>
|
||||||
|
/// <param name="_logger"></param>
|
||||||
|
/// <param name="_mediator"></param>
|
||||||
public class AuditEntityInterceptor(IUserInfo _userInfo,
|
public class AuditEntityInterceptor(IUserInfo _userInfo,
|
||||||
ILogger<AuditEntityInterceptor> _logger
|
ILogger<AuditEntityInterceptor> _logger
|
||||||
, IMediator _mediator
|
, IMediator _mediator
|
||||||
|
@ -28,7 +33,7 @@ public class AuditEntityInterceptor(IUserInfo _userInfo,
|
||||||
public override ValueTask<InterceptionResult<int>> SavingChangesAsync(DbContextEventData eventData,
|
public override ValueTask<InterceptionResult<int>> SavingChangesAsync(DbContextEventData eventData,
|
||||||
InterceptionResult<int> result, CancellationToken cancellationToken = default)
|
InterceptionResult<int> result, CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
//////领域命令 (同一个事务提交的一些逻辑,类似Trigger 保存事务之前执行的一些逻辑)
|
//////领域命令 (同一个事务提交的一些逻辑,类似Trigger 保存事务之前执行的一些逻辑) IMediator 和autofac 有冲突,不在一个事务,废弃。
|
||||||
//eventData.Context.AddDomainCommands();
|
//eventData.Context.AddDomainCommands();
|
||||||
//await DispatchDomainCommands(eventData.Context);
|
//await DispatchDomainCommands(eventData.Context);
|
||||||
|
|
||||||
|
|
|
@ -1,14 +1,19 @@
|
||||||
using IRaCIS.Core.Domain;
|
using IRaCIS.Core.Domain;
|
||||||
using IRaCIS.Core.Domain.Models;
|
using IRaCIS.Core.Domain.Models;
|
||||||
using MassTransit;
|
using MassTransit;
|
||||||
|
using MassTransit.Mediator;
|
||||||
using Microsoft.EntityFrameworkCore.Diagnostics;
|
using Microsoft.EntityFrameworkCore.Diagnostics;
|
||||||
using System.Data;
|
using System.Data;
|
||||||
|
|
||||||
namespace IRaCIS.Core.Infra.EFCore.Interceptor
|
namespace IRaCIS.Core.Infra.EFCore.Interceptor
|
||||||
{
|
{
|
||||||
// ISendEndpoint:提供了Send方法,用于发送命令。
|
// 命令(Command):是一种定向的消息,应该由一个特定的接收者处理。用 Send 发送。
|
||||||
//IPublishEndpoint:提供了Publish方法,用于发布事件。
|
// 事件(Event):是一种广播式消息,可以有多个订阅者接收并处理。用 Publish 发布。
|
||||||
public class DispatchDomainEventsInterceptor(IPublishEndpoint publishEndpoint) : SaveChangesInterceptor
|
// IPublishEndpoint:提供了Publish方法,用于发布事件,该接口仅负责发布消息,不会依赖于接收者的存在,发布者和消费者可以是完全分离的
|
||||||
|
// IMediator 适用于单进程内的消息传递,不涉及外部消息中间件(如 RabbitMQ、Azure Service Bus 等) 使用本地的内存队列,而不是跨进程消息传递,适合轻量级、无中间件的场景
|
||||||
|
// 发件箱模式参考: https://dev.to/antonmartyniuk/use-masstransit-to-implement-outbox-pattern-with-ef-core-and-mongodb-oep#:~:text=MongoDB%20replica%20set%20is%20required%20for%20both%20publisher%20and%20consumer
|
||||||
|
// 1、IPublishEndpoint 才会将事件存储到发件箱表中, 高级IBus接口时 - 消息不会存储在发件箱中,必须有savechanges 才会一起提交保存到数据库中
|
||||||
|
public class DispatchDomainEventsInterceptor(IMediator _mediator) : SaveChangesInterceptor
|
||||||
{
|
{
|
||||||
|
|
||||||
//领域事件通常与数据变更密切相关。如果在 SaveChanges 之前发布事件,有可能事件发布时的数据状态还没有被持久化到数据库。这可能导致事件消费者看到的是一个不一致的状态
|
//领域事件通常与数据变更密切相关。如果在 SaveChanges 之前发布事件,有可能事件发布时的数据状态还没有被持久化到数据库。这可能导致事件消费者看到的是一个不一致的状态
|
||||||
|
@ -52,7 +57,7 @@ namespace IRaCIS.Core.Infra.EFCore.Interceptor
|
||||||
//这种方式会导致消息没处理
|
//这种方式会导致消息没处理
|
||||||
//await publishEndpoint.Publish(domainEvent);
|
//await publishEndpoint.Publish(domainEvent);
|
||||||
|
|
||||||
await publishEndpoint.Publish(domainEvent.GetType(), domainEvent);
|
await _mediator.Publish(domainEvent.GetType(), domainEvent);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue