标签搜索

目 录CONTENT

文章目录

ZooKeeper部署与基本操作

陈铭
2023-02-25 / 0 评论 / 1 点赞 / 188 阅读 / 6,403 字 / 正在检测是否收录...

安装部署

安装包下载

官网下载,这边用的是apache-zookeeper-3.5.7-bin.tar.gz进行演示

安装 JDK

我们从之前部署Hadoop集群的三台机器进行安装ZooKeeper,这样JDK、环境变量和同步脚本xsync就不用再设置了

本地安装

安装ZooKeeper

拷贝 apache-zookeeper-3.5.7-bin.tar.gz 安装包到hadoop100这台机器(Hadoop集群的主节点)的/opt/software/下

cd /opt/software
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
cd /opt/module
mv apache-zookeeper-3.5.7-bin zookeeper-3.5.7

# 配置环境变量
sudo vim /etc/profile
# 添加如下内容
# ZOOKEEPER_HOME
export ZOOKEEPER_HOME=/opt/module/zookeeper-3.5.7
export PATH=$PATH:$ZOOKEEPER_HOME/bin

配置ZooKeeper

# 修改zookeeper配置
cd $ZOOKEEPER_HOME/conf
mv zoo_sample.cfg zoo.cfg

vim $ZOOKEEPER_HOME/conf/zoo.cfg
# 修改如下内容(本地安装就修改个dataDir)
dataDir=/opt/module/zookeeper-3.5.7/zkData

cd $ZOOKEEPER_HOME
mkdir zkData

启动ZooKeeper

# 启动
zkServer.sh start

# 查看进程是否启动
jps
# 启动正常显示如下
4020 Jps
4001 QuorumPeerMain

# 查看状态
zkServer.sh status
# 启动正常显示如下
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Mode: standalone

# 进入客户端
zkCli.sh

# 退出客户端(zkCli.sh进入到客户端环境下quit退出)
quit

# 停止 Zookeeper
zkServer.sh stop

配置参数解读

Zookeeper中的配置文件zoo.cfg中参数含义解读如下:

  1. tickTime = 2000:通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
    image-1677293425241
  2. initLimit = 10:LF初始通信时限
    image-1677293435897
    Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量)
  3. syncLimit = 5:LF同步通信时限
    image-1677293435897
    Leader和Follower之间通信时间如果超过syncLimit * tickTime,Leader认为Follwer死
    掉,从服务器列表中删除Follwer。
  4. dataDir:保存Zookeeper中的数据
    注意:默认的tmp目录,容易被Linux系统定期删除,所以一般不用默认的tmp目录。
  5. clientPort = 2181:客户端连接端口,通常不做修改。

集群部署

集群部署将会在 hadoop100、hadoop101 和 hadoop102 三个节点上都部署 Zookeeper,在之前本地安装基础上,hadoop100已经安装上了ZooKeeper了

安装ZooKeeper

在hadoop100这台机上操作

cd /opt/module/zookeeper-3.5.7

vim /opt/module/zookeeper-3.5.7/zkData/myid
# 添加如下内容
2

配置ZooKeeper

在hadoop100这台机上操作

# 修改zookeeper配置
vim $ZOOKEEPER_HOME/conf/zoo.cfg
# 添加如下内容(dataDir在之前本地安装时候配置好了,dataDir=/opt/module/zookeeper-3.5.7/zkData)
# 其中server.2对应myid文件中的内容,即2。hadoop101 和 hadoop102在分发安装包后需要将myid文件中的内容修改成3和4
#######################cluster##########################
server.2=hadoop100:2888:3888
server.3=hadoop101:2888:3888
server.4=hadoop102:2888:3888

# 分发安装包
cd /opt/module
xsync zookeeper-3.5.7

分发完成后后在另外两台机器上需要重新配置下zookeeper的环境变量,并修改/opt/module/zookeeper-3.5.7/zkData/myid(hadoop101的myid内容改成3, hadoop102的myid内容改成4)
配置参数 server.A=B:C:D 解读

  • A 是一个数字,表示这个是第几号服务器;
    集群模式下配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面有一个数据
    就是 A 的值,Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比
    较从而判断到底是哪个 server。
  • B 是这个服务器的地址;
  • C 是这个服务器 Follower 与集群中的 Leader 服务器交换信息的端口;
  • D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的
    Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

启动ZooKeeper

# 分别在hadoop100、hadoop101 和 hadoop102 三个节点上执行
zkServer.sh start

# 查看状态(在三个节点上执行)
zkServer.sh status
# 启动正常可以看到
# hadoop100的status
JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Mode: follower
# hadoop101的status
JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Mode: leader
# hadoop102的status
JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Mode: follower

集群启动脚本

#!/bin/bash

case $1 in 
"start"){
  for i in hadoop100 hadoop101 hadoop102 
  do
    echo ---------- zookeeper $i 启动 ------------
    ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
  done
};;
"stop"){
  for i in hadoop100 hadoop101 hadoop102 
  do
    echo ---------- zookeeper $i 停止 ------------
    ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
  done
};;
"status"){
  for i in hadoop100 hadoop101 hadoop102 
  do
    echo ---------- zookeeper $i 状态 ------------
    ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
  done
};;
esac

如果集群启动脚本启动后,遇到了JAVA_HOME找不到的报错(但其实path已经配置上了),可以修改如下
image-1677334649673

vim module/zookeeper-3.5.7/bin/zkEnv.sh
# 在文件开头添加如下
export JAVA_HOME=/opt/module/jdk1.8.0_212

# 分发一下
xsync /opt/module/zookeeper-3.5.7/bin/zkEnv.sh 

选举机制

第一次启动

image-1677300161198

  1. 服务器1启 动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为LOOKING;
  2. 服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:此时服务器1发现服务器2的myid比自己目前投票推举的(服务器1)大,更改选票为推举服务器2。此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1,2状态保持LOOKING
  3. 服务器3启动,发起一次选举。此时服务器1和2都会更改选票为服务器3。此次投票结果:服务器1为0票,服务器2为0票,服务器3为3票。此时服务器3的票数已经超过半数,服务器3当选Leader。服务器1,2更改状态为FOLLOWING,服务器3更改状态为LEADING;
  4. 服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING;
  5. 服务器5启动,同4一样当小弟

非第一次启动

image-1677300294958

  1. 当ZooKeeper集群中的一台服务器出现以下两种情况之一时,就会开始进入Leader选举:
  • 服务器初始化启动。
  • 服务器运行期间无法和Leader保持连接。
  1. 而当一台机器进入Leader选举流程时,当前集群也可能会处于以下两种状态:
  • 集群中本来就已经存在一个Leader。 对于第一种已经存在Leader的情况,机器试图去选举Leader时,会被告知当前服务器的Leader信息,对于该机器来说,仅仅需要和Leader机器建立连接,并进行状态同步即可。
  • 集群中确实不存在Leader。 假设ZooKeeper由5台服务器组成,SID分别为1、2、3、4、5,ZXID分别为8、8、8、7、7,并且此时SID为3的服务器是Leader。某一时刻,3和5服务器出现故障,因此开始进行Leader选举。SID为1、2、4的机器投票情况(EPOCH,ZXID,SID):(1,8,1) 、(1,8,2) 、(1,7,4),他们分别投票了给了自己,但最终SID为2的机器成为Leader。
  • 选举Leader规则: ① EPOCH大的直接胜出、② EPOCH相同,事务id大的胜出、③ 事务id相同,服务器id大的胜出

客户端命令行操作

命令行语法

命令基本语法 功能描述
help 显示所有操作命令
ls path 使用 ls 命令来查看当前 znode 的子节点 [可监听];-w 监听子节点变化;-s 附加次级信息
create 普通创建;-s 含有序列;-e 临时(重启或者超时消失)
get path 获得节点的值 [可监听];-w 监听节点内容变化;-s 附加次级信息
set 设置节点的具体值
stat 查看节点状态
delete 删除节点
deleteall 递归删除节点
# 启动客户端
zkCli.sh -server hadoop100:2181

# 进入客户端后,显示所有操作命令
help

znode 节点数据信息

后续所有命令都是进入到客户端执行的

# 查看当前znode中所包含的内容
ls /
# 执行完正常显示
[zookeeper]

# 查看当前节点详细数据
ls -s /
# 执行完正常显示
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
  • czxid:创建节点的事务 zxid。每次修改 ZooKeeper 状态都会产生一个 ZooKeeper 事务 ID。事务 ID 是 ZooKeeper 中所有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前发生。
  • ctime:znode 被创建的毫秒数(从 1970 年开始)
  • mzxid:znode 最后更新的事务 zxid
  • mtime:znode 最后修改的毫秒数(从 1970 年开始)
  • pZxid:znode 最后更新的子节点 zxid
  • cversion:znode 子节点变化号,znode 子节点修改次数
  • dataversion:znode 数据变化号
  • aclVersion:znode 访问控制列表的变化号
  • ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0。
  • dataLength:znode 的数据长度
  • numChildren:znode 子节点数量

节点类型(持久/短暂/有序号/无序号)

image-1677301145353

分别创建2个普通节点(永久节点 + 不带序号)

# 创建节点语法:create <子节点全路径或相对路径> <数据内容>
create /sanguo "diaochan"
# 执行完正常显示
Created /sanguo

create /sanguo/shuguo "liubei"
# 执行完正常显示
Created /sanguo/shuguo

注意: 创建节点时,要赋值

获得节点的值

get -s /sanguo
# 执行完正常显示
diaochan
cZxid = 0x100000003
ctime = Wed Aug 29 00:03:23 CST 2018
mZxid = 0x100000003
mtime = Wed Aug 29 00:03:23 CST 2018
pZxid = 0x100000004
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 1

get -s /sanguo/shuguo
# 执行完正常显示
liubei
cZxid = 0x100000004
ctime = Wed Aug 29 00:04:35 CST 2018
mZxid = 0x100000004
mtime = Wed Aug 29 00:04:35 CST 2018
pZxid = 0x100000004
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0

创建带序号的节点(永久节点 + 带序号)

# 先创建一个普通的根节点/sanguo/weiguo
create /sanguo/weiguo "caocao"
# 执行完正常显示
Created /sanguo/weiguo

# 创建带序号的节点
create -s /sanguo/weiguo/zhangliao "zhangliao"
# 执行完正常显示
Created /sanguo/weiguo/zhangliao0000000000

create -s /sanguo/weiguo/zhangliao "zhangliao"
# 执行完正常显示
Created /sanguo/weiguo/zhangliao0000000001

create -s /sanguo/weiguo/xuchu "xuchu"
# 执行完正常显示
Created /sanguo/weiguo/xuchu0000000002

注意: 如果原来没有序号节点,序号从 0 开始依次递增。如果原节点下已有 2 个节点,则再排
序时从 2 开始,以此类推。

创建短暂节点(短暂节点 + 不带序号 or 带序号)

# 创建短暂的不带序号的节点
create -e /sanguo/wuguo "zhouyu"
# 执行完正常显示
Created /sanguo/wuguo

# 创建短暂的带序号的节点
create -e -s /sanguo/wuguo "zhouyu"
# 执行完正常显示
Created /sanguo/wuguo0000000003

# 在当前客户端是能查看到的
ls /sanguo
# 执行完正常显示
[shuguo, weiguo, wuguo, wuguo0000000003]

# 退出当前客户端然后再重启客户端
quit
# 退出客户端后再进入
zkCli.sh

# 再次查看根目录下短暂节点已经删除
ls /sanguo
# 执行完正常显示
[shuguo, weiguo]

修改节点数据值

set /sanguo/weiguo "simayi"

监听器原理

客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、节点删除、子目
录节点增加删除)时,ZooKeeper 会通知客户端。监听机制保证 ZooKeeper 保存的任何的数
据的任何改变都能快速的响应到监听了该节点的应用程序。
image-1677301196116

节点的值变化监听

# 在 hadoop102 主机的客户上注册监听/sanguo 节点数据变化
get -w /sanguo

# 在 hadoop101 主机的客户端上修改/sanguo 节点的数据
set /sanguo "xisi"

# 观察 hadoop102 主机收到数据变化的监听
# 执行完正常显示
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/sanguo

注意: 在hadoop101再多次修改/sanguo的值,hadoop104上不会再收到监听。因为注册
一次,只能监听一次。想再次监听,需要再次注册。

节点的子节点变化监听(路径变化)

# 在 hadoop102 主机上注册监听/sanguo 节点的子节点变化
ls -w /sanguo
# 执行完正常显示
[shuguo, weiguo]

# 在 hadoop101 主机/sanguo 节点上创建子节点
create /sanguo/jin "simayi"
# 执行完正常显示
Created /sanguo/jin

# 观察 hadoop102 主机收到子节点变化的监听
# 执行完正常显示
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sanguo

注意: 节点的路径变化,也是注册一次,生效一次。想多次生效,就需要多次注册。

节点删除与查看

# 删除节点
delete /sanguo/jin

# 递归删除节点
deleteall /sanguo/shuguo

# 查看节点状态
stat /sanguo
# 执行完正常显示
cZxid = 0x100000003
ctime = Wed Aug 29 00:03:23 CST 2018
mZxid = 0x100000011
mtime = Wed Aug 29 00:21:23 CST 2018
pZxid = 0x100000014
cversion = 9
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 1

客户端 API 操作

前提: 保证 hadoop100、hadoop101、hadoop102 服务器上 Zookeeper 集群服务端启动。

IDEA 环境搭建

添加pom文件

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.5.7</version>
    </dependency>
</dependencies>

拷贝log4j.properties文件到项目根目录

需要在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在文件中填入:

log4j.rootLogger=INFO, stdout  
log4j.appender.stdout=org.apache.log4j.ConsoleAppender  
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n  
log4j.appender.logfile=org.apache.log4j.FileAppender  
log4j.appender.logfile.File=target/spring.log  
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout  
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n 

创建 ZooKeeper 客户端

package ltd.cmjava.zk;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

public class zkClient {

    // 注意:逗号左右不能有空格
    private String connectString = "hadoop100:2181,hadoop101:2181,hadoop102:2181";
    private int sessionTimeout = 2000;
    private ZooKeeper zkClient;

