irc-netcore-api/IRaCIS.Core.API/HostService/SyncFileRecoveryService.cs

135 lines
4.4 KiB
C#

using IRaCIS.Core.Application.Helper;
using IRaCIS.Core.Application.Service;
using IRaCIS.Core.Domain.Models;
using IRaCIS.Core.Infra.EFCore;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace IRaCIS.Core.API.HostService;
public class SyncFileRecoveryService(IServiceScopeFactory _scopeFactory, FileSyncQueue _fileSyncQueue) : BackgroundService
{
private readonly int _pageSize = 500;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
using var scope = _scopeFactory.CreateScope();
var fileUploadRecordRepository = scope.ServiceProvider.GetRequiredService<IRepository<FileUploadRecord>>();
// 延迟启动,保证主机快速启动
await Task.Delay(5000, stoppingToken);
int page = 0;
while (!stoppingToken.IsCancellationRequested)
{
// 分页获取未入队任务
var pending = await fileUploadRecordRepository
.Where(x => x.IsNeedSync == true && (x.IsSync == false || x.IsSync == null))
.OrderByDescending(x => x.Priority)
.Select(t => new { t.Id, t.Priority })
.Skip(page * _pageSize)
.Take(_pageSize)
.ToListAsync(stoppingToken);
if (!pending.Any())
break; // 扫描完毕,退出循环
foreach (var file in pending)
{
//file.IsQueued = true; // 避免重复入队
_fileSyncQueue.Enqueue(file.Id, file.Priority ?? 0); // 放入队列
}
page++; // 下一页
await Task.Delay(200, stoppingToken); // 缓解数据库压力
}
}
}
public class FileSyncWorker(IServiceScopeFactory _scopeFactory, ILogger<FileSyncWorker> _logger, FileSyncQueue _fileSyncQueue) : BackgroundService
{
// ⭐ 自动根据服务器CPU
private readonly int _workerCount = Math.Max(1, Environment.ProcessorCount - 1);
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
for (int i = 0; i < _workerCount; i++)
Task.Run(() => WorkerLoop(stoppingToken));
return Task.CompletedTask;
}
private async Task WorkerLoop(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var id = await _fileSyncQueue.DequeueAsync(stoppingToken);
try
{
using var scope = _scopeFactory.CreateScope();
var _fileUploadRecordRepository = scope.ServiceProvider.GetRequiredService<IRepository<FileUploadRecord>>();
var _uploadFileSyncRecordRepository = scope.ServiceProvider.GetRequiredService<IRepository<UploadFileSyncRecord>>();
var oss = scope.ServiceProvider.GetRequiredService<IOSSService>();
var file = await _fileUploadRecordRepository.FirstOrDefaultAsync(t => t.Id == id);
if (file == null || file.IsNeedSync != true)
return;
var log = new UploadFileSyncRecord
{
FileUploadRecordId = id,
StartTime = DateTime.Now,
JobState = jobState.RUNNING
};
await _uploadFileSyncRecordRepository.AddAsync(log);
await _uploadFileSyncRecordRepository.SaveChangesAsync(stoppingToken);
try
{
await oss.SyncFileAsync(file.Path.TrimStart("/"), file.UploadRegion == "CN" ? ObjectStoreUse.AliyunOSS : ObjectStoreUse.AWS, file.UploadRegion == "CN" ? ObjectStoreUse.AWS : ObjectStoreUse.AliyunOSS);
file.IsSync = true;
file.SyncFinishedTime = DateTime.UtcNow;
log.JobState = jobState.SUCCESS;
}
catch (Exception ex)
{
log.JobState = jobState.FAILED;
log.Msg = ex.Message[..300];
}
log.EndTime = DateTime.UtcNow;
await _uploadFileSyncRecordRepository.SaveChangesAsync(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Sync failed {Id}", id);
}
}
}
}