# 分布式锁一 数据库分布式锁
# 概述
java提供的synchronized,Lock是在JVM层面的锁,在单应用部署时,可保证数据的一致性,在集群环境中,仍有可能出问题,这时候需要考虑分布式锁。
数据库分布式锁,顾名思义就是基于数据库实现的一种锁,用于保证并发情况下数据一致性。基于mysql数据库的分布式锁有多种,可以用乐观锁或悲观锁实现,也可用一张表来实现分布式悲观锁。主要讲第二种的实现。
# 实现过程
数据库新建一张表用于,当我们要操作某个流程时,先在数据库里插入一条记录,当这个流程走完以后,再删除这条记录。基于 DB 的唯一索引。
当并发请求进来后,先判断这张表是否存在记录,如果不存在则执行流程,存在的话,则结束本次请求。
# 代码片段
- 新建一张表,业务类型和业务ID做联合唯一索引。
DROP TABLE IF EXISTS `cs_one_by_one`;
CREATE TABLE `cs_one_by_one` (
`BIZ_TYPE` varchar(32) NOT NULL COMMENT '业务类型',
`BIZ_ID` varchar(64) NOT NULL COMMENT '业务ID',
`METHOD` varchar(32) DEFAULT NULL COMMENT '方法',
`CREATED_TIME` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`BIZ_TYPE`,`BIZ_ID`),
UNIQUE KEY `UI_CS_ONE_BY_ONE_BIZ_TYPE_BIZ_ID` (`BIZ_TYPE`,`BIZ_ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='OneByOne控制表';
- 实体类
public class OneByOne {
/** 业务类型 */
private String bizType;
/** 业务ID */
private String bizId;
/** 方法 */
private String method;
/** 创建时间 */
private Date createTime;
private String description;
private boolean insertSuccess;
/**
* 创建一个接一个处理记录
* @param bizType 业务类型
* @param bizId 业务ID
* @param method 方法
*/
public OneByOne(String bizType, String bizId, String method) {
this.bizType = bizType;
this.bizId = bizId;
this.method = method;
}
/**
* 获取业务类型
* @return 业务类型
*/
public String getBizType() {
return bizType;
}
/**
* 设置业务类型
* @param bizType 业务类型
*/
public void setBizType(String bizType) {
this.bizType = bizType;
}
/**
* 获取业务ID
* @return 业务ID
*/
public String getBizId() {
return bizId;
}
/**
* 设置业务ID
* @param bizId 业务ID
*/
public void setBizId(String bizId) {
this.bizId = bizId;
}
/**
* 获取方法
* @return 方法
*/
public String getMethod() {
return method;
}
/**
* 设置方法
* @param method 方法
*/
public void setMethod(String method) {
this.method = method;
}
/**
* 获取创建时间
* @return 创建时间
*/
public Date getCreateTime() {
return createTime;
}
/**
* 设置创建时间
* @param createTime 创建时间
*/
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
/**
* @return the description
*/
String getDescription() {
return description;
}
/**
* @param description the description to set
*/
void setDescription(String description) {
this.description = description;
}
/**
* @return the insertSuccess
*/
boolean isInsertSuccess() {
return insertSuccess;
}
/**
* @param insertSuccess the insertSuccess to set
*/
void setInsertSuccess(boolean insertSuccess) {
this.insertSuccess = insertSuccess;
}
}
- 处理模板
//回调接口
public interface CallBack<T> {
/**
* 调用
* @return 结果
*/
T invoke();
}
/**
* 一个接一个业务处理模版<br>
* 防止在请求并发下,业务重复处理,比如重复充值。<br>
*/
public interface OneByOneTemplate {
/**
* 执行
* @param oneByOne 一个接一个处理记录
* @param callBack 回调
* @return 执行结果
*/
<T> T execute(OneByOne oneByOne, CallBack<T> callBack);
}
/**
* 一个接一个业务处理模版默认实现<br>
*
* <p>
* 用一个业务处理表记录,在处理前对锁状态进行判断 ,判断逻辑参见{@link #beforeInvoke}方法<br>
*
* 业务处理表: 业务类型 PK|业务ID PK|方法|创建时间<br>
*/
public class OneByOneTemplateImpl implements OneByOneTemplate, InitializingBean {
/** logger */
private static final Logger logger = LoggerFactory.getLogger(OneByOneTemplateImpl.class);
/** 插入处理记录 */
protected String insert;
/** 删除处理记录 */
protected String delete;
/** 表名 */
protected String table = "CS_ONE_BY_ONE";
/** 数据源 */
protected DataSource dataSource;
/** 事务模版 */
protected TransactionTemplate transactionTemplate;
/** Jdbc模版 */
protected JdbcTemplate jdbcTemplate;
/**
* {@inheritDoc}
*/
@Override
public <T> T execute(OneByOne oneByOne, CallBack<T> callBack) {
oneByOne.setDescription(oneByOne.getBizType() + "-" + oneByOne.getBizId() + "-" + oneByOne.getMethod());
try {
this.beforeInvoke(oneByOne);
return callBack.invoke();
} finally {
this.afterInvoke(oneByOne);
}
}
/**
* 设置数据源
*
* @param dataSource 数据源
*/
public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
}
/**
* 设置表名
*
* @param table 表名
*/
public void setTable(String table) {
this.table = table;
}
/**
* {@inheritDoc}
*/
@Override
public void afterPropertiesSet() throws Exception {
if (this.dataSource == null) {
throw new AppException(CoreErrorCode.ERROR_PARAM_NULL, "数据源为空");
}
// 初始化Jdbc模版和事务模版
this.jdbcTemplate = new JdbcTemplate(this.dataSource);
this.transactionTemplate = new TransactionTemplate(new DataSourceTransactionManager(this.dataSource));
// 初始化SQL
this.insert = "INSERT INTO " + this.table + " (BIZ_TYPE, BIZ_ID, METHOD, CREATED_TIME) VALUES (?, ?, ?, ?)";
this.delete = "DELETE FROM " + this.table + " WHERE BIZ_TYPE = ? AND BIZ_ID = ?";
}
/**
* 回调前置
*
* @param oneByOne 一个接一个处理记录
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
private void beforeInvoke(final OneByOne oneByOne) {
try {
oneByOne.setInsertSuccess(true);
// 插入处理记录
this.transactionTemplate.execute(new TransactionCallback() {
@Override
public Object doInTransaction(TransactionStatus status) {
jdbcTemplate.update(insert, oneByOne.getBizType(), oneByOne.getBizId(), oneByOne.getMethod(),
new Date());
return null;
}
});
} catch (Throwable t) {
oneByOne.setInsertSuccess(false);
if (logger.isWarnEnabled()) {
logger.warn(oneByOne.getDescription() + "插入处理记录失败!异常信息:{}", t);
}
// 如果插入失败,抛出AppException
throw new AppException(CoreErrorCode.ONE_BY_ONE_EXCEPTION, oneByOne.getDescription() + "业务正在处理中");
}
}
/**
* 回调后置
*
* @param oneByOne 一个接一个处理记录
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
private void afterInvoke(final OneByOne oneByOne) {
try {
// 插入失败,不删除处理记录
if (!oneByOne.isInsertSuccess()) {
return;
}
// 删除处理记录
this.transactionTemplate.execute(new TransactionCallback() {
@Override
public Object doInTransaction(TransactionStatus status) {
jdbcTemplate.update(delete, oneByOne.getBizType(), oneByOne.getBizId());
return null;
}
});
} catch (Throwable t) {
logger.error(oneByOne.getDescription() + "删除处理记录失败!", t);
}
}
}
- 使用
oneByOneTemplate.execute(new OneByOne(OneByOneConstant.BIZ_TYPE_SETTLE_DETAIL_ACCEPT, dto.getRequestSerialNo(), "handle"),
new CallBack<Result>() {
@Override
public Result invoke() {
Result result = settleDetailAcceptHandle(dto);
return result;
}
});
# 实现有以下几个问题:
- 这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
- 这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
- 这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
- 这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。
# 解决方法:
- 数据库是单点?搞两个数据库,数据之前双向同步。一旦挂掉快速切换到备库上。
- 没有失效时间?只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。
- 非阻塞的?搞一个while循环,直到insert成功再返回成功。
- 非重入的?在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了。
# 总结
数据库分布式锁实现上来说,是简单容易理解的。但是其性能以及可靠性,比不上另外两种实现。可适用并发量不是很高的业务上。