使用ZooKeeper实现软负载均衡(原理)
发布日期:2021-08-18 00:51:45 浏览次数:2 分类:技术文章

本文共 7392 字,大约阅读时间需要 24 分钟。

转载,原文连接:

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,提供的功能包括配置维护、名字服务、分布式同步、组服务等。

ZooKeeper会维护一个树形的数据结构,类似于Windows资源管理器目录,其中EPHEMERAL类型的节点会随着创建它的客户端断开而被删除,利用这个特性很容易实现软负载均衡。

基本原理是,每个应用的Server启动时创建一个EPHEMERAL节点,应用客户端通过读取节点列表获得可用服务器列表,并订阅节点事件,有Server宕机断开时触发事件,客户端监测到后把该Server从可用列表中删除。

来看示例,这里用了BIO模型编写了一个接收/应答的小程序用于演示效果,优点就是简单。为了方便后面的改造,客户端每次发送消息时都会读取服务器列表并从新建立连接。后边会看到只需要几十行代码即可改造为使用ZooKeeper的软负载模式。

Server代码

 

[java]   
 
 
  1. import java.io.BufferedReader;  
  2. import java.io.IOException;  
  3. import java.io.InputStreamReader;  
  4. import java.io.PrintWriter;  
  5. import java.net.ServerSocket;  
  6. import java.net.Socket;  
  7.   
  8. public class SimpleServer implements Runnable {  
  9.   
  10.     public static void main(String[] args) throws IOException {  
  11.         int port = 18080;  
  12.         SimpleServer server = new SimpleServer(port);  
  13.         Thread thread = new Thread(server);  
  14.         thread.start();  
  15.     }  
  16.   
  17.     private int port;  
  18.   
  19.     public SimpleServer(int port) {  
  20.         this.port = port;  
  21.     }  
  22.   
  23.     @Override  
  24.     public void run() {  
  25.         ServerSocket server = null;  
  26.         try {  
  27.             server = new ServerSocket(port);  
  28.             System.out.println("Server started");  
  29.             Socket socket = null;  
  30.             while (true) {  
  31.                 socket = server.accept();  
  32.                 new Thread(new SimpleServerHandler(socket)).start();  
  33.             }  
  34.         } catch(IOException ex) {  
  35.             ex.printStackTrace();  
  36.         } finally {  
  37.             if (server != null) {  
  38.                 try {  
  39.                     server.close();  
  40.                 } catch (IOException e) {}  
  41.             }  
  42.         }  
  43.     }  
  44. }  
  45.   
  46. class SimpleServerHandler implements Runnable {  
  47.   
  48.     private Socket socket;  
  49.   
  50.     public SimpleServerHandler(Socket socket) {  
  51.         this.socket = socket;  
  52.     }  
  53.   
  54.     @Override  
  55.     public void run() {  
  56.         BufferedReader in = null;  
  57.         PrintWriter out = null;  
  58.         try {  
  59.             in = new BufferedReader(new InputStreamReader(  
  60.                     this.socket.getInputStream()));  
  61.             out = new PrintWriter(this.socket.getOutputStream(), true);  
  62.             String body = null;  
  63.             while (true) {  
  64.                 body = in.readLine();  
  65.                 if (body == null)  
  66.                     break;  
  67.                 System.out.println("Receive : " + body);  
  68.                 out.println("Hello, " + body);  
  69.             }  
  70.   
  71.         } catch (Exception e) {  
  72.             if (in != null) {  
  73.                 try {  
  74.                     in.close();  
  75.                 } catch (IOException e1) {  
  76.                     e1.printStackTrace();  
  77.                 }  
  78.             }  
  79.             if (out != null) {  
  80.                 out.close();  
  81.             }  
  82.             if (this.socket != null) {  
  83.                 try {  
  84.                     this.socket.close();  
  85.                 } catch (IOException e1) {  
  86.                     e1.printStackTrace();  
  87.                 }  
  88.                 this.socket = null;  
  89.             }  
  90.         }  
  91.     }  
  92. }  

客户端代码

 

 

[java]   
 
 
  1. import java.io.BufferedReader;  
  2. import java.io.IOException;  
  3. import java.io.InputStreamReader;  
  4. import java.io.PrintWriter;  
  5. import java.net.Socket;  
  6. import java.util.ArrayList;  
  7. import java.util.List;  
  8.   
  9. public class SimpleClient {  
  10.   
  11.     private static List<String> servers = new ArrayList<>();  
  12.       
  13.     public static void main(String[] args) {  
  14.           
  15.         initServerList();  
  16.           
  17.         SimpleClient client = new SimpleClient();  
  18.         BufferedReader console = new BufferedReader(new InputStreamReader(System.in));  
  19.         while (true) {  
  20.             String name;  
  21.             try {  
  22.                 name = console.readLine();  
  23.                 if("exit".equals(name)) {  
  24.                     System.exit(0);  
  25.                 }  
  26.                 client.send(name);  
  27.             } catch (IOException e) {  
  28.                 e.printStackTrace();  
  29.             }  
  30.         }  
  31.     }  
  32.       
  33.     private static void initServerList() {  
  34.         servers.clear();  
  35.         servers.add("127.0.0.1:18080");  
  36.     }  
  37.       
  38.     public static String getServer() {  
  39.         return servers.get(0);  
  40.     }  
  41.       
  42.     public SimpleClient() {  
  43.     }  
  44.       
  45.     public void send(String name) {  
  46.           
  47.         String server = SimpleClient.getServer();  
  48.         String[] cfg = server.split(":");  
  49.           
  50.         Socket socket = null;  
  51.         BufferedReader in = null;  
  52.         PrintWriter out = null;  
  53.         try {  
  54.             socket = new Socket(cfg[0], Integer.parseInt(cfg[1]));  
  55.             in = new BufferedReader(new InputStreamReader(socket.getInputStream()));  
  56.             out = new PrintWriter(socket.getOutputStream(), true);  
  57.               
  58.             out.println(name);  
  59.             while(true) {  
  60.                 String resp = in.readLine();  
  61.                 if(resp == null)  
  62.                     break;  
  63.                 else if(resp.length() > 0) {  
  64.                     System.out.println("Receive : " + resp);  
  65.                     break;  
  66.                 }  
  67.             }  
  68.         } catch (Exception e) {  
  69.             e.printStackTrace();  
  70.         } finally {  
  71.             if (out != null) {  
  72.                 out.close();  
  73.             }  
  74.             if (in != null) {  
  75.                 try {  
  76.                     in.close();  
  77.                 } catch (IOException e) {  
  78.                     e.printStackTrace();  
  79.                 }  
  80.             }  
  81.             if (socket != null) {  
  82.                 try {  
  83.                     socket.close();  
  84.                 } catch (IOException e) {  
  85.                     e.printStackTrace();  
  86.                 }  
  87.             }  
  88.         }  
  89.     }  
  90. }  

运行测试,服务器端输出截图:

 

客户端输出截图:

很好,一切运行正常。

接下来添加ZooKeeper部分。为了演示效果更好,修改一下ZooKeeper的配置文件,以便于服务器断开后能更快的被监测到。主要是减小Session的超时时间

zookeeper/conf/zoo.cfg

 

[java]   
 
 
  1. tickTime=2000  
  2. initLimit=2  
  3. syncLimit=5  
  4. dataDir=D:\\ZooKeeper\\zookeeper-3.4.8\\data  
  5. clientPort=2181  
  6. http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance  
  7. minSessionTimeout=2000  
  8. maxSessionTimeout=5000  

在项目中添加zkclient的maven依赖

 

 

[java]   
 
 
  1. <!-- http://mvnrepository.com/artifact/com.101tec/zkclient -->  
  2. <dependency>  
  3.     <groupId>com.101tec</groupId>  
  4.     <artifactId>zkclient</artifactId>  
  5.     <version>0.8</version>  
  6. </dependency>  

Server代码

 

