Netty笔记之六:Netty对websocket的帮忙

日期: 2019-12-06 15:32 浏览次数 :

背景

多年来拿到供给要在网页上海展览中心示报警察与消防人员息。今后报告警察方信息都以由此短信,Wechat和 App 推送给客户的,现在要让报到客户在网页端也能实时收到到报告急察方推送。

依稀记得以前专门的学业的时候遇到过相似的须要。因为从前的浏览器标准相比陈旧,何况当场用 Java 很多,所以那个时候化解那个难题就用了 Comet4J。具体的规律便是长轮询,长链接。但这段时间毕竟 html5 流行开来了,IE 都被 Edge 接替了,再用早前这种本事就体现不适当时候宜。

很早早先就听过 WebSocket 的芳名,但因为那个时候非常多客商的浏览器还不帮忙,所以对这一个技巧也正是半途而废,未有太深切研商过。将来趁着项目必要,就来有一些尖锐领会一下。

demo

浏览器页面向服务器发送音信,服务器将日前音讯发送时间上报给浏览器页面。

劳务器端

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import java.net.InetSocketAddress;

//websocket长连接示例
public class MyServer {
    public static void main(String[] args) throws Exception{
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup wokerGroup = new NioEventLoopGroup();

        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,wokerGroup).channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new WebSocketChannelInitializer());

            ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8899)).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            wokerGroup.shutdownGracefully();
        }

    }
}

服务器端开首化连接

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel>{

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        //websocket协议本身是基于http协议的,所以这边也要使用http解编码器
        pipeline.addLast(new HttpServerCodec());
        //以块的方式来写的处理器
        pipeline.addLast(new ChunkedWriteHandler());
        //netty是基于分段请求的,HttpObjectAggregator的作用是将请求分段再聚合,参数是聚合字节的最大长度
        pipeline.addLast(new HttpObjectAggregator(8192));

        //ws://server:port/context_path
        //ws://localhost:9999/ws
        //参数指的是contex_path
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        //websocket定义了传递数据的6中frame类型
        pipeline.addLast(new TextWebSocketFrameHandler());

    }
}

WebSocketServerProtocolHandler:参数是拜见路线,那边钦赐的是ws,服务顾客端访谈服务器的时候钦赐的url是:ws://localhost:8899/ws
它担负websocket握手以至管理决定框架(Close,Ping(心跳检检查实验request),Pong(心跳检查实验响应))。 文本和二进制数据帧被传送到管道中的下多个管理程序举行管理。


WebSocket标准中定义了6种档案的次序的桢,netty为其提供了现实的照管的POJO落成。
WebSocketFrame:全体桢的父类,所谓桢便是WebSocket服务在创制的时候,在通路中拍卖的数据类型。本列子中型地铁户端和服务器之间管理的是文件音信。所以范型参数是TextWebSocketFrame。

图片 1

WebSocketFrame继承类

自定义Handler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.time.LocalDateTime;

//处理文本协议数据,处理TextWebSocketFrame类型的数据,websocket专门处理文本的frame就是TextWebSocketFrame
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{

    //读到客户端的内容并且向客户端去写内容
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        System.out.println("收到消息:"+msg.text());

        /**
         * writeAndFlush接收的参数类型是Object类型,但是一般我们都是要传入管道中传输数据的类型,比如我们当前的demo
         * 传输的就是TextWebSocketFrame类型的数据
         */
        ctx.channel().writeAndFlush(new TextWebSocketFrame("服务时间:"+ LocalDateTime.now()));
    }

    //每个channel都有一个唯一的id值
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        //打印出channel唯一值,asLongText方法是channel的id的全名
        System.out.println("handlerAdded:"+ctx.channel().id().asLongText());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerRemoved:" + ctx.channel().id().asLongText());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("异常发生");
        ctx.close();
    }
}

