summer项目开发四:请求转发和连接池

前面更改组件后,经过一系列处理,基本完成改造,后面又优化了下代码,剔除了无用代码。
接下来就要转发请求了。

请求转发

请求转发功能在写的时候,跳了不少坑,记录下。

刚开始写的时候,只想转http请求,自然就想到建立个swoolehttpserver,然后把request和response丢过去就可以了。
等写完了,发现根本没必要,因为这层我是不需要解析http协议的,直接原包转发就ok了,所以又改成了tcpserver
大致流程就是 http请求 -> summer manage => 收到的包原样转发到 服务端 => summer server。 至于加request_id的事情,这个后面再处理

$client = new Client(SWOOLE_SOCK_TCP | SWOOLE_KEEP);
$res = @$client->connect('127.0.0.1', 9502)
if ($res) {
    //链接端口可能会出警告
    $ret = @$client->send($info);
    if($ret){
        //无法判断tcp 因为应用层无法获得底层TCP连接的状态,执行send或recv时应用层与内核发生交互,才能得到真实的连接可用状态
        $rs = @$client->recv();
        if ($rs) {
            $this->getClient()->put($client);
            $serv->send($fd, $rs);
            $serv->close($fd);
            return $rs;
        }
    }
    $client->close();
}
$serv->close($fd);

所以在 suframemanagecommandsTcpStartCommand.php中,直接启动了个tcp server,加了几个事件监听,方便扩展,然后就直接转发了。

为了方便切换连接后端服务的client的类型,新增了个代理层,用tcp proxy来创建连接和转发请求。代码如suframemanagecomponentsTcpProxy, 主逻辑是直接转发,然后获取后端返回结果,关闭连接。

连接池

什么是连接池

既然是转发,就要连接后端服务,由于tcp是无状态的,每次连接需要多次握手,新请求进来又重复这一步骤,所以一个长连接连接到后端服务,是个不错的选择。

长连接就是握手后,不关闭连接,客户端和服务端保持一个链接状态$fd不会被关闭,有数据了直接发过去,响应也直接发过去,加快了速度,节省了资源。 但是同样的会带来一些问题,如何管理这些长连接? 连接池就是干这么个事情的, 大致流程就是,我这里有个池子,里面有几个长连接连接到后端服务, 有请求要用,就从池子里面拿,用完了,还回池子,其他请求可以接着用。

连接池的开发思路

写这个连接池的时候,绕了下弯路。 之前网上搜了下关于swoole 连接池的文章,大多写得比较含糊。包括官网的文章也是很吝惜文笔,很让我费了些脑子,官方的 https://wiki.swoole.com/wiki/page/p-coroutine_channel.html 关于连接池的说明,

大致意思是通过协程的通道,即:CoroutineChannel来实现,底层自动实现调度和切换。 按照官方demo,我也做了第一个版本的连接池,代码和官方demo差不多,初始化一堆链接,要用了 pop取出一个,用完就push还回连接池。 代码写完了,ok,一测试,确实可以,这里注意下,用了长连接https://wiki.swoole.com/wiki/page/407.html,后,后端的server不要去close那个链接$fd。

然后ab了一下,速度不错,

ab -n 10000 -c 1000 -k http://127.0.0.1:9501/

22.jpg

提示下,这里ab的时候会有个问题,一直有Failed requests,然后都是Length
网上搜了下是由于返回结果长度问题。不管我怎么处理,他在一定并发情况下都会出现,网上都说不用管,也就放弃纠结这个问题了。

但是当我看日志的时候发现个问题,当并发足够大的时候,连接池没有连接可用的时候,会丢包,然而丢的这个包居然会串到其他请求连接上去,导致了后端服务可能接受的数据是几个包发过来的,前端接受的返回可能是几个返回一起丢过去导致协议不能解包。 这是个问啥呢? 我认为应该是连接在进入和出 连接池的问题,因为这个协程在发出数据后,代码是挂起的,等接口返回后再唤醒, 可能这个中间出现了些问题。

而且这个方式还有个问题,就是一开始就把连接初始化好,如果后端服务重启了,或者断了下线, 这个连接就是个废连接,而且无法创建新连接。

连接池的开发

烦躁的处理了几个小时,当我把连接池初始化成1的时候,发现不会串,因为只有一个连接,不管怎么样,没放回连接池其他请求是没法用的。 而且速度还不错,按照这个思路, 初始化连接池的时候,我先不去创建连接,只设置个最大连接数,意思是这个池子最多只装那么多长链接,超过的不放进去,关闭掉, 然后要用的时候再取, 取出来池子没满就放到池子里面去。 刚开始我担心并发大的时候,会不会创建很多长连接, 实践出真知,ab一下,1万请求 1千并发情况下,最多创建了5个长连接,而且我连接池最大设置的2,有3个在处理完成后自动关闭了。

