//-------------------------------------------------------------------- // 此代码由liquid模板自动生成 byzhouhang 20240909 // 生成时间 2026-03-10 06:15:17Z // 对此文件的更改可能会导致不正确的行为,并且如果重新生成代码,这些更改将会丢失。 //-------------------------------------------------------------------- using DocumentFormat.OpenXml.Office2010.ExcelAc; using IRaCIS.Core.Application.Helper; using IRaCIS.Core.Application.Interfaces; using IRaCIS.Core.Application.ViewModel; using IRaCIS.Core.Domain.Models; using IRaCIS.Core.Infra.EFCore; using IRaCIS.Core.Infra.EFCore.Common; using IRaCIS.Core.Infrastructure.Extention; using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Options; using Spire.Doc.Interface; using System.Drawing; using System.Threading.Channels; using System.Threading.Tasks; namespace IRaCIS.Core.Application.Service; [ApiExplorerSettings(GroupName = "Common")] public class FileUploadRecordService(IRepository _fileUploadRecordRepository, IRepository _uploadFileSyncRecordRepository, IMapper _mapper, IUserInfo _userInfo, IStringLocalizer _localizer, IOptionsMonitor options, IFusionCache _fusionCache, IRepository _trialRepository, FileSyncQueue _fileSyncQueue) : BaseService, IFileUploadRecordService { ObjectStoreServiceOptions ObjectStoreServiceConfig = options.CurrentValue; /// /// 按照 subject visit studyCode 三个维度进行分组的查询列表 (subject相关) /// /// /// [HttpPost] public async Task> GetSubjectUploadRecordList(SubjectFileUploadRecordQuery inQuery) { var query = _fileUploadRecordRepository.Where(t => t.TrialId == inQuery.TrialId && t.SubjectId != null) .WhereIf(!string.IsNullOrEmpty(inQuery.VisitName), t => t.SubjectVisit.VisitName.Contains(inQuery.VisitName)) .WhereIf(!string.IsNullOrEmpty(inQuery.SubjectCode), t => t.Subject.Code.Contains(inQuery.SubjectCode)) .WhereIf(!string.IsNullOrEmpty(inQuery.StudyCode), t => t.StudyCode.Contains(inQuery.StudyCode)) .WhereIf(!string.IsNullOrEmpty(inQuery.UploadRegion), t => t.UploadRegion == inQuery.UploadRegion) .WhereIf(!string.IsNullOrEmpty(inQuery.TargetRegion), t => t.TargetRegion == inQuery.TargetRegion) .WhereIf(inQuery.IsSync != null, t => t.IsSync == inQuery.IsSync) .GroupBy(t => new { t.StudyCode, SubjectCode = t.Subject.Code, t.SubjectVisit.VisitName, t.SubjectId, t.SubjectVisitId }) .Select(g => new SubjectFileUploadRecordView() { SubjectCode = g.Key.SubjectCode, VisitName = g.Key.VisitName, StudyCode = g.Key.StudyCode, SubjectId = g.Key.SubjectId, SubjectVisitId = g.Key.SubjectVisitId, FileCount = g.Count(), CreateTime = g.Max(t => t.CreateTime), SyncFinishedTime = g.Max(t => t.SyncFinishedTime), UploadRegion = g.First().UploadRegion, TargetRegion = g.First().TargetRegion, IsSync = !g.Any(t => t.IsSync == false || t.IsSync == null) }); var pageList = await query.ToPagedListAsync(inQuery); return pageList; } /// /// 上传记录表--里面包含待同步任务 DataFileType= 0 :代表系统文件 1:Subject相关 2:项目相关,但是和subject 没关系 /// /// /// [HttpPost] public async Task> GetFileUploadRecordList(FileUploadRecordQuery inQuery) { var fileUploadRecordQueryable = _fileUploadRecordRepository .WhereIf(!string.IsNullOrEmpty(inQuery.FileName), t => t.FileName.Contains(inQuery.FileName)) .WhereIf(!string.IsNullOrEmpty(inQuery.FileType), t => t.FileType.Contains(inQuery.FileType)) .WhereIf(inQuery.TrialId != null, t => t.TrialId == inQuery.TrialId) .WhereIf(inQuery.SubjectId != null, t => t.SubjectId == inQuery.SubjectId) .WhereIf(inQuery.SubjectVisitId != null, t => t.SubjectVisitId == inQuery.SubjectVisitId) .WhereIf(!string.IsNullOrEmpty(inQuery.StudyCode), t => t.StudyCode.Contains(inQuery.StudyCode)) .WhereIf(!string.IsNullOrEmpty(inQuery.VisitName), t => t.SubjectVisit.VisitName.Contains(inQuery.VisitName)) .WhereIf(!string.IsNullOrEmpty(inQuery.SubjectCode), t => t.Subject.Code.Contains(inQuery.SubjectCode)) .WhereIf(inQuery.DataFileType == 1 && inQuery.SubjectId != null && inQuery.SubjectVisitId == null, t => t.SubjectVisitId == null) .WhereIf(inQuery.DataFileType == 1 && inQuery.SubjectVisitId != null && inQuery.StudyCode == "", t => t.StudyCode == "") .WhereIf(inQuery.DataFileType == 0, t => t.TrialId == null) .WhereIf(inQuery.DataFileType == 1, t => t.SubjectId != null) .WhereIf(inQuery.DataFileType == 2, t => t.SubjectId == null) .WhereIf(inQuery.IsNeedSync != null, t => t.IsNeedSync == inQuery.IsNeedSync) .WhereIf(inQuery.IsSync != null, t => t.IsSync == inQuery.IsSync) .WhereIf(inQuery.Priority != null, t => t.Priority == inQuery.Priority) .WhereIf(inQuery.BatchDataType != null, t => t.BatchDataType == inQuery.BatchDataType) .WhereIf(!string.IsNullOrEmpty(inQuery.UploadRegion), t => t.UploadRegion == inQuery.UploadRegion) .WhereIf(!string.IsNullOrEmpty(inQuery.TargetRegion), t => t.TargetRegion == inQuery.TargetRegion) .WhereIf(!string.IsNullOrEmpty(inQuery.UploadBatchId), t => t.UploadBatchId.Contains(inQuery.UploadBatchId)) .WhereIf(!string.IsNullOrEmpty(inQuery.Path), t => t.Path.Contains(inQuery.Path)) .WhereIf(inQuery.UploadStartTime != null, t => t.CreateTime >= inQuery.UploadStartTime) .WhereIf(inQuery.UploadEndTime != null, t => t.CreateTime <= inQuery.UploadEndTime) .WhereIf(inQuery.SyncFinishedStartTime != null, t => t.SyncFinishedTime >= inQuery.SyncFinishedStartTime) .WhereIf(inQuery.SyncFinishedEndTime != null, t => t.SyncFinishedTime <= inQuery.SyncFinishedEndTime) .ProjectTo(_mapper.ConfigurationProvider); var pageList = await fileUploadRecordQueryable.ToPagedListAsync(inQuery); return pageList; } /// /// 任务具体执行记录表 /// /// /// [HttpPost] public async Task> GetUploadFileSyncRecordList(UploadFileSyncRecordQuery inQuery) { var fileUploadRecordQueryable = _uploadFileSyncRecordRepository .WhereIf(inQuery.JobState != null, t => t.JobState == inQuery.JobState) .WhereIf(inQuery.FileUploadRecordId != null, t => t.FileUploadRecordId == inQuery.FileUploadRecordId) .WhereIf(!string.IsNullOrEmpty(inQuery.StudyCode), t => t.FileUploadRecord.StudyCode.Contains(inQuery.StudyCode)) .WhereIf(!string.IsNullOrEmpty(inQuery.VisitName), t => t.FileUploadRecord.SubjectVisit.VisitName.Contains(inQuery.VisitName)) .WhereIf(!string.IsNullOrEmpty(inQuery.SubjectCode), t => t.FileUploadRecord.Subject.Code.Contains(inQuery.SubjectCode)) .ProjectTo(_mapper.ConfigurationProvider); var pageList = await fileUploadRecordQueryable.ToPagedListAsync(inQuery); return pageList; } /// /// 批量设置为需要同步,并且设置优先级 /// /// /// public async Task BatchAddSyncFileTask(BatchAddSyncFileCommand inComand) { await _fileUploadRecordRepository.BatchUpdateNoTrackingAsync(t => inComand.FileUploadRecordIdList.Contains(t.Id), u => new FileUploadRecord() { IsNeedSync = true, Priority = inComand.Priority }); foreach (var item in inComand.FileUploadRecordIdList) { _fileSyncQueue.Enqueue(item, inComand.Priority ?? 0); } await _fileUploadRecordRepository.SaveChangesAsync(); return ResponseOutput.Ok(); } public async Task AddOrUpdateFileUploadRecord(FileUploadRecordAddOrEdit addOrEditFileUploadRecord) { addOrEditFileUploadRecord.IP = _userInfo.IP; if (ObjectStoreServiceConfig.IsOpenStoreSync && _userInfo.Domain.IsNotNullOrEmpty()) { var find = ObjectStoreServiceConfig.SyncConfigList.FirstOrDefault(t => t.Domain == _userInfo.Domain); if (find != null) { addOrEditFileUploadRecord.UploadRegion = find.UploadRegion; addOrEditFileUploadRecord.TargetRegion = find.TargetRegion; } else { //前后端调试的时候,上传的时候域名不对应,自动按照后端配置设置上传区域和同步区域 var apiDefalut = ObjectStoreServiceConfig.SyncConfigList.FirstOrDefault(t => t.UploadRegion == ObjectStoreServiceConfig.ApiDeployRegion); if (apiDefalut != null) { addOrEditFileUploadRecord.UploadRegion = apiDefalut.UploadRegion; addOrEditFileUploadRecord.TargetRegion = apiDefalut.TargetRegion; } } } if (addOrEditFileUploadRecord.TrialId != null) { var trialDataStore = await _fusionCache.GetOrSetAsync(CacheKeys.TrialDataStoreType(addOrEditFileUploadRecord.TrialId.Value), async _ => { return await _trialRepository.Where(t => t.Id == addOrEditFileUploadRecord.TrialId).Select(t => t.TrialDataStoreType) .FirstOrDefaultAsync(); }, TimeSpan.FromDays(7) ); //项目配置了,那么就设置需要同步 if (trialDataStore == TrialDataStore.MUtiCenter) { addOrEditFileUploadRecord.IsNeedSync = true; addOrEditFileUploadRecord.Priority = 0; addOrEditFileUploadRecord.IsSync = false; } else { addOrEditFileUploadRecord.IsNeedSync = false; //addOrEditFileUploadRecord.TargetRegion = ""; } } else { //系统文件,默认同步 addOrEditFileUploadRecord.IsNeedSync = true; addOrEditFileUploadRecord.IsSync = false; addOrEditFileUploadRecord.Priority = 0; } var entity = await _fileUploadRecordRepository.InsertOrUpdateAsync(addOrEditFileUploadRecord, true); if (addOrEditFileUploadRecord.IsNeedSync == true) { _fileSyncQueue.Enqueue(entity.Id, addOrEditFileUploadRecord.Priority ?? 0); } return ResponseOutput.Ok(entity.Id.ToString()); } [HttpDelete("{fileUploadRecordId:guid}")] public async Task DeleteFileUploadRecord(Guid fileUploadRecordId) { var success = await _fileUploadRecordRepository.BatchDeleteNoTrackingAsync(t => t.Id == fileUploadRecordId); return ResponseOutput.Ok(); } } #region 同步队列 public sealed class FileSyncQueue { /// /// 优先级队列(仅负责排序) /// private readonly PriorityQueue _queue = new(); /// /// 当前等待中的任务(唯一真实数据) /// key = Guid /// value = 最新 priority /// private readonly Dictionary _waiting = new(); /// /// 正在执行的任务(防止重复执行) /// private readonly HashSet _running = new(); /// /// worker 等待信号 /// private readonly SemaphoreSlim _signal = new(0); private readonly object _lock = new(); // ============================================================ // Enqueue // ============================================================ /// /// 入队(同 Guid 会覆盖优先级) /// public void Enqueue(Guid id, int priority) { bool needSignal = false; lock (_lock) { // 如果正在执行,忽略(防止重复) if (_running.Contains(id)) return; // 是否新任务(用于减少 signal 风暴) if (!_waiting.ContainsKey(id)) needSignal = true; // 更新为最新优先级(最后一次为准) _waiting[id] = priority; //等价于添加或者更新 // PriorityQueue 无法更新节点 // 允许旧节点存在,Dequeue 时过滤 _queue.Enqueue(id, -priority); } // 只有新增任务才唤醒 worker if (needSignal) _signal.Release(); } // ============================================================ // Dequeue // ============================================================ /// /// 获取一个待执行任务(无任务时自动等待) /// public async Task DequeueAsync(CancellationToken ct) { while (true) { await _signal.WaitAsync(ct); lock (_lock) { while (_queue.Count > 0) { var id = _queue.Dequeue(); // 已被覆盖或取消 如果这个任务已经不是“当前最新版任务”,那它只是 PriorityQueue 里的垃圾数据,直接跳过。 if (!_waiting.TryGetValue(id, out _)) //能从等待任务中取到,那么就是有效的,不能取到那么就是覆盖的 continue; // 标记为运行中 _waiting.Remove(id); _running.Add(id); return id; } } } } // ============================================================ // Complete // ============================================================ /// /// 任务执行完成(必须调用) /// public void Complete(Guid id) { lock (_lock) { _running.Remove(id); } } // ============================================================ // Snapshot // ============================================================ /// /// 当前等待中的任务快照 /// public Guid[] Snapshot() { lock (_lock) { return _waiting.Keys.ToArray(); } } // ============================================================ // 状态信息(调试用) // ============================================================ public int WaitingCount { get { lock (_lock) return _waiting.Count; } } public int RunningCount { get { lock (_lock) return _running.Count; } } } ///// ///// 同步队列 信号量 ///// //public class FileSyncQueue //{ // private readonly PriorityQueue _queue = new(); // private readonly SemaphoreSlim _signal = new(0); // private readonly object _lock = new(); // public void Enqueue(Guid id, int priority) // { // lock (_lock) // { // // priority 越大越优先 // _queue.Enqueue(id, -priority); // } // //类似于计数器,不会产生通知风暴,可消费资源 +1 // //if (有等待线程) 唤醒一个 else 仅增加计数 // _signal.Release(); // 唤醒一个 worker // } // /// // /// 如果没有任务 → 挂起等待 有任务 → 被唤醒并返回 // /// // /// // /// // public async Task DequeueAsync(CancellationToken ct) // { // await _signal.WaitAsync(ct); // lock (_lock) // { // return _queue.Dequeue(); // } // } // /// // /// 获取队列任务Id // /// // /// // public Guid[] Snapshot() // { // lock (_lock) // { // return _queue.UnorderedItems // .Select(x => x.Element) // .ToArray(); // } // } //} #region 这里不用 SyncQueueUseChannel 和调度器 SyncScheduler public class SyncQueueUseChannel { // 优先级队列(priority 越大越先执行) private readonly PriorityQueue _queue = new(); // Worker 唤醒信号 private readonly Channel _signal = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = false, SingleWriter = false }); // 队列任务数量(不是CPU数量!) private int _count = 0; private readonly object _lock = new(); /// /// 入队任务 /// public void Enqueue(Guid id, int priority) { bool needSignal = false; lock (_lock) { // priority 越大越优先 → 转负数 _queue.Enqueue(id, -priority); // 只有从 0 → 1 才需要唤醒 worker if (_count == 0) needSignal = true; _count++; } // 避免 signal 风暴 if (needSignal) _signal.Writer.TryWrite(true); } /// /// Worker 等待并获取任务 /// public async Task DequeueAsync(CancellationToken ct) { // 没任务时挂起(不会占CPU) await _signal.Reader.ReadAsync(ct); lock (_lock) { var id = _queue.Dequeue(); _count--; // 如果还有任务,继续唤醒下一个 worker if (_count > 0) _signal.Writer.TryWrite(true); return id; } } /// /// 当前排队数量(调试用) /// public int Count { get { lock (_lock) return _count; } } } /// /// 同步调度器 /// public class FileSyncScheduler { private readonly FileSyncQueue _queue; public FileSyncScheduler(FileSyncQueue queue) { _queue = queue; } public void Enqueue(FileUploadRecord file) { if (file.IsNeedSync != true) return; _queue.Enqueue(file.Id, file.Priority ?? 0); } /// /// 如果没有任务 → 挂起等待 有任务 → 被唤醒并返回 /// /// /// public Task WaitAsync(CancellationToken ct) => _queue.DequeueAsync(ct); } #endregion #endregion