Mirror of https://github.com/halo-dev/plugin-s3.git
Synced 2025-10-15 23:01:12 +00:00

Compare commits (4 commits):

- c109bbd61f
- 2320800907
- 00537c164c
- 8be39b9898
```diff
@@ -35,7 +35,11 @@
 
 ### Bucket
 
-Matches the bucket name shown in the provider's console.
+Usually matches the space name shown in the provider's console.
+
+> Note: for some providers the S3 bucket name is not the same as the space name displayed in the console. If you get an "Access Denied" error, check whether the Bucket value is correct.
+>
+> You can view the bucket list with S3Browser; for Qiniu Cloud, the S3 bucket name is also shown under "Developer Platform - Object Storage - Space Overview - S3 domain".
 
 ### Region
 
```
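
The note above recommends double-checking the real S3 bucket name when it differs from the name displayed in the provider's console. Besides S3Browser, a short AWS SDK for Java v2 snippet can list the buckets visible to an access key; the class name, endpoint, region, and credentials below are placeholders, not values from this repository:

```java
import java.net.URI;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

public class ListBucketsCheck {
    public static void main(String[] args) {
        // Placeholder endpoint, region, and credentials: replace with your provider's values.
        try (S3Client client = S3Client.builder()
                .endpointOverride(URI.create("https://s3.example-provider.com"))
                .region(Region.of("us-east-1"))
                .credentialsProvider(StaticCredentialsProvider.create(
                        AwsBasicCredentials.create("ACCESS_KEY", "SECRET_KEY")))
                .build()) {
            // The names printed here are the real S3 bucket names to use in the plugin settings.
            client.listBuckets().buckets()
                    .forEach(bucket -> System.out.println(bucket.name()));
        }
    }
}
```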
```diff
@@ -35,6 +35,7 @@ dependencies {
 
     testImplementation 'run.halo.app:api'
     testImplementation 'org.springframework.boot:spring-boot-starter-test'
+    testImplementation 'io.projectreactor:reactor-test'
 }
 
 test {
```
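
The added io.projectreactor:reactor-test dependency provides StepVerifier, which the new handler tests later in this diff use to assert the buffers emitted by reshape. A minimal, self-contained sketch of the API, using arbitrary values and a hypothetical StepVerifierSketch class:

```java
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

class StepVerifierSketch {
    // Verifies each emitted element in order, then expects normal completion.
    static void verify() {
        StepVerifier.create(Flux.just("ha", "lo"))
            .expectNext("ha")
            .expectNext("lo")
            .verifyComplete();
    }
}
```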
```diff
@@ -1 +1 @@
-version=1.4.0-SNAPSHOT
+version=1.4.1-SNAPSHOT
```
```diff
@@ -14,7 +14,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.pf4j.Extension;
+import org.reactivestreams.Publisher;
 import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferUtils;
+import org.springframework.core.io.buffer.DefaultDataBufferFactory;
 import org.springframework.http.MediaType;
 import org.springframework.http.MediaTypeFactory;
 import org.springframework.lang.Nullable;
```
```diff
@@ -22,8 +25,10 @@ import org.springframework.web.server.ServerErrorException;
 import org.springframework.web.server.ServerWebInputException;
 import org.springframework.web.util.UriUtils;
 import reactor.core.Exceptions;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
+import reactor.util.context.Context;
 import reactor.util.retry.Retry;
 import run.halo.app.core.extension.attachment.Attachment;
 import run.halo.app.core.extension.attachment.Attachment.AttachmentSpec;
```
```diff
@@ -143,8 +148,8 @@ public class S3OsAttachmentHandler implements AttachmentHandler {
         }
         var objectKey = getObjectKey(attachment);
         if (objectKey == null) {
-            return Mono.error(new IllegalArgumentException(
-                "Cannot obtain object key from attachment " + attachment.getMetadata().getName()));
+            // fallback to default handler for backward compatibility
+            return Mono.empty();
         }
         var properties = getProperties(configMap);
         var objectURL = getObjectURL(properties, objectKey);
```
```diff
@@ -232,10 +237,52 @@ public class S3OsAttachmentHandler implements AttachmentHandler {
             .build();
     }
 
+    Flux<DataBuffer> reshape(Publisher<DataBuffer> content, int bufferSize) {
+        var dataBufferFactory = DefaultDataBufferFactory.sharedInstance;
+        return Flux.<ByteBuffer>create(sink -> {
+                var byteBuffer = ByteBuffer.allocate(bufferSize);
+                Flux.from(content)
+                    .doOnNext(dataBuffer -> {
+                        var count = dataBuffer.readableByteCount();
+                        for (var i = 0; i < count; i++) {
+                            byteBuffer.put(dataBuffer.read());
+                            // Emit the buffer when the buffer is full.
+                            if (!byteBuffer.hasRemaining()) {
+                                sink.next(deepCopy(byteBuffer));
+                                byteBuffer.clear();
+                            }
+                        }
+                    })
+                    .doOnComplete(() -> {
+                        // Emit the last part of buffer.
+                        if (byteBuffer.position() > 0) {
+                            sink.next(deepCopy(byteBuffer));
+                        }
+                    })
+                    .subscribe(DataBufferUtils::release, sink::error, sink::complete,
+                        Context.of(sink.contextView()));
+            })
+            .map(dataBufferFactory::wrap)
+            .cast(DataBuffer.class)
+            .doOnDiscard(DataBuffer.class, DataBufferUtils::release);
+    }
+
+    ByteBuffer deepCopy(ByteBuffer src) {
+        src.flip();
+        var dest = ByteBuffer.allocate(src.limit());
+        dest.put(src);
+        src.rewind();
+        dest.flip();
+        return dest;
+    }
+
     Mono<ObjectDetail> upload(UploadContext uploadContext, S3OsProperties properties) {
         return Mono.using(() -> buildS3Client(properties),
             client -> {
                 var uploadState = new UploadState(properties, uploadContext.file().filename());
+
+                var content = uploadContext.file().content();
+
                 return checkFileExistsAndRename(uploadState, client)
                     // init multipart upload
                     .flatMap(state -> Mono.fromCallable(() -> client.createMultipartUpload(
```
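
The new reshape method repackages an arbitrary stream of DataBuffers into buffers of exactly bufferSize bytes, with only the final buffer allowed to be smaller; deepCopy snapshots the staging ByteBuffer before it is cleared for reuse. This prepares the content for the multipart upload below, since S3 rejects parts smaller than 5 MiB except the last one. A minimal usage sketch, assuming a hypothetical ReshapeSketch class placed in the run.halo.s3os package (so it can reach the package-private method) and an illustrative 4-byte buffer size:

```java
package run.halo.s3os;

import java.nio.charset.StandardCharsets;
import java.util.List;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import reactor.core.publisher.Flux;

public class ReshapeSketch {
    public static void main(String[] args) {
        var handler = new S3OsAttachmentHandler();
        var factory = DefaultDataBufferFactory.sharedInstance;
        // Two uneven buffers totalling 11 bytes.
        Flux<DataBuffer> content = Flux.fromIterable(List.of(
            factory.wrap("hello ".getBytes(StandardCharsets.UTF_8)),
            factory.wrap("world".getBytes(StandardCharsets.UTF_8))));

        // Prints "hell", "o wo", "rld": fixed-size chunks plus a smaller trailing one.
        handler.reshape(content, 4)
            .map(buffer -> buffer.toString(StandardCharsets.UTF_8))
            .doOnNext(System.out::println)
            .blockLast();
    }
}
```

The unit tests added at the end of this diff verify the same behaviour with StepVerifier.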
```diff
@@ -243,12 +290,12 @@ public class S3OsAttachmentHandler implements AttachmentHandler {
                         .bucket(properties.getBucket())
                         .contentType(state.contentType)
                         .key(state.objectKey)
-                        .build())).subscribeOn(Schedulers.boundedElastic()))
-                    .flatMapMany((response) -> {
+                        .build())))
+                    .doOnNext((response) -> {
                         checkResult(response, "createMultipartUpload");
                         uploadState.uploadId = response.uploadId();
-                        return uploadContext.file().content();
                     })
+                    .thenMany(reshape(content, MULTIPART_MIN_PART_SIZE))
                     // buffer to part
                     .windowUntil((buffer) -> {
                         uploadState.buffered += buffer.readableByteCount();
```
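
The upload pipeline now pulls the file content through thenMany(reshape(...)) and cuts it into parts with windowUntil once enough bytes have accumulated. A self-contained sketch of that accumulate-and-cut pattern, with plain integers standing in for buffer sizes and a threshold of 5 standing in for MULTIPART_MIN_PART_SIZE; the state handling here is illustrative, not the plugin's exact logic:

```java
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Flux;

public class WindowUntilSketch {
    public static void main(String[] args) {
        var buffered = new AtomicLong();
        Flux.just(2, 2, 2, 2, 2)
            // Close the current window once the running total reaches the threshold.
            .windowUntil(size -> {
                if (buffered.addAndGet(size) >= 5) {
                    buffered.set(0);
                    return true;
                }
                return false;
            })
            // Each window becomes one "part"; here we just print its total size.
            .concatMap(window -> window.reduce(0, Integer::sum))
            .doOnNext(partSize -> System.out.println("part of " + partSize + " bytes"))
            .blockLast();
    }
}
```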
```diff
@@ -4,8 +4,8 @@ metadata:
   name: PluginS3ObjectStorage
 spec:
   enabled: true
-  version: 1.4.0
-  requires: ">=2.0.0"
+  version: 1.4.1
+  requires: ">=2.5.0"
   author:
     name: longjuan
     website: https://github.com/longjuan
```
```diff
@@ -1,12 +1,19 @@
 package run.halo.s3os;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.List;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DefaultDataBufferFactory;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
 import run.halo.app.core.extension.attachment.Policy;
 
 class S3OsAttachmentHandlerTest {
```
```diff
@@ -33,4 +40,57 @@ class S3OsAttachmentHandlerTest {
         // policy is null
         assertFalse(handler.shouldHandle(null));
     }
+
+    @Test
+    void reshapeDataBufferWithSmallerBufferSize() {
+        var handler = new S3OsAttachmentHandler();
+        var factory = DefaultDataBufferFactory.sharedInstance;
+        var content = Flux.<DataBuffer>fromIterable(List.of(factory.wrap("halo".getBytes())));
+
+        StepVerifier.create(handler.reshape(content, 2))
+            .assertNext(dataBuffer -> {
+                var str = dataBuffer.toString(UTF_8);
+                assertEquals("ha", str);
+            })
+            .assertNext(dataBuffer -> {
+                var str = dataBuffer.toString(UTF_8);
+                assertEquals("lo", str);
+            })
+            .verifyComplete();
+    }
+
+    @Test
+    void reshapeDataBufferWithBiggerBufferSize() {
+        var handler = new S3OsAttachmentHandler();
+        var factory = DefaultDataBufferFactory.sharedInstance;
+        var content = Flux.<DataBuffer>fromIterable(List.of(factory.wrap("halo".getBytes())));
+
+        StepVerifier.create(handler.reshape(content, 10))
+            .assertNext(dataBuffer -> {
+                var str = dataBuffer.toString(UTF_8);
+                assertEquals("halo", str);
+            })
+            .verifyComplete();
+    }
+
+    @Test
+    void reshapeDataBuffersWithBiggerBufferSize() {
+        var handler = new S3OsAttachmentHandler();
+        var factory = DefaultDataBufferFactory.sharedInstance;
+        var content = Flux.<DataBuffer>fromIterable(List.of(
+            factory.wrap("ha".getBytes()),
+            factory.wrap("lo".getBytes())
+        ));
+
+        StepVerifier.create(handler.reshape(content, 3))
+            .assertNext(dataBuffer -> {
+                var str = dataBuffer.toString(UTF_8);
+                assertEquals("hal", str);
+            })
+            .assertNext(dataBuffer -> {
+                var str = dataBuffer.toString(UTF_8);
+                assertEquals("o", str);
+            })
+            .verifyComplete();
+    }
 }
```