mirror of
https://gitee.com/dromara/RuoYi-Cloud-Plus.git
synced 2025-10-14 14:10:24 +00:00
update 优化 oss下载接口改用发布订阅的方式替代阻塞流,优化大文件下载时的内存占用
This commit is contained in:
@@ -2,6 +2,7 @@ package org.dromara.common.oss.core;
|
||||
|
||||
import cn.hutool.core.io.IoUtil;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dromara.common.core.constant.Constants;
|
||||
import org.dromara.common.core.utils.DateUtils;
|
||||
import org.dromara.common.core.utils.StringUtils;
|
||||
@@ -13,9 +14,7 @@ import org.dromara.common.oss.exception.OssException;
|
||||
import org.dromara.common.oss.properties.OssProperties;
|
||||
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
|
||||
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
|
||||
import software.amazon.awssdk.core.ResponseInputStream;
|
||||
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
|
||||
import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody;
|
||||
import software.amazon.awssdk.core.async.*;
|
||||
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
|
||||
import software.amazon.awssdk.regions.Region;
|
||||
import software.amazon.awssdk.services.s3.S3AsyncClient;
|
||||
@@ -29,9 +28,12 @@ import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener;
|
||||
import java.io.*;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.nio.channels.Channels;
|
||||
import java.nio.channels.WritableByteChannel;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
@@ -40,6 +42,7 @@ import java.util.function.Consumer;
|
||||
*
|
||||
* @author AprilWind
|
||||
*/
|
||||
@Slf4j
|
||||
public class OssClient {
|
||||
|
||||
/**
|
||||
@@ -236,30 +239,62 @@ public class OssClient {
|
||||
*
|
||||
* @param key 文件在 Amazon S3 中的对象键
|
||||
* @param out 输出流
|
||||
* @return 输出流中写入的字节数(长度)
|
||||
* @param consumer 自定义处理逻辑
|
||||
* @throws OssException 如果下载失败,抛出自定义异常
|
||||
*/
|
||||
public void download(String key, OutputStream out, Consumer<Long> consumer) {
|
||||
try {
|
||||
this.download(key, consumer).writeTo(out);
|
||||
} catch (Exception e) {
|
||||
throw new OssException("文件下载失败,错误信息:[" + e.getMessage() + "]");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 下载文件从 Amazon S3 到 输出流
|
||||
*
|
||||
* @param key 文件在 Amazon S3 中的对象键
|
||||
* @param contentLengthConsumer 文件大小消费者函数
|
||||
* @return 写出订阅器
|
||||
* @throws OssException 如果下载失败,抛出自定义异常
|
||||
*/
|
||||
public WriteOutSubscriber<OutputStream> download(String key, Consumer<Long> contentLengthConsumer) {
|
||||
try {
|
||||
// 构建下载请求
|
||||
DownloadRequest<ResponseInputStream<GetObjectResponse>> downloadRequest = DownloadRequest.builder()
|
||||
DownloadRequest<ResponsePublisher<GetObjectResponse>> publisherDownloadRequest = DownloadRequest.builder()
|
||||
// 文件对象
|
||||
.getObjectRequest(y -> y.bucket(properties.getBucketName())
|
||||
.key(key)
|
||||
.build())
|
||||
.addTransferListener(LoggingTransferListener.create())
|
||||
// 使用订阅转换器
|
||||
.responseTransformer(AsyncResponseTransformer.toBlockingInputStream())
|
||||
// 使用发布订阅转换器
|
||||
.responseTransformer(AsyncResponseTransformer.toPublisher())
|
||||
.build();
|
||||
|
||||
// 使用 S3TransferManager 下载文件
|
||||
Download<ResponseInputStream<GetObjectResponse>> responseFuture = transferManager.download(downloadRequest);
|
||||
// 输出到流中
|
||||
try (ResponseInputStream<GetObjectResponse> responseStream = responseFuture.completionFuture().join().result()) { // auto-closeable stream
|
||||
if (consumer != null) {
|
||||
consumer.accept(responseStream.response().contentLength());
|
||||
Download<ResponsePublisher<GetObjectResponse>> publisherDownload = transferManager.download(publisherDownloadRequest);
|
||||
// 获取下载发布订阅转换器
|
||||
ResponsePublisher<GetObjectResponse> publisher = publisherDownload.completionFuture().join().result();
|
||||
// 执行文件大小消费者函数
|
||||
Optional.ofNullable(contentLengthConsumer)
|
||||
.ifPresent(lengthConsumer -> lengthConsumer.accept(publisher.response().contentLength()));
|
||||
|
||||
// 构建写出订阅器对象
|
||||
return out -> {
|
||||
// 创建可写入的字节通道
|
||||
try(WritableByteChannel channel = Channels.newChannel(out)){
|
||||
// 订阅数据
|
||||
publisher.subscribe(byteBuffer -> {
|
||||
while (byteBuffer.hasRemaining()) {
|
||||
try {
|
||||
channel.write(byteBuffer);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
responseStream.transferTo(out); // 阻塞调用线程 blocks the calling thread
|
||||
}
|
||||
}).join();
|
||||
}
|
||||
};
|
||||
} catch (Exception e) {
|
||||
throw new OssException("文件下载失败,错误信息:[" + e.getMessage() + "]");
|
||||
}
|
||||
|
@@ -0,0 +1,15 @@
|
||||
package org.dromara.common.oss.core;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* 写出订阅器
|
||||
*
|
||||
* @author 秋辞未寒
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface WriteOutSubscriber<T> {
|
||||
|
||||
void writeTo(T out) throws IOException;
|
||||
|
||||
}
|
Reference in New Issue
Block a user