页面

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>WebSocket客户端</title>
</head>
<body>
<script type="text/javascript">
    var socket;

    //如果浏览器支持WebSocket
    if(window.WebSocket){
        //参数就是与服务器连接的地址
        socket = new WebSocket("ws://localhost:8899/ws");

        //客户端收到服务器消息的时候就会执行这个回调方法
        socket.onmessage = function (event) {
            var ta = document.getElementById("responseText");
            ta.value = ta.value + "n"+event.data;
        }

        //连接建立的回调函数
        socket.onopen = function(event){
            var ta = document.getElementById("responseText");
            ta.value = "连接开启";
        }

        //连接断掉的回调函数
        socket.onclose = function (event) {
            var ta = document.getElementById("responseText");
            ta.value = ta.value +"n"+"连接关闭";
        }
    }else{
        alert("浏览器不支持WebSocket!");
    }

    //发送数据
    function send(message){
        if(!window.WebSocket){
            return;
        }

        //当websocket状态打开
        if(socket.readyState == WebSocket.OPEN){
            socket.send(message);
        }else{
            alert("连接没有开启");
        }
    }
</script>
<form onsubmit="return false">
    <textarea name = "message" style="width: 400px;height: 200px"></textarea>

    <input type ="button" value="发送数据" onclick="send(this.form.message.value);">

    <h3>服务器输出:</h3>

    <textarea id ="responseText" style="width: 400px;height: 300px;"></textarea>

    <input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空数据">
</form>
</body>
</html>

开首服务器,然后运转客商端页面,当顾客端和劳务器端连接建立的时候,服务器端实施handlerAdded回调方法,客商端执行onopen回调方法

劳务器端调节台:

handlerAdded:acde48fffe001122-00005c11-00000001-4ce4764fffa940fe-df037eb5

页面:

图片 2

客商端连接建构

顾客端发送消息,服务器端进行响应,

图片 3

浏览器客商端发送信息

服务端调节台打印:

收到消息:websocket程序

顾客端也接到服务器端的响应:

图片 4

浏览器客商端收到服务器端响应

张开开辟者工具

图片 5

页面发送websocket诉求.png

在从正规的HTTP或许HTTPS左券切换来WebSocket时,将会接收生龙活虎种晋级握手的体制。由此,使用WebSocket的应用程序将平昔以HTTP/S作为最早,然后再推行升级。那么些晋级动作发生的规准期刻特定与应用程序;它也许会发生在运转时候,也是有可能会发出在央浼了有些特定的IURAV4L之后。

图片 6

查看桢新闻

仿照效法本事
java web 服务器推送技能--comet4j
Comet:基于 HTTP 长连接的“服务器推”技巧

websocket 简介

今后浏览器要得到服务端数据,都以经过发送 HTTP 央求,然后等待服务端回应的。也正是说浏览器端一贯是成套诉求的发起者,唯有它主动,技术收获到多少。而要让浏览器后生可畏侧能够得到到服务端的实时数据,就须要不停地向服务端发起呼吁。即使好多气象下并不曾拿走到骨子里数据,但那大大增加了网络压力,对于服务带来讲压力也直线上涨。

图片 7

新兴大家学会了接纳长连接 + 长轮询的措施。换句话说,也便是延伸 HTTP 央求的留存时间,尽量保持 HTTP 连接。固然那在早晚程度上下滑了众多压力,但照旧必要不停地实行轮询,也做不到确实的实时性。(借用一张图)

图片 8

乘势 HTML5 的赶来,WebSocket 在 二零一一 年被定为正规(详细情形请参见 悍马H2FC 6455)。

借用 《Go Web 编制程序》的话。WebSocket 接纳了有个别例外的报头,使得浏览器和服务器只要求做二个握手的动作,就足以在浏览器和服务器之间创立一条连接通道。且此接二连三会保持在活动状态,你能够应用 JavaScript 来向连接写入或从当中选拔数据,就如在利用四个不荒谬的 TCP Socket 同样。它撤除了 Web 实时化的标题。

图片 9

由于 WebSocket 是全双工通讯,所以当建构了 WebSocket 连接之后,接下去的通信就相近于古板的 TCP 通讯了。顾客端和服务端能够并行发送数据,不再有实时性的标题。

netty对websocket和谐的支撑

代码安详严整

自个儿在这里处会微微汇报一下代码的骨干构成,也顺带说说 Go 语言中一些常用的写法和情势(本身也是从其余语言转变 Go 语言,终究 Go 语言也一定年轻。所以有提出的话,敬请建议。)。由于 Go 语言的发明人和某些最首要维护者大都来自于 C/C++ 语言,所以 Go 语言的代码也更偏侧于 C/C++ 系。

首先先看一下 Server 的结构:

// Server defines parameters for running websocket server.
type Server struct {
 // Address for server to listen on
 Addr string

 // Path for websocket request, default "/ws".
 WSPath string

 // Path for push message, default "/push".
 PushPath string

 // Upgrader is for upgrade connection to websocket connection using
 // "github.com/gorilla/websocket".
 //
 // If Upgrader is nil, default upgrader will be used. Default upgrader is
 // set ReadBufferSize and WriteBufferSize to 1024, and CheckOrigin always
 // returns true.
 Upgrader *websocket.Upgrader

 // Check token if it's valid and return userID. If token is valid, userID
 // must be returned and ok should be true. Otherwise ok should be false.
 AuthToken func(token string) (userID string, ok bool)

 // Authorize push request. Message will be sent if it returns true,
 // otherwise the request will be discarded. Default nil and push request
 // will always be accepted.
 PushAuth func(r *http.Request) bool

 wh *websocketHandler
 ph *pushHandler
}

PS: 由于本人全方位项指标讲解都以用República Portuguesa语写的,所以见谅了,希望不要紧碍阅读。

这里说一下 Upgrader *websocket.Upgrader,那是 gorilla/websocket 包的对象,它用来提高 HTTP 乞求。

假诺四个构造体参数过多,平时不提出直接开首化,而是利用它提供的 New 方法。这里是:

// NewServer creates a new Server.
func NewServer(addr string) *Server {
 return &Server{
  Addr:  addr,
  WSPath: serverDefaultWSPath,
  PushPath: serverDefaultPushPath,
 }
}

那也是 Go 语言对外提供初叶化方法的风华正茂种不以为奇用法。

下一场 Server 使用 ListenAndServe 方法运维并监听端口,与 http 包的应用相似:

// ListenAndServe listens on the TCP network address and handle websocket
// request.
func (s *Server) ListenAndServe() error {
 b := &binder{
  userID2EventConnMap: make(map[string]*[]eventConn),
  connID2UserIDMap: make(map[string]string),
 }
 // websocket request handler
 wh := websocketHandler{
  upgrader: defaultUpgrader,
  binder: b,
 }
 if s.Upgrader != nil {
  wh.upgrader = s.Upgrader
 }
 if s.AuthToken != nil {
  wh.calcUserIDFunc = s.AuthToken
 }
 s.wh = &wh
 http.Handle(s.WSPath, s.wh)
 // push request handler
 ph := pushHandler{
  binder: b,
 }
 if s.PushAuth != nil {
  ph.authFunc = s.PushAuth
 }
 s.ph = &ph
 http.Handle(s.PushPath, s.ph)
 return http.ListenAndServe(s.Addr, nil)
}

那边大家转换了三个 Handler,分别为 websocketHandler 和 pushHandler。websocketHandler 担任与浏览器创设连接并传输数据,而 pushHandler 则管理推送端的央求。能够观望,这里四个 Handler 都卷入了八个binder 对象。这些 binder 用于保险 token <-> userID <-> Conn 的涉嫌:

// binder is defined to store the relation of userID and eventConn
type binder struct {
 mu sync.RWMutex
 // map stores key: userID and value of related slice of eventConn
 userID2EventConnMap map[string]*[]eventConn

 // map stores key: connID and value: userID
 connID2UserIDMap map[string]string
}

websocketHandler

切切实实看一下 websocketHandler 的落到实处。

// websocketHandler defines to handle websocket upgrade request.
type websocketHandler struct {
 // upgrader is used to upgrade request.
 upgrader *websocket.Upgrader

 // binder stores relations about websocket connection and userID.
 binder *binder

 // calcUserIDFunc defines to calculate userID by token. The userID will
 // be equal to token if this function is nil.
 calcUserIDFunc func(token string) (userID string, ok bool)
}

不会细小略的布局。websocketHandler 达成了 http.Handler 接口:

// First try to upgrade connection to websocket. If success, connection will
// be kept until client send close message or server drop them.
func (wh *websocketHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 wsConn, err := wh.upgrader.Upgrade(w, r, nil)
 if err != nil {
  return
 }
 defer wsConn.Close()
 // handle Websocket request
 conn := NewConn(wsConn)
 conn.AfterReadFunc = func(messageType int, r io.Reader) {
  var rm RegisterMessage
  decoder := json.NewDecoder(r)
  if err := decoder.Decode(&rm); err != nil {
   return
  }
  // calculate userID by token
  userID := rm.Token
  if wh.calcUserIDFunc != nil {
   uID, ok := wh.calcUserIDFunc(rm.Token)
   if !ok {
    return
   }
   userID = uID
  }
  // bind
  wh.binder.Bind(userID, rm.Event, conn)
 }
 conn.BeforeCloseFunc = func() {
  // unbind
  wh.binder.Unbind(conn)
 }
 conn.Listen()
}

首先将盛传的 http.Request 转变为 websocket.Conn,再将其分装为我们自定义的一个wserver.Conn(封装,恐怕说是组合,是 Go 语言的一级用法。记住,Go 语言未有继承,唯有结合)。然后设置了 Conn 的 AfterReadFunc 和 BeforeCloseFunc 方法,接着运行了 conn.Listen(卡塔尔(英语:State of Qatar)。AfterReadFunc 意思是当 Conn 读取到数量后,尝试验证并依附 token 总计 userID,然乎 bind 注册绑定。BeforeCloseFunc 则为 Conn 关闭前行行解绑操作。

pushHandler

pushHandler 则轻巧驾驭。它深入分析要求然后推送数据:

// Authorize if needed. Then decode the request and push message to each
// realted websocket connection.
func (s *pushHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 if r.Method != http.MethodPost {
  w.WriteHeader(http.StatusMethodNotAllowed)
  return
 }
 // authorize
 if s.authFunc != nil {
  if ok := s.authFunc(r); !ok {
   w.WriteHeader(http.StatusUnauthorized)
   return
  }
 }
 // read request
 var pm PushMessage
 decoder := json.NewDecoder(r.Body)
 if err := decoder.Decode(&pm); err != nil {
  w.WriteHeader(http.StatusBadRequest)
  w.Write([]byte(ErrRequestIllegal.Error()))
  return
 }
 // validate the data
 if pm.UserID == "" || pm.Event == "" || pm.Message == "" {
  w.WriteHeader(http.StatusBadRequest)
  w.Write([]byte(ErrRequestIllegal.Error()))
  return
 }
 cnt, err := s.push(pm.UserID, pm.Event, pm.Message)
 if err != nil {
  w.WriteHeader(http.StatusInternalServerError)
  w.Write([]byte(err.Error()))
  return
 }
 result := strings.NewReader(fmt.Sprintf("message sent to %d clients", cnt))
 io.Copy(w, result)
}

Conn

Conn (此处指 wserver.Conn) 为 websocket.Conn 的包装。

// Conn wraps websocket.Conn with Conn. It defines to listen and read
// data from Conn.
type Conn struct {
 Conn *websocket.Conn
 AfterReadFunc func(messageType int, r io.Reader)
 BeforeCloseFunc func()

 once sync.Once
 id  string
 stopCh chan struct{}
}

最主要的办法为 Listen(卡塔尔(英语:State of Qatar):

// Listen listens for receive data from websocket connection. It blocks
// until websocket connection is closed.
func (c *Conn) Listen() {
 c.Conn.SetCloseHandler(func(code int, text string) error {
  if c.BeforeCloseFunc != nil {
   c.BeforeCloseFunc()
  }
  if err := c.Close(); err != nil {
   log.Println(err)
  }
  message := websocket.FormatCloseMessage(code, "")
  c.Conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second))
  return nil
 })
 // Keeps reading from Conn util get error.
ReadLoop:
 for {
  select {
  case <-c.stopCh:
   break ReadLoop
  default:
   messageType, r, err := c.Conn.NextReader()
   if err != nil {
    // TODO: handle read error maybe
    break ReadLoop
   }
   if c.AfterReadFunc != nil {
    c.AfterReadFunc(messageType, r)
   }
  }
 }
}

最首要安装了当 websocket 连接关闭时的管理和不停地读取数据。

文中很难周密地描述整个代码的运营流程,像现实阅读代码,请前往 github.com/alfred-zhong/wserver 获取。

并发的背景

WebSocket是后生可畏种标准,是Html5规范的风流倜傥有个别,websocket解决什么难题吗?消除http合同的有个别欠缺。大家清楚,http合同是蓬蓬勃勃种无状态的,基于央求响应形式的商业事务。

网页闲谈的前后相继(基于http合同的),浏览器顾客端发送多少个数据,服务器收到到那个浏览器数据现在,怎么着将数据推送给任何的浏览器顾客端呢?
那就关系到服务器的推技能。早年为了促成这种服务器也足以像浏览器客商端推送音讯的长连接须求,有过多方案,比如说最常用的施用风度翩翩种轮询技艺,就是顾客端每间距风流浪漫段时间,例如说2s要么3s向服务器发送诉求,去央求服务器端是或不是还应该有新闻未有响应给顾客端,有就响应给顾客端,当然未有响应就只是风流倜傥种无用的乞请。

这种长轮询技艺的顽固的病痛有:
1)响应数据不是实时的,在下贰遍轮询乞求的时候才会获得那一个响应音讯,只可以算得准实时,而不是从严意义的实时。
2)大好多轮询央浼的空轮询,产生大气的财富带宽的萧条,每一次http央浼指点了汪洋不行的头消息,而服务器端其实大多数都不关切那几个头音讯,而实际上海南大学学好多情景下那几个头信息都远远高于body消息,变成了能源的损耗。

拓展
相比较新的技能去做轮询的效应是Comet。这种才能纵然能够双向通讯,但依旧必要频繁发出乞求。并且在Comet中,普及应用的长链接,也会损耗服务器财富。

本文中代码能够在 github.com/alfred-zhong/wserver获取。

WebSocket是什么?

WebSocket后生可畏种在单个 TCP 连接上进展全双工通讯的合计。WebSocket通讯公约于二〇一一年被IETF定为正式RubiconFC 6455,并被HighlanderFC7936所增补标准。WebSocket API也被W3C定为标准。

WebSocket 使得顾客端和服务器之间的数据交流变得更为简约,允许服务端主动向客商端推送数据。在 WebSocket API 中,浏览器和服务器只供给形成一遍握手,两个之间就径直能够成立持久性的总是,并展开双向数据传输。

websocket的产出正是消除了顾客端与服务端的这种长连接难点,这种长连接是实留意义上的长连接。客商端与服务器假使再三再四构建两岸正是对等的实业,不再区分严苛意义的客商端和服务端。长连接唯有在第一创设的时候,客商端才会向服务端发送一些呼吁,这几个诉求包涵诉求头和须求体,大器晚成旦确立好连接之后,客商端和服务器只会发送数据自个儿而无需再去发送须要头音讯,这样大批量减去了
网络带宽。websocket和谐本人是创设在http协议之上的晋升公约,客商端首先向服务器端去创设连接,那一个一而再本身正是http左券只是在头消息中含有了豆蔻梢头部分websocket公约的连带音信,生龙活虎旦http连接建构今后,服务器端读到这一个websocket公约的相关信息就将此左券进级成websocket公约。websocket合计也得以动用在非浏览器接收,只须求引进相关的websocket库就足以了。

HTML5概念了WebSocket左券,能更加好的节约服务器财富和带宽,何况能够更实时地拓宽报纸发表。Websocket使用ws或wss的集独财富标识符,相通于HTTPS,此中wss表示在TLS之上的Websocket。如:

ws://example.com/wsapi
wss://secure.example.com/

优点

  • 超少的决定开垦:相对与http央浼的头顶音讯,websocket音讯鲜明回退。
  • 越来越强的实时性:由于协商是全双工的,所以服务器能够任何时候主动给顾客端下发数据。相对于HTTP伏乞须求静观其变客商端发起倡议服务端能力响应,延迟明显越来越少;即便是和Comet等看似的长轮询相比较,其也能在短时间内更频仍地传递数据。
  • 保持接二连三情状。于HTTP分化的是,Websocket供给先创立连接,那就使得其成为朝气蓬勃种有意况的合同,之后通讯时能够省略部分情状消息。而HTTP乞求或者必要在各类央浼都辅导状态新闻(如身份认证等)。
  • 越来越好的二进制扶助。Websocket定义了二进制帧,相对HTTP,可以更自在地管理二进制内容。
  • 能够支撑扩张。Websocket定义了扩展,客商能够扩充左券、实现部分自定义的子合同。如部分浏览器扶持压缩等。
  • 越来越好的减少效果。相对于HTTP压缩,Websocket在伏贴的扩展协理下,能够沿用以前内容的上下文,在传递相仿的数额时,能够分明地增长压缩率。