using IRaCIS.Core.Application.Helper;
using IRaCIS.Core.Application.Service;
using IRaCIS.Core.Domain.Models;
using IRaCIS.Core.Infra.EFCore;
using IRaCIS.Core.SCP.Service;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace IRaCIS.Core.SCP;
/// <summary>
/// Hosted service reserved for re-queuing pending file sync work after a restart.
/// The recovery scan is currently switched off, so the service finishes as soon as it starts.
/// </summary>
public class SyncFileRecoveryService(IServiceScopeFactory _scopeFactory, FileSyncQueue _fileSyncQueue) : BackgroundService
{
    // Page size used by the disabled recovery scan kept below for reference.
    private readonly int _pageSize = 500;

    /// <summary>
    /// Completes immediately without recovering anything. If several programs recovered the
    /// same data the files would be synced repeatedly, so the SCP service does not recover tasks.
    /// </summary>
    /// <param name="stoppingToken">Host shutdown token; unused while recovery is disabled.</param>
    /// <returns>An already completed task.</returns>
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Disabled recovery scan, retained as a reference for the intended behaviour:
        //using var scope = _scopeFactory.CreateScope();
        //var fileUploadRecordRepository = scope.ServiceProvider.GetRequiredService>();
        //// Delay the start so the host itself comes up quickly
        //await Task.Delay(5000, stoppingToken);
        //int page = 0;
        //while (!stoppingToken.IsCancellationRequested)
        //{
        //    // Fetch one page of tasks that have not been queued yet
        //    var pending = await fileUploadRecordRepository
        //        .Where(x => x.IsNeedSync == true && (x.IsSync == false || x.IsSync == null))
        //        .OrderByDescending(x => x.Priority)
        //        .Select(t => new { t.Id, t.Priority })
        //        .Skip(page * _pageSize)
        //        .Take(_pageSize)
        //        .ToListAsync(stoppingToken);
        //    if (!pending.Any())
        //        break; // scan finished, leave the loop
        //    foreach (var file in pending)
        //    {
        //        //file.IsQueued = true; // avoid queuing the same record twice
        //        _fileSyncQueue.Enqueue(file.Id, file.Priority ?? 0); // put it on the queue
        //    }
        //    page++; // next page
        //    await Task.Delay(200, stoppingToken); // ease the load on the database
        //}
        return Task.CompletedTask;
    }
}
/// <summary>
/// Background service that starts several worker loops. Each loop takes file record ids from
/// the <see cref="FileSyncQueue"/>, copies the file between the two object stores and writes an
/// <c>UploadFileSyncRecord</c> describing the outcome of the attempt.
/// </summary>
public class FileSyncWorker(IServiceScopeFactory _scopeFactory, ILogger _logger, FileSyncQueue _fileSyncQueue) : BackgroundService
{
    // Upper bound for the error text stored in UploadFileSyncRecord.Msg.
    private const int MaxErrorMessageLength = 300;

    // Worker count follows the machine: one less than the logical processor count, at least one.
    private readonly int _workerCount = Math.Max(1, Environment.ProcessorCount - 1);

    /// <summary>
    /// Starts the worker loops and returns a task that completes when all of them have ended.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service has to stop.</param>
    /// <returns>A task covering the lifetime of every worker loop.</returns>
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var workers = new Task[_workerCount];
        for (int i = 0; i < _workerCount; i++)
        {
            workers[i] = Task.Run(() => WorkerLoop(stoppingToken));
        }

        // Hand the combined task back instead of firing and forgetting, so the host can await
        // in-flight work on shutdown and a fault in a worker does not go unobserved.
        return Task.WhenAll(workers);
    }

    /// <summary>
    /// Dequeues ids until cancellation is requested and processes one file per iteration.
    /// The id is always reported back through <c>Complete</c>, whatever the outcome.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service has to stop.</param>
    private async Task WorkerLoop(CancellationToken stoppingToken)
    {
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                var id = await _fileSyncQueue.DequeueAsync(stoppingToken);
                try
                {
                    using var scope = _scopeFactory.CreateScope();
                    // NOTE(review): the generic type arguments of the service lookups below are not
                    // visible in this view; the four lines are kept exactly as found.
                    var _fileUploadRecordRepository = scope.ServiceProvider.GetRequiredService>();
                    var _uploadFileSyncRecordRepository = scope.ServiceProvider.GetRequiredService>();
                    var syncConfig = (scope.ServiceProvider.GetRequiredService>()).CurrentValue;
                    var oss = scope.ServiceProvider.GetRequiredService();
                    var file = await _fileUploadRecordRepository.FirstOrDefaultAsync(t => t.Id == id);

                    // Skip with continue, never return: returning would end this worker for good.
                    if (file == null || file.IsNeedSync != true || syncConfig.IsOpenStoreSync == false)
                    {
                        continue;
                    }

                    // Skip the task when the configuration has sync switched off for the file's upload region.
                    if (syncConfig.SyncConfigList.Any(t => t.UploadRegion == file.UploadRegion && t.IsOpenSync == false))
                    {
                        continue;
                    }

                    var log = new UploadFileSyncRecord
                    {
                        FileUploadRecordId = id,
                        StartTime = DateTime.Now,
                        JobState = jobState.RUNNING
                    };
                    await _uploadFileSyncRecordRepository.AddAsync(log);
                    await _uploadFileSyncRecordRepository.SaveChangesAsync(stoppingToken);
                    try
                    {
                        // Files uploaded in the "CN" region go from Aliyun OSS to AWS, all others from AWS to Aliyun OSS.
                        await oss.SyncFileAsync(file.Path.TrimStart('/'), file.UploadRegion == "CN" ? ObjectStoreUse.AliyunOSS : ObjectStoreUse.AWS, file.UploadRegion == "CN" ? ObjectStoreUse.AWS : ObjectStoreUse.AliyunOSS);
                        file.IsSync = true;
                        file.SyncFinishedTime = DateTime.Now;
                        log.JobState = jobState.SUCCESS;
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Sync failed {Id}", id);
                        log.JobState = jobState.FAILED;
                        // Truncate only when the text is actually longer than the limit; slicing a
                        // shorter string with [..300] throws ArgumentOutOfRangeException.
                        log.Msg = ex.Message.Length > MaxErrorMessageLength
                            ? ex.Message[..MaxErrorMessageLength]
                            : ex.Message;
                    }
                    log.EndTime = DateTime.Now;
                    // The transfer has already finished one way or the other, so store the outcome even
                    // when shutdown was requested meanwhile; otherwise the record would stay RUNNING.
                    await _uploadFileSyncRecordRepository.SaveChangesAsync(CancellationToken.None);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Sync failed {Id}", id);
                }
                finally
                {
                    // Always executed, including on the continue paths above.
                    _fileSyncQueue.Complete(id);
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Cancellation surfaced while waiting for the next id during shutdown: end the loop quietly.
        }
    }
}