当前位置 博文首页 > Jasper6688的博客:基于Netty的客户端与服务器之间的通信实例

    Jasper6688的博客:基于Netty的客户端与服务器之间的通信实例

    作者:[db:作者] 时间:2021-06-15 21:44

    需要的jar包:https://github.com/Jasper2s/Study_Imooc/tree/master/JavaPractice/lib

    1.全局配置类:存储每一个客户端接入进来的配置

    package com.java.netty;
    /**
     * 全局配置类
     * @author qiuzhiwen
     *
     */
    
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.util.concurrent.GlobalEventExecutor;
    
    public class NettyConfig {
    	//存储每一个客户端接入进来的配置
    	public static ChannelGroup group=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    }
    

    2.核心业务处理类:接收/处理/响应客户端websocket请求

    package com.java.netty;
    
    import java.util.Date;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.FullHttpRequest;
    import io.netty.handler.codec.http.HttpResponseStatus;
    import io.netty.handler.codec.http.HttpVersion;
    import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketFrame;
    import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
    import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
    import io.netty.util.CharsetUtil;
    /**
     * 接收/处理/响应客户端websocket请求的核心处理类
     * @author qiuzhiwen
     *
     */
    public class WebSocketHandler extends SimpleChannelInboundHandler<Object>{
    
    	private WebSocketServerHandshaker handShaker;
    	private static final String WEB_SOCKET_URL="ws://localhost:8888/websocket";
    	//客户端与服务端创建连接端时候调用
    	@Override
    	public void channelActive(ChannelHandlerContext ctx) throws Exception {
    		NettyConfig.group.add(ctx.channel());
    		System.out.println("客户端与服务端连接开启!");
    	}
    
    	//客户端与服务端断开连接的时候调用
    	@Override
    	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    		NettyConfig.group.remove(ctx.channel());
    		System.out.println("客户端与服务端连接断开!");
    	}
    
    	//服务端接收客户端发送过来的数据之后调用
    	@Override
    	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    		ctx.flush();
    	}
    
    	//工程出现异常的时候调用
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		cause.printStackTrace();
    		ctx.close();
    	}
    
    	//服务端处理客户端websocket请求的核心方法
    	@Override
    	protected void messageReceived(ChannelHandlerContext context, Object msg) throws Exception {
    		//处理客户端向服务器发起http握手请求的业务
    		if(msg instanceof FullHttpRequest) {
    			handHttpRequest(context, (FullHttpRequest)msg);
    		}else if(msg instanceof WebSocketFrame) {
    			//处理websocket连接业务
    			handWebsocketFrame(context, (WebSocketFrame)msg);
    		}
    	}
    	
    	/**
    	 * 处理客户端与服务端之前的websocket业务
    	 * @param context
    	 * @param frame
    	 */
    	@SuppressWarnings("unused")
    	private void handWebsocketFrame(ChannelHandlerContext context,WebSocketFrame frame) {
    		//判断是否关闭websocket的指令
    		if(frame instanceof CloseWebSocketFrame) {
    			handShaker.close(context.channel(), (CloseWebSocketFrame)frame.retain());
    		}
    		//判断是否为ping消息
    		if(frame instanceof PingWebSocketFrame) {
    			context.channel().write(new PongWebSocketFrame(frame.content().retain()));
    			return;
    		}
    		//判断是否是二进制消息
    		if(!(frame instanceof TextWebSocketFrame)) {
    			System.out.println("不支持二进制消息!");
    			//throw new RuntimeException("["+this.getClass().getName()+"]不支持消息!");
    		}
    		//返回应答消息
    		//获取客户端向服务器发送的消息
    		if(!(frame instanceof CloseWebSocketFrame)) {
    			String request=((TextWebSocketFrame)frame).text();//强转之前需要判断
    			System.out.println("服务端收到的客户端消息==>>"+request);
    			TextWebSocketFrame tws=new TextWebSocketFrame(new Date().toString()+context.channel().id()+"===>>>"+request);
    			//群发,服务端向每个连接上的客户端群发消息
    			NettyConfig.group.writeAndFlush(tws);
    		}
    	}
    	
    	/**
    	 * 处理客户端向服务器发起http握手请求的业务
    	 * @param context
    	 * @param request
    	 */
    	@SuppressWarnings("unused")
    	private void handHttpRequest(ChannelHandlerContext context,FullHttpRequest request) {
    		if(!request.getDecoderResult().isSuccess()||!"websocket".equals(request.headers().get("Upgrade"))) {
    			sendHttpResponse(context, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
    			return;
    		}
    		WebSocketServerHandshakerFactory wsFactory=new WebSocketServerHandshakerFactory(WEB_SOCKET_URL, null, false);
    		handShaker=wsFactory.newHandshaker(request);
    		if(handShaker==null) {
    			WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(context.channel());
    		}else {
    			handShaker.handshake(context.channel(), request);
    		}
    	}
    	
    	/**
    	 * 服务端向客户端响应消息
    	 * @param context
    	 * @param request
    	 * @param response
    	 */
    	private void sendHttpResponse(ChannelHandlerContext context,FullHttpRequest request,DefaultFullHttpResponse response) {
    		if(response.getStatus().code()!=200) {
    			ByteBuf buf=Unpooled.copiedBuffer(response.getStatus().toString(), CharsetUtil.UTF_8);
    			response.content().writeBytes(buf);
    			buf.release();
    		}
    		//服务端向客户端发送数据
    		ChannelFuture f=context.channel().writeAndFlush(response);
    		if(response.getStatus().code()!=200) {
    			f.addListener(ChannelFutureListener.CLOSE);
    		}
    	}
    
    }
    

    3.初始化组件类:初始化连接时的各种组件

    package com.java.netty;
    
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.stream.ChunkedWriteHandler;
    
    /**
     * 初始化连接时的各个组件
     * @author qiuzhiwen
     *
     */
    public class WebSocketChannelHandler extends ChannelInitializer<SocketChannel>{
    
    	@Override
    	protected void initChannel(SocketChannel sc) throws Exception {
    		sc.pipeline().addLast("http-codec", new HttpServerCodec());
    		sc.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
    		sc.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
    		sc.pipeline().addLast("handler", new WebSocketHandler());
    
    	}
    
    }
    

    4.程序启动类:程序启动入口

    package com.java.netty;
    /**
     * 程序启动类
     * @author qiuzhiwen
     *
     */
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class Main {
    
    	@SuppressWarnings({ })
    	public static void main(String[] args) {
    		EventLoopGroup boosGroup=null;
    		EventLoopGroup workGroup=null;
    		try {
    			boosGroup=new NioEventLoopGroup();
    			workGroup=new NioEventLoopGroup();
    			ServerBootstrap sb=new ServerBootstrap();
    			sb.group(boosGroup,workGroup);
    			sb.channel(NioServerSocketChannel.class);
    			sb.childHandler(new WebSocketChannelHandler());
    			System.out.println("服务端开启,等待客户端连接...");
    			Channel ch = sb.bind(8888).sync().channel();
    			ch.closeFuture().sync();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} finally {
    			//优雅地退出程序
    			if(boosGroup!=null) {
    				boosGroup.shutdownGracefully();
    			}
    			if(workGroup!=null) {
    				boosGroup.shutdownGracefully();
    			}
    		}
    	}
    }
    

    5.客户端页面程序

    <html>
    	<head>
    		<meta http-equiv="Content-Type" content="text/html; charset = utf-8" />
    		<title>WebSocket客户端</title>
    		<script type="text/javascript">
    			var socket;
    			if(!window.WebSocket) {
    				window.WebSocket = window.MozWebSocket;
    			}
    			if(window.WebSocket) {
    				socket = new WebSocket("ws://localhost:8888/websocket");
    				socket.onmessage = function(event) {
    					var ta = document.getElementById('responseContent');
    					ta.value += event.data + "\r\n";
    				};
    				socket.onopen = function(event) {
    					var ta = document.getElementById('responseContent');
    					ta.value = "你当前的浏览器支持WebSocket,请进行后续操作\r\n";
    				};
    				socket.onclose = function(event) {
    					var ta = document.getElementById('responseContent');
    					ta.value = "";
    					ta.value = "WebSocket连接已经关闭\r\n";
    				};
    			} else {
    				alert("您的浏览器不支持WebSocket");
    			}
    			
    			window.onbeforeunload = function() {
    				ws.close();
    			}
    
    			function send(message) {
    				if(!window.WebSocket) {
    					return;
    				}
    				if(socket.readyState == WebSocket.OPEN) {
    					socket.send(message);
    				} else {
    					alert("WebSocket连接没有建立成功!!");
    				}
    			}
    		</script>
    	</head>
    	<body>
    		<form onSubmit="return false;">
    			<input type="text" name="message" value="" />
    			<br/><br/>
    			<input type="button" value="发送WebSocket请求消息" onClick="send(this.form.message.value)" />
    			<hr color="red" />
    			<h2>客户端接收到服务端返回的应答消息</h2>
    			<textarea id="responseContent" style="width:1024px; height:300px"></textarea>
    		</form>
    	</body>
    </html>

    6.启动服务器

    7.启动客户端并发送消息

    PS:源码地址:https://github.com/Jasper2s/Study_Imooc/tree/master/JavaPractice/src/com/java/netty

    下一篇:没有了