[java]   
 
 
  1. import java.io.BufferedReader;  
  2. import java.io.IOException;  
  3. import java.io.InputStreamReader;  
  4. import java.io.PrintWriter;  
  5. import java.net.ServerSocket;  
  6. import java.net.Socket;  
  7.   
  8. import org.I0Itec.zkclient.ZkClient;  
  9.   
  10. public class SimpleServer implements Runnable {  
  11.   
  12.     public static void main(String[] args) throws IOException {  
  13.         int port = 18080;  
  14.         SimpleServer server = new SimpleServer(port);  
  15.         Thread thread = new Thread(server);  
  16.         thread.start();  
  17.     }  
  18.   
  19.     private int port;  
  20.   
  21.     public SimpleServer(int port) {  
  22.         this.port = port;  
  23.     }  
  24.       
  25.     private void regServer() {  
  26.         //向ZooKeeper注册当前服务器  
  27.         ZkClient client = new ZkClient("127.0.0.1:2181", 60000, 1000);  
  28.         String path = "/test/server" + port;  
  29.         if(client.exists(path))  
  30.             client.delete(path);  
  31.         client.createEphemeral(path, "127.0.0.1:" + port);  
  32.     }  
  33.   
  34.     @Override  
  35.     public void run() {  
  36.         ServerSocket server = null;  
  37.         try {  
  38.             server = new ServerSocket(port);  
  39.             regServer();  
  40.             System.out.println("Server started at " + port);  
  41.             Socket socket = null;  
  42.             while (true) {  
  43.                 socket = server.accept();  
  44.                 new Thread(new SimpleServerHandler(socket)).start();  
  45.             }  
  46.         } catch(IOException ex) {  
  47.             ex.printStackTrace();  
  48.         } finally {  
  49.             if (server != null) {  
  50.                 try {  
  51.                     server.close();  
  52.                 } catch (IOException e) {}  
  53.             }  
  54.         }  
  55.   
  56.     }  
  57. }  
  58. //SimpleServerHandler略  

客户端代码

 

[java]   
 
 
  1. import java.io.BufferedReader;  
  2. import java.io.IOException;  
  3. import java.io.InputStreamReader;  
  4. import java.io.PrintWriter;  
  5. import java.net.Socket;  
  6. import java.util.ArrayList;  
  7. import java.util.Arrays;  
  8. import java.util.List;  
  9. import java.util.Random;  
  10.   
  11. import org.I0Itec.zkclient.IZkChildListener;  
  12. import org.I0Itec.zkclient.ZkClient;  
  13.   
  14. public class SimpleClient {  
  15.   
  16.     private static List<String> servers = new ArrayList<>();  
  17.       
  18.     public static void main(String[] args) {  
  19.           
  20.         initServerList();  
  21.           
  22.         SimpleClient client = new SimpleClient();  
  23.         BufferedReader console = new BufferedReader(new InputStreamReader(System.in));  
  24.         while (true) {  
  25.             String name;  
  26.             try {  
  27.                 name = console.readLine();  
  28.                 if("exit".equals(name)) {  
  29.                     System.exit(0);  
  30.                 }  
  31.                 client.send(name);  
  32.             } catch (IOException e) {  
  33.                 e.printStackTrace();  
  34.             }  
  35.         }  
  36.     }  
  37.       
  38.     private static void initServerList() {  
  39.         //启动时从ZooKeeper读取可用服务器  
  40.         String path = "/test";  
  41.         ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000);  
  42.         List<String> childs = zkClient.getChildren(path);  
  43.         servers.clear();  
  44.         for(String p : childs) {  
  45.             servers.add(zkClient.readData(path + "/" + p));  
  46.         }  
  47.         //订阅节点变化事件  
  48.         zkClient.subscribeChildChanges("/test", new IZkChildListener() {  
  49.             @Override  
  50.             public void handleChildChange(String parentPath, List<String> currentChilds)  
  51.                     throws Exception {  
  52.                 System.out.println(String.format("[ZookeeperRegistry] service list change: path=%s, currentChilds=%s", parentPath, currentChilds.toString()));  
  53.                 servers.clear();  
  54.                 for(String p : currentChilds) {  
  55.                     servers.add(zkClient.readData(path + "/" + p));  
  56.                 }  
  57.                 System.out.println("Servers: " + servers.toString());  
  58.             }  
  59.         });  
  60.           
  61.     }  
  62.       
  63.     public static String getServer() {  
  64.         return servers.get(new Random().nextInt(servers.size()));  
  65.     }  
  66.     //其他无变化, 略  
  67. }  

分别启动Server和Client,然后修改Server的端口号,再启动一个实例,可以看到客户端检测到了这个新服务器的存在

 

 

 

在客户端发送一些消息,可以看到被随机的分发到两个Server上处理

接下来关闭其中一个Server,可以看到客户端几秒钟后监测到这个事件并自动删除了该服务器。

可以看到,基于ZooKeeper实现软负载均衡非常简单,与应用紧密结合,使用灵活。

转载于:https://www.cnblogs.com/sa-dan/p/6836772.html

转载地址:https://blog.csdn.net/weixin_30872733/article/details/99317652 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:jQuery Validate (摘自官网)
下一篇:SpringBoot 下 mybatis 的缓存

发表评论

最新留言

表示我来过!
[***.240.166.169]2024年04月18日 22时04分29秒