irc-netcore-api/IRC.Core.SCP/Service/CStoreSCPService.cs

678 lines
25 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using FellowOakDicom.Network;
using FellowOakDicom;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using IRaCIS.Core.SCP.Service;
using IRaCIS.Core.Domain.Models;
using IRaCIS.Core.Infra.EFCore;
using Medallion.Threading;
using IRaCIS.Core.Domain.Share;
using Serilog;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion.Internal;
using Microsoft.Extensions.Options;
using System.Data;
using FellowOakDicom.Imaging;
using SharpCompress.Common;
using SixLabors.ImageSharp.Formats.Jpeg;
using IRaCIS.Core.Infrastructure;
using IRaCIS.Core.Infrastructure.Extention;
using Newtonsoft.Json;
using FellowOakDicom.Imaging.Codec;
using FellowOakDicom.IO.Buffer;
using System.Diagnostics.CodeAnalysis;
using FellowOakDicom.Network.Client;
using MassTransit.Futures.Contracts;
using Microsoft.Identity.Client;
namespace IRaCIS.Core.SCP.Service
{
/// <summary>
/// Configuration options of the DICOM SCP service; the service reads the current value
/// through <c>IOptionsMonitor&lt;DicomSCPServiceOption&gt;</c> when an association is requested.
/// </summary>
public class DicomSCPServiceOption
{
    /// <summary>
    /// When true, each incoming C-STORE request is checked against the recorded C-MOVE requests
    /// and may be forwarded to a third-party destination AE instead of being archived.
    /// </summary>
    public bool IsSupportThirdService { get; set; }

    /// <summary>
    /// When true, forwarding is started in the background and the sender is answered immediately;
    /// otherwise the forward is awaited and its status is returned to the sender.
    /// </summary>
    public bool IsForwardImageMultiThread { get; set; }

    /// <summary>
    /// Called AE titles accepted by this service. The first entry is also used as the calling AE
    /// when forwarding. Defaults to an empty list so that a missing configuration section results
    /// in a rejected association rather than a NullReferenceException.
    /// </summary>
    public List<string> CalledAEList { get; set; } = new List<string>();

