const Service = require("egg").Service; const child_process = require('child_process'); const fs = require('fs'); const readline = require('readline'); const path = require('path'); // const axios = require('axios') const http = require('http'); const request = require('request'); var querystring = require('querystring'); function getObjectMkdir(url) { let index = url.lastIndexOf("/") let path = url.substring(0, index); return path } class PacsService extends Service { async uploadfile(token, user_id, filepath) { console.log(this.config.upload.url) try { fs.accessSync(filepath, fs.F_OK); } catch (e) { console.log(`upload======error:file ${filepath} not exist`) return false } let filedata = fs.createReadStream(filepath); let options = { url: `${this.config.upload.url}/v1/datalist/upload`, // url: `https://www.rayplus.net/v1/datalist/upload`, headers: { 'Content-Type': 'multipart/form-data', 'Authorization': `Bearer ${token}` }, strictSSL: false, formData: { 'dicom': filedata, 'user_id': user_id } } return new Promise(function (resolve, reject) { request.post(options, function (error, response, body) { if (!error && response.statusCode == 200) { let bodydata = JSON.parse(body) console.log('下载完成') if (bodydata.code === 1 || bodydata.code === 40010) { resolve(true) } else { console.log(`upload======error:data ${bodydata.data}, msg : ${bodydata.message} ${filepath}`) resolve(false) } } else { console.log(`upload======error:other error ${error} ${filepath}`) resolve(false) } }) }) } //测试连接 async testConnection(pacs_ids) { let result = [] let PACS_SCP_AET = this.config.site.PACS_SCP_AET for (let i = 0; i < pacs_ids.length; i++) { try { let pacs_id = pacs_ids[i] let pacs = await this.ctx.model.Pacs.findOne({ _id: pacs_id }) if (!pacs) { result.push("pacs位置不存在") continue } let { aet, host, port } = pacs let para = `-v ${host} ${port} -aec ${aet} -aet ${PACS_SCP_AET}` let r = await this.echoscu(para) result.push(r) } catch (e) { result.push(e) } } return result } //^^ async echoscu(para) { return new Promise((resolve, reject) => { let CMD = `echoscu ${para}` try { child_process.exec(CMD, (error, stderr, stdout) => { if (error) { reject(error) } else if (stderr) { if (stderr.indexOf('Success') > -1) { resolve(true) } else { reject(stderr) } } else if (stdout) { if (stdout.indexOf('Success') > -1) { resolve(true) } else { reject(stdout) } } else { reject('no echoscu info') } }) } catch (error) { reject(error) } }) } //查询 async findFromPacs(body, config) { try { let pacs = await this.ctx.model.Pacs.findOne({ _id: body.pacs_id }).select("aet host port") let result = [] if (!pacs) throw "pacs位置不存在" let { aet, host, port } = pacs let dict = await this.parseQRDist() console.log(dict, 'dict') for (let modeIndex = 0; modeIndex < body.modeList.length; modeIndex++) { let mode = body.modeList[modeIndex] for (let StudyInstanceUIDIndex = 0; StudyInstanceUIDIndex < body.StudyInstanceUidList.length; StudyInstanceUIDIndex++) { let StudyInstanceUID = body.StudyInstanceUidList[StudyInstanceUIDIndex] let param = { PatientID: `${body.trialCode}_${body.subjectCode}`, StudyDescription: '', StudyID: '', PatientName: '', Modality: mode, StudyInstanceUID: StudyInstanceUID, SeriesInstanceUID: '', SeriesNumber: '' } let search = "" let enume = Object.keys(param) for (let i = 0; i < dict.length; i++) { let name = dict[i].name let item = param[name] if (item && item != '' && item != '*') { search = search + ` -k ${name}=*${item}*` } else { search = search + ` -k ${name}` } } //查询指定pacs //循环pacs 获取数据 let para = `${host} ${port} -aec ${aet} -aet ${config.aet} -S -k QueryRetrieveLevel=SERIES` + search let resp = await this.findscu(para, { aet, host, port }) result = Object.assign(result, resp) } } if (result) { for (let i = 0; i < result.length; i++) { await this.ctx.model.Series.findOneAndUpdate({ SeriesInstanceUID: result[i].SeriesInstanceUID, StudyInstanceUID: result[i].StudyInstanceUID, }, { $set: { Modality: result[i].Modality, SeriesNumber: result[i].SeriesNumber, PatientID: result[i].PatientID } }, { upsert: true, new: true }) } } return result } catch (e) { return (e) } } async storescu(pacs, config, study) { return new Promise((resolve, reject) => { try { // let CMD = `dcmsend -v +sd -aec ${pacs.aet} -aet ${config.aet} ${pacs.host} ${pacs.port} ${study}` let CMD = `dcmsend -v +sd -aec ${pacs.aet} -aet AET ${pacs.host} ${pacs.port} ${study}` console.log(CMD) child_process.exec(CMD, { timeout: 300000, maxBuffer: 400000 * 1024 }, async (error, stderr, stdout) => { if (error) { reject(error) } else if (stderr) { resolve(stderr) } else if (stdout) { resolve(stdout) } else { reject('no find info') } }) } catch (error) { reject(error) } }) } //^^ async findscu(para, obj) { return new Promise((resolve, reject) => { let CMD = `findscu ${para}` console.log(CMD) try { child_process.exec(CMD, { timeout: 10000, maxBuffer: 400000 * 1024 }, async (error, stderr, stdout) => { if (error) { reject(error) } else if (stderr) { let str = stderr.toString().replace(/\u0000/g, "") console.log('stderr', str) let resp = await this.parseFindResult(str, obj) resolve(resp) } else if (stdout) { let str = stdout.toString().replace(/\u0000/g, "") console.log('stdout', str) let resp = await this.parseFindResult(str, obj) resolve(resp) } else { resolve() } }) } catch (error) { reject(error) } }) } //^^ //解析文件 获取查询所需要的关键词 async parseQRDist() { return new Promise((resolve, reject) => { try { let filePath = this.config.site.PACS_QR_CFG let fRead = fs.createReadStream(filePath); let objReadline = readline.createInterface({ input: fRead, }) let result = [] objReadline.on('line', (line) => { result.push(line) }); objReadline.on('close', () => { let resp = [] for (let i = 0; i < result.length; i++) { if (result[i].toString().indexOf('#') > -1) { let ele = { tag: result[i].split(' ')[0], name: result[i].split(' ')[2].split('#')[1], arr: [] } resp.push(ele) } } resolve(resp) }); } catch (error) { reject(error) } }) } //^^ //整理查询结果 async parseFindResult(str, obj) { let result = [] let dict = await this.parseQRDist() console.log(dict) if (dict.length == 0) { return result } let arr = str.split('\n') var regex = /\[(.+?)\]/g; // [] 中括号 for (let i = 0; i < arr.length; i++) { for (let j = 0; j < dict.length; j++) { try { if (arr[i].indexOf(dict[j].tag) != -1) { let r = arr[i].match(regex)[0].replace('[', '').replace(']', '').replace(/(\s*$)/g, "") console.log('r', r) dict[j].arr.push(r) } } catch (error) { console.log(error) dict[j].arr.push('') } } } for (let i = 0; i < dict.length; i++) { let item = dict[i] for (let j = 0; j < item.arr.length; j++) { if (result[j]) { result[j][item.name] = item.arr[j] } else { result[j] = { [item.name]: item.arr[j] } } } } console.log('result', result) return result } async setDcmTempState(dcmtempList, para, state, done) { try { if (dcmtempList) { let dcmtempId = dcmtempList._id console.log(dcmtempId) await this.ctx.model.Dcmtemp.findOneAndUpdate({ _id: dcmtempId }, { $set: { done: done, status: state } }).exec() } else { let params = { host: para.host, port: para.port, aet: para.aet, done: done, NumberOfStudyRelatedInstances: para.NumberOfStudyRelatedInstances, StudyInstanceUID: para.StudyInstanceUID, status: state } let m = new this.ctx.model.Dcmtemp(params) m.save() } } catch (e) { console.log(e) } return } //下载 async movescu(body, series, config) { return new Promise(async (resolve, reject) => { try { let { pacs_id, seriesList, userName, userId, task_id, trialCode, subjectCode, TaskCode } = body let pacs = await this.ctx.model.Pacs.findOne({ _id: pacs_id }).select("aet host port") if (!pacs) throw "pacs位置不存在" fs.mkdirSync(config.ossPacsPath + getObjectMkdir(`/${trialCode}/${subjectCode}/${TaskCode}/${userId}/1`), { recursive: true }); let directoryPath = config.ossPacsPath + `/${trialCode}/${subjectCode}/${TaskCode}/${userId}/` let paraArr = [`${pacs.host}`, ` ${pacs.port}`, `-aec`, `${pacs.aet}`, `-aet`, `${config.aet}`, `-od`, `${directoryPath}`, `-S`, `-k`, `QueryRetrieveLevel=SERIES`, `-k`, `SeriesInstanceUID=${series}`, `+P`, `${config.port}`, '-v'] let movescu = child_process.spawn('movescu', paraArr) let cmd = `movescu ${pacs.host} ${pacs.port} -aec ${pacs.aet} -aet ${config.aet} -od ${directoryPath} -S -k QueryRetrieveLevel=SERIES -k SeriesInstanceUID=${series} +P ${config.port} +xa -v` movescu.stdout.on('data', (data) => { console.log(`move======stdout: ${data}`); }) movescu.stderr.on('data', (data) => { console.log(`move======pending: ${data}`); }) movescu.on('close', async (code, data) => { resolve() }); } catch (e) { console.log(e) } }) } //^^ //读取文件夹子文件个数 async readNum(fileName) { let filePath = `${this.config.site.PACS_SCP_STORE}/${fileName}` return new Promise((resolve, reject) => { fs.readdir(filePath, (err, files) => { if (err) { //未下载 resolve(-1) } else { resolve(files.length) } }) }) } //保存patient async dicomInfoSave(filesName, user_id) { let filesPath = `${this.config.site.PACS_SCP_STORE}/${filesName}` fs.readdir(filesPath, (err, files) => { if (err) { } else { for (let i = 0; i < 1; i++) { let fileName = files[i] let filePath = `${filesPath}/${fileName}` let fileData = fs.readFileSync(filePath) this.setStudy(fileData, user_id) } } }) } //^^ setStudy(fileData, user_id) { return new Promise(async (resolve, reject) => { let type = fileData.toString('utf8', 128, 132); if (type !== 'DICM') return reject('数据不正确'); let dicomInfo; try { //获取dicom信息 并初步校验数据正确性 dicomInfo = this.service.util.readDicomToJson(fileData) } catch (error) { //第一次读取失败 接着测试第二次读取 dicomInfo = this.service.util.readDicom(fileData); } if (!dicomInfo) return reject('数据不正确'); if (!dicomInfo.serie.SeriesInstanceUID) return reject('数据不正确'); if (!dicomInfo.image.InstanceNumber) { if (!dicomInfo.image.SOPInstanceUID) return reject('数据不正确'); try { let num = await this.createNewInstanceNumber(dicomInfo) if (!num) throw "生成失败"; dicomInfo.image.InstanceNumber = num; } catch (error) { return reject('数据不正确'); } } try { let image = await this.service.image.findAndUpsert(dicomInfo.image, user_id); let serie = await this.service.series.findAndUpsert(dicomInfo.serie, user_id, image.id); let study = await this.service.study.findAndUpsert(dicomInfo.study, user_id, serie); //存储dicom文件 image.path = await this.service.util.dicomSave(fileData, dicomInfo.image, serie._id); //将Dicom转化成png await this.service.util.createPng(serie, image); //如果存在image_id则增加 if (image.id) { serie.images_ids.addToSet(image.id) } if (!serie.study_id || serie.study_id != study.id) { serie.study_id = study._id; } // if (!this.config.debug) { // //放入待上传列表OSS表中 // await this.service.oss.prepareUpload(serie, image, '1'); // await this.service.oss.prepareUpload(serie, image, '2'); // } await image.save(); await serie.save(); return resolve(); } catch (error) { //dicom存储失败,删除上面创建的series this.logger.error(error); return reject('上传失败'); } }) } addOrUpdateArchiveStudy(data, Authorization, config) { return new Promise((resolve, reject) => { let options = { hostname: '106.14.89.110', port: 30000, path: "/Study/preArchiveDicomStudy", method: "POST", headers: { Authorization: Authorization } } const req = http.request(options, r => { let data = ''; r.on('data', chunk => { data += chunk }) r.on('end', () => { resolve(JSON.parse(data)) }) }) req.write(data) req.end() // axios({ // method: 'post', // data: data, // headers: { // Authorization: Authorization // }, // url: config.eiscPath + '/api/Study/preArchiveDicomStudy' // }).then(res => { // console.log(res.data) // resolve(res.data) // }) }) } preArchiveDicomStudy(info, config) { info.IsDicomReUpload = true return new Promise((resolve, reject) => { let options = { hostname: '106.14.89.110', port: 30000, path: "/Study/preArchiveDicomStudy", method: "POST", headers: { Authorization: info.Authorization } } const req = http.request(options, r => { let data = ''; r.on('data', chunk => { data += chunk }) r.on('end', () => { resolve(JSON.parse(data)) }) }) req.write(info) req.end() // axios({ // method: 'post', // data: info, // headers: { // Authorization: info.Authorization // }, // url: config.eiscPath + '/api/Study/preArchiveDicomStudy' // }).then(res => { // console.log(res.data) // resolve(res.data) // }) }) } archiveStudy(uploadQueues, index, info, config) { return new Promise((resolve, reject) => { this.ctx.service.pacs.preArchiveDicomStudy(info, config).then(async res => { let dicomInfo = uploadQueues[index].dicomInfo let seriesNum = uploadQueues[index].seriesList.length let fileNum = uploadQueues[index].fileList.length let seriesList = uploadQueues[index].seriesList dicomInfo.seriesNum = seriesNum dicomInfo.fileNum = fileNum dicomInfo.siteId = info.siteId dicomInfo.subjectId = info.subjectId dicomInfo.subjectVisitId = info.subjectVisitId dicomInfo.visitTaskId = info.visitTaskId let params = { trialId: info.trialId, siteId: info.siteId, subjectId: info.subjectId, subjectVisitId: info.subjectVisitId, studyMonitorId: res.Result, failedFileCount: 0, study: { studyId: dicomInfo.studyId, studyInstanceUid: dicomInfo.studyUid, studyTime: dicomInfo.studyTime, description: dicomInfo.description, seriesCount: dicomInfo.seriesNum, instanceCount: dicomInfo.fileNum, institutionName: dicomInfo.institutionName, patientId: dicomInfo.patientId, patientName: '', patientAge: '', patientSex: dicomInfo.patientSex, accessionNumber: dicomInfo.accNumber, patientBirthDate: '', acquisitionTime: dicomInfo.acquisitionTime, acquisitionNumber: dicomInfo.acquisitionNumber, triggerTime: dicomInfo.triggerTime, bodyPartExamined: '', seriesList: [], } } for (let i = 0; i < seriesList.length; i++) { let v = seriesList[i] let instanceList = [] let ImageResizePath for (let ii = 0; ii < v.instanceList.length; ii++) { let o = v.instanceList[ii] if (o.isReUpload) { } else { let htmlPath, htmlFile, htmlOssPath, htmlRes if (o.modality == 'SR') { htmlPath = await this.dsr2html(o.path) htmlFile = fs.readFileSync(o.path + '.html') htmlOssPath = `/${params.trialId}/Image/${params.siteId}/${params.subjectId}/${params.subjectVisitId}/treatment/${dicomInfo.studyUid}/${v.seriesUid}/${o.instanceUid}/${this.ctx.service.util.getGuid(dicomInfo.studyUid + v.seriesUid + o.instanceUid + params.trialId)}.html` htmlRes = await this.ctx.service.oss.dcmUpload(htmlOssPath, htmlFile, config) } let path = `/${params.trialId}/Image/${params.siteId}/${params.subjectId}/${params.subjectVisitId}/treatment/${dicomInfo.studyUid}/${v.seriesUid}/${o.instanceUid}/${this.ctx.service.util.getGuid(dicomInfo.studyUid + v.seriesUid + o.instanceUid + params.trialId)}` console.log(path) let res = await this.ctx.service.oss.dcmUpload(path, o.file, config) if (!res) { params.failedFileCount++ } else { } instanceList.push({ studyInstanceUid: dicomInfo.studyUid, seriesInstanceUid: v.seriesUid, sopInstanceUid: o.instanceUid, instanceNumber: o.instanceNumber, instanceTime: o.instanceTime, imageRows: o.imageRows, imageColumns: o.imageColumns, sliceLocation: o.sliceLocation, sliceThickness: o.sliceThickness, numberOfFrames: o.numberOfFrames, pixelSpacing: o.pixelSpacing, imagerPixelSpacing: o.imagerPixelSpacing, frameOfReferenceUID: o.frameOfReferenceUID, windowCenter: o.windowCenter, windowWidth: o.windowWidth, path: path, htmlPath: htmlOssPath }) } } params.study.seriesList.push({ visitTaskId: dicomInfo.visitTaskId, studyInstanceUid: dicomInfo.studyUid, seriesInstanceUid: v.seriesUid, seriesNumber: v.seriesNumber, seriesTime: v.seriesTime, sliceThickness: v.sliceThickness, imagePositionPatient: v.imagePositionPatient, imageOrientationPatient: v.imageOrientationPatient, sequenceName: v.sequenceName, protocolName: v.protocolName, imagerPixelSpacing: v.imagerPixelSpacing, acquisitionTime: dicomInfo.acquisitionTime, acquisitionNumber: dicomInfo.acquisitionNumber, triggerTime: dicomInfo.triggerTime, modality: v.modality, description: v.description, instanceCount: v.instanceList.length, bodyPartExamined: dicomInfo.bodyPart, instanceList: instanceList, ImageResizePath: ImageResizePath, }) } params.study.instanceCount = params.failedFileCount console.log('params', JSON.stringify(params)) let a = await this.ctx.service.pacs.addOrUpdateArchiveStudy(params, info.Authorization, config) resolve(a) // addOrUpdateArchiveStudy(params).then(res => { // resolve('上传成功') // }).catch((res) => { // reject() // clearInterval(t) // }) }) }) } async dsr2html(path) { return new Promise((resolve, reject) => { let htmlPath = `${path}.html` let CMD = `dsr2html ${path} ${htmlPath}` try { child_process.exec(CMD, { timeout: 10000, maxBuffer: 400000 * 1024 }, async (error, stderr, stdout) => { if (error) { reject(error) } else if (stderr) { resolve(htmlPath) } else if (stdout) { resolve(htmlPath) } else { resolve() } }) } catch (error) { reject(error) } }) } //^^ //当没有InstanceNumber时根据SOPInstanceUID不同生成 async createNewInstanceNumber(dicomInfo) { let SeriesInstanceUID = dicomInfo.image.SeriesInstanceUID; let SOPInstanceUID = dicomInfo.image.SOPInstanceUID; //获取该series下的images let serie = await this.ctx.model.Series.findOne({ SeriesInstanceUID: SeriesInstanceUID }) .populate({ path: "images_ids", }) let num = 1; if (serie && serie.images_ids) { for (let i = 0; i < serie.images_ids.length; i++) { let image = serie.images_ids[i]; if (image.SOPInstanceUID == SOPInstanceUID) { num = image.InstanceNumber; break; } //如果num小于当前image.InstanceNumber则image.InstanceNumber+1赋给num num = num < image.InstanceNumber ? image.InstanceNumber + 1 : num; } } return num; } } module.exports = PacsService;