网络编程之每天学习一点点[day12]-----netty最佳实践之心跳检测
发布日期:2021-06-30 13:45:26
浏览次数:3
分类:技术文章
本文共 11063 字,大约阅读时间需要 36 分钟。
实际应用中,常见的有一个master机器和多个slave机器组成的集群,master常需要对slave进行心跳检测,接收来自slave的信息,这个例子中我们就使用netty来实现。
Server
import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;public class Server { public static void main(String[] args) throws Exception{ EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //设置日志 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ServerHeartBeatHandler()); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); }}
服务端代码不变,主要看下server
import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;import java.util.HashMap;public class ServerHeartBeatHandler extends ChannelHandlerAdapter { /** key:ip value:auth */ private static HashMapAUTH_IP_MAP = new HashMap (); private static final String SUCCESS_KEY = "auth_success_key"; static { AUTH_IP_MAP.put("169.254.90.205", "1234"); } private boolean auth(ChannelHandlerContext ctx, Object msg){ //System.out.println(msg); String [] ret = ((String) msg).split(","); String auth = AUTH_IP_MAP.get(ret[0]); if(auth != null && auth.equals(ret[1])){ ctx.writeAndFlush(SUCCESS_KEY); return true; } else { ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE); return false; } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof String){ auth(ctx, msg); } else if (msg instanceof RequestInfo) { RequestInfo info = (RequestInfo) msg; System.out.println("--------------------------------------------"); System.out.println("当前主机ip为: " + info.getIp()); System.out.println("当前主机cpu情况: "); HashMap cpu = info.getCpuPercMap(); System.out.println("总使用率: " + cpu.get("combined")); System.out.println("用户使用率: " + cpu.get("user")); System.out.println("系统使用率: " + cpu.get("sys")); System.out.println("等待率: " + cpu.get("wait")); System.out.println("空闲率: " + cpu.get("idle")); System.out.println("当前主机memory情况: "); HashMap memory = info.getMemoryMap(); System.out.println("内存总量: " + memory.get("total")); System.out.println("当前内存使用量: " + memory.get("used")); System.out.println("当前内存剩余量: " + memory.get("free")); System.out.println("--------------------------------------------"); ctx.writeAndFlush("info received!"); } else { ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE); } }}
定义了一个静态代码块,来模拟数据库中的ip和key映射关系,当客户端发送认证信息auth时,我们将根据这个映射关系来比较:
private static HashMapAUTH_IP_MAP = new HashMap (); private static final String SUCCESS_KEY = "auth_success_key";
static { AUTH_IP_MAP.put("169.254.90.205", "1234"); }
Client代码也不变:
import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class Client { public static void main(String[] args) throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ClienHeartBeattHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); cf.channel().closeFuture().sync(); group.shutdownGracefully(); }}
看下ClienHeartBeattHandler
import java.net.InetAddress;import java.util.HashMap;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.ScheduledFuture;import java.util.concurrent.TimeUnit;import org.hyperic.sigar.CpuPerc;import org.hyperic.sigar.Mem;import org.hyperic.sigar.Sigar;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;import io.netty.util.ReferenceCountUtil;public class ClienHeartBeattHandler extends ChannelHandlerAdapter { private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private ScheduledFuture heartBeat; //主动向服务器发送认证信息 private InetAddress addr ; private static final String SUCCESS_KEY = "auth_success_key"; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { addr = InetAddress.getLocalHost(); String ip = addr.getHostAddress(); String key = "1234"; //证书 String auth = ip + "," + key; ctx.writeAndFlush(auth); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { if(msg instanceof String){ String ret = (String)msg; if(SUCCESS_KEY.equals(ret)){ // 握手成功,主动发送心跳消息 this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 2, TimeUnit.SECONDS); System.out.println(msg); } else { System.out.println(msg); } } } finally { ReferenceCountUtil.release(msg); } } private class HeartBeatTask implements Runnable { private final ChannelHandlerContext ctx; public HeartBeatTask(final ChannelHandlerContext ctx) { this.ctx = ctx; } @Override public void run() { try { RequestInfo info = new RequestInfo(); //ip info.setIp(addr.getHostAddress()); Sigar sigar = new Sigar(); //cpu prec CpuPerc cpuPerc = sigar.getCpuPerc(); HashMapcpuPercMap = new HashMap (); cpuPercMap.put("combined", cpuPerc.getCombined()); cpuPercMap.put("user", cpuPerc.getUser()); cpuPercMap.put("sys", cpuPerc.getSys()); cpuPercMap.put("wait", cpuPerc.getWait()); cpuPercMap.put("idle", cpuPerc.getIdle()); // memory Mem mem = sigar.getMem(); HashMap memoryMap = new HashMap (); memoryMap.put("total", mem.getTotal() / 1024L); memoryMap.put("used", mem.getUsed() / 1024L); memoryMap.put("free", mem.getFree() / 1024L); info.setCpuPercMap(cpuPercMap); info.setMemoryMap(memoryMap); ctx.writeAndFlush(info); } catch (Exception e) { e.printStackTrace(); } } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); if (heartBeat != null) { heartBeat.cancel(true); heartBeat = null; } ctx.fireExceptionCaught(cause); } }}
当client连接上server后,触发channelActive方法,发送本机ip和key:1234,给服务端,服务端拿到后进行认证。
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { addr = InetAddress.getLocalHost(); String ip = addr.getHostAddress(); String key = "1234"; //证书 String auth = ip + "," + key; ctx.writeAndFlush(auth); }
看下服务端认证代码:
private boolean auth(ChannelHandlerContext ctx, Object msg){ //System.out.println(msg); String [] ret = ((String) msg).split(","); String auth = AUTH_IP_MAP.get(ret[0]); if(auth != null && auth.equals(ret[1])){ ctx.writeAndFlush(SUCCESS_KEY); return true; } else { ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE); return false; } }
根据发客户端发送信息拆分第一个参数ip,获取得到对应key,跟服务端AUTH_IP_MAP中的key比较,如果相等则认证成功,返回给客户端一个:
private static final String SUCCESS_KEY = "auth_success_key";
客户端拿到后,握手成功,发心跳给服务端:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { if(msg instanceof String){ String ret = (String)msg; if(SUCCESS_KEY.equals(ret)){ // 握手成功,主动发送心跳消息 this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 2, TimeUnit.SECONDS); System.out.println(msg); } else { System.out.println(msg); } } } finally { ReferenceCountUtil.release(msg); } }
心跳检测的实现是有一个newScheduledThreadPool,延迟0s执行,每2s执行一次HeartBeatTask,发送RequestInfo当前客户端机器信息给服务端打印出来,服务端如果没收到来自客户端的消息则断开和客户端的连接。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof String){ auth(ctx, msg); } else if (msg instanceof RequestInfo) { RequestInfo info = (RequestInfo) msg; System.out.println("--------------------------------------------"); System.out.println("当前主机ip为: " + info.getIp()); System.out.println("当前主机cpu情况: "); HashMapcpu = info.getCpuPercMap(); System.out.println("总使用率: " + cpu.get("combined")); System.out.println("用户使用率: " + cpu.get("user")); System.out.println("系统使用率: " + cpu.get("sys")); System.out.println("等待率: " + cpu.get("wait")); System.out.println("空闲率: " + cpu.get("idle")); System.out.println("当前主机memory情况: "); HashMap memory = info.getMemoryMap(); System.out.println("内存总量: " + memory.get("total")); System.out.println("当前内存使用量: " + memory.get("used")); System.out.println("当前内存剩余量: " + memory.get("free")); System.out.println("--------------------------------------------"); ctx.writeAndFlush("info received!"); } else { ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE); } }
RequestInfo:
import java.io.Serializable;import java.util.HashMap;public class RequestInfo implements Serializable { private String ip ; private HashMapcpuPercMap ; private HashMap memoryMap; //.. other field public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } public HashMap getCpuPercMap() { return cpuPercMap; } public void setCpuPercMap(HashMap cpuPercMap) { this.cpuPercMap = cpuPercMap; } public HashMap getMemoryMap() { return memoryMap; } public void setMemoryMap(HashMap memoryMap) { this.memoryMap = memoryMap; } }
转载地址:https://jeffsheng.blog.csdn.net/article/details/80822556 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
做的很好,不错不错
[***.243.131.199]2024年04月21日 16时28分23秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
Java 并发学习记录之 wait/notify 机制
2019-05-01
Java 并发学习记录之线程间通信
2019-05-01
Java并发学习记录之volatile
2019-05-01
Docker + mysql主从配置
2019-05-01
Java集合学习之LinkedList
2019-05-01
Spring Security Oauth2 令牌增加额外信息
2019-05-01
Spring Security Oauth2 如何增加自定义授权模式
2019-05-01
logback + Kafka + logstash 集成
2019-05-01
在SpringBoot1.5.x下如何使RedisTokenStore集群化
2019-05-01
Spring Cloud Consul应用下线后,健康检查自动删除无效服务
2019-05-01
spring cloud consul 应用的多实例名的解决
2019-05-01
kafka设置某个topic的数据过期时间
2019-05-01
linux系统编程之信号(五):实时信号与sigqueue函数
2019-05-01
225. 用队列实现栈
2019-05-01
linux系统编程之信号(六):竞态条件与sigsuspend函数
2019-05-01
124. 二叉树中的最大路径和
2019-05-01
LeetCode 148:排序链表 【归并】
2019-05-01
LeetCode 560 和为 k 的子数组
2019-05-01
LeetCode 581 最短无序连续子数组
2019-05-01