diff --git a/IRC.Core.SCP/HostConfig/SyncFileRecoveryService.cs b/IRC.Core.SCP/HostConfig/SyncFileRecoveryService.cs
index 5114d819d..089465d10 100644
--- a/IRC.Core.SCP/HostConfig/SyncFileRecoveryService.cs
+++ b/IRC.Core.SCP/HostConfig/SyncFileRecoveryService.cs
@@ -7,6 +7,7 @@ using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
using System;
using System.Linq;
using System.Threading;
@@ -20,40 +21,50 @@ public class SyncFileRecoveryService(IServiceScopeFactory _scopeFactory, FileSyn
private readonly int _pageSize = 500;
+ ///
+ /// 多个程序,如果恢复同一份数据,造成重复同步,SCP服务不恢复任务
+ ///
+ ///
+ ///
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
- using var scope = _scopeFactory.CreateScope();
- var fileUploadRecordRepository = scope.ServiceProvider.GetRequiredService>();
- // 延迟启动,保证主机快速启动
- await Task.Delay(5000, stoppingToken);
+ await Task.CompletedTask;
- int page = 0;
+ //using var scope = _scopeFactory.CreateScope();
+ //var fileUploadRecordRepository = scope.ServiceProvider.GetRequiredService>();
+
+ //// 延迟启动,保证主机快速启动
+ //await Task.Delay(5000, stoppingToken);
+
+ //int page = 0;
- while (!stoppingToken.IsCancellationRequested)
- {
- // 分页获取未入队任务
- var pending = await fileUploadRecordRepository
- .Where(x => x.IsNeedSync == true && (x.IsSync == false || x.IsSync == null))
- .OrderByDescending(x => x.Priority)
- .Select(t => new { t.Id, t.Priority })
- .Skip(page * _pageSize)
- .Take(_pageSize)
- .ToListAsync(stoppingToken);
+ //while (!stoppingToken.IsCancellationRequested)
+ //{
+ // // 分页获取未入队任务
+ // var pending = await fileUploadRecordRepository
+ // .Where(x => x.IsNeedSync == true && (x.IsSync == false || x.IsSync == null))
+ // .OrderByDescending(x => x.Priority)
+ // .Select(t => new { t.Id, t.Priority })
+ // .Skip(page * _pageSize)
+ // .Take(_pageSize)
+ // .ToListAsync(stoppingToken);
- if (!pending.Any())
- break; // 扫描完毕,退出循环
+ // if (!pending.Any())
+ // break; // 扫描完毕,退出循环
+
+ // foreach (var file in pending)
+ // {
+ // //file.IsQueued = true; // 避免重复入队
+ // _fileSyncQueue.Enqueue(file.Id, file.Priority ?? 0); // 放入队列
+ // }
+
+ // page++; // 下一页
+ // await Task.Delay(200, stoppingToken); // 缓解数据库压力
+ //}
- foreach (var file in pending)
- {
- //file.IsQueued = true; // 避免重复入队
- _fileSyncQueue.Enqueue(file.Id, file.Priority ?? 0); // 放入队列
- }
- page++; // 下一页
- await Task.Delay(200, stoppingToken); // 缓解数据库压力
- }
}
}
@@ -85,12 +96,29 @@ public class FileSyncWorker(IServiceScopeFactory _scopeFactory, ILogger>();
var _uploadFileSyncRecordRepository = scope.ServiceProvider.GetRequiredService>();
+
+ var syncConfig = (scope.ServiceProvider.GetRequiredService>()).CurrentValue;
+
var oss = scope.ServiceProvider.GetRequiredService();
var file = await _fileUploadRecordRepository.FirstOrDefaultAsync(t => t.Id == id);
- if (file == null || file.IsNeedSync != true)
+ // ✅ 不要 return
+ if (file == null || file.IsNeedSync != true || syncConfig.IsOpenStoreSync == false)
+ {
+ continue;
+ }
+
+ if (syncConfig.SyncConfigList.Any(t => t.UploadRegion == file.UploadRegion && t.IsOpenSync == false))
+ {
+ continue;
+ }
+
+ //如果发现系统配置某一边同步进行了关闭,那么就直接返回,不执行任务
+ if (syncConfig.SyncConfigList.Any(t => t.UploadRegion == file.UploadRegion && t.IsOpenSync == false))
+ {
return;
+ }
var log = new UploadFileSyncRecord
{
@@ -126,6 +154,11 @@ public class FileSyncWorker(IServiceScopeFactory _scopeFactory, ILogger _fileUploadRe
-
+
public async Task AddOrUpdateFileUploadRecord(FileUploadRecordAddOrEdit addOrEditFileUploadRecord)
{
@@ -98,6 +99,18 @@ public class FileUploadRecordService(IRepository _fileUploadRe
addOrEditFileUploadRecord.TargetRegion = find.TargetRegion;
}
+ else
+ {
+ //前后端调试的时候,上传的时候域名不对应,自动按照后端配置设置上传区域和同步区域
+ var apiDefalut = ObjectStoreServiceConfig.SyncConfigList.FirstOrDefault(t => t.UploadRegion == ObjectStoreServiceConfig.ApiDeployRegion);
+
+ if (apiDefalut != null)
+ {
+ addOrEditFileUploadRecord.UploadRegion = apiDefalut.UploadRegion;
+ addOrEditFileUploadRecord.TargetRegion = apiDefalut.TargetRegion;
+
+ }
+ }
}
if (addOrEditFileUploadRecord.TrialId != null)
@@ -122,7 +135,9 @@ public class FileUploadRecordService(IRepository _fileUploadRe
}
else
{
- addOrEditFileUploadRecord.TargetRegion = "";
+ addOrEditFileUploadRecord.IsNeedSync = false;
+
+ //addOrEditFileUploadRecord.TargetRegion = "";
}
@@ -154,40 +169,149 @@ public class FileUploadRecordService(IRepository _fileUploadRe
-///
-/// 同步队列 信号量
-///
-public class FileSyncQueue
+public sealed class FileSyncQueue
{
+ ///
+ /// 优先级队列(仅负责排序)
+ ///
private readonly PriorityQueue _queue = new();
+
+ ///
+ /// 当前等待中的任务(唯一真实数据)
+ /// key = Guid
+ /// value = 最新 priority
+ ///
+ private readonly Dictionary _waiting = new();
+
+ ///
+ /// 正在执行的任务(防止重复执行)
+ ///
+ private readonly HashSet _running = new();
+
+ ///
+ /// worker 等待信号
+ ///
private readonly SemaphoreSlim _signal = new(0);
+
+
private readonly object _lock = new();
+ // ============================================================
+ // Enqueue
+ // ============================================================
+
+ ///
+ /// 入队(同 Guid 会覆盖优先级)
+ ///
public void Enqueue(Guid id, int priority)
{
+ bool needSignal = false;
+
lock (_lock)
{
- // priority 越大越优先
+ // 如果正在执行,忽略(防止重复)
+ if (_running.Contains(id))
+ return;
+
+ // 是否新任务(用于减少 signal 风暴)
+ if (!_waiting.ContainsKey(id))
+ needSignal = true;
+
+ // 更新为最新优先级(最后一次为准)
+ _waiting[id] = priority; //等价于添加或者更新
+
+ // PriorityQueue 无法更新节点
+ // 允许旧节点存在,Dequeue 时过滤
_queue.Enqueue(id, -priority);
}
- //类似于计数器,不会产生通知风暴,可消费资源 +1
- //if (有等待线程) 唤醒一个 else 仅增加计数
- _signal.Release(); // 唤醒一个 worker
+ // 只有新增任务才唤醒 worker
+ if (needSignal)
+ _signal.Release();
}
+ // ============================================================
+ // Dequeue
+ // ============================================================
+
///
- /// 如果没有任务 → 挂起等待 有任务 → 被唤醒并返回
+ /// 获取一个待执行任务(无任务时自动等待)
///
- ///
- ///
public async Task DequeueAsync(CancellationToken ct)
{
- await _signal.WaitAsync(ct);
+ while (true)
+ {
+ await _signal.WaitAsync(ct);
+ lock (_lock)
+ {
+ while (_queue.Count > 0)
+ {
+ var id = _queue.Dequeue();
+
+ // 已被覆盖或取消
+ if (!_waiting.TryGetValue(id, out _))
+ continue;
+
+ // 标记为运行中
+ _waiting.Remove(id);
+ _running.Add(id);
+
+ return id;
+ }
+ }
+ }
+ }
+
+ // ============================================================
+ // Complete
+ // ============================================================
+
+ ///
+ /// 任务执行完成(必须调用)
+ ///
+ public void Complete(Guid id)
+ {
lock (_lock)
{
- return _queue.Dequeue();
+ _running.Remove(id);
+ }
+ }
+
+ // ============================================================
+ // Snapshot
+ // ============================================================
+
+ ///
+ /// 当前等待中的任务快照
+ ///
+ public Guid[] Snapshot()
+ {
+ lock (_lock)
+ {
+ return _waiting.Keys.ToArray();
+ }
+ }
+
+ // ============================================================
+ // 状态信息(调试用)
+ // ============================================================
+
+ public int WaitingCount
+ {
+ get
+ {
+ lock (_lock)
+ return _waiting.Count;
+ }
+ }
+
+ public int RunningCount
+ {
+ get
+ {
+ lock (_lock)
+ return _running.Count;
}
}
}
\ No newline at end of file
diff --git a/IRaCIS.Core.API/HostService/SyncFileRecoveryService.cs b/IRaCIS.Core.API/HostService/SyncFileRecoveryService.cs
index da3a7be84..ad82da60d 100644
--- a/IRaCIS.Core.API/HostService/SyncFileRecoveryService.cs
+++ b/IRaCIS.Core.API/HostService/SyncFileRecoveryService.cs
@@ -20,6 +20,11 @@ public class SyncFileRecoveryService(IServiceScopeFactory _scopeFactory, FileSyn
private readonly int _pageSize = 500;
+ ///
+ /// 多个程序,如果恢复同一份数据,造成重复同步,SCP服务不恢复任务
+ ///
+ ///
+ ///
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
using var scope = _scopeFactory.CreateScope();
@@ -73,7 +78,6 @@ public class FileSyncWorker(IServiceScopeFactory _scopeFactory, ILogger t.Id == id);
+ // ✅ 不要 return
if (file == null || file.IsNeedSync != true || syncConfig.IsOpenStoreSync == false)
- return;
+ {
+ continue;
+ }
+
+ if (syncConfig.SyncConfigList.Any(t => t.UploadRegion == file.UploadRegion && t.IsOpenSync == false))
+ {
+ continue;
+ }
//如果发现系统配置某一边同步进行了关闭,那么就直接返回,不执行任务
if (syncConfig.SyncConfigList.Any(t => t.UploadRegion == file.UploadRegion && t.IsOpenSync == false))
@@ -116,7 +128,7 @@ public class FileSyncWorker(IServiceScopeFactory _scopeFactory, ILogger
-
+
- 同步队列 信号量
+ 批量设置为需要同步,并且设置优先级
+
+
+
+
+
+
+ 优先级队列(仅负责排序)
+
+
+
+
+ 当前等待中的任务(唯一真实数据)
+ key = Guid
+ value = 最新 priority
+
+
+
+
+ 正在执行的任务(防止重复执行)
+
+
+
+
+ worker 等待信号
+
+
+
+
+ 入队(同 Guid 会覆盖优先级)
- 如果没有任务 → 挂起等待 有任务 → 被唤醒并返回
+ 获取一个待执行任务(无任务时自动等待)
+
+
+
+
+ 任务执行完成(必须调用)
-
-
- 获取队列任务Id
+ 当前等待中的任务快照
-
diff --git a/IRaCIS.Core.Application/Service/Common/DTO/FileUploadRecordViewModel.cs b/IRaCIS.Core.Application/Service/Common/DTO/FileUploadRecordViewModel.cs
index 0bb0a6af7..e983872d1 100644
--- a/IRaCIS.Core.Application/Service/Common/DTO/FileUploadRecordViewModel.cs
+++ b/IRaCIS.Core.Application/Service/Common/DTO/FileUploadRecordViewModel.cs
@@ -228,4 +228,6 @@ public class UploadFileSyncRecordQuery : PageInput
public class BatchAddSyncFileCommand
{
public List FileUploadRecordIdList { get; set; }
+
+ public int? Priority { get; set; }
}
diff --git a/IRaCIS.Core.Application/Service/Common/FileUploadRecordService.cs b/IRaCIS.Core.Application/Service/Common/FileUploadRecordService.cs
index 3ed43a408..22c99add8 100644
--- a/IRaCIS.Core.Application/Service/Common/FileUploadRecordService.cs
+++ b/IRaCIS.Core.Application/Service/Common/FileUploadRecordService.cs
@@ -150,12 +150,25 @@ public class FileUploadRecordService(IRepository _fileUploadRe
}
+ ///
+ /// 批量设置为需要同步,并且设置优先级
+ ///
+ ///
+ ///
public async Task BatchAddSyncFileTask(BatchAddSyncFileCommand inComand)
{
+ await _fileUploadRecordRepository.BatchUpdateNoTrackingAsync(t => inComand.FileUploadRecordIdList.Contains(t.Id), u => new FileUploadRecord() { IsNeedSync = true, Priority = inComand.Priority });
+
+ foreach (var item in inComand.FileUploadRecordIdList)
+ {
+ _fileSyncQueue.Enqueue(item, inComand.Priority ?? 0);
+
+ }
+ await _fileUploadRecordRepository.SaveChangesAsync();
return ResponseOutput.Ok();
}
@@ -253,59 +266,206 @@ public class FileUploadRecordService(IRepository _fileUploadRe
#region 同步队列
-
-///
-/// 同步队列 信号量
-///
-public class FileSyncQueue
+public sealed class FileSyncQueue
{
+ ///
+ /// 优先级队列(仅负责排序)
+ ///
private readonly PriorityQueue _queue = new();
+
+ ///
+ /// 当前等待中的任务(唯一真实数据)
+ /// key = Guid
+ /// value = 最新 priority
+ ///
+ private readonly Dictionary _waiting = new();
+
+ ///
+ /// 正在执行的任务(防止重复执行)
+ ///
+ private readonly HashSet _running = new();
+
+ ///
+ /// worker 等待信号
+ ///
private readonly SemaphoreSlim _signal = new(0);
+
+
private readonly object _lock = new();
+ // ============================================================
+ // Enqueue
+ // ============================================================
+
+ ///
+ /// 入队(同 Guid 会覆盖优先级)
+ ///
public void Enqueue(Guid id, int priority)
{
+ bool needSignal = false;
+
lock (_lock)
{
- // priority 越大越优先
+ // 如果正在执行,忽略(防止重复)
+ if (_running.Contains(id))
+ return;
+
+ // 是否新任务(用于减少 signal 风暴)
+ if (!_waiting.ContainsKey(id))
+ needSignal = true;
+
+ // 更新为最新优先级(最后一次为准)
+ _waiting[id] = priority; //等价于添加或者更新
+
+ // PriorityQueue 无法更新节点
+ // 允许旧节点存在,Dequeue 时过滤
_queue.Enqueue(id, -priority);
}
- //类似于计数器,不会产生通知风暴,可消费资源 +1
- //if (有等待线程) 唤醒一个 else 仅增加计数
- _signal.Release(); // 唤醒一个 worker
+ // 只有新增任务才唤醒 worker
+ if (needSignal)
+ _signal.Release();
}
+ // ============================================================
+ // Dequeue
+ // ============================================================
+
///
- /// 如果没有任务 → 挂起等待 有任务 → 被唤醒并返回
+ /// 获取一个待执行任务(无任务时自动等待)
///
- ///
- ///
public async Task DequeueAsync(CancellationToken ct)
{
- await _signal.WaitAsync(ct);
-
- lock (_lock)
+ while (true)
{
- return _queue.Dequeue();
+ await _signal.WaitAsync(ct);
+
+ lock (_lock)
+ {
+ while (_queue.Count > 0)
+ {
+ var id = _queue.Dequeue();
+
+ // 已被覆盖或取消 如果这个任务已经不是“当前最新版任务”,那它只是 PriorityQueue 里的垃圾数据,直接跳过。
+ if (!_waiting.TryGetValue(id, out _)) //能从等待任务中取到,那么就是有效的,不能取到那么就是覆盖的
+ continue;
+
+ // 标记为运行中
+ _waiting.Remove(id);
+ _running.Add(id);
+
+ return id;
+ }
+ }
}
}
+ // ============================================================
+ // Complete
+ // ============================================================
+
///
- /// 获取队列任务Id
+ /// 任务执行完成(必须调用)
+ ///
+ public void Complete(Guid id)
+ {
+ lock (_lock)
+ {
+ _running.Remove(id);
+ }
+ }
+
+ // ============================================================
+ // Snapshot
+ // ============================================================
+
+ ///
+ /// 当前等待中的任务快照
///
- ///
public Guid[] Snapshot()
{
lock (_lock)
{
- return _queue.UnorderedItems
- .Select(x => x.Element)
- .ToArray();
+ return _waiting.Keys.ToArray();
+ }
+ }
+
+ // ============================================================
+ // 状态信息(调试用)
+ // ============================================================
+
+ public int WaitingCount
+ {
+ get
+ {
+ lock (_lock)
+ return _waiting.Count;
+ }
+ }
+
+ public int RunningCount
+ {
+ get
+ {
+ lock (_lock)
+ return _running.Count;
}
}
}
+
+/////
+///// 同步队列 信号量
+/////
+//public class FileSyncQueue
+//{
+// private readonly PriorityQueue _queue = new();
+// private readonly SemaphoreSlim _signal = new(0);
+// private readonly object _lock = new();
+
+// public void Enqueue(Guid id, int priority)
+// {
+// lock (_lock)
+// {
+// // priority 越大越优先
+// _queue.Enqueue(id, -priority);
+// }
+
+// //类似于计数器,不会产生通知风暴,可消费资源 +1
+// //if (有等待线程) 唤醒一个 else 仅增加计数
+// _signal.Release(); // 唤醒一个 worker
+// }
+
+// ///
+// /// 如果没有任务 → 挂起等待 有任务 → 被唤醒并返回
+// ///
+// ///
+// ///
+// public async Task DequeueAsync(CancellationToken ct)
+// {
+// await _signal.WaitAsync(ct);
+
+// lock (_lock)
+// {
+// return _queue.Dequeue();
+// }
+// }
+
+// ///
+// /// 获取队列任务Id
+// ///
+// ///
+// public Guid[] Snapshot()
+// {
+// lock (_lock)
+// {
+// return _queue.UnorderedItems
+// .Select(x => x.Element)
+// .ToArray();
+// }
+// }
+//}
+
#region 这里不用 SyncQueueUseChannel 和调度器 SyncScheduler
public class SyncQueueUseChannel
{