当前位置 博文首页 > 阳阳的博客:基于SpringBoot与数据库表记录的方式实现简单的分布

    阳阳的博客:基于SpringBoot与数据库表记录的方式实现简单的分布

    作者:[db:作者] 时间:2021-08-14 21:02

    同一进程内的不同线程操作共享资源时,我们只需要对资源加锁,比如利用JUC下的工具,就可以保证操作的正确性。对JUC不熟悉的同学,可以看看以下的几篇文章:

    • 浅说Synchronized
    • Synchronized的优化
    • Unsafe类
    • 浅探CAS实现原理
    • CountDownLatch实现原理
    • ThreadLocal使用不好,小心造成内存泄露!
    • 更多文章,在我的多线程专栏中

    但是,为了高可用,我们的系统总是多副本的,分布在不同的机器上,以上同进程内的锁机制就不再起作用。为了保证多副本系统对共享资源的访问,我们引入了分布式锁。

    分布式锁主要的实现方式有以下几种:

    • 基于数据库的,其中又细分为基于数据库的表记录、悲观锁、乐观锁
    • 基于缓存的,比如Redis
    • 基于Zookeeper的

    今天演示一下最简单的分布式锁方案——基于数据库表记录的分布式锁

    主要的原理就是利用数据库的唯一索引(对数据库的索引不了解的同学,可以参考我的另外一篇文章mysql索引简谈)

    例如,有以下的一张表:

    CREATE TABLE `test`.`Untitled`  (
      `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增序号',
      `name` varchar(255) NOT NULL COMMENT '锁名称',
      `survival_time` int(11) NOT NULL COMMENT '存活时间,单位ms',
      `create_time` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '创建时间',
      `thread_name` varchar(255) NOT NULL COMMENT '线程名称',
      PRIMARY KEY (`id`) USING BTREE,
      UNIQUE INDEX `uk_name`(`name`) USING BTREE
    ) ENGINE = InnoDB ROW_FORMAT = Dynamic;

    其中name字段加上了唯一索引,多条含有同样name值的新增操作,数据库只能保证仅有一个操作成功,其他操作都会被拒绝掉,并且抛出“重复键”的错误。

    那么,当系统1准备获取分布式锁时,就尝试往数据库中插入一条name="key"的记录,如果插入成功,则代表获取锁成功。其他系统想要获取分布式锁,同样需要往数据库插入相同name的记录,当然数据库会报错,插入失败,也就代表着这些系统获取锁失败。当系统1想要释放掉锁时,删除掉此记录即可。thread_name列可以用来保证只能主动释放自己创建的锁。

    我们希望实现的分布式锁有以下的效果:

    1. 获取锁是阻塞的,获取不到会一直阻塞
    2. 锁会失效,超过锁的生存时间后,会自动释放掉。这一点可以避免某些系统因为宕机而无法主动释放锁的问题

    大致的流程图如下:

    使用到了以下依赖:

    • SpringBoot
    • MyBatis-plus
    • Lombok

    项目的工程目录为:

    ?

    其中pom文件用到的依赖:

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.6</version>
            </dependency>
    
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
            </dependency>
    
            <dependency>
                <groupId>com.baomidou</groupId>
                <artifactId>mybatis-plus-boot-starter</artifactId>
                <version>3.3.1</version>
            </dependency>
    
            <dependency>
                <groupId>com.baomidou</groupId>
                <artifactId>mybatis-plus-extension</artifactId>
                <version>3.3.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>

    配置项为:

    server:
      port: 9091
    
    
    spring:
      datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
        username: root
        password: a123
    
    logging:
      level:
        root: info

    用于映射数据库字段的实体类为:

    package com.yang.lock1.entity;
    
    import com.baomidou.mybatisplus.annotation.IdType;
    import com.baomidou.mybatisplus.annotation.TableField;
    import com.baomidou.mybatisplus.annotation.TableId;
    import com.baomidou.mybatisplus.annotation.TableName;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    import java.util.Date;
    
    /**
     * @author qcy
     * @create 2020/08/25 15:03:47
     */
    @Data
    @NoArgsConstructor
    @TableName(value = "t_lock")
    public class Lock {
    
        /**
         * 自增序号
         */
        @TableId(value = "id", type = IdType.AUTO)
        private Integer id;
    
        /**
         * 锁名称
         */
        private String name;
    
        /**
         * 存活时间,单位ms
         */
        private int survivalTime;
    
        /**
         * 锁创建的时间
         */
        private Date createTime;
    
        /**
         * 线程名称
         */
        private String ThreadName;
    }
    

    Dao层:

    package com.yang.lock1.dao;
    
    import com.baomidou.mybatisplus.core.mapper.BaseMapper;
    import com.yang.lock1.entity.Lock;
    import org.apache.ibatis.annotations.Mapper;
    
    /**
     * @author qcy
     * @create 2020/08/25 15:06:24
     */
    @Mapper
    public interface LockDao extends BaseMapper<Lock> {
    }
    

    Service接口层:

    package com.yang.lock1.service;
    
    import com.baomidou.mybatisplus.extension.service.IService;
    import com.yang.lock1.entity.Lock;
    
    /**
     * @author qcy
     * @create 2020/08/25 15:07:44
     */
    public interface LockService extends IService<Lock> {
    
        /**
         * 阻塞获取分布式锁
         *
         * @param name         锁名称
         * @param survivalTime 存活时间
         */
        void lock(String name, int survivalTime);
    
        /**
         * 释放锁
         *
         * @param name 锁名称
         */
        public void unLock(String name);
    }
    

    Service实现层:

    package com.yang.lock1.service.impl;
    
    import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
    import com.yang.lock1.dao.LockDao;
    import com.yang.lock1.entity.Lock;
    import com.yang.lock1.service.LockService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.dao.DuplicateKeyException;
    import org.springframework.stereotype.Service;
    
    import java.util.Date;
    
    /**
     * @author qcy
     * @create 2020/08/25 15:08:25
     */
    @Slf4j
    @Service
    public class LockServiceImpl extends ServiceImpl<LockDao, Lock> implements LockService {
    
        @Override
        public void lock(String name, int survivalTime) {
            String threadName = "system1-" + Thread.currentThread().getName();
            while (true) {
                Lock lock = this.lambdaQuery().eq(Lock::getName, name).one();
                if (lock == null) {
                    //说明无锁
                    Lock lk = new Lock();
                    lk.setName(name);
                    lk.setSurvivalTime(survivalTime);
                    lk.setThreadName(threadName);
                    try {
                        save(lk);
                        log.info(threadName + "获取锁成功");
                        return;
                    } catch (DuplicateKeyException e) {
                        //继续重试
                        log.info(threadName + "获取锁失败");
                        continue;
                    }
                }
    
                //此时有锁,判断锁是否过期
                Date now = new Date();
                Date expireDate = new Date(lock.getCreateTime().getTime() + lock.getSurvivalTime());
                if (expireDate.before(now)) {
                    //锁已经过期
                    boolean result = removeById(lock.getId());
                    if (result) {
                        log.info(threadName + "删除了过期锁");
                    }
    
                    //尝试获取锁
                    Lock lk = new Lock();
                    lk.setName(name);
                    lk.setSurvivalTime(survivalTime);
                    lk.setThreadName(threadName);
                    try {
                        save(lk);
                        log.info(threadName + "获取锁成功");
                        return;
                    } catch (DuplicateKeyException e) {
                        log.info(threadName + "获取锁失败");
                    }
                }
            }
    
        }
    
        @Override
        public void unLock(String name) {
            //释放锁的时候,需要注意只能释放自己创建的锁
            String threadName = "system1-" + Thread.currentThread().getName();
            Lock lock = lambdaQuery().eq(Lock::getName, name).eq(Lock::getThreadName, threadName).one();
            if (lock != null) {
                boolean b = removeById(lock.getId());
                if (b) {
                    log.info(threadName + "释放了锁");
                } else {
                    log.info(threadName + "准备释放锁,但锁过期了,被其他客户端强制释放掉了");
                }
            } else {
                log.info(threadName + "准备释放锁,但锁过期了,被其他客户端强制释放掉了");
            }
        }
    
    }
    

    测试类如下:

    package com.yang.lock1;
    
    import com.yang.lock1.service.LockService;
    import lombok.extern.slf4j.Slf4j;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import javax.annotation.Resource;
    
    /**
     * @author qcy
     * @create 2020/08/25 15:10:54
     */
    @Slf4j
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class Lock1ApplicationTest {
    
        @Resource
        LockService lockService;
    
        @Test
        public void testLock() {
            log.info("system1准备获取锁");
            lockService.lock("key", 6 * 1000);
            try {
                //模拟业务耗时
                Thread.sleep(4 * 1000);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lockService.unLock("key");
            }
        }
    
    }
    

    将代码复制一份出来,将system1改为system2。现在,同时启动两个系统:

    system1的输出如下:

    system2的输出如下:

    第23.037秒时,system1尝试获取锁,23.650秒时获取成功,持有分布式锁。第26秒时system2尝试获取锁,被阻塞。到27.701秒时,system1释放掉了锁,system2在27.749时才获取到了锁,在31秒时释放掉了。

    现在我们将system1的业务时长改为10秒,就可以模拟出system2释放system1超时的锁的场景了。

    先启动system1,再启动system2

    此时system1的输出如下:

    system2的输出如下:

    14秒时,system1获取到了锁,接着由于业务耗时突然超出预期,需要运行10秒。在此期间,system1创建的锁超过了其存活时间。此时system2在19秒时,删除了此过期锁,接着获取到了锁。24秒时,system1回头发现自己的锁已经被释放掉了,最后system2正常释放掉了自己的锁。

    基于数据库实现分布式锁,还有悲观锁与乐观锁方式,我会另开篇幅。

    cs