优先级队列,增加hash去重,覆盖优先级测试
continuous-integration/drone/push Build is passing
Details
continuous-integration/drone/push Build is passing
Details
parent
cad9eaf966
commit
5a7796939f
|
|
@ -7,6 +7,7 @@ using Microsoft.EntityFrameworkCore;
|
|||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using System;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
|
|
@ -20,40 +21,50 @@ public class SyncFileRecoveryService(IServiceScopeFactory _scopeFactory, FileSyn
|
|||
|
||||
private readonly int _pageSize = 500;
|
||||
|
||||
/// <summary>
|
||||
/// 多个程序,如果恢复同一份数据,造成重复同步,SCP服务不恢复任务
|
||||
/// </summary>
|
||||
/// <param name="stoppingToken"></param>
|
||||
/// <returns></returns>
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
using var scope = _scopeFactory.CreateScope();
|
||||
var fileUploadRecordRepository = scope.ServiceProvider.GetRequiredService<IRepository<FileUploadRecord>>();
|
||||
|
||||
// 延迟启动,保证主机快速启动
|
||||
await Task.Delay(5000, stoppingToken);
|
||||
await Task.CompletedTask;
|
||||
|
||||
int page = 0;
|
||||
//using var scope = _scopeFactory.CreateScope();
|
||||
//var fileUploadRecordRepository = scope.ServiceProvider.GetRequiredService<IRepository<FileUploadRecord>>();
|
||||
|
||||
//// 延迟启动,保证主机快速启动
|
||||
//await Task.Delay(5000, stoppingToken);
|
||||
|
||||
//int page = 0;
|
||||
|
||||
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
// 分页获取未入队任务
|
||||
var pending = await fileUploadRecordRepository
|
||||
.Where(x => x.IsNeedSync == true && (x.IsSync == false || x.IsSync == null))
|
||||
.OrderByDescending(x => x.Priority)
|
||||
.Select(t => new { t.Id, t.Priority })
|
||||
.Skip(page * _pageSize)
|
||||
.Take(_pageSize)
|
||||
.ToListAsync(stoppingToken);
|
||||
//while (!stoppingToken.IsCancellationRequested)
|
||||
//{
|
||||
// // 分页获取未入队任务
|
||||
// var pending = await fileUploadRecordRepository
|
||||
// .Where(x => x.IsNeedSync == true && (x.IsSync == false || x.IsSync == null))
|
||||
// .OrderByDescending(x => x.Priority)
|
||||
// .Select(t => new { t.Id, t.Priority })
|
||||
// .Skip(page * _pageSize)
|
||||
// .Take(_pageSize)
|
||||
// .ToListAsync(stoppingToken);
|
||||
|
||||
if (!pending.Any())
|
||||
break; // 扫描完毕,退出循环
|
||||
// if (!pending.Any())
|
||||
// break; // 扫描完毕,退出循环
|
||||
|
||||
// foreach (var file in pending)
|
||||
// {
|
||||
// //file.IsQueued = true; // 避免重复入队
|
||||
// _fileSyncQueue.Enqueue(file.Id, file.Priority ?? 0); // 放入队列
|
||||
// }
|
||||
|
||||
// page++; // 下一页
|
||||
// await Task.Delay(200, stoppingToken); // 缓解数据库压力
|
||||
//}
|
||||
|
||||
foreach (var file in pending)
|
||||
{
|
||||
//file.IsQueued = true; // 避免重复入队
|
||||
_fileSyncQueue.Enqueue(file.Id, file.Priority ?? 0); // 放入队列
|
||||
}
|
||||
|
||||
page++; // 下一页
|
||||
await Task.Delay(200, stoppingToken); // 缓解数据库压力
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -85,12 +96,29 @@ public class FileSyncWorker(IServiceScopeFactory _scopeFactory, ILogger<FileSync
|
|||
using var scope = _scopeFactory.CreateScope();
|
||||
var _fileUploadRecordRepository = scope.ServiceProvider.GetRequiredService<IRepository<FileUploadRecord>>();
|
||||
var _uploadFileSyncRecordRepository = scope.ServiceProvider.GetRequiredService<IRepository<UploadFileSyncRecord>>();
|
||||
|
||||
var syncConfig = (scope.ServiceProvider.GetRequiredService<IOptionsMonitor<ObjectStoreServiceOptions>>()).CurrentValue;
|
||||
|
||||
var oss = scope.ServiceProvider.GetRequiredService<IOSSService>();
|
||||
|
||||
var file = await _fileUploadRecordRepository.FirstOrDefaultAsync(t => t.Id == id);
|
||||
|
||||
if (file == null || file.IsNeedSync != true)
|
||||
// ✅ 不要 return
|
||||
if (file == null || file.IsNeedSync != true || syncConfig.IsOpenStoreSync == false)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (syncConfig.SyncConfigList.Any(t => t.UploadRegion == file.UploadRegion && t.IsOpenSync == false))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
//如果发现系统配置某一边同步进行了关闭,那么就直接返回,不执行任务
|
||||
if (syncConfig.SyncConfigList.Any(t => t.UploadRegion == file.UploadRegion && t.IsOpenSync == false))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var log = new UploadFileSyncRecord
|
||||
{
|
||||
|
|
@ -126,6 +154,11 @@ public class FileSyncWorker(IServiceScopeFactory _scopeFactory, ILogger<FileSync
|
|||
{
|
||||
_logger.LogError(ex, "Sync failed {Id}", id);
|
||||
}
|
||||
finally
|
||||
{
|
||||
// ⭐⭐⭐ 永远执行
|
||||
_fileSyncQueue.Complete(id);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -42,6 +42,7 @@ public class FileUploadRecordAddOrEdit
|
|||
public string UploadBatchId { get; set; }
|
||||
public BatchDataType BatchDataType { get; set; }
|
||||
|
||||
public string StudyCode { get; set; }
|
||||
|
||||
public Guid? TrialId { get; set; }
|
||||
|
||||
|
|
@ -98,6 +99,18 @@ public class FileUploadRecordService(IRepository<FileUploadRecord> _fileUploadRe
|
|||
addOrEditFileUploadRecord.TargetRegion = find.TargetRegion;
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
//前后端调试的时候,上传的时候域名不对应,自动按照后端配置设置上传区域和同步区域
|
||||
var apiDefalut = ObjectStoreServiceConfig.SyncConfigList.FirstOrDefault(t => t.UploadRegion == ObjectStoreServiceConfig.ApiDeployRegion);
|
||||
|
||||
if (apiDefalut != null)
|
||||
{
|
||||
addOrEditFileUploadRecord.UploadRegion = apiDefalut.UploadRegion;
|
||||
addOrEditFileUploadRecord.TargetRegion = apiDefalut.TargetRegion;
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (addOrEditFileUploadRecord.TrialId != null)
|
||||
|
|
@ -122,7 +135,9 @@ public class FileUploadRecordService(IRepository<FileUploadRecord> _fileUploadRe
|
|||
}
|
||||
else
|
||||
{
|
||||
addOrEditFileUploadRecord.TargetRegion = "";
|
||||
addOrEditFileUploadRecord.IsNeedSync = false;
|
||||
|
||||
//addOrEditFileUploadRecord.TargetRegion = "";
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -154,40 +169,149 @@ public class FileUploadRecordService(IRepository<FileUploadRecord> _fileUploadRe
|
|||
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// 同步队列 信号量
|
||||
/// </summary>
|
||||
public class FileSyncQueue
|
||||
public sealed class FileSyncQueue
|
||||
{
|
||||
/// <summary>
|
||||
/// 优先级队列(仅负责排序)
|
||||
/// </summary>
|
||||
private readonly PriorityQueue<Guid, int> _queue = new();
|
||||
|
||||
/// <summary>
|
||||
/// 当前等待中的任务(唯一真实数据)
|
||||
/// key = Guid
|
||||
/// value = 最新 priority
|
||||
/// </summary>
|
||||
private readonly Dictionary<Guid, int> _waiting = new();
|
||||
|
||||
/// <summary>
|
||||
/// 正在执行的任务(防止重复执行)
|
||||
/// </summary>
|
||||
private readonly HashSet<Guid> _running = new();
|
||||
|
||||
/// <summary>
|
||||
/// worker 等待信号
|
||||
/// </summary>
|
||||
private readonly SemaphoreSlim _signal = new(0);
|
||||
|
||||
|
||||
private readonly object _lock = new();
|
||||
|
||||
// ============================================================
|
||||
// Enqueue
|
||||
// ============================================================
|
||||
|
||||
/// <summary>
|
||||
/// 入队(同 Guid 会覆盖优先级)
|
||||
/// </summary>
|
||||
public void Enqueue(Guid id, int priority)
|
||||
{
|
||||
bool needSignal = false;
|
||||
|
||||
lock (_lock)
|
||||
{
|
||||
// priority 越大越优先
|
||||
// 如果正在执行,忽略(防止重复)
|
||||
if (_running.Contains(id))
|
||||
return;
|
||||
|
||||
// 是否新任务(用于减少 signal 风暴)
|
||||
if (!_waiting.ContainsKey(id))
|
||||
needSignal = true;
|
||||
|
||||
// 更新为最新优先级(最后一次为准)
|
||||
_waiting[id] = priority; //等价于添加或者更新
|
||||
|
||||
// PriorityQueue 无法更新节点
|
||||
// 允许旧节点存在,Dequeue 时过滤
|
||||
_queue.Enqueue(id, -priority);
|
||||
}
|
||||
|
||||
//类似于计数器,不会产生通知风暴,可消费资源 +1
|
||||
//if (有等待线程) 唤醒一个 else 仅增加计数
|
||||
_signal.Release(); // 唤醒一个 worker
|
||||
// 只有新增任务才唤醒 worker
|
||||
if (needSignal)
|
||||
_signal.Release();
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// Dequeue
|
||||
// ============================================================
|
||||
|
||||
/// <summary>
|
||||
/// 如果没有任务 → 挂起等待 有任务 → 被唤醒并返回
|
||||
/// 获取一个待执行任务(无任务时自动等待)
|
||||
/// </summary>
|
||||
/// <param name="ct"></param>
|
||||
/// <returns></returns>
|
||||
public async Task<Guid> DequeueAsync(CancellationToken ct)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
await _signal.WaitAsync(ct);
|
||||
|
||||
lock (_lock)
|
||||
{
|
||||
return _queue.Dequeue();
|
||||
while (_queue.Count > 0)
|
||||
{
|
||||
var id = _queue.Dequeue();
|
||||
|
||||
// 已被覆盖或取消
|
||||
if (!_waiting.TryGetValue(id, out _))
|
||||
continue;
|
||||
|
||||
// 标记为运行中
|
||||
_waiting.Remove(id);
|
||||
_running.Add(id);
|
||||
|
||||
return id;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// Complete
|
||||
// ============================================================
|
||||
|
||||
/// <summary>
|
||||
/// 任务执行完成(必须调用)
|
||||
/// </summary>
|
||||
public void Complete(Guid id)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
_running.Remove(id);
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// Snapshot
|
||||
// ============================================================
|
||||
|
||||
/// <summary>
|
||||
/// 当前等待中的任务快照
|
||||
/// </summary>
|
||||
public Guid[] Snapshot()
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
return _waiting.Keys.ToArray();
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// 状态信息(调试用)
|
||||
// ============================================================
|
||||
|
||||
public int WaitingCount
|
||||
{
|
||||
get
|
||||
{
|
||||
lock (_lock)
|
||||
return _waiting.Count;
|
||||
}
|
||||
}
|
||||
|
||||
public int RunningCount
|
||||
{
|
||||
get
|
||||
{
|
||||
lock (_lock)
|
||||
return _running.Count;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -20,6 +20,11 @@ public class SyncFileRecoveryService(IServiceScopeFactory _scopeFactory, FileSyn
|
|||
|
||||
private readonly int _pageSize = 500;
|
||||
|
||||
/// <summary>
|
||||
/// 多个程序,如果恢复同一份数据,造成重复同步,SCP服务不恢复任务
|
||||
/// </summary>
|
||||
/// <param name="stoppingToken"></param>
|
||||
/// <returns></returns>
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
using var scope = _scopeFactory.CreateScope();
|
||||
|
|
@ -73,7 +78,6 @@ public class FileSyncWorker(IServiceScopeFactory _scopeFactory, ILogger<FileSync
|
|||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
|
||||
private async Task WorkerLoop(CancellationToken stoppingToken)
|
||||
{
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
|
|
@ -92,8 +96,16 @@ public class FileSyncWorker(IServiceScopeFactory _scopeFactory, ILogger<FileSync
|
|||
|
||||
var file = await _fileUploadRecordRepository.FirstOrDefaultAsync(t => t.Id == id);
|
||||
|
||||
// ✅ 不要 return
|
||||
if (file == null || file.IsNeedSync != true || syncConfig.IsOpenStoreSync == false)
|
||||
return;
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (syncConfig.SyncConfigList.Any(t => t.UploadRegion == file.UploadRegion && t.IsOpenSync == false))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
//如果发现系统配置某一边同步进行了关闭,那么就直接返回,不执行任务
|
||||
if (syncConfig.SyncConfigList.Any(t => t.UploadRegion == file.UploadRegion && t.IsOpenSync == false))
|
||||
|
|
@ -116,7 +128,7 @@ public class FileSyncWorker(IServiceScopeFactory _scopeFactory, ILogger<FileSync
|
|||
await oss.SyncFileAsync(file.Path.TrimStart('/'), file.UploadRegion == "CN" ? ObjectStoreUse.AliyunOSS : ObjectStoreUse.AWS, file.UploadRegion == "CN" ? ObjectStoreUse.AWS : ObjectStoreUse.AliyunOSS);
|
||||
|
||||
file.IsSync = true;
|
||||
file.SyncFinishedTime = DateTime.UtcNow;
|
||||
file.SyncFinishedTime = DateTime.Now;
|
||||
|
||||
log.JobState = jobState.SUCCESS;
|
||||
}
|
||||
|
|
@ -127,7 +139,7 @@ public class FileSyncWorker(IServiceScopeFactory _scopeFactory, ILogger<FileSync
|
|||
log.Msg = ex.Message[..300];
|
||||
}
|
||||
|
||||
log.EndTime = DateTime.UtcNow;
|
||||
log.EndTime = DateTime.Now;
|
||||
|
||||
await _uploadFileSyncRecordRepository.SaveChangesAsync(stoppingToken);
|
||||
}
|
||||
|
|
@ -135,6 +147,11 @@ public class FileSyncWorker(IServiceScopeFactory _scopeFactory, ILogger<FileSync
|
|||
{
|
||||
_logger.LogError(ex, "Sync failed {Id}", id);
|
||||
}
|
||||
finally
|
||||
{
|
||||
// ⭐⭐⭐ 永远执行
|
||||
_fileSyncQueue.Complete(id);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1577,23 +1577,54 @@
|
|||
<param name="inQuery"></param>
|
||||
<returns></returns>
|
||||
</member>
|
||||
<member name="T:IRaCIS.Core.Application.Service.FileSyncQueue">
|
||||
<member name="M:IRaCIS.Core.Application.Service.FileUploadRecordService.BatchAddSyncFileTask(IRaCIS.Core.Application.ViewModel.BatchAddSyncFileCommand)">
|
||||
<summary>
|
||||
同步队列 信号量
|
||||
批量设置为需要同步,并且设置优先级
|
||||
</summary>
|
||||
<param name="inComand"></param>
|
||||
<returns></returns>
|
||||
</member>
|
||||
<member name="F:IRaCIS.Core.Application.Service.FileSyncQueue._queue">
|
||||
<summary>
|
||||
优先级队列(仅负责排序)
|
||||
</summary>
|
||||
</member>
|
||||
<member name="F:IRaCIS.Core.Application.Service.FileSyncQueue._waiting">
|
||||
<summary>
|
||||
当前等待中的任务(唯一真实数据)
|
||||
key = Guid
|
||||
value = 最新 priority
|
||||
</summary>
|
||||
</member>
|
||||
<member name="F:IRaCIS.Core.Application.Service.FileSyncQueue._running">
|
||||
<summary>
|
||||
正在执行的任务(防止重复执行)
|
||||
</summary>
|
||||
</member>
|
||||
<member name="F:IRaCIS.Core.Application.Service.FileSyncQueue._signal">
|
||||
<summary>
|
||||
worker 等待信号
|
||||
</summary>
|
||||
</member>
|
||||
<member name="M:IRaCIS.Core.Application.Service.FileSyncQueue.Enqueue(System.Guid,System.Int32)">
|
||||
<summary>
|
||||
入队(同 Guid 会覆盖优先级)
|
||||
</summary>
|
||||
</member>
|
||||
<member name="M:IRaCIS.Core.Application.Service.FileSyncQueue.DequeueAsync(System.Threading.CancellationToken)">
|
||||
<summary>
|
||||
如果没有任务 → 挂起等待 有任务 → 被唤醒并返回
|
||||
获取一个待执行任务(无任务时自动等待)
|
||||
</summary>
|
||||
</member>
|
||||
<member name="M:IRaCIS.Core.Application.Service.FileSyncQueue.Complete(System.Guid)">
|
||||
<summary>
|
||||
任务执行完成(必须调用)
|
||||
</summary>
|
||||
<param name="ct"></param>
|
||||
<returns></returns>
|
||||
</member>
|
||||
<member name="M:IRaCIS.Core.Application.Service.FileSyncQueue.Snapshot">
|
||||
<summary>
|
||||
获取队列任务Id
|
||||
当前等待中的任务快照
|
||||
</summary>
|
||||
<returns></returns>
|
||||
</member>
|
||||
<member name="M:IRaCIS.Core.Application.Service.SyncQueueUseChannel.Enqueue(System.Guid,System.Int32)">
|
||||
<summary>
|
||||
|
|
|
|||
|
|
@ -228,4 +228,6 @@ public class UploadFileSyncRecordQuery : PageInput
|
|||
public class BatchAddSyncFileCommand
|
||||
{
|
||||
public List<Guid> FileUploadRecordIdList { get; set; }
|
||||
|
||||
public int? Priority { get; set; }
|
||||
}
|
||||
|
|
|
|||
|
|
@ -150,12 +150,25 @@ public class FileUploadRecordService(IRepository<FileUploadRecord> _fileUploadRe
|
|||
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// 批量设置为需要同步,并且设置优先级
|
||||
/// </summary>
|
||||
/// <param name="inComand"></param>
|
||||
/// <returns></returns>
|
||||
public async Task<IResponseOutput> BatchAddSyncFileTask(BatchAddSyncFileCommand inComand)
|
||||
{
|
||||
|
||||
|
||||
await _fileUploadRecordRepository.BatchUpdateNoTrackingAsync(t => inComand.FileUploadRecordIdList.Contains(t.Id), u => new FileUploadRecord() { IsNeedSync = true, Priority = inComand.Priority });
|
||||
|
||||
foreach (var item in inComand.FileUploadRecordIdList)
|
||||
{
|
||||
_fileSyncQueue.Enqueue(item, inComand.Priority ?? 0);
|
||||
|
||||
}
|
||||
|
||||
|
||||
await _fileUploadRecordRepository.SaveChangesAsync();
|
||||
|
||||
return ResponseOutput.Ok();
|
||||
}
|
||||
|
|
@ -253,59 +266,206 @@ public class FileUploadRecordService(IRepository<FileUploadRecord> _fileUploadRe
|
|||
#region 同步队列
|
||||
|
||||
|
||||
public sealed class FileSyncQueue
|
||||
{
|
||||
/// <summary>
|
||||
/// 优先级队列(仅负责排序)
|
||||
/// </summary>
|
||||
private readonly PriorityQueue<Guid, int> _queue = new();
|
||||
|
||||
/// <summary>
|
||||
/// 同步队列 信号量
|
||||
/// 当前等待中的任务(唯一真实数据)
|
||||
/// key = Guid
|
||||
/// value = 最新 priority
|
||||
/// </summary>
|
||||
private readonly Dictionary<Guid, int> _waiting = new();
|
||||
|
||||
/// <summary>
|
||||
/// 正在执行的任务(防止重复执行)
|
||||
/// </summary>
|
||||
private readonly HashSet<Guid> _running = new();
|
||||
|
||||
/// <summary>
|
||||
/// worker 等待信号
|
||||
/// </summary>
|
||||
public class FileSyncQueue
|
||||
{
|
||||
private readonly PriorityQueue<Guid, int> _queue = new();
|
||||
private readonly SemaphoreSlim _signal = new(0);
|
||||
|
||||
|
||||
private readonly object _lock = new();
|
||||
|
||||
// ============================================================
|
||||
// Enqueue
|
||||
// ============================================================
|
||||
|
||||
/// <summary>
|
||||
/// 入队(同 Guid 会覆盖优先级)
|
||||
/// </summary>
|
||||
public void Enqueue(Guid id, int priority)
|
||||
{
|
||||
bool needSignal = false;
|
||||
|
||||
lock (_lock)
|
||||
{
|
||||
// priority 越大越优先
|
||||
// 如果正在执行,忽略(防止重复)
|
||||
if (_running.Contains(id))
|
||||
return;
|
||||
|
||||
// 是否新任务(用于减少 signal 风暴)
|
||||
if (!_waiting.ContainsKey(id))
|
||||
needSignal = true;
|
||||
|
||||
// 更新为最新优先级(最后一次为准)
|
||||
_waiting[id] = priority; //等价于添加或者更新
|
||||
|
||||
// PriorityQueue 无法更新节点
|
||||
// 允许旧节点存在,Dequeue 时过滤
|
||||
_queue.Enqueue(id, -priority);
|
||||
}
|
||||
|
||||
//类似于计数器,不会产生通知风暴,可消费资源 +1
|
||||
//if (有等待线程) 唤醒一个 else 仅增加计数
|
||||
_signal.Release(); // 唤醒一个 worker
|
||||
// 只有新增任务才唤醒 worker
|
||||
if (needSignal)
|
||||
_signal.Release();
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// Dequeue
|
||||
// ============================================================
|
||||
|
||||
/// <summary>
|
||||
/// 如果没有任务 → 挂起等待 有任务 → 被唤醒并返回
|
||||
/// 获取一个待执行任务(无任务时自动等待)
|
||||
/// </summary>
|
||||
/// <param name="ct"></param>
|
||||
/// <returns></returns>
|
||||
public async Task<Guid> DequeueAsync(CancellationToken ct)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
await _signal.WaitAsync(ct);
|
||||
|
||||
lock (_lock)
|
||||
{
|
||||
return _queue.Dequeue();
|
||||
while (_queue.Count > 0)
|
||||
{
|
||||
var id = _queue.Dequeue();
|
||||
|
||||
// 已被覆盖或取消 如果这个任务已经不是“当前最新版任务”,那它只是 PriorityQueue 里的垃圾数据,直接跳过。
|
||||
if (!_waiting.TryGetValue(id, out _)) //能从等待任务中取到,那么就是有效的,不能取到那么就是覆盖的
|
||||
continue;
|
||||
|
||||
// 标记为运行中
|
||||
_waiting.Remove(id);
|
||||
_running.Add(id);
|
||||
|
||||
return id;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// Complete
|
||||
// ============================================================
|
||||
|
||||
/// <summary>
|
||||
/// 获取队列任务Id
|
||||
/// 任务执行完成(必须调用)
|
||||
/// </summary>
|
||||
public void Complete(Guid id)
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
_running.Remove(id);
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// Snapshot
|
||||
// ============================================================
|
||||
|
||||
/// <summary>
|
||||
/// 当前等待中的任务快照
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
public Guid[] Snapshot()
|
||||
{
|
||||
lock (_lock)
|
||||
{
|
||||
return _queue.UnorderedItems
|
||||
.Select(x => x.Element)
|
||||
.ToArray();
|
||||
return _waiting.Keys.ToArray();
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// 状态信息(调试用)
|
||||
// ============================================================
|
||||
|
||||
public int WaitingCount
|
||||
{
|
||||
get
|
||||
{
|
||||
lock (_lock)
|
||||
return _waiting.Count;
|
||||
}
|
||||
}
|
||||
|
||||
public int RunningCount
|
||||
{
|
||||
get
|
||||
{
|
||||
lock (_lock)
|
||||
return _running.Count;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
///// <summary>
|
||||
///// 同步队列 信号量
|
||||
///// </summary>
|
||||
//public class FileSyncQueue
|
||||
//{
|
||||
// private readonly PriorityQueue<Guid, int> _queue = new();
|
||||
// private readonly SemaphoreSlim _signal = new(0);
|
||||
// private readonly object _lock = new();
|
||||
|
||||
// public void Enqueue(Guid id, int priority)
|
||||
// {
|
||||
// lock (_lock)
|
||||
// {
|
||||
// // priority 越大越优先
|
||||
// _queue.Enqueue(id, -priority);
|
||||
// }
|
||||
|
||||
// //类似于计数器,不会产生通知风暴,可消费资源 +1
|
||||
// //if (有等待线程) 唤醒一个 else 仅增加计数
|
||||
// _signal.Release(); // 唤醒一个 worker
|
||||
// }
|
||||
|
||||
// /// <summary>
|
||||
// /// 如果没有任务 → 挂起等待 有任务 → 被唤醒并返回
|
||||
// /// </summary>
|
||||
// /// <param name="ct"></param>
|
||||
// /// <returns></returns>
|
||||
// public async Task<Guid> DequeueAsync(CancellationToken ct)
|
||||
// {
|
||||
// await _signal.WaitAsync(ct);
|
||||
|
||||
// lock (_lock)
|
||||
// {
|
||||
// return _queue.Dequeue();
|
||||
// }
|
||||
// }
|
||||
|
||||
// /// <summary>
|
||||
// /// 获取队列任务Id
|
||||
// /// </summary>
|
||||
// /// <returns></returns>
|
||||
// public Guid[] Snapshot()
|
||||
// {
|
||||
// lock (_lock)
|
||||
// {
|
||||
// return _queue.UnorderedItems
|
||||
// .Select(x => x.Element)
|
||||
// .ToArray();
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
|
||||
#region 这里不用 SyncQueueUseChannel 和调度器 SyncScheduler
|
||||
public class SyncQueueUseChannel
|
||||
{
|
||||
|
|
|
|||
Loading…
Reference in New Issue