From 2a0d01c161c4ff3ae985589d88100e40a7b86de6 Mon Sep 17 00:00:00 2001 From: hang <872297557@qq.com> Date: Fri, 18 Oct 2024 15:08:28 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=B8=AD=E5=BF=83=E8=B0=83?= =?UTF-8?q?=E7=A0=94=E8=A1=A8=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../HostService/HangfireHostService.cs | 4 ++- .../_ServiceExtensions/MassTransitSetup.cs | 5 ++- .../Consumer/SiteSurverEmailConsumer.cs | 25 ++++++++++---- .../Extension/ConsumeExceptionFilter.cs | 33 +++++++++++++++++++ .../DispatchDomainEventsInterceptor.cs | 4 +-- 5 files changed, 60 insertions(+), 11 deletions(-) create mode 100644 IRaCIS.Core.Application/MassTransit/Extension/ConsumeExceptionFilter.cs diff --git a/IRaCIS.Core.API/HostService/HangfireHostService.cs b/IRaCIS.Core.API/HostService/HangfireHostService.cs index 65336aec0..c3220ef99 100644 --- a/IRaCIS.Core.API/HostService/HangfireHostService.cs +++ b/IRaCIS.Core.API/HostService/HangfireHostService.cs @@ -14,11 +14,13 @@ using MassTransit.Scheduling; using Hangfire.Storage; using System.Linq; using Microsoft.EntityFrameworkCore; +using MassTransit.Mediator; namespace IRaCIS.Core.API.HostService; public class HangfireHostService(IRecurringMessageScheduler _recurringMessageScheduler, IRepository _trialEmailNoticeConfigRepository, + IMediator _mediator, ILogger _logger) : IHostedService { @@ -53,7 +55,7 @@ public class HangfireHostService(IRecurringMessageScheduler _recurringMessageSch } - await _recurringMessageScheduler.ScheduleRecurringPublish(new QCImageQuestionSchedule() { CronExpression = "0/3 * * * * ? " }, new MasstransiTestCommand { value = "message at " + DateTime.Now.ToString() }); + //await _recurringMessageScheduler.ScheduleRecurringPublish(new QCImageQuestionSchedule() { CronExpression = "0/3 * * * * ? " }, new MasstransiTestCommand { value = "message at " + DateTime.Now.ToString() }); diff --git a/IRaCIS.Core.API/_ServiceExtensions/MassTransitSetup.cs b/IRaCIS.Core.API/_ServiceExtensions/MassTransitSetup.cs index 19b368b9d..f2a6b5730 100644 --- a/IRaCIS.Core.API/_ServiceExtensions/MassTransitSetup.cs +++ b/IRaCIS.Core.API/_ServiceExtensions/MassTransitSetup.cs @@ -39,12 +39,15 @@ namespace IRaCIS.Core.API cfg.UsePublishMessageScheduler(); - + cfg.UseConsumeFilter(typeof(ConsumeExceptionFilter<>), context, + x => x.Include(type => type.IsAssignableTo(typeof(DomainEvent)))); + cfg.UseConsumeFilter(typeof(CultureInfoFilter<>), context, x => x.Include(type => type.IsAssignableTo(typeof(DomainEvent)))); cfg.ConfigureEndpoints(context); // 自动配置所有消费者的端点 + }); }); diff --git a/IRaCIS.Core.Application/MassTransit/Consumer/SiteSurverEmailConsumer.cs b/IRaCIS.Core.Application/MassTransit/Consumer/SiteSurverEmailConsumer.cs index 3fb1d9fcd..0f3fdf4f9 100644 --- a/IRaCIS.Core.Application/MassTransit/Consumer/SiteSurverEmailConsumer.cs +++ b/IRaCIS.Core.Application/MassTransit/Consumer/SiteSurverEmailConsumer.cs @@ -219,26 +219,37 @@ public class SiteSurverRejectedEventConsumer( var trialId = siteSurveyInfo.TrialId; - var messageToSend = new MimeMessage(); - var name = siteSurveyInfo.UserName; + var toUserName = siteSurveyInfo.UserName; if (context.Message.IsHaveSPMOrCPM) { //PM 驳回到SPM if (siteSurveyInfo.State == TrialSiteSurveyEnum.CRCSubmitted) { - var user = await _userRepository.FirstOrDefaultAsync(t => t.Id == siteSurveyInfo.PreliminaryUserId); + //var user = await _userRepository.FirstOrDefaultAsync(t => t.Id == siteSurveyInfo.PreliminaryUserId); - name = user.FullName; + //name = user.FullName; + + var sPMOrCPMList = _trialRepository.Where(t => t.Id == trialId) + .SelectMany(t => t.TrialUserList) + .Where(t => t.User.UserTypeEnum == UserTypeEnum.SPM || t.User.UserTypeEnum == UserTypeEnum.CPM) + .Select(t => new { t.User.EMail, t.User.FullName, t.User.UserTypeEnum }).ToList(); + + + foreach (var user in sPMOrCPMList) + { + messageToSend.To.Add(new MailboxAddress(user.FullName, user.EMail)); + } + + toUserName = string.Join('、', sPMOrCPMList.Select(t => t.FullName)); - messageToSend.To.Add(new MailboxAddress(name, user.EMail)); } //SPM 驳回到CRC else if (siteSurveyInfo.State == TrialSiteSurveyEnum.ToSubmit) { - messageToSend.To.Add(new MailboxAddress(name, siteSurveyInfo.Email)); + messageToSend.To.Add(new MailboxAddress(toUserName, siteSurveyInfo.Email)); } } @@ -265,7 +276,7 @@ public class SiteSurverRejectedEventConsumer( { var topicStr = string.Format(input.topicStr, companyName, trialInfo.ResearchProgramNo); var htmlBodyStr = string.Format(CommonEmailHelper.ReplaceCompanyName(_systemEmailConfig, input.htmlBodyStr), - name, + toUserName, trialInfo.TrialCode, trialInfo.ResearchProgramNo, trialInfo.ExperimentName, diff --git a/IRaCIS.Core.Application/MassTransit/Extension/ConsumeExceptionFilter.cs b/IRaCIS.Core.Application/MassTransit/Extension/ConsumeExceptionFilter.cs new file mode 100644 index 000000000..8bdf23368 --- /dev/null +++ b/IRaCIS.Core.Application/MassTransit/Extension/ConsumeExceptionFilter.cs @@ -0,0 +1,33 @@ +using System; +using System.Globalization; +using System.Threading; +using System.Threading.Tasks; +using IRaCIS.Core.Domain.BaseModel; +using IRaCIS.Core.Domain.Share; +using MassTransit; +using Microsoft.Extensions.Logging; +using NPOI.SS.Formula.Functions; + +public class ConsumeExceptionFilter(ILogger> _logger) : IFilter> where T : DomainEvent +{ + + public async Task Send(ConsumeContext context, IPipe> next) + { + + try + { + await next.Send(context); + + } + catch (Exception exception) + { + var errorInfo = $"Exception: {exception.Message}[{exception.StackTrace}]" + (exception.InnerException != null ? $" InnerException: {exception.InnerException.Message}[{exception.InnerException.StackTrace}]" : ""); + _logger.LogError(errorInfo); + } + } + + public void Probe(ProbeContext context) + { + context.CreateFilterScope("ConsumeException"); + } +} diff --git a/IRaCIS.Core.Infra.EFCore/Interceptor/DispatchDomainEventsInterceptor.cs b/IRaCIS.Core.Infra.EFCore/Interceptor/DispatchDomainEventsInterceptor.cs index d2825a696..871ed9b45 100644 --- a/IRaCIS.Core.Infra.EFCore/Interceptor/DispatchDomainEventsInterceptor.cs +++ b/IRaCIS.Core.Infra.EFCore/Interceptor/DispatchDomainEventsInterceptor.cs @@ -14,7 +14,7 @@ namespace IRaCIS.Core.Infra.EFCore.Interceptor // 发件箱模式参考: https://dev.to/antonmartyniuk/use-masstransit-to-implement-outbox-pattern-with-ef-core-and-mongodb-oep#:~:text=MongoDB%20replica%20set%20is%20required%20for%20both%20publisher%20and%20consumer // 1、IPublishEndpoint 才会将事件存储到发件箱表中, 高级IBus接口时 - 消息不会存储在发件箱中,必须有savechanges 才会一起提交保存到数据库中 // 2、进入消息代理之前,发布事件在OutboxState OutboxMessage, 进入消费者以后(已经删除OutboxState OutboxMessage),消费失败,需要修改代码重新发布,然后之前消费事件的重新处理,错误处理参考:https://www.youtube.com/watch?v=3TMKUu7c4lc - public class DispatchDomainEventsInterceptor(IMediator _mediator, IMessageScheduler _scheduler, IPublishEndpoint _publishEndpoint) : SaveChangesInterceptor + public class DispatchDomainEventsInterceptor(IMediator _mediator, IMessageScheduler _scheduler/*, IPublishEndpoint _publishEndpoint*/) : SaveChangesInterceptor { //领域事件通常与数据变更密切相关。如果在 SaveChanges 之前发布事件,有可能事件发布时的数据状态还没有被持久化到数据库。这可能导致事件消费者看到的是一个不一致的状态 @@ -64,7 +64,7 @@ namespace IRaCIS.Core.Infra.EFCore.Interceptor await _scheduler.SchedulePublish(DateTime.Now.AddSeconds((int)domainEvent.DelaySeconds!), (object)domainEvent); } - await _publishEndpoint.Publish(domainEvent.GetType(), domainEvent); + await _mediator.Publish(domainEvent.GetType(), domainEvent); } }