# 分布式锁一 数据库分布式锁

# 概述

  • java提供的synchronized,Lock是在JVM层面的锁,在单应用部署时,可保证数据的一致性,在集群环境中,仍有可能出问题,这时候需要考虑分布式锁。

  • 数据库分布式锁,顾名思义就是基于数据库实现的一种锁,用于保证并发情况下数据一致性。基于mysql数据库的分布式锁有多种,可以用乐观锁或悲观锁实现,也可用一张表来实现分布式悲观锁。主要讲第二种的实现。

# 实现过程

  • 数据库新建一张表用于,当我们要操作某个流程时,先在数据库里插入一条记录,当这个流程走完以后,再删除这条记录。基于 DB 的唯一索引。

  • 当并发请求进来后,先判断这张表是否存在记录,如果不存在则执行流程,存在的话,则结束本次请求。

# 代码片段

  • 新建一张表,业务类型业务ID做联合唯一索引。
DROP TABLE IF EXISTS `cs_one_by_one`;
CREATE TABLE `cs_one_by_one` (
  `BIZ_TYPE` varchar(32) NOT NULL COMMENT '业务类型',
  `BIZ_ID` varchar(64) NOT NULL COMMENT '业务ID',
  `METHOD` varchar(32) DEFAULT NULL COMMENT '方法',
  `CREATED_TIME` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`BIZ_TYPE`,`BIZ_ID`),
  UNIQUE KEY `UI_CS_ONE_BY_ONE_BIZ_TYPE_BIZ_ID` (`BIZ_TYPE`,`BIZ_ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='OneByOne控制表';
  • 实体类
public class OneByOne {

    /** 业务类型 */
    private String bizType;

    /** 业务ID */
    private String bizId;

    /** 方法 */
    private String method;

    /** 创建时间 */
    private Date   createTime;

    
    private String description;
    
    private boolean insertSuccess;
    
    
    /**
     * 创建一个接一个处理记录
     * @param bizType 业务类型
     * @param bizId 业务ID 
     * @param method 方法
     */
    public OneByOne(String bizType, String bizId, String method) {
        this.bizType = bizType;
        this.bizId = bizId;
        this.method = method;
    }

    /**
     * 获取业务类型
     * @return 业务类型
     */
    public String getBizType() {
        return bizType;
    }

    /**
     * 设置业务类型
     * @param bizType 业务类型
     */
    public void setBizType(String bizType) {
        this.bizType = bizType;
    }

    /**
     * 获取业务ID
     * @return 业务ID
     */
    public String getBizId() {
        return bizId;
    }

    /**
     * 设置业务ID
     * @param bizId 业务ID
     */
    public void setBizId(String bizId) {
        this.bizId = bizId;
    }

    /**
     * 获取方法
     * @return 方法
     */
    public String getMethod() {
        return method;
    }

    /**
     * 设置方法
     * @param method 方法
     */
    public void setMethod(String method) {
        this.method = method;
    }

    /**
     * 获取创建时间
     * @return 创建时间
     */
    public Date getCreateTime() {
        return createTime;
    }

    /**
     * 设置创建时间
     * @param createTime 创建时间
     */
    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }
    
    /**
     * @return the description
     */
    String getDescription() {
        return description;
    }

    /**
     * @param description the description to set
     */
    void setDescription(String description) {
        this.description = description;
    }

    /**
     * @return the insertSuccess
     */
    boolean isInsertSuccess() {
        return insertSuccess;
    }

    /**
     * @param insertSuccess the insertSuccess to set
     */
    void setInsertSuccess(boolean insertSuccess) {
        this.insertSuccess = insertSuccess;
    }

}
  • 处理模板
//回调接口
public interface CallBack<T> {
    /**
     * 调用
     * @return 结果
     */
    T invoke();
}
/**
 * 一个接一个业务处理模版<br>
 * 防止在请求并发下,业务重复处理,比如重复充值。<br>
 */
public interface OneByOneTemplate {

    /**
     * 执行
     * @param oneByOne 一个接一个处理记录
     * @param callBack 回调
     * @return 执行结果
     */
    <T> T execute(OneByOne oneByOne, CallBack<T> callBack);

}
/**
 * 一个接一个业务处理模版默认实现<br>
 * 
 * <p>
 * 用一个业务处理表记录,在处理前对锁状态进行判断 ,判断逻辑参见{@link #beforeInvoke}方法<br>
 * 
 * 业务处理表: 业务类型 PK|业务ID PK|方法|创建时间<br>
 */
public class OneByOneTemplateImpl implements OneByOneTemplate, InitializingBean {

    /** logger */
    private static final Logger logger = LoggerFactory.getLogger(OneByOneTemplateImpl.class);

    /** 插入处理记录 */
    protected String insert;

    /** 删除处理记录 */
    protected String delete;

    /** 表名 */
    protected String table = "CS_ONE_BY_ONE";

    /** 数据源 */
    protected DataSource dataSource;

    /** 事务模版 */
    protected TransactionTemplate transactionTemplate;

    /** Jdbc模版 */
    protected JdbcTemplate jdbcTemplate;

    /**
     * {@inheritDoc}
     */
    @Override
    public <T> T execute(OneByOne oneByOne, CallBack<T> callBack) {
        oneByOne.setDescription(oneByOne.getBizType() + "-" + oneByOne.getBizId() + "-" + oneByOne.getMethod());

        try {
            this.beforeInvoke(oneByOne);

            return callBack.invoke();
        } finally {
            this.afterInvoke(oneByOne);
        }
    }

    /**
     * 设置数据源
     * 
     * @param dataSource 数据源
     */
    public void setDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * 设置表名
     * 
     * @param table 表名
     */
    public void setTable(String table) {
        this.table = table;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        if (this.dataSource == null) {
            throw new AppException(CoreErrorCode.ERROR_PARAM_NULL, "数据源为空");
        }

        // 初始化Jdbc模版和事务模版
        this.jdbcTemplate = new JdbcTemplate(this.dataSource);
        this.transactionTemplate = new TransactionTemplate(new DataSourceTransactionManager(this.dataSource));

        // 初始化SQL
        this.insert = "INSERT INTO " + this.table + " (BIZ_TYPE, BIZ_ID, METHOD, CREATED_TIME) VALUES (?, ?, ?, ?)";

        this.delete = "DELETE FROM " + this.table + " WHERE BIZ_TYPE = ? AND BIZ_ID = ?";
    }

    /**
     * 回调前置
     * 
     * @param oneByOne 一个接一个处理记录
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private void beforeInvoke(final OneByOne oneByOne) {
        try {
            oneByOne.setInsertSuccess(true);

            // 插入处理记录
            this.transactionTemplate.execute(new TransactionCallback() {
                @Override
                public Object doInTransaction(TransactionStatus status) {
                    jdbcTemplate.update(insert, oneByOne.getBizType(), oneByOne.getBizId(), oneByOne.getMethod(),
                            new Date());

                    return null;
                }
            });
        } catch (Throwable t) {
            oneByOne.setInsertSuccess(false);

            if (logger.isWarnEnabled()) {
                logger.warn(oneByOne.getDescription() + "插入处理记录失败!异常信息:{}", t);
            }

            // 如果插入失败,抛出AppException
            throw new AppException(CoreErrorCode.ONE_BY_ONE_EXCEPTION, oneByOne.getDescription() + "业务正在处理中");
        }
    }

    /**
     * 回调后置
     * 
     * @param oneByOne 一个接一个处理记录
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private void afterInvoke(final OneByOne oneByOne) {
        try {
            // 插入失败,不删除处理记录
            if (!oneByOne.isInsertSuccess()) {
                return;
            }

            // 删除处理记录
            this.transactionTemplate.execute(new TransactionCallback() {
                @Override
                public Object doInTransaction(TransactionStatus status) {
                    jdbcTemplate.update(delete, oneByOne.getBizType(), oneByOne.getBizId());

                    return null;
                }
            });
        } catch (Throwable t) {
            logger.error(oneByOne.getDescription() + "删除处理记录失败!", t);
        }
    }
}
  • 使用
oneByOneTemplate.execute(new OneByOne(OneByOneConstant.BIZ_TYPE_SETTLE_DETAIL_ACCEPT, dto.getRequestSerialNo(), "handle"),
            new CallBack<Result>() {
                @Override
                public Result invoke() {
                    Result result = settleDetailAcceptHandle(dto);
                    return result;
                }
            });

# 实现有以下几个问题:

  • 这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
  • 这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
  • 这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
  • 这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。

# 解决方法:

  • 数据库是单点?搞两个数据库,数据之前双向同步。一旦挂掉快速切换到备库上。
  • 没有失效时间?只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。
  • 非阻塞的?搞一个while循环,直到insert成功再返回成功。
  • 非重入的?在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了。

# 总结

数据库分布式锁实现上来说,是简单容易理解的。但是其性能以及可靠性,比不上另外两种实现。可适用并发量不是很高的业务上。