using IRaCIS.Core.Application.Helper;
using IRaCIS.Core.Application.Service;
using IRaCIS.Core.Domain.Models;
using IRaCIS.Core.Infra.EFCore;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace IRaCIS.Core.API.HostService;

/// <summary>
/// Background service that runs once after host startup: it pages through the file upload
/// records that still need syncing and pushes their ids into the <see cref="FileSyncQueue"/>.
/// </summary>
public class SyncFileRecoveryService(IServiceScopeFactory _scopeFactory, FileSyncQueue _fileSyncQueue) : BackgroundService
{
    // Number of records fetched per database round trip.
    private readonly int _pageSize = 500;

    /// <summary>
    /// Waits a few seconds, then enqueues every record with <c>IsNeedSync == true</c> that is
    /// not yet synced, one page at a time, and returns once a page comes back empty.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var scope = _scopeFactory.CreateScope();

        // NOTE(review): the generic argument was missing from the reviewed text; reconstructed
        // from the variable name and the entity used by FileSyncWorker - confirm.
        var fileUploadRecordRepository = scope.ServiceProvider.GetRequiredService<IRepository<FileUploadRecord>>();

        // Delay the scan so that it does not slow down host startup.
        await Task.Delay(5000, stoppingToken);

        int page = 0;

        while (!stoppingToken.IsCancellationRequested)
        {
            // Fetch the next page of records that still have to be synced.
            // The Id tie-breaker makes the ordering deterministic; offset paging over rows
            // that share a Priority could otherwise return a row twice or not at all.
            // NOTE(review): records that FileSyncWorker marks as synced while this scan is
            // running drop out of the filter and shift the offsets, so some rows may still be
            // skipped until the next restart - consider keyset paging if that matters.
            var pending = await fileUploadRecordRepository
                .Where(x => x.IsNeedSync == true && (x.IsSync == false || x.IsSync == null))
                .OrderByDescending(x => x.Priority)
                .ThenBy(x => x.Id)
                .Select(t => new { t.Id, t.Priority })
                .Skip(page * _pageSize)
                .Take(_pageSize)
                .ToListAsync(stoppingToken);

            if (pending.Count == 0)
            {
                // Scan finished.
                break;
            }

            foreach (var file in pending)
            {
                // A missing priority is treated as 0.
                _fileSyncQueue.Enqueue(file.Id, file.Priority ?? 0);
            }

            page++;

            // Short pause between pages to ease the load on the database.
            await Task.Delay(200, stoppingToken);
        }
    }
}

/// <summary>
/// Background service that runs several concurrent loops; each loop takes record ids from the
/// <see cref="FileSyncQueue"/>, copies the file between the two object stores and writes an
/// <see cref="UploadFileSyncRecord"/> describing the attempt.
/// </summary>
// NOTE(review): the logger's generic argument was missing from the reviewed text;
// ILogger<FileSyncWorker> is assumed - confirm.
public class FileSyncWorker(IServiceScopeFactory _scopeFactory, ILogger<FileSyncWorker> _logger, FileSyncQueue _fileSyncQueue) : BackgroundService
{
    // Upper bound for the error text stored on a sync log entry.
    private const int MaxLogMessageLength = 300;

    // Derived from the machine's processor count, leaving one processor free; never below 1.
    private readonly int _workerCount = Math.Max(1, Environment.ProcessorCount - 1);

    /// <summary>
    /// Starts <c>_workerCount</c> worker loops and completes when all of them have finished.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    /// <returns>A task covering every worker loop, so the host can observe faults and wait on shutdown.</returns>
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var workers = Enumerable.Range(0, _workerCount)
            .Select(_ => Task.Run(() => WorkerLoop(stoppingToken)))
            .ToArray();

        return Task.WhenAll(workers);
    }

    /// <summary>
    /// Processes queue items one at a time until cancellation is requested.
    /// A failure while handling one item is logged and does not stop the loop.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    private async Task WorkerLoop(CancellationToken stoppingToken)
    {
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                var id = await _fileSyncQueue.DequeueAsync(stoppingToken);

                try
                {
                    using var scope = _scopeFactory.CreateScope();

                    // NOTE(review): the three service types below were missing from the reviewed
                    // text and are reconstructed from variable names and usage - confirm.
                    var _fileUploadRecordRepository = scope.ServiceProvider.GetRequiredService<IRepository<FileUploadRecord>>();
                    var _uploadFileSyncRecordRepository = scope.ServiceProvider.GetRequiredService<IRepository<UploadFileSyncRecord>>();
                    var oss = scope.ServiceProvider.GetRequiredService<IOSSService>();

                    var file = await _fileUploadRecordRepository.FirstOrDefaultAsync(t => t.Id == id);

                    if (file == null || file.IsNeedSync != true)
                    {
                        // Nothing to do for this id. Move on to the next item; a `return` here
                        // would permanently shut this worker down.
                        continue;
                    }

                    // Timestamps are UTC throughout so StartTime and EndTime are comparable.
                    var log = new UploadFileSyncRecord
                    {
                        FileUploadRecordId = id,
                        StartTime = DateTime.UtcNow,
                        JobState = jobState.RUNNING
                    };

                    await _uploadFileSyncRecordRepository.AddAsync(log);
                    await _uploadFileSyncRecordRepository.SaveChangesAsync(stoppingToken);

                    try
                    {
                        // Copy from the store of the upload region to the other store.
                        var uploadedInCn = file.UploadRegion == "CN";

                        await oss.SyncFileAsync(
                            file.Path.TrimStart('/'),
                            uploadedInCn ? ObjectStoreUse.AliyunOSS : ObjectStoreUse.AWS,
                            uploadedInCn ? ObjectStoreUse.AWS : ObjectStoreUse.AliyunOSS);

                        file.IsSync = true;
                        file.SyncFinishedTime = DateTime.UtcNow;
                        log.JobState = jobState.SUCCESS;
                    }
                    catch (Exception ex)
                    {
                        log.JobState = jobState.FAILED;

                        // Truncate only when needed: a range of [..300] throws on shorter messages.
                        log.Msg = ex.Message.Length > MaxLogMessageLength
                            ? ex.Message[..MaxLogMessageLength]
                            : ex.Message;
                    }

                    log.EndTime = DateTime.UtcNow;

                    await _uploadFileSyncRecordRepository.SaveChangesAsync(stoppingToken);
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Host is shutting down; this is not a sync failure.
                    break;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Sync failed {Id}", id);
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Cancellation surfaced while waiting on the queue: normal shutdown.
        }
    }
}