Merge branch 'Test_IRC_Net8' of http://192.168.3.68:2000/XCKJ/irc-netcore-api into Test_IRC_Net8
continuous-integration/drone/push Build is passing Details

IRC_NewDev
hang 2024-10-18 09:03:22 +08:00
commit d578feec18
7 changed files with 81 additions and 114 deletions

View File

@ -0,0 +1,66 @@
using Microsoft.Extensions.Hosting;
using System.Threading;
using System;
using System.Threading.Tasks;
using MassTransit;
using IRaCIS.Core.Domain.Models;
using IRaCIS.Core.Infra.EFCore;
using Microsoft.Extensions.Logging;
using Hangfire;
using IRaCIS.Core.Application.Helper;
using IRaCIS.Core.Application.MassTransit.Consumer;
using IRaCIS.Core.Domain.Share;
using MassTransit.Scheduling;
using Hangfire.Storage;
using System.Linq;
using Microsoft.EntityFrameworkCore;
namespace IRaCIS.Core.API.HostService;
public class HangfireHostService(IRecurringMessageScheduler _recurringMessageScheduler,
IRepository<TrialEmailNoticeConfig> _trialEmailNoticeConfigRepository,
ILogger<HangfireHostService> _logger) : IHostedService
{
public async Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("项目启动 hangfire 任务初始化 执行开始~");
//创建邮件定时任务
//项目定时任务都在default 队列
var dbJobIdList = JobStorage.Current.GetConnection().GetRecurringJobs().Where(t => t.Queue == "default").Select(t => t.Id).ToList();
foreach (var jobId in dbJobIdList)
{
HangfireJobHelper.RemoveCronJob(jobId);
}
var taskInfoList = await _trialEmailNoticeConfigRepository.Where(t => t.Trial.TrialStatusStr == StaticData.TrialState.TrialOngoing && t.EmailCron != string.Empty && t.IsAutoSend)
.Select(t => new { t.Id, t.Code, TrialCode = t.Trial.TrialCode, t.EmailCron, t.BusinessScenarioEnum, t.TrialId })
.ToListAsync();
foreach (var task in taskInfoList)
{
//利用主键作为任务Id
var jobId = $"{task.TrialId}({task.TrialCode})_({task.BusinessScenarioEnum})";
var trialId = task.TrialId;
HangfireJobHelper.AddOrUpdateTrialCronJob(jobId, trialId, task.BusinessScenarioEnum, task.EmailCron);
}
await _recurringMessageScheduler.ScheduleRecurringPublish(new QCImageQuestionSchedule() { CronExpression = "0/3 * * * * ? " }, new MasstransiTestCommand { value = "message at " + DateTime.Now.ToString() });
_logger.LogInformation("项目启动 hangfire 任务初始化 执行结束");
}
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}

View File

