From 464f70f50101941e29b1719ac41cef6c103fd446 Mon Sep 17 00:00:00 2001 From: hang <872297557@qq.com> Date: Fri, 11 Oct 2024 13:53:29 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9A=82=E6=97=B6=E5=B1=8F=E8=94=BD=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E5=AD=98=E5=82=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- IRaCIS.Core.API/Progranm.cs | 4 ---- IRaCIS.Core.Domain/BaseModel/DomainEvent.cs | 9 +++++++++ IRaCIS.Core.Domain/BaseModel/Entity.cs | 1 + IRaCIS.Core.Domain/IRaCIS.Core.Domain.csproj | 1 + .../Interceptor/AddDomainExt.cs | 14 +++++++------- .../Interceptor/AuditEntityInterceptor.cs | 2 +- .../Interceptor/DispatchDomainEventsInterceptor.cs | 7 ++++++- 7 files changed, 25 insertions(+), 13 deletions(-) diff --git a/IRaCIS.Core.API/Progranm.cs b/IRaCIS.Core.API/Progranm.cs index b3a34ede8..0d9bcef7b 100644 --- a/IRaCIS.Core.API/Progranm.cs +++ b/IRaCIS.Core.API/Progranm.cs @@ -35,7 +35,6 @@ using IRaCIS.Core.Application.MassTransit.Command; using IRaCIS.Core.Application.MassTransit.Consumer; using DocumentFormat.OpenXml.InkML; using IRaCIS.Core.Domain; -using static IRaCIS.Core.Application.Service.TestService; AppContext.SetSwitch("Npgsql.EnableLegacyTimestampBehavior", true); AppContext.SetSwitch("Npgsql.DisableDateTimeInfinityConversions", true); @@ -164,9 +163,6 @@ builder.Services.AddMassTransit(cfg => cfg.AddConsumer(); // 替换为你的消费者类 cfg.AddConsumer(); - cfg.AddConsumer(); - - cfg.AddPublishMessageScheduler(); //cfg.AddHangfireConsumers(); diff --git a/IRaCIS.Core.Domain/BaseModel/DomainEvent.cs b/IRaCIS.Core.Domain/BaseModel/DomainEvent.cs index 7d609201c..e867ee572 100644 --- a/IRaCIS.Core.Domain/BaseModel/DomainEvent.cs +++ b/IRaCIS.Core.Domain/BaseModel/DomainEvent.cs @@ -1,4 +1,5 @@ using IRaCIS.Core.Domain.Models; +using MassTransit; using System.ComponentModel; namespace IRaCIS.Core.Domain.BaseModel; @@ -12,7 +13,15 @@ namespace IRaCIS.Core.Domain.BaseModel; public abstract class DomainEvent { + public Guid EventId { get; set; } = NewId.NextSequentialGuid(); + //是不是延迟消费的事件,需要用定时任务调度 + public bool IsScheduleEvent { get; set; }=false; + + /// + /// 在事件产生多少s后开始消费该事件 + /// + public int DelaySeconds{ get; set; } } /// diff --git a/IRaCIS.Core.Domain/BaseModel/Entity.cs b/IRaCIS.Core.Domain/BaseModel/Entity.cs index 4aa529c86..671393d49 100644 --- a/IRaCIS.Core.Domain/BaseModel/Entity.cs +++ b/IRaCIS.Core.Domain/BaseModel/Entity.cs @@ -23,6 +23,7 @@ public abstract class Entity : IEntity #region 领域事件 仅仅允许通过提供的方法进行操作 [JsonIgnore] + [NotMapped] private readonly List _domainEvents = []; [JsonIgnore] diff --git a/IRaCIS.Core.Domain/IRaCIS.Core.Domain.csproj b/IRaCIS.Core.Domain/IRaCIS.Core.Domain.csproj index fefabf7ba..85d943c69 100644 --- a/IRaCIS.Core.Domain/IRaCIS.Core.Domain.csproj +++ b/IRaCIS.Core.Domain/IRaCIS.Core.Domain.csproj @@ -17,6 +17,7 @@ + diff --git a/IRaCIS.Core.Infra.EFCore/Interceptor/AddDomainExt.cs b/IRaCIS.Core.Infra.EFCore/Interceptor/AddDomainExt.cs index fd827068a..9086b2510 100644 --- a/IRaCIS.Core.Infra.EFCore/Interceptor/AddDomainExt.cs +++ b/IRaCIS.Core.Infra.EFCore/Interceptor/AddDomainExt.cs @@ -50,7 +50,7 @@ public static class DBContext_Ext } //添加进记录 - eventStoreList.AddRange(trialSiteSurvey.DomainEvents.Select(t => new EventStoreRecord() { EventType = t.GetType().Name, EventData = t.ToJsonStr() })); + eventStoreList.AddRange(trialSiteSurvey.DomainEvents.Select(t => new EventStoreRecord() { Id=t.EventId, EventType = t.GetType().Name, EventData = t.ToJsonStr() })); } @@ -93,7 +93,7 @@ public static class DBContext_Ext //添加进记录 - eventStoreList.AddRange(subjectVisit.DomainEvents.Select(t => new EventStoreRecord() { EventType = t.GetType().Name, EventData = t.ToJsonStr() })); + eventStoreList.AddRange(subjectVisit.DomainEvents.Select(t => new EventStoreRecord() { Id = t.EventId, EventType = t.GetType().Name, EventData = t.ToJsonStr() })); } } @@ -122,7 +122,7 @@ public static class DBContext_Ext } //添加进记录 - eventStoreList.AddRange(qCChallengeDialog.DomainEvents.Select(t => new EventStoreRecord() { EventType = t.GetType().Name, EventData = t.ToJsonStr() })); + eventStoreList.AddRange(qCChallengeDialog.DomainEvents.Select(t => new EventStoreRecord() {Id = t.EventId, EventType = t.GetType().Name, EventData = t.ToJsonStr() })); } } @@ -148,7 +148,7 @@ public static class DBContext_Ext } //添加进记录 - eventStoreList.AddRange(checkChallengeDialog.DomainEvents.Select(t => new EventStoreRecord() { EventType = t.GetType().Name, EventData = t.ToJsonStr() })); + eventStoreList.AddRange(checkChallengeDialog.DomainEvents.Select(t => new EventStoreRecord() { Id = t.EventId, EventType = t.GetType().Name, EventData = t.ToJsonStr() })); } } @@ -184,7 +184,7 @@ public static class DBContext_Ext } //添加进记录 - eventStoreList.AddRange(taskMedicalReview.DomainEvents.Select(t => new EventStoreRecord() { EventType = t.GetType().Name, EventData = t.ToJsonStr() })); + eventStoreList.AddRange(taskMedicalReview.DomainEvents.Select(t => new EventStoreRecord() { Id = t.EventId, EventType = t.GetType().Name, EventData = t.ToJsonStr() })); } } @@ -220,7 +220,7 @@ public static class DBContext_Ext } //添加进记录 - eventStoreList.AddRange(readingMedicalReviewDialog.DomainEvents.Select(t => new EventStoreRecord() { EventType = t.GetType().Name, EventData = t.ToJsonStr() })); + eventStoreList.AddRange(readingMedicalReviewDialog.DomainEvents.Select(t => new EventStoreRecord() { Id = t.EventId, EventType = t.GetType().Name, EventData = t.ToJsonStr() })); } } @@ -254,7 +254,7 @@ public static class DBContext_Ext } //添加进记录 - eventStoreList.AddRange(visitTask.DomainEvents.Select(t => new EventStoreRecord() { EventType = t.GetType().Name, EventData = t.ToJsonStr() })); + eventStoreList.AddRange(visitTask.DomainEvents.Select(t => new EventStoreRecord() { Id = t.EventId, EventType = t.GetType().Name, EventData = t.ToJsonStr() })); } } diff --git a/IRaCIS.Core.Infra.EFCore/Interceptor/AuditEntityInterceptor.cs b/IRaCIS.Core.Infra.EFCore/Interceptor/AuditEntityInterceptor.cs index bb37a9a52..167f8aa7e 100644 --- a/IRaCIS.Core.Infra.EFCore/Interceptor/AuditEntityInterceptor.cs +++ b/IRaCIS.Core.Infra.EFCore/Interceptor/AuditEntityInterceptor.cs @@ -48,7 +48,7 @@ public class AuditEntityInterceptor(IUserInfo _userInfo, //}; //领域事件 - eventData.Context.AddDomainEvents(); + //eventData.Context.AddDomainEvents(); //审计时间 AuditEntities(eventData.Context); diff --git a/IRaCIS.Core.Infra.EFCore/Interceptor/DispatchDomainEventsInterceptor.cs b/IRaCIS.Core.Infra.EFCore/Interceptor/DispatchDomainEventsInterceptor.cs index 171a9eeda..993a7bdaa 100644 --- a/IRaCIS.Core.Infra.EFCore/Interceptor/DispatchDomainEventsInterceptor.cs +++ b/IRaCIS.Core.Infra.EFCore/Interceptor/DispatchDomainEventsInterceptor.cs @@ -14,7 +14,7 @@ namespace IRaCIS.Core.Infra.EFCore.Interceptor // 发件箱模式参考: https://dev.to/antonmartyniuk/use-masstransit-to-implement-outbox-pattern-with-ef-core-and-mongodb-oep#:~:text=MongoDB%20replica%20set%20is%20required%20for%20both%20publisher%20and%20consumer // 1、IPublishEndpoint 才会将事件存储到发件箱表中, 高级IBus接口时 - 消息不会存储在发件箱中,必须有savechanges 才会一起提交保存到数据库中 // 2、进入消息代理之前,发布事件在OutboxState OutboxMessage, 进入消费者以后(已经删除OutboxState OutboxMessage),消费失败,需要修改代码重新发布,然后之前消费事件的重新处理,错误处理参考:https://www.youtube.com/watch?v=3TMKUu7c4lc - public class DispatchDomainEventsInterceptor(IMediator _mediator) : SaveChangesInterceptor + public class DispatchDomainEventsInterceptor(IMediator _mediator, IMessageScheduler _scheduler) : SaveChangesInterceptor { //领域事件通常与数据变更密切相关。如果在 SaveChanges 之前发布事件,有可能事件发布时的数据状态还没有被持久化到数据库。这可能导致事件消费者看到的是一个不一致的状态 @@ -58,6 +58,11 @@ namespace IRaCIS.Core.Infra.EFCore.Interceptor //这种方式会导致消息没处理 //await publishEndpoint.Publish(domainEvent); + if (domainEvent.IsScheduleEvent) + { + await _scheduler.SchedulePublish(DateTime.Now.AddSeconds(domainEvent.DelaySeconds), domainEvent); + } + await _mediator.Publish(domainEvent.GetType(), domainEvent); } }