import {
  CreateMultipartUploadCommand,
  UploadPartCommand,
  CompleteMultipartUploadCommand,
  ListMultipartUploadsCommand, // list the multipart uploads in progress in the bucket
  ListPartsCommand,            // list the parts already uploaded for an object
  GetObjectCommand,            // fetch an object
} from "@aws-sdk/client-s3";
import SparkMD5 from "./spark-md5.min.js";
import store from "@/store";
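
/*
 * The functions below expect the caller to pass in an already-initialised S3 client
 * (the "init" step referred to in the error logs, which is not part of this file).
 * A minimal sketch of what that client might look like, assuming the standard
 * S3Client from @aws-sdk/client-s3; the region, endpoint and credentials are
 * placeholders, not values taken from this project:
 *
 *   import { S3Client } from "@aws-sdk/client-s3";
 *
 *   const s3 = new S3Client({
 *     region: "us-east-1",                         // placeholder region
 *     endpoint: "https://s3.example.com",          // placeholder endpoint
 *     credentials: {
 *       accessKeyId: "YOUR_ACCESS_KEY_ID",         // placeholder credentials
 *       secretAccessKey: "YOUR_SECRET_ACCESS_KEY",
 *     },
 *   });
 *
 *   // The client is then passed to exist(s3, bucket, fileInformation, progressFn, changeStatus).
 */
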
let timer = null,              // network-speed timer
  bytesReceivedPerSecond = {}; // bytes uploaded, keyed by timestamp

export function AWSclose() {
  if (timer) {
    clearInterval(timer);
    timer = null;
    store.state.trials.uploadTip = '0kb/s';
  }
  bytesReceivedPerSecond = {};
}

// Check whether the file already exists in the bucket or has a multipart upload in progress
export async function exist(s3, bucket, fileInformation, progressFn, changeStatus) {
  // The file to upload
  const File = fileInformation.file;
  // Upload size in bytes (uploadFileSize / 1024 / 1024 = size in MB)
  const uploadFileSize = File.size;
  // Part size: each part must be between 100 KB and 5 GB.
  // The default part size is 5 MB; for files larger than 5 GB, 20 MB parts are used.
  let shardSize = 5 * 1024 * 1024;
  if (uploadFileSize < shardSize) {
    shardSize = uploadFileSize;
  }
  if (uploadFileSize > 5 * 1024 * 1024 * 1024) {
    shardSize = 20 * 1024 * 1024;
  }
  fileInformation = Object.assign({
    shardSize,
    sharding: []
  }, fileInformation);
  if (fileInformation.speed) {
    setTimer();
  }
  // 1. Check whether the file has already been uploaded to the bucket.
  // If sharding is not empty, parts have already been uploaded and no check is needed.
  if (fileInformation.sharding.length === 0) {
    let existBucket = await existInBucket({ s3, bucket, fileInformation });
    if (existBucket === 'true') {
      progressFn(0, fileInformation.file, 1);
      changeStatus(fileInformation.path, 'success'); // report the status to the caller
      return;
    } else if (existBucket === 'same key') {
      console.log(fileInformation.path + " a file with the same key but different content exists in the bucket");
    } else if (existBucket === 'not exist') {
      console.log(fileInformation.path + " does not exist in the bucket");
    }
    // 2. Check whether the file has a multipart upload in progress.
    let upload = await existUpload({ s3, bucket, fileInformation });
    if (upload.code === 0) {
      // An upload exists and parts have already been uploaded.
      console.log(fileInformation.path + " has an upload in progress with parts already uploaded");
      // Store the uploaded parts in sharding.
      const uploadId = upload.uploadId;
      let parts = upload.parts;
      let SIZE = 0;
      for (let i = 0; i < parts.length; i++) {
        SIZE += parts[i].Size;
        fileInformation.sharding.push({ ETag: parts[i].ETag, PartNumber: parts[i].PartNumber, Size: parts[i].Size, UploadId: uploadId });
      }
      progressFn(SIZE / uploadFileSize, { fileSize: uploadFileSize }, 0); // report the already-uploaded parts to the caller
      // Resume the upload.
      await uploadFile({ s3, fileInformation, uploadId, bucket, changeStatus, progressFn });
    } else if (upload.code === 1) {
      // Same key but different content.
      console.log('err: a file with the same key already exists');
      changeStatus(fileInformation.path, 'same key');
    } else if (upload.code === 2) {
      // No upload in progress.
      console.log(fileInformation.path + " has no upload in progress");
      // Create a multipart upload.
      const connect = await createMultipartUpload({ s3, bucket, key: fileInformation.path, type: fileInformation.file.type });
      if (connect === 'err') {
        changeStatus(fileInformation.path, 'err');
        return;
      }
      // Upload the whole file.
      await uploadFile({ s3, fileInformation, uploadId: connect.UploadId, bucket, changeStatus, progressFn });
    }
  } else {
    // sharding already contains parts: resume the upload.
    await uploadFile({ s3, fileInformation, uploadId: fileInformation.sharding[0].UploadId, bucket, changeStatus, progressFn });
  }
}
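
/*
 * A usage sketch for the exported API, assuming an initialised S3 client as above.
 * The import path, bucket name and callback bodies are illustrative placeholders,
 * not part of this module:
 *
 *   import { exist, AWSclose } from "./upload";    // hypothetical path of this file
 *
 *   async function startUpload(s3, file) {
 *     const fileInformation = {
 *       file,                     // the File object to upload
 *       path: file.name,          // object key in the bucket
 *       speed: true,              // enable the upload-speed timer
 *     };
 *     await exist(
 *       s3,
 *       "my-bucket",                                   // placeholder bucket
 *       fileInformation,
 *       (percentage, f, lastPercentage) => {           // progressFn
 *         console.log("progress", (percentage * 100).toFixed(1) + "%");
 *       },
 *       (path, status) => {                            // changeStatus: 'success' | 'err' | 'same key'
 *         console.log(path, status);
 *       }
 *     );
 *     AWSclose();                 // stop the speed timer and reset the counters
 *   }
 */
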
// Upload every part of the file that has not been uploaded yet
async function uploadFile({ s3, fileInformation, uploadId, bucket, changeStatus, progressFn }) {
  const chunkCount = Math.ceil(fileInformation.file.size / fileInformation.shardSize); // total number of parts
  // Slice the file and upload each part in turn.
  for (let i = 0; i < chunkCount; i++) {
    let start = i * fileInformation.shardSize; // start offset of this part
    let end = Math.min(fileInformation.file.size, start + fileInformation.shardSize); // end offset of this part
    let _chunkFile = fileInformation.file.slice(start, end); // the slice to upload as part i + 1
    // Check whether this part has already been uploaded.
    let res1 = fileInformation.sharding.filter((part) => {
      return part.PartNumber === (i + 1);
    });
    if (res1.length === 0) {
      // The part has not been uploaded yet.
      const upload = await uploadPart({ s3, f: _chunkFile, uploadId, key: fileInformation.path, bucket, num: i + 1 }); // upload the part
      // Only treat the result as an error if sharding still does not contain the part.
      let res2 = fileInformation.sharding.filter((part) => {
        return part.PartNumber === (i + 1);
      });
      if (res2.length === 0) {
        if (upload !== 'err') { // the part was uploaded successfully and the upload was not paused
          // Check once more that sharding does not already contain the part.
          let res3 = fileInformation.sharding.filter((part) => {
            return part.PartNumber === (i + 1);
          });
          if (res3.length === 0) {
            let LASTSIZE = fileInformation.sharding.reduce((sum, item) => sum + item.Size, 0);
            fileInformation.sharding.push({ ETag: upload.ETag, PartNumber: i + 1, Size: _chunkFile.size, UploadId: uploadId }); // record the uploaded part
            let SIZE = fileInformation.sharding.reduce((sum, item) => sum + item.Size, 0);
            let lastPercentage = LASTSIZE / fileInformation.file.size, percentage = SIZE / fileInformation.file.size;
            progressFn(percentage, fileInformation.file, lastPercentage);
            if (fileInformation.speed) {
              // Credit the uploaded bytes to the current one-second time bucket.
              let time = new Date().getTime();
              let timeList = Object.keys(bytesReceivedPerSecond).sort((a, b) => a - b);
              let bytesTime = timeList.find(item => time - item < 1000);
              if (bytesTime) {
                bytesReceivedPerSecond[bytesTime] += fileInformation.file.size * (percentage - lastPercentage);
              } else {
                console.log("no current time bucket found");
                if (timeList.length > 0) {
                  bytesReceivedPerSecond[timeList[timeList.length - 1]] += fileInformation.file.size * (percentage - lastPercentage);
                } else {
                  bytesReceivedPerSecond[time] = fileInformation.file.size * (percentage - lastPercentage);
                }
              }
            }
          }
        } else if (upload === 'err') {
          changeStatus(fileInformation.path, 'err');
          return;
        }
      }
    }
  } // for
  if (fileInformation.sharding.length === chunkCount) {
    // All parts are uploaded: complete the multipart upload.
    const complete = await completeMultipartUpload({ s3, bucket, key: fileInformation.path, sharding: fileInformation.sharding, uploadId });
    if (complete !== 'err') {
      changeStatus(fileInformation.path, 'success'); // report success to the caller
    } else {
      changeStatus(fileInformation.path, 'err'); // report failure to the caller
    }
  }
}

// Check whether the file already exists in the bucket
// bucket: target bucket, fileInformation.file: the file to upload
// Returns 'true' (the object already exists), 'same key' (same key but different content) or 'not exist' (no such object)
async function existInBucket({ s3, bucket, fileInformation }) {
  if (s3 === null) {
    return console.log("S3 client not created; call init first");
  }
  // getObject returns at most 767448 bytes per request here, so the object is fetched in ranges.
  let bucketFileUniArray = [];
  // Number of ranges to fetch (capped at 4).
  let count = Math.ceil(fileInformation.file.size / 767448);
  if (count > 4) {
    count = 4;
  }
  for (let i = 0; i < count; i++) {
    const obj = await getObject({ s3, bucket, fileInformation, count: i });
    if (obj !== 'err') {
      // Read the object body and accumulate its bytes for the MD5 comparison.
      const fileBody = obj.Body;
      let fileUnitArray = await fileBody.transformToByteArray();
      bucketFileUniArray = [...bucketFileUniArray, ...fileUnitArray];
    } else {
      return 'not exist';
    }
  }
  let bucketFileBufferArray = new Uint8Array(bucketFileUniArray);
  // Read the same byte range of the local file into an ArrayBuffer.
  let fileArrayBuff = await new Promise((resolve) => {
    let fileReader = new FileReader();
    fileReader.readAsArrayBuffer(fileInformation.file.slice(0, count * 767448));
    fileReader.onload = (e) => {
      resolve(e.target.result);
    };
  });
  if (fileArrayBuff.byteLength > count * 767448) {
    fileArrayBuff = fileArrayBuff.slice(0, count * 767448);
  }
  let bodyMD5 = await getMD5({ arrayBuffer: bucketFileBufferArray.buffer });
  let fileMD5 = await getMD5({ arrayBuffer: fileArrayBuff });
  if (bodyMD5 === fileMD5) {
    // Same content: the upload can be skipped.
    return 'true';
  } else {
    return 'same key';
  }
}

// Check whether the file has a multipart upload in progress
// bucket: target bucket, fileInformation.file: the file to upload
// Returns { code: 0, uploadId, key, parts } when a resumable upload exists,
// { code: 1, message: 'same key' } when the existing uploads belong to a different file with the same key,
// { code: 2, message: 'not exist upload' } when there is no upload in progress
async function existUpload({ s3, bucket, fileInformation }) {
  // Check whether the file has any multipart uploads.
  const listUploads = await listMultipartUploadsCommand({ s3, bucket, key: fileInformation.path });
  if (listUploads !== 'err') {
    if (listUploads.Uploads !== undefined && listUploads.Uploads.length > 0) {
      // Uploads exist: take the ETag of the first uploaded part, compute the MD5 of the local file's first part and compare them.
      const uploads = listUploads.Uploads;
      for (const one in uploads) { // there may be several uploads for the same key
        let uploadOne = uploads[one];
        const uploadId = uploadOne.UploadId;
        const key = uploadOne.Key;
        // List the parts already uploaded for this upload.
        const listParts = await listPartsCommand({ s3, bucket, key, uploadId });
        if (listParts !== 'err') {
          if (listParts.Parts !== undefined && listParts.Parts.length !== 0) {
            // Parts exist.
            let etag = listParts.Parts[0].ETag;
            // Compute the MD5 of the first part of the local file.
            let fileSlice = null;
            if (fileInformation.file.size > fileInformation.shardSize) {
              fileSlice = fileInformation.file.slice(0, fileInformation.shardSize);
            } else {
              fileSlice = fileInformation.file;
            }
            let fileMD5 = await new Promise((resolve) => {
              const fileReader = new FileReader();
              const spark = new SparkMD5.ArrayBuffer();
              fileReader.readAsArrayBuffer(fileSlice);
              fileReader.onload = (e) => {
                spark.append(e.target.result);
                resolve(spark.end());
              };
            });
            if (etag.split('"')[1] === fileMD5) {
              // Same file: this upload can be resumed.
              return {
                code: 0,
                message: 'true',
                uploadId: uploadId,
                key: key,
                parts: listParts.Parts
              };
            } else {
              // Same key but different content.
              continue;
            }
          } else {
            // The upload exists but no part has been completed yet.
            continue;
          }
        } else {
          // The upload exists but listing its parts failed.
          continue;
        }
      } // for
      return {
        code: 1,
        message: 'same key'
      };
    } else {
      // No upload in progress.
      return {
        code: 2,
        message: 'not exist upload'
      };
    }
  } else {
    // The query failed: treat it as no upload in progress.
    return {
      code: 2,
      message: 'not exist upload'
    };
  }
}

// Compute the MD5 of an ArrayBuffer
async function getMD5({ arrayBuffer }) {
  const spark = new SparkMD5.ArrayBuffer();
  spark.append(arrayBuffer);
  return spark.end();
}

// Create a multipart upload
async function createMultipartUpload({ s3, bucket, key, type }) { // bucket: target bucket, key: object key, type: MIME type
  if (s3 === null) {
    return console.log("S3 client not created; call init first");
  }
  const params = {
    Bucket: bucket,
    Key: key,
    ContentType: type
  };
  try {
    return await s3.send(new CreateMultipartUploadCommand(params));
  } catch (err) {
    console.log('failed to create multipart upload:', err.message);
    return 'err';
  }
}

// Upload a single part
async function uploadPart({ s3, f, uploadId, key, bucket, num }) { // f: the part data, num: the part number
  if (s3 === null) {
    return console.log("S3 client not created; call init first");
  }
  const params = {
    Bucket: bucket,
    Key: key,
    PartNumber: num,
    UploadId: uploadId,
    Body: f,
    // ContentDisposition: "attachment; filename=hahaha.dcm"
  };
  try {
    return await s3.send(new UploadPartCommand(params));
  } catch (err) {
    console.log('failed to upload part ' + num + ':', err.message);
    return 'err';
  }
}

// Complete the multipart upload (merge the parts)
async function completeMultipartUpload({ s3, bucket, key, sharding, uploadId }) {
  if (s3 === null) {
    return console.log("S3 client not created; call init first");
  }
  // Parts must be listed in ascending PartNumber order.
  let parts = [];
  for (let i = 0; i < sharding.length; i++) {
    parts.push({
      "ETag": sharding[i].ETag,
      "PartNumber": sharding[i].PartNumber,
    });
  }
  parts.sort((a, b) => a.PartNumber - b.PartNumber);
  const params = {
    Bucket: bucket,
    Key: key,
    MultipartUpload: {
      Parts: parts
    },
    UploadId: uploadId
  };
  try {
    return await s3.send(new CompleteMultipartUploadCommand(params));
  } catch (err) {
    console.log("failed to complete the multipart upload:", err.message);
    return 'err';
  }
}

// List the parts already uploaded for a given multipart upload
async function listPartsCommand({ s3, bucket, key, uploadId }) {
  if (s3 === null) {
    return console.log("S3 client not created; call init first");
  }
  const params = {
    Bucket: bucket,
    Key: key,
    UploadId: uploadId
  };
  try {
    return await s3.send(new ListPartsCommand(params));
  } catch (err) {
    console.log("failed to list the uploaded parts: " + err.message);
    return 'err';
  }
}

// Check whether the file has a multipart upload in progress
async function listMultipartUploadsCommand({ s3, bucket, key }) {
  if (s3 === null) {
    return console.log("S3 client not created; call init first");
  }
  const params = {
    Bucket: bucket,
    Delimiter: '',
    MaxUploads: 1000,
    Prefix: key
  };
  try {
    return await s3.send(new ListMultipartUploadsCommand(params));
  } catch (err) {
    console.log("failed to list multipart uploads for " + key + ": " + err.message);
    return 'err';
  }
}

// Fetch a byte range of an object
async function getObject({ s3, bucket, fileInformation, count }) {
  // Each request fetches at most 767448 bytes; count selects which range to fetch.
  if (s3 === null) {
    return console.log("S3 client not created; call init first");
  }
  let byte1 = ((count + 1) * 767448 - 1) > fileInformation.file.size ? fileInformation.file.size : ((count + 1) * 767448 - 1);
  let byte2 = (count * 767448) > fileInformation.file.size ? fileInformation.file.size : (count * 767448);
  let range = "bytes=" + byte2 + "-" + byte1;
  const params = {
    Bucket: bucket,
    Key: fileInformation.path,
    Range: range
  };
  try {
    return await s3.send(new GetObjectCommand(params));
  } catch (err) {
    console.log('failed to fetch ' + fileInformation.path + ':', err.message);
    return 'err';
  }
}

// Update the upload-speed indicator once per second
function setTimer() {
  if (timer) return false;
  timer = setInterval(() => {
    let timeList = Object.keys(bytesReceivedPerSecond).sort((a, b) => a - b);
    if (timeList.length > 0) {
      // Average the bytes recorded over the sliding window (up to 5 seconds) and convert to KB/s.
      let totalBytes = timeList.reduce((sum, bytes) => sum + bytesReceivedPerSecond[bytes], 0) / (5 * 1024);
      let unit = 'kb/s';
      if (totalBytes > 1024) {
        totalBytes = totalBytes / 1024;
        unit = "mb/s";
      }
      store.state.trials.uploadTip = totalBytes.toFixed(2) + unit;
    }
    // Keep at most 5 one-second buckets.
    if (timeList.length >= 5) {
      delete bytesReceivedPerSecond[timeList[0]];
    }
    let time = new Date().getTime();
    bytesReceivedPerSecond[time] = 0;
  }, 1000);
}