1.结合实际需求场景详细讲解zookeeper的概念 2.完成zookeeper集群的搭建 3.简单使用zookeeper shell操作zookeeper 4.详细讲解zookeeper的java api的使用及典型的需求场景的案例代码
1.深入理解zookeeper的使用场景及概念 2.熟练使用zookeeper的java api 3.理解监听集群主机上下线案例代码
Zookeeper是一个分布式协调服务;就是为用户的分布式应用程序提供协调服务
zookeeper是为别的分布式程序服务的
Zookeeper本身就是一个分布式程序(只要有半数以上节点存活,zk就能正常服务)
Zookeeper集群的角色: Leader 和 follower (Observer)
zookeeper在底层最核心的两个功能:
层次化的目录结构,命名符合常规文件系统规范
每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识
节点Znode可以包含数据和子节点(但是EPHEMERAL类型的节点不能有子节点)
客户端应用可以在节点上设置监视器
Znode有两种类型:
Znode有四种形式的目录节点(默认是persistent )
PERSISTENT 持久类型
PERSISTENT_SEQUENTIAL(持久序列类型/test0000000019 )sequential
EPHEMERAL 短暂类型
EPHEMERAL_SEQUENTIAL 短暂序列类型
创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护
在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序
Zookeeper作为一个分布式的服务框架,主要用来解决分布式集群中应用系统的一致性问题,它能提供基于类似于文件系统的目录节点树方式的数据存储,但是 Zookeeper并不是用来专门存储数据的,它的作用主要是用来维护和监控你存储的数据的状态变化。通过监控这些数据状态的变化,从而可以达到基于数据的集群管理,后面将会详细介绍 Zookeeper能够解决的一些典型问题,这里先介绍一下,Zookeeper 的操作接口和简单使用示例。
Zookeeper使用java编写,运行在jvm上,所以需要提前安装并配置好好java环境,推荐Oracle jdk1.7及以上版本。
下载地址:http://apache.opencas.org/zookeeper/ 从官方网站上下载tar.gz包,我们这里使用的是:zookeeper-3.4.7.tar.gz
集群规划
主机名(hostname) | 安装软件 | 运行进程 |
---|---|---|
min1 | zookeeper-3.4.7 | QuorumPeerMain |
min2 | zookeeper-3.4.7 | QuorumPeerMain |
min3 | zookeeper-3.4.7 | QuorumPeerMain |
安装步骤
在min1机器上安装zookeeper-3.4.7
上传zookeeper-3.4.7.tar.gz到/ home / hadoop / apps
解压到当前目录(/ home / hadoop / apps)
xxxxxxxxxx
cd /home/hadoop/apps
tar -zvxf zookeeper-3.4.7.tar.gz
配置
添加一个zoo.cfg配置文件
xxxxxxxxxx
cd 在/home/hadoop/apps/zookeeper-3.4.7/conf
mv zoo_sample.cfg zoo.cfg
#在/ home / hadoop / apps/zookeeper-3.4.7/创建一个data文件夹
mkdir data
修改配置文件(zoo.cfg)
xvi zoo.cfg #添加如下内容
#为zookeeper指定一个工作目录,此目录要手动创建
dataDir=/home/hadoop/app/zookeeper-3.4.7/data
#指定集群中各个机器之间地址及通信端口
#3888是选举端口 2888是leader和follower通信端口
#注意 域名(如min1) 都要在各个机器/etc/hosts文件中配置了
#在zoo.cfg最后添加如下内容
server.1=min1:2888:3888
server.2=min2:2888:3888
server.3=min3:2888:3888
在上面server.N对应的主机的“dataDir=/home/hadoop/app/zookeeper-3.4.7/data”目录下创建一个myid文件,里面内容是N( server.1对应的min1里面的myid文件内容为1
server.2对应的min2里面的myid文件内容为2
server.3对应的min3里面的myid文件内容为3)
xxxxxxxxxx
cd /home/hadoop/app/zookeeper-3.4.7/data
touch myid #创建一个myid的文件
echo "1" > myid #写入的内容要对应,本机是min1 设置的名称是server.1 因此写入时,要将1写入mydid
设置zookeeper日志文件的zookeeper.out存放目录
xxxxxxxxxx
vi /home/hadoop/apps/zookeeper-3.4.7/bin/zkEnv.sh #在如下位置设置
将在min1配置好的zk拷贝到min2和min3节点
xxxxxxxxxx
scp -r /home/hadoop/apps/zookeeper-3.4.7/ min2:/home/hadoop/apps/
#在min2的机器中修改myid的值
cd /home/hadoop/app/zookeeper-3.4.7/data
echo "2" > myid
scp -r /home/hadoop/apps/zookeeper-3.4.7/ min3:/home/hadoop/apps/
#在min3的机器中修改myid的值
cd /home/hadoop/app/zookeeper-3.4.7/data
echo "3" > myid
分别在min1、min2和min3中启动zookeeper
xxxxxxxxxx
cd /home/hadoop/apps/zookeeper-3.4.7/bin
./zkServer.sh start #启动zookeeper
分别在min1、min2和min3中执行jps 显示如下进程,则启动成功
查看zk状态
xxxxxxxxxx
cd /home/hadoop/apps/zookeeper-3.4.7/bin
./zkServer.sh status #启动zookeeper
在三台机器中分别显示如下信息:
在任何一台机器上执行如下命令进入zk客户端的shell
xxxxxxxxxx
cd /home/hadoop/apps/zookeeper-3.4.7/bin
./zkCli.sh
执行完zkcli.sh命令后进入的界面效果如下:
通过help命令查看zk客户端的shell命令帮助
zk服务器中也有类似 linux中的目录结构,”/“ 是它的目录根,在这个根下可以创建任何一个key、value对,其中key值就相当于一个linux中子目录名,但是这个子目录是可以有值的,就是那个value。这个key-value对在zk中叫一个node 这个node可以有子孙node。
在根下创建一个node节点key为age, value为18
xxxxxxxxxx
create /age 18
显示“根”下所有node
xxxxxxxxxx
ls /
获得一个node的value值
xxxxxxxxxx
get /age
创建一个java项目
依赖的jar包
xxxxxxxxxx
zookeeper-3.4.7\lib下的
jline-0.9.94.jar
log4j-1.2.15.jar
netty-3.2.2.Final.jar
slf4j-api-1.6.1.jar
slf4j-log4j12-1.6.1.jar
zookeeper-3.4.5\zookeeper-3.4.7.jar
xxxxxxxxxx
public class TestCRUD {
private static final String connectString = "min1:2181," +"min2:2181," +"min3:2181";
private static final int sessionTimeout = 2000;
public void testConnect(){
ZooKeeper zkClient = null;
try {
//连接成功后,会回调watcher监听,此连接操作是异步的,执行完new语句后,直接调用后续代码
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
/**
* public enum EventType {
None (-1),
NodeCreated (1),
NodeDeleted (2),
NodeDataChanged (3),
NodeChildrenChanged (4);
*/
public void process(WatchedEvent event) {
// 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)
System.out.println("=========="+event.getType()+"===="+event.getState());
}
});
} catch (IOException e) {
e.printStackTrace();
}
//连接状态值 CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
System.out.println("-------------------"+zkClient.getState());
while(true){
try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}
//等待zookeeper客户端连接服务器成功的,查看连接时的状态信息。
System.out.println("====-----===="+zkClient.getState());
}
}
public void testCreate() throws Exception {
ZooKeeper zkClient = new ZooKeeper(connectString, sessionTimeout,
new Watcher() {
public void process(WatchedEvent event) {
System.out.println("=========="+event.getType());
}
});
Thread.sleep(10*1000);//等待的目的是使zookeeper客户端连接成功后,才执行后续操作
if(zkClient.getState() == States.CONNECTED){//连接服务器连接成功
String nodeCreated = zkClient.create("/age", //:要创建的节点的路径
"18".getBytes(), //节点中的数据
Ids.OPEN_ACL_UNSAFE, //节点的权限
CreateMode.PERSISTENT);//节点的类型
System.out.println("创建成功后,节点的真是路径是:"+nodeCreated);
}
}
//判断znode是否存在
public void testExist() throws Exception{
ZooKeeper zkClient = new ZooKeeper(connectString, sessionTimeout,
new Watcher() {
//修改目标节点 event.getType()的值是NodeDataChanged
public void process(WatchedEvent event) {
System.out.println("=========="+event.getType());
}
});
Thread.sleep(10*1000);//等待的目的是使zookeeper客户端连接成功后,才执行后续操作
//第二个参数表示 是否监听”/age“节点的状态的改变,为true时,则监听,当另一个zookeeper客户端修改、删除该节点的值时,回调watcher
Stat stat = zkClient.exists("/age", true);
System.out.println(stat==null?"not exist":"exist");
Thread.sleep(20*1000);
}
//获取znode的数据
public void getData() throws Exception {
ZooKeeper zkClient = new ZooKeeper(connectString, sessionTimeout,
new Watcher() {
public void process(WatchedEvent event) {
System.out.println("=========="+event.getType());
}
});
Thread.sleep(10*1000);//等待的目的是使zookeeper客户端连接成功后,才执行后续操作
byte[] data = zkClient.getData("/age", false, null);
System.out.println(new String(data));
}
// 获取子节点
public void getChildren() throws Exception {
ZooKeeper zkClient = new ZooKeeper(connectString, sessionTimeout,
new Watcher() {
public void process(WatchedEvent event) {
System.out.println("=========="+event.getType());
}
});
Thread.sleep(10*1000);//等待的目的是使zookeeper客户端连接成功后,才执行后续操作
//只能获得一个路径下的直接子节点的信息,不能递归获取孙节点
List<String> children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
Thread.sleep(Long.MAX_VALUE);
}
//删除znode
public void deleteZnode() throws Exception {
ZooKeeper zkClient = new ZooKeeper(connectString, sessionTimeout,
new Watcher() {
public void process(WatchedEvent event) {
System.out.println("=========="+event.getType());
}
});
Thread.sleep(10*1000);//等待的目的是使zookeeper客户端连接成功后,才执行后续操作
//参数2:指定要删除的版本,-1表示删除所有版本
zkClient.delete("/age", -1);
}
}
案例说明:
1.DistributedServer程序一运行起来就向zookeeper的/servers路径下创建一个临时节点
如该类启动三个实例时,分别为传入三个字符串,分别为 server1,server2,server3 根据代码逻辑,会在zookeeper的/servers下创建三个EPHEMERAL_SEQUENTIAL类型的节点,节点名为 /servers/server000001 此节点对应的value是server1 /servers/server000002 此节点对应的value是server2 /servers/server000003 此节点对应的value是server3
2.DistributedClient代码实时夺取/servers下的节点集合内容
如 [/servers/server000001,/servers/server000002,/servers/server000003] 说明这个客户端所在的机器,可用的集群有三台机器 DistributedClient代码监听了/servers目录的子节点,当有机器掉线时,/servers下的节点集合内容会变化的 如 [/servers/server000001,,/servers/server000003] [/servers/server000003] 这样也就实时感知了三台集群中机器的状态
总结:有四台机器组成的集群,如a1,a2,a3,a4其中a1是主节点;a2,a3,a4是从节点
DistributedServer代码分别运行在a2,a3,a4从节点上 DistributedClient代码运行在a1主节点上
代码如下:
xxxxxxxxxx
public class DistributedServer {
private static final String connectString = "min1:2181," +"min2:2181," +"min3:2181";
private static final int sessionTimeout = 2000;
private static final String parentNode = "/test2017-12-25";
private ZooKeeper zk = null;
public static void main(String[] args) throws Exception {
// 获取zk连接
DistributedServer server = new DistributedServer();
server.getConnect();
Thread.sleep(10*1000);
// 利用zk连接注册服务器信息
server.registerServer(args[0]);
// 启动业务功能
server.handleBussiness(args[0]);
}
/**
* 创建到zk的客户端连接
*/
public void getConnect() throws Exception {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent event) {
// 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)
System.out.println(event.getType() + "---" + event.getPath());
try {
zk.getChildren("/", true);
} catch (Exception e) {
}
}
});
}
/**
* 向zk集群注册服务器信息
*/
public void registerServer(String hostname) throws Exception {
// CreateMode.EPHEMERAL_SEQUENTIAL 节点类型
/**
* 参数3 acl是Access Contral
* ZooKeeper使用ACL来控制访问其znode(ZooKeeper的数据树的数据节点)。ACL的实现方式非常类似于UNIX文件的访问权限:它采用访问权限位 允许/禁止 对节点的各种操作以及能进行操作的范围。
* 不同于UNIX权限的是,ZooKeeper的节点不局限于 用户(文件的拥有者),组和其他人(其它)这三个标准范围。ZooKeeper不具有znode的拥有者的概念。相反,ACL指定id集以及与之对应的权限。
* http://ifeve.com/zookeeper-access-control-using-acls/
*/
String create = zk.create(parentNode + "/server", hostname.getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname + "is online.." + create);
}
/**
* 业务功能
*/
public void handleBussiness(String hostname) throws InterruptedException {
System.out.println(hostname + "start working.....");
Thread.sleep(Long.MAX_VALUE);
}
}
public class DistributedClient {
private static final String connectString = "min1:2181," +"min2:2181," +"min3:2181";
private static final int sessionTimeout = 2000;
private static final String parentNode = "/test2017-12-25";
private volatile List<String> serverList;
private ZooKeeper zk = null;
public static void main(String[] args) throws Exception {
// 获取zk连接
DistributedClient client = new DistributedClient();
client.getConnect();
Thread.sleep(10*1000);
// 获取servers的子节点信息(并监听),从中获取服务器信息列表
client.getServerList();
// 业务线程启动
client.handleBussiness();
}
/**
* 创建到zk的客户端连接
*/
public void getConnect() throws Exception {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent event) {
// 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)
try {
//重新更新服务器列表,并且注册了监听
getServerList();
} catch (Exception e) {}
}
});
}
/**
* 获取服务器信息列表
*/
public void getServerList() throws Exception {
// 获取服务器子节点信息,并且对父节点进行监听
List<String> children = zk.getChildren(parentNode, true);
// 先创建一个局部的list来存服务器信息
List<String> servers = new ArrayList<String>();
for (String child : children) {
// child只是子节点的节点名
byte[] data = zk.getData(parentNode + "/" + child, false, null);
servers.add(new String(data));
}
// 把servers赋值给成员变量serverList,已提供给各业务线程使用
serverList = servers;
//打印服务器列表
System.out.println(serverList);
}
/**
* 业务功能
*
* @throws InterruptedException
*/
public void handleBussiness() throws InterruptedException {
System.out.println("client start working.....");
Thread.sleep(Long.MAX_VALUE);
}
}