From 8eecabb8974f8be1d22ff1eb717135684509b386 Mon Sep 17 00:00:00 2001
From: hang <872297557@qq.com>
Date: Sat, 12 Oct 2024 13:48:48 +0800
Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=A4=87=E6=B3=A8?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../Consumer/NeedVerify/TestConsumer.cs | 31 +++++++++++++++++++
.../Common/DTO/EventStoreRecordViewModel.cs | 4 ++-
.../Service/Common/EventStoreRecordService.cs | 8 +++--
IRaCIS.Core.Application/TestService.cs | 18 +++++------
.../DispatchDomainEventsInterceptor.cs | 1 +
5 files changed, 48 insertions(+), 14 deletions(-)
create mode 100644 IRaCIS.Core.Application/MassTransit/Consumer/NeedVerify/TestConsumer.cs
diff --git a/IRaCIS.Core.Application/MassTransit/Consumer/NeedVerify/TestConsumer.cs b/IRaCIS.Core.Application/MassTransit/Consumer/NeedVerify/TestConsumer.cs
new file mode 100644
index 000000000..133078b51
--- /dev/null
+++ b/IRaCIS.Core.Application/MassTransit/Consumer/NeedVerify/TestConsumer.cs
@@ -0,0 +1,31 @@
+using MassTransit;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using static IRaCIS.Core.Application.Service.TestService;
+
+namespace IRaCIS.Core.Application.MassTransit.Consumer;
+
+///
+/// meditor send 的时候,请求流会先到消费者,返回后才会执行后续代码
+/// publish 请求流不会先到消费者,发布后,直接执行后续代码
+///
+///
+///
+public class MasstransitHangfireTestConsumer(IRepository _userRepository) : IConsumer
+{
+ public async Task Consume(ConsumeContext context)
+ {
+
+ Console.WriteLine(_userRepository._dbContext.GetHashCode());
+ Console.WriteLine("Now is " + DateTime.Now.ToString());
+ Console.WriteLine($"MassTransit.Consumer :{context.Message.value}");
+
+
+ await context.RespondAsync(ResponseOutput.Ok());
+
+ }
+}
+
diff --git a/IRaCIS.Core.Application/Service/Common/DTO/EventStoreRecordViewModel.cs b/IRaCIS.Core.Application/Service/Common/DTO/EventStoreRecordViewModel.cs
index bb8857b47..b8b00b1c0 100644
--- a/IRaCIS.Core.Application/Service/Common/DTO/EventStoreRecordViewModel.cs
+++ b/IRaCIS.Core.Application/Service/Common/DTO/EventStoreRecordViewModel.cs
@@ -15,6 +15,8 @@ public class EventStoreRecordView
public string EventData { get; set; }
+ public string EventTypeName { get; set; }
+
public EventStateEnum EventState { get; set; }
public string EventType { get; set; }
@@ -33,7 +35,7 @@ public class EventStoreRecordQuery : PageInput
public EventStateEnum? EventState { get; set; }
- public string? EventType { get; set; }
+ public string? EventTypeName { get; set; }
}
diff --git a/IRaCIS.Core.Application/Service/Common/EventStoreRecordService.cs b/IRaCIS.Core.Application/Service/Common/EventStoreRecordService.cs
index 1b05b453d..d2383ceff 100644
--- a/IRaCIS.Core.Application/Service/Common/EventStoreRecordService.cs
+++ b/IRaCIS.Core.Application/Service/Common/EventStoreRecordService.cs
@@ -22,12 +22,16 @@ namespace IRaCIS.Core.Application.Service;
[ApiExplorerSettings(GroupName = "Common")]
public class EventStoreRecordService(IRepository _eventStoreRecordRepository, IMediator _mediator,IPublishEndpoint _publishEndpoint) : BaseService
{
-
+ ///
+ /// 邮件事件消息列表
+ ///
+ ///
+ ///
[HttpPost]
public async Task> GetEventStoreRecordList(EventStoreRecordQuery inQuery)
{
- var eventStoreRecordQueryable = _eventStoreRecordRepository
+ var eventStoreRecordQueryable = _eventStoreRecordRepository.WhereIf(inQuery.EventTypeName.IsNotNullOrEmpty(),t=>t.EventTypeName.Contains(inQuery.EventTypeName))
.ProjectTo(_mapper.ConfigurationProvider);
var pageList = await eventStoreRecordQueryable.ToPagedListAsync(inQuery);
diff --git a/IRaCIS.Core.Application/TestService.cs b/IRaCIS.Core.Application/TestService.cs
index 0ef066477..c038630d9 100644
--- a/IRaCIS.Core.Application/TestService.cs
+++ b/IRaCIS.Core.Application/TestService.cs
@@ -153,7 +153,7 @@ namespace IRaCIS.Core.Application.Service
await _testLengthRepository.AddAsync(new TestLength() { Name = "xxxx" });
// The consumer method will inject the repository and add 3 pieces of data, but the savechanges method of the repository will not be called
- await _mediatorScoped.Send(new AddSubjectTriggerCommand { SubjectId = Guid.Empty });
+ await _mediatorScoped.Publish(new AddSubjectTriggerCommand { SubjectId = Guid.Empty });
await _mediator.Send(new AddSubjectTriggerCommand2 { SubjectId = Guid.Empty });
@@ -168,16 +168,9 @@ namespace IRaCIS.Core.Application.Service
public string value { get; set; }
}
- public class MasstransitHangfireTestConsumer : IConsumer
- {
- public Task Consume(ConsumeContext context)
- {
- Console.WriteLine("Now is " + DateTime.Now.ToString());
- Console.WriteLine($"MassTransit.Consumer1 :{context.Message.value}");
- return Task.CompletedTask;
- }
- }
- public async Task TestMasstransitHangfire([FromServices] IMessageScheduler _scheduler)
+ public async Task TestMasstransitHangfireOrRequest([FromServices] IMessageScheduler _scheduler,
+ [FromServices] IRepository _testLengthRepository,
+ [FromServices] IRequestClient _requestClient)
{
var isEn_US = CultureInfo.CurrentCulture.Name == StaticData.CultureInfo.en_US;
@@ -190,6 +183,9 @@ namespace IRaCIS.Core.Application.Service
//await _scheduler.SchedulePublish(DateTime.Now.AddSeconds(10), new MasstransitHangfireTest() { value = "message at " + DateTime.Now.ToString() });
+ Console.WriteLine(_testLengthRepository._dbContext.GetHashCode());
+ await _requestClient.GetResponse(new MasstransitHangfireTest() { value = "message at " + DateTime.Now.ToString() });
+
return ResponseOutput.Ok();
}
diff --git a/IRaCIS.Core.Infra.EFCore/Interceptor/DispatchDomainEventsInterceptor.cs b/IRaCIS.Core.Infra.EFCore/Interceptor/DispatchDomainEventsInterceptor.cs
index a89eaacdd..70818ee0e 100644
--- a/IRaCIS.Core.Infra.EFCore/Interceptor/DispatchDomainEventsInterceptor.cs
+++ b/IRaCIS.Core.Infra.EFCore/Interceptor/DispatchDomainEventsInterceptor.cs
@@ -60,6 +60,7 @@ namespace IRaCIS.Core.Infra.EFCore.Interceptor
if (domainEvent.IsScheduleEvent)
{
+ //延迟调度的消息,比如1h后再消费
await _scheduler.SchedulePublish(DateTime.Now.AddSeconds(domainEvent.DelaySeconds), (object)domainEvent);
}