增加备注
continuous-integration/drone/push Build is passing Details

IRC_NewDev
hang 2024-10-12 13:48:48 +08:00
parent c940976f7f
commit 8eecabb897
5 changed files with 48 additions and 14 deletions

View File

@ -0,0 +1,31 @@
using MassTransit;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using static IRaCIS.Core.Application.Service.TestService;
namespace IRaCIS.Core.Application.MassTransit.Consumer;
/// <summary>
/// meditor send 的时候,请求流会先到消费者,返回后才会执行后续代码
/// publish 请求流不会先到消费者,发布后,直接执行后续代码
///
/// </summary>
/// <param name="_userRepository"></param>
public class MasstransitHangfireTestConsumer(IRepository<User> _userRepository) : IConsumer<MasstransitHangfireTest>
{
public async Task Consume(ConsumeContext<MasstransitHangfireTest> context)
{
Console.WriteLine(_userRepository._dbContext.GetHashCode());
Console.WriteLine("Now is " + DateTime.Now.ToString());
Console.WriteLine($"MassTransit.Consumer :{context.Message.value}");
await context.RespondAsync<IResponseOutput>(ResponseOutput.Ok());
}
}

View File

@ -15,6 +15,8 @@ public class EventStoreRecordView
public string EventData { get; set; } public string EventData { get; set; }
public string EventTypeName { get; set; }
public EventStateEnum EventState { get; set; } public EventStateEnum EventState { get; set; }
public string EventType { get; set; } public string EventType { get; set; }
@ -33,7 +35,7 @@ public class EventStoreRecordQuery : PageInput
public EventStateEnum? EventState { get; set; } public EventStateEnum? EventState { get; set; }
public string? EventType { get; set; } public string? EventTypeName { get; set; }
} }

View File

@ -22,12 +22,16 @@ namespace IRaCIS.Core.Application.Service;
[ApiExplorerSettings(GroupName = "Common")] [ApiExplorerSettings(GroupName = "Common")]
public class EventStoreRecordService(IRepository<EventStoreRecord> _eventStoreRecordRepository, IMediator _mediator,IPublishEndpoint _publishEndpoint) : BaseService public class EventStoreRecordService(IRepository<EventStoreRecord> _eventStoreRecordRepository, IMediator _mediator,IPublishEndpoint _publishEndpoint) : BaseService
{ {
/// <summary>
/// 邮件事件消息列表
/// </summary>
/// <param name="inQuery"></param>
/// <returns></returns>
[HttpPost] [HttpPost]
public async Task<PageOutput<EventStoreRecordView>> GetEventStoreRecordList(EventStoreRecordQuery inQuery) public async Task<PageOutput<EventStoreRecordView>> GetEventStoreRecordList(EventStoreRecordQuery inQuery)
{ {
var eventStoreRecordQueryable = _eventStoreRecordRepository var eventStoreRecordQueryable = _eventStoreRecordRepository.WhereIf(inQuery.EventTypeName.IsNotNullOrEmpty(),t=>t.EventTypeName.Contains(inQuery.EventTypeName))
.ProjectTo<EventStoreRecordView>(_mapper.ConfigurationProvider); .ProjectTo<EventStoreRecordView>(_mapper.ConfigurationProvider);
var pageList = await eventStoreRecordQueryable.ToPagedListAsync(inQuery); var pageList = await eventStoreRecordQueryable.ToPagedListAsync(inQuery);

View File

@ -153,7 +153,7 @@ namespace IRaCIS.Core.Application.Service
await _testLengthRepository.AddAsync(new TestLength() { Name = "xxxx" }); await _testLengthRepository.AddAsync(new TestLength() { Name = "xxxx" });
// The consumer method will inject the repository and add 3 pieces of data, but the savechanges method of the repository will not be called // The consumer method will inject the repository and add 3 pieces of data, but the savechanges method of the repository will not be called
await _mediatorScoped.Send(new AddSubjectTriggerCommand { SubjectId = Guid.Empty }); await _mediatorScoped.Publish(new AddSubjectTriggerCommand { SubjectId = Guid.Empty });
await _mediator.Send(new AddSubjectTriggerCommand2 { SubjectId = Guid.Empty }); await _mediator.Send(new AddSubjectTriggerCommand2 { SubjectId = Guid.Empty });
@ -168,16 +168,9 @@ namespace IRaCIS.Core.Application.Service
public string value { get; set; } public string value { get; set; }
} }
public class MasstransitHangfireTestConsumer : IConsumer<MasstransitHangfireTest> public async Task<IResponseOutput> TestMasstransitHangfireOrRequest([FromServices] IMessageScheduler _scheduler,
{ [FromServices] IRepository<TestLength> _testLengthRepository,
public Task Consume(ConsumeContext<MasstransitHangfireTest> context) [FromServices] IRequestClient<MasstransitHangfireTest> _requestClient)
{
Console.WriteLine("Now is " + DateTime.Now.ToString());
Console.WriteLine($"MassTransit.Consumer1 :{context.Message.value}");
return Task.CompletedTask;
}
}
public async Task<IResponseOutput> TestMasstransitHangfire([FromServices] IMessageScheduler _scheduler)
{ {
var isEn_US = CultureInfo.CurrentCulture.Name == StaticData.CultureInfo.en_US; var isEn_US = CultureInfo.CurrentCulture.Name == StaticData.CultureInfo.en_US;
@ -190,6 +183,9 @@ namespace IRaCIS.Core.Application.Service
//await _scheduler.SchedulePublish(DateTime.Now.AddSeconds(10), new MasstransitHangfireTest() { value = "message at " + DateTime.Now.ToString() }); //await _scheduler.SchedulePublish(DateTime.Now.AddSeconds(10), new MasstransitHangfireTest() { value = "message at " + DateTime.Now.ToString() });
Console.WriteLine(_testLengthRepository._dbContext.GetHashCode());
await _requestClient.GetResponse<IResponseOutput>(new MasstransitHangfireTest() { value = "message at " + DateTime.Now.ToString() });
return ResponseOutput.Ok(); return ResponseOutput.Ok();
} }

View File

@ -60,6 +60,7 @@ namespace IRaCIS.Core.Infra.EFCore.Interceptor
if (domainEvent.IsScheduleEvent) if (domainEvent.IsScheduleEvent)
{ {
//延迟调度的消息比如1h后再消费
await _scheduler.SchedulePublish(DateTime.Now.AddSeconds(domainEvent.DelaySeconds), (object)domainEvent); await _scheduler.SchedulePublish(DateTime.Now.AddSeconds(domainEvent.DelaySeconds), (object)domainEvent);
} }