基于Netty的客户端与服务器之间的通信实例(作者:Jasper6688)
需要的jar包:https://github.com/Jasper2s/Study_Imooc/tree/master/JavaPractice/lib
1.全局配置类:保存每一个接入进来的客户端连接(Channel),供服务端群发消息使用
package com.java.netty;
/**
* 全局配置类
* @author qiuzhiwen
*
*/
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
 * Global configuration holder shared by the server-side handlers.
 *
 * <p>Exposes the single {@link ChannelGroup} to which {@code WebSocketHandler} adds a channel
 * when it becomes active and from which it removes the channel when it becomes inactive.
 * Messages written to the group are sent to every channel it currently contains.
 *
 * @author qiuzhiwen
 */
public class NettyConfig {

    /**
     * Every client channel that is currently connected.
     *
     * <p>Declared {@code final} so the shared group cannot be swapped out from under the
     * handlers that are already using it.
     */
    public static final ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
2.核心业务处理类:接收/处理/响应客户端websocket请求
package com.java.netty;
import java.util.Date;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;
/**
 * Core handler that receives, processes and answers WebSocket requests from clients.
 *
 * <p>An incoming {@link FullHttpRequest} is treated as the WebSocket opening handshake. Every
 * {@link WebSocketFrame} received afterwards is dispatched by frame type; text frames are
 * broadcast to all channels registered in {@link NettyConfig#group}.
 *
 * @author qiuzhiwen
 */
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {

    private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket";

    /** Handshaker created for this connection; {@code null} until a handshake request arrives. */
    private WebSocketServerHandshaker handShaker;

    /** Called when a client connection is established: registers the channel for broadcasts. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        NettyConfig.group.add(ctx.channel());
        System.out.println("客户端与服务端连接开启!");
    }

    /** Called when a client connection is closed: unregisters the channel. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        NettyConfig.group.remove(ctx.channel());
        System.out.println("客户端与服务端连接断开!");
    }

    /** Called after a batch of inbound data has been read: flushes any pending writes. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /** Called when an exception reaches this handler: prints it and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Entry point for every inbound message: routes HTTP handshake requests and WebSocket frames
     * to their dedicated handlers. Messages of any other type are ignored.
     *
     * @param context the handler context of the receiving channel
     * @param msg     the decoded inbound message
     */
    @Override
    protected void messageReceived(ChannelHandlerContext context, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            // The client is initiating the HTTP handshake.
            handHttpRequest(context, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            // Regular WebSocket traffic on an upgraded connection.
            handWebsocketFrame(context, (WebSocketFrame) msg);
        }
    }

    /**
     * Handles a single WebSocket frame exchanged between client and server.
     *
     * <p>Close frames complete the closing handshake, ping frames are answered with a pong, and
     * text frames are echoed to every connected client. Any other frame type is reported as
     * unsupported and dropped.
     *
     * @param context the handler context of the receiving channel
     * @param frame   the frame that was received
     */
    private void handWebsocketFrame(ChannelHandlerContext context, WebSocketFrame frame) {
        if (frame instanceof CloseWebSocketFrame) {
            // retain(): the frame is handed to the handshaker for writing, while the enclosing
            // SimpleChannelInboundHandler releases the message once messageReceived returns.
            handShaker.close(context.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }

        if (frame instanceof PingWebSocketFrame) {
            // The pong reuses the ping payload, so the content needs its own reference.
            context.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }

        if (!(frame instanceof TextWebSocketFrame)) {
            System.out.println("不支持二进制消息!");
            // Stop here: falling through would cast a non-text frame to TextWebSocketFrame
            // and fail with a ClassCastException.
            return;
        }

        String request = ((TextWebSocketFrame) frame).text();
        System.out.println("服务端收到的客户端消息==>>" + request);

        TextWebSocketFrame tws = new TextWebSocketFrame(
                new Date().toString() + context.channel().id() + "===>>>" + request);
        // Broadcast: the server sends the message to every connected client.
        NettyConfig.group.writeAndFlush(tws);
    }

    /**
     * Handles the HTTP request with which a client asks to upgrade to WebSocket.
     *
     * <p>Requests that failed decoding or do not ask for a WebSocket upgrade are answered with
     * {@code 400 Bad Request}. Otherwise a handshaker is created for the request and the
     * handshake is performed, or an "unsupported version" response is sent when no handshaker
     * is available for the requested protocol version.
     *
     * @param context the handler context of the receiving channel
     * @param request the aggregated HTTP request
     */
    private void handHttpRequest(ChannelHandlerContext context, FullHttpRequest request) {
        if (!request.getDecoderResult().isSuccess() || !isWebSocketUpgrade(request)) {
            sendHttpResponse(context, request,
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }

        WebSocketServerHandshakerFactory wsFactory =
                new WebSocketServerHandshakerFactory(WEB_SOCKET_URL, null, false);
        handShaker = wsFactory.newHandshaker(request);
        if (handShaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(context.channel());
        } else {
            handShaker.handshake(context.channel(), request);
        }
    }

    /**
     * Tells whether the request carries an {@code Upgrade: websocket} header.
     *
     * <p>The value is compared case-insensitively so that clients sending e.g.
     * {@code WebSocket} are not rejected.
     *
     * @param request the HTTP request to inspect
     * @return {@code true} if the request asks for a WebSocket upgrade
     */
    private static boolean isWebSocketUpgrade(FullHttpRequest request) {
        Object upgrade = request.headers().get("Upgrade");
        return upgrade != null && "websocket".equalsIgnoreCase(upgrade.toString());
    }

    /**
     * Sends an HTTP response to the client.
     *
     * <p>For any status other than 200 the status text is written into the response body and
     * the connection is closed once the write completes.
     *
     * @param context  the handler context of the channel to write to
     * @param request  the request being answered (currently not inspected)
     * @param response the response to send
     */
    private void sendHttpResponse(ChannelHandlerContext context, FullHttpRequest request,
            DefaultFullHttpResponse response) {
        boolean isError = response.getStatus().code() != 200;
        if (isError) {
            ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), CharsetUtil.UTF_8);
            response.content().writeBytes(buf);
            // The bytes were copied into the response content; the temporary buffer is done.
            buf.release();
        }

        ChannelFuture f = context.channel().writeAndFlush(response);
        if (isError) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }
}
3.初始化组件类:初始化连接时的各种组件
package com.java.netty;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
 * Channel initializer that installs the handlers needed by each newly accepted connection.
 *
 * @author qiuzhiwen
 */
public class WebSocketChannelHandler extends ChannelInitializer<SocketChannel> {

    /**
     * Builds the pipeline of a new channel. Handlers are appended in this order:
     * HTTP codec, HTTP message aggregator (constructed with 65536), chunked-write handler,
     * and finally the application's {@link WebSocketHandler}.
     *
     * @param channel the channel whose pipeline is being set up
     */
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        channel.pipeline()
                .addLast("http-codec", new HttpServerCodec())
                .addLast("aggregator", new HttpObjectAggregator(65536))
                .addLast("http-chunked", new ChunkedWriteHandler())
                .addLast("handler", new WebSocketHandler());
    }
}
4.程序启动类:程序启动入口
package com.java.netty;
/**
* 程序启动类
* @author qiuzhiwen
*
*/
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Program entry point: starts the WebSocket server on port 8888 and blocks until the
 * server channel is closed.
 *
 * @author qiuzhiwen
 */
public class Main {

    /**
     * Boots the server, waits for the listening channel to close, then shuts down both
     * event loop groups.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        EventLoopGroup boosGroup = null;
        EventLoopGroup workGroup = null;
        try {
            boosGroup = new NioEventLoopGroup();
            workGroup = new NioEventLoopGroup();

            ServerBootstrap sb = new ServerBootstrap();
            sb.group(boosGroup, workGroup);
            sb.channel(NioServerSocketChannel.class);
            sb.childHandler(new WebSocketChannelHandler());

            System.out.println("服务端开启,等待客户端连接...");
            Channel ch = sb.bind(8888).sync().channel();
            // Block the main thread until the server channel is closed.
            ch.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            // Shut down gracefully. Each group is checked and released on its own; the worker
            // group must be shut down too, otherwise its threads keep the JVM alive.
            if (boosGroup != null) {
                boosGroup.shutdownGracefully();
            }
            if (workGroup != null) {
                workGroup.shutdownGracefully();
            }
        }
    }
}
5.客户端页面程序
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset = utf-8" />
<title>WebSocket客户端</title>
<script type="text/javascript">
    // The single WebSocket connection used by this page; stays undefined when the
    // browser has no WebSocket support.
    var socket;

    // Fall back to the legacy Mozilla-prefixed implementation if present.
    if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
    }

    if (window.WebSocket) {
        socket = new WebSocket("ws://localhost:8888/websocket");

        // Append every message received from the server to the text area.
        socket.onmessage = function(event) {
            var ta = document.getElementById('responseContent');
            ta.value += event.data + "\r\n";
        };

        socket.onopen = function(event) {
            var ta = document.getElementById('responseContent');
            ta.value = "你当前的浏览器支持WebSocket,请进行后续操作\r\n";
        };

        // Replace the text area content with the "closed" notice.
        socket.onclose = function(event) {
            var ta = document.getElementById('responseContent');
            ta.value = "WebSocket连接已经关闭\r\n";
        };
    } else {
        alert("您的浏览器不支持WebSocket");
    }

    // Close the connection when the page is left. Uses the `socket` variable declared
    // above, and skips the call when no socket was ever created.
    window.onbeforeunload = function() {
        if (socket) {
            socket.close();
        }
    }

    // Sends the given text to the server if the connection is open.
    function send(message) {
        if (!window.WebSocket) {
            return;
        }
        if (socket.readyState == WebSocket.OPEN) {
            socket.send(message);
        } else {
            alert("WebSocket连接没有建立成功!!");
        }
    }
</script>
</head>
<body>
<form onSubmit="return false;">
<input type="text" name="message" value="" />
<br/><br/>
<input type="button" value="发送WebSocket请求消息" onClick="send(this.form.message.value)" />
<hr color="red" />
<h2>客户端接收到服务端返回的应答消息</h2>
<textarea id="responseContent" style="width:1024px; height:300px"></textarea>
</form>
</body>
</html>
6.启动服务器
7.启动客户端并发送消息
PS:源码地址:https://github.com/Jasper2s/Study_Imooc/tree/master/JavaPractice/src/com/java/netty