IRecurringMessageScheduler 测试使用,以及bug发现
continuous-integration/drone/push Build is passing
Details
continuous-integration/drone/push Build is passing
Details
parent
81b9c57de7
commit
8760ad3904
|
@ -0,0 +1,66 @@
|
||||||
|
using Microsoft.Extensions.Hosting;
|
||||||
|
using System.Threading;
|
||||||
|
using System;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using MassTransit;
|
||||||
|
using IRaCIS.Core.Domain.Models;
|
||||||
|
using IRaCIS.Core.Infra.EFCore;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Hangfire;
|
||||||
|
using IRaCIS.Core.Application.Helper;
|
||||||
|
using IRaCIS.Core.Application.MassTransit.Consumer;
|
||||||
|
using IRaCIS.Core.Domain.Share;
|
||||||
|
using MassTransit.Scheduling;
|
||||||
|
using Hangfire.Storage;
|
||||||
|
using System.Linq;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
|
||||||
|
namespace IRaCIS.Core.API.HostService;
|
||||||
|
|
||||||
|
public class HangfireHostService(IRecurringMessageScheduler _recurringMessageScheduler,
|
||||||
|
IRepository<TrialEmailNoticeConfig> _trialEmailNoticeConfigRepository,
|
||||||
|
ILogger<HangfireHostService> _logger) : IHostedService
|
||||||
|
{
|
||||||
|
|
||||||
|
|
||||||
|
public async Task StartAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("项目启动 hangfire 任务初始化 执行开始~");
|
||||||
|
|
||||||
|
|
||||||
|
//创建邮件定时任务
|
||||||
|
//项目定时任务都在default 队列
|
||||||
|
var dbJobIdList = JobStorage.Current.GetConnection().GetRecurringJobs().Where(t => t.Queue == "default").Select(t => t.Id).ToList();
|
||||||
|
|
||||||
|
foreach (var jobId in dbJobIdList)
|
||||||
|
{
|
||||||
|
HangfireJobHelper.RemoveCronJob(jobId);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
var taskInfoList = await _trialEmailNoticeConfigRepository.Where(t => t.Trial.TrialStatusStr == StaticData.TrialState.TrialOngoing && t.EmailCron != string.Empty && t.IsAutoSend)
|
||||||
|
.Select(t => new { t.Id, t.Code, TrialCode = t.Trial.TrialCode, t.EmailCron, t.BusinessScenarioEnum, t.TrialId })
|
||||||
|
.ToListAsync();
|
||||||
|
|
||||||
|
foreach (var task in taskInfoList)
|
||||||
|
{
|
||||||
|
//利用主键作为任务Id
|
||||||
|
var jobId = $"{task.TrialId}({task.TrialCode})_({task.BusinessScenarioEnum})";
|
||||||
|
|
||||||
|
var trialId = task.TrialId;
|
||||||
|
|
||||||
|
HangfireJobHelper.AddOrUpdateTrialCronJob(jobId, trialId, task.BusinessScenarioEnum, task.EmailCron);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
await _recurringMessageScheduler.ScheduleRecurringPublish(new QCImageQuestionSchedule() { CronExpression = "0/3 * * * * ? " }, new MasstransiTestCommand { value = "message at " + DateTime.Now.ToString() });
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
_logger.LogInformation("项目启动 hangfire 任务初始化 执行结束");
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -1,9 +1,9 @@
|
||||||
using IRaCIS.Core.API;
|
using IRaCIS.Core.API;
|
||||||
|
using IRaCIS.Core.API.HostService;
|
||||||
using IRaCIS.Core.Application.BusinessFilter;
|
using IRaCIS.Core.Application.BusinessFilter;
|
||||||
using IRaCIS.Core.Application.Filter;
|
using IRaCIS.Core.Application.Filter;
|
||||||
using IRaCIS.Core.Application.MassTransit.Consumer;
|
using IRaCIS.Core.Application.MassTransit.Consumer;
|
||||||
using IRaCIS.Core.Application.Service;
|
using IRaCIS.Core.Application.Service;
|
||||||
using IRaCIS.Core.Application.Service.BackGroundJob;
|
|
||||||
using IRaCIS.Core.Application.Service.BusinessFilter;
|
using IRaCIS.Core.Application.Service.BusinessFilter;
|
||||||
using IRaCIS.Core.Infra.EFCore;
|
using IRaCIS.Core.Infra.EFCore;
|
||||||
using IRaCIS.Core.Infrastructure.Extention;
|
using IRaCIS.Core.Infrastructure.Extention;
|
||||||
|
@ -72,6 +72,8 @@ var _configuration = builder.Configuration;
|
||||||
//手动注册服务
|
//手动注册服务
|
||||||
builder.Services.ConfigureServices(_configuration);
|
builder.Services.ConfigureServices(_configuration);
|
||||||
|
|
||||||
|
builder.Services.AddHostedService<HangfireHostService>();
|
||||||
|
|
||||||
//minimal api 异常处理
|
//minimal api 异常处理
|
||||||
builder.Services.AddExceptionHandler<GlobalExceptionHandler>();
|
builder.Services.AddExceptionHandler<GlobalExceptionHandler>();
|
||||||
//builder.Services.AddProblemDetails();
|
//builder.Services.AddProblemDetails();
|
||||||
|
@ -251,11 +253,6 @@ SerilogExtension.AddSerilogSetup(enviromentName, app.Services);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
var hangfireJobService = app.Services.GetRequiredService<IIRaCISHangfireJob>();
|
|
||||||
|
|
||||||
await hangfireJobService.InitHangfireJobTaskAsync();
|
|
||||||
|
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
#region 运行环境 部署平台
|
#region 运行环境 部署平台
|
||||||
|
@ -288,6 +285,8 @@ try
|
||||||
#endregion
|
#endregion
|
||||||
|
|
||||||
app.Run();
|
app.Run();
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
{
|
{
|
||||||
|
|
|
@ -28,36 +28,21 @@ namespace IRaCIS.Core.API
|
||||||
//添加 MassTransit 和 InMemory 传输
|
//添加 MassTransit 和 InMemory 传输
|
||||||
services.AddMassTransit(cfg =>
|
services.AddMassTransit(cfg =>
|
||||||
{
|
{
|
||||||
// 自动扫描程序集中的消费者并进行注册
|
|
||||||
cfg.AddConsumers(typeof(UserSiteSurveySubmitedEventConsumer).Assembly);
|
cfg.AddConsumers(typeof(UserSiteSurveySubmitedEventConsumer).Assembly);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
//Uri schedulerEndpoint = new Uri("queue:scheduler");
|
|
||||||
//cfg.AddMessageScheduler(schedulerEndpoint);
|
|
||||||
|
|
||||||
cfg.AddPublishMessageScheduler();
|
cfg.AddPublishMessageScheduler();
|
||||||
cfg.AddHangfireConsumers();
|
cfg.AddHangfireConsumers();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// 使用 InMemory 作为消息传递机制
|
// 使用 InMemory 作为消息传递机制
|
||||||
cfg.UsingInMemory((context, cfg) =>
|
cfg.UsingInMemory((context, cfg) =>
|
||||||
{
|
{
|
||||||
//https://github.com/MassTransit/Sample-Hangfire/blob/master/src/Sample.Hangfire.Console/Program.cs
|
|
||||||
cfg.UsePublishMessageScheduler();
|
cfg.UsePublishMessageScheduler();
|
||||||
|
|
||||||
//cfg.UseMessageScheduler(schedulerEndpoint);
|
|
||||||
|
|
||||||
|
|
||||||
//使用 Hangfire 进行消息调度
|
|
||||||
//cfg.UseHangfireScheduler();
|
|
||||||
|
|
||||||
cfg.UseConsumeFilter(typeof(CultureInfoFilter<>), context,
|
cfg.UseConsumeFilter(typeof(CultureInfoFilter<>), context,
|
||||||
x => x.Include(type => type.IsAssignableTo(typeof(DomainEvent))));
|
x => x.Include(type => type.IsAssignableTo(typeof(DomainEvent))));
|
||||||
|
|
||||||
|
|
||||||
// 这里可以进行额外的配置
|
|
||||||
cfg.ConfigureEndpoints(context); // 自动配置所有消费者的端点
|
cfg.ConfigureEndpoints(context); // 自动配置所有消费者的端点
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
|
@ -4,7 +4,6 @@ using IP2Region.Net.XDB;
|
||||||
using IRaCIS.Core.Application.BackGroundJob;
|
using IRaCIS.Core.Application.BackGroundJob;
|
||||||
using IRaCIS.Core.Application.Helper;
|
using IRaCIS.Core.Application.Helper;
|
||||||
using IRaCIS.Core.Application.Service;
|
using IRaCIS.Core.Application.Service;
|
||||||
using IRaCIS.Core.Application.Service.BackGroundJob;
|
|
||||||
using IRaCIS.Core.Domain.Share;
|
using IRaCIS.Core.Domain.Share;
|
||||||
using IRaCIS.Core.Infra.EFCore;
|
using IRaCIS.Core.Infra.EFCore;
|
||||||
using Microsoft.AspNetCore.Builder;
|
using Microsoft.AspNetCore.Builder;
|
||||||
|
@ -52,7 +51,6 @@ public static class ServiceCollectionSetup
|
||||||
services.AddScoped<IRepository, Repository>();
|
services.AddScoped<IRepository, Repository>();
|
||||||
|
|
||||||
services.AddScoped<IObtainTaskAutoCancelJob, ObtainTaskAutoCancelJob>();
|
services.AddScoped<IObtainTaskAutoCancelJob, ObtainTaskAutoCancelJob>();
|
||||||
services.AddScoped<IIRaCISHangfireJob, IRaCISCHangfireJob>();
|
|
||||||
|
|
||||||
// 注册以Service 结尾的服务
|
// 注册以Service 结尾的服务
|
||||||
services.Scan(scan => scan
|
services.Scan(scan => scan
|
||||||
|
|
|
@ -1,83 +0,0 @@
|
||||||
using Hangfire;
|
|
||||||
using Hangfire.Storage;
|
|
||||||
using IRaCIS.Core.Application.Helper;
|
|
||||||
using IRaCIS.Core.Application.MassTransit.Consumer;
|
|
||||||
using IRaCIS.Core.Domain.Models;
|
|
||||||
using IRaCIS.Core.Domain.Share;
|
|
||||||
using MassTransit;
|
|
||||||
using MassTransit.Mediator;
|
|
||||||
using MassTransit.Scheduling;
|
|
||||||
using Microsoft.Extensions.Logging;
|
|
||||||
using NPOI.SS.Formula.Functions;
|
|
||||||
|
|
||||||
namespace IRaCIS.Core.Application.Service.BackGroundJob
|
|
||||||
{
|
|
||||||
|
|
||||||
public interface IIRaCISHangfireJob
|
|
||||||
{
|
|
||||||
Task InitHangfireJobTaskAsync();
|
|
||||||
|
|
||||||
}
|
|
||||||
public class IRaCISCHangfireJob(ILogger<IRaCISCHangfireJob> _logger,
|
|
||||||
IRepository<TrialEmailNoticeConfig> _trialEmailNoticeConfigRepository,
|
|
||||||
IBus _bus
|
|
||||||
) : IIRaCISHangfireJob
|
|
||||||
{
|
|
||||||
public static string JsonFileFolder = Path.Combine(AppContext.BaseDirectory, StaticData.Folder.Resources);
|
|
||||||
|
|
||||||
|
|
||||||
public async Task InitHangfireJobTaskAsync()
|
|
||||||
{
|
|
||||||
_logger.LogInformation("项目启动 hangfire 任务初始化 执行开始~");
|
|
||||||
|
|
||||||
|
|
||||||
//创建邮件定时任务
|
|
||||||
await InitSysAndTrialCronJobAsync();
|
|
||||||
|
|
||||||
|
|
||||||
_logger.LogInformation("项目启动 hangfire 任务初始化 执行结束");
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public async Task InitSysAndTrialCronJobAsync()
|
|
||||||
{
|
|
||||||
//项目定时任务都在default 队列
|
|
||||||
var dbJobIdList = JobStorage.Current.GetConnection().GetRecurringJobs().Where(t => t.Queue == "default").Select(t => t.Id).ToList();
|
|
||||||
|
|
||||||
foreach (var jobId in dbJobIdList)
|
|
||||||
{
|
|
||||||
HangfireJobHelper.RemoveCronJob(jobId);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
var taskInfoList = await _trialEmailNoticeConfigRepository.Where(t => t.Trial.TrialStatusStr == StaticData.TrialState.TrialOngoing && t.EmailCron != string.Empty && t.IsAutoSend)
|
|
||||||
.Select(t => new { t.Id, t.Code, TrialCode=t.Trial.TrialCode, t.EmailCron, t.BusinessScenarioEnum, t.TrialId })
|
|
||||||
.ToListAsync();
|
|
||||||
|
|
||||||
foreach (var task in taskInfoList)
|
|
||||||
{
|
|
||||||
//利用主键作为任务Id
|
|
||||||
var jobId = $"{task.TrialId}({task.TrialCode})_{task.Id}({task.BusinessScenarioEnum})";
|
|
||||||
|
|
||||||
var trialId = task.TrialId;
|
|
||||||
|
|
||||||
HangfireJobHelper.AddOrUpdateTrialCronJob(jobId, trialId, task.BusinessScenarioEnum, task.EmailCron);
|
|
||||||
}
|
|
||||||
|
|
||||||
//var schedulerEndpoint = await _bus.GetSendEndpoint(new Uri("queue:sys_init"));
|
|
||||||
//await schedulerEndpoint.ScheduleRecurringSend<QCImageQuestionSchedule>(new Uri("queue:mt-message-queue"), new QCImageQuestionSchedule() { ScheduleId = jobId, CronExpression = "0 0/1 * 1/1 * ? *" }, new PollExternalSystem { });
|
|
||||||
//await schedulerEndpoint.ScheduleRecurringSend<QCImageQuestionSchedule>(new Uri("queue:sys_init"), new QCImageQuestionSchedule() { ScheduleId = jobId + jobId, CronExpression = "0 0/1 * 1/1 * ? *" }, new PollExternalSystem { });
|
|
||||||
|
|
||||||
//HangfireJobHelper.AddOrUpdateCronJob<IMediator>("test-MasstransiTestCommand", t => t.Send(new MasstransiTestCommand { value = "message at " + DateTime.Now.ToString() }, default), "0/3 * * * * ? ");
|
|
||||||
|
|
||||||
//await _bus.ScheduleRecurringSend<QCImageQuestionSchedule>(new Uri("queue:mt-message-queue"), new QCImageQuestionSchedule() { ScheduleId = jobId, CronExpression = "0 0/1 * 1/1 * ? *" }, new QCImageQuestionSchedule { });
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public class PollExternalSystem { }
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
|
@ -1286,27 +1286,6 @@
|
||||||
</summary>
|
</summary>
|
||||||
<returns></returns>
|
<returns></returns>
|
||||||
</member>
|
</member>
|
||||||
<member name="M:IRaCIS.Core.Application.Service.EmailSendService.SendTrialImageQCTaskEmailAsync(System.Guid)">
|
|
||||||
<summary>
|
|
||||||
影像质控
|
|
||||||
</summary>
|
|
||||||
<param name="trialId"></param>
|
|
||||||
<returns></returns>
|
|
||||||
</member>
|
|
||||||
<member name="M:IRaCIS.Core.Application.Service.EmailSendService.SendTrialQCQuestionEmailAsync(System.Guid)">
|
|
||||||
<summary>
|
|
||||||
QC质疑
|
|
||||||
</summary>
|
|
||||||
<param name="trialId"></param>
|
|
||||||
<returns></returns>
|
|
||||||
</member>
|
|
||||||
<member name="M:IRaCIS.Core.Application.Service.EmailSendService.SendTrialImageQuestionAsync(System.Guid)">
|
|
||||||
<summary>
|
|
||||||
影像质疑
|
|
||||||
</summary>
|
|
||||||
<param name="trialId"></param>
|
|
||||||
<returns></returns>
|
|
||||||
</member>
|
|
||||||
<member name="T:IRaCIS.Core.Application.Service.TrialEmailNoticeConfigService">
|
<member name="T:IRaCIS.Core.Application.Service.TrialEmailNoticeConfigService">
|
||||||
<summary>
|
<summary>
|
||||||
TrialEmailNoticeConfigService
|
TrialEmailNoticeConfigService
|
||||||
|
@ -12920,6 +12899,18 @@
|
||||||
MIM 回复医学返回通知IR
|
MIM 回复医学返回通知IR
|
||||||
</summary>
|
</summary>
|
||||||
</member>
|
</member>
|
||||||
|
<member name="M:IRaCIS.Core.Application.MassTransit.Consumer.UrgentMIMRepliedMedicalReviewConsumer.#ctor(IRaCIS.Core.Infra.EFCore.IRepository{IRaCIS.Core.Domain.Models.User},IRaCIS.Core.Infra.EFCore.IRepository{IRaCIS.Core.Domain.Models.TaskMedicalReview},IRaCIS.Core.Infra.EFCore.IRepository{IRaCIS.Core.Domain.Models.Trial},IRaCIS.Core.Infra.EFCore.IRepository{IRaCIS.Core.Domain.Models.SubjectVisit},IRaCIS.Core.Infra.EFCore.IRepository{IRaCIS.Core.Domain.Models.ReadingQuestionCriterionTrial},IRaCIS.Core.Infra.EFCore.IRepository{IRaCIS.Core.Domain.Models.VisitTask},IRaCIS.Core.Infra.EFCore.IRepository{IRaCIS.Core.Domain.Models.Dictionary},IRaCIS.Core.Infra.EFCore.IRepository{IRaCIS.Core.Domain.Models.EmailNoticeConfig},Microsoft.Extensions.Options.IOptionsMonitor{IRaCIS.Core.Domain.Share.SystemEmailSendConfig})">
|
||||||
|
<summary>
|
||||||
|
MIM 回复医学返回通知IR
|
||||||
|
</summary>
|
||||||
|
</member>
|
||||||
|
<member name="M:IRaCIS.Core.Application.MassTransit.Consumer.UrgentMIMRepliedMedicalReviewConsumer.Consume(MassTransit.ConsumeContext{IRaCIS.Core.Domain.UrgentMIMRepliedMedicalReview})">
|
||||||
|
<summary>
|
||||||
|
MIM 回复医学返回通知IR
|
||||||
|
</summary>
|
||||||
|
<param name="context"></param>
|
||||||
|
<returns></returns>
|
||||||
|
</member>
|
||||||
<member name="T:IRaCIS.Core.Application.MassTransit.Consumer.UrgentIRApplyedReReadingConsumer">
|
<member name="T:IRaCIS.Core.Application.MassTransit.Consumer.UrgentIRApplyedReReadingConsumer">
|
||||||
<summary>
|
<summary>
|
||||||
加急阅片 IR 申请重阅 或者PM 申请重阅
|
加急阅片 IR 申请重阅 或者PM 申请重阅
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
using IRaCIS.Core.Domain.Share;
|
using IRaCIS.Core.Domain.Share;
|
||||||
using MassTransit;
|
using MassTransit;
|
||||||
using MassTransit.Mediator;
|
using MassTransit.Mediator;
|
||||||
|
using MassTransit.Scheduling;
|
||||||
using Medallion.Threading;
|
using Medallion.Threading;
|
||||||
using Microsoft.AspNetCore.Mvc;
|
using Microsoft.AspNetCore.Mvc;
|
||||||
using Microsoft.Extensions.Options;
|
using Microsoft.Extensions.Options;
|
||||||
|
@ -46,6 +47,7 @@ public class TestMasstransitService : BaseService
|
||||||
|
|
||||||
|
|
||||||
public async Task<IResponseOutput> TestMasstransitRequest([FromServices] IMessageScheduler _scheduler,
|
public async Task<IResponseOutput> TestMasstransitRequest([FromServices] IMessageScheduler _scheduler,
|
||||||
|
[FromServices] IRecurringMessageScheduler _recurringMessageScheduler,
|
||||||
[FromServices] IRepository<TestLength> _testLengthRepository,
|
[FromServices] IRepository<TestLength> _testLengthRepository,
|
||||||
[FromServices] IRequestClient<MasstransiTestCommand> _requestClient,
|
[FromServices] IRequestClient<MasstransiTestCommand> _requestClient,
|
||||||
[FromServices] IScopedClientFactory _clientFactory,
|
[FromServices] IScopedClientFactory _clientFactory,
|
||||||
|
@ -60,17 +62,17 @@ public class TestMasstransitService : BaseService
|
||||||
//IScopedMediator 上下文一致, IMediator上下文不一致
|
//IScopedMediator 上下文一致, IMediator上下文不一致
|
||||||
|
|
||||||
//通过命令不获取结果 进入消费者后再返回 数据库上下文 不同
|
//通过命令不获取结果 进入消费者后再返回 数据库上下文 不同
|
||||||
await _mediator.Send(new MasstransiTestCommand { value = "message at " + DateTime.Now.ToString() });
|
//await _mediator.Send(new MasstransiTestCommand { value = "message at " + DateTime.Now.ToString() });
|
||||||
|
|
||||||
//通过命令获取结果 进入消费者后再返回 数据库上下文 相同
|
////通过命令获取结果 进入消费者后再返回 数据库上下文 相同
|
||||||
var dd = await _mediatorScoped.CreateRequest(new MasstransiTestCommand() { value = "message at " + DateTime.Now.ToString() })
|
//var dd = await _mediatorScoped.CreateRequest(new MasstransiTestCommand() { value = "message at " + DateTime.Now.ToString() })
|
||||||
.GetResponse<IResponseOutput>();
|
// .GetResponse<IResponseOutput>();
|
||||||
|
|
||||||
//发布后,不会立即进入消费者,消费者是另外的线程执行
|
////发布后,不会立即进入消费者,消费者是另外的线程执行
|
||||||
await _mediatorScoped.Publish(new MasstransiTestCommand { value = "message at " + DateTime.Now.ToString() });
|
//await _mediatorScoped.Publish(new MasstransiTestCommand { value = "message at " + DateTime.Now.ToString() });
|
||||||
|
|
||||||
|
await _recurringMessageScheduler.ScheduleRecurringPublish(new QCImageQuestionSchedule() { CronExpression = "0/3 * * * * ? " }, new MasstransiTestCommand { value = "message at " + DateTime.Now.ToString() });
|
||||||
await _scheduler.SchedulePublish(DateTime.Now.AddSeconds(10), new MasstransiTestCommand() { value = "message at " + DateTime.Now.ToString() });
|
//await _scheduler.SchedulePublish(DateTime.Now.AddSeconds(10), new MasstransiTestCommand() { value = "message at " + DateTime.Now.ToString() });
|
||||||
|
|
||||||
return ResponseOutput.Ok();
|
return ResponseOutput.Ok();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue