网络编程之每天学习一点点[day13]-----netty编解码技术
发布日期:2021-06-30 13:45:26 浏览次数:3 分类:技术文章

本文共 11063 字,大约阅读时间需要 36 分钟。

编解码技术,说白了就是java序列化传输,序列化的目的就两个:

第一个是进行网络传输,第二对象持久化。

虽然我们可以使用java进行对象序列化,netty去传输。但是java序列化的硬伤太多,比如java序列话没办法跨语言、序列化后的码流太大、序列化性能太低等等

主流的编解码框架:

JBOSS的MarsShaling包

google的Protobuf

基于Protobuf的Kyro

MessagePack框架

Jboss Marshalling是一个java对象序列化包,对jdk默认的序列化框架进行了优化,但又保留了跟java.io.serlalizable接口的兼容,同时增加了一些可调的参数和附加特性。

类库:jboss-marshalling-1.3.0、jboss-marshalling-serial-1.3.0

我们使用客户端传输的是一个req对象:

import java.io.Serializable;public class Req implements Serializable{	/**	 * 	 */	private static final long serialVersionUID = 6673744374596339216L;	private String id ;	private String name ;	private String requestMessage ;	private byte[] attachment;		public String getId() {		return id;	}	public void setId(String id) {		this.id = id;	}	public String getName() {		return name;	}	public void setName(String name) {		this.name = name;	}	public String getRequestMessage() {		return requestMessage;	}	public void setRequestMessage(String requestMessage) {		this.requestMessage = requestMessage;	}	public byte[] getAttachment() {		return attachment;	}	public void setAttachment(byte[] attachment) {		this.attachment = attachment;	}}

服务器端响应的对象是Resp:

import java.io.Serializable;public class Resp implements Serializable{		/**	 * 	 */	private static final long serialVersionUID = -3041772672090376328L;	private String id;	private String name;	private String responseMessage;		public String getId() {		return id;	}	public void setId(String id) {		this.id = id;	}	public String getName() {		return name;	}	public void setName(String name) {		this.name = name;	}	public String getResponseMessage() {		return responseMessage;	}	public void setResponseMessage(String responseMessage) {		this.responseMessage = responseMessage;	}	}

客户端和服务端的代码中都增加了marshalling的解码器和编码器

                                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());				sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());

先看下客户端的代码

import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import java.io.File;import java.io.FileInputStream;import bhz.utils.GzipUtils;public class Client {		public static void main(String[] args) throws Exception{				EventLoopGroup group = new NioEventLoopGroup();		Bootstrap b = new Bootstrap();		b.group(group)		 .channel(NioSocketChannel.class)		 .handler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); for(int i = 0; i < 5; i++ ){ Req req = new Req(); req.setId("" + i); req.setName("pro" + i); req.setRequestMessage("数据信息" + i); String path = System.getProperty("user.dir") + File.separatorChar + "sources" + File.separatorChar + "001.jpg"; File file = new File(path); FileInputStream in = new FileInputStream(file); byte[] data = new byte[in.available()]; in.read(data); in.close(); req.setAttachment(GzipUtils.gzip(data)); cf.channel().writeAndFlush(req); } cf.channel().closeFuture().sync(); group.shutdownGracefully(); }}

客户端封装了5个req对象,并且读取本地图片001.jpg,使用gzip压缩后传递给服务端。

看下客户端处理器:

import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;import io.netty.util.ReferenceCountUtil;public class ClientHandler extends ChannelHandlerAdapter{		@Override	public void channelActive(ChannelHandlerContext ctx) throws Exception {			}	@Override	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {		try {			Resp resp = (Resp)msg;			System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());					} finally {			ReferenceCountUtil.release(msg);		}	}	@Override	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {			}	@Override	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {		ctx.close();	}	}

附上gzip工具类:

