暂时屏蔽事件存储
continuous-integration/drone/push Build is passing
Details
continuous-integration/drone/push Build is passing
Details
parent
a6fd6a5e69
commit
464f70f501
|
@ -35,7 +35,6 @@ using IRaCIS.Core.Application.MassTransit.Command;
|
||||||
using IRaCIS.Core.Application.MassTransit.Consumer;
|
using IRaCIS.Core.Application.MassTransit.Consumer;
|
||||||
using DocumentFormat.OpenXml.InkML;
|
using DocumentFormat.OpenXml.InkML;
|
||||||
using IRaCIS.Core.Domain;
|
using IRaCIS.Core.Domain;
|
||||||
using static IRaCIS.Core.Application.Service.TestService;
|
|
||||||
|
|
||||||
AppContext.SetSwitch("Npgsql.EnableLegacyTimestampBehavior", true);
|
AppContext.SetSwitch("Npgsql.EnableLegacyTimestampBehavior", true);
|
||||||
AppContext.SetSwitch("Npgsql.DisableDateTimeInfinityConversions", true);
|
AppContext.SetSwitch("Npgsql.DisableDateTimeInfinityConversions", true);
|
||||||
|
@ -164,9 +163,6 @@ builder.Services.AddMassTransit(cfg =>
|
||||||
cfg.AddConsumer<AddSubjectTriggerConsumer>(); // 替换为你的消费者类
|
cfg.AddConsumer<AddSubjectTriggerConsumer>(); // 替换为你的消费者类
|
||||||
cfg.AddConsumer<AddSubjectTriggerConsumer2>();
|
cfg.AddConsumer<AddSubjectTriggerConsumer2>();
|
||||||
|
|
||||||
cfg.AddConsumer<MasstransitHangfireTestConsumer>();
|
|
||||||
|
|
||||||
|
|
||||||
cfg.AddPublishMessageScheduler();
|
cfg.AddPublishMessageScheduler();
|
||||||
//cfg.AddHangfireConsumers();
|
//cfg.AddHangfireConsumers();
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
using IRaCIS.Core.Domain.Models;
|
using IRaCIS.Core.Domain.Models;
|
||||||
|
using MassTransit;
|
||||||
using System.ComponentModel;
|
using System.ComponentModel;
|
||||||
|
|
||||||
namespace IRaCIS.Core.Domain.BaseModel;
|
namespace IRaCIS.Core.Domain.BaseModel;
|
||||||
|
@ -12,7 +13,15 @@ namespace IRaCIS.Core.Domain.BaseModel;
|
||||||
public abstract class DomainEvent
|
public abstract class DomainEvent
|
||||||
{
|
{
|
||||||
|
|
||||||
|
public Guid EventId { get; set; } = NewId.NextSequentialGuid();
|
||||||
|
|
||||||
|
//是不是延迟消费的事件,需要用定时任务调度
|
||||||
|
public bool IsScheduleEvent { get; set; }=false;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// 在事件产生多少s后开始消费该事件
|
||||||
|
/// </summary>
|
||||||
|
public int DelaySeconds{ get; set; }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|
|
@ -23,6 +23,7 @@ public abstract class Entity : IEntity<Guid>
|
||||||
#region 领域事件 仅仅允许通过提供的方法进行操作
|
#region 领域事件 仅仅允许通过提供的方法进行操作
|
||||||
|
|
||||||
[JsonIgnore]
|
[JsonIgnore]
|
||||||
|
[NotMapped]
|
||||||
private readonly List<DomainEvent> _domainEvents = [];
|
private readonly List<DomainEvent> _domainEvents = [];
|
||||||
|
|
||||||
[JsonIgnore]
|
[JsonIgnore]
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="EntityFrameworkCore.Projectables.Abstractions" Version="3.0.4" />
|
<PackageReference Include="EntityFrameworkCore.Projectables.Abstractions" Version="3.0.4" />
|
||||||
|
<PackageReference Include="MassTransit" Version="8.2.5" />
|
||||||
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
|
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
</Project>
|
</Project>
|
||||||
|
|
|
@ -50,7 +50,7 @@ public static class DBContext_Ext
|
||||||
}
|
}
|
||||||
|
|
||||||
//添加进记录
|
//添加进记录
|
||||||
eventStoreList.AddRange(trialSiteSurvey.DomainEvents.Select(t => new EventStoreRecord() { EventType = t.GetType().Name, EventData = t.ToJsonStr() }));
|
eventStoreList.AddRange(trialSiteSurvey.DomainEvents.Select(t => new EventStoreRecord() { Id=t.EventId, EventType = t.GetType().Name, EventData = t.ToJsonStr() }));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -93,7 +93,7 @@ public static class DBContext_Ext
|
||||||
|
|
||||||
|
|
||||||
//添加进记录
|
//添加进记录
|
||||||
eventStoreList.AddRange(subjectVisit.DomainEvents.Select(t => new EventStoreRecord() { EventType = t.GetType().Name, EventData = t.ToJsonStr() }));
|
eventStoreList.AddRange(subjectVisit.DomainEvents.Select(t => new EventStoreRecord() { Id = t.EventId, EventType = t.GetType().Name, EventData = t.ToJsonStr() }));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -122,7 +122,7 @@ public static class DBContext_Ext
|
||||||
}
|
}
|
||||||
|
|
||||||
//添加进记录
|
//添加进记录
|
||||||
eventStoreList.AddRange(qCChallengeDialog.DomainEvents.Select(t => new EventStoreRecord() { EventType = t.GetType().Name, EventData = t.ToJsonStr() }));
|
eventStoreList.AddRange(qCChallengeDialog.DomainEvents.Select(t => new EventStoreRecord() {Id = t.EventId, EventType = t.GetType().Name, EventData = t.ToJsonStr() }));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -148,7 +148,7 @@ public static class DBContext_Ext
|
||||||
}
|
}
|
||||||
|
|
||||||
//添加进记录
|
//添加进记录
|
||||||
eventStoreList.AddRange(checkChallengeDialog.DomainEvents.Select(t => new EventStoreRecord() { EventType = t.GetType().Name, EventData = t.ToJsonStr() }));
|
eventStoreList.AddRange(checkChallengeDialog.DomainEvents.Select(t => new EventStoreRecord() { Id = t.EventId, EventType = t.GetType().Name, EventData = t.ToJsonStr() }));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -184,7 +184,7 @@ public static class DBContext_Ext
|
||||||
}
|
}
|
||||||
|
|
||||||
//添加进记录
|
//添加进记录
|
||||||
eventStoreList.AddRange(taskMedicalReview.DomainEvents.Select(t => new EventStoreRecord() { EventType = t.GetType().Name, EventData = t.ToJsonStr() }));
|
eventStoreList.AddRange(taskMedicalReview.DomainEvents.Select(t => new EventStoreRecord() { Id = t.EventId, EventType = t.GetType().Name, EventData = t.ToJsonStr() }));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -220,7 +220,7 @@ public static class DBContext_Ext
|
||||||
}
|
}
|
||||||
|
|
||||||
//添加进记录
|
//添加进记录
|
||||||
eventStoreList.AddRange(readingMedicalReviewDialog.DomainEvents.Select(t => new EventStoreRecord() { EventType = t.GetType().Name, EventData = t.ToJsonStr() }));
|
eventStoreList.AddRange(readingMedicalReviewDialog.DomainEvents.Select(t => new EventStoreRecord() { Id = t.EventId, EventType = t.GetType().Name, EventData = t.ToJsonStr() }));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -254,7 +254,7 @@ public static class DBContext_Ext
|
||||||
}
|
}
|
||||||
|
|
||||||
//添加进记录
|
//添加进记录
|
||||||
eventStoreList.AddRange(visitTask.DomainEvents.Select(t => new EventStoreRecord() { EventType = t.GetType().Name, EventData = t.ToJsonStr() }));
|
eventStoreList.AddRange(visitTask.DomainEvents.Select(t => new EventStoreRecord() { Id = t.EventId, EventType = t.GetType().Name, EventData = t.ToJsonStr() }));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,7 +48,7 @@ public class AuditEntityInterceptor(IUserInfo _userInfo,
|
||||||
//};
|
//};
|
||||||
|
|
||||||
//领域事件
|
//领域事件
|
||||||
eventData.Context.AddDomainEvents();
|
//eventData.Context.AddDomainEvents();
|
||||||
|
|
||||||
//审计时间
|
//审计时间
|
||||||
AuditEntities(eventData.Context);
|
AuditEntities(eventData.Context);
|
||||||
|
|
|
@ -14,7 +14,7 @@ namespace IRaCIS.Core.Infra.EFCore.Interceptor
|
||||||
// 发件箱模式参考: https://dev.to/antonmartyniuk/use-masstransit-to-implement-outbox-pattern-with-ef-core-and-mongodb-oep#:~:text=MongoDB%20replica%20set%20is%20required%20for%20both%20publisher%20and%20consumer
|
// 发件箱模式参考: https://dev.to/antonmartyniuk/use-masstransit-to-implement-outbox-pattern-with-ef-core-and-mongodb-oep#:~:text=MongoDB%20replica%20set%20is%20required%20for%20both%20publisher%20and%20consumer
|
||||||
// 1、IPublishEndpoint 才会将事件存储到发件箱表中, 高级IBus接口时 - 消息不会存储在发件箱中,必须有savechanges 才会一起提交保存到数据库中
|
// 1、IPublishEndpoint 才会将事件存储到发件箱表中, 高级IBus接口时 - 消息不会存储在发件箱中,必须有savechanges 才会一起提交保存到数据库中
|
||||||
// 2、进入消息代理之前,发布事件在OutboxState OutboxMessage, 进入消费者以后(已经删除OutboxState OutboxMessage),消费失败,需要修改代码重新发布,然后之前消费事件的重新处理,错误处理参考:https://www.youtube.com/watch?v=3TMKUu7c4lc
|
// 2、进入消息代理之前,发布事件在OutboxState OutboxMessage, 进入消费者以后(已经删除OutboxState OutboxMessage),消费失败,需要修改代码重新发布,然后之前消费事件的重新处理,错误处理参考:https://www.youtube.com/watch?v=3TMKUu7c4lc
|
||||||
public class DispatchDomainEventsInterceptor(IMediator _mediator) : SaveChangesInterceptor
|
public class DispatchDomainEventsInterceptor(IMediator _mediator, IMessageScheduler _scheduler) : SaveChangesInterceptor
|
||||||
{
|
{
|
||||||
|
|
||||||
//领域事件通常与数据变更密切相关。如果在 SaveChanges 之前发布事件,有可能事件发布时的数据状态还没有被持久化到数据库。这可能导致事件消费者看到的是一个不一致的状态
|
//领域事件通常与数据变更密切相关。如果在 SaveChanges 之前发布事件,有可能事件发布时的数据状态还没有被持久化到数据库。这可能导致事件消费者看到的是一个不一致的状态
|
||||||
|
@ -58,6 +58,11 @@ namespace IRaCIS.Core.Infra.EFCore.Interceptor
|
||||||
//这种方式会导致消息没处理
|
//这种方式会导致消息没处理
|
||||||
//await publishEndpoint.Publish(domainEvent);
|
//await publishEndpoint.Publish(domainEvent);
|
||||||
|
|
||||||
|
if (domainEvent.IsScheduleEvent)
|
||||||
|
{
|
||||||
|
await _scheduler.SchedulePublish(DateTime.Now.AddSeconds(domainEvent.DelaySeconds), domainEvent);
|
||||||
|
}
|
||||||
|
|
||||||
await _mediator.Publish(domainEvent.GetType(), domainEvent);
|
await _mediator.Publish(domainEvent.GetType(), domainEvent);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue