135 lines
4.4 KiB
C#
135 lines
4.4 KiB
C#
using IRaCIS.Core.Application.Helper;
|
|
using IRaCIS.Core.Application.Service;
|
|
using IRaCIS.Core.Domain.Models;
|
|
using IRaCIS.Core.Infra.EFCore;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.Hosting;
|
|
using Microsoft.Extensions.Logging;
|
|
using System;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
|
|
namespace IRaCIS.Core.API.HostService;
|
|
|
|
|
|
/// <summary>
/// Hosted service that runs once after start-up: it pages through the
/// <see cref="FileUploadRecord"/> rows that are flagged as needing a sync but are
/// not yet synced, and enqueues each of them on the injected <see cref="FileSyncQueue"/>.
/// The service finishes as soon as a page comes back empty.
/// </summary>
public class SyncFileRecoveryService(IServiceScopeFactory _scopeFactory, FileSyncQueue _fileSyncQueue) : BackgroundService
{
    /// <summary>Number of pending records fetched per database query.</summary>
    private readonly int _pageSize = 500;

    /// <summary>
    /// Waits five seconds, then scans the pending records page by page and enqueues them.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down; passed to every delay and query.</param>
    /// <returns>A task that completes when the scan is finished or cancelled.</returns>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Delay the scan so the host itself can start quickly.
        await Task.Delay(5000, stoppingToken);

        using var scope = _scopeFactory.CreateScope();
        var fileUploadRecordRepository = scope.ServiceProvider.GetRequiredService<IRepository<FileUploadRecord>>();

        int page = 0;

        while (!stoppingToken.IsCancellationRequested)
        {
            // Fetch the next page of records that still have to be synced.
            // Offset paging needs a total order: Priority alone is not unique, so Id is
            // used as a tie-breaker; otherwise rows with equal priority may be repeated
            // or skipped between pages.
            // NOTE(review): rows whose IsSync becomes true while this scan is running drop
            // out of the filter and shift later pages forward, so some records could still
            // be skipped — confirm whether keyset paging is required here.
            var pending = await fileUploadRecordRepository
                .Where(x => x.IsNeedSync == true && (x.IsSync == false || x.IsSync == null))
                .OrderByDescending(x => x.Priority)
                .ThenBy(x => x.Id)
                .Select(t => new { t.Id, t.Priority })
                .Skip(page * _pageSize)
                .Take(_pageSize)
                .ToListAsync(stoppingToken);

            if (pending.Count == 0)
            {
                // Scan finished: nothing left to enqueue.
                break;
            }

            foreach (var file in pending)
            {
                // A missing priority is treated as 0.
                _fileSyncQueue.Enqueue(file.Id, file.Priority ?? 0);
            }

            page++; // next page

            // Short pause between pages to reduce database pressure.
            await Task.Delay(200, stoppingToken);
        }
    }
}
|
|
|
|
/// <summary>
/// Hosted service that runs several concurrent consumer loops. Each loop dequeues a
/// file id from <see cref="FileSyncQueue"/>, loads the matching <see cref="FileUploadRecord"/>,
/// copies the file between object stores through <see cref="IOSSService"/>, and records the
/// attempt in an <see cref="UploadFileSyncRecord"/>.
/// </summary>
public class FileSyncWorker(IServiceScopeFactory _scopeFactory, ILogger<FileSyncWorker> _logger, FileSyncQueue _fileSyncQueue) : BackgroundService
{
    /// <summary>Maximum number of characters of an exception message stored in the sync log.</summary>
    private const int MaxLogMessageLength = 300;

    /// <summary>Number of consumer loops: one per processor minus one, but at least one.</summary>
    private readonly int _workerCount = Math.Max(1, Environment.ProcessorCount - 1);

    /// <summary>
    /// Starts the consumer loops on the thread pool.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    /// <returns>
    /// A task that completes when every consumer loop has ended, so a faulted loop is
    /// observed by the host instead of being silently dropped.
    /// </returns>
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var workers = new Task[_workerCount];

        for (int i = 0; i < workers.Length; i++)
        {
            workers[i] = Task.Run(() => WorkerLoop(stoppingToken));
        }

        return Task.WhenAll(workers);
    }

    /// <summary>
    /// Consumer loop: processes one queued file id per iteration until cancellation is requested.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    private async Task WorkerLoop(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var id = await _fileSyncQueue.DequeueAsync(stoppingToken);

            try
            {
                // A fresh DI scope per item, disposed at the end of the iteration.
                using var scope = _scopeFactory.CreateScope();
                var fileUploadRecordRepository = scope.ServiceProvider.GetRequiredService<IRepository<FileUploadRecord>>();
                var uploadFileSyncRecordRepository = scope.ServiceProvider.GetRequiredService<IRepository<UploadFileSyncRecord>>();
                var oss = scope.ServiceProvider.GetRequiredService<IOSSService>();

                var file = await fileUploadRecordRepository.FirstOrDefaultAsync(t => t.Id == id);

                if (file == null || file.IsNeedSync != true)
                {
                    // Nothing to do for this id. Move on to the next queue item;
                    // returning here would permanently end this worker.
                    continue;
                }

                var log = new UploadFileSyncRecord
                {
                    FileUploadRecordId = id,
                    // UTC, consistent with EndTime and SyncFinishedTime below.
                    StartTime = DateTime.UtcNow,
                    JobState = jobState.RUNNING
                };

                await uploadFileSyncRecordRepository.AddAsync(log);
                await uploadFileSyncRecordRepository.SaveChangesAsync(stoppingToken);

                try
                {
                    // Files uploaded in the "CN" region are copied AliyunOSS -> AWS,
                    // everything else AWS -> AliyunOSS.
                    bool uploadedInCn = file.UploadRegion == "CN";
                    var source = uploadedInCn ? ObjectStoreUse.AliyunOSS : ObjectStoreUse.AWS;
                    var target = uploadedInCn ? ObjectStoreUse.AWS : ObjectStoreUse.AliyunOSS;

                    await oss.SyncFileAsync(file.Path.TrimStart('/'), source, target);

                    file.IsSync = true;
                    file.SyncFinishedTime = DateTime.UtcNow;

                    log.JobState = jobState.SUCCESS;
                }
                catch (Exception ex)
                {
                    log.JobState = jobState.FAILED;

                    // Truncate only when needed: a range of [..300] on a shorter
                    // message throws ArgumentOutOfRangeException.
                    log.Msg = ex.Message.Length > MaxLogMessageLength
                        ? ex.Message[..MaxLogMessageLength]
                        : ex.Message;
                }

                log.EndTime = DateTime.UtcNow;

                // NOTE(review): the change to `file` is persisted through this repository's
                // SaveChangesAsync — presumably both repositories share the scoped DbContext; confirm.
                await uploadFileSyncRecordRepository.SaveChangesAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Host is shutting down: leave the loop without reporting an error.
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Sync failed {Id}", id);
            }
        }
    }
}
|
|
|
|
|