MassTransit Command 测试预备代码
continuous-integration/drone/push Build is passing
Details
continuous-integration/drone/push Build is passing
Details
parent
02db483119
commit
2461646d5f
|
@ -155,7 +155,7 @@ builder.Services.AddMediator(cfg =>
|
|||
builder.Services.AddMassTransit(cfg =>
|
||||
{
|
||||
// 注册消费者
|
||||
cfg.AddConsumer<AddSubjectConsumer>(); // 替换为你的消费者类
|
||||
cfg.AddConsumer<AddSubjectTriggerConsumer>(); // 替换为你的消费者类
|
||||
|
||||
// 使用 InMemory 作为消息传递机制
|
||||
cfg.UsingInMemory((context, cfg) =>
|
||||
|
|
|
@ -18,6 +18,7 @@ namespace IRaCIS.Core.API
|
|||
|
||||
// 全局忽略 DomainEvents 属性
|
||||
automapper.AddGlobalIgnore(nameof(Entity.DomainEvents));
|
||||
automapper.AddGlobalIgnore(nameof(Entity.DomainCommands));
|
||||
|
||||
#region 会使 IncludeMembers 失效 不能全局使用
|
||||
//mapping an EntityFramework Core DbContext-object.
|
||||
|
|
|
@ -6,13 +6,13 @@ using MassTransit;
|
|||
|
||||
namespace IRaCIS.Core.Application.MassTransit.Consumer;
|
||||
|
||||
public class AddSubjectConsumer(IRepository<SubjectVisit> _subjectVisitRepository,
|
||||
public class AddSubjectTriggerConsumer(IRepository<SubjectVisit> _subjectVisitRepository,
|
||||
|
||||
IRepository<VisitStage> _visitStageRepository,
|
||||
IRepository<Trial> _trialRepository,
|
||||
IMapper _mapper) : IConsumer<AddSubjectEvent>
|
||||
IMapper _mapper) : IConsumer<AddSubjectTriggerCommand>
|
||||
{
|
||||
public async Task Consume(ConsumeContext<AddSubjectEvent> context)
|
||||
public async Task Consume(ConsumeContext<AddSubjectTriggerCommand> context)
|
||||
{
|
||||
var addSubjectEvent= context.Message;
|
||||
|
|
@ -1,12 +1,27 @@
|
|||
using System.ComponentModel;
|
||||
|
||||
namespace IRaCIS.Core.Domain.BaseModel;
|
||||
|
||||
/// <summary>
|
||||
/// 事件 不影响数据库提交事务的,不会记录稽查
|
||||
///
|
||||
/// 比如 添加subject 自动添加访视,不适合作为事件,否则自动添加访视后,记录稽查,当前请求url 都不知道
|
||||
/// </summary>
|
||||
[Description("领域实体事件基类")]
|
||||
public abstract class DomainEvent
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// 命令,触发一些操作,在当前事务一起提交
|
||||
/// </summary>
|
||||
[Description("领域实体命令基类")]
|
||||
public abstract class DomainCommand
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
public class FailedDomainEvent
|
||||
{
|
||||
public Guid Id { get; set; }
|
||||
|
|
|
@ -44,6 +44,31 @@ public abstract class Entity : IEntity<Guid>
|
|||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region 领域命令 当前事务一起处理的,触发操作
|
||||
|
||||
[JsonIgnore]
|
||||
private readonly List<DomainCommand> _domainCommands = [];
|
||||
|
||||
[JsonIgnore]
|
||||
[NotMapped]
|
||||
public IReadOnlyCollection<DomainCommand> DomainCommands => _domainCommands.AsReadOnly();
|
||||
public void AddDomainCommand(DomainCommand domainCommand)
|
||||
{
|
||||
_domainCommands.Add(domainCommand);
|
||||
}
|
||||
|
||||
public void RemoveDomainCommand(DomainCommand domainCommand)
|
||||
{
|
||||
_domainCommands.Remove(domainCommand);
|
||||
}
|
||||
|
||||
public void ClearDomainCommands()
|
||||
{
|
||||
_domainCommands.Clear();
|
||||
}
|
||||
|
||||
#endregion
|
||||
}
|
||||
#region 减少实体属性,增加基类
|
||||
|
||||
|
|
|
@ -19,4 +19,8 @@
|
|||
<PackageReference Include="EntityFrameworkCore.Projectables.Abstractions" Version="3.0.4" />
|
||||
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<Folder Include="_DomainEvent\" />
|
||||
</ItemGroup>
|
||||
</Project>
|
||||
|
|
|
@ -3,7 +3,10 @@ using IRaCIS.Core.Domain.BaseModel;
|
|||
|
||||
namespace IRaCIS.Core.Domain;
|
||||
|
||||
public class AddSubjectEvent : DomainEvent
|
||||
/// <summary>
|
||||
/// 添加Subject 触发命令
|
||||
/// </summary>
|
||||
public class AddSubjectTriggerCommand : DomainCommand
|
||||
{
|
||||
public Guid SubjectId { get; set; }
|
||||
public Guid TrialId { get; set; }
|
|
@ -14,6 +14,15 @@ public static class DBContext_Ext
|
|||
{
|
||||
var changeTracker = context.ChangeTracker;
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
public static void AddDomainCommands(this DbContext context)
|
||||
{
|
||||
var changeTracker = context.ChangeTracker;
|
||||
|
||||
// 遍历 ChangeTracker 中的实体
|
||||
foreach (var entry in changeTracker.Entries<Subject>())
|
||||
{
|
||||
|
@ -22,10 +31,12 @@ public static class DBContext_Ext
|
|||
if (entry.State == EntityState.Added)
|
||||
{
|
||||
// 受试者添加 触发访视自动添加
|
||||
var addedEvent = new AddSubjectEvent { SubjectId = subject.Id, TrialId = subject.TrialId, TrialSiteId = subject.TrialSiteId };
|
||||
var addedEvent = new AddSubjectTriggerCommand { SubjectId = subject.Id, TrialId = subject.TrialId, TrialSiteId = subject.TrialSiteId };
|
||||
|
||||
subject.AddDomainEvent(addedEvent);
|
||||
subject.AddDomainCommand(addedEvent);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -2,6 +2,8 @@
|
|||
using IRaCIS.Core.Domain.Models;
|
||||
using IRaCIS.Core.Domain.Share;
|
||||
using IRaCIS.Core.Infra.EFCore.Common;
|
||||
using MassTransit;
|
||||
using MassTransit.Transports;
|
||||
using Microsoft.EntityFrameworkCore.Diagnostics;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using System.Data;
|
||||
|
@ -9,7 +11,10 @@ using System.Data;
|
|||
|
||||
namespace IRaCIS.Core.Infra.EFCore;
|
||||
|
||||
public class AuditEntityInterceptor(IUserInfo _userInfo, ILogger<AuditEntityInterceptor> _logger) : SaveChangesInterceptor
|
||||
public class AuditEntityInterceptor(IUserInfo _userInfo,
|
||||
ILogger<AuditEntityInterceptor> _logger
|
||||
//,ISendEndpoint _sendEndpoint
|
||||
) : SaveChangesInterceptor
|
||||
{
|
||||
|
||||
/// <summary>
|
||||
|
@ -22,7 +27,12 @@ public class AuditEntityInterceptor(IUserInfo _userInfo, ILogger<AuditEntityInte
|
|||
public override ValueTask<InterceptionResult<int>> SavingChangesAsync(DbContextEventData eventData,
|
||||
InterceptionResult<int> result, CancellationToken cancellationToken = default)
|
||||
{
|
||||
//测试领域事件
|
||||
//领域命令 (同一个事务提交的一些逻辑,类似Trigger 保存事务之前执行的一些逻辑)
|
||||
eventData.Context.AddDomainCommands();
|
||||
|
||||
//DispatchDomainCommands(eventData.Context).GetAwaiter().GetResult();
|
||||
|
||||
//领域事件
|
||||
eventData.Context.AddDomainEvents();
|
||||
|
||||
//审计时间
|
||||
|
@ -35,8 +45,18 @@ public class AuditEntityInterceptor(IUserInfo _userInfo, ILogger<AuditEntityInte
|
|||
}
|
||||
public override InterceptionResult<int> SavingChanges(DbContextEventData eventData, InterceptionResult<int> result)
|
||||
{
|
||||
//领域命令 (同一个事务提交的一些逻辑,类似Trigger 保存事务之前执行的一些逻辑)
|
||||
eventData.Context.AddDomainCommands();
|
||||
|
||||
//领域事件
|
||||
eventData.Context.AddDomainEvents();
|
||||
|
||||
//审计时间
|
||||
AuditEntities(eventData.Context);
|
||||
|
||||
//IRC稽查 放在savechange 之前 不影响之前的逻辑
|
||||
IRCDataInspection(eventData.Context);
|
||||
|
||||
return base.SavingChanges(eventData, result);
|
||||
}
|
||||
public void AuditEntities(DbContext? context)
|
||||
|
@ -143,6 +163,11 @@ public class AuditEntityInterceptor(IUserInfo _userInfo, ILogger<AuditEntityInte
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
#region 异常处理
|
||||
|
||||
#endregion
|
||||
|
||||
private void LoggerDBContextException(DbContextErrorEventData eventData)
|
||||
{
|
||||
var ex = eventData.Exception;
|
||||
|
@ -201,4 +226,30 @@ public class AuditEntityInterceptor(IUserInfo _userInfo, ILogger<AuditEntityInte
|
|||
|
||||
base.SaveChangesFailed(eventData);
|
||||
}
|
||||
|
||||
|
||||
#region 领域命令分发
|
||||
|
||||
//private async Task DispatchDomainCommands(DbContext? context)
|
||||
//{
|
||||
// if (context == null) return;
|
||||
|
||||
// var entities = context.ChangeTracker
|
||||
// .Entries<Entity>()
|
||||
// .Where(e => e.Entity.DomainCommands.Any())
|
||||
// .Select(e => e.Entity)
|
||||
// .ToList();
|
||||
|
||||
// var domainCommands = entities
|
||||
// .SelectMany(e => e.DomainCommands)
|
||||
// .ToList();
|
||||
|
||||
// entities.ForEach(e => e.ClearDomainCommands());
|
||||
|
||||
// foreach (var domainCommand in domainCommands)
|
||||
// {
|
||||
// await _sendEndpoint.Send(domainCommand.GetType(), domainCommand);
|
||||
// }
|
||||
//}
|
||||
#endregion
|
||||
}
|
|
@ -1,4 +1,5 @@
|
|||
using IRaCIS.Core.Domain.Models;
|
||||
using IRaCIS.Core.Domain;
|
||||
using IRaCIS.Core.Domain.Models;
|
||||
using MassTransit;
|
||||
using Microsoft.EntityFrameworkCore.Diagnostics;
|
||||
using System.Data;
|
||||
|
@ -48,7 +49,10 @@ namespace IRaCIS.Core.Infra.EFCore.Interceptor
|
|||
|
||||
foreach (var domainEvent in domainEvents)
|
||||
{
|
||||
await publishEndpoint.Publish(domainEvent);
|
||||
//这种方式会导致消息没处理
|
||||
//await publishEndpoint.Publish(domainEvent);
|
||||
|
||||
await publishEndpoint.Publish(domainEvent.GetType(), domainEvent);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue