mirror of
https://gitee.com/dromara/RuoYi-Cloud-Plus.git
synced 2025-11-28 01:00:05 +08:00
update seata 1.6.1 => 1.7.0
This commit is contained in:
Binary file not shown.
@@ -17,7 +17,7 @@
|
||||
<revision>2.1.0-SNAPSHOT</revision>
|
||||
<spring-cloud-alibaba.version>2022.0.0.0-RC2</spring-cloud-alibaba.version>
|
||||
<sentinel.version>1.8.6</sentinel.version>
|
||||
<seata.version>1.6.1</seata.version>
|
||||
<seata.version>1.7.0</seata.version>
|
||||
<nacos.client.version>2.2.1</nacos.client.version>
|
||||
<dubbo.version>3.2.4</dubbo.version>
|
||||
<spring.context.support.version>1.0.11</spring.context.support.version>
|
||||
|
||||
@@ -1,188 +0,0 @@
|
||||
/*
|
||||
* Copyright 1999-2019 Seata.io Group.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package io.seata.spring.util;
|
||||
|
||||
import io.seata.common.util.CollectionUtils;
|
||||
import io.seata.rm.tcc.remoting.parser.DubboUtil;
|
||||
import org.springframework.aop.TargetSource;
|
||||
import org.springframework.aop.framework.Advised;
|
||||
import org.springframework.aop.framework.AdvisedSupport;
|
||||
import org.springframework.aop.support.AopUtils;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Proxy tools base on spring
|
||||
*
|
||||
* 临时修复 seata 适配 jdk17 反射bug
|
||||
*
|
||||
* @author zhangsen
|
||||
*/
|
||||
public class SpringProxyUtils {
|
||||
private SpringProxyUtils() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Find target class class.
|
||||
*
|
||||
* @param proxy the proxy
|
||||
* @return the class
|
||||
* @throws Exception the exception
|
||||
*/
|
||||
public static Class<?> findTargetClass(Object proxy) throws Exception {
|
||||
if (proxy == null) {
|
||||
return null;
|
||||
}
|
||||
if (AopUtils.isAopProxy(proxy) && proxy instanceof Advised) {
|
||||
// #issue 3709
|
||||
final TargetSource targetSource = ((Advised) proxy).getTargetSource();
|
||||
if (!targetSource.isStatic()) {
|
||||
return targetSource.getTargetClass();
|
||||
}
|
||||
return findTargetClass(targetSource.getTarget());
|
||||
}
|
||||
return proxy.getClass();
|
||||
}
|
||||
|
||||
public static Class<?>[] findInterfaces(Object proxy) throws Exception {
|
||||
if (AopUtils.isJdkDynamicProxy(proxy)) {
|
||||
AdvisedSupport advised = getAdvisedSupport(proxy);
|
||||
return getInterfacesByAdvised(advised);
|
||||
} else {
|
||||
return new Class<?>[]{};
|
||||
}
|
||||
}
|
||||
|
||||
private static Class<?>[] getInterfacesByAdvised(AdvisedSupport advised) {
|
||||
Class<?>[] interfaces = advised.getProxiedInterfaces();
|
||||
if (interfaces.length > 0) {
|
||||
return interfaces;
|
||||
} else {
|
||||
throw new IllegalStateException("Find the jdk dynamic proxy class that does not implement the interface");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets advised support.
|
||||
*
|
||||
* @param proxy the proxy
|
||||
* @return the advised support
|
||||
* @throws Exception the exception
|
||||
*/
|
||||
public static AdvisedSupport getAdvisedSupport(Object proxy) throws Exception {
|
||||
Object dynamicAdvisedInterceptor;
|
||||
if (AopUtils.isJdkDynamicProxy(proxy)) {
|
||||
dynamicAdvisedInterceptor = Proxy.getInvocationHandler(proxy);
|
||||
} else {
|
||||
Field h = proxy.getClass().getDeclaredField("CGLIB$CALLBACK_0");
|
||||
h.setAccessible(true);
|
||||
dynamicAdvisedInterceptor = h.get(proxy);
|
||||
}
|
||||
Field advised = dynamicAdvisedInterceptor.getClass().getDeclaredField("advised");
|
||||
advised.setAccessible(true);
|
||||
return (AdvisedSupport)advised.get(dynamicAdvisedInterceptor);
|
||||
}
|
||||
|
||||
/**
|
||||
* Is proxy boolean.
|
||||
*
|
||||
* @param bean the bean
|
||||
* @return the boolean
|
||||
*/
|
||||
public static boolean isProxy(Object bean) {
|
||||
if (bean == null) {
|
||||
return false;
|
||||
}
|
||||
//check dubbo proxy ?
|
||||
return DubboUtil.isDubboProxyName(bean.getClass().getName()) || (Proxy.class.isAssignableFrom(bean.getClass())
|
||||
|| AopUtils.isAopProxy(bean));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the target class , get the interface of its agent if it is a Proxy
|
||||
*
|
||||
* @param proxy the proxy
|
||||
* @return target interface
|
||||
* @throws Exception the exception
|
||||
*/
|
||||
public static Class<?> getTargetInterface(Object proxy) throws Exception {
|
||||
if (proxy == null) {
|
||||
throw new java.lang.IllegalArgumentException("proxy can not be null");
|
||||
}
|
||||
|
||||
//jdk proxy
|
||||
if (Proxy.class.isAssignableFrom(proxy.getClass())) {
|
||||
Proxy p = (Proxy)proxy;
|
||||
return p.getClass().getInterfaces()[0];
|
||||
}
|
||||
|
||||
return getTargetClass(proxy);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the class type of the proxy target object, if hadn't a target object, return the interface of the proxy
|
||||
*
|
||||
* @param proxy the proxy
|
||||
* @return target interface
|
||||
* @throws Exception the exception
|
||||
*/
|
||||
protected static Class<?> getTargetClass(Object proxy) throws Exception {
|
||||
if (proxy == null) {
|
||||
throw new java.lang.IllegalArgumentException("proxy can not be null");
|
||||
}
|
||||
//not proxy
|
||||
if (!AopUtils.isAopProxy(proxy)) {
|
||||
return proxy.getClass();
|
||||
}
|
||||
AdvisedSupport advisedSupport = getAdvisedSupport(proxy);
|
||||
Object target = advisedSupport.getTargetSource().getTarget();
|
||||
/*
|
||||
* the Proxy of sofa:reference has no target
|
||||
*/
|
||||
if (target == null) {
|
||||
if (CollectionUtils.isNotEmpty(advisedSupport.getProxiedInterfaces())) {
|
||||
return advisedSupport.getProxiedInterfaces()[0];
|
||||
} else {
|
||||
return proxy.getClass();
|
||||
}
|
||||
} else {
|
||||
return getTargetClass(target);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* get the all interfaces of bean, if the bean is null, then return empty array
|
||||
* @param bean the bean
|
||||
* @return target interface
|
||||
*/
|
||||
public static Class<?>[] getAllInterfaces(Object bean) {
|
||||
Set<Class<?>> interfaces = new HashSet<>();
|
||||
if (bean != null) {
|
||||
Class<?> clazz = bean.getClass();
|
||||
while (!Object.class.getName().equalsIgnoreCase(clazz.getName())) {
|
||||
Class<?>[] clazzInterfaces = clazz.getInterfaces();
|
||||
interfaces.addAll(Arrays.asList(clazzInterfaces));
|
||||
clazz = clazz.getSuperclass();
|
||||
}
|
||||
}
|
||||
return interfaces.toArray(new Class[0]);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -27,10 +27,11 @@
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<properties>
|
||||
<seata.version>1.6.1</seata.version>
|
||||
<jcommander.version>1.72</jcommander.version>
|
||||
<seata.version>1.7.0</seata.version>
|
||||
<jcommander.version>1.82</jcommander.version>
|
||||
<druid.version>1.2.12</druid.version>
|
||||
<spring-boot.version>2.7.12</spring-boot.version>
|
||||
<spring-boot.version>2.7.14</spring-boot.version>
|
||||
<native-build-tools-plugin.version>0.9.20</native-build-tools-plugin.version>
|
||||
</properties>
|
||||
|
||||
<dependencyManagement>
|
||||
@@ -196,7 +197,10 @@
|
||||
<groupId>net.logstash.logback</groupId>
|
||||
<artifactId>logstash-logback-encoder</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.codehaus.janino</groupId>
|
||||
<artifactId>janino</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
||||
@@ -15,18 +15,35 @@
|
||||
*/
|
||||
package io.seata.server;
|
||||
|
||||
import io.seata.common.aot.NativeUtils;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* @author spilledyear@outlook.com
|
||||
*/
|
||||
@SpringBootApplication(scanBasePackages = {"io.seata"})
|
||||
public class SeataServerApplication {
|
||||
public static void main(String[] args) throws IOException {
|
||||
// run the spring-boot application
|
||||
SpringApplication.run(SeataServerApplication.class, args);
|
||||
|
||||
public static void main(String[] args) throws Throwable {
|
||||
try {
|
||||
// run the spring-boot application
|
||||
SpringApplication.run(SeataServerApplication.class, args);
|
||||
} catch (Throwable t) {
|
||||
// This exception is used to end `spring-boot-maven-plugin:process-aot`, so ignore it.
|
||||
if ("org.springframework.boot.SpringApplication$AbandonedRunException".equals(t.getClass().getName())) {
|
||||
throw t;
|
||||
}
|
||||
|
||||
// In the `native-image`, if an exception occurs prematurely during the startup process, the exception log will not be recorded,
|
||||
// so here we sleep for 20 seconds to observe the exception information.
|
||||
if (NativeUtils.inNativeImage()) {
|
||||
t.printStackTrace();
|
||||
Thread.sleep(20000);
|
||||
}
|
||||
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -19,7 +19,12 @@ import io.seata.core.rpc.Disposable;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.CommandLineRunner;
|
||||
import org.springframework.boot.web.context.WebServerInitializedEvent;
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
@@ -30,12 +35,18 @@ import java.util.concurrent.CopyOnWriteArrayList;
|
||||
* @author spilledyear@outlook.com
|
||||
*/
|
||||
@Component
|
||||
public class ServerRunner implements CommandLineRunner, DisposableBean {
|
||||
public class ServerRunner implements CommandLineRunner, DisposableBean,
|
||||
ApplicationListener<ApplicationEvent>, Ordered {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(ServerRunner.class);
|
||||
|
||||
private boolean started = Boolean.FALSE;
|
||||
|
||||
private int port;
|
||||
|
||||
@Value("${logging.file.path}")
|
||||
private String logPath;
|
||||
|
||||
private static final List<Disposable> DISPOSABLE_LIST = new CopyOnWriteArrayList<>();
|
||||
|
||||
public static void addDisposable(Disposable disposable) {
|
||||
@@ -50,6 +61,7 @@ public class ServerRunner implements CommandLineRunner, DisposableBean {
|
||||
started = true;
|
||||
|
||||
long cost = System.currentTimeMillis() - start;
|
||||
LOGGER.info("\r\n you can visit seata console UI on http://127.0.0.1:{}. \r\n log path: {}.", this.port, this.logPath);
|
||||
LOGGER.info("seata server started in {} millSeconds", cost);
|
||||
} catch (Throwable e) {
|
||||
started = Boolean.FALSE;
|
||||
@@ -78,4 +90,16 @@ public class ServerRunner implements CommandLineRunner, DisposableBean {
|
||||
LOGGER.debug("destoryAll finish");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(ApplicationEvent event) {
|
||||
if (event instanceof WebServerInitializedEvent) {
|
||||
this.port = ((WebServerInitializedEvent)event).getWebServer().getPort();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getOrder() {
|
||||
return Ordered.LOWEST_PRECEDENCE;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* Branch Session Controller
|
||||
*
|
||||
* @author zhongxiang.wang
|
||||
*/
|
||||
@RestController
|
||||
@@ -32,5 +33,4 @@ public class BranchSessionController {
|
||||
@Resource(type = BranchSessionService.class)
|
||||
private BranchSessionService branchSessionService;
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -26,9 +26,9 @@ import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
|
||||
/**
|
||||
* Global Lock Controller
|
||||
*
|
||||
* @author zhongxiang.wang
|
||||
*/
|
||||
@RestController
|
||||
@@ -40,6 +40,7 @@ public class GlobalLockController {
|
||||
|
||||
/**
|
||||
* Query locks by param
|
||||
*
|
||||
* @param param the param
|
||||
* @return the list of GlobalLockVO
|
||||
*/
|
||||
|
||||
@@ -28,6 +28,7 @@ import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* Global Session Controller
|
||||
*
|
||||
* @author zhongxiang.wang
|
||||
*/
|
||||
@RestController
|
||||
@@ -39,8 +40,9 @@ public class GlobalSessionController {
|
||||
|
||||
/**
|
||||
* Query all globalSession
|
||||
*
|
||||
* @param param param for query globalSession
|
||||
* @return the list of GlobalSessionVO
|
||||
* @return the list of GlobalSessionVO
|
||||
*/
|
||||
@GetMapping("query")
|
||||
public PageResult<GlobalSessionVO> query(@ModelAttribute GlobalSessionParam param) {
|
||||
|
||||
@@ -79,6 +79,9 @@ public class GlobalLockFileServiceImpl implements GlobalLockService {
|
||||
* @return the RowLock list
|
||||
*/
|
||||
private Stream<RowLock> filterAndMap(GlobalLockParam param, BranchSession branchSession) {
|
||||
if (CollectionUtils.isEmpty(branchSession.getLockHolder())) {
|
||||
return Stream.empty();
|
||||
}
|
||||
|
||||
final String tableName = param.getTableName();
|
||||
|
||||
|
||||
@@ -167,8 +167,9 @@ public abstract class AbstractCore implements Core {
|
||||
|
||||
protected BranchStatus branchCommitSend(BranchCommitRequest request, GlobalSession globalSession,
|
||||
BranchSession branchSession) throws IOException, TimeoutException {
|
||||
|
||||
BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest(
|
||||
branchSession.getResourceId(), branchSession.getClientId(), request);
|
||||
branchSession.getResourceId(), branchSession.getClientId(), request, branchSession.isAT());
|
||||
return response.getBranchStatus();
|
||||
}
|
||||
|
||||
@@ -191,8 +192,9 @@ public abstract class AbstractCore implements Core {
|
||||
|
||||
protected BranchStatus branchRollbackSend(BranchRollbackRequest request, GlobalSession globalSession,
|
||||
BranchSession branchSession) throws IOException, TimeoutException {
|
||||
|
||||
BranchRollbackResponse response = (BranchRollbackResponse) remotingServer.sendSyncRequest(
|
||||
branchSession.getResourceId(), branchSession.getClientId(), request);
|
||||
branchSession.getResourceId(), branchSession.getClientId(), request, branchSession.isAT());
|
||||
return response.getBranchStatus();
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,6 @@ package io.seata.server.coordinator;
|
||||
import io.netty.channel.Channel;
|
||||
import io.seata.common.thread.NamedThreadFactory;
|
||||
import io.seata.common.util.CollectionUtils;
|
||||
import io.seata.common.util.DurationUtil;
|
||||
import io.seata.config.ConfigurationFactory;
|
||||
import io.seata.core.constants.ConfigurationKeys;
|
||||
import io.seata.core.context.RootContext;
|
||||
@@ -36,11 +35,12 @@ import io.seata.core.rpc.netty.NettyRemotingServer;
|
||||
import io.seata.server.AbstractTCInboundHandler;
|
||||
import io.seata.server.metrics.MetricsPublisher;
|
||||
import io.seata.server.session.*;
|
||||
import io.seata.server.store.StoreConfig;
|
||||
import org.apache.commons.lang.time.DateFormatUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.slf4j.MDC;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
@@ -105,13 +105,13 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
|
||||
/**
|
||||
* the pool size of branch asynchronous remove thread pool
|
||||
*/
|
||||
private static final int BRANCH_ASYNC_POOL_SIZE = Runtime.getRuntime().availableProcessors();
|
||||
private static final int BRANCH_ASYNC_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
|
||||
|
||||
private static final Duration MAX_COMMIT_RETRY_TIMEOUT = ConfigurationFactory.getInstance().getDuration(
|
||||
ConfigurationKeys.MAX_COMMIT_RETRY_TIMEOUT, DurationUtil.DEFAULT_DURATION, DEFAULT_MAX_COMMIT_RETRY_TIMEOUT);
|
||||
private static final long MAX_COMMIT_RETRY_TIMEOUT = ConfigurationFactory.getInstance().getLong(
|
||||
ConfigurationKeys.MAX_COMMIT_RETRY_TIMEOUT, DEFAULT_MAX_COMMIT_RETRY_TIMEOUT);
|
||||
|
||||
private static final Duration MAX_ROLLBACK_RETRY_TIMEOUT = ConfigurationFactory.getInstance().getDuration(
|
||||
ConfigurationKeys.MAX_ROLLBACK_RETRY_TIMEOUT, DurationUtil.DEFAULT_DURATION, DEFAULT_MAX_ROLLBACK_RETRY_TIMEOUT);
|
||||
private static final long MAX_ROLLBACK_RETRY_TIMEOUT = ConfigurationFactory.getInstance().getLong(
|
||||
ConfigurationKeys.MAX_ROLLBACK_RETRY_TIMEOUT, DEFAULT_MAX_ROLLBACK_RETRY_TIMEOUT);
|
||||
|
||||
private static final boolean ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE = ConfigurationFactory.getInstance().getBoolean(
|
||||
ConfigurationKeys.ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE, DEFAULT_ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE);
|
||||
@@ -134,15 +134,9 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
|
||||
private final GlobalStatus[] rollbackingStatuses = new GlobalStatus[] {GlobalStatus.TimeoutRollbacking,
|
||||
GlobalStatus.TimeoutRollbackRetrying, GlobalStatus.RollbackRetrying, GlobalStatus.Rollbacking};
|
||||
|
||||
private final GlobalStatus[] retryCommittingStatuses =
|
||||
new GlobalStatus[] {GlobalStatus.Committing, GlobalStatus.CommitRetrying};
|
||||
private final GlobalStatus[] retryCommittingStatuses = new GlobalStatus[] {GlobalStatus.Committing, GlobalStatus.CommitRetrying, GlobalStatus.Committed};
|
||||
|
||||
private final ThreadPoolExecutor branchRemoveExecutor = new ThreadPoolExecutor(BRANCH_ASYNC_POOL_SIZE, BRANCH_ASYNC_POOL_SIZE,
|
||||
Integer.MAX_VALUE, TimeUnit.MILLISECONDS,
|
||||
new ArrayBlockingQueue<>(
|
||||
CONFIG.getInt(ConfigurationKeys.SESSION_BRANCH_ASYNC_QUEUE_SIZE, DEFAULT_BRANCH_ASYNC_QUEUE_SIZE)
|
||||
), new NamedThreadFactory("branchSessionRemove", BRANCH_ASYNC_POOL_SIZE),
|
||||
new ThreadPoolExecutor.CallerRunsPolicy());
|
||||
private final ThreadPoolExecutor branchRemoveExecutor;
|
||||
|
||||
private RemotingServer remotingServer;
|
||||
|
||||
@@ -161,6 +155,19 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
|
||||
}
|
||||
this.remotingServer = remotingServer;
|
||||
this.core = new DefaultCore(remotingServer);
|
||||
boolean enableBranchAsyncRemove = CONFIG.getBoolean(
|
||||
ConfigurationKeys.ENABLE_BRANCH_ASYNC_REMOVE, DEFAULT_ENABLE_BRANCH_ASYNC_REMOVE);
|
||||
// create branchRemoveExecutor
|
||||
if (enableBranchAsyncRemove && StoreConfig.getSessionMode() != StoreConfig.SessionMode.FILE) {
|
||||
branchRemoveExecutor = new ThreadPoolExecutor(BRANCH_ASYNC_POOL_SIZE, BRANCH_ASYNC_POOL_SIZE,
|
||||
Integer.MAX_VALUE, TimeUnit.MILLISECONDS,
|
||||
new ArrayBlockingQueue<>(
|
||||
CONFIG.getInt(ConfigurationKeys.SESSION_BRANCH_ASYNC_QUEUE_SIZE, DEFAULT_BRANCH_ASYNC_QUEUE_SIZE)
|
||||
), new NamedThreadFactory("branchSessionRemove", BRANCH_ASYNC_POOL_SIZE),
|
||||
new ThreadPoolExecutor.CallerRunsPolicy());
|
||||
} else {
|
||||
branchRemoveExecutor = null;
|
||||
}
|
||||
}
|
||||
|
||||
public static DefaultCoordinator getInstance(RemotingServer remotingServer) {
|
||||
@@ -296,7 +303,8 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
|
||||
return false;
|
||||
}
|
||||
|
||||
LOGGER.info("Global transaction[{}] is timeout and will be rollback.", globalSession.getXid());
|
||||
LOGGER.warn("Global transaction[{}] is timeout and will be rollback,transaction begin time:{} and now:{}", globalSession.getXid(),
|
||||
DateFormatUtils.ISO_DATE_FORMAT.format(globalSession.getBeginTime()), DateFormatUtils.ISO_DATE_FORMAT.format(System.currentTimeMillis()));
|
||||
|
||||
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
|
||||
globalSession.close();
|
||||
@@ -338,13 +346,10 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
|
||||
// The function of this 'return' is 'continue'.
|
||||
return;
|
||||
}
|
||||
if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT.toMillis(), rollbackingSession.getBeginTime())) {
|
||||
if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT, rollbackingSession.getBeginTime())) {
|
||||
if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) {
|
||||
rollbackingSession.clean();
|
||||
}
|
||||
// Prevent thread safety issues
|
||||
SessionHolder.getRetryRollbackingSessionManager().removeGlobalSession(rollbackingSession);
|
||||
LOGGER.error("Global transaction rollback retry timeout and has removed [{}]", rollbackingSession.getXid());
|
||||
|
||||
SessionHelper.endRollbackFailed(rollbackingSession, true, true);
|
||||
|
||||
@@ -354,7 +359,7 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
|
||||
rollbackingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
|
||||
core.doGlobalRollback(rollbackingSession, true);
|
||||
} catch (TransactionException ex) {
|
||||
LOGGER.info("Failed to retry rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage());
|
||||
LOGGER.error("Failed to retry rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage());
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -374,15 +379,11 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
|
||||
SessionHelper.forEach(committingSessions, committingSession -> {
|
||||
try {
|
||||
// prevent repeated commit
|
||||
if (committingSession.getStatus() == GlobalStatus.Committing
|
||||
&& !committingSession.isDeadSession()) {
|
||||
if (GlobalStatus.Committing.equals(committingSession.getStatus()) && !committingSession.isDeadSession()) {
|
||||
// The function of this 'return' is 'continue'.
|
||||
return;
|
||||
}
|
||||
if (isRetryTimeout(now, MAX_COMMIT_RETRY_TIMEOUT.toMillis(), committingSession.getBeginTime())) {
|
||||
// Prevent thread safety issues
|
||||
SessionHolder.getRetryCommittingSessionManager().removeGlobalSession(committingSession);
|
||||
LOGGER.error("Global transaction commit retry timeout and has removed [{}]", committingSession.getXid());
|
||||
if (isRetryTimeout(now, MAX_COMMIT_RETRY_TIMEOUT, committingSession.getBeginTime())) {
|
||||
|
||||
// commit retry timeout event
|
||||
SessionHelper.endCommitFailed(committingSession, true, true);
|
||||
@@ -390,10 +391,14 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
|
||||
//The function of this 'return' is 'continue'.
|
||||
return;
|
||||
}
|
||||
if (GlobalStatus.Committed.equals(committingSession.getStatus())
|
||||
&& committingSession.getBranchSessions().isEmpty()) {
|
||||
SessionHelper.endCommitted(committingSession,true);
|
||||
}
|
||||
committingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
|
||||
core.doGlobalCommit(committingSession, true);
|
||||
} catch (TransactionException ex) {
|
||||
LOGGER.info("Failed to retry committing [{}] {} {}", committingSession.getXid(), ex.getCode(), ex.getMessage());
|
||||
LOGGER.error("Failed to retry committing [{}] {} {}", committingSession.getXid(), ex.getCode(), ex.getMessage());
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -500,14 +505,18 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
|
||||
asyncCommitting.shutdown();
|
||||
timeoutCheck.shutdown();
|
||||
undoLogDelete.shutdown();
|
||||
branchRemoveExecutor.shutdown();
|
||||
if (branchRemoveExecutor != null) {
|
||||
branchRemoveExecutor.shutdown();
|
||||
}
|
||||
try {
|
||||
retryRollbacking.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
|
||||
retryCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
|
||||
asyncCommitting.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
|
||||
timeoutCheck.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
|
||||
undoLogDelete.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
|
||||
branchRemoveExecutor.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
|
||||
if (branchRemoveExecutor != null) {
|
||||
branchRemoveExecutor.awaitTermination(TIMED_TASK_SHUTDOWN_MAX_WAIT_MILLS, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
} catch (InterruptedException ignore) {
|
||||
|
||||
}
|
||||
@@ -549,6 +558,9 @@ public class DefaultCoordinator extends AbstractTCInboundHandler implements Tran
|
||||
*/
|
||||
public BranchRemoveTask(GlobalSession globalSession, BranchSession branchSession) {
|
||||
this.globalSession = globalSession;
|
||||
if (branchSession == null) {
|
||||
throw new IllegalArgumentException("BranchSession can`t be null!");
|
||||
}
|
||||
this.branchSession = branchSession;
|
||||
}
|
||||
|
||||
|
||||
@@ -216,6 +216,7 @@ public class DefaultCore implements Core {
|
||||
switch (branchStatus) {
|
||||
case PhaseTwo_Committed:
|
||||
SessionHelper.removeBranch(globalSession, branchSession, !retrying);
|
||||
LOGGER.info("Commit branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
|
||||
return CONTINUE;
|
||||
case PhaseTwo_CommitFailed_Unretryable:
|
||||
//not at branch
|
||||
@@ -324,10 +325,10 @@ public class DefaultCore implements Core {
|
||||
return CONTINUE;
|
||||
case PhaseTwo_RollbackFailed_Unretryable:
|
||||
SessionHelper.endRollbackFailed(globalSession, retrying);
|
||||
LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
|
||||
LOGGER.error("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
|
||||
return false;
|
||||
default:
|
||||
LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
|
||||
LOGGER.error("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
|
||||
if (!retrying) {
|
||||
globalSession.queueToRetryRollback();
|
||||
}
|
||||
|
||||
@@ -20,6 +20,8 @@ import io.seata.config.Configuration;
|
||||
import io.seata.config.ConfigurationFactory;
|
||||
import io.seata.server.store.StoreConfig;
|
||||
import io.seata.server.store.StoreConfig.LockMode;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* The type Lock manager factory.
|
||||
@@ -28,6 +30,7 @@ import io.seata.server.store.StoreConfig.LockMode;
|
||||
*/
|
||||
public class LockerManagerFactory {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(LockerManagerFactory.class);
|
||||
private static final Configuration CONFIG = ConfigurationFactory.getInstance();
|
||||
|
||||
/**
|
||||
@@ -58,6 +61,7 @@ public class LockerManagerFactory {
|
||||
if (null == lockMode) {
|
||||
lockMode = StoreConfig.getLockMode();
|
||||
}
|
||||
LOGGER.info("use lock store mode: {}", lockMode.getName());
|
||||
//if not exist the lock mode, throw exception
|
||||
if (null != StoreConfig.StoreMode.get(lockMode.name())) {
|
||||
LOCK_MANAGER = EnhancedServiceLoader.load(LockManager.class, lockMode.getName());
|
||||
|
||||
@@ -297,6 +297,10 @@ public class BranchSession implements Lockable, Comparable<BranchSession>, Sessi
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean isAT() {
|
||||
return this.getBranchType() == BranchType.AT;
|
||||
}
|
||||
|
||||
public LockStatus getLockStatus() {
|
||||
return lockStatus;
|
||||
}
|
||||
|
||||
@@ -226,7 +226,7 @@ public class GlobalSession implements SessionLifecycle, SessionStorable {
|
||||
|
||||
@Override
|
||||
public void end() throws TransactionException {
|
||||
if (isSuccessEnd()) {
|
||||
if (GlobalStatus.isTwoPhaseSuccess(status)) {
|
||||
// Clean locks first
|
||||
clean();
|
||||
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
|
||||
@@ -239,14 +239,6 @@ public class GlobalSession implements SessionLifecycle, SessionStorable {
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isSuccessEnd() {
|
||||
if (status == GlobalStatus.Committed || status == GlobalStatus.Rollbacked
|
||||
|| status == GlobalStatus.TimeoutRollbacked) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public void clean() throws TransactionException {
|
||||
if (!LockerManagerFactory.getLockManager().releaseGlobalSessionLock(this)) {
|
||||
throw new TransactionException("UnLock globalSession error, xid = " + this.xid);
|
||||
@@ -305,7 +297,7 @@ public class GlobalSession implements SessionLifecycle, SessionStorable {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeBranch(BranchSession branchSession) throws TransactionException {
|
||||
public void unlockBranch(BranchSession branchSession) throws TransactionException {
|
||||
// do not unlock if global status in (Committing, CommitRetrying, AsyncCommitting),
|
||||
// because it's already unlocked in 'DefaultCore.commit()'
|
||||
if (status != Committing && status != CommitRetrying && status != AsyncCommitting) {
|
||||
@@ -313,12 +305,22 @@ public class GlobalSession implements SessionLifecycle, SessionStorable {
|
||||
throw new TransactionException("Unlock branch lock failed, xid = " + this.xid + ", branchId = " + branchSession.getBranchId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeBranch(BranchSession branchSession) throws TransactionException {
|
||||
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
|
||||
lifecycleListener.onRemoveBranch(this, branchSession);
|
||||
}
|
||||
remove(branchSession);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeAndUnlockBranch(BranchSession branchSession) throws TransactionException {
|
||||
unlockBranch(branchSession);
|
||||
removeBranch(branchSession);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets branch.
|
||||
*
|
||||
|
||||
@@ -134,6 +134,10 @@ public class SessionHelper {
|
||||
MetricsPublisher.postSessionDoneEvent(globalSession, IdConstants.STATUS_VALUE_AFTER_COMMITTED_KEY, true,
|
||||
beginTime, retryBranch);
|
||||
} else {
|
||||
if (globalSession.isSaga()) {
|
||||
globalSession.setStatus(GlobalStatus.Committed);
|
||||
globalSession.end();
|
||||
}
|
||||
MetricsPublisher.postSessionDoneEvent(globalSession, false, false);
|
||||
}
|
||||
}
|
||||
@@ -152,8 +156,8 @@ public class SessionHelper {
|
||||
/**
|
||||
* End commit failed.
|
||||
*
|
||||
* @param globalSession the global session
|
||||
* @param retryGlobal the retry global
|
||||
* @param globalSession the global session
|
||||
* @param retryGlobal the retry global
|
||||
* @param isRetryTimeout is retry timeout
|
||||
* @throws TransactionException the transaction exception
|
||||
*/
|
||||
@@ -201,6 +205,10 @@ public class SessionHelper {
|
||||
MetricsPublisher.postSessionDoneEvent(globalSession, IdConstants.STATUS_VALUE_AFTER_ROLLBACKED_KEY, true,
|
||||
beginTime, retryBranch);
|
||||
} else {
|
||||
if (globalSession.isSaga()) {
|
||||
globalSession.setStatus(GlobalStatus.Rollbacked);
|
||||
globalSession.end();
|
||||
}
|
||||
MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Rollbacked, false, false);
|
||||
}
|
||||
}
|
||||
@@ -294,7 +302,8 @@ public class SessionHelper {
|
||||
*/
|
||||
public static void removeBranch(GlobalSession globalSession, BranchSession branchSession, boolean isAsync)
|
||||
throws TransactionException {
|
||||
if (Objects.equals(Boolean.TRUE, ENABLE_BRANCH_ASYNC_REMOVE) && isAsync) {
|
||||
globalSession.unlockBranch(branchSession);
|
||||
if (isEnableBranchRemoveAsync() && isAsync) {
|
||||
COORDINATOR.doBranchRemoveAsync(globalSession, branchSession);
|
||||
} else {
|
||||
globalSession.removeBranch(branchSession);
|
||||
@@ -312,12 +321,26 @@ public class SessionHelper {
|
||||
if (branchSessions == null || branchSessions.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
if (Objects.equals(Boolean.TRUE, ENABLE_BRANCH_ASYNC_REMOVE) && isAsync) {
|
||||
COORDINATOR.doBranchRemoveAllAsync(globalSession);
|
||||
} else {
|
||||
for (BranchSession branchSession : branchSessions) {
|
||||
globalSession.removeBranch(branchSession);
|
||||
boolean isAsyncRemove = isEnableBranchRemoveAsync() && isAsync;
|
||||
for (BranchSession branchSession : branchSessions) {
|
||||
if (isAsyncRemove) {
|
||||
globalSession.unlockBranch(branchSession);
|
||||
} else {
|
||||
globalSession.removeAndUnlockBranch(branchSession);
|
||||
}
|
||||
}
|
||||
if (isAsyncRemove) {
|
||||
COORDINATOR.doBranchRemoveAllAsync(globalSession);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* if true, enable delete the branch asynchronously
|
||||
*
|
||||
* @return the boolean
|
||||
*/
|
||||
private static boolean isEnableBranchRemoveAsync() {
|
||||
return Objects.equals(Boolean.TRUE, DELAY_HANDLE_SESSION)
|
||||
&& Objects.equals(Boolean.TRUE, ENABLE_BRANCH_ASYNC_REMOVE);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -102,6 +102,7 @@ public class SessionHolder {
|
||||
if (null == sessionMode) {
|
||||
sessionMode = StoreConfig.getSessionMode();
|
||||
}
|
||||
LOGGER.info("use session store mode: {}", sessionMode.getName());
|
||||
if (SessionMode.DB.equals(sessionMode)) {
|
||||
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, SessionMode.DB.getName());
|
||||
ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, SessionMode.DB.getName(),
|
||||
@@ -399,7 +400,7 @@ public class SessionHolder {
|
||||
func.call();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.info("Exception running function with key = {}", key, e);
|
||||
LOGGER.error("Exception running function with key = {}", key, e);
|
||||
} finally {
|
||||
if (lock) {
|
||||
try {
|
||||
|
||||
@@ -58,6 +58,14 @@ public interface SessionLifecycle {
|
||||
*/
|
||||
void addBranch(BranchSession branchSession) throws TransactionException;
|
||||
|
||||
/**
|
||||
* Release the lock of branch.
|
||||
*
|
||||
* @param branchSession the branch session
|
||||
* @throws TransactionException the transaction exception
|
||||
*/
|
||||
void unlockBranch(BranchSession branchSession) throws TransactionException;
|
||||
|
||||
/**
|
||||
* Remove branch.
|
||||
*
|
||||
@@ -66,6 +74,14 @@ public interface SessionLifecycle {
|
||||
*/
|
||||
void removeBranch(BranchSession branchSession) throws TransactionException;
|
||||
|
||||
/**
|
||||
* Remove branch and release the lock of branch.
|
||||
*
|
||||
* @param branchSession the branchSession
|
||||
* @throws TransactionException the TransactionException
|
||||
*/
|
||||
void removeAndUnlockBranch(BranchSession branchSession) throws TransactionException;
|
||||
|
||||
/**
|
||||
* Is active boolean.
|
||||
*
|
||||
|
||||
@@ -0,0 +1,70 @@
|
||||
/*
|
||||
* Copyright 1999-2019 Seata.io Group.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package io.seata.server.spring.listener;
|
||||
|
||||
import io.seata.common.util.CollectionUtils;
|
||||
import io.seata.common.util.StringUtils;
|
||||
import io.seata.config.ConfigurationFactory;
|
||||
import io.seata.config.FileConfiguration;
|
||||
import io.seata.config.file.FileConfig;
|
||||
import io.seata.server.store.StoreConfig;
|
||||
import org.springframework.context.ApplicationContextInitializer;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.core.env.ConfigurableEnvironment;
|
||||
import org.springframework.core.env.PropertiesPropertySource;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
import static io.seata.common.ConfigurationKeys.*;
|
||||
|
||||
@Order(Ordered.HIGHEST_PRECEDENCE)
|
||||
public class SeataPropertiesLoader implements ApplicationContextInitializer<ConfigurableApplicationContext> {
|
||||
|
||||
List<String> prefixList = Arrays.asList(FILE_ROOT_PREFIX_CONFIG, FILE_ROOT_PREFIX_REGISTRY, SERVER_PREFIX,
|
||||
STORE_PREFIX, METRICS_PREFIX, TRANSPORT_PREFIX);
|
||||
|
||||
@Override
|
||||
public void initialize(ConfigurableApplicationContext applicationContext) {
|
||||
ConfigurableEnvironment environment = applicationContext.getEnvironment();
|
||||
FileConfiguration configuration = ConfigurationFactory.getOriginFileInstanceRegistry();
|
||||
FileConfig fileConfig = configuration.getFileConfig();
|
||||
Map<String, Object> configs = fileConfig.getAllConfig();
|
||||
if (CollectionUtils.isNotEmpty(configs)) {
|
||||
Optional<FileConfiguration> originFileInstance = ConfigurationFactory.getOriginFileInstance();
|
||||
originFileInstance
|
||||
.ifPresent(fileConfiguration -> configs.putAll(fileConfiguration.getFileConfig().getAllConfig()));
|
||||
Properties properties = new Properties();
|
||||
configs.forEach((k, v) -> {
|
||||
if (v instanceof String) {
|
||||
if (StringUtils.isEmpty((String)v)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
// Convert the configuration name to the configuration name under Spring Boot
|
||||
if (prefixList.stream().anyMatch(k::startsWith)) {
|
||||
properties.put(SEATA_FILE_PREFIX_ROOT_CONFIG + k, v);
|
||||
}
|
||||
});
|
||||
environment.getPropertySources().addLast(new PropertiesPropertySource("seataOldConfig", properties));
|
||||
}
|
||||
// Load by priority
|
||||
System.setProperty("sessionMode", StoreConfig.getSessionMode().getName());
|
||||
System.setProperty("lockMode", StoreConfig.getLockMode().getName());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -13,11 +13,10 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package io.seata.server;
|
||||
package io.seata.server.spring.listener;
|
||||
|
||||
import io.seata.common.holder.ObjectHolder;
|
||||
import io.seata.common.util.StringUtils;
|
||||
import io.seata.server.store.StoreConfig;
|
||||
import io.seata.spring.boot.autoconfigure.SeataCoreEnvironmentPostProcessor;
|
||||
import io.seata.spring.boot.autoconfigure.SeataServerEnvironmentPostProcessor;
|
||||
import org.springframework.boot.context.event.ApplicationEnvironmentPreparedEvent;
|
||||
@@ -56,9 +55,6 @@ public class ServerApplicationListener implements GenericApplicationListener {
|
||||
ObjectHolder.INSTANCE.setObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT, environment);
|
||||
SeataCoreEnvironmentPostProcessor.init();
|
||||
SeataServerEnvironmentPostProcessor.init();
|
||||
// Load by priority
|
||||
System.setProperty("sessionMode", StoreConfig.getSessionMode().getName());
|
||||
System.setProperty("lockMode", StoreConfig.getLockMode().getName());
|
||||
|
||||
String[] args = environmentPreparedEvent.getArgs();
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
*/
|
||||
package io.seata.server.storage.db.lock;
|
||||
|
||||
|
||||
import io.seata.common.exception.ShouldNeverHappenException;
|
||||
import io.seata.common.loader.EnhancedServiceLoader;
|
||||
import io.seata.common.loader.LoadLevel;
|
||||
@@ -37,7 +36,9 @@ import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.HashSet;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
import static io.seata.core.constants.ConfigurationKeys.DISTRIBUTED_LOCK_DB_TABLE;
|
||||
|
||||
@@ -56,6 +57,19 @@ public class DataBaseDistributedLocker implements DistributedLocker {
|
||||
|
||||
private DataSource distributedLockDataSource;
|
||||
|
||||
private static final String LOCK_WAIT_TIMEOUT_MYSQL_MESSAGE = "try restarting transaction";
|
||||
|
||||
private static final int LOCK_WAIT_TIMEOUT_MYSQL_CODE = 1205;
|
||||
|
||||
private static final Set<Integer> IGNORE_MYSQL_CODE = new HashSet<>();
|
||||
|
||||
private static final Set<String> IGNORE_MYSQL_MESSAGE = new HashSet<>();
|
||||
|
||||
static {
|
||||
IGNORE_MYSQL_CODE.add(LOCK_WAIT_TIMEOUT_MYSQL_CODE);
|
||||
IGNORE_MYSQL_MESSAGE.add(LOCK_WAIT_TIMEOUT_MYSQL_MESSAGE);
|
||||
}
|
||||
|
||||
/**
|
||||
* whether the distribute lock demotion
|
||||
* using for 1.5.0 only and will remove in 1.6.0
|
||||
@@ -109,16 +123,16 @@ public class DataBaseDistributedLocker implements DistributedLocker {
|
||||
originalAutoCommit = connection.getAutoCommit();
|
||||
connection.setAutoCommit(false);
|
||||
|
||||
DistributedLockDO distributedLockDOFromDB = getDistributedLockDO(connection, distributedLockDO.getLockKey());
|
||||
if (null == distributedLockDOFromDB) {
|
||||
DistributedLockDO lockFromDB = getDistributedLockDO(connection, distributedLockDO.getLockKey());
|
||||
if (null == lockFromDB) {
|
||||
boolean ret = insertDistribute(connection, distributedLockDO);
|
||||
connection.commit();
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (distributedLockDOFromDB.getExpireTime() >= System.currentTimeMillis()) {
|
||||
if (lockFromDB.getExpireTime() >= System.currentTimeMillis()) {
|
||||
LOGGER.debug("the distribute lock for key :{} is holding by :{}, acquire lock failure.",
|
||||
distributedLockDO.getLockKey(), distributedLockDOFromDB.getLockValue());
|
||||
distributedLockDO.getLockKey(), lockFromDB.getLockValue());
|
||||
connection.commit();
|
||||
return false;
|
||||
}
|
||||
@@ -128,7 +142,11 @@ public class DataBaseDistributedLocker implements DistributedLocker {
|
||||
|
||||
return ret;
|
||||
} catch (SQLException ex) {
|
||||
LOGGER.error("execute acquire lock failure, key is: {}", distributedLockDO.getLockKey(), ex);
|
||||
// ignore "Lock wait timeout exceeded; try restarting transaction"
|
||||
// TODO: need nowait adaptation
|
||||
if (!ignoreSQLException(ex)) {
|
||||
LOGGER.error("execute acquire lock failure, key is: {}", distributedLockDO.getLockKey(), ex);
|
||||
}
|
||||
try {
|
||||
if (connection != null) {
|
||||
connection.rollback();
|
||||
@@ -167,8 +185,10 @@ public class DataBaseDistributedLocker implements DistributedLocker {
|
||||
|
||||
if (distributedLockDOFromDB.getExpireTime() >= System.currentTimeMillis()
|
||||
&& !Objects.equals(distributedLockDOFromDB.getLockValue(), distributedLockDO.getLockValue())) {
|
||||
LOGGER.debug("the distribute lock for key :{} is holding by :{}, skip the release lock.",
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("the distribute lock for key :{} is holding by :{}, skip the release lock.",
|
||||
distributedLockDO.getLockKey(), distributedLockDOFromDB.getLockValue());
|
||||
}
|
||||
connection.commit();
|
||||
return true;
|
||||
}
|
||||
@@ -180,7 +200,9 @@ public class DataBaseDistributedLocker implements DistributedLocker {
|
||||
connection.commit();
|
||||
return ret;
|
||||
} catch (SQLException ex) {
|
||||
LOGGER.error("execute release lock failure, key is: {}", distributedLockDO.getLockKey(), ex);
|
||||
if (!ignoreSQLException(ex)) {
|
||||
LOGGER.error("execute release lock failure, key is: {}", distributedLockDO.getLockKey(), ex);
|
||||
}
|
||||
|
||||
try {
|
||||
if (connection != null) {
|
||||
@@ -248,4 +270,14 @@ public class DataBaseDistributedLocker implements DistributedLocker {
|
||||
this.distributedLockDataSource = EnhancedServiceLoader.load(DataSourceProvider.class, datasourceType).provide();
|
||||
}
|
||||
|
||||
private boolean ignoreSQLException(SQLException exception) {
|
||||
if (IGNORE_MYSQL_CODE.contains(exception.getErrorCode())) {
|
||||
return true;
|
||||
}
|
||||
if (StringUtils.isNotBlank(exception.getMessage())) {
|
||||
return IGNORE_MYSQL_MESSAGE.stream().anyMatch(message -> exception.getMessage().contains(message));
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -220,6 +220,7 @@ public class FileSessionManager extends AbstractSessionManager implements Reload
|
||||
case RollbackFailed:
|
||||
case TimeoutRollbacked:
|
||||
case TimeoutRollbackFailed:
|
||||
case RollbackRetryTimeout:
|
||||
case Finished:
|
||||
return false;
|
||||
default:
|
||||
|
||||
@@ -20,6 +20,7 @@ import io.seata.core.store.db.AbstractDataSourceProvider;
|
||||
import org.apache.commons.dbcp2.BasicDataSource;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.sql.Connection;
|
||||
|
||||
/**
|
||||
* The dbcp datasource provider
|
||||
@@ -51,6 +52,7 @@ public class DbcpDataSourceProvider extends AbstractDataSourceProvider {
|
||||
ds.setTestWhileIdle(true);
|
||||
ds.setValidationQuery(getValidationQuery(getDBType()));
|
||||
ds.setConnectionProperties("useUnicode=yes;characterEncoding=utf8;socketTimeout=5000;connectTimeout=500");
|
||||
ds.setDefaultTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
|
||||
return ds;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ import io.seata.common.loader.LoadLevel;
|
||||
import io.seata.core.store.db.AbstractDataSourceProvider;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.sql.Connection;
|
||||
|
||||
/**
|
||||
* The druid datasource provider
|
||||
@@ -52,6 +53,7 @@ public class DruidDataSourceProvider extends AbstractDataSourceProvider {
|
||||
ds.setDefaultAutoCommit(true);
|
||||
// fix issue 5030
|
||||
ds.setUseOracleImplicitCache(false);
|
||||
ds.setDefaultTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
|
||||
return ds;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ package io.seata.server.store;
|
||||
|
||||
import com.zaxxer.hikari.HikariConfig;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import com.zaxxer.hikari.util.IsolationLevel;
|
||||
import io.seata.common.loader.LoadLevel;
|
||||
import io.seata.core.store.db.AbstractDataSourceProvider;
|
||||
|
||||
@@ -55,6 +56,7 @@ public class HikariDataSourceProvider extends AbstractDataSourceProvider {
|
||||
config.setAutoCommit(true);
|
||||
config.setConnectionTimeout(getMaxWait());
|
||||
config.setInitializationFailTimeout(-1);
|
||||
config.setTransactionIsolation(IsolationLevel.TRANSACTION_READ_COMMITTED.name());
|
||||
return new HikariDataSource(config);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,8 +80,9 @@ public class ATCore extends AbstractCore {
|
||||
branchSession.getBranchId()));
|
||||
}
|
||||
} catch (StoreException e) {
|
||||
if (e.getCause() instanceof BranchTransactionException) {
|
||||
throw new BranchTransactionException(((BranchTransactionException)e.getCause()).getCode(),
|
||||
Throwable cause = e.getCause();
|
||||
if (cause instanceof BranchTransactionException) {
|
||||
throw new BranchTransactionException(((BranchTransactionException)cause).getCode(),
|
||||
String.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(),
|
||||
branchSession.getBranchId()));
|
||||
}
|
||||
|
||||
@@ -195,7 +195,7 @@ public class SagaCore extends AbstractCore {
|
||||
public void doGlobalReport(GlobalSession globalSession, String xid, GlobalStatus globalStatus) throws TransactionException {
|
||||
if (GlobalStatus.Committed.equals(globalStatus)) {
|
||||
SessionHelper.removeAllBranch(globalSession, false);
|
||||
SessionHelper.endCommitted(globalSession,false);
|
||||
SessionHelper.endCommitted(globalSession, false);
|
||||
LOGGER.info("Global[{}] committed", globalSession.getXid());
|
||||
} else if (GlobalStatus.Rollbacked.equals(globalStatus)
|
||||
|| GlobalStatus.Finished.equals(globalStatus)) {
|
||||
|
||||
@@ -0,0 +1,362 @@
|
||||
[
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "io.seata.core.rpc.RegisterCheckAuthHandler"
|
||||
},
|
||||
"name": "io.seata.server.auth.DefaultCheckAuthHandler",
|
||||
"methods": [
|
||||
{
|
||||
"name": "<init>",
|
||||
"parameterTypes": []
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "io.seata.core.store.db.DataSourceProvider"
|
||||
},
|
||||
"name": "io.seata.server.store.DbcpDataSourceProvider",
|
||||
"methods": [
|
||||
{
|
||||
"name": "<init>",
|
||||
"parameterTypes": []
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "io.seata.core.store.db.DataSourceProvider"
|
||||
},
|
||||
"name": "io.seata.server.store.DruidDataSourceProvider",
|
||||
"methods": [
|
||||
{
|
||||
"name": "<init>",
|
||||
"parameterTypes": []
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "io.seata.core.store.db.DataSourceProvider"
|
||||
},
|
||||
"name": "io.seata.server.store.HikariDataSourceProvider",
|
||||
"methods": [
|
||||
{
|
||||
"name": "<init>",
|
||||
"parameterTypes": []
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "io.seata.core.store.DistributedLocker"
|
||||
},
|
||||
"name": "io.seata.server.storage.redis.lock.RedisDistributedLocker",
|
||||
"methods": [
|
||||
{
|
||||
"name": "<init>",
|
||||
"parameterTypes": []
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "io.seata.core.store.DistributedLocker"
|
||||
},
|
||||
"name": "io.seata.server.storage.db.lock.DataBaseDistributedLocker",
|
||||
"methods": [
|
||||
{
|
||||
"name": "<init>",
|
||||
"parameterTypes": []
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "io.seata.server.coordinator.AbstractCore"
|
||||
},
|
||||
"name": "io.seata.server.transaction.at.ATCore",
|
||||
"methods": [
|
||||
{
|
||||
"name": "<init>",
|
||||
"parameterTypes": [
|
||||
"io.seata.core.rpc.RemotingServer"
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "io.seata.server.coordinator.AbstractCore"
|
||||
},
|
||||
"name": "io.seata.server.transaction.tcc.TccCore",
|
||||
"methods": [
|
||||
{
|
||||
"name": "<init>",
|
||||
"parameterTypes": [
|
||||
"io.seata.core.rpc.RemotingServer"
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "io.seata.server.coordinator.AbstractCore"
|
||||
},
|
||||
"name": "io.seata.server.transaction.saga.SagaCore",
|
||||
"methods": [
|
||||
{
|
||||
"name": "<init>",
|
||||
"parameterTypes": [
|
||||
"io.seata.core.rpc.RemotingServer"
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "io.seata.server.coordinator.AbstractCore"
|
||||
},
|
||||
"name": "io.seata.server.transaction.xa.XACore",
|
||||
"methods": [
|
||||
{
|
||||
"name": "<init>",
|
||||
"parameterTypes": [
|
||||
"io.seata.core.rpc.RemotingServer"
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "io.seata.server.lock.LockManager"
|
||||
},
|
||||
"name": "io.seata.server.storage.db.lock.DataBaseLockManager",
|
||||
"methods": [
|
||||
{
|
||||
"name": "<init>",
|
||||
"parameterTypes": []
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "io.seata.server.lock.LockManager"
|
||||
},
|
||||
"name": "io.seata.server.storage.file.lock.FileLockManager",
|
||||
"methods": [
|
||||
{
|
||||
"name": "<init>",
|
||||
"parameterTypes": []
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "io.seata.server.lock.LockManager"
|
||||
},
|
||||
"name": "io.seata.server.storage.redis.lock.RedisLockManager",
|
||||
"methods": [
|
||||
{
|
||||
"name": "<init>",
|
||||
"parameterTypes": []
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "io.seata.server.session.SessionManager"
|
||||
},
|
||||
"name": "io.seata.server.storage.file.session.FileSessionManager",
|
||||
"methods": [
|
||||
{
|
||||
"name": "<init>",
|
||||
"parameterTypes": [
|
||||
"java.lang.String",
|
||||
"java.lang.String"
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "io.seata.server.session.SessionManager"
|
||||
},
|
||||
"name": "io.seata.server.storage.db.session.DataBaseSessionManager",
|
||||
"methods": [
|
||||
{
|
||||
"name": "<init>",
|
||||
"parameterTypes": []
|
||||
},
|
||||
{
|
||||
"name": "<init>",
|
||||
"parameterTypes": [
|
||||
"java.lang.String"
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "io.seata.server.session.SessionManager"
|
||||
},
|
||||
"name": "io.seata.server.storage.redis.session.RedisSessionManager",
|
||||
"methods": [
|
||||
{
|
||||
"name": "<init>",
|
||||
"parameterTypes": []
|
||||
},
|
||||
{
|
||||
"name": "<init>",
|
||||
"parameterTypes": [
|
||||
"java.lang.String"
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "com.google.inject.internal.TypeConverterBindingProcessor"
|
||||
},
|
||||
"name": "java.lang.Integer",
|
||||
"methods": [
|
||||
{
|
||||
"name": "parseInteger",
|
||||
"parameterTypes": [
|
||||
"java.lang.String"
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "com.google.inject.internal.TypeConverterBindingProcessor"
|
||||
},
|
||||
"name": "java.lang.Long",
|
||||
"methods": [
|
||||
{
|
||||
"name": "parseLong",
|
||||
"parameterTypes": [
|
||||
"java.lang.String"
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "com.google.inject.internal.TypeConverterBindingProcessor"
|
||||
},
|
||||
"name": "java.lang.Boolean",
|
||||
"methods": [
|
||||
{
|
||||
"name": "parseBoolean",
|
||||
"parameterTypes": [
|
||||
"java.lang.String"
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "com.google.inject.internal.TypeConverterBindingProcessor"
|
||||
},
|
||||
"name": "java.lang.Byte",
|
||||
"methods": [
|
||||
{
|
||||
"name": "parseByte",
|
||||
"parameterTypes": [
|
||||
"java.lang.String"
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "com.google.inject.internal.TypeConverterBindingProcessor"
|
||||
},
|
||||
"name": "java.lang.Short",
|
||||
"methods": [
|
||||
{
|
||||
"name": "parseShort",
|
||||
"parameterTypes": [
|
||||
"java.lang.String"
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "com.google.inject.internal.TypeConverterBindingProcessor"
|
||||
},
|
||||
"name": "java.lang.Float",
|
||||
"methods": [
|
||||
{
|
||||
"name": "parseFloat",
|
||||
"parameterTypes": [
|
||||
"java.lang.String"
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "com.google.inject.internal.TypeConverterBindingProcessor"
|
||||
},
|
||||
"name": "java.lang.Double",
|
||||
"methods": [
|
||||
{
|
||||
"name": "parseDouble",
|
||||
"parameterTypes": [
|
||||
"java.lang.String"
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "io.netty.channel.socket.nio.SelectorProviderUtil"
|
||||
},
|
||||
"name": "java.nio.channels.spi.SelectorProvider",
|
||||
"methods": [
|
||||
{
|
||||
"name": "openServerSocketChannel",
|
||||
"parameterTypes": [
|
||||
"java.net.ProtocolFamily"
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "io.netty.channel.DefaultChannelConfig"
|
||||
},
|
||||
"name": "io.netty.buffer.ByteBufAllocator"
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "io.netty.channel.DefaultChannelConfig"
|
||||
},
|
||||
"name": "io.netty.buffer.ByteBufUtil"
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "io.netty.util.ResourceLeakDetector"
|
||||
},
|
||||
"name": "io.netty.buffer.AbstractByteBufAllocator",
|
||||
"allDeclaredMethods": true
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "io.netty.util.ResourceLeakDetector"
|
||||
},
|
||||
"name": "io.netty.buffer.AdvancedLeakAwareByteBuf",
|
||||
"allDeclaredMethods": true
|
||||
},
|
||||
{
|
||||
"condition": {
|
||||
"typeReachable": "io.netty.util.ResourceLeakDetector"
|
||||
},
|
||||
"name": "io.netty.util.ReferenceCountUtil",
|
||||
"allDeclaredMethods": true
|
||||
}
|
||||
]
|
||||
@@ -0,0 +1,21 @@
|
||||
{
|
||||
"resources": {
|
||||
"includes": [
|
||||
{
|
||||
"pattern": "\\Qlogback\/\\E.*"
|
||||
},
|
||||
{
|
||||
"pattern": "\\Qlua\/redislocker\/redislock.lua\\E"
|
||||
},
|
||||
{
|
||||
"pattern": "\\Qapplication.yml\\E"
|
||||
},
|
||||
{
|
||||
"pattern": "\\Qbanner.txt\\E"
|
||||
},
|
||||
{
|
||||
"pattern": "\\Qlogback-spring.xml\\E"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
@@ -1,2 +1,4 @@
|
||||
org.springframework.context.ApplicationListener=\
|
||||
io.seata.server.ServerApplicationListener
|
||||
io.seata.server.spring.listener.ServerApplicationListener
|
||||
org.springframework.context.ApplicationContextInitializer=\
|
||||
io.seata.server.spring.listener.SeataPropertiesLoader
|
||||
@@ -56,4 +56,4 @@ seata:
|
||||
secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
|
||||
tokenValidityInMilliseconds: 1800000
|
||||
ignore:
|
||||
urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.ico,/console-fe/public/**,/api/v1/auth/login
|
||||
urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login
|
||||
|
||||
Reference in New Issue
Block a user