293 lines
8.9 KiB
C#
293 lines
8.9 KiB
C#
|
||
//--------------------------------------------------------------------
|
||
// 此代码由liquid模板自动生成 byzhouhang 20240909
|
||
// 生成时间 2026-03-10 06:15:17Z
|
||
// 对此文件的更改可能会导致不正确的行为,并且如果重新生成代码,这些更改将会丢失。
|
||
//--------------------------------------------------------------------
|
||
using IRaCIS.Core.Application.Helper;
|
||
using IRaCIS.Core.Application.Interfaces;
|
||
using IRaCIS.Core.Application.ViewModel;
|
||
using IRaCIS.Core.Domain.Models;
|
||
using IRaCIS.Core.Infra.EFCore;
|
||
using IRaCIS.Core.Infra.EFCore.Common;
|
||
using IRaCIS.Core.Infrastructure.Extention;
|
||
using Microsoft.AspNetCore.Mvc;
|
||
using Microsoft.Extensions.Options;
|
||
using Spire.Doc.Interface;
|
||
using System.Drawing;
|
||
using System.Threading.Channels;
|
||
using System.Threading.Tasks;
|
||
|
||
namespace IRaCIS.Core.Application.Service;
|
||
|
||
[ApiExplorerSettings(GroupName = "Common")]
|
||
public class FileUploadRecordService(IRepository<FileUploadRecord> _fileUploadRecordRepository,
|
||
IMapper _mapper, IUserInfo _userInfo, IStringLocalizer _localizer, IOptionsMonitor<ObjectStoreServiceOptions> options,
|
||
IFusionCache _fusionCache, IRepository<Trial> _trialRepository, FileSyncQueue _fileSyncQueue) : BaseService, IFileUploadRecordService
|
||
{
|
||
|
||
ObjectStoreServiceOptions ObjectStoreServiceConfig = options.CurrentValue;
|
||
|
||
[HttpPost]
|
||
public async Task<PageOutput<FileUploadRecordView>> GetFileUploadRecordList(FileUploadRecordQuery inQuery)
|
||
{
|
||
|
||
var fileUploadRecordQueryable = _fileUploadRecordRepository
|
||
.WhereIf(inQuery.BatchDataType != null, t => t.BatchDataType == inQuery.BatchDataType)
|
||
.WhereIf(!string.IsNullOrEmpty(inQuery.FileName), t => t.FileName.Contains(inQuery.FileName))
|
||
.WhereIf(!string.IsNullOrEmpty(inQuery.FileType), t => t.FileType.Contains(inQuery.FileType))
|
||
.WhereIf(inQuery.IsSync != null, t => t.IsSync == inQuery.IsSync)
|
||
.WhereIf(inQuery.Priority != null, t => t.Priority == inQuery.Priority)
|
||
.WhereIf(inQuery.BatchDataType != null, t => t.BatchDataType == inQuery.BatchDataType)
|
||
.WhereIf(!string.IsNullOrEmpty(inQuery.UploadRegion), t => t.UploadRegion.Contains(inQuery.UploadRegion))
|
||
.WhereIf(!string.IsNullOrEmpty(inQuery.TargetRegion), t => t.TargetRegion.Contains(inQuery.TargetRegion))
|
||
.WhereIf(!string.IsNullOrEmpty(inQuery.UploadBatchId), t => t.UploadBatchId.Contains(inQuery.UploadBatchId))
|
||
.WhereIf(!string.IsNullOrEmpty(inQuery.Path), t => t.Path.Contains(inQuery.Path))
|
||
.WhereIf(inQuery.UploadStartTime != null, t => t.CreateTime >= inQuery.UploadStartTime)
|
||
.WhereIf(inQuery.UploadEndTime != null, t => t.CreateTime <= inQuery.UploadEndTime)
|
||
.WhereIf(inQuery.SyncFinishedStartTime != null, t => t.SyncFinishedTime >= inQuery.SyncFinishedStartTime)
|
||
.WhereIf(inQuery.SyncFinishedEndTime != null, t => t.SyncFinishedTime <= inQuery.SyncFinishedEndTime)
|
||
|
||
.ProjectTo<FileUploadRecordView>(_mapper.ConfigurationProvider);
|
||
|
||
var pageList = await fileUploadRecordQueryable.ToPagedListAsync(inQuery);
|
||
|
||
return pageList;
|
||
}
|
||
|
||
|
||
|
||
|
||
public async Task<IResponseOutput> AddOrUpdateFileUploadRecord(FileUploadRecordAddOrEdit addOrEditFileUploadRecord)
|
||
{
|
||
|
||
addOrEditFileUploadRecord.IP = _userInfo.IP;
|
||
|
||
if (ObjectStoreServiceConfig.IsOpenStoreSync && _userInfo.Domain.IsNotNullOrEmpty())
|
||
{
|
||
var find = ObjectStoreServiceConfig.SyncConfigList.FirstOrDefault(t => t.Domain == _userInfo.Domain);
|
||
if (find != null)
|
||
{
|
||
addOrEditFileUploadRecord.UploadRegion = find.UploadRegion;
|
||
addOrEditFileUploadRecord.TargetRegion = find.TargetRegion;
|
||
|
||
}
|
||
}
|
||
|
||
if (addOrEditFileUploadRecord.TrialId != null)
|
||
{
|
||
|
||
var trialDataStore = await _fusionCache.GetOrSetAsync(CacheKeys.TrialDataStoreType(addOrEditFileUploadRecord.TrialId.Value), async _ =>
|
||
{
|
||
return await _trialRepository.Where(t => t.Id == addOrEditFileUploadRecord.TrialId).Select(t => t.TrialDataStoreType)
|
||
.FirstOrDefaultAsync();
|
||
},
|
||
TimeSpan.FromDays(7)
|
||
);
|
||
|
||
//项目配置了,那么就设置需要同步
|
||
if (trialDataStore == TrialDataStore.MUtiCenter)
|
||
{
|
||
addOrEditFileUploadRecord.IsNeedSync = true;
|
||
|
||
addOrEditFileUploadRecord.Priority = 0;
|
||
|
||
|
||
}
|
||
else
|
||
{
|
||
addOrEditFileUploadRecord.TargetRegion = "";
|
||
|
||
}
|
||
|
||
}
|
||
else
|
||
{
|
||
//系统文件,默认同步
|
||
addOrEditFileUploadRecord.IsNeedSync = true;
|
||
|
||
addOrEditFileUploadRecord.Priority = 0;
|
||
}
|
||
|
||
var entity = await _fileUploadRecordRepository.InsertOrUpdateAsync(addOrEditFileUploadRecord, true);
|
||
|
||
if (addOrEditFileUploadRecord.IsNeedSync == true)
|
||
{
|
||
_fileSyncQueue.Enqueue(entity.Id, addOrEditFileUploadRecord.Priority ?? 0);
|
||
}
|
||
|
||
return ResponseOutput.Ok(entity.Id.ToString());
|
||
|
||
}
|
||
|
||
|
||
[HttpDelete("{fileUploadRecordId:guid}")]
|
||
public async Task<IResponseOutput> DeleteFileUploadRecord(Guid fileUploadRecordId)
|
||
{
|
||
var success = await _fileUploadRecordRepository.BatchDeleteNoTrackingAsync(t => t.Id == fileUploadRecordId);
|
||
return ResponseOutput.Ok();
|
||
}
|
||
|
||
}
|
||
|
||
|
||
#region 同步队列
|
||
|
||
|
||
|
||
/// <summary>
|
||
/// 同步队列 信号量
|
||
/// </summary>
|
||
public class FileSyncQueue
|
||
{
|
||
private readonly PriorityQueue<Guid, int> _queue = new();
|
||
private readonly SemaphoreSlim _signal = new(0);
|
||
private readonly object _lock = new();
|
||
|
||
public void Enqueue(Guid id, int priority)
|
||
{
|
||
lock (_lock)
|
||
{
|
||
// priority 越大越优先
|
||
_queue.Enqueue(id, -priority);
|
||
}
|
||
|
||
//类似于计数器,不会产生通知风暴,可消费资源 +1
|
||
//if (有等待线程) 唤醒一个 else 仅增加计数
|
||
_signal.Release(); // 唤醒一个 worker
|
||
}
|
||
|
||
/// <summary>
|
||
/// 如果没有任务 → 挂起等待 有任务 → 被唤醒并返回
|
||
/// </summary>
|
||
/// <param name="ct"></param>
|
||
/// <returns></returns>
|
||
public async Task<Guid> DequeueAsync(CancellationToken ct)
|
||
{
|
||
await _signal.WaitAsync(ct);
|
||
|
||
lock (_lock)
|
||
{
|
||
return _queue.Dequeue();
|
||
}
|
||
}
|
||
}
|
||
|
||
#region 这里不用 SyncQueueUseChannel 和调度器 SyncScheduler
|
||
public class SyncQueueUseChannel
|
||
{
|
||
// 优先级队列(priority 越大越先执行)
|
||
private readonly PriorityQueue<Guid, int> _queue = new();
|
||
|
||
// Worker 唤醒信号
|
||
private readonly Channel<bool> _signal =
|
||
Channel.CreateUnbounded<bool>(new UnboundedChannelOptions
|
||
{
|
||
SingleReader = false,
|
||
SingleWriter = false
|
||
});
|
||
|
||
// 队列任务数量(不是CPU数量!)
|
||
private int _count = 0;
|
||
|
||
|
||
private readonly object _lock = new();
|
||
|
||
/// <summary>
|
||
/// 入队任务
|
||
/// </summary>
|
||
public void Enqueue(Guid id, int priority)
|
||
{
|
||
bool needSignal = false;
|
||
|
||
lock (_lock)
|
||
{
|
||
// priority 越大越优先 → 转负数
|
||
_queue.Enqueue(id, -priority);
|
||
|
||
// 只有从 0 → 1 才需要唤醒 worker
|
||
if (_count == 0)
|
||
needSignal = true;
|
||
|
||
_count++;
|
||
}
|
||
|
||
// 避免 signal 风暴
|
||
if (needSignal)
|
||
_signal.Writer.TryWrite(true);
|
||
}
|
||
|
||
/// <summary>
|
||
/// Worker 等待并获取任务
|
||
/// </summary>
|
||
public async Task<Guid> DequeueAsync(CancellationToken ct)
|
||
{
|
||
|
||
// 没任务时挂起(不会占CPU)
|
||
await _signal.Reader.ReadAsync(ct);
|
||
|
||
lock (_lock)
|
||
{
|
||
var id = _queue.Dequeue();
|
||
|
||
_count--;
|
||
|
||
// 如果还有任务,继续唤醒下一个 worker
|
||
if (_count > 0)
|
||
_signal.Writer.TryWrite(true);
|
||
|
||
return id;
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 当前排队数量(调试用)
|
||
/// </summary>
|
||
public int Count
|
||
{
|
||
get
|
||
{
|
||
lock (_lock)
|
||
return _count;
|
||
}
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 同步调度器
|
||
/// </summary>
|
||
public class FileSyncScheduler
|
||
{
|
||
private readonly FileSyncQueue _queue;
|
||
|
||
public FileSyncScheduler(FileSyncQueue queue)
|
||
{
|
||
_queue = queue;
|
||
}
|
||
|
||
public void Enqueue(FileUploadRecord file)
|
||
{
|
||
if (file.IsNeedSync != true)
|
||
return;
|
||
|
||
_queue.Enqueue(file.Id, file.Priority ?? 0);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 如果没有任务 → 挂起等待 有任务 → 被唤醒并返回
|
||
/// </summary>
|
||
/// <param name="ct"></param>
|
||
/// <returns></returns>
|
||
public Task<Guid> WaitAsync(CancellationToken ct)
|
||
=> _queue.DequeueAsync(ct);
|
||
}
|
||
#endregion
|
||
|
||
|
||
|
||
|
||
|
||
|
||
#endregion
|
||
|