修改masstransit 测试代码
continuous-integration/drone/push Build is passing Details

IRC_NewDev
hang 2024-10-13 21:40:05 +08:00
parent e65227c7bf
commit c1c03a3f05
4 changed files with 58 additions and 93 deletions

View File

@ -12808,7 +12808,7 @@
<param name="_trialRepository"></param>
<param name="_mapper"></param>
</member>
<member name="T:IRaCIS.Core.Application.MassTransit.Consumer.MasstransitHangfireTestConsumer">
<member name="T:IRaCIS.Core.Application.MassTransit.Consumer.MasstransitTestConsumer">
<summary>
meditor send 的时候,请求流会先到消费者,返回后才会执行后续代码
publish 请求流不会先到消费者,发布后,直接执行后续代码
@ -12816,7 +12816,7 @@
</summary>
<param name="_userRepository"></param>
</member>
<member name="M:IRaCIS.Core.Application.MassTransit.Consumer.MasstransitHangfireTestConsumer.#ctor(IRaCIS.Core.Infra.EFCore.IRepository{IRaCIS.Core.Domain.Models.User})">
<member name="M:IRaCIS.Core.Application.MassTransit.Consumer.MasstransitTestConsumer.#ctor(IRaCIS.Core.Infra.EFCore.IRepository{IRaCIS.Core.Domain.Models.User})">
<summary>
meditor send 的时候,请求流会先到消费者,返回后才会执行后续代码
publish 请求流不会先到消费者,发布后,直接执行后续代码

View File

@ -1,26 +0,0 @@

using AutoMapper;
using IP2Region.Net.Abstractions;
using IRaCIS.Core.Domain;
using IRaCIS.Core.Domain.Models;
using MassTransit;
namespace IRaCIS.Core.Application.MassTransit.Consumer;
//国家|区域|省份|城市|ISP 缺省的地域信息默认是0
//0|0|0|内网IP|内网IP
// 中国|0|湖北省|武汉市|电信
public class AddUserLogTriggerConsumer(ISearcher _searcher) : IConsumer<AddUserLogTriggerCommand>
{
public async Task Consume(ConsumeContext<AddUserLogTriggerCommand> context)
{
var userLog = context.Message.UserLog;
var ipinfo = _searcher.Search(userLog.IP);
userLog.IPRegion = string.Join('|', ipinfo.Split('|').TakeLast(3));
}
}

View File

@ -1,6 +1,12 @@
using MassTransit;
using IRaCIS.Core.Domain;
using MassTransit;
using MassTransit.Mediator;
using Medallion.Threading;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
@ -8,24 +14,69 @@ using static IRaCIS.Core.Application.Service.TestService;
namespace IRaCIS.Core.Application.MassTransit.Consumer;
public class MasstransiTestCommand
{
public string value { get; set; }
}
/// <summary>
/// meditor send 的时候,请求流会先到消费者,返回后才会执行后续代码
/// publish 请求流不会先到消费者,发布后,直接执行后续代码
///
/// </summary>
/// <param name="_userRepository"></param>
public class MasstransitHangfireTestConsumer(IRepository<User> _userRepository) : IConsumer<MasstransitHangfireTest>
public class MasstransitTestConsumer(IRepository<User> _userRepository) : IConsumer<MasstransiTestCommand>
{
public async Task Consume(ConsumeContext<MasstransitHangfireTest> context)
public async Task Consume(ConsumeContext<MasstransiTestCommand> context)
{
Console.WriteLine(_userRepository._dbContext.GetHashCode());
Console.WriteLine("Now is " + DateTime.Now.ToString());
Console.WriteLine($"MassTransit.Consumer :{context.Message.value}");
await context.RespondAsync<IResponseOutput>(ResponseOutput.Ok());
}
}
[ApiExplorerSettings(GroupName = "Institution")]
public class TestMasstransitService : BaseService
{
public async Task<IResponseOutput> TestMasstransitRequest([FromServices] IMessageScheduler _scheduler,
[FromServices] IRepository<TestLength> _testLengthRepository,
[FromServices] IRequestClient<MasstransiTestCommand> _requestClient,
[FromServices] IScopedClientFactory _clientFactory,
[FromServices] IScopedMediator _mediatorScoped,
[FromServices] IMediator _mediator)
{
var isEn_US = CultureInfo.CurrentCulture.Name == StaticData.CultureInfo.en_US;
Console.WriteLine(_testLengthRepository._dbContext.GetHashCode());
//IScopedMediator 上下文一致, IMediator上下文不一致
//通过命令不获取结果 进入消费者后再返回 数据库上下文 不同
await _mediator.Send(new MasstransiTestCommand { value = "message at " + DateTime.Now.ToString() });
//通过命令获取结果 进入消费者后再返回 数据库上下文 相同
var dd = await _mediatorScoped.CreateRequest(new MasstransiTestCommand() { value = "message at " + DateTime.Now.ToString() })
.GetResponse<IResponseOutput>();
//发布后,不会立即进入消费者,消费者是另外的线程执行
await _mediatorScoped.Publish(new MasstransiTestCommand { value = "message at " + DateTime.Now.ToString() });
//await _scheduler.SchedulePublish(DateTime.Now.AddSeconds(10), new MasstransitHangfireTest() { value = "message at " + DateTime.Now.ToString() });
return ResponseOutput.Ok();
}
}

