WebSocket服务端开发(二)-WebSocketServer类主流程介绍

本文介绍WebSocketServer主函数run的实现,从整体上理解协议工作流程。

run方法代码如下:

function run() {
  //将服务器的socket添加到初始化socket列表中
  array_push($this->socketList, $this->serverSocket);
  //工作流程开始
  while (true) {
    //read为所有存在的socket列表
    $read = $this->socketList;
    //如果shutdown变量设置为true,服务器关闭,退出循环
    if ($this->shutdown) {
      $this->onshutdown();
      return;
    }

    if ($this->debug) {
      echo "Waiting for socket_select\n";
    }
    //该函数会从所有可读写的socket中选取一个socket进行处理,该方法会阻塞流程,只有在收到连接时该方法才会返回
    if (socket_select($read, $write, $except, NULL) === false) {
      if ($this->debug) {
        echo $this->getLastErrMsg();
      }
      continue;
    }

    foreach ($read as $socketItem) {
      //如果选取的socket是服务器监听的socket,则此时是新连接接入
      if ($socketItem === $this->serverSocket) {
        //接受socket连接
        $socket = $this->socketAccept();
        if ($socket) {
          //执行连接方法
          $this->connect($socket);
        }
      } else {
        //此时是连接过的socket,获取socketId
        $socketId = $this->getSocketId($socketItem);
        if ($socketId === FALSE) {
          //获取socketId失败,则将该socket断开连接
          $this->disconnectBySocket($socketItem);
          continue;
        }
        //接收传来的数据
        $data = $this->socketRecv($socketId);
        if (strlen($data) > 0) {
          //收到的数据长度不为空时,需要重置连接错误计数
          $this->socketListMap[$socketId]['errorCnt'] = 0;
          if (!isset($this->socketListMap[$socketId])) {
            $this->disconnect($socketId);
            continue;
          } else if (!$this->socketListMap[$socketId]['handshake']) {
            //尚未进行WebSocket协议握手,尝试读取连接缓冲区,如果缓冲区中没有数据,则将socketId记录到握手中列表
            //这是为了防止握手包被分成多个包进行传递(正常情况下不会出现此问题)
            //但根据HTTP协议,并未规定HTTP请求头不能被分割,故应该根据协议中的\r\n\r\n来判断请求头已发送完毕
            if (strlen($this->socketListMap[$socketId]['buffer']) === 0) {
              $this->handshakingList[$socketId] = time();
            }
            //将数据写入缓冲区
            $this->socketListMap[$socketId]['buffer'] .= $data;
            //比较后4个字节是否为\r\n\r\n
            if (substr_compare($this->socketListMap[$socketId]['buffer'], str_repeat(chr(0x0D) . chr(0x0A), 2), -4) === 0) {
              //进行握手处理
              $this->doHandShake($socketId);
            } else {
              //数据没有传送完毕,需要缓冲数据直到全部接收请求头(这个可以通过Telnet命令直接连接,每输入一个字节都会立即传给服务器,这时服务器应该缓存内容。但同时也应该设置超时时间,防止恶意占用服务器资源。)
              $this->onUpgradePartReceive($socketId);
            }
          } else if ($this->parseFrame($data, $socketId)) {
            //parseFrame会解析数据帧,如果该帧FIN标识为1则函数会返回true,交给businessHandler进行业务逻辑处理,数据在socketListMap的buffer中,所以只需要提供socketId即可找到该socket的所有信息。
            $this->businessHandler($socketId);
          }
        } else {
          $this->socketListMap[$socketId]['errorCnt'] += 1;
          if ($this->debug){
            echo "Receive empty data![$errorCnt]\n";
          }
          if ($errorCnt >= 3) {
            $this->disconnect($socketId);
          }
        }
      }
    }
    //每次处理完连接后,判断是否需要健康检查,检查之后会移除不健康的socket
    if (time() - $this->lastHealthCheck > $this->healthCheckInterval) {
      $this->healthCheck();
    }
    $this->removeUnhandshakeConnect();
  }
}

WebSocket服务端开发(一)-WebSocketServer类简介

本文介绍使用PHP语言编写的Server类WebSocketServer的整体设计思路及包含的方法。WebSocketServer类只是一个简单的单线程的WebSocket基类,可以通过继承该类实现自己的Server类。

一、成员变量
包含以下成员变量:

//各种帧类型的常量
const FRAME_CONTINUE = 0x00;
const FRAME_TEXT = 0x01;
const FRAME_BIN = 0x02;
const FRAME_CLOSE = 0x08;
const FRAME_PING = 0x09;
const FRAME_PONG = 0x0A;