    /// <summary>
    /// Server port setting. NOTE(review): not read anywhere in this file — presumably consumed by
    /// the host start-up code; confirm there.
    /// </summary>
    public string ServerPort { get; set; }
}
/// <summary>
/// Connection details of a third-party destination AE that received images can be forwarded to.
/// Instances are projected from DicomAE rows whose PacsTypeEnum is PacsType.Destination.
/// </summary>
public class ThirdDestinationAE
{
// Port passed to DicomClientFactory.Create when forwarding.
public int Port { get; set; }
// Filled from DicomAE.CalledAE; matched against the C-MOVE record's DestinationAE and used as the called AE when forwarding.
public string Name { get; set; }
// Host passed to DicomClientFactory.Create when forwarding.
public string IP { get; set; }
}
/// <summary>
/// Static holder giving non-injected code (such as static constructors) access to the application configuration.
/// </summary>
public static class IRCAppConfig
{
// NOTE(review): nothing in this file assigns this property — presumably set during application start-up; it is null until then.
public static IConfiguration Configuration { get; set; }
}
public class CStoreSCPService : DicomService, IDicomServiceProvider, IDicomCStoreProvider, IDicomCEchoProvider
{
// Provider of the scope created in the constructor; used to resolve repositories and services for this connection.
private IServiceProvider _serviceProvider { get; set; }
// Ids of the archived studies received on this association; entries whose SCPStudyId is still Guid.Empty are skipped.
// Computed on every access (a new list each time).
private List<Guid> _SCPStudyIdList => _ImageUploadList.Where(t => t.SCPStudyId != Guid.Empty).Select(t => t.SCPStudyId).ToList();
// One entry per StudyInstanceUID received on this association, carrying success/failure counters.
private List<ImageUploadInfo> _ImageUploadList { get; set; } = new List<ImageUploadInfo>();
// Monitoring record for this association; created when the association is requested and persisted by AddUploadLogAsync.
private SCPImageUpload _upload { get; set; }
// Snapshot of the service options taken when the association is requested.
private DicomSCPServiceOption DicomSCPServiceConfig { get; set; }
// Enabled hospital group whose CallingAE matches the association's calling AE; null when none matches.
public HospitalGroup CurrentHospitalGroup { get; set; }
// NOTE(review): not referenced by any member visible in this file.
private List<Guid> HospitalGroupIdList { get; set; }
// Set to true once the release request has been processed; read in OnConnectionClosed to tell a normal release from a dropped connection.
private bool _releasedNormally = false;
// Set to true when a C-STORE on this association is forwarded to a third party; such associations skip archiving bookkeeping and logging.
private bool _isCurrentThirdForward = false;
// Forward destinations loaded from the DicomAE table when the association is requested.
private List<ThirdDestinationAE> ThirdDestinationAEList { get; set; }
// Transfer syntaxes accepted for Verification (C-ECHO) presentation contexts.
private static readonly DicomTransferSyntax[] _acceptedTransferSyntaxes = new DicomTransferSyntax[]
{
DicomTransferSyntax.ExplicitVRLittleEndian,
DicomTransferSyntax.ExplicitVRBigEndian,
DicomTransferSyntax.ImplicitVRLittleEndian
};
// Transfer syntaxes accepted for storage presentation contexts.
private static readonly DicomTransferSyntax[] _acceptedImageTransferSyntaxes = new DicomTransferSyntax[]
{
// Lossless
DicomTransferSyntax.JPEGLSLossless,
DicomTransferSyntax.JPEG2000Lossless,
DicomTransferSyntax.JPEGProcess14SV1,
DicomTransferSyntax.JPEGProcess14,
DicomTransferSyntax.RLELossless,
// Lossy
DicomTransferSyntax.JPEGLSNearLossless,
DicomTransferSyntax.JPEG2000Lossy,
DicomTransferSyntax.JPEGProcess1,
DicomTransferSyntax.JPEGProcess2_4,
// Uncompressed
DicomTransferSyntax.ExplicitVRLittleEndian,
DicomTransferSyntax.ExplicitVRBigEndian,
DicomTransferSyntax.ImplicitVRLittleEndian
};
// Static semaphore limiting how many third-party forwards may run at the same time (shared by all connections).
private static readonly SemaphoreSlim _forwardLimiter;

/// <summary>
/// Type initializer (runs once): sizes the forward limiter from the
/// "DicomSCPServiceConfig:MultiThreadCount" setting, falling back to a single slot.
/// </summary>
static CStoreSCPService()
{
    // Default: one forward at a time.
    const int defaultMaxThreads = 1;
    var configured = defaultMaxThreads;

    try
    {
        // Configuration may not have been assigned yet; treat that the same as a missing key.
        configured = IRCAppConfig.Configuration?.GetValue<int>("DicomSCPServiceConfig:MultiThreadCount", defaultMaxThreads) ?? defaultMaxThreads;
    }
    catch (Exception ex)
    {
        // An exception escaping a static constructor surfaces as TypeInitializationException and
        // makes the whole type unusable, so an unparsable value falls back to the default.
        Log.Logger.Warning(ex, "Invalid DicomSCPServiceConfig:MultiThreadCount, falling back to the default");
    }

    // SemaphoreSlim(0) would block every forward forever and a negative count throws.
    var maxThreads = Math.Max(defaultMaxThreads, configured);
    _forwardLimiter = new SemaphoreSlim(maxThreads);
    Log.Logger.Information($"初始化 DICOM 转发线程限制为: {maxThreads}");
}
/// <summary>
/// Creates the service instance for one connection and opens a dedicated DI scope from which
/// all repositories and services of this connection are resolved.
/// </summary>
public CStoreSCPService(INetworkStream stream, Encoding fallbackEncoding, Microsoft.Extensions.Logging.ILogger log, DicomServiceDependencies dependencies, IServiceProvider injectServiceProvider)
    : base(stream, fallbackEncoding, log, dependencies)
{
    // NOTE(review): the scope created here is never disposed by this class — confirm whether
    // scoped services (e.g. the DbContext) should be released when the connection ends.
    var connectionScope = injectServiceProvider.CreateScope();
    _serviceProvider = connectionScope.ServiceProvider;
}
/// <summary>
/// Handles an association request: starts the monitoring record, loads the options and the
/// known AE titles, rejects unknown called/calling AEs and otherwise accepts the supported
/// transfer syntaxes for verification and storage presentation contexts.
/// </summary>
/// <param name="association">The association being negotiated.</param>
/// <returns>The task sending either the association reject or the association accept.</returns>
public Task OnReceiveAssociationRequestAsync(DicomAssociation association)
{
    _upload = new SCPImageUpload() { StartTime = DateTime.Now, CallingAE = association.CallingAE, CalledAE = association.CalledAE, CallingAEIP = association.RemoteHost };
    Log.Logger.Warning($"接收到来自{association.CallingAE}的连接");

    var option = _serviceProvider.GetService<IOptionsMonitor<DicomSCPServiceOption>>().CurrentValue;
    DicomSCPServiceConfig = option;

    var hospitalGroupRepository = _serviceProvider.GetService<IRepository<HospitalGroup>>();
    var dicomAERepository = _serviceProvider.GetService<IRepository<DicomAE>>();

    // Destinations that received images may be forwarded to.
    ThirdDestinationAEList = dicomAERepository.Where(t => t.PacsTypeEnum == PacsType.Destination).Select(t => new ThirdDestinationAE() { IP = t.IP, Port = t.Port, Name = t.CalledAE }).ToList();

    var aeList = dicomAERepository.Select(t => t.CalledAE).ToList();
    var enabledGroups = hospitalGroupRepository.Where(t => t.IsEnable).ToList();
    CurrentHospitalGroup = enabledGroups.FirstOrDefault(t => t.CallingAE == association.CallingAE);

    // A missing CalledAEList in configuration means "accept nothing" rather than a NullReferenceException.
    var calledAEKnown = option.CalledAEList?.Contains(association.CalledAE) ?? false;
    // The calling AE must be a configured DICOM AE or the calling AE of an enabled hospital group.
    var callingAEKnown = aeList.Contains(association.CallingAE) || CurrentHospitalGroup != null;

    if (!calledAEKnown || !callingAEKnown)
    {
        Log.Logger.Warning($"拒绝CalledAE:{association.CalledAE} CallingAE:{association.CallingAE}连接");

        // Report the side that actually failed validation so the peer can diagnose its configuration.
        var rejectReason = calledAEKnown
            ? DicomRejectReason.CallingAENotRecognized
            : DicomRejectReason.CalledAENotRecognized;

        return SendAssociationRejectAsync(
            DicomRejectResult.Permanent,
            DicomRejectSource.ServiceUser,
            rejectReason);
    }

    foreach (var pc in association.PresentationContexts)
    {
        if (pc.AbstractSyntax == DicomUID.Verification)
        {
            pc.AcceptTransferSyntaxes(_acceptedTransferSyntaxes);
        }
        else if (pc.AbstractSyntax.StorageCategory != DicomStorageCategory.None)
        {
            pc.AcceptTransferSyntaxes(_acceptedImageTransferSyntaxes);
        }
    }

    return SendAssociationAcceptAsync(association);
}
/// <summary>
/// Handles the release request: for non-forwarded associations runs the post-transfer data
/// maintenance and writes the upload log under a distributed lock keyed by the calling AE,
/// then always answers the release request.
/// </summary>
public async Task OnReceiveAssociationReleaseRequestAsync()
{
    try
    {
        if (_isCurrentThirdForward == false)
        {
            var lockProvider = _serviceProvider.GetService<IDistributedLockProvider>();
            var @lock = lockProvider.CreateLock($"{_upload.CallingAE}");

            using (await @lock.AcquireAsync())
            {
                await DataMaintenanceAsaync();
                await AddUploadLogAsync();

                _releasedNormally = true;
                Log.Logger.Information($"进入释放连接请求 {_releasedNormally}");
            }
        }
    }
    catch (Exception ex)
    {
        // _releasedNormally stays false, so OnConnectionClosed flags the studies as failed.
        // The peer must still get its release response instead of waiting for a timeout.
        Log.Logger.Error(ex, "Post-transfer processing failed while handling the association release request");
    }

    await SendAssociationReleaseResponseAsync();
}
/// <summary>
/// Completes the monitoring record of this association and stores it, unless the association
/// was a third-party forward or no file was received.
/// </summary>
private async Task AddUploadLogAsync()
{
    // Forwarded associations are not logged.
    if (_isCurrentThirdForward)
    {
        return;
    }

    // Monitoring record
    var uploadRepository = _serviceProvider.GetService<IRepository<SCPImageUpload>>();

    _upload.EndTime = DateTime.Now;
    _upload.StudyCount = _ImageUploadList.Count;

    var uploadLog = new SCPImageLog() { UploadList = _ImageUploadList };
    _upload.UploadJsonStr = uploadLog.ToJsonStr();

    // Associations without files (probably echo tests) are not recorded.
    if (_upload.FileCount > 0)
    {
        await uploadRepository.AddAsync(_upload, true);
    }
}
/// <summary>
/// Post-transfer maintenance: binds the received studies to patients/visits and recomputes
/// each study's modality summary from its series.
/// </summary>
private async Task DataMaintenanceAsaync()
{
    Log.Logger.Warning($"CallingAE:{Association.CallingAE} CalledAE:{Association.CalledAE}传输结束开始维护数据处理检查Modality 以及自动创建访视,绑定检查");

    var patientStudyService = _serviceProvider.GetService<IPatientStudyService>();
    await patientStudyService.AutoBindingPatientStudyVisitAsync(_SCPStudyIdList);

    // Modality handling
    var dictionaryRepository = _serviceProvider.GetService<IRepository<Dictionary>>();
    var seriesRepository = _serviceProvider.GetService<IRepository<SCPSeries>>();
    var studyRepository = _serviceProvider.GetService<IRepository<SCPStudy>>();

    var dicModalityList = dictionaryRepository
        .Where(t => t.Code == "Modality")
        .SelectMany(t => t.ChildList.Select(c => c.Value))
        .ToList();

    var seriesModalityList = seriesRepository
        .Where(t => _SCPStudyIdList.Contains(t.StudyId))
        .Select(t => new { SCPStudyId = t.StudyId, t.Modality, t.StudyInstanceUid })
        .ToList();

    var seriesByStudy = seriesModalityList.GroupBy(t => new { t.SCPStudyId, t.StudyInstanceUid });

    foreach (var studyGroup in seriesByStudy)
    {
        var distinctModalities = studyGroup.Select(t => t.Modality).Distinct();
        var modality = string.Join('、', distinctModalities);

        // Special cases take precedence; otherwise the value is kept only when the dictionary knows it.
        var modalityForEdit = modality switch
        {
            "MR" => "MRI",
            "PT" => "PET",
            "PT、CT" or "CT、PT" => "PET-CT",
            _ => dicModalityList.Contains(modality) ? modality : String.Empty
        };

        await studyRepository.BatchUpdateNoTrackingAsync(t => t.Id == studyGroup.Key.SCPStudyId, u => new SCPStudy() { Modalities = modality, ModalityForEdit = modalityForEdit });
    }

    Log.Logger.Warning($"CallingAE:{Association.CallingAE} CalledAE:{Association.CalledAE}维护数据结束");
}
/// <summary>
/// Logs an abort received from the peer; no further action is taken.
/// </summary>
/// <param name="source">Who aborted the association.</param>
/// <param name="reason">Why the association was aborted.</param>
public void OnReceiveAbort(DicomAbortSource source, DicomAbortReason reason)
{
    var abortCause = string.Concat(source.ToString(), reason.ToString());
    Log.Logger.Warning($"CallingAE:{Association.CallingAE} CalledAE:{Association.CalledAE}接收中断,中断原因:{abortCause}");
}
/// <summary>
/// Called when the connection is closed. For non-forwarded associations marks the received
/// studies as finished — failed when the close carried an exception or no normal release
/// happened — and writes the upload log in the failure case.
/// </summary>
/// <param name="exception">The exception that closed the connection, or null.</param>
public async void OnConnectionClosed(Exception exception)
{
    // The interface dictates a void signature, so this is async void: an exception escaping it
    // cannot be observed by the caller and would surface as an unhandled exception. Guard everything.
    try
    {
        if (_isCurrentThirdForward == false)
        {
            var _studyRepository = _serviceProvider.GetService<IRepository<SCPStudy>>();

            if (exception != null || _releasedNormally == false)
            {
                // A client that lost its network and recovered also ends up here without an
                // exception (presumably closed by timeout), hence the _releasedNormally check.
                await _studyRepository.BatchUpdateNoTrackingAsync(t => _SCPStudyIdList.Contains(t.Id), u => new SCPStudy() { IsUploadFinished = true, IsUploadFaild = true });

                // Write the upload log, which the normal release path did not get to do.
                await AddUploadLogAsync();
            }
            else
            {
                // Mark the studies as transferred successfully.
                await _studyRepository.BatchUpdateNoTrackingAsync(t => _SCPStudyIdList.Contains(t.Id), u => new SCPStudy() { IsUploadFinished = true, IsUploadFaild = false });
            }

            await _studyRepository.SaveChangesAndClearAllTrackingAsync();
        }
    }
    catch (Exception ex)
    {
        Log.Logger.Error(ex, "Failed to finalize the upload state after the connection was closed");
    }

    Log.Logger.Warning($"连接关闭 {_releasedNormally} {exception?.Message} {exception?.InnerException?.Message}");
}
/// <summary>
/// Forwards a received C-STORE request to a third-party destination, limited by the shared
/// forward semaphore.
/// </summary>
/// <param name="request">The received request whose file is cloned and sent on.</param>
/// <param name="findDestination">Host, port and AE title of the destination.</param>
/// <returns>
/// The status returned by the destination, or <c>DicomStatus.ProcessingFailure</c> when the
/// forward threw or finished without any response.
/// </returns>
private async Task<DicomStatus> ForwardToThirdPartyAsync(DicomCStoreRequest request, ThirdDestinationAE findDestination)
{
    await _forwardLimiter.WaitAsync(); // limit the number of concurrent forwards
    try
    {
        var forwardRequest = new DicomCStoreRequest(request.File.Clone());

        var client = DicomClientFactory.Create(
            findDestination.IP,
            findDestination.Port,
            false,
            DicomSCPServiceConfig.CalledAEList.First(),
            findDestination.Name);

        // Start pessimistic: if the send finishes without any response (e.g. the request timed
        // out), the image did not demonstrably reach the destination and must not count as success.
        DicomStatus finalStatus = DicomStatus.ProcessingFailure;
        var responseReceived = false;

        forwardRequest.OnResponseReceived += (rq, rp) =>
        {
            Log.Logger.Information($"Forwarded C-STORE Response: {rq.SOPInstanceUID} {rp.Status}");
            responseReceived = true;
            finalStatus = rp.Status; // status reported by the destination PACS
        };

        await client.AddRequestAsync(forwardRequest);
        await client.SendAsync();

        if (!responseReceived)
        {
            Log.Logger.Warning($"Forwarded C-STORE got no response: {forwardRequest.SOPInstanceUID}");
        }

        return finalStatus;
    }
    catch (Exception ex)
    {
        // Pass the exception object so the stack trace is kept in the log.
        Log.Logger.Error(ex, "Error forwarding C-STORE: " + ex.Message);
        return DicomStatus.ProcessingFailure;
    }
    finally
    {
        _forwardLimiter.Release();
    }
}
// Size in bytes of each pixel-data fragment written when an encapsulated frame is split (20 KB).
private const int PixelFragmentSize = 20 * 1024;

/// <summary>
/// Handles one C-STORE request. Depending on configuration and the recorded C-MOVE requests the
/// instance is either forwarded to a third-party destination, or uploaded to object storage,
/// archived into the database, given a series thumbnail when missing, and counted in the
/// per-study upload statistics.
/// </summary>
/// <param name="request">The received C-STORE request.</param>
/// <returns>
/// The C-STORE response. For archived instances the status is always Success (failures are
/// logged and counted); for synchronous forwards it is the forward's status.
/// </returns>
public async Task<DicomCStoreResponse> OnCStoreRequestAsync(DicomCStoreRequest request)
{
    string studyInstanceUid = request.Dataset.GetSingleValueOrDefault(DicomTag.StudyInstanceUID, string.Empty);
    string seriesInstanceUid = request.Dataset.GetSingleValueOrDefault(DicomTag.SeriesInstanceUID, string.Empty);
    string sopInstanceUid = request.Dataset.GetSingleValueOrDefault(DicomTag.SOPInstanceUID, string.Empty);
    string patientIdStr = request.Dataset.GetSingleValueOrDefault(DicomTag.PatientID, string.Empty);

    if (studyInstanceUid.IsNullOrEmpty() || seriesInstanceUid.IsNullOrEmpty() || sopInstanceUid.IsNullOrEmpty())
    {
        Log.Logger.Error($"接收数据读取StudyInstanceUID{studyInstanceUid}、SeriesInstanceUID{seriesInstanceUid}、SOPInstanceUID{sopInstanceUid}有空 ");
        // NOTE(review): the instance is dropped but the sender is told Success — confirm this is intended.
        return new DicomCStoreResponse(request, DicomStatus.Success);
    }

    var _cmoveStudyRepository = _serviceProvider.GetService<IRepository<CmoveStudy>>();

    #region Decide whether the image is forwarded to a third party
    if (DicomSCPServiceConfig.IsSupportThirdService)
    {
        var cmoveInfo = _cmoveStudyRepository.Where(t => t.StudyInstanceUIDList.Any(c => c == studyInstanceUid)).OrderByDescending(t => t.CreateTime).FirstOrDefault();

        // The latest C-MOVE for this study names a known third-party destination.
        var findDestination = cmoveInfo == null
            ? null
            : ThirdDestinationAEList.FirstOrDefault(t => t.Name == cmoveInfo.DestinationAE);

        if (findDestination != null)
        {
            _isCurrentThirdForward = true;

            if (DicomSCPServiceConfig.IsForwardImageMultiThread)
            {
                // Multi-thread mode: forward in the background and answer Success immediately.
                _ = Task.Run(() => ForwardToThirdPartyAsync(request, findDestination));
                return new DicomCStoreResponse(request, DicomStatus.Success);
            }

            // Single-thread mode: wait for the forward and hand its status back to the sender.
            var responseStatus = await ForwardToThirdPartyAsync(request, findDestination);
            return new DicomCStoreResponse(request, responseStatus);
        }
    }
    #endregion

    // Make sure the per-study statistics entry exists (looked up once and reused below).
    var uploadInfo = _ImageUploadList.FirstOrDefault(t => t.StudyInstanceUid == studyInstanceUid);
    if (uploadInfo == null)
    {
        uploadInfo = new ImageUploadInfo() { StudyInstanceUid = studyInstanceUid };
        _ImageUploadList.Add(uploadInfo);
    }

    Guid seriesId = IdentifierHelper.CreateGuid(studyInstanceUid, seriesInstanceUid);
    Guid instanceId = IdentifierHelper.CreateGuid(studyInstanceUid, seriesInstanceUid, sopInstanceUid);

    var ossService = _serviceProvider.GetService<IOSSService>();
    var dicomArchiveService = _serviceProvider.GetService<IDicomArchiveService>();
    var _seriesRepository = _serviceProvider.GetService<IRepository<SCPSeries>>();
    var _studyGroupRepository = _serviceProvider.GetService<IRepository<SCPStudyHospitalGroup>>();
    var _distributedLockProvider = _serviceProvider.GetService<IDistributedLockProvider>();

    var storeRelativePath = string.Empty;
    var ossFolderPath = $"Dicom/{studyInstanceUid}";
    long fileSize = 0;

    try
    {
        using (var receivedStream = new MemoryStream())
        using (var refragmentedStream = new MemoryStream())
        {
            await request.File.SaveAsync(receivedStream);

            // Rewind and parse the saved file so its pixel data can be inspected.
            receivedStream.Position = 0;
            var dicomFile = DicomFile.Open(receivedStream);

            var uploadStream = receivedStream;

            // Split each encapsulated frame into fixed-size fragments (easier browsing on mobile clients).
            if (RefragmentEncapsulatedPixelData(dicomFile.Dataset, PixelFragmentSize))
            {
                // Written to a second stream: the parsed file may still read elements lazily from
                // receivedStream, so that stream must not be truncated while saving.
                dicomFile.Save(refragmentedStream);
                uploadStream = refragmentedStream;
            }

            uploadStream.Position = 0;

            // The instance Guid is the last path segment of the stored object.
            storeRelativePath = await ossService.UploadToOSSAsync(uploadStream, ossFolderPath, instanceId.ToString(), false);
            fileSize = uploadStream.Length;
        }

        Log.Logger.Information($"CallingAE:{Association.CallingAE} CalledAE:{Association.CalledAE} {request.SOPInstanceUID} 上传完成 ");
    }
    catch (Exception ec)
    {
        // NOTE(review): after a failed upload the instance is still archived below with an empty
        // path and the sender still receives Success — confirm this best-effort behavior is intended.
        Log.Logger.Warning(ec, $"CallingAE:{Association.CallingAE} CalledAE:{Association.CalledAE} 上传异常 {ec.Message}");
    }

    var @lock = _distributedLockProvider.CreateLock($"{studyInstanceUid}");
    using (await @lock.AcquireAsync())
    {
        try
        {
            var scpStudyId = await dicomArchiveService.ArchiveDicomFileAsync(request.File, storeRelativePath, Association.CallingAE, Association.CalledAE, fileSize);

            var series = await _seriesRepository.FirstOrDefaultAsync(t => t.Id == seriesId);

            // The series has no thumbnail yet.
            if (series != null && string.IsNullOrEmpty(series.ImageResizePath))
            {
                try
                {
                    using (var memoryStream = new MemoryStream())
                    using (var rendered = new DicomImage(request.Dataset).RenderImage())
                    {
                        rendered.AsSharpImage().Save(memoryStream, new JpegEncoder());

                        // Rewind before uploading, otherwise a reader starting at the current
                        // position would see an empty stream.
                        memoryStream.Position = 0;

                        var seriesPath = await ossService.UploadToOSSAsync(memoryStream, ossFolderPath, seriesId.ToString() + ".preview.jpg", false);
                        Log.Logger.Information(seriesPath + " Id: " + seriesId);
                        series.ImageResizePath = seriesPath;
                    }
                }
                catch (Exception thumbnailEx)
                {
                    // A thumbnail is optional (objects without pixel data cannot be rendered);
                    // it must not turn a successfully archived instance into a failed one.
                    Log.Logger.Warning(thumbnailEx, $"CallingAE:{Association.CallingAE} CalledAE:{Association.CalledAE} thumbnail generation failed for series {seriesId}");
                }
            }

            await _seriesRepository.SaveChangesAsync();

            uploadInfo.SuccessImageCount++;

            if (!uploadInfo.PatientNameList.Any(t => t == patientIdStr) && patientIdStr.IsNotNullOrEmpty())
            {
                uploadInfo.PatientNameList.Add(patientIdStr);
            }

            // The entry starts with Guid.Empty; the real id is known once the first instance is archived.
            if (uploadInfo.SCPStudyId != scpStudyId)
            {
                uploadInfo.SCPStudyId = scpStudyId;

                #region Tag the study with hospital groups
                if (CurrentHospitalGroup != null)
                {
                    if (!_studyGroupRepository.Any(t => t.SCPStudyId == scpStudyId && t.HospitalGroupId == CurrentHospitalGroup.Id))
                    {
                        await _studyGroupRepository.AddAsync(new SCPStudyHospitalGroup() { SCPStudyId = scpStudyId, HospitalGroupId = CurrentHospitalGroup.Id });
                    }
                }
                else
                {
                    // No group matches the calling AE: take the groups from the latest C-MOVE record of this study.
                    var findCmoveInfo = _cmoveStudyRepository.Where(t => t.StudyInstanceUIDList.Any(c => c == studyInstanceUid)).OrderByDescending(t => t.CreateTime).FirstOrDefault();

                    if (findCmoveInfo != null)
                    {
                        foreach (var item in findCmoveInfo.HopitalGroupIdList)
                        {
                            if (!_studyGroupRepository.Any(t => t.SCPStudyId == scpStudyId && t.HospitalGroupId == item))
                            {
                                await _studyGroupRepository.AddAsync(new SCPStudyHospitalGroup() { SCPStudyId = scpStudyId, HospitalGroupId = item });
                            }
                        }
                    }
                    else
                    {
                        Log.Logger.Warning($"未找到{studyInstanceUid}的Cmove记录");
                    }
                }
                #endregion
            }
        }
        catch (Exception ex)
        {
            Log.Logger.Warning($"CallingAE:{Association.CallingAE} CalledAE:{Association.CalledAE} 传输处理异常:{ex.ToString()}");
            uploadInfo.FailedImageCount++;
        }

        // Monitoring counters
        _upload.FileCount++;
        _upload.FileSize = _upload.FileSize + fileSize;

        return new DicomCStoreResponse(request, DicomStatus.Success);
    }
}

// Replaces encapsulated PixelData in the dataset with fixed-size fragments plus a Basic Offset Table; returns false (dataset untouched) when there is no PixelData or it is not encapsulated.
private static bool RefragmentEncapsulatedPixelData(DicomDataset dataset, int fragmentSize)
{
    // Objects without pixel data (reports, presentation states, ...) have nothing to split;
    // asking for their pixel data would throw and abort the upload.
    if (!dataset.Contains(DicomTag.PixelData))
    {
        return false;
    }

    var pixelData = DicomPixelData.Create(dataset);
    if (!pixelData.Syntax.IsEncapsulated)
    {
        return false;
    }

    var newFragments = new DicomOtherByteFragment(DicomTag.PixelData);

    // Byte offset of the next fragment item, counted from the first item after the offset table
    // (each item is an 8-byte header plus its data).
    long nextItemOffset = 0;
    var offsetsFit = true;

    for (int n = 0; n < pixelData.NumberOfFrames; n++)
    {
        // Once a frame spans several fragments, readers need the offset table to find where
        // each frame starts.
        if (nextItemOffset <= uint.MaxValue)
        {
            newFragments.OffsetTable.Add((uint)nextItemOffset);
        }
        else
        {
            offsetsFit = false;
        }

        var data = pixelData.GetFrame(n).Data; // one complete frame

        int offset = 0;
        while (offset < data.Length)
        {
            int size = Math.Min(fragmentSize, data.Length - offset);
            var buffer = new byte[size];
            Buffer.BlockCopy(data, offset, buffer, 0, size);
            newFragments.Fragments.Add(new MemoryByteBuffer(buffer));

            offset += size;
            nextItemOffset += size + 8L;
        }
    }

    if (!offsetsFit)
    {
        // Offsets are 32-bit; a partially filled table would be worse than an empty one.
        newFragments.OffsetTable.Clear();
    }

    // Replace the original PixelData element.
    dataset.AddOrUpdate(newFragments);
    return true;
}
/// <summary>
/// Logs an exception raised while a C-STORE request was being received; the error response
/// itself is left to the library.
/// </summary>
/// <param name="tempFileName">Name of the temporary file involved, as reported by the library.</param>
/// <param name="e">The exception that occurred; may be null.</param>
public Task OnCStoreRequestExceptionAsync(string tempFileName, Exception e)
{
    var detail = $"CStoreRequestException {tempFileName} {e?.Message} {e?.InnerException?.Message}";
    Log.Logger.Warning(detail);

    // Logging of the error response and the response itself are handled by the library.
    return Task.CompletedTask;
}
/// <summary>
/// Answers a C-ECHO (verification) request with Success.
/// </summary>
/// <param name="request">The received C-ECHO request.</param>
public Task<DicomCEchoResponse> OnCEchoRequestAsync(DicomCEchoRequest request)
{
    var echoResponse = new DicomCEchoResponse(request, DicomStatus.Success);
    return Task.FromResult(echoResponse);
}
}
}