当前位置 博文首页 > m0_53222768的博客:64.zookeeper

    m0_53222768的博客:64.zookeeper

    作者:[db:作者] 时间:2021-06-13 15:39

    @一贤爱吃土豆

    1.引言

    • 在分布式环境下,如果舍弃SpringCloud,使用其他的分布式框架,那么注册心中,配置集中管理,集群管理,分布式锁,分布式任务,队列的管理想单独实现怎么办。

    2.Zookeeper介绍

    • Zookeeper本身是Hadoop生态园的中的一个组件,Zookeeper强大的功能,在Java分布式架构中,也会频繁的使用到Zookeeper。
      在这里插入图片描述
    • Zookeeper就是一个文件系统 + 监听通知机制

    3.Zookeeper安装

    • docker-compose.yml
    version: "3.1"
    services:
      zk:
       image: daocloud.io/daocloud/zookeeper:latest
       restart: always
       container_name: zk
       ports:
         - 2181:2181
    

    4.Zookeeper架构(重点)

    4.1:Zookeeper的架构图

    • 每一个节点都被称为znode
    • 每一个znode中都可以存储数据
    • 节点名称是不允许重复的
      在这里插入图片描述

    4.2:znode类型

    • 四种Znode
      • 持久节点:永久的保存在你的Zookeeper
      • 持久有序节点:永久的保存在你的Zookeeper,他会给节点添加一个有序的序号。 /xx -> /xx0000001
      • 临时节点:当存储的客户端和Zookeeper服务断开连接时,这个临时节点自动删除
      • 临时有序节点:当存储的客户端和Zookeeper服务断开连接时,这个临时节点自动删除,他会给节点添加一个有序的序号。 /xx -> /xx0000001

    4.3:Zookeeper的监听通知机制

    • 客户端可以去监听Zookeeper中的Znode节点。
    • Znode改变时(增删改),会通知监听当前Znode的客户端
      在这里插入图片描述

    5.Zookeeper常用命令

    • Zookeeper针对增删改查的常用命令:
    # 查询当前节点下的全部子节点
    ls 节点名称
    # 例子 ls /
    
    # 查询当前节点下的数据
    get 节点名称
    # 例子 get /zookeeper
    
    # 创建节点
    create [-s] [-e] znode名称 znode数据
    # -s:sequence,有序节点
    # -e:ephemeral,临时节点
    # 如果两个都不写,就是持久节点
    
    # 修改节点值
    set znode名称 新数据
    
    # 删除节点
    delete znode名称    # 没有子节点的znode
    rmr znode名称      # 删除当前节点和全部的子节点
    

    6.Zookeeper集群(重点)

    6.1:Zookeeper集群架构图

    在这里插入图片描述

    6.2:Zookeeper集群中节点的角色

    • Leader:Master主节点
    • Follower (默认的从节点):从节点,参与选举全新的Leader
    • Observer:从节点,不参与投票
    • Looking:正在找Leader节点

    6.3:Zookeeper投票策略

    • 每一个Zookeeper服务都会被分配一个全局唯一的myid,myid是一个数字。

    • Zookeeper在执行写数据时,每一个节点都有一个自己的FIFO的队列。保证写每一个数据的时候,顺序是不会乱的,Zookeeper还会给每一个数据分配一个全局唯一的zxid,数据越新zxid就越大。

    • 选举Leader:

      • 选举出zxid最大的节点作为Leader。
      • 在zxid相同的节点中,选举出一个myid最大的节点,作为Leader。

    6.4:搭建Zookeeper集群

    • 把之前安装好的单机的zookeeper的容器down掉,并删除yml文件
    • 在docker_zk目录下重新创建yml文件
    • zk1:2888:3888;2181 zk1为服务的名称 : 第一个2888为各个节点通信的端口,第二个3888为各个节点投票的端口 2181为容器内部的端口
    version: "3.1"
    services:
      zk1:
        image: zookeeper
        restart: always
        container_name: zk1
        ports:
          - 2181:2181
        environment:
          ZOO_MY_ID: 1
          ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
      zk2:
        image: zookeeper
        restart: always
        container_name: zk2
        ports:
          - 2182:2181
        environment:
          ZOO_MY_ID: 2
          ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
      zk3:
        image: zookeeper
        restart: always
        container_name: zk3
        ports:
          - 2183:2181
        environment:
          ZOO_MY_ID: 3
          ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181
    
    • 启动zookeeper集群,并分别查看三个容器的角色,在容器的bin目录下执行 ./zkServer.sh status 进行查看

    • my_id谁大就谁做leader的前提是,没有leader的情况下会选大的做leader
      把zk3宕机,则会选举zk2作为新的leader
      给zk2创建新的节点,则zk1可以同步到新创建的节点信息
      把zk2宕机了,则整个集群就瘫痪了
      重启zk3,因为zk1中有数据,所以zxid比zk3大,因此选zk1作为leader

    7.java操作Zookeeper

    7.1:java连接Zookeeper

    • 创建Maven工程
    • 导入依赖:
    <dependencies>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.6.0</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.1</version>
        </dependency>
    
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>
    
    • 编写连接Zookeeper集群的工具类:
    public class ZkUtil {
        public static CuratorFramework cf(){
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,2);
            CuratorFramework cf = CuratorFrameworkFactory.builder()
                    .connectString("192.168.206.142:2181,192.168.206.142:2182,192.168.206.142:2183")
                    .retryPolicy(retryPolicy)
                    .build();
            cf.start();
            return cf;
        }
    }
    
    • 测试类:
      在这里插入图片描述

    7.2:java操作Znode节点

    • 查询:
    public class Demo2 {
        CuratorFramework cf = ZkUtil.cf();
        // 获取子节点
        @Test
        public void getChildren() throws Exception {
            List<String> strings = cf.getChildren().forPath("/");
            for (String string : strings) {
                System.out.println(string);
            }
        }
        // 获取节点数据
        @Test
        public void getData() throws Exception {
            byte[] bytes = cf.getData().forPath("/qf");
            System.out.println(new String(bytes,"UTF-8"));
        }
    }
    
    • 添加:
    @Test
    public void create() throws Exception {
        cf.create().withMode(CreateMode.PERSISTENT).forPath("/qf2","uuuu".getBytes());
    }
    
    • 修改:
    @Test
    public void update() throws Exception {
        cf.setData().forPath("/qf2","oooo".getBytes());
    }
    
    • 删除:
    @Test
    public void delete() throws Exception {
        cf.delete().deletingChildrenIfNeeded().forPath("/qf2");
    }
    
    • 查看znode的状态:
    @Test
    public void stat() throws Exception {
        Stat stat = cf.checkExists().forPath("/qf");
        System.out.println(stat);
    }
    

    7.3:监听通知机制

    • 实现方式:
    public class Demo3 {
        CuratorFramework cf = ZkUtil.cf();
        @Test
        public void listen() throws Exception {
            //1. 创建NodeCache对象,指定要监听的znode
            NodeCache nodeCache = new NodeCache(cf,"/qf");
            nodeCache.start();
            //2. 添加一个监听器
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    byte[] data = nodeCache.getCurrentData().getData();
                    Stat stat = nodeCache.getCurrentData().getStat();
                    String path = nodeCache.getCurrentData().getPath();
                    System.out.println("监听的节点是:" + path);
                    System.out.println("节点现在的数据是:" + new String(data,"UTF-8"));
                    System.out.println("节点状态是:" + stat);
                }
            });
            System.out.println("开始监听!!");
            //3. System.in.read();
            System.in.read();
        }
    }