diff --git a/IRaCIS.Core.Application/MassTransit/Consumer/NeedVerify/TestConsumer.cs b/IRaCIS.Core.Application/MassTransit/Consumer/NeedVerify/TestConsumer.cs new file mode 100644 index 000000000..133078b51 --- /dev/null +++ b/IRaCIS.Core.Application/MassTransit/Consumer/NeedVerify/TestConsumer.cs @@ -0,0 +1,31 @@ +using MassTransit; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using static IRaCIS.Core.Application.Service.TestService; + +namespace IRaCIS.Core.Application.MassTransit.Consumer; + +/// +/// meditor send 的时候,请求流会先到消费者,返回后才会执行后续代码 +/// publish 请求流不会先到消费者,发布后,直接执行后续代码 +/// +/// +/// +public class MasstransitHangfireTestConsumer(IRepository _userRepository) : IConsumer +{ + public async Task Consume(ConsumeContext context) + { + + Console.WriteLine(_userRepository._dbContext.GetHashCode()); + Console.WriteLine("Now is " + DateTime.Now.ToString()); + Console.WriteLine($"MassTransit.Consumer :{context.Message.value}"); + + + await context.RespondAsync(ResponseOutput.Ok()); + + } +} + diff --git a/IRaCIS.Core.Application/Service/Common/DTO/EventStoreRecordViewModel.cs b/IRaCIS.Core.Application/Service/Common/DTO/EventStoreRecordViewModel.cs index bb8857b47..b8b00b1c0 100644 --- a/IRaCIS.Core.Application/Service/Common/DTO/EventStoreRecordViewModel.cs +++ b/IRaCIS.Core.Application/Service/Common/DTO/EventStoreRecordViewModel.cs @@ -15,6 +15,8 @@ public class EventStoreRecordView public string EventData { get; set; } + public string EventTypeName { get; set; } + public EventStateEnum EventState { get; set; } public string EventType { get; set; } @@ -33,7 +35,7 @@ public class EventStoreRecordQuery : PageInput public EventStateEnum? EventState { get; set; } - public string? EventType { get; set; } + public string? EventTypeName { get; set; } } diff --git a/IRaCIS.Core.Application/Service/Common/EventStoreRecordService.cs b/IRaCIS.Core.Application/Service/Common/EventStoreRecordService.cs index 1b05b453d..d2383ceff 100644 --- a/IRaCIS.Core.Application/Service/Common/EventStoreRecordService.cs +++ b/IRaCIS.Core.Application/Service/Common/EventStoreRecordService.cs @@ -22,12 +22,16 @@ namespace IRaCIS.Core.Application.Service; [ApiExplorerSettings(GroupName = "Common")] public class EventStoreRecordService(IRepository _eventStoreRecordRepository, IMediator _mediator,IPublishEndpoint _publishEndpoint) : BaseService { - + /// + /// 邮件事件消息列表 + /// + /// + /// [HttpPost] public async Task> GetEventStoreRecordList(EventStoreRecordQuery inQuery) { - var eventStoreRecordQueryable = _eventStoreRecordRepository + var eventStoreRecordQueryable = _eventStoreRecordRepository.WhereIf(inQuery.EventTypeName.IsNotNullOrEmpty(),t=>t.EventTypeName.Contains(inQuery.EventTypeName)) .ProjectTo(_mapper.ConfigurationProvider); var pageList = await eventStoreRecordQueryable.ToPagedListAsync(inQuery); diff --git a/IRaCIS.Core.Application/TestService.cs b/IRaCIS.Core.Application/TestService.cs index 0ef066477..c038630d9 100644 --- a/IRaCIS.Core.Application/TestService.cs +++ b/IRaCIS.Core.Application/TestService.cs @@ -153,7 +153,7 @@ namespace IRaCIS.Core.Application.Service await _testLengthRepository.AddAsync(new TestLength() { Name = "xxxx" }); // The consumer method will inject the repository and add 3 pieces of data, but the savechanges method of the repository will not be called - await _mediatorScoped.Send(new AddSubjectTriggerCommand { SubjectId = Guid.Empty }); + await _mediatorScoped.Publish(new AddSubjectTriggerCommand { SubjectId = Guid.Empty }); await _mediator.Send(new AddSubjectTriggerCommand2 { SubjectId = Guid.Empty }); @@ -168,16 +168,9 @@ namespace IRaCIS.Core.Application.Service public string value { get; set; } } - public class MasstransitHangfireTestConsumer : IConsumer - { - public Task Consume(ConsumeContext context) - { - Console.WriteLine("Now is " + DateTime.Now.ToString()); - Console.WriteLine($"MassTransit.Consumer1 :{context.Message.value}"); - return Task.CompletedTask; - } - } - public async Task TestMasstransitHangfire([FromServices] IMessageScheduler _scheduler) + public async Task TestMasstransitHangfireOrRequest([FromServices] IMessageScheduler _scheduler, + [FromServices] IRepository _testLengthRepository, + [FromServices] IRequestClient _requestClient) { var isEn_US = CultureInfo.CurrentCulture.Name == StaticData.CultureInfo.en_US; @@ -190,6 +183,9 @@ namespace IRaCIS.Core.Application.Service //await _scheduler.SchedulePublish(DateTime.Now.AddSeconds(10), new MasstransitHangfireTest() { value = "message at " + DateTime.Now.ToString() }); + Console.WriteLine(_testLengthRepository._dbContext.GetHashCode()); + await _requestClient.GetResponse(new MasstransitHangfireTest() { value = "message at " + DateTime.Now.ToString() }); + return ResponseOutput.Ok(); } diff --git a/IRaCIS.Core.Infra.EFCore/Interceptor/DispatchDomainEventsInterceptor.cs b/IRaCIS.Core.Infra.EFCore/Interceptor/DispatchDomainEventsInterceptor.cs index a89eaacdd..70818ee0e 100644 --- a/IRaCIS.Core.Infra.EFCore/Interceptor/DispatchDomainEventsInterceptor.cs +++ b/IRaCIS.Core.Infra.EFCore/Interceptor/DispatchDomainEventsInterceptor.cs @@ -60,6 +60,7 @@ namespace IRaCIS.Core.Infra.EFCore.Interceptor if (domainEvent.IsScheduleEvent) { + //延迟调度的消息,比如1h后再消费 await _scheduler.SchedulePublish(DateTime.Now.AddSeconds(domainEvent.DelaySeconds), (object)domainEvent); }