View File

@ -126,67 +126,7 @@ namespace IRaCIS.Core.Application.Service
}
//My project is a monolithic project,And the efcore context repository is scoped registered.
public async Task<IResponseOutput> TestMasstransitMeditor(
[FromServices] IScopedMediator _mediatorScoped,
[FromServices] IMediator _mediator,
[FromServices] IRepository<TestLength> _testLengthRepository
)
{
var dbContext = _testLengthRepository._dbContext;
var dbContext2 = _trialRepository._dbContext;
if (ReferenceEquals(dbContext, dbContext2))
{
Console.WriteLine("两个 DbContext 是同一个实例");
Console.WriteLine(dbContext.GetHashCode());
Console.WriteLine(dbContext2.GetHashCode());
}
// add 1 recored
await _testLengthRepository.AddAsync(new TestLength() { Name = "xxxx" });
// The consumer method will inject the repository and add 3 pieces of data, but the savechanges method of the repository will not be called
await _mediatorScoped.Publish(new AddSubjectTriggerCommand { SubjectId = Guid.Empty });
await _mediator.Send(new AddSubjectTriggerCommand2 { SubjectId = Guid.Empty });
// this will save 1 record not 4 record ,Why is the dbcontext different? Can it be in the same transaction?
await _testLengthRepository.SaveChangesAsync();
return ResponseOutput.Ok();
}
public class MasstransitHangfireTest
{
public string value { get; set; }
}
public async Task<IResponseOutput> TestMasstransitHangfireOrRequest([FromServices] IMessageScheduler _scheduler,
[FromServices] IRepository<TestLength> _testLengthRepository,
[FromServices] IRequestClient<MasstransitHangfireTest> _requestClient)
{
var isEn_US = CultureInfo.CurrentCulture.Name == StaticData.CultureInfo.en_US;
var aa = CultureInfo.CurrentCulture.Name;
CultureInfo.CurrentCulture = new CultureInfo(StaticData.CultureInfo.en_US);
var bb = CultureInfo.CurrentCulture.Name;
var isEn_US2 = CultureInfo.CurrentCulture.Name == StaticData.CultureInfo.en_US;
//await _scheduler.SchedulePublish(DateTime.Now.AddSeconds(10), new MasstransitHangfireTest() { value = "message at " + DateTime.Now.ToString() });
Console.WriteLine(_testLengthRepository._dbContext.GetHashCode());
await _requestClient.GetResponse<IResponseOutput>(new MasstransitHangfireTest() { value = "message at " + DateTime.Now.ToString() });
return ResponseOutput.Ok();
}
public async Task<IResponseOutput> TestJson()
{