博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Zookeeper集群搭建和简单使用
阅读量:6851 次
发布时间:2019-06-26

本文共 10738 字,大约阅读时间需要 35 分钟。

hot3.png

1、zookeeper集群搭建

一、解压   zookeeper-3.4.6.tar.gz   命令 [root@node22 java]# tar -zxvf zookeeper-3.4.6.tar.gz

二、在/usr/local/java/zookeeper-3.4.6/conf文件下新建一个zoo.cfg文件 内容如下

tickTime=2000  #zk中最小的时间单元   initLimit syncLimit时间都是这个时间的倍数

dataDir=/opt/zookeeper                  
clientPort=2181  
initLimit=5   #flower在启动时向leader同步数据 如果同步数量较大建议把这个时间调大避免没同步完就开始对外提供服务   这个时间是5*tickTime
syncLimit=2  # leader和flower进行通信 如果在这个时间内leader没收到flower的响应 leader就认为该flower挂了  这个时间不建议调太大  这个时间是2*tickTime
server.1=node22:2888:3888  #第一个端口是leader和flower直接通信的  第二个端口是leader选举端口
server.2=node33:2888:3888
server.3=node44:2888:3888   其他几台机器配置一样

  三、在/opt目录下新建一个zookeeper/myid文件  三台机器的内容分别是 1  2  3

server.1   myid 内容是1

server.2   myid 内容是2

server.3   myid 内容是3

四、启动zookeeper  命令 [root@node44 ~]# zkServer.sh start

五、检查zookeeper启动状态命令   [root@node44 zookeeper]#  zkServer.sh status

一套是leader  两台是follower

2、zookeeper原生java代码

public class ZookeeperBase {

    /** zookeeper地址 */

    static final String CONNECT_ADDR = "node22:2181,node33:2181,node44:2181";
    /** session超时时间 */
    static final int SESSION_OUTTIME = 2000;//ms 
    /** 信号量,阻塞程序执行,用于等待zookeeper连接成功,发送成功信号 */
    static final CountDownLatch connectedSemaphore = new CountDownLatch(1);
    
    public static void main(String[] args) throws Exception{
        
        ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, new Watcher(){
            
            public void process(WatchedEvent event) {
                //获取事件的状态
                KeeperState keeperState = event.getState();
                EventType eventType = event.getType();
                //如果是建立连接
                if(KeeperState.SyncConnected == keeperState){
                    if(EventType.None == eventType){
                        //如果建立连接成功,则发送信号量,让后续阻塞程序向下执行
                        connectedSemaphore.countDown();
                        System.out.println("zk 建立连接");
                    }
                }
            }
        });

        //进行阻塞   防止zookeeper对象没创建好就开始用

        connectedSemaphore.await();
        
        System.out.println("..");
        //创建父节点

//Ids.OPEN_ACL_UNSAFE  设置权限 所有人都可以访问testRoot目录

//CreateMode.PERSISTENT 节点类型   持久化目录  

//        zk.create("/testRoot", "testRoot".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        
        //创建子节点
//        zk.create("/testRoot/children", "children data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        
        //获取节点洗信息
//        byte[] data = zk.getData("/testRoot", false, null);
//        System.out.println(new String(data));
//        System.out.println(zk.getChildren("/testRoot", false));
        
        //修改节点的值
//        zk.setData("/testRoot", "modify data root".getBytes(), -1);

//false 表示不触发监控事件

//        byte[] data = zk.getData("/testRoot", false, null);
//        System.out.println(new String(data));        
        
        //判断节点是否存在
//        System.out.println(zk.exists("/testRoot/children", false));
        //删除节点
//        zk.delete("/testRoot/children", -1);
//        System.out.println(zk.exists("/testRoot/children", false));
        zk.close();
    }
    
}

如果想对访问节点数据加密使用addAuthInfo方法

当要加密的时候acl类型要用CREATOR_ALL_ACL

ZooKeeper zk=new ZooKeeper(conn, sessionTimeout, new ZookeeperWatcher());

         zk.addAuthInfo("digest", "username:password".getBytes()); 

zk.create(root, "root".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);

 

 

3、zkclient java代码   zkclient  是在原生zookeeper基础之上进行封装的  zkclient  增加了递归创建目录和递归删除   把watcher功能单独出来

zkclient  创建删除目录

public class ZkClientBase {

    /** zookeeper地址 */

    static final String CONNECT_ADDR = "node22:2181,node33:2181,node44:2181";
    /** session超时时间 */
    static final int SESSION_OUTTIME = 5000;//ms 
    public static void main(String[] args) throws Exception {
        ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), 5000);
        //1. create and delete方法 
        zkc.createEphemeral("/temp");
        zkc.createPersistent("/super/c1", true);
        Thread.sleep(10000);
        zkc.delete("/temp");
        zkc.deleteRecursive("/super");
        //2. 设置path和data 并且读取子节点和每个节点的内容