@ -1,9 +1,9 @@
using IRaCIS.Core.API;
using IRaCIS.Core.API.HostService;
using IRaCIS.Core.Application.BusinessFilter;
using IRaCIS.Core.Application.Filter;
using IRaCIS.Core.Application.MassTransit.Consumer;
using IRaCIS.Core.Application.Service;
using IRaCIS.Core.Application.Service.BackGroundJob;
using IRaCIS.Core.Application.Service.BusinessFilter;
using IRaCIS.Core.Infra.EFCore;
using IRaCIS.Core.Infrastructure.Extention;
@ -72,6 +72,8 @@ var _configuration = builder.Configuration;
//手动注册服务
builder.Services.ConfigureServices(_configuration);
builder.Services.AddHostedService<HangfireHostService>();
//minimal api 异常处理
builder.Services.AddExceptionHandler<GlobalExceptionHandler>();
//builder.Services.AddProblemDetails();
@ -251,11 +253,6 @@ SerilogExtension.AddSerilogSetup(enviromentName, app.Services);
var hangfireJobService = app.Services.GetRequiredService<IIRaCISHangfireJob>();
await hangfireJobService.InitHangfireJobTaskAsync();
try
{
#region 运行环境 部署平台
@ -288,6 +285,8 @@ try
#endregion
app.Run();
}
catch (Exception e)
{

View File

@ -28,36 +28,21 @@ namespace IRaCIS.Core.API
//添加 MassTransit 和 InMemory 传输
services.AddMassTransit(cfg =>
{
// 自动扫描程序集中的消费者并进行注册
cfg.AddConsumers(typeof(UserSiteSurveySubmitedEventConsumer).Assembly);
//Uri schedulerEndpoint = new Uri("queue:scheduler");
//cfg.AddMessageScheduler(schedulerEndpoint);
cfg.AddPublishMessageScheduler();
cfg.AddHangfireConsumers();
// 使用 InMemory 作为消息传递机制
cfg.UsingInMemory((context, cfg) =>
{
//https://github.com/MassTransit/Sample-Hangfire/blob/master/src/Sample.Hangfire.Console/Program.cs
cfg.UsePublishMessageScheduler();
//cfg.UseMessageScheduler(schedulerEndpoint);
//使用 Hangfire 进行消息调度
//cfg.UseHangfireScheduler();
cfg.UseConsumeFilter(typeof(CultureInfoFilter<>), context,
x => x.Include(type => type.IsAssignableTo(typeof(DomainEvent))));
// 这里可以进行额外的配置
cfg.ConfigureEndpoints(context); // 自动配置所有消费者的端点
});

View File

@ -4,7 +4,6 @@ using IP2Region.Net.XDB;
using IRaCIS.Core.Application.BackGroundJob;
using IRaCIS.Core.Application.Helper;
using IRaCIS.Core.Application.Service;
using IRaCIS.Core.Application.Service.BackGroundJob;
using IRaCIS.Core.Domain.Share;
using IRaCIS.Core.Infra.EFCore;
using Microsoft.AspNetCore.Builder;
@ -52,7 +51,6 @@ public static class ServiceCollectionSetup
services.AddScoped<IRepository, Repository>();
services.AddScoped<IObtainTaskAutoCancelJob, ObtainTaskAutoCancelJob>();
services.AddScoped<IIRaCISHangfireJob, IRaCISCHangfireJob>();
// 注册以Service 结尾的服务
services.Scan(scan => scan

View File

@ -1,83 +0,0 @@
using Hangfire;
using Hangfire.Storage;
using IRaCIS.Core.Application.Helper;
using IRaCIS.Core.Application.MassTransit.Consumer;
using IRaCIS.Core.Domain.Models;
using IRaCIS.Core.Domain.Share;
using MassTransit;
using MassTransit.Mediator;
using MassTransit.Scheduling;
using Microsoft.Extensions.Logging;
using NPOI.SS.Formula.Functions;
namespace IRaCIS.Core.Application.Service.BackGroundJob
{
public interface IIRaCISHangfireJob
{
Task InitHangfireJobTaskAsync();
}
public class IRaCISCHangfireJob(ILogger<IRaCISCHangfireJob> _logger,
IRepository<TrialEmailNoticeConfig> _trialEmailNoticeConfigRepository,
IBus _bus
) : IIRaCISHangfireJob
{
public static string JsonFileFolder = Path.Combine(AppContext.BaseDirectory, StaticData.Folder.Resources);
public async Task InitHangfireJobTaskAsync()
{
_logger.LogInformation("项目启动 hangfire 任务初始化 执行开始~");
//创建邮件定时任务
await InitSysAndTrialCronJobAsync();
_logger.LogInformation("项目启动 hangfire 任务初始化 执行结束");
}
public async Task InitSysAndTrialCronJobAsync()
{
//项目定时任务都在default 队列
var dbJobIdList = JobStorage.Current.GetConnection().GetRecurringJobs().Where(t => t.Queue == "default").Select(t => t.Id).ToList();
foreach (var jobId in dbJobIdList)
{
HangfireJobHelper.RemoveCronJob(jobId);
}
var taskInfoList = await _trialEmailNoticeConfigRepository.Where(t => t.Trial.TrialStatusStr == StaticData.TrialState.TrialOngoing && t.EmailCron != string.Empty && t.IsAutoSend)
.Select(t => new { t.Id, t.Code, TrialCode=t.Trial.TrialCode, t.EmailCron, t.BusinessScenarioEnum, t.TrialId })
.ToListAsync();
foreach (var task in taskInfoList)
{
//利用主键作为任务Id
var jobId = $"{task.TrialId}({task.TrialCode})_{task.Id}({task.BusinessScenarioEnum})";
var trialId = task.TrialId;
HangfireJobHelper.AddOrUpdateTrialCronJob(jobId, trialId, task.BusinessScenarioEnum, task.EmailCron);
}
//var schedulerEndpoint = await _bus.GetSendEndpoint(new Uri("queue:sys_init"));
//await schedulerEndpoint.ScheduleRecurringSend<QCImageQuestionSchedule>(new Uri("queue:mt-message-queue"), new QCImageQuestionSchedule() { ScheduleId = jobId, CronExpression = "0 0/1 * 1/1 * ? *" }, new PollExternalSystem { });
//await schedulerEndpoint.ScheduleRecurringSend<QCImageQuestionSchedule>(new Uri("queue:sys_init"), new QCImageQuestionSchedule() { ScheduleId = jobId + jobId, CronExpression = "0 0/1 * 1/1 * ? *" }, new PollExternalSystem { });
//HangfireJobHelper.AddOrUpdateCronJob<IMediator>("test-MasstransiTestCommand", t => t.Send(new MasstransiTestCommand { value = "message at " + DateTime.Now.ToString() }, default), "0/3 * * * * ? ");
//await _bus.ScheduleRecurringSend<QCImageQuestionSchedule>(new Uri("queue:mt-message-queue"), new QCImageQuestionSchedule() { ScheduleId = jobId, CronExpression = "0 0/1 * 1/1 * ? *" }, new QCImageQuestionSchedule { });
}
public class PollExternalSystem { }
}
}

View File

@ -2,6 +2,7 @@
using IRaCIS.Core.Domain.Share;
using MassTransit;
using MassTransit.Mediator;
using MassTransit.Scheduling;
using Medallion.Threading;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;
@ -46,6 +47,7 @@ public class TestMasstransitService : BaseService
public async Task<IResponseOutput> TestMasstransitRequest([FromServices] IMessageScheduler _scheduler,
[FromServices] IRecurringMessageScheduler _recurringMessageScheduler,
[FromServices] IRepository<TestLength> _testLengthRepository,
[FromServices] IRequestClient<MasstransiTestCommand> _requestClient,
[FromServices] IScopedClientFactory _clientFactory,
@ -60,17 +62,17 @@ public class TestMasstransitService : BaseService
//IScopedMediator 上下文一致, IMediator上下文不一致
//通过命令不获取结果 进入消费者后再返回 数据库上下文 不同
await _mediator.Send(new MasstransiTestCommand { value = "message at " + DateTime.Now.ToString() });
//await _mediator.Send(new MasstransiTestCommand { value = "message at " + DateTime.Now.ToString() });
//通过命令获取结果 进入消费者后再返回 数据库上下文 相同
var dd = await _mediatorScoped.CreateRequest(new MasstransiTestCommand() { value = "message at " + DateTime.Now.ToString() })
.GetResponse<IResponseOutput>();
////通过命令获取结果 进入消费者后再返回 数据库上下文 相同
//var dd = await _mediatorScoped.CreateRequest(new MasstransiTestCommand() { value = "message at " + DateTime.Now.ToString() })
// .GetResponse<IResponseOutput>();
//发布后,不会立即进入消费者,消费者是另外的线程执行
await _mediatorScoped.Publish(new MasstransiTestCommand { value = "message at " + DateTime.Now.ToString() });
////发布后,不会立即进入消费者,消费者是另外的线程执行
//await _mediatorScoped.Publish(new MasstransiTestCommand { value = "message at " + DateTime.Now.ToString() });
await _scheduler.SchedulePublish(DateTime.Now.AddSeconds(10), new MasstransiTestCommand() { value = "message at " + DateTime.Now.ToString() });
await _recurringMessageScheduler.ScheduleRecurringPublish(new QCImageQuestionSchedule() { CronExpression = "0/3 * * * * ? " }, new MasstransiTestCommand { value = "message at " + DateTime.Now.ToString() });
//await _scheduler.SchedulePublish(DateTime.Now.AddSeconds(10), new MasstransiTestCommand() { value = "message at " + DateTime.Now.ToString() });
return ResponseOutput.Ok();
}