代码如下:

class TcpPool {
    use Singleton;

    protected $timeout = 0.1;

    /**
     * @var Channel
     */
    protected $pool;
    protected $size;

    public function __construct($size) {
        $this->size = $size;
        $this->createPool();
    }

    public function createPool($size = null) {
        $this->creating = true;
        $size = $this->size;
        $this->pool = new Channel($size);
    }

    public function put($client) {
        if($this->pool->length() > $this->size){
            $client->close();
            return false;
        }
        $this->pool->push($client);
    }

    public function getLength() {
        return $this->pool->length();
    }

    /**
     * @return Client|null
     */
    public function get() {
//        echo "连接数" . $this->pool->length() . "\n";
        if($this->pool->length()){
            return $this->pool->pop($this->timeout);
        }
        $client = new Client(SWOOLE_SOCK_TCP | SWOOLE_KEEP);
        $res = @$client->connect('127.0.0.1', 9502);
        if ($res) {
            $this->put($client);
            return $this->get();
        }
    }

}

新的问题

但是这里同样遇到个问题,当我把数据加到足够大的时候,有个问题,连接既不进入池子,也不关闭,后来跟踪了代码, 发现在入口的时候会有个小问题, 入口前会去判断连接池长度,没满入口,但是协程的问题,在高并发下,会有几率在判断池子没慢的情况下,刚准备push进池子,其他协程被唤醒,刚好push到了池子,如果池子满了,这个链接是进不去的,然后又没关闭,就导致了有个连接无法管理。

怎么半呢?做了一系列尝试,用了锁等等,效果都不理想,但是一想,其实这个速度已经足够快了,一般情况下,只要池子设置足够合理,是可以规避这个问题的。 况且还可以通过重载服务的方式,主动通知这边清空连接池。这个也就不算问题。

请求数据过滤

做这层转发很大一层的意义在于这里可以拦截所有数据,然后根据我们一些规则,处理完后,再转发给相应的server处理。比如授权,比如限流,比如请求日志和添加请求唯一id等等。

因为之前加入了事件,所以在不改动主流程的情况下,用事件监听来完成扩展再合适不过了,当然,你也可以在app/config/listener里面注册一些自己的监听器做更多的扩展

public function onReceive(\Swoole\Server $serv, $fd, $reactor_id, $data) {
    //由于这里可能会更改data,所以没有用go创建协程处理
    EventManager::get()->trigger('tcp.request', null, ['data' => &$data]);
    if(!$data){
        $serv->close($fd);
        return;
    }
    $out = $this->proxy->dispatch($serv, $fd, $reactor_id, $data);
    go(function () use ($data, $out){
        EventManager::get()->trigger('tcp.response.after', null, [
            'data' => $data,
            'out' => $out,
        ]);
    });
}

这样我们就可以在监听了tcp.request的地方进行处理:

class TcpListener implements ListenerAggregateInterface {
    use ListenerAggregateTrait;

    /**
     * 注册事件
     * @param EventManagerInterface $events
     * @param int $priority
     */
    public function attach(EventManagerInterface $events, $priority = 1) {
        $this->listeners[] = $events->attach(Events::E_TCP_REQUEST, [$this, 'request'], $priority);
    }

    /**
     * 请求事件
     * @param EventInterface $e
     */
    public function request(EventInterface $e) {
        $data = $e->getParams();
            $request = \Zend\Http\Request::fromString($data['data']);
            if($request->getUri() == '/favicon.ico'){
                $data['data'] = null;
                return;
            }
            $headers = $request->getHeaders();
            $headers->addHeaderLine('request_id', uniqid());
            $headers->addHeaderLine('uid', rand(1, 9999));
            $data['data'] = $request . '';
    }
        ...
}

这样我们在manage层就可以通过传过来的token或者其他认证方式,换取用户uid,后端server直接在header里面获取uid即可,不用每个server里再次去验证token是否有效,这在微服务化场景下,真是个福利。

request_id同样,一个请求下转发到一个server,而这个server需要调用其他服务rpc接口,带上这个id,然后在统一收集日志,就可以追踪出一个请求调用的所有服务的日志,方便追踪排查问题。
后端服务接受的数据如下:

QQ截图20190611230233.jpg

看! uid和request_id已经在后端的header里出现。 这个处理对性能影响怎么样呢?ab一下

 ab -c 5000 -n 10000 -k http://127.0.0.1:9501/

1.jpg

后续

至此请求转发和连接池大致开发到这里,后面接着会处理连接池注册的问题,需要把各个后端服务注册到连接池,通过转发规则,自动分发到不同的server.

标签: swoole连接池 swoole转发

发表评论: