修改事件发布
continuous-integration/drone/push Build is passing
Details
continuous-integration/drone/push Build is passing
Details
parent
d578feec18
commit
03d735a287
|
@ -12997,11 +12997,6 @@
|
||||||
</summary>
|
</summary>
|
||||||
<param name="_userRepository"></param>
|
<param name="_userRepository"></param>
|
||||||
</member>
|
</member>
|
||||||
<member name="T:IRaCIS.Core.Application.MassTransit.Consumer.MediatorHttpContextScopeFilterExtensions">
|
|
||||||
<summary>
|
|
||||||
参考链接:https://github.com/MassTransit/MassTransit/discussions/2498
|
|
||||||
</summary>
|
|
||||||
</member>
|
|
||||||
<member name="T:IRaCIS.Core.Application.MassTransit.Consumer.QCImageQuestionRecurringEventConsumer">
|
<member name="T:IRaCIS.Core.Application.MassTransit.Consumer.QCImageQuestionRecurringEventConsumer">
|
||||||
<summary>
|
<summary>
|
||||||
QC 影像质疑
|
QC 影像质疑
|
||||||
|
@ -13067,7 +13062,7 @@
|
||||||
10分钟检测通知IR 已通知的进行标注,下次不会再通知
|
10分钟检测通知IR 已通知的进行标注,下次不会再通知
|
||||||
</summary>
|
</summary>
|
||||||
</member>
|
</member>
|
||||||
<member name="M:IRaCIS.Core.Application.MassTransit.Recurring.UrgentIRUnReadTaskRecurringEventConsumer.#ctor(IRaCIS.Core.Infra.EFCore.IRepository{IRaCIS.Core.Domain.Models.ReadingQuestionCriterionTrial},IRaCIS.Core.Infra.EFCore.IRepository{IRaCIS.Core.Domain.Models.VisitTask},IRaCIS.Core.Infra.EFCore.IRepository{IRaCIS.Core.Domain.Models.TrialUser})">
|
<member name="M:IRaCIS.Core.Application.MassTransit.Recurring.UrgentIRUnReadTaskRecurringEventConsumer.#ctor(IRaCIS.Core.Infra.EFCore.IRepository{IRaCIS.Core.Domain.Models.ReadingQuestionCriterionTrial},IRaCIS.Core.Infra.EFCore.IRepository{IRaCIS.Core.Domain.Models.VisitTask},IRaCIS.Core.Infra.EFCore.IRepository{IRaCIS.Core.Domain.Models.TrialUser},IRaCIS.Core.Infra.EFCore.IRepository{IRaCIS.Core.Domain.Models.EmailNoticeConfig},Microsoft.Extensions.Options.IOptionsMonitor{IRaCIS.Core.Domain.Share.SystemEmailSendConfig})">
|
||||||
<summary>
|
<summary>
|
||||||
10分钟检测通知IR 已通知的进行标注,下次不会再通知
|
10分钟检测通知IR 已通知的进行标注,下次不会再通知
|
||||||
</summary>
|
</summary>
|
||||||
|
|
|
@ -1,83 +0,0 @@
|
||||||
|
|
||||||
|
|
||||||
using IRaCIS.Core.Application.MassTransit.Consumer;
|
|
||||||
using MassTransit;
|
|
||||||
using Microsoft.AspNetCore.Http;
|
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
|
||||||
|
|
||||||
namespace IRaCIS.Core.Application.MassTransit.Consumer;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// 参考链接:https://github.com/MassTransit/MassTransit/discussions/2498
|
|
||||||
/// </summary>
|
|
||||||
public static class MediatorHttpContextScopeFilterExtensions
|
|
||||||
{
|
|
||||||
public static void UseHttpContextScopeFilter(this IMediatorConfigurator configurator, IServiceProvider serviceProvider)
|
|
||||||
{
|
|
||||||
var filter = new HttpContextScopeFilter(serviceProvider.GetRequiredService<IHttpContextAccessor>());
|
|
||||||
|
|
||||||
configurator.ConfigurePublish(x => x.UseFilter(filter));
|
|
||||||
configurator.ConfigureSend(x => x.UseFilter(filter));
|
|
||||||
configurator.UseFilter(filter);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public class HttpContextScopeFilter :
|
|
||||||
IFilter<PublishContext>,
|
|
||||||
IFilter<SendContext>,
|
|
||||||
IFilter<ConsumeContext>
|
|
||||||
{
|
|
||||||
private readonly IHttpContextAccessor _httpContextAccessor;
|
|
||||||
|
|
||||||
public HttpContextScopeFilter(IHttpContextAccessor httpContextAccessor)
|
|
||||||
{
|
|
||||||
_httpContextAccessor = httpContextAccessor;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void AddPayload(PipeContext context)
|
|
||||||
{
|
|
||||||
if (_httpContextAccessor.HttpContext == null)
|
|
||||||
return;
|
|
||||||
|
|
||||||
var serviceProvider = _httpContextAccessor.HttpContext.RequestServices;
|
|
||||||
context.GetOrAddPayload(() => serviceProvider);
|
|
||||||
context.GetOrAddPayload<IServiceScope>(() => new NoopScope(serviceProvider));
|
|
||||||
}
|
|
||||||
|
|
||||||
public Task Send(PublishContext context, IPipe<PublishContext> next)
|
|
||||||
{
|
|
||||||
AddPayload(context);
|
|
||||||
return next.Send(context);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Task Send(SendContext context, IPipe<SendContext> next)
|
|
||||||
{
|
|
||||||
AddPayload(context);
|
|
||||||
return next.Send(context);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Task Send(ConsumeContext context, IPipe<ConsumeContext> next)
|
|
||||||
{
|
|
||||||
AddPayload(context);
|
|
||||||
return next.Send(context);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void Probe(ProbeContext context)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
private class NoopScope :
|
|
||||||
IServiceScope
|
|
||||||
{
|
|
||||||
public NoopScope(IServiceProvider serviceProvider)
|
|
||||||
{
|
|
||||||
ServiceProvider = serviceProvider;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void Dispose()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
public IServiceProvider ServiceProvider { get; }
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,9 +1,13 @@
|
||||||
using IRaCIS.Application.Contracts;
|
using IRaCIS.Application.Contracts;
|
||||||
using IRaCIS.Core.Application.Contracts;
|
using IRaCIS.Core.Application.Contracts;
|
||||||
|
using IRaCIS.Core.Application.Helper;
|
||||||
using IRaCIS.Core.Application.MassTransit.Consumer;
|
using IRaCIS.Core.Application.MassTransit.Consumer;
|
||||||
using MassTransit;
|
using MassTransit;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using MimeKit;
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
|
using System.Globalization;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
@ -14,11 +18,19 @@ namespace IRaCIS.Core.Application.MassTransit.Recurring
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// 10分钟检测通知IR 已通知的进行标注,下次不会再通知
|
/// 10分钟检测通知IR 已通知的进行标注,下次不会再通知
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public class UrgentIRUnReadTaskRecurringEventConsumer(IRepository<ReadingQuestionCriterionTrial> _trialReadingCriterionRepository,
|
public class UrgentIRUnReadTaskRecurringEventConsumer(
|
||||||
IRepository<VisitTask> _visitTaskRepository, IRepository<TrialUser> _trialUserRepository) : IConsumer<UrgentIRUnReadTaskRecurringEvent>
|
IRepository<ReadingQuestionCriterionTrial> _trialReadingCriterionRepository,
|
||||||
|
IRepository<VisitTask> _visitTaskRepository,
|
||||||
|
IRepository<TrialUser> _trialUserRepository,
|
||||||
|
IRepository<EmailNoticeConfig> _emailNoticeConfigrepository,
|
||||||
|
IOptionsMonitor<SystemEmailSendConfig> systemEmailConfig) : IConsumer<UrgentIRUnReadTaskRecurringEvent>
|
||||||
{
|
{
|
||||||
public Task Consume(ConsumeContext<UrgentIRUnReadTaskRecurringEvent> context)
|
private readonly SystemEmailSendConfig _systemEmailConfig = systemEmailConfig.CurrentValue;
|
||||||
|
|
||||||
|
public async Task Consume(ConsumeContext<UrgentIRUnReadTaskRecurringEvent> context)
|
||||||
{
|
{
|
||||||
|
var isEn_US = CultureInfo.CurrentCulture.Name == StaticData.CultureInfo.en_US;
|
||||||
|
|
||||||
var trialId = context.Message.TrialId;
|
var trialId = context.Message.TrialId;
|
||||||
|
|
||||||
//找到该项目所有的IR 并且有加急 和Pd 未读的任务
|
//找到该项目所有的IR 并且有加急 和Pd 未读的任务
|
||||||
|
@ -41,7 +53,7 @@ namespace IRaCIS.Core.Application.MassTransit.Recurring
|
||||||
|
|
||||||
var userId=trialUser.UserId;
|
var userId=trialUser.UserId;
|
||||||
|
|
||||||
var doctorCriterionList = _trialReadingCriterionRepository.Where(t => t.IsSigned && t.IsConfirm && t.Trial.TrialUserList.Any(t => t.UserId == userId))
|
var doctorCriterionList = await _trialReadingCriterionRepository.Where(t => t.IsSigned && t.IsConfirm && t.Trial.TrialUserList.Any(t => t.UserId == userId))
|
||||||
.Select(c => new
|
.Select(c => new
|
||||||
{
|
{
|
||||||
|
|
||||||
|
@ -65,9 +77,37 @@ namespace IRaCIS.Core.Application.MassTransit.Recurring
|
||||||
c.TaskName
|
c.TaskName
|
||||||
}).ToList()
|
}).ToList()
|
||||||
|
|
||||||
});
|
}).ToListAsync();
|
||||||
|
|
||||||
|
|
||||||
|
var toTalUnreadCount= doctorCriterionList.SelectMany(t=>t.UnReadList).Count();
|
||||||
|
|
||||||
|
|
||||||
|
var messageToSend = new MimeMessage();
|
||||||
|
|
||||||
|
var companyName = isEn_US ? _systemEmailConfig.CompanyShortName : _systemEmailConfig.CompanyShortNameCN;
|
||||||
|
|
||||||
|
Func<(string topicStr, string htmlBodyStr), (string topicStr, string htmlBodyStr)> emailConfigFunc = input =>
|
||||||
|
{
|
||||||
|
var topicStr = string.Format(input.topicStr, trialUser.ResearchProgramNo);
|
||||||
|
var htmlBodyStr = string.Format(CommonEmailHelper.ReplaceCompanyName(_systemEmailConfig, input.htmlBodyStr),
|
||||||
|
trialUser.FullName,
|
||||||
|
|
||||||
|
DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"),
|
||||||
|
toTalUnreadCount,
|
||||||
|
|
||||||
|
trialUser.ResearchProgramNo,
|
||||||
|
trialUser.TrialReadingCriterionList,
|
||||||
|
//siteSurveyInfo.Phone,
|
||||||
|
_systemEmailConfig.SiteUrl
|
||||||
|
);
|
||||||
|
|
||||||
|
return (topicStr, htmlBodyStr);
|
||||||
|
};
|
||||||
|
|
||||||
|
await CommonEmailHelper.GetEmailSubejctAndHtmlInfoAndBuildAsync(_emailNoticeConfigrepository, EmailBusinessScenario.Approval_SubmitSiteSurvey, messageToSend, emailConfigFunc);
|
||||||
|
|
||||||
|
await SendEmailHelper.SendEmailAsync(messageToSend, _systemEmailConfig);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -77,9 +117,7 @@ namespace IRaCIS.Core.Application.MassTransit.Recurring
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
Console.WriteLine(DateTime.Now);
|
|
||||||
|
|
||||||
return Task.CompletedTask;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,7 +51,7 @@ namespace IRaCIS.Core.Application.Contracts
|
||||||
.Any(t => t.Value.Contains(" ")))
|
.Any(t => t.Value.Contains(" ")))
|
||||||
{
|
{
|
||||||
//邮件模板占位符不允许有空格,请核查占位符的地方
|
//邮件模板占位符不允许有空格,请核查占位符的地方
|
||||||
return ResponseOutput.NotOk("EmailNoticeConfig_ContainEmpty");
|
return ResponseOutput.NotOk(I18n.T("EmailNoticeConfig_ContainEmpty"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -44,6 +44,7 @@ public class EventStoreRecordService(IRepository<EventStoreRecord> _eventStoreRe
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="eventId"></param>
|
/// <param name="eventId"></param>
|
||||||
/// <returns></returns>
|
/// <returns></returns>
|
||||||
|
[HttpGet]
|
||||||
public async Task<IResponseOutput> RePublishEvent(Guid eventId)
|
public async Task<IResponseOutput> RePublishEvent(Guid eventId)
|
||||||
{
|
{
|
||||||
var storedEvent = await _eventStoreRecordRepository.FirstOrDefaultAsync(t => t.Id == eventId);
|
var storedEvent = await _eventStoreRecordRepository.FirstOrDefaultAsync(t => t.Id == eventId);
|
||||||
|
|
|
@ -685,7 +685,7 @@ namespace IRaCIS.Core.Application.Contracts
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if (_userInfo.UserTypeEnumInt == (int)UserTypeEnum.Undefined)
|
if (_userInfo.UserTypeEnumInt == (int)UserTypeEnum.Undefined || _userInfo.UserTypeEnumInt == (int)UserTypeEnum.ClinicalResearchCoordinator)
|
||||||
{
|
{
|
||||||
var hasSPMOrCPM = await _trialSiteSurveyRepository.AnyAsync(t => t.TrialId == trialId && t.Trial.TrialUserList.Any(u => u.User.UserTypeEnum == UserTypeEnum.SPM || u.User.UserTypeEnum == UserTypeEnum.CPM));
|
var hasSPMOrCPM = await _trialSiteSurveyRepository.AnyAsync(t => t.TrialId == trialId && t.Trial.TrialUserList.Any(u => u.User.UserTypeEnum == UserTypeEnum.SPM || u.User.UserTypeEnum == UserTypeEnum.CPM));
|
||||||
|
|
||||||
|
|
|
@ -14,7 +14,7 @@ namespace IRaCIS.Core.Infra.EFCore.Interceptor
|
||||||
// 发件箱模式参考: https://dev.to/antonmartyniuk/use-masstransit-to-implement-outbox-pattern-with-ef-core-and-mongodb-oep#:~:text=MongoDB%20replica%20set%20is%20required%20for%20both%20publisher%20and%20consumer
|
// 发件箱模式参考: https://dev.to/antonmartyniuk/use-masstransit-to-implement-outbox-pattern-with-ef-core-and-mongodb-oep#:~:text=MongoDB%20replica%20set%20is%20required%20for%20both%20publisher%20and%20consumer
|
||||||
// 1、IPublishEndpoint 才会将事件存储到发件箱表中, 高级IBus接口时 - 消息不会存储在发件箱中,必须有savechanges 才会一起提交保存到数据库中
|
// 1、IPublishEndpoint 才会将事件存储到发件箱表中, 高级IBus接口时 - 消息不会存储在发件箱中,必须有savechanges 才会一起提交保存到数据库中
|
||||||
// 2、进入消息代理之前,发布事件在OutboxState OutboxMessage, 进入消费者以后(已经删除OutboxState OutboxMessage),消费失败,需要修改代码重新发布,然后之前消费事件的重新处理,错误处理参考:https://www.youtube.com/watch?v=3TMKUu7c4lc
|
// 2、进入消息代理之前,发布事件在OutboxState OutboxMessage, 进入消费者以后(已经删除OutboxState OutboxMessage),消费失败,需要修改代码重新发布,然后之前消费事件的重新处理,错误处理参考:https://www.youtube.com/watch?v=3TMKUu7c4lc
|
||||||
public class DispatchDomainEventsInterceptor(IMediator _mediator, IMessageScheduler _scheduler) : SaveChangesInterceptor
|
public class DispatchDomainEventsInterceptor(IMediator _mediator, IMessageScheduler _scheduler, IPublishEndpoint _publishEndpoint) : SaveChangesInterceptor
|
||||||
{
|
{
|
||||||
|
|
||||||
//领域事件通常与数据变更密切相关。如果在 SaveChanges 之前发布事件,有可能事件发布时的数据状态还没有被持久化到数据库。这可能导致事件消费者看到的是一个不一致的状态
|
//领域事件通常与数据变更密切相关。如果在 SaveChanges 之前发布事件,有可能事件发布时的数据状态还没有被持久化到数据库。这可能导致事件消费者看到的是一个不一致的状态
|
||||||
|
@ -64,7 +64,7 @@ namespace IRaCIS.Core.Infra.EFCore.Interceptor
|
||||||
await _scheduler.SchedulePublish(DateTime.Now.AddSeconds((int)domainEvent.DelaySeconds!), (object)domainEvent);
|
await _scheduler.SchedulePublish(DateTime.Now.AddSeconds((int)domainEvent.DelaySeconds!), (object)domainEvent);
|
||||||
}
|
}
|
||||||
|
|
||||||
await _mediator.Publish(domainEvent.GetType(), domainEvent);
|
await _publishEndpoint.Publish(domainEvent.GetType(), domainEvent);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue