mirror of
https://github.com/halo-dev/plugin-s3.git
synced 2025-10-15 14:40:46 +00:00
Compare commits
4 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
c109bbd61f | ||
![]() |
2320800907 | ||
![]() |
00537c164c | ||
![]() |
8be39b9898 |
@@ -35,7 +35,11 @@
|
||||
|
||||
### Bucket 桶名称
|
||||
|
||||
与服务商的控制台中的桶名称一致。
|
||||
一般与服务商控制台中的空间名称一致。
|
||||
|
||||
> 注意部分服务商 s3 空间名 ≠ 空间名称,若出现“Access Denied”报错可检查 Bucket 是否正确。
|
||||
>
|
||||
> 可通过 S3Browser 查看桶列表,七牛云也可在“开发者平台-对象存储-空间概览-s3域名”中查看 s3 空间名。
|
||||
|
||||
### Region
|
||||
|
||||
|
@@ -35,6 +35,7 @@ dependencies {
|
||||
|
||||
testImplementation 'run.halo.app:api'
|
||||
testImplementation 'org.springframework.boot:spring-boot-starter-test'
|
||||
testImplementation 'io.projectreactor:reactor-test'
|
||||
}
|
||||
|
||||
test {
|
||||
|
@@ -1 +1 @@
|
||||
version=1.4.0-SNAPSHOT
|
||||
version=1.4.1-SNAPSHOT
|
||||
|
@@ -14,7 +14,10 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.pf4j.Extension;
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.MediaTypeFactory;
|
||||
import org.springframework.lang.Nullable;
|
||||
@@ -22,8 +25,10 @@ import org.springframework.web.server.ServerErrorException;
|
||||
import org.springframework.web.server.ServerWebInputException;
|
||||
import org.springframework.web.util.UriUtils;
|
||||
import reactor.core.Exceptions;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
import reactor.util.context.Context;
|
||||
import reactor.util.retry.Retry;
|
||||
import run.halo.app.core.extension.attachment.Attachment;
|
||||
import run.halo.app.core.extension.attachment.Attachment.AttachmentSpec;
|
||||
@@ -143,8 +148,8 @@ public class S3OsAttachmentHandler implements AttachmentHandler {
|
||||
}
|
||||
var objectKey = getObjectKey(attachment);
|
||||
if (objectKey == null) {
|
||||
return Mono.error(new IllegalArgumentException(
|
||||
"Cannot obtain object key from attachment " + attachment.getMetadata().getName()));
|
||||
// fallback to default handler for backward compatibility
|
||||
return Mono.empty();
|
||||
}
|
||||
var properties = getProperties(configMap);
|
||||
var objectURL = getObjectURL(properties, objectKey);
|
||||
@@ -232,10 +237,52 @@ public class S3OsAttachmentHandler implements AttachmentHandler {
|
||||
.build();
|
||||
}
|
||||
|
||||
Flux<DataBuffer> reshape(Publisher<DataBuffer> content, int bufferSize) {
|
||||
var dataBufferFactory = DefaultDataBufferFactory.sharedInstance;
|
||||
return Flux.<ByteBuffer>create(sink -> {
|
||||
var byteBuffer = ByteBuffer.allocate(bufferSize);
|
||||
Flux.from(content)
|
||||
.doOnNext(dataBuffer -> {
|
||||
var count = dataBuffer.readableByteCount();
|
||||
for (var i = 0; i < count; i++) {
|
||||
byteBuffer.put(dataBuffer.read());
|
||||
// Emit the buffer when buffer
|
||||
if (!byteBuffer.hasRemaining()) {
|
||||
sink.next(deepCopy(byteBuffer));
|
||||
byteBuffer.clear();
|
||||
}
|
||||
}
|
||||
})
|
||||
.doOnComplete(() -> {
|
||||
// Emit the last part of buffer.
|
||||
if (byteBuffer.position() > 0) {
|
||||
sink.next(deepCopy(byteBuffer));
|
||||
}
|
||||
})
|
||||
.subscribe(DataBufferUtils::release, sink::error, sink::complete,
|
||||
Context.of(sink.contextView()));
|
||||
})
|
||||
.map(dataBufferFactory::wrap)
|
||||
.cast(DataBuffer.class)
|
||||
.doOnDiscard(DataBuffer.class, DataBufferUtils::release);
|
||||
}
|
||||
|
||||
ByteBuffer deepCopy(ByteBuffer src) {
|
||||
src.flip();
|
||||
var dest = ByteBuffer.allocate(src.limit());
|
||||
dest.put(src);
|
||||
src.rewind();
|
||||
dest.flip();
|
||||
return dest;
|
||||
}
|
||||
|
||||
Mono<ObjectDetail> upload(UploadContext uploadContext, S3OsProperties properties) {
|
||||
return Mono.using(() -> buildS3Client(properties),
|
||||
client -> {
|
||||
var uploadState = new UploadState(properties, uploadContext.file().filename());
|
||||
|
||||
var content = uploadContext.file().content();
|
||||
|
||||
return checkFileExistsAndRename(uploadState, client)
|
||||
// init multipart upload
|
||||
.flatMap(state -> Mono.fromCallable(() -> client.createMultipartUpload(
|
||||
@@ -243,12 +290,12 @@ public class S3OsAttachmentHandler implements AttachmentHandler {
|
||||
.bucket(properties.getBucket())
|
||||
.contentType(state.contentType)
|
||||
.key(state.objectKey)
|
||||
.build())).subscribeOn(Schedulers.boundedElastic()))
|
||||
.flatMapMany((response) -> {
|
||||
.build())))
|
||||
.doOnNext((response) -> {
|
||||
checkResult(response, "createMultipartUpload");
|
||||
uploadState.uploadId = response.uploadId();
|
||||
return uploadContext.file().content();
|
||||
})
|
||||
.thenMany(reshape(content, MULTIPART_MIN_PART_SIZE))
|
||||
// buffer to part
|
||||
.windowUntil((buffer) -> {
|
||||
uploadState.buffered += buffer.readableByteCount();
|
||||
|
@@ -4,8 +4,8 @@ metadata:
|
||||
name: PluginS3ObjectStorage
|
||||
spec:
|
||||
enabled: true
|
||||
version: 1.4.0
|
||||
requires: ">=2.0.0"
|
||||
version: 1.4.1
|
||||
requires: ">=2.5.0"
|
||||
author:
|
||||
name: longjuan
|
||||
website: https://github.com/longjuan
|
||||
|
@@ -1,12 +1,19 @@
|
||||
package run.halo.s3os;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.List;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.test.StepVerifier;
|
||||
import run.halo.app.core.extension.attachment.Policy;
|
||||
|
||||
class S3OsAttachmentHandlerTest {
|
||||
@@ -33,4 +40,57 @@ class S3OsAttachmentHandlerTest {
|
||||
// policy is null
|
||||
assertFalse(handler.shouldHandle(null));
|
||||
}
|
||||
|
||||
@Test
|
||||
void reshapeDataBufferWithSmallerBufferSize() {
|
||||
var handler = new S3OsAttachmentHandler();
|
||||
var factory = DefaultDataBufferFactory.sharedInstance;
|
||||
var content = Flux.<DataBuffer>fromIterable(List.of(factory.wrap("halo".getBytes())));
|
||||
|
||||
StepVerifier.create(handler.reshape(content, 2))
|
||||
.assertNext(dataBuffer -> {
|
||||
var str = dataBuffer.toString(UTF_8);
|
||||
assertEquals("ha", str);
|
||||
})
|
||||
.assertNext(dataBuffer -> {
|
||||
var str = dataBuffer.toString(UTF_8);
|
||||
assertEquals("lo", str);
|
||||
})
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
void reshapeDataBufferWithBiggerBufferSize() {
|
||||
var handler = new S3OsAttachmentHandler();
|
||||
var factory = DefaultDataBufferFactory.sharedInstance;
|
||||
var content = Flux.<DataBuffer>fromIterable(List.of(factory.wrap("halo".getBytes())));
|
||||
|
||||
StepVerifier.create(handler.reshape(content, 10))
|
||||
.assertNext(dataBuffer -> {
|
||||
var str = dataBuffer.toString(UTF_8);
|
||||
assertEquals("halo", str);
|
||||
})
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
void reshapeDataBuffersWithBiggerBufferSize() {
|
||||
var handler = new S3OsAttachmentHandler();
|
||||
var factory = DefaultDataBufferFactory.sharedInstance;
|
||||
var content = Flux.<DataBuffer>fromIterable(List.of(
|
||||
factory.wrap("ha".getBytes()),
|
||||
factory.wrap("lo".getBytes())
|
||||
));
|
||||
|
||||
StepVerifier.create(handler.reshape(content, 3))
|
||||
.assertNext(dataBuffer -> {
|
||||
var str = dataBuffer.toString(UTF_8);
|
||||
assertEquals("hal", str);
|
||||
})
|
||||
.assertNext(dataBuffer -> {
|
||||
var str = dataBuffer.toString(UTF_8);
|
||||
assertEquals("o", str);
|
||||
})
|
||||
.verifyComplete();
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user