mirror of
https://github.com/halo-dev/plugin-s3.git
synced 2025-10-18 08:24:32 +00:00
perf: optimize s3link interfaces using index mechanisms (#127)
* perf: optimize s3link list queries using index mechanisms
This commit is contained in:
@@ -16,7 +16,7 @@ repositories {
|
||||
}
|
||||
|
||||
dependencies {
|
||||
implementation platform('run.halo.tools.platform:plugin:2.12.0-SNAPSHOT')
|
||||
implementation platform('run.halo.tools.platform:plugin:2.13.0-SNAPSHOT')
|
||||
compileOnly 'run.halo.app:api'
|
||||
|
||||
implementation platform('software.amazon.awssdk:bom:2.19.8')
|
||||
|
@@ -1,13 +1,12 @@
|
||||
package run.halo.s3os;
|
||||
|
||||
import java.util.Set;
|
||||
import lombok.Data;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
@RequiredArgsConstructor
|
||||
public class LinkRequest {
|
||||
private String policyName;
|
||||
private List<String> objectKeys;
|
||||
private Set<String> objectKeys;
|
||||
}
|
@@ -1,30 +1,22 @@
|
||||
package run.halo.s3os;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import run.halo.app.core.extension.attachment.Attachment;
|
||||
import run.halo.app.core.extension.attachment.Policy;
|
||||
import run.halo.app.extension.ReactiveExtensionClient;
|
||||
import run.halo.app.plugin.ApiVersion;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@ApiVersion("s3os.halo.run/v1alpha1")
|
||||
@RestController
|
||||
@RequiredArgsConstructor
|
||||
public class S3LinkController {
|
||||
private final S3LinkService s3LinkService;
|
||||
private final ReactiveExtensionClient client;
|
||||
|
||||
/**
|
||||
* Map of linking file, used as a lock, key is policyName/objectKey, value is policyName/objectKey.
|
||||
*/
|
||||
private final Map<String, Object> linkingFile = new ConcurrentHashMap<>();
|
||||
|
||||
@GetMapping("/policies/s3")
|
||||
public Flux<Policy> listS3Policies() {
|
||||
@@ -48,38 +40,7 @@ public class S3LinkController {
|
||||
|
||||
@PostMapping("/attachments/link")
|
||||
public Mono<LinkResult> addAttachmentRecord(@RequestBody LinkRequest linkRequest) {
|
||||
return Flux.fromIterable(linkRequest.getObjectKeys())
|
||||
.filter(objectKey -> linkingFile.put(linkRequest.getPolicyName() + "/" + objectKey,
|
||||
linkRequest.getPolicyName() + "/" + objectKey) == null)
|
||||
.collectList()
|
||||
.flatMap(operableObjectKeys -> client.list(Attachment.class,
|
||||
attachment -> Objects.equals(attachment.getSpec().getPolicyName(),
|
||||
linkRequest.getPolicyName())
|
||||
&& StringUtils.isNotEmpty(attachment.getMetadata().getAnnotations()
|
||||
.get(S3OsAttachmentHandler.OBJECT_KEY))
|
||||
&& linkRequest.getObjectKeys().contains(attachment.getMetadata()
|
||||
.getAnnotations().get(S3OsAttachmentHandler.OBJECT_KEY)),
|
||||
null)
|
||||
.collectList()
|
||||
.flatMap(existingAttachments -> Flux.fromIterable(linkRequest.getObjectKeys())
|
||||
.flatMap((objectKey) -> {
|
||||
if (operableObjectKeys.contains(objectKey) && existingAttachments.stream()
|
||||
.noneMatch(attachment -> Objects.equals(
|
||||
attachment.getMetadata().getAnnotations().get(
|
||||
S3OsAttachmentHandler.OBJECT_KEY), objectKey))) {
|
||||
return s3LinkService
|
||||
.addAttachmentRecord(linkRequest.getPolicyName(), objectKey)
|
||||
.onErrorResume((throwable) -> Mono.just(
|
||||
new LinkResult.LinkResultItem(objectKey, false,
|
||||
throwable.getMessage())));
|
||||
} else {
|
||||
return Mono.just(new LinkResult.LinkResultItem(objectKey, false,
|
||||
"附件库中已存在该对象"));
|
||||
}
|
||||
})
|
||||
.doOnNext(linkResultItem -> linkingFile.remove(
|
||||
linkRequest.getPolicyName() + "/" + linkResultItem.getObjectKey()))
|
||||
.collectList()
|
||||
.map(LinkResult::new)));
|
||||
return s3LinkService.addAttachmentRecords(linkRequest.getPolicyName(),
|
||||
linkRequest.getObjectKeys());
|
||||
}
|
||||
}
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package run.halo.s3os;
|
||||
|
||||
|
||||
import java.util.Set;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import run.halo.app.core.extension.attachment.Policy;
|
||||
@@ -11,7 +12,7 @@ public interface S3LinkService {
|
||||
Mono<S3ListResult> listObjects(String policyName, String continuationToken,
|
||||
Integer pageSize);
|
||||
|
||||
Mono<LinkResult.LinkResultItem> addAttachmentRecord(String policyName, String objectKey);
|
||||
Mono<LinkResult> addAttachmentRecords(String policyName, Set<String> objectKeys);
|
||||
|
||||
Mono<S3ListResult> listObjectsUnlinked(String policyName, String continuationToken,
|
||||
String continuationObject, Integer pageSize);
|
||||
|
@@ -4,6 +4,9 @@ import static run.halo.s3os.S3OsAttachmentHandler.OBJECT_KEY;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
@@ -23,7 +26,11 @@ import reactor.core.scheduler.Schedulers;
|
||||
import run.halo.app.core.extension.attachment.Attachment;
|
||||
import run.halo.app.core.extension.attachment.Policy;
|
||||
import run.halo.app.extension.ConfigMap;
|
||||
import run.halo.app.extension.ListOptions;
|
||||
import run.halo.app.extension.MetadataUtil;
|
||||
import run.halo.app.extension.ReactiveExtensionClient;
|
||||
import run.halo.app.extension.index.query.QueryFactory;
|
||||
import run.halo.app.extension.router.selector.FieldSelector;
|
||||
import software.amazon.awssdk.services.s3.S3Client;
|
||||
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
|
||||
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
|
||||
@@ -37,6 +44,11 @@ public class S3LinkServiceImpl implements S3LinkService {
|
||||
private final ReactiveExtensionClient client;
|
||||
private final S3OsAttachmentHandler handler;
|
||||
|
||||
/**
|
||||
* Map of linking file, used as a lock, key is policyName/objectKey, value is policyName/objectKey.
|
||||
*/
|
||||
private final Map<String, Object> linkingFile = new ConcurrentHashMap<>();
|
||||
|
||||
|
||||
@Override
|
||||
public Flux<Policy> listS3Policies() {
|
||||
@@ -73,9 +85,10 @@ public class S3LinkServiceImpl implements S3LinkService {
|
||||
.stream().map(S3ListResult.ObjectVo::fromS3Object)
|
||||
.filter(objectVo -> !objectVo.getKey().endsWith("/"))
|
||||
.collect(Collectors.toMap(S3ListResult.ObjectVo::getKey, o -> o));
|
||||
return client.list(Attachment.class,
|
||||
attachment -> policyName.equals(
|
||||
attachment.getSpec().getPolicyName()), null)
|
||||
ListOptions listOptions = new ListOptions();
|
||||
listOptions.setFieldSelector(
|
||||
FieldSelector.of(QueryFactory.equal("spec.policyName", policyName)));
|
||||
return client.listAll(Attachment.class, listOptions, null)
|
||||
.doOnNext(attachment -> {
|
||||
S3ListResult.ObjectVo objectVo =
|
||||
objectVos.get(attachment.getMetadata().getAnnotations()
|
||||
@@ -95,6 +108,59 @@ public class S3LinkServiceImpl implements S3LinkService {
|
||||
.onErrorMap(S3ExceptionHandler::map);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<LinkResult> addAttachmentRecords(String policyName, Set<String> objectKeys) {
|
||||
return getOperableObjectKeys(objectKeys, policyName)
|
||||
.flatMap(operableObjectKeys -> getExistingAttachments(objectKeys, policyName)
|
||||
.flatMap(existingAttachments -> getLinkResultItems(objectKeys, operableObjectKeys,
|
||||
existingAttachments, policyName)
|
||||
.collectList()
|
||||
.map(LinkResult::new)));
|
||||
}
|
||||
|
||||
private Mono<Set<String>> getOperableObjectKeys(Set<String> objectKeys, String policyName) {
|
||||
return Flux.fromIterable(objectKeys)
|
||||
.filter(objectKey ->
|
||||
linkingFile.put(policyName + "/" + objectKey, policyName + "/" + objectKey) == null)
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
private Mono<Set<String>> getExistingAttachments(Set<String> objectKeys,
|
||||
String policyName) {
|
||||
ListOptions listOptions = new ListOptions();
|
||||
listOptions.setFieldSelector(
|
||||
FieldSelector.of(QueryFactory.equal("spec.policyName", policyName)));
|
||||
return client.listAll(Attachment.class, listOptions, null)
|
||||
.filter(attachment -> StringUtils.isNotBlank(
|
||||
MetadataUtil.nullSafeAnnotations(attachment).get(S3OsAttachmentHandler.OBJECT_KEY))
|
||||
&& objectKeys.contains(
|
||||
MetadataUtil.nullSafeAnnotations(attachment).get(S3OsAttachmentHandler.OBJECT_KEY)))
|
||||
.map(attachment -> MetadataUtil.nullSafeAnnotations(attachment)
|
||||
.get(S3OsAttachmentHandler.OBJECT_KEY))
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
private Flux<LinkResult.LinkResultItem> getLinkResultItems(Set<String> objectKeys,
|
||||
Set<String> operableObjectKeys,
|
||||
Set<String> existingAttachments,
|
||||
String policyName) {
|
||||
return Flux.fromIterable(objectKeys)
|
||||
.flatMap((objectKey) -> {
|
||||
if (operableObjectKeys.contains(objectKey) &&
|
||||
!existingAttachments.contains(objectKey)) {
|
||||
return addAttachmentRecord(policyName, objectKey)
|
||||
.onErrorResume((throwable) -> Mono.just(
|
||||
new LinkResult.LinkResultItem(objectKey, false,
|
||||
throwable.getMessage())));
|
||||
} else {
|
||||
return Mono.just(
|
||||
new LinkResult.LinkResultItem(objectKey, false, "附件库中已存在该对象"));
|
||||
}
|
||||
})
|
||||
.doFinally(signalType -> operableObjectKeys.forEach(
|
||||
objectKey -> linkingFile.remove(policyName + "/" + objectKey)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<S3ListResult> listObjectsUnlinked(String policyName, String continuationToken,
|
||||
String continuationObject, Integer pageSize) {
|
||||
@@ -151,8 +217,6 @@ public class S3LinkServiceImpl implements S3LinkService {
|
||||
record TokenState(String currToken, String nextToken) {
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Mono<LinkResult.LinkResultItem> addAttachmentRecord(String policyName,
|
||||
String objectKey) {
|
||||
return authenticationConsumer(authentication -> client.fetch(Policy.class, policyName)
|
||||
|
@@ -4,7 +4,7 @@ metadata:
|
||||
name: PluginS3ObjectStorage
|
||||
spec:
|
||||
enabled: true
|
||||
requires: ">=2.12.0"
|
||||
requires: ">=2.13.0"
|
||||
author:
|
||||
name: Halo OSS Team
|
||||
website: https://github.com/halo-dev
|
||||
|
Reference in New Issue
Block a user