12 Commits
1.3.0 ... 1.4.1

Author SHA1 Message Date
John Niang
c109bbd61f Fallback to default handler for backward compatibility (#57)
#### What type of PR is this?

/kind improvement

#### What this PR does / why we need it:

See https://github.com/halo-sigs/plugin-s3/issues/56 for more.

This PR skips permalink resolution when the object key is missing, so that the default handler can resolve the permalink from the `storage.halo.run/external-link` annotation (see the sketch after the release note below).

#### Which issue(s) this PR fixes:

Fixes https://github.com/halo-sigs/plugin-s3/issues/56

#### Does this PR introduce a user-facing change?

```release-note
Fix the "Cannot obtain object key from attachment attachment-xyz" error that appears after importing attachments from Halo 1.x
```
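For reference, a minimal sketch of the fallback, based on the `getPermalink` implementation included in this change (the `AttachmentHandler` override and the helper methods are the ones shown in the file diff below):

```java
// Sketch: when the object-key annotation is absent (e.g. attachments imported
// from Halo 1.x), return an empty Mono so the default handler resolves the
// permalink from the storage.halo.run/external-link annotation instead.
@Override
public Mono<URI> getPermalink(Attachment attachment, Policy policy, ConfigMap configMap) {
    if (!shouldHandle(policy)) {
        return Mono.empty();
    }
    var objectKey = getObjectKey(attachment);
    if (objectKey == null) {
        // Fall back to the default handler for backward compatibility.
        return Mono.empty();
    }
    return Mono.just(URI.create(getObjectURL(getProperties(configMap), objectKey)));
}
```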
2023-08-01 10:14:54 +00:00
longjuan
2320800907 chore: bump version to 1.4.1 (#53)
```release-note
None
```
2023-07-21 15:58:13 +00:00
John Niang
00537c164c Ensure non-trailing parts are of equal size (#50)
#### What type of PR is this?

/kind improvement

#### What this PR does / why we need it:

Reshape the DataBuffers into parts of the same size (5 MB), except for the last part (see the sketch after the release note below).

#### Which issue(s) this PR fixes:

Fixes https://github.com/halo-sigs/plugin-s3/issues/49

#### Does this PR introduce a user-facing change?

```release-note
Ensure consistent part sizes during multipart upload
```
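To illustrate the sizing rule, a small self-contained sketch (the 12 MB total and 5 MB part size below are made-up numbers used for illustration, not values taken from this PR beyond the 5 MB minimum part size it mentions):

```java
import java.util.ArrayList;
import java.util.List;

class PartSizingDemo {
    // Split totalBytes into chunks of partSize; only the final chunk may be smaller.
    static List<Integer> partSizes(int totalBytes, int partSize) {
        var sizes = new ArrayList<Integer>();
        for (int offset = 0; offset < totalBytes; offset += partSize) {
            sizes.add(Math.min(partSize, totalBytes - offset));
        }
        return sizes;
    }

    public static void main(String[] args) {
        int mb = 1024 * 1024;
        // A 12 MB upload with 5 MB parts -> [5 MB, 5 MB, 2 MB]; only the trailing part differs.
        System.out.println(partSizes(12 * mb, 5 * mb).stream().map(s -> s / mb + " MB").toList());
    }
}
```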
2023-07-05 05:10:11 +00:00
Kevin Zhang
8be39b9898 Improve the readme (#46)
Fixes https://github.com/halo-sigs/plugin-s3/issues/43

Incorrect bucket value:
![image](https://github.com/halo-sigs/plugin-s3/assets/34374831/d3f0855b-5e72-4c59-80c6-017715cd4ba4)

Additional instructions for filling in the bucket:
<img width="895" alt="image" src="https://github.com/halo-sigs/plugin-s3/assets/34374831/d8d2e288-e4e2-4788-95a6-b86d91f4a1ec">


/kind feature

```release-note
Improve README.md
```
2023-07-01 08:14:12 +00:00
longjuan
022ecea94f Fix url space error (#41)
Fixes https://github.com/halo-sigs/plugin-s3/issues/40
![image](https://github.com/halo-sigs/plugin-s3/assets/28662535/03bc4ed8-c539-451f-8a88-99084240038a)

```release-note
None
```
2023-06-01 08:23:13 +00:00
John Niang
b3bdd02e08 Fix incorrect setting on TTL of share URL (#39)
The shared URL mechanism was introduced in https://github.com/halo-sigs/plugin-s3/pull/35, and I incorrectly set the TTL of the URL to 5 minutes.

```release-note
None
```
2023-06-01 08:13:16 +00:00
longjuan
5a95b4ced1 Permalink Adaptation Path Style (#38)
Fixes https://github.com/halo-sigs/plugin-s3/issues/37

```release-note
Build the permalink according to the configured access style
```

With a policy that uses Path Style.
Before the change:
![image](https://github.com/halo-sigs/plugin-s3/assets/28662535/631b33f8-e534-445b-bf1c-3edbc9a543bc)

After the change:
![image](https://github.com/halo-sigs/plugin-s3/assets/28662535/ca6edbd4-8455-4246-b49b-f12afc3ea020)
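For context, a small illustration of how the permalink differs between the two access styles (the endpoint, bucket, and object key below are placeholders, and the method is a simplified stand-in for the plugin's URL building):

```java
class PermalinkStyleDemo {
    static String permalink(boolean pathStyleAccess, String protocol, String endpoint,
        String bucket, String objectKey) {
        // Path style puts the bucket in the path; virtual-hosted style puts it in the host.
        var host = pathStyleAccess
            ? endpoint + "/" + bucket
            : bucket + "." + endpoint;
        return protocol + "://" + host + "/" + objectKey;
    }

    public static void main(String[] args) {
        System.out.println(permalink(true, "https", "s3.example.com", "blog", "a.png"));
        // https://s3.example.com/blog/a.png
        System.out.println(permalink(false, "https", "s3.example.com", "blog", "a.png"));
        // https://blog.s3.example.com/a.png
    }
}
```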
2023-05-12 16:52:27 +00:00
John Niang
88490bb80f Support to get shared URL and permalink of attachment in handler (#35)
On the Halo side, PR https://github.com/halo-dev/halo/pull/3740 has already added two new methods (`getSharedURL` and `getPermalink`) to AttachmentHandler. Now it's time to implement them in this plugin so that users can use them correctly and easily.

This PR mainly implements the [new AttachmentHandler](11a5807682/api/src/main/java/run/halo/app/core/extension/attachment/endpoint/AttachmentHandler.java) methods. At the same time, I also refactored the build script for a better development experience.

Please note that these changes are not expected to affect compatibility with Halo 2.0.0, but you can test against Halo 2.0.0 manually to confirm.

/kind feature

```release-note
Support getting shared links and permalinks for attachments
```
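A minimal sketch of how the shared URL is produced with the AWS SDK v2 presigner, which is the approach taken here (the bucket, key, and 5-minute TTL are placeholders; the plugin builds the presigner from its own settings and receives the TTL from Halo):

```java
import java.time.Duration;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.presigner.S3Presigner;
import software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest;

class SharedUrlSketch {
    public static void main(String[] args) {
        // Uses the default region/credentials chain; the plugin configures these explicitly.
        try (S3Presigner presigner = S3Presigner.create()) {
            var getObject = GetObjectRequest.builder()
                .bucket("my-bucket")   // placeholder bucket
                .key("halo/a.png")     // placeholder object key
                .build();
            var presignRequest = GetObjectPresignRequest.builder()
                .signatureDuration(Duration.ofMinutes(5)) // TTL supplied by Halo at call time
                .getObjectRequest(getObject)
                .build();
            // The presigned URL is what getSharedURL hands back to Halo.
            System.out.println(presigner.presignGetObject(presignRequest).url());
        }
    }
}
```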
2023-04-21 12:33:40 +00:00
John Niang
5e9b9f803b Use S3Client instead of S3AsyncClient to avoid waiting two seconds for closing (#30)
Fixes https://github.com/halo-sigs/plugin-s3/issues/23

```release-note
Fix slow file uploads
```
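A minimal sketch of the pattern this change adopts: build the synchronous `S3Client`, run the blocking call on Reactor's `boundedElastic` scheduler, and close the client as soon as the operation terminates (the `headObject` call, method name, and parameters here are illustrative):

```java
import java.util.concurrent.Callable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;

class BlockingS3Sketch {
    static Mono<HeadObjectResponse> headObject(Callable<S3Client> clientFactory,
        String bucket, String key) {
        // Mono.using creates the client per subscription and closes it on termination,
        // avoiding the shutdown delay observed with S3AsyncClient.
        return Mono.using(clientFactory,
            client -> Mono.fromCallable(() -> client.headObject(HeadObjectRequest.builder()
                    .bucket(bucket)
                    .key(key)
                    .build()))
                .subscribeOn(Schedulers.boundedElastic()),
            S3Client::close);
    }
}
```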
2023-04-06 08:06:15 +00:00
longjuan
c635ebede8 perf: auto rename attachment if it exists (#22)
Fixes https://github.com/halo-dev/halo/issues/3337
Instead of updating dependencies, FileNameUtils was copied into the plugin directly.
With an existing image.png, paste two more screenshots at the same time; both are expected to be uploaded and renamed automatically.

![image](https://user-images.githubusercontent.com/28662535/220059741-da25a490-6f6a-4172-a393-aa3f84ab6b38.png)
![image](https://user-images.githubusercontent.com/28662535/220059786-24cda2bb-6faa-4377-8eb8-a70920916f3d.png)

```release-note
Automatically rename a file when one with the same name already exists
```
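As a usage illustration, the helper added in this PR (shown in the file diff below) appends a short random suffix; the suffix values in the comments are only examples:

```java
public class RenameDemo {
    public static void main(String[] args) {
        // Assumes run.halo.s3os.FileNameUtils from this PR is on the classpath.
        System.out.println(FileNameUtils.randomFileName("image.png", 4));  // e.g. image-abcd.png
        System.out.println(FileNameUtils.randomFileName(".gitignore", 4)); // e.g. abcd.gitignore
        System.out.println(FileNameUtils.randomFileName("halo", 4));       // e.g. halo-abcd
    }
}
```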
2023-02-25 02:38:14 +00:00
miaodi
459cc1cf94 add oracle cloud configuration guide in README (#20)
Adds an Oracle Cloud configuration guide; uploads were verified to work.
Official documentation: https://docs.oracle.com/en-us/iaas/Content/Object/Tasks/s3compatibleapi.htm

Both `Path Style` and `Virtual Hosted Style` can be configured and were tested successfully.
Using `Virtual Hosted Style` is recommended:

![image](https://user-images.githubusercontent.com/19516717/216295351-5146f5ab-0cf6-43a1-bc6e-ad261c55f198.png)

Endpoint: compat.objectstorage.{region}.oraclecloud.com
Replace `{region}` with the value of the `Region` (区域) shown in the screenshot above; for example, the region `ap-osaka-1` gives `compat.objectstorage.ap-osaka-1.oraclecloud.com`.

Leave the bound domain (绑定域名) field empty.

![image](https://user-images.githubusercontent.com/19516717/216307619-b54b5829-8341-469d-86b1-dad7e1e65260.png)
`Access Key` and `Access Secret` are generated as a customer secret key (客户秘钥) under your user settings.

```release-note
None
```
2023-02-02 14:20:10 +00:00
SanqianQVQ
780258ffc1 Update README.md (#18)
Added Cloudflare info and used bright red text for readability.

```release-note
None
```
2023-01-31 10:58:09 +00:00
9 changed files with 492 additions and 199 deletions

View File

@@ -35,13 +35,19 @@
 ### Bucket 桶名称
-与服务商控制台中的名称一致。
+一般与服务商控制台中的空间名称一致。
+> 注意部分服务商 s3 空间名 ≠ 空间名称,若出现“Access Denied”报错可检查 Bucket 是否正确。
+>
+> 可通过 S3Browser 查看桶列表,七牛云也可在“开发者平台-对象存储-空间概览-s3域名”中查看 s3 空间名。
 ### Region
 一般留空即可。
 > 若确认过其他配置正确又不能访问,请在服务商的文档中查看并填写英文的 Region,例如 `cn-east-1`。
+>
+> Cloudflare 需要填写均为小写字母的 `auto`。
 ## 部分对象存储服务商兼容性
@@ -55,10 +61,12 @@
 |金山云|https://docs.ksyun.com/documents/6761|Virtual Hosted Style|✅|
 |青云|https://docsv3.qingcloud.com/storage/object-storage/s3/intro/|Virtual Hosted Style / <br>Path Style|✅|
 |网易数帆|[https://sf.163.com/help/documents/89796157866430464](https://sf.163.com/help/documents/89796157866430464)|Virtual Hosted Style|✅|
+|Cloudflare|Cloudflare S3 兼容性API<br>[https://developers.cloudflare.com/r2/data-access/s3-api/](https://developers.cloudflare.com/r2/data-access/s3-api/)|Virtual Hosted Style / <br>Path Style|✅|
+| Oracle Cloud |[https://docs.oracle.com/en-us/iaas/Content/Object/Tasks/s3compatibleapi.htm](https://docs.oracle.com/en-us/iaas/Content/Object/Tasks/s3compatibleapi.htm)|Virtual Hosted Style / <br>Path Style|✅|
 |自建minio|\-|Path Style|✅|
 |华为云|文档未说明是否兼容,工单反馈不保证兼容性,实际测试可以使用|Virtual Hosted Style|❓|
 |Ucloud|只支持 8MB 大小的分片,本插件暂不支持<br>[https://docs.ucloud.cn/ufile/s3/s3\_introduction](https://docs.ucloud.cn/ufile/s3/s3_introduction)|\-||
 |又拍云|暂不支持 s3 协议|\-||
 ## 开发环境

View File

@@ -1,5 +1,6 @@
 plugins {
-    id "io.github.guqing.plugin-development" version "0.0.6-SNAPSHOT"
+    id "io.github.guqing.plugin-development" version "0.0.7-SNAPSHOT"
+    id "io.freefair.lombok" version "8.0.0-rc2"
     id 'java'
 }
@@ -8,7 +9,7 @@ sourceCompatibility = JavaVersion.VERSION_17
 repositories {
     maven { url 'https://s01.oss.sonatype.org/content/repositories/releases' }
-    maven { url 'https://repo.spring.io/milestone' }
+    maven { url 'https://s01.oss.sonatype.org/content/repositories/snapshots/' }
     mavenCentral()
 }
@@ -23,9 +24,8 @@ jar {
 }
 dependencies {
-    compileOnly platform("run.halo.dependencies:halo-dependencies:1.0.0")
-    compileOnly files("lib/halo-2.0.0-SNAPSHOT-plain.jar")
+    implementation platform('run.halo.tools.platform:plugin:2.5.0-SNAPSHOT')
+    compileOnly 'run.halo.app:api'
     implementation platform('software.amazon.awssdk:bom:2.19.8')
     implementation 'software.amazon.awssdk:s3'
@@ -33,14 +33,9 @@ dependencies {
implementation "javax.activation:activation:1.1.1" implementation "javax.activation:activation:1.1.1"
implementation "org.glassfish.jaxb:jaxb-runtime:2.3.3" implementation "org.glassfish.jaxb:jaxb-runtime:2.3.3"
compileOnly 'org.projectlombok:lombok' testImplementation 'run.halo.app:api'
annotationProcessor 'org.projectlombok:lombok:1.18.22'
testImplementation platform("run.halo.dependencies:halo-dependencies:1.0.0")
testImplementation files("lib/halo-2.0.0-SNAPSHOT-plain.jar")
testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.0' testImplementation 'io.projectreactor:reactor-test'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.9.0'
} }
test { test {

View File

@@ -1 +1 @@
-version=1.3.0-SNAPSHOT
+version=1.4.1-SNAPSHOT

View File

@@ -1,5 +1,5 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-7.4-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.0.2-bin.zip
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists

Binary file not shown.

View File

@@ -0,0 +1,44 @@
package run.halo.s3os;

import com.google.common.io.Files;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;

public final class FileNameUtils {

    private FileNameUtils() {
    }

    public static String removeFileExtension(String filename, boolean removeAllExtensions) {
        if (filename == null || filename.isEmpty()) {
            return filename;
        }
        var extPattern = "(?<!^)[.]" + (removeAllExtensions ? ".*" : "[^.]*$");
        return filename.replaceAll(extPattern, "");
    }

    /**
     * Append random string after file name.
     * <pre>
     * Case 1: halo.run -> halo-xyz.run
     * Case 2: .run -> xyz.run
     * Case 3: halo -> halo-xyz
     * </pre>
     *
     * @param filename is name of file.
     * @param length is for generating random string with specific length.
     * @return File name with random string.
     */
    public static String randomFileName(String filename, int length) {
        var nameWithoutExt = Files.getNameWithoutExtension(filename);
        var ext = Files.getFileExtension(filename);
        var random = RandomStringUtils.randomAlphabetic(length).toLowerCase();
        if (StringUtils.isBlank(nameWithoutExt)) {
            return random + "." + ext;
        }
        if (StringUtils.isBlank(ext)) {
            return nameWithoutExt + "-" + random;
        }
        return nameWithoutExt + "-" + random + "." + ext;
    }
}

View File

@@ -1,16 +1,35 @@
 package run.halo.s3os;
-import lombok.Data;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileAlreadyExistsException;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.pf4j.Extension;
+import org.reactivestreams.Publisher;
 import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferUtils;
+import org.springframework.core.io.buffer.DefaultDataBufferFactory;
 import org.springframework.http.MediaType;
 import org.springframework.http.MediaTypeFactory;
+import org.springframework.lang.Nullable;
 import org.springframework.web.server.ServerErrorException;
 import org.springframework.web.server.ServerWebInputException;
 import org.springframework.web.util.UriUtils;
+import reactor.core.Exceptions;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.util.context.Context;
+import reactor.util.retry.Retry;
 import run.halo.app.core.extension.attachment.Attachment;
 import run.halo.app.core.extension.attachment.Attachment.AttachmentSpec;
 import run.halo.app.core.extension.attachment.Constant;
@@ -20,22 +39,26 @@ import run.halo.app.extension.ConfigMap;
 import run.halo.app.extension.Metadata;
 import run.halo.app.infra.utils.JsonUtils;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.awscore.presigner.SdkPresigner;
 import software.amazon.awssdk.core.SdkResponse;
-import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.sync.RequestBody;
 import software.amazon.awssdk.http.SdkHttpResponse;
 import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3Configuration;
-import software.amazon.awssdk.services.s3.model.*;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.presigner.S3Presigner;
+import software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest;
+import software.amazon.awssdk.utils.SdkAutoCloseable;
 @Slf4j
 @Extension
@@ -48,38 +71,98 @@ public class S3OsAttachmentHandler implements AttachmentHandler {
     @Override
     public Mono<Attachment> upload(UploadContext uploadContext) {
         return Mono.just(uploadContext).filter(context -> this.shouldHandle(context.policy()))
             .flatMap(context -> {
                 final var properties = getProperties(context.configMap());
-                return upload(context, properties).map(
-                    objectDetail -> this.buildAttachment(context, properties, objectDetail));
+                return upload(context, properties)
+                    .subscribeOn(Schedulers.boundedElastic())
+                    .map(objectDetail -> this.buildAttachment(properties, objectDetail));
             });
     }
     @Override
     public Mono<Attachment> delete(DeleteContext deleteContext) {
         return Mono.just(deleteContext).filter(context -> this.shouldHandle(context.policy()))
             .flatMap(context -> {
-                var annotations = context.attachment().getMetadata().getAnnotations();
-                if (annotations == null || !annotations.containsKey(OBJECT_KEY)) {
+                var objectKey = getObjectKey(context.attachment());
+                if (objectKey == null) {
                     return Mono.just(context);
                 }
-                var objectName = annotations.get(OBJECT_KEY);
                 var properties = getProperties(deleteContext.configMap());
-                var client = buildS3AsyncClient(properties);
-                return Mono.fromFuture(client.deleteObject(DeleteObjectRequest.builder()
-                        .bucket(properties.getBucket())
-                        .key(objectName)
-                        .build()))
-                    .doFinally(signalType -> client.close())
-                    .map(response -> {
+                return Mono.using(() -> buildS3Client(properties),
+                        client -> Mono.fromCallable(
+                            () -> client.deleteObject(DeleteObjectRequest.builder()
+                                .bucket(properties.getBucket())
+                                .key(objectKey)
+                                .build())).subscribeOn(Schedulers.boundedElastic()),
+                        S3Client::close)
+                    .doOnNext(response -> {
                         checkResult(response, "delete object");
                         log.info("Delete object {} from bucket {} successfully",
-                            objectName, properties.getBucket());
-                        return context;
-                    });
+                            objectKey, properties.getBucket());
+                    })
+                    .thenReturn(context);
             })
             .map(DeleteContext::attachment);
     }
+    @Override
+    public Mono<URI> getSharedURL(Attachment attachment, Policy policy, ConfigMap configMap,
+        Duration ttl) {
+        if (!this.shouldHandle(policy)) {
+            return Mono.empty();
+        }
+        var objectKey = getObjectKey(attachment);
+        if (objectKey == null) {
+            return Mono.error(new IllegalArgumentException(
+                "Cannot obtain object key from attachment " + attachment.getMetadata().getName()));
+        }
+        var properties = getProperties(configMap);
+        return Mono.using(() -> buildS3Presigner(properties),
+                s3Presigner -> {
+                    var getObjectRequest = GetObjectRequest.builder()
+                        .bucket(properties.getBucket())
+                        .key(objectKey)
+                        .build();
+                    var presignedRequest = GetObjectPresignRequest.builder()
+                        .signatureDuration(ttl)
+                        .getObjectRequest(getObjectRequest)
+                        .build();
+                    var presignedGetObjectRequest = s3Presigner.presignGetObject(presignedRequest);
+                    var presignedURL = presignedGetObjectRequest.url();
+                    try {
+                        return Mono.just(presignedURL.toURI());
+                    } catch (URISyntaxException e) {
+                        return Mono.error(
+                            new RuntimeException("Failed to convert URL " + presignedURL + " to URI."));
+                    }
+                },
+                SdkPresigner::close)
+            .subscribeOn(Schedulers.boundedElastic());
+    }
+    @Override
+    public Mono<URI> getPermalink(Attachment attachment, Policy policy, ConfigMap configMap) {
+        if (!this.shouldHandle(policy)) {
+            return Mono.empty();
+        }
+        var objectKey = getObjectKey(attachment);
+        if (objectKey == null) {
+            // fallback to default handler for backward compatibility
+            return Mono.empty();
+        }
+        var properties = getProperties(configMap);
+        var objectURL = getObjectURL(properties, objectKey);
+        return Mono.just(URI.create(objectURL));
+    }
+    @Nullable
+    private String getObjectKey(Attachment attachment) {
+        var annotations = attachment.getMetadata().getAnnotations();
+        if (annotations == null) {
+            return null;
+        }
+        return annotations.get(OBJECT_KEY);
+    }
     S3OsProperties getProperties(ConfigMap configMap) {
@@ -87,164 +170,253 @@ public class S3OsAttachmentHandler implements AttachmentHandler {
         return JsonUtils.jsonToObject(settingJson, S3OsProperties.class);
     }
-    Attachment buildAttachment(UploadContext uploadContext, S3OsProperties properties,
-        ObjectDetail objectDetail) {
-        String externalLink;
-        if (StringUtils.isBlank(properties.getDomain())) {
-            var host = properties.getBucket() + "." + properties.getEndpoint();
-            externalLink = properties.getProtocol() + "://" + host + "/" + objectDetail.objectKey();
-        } else {
-            externalLink = properties.getProtocol() + "://" + properties.getDomain() + "/" + objectDetail.objectKey();
-        }
+    Attachment buildAttachment(S3OsProperties properties, ObjectDetail objectDetail) {
+        String externalLink = getObjectURL(properties, objectDetail.uploadState.objectKey);
         var metadata = new Metadata();
         metadata.setName(UUID.randomUUID().toString());
-        metadata.setAnnotations(
-            Map.of(OBJECT_KEY, objectDetail.objectKey(), Constant.EXTERNAL_LINK_ANNO_KEY,
-                UriUtils.encodePath(externalLink, StandardCharsets.UTF_8)));
+        metadata.setAnnotations(new HashMap<>(
+            Map.of(OBJECT_KEY, objectDetail.uploadState.objectKey,
+                Constant.EXTERNAL_LINK_ANNO_KEY, externalLink)));
         var objectMetadata = objectDetail.objectMetadata();
         var spec = new AttachmentSpec();
         spec.setSize(objectMetadata.contentLength());
-        spec.setDisplayName(uploadContext.file().filename());
+        spec.setDisplayName(objectDetail.uploadState.fileName);
         spec.setMediaType(objectMetadata.contentType());
         var attachment = new Attachment();
         attachment.setMetadata(metadata);
         attachment.setSpec(spec);
-        log.info("Upload object {} to bucket {} successfully", objectDetail.objectKey(), properties.getBucket());
+        log.info("Upload object {} to bucket {} successfully", objectDetail.uploadState.objectKey,
+            properties.getBucket());
         return attachment;
     }
-    S3AsyncClient buildS3AsyncClient(S3OsProperties properties) {
-        return S3AsyncClient.builder()
-            .region(Region.of(properties.getRegion()))
-            .endpointOverride(URI.create(properties.getEndpointProtocol() + "://" + properties.getEndpoint()))
-            .credentialsProvider(() -> AwsBasicCredentials.create(properties.getAccessKey(),
-                properties.getAccessSecret()))
-            .serviceConfiguration(S3Configuration.builder()
-                .chunkedEncodingEnabled(false)
-                .pathStyleAccessEnabled(properties.getEnablePathStyleAccess())
-                .build())
-            .build();
-    }
+    private String getObjectURL(S3OsProperties properties, String objectKey) {
+        String objectURL;
+        if (StringUtils.isBlank(properties.getDomain())) {
+            String host;
+            if (properties.getEnablePathStyleAccess()) {
+                host = properties.getEndpoint() + "/" + properties.getBucket();
+            } else {
+                host = properties.getBucket() + "." + properties.getEndpoint();
+            }
+            objectURL = properties.getProtocol() + "://" + host + "/" + objectKey;
+        } else {
+            objectURL = properties.getProtocol() + "://" + properties.getDomain() + "/" + objectKey;
+        }
+        return UriUtils.encodePath(objectURL, StandardCharsets.UTF_8);
+    }
+    S3Client buildS3Client(S3OsProperties properties) {
+        return S3Client.builder()
+            .region(Region.of(properties.getRegion()))
+            .endpointOverride(
+                URI.create(properties.getEndpointProtocol() + "://" + properties.getEndpoint()))
+            .credentialsProvider(() -> AwsBasicCredentials.create(properties.getAccessKey(),
+                properties.getAccessSecret()))
+            .serviceConfiguration(S3Configuration.builder()
+                .chunkedEncodingEnabled(false)
+                .pathStyleAccessEnabled(properties.getEnablePathStyleAccess())
+                .build())
+            .build();
+    }
+    private S3Presigner buildS3Presigner(S3OsProperties properties) {
+        return S3Presigner.builder()
+            .region(Region.of(properties.getRegion()))
+            .endpointOverride(
+                URI.create(properties.getEndpointProtocol() + "://" + properties.getEndpoint()))
+            .credentialsProvider(() -> AwsBasicCredentials.create(properties.getAccessKey(),
+                properties.getAccessSecret()))
+            .serviceConfiguration(S3Configuration.builder()
+                .chunkedEncodingEnabled(false)
+                .pathStyleAccessEnabled(properties.getEnablePathStyleAccess())
+                .build())
+            .build();
+    }
+    Flux<DataBuffer> reshape(Publisher<DataBuffer> content, int bufferSize) {
+        var dataBufferFactory = DefaultDataBufferFactory.sharedInstance;
+        return Flux.<ByteBuffer>create(sink -> {
+            var byteBuffer = ByteBuffer.allocate(bufferSize);
+            Flux.from(content)
+                .doOnNext(dataBuffer -> {
+                    var count = dataBuffer.readableByteCount();
+                    for (var i = 0; i < count; i++) {
+                        byteBuffer.put(dataBuffer.read());
+                        // Emit the buffer when buffer
+                        if (!byteBuffer.hasRemaining()) {
+                            sink.next(deepCopy(byteBuffer));
+                            byteBuffer.clear();
+                        }
+                    }
+                })
+                .doOnComplete(() -> {
+                    // Emit the last part of buffer.
+                    if (byteBuffer.position() > 0) {
+                        sink.next(deepCopy(byteBuffer));
+                    }
+                })
+                .subscribe(DataBufferUtils::release, sink::error, sink::complete,
+                    Context.of(sink.contextView()));
+        })
+            .map(dataBufferFactory::wrap)
+            .cast(DataBuffer.class)
+            .doOnDiscard(DataBuffer.class, DataBufferUtils::release);
+    }
+    ByteBuffer deepCopy(ByteBuffer src) {
+        src.flip();
+        var dest = ByteBuffer.allocate(src.limit());
+        dest.put(src);
+        src.rewind();
+        dest.flip();
+        return dest;
+    }
     Mono<ObjectDetail> upload(UploadContext uploadContext, S3OsProperties properties) {
-        var originFilename = uploadContext.file().filename();
-        var objectKey = properties.getObjectName(originFilename);
-        var contentType = MediaTypeFactory.getMediaType(originFilename)
-            .orElse(MediaType.APPLICATION_OCTET_STREAM).toString();
-        var uploadingMapKey = properties.getBucket() + "/" + objectKey;
-        // deduplication of uploading files
-        if (uploadingFile.put(uploadingMapKey, uploadingMapKey) != null) {
-            return Mono.error(new ServerWebInputException("文件 " + originFilename + " 已存在,建议更名后重试。"));
-        }
-        var s3client = buildS3AsyncClient(properties);
-        var uploadState = new UploadState(properties.getBucket(), objectKey);
-        return Mono
-            // check whether file exists
-            .fromFuture(s3client.headObject(HeadObjectRequest.builder()
-                .bucket(properties.getBucket())
-                .key(objectKey)
-                .build()))
-            .onErrorResume(NoSuchKeyException.class, e -> {
-                var builder = HeadObjectResponse.builder();
-                builder.sdkHttpResponse(SdkHttpResponse.builder().statusCode(404).build());
-                return Mono.just(builder.build());
-            })
-            .flatMap(response -> {
-                if (response != null && response.sdkHttpResponse() != null && response.sdkHttpResponse().isSuccessful()) {
-                    return Mono.error(new ServerWebInputException("文件 " + originFilename + " 已存在,建议更名后重试。"));
-                }else {
-                    return Mono.just(uploadState);
-                }
-            })
-            // init multipart upload
-            .flatMap(state -> Mono.fromFuture(s3client.createMultipartUpload(
-                CreateMultipartUploadRequest.builder()
-                    .bucket(properties.getBucket())
-                    .contentType(contentType)
-                    .key(objectKey)
-                    .build())))
-            .flatMapMany((response) -> {
-                checkResult(response, "createMultipartUpload");
-                uploadState.setUploadId(response.uploadId());
-                return uploadContext.file().content();
-            })
-            // buffer to part
-            .windowUntil((buffer) -> {
-                uploadState.buffered += buffer.readableByteCount();
-                if (uploadState.buffered >= MULTIPART_MIN_PART_SIZE) {
-                    uploadState.buffered = 0;
-                    return true;
-                } else {
-                    return false;
-                }
-            })
-            // upload part
-            .concatMap((window) -> window.collectList().flatMap((bufferList) -> {
-                var buffer = S3OsAttachmentHandler.concatBuffers(bufferList);
-                return uploadPart(uploadState, buffer, s3client);
-            }))
-            .reduce(uploadState, (state, completedPart) -> {
-                state.completedParts.put(completedPart.partNumber(), completedPart);
-                return state;
-            })
-            // complete multipart upload
-            .flatMap((state) -> Mono
-                .fromFuture(s3client.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
-                    .bucket(state.bucket)
-                    .uploadId(state.uploadId)
-                    .multipartUpload(CompletedMultipartUpload.builder()
-                        .parts(state.completedParts.values())
-                        .build())
-                    .key(state.objectKey)
-                    .build())
-                ))
-            // get object metadata
-            .flatMap((response) -> {
-                checkResult(response, "completeUpload");
-                return Mono.fromFuture(s3client.headObject(
-                    HeadObjectRequest.builder()
-                        .bucket(properties.getBucket())
-                        .key(objectKey)
-                        .build()
-                ));
-            })
-            // build object detail
-            .map((response) -> {
-                checkResult(response, "getMetadata");
-                return new ObjectDetail(properties.getBucket(), objectKey, response);
-            })
-            // close client
-            .doFinally((signalType) -> {
-                uploadingFile.remove(uploadingMapKey);
-                s3client.close();
-            });
-    }
+        return Mono.using(() -> buildS3Client(properties),
+            client -> {
+                var uploadState = new UploadState(properties, uploadContext.file().filename());
+                var content = uploadContext.file().content();
+                return checkFileExistsAndRename(uploadState, client)
+                    // init multipart upload
+                    .flatMap(state -> Mono.fromCallable(() -> client.createMultipartUpload(
+                        CreateMultipartUploadRequest.builder()
+                            .bucket(properties.getBucket())
+                            .contentType(state.contentType)
+                            .key(state.objectKey)
+                            .build())))
+                    .doOnNext((response) -> {
+                        checkResult(response, "createMultipartUpload");
+                        uploadState.uploadId = response.uploadId();
+                    })
+                    .thenMany(reshape(content, MULTIPART_MIN_PART_SIZE))
+                    // buffer to part
+                    .windowUntil((buffer) -> {
+                        uploadState.buffered += buffer.readableByteCount();
+                        if (uploadState.buffered >= MULTIPART_MIN_PART_SIZE) {
+                            uploadState.buffered = 0;
+                            return true;
+                        } else {
+                            return false;
+                        }
+                    })
+                    // upload part
+                    .concatMap((window) -> window.collectList().flatMap((bufferList) -> {
+                        var buffer = S3OsAttachmentHandler.concatBuffers(bufferList);
+                        return uploadPart(uploadState, buffer, client);
+                    }))
+                    .reduce(uploadState, (state, completedPart) -> {
+                        state.completedParts.put(completedPart.partNumber(), completedPart);
+                        return state;
+                    })
+                    // complete multipart upload
+                    .flatMap((state) -> Mono.just(client.completeMultipartUpload(
+                        CompleteMultipartUploadRequest
+                            .builder()
+                            .bucket(properties.getBucket())
+                            .uploadId(state.uploadId)
+                            .multipartUpload(CompletedMultipartUpload.builder()
+                                .parts(state.completedParts.values())
+                                .build())
+                            .key(state.objectKey)
+                            .build())
+                    ))
+                    // get object metadata
+                    .flatMap((response) -> {
+                        checkResult(response, "completeUpload");
+                        return Mono.just(client.headObject(
+                            HeadObjectRequest.builder()
+                                .bucket(properties.getBucket())
+                                .key(uploadState.objectKey)
+                                .build()
+                        ));
+                    })
+                    // build object detail
+                    .map((response) -> {
+                        checkResult(response, "getMetadata");
+                        return new ObjectDetail(uploadState, response);
+                    })
+                    // close client
+                    .doFinally((signalType) -> {
+                        if (uploadState.needRemoveMapKey) {
+                            uploadingFile.remove(uploadState.getUploadingMapKey());
+                        }
+                    });
+            },
+            SdkAutoCloseable::close);
+    }
+    private Mono<UploadState> checkFileExistsAndRename(UploadState uploadState,
+        S3Client s3client) {
+        return Mono.defer(() -> {
+            // deduplication of uploading files
+            if (uploadingFile.put(uploadState.getUploadingMapKey(),
+                uploadState.getUploadingMapKey()) != null) {
+                return Mono.error(new FileAlreadyExistsException("文件 " + uploadState.objectKey
+                    + " 已存在,建议更名后重试。[local]"));
+            }
+            uploadState.needRemoveMapKey = true;
+            // check whether file exists
+            return Mono.fromSupplier(() -> s3client.headObject(HeadObjectRequest.builder()
+                    .bucket(uploadState.properties.getBucket())
+                    .key(uploadState.objectKey)
+                    .build()))
+                .onErrorResume(NoSuchKeyException.class, e -> {
+                    var builder = HeadObjectResponse.builder();
+                    builder.sdkHttpResponse(SdkHttpResponse.builder().statusCode(404).build());
+                    return Mono.just(builder.build());
+                })
+                .flatMap(response -> {
+                    if (response != null && response.sdkHttpResponse() != null
+                        && response.sdkHttpResponse().isSuccessful()) {
+                        return Mono.error(
+                            new FileAlreadyExistsException("文件 " + uploadState.objectKey
+                                + " 已存在,建议更名后重试。[remote]"));
+                    } else {
+                        return Mono.just(uploadState);
+                    }
+                });
+        })
+            .retryWhen(Retry.max(3)
+                .filter(FileAlreadyExistsException.class::isInstance)
+                .doAfterRetry((retrySignal) -> {
+                    if (uploadState.needRemoveMapKey) {
+                        uploadingFile.remove(uploadState.getUploadingMapKey());
+                        uploadState.needRemoveMapKey = false;
+                    }
+                    uploadState.randomFileName();
+                })
+            )
+            .onErrorMap(Exceptions::isRetryExhausted,
+                throwable -> new ServerWebInputException(throwable.getCause().getMessage()));
+    }
-    private Mono<CompletedPart> uploadPart(UploadState uploadState, ByteBuffer buffer, S3AsyncClient s3client) {
+    private Mono<CompletedPart> uploadPart(UploadState uploadState, ByteBuffer buffer,
+        S3Client s3client) {
         final int partNumber = ++uploadState.partCounter;
-        return Mono
-            .fromFuture(s3client.uploadPart(UploadPartRequest.builder()
-                    .bucket(uploadState.bucket)
-                    .key(uploadState.objectKey)
-                    .partNumber(partNumber)
-                    .uploadId(uploadState.uploadId)
-                    .contentLength((long) buffer.capacity())
-                    .build(),
-                AsyncRequestBody.fromPublisher(Mono.just(buffer))))
-            .map((uploadPartResult) -> {
-                checkResult(uploadPartResult, "uploadPart");
-                return CompletedPart.builder()
-                    .eTag(uploadPartResult.eTag())
-                    .partNumber(partNumber)
-                    .build();
-            });
+        return Mono.just(s3client.uploadPart(UploadPartRequest.builder()
+                .bucket(uploadState.properties.getBucket())
+                .key(uploadState.objectKey)
+                .partNumber(partNumber)
+                .uploadId(uploadState.uploadId)
+                .contentLength((long) buffer.capacity())
+                .build(),
+            RequestBody.fromByteBuffer(buffer)))
+            .map((uploadPartResult) -> {
+                checkResult(uploadPartResult, "uploadPart");
+                return CompletedPart.builder()
+                    .eTag(uploadPartResult.eTag())
+                    .partNumber(partNumber)
+                    .build();
+            });
     }
     private static void checkResult(SdkResponse result, String operation) {
@@ -262,9 +434,7 @@ public class S3OsAttachmentHandler implements AttachmentHandler {
         }
         ByteBuffer partData = ByteBuffer.allocate(partSize);
-        buffers.forEach((buffer) -> {
-            partData.put(buffer.toByteBuffer());
-        });
+        buffers.forEach((buffer) -> partData.put(buffer.toByteBuffer()));
         // Reset read pointer to first byte
         partData.rewind();
@@ -275,28 +445,44 @@ public class S3OsAttachmentHandler implements AttachmentHandler {
     boolean shouldHandle(Policy policy) {
         if (policy == null || policy.getSpec() == null ||
             policy.getSpec().getTemplateName() == null) {
             return false;
         }
         String templateName = policy.getSpec().getTemplateName();
         return "s3os".equals(templateName);
     }
-    record ObjectDetail(String bucketName, String objectKey, HeadObjectResponse objectMetadata) {
+    record ObjectDetail(UploadState uploadState, HeadObjectResponse objectMetadata) {
     }
-    @Data
     static class UploadState {
-        String bucket;
-        String objectKey;
+        final S3OsProperties properties;
+        final String originalFileName;
         String uploadId;
         int partCounter;
         Map<Integer, CompletedPart> completedParts = new HashMap<>();
         int buffered = 0;
+        String contentType;
+        String fileName;
+        String objectKey;
+        boolean needRemoveMapKey = false;
-        UploadState(String bucket, String objectKey) {
-            this.bucket = bucket;
-            this.objectKey = objectKey;
+        public UploadState(S3OsProperties properties, String fileName) {
+            this.properties = properties;
+            this.originalFileName = fileName;
+            this.fileName = fileName;
+            this.objectKey = properties.getObjectName(fileName);
+            this.contentType = MediaTypeFactory.getMediaType(fileName)
+                .orElse(MediaType.APPLICATION_OCTET_STREAM).toString();
+        }
+        public String getUploadingMapKey() {
+            return properties.getBucket() + "/" + objectKey;
+        }
+        public void randomFileName() {
+            this.fileName = FileNameUtils.randomFileName(originalFileName, 4);
+            this.objectKey = properties.getObjectName(fileName);
         }
     }

View File

@@ -4,8 +4,8 @@ metadata:
   name: PluginS3ObjectStorage
 spec:
   enabled: true
-  version: 1.3.0
-  requires: ">=2.0.0"
+  version: 1.4.1
+  requires: ">=2.5.0"
   author:
     name: longjuan
     website: https://github.com/longjuan

View File

@@ -1,12 +1,19 @@
 package run.halo.s3os;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import java.util.List;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DefaultDataBufferFactory;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
 import run.halo.app.core.extension.attachment.Policy;
 class S3OsAttachmentHandlerTest {
@@ -33,4 +40,57 @@ class S3OsAttachmentHandlerTest {
         // policy is null
         assertFalse(handler.shouldHandle(null));
     }
+    @Test
+    void reshapeDataBufferWithSmallerBufferSize() {
+        var handler = new S3OsAttachmentHandler();
+        var factory = DefaultDataBufferFactory.sharedInstance;
+        var content = Flux.<DataBuffer>fromIterable(List.of(factory.wrap("halo".getBytes())));
+        StepVerifier.create(handler.reshape(content, 2))
+            .assertNext(dataBuffer -> {
+                var str = dataBuffer.toString(UTF_8);
+                assertEquals("ha", str);
+            })
+            .assertNext(dataBuffer -> {
+                var str = dataBuffer.toString(UTF_8);
+                assertEquals("lo", str);
+            })
+            .verifyComplete();
+    }
+    @Test
+    void reshapeDataBufferWithBiggerBufferSize() {
+        var handler = new S3OsAttachmentHandler();
+        var factory = DefaultDataBufferFactory.sharedInstance;
+        var content = Flux.<DataBuffer>fromIterable(List.of(factory.wrap("halo".getBytes())));
+        StepVerifier.create(handler.reshape(content, 10))
+            .assertNext(dataBuffer -> {
+                var str = dataBuffer.toString(UTF_8);
+                assertEquals("halo", str);
+            })
+            .verifyComplete();
+    }
+    @Test
+    void reshapeDataBuffersWithBiggerBufferSize() {
+        var handler = new S3OsAttachmentHandler();
+        var factory = DefaultDataBufferFactory.sharedInstance;
+        var content = Flux.<DataBuffer>fromIterable(List.of(
+            factory.wrap("ha".getBytes()),
+            factory.wrap("lo".getBytes())
+        ));
+        StepVerifier.create(handler.reshape(content, 3))
+            .assertNext(dataBuffer -> {
+                var str = dataBuffer.toString(UTF_8);
+                assertEquals("hal", str);
+            })
+            .assertNext(dataBuffer -> {
+                var str = dataBuffer.toString(UTF_8);
+                assertEquals("o", str);
+            })
+            .verifyComplete();
+    }
 }