mirror of
https://gitee.com/dromara/RuoYi-Cloud-Plus.git
synced 2025-09-04 03:26:31 +00:00
docs 删除多余dubbo注释修改
This commit is contained in:
@@ -49,9 +49,6 @@ import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROXY_FAILED
|
||||
import static org.apache.dubbo.common.utils.StringUtils.replace;
|
||||
import static org.apache.dubbo.metadata.report.support.Constants.*;
|
||||
|
||||
/**
|
||||
* 抽象的元数据上报实现类,实现了元数据上报的基本操作
|
||||
*/
|
||||
public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
|
||||
protected static final String DEFAULT_ROOT = "dubbo";
|
||||
@@ -61,7 +58,8 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
// Log output
|
||||
protected final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(getClass());
|
||||
|
||||
// 本地磁盘缓存,特定键值 registries 记录元数据中心列表,其他是通知的服务提供者列表
|
||||
// Local disk cache, where the special key value.registries records the list of metadata centers, and the others are
|
||||
// the list of notified service providers
|
||||
final Properties properties = new Properties();
|
||||
private final ExecutorService reportCacheExecutor =
|
||||
Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveMetadataReport", true));
|
||||
@@ -71,7 +69,7 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
final Map<MetadataIdentifier, Object> failedReports = new ConcurrentHashMap<>(4);
|
||||
private URL reportURL;
|
||||
boolean syncReport;
|
||||
// 本地磁盘缓存文件
|
||||
// Local disk cache file
|
||||
File file;
|
||||
private AtomicBoolean initialized = new AtomicBoolean(false);
|
||||
public MetadataReportRetry metadataReportRetry;
|
||||
@@ -81,30 +79,29 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
private final boolean reportDefinition;
|
||||
protected ApplicationModel applicationModel;
|
||||
|
||||
/**
|
||||
* 构造方法,初始化元数据上报实现类
|
||||
*
|
||||
* @param reportServerURL 元数据上报服务器的URL
|
||||
*/
|
||||
public AbstractMetadataReport(URL reportServerURL) {
|
||||
setUrl(reportServerURL);
|
||||
applicationModel = reportServerURL.getOrDefaultApplicationModel();
|
||||
|
||||
boolean localCacheEnabled = reportServerURL.getParameter(REGISTRY_LOCAL_FILE_CACHE_ENABLED, true);
|
||||
// 启动文件保存定时器
|
||||
// Start file save timer
|
||||
String defaultFilename = System.getProperty("user.home") + DUBBO_METADATA
|
||||
+ reportServerURL.getApplication() + "-" + replace(reportServerURL.getAddress(), ":", "-") + CACHE;
|
||||
+ reportServerURL.getApplication()
|
||||
+ "-" + replace(reportServerURL.getAddress(), ":", "-")
|
||||
+ CACHE;
|
||||
String filename = reportServerURL.getParameter(FILE_KEY, defaultFilename);
|
||||
File file = null;
|
||||
if (localCacheEnabled && ConfigUtils.isNotEmpty(filename)) {
|
||||
file = new File(filename);
|
||||
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
|
||||
if (!file.exists()
|
||||
&& file.getParentFile() != null
|
||||
&& !file.getParentFile().exists()) {
|
||||
if (!file.getParentFile().mkdirs()) {
|
||||
throw new IllegalArgumentException("Invalid service store file " + file
|
||||
+ ", cause: Failed to create directory " + file.getParentFile() + "!");
|
||||
}
|
||||
}
|
||||
// 如果文件存在,首先删除它
|
||||
// if this file exists, firstly delete it.
|
||||
if (!initialized.getAndSet(true) && file.exists()) {
|
||||
file.delete();
|
||||
}
|
||||
@@ -115,31 +112,22 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
metadataReportRetry = new MetadataReportRetry(
|
||||
reportServerURL.getParameter(RETRY_TIMES_KEY, DEFAULT_METADATA_REPORT_RETRY_TIMES),
|
||||
reportServerURL.getParameter(RETRY_PERIOD_KEY, DEFAULT_METADATA_REPORT_RETRY_PERIOD));
|
||||
|
||||
// 循环上报数据开关
|
||||
// cycle report the data switch
|
||||
if (reportServerURL.getParameter(CYCLE_REPORT_KEY, DEFAULT_METADATA_REPORT_CYCLE_REPORT)) {
|
||||
reportTimerScheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboMetadataReportTimer", true));
|
||||
reportTimerScheduler.scheduleAtFixedRate(this::publishAll, calculateStartTime(), ONE_DAY_IN_MILLISECONDS, TimeUnit.MILLISECONDS);
|
||||
reportTimerScheduler = Executors.newSingleThreadScheduledExecutor(
|
||||
new NamedThreadFactory("DubboMetadataReportTimer", true));
|
||||
reportTimerScheduler.scheduleAtFixedRate(
|
||||
this::publishAll, calculateStartTime(), ONE_DAY_IN_MILLISECONDS, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
this.reportMetadata = reportServerURL.getParameter(REPORT_METADATA_KEY, false);
|
||||
this.reportDefinition = reportServerURL.getParameter(REPORT_DEFINITION_KEY, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取元数据上报服务器的URL
|
||||
*
|
||||
* @return 元数据上报服务器的URL
|
||||
*/
|
||||
public URL getUrl() {
|
||||
return reportURL;
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置元数据上报服务器的URL
|
||||
*
|
||||
* @param url 元数据上报服务器的URL
|
||||
*/
|
||||
protected void setUrl(URL url) {
|
||||
if (url == null) {
|
||||
throw new IllegalArgumentException("metadataReport url == null");
|
||||
@@ -147,11 +135,6 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
this.reportURL = url;
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行保存属性操作,将属性持久化到本地磁盘缓存文件中
|
||||
*
|
||||
* @param version 缓存版本号
|
||||
*/
|
||||
private void doSaveProperties(long version) {
|
||||
if (version < lastCacheChanged.get()) {
|
||||
return;
|
||||
@@ -159,7 +142,7 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
if (file == null) {
|
||||
return;
|
||||
}
|
||||
// 保存操作
|
||||
// Save
|
||||
try {
|
||||
File lockfile = new File(file.getAbsolutePath() + ".lock");
|
||||
if (!lockfile.exists()) {
|
||||
@@ -173,7 +156,7 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
"Can not lock the metadataReport cache file " + file.getAbsolutePath()
|
||||
+ ", ignore and retry later, maybe multi java process use the file, please config: dubbo.metadata.file=xxx.properties");
|
||||
}
|
||||
// 保存
|
||||
// Save
|
||||
try {
|
||||
if (!file.exists()) {
|
||||
file.createNewFile();
|
||||
@@ -181,10 +164,12 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
|
||||
Properties tmpProperties;
|
||||
if (!syncReport) {
|
||||
// 当 syncReport = false 时,从同一个线程(reportCacheExecutor)中调用 properties.setProperty 和 properties.store,因此不需要深度复制
|
||||
// When syncReport = false, properties.setProperty and properties.store are called from the same
|
||||
// thread(reportCacheExecutor), so deep copy is not required
|
||||
tmpProperties = properties;
|
||||
} else {
|
||||
// 使用 store 方法和 this.properties 的 setProperty 方法会在多线程环境下引起锁竞争,因此需要深度复制一个新的容器
|
||||
// Using store method and setProperty method of the this.properties will cause lock contention
|
||||
// under multi-threading, so deep copy a new container
|
||||
tmpProperties = new Properties();
|
||||
Set<Map.Entry<Object, Object>> entries = properties.entrySet();
|
||||
for (Map.Entry<Object, Object> entry : entries) {
|
||||
@@ -205,14 +190,15 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
} else {
|
||||
reportCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
|
||||
}
|
||||
logger.warn(COMMON_UNEXPECTED_EXCEPTION, "", "",
|
||||
"Failed to save service store file, cause: " + e.getMessage(), e);
|
||||
logger.warn(
|
||||
COMMON_UNEXPECTED_EXCEPTION,
|
||||
"",
|
||||
"",
|
||||
"Failed to save service store file, cause: " + e.getMessage(),
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 加载本地磁盘缓存文件中的属性
|
||||
*/
|
||||
void loadProperties() {
|
||||
if (file != null && file.exists()) {
|
||||
try (InputStream in = new FileInputStream(file)) {
|
||||
@@ -226,14 +212,6 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 将元数据信息保存到本地文件中
|
||||
*
|
||||
* @param metadataIdentifier 元数据标识符,用于唯一标识元数据信息
|
||||
* @param value 要保存的元数据信息的字符串表示
|
||||
* @param add 是否添加元数据信息,如果为 true,则添加;否则,移除
|
||||
* @param sync 是否同步保存,如果为 true,则同步保存;否则,异步保存
|
||||
*/
|
||||
private void saveProperties(MetadataIdentifier metadataIdentifier, String value, boolean add, boolean sync) {
|
||||
if (file == null) {
|
||||
return;
|
||||
@@ -241,19 +219,14 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
|
||||
try {
|
||||
if (add) {
|
||||
// 添加元数据信息到 properties 中
|
||||
properties.setProperty(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), value);
|
||||
} else {
|
||||
// 移除指定的元数据信息
|
||||
properties.remove(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
|
||||
}
|
||||
// 更新缓存变化版本号
|
||||
long version = lastCacheChanged.incrementAndGet();
|
||||
if (sync) {
|
||||
// 同步保存属性到文件
|
||||
new SaveProperties(version).run();
|
||||
} else {
|
||||
// 异步执行保存属性到文件任务
|
||||
reportCacheExecutor.execute(new SaveProperties(version));
|
||||
}
|
||||
|
||||
@@ -267,9 +240,6 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
return getUrl().toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* 内部类,实现了 `Runnable` 接口,用于保存属性到本地文件
|
||||
*/
|
||||
private class SaveProperties implements Runnable {
|
||||
private long version;
|
||||
|
||||
@@ -283,14 +253,9 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存属性到本地磁盘缓存文件中
|
||||
*
|
||||
* @param providerMetadataIdentifier 提供者元数据标识符
|
||||
* @param serviceDefinition 要存储的服务定义
|
||||
*/
|
||||
@Override
|
||||
public void storeProviderMetadata(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) {
|
||||
public void storeProviderMetadata(
|
||||
MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) {
|
||||
if (syncReport) {
|
||||
storeProviderMetadataTask(providerMetadataIdentifier, serviceDefinition);
|
||||
} else {
|
||||
@@ -298,43 +263,33 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 异步任务:存储服务提供者元数据的任务
|
||||
*
|
||||
* @param providerMetadataIdentifier 提供者元数据的标识符
|
||||
* @param serviceDefinition 服务定义对象
|
||||
*/
|
||||
private void storeProviderMetadataTask(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) {
|
||||
// 将元数据事件转换为服务订阅事件
|
||||
MetadataEvent metadataEvent = MetadataEvent.toServiceSubscribeEvent(applicationModel, providerMetadataIdentifier.getUniqueServiceName());
|
||||
private void storeProviderMetadataTask(
|
||||
MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) {
|
||||
|
||||
// 发布元数据事件到指标事件总线,执行回调任务
|
||||
MetadataEvent metadataEvent = MetadataEvent.toServiceSubscribeEvent(
|
||||
applicationModel, providerMetadataIdentifier.getUniqueServiceName());
|
||||
MetricsEventBus.post(
|
||||
metadataEvent,
|
||||
() -> {
|
||||
boolean result = true;
|
||||
try {
|
||||
// 记录日志:存储服务提供者元数据
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("store provider metadata. Identifier : " + providerMetadataIdentifier
|
||||
+ "; definition: " + serviceDefinition);
|
||||
}
|
||||
|
||||
// 将服务定义对象放入所有元数据报告的缓存中,移除失败的报告
|
||||
allMetadataReports.put(providerMetadataIdentifier, serviceDefinition);
|
||||
failedReports.remove(providerMetadataIdentifier);
|
||||
|
||||
// 将服务定义对象转换为 JSON 字符串并存储到元数据存储中
|
||||
String data = JsonUtils.toJson(serviceDefinition);
|
||||
doStoreProviderMetadata(providerMetadataIdentifier, data);
|
||||
|
||||
// 保存属性变更到本地属性缓存
|
||||
saveProperties(providerMetadataIdentifier, data, true, !syncReport);
|
||||
} catch (Exception e) {
|
||||
// 如果存储失败,记录错误日志,加入失败的报告列表,并启动重试任务
|
||||
// retry again. If failed again, throw exception.
|
||||
failedReports.put(providerMetadataIdentifier, serviceDefinition);
|
||||
metadataReportRetry.startRetryTask();
|
||||
logger.error(PROXY_FAILED_EXPORT_SERVICE, "", "",
|
||||
logger.error(
|
||||
PROXY_FAILED_EXPORT_SERVICE,
|
||||
"",
|
||||
"",
|
||||
"Failed to put provider metadata " + providerMetadataIdentifier + " in "
|
||||
+ serviceDefinition + ", cause: " + e.getMessage(),
|
||||
e);
|
||||
@@ -345,48 +300,32 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
aBoolean -> aBoolean);
|
||||
}
|
||||
|
||||
/**
|
||||
* 存储消费者元数据
|
||||
* 如果同步报告开关打开,则直接调用同步方法存储;否则,通过线程池异步执行存储任务
|
||||
*
|
||||
* @param consumerMetadataIdentifier 消费者元数据的标识符
|
||||
* @param serviceParameterMap 服务参数映射表
|
||||
*/
|
||||
@Override
|
||||
public void storeConsumerMetadata(MetadataIdentifier consumerMetadataIdentifier, Map<String, String> serviceParameterMap) {
|
||||
public void storeConsumerMetadata(
|
||||
MetadataIdentifier consumerMetadataIdentifier, Map<String, String> serviceParameterMap) {
|
||||
if (syncReport) {
|
||||
storeConsumerMetadataTask(consumerMetadataIdentifier, serviceParameterMap);
|
||||
} else {
|
||||
reportCacheExecutor.execute(() -> storeConsumerMetadataTask(consumerMetadataIdentifier, serviceParameterMap));
|
||||
reportCacheExecutor.execute(
|
||||
() -> storeConsumerMetadataTask(consumerMetadataIdentifier, serviceParameterMap));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 异步任务:存储消费者元数据的任务
|
||||
*
|
||||
* @param consumerMetadataIdentifier 消费者元数据的标识符
|
||||
* @param serviceParameterMap 服务参数映射表
|
||||
*/
|
||||
protected void storeConsumerMetadataTask(MetadataIdentifier consumerMetadataIdentifier, Map<String, String> serviceParameterMap) {
|
||||
protected void storeConsumerMetadataTask(
|
||||
MetadataIdentifier consumerMetadataIdentifier, Map<String, String> serviceParameterMap) {
|
||||
try {
|
||||
// 记录日志:存储消费者元数据
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("store consumer metadata. Identifier : " + consumerMetadataIdentifier + "; definition: "
|
||||
+ serviceParameterMap);
|
||||
}
|
||||
|
||||
// 将服务参数映射表放入所有元数据报告的缓存中,移除失败的报告
|
||||
allMetadataReports.put(consumerMetadataIdentifier, serviceParameterMap);
|
||||
failedReports.remove(consumerMetadataIdentifier);
|
||||
|
||||
// 将服务参数映射表转换为 JSON 字符串并存储到元数据存储中
|
||||
String data = JsonUtils.toJson(serviceParameterMap);
|
||||
doStoreConsumerMetadata(consumerMetadataIdentifier, data);
|
||||
|
||||
// 保存属性变更到本地属性缓存
|
||||
saveProperties(consumerMetadataIdentifier, data, true, !syncReport);
|
||||
} catch (Exception e) {
|
||||
// 如果存储失败,记录错误日志,加入失败的报告列表,并启动重试任务
|
||||
// retry again. If failed again, throw exception.
|
||||
failedReports.put(consumerMetadataIdentifier, serviceParameterMap);
|
||||
metadataReportRetry.startRetryTask();
|
||||
logger.error(
|
||||
@@ -399,34 +338,20 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 销毁方法,用于释放资源和关闭相关任务调度器
|
||||
*/
|
||||
@Override
|
||||
public void destroy() {
|
||||
// 关闭报告缓存执行器
|
||||
if (reportCacheExecutor != null) {
|
||||
reportCacheExecutor.shutdown();
|
||||
}
|
||||
|
||||
// 关闭报告定时调度器
|
||||
if (reportTimerScheduler != null) {
|
||||
reportTimerScheduler.shutdown();
|
||||
}
|
||||
|
||||
// 销毁元数据报告重试管理器,并置空引用
|
||||
if (metadataReportRetry != null) {
|
||||
metadataReportRetry.destroy();
|
||||
metadataReportRetry = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存服务元数据。根据同步设置,同步执行或通过报告缓存执行保存操作
|
||||
*
|
||||
* @param metadataIdentifier 服务元数据标识符
|
||||
* @param url 服务的URL
|
||||
*/
|
||||
@Override
|
||||
public void saveServiceMetadata(ServiceMetadataIdentifier metadataIdentifier, URL url) {
|
||||
if (syncReport) {
|
||||
@@ -436,11 +361,6 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 移除服务元数据。根据同步设置,同步执行或通过报告缓存执行移除操作
|
||||
*
|
||||
* @param metadataIdentifier 服务元数据标识符
|
||||
*/
|
||||
@Override
|
||||
public void removeServiceMetadata(ServiceMetadataIdentifier metadataIdentifier) {
|
||||
if (syncReport) {
|
||||
@@ -450,51 +370,28 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取导出的URL列表。如果未能获取,则回退到本地缓存
|
||||
*
|
||||
* @param metadataIdentifier 服务元数据标识符
|
||||
* @return 导出的URL列表
|
||||
*/
|
||||
@Override
|
||||
public List<String> getExportedURLs(ServiceMetadataIdentifier metadataIdentifier) {
|
||||
// TODO 回退到本地缓存
|
||||
// TODO, fallback to local cache
|
||||
return doGetExportedURLs(metadataIdentifier);
|
||||
}
|
||||
|
||||
/**
|
||||
* 存储订阅的数据。如果同步报告开启,则直接存储订阅数据;否则,将异步执行存储操作
|
||||
*
|
||||
* @param subscriberMetadataIdentifier 订阅元数据标识符
|
||||
* @param urls 订阅的URL集合
|
||||
*/
|
||||
@Override
|
||||
public void saveSubscribedData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, Set<String> urls) {
|
||||
if (syncReport) {
|
||||
doSaveSubscriberData(subscriberMetadataIdentifier, JsonUtils.toJson(urls));
|
||||
} else {
|
||||
reportCacheExecutor.execute(() -> doSaveSubscriberData(subscriberMetadataIdentifier, JsonUtils.toJson(urls)));
|
||||
reportCacheExecutor.execute(
|
||||
() -> doSaveSubscriberData(subscriberMetadataIdentifier, JsonUtils.toJson(urls)));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取订阅的URL列表
|
||||
*
|
||||
* @param subscriberMetadataIdentifier 订阅元数据标识符
|
||||
* @return 订阅的URL列表
|
||||
*/
|
||||
@Override
|
||||
public List<String> getSubscribedURLs(SubscriberMetadataIdentifier subscriberMetadataIdentifier) {
|
||||
String content = doGetSubscribedURLs(subscriberMetadataIdentifier);
|
||||
return JsonUtils.toJavaList(content, String.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取URL的协议
|
||||
*
|
||||
* @param url URL对象
|
||||
* @return URL的协议
|
||||
*/
|
||||
String getProtocol(URL url) {
|
||||
String protocol = url.getSide();
|
||||
protocol = protocol == null ? url.getProtocol() : protocol;
|
||||
@@ -502,52 +399,33 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否需要重试处理元数据集合
|
||||
*
|
||||
* @return 如果需要继续重试,则返回true;否则返回false
|
||||
* @return if need to continue
|
||||
*/
|
||||
public boolean retry() {
|
||||
return doHandleMetadataCollection(failedReports);
|
||||
}
|
||||
|
||||
/**
|
||||
* 指示是否应报告定义
|
||||
*
|
||||
* @return 如果应报告定义,则返回true;否则返回false
|
||||
*/
|
||||
@Override
|
||||
public boolean shouldReportDefinition() {
|
||||
return reportDefinition;
|
||||
}
|
||||
|
||||
/**
|
||||
* 指示是否应报告元数据
|
||||
*
|
||||
* @return 如果应报告元数据,则返回true;否则返回false
|
||||
*/
|
||||
@Override
|
||||
public boolean shouldReportMetadata() {
|
||||
return reportMetadata;
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理元数据集合的方法,根据元数据的侧边(提供者或消费者)将其存储到相应的位置
|
||||
*
|
||||
* @param metadataMap 元数据映射,包含要处理的元数据标识符和相应的对象
|
||||
* @return 如果处理完毕后需要继续重试,则返回true;否则返回false
|
||||
*/
|
||||
private boolean doHandleMetadataCollection(Map<MetadataIdentifier, Object> metadataMap) {
|
||||
if (metadataMap.isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
Iterator<Map.Entry<MetadataIdentifier, Object>> iterable = metadataMap.entrySet().iterator();
|
||||
Iterator<Map.Entry<MetadataIdentifier, Object>> iterable =
|
||||
metadataMap.entrySet().iterator();
|
||||
while (iterable.hasNext()) {
|
||||
Map.Entry<MetadataIdentifier, Object> item = iterable.next();
|
||||
if (PROVIDER_SIDE.equals(item.getKey().getSide())) {
|
||||
// 如果是提供者侧的元数据,则存储为完整的服务定义对象
|
||||
this.storeProviderMetadata(item.getKey(), (FullServiceDefinition) item.getValue());
|
||||
} else if (CONSUMER_SIDE.equals(item.getKey().getSide())) {
|
||||
// 如果是消费者侧的元数据,则存储为参数映射
|
||||
this.storeConsumerMetadata(item.getKey(), (Map) item.getValue());
|
||||
}
|
||||
}
|
||||
@@ -555,8 +433,7 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
}
|
||||
|
||||
/**
|
||||
* 用于单元测试的方法,不是私有方法
|
||||
* 发布所有元数据到相应的处理方法
|
||||
* not private. just for unittest.
|
||||
*/
|
||||
void publishAll() {
|
||||
logger.info("start to publish all metadata.");
|
||||
@@ -564,14 +441,9 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
}
|
||||
|
||||
/**
|
||||
* 计算一个起始时间,用于设置定时任务的启动时间
|
||||
* 时间计算逻辑包括:
|
||||
* 1. 获取当前时间的毫秒数
|
||||
* 2. 将日历设置为当天的午夜(00:00:00.000)
|
||||
* 3. 计算当前时间到午夜的毫秒数差
|
||||
* 4. 加上一定的偏移量,包括四小时的一半和一个四小时内的随机毫秒数
|
||||
* between 2:00 am to 6:00 am, the time is random.
|
||||
*
|
||||
* @return 计算得到的起始时间
|
||||
* @return
|
||||
*/
|
||||
long calculateStartTime() {
|
||||
Calendar calendar = Calendar.getInstance();
|
||||
@@ -586,77 +458,42 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
+ ThreadLocalRandom.current().nextInt(FOUR_HOURS_IN_MILLISECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* MetadataReportRetry 类用于处理元数据报告的重试机制
|
||||
*/
|
||||
class MetadataReportRetry {
|
||||
protected final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(getClass());
|
||||
|
||||
/**
|
||||
* 用于执行定时重试任务的调度执行器服务
|
||||
*/
|
||||
final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(0, new NamedThreadFactory("DubboMetadataReportRetryTimer", true));
|
||||
|
||||
/**
|
||||
* 用于取消重试任务的计划执行句柄
|
||||
*/
|
||||
final ScheduledExecutorService retryExecutor =
|
||||
Executors.newScheduledThreadPool(0, new NamedThreadFactory("DubboMetadataReportRetryTimer", true));
|
||||
volatile ScheduledFuture retryScheduledFuture;
|
||||
|
||||
/**
|
||||
* 重试次数计数器,用于记录已经进行的重试次数
|
||||
*/
|
||||
final AtomicInteger retryCounter = new AtomicInteger(0);
|
||||
|
||||
/**
|
||||
* 重试任务的执行周期,以毫秒为单位
|
||||
*/
|
||||
// retry task schedule period
|
||||
long retryPeriod;
|
||||
|
||||
/**
|
||||
* 当没有失败报告时,等待多少次运行重试任务
|
||||
*/
|
||||
// if no failed report, wait how many times to run retry task.
|
||||
int retryTimesIfNonFail = 600;
|
||||
|
||||
/**
|
||||
* 重试限制次数,达到此次数后不再继续重试
|
||||
*/
|
||||
int retryLimit;
|
||||
|
||||
/**
|
||||
* 构造函数,初始化重试次数和重试周期
|
||||
*
|
||||
* @param retryTimes 重试次数限制
|
||||
* @param retryPeriod 重试周期(毫秒)
|
||||
*/
|
||||
public MetadataReportRetry(int retryTimes, int retryPeriod) {
|
||||
this.retryPeriod = retryPeriod;
|
||||
this.retryLimit = retryTimes;
|
||||
}
|
||||
|
||||
/**
|
||||
* 启动重试任务,如果未启动则执行定时重试
|
||||
*/
|
||||
void startRetryTask() {
|
||||
if (retryScheduledFuture == null) {
|
||||
synchronized (retryCounter) {
|
||||
if (retryScheduledFuture == null) {
|
||||
retryScheduledFuture = retryExecutor.scheduleWithFixedDelay(
|
||||
() -> {
|
||||
// 检查并连接到元数据
|
||||
// Check and connect to the metadata
|
||||
try {
|
||||
int times = retryCounter.incrementAndGet();
|
||||
logger.info("start to retry task for metadata report. retry times:" + times);
|
||||
|
||||
// 执行重试操作,如果无失败报告并且超过指定重试次数,则取消重试任务
|
||||
if (retry() && times > retryTimesIfNonFail) {
|
||||
cancelRetryTask();
|
||||
}
|
||||
|
||||
// 如果超过重试限制次数,则取消重试任务
|
||||
if (times > retryLimit) {
|
||||
cancelRetryTask();
|
||||
}
|
||||
} catch (Throwable t) { // 防御性容错处理
|
||||
} catch (Throwable t) { // Defensive fault tolerance
|
||||
logger.error(
|
||||
COMMON_UNEXPECTED_EXCEPTION,
|
||||
"",
|
||||
@@ -673,9 +510,6 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 取消重试任务。如果存在已计划的任务,则取消并关闭重试执行器
|
||||
*/
|
||||
void cancelRetryTask() {
|
||||
if (retryScheduledFuture != null) {
|
||||
retryScheduledFuture.cancel(false);
|
||||
@@ -683,17 +517,12 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
retryExecutor.shutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
* 销毁操作。调用取消重试任务方法以确保所有任务被取消
|
||||
*/
|
||||
void destroy() {
|
||||
cancelRetryTask();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取重试执行器实例,仅用于测试目的
|
||||
*
|
||||
* @deprecated 仅用于测试
|
||||
* @deprecated only for test
|
||||
*/
|
||||
@Deprecated
|
||||
ScheduledExecutorService getRetryExecutor() {
|
||||
@@ -701,13 +530,6 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 将订阅者数据保存到持久存储中。如果 URL 列表为空,则直接返回
|
||||
* 对 URL 列表进行编码后保存。
|
||||
*
|
||||
* @param subscriberMetadataIdentifier 订阅者元数据标识
|
||||
* @param urls URL 列表
|
||||
*/
|
||||
private void doSaveSubscriberData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, List<String> urls) {
|
||||
if (CollectionUtils.isEmpty(urls)) {
|
||||
return;
|
||||
@@ -719,66 +541,25 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
doSaveSubscriberData(subscriberMetadataIdentifier, encodedUrlList);
|
||||
}
|
||||
|
||||
/**
|
||||
* 存储提供者元数据信息的抽象方法。由子类实现具体存储逻辑
|
||||
*
|
||||
* @param providerMetadataIdentifier 提供者元数据标识
|
||||
* @param serviceDefinitions 服务定义信息字符串
|
||||
*/
|
||||
protected abstract void doStoreProviderMetadata(MetadataIdentifier providerMetadataIdentifier, String serviceDefinitions);
|
||||
protected abstract void doStoreProviderMetadata(
|
||||
MetadataIdentifier providerMetadataIdentifier, String serviceDefinitions);
|
||||
|
||||
/**
|
||||
* 存储消费者元数据信息的抽象方法。由子类实现具体存储逻辑
|
||||
*
|
||||
* @param consumerMetadataIdentifier 消费者元数据标识
|
||||
* @param serviceParameterString 服务参数字符串
|
||||
*/
|
||||
protected abstract void doStoreConsumerMetadata(MetadataIdentifier consumerMetadataIdentifier, String serviceParameterString);
|
||||
protected abstract void doStoreConsumerMetadata(
|
||||
MetadataIdentifier consumerMetadataIdentifier, String serviceParameterString);
|
||||
|
||||
/**
|
||||
* 存储服务元数据信息的抽象方法。由子类实现具体存储逻辑
|
||||
*
|
||||
* @param metadataIdentifier 服务元数据标识
|
||||
* @param url URL 对象
|
||||
*/
|
||||
protected abstract void doSaveMetadata(ServiceMetadataIdentifier metadataIdentifier, URL url);
|
||||
|
||||
/**
|
||||
* 删除服务元数据信息的抽象方法。由子类实现具体删除逻辑
|
||||
*
|
||||
* @param metadataIdentifier 服务元数据标识
|
||||
*/
|
||||
protected abstract void doRemoveMetadata(ServiceMetadataIdentifier metadataIdentifier);
|
||||
|
||||
/**
|
||||
* 获取导出的 URL 列表的抽象方法。由子类实现具体获取逻辑
|
||||
*
|
||||
* @param metadataIdentifier 服务元数据标识
|
||||
* @return 导出的 URL 列表
|
||||
*/
|
||||
protected abstract List<String> doGetExportedURLs(ServiceMetadataIdentifier metadataIdentifier);
|
||||
|
||||
/**
|
||||
* 存储订阅者数据的抽象方法。由子类实现具体存储逻辑
|
||||
*
|
||||
* @param subscriberMetadataIdentifier 订阅者元数据标识
|
||||
* @param urlListStr URL 列表的 JSON 字符串形式
|
||||
*/
|
||||
protected abstract void doSaveSubscriberData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, String urlListStr);
|
||||
protected abstract void doSaveSubscriberData(
|
||||
SubscriberMetadataIdentifier subscriberMetadataIdentifier, String urlListStr);
|
||||
|
||||
/**
|
||||
* 获取订阅的 URL 列表的抽象方法。由子类实现具体获取逻辑
|
||||
*
|
||||
* @param subscriberMetadataIdentifier 订阅者元数据标识
|
||||
* @return 订阅的 URL 列表的 JSON 字符串形式
|
||||
*/
|
||||
protected abstract String doGetSubscribedURLs(SubscriberMetadataIdentifier subscriberMetadataIdentifier);
|
||||
|
||||
/**
|
||||
* 获取报告缓存执行器的方法。仅供单元测试使用
|
||||
*
|
||||
* @return 报告缓存执行器
|
||||
* @deprecated 仅供单元测试使用
|
||||
* @deprecated only for unit test
|
||||
*/
|
||||
@Deprecated
|
||||
protected ExecutorService getReportCacheExecutor() {
|
||||
@@ -786,10 +567,7 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取元数据报告重试管理器的方法。仅供单元测试使用
|
||||
*
|
||||
* @return 元数据报告重试管理器
|
||||
* @deprecated 仅供单元测试使用
|
||||
* @deprecated only for unit test
|
||||
*/
|
||||
@Deprecated
|
||||
protected MetadataReportRetry getMetadataReportRetry() {
|
||||
|
@@ -16,7 +16,6 @@
|
||||
*/
|
||||
package org.apache.dubbo.metadata.store.redis;
|
||||
|
||||
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
|
||||
import org.apache.dubbo.common.URL;
|
||||
import org.apache.dubbo.common.config.configcenter.ConfigItem;
|
||||
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
|
||||
@@ -29,17 +28,40 @@ import org.apache.dubbo.metadata.MappingChangedEvent;
|
||||
import org.apache.dubbo.metadata.MappingListener;
|
||||
import org.apache.dubbo.metadata.MetadataInfo;
|
||||
import org.apache.dubbo.metadata.ServiceNameMapping;
|
||||
import org.apache.dubbo.metadata.report.identifier.*;
|
||||
import org.apache.dubbo.metadata.report.identifier.BaseMetadataIdentifier;
|
||||
import org.apache.dubbo.metadata.report.identifier.KeyTypeEnum;
|
||||
import org.apache.dubbo.metadata.report.identifier.MetadataIdentifier;
|
||||
import org.apache.dubbo.metadata.report.identifier.ServiceMetadataIdentifier;
|
||||
import org.apache.dubbo.metadata.report.identifier.SubscriberMetadataIdentifier;
|
||||
import org.apache.dubbo.metadata.report.support.AbstractMetadataReport;
|
||||
import org.apache.dubbo.rpc.RpcException;
|
||||
import redis.clients.jedis.*;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
|
||||
import redis.clients.jedis.HostAndPort;
|
||||
import redis.clients.jedis.Jedis;
|
||||
import redis.clients.jedis.JedisCluster;
|
||||
import redis.clients.jedis.JedisPool;
|
||||
import redis.clients.jedis.JedisPoolConfig;
|
||||
import redis.clients.jedis.JedisPubSub;
|
||||
import redis.clients.jedis.Transaction;
|
||||
import redis.clients.jedis.params.SetParams;
|
||||
import redis.clients.jedis.util.JedisClusterCRC16;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.*;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.CYCLE_REPORT_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPARATOR;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.QUEUES_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
|
||||
import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_RESPONSE;
|
||||
import static org.apache.dubbo.metadata.MetadataConstants.META_DATA_STORE_TAG;
|
||||
import static org.apache.dubbo.metadata.ServiceNameMapping.DEFAULT_MAPPING_GROUP;
|
||||
@@ -47,40 +69,31 @@ import static org.apache.dubbo.metadata.ServiceNameMapping.getAppNames;
|
||||
import static org.apache.dubbo.metadata.report.support.Constants.DEFAULT_METADATA_REPORT_CYCLE_REPORT;
|
||||
|
||||
/**
|
||||
* RedisMetadataReport 是基于 Redis 的元数据报告实现类
|
||||
* RedisMetadataReport
|
||||
*/
|
||||
public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
|
||||
private static final String REDIS_DATABASE_KEY = "database";
|
||||
private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(RedisMetadataReport.class);
|
||||
// 受保护的 JedisPool 实例,用于测试
|
||||
|
||||
// protected , for test
|
||||
protected JedisPool pool;
|
||||
// Redis 集群节点集合
|
||||
private Set<HostAndPort> jedisClusterNodes;
|
||||
private int timeout;
|
||||
private String password;
|
||||
private final String root;
|
||||
// 映射数据监听器映射表
|
||||
private final ConcurrentHashMap<String, MappingDataListener> mappingDataListenerMap = new ConcurrentHashMap<>();
|
||||
private SetParams jedisParams = SetParams.setParams();
|
||||
|
||||
/**
|
||||
* 构造方法,根据给定的 URL 初始化 RedisMetadataReport
|
||||
*
|
||||
* @param url 元数据中心的 URL
|
||||
*/
|
||||
public RedisMetadataReport(URL url) {
|
||||
super(url);
|
||||
timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
|
||||
password = url.getPassword();
|
||||
this.root = url.getGroup(DEFAULT_ROOT);
|
||||
|
||||
// 设置默认的周期性报告时间
|
||||
if (url.getParameter(CYCLE_REPORT_KEY, DEFAULT_METADATA_REPORT_CYCLE_REPORT)) {
|
||||
// TTL 默认是周期报告时间的两倍
|
||||
// ttl default is twice the cycle-report time
|
||||
jedisParams.ex(ONE_DAY_IN_MILLISECONDS * 2);
|
||||
}
|
||||
|
||||
// 判断是否为集群模式
|
||||
if (url.getParameter(CLUSTER_KEY, false)) {
|
||||
jedisClusterNodes = new HashSet<>();
|
||||
List<URL> urls = url.getBackupUrls();
|
||||
@@ -88,61 +101,31 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
jedisClusterNodes.add(new HostAndPort(tmpUrl.getHost(), tmpUrl.getPort()));
|
||||
}
|
||||
} else {
|
||||
// 单机模式下的 Redis 数据库编号,默认为 0
|
||||
int database = url.getParameter(REDIS_DATABASE_KEY, 0);
|
||||
pool = new JedisPool(new JedisPoolConfig(), url.getHost(), url.getPort(), timeout, password, database);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 存储提供者元数据的具体实现
|
||||
*
|
||||
* @param providerMetadataIdentifier 提供者元数据标识符
|
||||
* @param serviceDefinitions 服务定义信息
|
||||
*/
|
||||
@Override
|
||||
protected void doStoreProviderMetadata(MetadataIdentifier providerMetadataIdentifier, String serviceDefinitions) {
|
||||
this.storeMetadata(providerMetadataIdentifier, serviceDefinitions);
|
||||
}
|
||||
|
||||
/**
|
||||
* 存储消费者元数据的具体实现
|
||||
*
|
||||
* @param consumerMetadataIdentifier 消费者元数据标识符
|
||||
* @param value 元数据值
|
||||
*/
|
||||
@Override
|
||||
protected void doStoreConsumerMetadata(MetadataIdentifier consumerMetadataIdentifier, String value) {
|
||||
this.storeMetadata(consumerMetadataIdentifier, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* 存储服务元数据的具体实现
|
||||
*
|
||||
* @param serviceMetadataIdentifier 服务元数据标识符
|
||||
* @param url 服务URL编码后的完整字符串
|
||||
*/
|
||||
@Override
|
||||
protected void doSaveMetadata(ServiceMetadataIdentifier serviceMetadataIdentifier, URL url) {
|
||||
this.storeMetadata(serviceMetadataIdentifier, URL.encode(url.toFullString()));
|
||||
}
|
||||
|
||||
/**
|
||||
* 移除元数据的具体实现
|
||||
*
|
||||
* @param serviceMetadataIdentifier 服务元数据标识符
|
||||
*/
|
||||
@Override
|
||||
protected void doRemoveMetadata(ServiceMetadataIdentifier serviceMetadataIdentifier) {
|
||||
this.deleteMetadata(serviceMetadataIdentifier);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取导出的URL列表
|
||||
*
|
||||
* @param metadataIdentifier 服务元数据标识符
|
||||
* @return 导出的URL列表,如果内容为空则返回空列表
|
||||
*/
|
||||
@Override
|
||||
protected List<String> doGetExportedURLs(ServiceMetadataIdentifier metadataIdentifier) {
|
||||
String content = getMetadata(metadataIdentifier);
|
||||
@@ -152,45 +135,21 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
return new ArrayList<>(Arrays.asList(URL.decode(content)));
|
||||
}
|
||||
|
||||
/**
|
||||
* 存储订阅者数据的具体实现
|
||||
*
|
||||
* @param subscriberMetadataIdentifier 订阅者元数据标识符
|
||||
* @param urlListStr URL列表字符串
|
||||
*/
|
||||
@Override
|
||||
protected void doSaveSubscriberData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, String urlListStr) {
|
||||
this.storeMetadata(subscriberMetadataIdentifier, urlListStr);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取订阅的URL列表
|
||||
*
|
||||
* @param subscriberMetadataIdentifier 订阅者元数据标识符
|
||||
* @return 订阅的URL列表
|
||||
*/
|
||||
@Override
|
||||
protected String doGetSubscribedURLs(SubscriberMetadataIdentifier subscriberMetadataIdentifier) {
|
||||
return this.getMetadata(subscriberMetadataIdentifier);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取服务定义
|
||||
*
|
||||
* @param metadataIdentifier 元数据标识符
|
||||
* @return 服务定义内容
|
||||
*/
|
||||
@Override
|
||||
public String getServiceDefinition(MetadataIdentifier metadataIdentifier) {
|
||||
return this.getMetadata(metadataIdentifier);
|
||||
}
|
||||
|
||||
/**
|
||||
* 存储元数据的通用方法,根据是否有连接池选择存储方式
|
||||
*
|
||||
* @param metadataIdentifier 元数据标识符
|
||||
* @param v 元数据值
|
||||
*/
|
||||
private void storeMetadata(BaseMetadataIdentifier metadataIdentifier, String v) {
|
||||
if (pool != null) {
|
||||
storeMetadataStandalone(metadataIdentifier, v);
|
||||
@@ -199,12 +158,6 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 在集群模式下存储元数据
|
||||
*
|
||||
* @param metadataIdentifier 元数据标识符
|
||||
* @param v 元数据值
|
||||
*/
|
||||
private void storeMetadataInCluster(BaseMetadataIdentifier metadataIdentifier, String v) {
|
||||
try (JedisCluster jedisCluster =
|
||||
new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
|
||||
@@ -217,12 +170,6 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 在单机模式下存储元数据
|
||||
*
|
||||
* @param metadataIdentifier 元数据标识符
|
||||
* @param v 元数据值
|
||||
*/
|
||||
private void storeMetadataStandalone(BaseMetadataIdentifier metadataIdentifier, String v) {
|
||||
try (Jedis jedis = pool.getResource()) {
|
||||
jedis.set(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), v, jedisParams);
|
||||
@@ -233,11 +180,6 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除元数据
|
||||
*
|
||||
* @param metadataIdentifier 元数据标识符
|
||||
*/
|
||||
private void deleteMetadata(BaseMetadataIdentifier metadataIdentifier) {
|
||||
if (pool != null) {
|
||||
deleteMetadataStandalone(metadataIdentifier);
|
||||
@@ -246,11 +188,6 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 在集群模式下删除元数据
|
||||
*
|
||||
* @param metadataIdentifier 元数据标识符
|
||||
*/
|
||||
private void deleteMetadataInCluster(BaseMetadataIdentifier metadataIdentifier) {
|
||||
try (JedisCluster jedisCluster =
|
||||
new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
|
||||
@@ -262,11 +199,6 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 在单机模式下删除元数据
|
||||
*
|
||||
* @param metadataIdentifier 元数据标识符
|
||||
*/
|
||||
private void deleteMetadataStandalone(BaseMetadataIdentifier metadataIdentifier) {
|
||||
try (Jedis jedis = pool.getResource()) {
|
||||
jedis.del(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
|
||||
@@ -277,12 +209,6 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取元数据
|
||||
*
|
||||
* @param metadataIdentifier 元数据标识符
|
||||
* @return 元数据值
|
||||
*/
|
||||
private String getMetadata(BaseMetadataIdentifier metadataIdentifier) {
|
||||
if (pool != null) {
|
||||
return getMetadataStandalone(metadataIdentifier);
|
||||
@@ -291,12 +217,6 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 在集群模式下获取元数据
|
||||
*
|
||||
* @param metadataIdentifier 元数据标识符
|
||||
* @return 元数据值
|
||||
*/
|
||||
private String getMetadataInCluster(BaseMetadataIdentifier metadataIdentifier) {
|
||||
try (JedisCluster jedisCluster =
|
||||
new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
|
||||
@@ -308,12 +228,6 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 在单机模式下获取元数据
|
||||
*
|
||||
* @param metadataIdentifier 元数据标识符
|
||||
* @return 元数据值
|
||||
*/
|
||||
private String getMetadataStandalone(BaseMetadataIdentifier metadataIdentifier) {
|
||||
try (Jedis jedis = pool.getResource()) {
|
||||
return jedis.get(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
|
||||
@@ -325,17 +239,15 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
}
|
||||
|
||||
/**
|
||||
* 使用Redis哈希存储类和应用程序名称的映射关系
|
||||
* <p>
|
||||
* 键:默认为 'dubbo:mapping'
|
||||
* 字段:类(serviceInterface)
|
||||
* 值:应用程序名称列表
|
||||
*
|
||||
* @param serviceInterface 类名(作为字段)
|
||||
* @param defaultMappingGroup 默认映射组 {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP}
|
||||
* @param newConfigContent 新的应用程序名称列表
|
||||
* @param ticket 先前的应用程序名称列表
|
||||
* @return 是否成功注册映射关系
|
||||
* Store class and application names using Redis hashes
|
||||
* key: default 'dubbo:mapping'
|
||||
* field: class (serviceInterface)
|
||||
* value: application_names
|
||||
* @param serviceInterface field(class)
|
||||
* @param defaultMappingGroup {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP}
|
||||
* @param newConfigContent new application_names
|
||||
* @param ticket previous application_names
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public boolean registerServiceAppMapping(
|
||||
@@ -353,17 +265,6 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据是否存在 Redis 连接池选择存储映射关系的方法。
|
||||
* 如果存在 Redis 连接池,则使用单机存储方式 {@link #storeMappingStandalone(String, String, String, String)};
|
||||
* 否则,使用集群存储方式 {@link #storeMappingInCluster(String, String, String, String)}。
|
||||
*
|
||||
* @param key 存储键
|
||||
* @param field 存储字段
|
||||
* @param value 存储值
|
||||
* @param ticket 事务票据,用于 CAS 操作
|
||||
* @return 存储是否成功
|
||||
*/
|
||||
private boolean storeMapping(String key, String field, String value, String ticket) {
|
||||
if (pool != null) {
|
||||
return storeMappingStandalone(key, field, value, ticket);
|
||||
@@ -373,17 +274,8 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
}
|
||||
|
||||
/**
|
||||
* 在 Redis 集群中存储映射关系
|
||||
* 使用 Redis 集群的方式存储给定键和字段的映射关系,并实现乐观锁 CAS 操作
|
||||
* 如果旧值为空或与给定的事务票据匹配,则更新字段的值为新值,并发布更新事件
|
||||
* 使用 WATCH 和 MULTI 来实现事务操作
|
||||
*
|
||||
* @param key 存储键
|
||||
* @param field 存储字段
|
||||
* @param value 存储值
|
||||
* @param ticket 事务票据,用于 CAS 操作
|
||||
* @return 存储是否成功
|
||||
* @throws RpcException 存储过程中发生的异常,以及失败的原因
|
||||
* use 'watch' to implement cas.
|
||||
* Find information about slot distribution by key.
|
||||
*/
|
||||
private boolean storeMappingInCluster(String key, String field, String value, String ticket) {
|
||||
try (JedisCluster jedisCluster =
|
||||
@@ -412,14 +304,8 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
}
|
||||
|
||||
/**
|
||||
* 在单机Redis中存储映射关系
|
||||
* 使用 'watch' 实现CAS(比较并交换)
|
||||
*
|
||||
* @param key Redis键
|
||||
* @param field Redis哈希字段(类名)
|
||||
* @param value 新的应用程序名称列表
|
||||
* @param ticket 先前的应用程序名称列表
|
||||
* @return 是否成功存储映射关系
|
||||
* use 'watch' to implement cas.
|
||||
* Find information about slot distribution by key.
|
||||
*/
|
||||
private boolean storeMappingStandalone(String key, String field, String value, String ticket) {
|
||||
try (Jedis jedis = pool.getResource()) {
|
||||
@@ -444,48 +330,36 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建映射关键字,用于存储服务类和应用名称的 Redis 哈希表键
|
||||
* 结合根路径和默认映射组名构建完整的映射键
|
||||
*
|
||||
* @param defaultMappingGroup 默认映射组名 {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP}
|
||||
* @return 构建的映射关键字
|
||||
* build mapping key
|
||||
* @param defaultMappingGroup {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP}
|
||||
* @return
|
||||
*/
|
||||
private String buildMappingKey(String defaultMappingGroup) {
|
||||
return this.root + GROUP_CHAR_SEPARATOR + defaultMappingGroup;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建发布订阅键,用于 Redis 发布-订阅模式中的通道名称
|
||||
* 结合默认映射组名和队列键构建完整的发布订阅键
|
||||
*
|
||||
* @return 构建的发布订阅键
|
||||
* build pub/sub key
|
||||
*/
|
||||
private String buildPubSubKey() {
|
||||
return buildMappingKey(DEFAULT_MAPPING_GROUP) + GROUP_CHAR_SEPARATOR + QUEUES_KEY;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据服务键和分组获取配置项
|
||||
* 使用分组构建映射键,并获取映射数据,然后返回一个配置项对象
|
||||
*
|
||||
* @param serviceKey 服务键,用于标识特定的服务
|
||||
* @param group 分组,用于构建映射键
|
||||
* @return 配置项对象,包含从映射数据中获取的内容
|
||||
* get content and use content to complete cas
|
||||
* @param serviceKey class
|
||||
* @param group {@link ServiceNameMapping#DEFAULT_MAPPING_GROUP}
|
||||
*/
|
||||
@Override
|
||||
public ConfigItem getConfigItem(String serviceKey, String group) {
|
||||
String key = buildMappingKey(group);
|
||||
String content = getMappingData(key, serviceKey);
|
||||
|
||||
return new ConfigItem(content, content);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据键和字段从 Redis 中获取映射数据
|
||||
* 如果连接池不为空,则使用独立模式获取数据;否则使用集群模式
|
||||
*
|
||||
* @param key 键,用于定位数据的存储位置
|
||||
* @param field 字段,用于定位具体的数据项
|
||||
* @return 获取到的映射数据
|
||||
* get current application_names
|
||||
*/
|
||||
private String getMappingData(String key, String field) {
|
||||
if (pool != null) {
|
||||
@@ -495,14 +369,6 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 从 Redis 集群中获取指定键和字段的映射数据
|
||||
*
|
||||
* @param key Redis 哈希表的键
|
||||
* @param field 哈希表中的字段
|
||||
* @return 返回键和字段对应的值,如果获取失败则抛出异常
|
||||
* @throws RpcException 如果从 Redis 集群获取数据失败,抛出该异常
|
||||
*/
|
||||
private String getMappingDataInCluster(String key, String field) {
|
||||
try (JedisCluster jedisCluster =
|
||||
new JedisCluster(jedisClusterNodes, timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
|
||||
@@ -514,13 +380,6 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 使用集群模式从 Redis 中获取映射数据
|
||||
*
|
||||
* @param key 键,用于定位数据的存储位置
|
||||
* @param field 字段,用于定位具体的数据项
|
||||
* @return 获取到的映射数据,如果获取失败则抛出 RpcException 异常
|
||||
*/
|
||||
private String getMappingDataStandalone(String key, String field) {
|
||||
try (Jedis jedis = pool.getResource()) {
|
||||
return jedis.hget(key, field);
|
||||
@@ -532,10 +391,7 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
}
|
||||
|
||||
/**
|
||||
* 移除服务应用映射的监听器
|
||||
*
|
||||
* @param serviceKey 服务键,用于标识特定的服务
|
||||
* @param listener 映射监听器,用于处理映射变更事件
|
||||
* remove listener. If have no listener,thread will dead
|
||||
*/
|
||||
@Override
|
||||
public void removeServiceAppMappingListener(String serviceKey, MappingListener listener) {
|
||||
@@ -550,13 +406,8 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
}
|
||||
|
||||
/**
|
||||
* 启动一个线程并订阅 {@link this#buildPubSubKey()}
|
||||
* 如果 'application_names' 消息发生变化,则通知 {@link MappingListener}。
|
||||
*
|
||||
* @param serviceKey 服务键
|
||||
* @param listener 映射监听器
|
||||
* @param url URL
|
||||
* @return 返回服务与应用映射关系的集合
|
||||
* Start a thread and subscribe to {@link this#buildPubSubKey()}.
|
||||
* Notify {@link MappingListener} if there is a change in the 'application_names' message.
|
||||
*/
|
||||
@Override
|
||||
public Set<String> getServiceAppMapping(String serviceKey, MappingListener listener, URL url) {
|
||||
@@ -570,82 +421,45 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
return this.getServiceAppMapping(serviceKey, url);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取指定服务键的服务与应用映射关系集合
|
||||
*
|
||||
* @param serviceKey 服务键
|
||||
* @param url URL
|
||||
* @return 返回服务与应用映射关系的集合
|
||||
*/
|
||||
@Override
|
||||
public Set<String> getServiceAppMapping(String serviceKey, URL url) {
|
||||
String key = buildMappingKey(DEFAULT_MAPPING_GROUP);
|
||||
return getAppNames(getMappingData(key, serviceKey));
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取订阅者元数据信息
|
||||
*
|
||||
* @param identifier 订阅者元数据标识符
|
||||
* @param instanceMetadata 实例元数据映射
|
||||
* @return 返回订阅者的元数据信息对象
|
||||
*/
|
||||
@Override
|
||||
public MetadataInfo getAppMetadata(SubscriberMetadataIdentifier identifier, Map<String, String> instanceMetadata) {
|
||||
String content = this.getMetadata(identifier);
|
||||
return JsonUtils.toJavaObject(content, MetadataInfo.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布应用元数据信息
|
||||
*
|
||||
* @param identifier 订阅者元数据标识符
|
||||
* @param metadataInfo 元数据信息对象
|
||||
*/
|
||||
@Override
|
||||
public void publishAppMetadata(SubscriberMetadataIdentifier identifier, MetadataInfo metadataInfo) {
|
||||
this.storeMetadata(identifier, metadataInfo.getContent());
|
||||
}
|
||||
|
||||
/**
|
||||
* 取消发布应用元数据信息
|
||||
*
|
||||
* @param identifier 订阅者元数据标识符
|
||||
* @param metadataInfo 元数据信息对象
|
||||
*/
|
||||
@Override
|
||||
public void unPublishAppMetadata(SubscriberMetadataIdentifier identifier, MetadataInfo metadataInfo) {
|
||||
this.deleteMetadata(identifier);
|
||||
}
|
||||
|
||||
// 用于测试
|
||||
// for test
|
||||
public MappingDataListener getMappingDataListener() {
|
||||
return mappingDataListenerMap.get(buildPubSubKey());
|
||||
}
|
||||
|
||||
/**
|
||||
* 监听 'application_names' 消息的变化并通知监听器
|
||||
* Listen for changes in the 'application_names' message and notify the listener.
|
||||
*/
|
||||
class NotifySub extends JedisPubSub {
|
||||
|
||||
private final Map<String, Set<MappingListener>> listeners = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 添加监听器
|
||||
*
|
||||
* @param key 监听的键
|
||||
* @param listener 监听器对象
|
||||
*/
|
||||
public void addListener(String key, MappingListener listener) {
|
||||
Set<MappingListener> listenerSet = listeners.computeIfAbsent(key, k -> new ConcurrentHashSet<>());
|
||||
listenerSet.add(listener);
|
||||
}
|
||||
|
||||
/**
|
||||
* 移除监听器
|
||||
*
|
||||
* @param serviceKey 服务键
|
||||
* @param listener 监听器对象
|
||||
*/
|
||||
public void removeListener(String serviceKey, MappingListener listener) {
|
||||
Set<MappingListener> listenerSet = this.listeners.get(serviceKey);
|
||||
if (listenerSet != null) {
|
||||
@@ -656,21 +470,10 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查监听器集合是否为空
|
||||
*
|
||||
* @return 如果监听器集合为空则返回 true,否则返回 false
|
||||
*/
|
||||
public Boolean isEmpty() {
|
||||
return this.listeners.isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
* 当接收到消息时触发的方法
|
||||
*
|
||||
* @param key 消息的键
|
||||
* @param msg 接收到的消息内容
|
||||
*/
|
||||
@Override
|
||||
public void onMessage(String key, String msg) {
|
||||
logger.info("sub from redis:" + key + " message:" + msg);
|
||||
@@ -683,24 +486,11 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 当接收到模式消息时触发的方法
|
||||
*
|
||||
* @param pattern 模式
|
||||
* @param key 消息的键
|
||||
* @param msg 接收到的消息内容
|
||||
*/
|
||||
@Override
|
||||
public void onPMessage(String pattern, String key, String msg) {
|
||||
onMessage(key, msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* 当成功订阅模式时触发的方法
|
||||
*
|
||||
* @param pattern 订阅的模式
|
||||
* @param subscribedChannels 订阅的频道数量
|
||||
*/
|
||||
@Override
|
||||
public void onPSubscribe(String pattern, int subscribedChannels) {
|
||||
super.onPSubscribe(pattern, subscribedChannels);
|
||||
@@ -708,7 +498,7 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
}
|
||||
|
||||
/**
|
||||
* 监听应用名称变化消息的线程类
|
||||
* Subscribe application names change message.
|
||||
*/
|
||||
class MappingDataListener extends Thread {
|
||||
|
||||
@@ -718,27 +508,14 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
// for test
|
||||
protected volatile boolean running = true;
|
||||
|
||||
/**
|
||||
* 构造方法,指定监听的路径
|
||||
*
|
||||
* @param path 监听的路径
|
||||
*/
|
||||
public MappingDataListener(String path) {
|
||||
this.path = path;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取通知订阅器
|
||||
*
|
||||
* @return 通知订阅器
|
||||
*/
|
||||
public NotifySub getNotifySub() {
|
||||
return notifySub;
|
||||
}
|
||||
|
||||
/**
|
||||
* 线程运行方法,持续订阅指定路径的消息
|
||||
*/
|
||||
@Override
|
||||
public void run() {
|
||||
while (running) {
|
||||
@@ -763,9 +540,6 @@ public class RedisMetadataReport extends AbstractMetadataReport {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭方法,用于停止线程运行并取消订阅指定路径的消息
|
||||
*/
|
||||
public void shutdown() {
|
||||
try {
|
||||
running = false;
|
||||
|
Reference in New Issue
Block a user