    @Before
    public void init() throws IOException {

        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

//                System.out.println("-------------------------------");
//                List<String> children = null;
//                try {
//                    children = zkClient.getChildren("/", true);
//
//                    for (String child : children) {
//                        System.out.println(child);
//                    }
//
//                    System.out.println("-------------------------------");
//                } catch (KeeperException e) {
//                    e.printStackTrace();
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
            }
        });
    }

    @Test
    public void create() throws KeeperException, InterruptedException {
        String nodeCreated = zkClient.create("/cm", "ss.avi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    @Test
    public void getChildren() throws KeeperException, InterruptedException {
        List<String> children = zkClient.getChildren("/", true);

        for (String child : children) {
            System.out.println(child);
        }

        // 延时
        Thread.sleep(Long.MAX_VALUE);
    }

    @Test
    public void exist() throws KeeperException, InterruptedException {

        Stat stat = zkClient.exists("/cm", false);

        System.out.println(stat==null? "not exist " : "exist");
    }
}

创建子节点

    @Test
    public void create() throws KeeperException, InterruptedException {
        String nodeCreated = zkClient.create("/cm", "ss.avi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

测试: 在 hadoop100 的 zk 客户端上查看创建节点情况

get -s /cm
# 执行正常显示
shuaige

获取子节点并监听节点变化

去掉new ZooKeeper处关于重写new Watcher()的process方法的注释,相当于注册一个监听器,然后再运行getChildren方法

    @Test
    public void getChildren() throws KeeperException, InterruptedException {
        List<String> children = zkClient.getChildren("/", true);

        for (String child : children) {
            System.out.println(child);
        }

        // 延时
        Thread.sleep(Long.MAX_VALUE);
    }

在 IDEA 控制台上看到如下节点:

zookeeper
sanguo
cm

在 hadoop100 的客户端上创建再创建一个节点/cm1,观察 IDEA 控制台

create /cm1 "cm1"

在 hadoop100 的客户端上删除节点/cm1,观察 IDEA 控制台

delete /cm1

判断 Znode 是否存在

    @Test
    public void exist() throws KeeperException, InterruptedException {

        Stat stat = zkClient.exists("/cm", false);

        System.out.println(stat==null? "not exist " : "exist");
    }

客户端向服务端写数据流程

image-1677309742025


image-1677309751239

服务器动态上下线监听案例

需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

需求分析

image-1677310409027

具体实现

先在集群上创建/servers 节点

# 进入客户端
create /servers "servers"
# 执行正常显示
Created /servers

在 Idea 中创建服务器端,向 Zookeeper 注册事件

package ltd.cmjava.case1;

import org.apache.zookeeper.*;

import java.io.IOException;

public class DistributeServer {

    private String connectString = "hadoop100:2181,hadoop101:2181,hadoop102:2181";
    private int sessionTimeout = 2000;
    private ZooKeeper zk;

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {

        DistributeServer server = new DistributeServer();
        // 1 获取zk连接
        server.getConnect();

        // 2 注册服务器到zk集群
        server.regist(args[0]);


        // 3 启动业务逻辑(睡觉)
        server.business();

    }

    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    private void regist(String hostname) throws KeeperException, InterruptedException {
        String create = zk.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        System.out.println(hostname +" is online") ;
    }

    private void getConnect() throws IOException {

        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }
}

客户端代码

package ltd.cmjava.case1;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DistributeClient {

    private String connectString = "hadoop100:2181,hadoop101:2181,hadoop102:2181";
    private int sessionTimeout = 2000;
    private ZooKeeper zk;

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        DistributeClient client = new DistributeClient();

        // 1 获取zk连接
        client.getConnect();

        // 2 监听/servers下面子节点的增加和删除
        client.getServerList();

        // 3 业务逻辑(睡觉)
        client.business();

    }

    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    private void getServerList() throws KeeperException, InterruptedException {
        List<String> children = zk.getChildren("/servers", true);

        ArrayList<String> servers = new ArrayList<>();

        for (String child : children) {

            byte[] data = zk.getData("/servers/" + child, false, null);

            servers.add(new String(data));
        }

        // 打印
        System.out.println(servers);
    }

    private void getConnect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

                try {
                    getServerList();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

测试

在 Linux 命令行上操作增加减少服务器

  1. 启动 DistributeClient 客户端
  2. 在 hadoop100 上 zk 的客户端/servers 目录上创建临时带序号节点
create -e -s /servers/hadoop100 "hadoop100"
create -e -s /servers/hadoop101 "hadoop101"
  1. 观察 Idea 控制台变化
[hadoop100]
[hadoop101, hadoop100]
  1. 执行删除操作
delete /servers/hadoop1000000000000
  1. 观察 Idea 控制台变化
[hadoop101]

在 Idea 上操作增加减少服务器

  1. 启动 DistributeClient 客户端(如果已经启动过,不需要重启)
  2. 启动 DistributeServer 服务
    点击 Edit Configurations…
    image-1677310476378
    在弹出的窗口中(Program arguments)输入想启动的主机,例如,hadoop100
    image-1677310706855
  3. 观察 DistributeServer 控制台,提示 hadoop100 is online
  4. 观察 DistributeClient 控制台,提示 hadoop100 is online
  5. 同样查看下linux端的节点信息
ls /servers
# 应该会显示,其中hadoop1000000000002是DistributeServer启动时创建的新节点
[hadoop1000000000002, hadoop1010000000001]

ZooKeeper 分布式锁案例

什么叫做分布式锁呢?

比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
image-1677310774602

原生 Zookeeper 实现分布式锁案例

分布式锁实现

package ltd.cmjava.case2;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DistributedLock {

    private final String connectString = "hadoop100:2181,hadoop101:2181,hadoop102:2181";
    private final int sessionTimeout = 2000;
    private final ZooKeeper zk;

    private CountDownLatch connectLatch = new CountDownLatch(1);
    private CountDownLatch waitLatch = new CountDownLatch(1);

    private String waitPath;
    private String currentMode;

    public DistributedLock() throws IOException, InterruptedException, KeeperException {

        // 获取连接
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // connectLatch  如果连接上zk  可以释放
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected){
                    connectLatch.countDown();
                }

                // waitLatch  需要释放
                if (watchedEvent.getType()== Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){
                    waitLatch.countDown();
                }
            }
        });

        // 等待zk正常连接后,往下走程序
        connectLatch.await();

        // 判断根节点/locks是否存在
        Stat stat = zk.exists("/locks", false);

        if (stat == null) {
            // 创建一下根节点
            zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    // 对zk加锁
    public void zklock() {
        // 创建对应的临时带序号节点
        try {
            currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            // wait一小会, 让结果更清晰一些
            Thread.sleep(10);

            // 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听他序号前一个节点

            List<String> children = zk.getChildren("/locks", false);

            // 如果children 只有一个值,那就直接获取锁; 如果有多个节点,需要判断,谁最小
            if (children.size() == 1) {
                return;
            } else {
                Collections.sort(children);

                // 获取节点名称 seq-00000000
                String thisNode = currentMode.substring("/locks/".length());
                // 通过seq-00000000获取该节点在children集合的位置
                int index = children.indexOf(thisNode);

                // 判断
                if (index == -1) {
                    System.out.println("数据异常");
                } else if (index == 0) {
                    // 就一个节点,可以获取锁了
                    return;
                } else {
                    // 需要监听  他前一个节点变化
                    waitPath = "/locks/" + children.get(index - 1);
                    zk.getData(waitPath,true,new Stat());

                    // 等待监听
                    waitLatch.await();

                    return;
                }
            }


        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }

    // 解锁
    public void unZkLock() {

        // 删除节点
        try {
            zk.delete(this.currentMode,-1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }

    }

}

分布式锁测试

package ltd.cmjava.case2;

import org.apache.zookeeper.KeeperException;

import java.io.IOException;

public class DistributedLockTest {

    public static void main(String[] args) throws InterruptedException, IOException, KeeperException {

       final  DistributedLock lock1 = new DistributedLock();

        final  DistributedLock lock2 = new DistributedLock();

       new Thread(new Runnable() {
           @Override
           public void run() {
               try {
                   lock1.zklock();
                   System.out.println("线程1 启动,获取到锁");
                   Thread.sleep(5 * 1000);

                   lock1.unZkLock();
                   System.out.println("线程1 释放锁");
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
       }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    lock2.zklock();
                    System.out.println("线程2 启动,获取到锁");
                    Thread.sleep(5 * 1000);

                    lock2.unZkLock();
                    System.out.println("线程2 释放锁");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

    }
}

观察控制台变化:

运行DistributedLockTest的main方法

线程 1 获取锁
线程 1 释放锁
线程 2 获取锁
线程 2 释放锁

Curator 框架实现分布式锁案例

原生的 Java API 开发存在的问题

(1)会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
(2)Watch 需要重复注册,不然就不能生效
(3)开发的复杂性还是比较高的
(4)不支持多节点删除和创建。需要自己去递归

Curator案例实操

Curator是一个专门解决分布式锁的框架,解决了原生 JavaAPI 开发分布式遇到的问题。
详情请查看官方文档
添加pom依赖

<dependency>
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-framework</artifactId>
     <version>4.3.0</version>
</dependency>
<dependency>
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-recipes</artifactId>
     <version>4.3.0</version>
</dependency>
<dependency>
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-client</artifactId>
     <version>4.3.0</version>
</dependency>

代码实现

package ltd.cmjava.case3;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLockTest {

    public static void main(String[] args) {

        // 创建分布式锁1
        InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");

        // 创建分布式锁2
        InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.acquire();
                    System.out.println("线程1 获取到锁");

                    lock1.acquire();
                    System.out.println("线程1 再次获取到锁");

                    Thread.sleep(5 * 1000);

                    lock1.release();
                    System.out.println("线程1 释放锁");

                    lock1.release();
                    System.out.println("线程1  再次释放锁");

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.acquire();
                    System.out.println("线程2 获取到锁");

                    lock2.acquire();
                    System.out.println("线程2 再次获取到锁");

                    Thread.sleep(5 * 1000);

                    lock2.release();
                    System.out.println("线程2 释放锁");

                    lock2.release();
                    System.out.println("线程2  再次释放锁");

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    private static CuratorFramework getCuratorFramework() {

        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);

        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop100:2181,hadoop101:2181,hadoop102:2181")
                .connectionTimeoutMs(2000)
                .sessionTimeoutMs(2000)
                .retryPolicy(policy).build();

        // 启动客户端
        client.start();

        System.out.println("zookeeper 启动成功");
        return client;
    }
}

观察控制台变化:

线程 1 获取锁
线程 1 再次获取锁
线程 1 释放锁
线程 1 再次释放锁
线程 2 获取锁
线程 2 再次获取锁
线程 2 释放锁
线程 2 再次释放锁
1

评论区