protected $serverSocket = null;//服务器监听的socket
protected $shutdown = false;//关闭状态,如果是true表示服务器准备关闭
protected $socketList = array();//保存所有socket的数组
protected $socketListMap = array();//根据唯一id对socket进行索引,并保存socket的其他自定义属性,相当于session
private $handshakingList = array();//正在进行握手的socket,用于处理握手超时的socket
private $lastHealthCheck = null;//最后一次进行健康检查的时间,这里根据最后一次通信时间判断健康状态,检查时默认不会发送pong帧
private $healthCheckInterval = 300;//健康检查间隔,单位秒,每次处理完一个连接后会判断是否进行健康检查。
private $handshakeTimeout = 10;//握手超时时间,单位秒,为了防止过多的未完成握手占用系统资源,会对超时的握手连接进行关闭处理。

二、构造函数与析构函数

//port监听的端口号,address监听的IP地址,0.0.0.0表示监听本机上任何地址,debug为调试开关,为true是会打印错误信息及其他信息。
function __construct($port, $address = '0.0.0.0', $debug = false) {
  $this->serverSocket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
  socket_set_option($this->serverSocket, SOL_SOCKET, SO_REUSEADDR, 1);
  socket_set_option($this->serverSocket, SOL_SOCKET, TCP_NODELAY, 1);
  //socket绑定
  if (socket_bind($this->serverSocket, $address, $port) === false) {
    if ($debug) {
      echo $this->getLastErrMsg();
    }
    return;
  }

  //监听开始
  if (socket_listen($this->serverSocket) === false) {
    if ($debug) {
      echo $this->getLastErrMsg();
    }
    return;
  }

  $this->onstarted($this->serverSocket);
  $this->lastHealthCheck = time();
  $this->run();
}

function __destruct() {
  socket_close($this->serverSocket);
}

三、WebSocket类方法

//主工作函数,控制整体流程
function run(){}
//服务启动后回调
function onstarted($serverSocket){}
//socket连接后回调
function onconnected($socket){}
//收到不完整的握手包时回调
function onUpgradePartReceive($socketId){}
//握手失败后回调
function onHandShakeFailure($socketId){}
//握手成功后回调
function onHandShakeSuccess($socketId){}
//断开连接时回调
function ondisconnected($socketId){}
//从列表中移除socket时回调
function onAfterRemoveSocket($socketId){}
//健康检查后回调
function onafterhealthcheck($unhealthyList){}
//遇到socket错误时回调
function onerror($errCode, $socketId){}
//服务器关闭时回调
function onshutdown(){}

//健康检查函数
function healthCheck(){}
//业务逻辑处理函数(应由继承类覆盖实现)
function businessHandler($socketId){}
//帧是否结束
function isFin($byte){}
//帧是否进行掩码处理
function isMasked($byte){}
//获取帧类型
function getFrameType($byte){}
//处理任意类型的帧
function parseRawFrame($payload, $mask){}
//处理文本帧
function parseTextFrame($payload, $mask){}
//处理二进制帧
function parseBinaryFrame($payload, $mask){}
//创建关闭帧,支持关闭码及关闭原因
function closeFrame($socketId, $closeCode = 1000, $closeMsg = 'goodbye'){}
//发送ping帧
function sendPing($socketId, $data = 'ping'){}
//发送pong帧
function sendPong($socketId, $data = 'pong'){}
//获取掩码
function getMask($data, $len){}
//获取数据负载
function getPayload($data, $len){}
//获取负载长度
function getPayloadLen($data){}
//判断是否为控制帧
function isControlFrame($frameType){}
//解析帧
function parseFrame($data, $socketId){}
//创建指定类型的帧,支持分帧
function createFrame($data, $type, $fin = 0x01){}
//计算WebSocket-Accept值
function getWebSocketAccept($websocketKey){}
//协议选择(协商),建议继承后通过覆盖自定义
function selectProtocol($protocols){}
//获取握手响应头
function getHandShakeHeader($headers){}
//基础协议头检查,只检查WebSocket协议要求的请求头字段
function checkBaseHeader($header){}
//用户自定义头检查,如cookie等信息检查,需要覆盖此方法实现,该处返回恒为true
function checkCustomHeader($header){}
//创建握手失败的响应
function badRequest($socketId){}
//获取响应头的各个字段
function getHeaders($header){}
//移除握手超时的连接
function removeUnhandshakeConnect(){}
//WebSocket握手
function doHandShake($socketId){}
//显示数据信息,仅调试使用
function showData($buffer){}
//发送数据
function sendData($socketId, $data, $type = self::FRAME_TEXT, $isFin = true){}
//根据socket获取socketId
function getSocketId($socket){}
//添加socket到列表中
function addSocket($socket){}
//从列表中移除socket
function removeSocket($socketId){}
//关闭服务器
function shutdown(){}
//接受Socket
function socketAccept(){}
//从socket中读数据
function socketRecv($socketId){}
//通过socket写数据
function socketSend($socketId, $data){}
//关闭底层socket
function socketClose($socketId){}
//连接socket操作
function connect($socket){}
//通过socket断开连接
function disconnectBySocket($socket){}
//通过socketId断开连接
function disconnect($socketId, $silent = false){}
//获取错误码
function getLastErrCode($socketId = null){}
//获取错误详情
function getLastErrMsg($socketId = null, $errCode = null){}

后面的文章将具体介绍每个函数的实现。