using IRaCIS.Core.Application.Helper;
using IRaCIS.Core.Application.Service;
using IRaCIS.Core.Domain.Models;
using IRaCIS.Core.Infra.EFCore;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace IRaCIS.Core.API.HostService;

/// <summary>
/// One-shot background service that, shortly after host start-up, scans the file upload
/// records that still need syncing and pushes their ids into the <see cref="FileSyncQueue"/>.
/// </summary>
public class SyncFileRecoveryService(IServiceScopeFactory _scopeFactory, FileSyncQueue _fileSyncQueue) : BackgroundService
{
    // Number of records fetched per database round-trip.
    private readonly int _pageSize = 500;

    /// <summary>
    /// If several programs recovered the same data the files would be synced repeatedly,
    /// so the SCP service does not recover tasks.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    /// <returns>A task that completes once every pending record has been enqueued.</returns>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var scope = _scopeFactory.CreateScope();

        // NOTE(review): the generic argument was lost in the reviewed source; reconstructed
        // from the variable name as IRepository<FileUploadRecord> - confirm.
        var fileUploadRecordRepository = scope.ServiceProvider.GetRequiredService<IRepository<FileUploadRecord>>();

        // Delayed start so the host itself can come up quickly.
        await Task.Delay(5000, stoppingToken);

        int page = 0;

        while (!stoppingToken.IsCancellationRequested)
        {
            // Fetch the next page of records that still need to be synced.
            // ThenBy(Id) gives Skip/Take a unique, stable ordering; ordering by Priority
            // alone lets rows with equal priority move between pages.
            var pending = await fileUploadRecordRepository
                .Where(x => x.IsNeedSync == true && (x.IsSync == false || x.IsSync == null))
                .OrderByDescending(x => x.Priority)
                .ThenBy(x => x.Id)
                .Select(t => new { t.Id, t.Priority })
                .Skip(page * _pageSize)
                .Take(_pageSize)
                .ToListAsync(stoppingToken);

            if (pending.Count == 0)
            {
                // Scan finished.
                break;
            }

            foreach (var file in pending)
            {
                // A missing priority is treated as 0.
                _fileSyncQueue.Enqueue(file.Id, file.Priority ?? 0);
            }

            page++;

            // Short pause between pages to ease the load on the database.
            await Task.Delay(200, stoppingToken);
        }
    }
}

/// <summary>
/// Background service that runs several worker loops; each loop takes record ids from the
/// <see cref="FileSyncQueue"/> and copies the corresponding file between object stores.
/// </summary>
// NOTE(review): the logger's generic argument was lost in the reviewed source;
// ILogger<FileSyncWorker> is assumed - confirm.
public class FileSyncWorker(IServiceScopeFactory _scopeFactory, ILogger<FileSyncWorker> _logger, FileSyncQueue _fileSyncQueue) : BackgroundService
{
    // Upper bound on the failure text stored in the sync record's Msg field.
    private const int MaxLogMessageLength = 300;

    // Worker count is derived from the server's CPU count (one fewer, but at least one).
    private readonly int _workerCount = Math.Max(1, Environment.ProcessorCount - 1);

    /// <summary>
    /// Starts the worker loops and returns a task that completes when all of them have ended,
    /// so worker faults are observed and host shutdown waits for in-flight work.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var workers = new Task[_workerCount];

        for (int i = 0; i < workers.Length; i++)
        {
            workers[i] = Task.Run(() => WorkerLoop(stoppingToken));
        }

        return Task.WhenAll(workers);
    }

    /// <summary>
    /// Repeatedly dequeues a record id and syncs that record until shutdown is requested.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    private async Task WorkerLoop(CancellationToken stoppingToken)
    {
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                var id = await _fileSyncQueue.DequeueAsync(stoppingToken);

                try
                {
                    // A fresh scope per item, so scoped services are not shared between items.
                    using var scope = _scopeFactory.CreateScope();

                    // NOTE(review): the generic arguments of the four service look-ups below were
                    // lost in the reviewed source and are reconstructed from usage - confirm.
                    var fileUploadRecordRepository = scope.ServiceProvider.GetRequiredService<IRepository<FileUploadRecord>>();
                    var uploadFileSyncRecordRepository = scope.ServiceProvider.GetRequiredService<IRepository<UploadFileSyncRecord>>();
                    var syncConfig = scope.ServiceProvider.GetRequiredService<IOptionsMonitor<ObjectStoreServiceOptions>>().CurrentValue;
                    var oss = scope.ServiceProvider.GetRequiredService<IOSSService>();

                    var file = await fileUploadRecordRepository.FirstOrDefaultAsync(t => t.Id == id);

                    // Skip with 'continue', never 'return': returning would end this worker for good.
                    if (file == null || file.IsNeedSync != true || syncConfig.IsOpenStoreSync == false)
                    {
                        continue;
                    }

                    // If the configuration has sync switched off for this file's upload region,
                    // do not run the task.
                    if (syncConfig.SyncConfigList.Any(t => t.UploadRegion == file.UploadRegion && t.IsOpenSync == false))
                    {
                        continue;
                    }

                    var log = new UploadFileSyncRecord
                    {
                        FileUploadRecordId = id,
                        StartTime = DateTime.Now,
                        JobState = jobState.RUNNING
                    };

                    await uploadFileSyncRecordRepository.AddAsync(log);
                    await uploadFileSyncRecordRepository.SaveChangesAsync(stoppingToken);

                    try
                    {
                        // Files uploaded in region "CN" go from AliyunOSS to AWS; all others go the other way.
                        bool uploadedInCn = file.UploadRegion == "CN";
                        var sourceStore = uploadedInCn ? ObjectStoreUse.AliyunOSS : ObjectStoreUse.AWS;
                        var targetStore = uploadedInCn ? ObjectStoreUse.AWS : ObjectStoreUse.AliyunOSS;

                        await oss.SyncFileAsync(file.Path.TrimStart('/'), sourceStore, targetStore);

                        file.IsSync = true;
                        file.SyncFinishedTime = DateTime.Now;
                        log.JobState = jobState.SUCCESS;
                    }
                    catch (Exception ex)
                    {
                        log.JobState = jobState.FAILED;

                        // Truncate only when needed: a range past the end of a shorter message
                        // throws ArgumentOutOfRangeException and the FAILED state would never be saved.
                        log.Msg = ex.Message.Length > MaxLogMessageLength
                            ? ex.Message[..MaxLogMessageLength]
                            : ex.Message;
                    }

                    log.EndTime = DateTime.Now;
                    await uploadFileSyncRecordRepository.SaveChangesAsync(stoppingToken);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Sync failed {Id}", id);
                }
                finally
                {
                    // Always runs: tell the queue this id is finished, whether it was skipped,
                    // synced or failed.
                    _fileSyncQueue.Complete(id);
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Host is shutting down while waiting for the next item; end the loop quietly.
        }
    }
}