//        zkc.createPersistent("/super", "1234");
//        zkc.createPersistent("/super/c1", "c1内容");
//        zkc.createPersistent("/super/c2", "c2内容");
//        List<String> list = zkc.getChildren("/super");
//        for(String p : list){
//            System.out.println(p);
//            String rp = "/super/" + p;
//            String data = zkc.readData(rp);
//            System.out.println("节点为:" + rp + ",内容为: " + data);
//        }
        
        //3. 更新和判断节点是否存在
//        zkc.writeData("/super/c1", "新内容");
//        System.out.println(zkc.readData("/super/c1"));
//        System.out.println(zkc.exists("/super/c1"));
        
        //4.递归删除/super内容
//        zkc.deleteRecursive("/super");        
    }
}

zkclient  的watcher

    ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), 5000);

        //对父节点super目录添加监听子节点变化。
        zkc.subscribeChildChanges("/super", new IZkChildListener() {
            
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println("parentPath: " + parentPath);
                System.out.println("currentChilds: " + currentChilds);
            }
        });

//对父节点super添加监听数据点变化。

        zkc.subscribeDataChanges("/super", new IZkDataListener() {
            @Override
            public void handleDataDeleted(String path) throws Exception {
                System.out.println("删除的节点为:" + path);
            }
            
            @Override
            public void handleDataChange(String path, Object data) throws Exception {
                System.out.println("变更的节点为:" + path + ", 变更内容为:" + data);
            }
        });

4、curator

public class CuratorBase {

    
    /** zookeeper地址 */
    static final String CONNECT_ADDR = "node22:2181,node33:2181,node44:2181";
    /** session超时时间 */
    static final int SESSION_OUTTIME = 5000;//ms 
    
    public static void main(String[] args) throws Exception {
        
        //1 重试策略:初试时间为1s 重试10次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        //2 通过工厂创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                    .connectString(CONNECT_ADDR)
                    .sessionTimeoutMs(SESSION_OUTTIME)
                    .retryPolicy(retryPolicy)
//                    .namespace("super")
                    .build();
        //3 开启连接
        cf.start();
        
//        System.out.println(States.CONNECTED);
//        System.out.println(cf.getState());
        
        // 新加、删除
        /**
        //4 建立节点 指定节点类型(不加withMode默认为持久类型节点)、路径、数据内容
        cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1","c1内容".getBytes());
        //5 删除节点
        cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");
        */
        
        // 读取、修改
        /**
        //创建节点
//        cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1","c1内容".getBytes());
//        cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c2","c2内容".getBytes());
        //读取节点
//        String ret1 = new String(cf.getData().forPath("/super/c2"));
//        System.out.println(ret1);
        //修改节点
//        cf.setData().forPath("/super/c2", "修改c2内容".getBytes());
//        String ret2 = new String(cf.getData().forPath("/super/c2"));
//        System.out.println(ret2);    
        */
        
        // 绑定回调函数
        /**
        ExecutorService pool = Executors.newCachedThreadPool();
        cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
        .inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {
                System.out.println("code:" + ce.getResultCode());
                System.out.println("type:" + ce.getType());
                System.out.println("线程为:" + Thread.currentThread().getName());
            }
        }, pool)
        .forPath("/super/c3","c3内容".getBytes());
        Thread.sleep(Integer.MAX_VALUE);
        */
        
        
        // 读取子节点getChildren方法 和 判断节点是否存在checkExists方法
        /**
        List<String> list = cf.getChildren().forPath("/super");
        for(String p : list){
            System.out.println(p);
        }
        
        Stat stat = cf.checkExists().forPath("/super/c3");
        System.out.println(stat);
        
        Thread.sleep(2000);
        cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");
        */
        
        
        //cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");
        
    }
}

curator  的watcher1 监听节点变化

//1 重试策略:初试时间为1s 重试10次

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        //2 通过工厂创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                    .connectString(CONNECT_ADDR)
                    .sessionTimeoutMs(SESSION_OUTTIME)
                    .retryPolicy(retryPolicy)
                    .build();
        
        //3 建立连接
        cf.start();
        
        //4 建立一个cache缓存   false表示不开启压缩
        final NodeCache cache = new NodeCache(cf, "/super", false);
        cache.start(true);
        cache.getListenable().addListener(new NodeCacheListener() {
            /**
             * <B>方法名称:</B>nodeChanged<BR>
             * <B>概要说明:</B>触发事件为创建节点和更新节点,在删除节点的时候并不触发此操作。<BR>
             * @see org.apache.curator.framework.recipes.cache.NodeCacheListener#nodeChanged()
             */
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("路径为:" + cache.getCurrentData().getPath());
                System.out.println("数据为:" + new String(cache.getCurrentData().getData()));
                System.out.println("状态为:" + cache.getCurrentData().getStat());
                System.out.println("---------------------------------------");
            }
        });

curator  的watcher1 监听子节点的变化