import java.io.ByteArrayInputStream;import java.io.ByteArrayOutputStream;import java.io.File;import java.io.FileInputStream;import java.io.FileOutputStream;import java.util.zip.GZIPInputStream;import java.util.zip.GZIPOutputStream;public class GzipUtils {    public static byte[] gzip(byte[] data) throws Exception{    	ByteArrayOutputStream bos = new ByteArrayOutputStream();    	GZIPOutputStream gzip = new GZIPOutputStream(bos);    	gzip.write(data);    	gzip.finish();    	gzip.close();    	byte[] ret = bos.toByteArray();    	bos.close();    	return ret;    }        public static byte[] ungzip(byte[] data) throws Exception{    	ByteArrayInputStream bis = new ByteArrayInputStream(data);    	GZIPInputStream gzip = new GZIPInputStream(bis);    	byte[] buf = new byte[1024];    	int num = -1;    	ByteArrayOutputStream bos = new ByteArrayOutputStream();    	while((num = gzip.read(buf, 0 , buf.length)) != -1 ){    		bos.write(buf, 0, num);    	}    	gzip.close();    	bis.close();    	byte[] ret = bos.toByteArray();    	bos.flush();    	bos.close();    	return ret;    }        public static void main(String[] args) throws Exception{		    	//读取文件    	String readPath = System.getProperty("user.dir") + File.separatorChar + "sources" +  File.separatorChar + "006.jpg";        File file = new File(readPath);          FileInputStream in = new FileInputStream(file);          byte[] data = new byte[in.available()];          in.read(data);          in.close();                  System.out.println("文件原始大小:" + data.length);        //测试压缩                byte[] ret1 = GzipUtils.gzip(data);        System.out.println("压缩之后大小:" + ret1.length);                byte[] ret2 = GzipUtils.ungzip(ret1);        System.out.println("还原之后大小:" + ret2.length);                //写出文件        String writePath = System.getProperty("user.dir") + File.separatorChar + "receive" +  File.separatorChar + "006.jpg";        FileOutputStream fos = new FileOutputStream(writePath);        fos.write(ret2);        fos.close();    	    	    		}            }

再看下我们的server代码:

import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;public class Server {	public static void main(String[] args) throws Exception{				EventLoopGroup pGroup = new NioEventLoopGroup();		EventLoopGroup cGroup = new NioEventLoopGroup();				ServerBootstrap b = new ServerBootstrap();		b.group(pGroup, cGroup)		 .channel(NioServerSocketChannel.class)		 .option(ChannelOption.SO_BACKLOG, 1024)		 //设置日志		 .handler(new LoggingHandler(LogLevel.INFO))		 .childHandler(new ChannelInitializer
() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ServerHandler()); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); }}

ServerHandler

import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;import java.io.File;import java.io.FileOutputStream;import bhz.utils.GzipUtils;public class ServerHandler extends ChannelHandlerAdapter{	@Override	public void channelActive(ChannelHandlerContext ctx) throws Exception {	}	@Override	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {		Req req = (Req)msg;		System.out.println("Server : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage());		byte[] attachment = GzipUtils.ungzip(req.getAttachment());				String path = System.getProperty("user.dir") + File.separatorChar + "receive" +  File.separatorChar + "001.jpg";        FileOutputStream fos = new FileOutputStream(path);        fos.write(attachment);        fos.close();				Resp resp = new Resp();		resp.setId(req.getId());		resp.setName("resp" + req.getId());		resp.setResponseMessage("响应内容" + req.getId());		ctx.writeAndFlush(resp);//.addListener(ChannelFutureListener.CLOSE);	}	@Override	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {			}	@Override	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {		ctx.close();	}		}

读取客户端发送的req,将其中的图片放置在receice文件下,封装响应报文给客户端。

看下我们的重点,MarshallingCodeCFactory:

import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;import io.netty.handler.codec.marshalling.MarshallerProvider;import io.netty.handler.codec.marshalling.MarshallingDecoder;import io.netty.handler.codec.marshalling.MarshallingEncoder;import io.netty.handler.codec.marshalling.UnmarshallerProvider;import org.jboss.marshalling.MarshallerFactory;import org.jboss.marshalling.Marshalling;import org.jboss.marshalling.MarshallingConfiguration;/** * Marshalling工厂 * @author(jeffSheng) */public final class MarshallingCodeCFactory {    /**     * 创建Jboss Marshalling解码器MarshallingDecoder     * @return MarshallingDecoder     */    public static MarshallingDecoder buildMarshallingDecoder() {    	//首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。		final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");		//创建了MarshallingConfiguration对象,配置了版本号为5 		final MarshallingConfiguration configuration = new MarshallingConfiguration();		configuration.setVersion(5);		//根据marshallerFactory和configuration创建provider		UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);		//构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度		MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);		return decoder;    }    /**     * 创建Jboss Marshalling编码器MarshallingEncoder     * @return MarshallingEncoder     */    public static MarshallingEncoder buildMarshallingEncoder() {		final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");		final MarshallingConfiguration configuration = new MarshallingConfiguration();		configuration.setVersion(5);		MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);		//构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组		MarshallingEncoder encoder = new MarshallingEncoder(provider);		return encoder;    }}

转载地址:https://jeffsheng.blog.csdn.net/article/details/80836808 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:网络编程之每天学习一点点[day14]-----netty实现文件下载
下一篇:网络编程之每天学习一点点[day12]-----netty最佳实践之心跳检测

发表评论

最新留言

逛到本站,mark一下
[***.202.152.39]2024年04月25日 20时50分51秒