From 5a7796939f6c5728cfd8b4c925d421b3f161dca0 Mon Sep 17 00:00:00 2001 From: hang <872297557@qq.com> Date: Fri, 3 Apr 2026 16:00:24 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=85=88=E7=BA=A7=E9=98=9F=E5=88=97?= =?UTF-8?q?=EF=BC=8C=E5=A2=9E=E5=8A=A0hash=E5=8E=BB=E9=87=8D=EF=BC=8C?= =?UTF-8?q?=E8=A6=86=E7=9B=96=E4=BC=98=E5=85=88=E7=BA=A7=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../HostConfig/SyncFileRecoveryService.cs | 85 +++++--- .../Service/FileUploadRecordService.cs | 154 +++++++++++-- .../HostService/SyncFileRecoveryService.cs | 25 ++- .../IRaCIS.Core.Application.xml | 45 +++- .../Common/DTO/FileUploadRecordViewModel.cs | 2 + .../Service/Common/FileUploadRecordService.cs | 202 ++++++++++++++++-- 6 files changed, 440 insertions(+), 73 deletions(-) diff --git a/IRC.Core.SCP/HostConfig/SyncFileRecoveryService.cs b/IRC.Core.SCP/HostConfig/SyncFileRecoveryService.cs index 5114d819d..089465d10 100644 --- a/IRC.Core.SCP/HostConfig/SyncFileRecoveryService.cs +++ b/IRC.Core.SCP/HostConfig/SyncFileRecoveryService.cs @@ -7,6 +7,7 @@ using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using System; using System.Linq; using System.Threading; @@ -20,40 +21,50 @@ public class SyncFileRecoveryService(IServiceScopeFactory _scopeFactory, FileSyn private readonly int _pageSize = 500; + /// + /// 多个程序,如果恢复同一份数据,造成重复同步,SCP服务不恢复任务 + /// + /// + /// protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - using var scope = _scopeFactory.CreateScope(); - var fileUploadRecordRepository = scope.ServiceProvider.GetRequiredService>(); - // 延迟启动,保证主机快速启动 - await Task.Delay(5000, stoppingToken); + await Task.CompletedTask; - int page = 0; + //using var scope = _scopeFactory.CreateScope(); + //var fileUploadRecordRepository = scope.ServiceProvider.GetRequiredService>(); + + //// 延迟启动,保证主机快速启动 + //await Task.Delay(5000, stoppingToken); + + //int page = 0; - while (!stoppingToken.IsCancellationRequested) - { - // 分页获取未入队任务 - var pending = await fileUploadRecordRepository - .Where(x => x.IsNeedSync == true && (x.IsSync == false || x.IsSync == null)) - .OrderByDescending(x => x.Priority) - .Select(t => new { t.Id, t.Priority }) - .Skip(page * _pageSize) - .Take(_pageSize) - .ToListAsync(stoppingToken); + //while (!stoppingToken.IsCancellationRequested) + //{ + // // 分页获取未入队任务 + // var pending = await fileUploadRecordRepository + // .Where(x => x.IsNeedSync == true && (x.IsSync == false || x.IsSync == null)) + // .OrderByDescending(x => x.Priority) + // .Select(t => new { t.Id, t.Priority }) + // .Skip(page * _pageSize) + // .Take(_pageSize) + // .ToListAsync(stoppingToken); - if (!pending.Any()) - break; // 扫描完毕,退出循环 + // if (!pending.Any()) + // break; // 扫描完毕,退出循环 + + // foreach (var file in pending) + // { + // //file.IsQueued = true; // 避免重复入队 + // _fileSyncQueue.Enqueue(file.Id, file.Priority ?? 0); // 放入队列 + // } + + // page++; // 下一页 + // await Task.Delay(200, stoppingToken); // 缓解数据库压力 + //} - foreach (var file in pending) - { - //file.IsQueued = true; // 避免重复入队 - _fileSyncQueue.Enqueue(file.Id, file.Priority ?? 0); // 放入队列 - } - page++; // 下一页 - await Task.Delay(200, stoppingToken); // 缓解数据库压力 - } } } @@ -85,12 +96,29 @@ public class FileSyncWorker(IServiceScopeFactory _scopeFactory, ILogger>(); var _uploadFileSyncRecordRepository = scope.ServiceProvider.GetRequiredService>(); + + var syncConfig = (scope.ServiceProvider.GetRequiredService>()).CurrentValue; + var oss = scope.ServiceProvider.GetRequiredService(); var file = await _fileUploadRecordRepository.FirstOrDefaultAsync(t => t.Id == id); - if (file == null || file.IsNeedSync != true) + // ✅ 不要 return + if (file == null || file.IsNeedSync != true || syncConfig.IsOpenStoreSync == false) + { + continue; + } + + if (syncConfig.SyncConfigList.Any(t => t.UploadRegion == file.UploadRegion && t.IsOpenSync == false)) + { + continue; + } + + //如果发现系统配置某一边同步进行了关闭,那么就直接返回,不执行任务 + if (syncConfig.SyncConfigList.Any(t => t.UploadRegion == file.UploadRegion && t.IsOpenSync == false)) + { return; + } var log = new UploadFileSyncRecord { @@ -126,6 +154,11 @@ public class FileSyncWorker(IServiceScopeFactory _scopeFactory, ILogger _fileUploadRe - + public async Task AddOrUpdateFileUploadRecord(FileUploadRecordAddOrEdit addOrEditFileUploadRecord) { @@ -98,6 +99,18 @@ public class FileUploadRecordService(IRepository _fileUploadRe addOrEditFileUploadRecord.TargetRegion = find.TargetRegion; } + else + { + //前后端调试的时候,上传的时候域名不对应,自动按照后端配置设置上传区域和同步区域 + var apiDefalut = ObjectStoreServiceConfig.SyncConfigList.FirstOrDefault(t => t.UploadRegion == ObjectStoreServiceConfig.ApiDeployRegion); + + if (apiDefalut != null) + { + addOrEditFileUploadRecord.UploadRegion = apiDefalut.UploadRegion; + addOrEditFileUploadRecord.TargetRegion = apiDefalut.TargetRegion; + + } + } } if (addOrEditFileUploadRecord.TrialId != null) @@ -122,7 +135,9 @@ public class FileUploadRecordService(IRepository _fileUploadRe } else { - addOrEditFileUploadRecord.TargetRegion = ""; + addOrEditFileUploadRecord.IsNeedSync = false; + + //addOrEditFileUploadRecord.TargetRegion = ""; } @@ -154,40 +169,149 @@ public class FileUploadRecordService(IRepository _fileUploadRe -/// -/// 同步队列 信号量 -/// -public class FileSyncQueue +public sealed class FileSyncQueue { + /// + /// 优先级队列(仅负责排序) + /// private readonly PriorityQueue _queue = new(); + + /// + /// 当前等待中的任务(唯一真实数据) + /// key = Guid + /// value = 最新 priority + /// + private readonly Dictionary _waiting = new(); + + /// + /// 正在执行的任务(防止重复执行) + /// + private readonly HashSet _running = new(); + + /// + /// worker 等待信号 + /// private readonly SemaphoreSlim _signal = new(0); + + private readonly object _lock = new(); + // ============================================================ + // Enqueue + // ============================================================ + + /// + /// 入队(同 Guid 会覆盖优先级) + /// public void Enqueue(Guid id, int priority) { + bool needSignal = false; + lock (_lock) { - // priority 越大越优先 + // 如果正在执行,忽略(防止重复) + if (_running.Contains(id)) + return; + + // 是否新任务(用于减少 signal 风暴) + if (!_waiting.ContainsKey(id)) + needSignal = true; + + // 更新为最新优先级(最后一次为准) + _waiting[id] = priority; //等价于添加或者更新 + + // PriorityQueue 无法更新节点 + // 允许旧节点存在,Dequeue 时过滤 _queue.Enqueue(id, -priority); } - //类似于计数器,不会产生通知风暴,可消费资源 +1 - //if (有等待线程) 唤醒一个 else 仅增加计数 - _signal.Release(); // 唤醒一个 worker + // 只有新增任务才唤醒 worker + if (needSignal) + _signal.Release(); } + // ============================================================ + // Dequeue + // ============================================================ + /// - /// 如果没有任务 → 挂起等待 有任务 → 被唤醒并返回 + /// 获取一个待执行任务(无任务时自动等待) /// - /// - /// public async Task DequeueAsync(CancellationToken ct) { - await _signal.WaitAsync(ct); + while (true) + { + await _signal.WaitAsync(ct); + lock (_lock) + { + while (_queue.Count > 0) + { + var id = _queue.Dequeue(); + + // 已被覆盖或取消 + if (!_waiting.TryGetValue(id, out _)) + continue; + + // 标记为运行中 + _waiting.Remove(id); + _running.Add(id); + + return id; + } + } + } + } + + // ============================================================ + // Complete + // ============================================================ + + /// + /// 任务执行完成(必须调用) + /// + public void Complete(Guid id) + { lock (_lock) { - return _queue.Dequeue(); + _running.Remove(id); + } + } + + // ============================================================ + // Snapshot + // ============================================================ + + /// + /// 当前等待中的任务快照 + /// + public Guid[] Snapshot() + { + lock (_lock) + { + return _waiting.Keys.ToArray(); + } + } + + // ============================================================ + // 状态信息(调试用) + // ============================================================ + + public int WaitingCount + { + get + { + lock (_lock) + return _waiting.Count; + } + } + + public int RunningCount + { + get + { + lock (_lock) + return _running.Count; } } } \ No newline at end of file diff --git a/IRaCIS.Core.API/HostService/SyncFileRecoveryService.cs b/IRaCIS.Core.API/HostService/SyncFileRecoveryService.cs index da3a7be84..ad82da60d 100644 --- a/IRaCIS.Core.API/HostService/SyncFileRecoveryService.cs +++ b/IRaCIS.Core.API/HostService/SyncFileRecoveryService.cs @@ -20,6 +20,11 @@ public class SyncFileRecoveryService(IServiceScopeFactory _scopeFactory, FileSyn private readonly int _pageSize = 500; + /// + /// 多个程序,如果恢复同一份数据,造成重复同步,SCP服务不恢复任务 + /// + /// + /// protected override async Task ExecuteAsync(CancellationToken stoppingToken) { using var scope = _scopeFactory.CreateScope(); @@ -73,7 +78,6 @@ public class FileSyncWorker(IServiceScopeFactory _scopeFactory, ILogger t.Id == id); + // ✅ 不要 return if (file == null || file.IsNeedSync != true || syncConfig.IsOpenStoreSync == false) - return; + { + continue; + } + + if (syncConfig.SyncConfigList.Any(t => t.UploadRegion == file.UploadRegion && t.IsOpenSync == false)) + { + continue; + } //如果发现系统配置某一边同步进行了关闭,那么就直接返回,不执行任务 if (syncConfig.SyncConfigList.Any(t => t.UploadRegion == file.UploadRegion && t.IsOpenSync == false)) @@ -116,7 +128,7 @@ public class FileSyncWorker(IServiceScopeFactory _scopeFactory, ILogger - + - 同步队列 信号量 + 批量设置为需要同步,并且设置优先级 + + + + + + + 优先级队列(仅负责排序) + + + + + 当前等待中的任务(唯一真实数据) + key = Guid + value = 最新 priority + + + + + 正在执行的任务(防止重复执行) + + + + + worker 等待信号 + + + + + 入队(同 Guid 会覆盖优先级) - 如果没有任务 → 挂起等待 有任务 → 被唤醒并返回 + 获取一个待执行任务(无任务时自动等待) + + + + + 任务执行完成(必须调用) - - - 获取队列任务Id + 当前等待中的任务快照 - diff --git a/IRaCIS.Core.Application/Service/Common/DTO/FileUploadRecordViewModel.cs b/IRaCIS.Core.Application/Service/Common/DTO/FileUploadRecordViewModel.cs index 0bb0a6af7..e983872d1 100644 --- a/IRaCIS.Core.Application/Service/Common/DTO/FileUploadRecordViewModel.cs +++ b/IRaCIS.Core.Application/Service/Common/DTO/FileUploadRecordViewModel.cs @@ -228,4 +228,6 @@ public class UploadFileSyncRecordQuery : PageInput public class BatchAddSyncFileCommand { public List FileUploadRecordIdList { get; set; } + + public int? Priority { get; set; } } diff --git a/IRaCIS.Core.Application/Service/Common/FileUploadRecordService.cs b/IRaCIS.Core.Application/Service/Common/FileUploadRecordService.cs index 3ed43a408..22c99add8 100644 --- a/IRaCIS.Core.Application/Service/Common/FileUploadRecordService.cs +++ b/IRaCIS.Core.Application/Service/Common/FileUploadRecordService.cs @@ -150,12 +150,25 @@ public class FileUploadRecordService(IRepository _fileUploadRe } + /// + /// 批量设置为需要同步,并且设置优先级 + /// + /// + /// public async Task BatchAddSyncFileTask(BatchAddSyncFileCommand inComand) { + await _fileUploadRecordRepository.BatchUpdateNoTrackingAsync(t => inComand.FileUploadRecordIdList.Contains(t.Id), u => new FileUploadRecord() { IsNeedSync = true, Priority = inComand.Priority }); + + foreach (var item in inComand.FileUploadRecordIdList) + { + _fileSyncQueue.Enqueue(item, inComand.Priority ?? 0); + + } + await _fileUploadRecordRepository.SaveChangesAsync(); return ResponseOutput.Ok(); } @@ -253,59 +266,206 @@ public class FileUploadRecordService(IRepository _fileUploadRe #region 同步队列 - -/// -/// 同步队列 信号量 -/// -public class FileSyncQueue +public sealed class FileSyncQueue { + /// + /// 优先级队列(仅负责排序) + /// private readonly PriorityQueue _queue = new(); + + /// + /// 当前等待中的任务(唯一真实数据) + /// key = Guid + /// value = 最新 priority + /// + private readonly Dictionary _waiting = new(); + + /// + /// 正在执行的任务(防止重复执行) + /// + private readonly HashSet _running = new(); + + /// + /// worker 等待信号 + /// private readonly SemaphoreSlim _signal = new(0); + + private readonly object _lock = new(); + // ============================================================ + // Enqueue + // ============================================================ + + /// + /// 入队(同 Guid 会覆盖优先级) + /// public void Enqueue(Guid id, int priority) { + bool needSignal = false; + lock (_lock) { - // priority 越大越优先 + // 如果正在执行,忽略(防止重复) + if (_running.Contains(id)) + return; + + // 是否新任务(用于减少 signal 风暴) + if (!_waiting.ContainsKey(id)) + needSignal = true; + + // 更新为最新优先级(最后一次为准) + _waiting[id] = priority; //等价于添加或者更新 + + // PriorityQueue 无法更新节点 + // 允许旧节点存在,Dequeue 时过滤 _queue.Enqueue(id, -priority); } - //类似于计数器,不会产生通知风暴,可消费资源 +1 - //if (有等待线程) 唤醒一个 else 仅增加计数 - _signal.Release(); // 唤醒一个 worker + // 只有新增任务才唤醒 worker + if (needSignal) + _signal.Release(); } + // ============================================================ + // Dequeue + // ============================================================ + /// - /// 如果没有任务 → 挂起等待 有任务 → 被唤醒并返回 + /// 获取一个待执行任务(无任务时自动等待) /// - /// - /// public async Task DequeueAsync(CancellationToken ct) { - await _signal.WaitAsync(ct); - - lock (_lock) + while (true) { - return _queue.Dequeue(); + await _signal.WaitAsync(ct); + + lock (_lock) + { + while (_queue.Count > 0) + { + var id = _queue.Dequeue(); + + // 已被覆盖或取消 如果这个任务已经不是“当前最新版任务”,那它只是 PriorityQueue 里的垃圾数据,直接跳过。 + if (!_waiting.TryGetValue(id, out _)) //能从等待任务中取到,那么就是有效的,不能取到那么就是覆盖的 + continue; + + // 标记为运行中 + _waiting.Remove(id); + _running.Add(id); + + return id; + } + } } } + // ============================================================ + // Complete + // ============================================================ + /// - /// 获取队列任务Id + /// 任务执行完成(必须调用) + /// + public void Complete(Guid id) + { + lock (_lock) + { + _running.Remove(id); + } + } + + // ============================================================ + // Snapshot + // ============================================================ + + /// + /// 当前等待中的任务快照 /// - /// public Guid[] Snapshot() { lock (_lock) { - return _queue.UnorderedItems - .Select(x => x.Element) - .ToArray(); + return _waiting.Keys.ToArray(); + } + } + + // ============================================================ + // 状态信息(调试用) + // ============================================================ + + public int WaitingCount + { + get + { + lock (_lock) + return _waiting.Count; + } + } + + public int RunningCount + { + get + { + lock (_lock) + return _running.Count; } } } + +///// +///// 同步队列 信号量 +///// +//public class FileSyncQueue +//{ +// private readonly PriorityQueue _queue = new(); +// private readonly SemaphoreSlim _signal = new(0); +// private readonly object _lock = new(); + +// public void Enqueue(Guid id, int priority) +// { +// lock (_lock) +// { +// // priority 越大越优先 +// _queue.Enqueue(id, -priority); +// } + +// //类似于计数器,不会产生通知风暴,可消费资源 +1 +// //if (有等待线程) 唤醒一个 else 仅增加计数 +// _signal.Release(); // 唤醒一个 worker +// } + +// /// +// /// 如果没有任务 → 挂起等待 有任务 → 被唤醒并返回 +// /// +// /// +// /// +// public async Task DequeueAsync(CancellationToken ct) +// { +// await _signal.WaitAsync(ct); + +// lock (_lock) +// { +// return _queue.Dequeue(); +// } +// } + +// /// +// /// 获取队列任务Id +// /// +// /// +// public Guid[] Snapshot() +// { +// lock (_lock) +// { +// return _queue.UnorderedItems +// .Select(x => x.Element) +// .ToArray(); +// } +// } +//} + #region 这里不用 SyncQueueUseChannel 和调度器 SyncScheduler public class SyncQueueUseChannel {