//1 重试策略:初试时间为1s 重试10次

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        //2 通过工厂创建连接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                    .connectString(CONNECT_ADDR)
                    .sessionTimeoutMs(SESSION_OUTTIME)
                    .retryPolicy(retryPolicy)
                    .build();
        
        //3 建立连接
        cf.start();
        
        //4 建立一个PathChildrenCache缓存,第三个参数为是否接受节点数据内容 如果为false则不接受
        PathChildrenCache cache = new PathChildrenCache(cf, "/super", true);
        //5 在初始化的时候就进行缓存监听
        cache.start(StartMode.POST_INITIALIZED_EVENT);
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            /**
             * <B>方法名称:</B>监听子节点变更<BR>
             * <B>概要说明:</B>新建、修改、删除<BR>

*/

            @Override
            public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                case CHILD_ADDED:
                    System.out.println("CHILD_ADDED :" + event.getData().getPath());
                    break;
                case CHILD_UPDATED:
                    System.out.println("CHILD_UPDATED :" + event.getData().getPath());
                    break;
                case CHILD_REMOVED:
                    System.out.println("CHILD_REMOVED :" + event.getData().getPath());
                    break;
                default:
                    break;
                }
            }
        });

curator 实现分布式计数器

相当于把成员变量保存到zookeeper中

        //4使用DistributedAtomicInteger
        DistributedAtomicInteger atomicIntger = new DistributedAtomicInteger(cf, "/super", new RetryNTimes(3, 1000));
        AtomicValue<Integer> value = atomicIntger.add(1);
        System.out.println(value.succeeded());
        System.out.println(value.postValue());    //最新值

curator 分布式锁

InterProcessMutex lock = new InterProcessMutex(cf, "/super");

//加锁

  lock.acquire();

//实现自己的业务

//释放

  lock.release();

5、Taokeeper环境搭建  zookeeper环境监控

taokeeper 下载地址 

下载以下三个文件

220820_3KL6_2995717.png

一、在mysql数据库中运行taokeeper.sql 文件  创建taokeeper数据库

221103_Nw9L_2995717.png

二、下载tomcat

三、解压 taokeeper-monitor.tar.gz 到tomcat/webapp目录下

 tar -zxvf /opt/software/taokeeper-monitor.tar.gz -C /opt/software/apache-tomcat-7.0.64/webapps/

四、把taokeeper-monitor-config.properties放到指定位置 

mv /opt/software/taokeeper-monitor-config.properties /opt/software/apache-tomcat-7.0.64/

五、修改taokeeper-monitor-config.properties配置文件内容

#Daily

systemInfo.envName=DAILY

#DBCP

dbcp.driverClassName=com.mysql.jdbc.Driver
dbcp.dbJDBCUrl=jdbc:mysql://bigdata01:3306/taokeeper   #数据库地址
dbcp.characterEncoding=GBK
dbcp.username=root  #数据库用户名
dbcp.password=123456 #数据库密码
dbcp.maxActive=30
dbcp.maxIdle=10
dbcp.maxWait=10000

#SystemConstant

SystemConstent.dataStoreBasePath=/opt/software/apache-tomcat-7.0.64/monitor-data  #taokeeper数据存放地址
SystemConstant.userNameOfSSH=root #liunx用户名
SystemConstant.passwordOfSSH=123456 #liunx密码
#Optional
SystemConstant.portOfSSH=22

六、在tomcat启动脚本中随便找个地方添加JAVA_OPTS  vi bin/catalina.sh 

JAVA_OPTS=-DconfigFilePath="/opt/software/apache-tomcat-7.0.64/taokeeper-monitor-config.properties"

七、 启动tomcat服务器  在浏览器中输入 http://bigdata01:8080/taokeeper-monitor

出现如下图所示表示taokeeper安装成功

222743_ewyP_2995717.png

把zookeeper的ip加入到监控中

222940_VxzE_2995717.png

 

转载于:https://my.oschina.net/xiaozhou18/blog/787132

你可能感兴趣的文章
Java8中的LocalDateTime工具类
查看>>
Exchange 2013 PowerShell创建自定义对象
查看>>
RAID-10 阵列的创建(软)
查看>>
javaScript的调试(四)
查看>>
nginx不使用正则表达式匹配
查看>>
利用putty进行vnc + ssh tunneling登录
查看>>
hadoop1.x作业提交过程分析(源码分析第二篇)
查看>>
默认安装vsftpd后
查看>>
《Redis设计与实现》读书笔记
查看>>
waiting for changelog lock.
查看>>
小白学爬虫-批量部署Splash负载集群
查看>>
你离BAT之间,只差这一套Java面试题
查看>>
laravel package 推荐,数据备份
查看>>
Synchronized锁在Spring事务管理下,为啥还线程不安全?
查看>>
环境变量PATH cp命令 mv命令 文档查看cat/more/less/head/tail
查看>>
阿里云亮相2019联通合作伙伴大会,边缘计算等3款云产品助力5G时代产业数字化转型...
查看>>
dubbo源码分析-服务端发布流程-笔记
查看>>
阿里云发布Apsara SA系列混合云存储阵列
查看>>
GoJS教程:链接模版
查看>>
QListWidget方式显示缩略图
查看>>