前面更改组件后,经过一系列处理,基本完成改造,后面又优化了下代码,剔除了无用代码。
接下来就要转发请求了。
请求转发
请求转发功能在写的时候,跳了不少坑,记录下。
刚开始写的时候,只想转http请求,自然就想到建立个swoolehttpserver,然后把request和response丢过去就可以了。
等写完了,发现根本没必要,因为这层我是不需要解析http协议的,直接原包转发就ok了,所以又改成了tcpserver
大致流程就是 http请求 -> summer manage => 收到的包原样转发到 服务端 => summer server。 至于加request_id的事情,这个后面再处理
$client = new Client(SWOOLE_SOCK_TCP | SWOOLE_KEEP);
$res = @$client->connect('127.0.0.1', 9502)
if ($res) {
//链接端口可能会出警告
$ret = @$client->send($info);
if($ret){
//无法判断tcp 因为应用层无法获得底层TCP连接的状态,执行send或recv时应用层与内核发生交互,才能得到真实的连接可用状态
$rs = @$client->recv();
if ($rs) {
$this->getClient()->put($client);
$serv->send($fd, $rs);
$serv->close($fd);
return $rs;
}
}
$client->close();
}
$serv->close($fd);
所以在 suframemanagecommandsTcpStartCommand.php中,直接启动了个tcp server,加了几个事件监听,方便扩展,然后就直接转发了。
为了方便切换连接后端服务的client的类型,新增了个代理层,用tcp proxy来创建连接和转发请求。代码如suframemanagecomponentsTcpProxy, 主逻辑是直接转发,然后获取后端返回结果,关闭连接。
连接池
什么是连接池
既然是转发,就要连接后端服务,由于tcp是无状态的,每次连接需要多次握手,新请求进来又重复这一步骤,所以一个长连接连接到后端服务,是个不错的选择。
长连接就是握手后,不关闭连接,客户端和服务端保持一个链接状态$fd不会被关闭,有数据了直接发过去,响应也直接发过去,加快了速度,节省了资源。 但是同样的会带来一些问题,如何管理这些长连接? 连接池就是干这么个事情的, 大致流程就是,我这里有个池子,里面有几个长连接连接到后端服务, 有请求要用,就从池子里面拿,用完了,还回池子,其他请求可以接着用。
连接池的开发思路
写这个连接池的时候,绕了下弯路。 之前网上搜了下关于swoole 连接池的文章,大多写得比较含糊。包括官网的文章也是很吝惜文笔,很让我费了些脑子,官方的 https://wiki.swoole.com/wiki/page/p-coroutine_channel.html 关于连接池的说明,
大致意思是通过协程的通道,即:CoroutineChannel来实现,底层自动实现调度和切换。 按照官方demo,我也做了第一个版本的连接池,代码和官方demo差不多,初始化一堆链接,要用了 pop取出一个,用完就push还回连接池。 代码写完了,ok,一测试,确实可以,这里注意下,用了长连接https://wiki.swoole.com/wiki/page/407.html,后,后端的server不要去close那个链接$fd。
然后ab了一下,速度不错,
ab -n 10000 -c 1000 -k http://127.0.0.1:9501/
提示下,这里ab的时候会有个问题,一直有Failed requests,然后都是Length
网上搜了下是由于返回结果长度问题。不管我怎么处理,他在一定并发情况下都会出现,网上都说不用管,也就放弃纠结这个问题了。
但是当我看日志的时候发现个问题,当并发足够大的时候,连接池没有连接可用的时候,会丢包,然而丢的这个包居然会串到其他请求连接上去,导致了后端服务可能接受的数据是几个包发过来的,前端接受的返回可能是几个返回一起丢过去导致协议不能解包。 这是个问啥呢? 我认为应该是连接在进入和出 连接池的问题,因为这个协程在发出数据后,代码是挂起的,等接口返回后再唤醒, 可能这个中间出现了些问题。
而且这个方式还有个问题,就是一开始就把连接初始化好,如果后端服务重启了,或者断了下线, 这个连接就是个废连接,而且无法创建新连接。
连接池的开发
烦躁的处理了几个小时,当我把连接池初始化成1的时候,发现不会串,因为只有一个连接,不管怎么样,没放回连接池其他请求是没法用的。 而且速度还不错,按照这个思路, 初始化连接池的时候,我先不去创建连接,只设置个最大连接数,意思是这个池子最多只装那么多长链接,超过的不放进去,关闭掉, 然后要用的时候再取, 取出来池子没满就放到池子里面去。 刚开始我担心并发大的时候,会不会创建很多长连接, 实践出真知,ab一下,1万请求 1千并发情况下,最多创建了5个长连接,而且我连接池最大设置的2,有3个在处理完成后自动关闭了。
代码如下:
class TcpPool {
use Singleton;
protected $timeout = 0.1;
/**
* @var Channel
*/
protected $pool;
protected $size;
public function __construct($size) {
$this->size = $size;
$this->createPool();
}
public function createPool($size = null) {
$this->creating = true;
$size = $this->size;
$this->pool = new Channel($size);
}
public function put($client) {
if($this->pool->length() > $this->size){
$client->close();
return false;
}
$this->pool->push($client);
}
public function getLength() {
return $this->pool->length();
}
/**
* @return Client|null
*/
public function get() {
// echo "连接数" . $this->pool->length() . "\n";
if($this->pool->length()){
return $this->pool->pop($this->timeout);
}
$client = new Client(SWOOLE_SOCK_TCP | SWOOLE_KEEP);
$res = @$client->connect('127.0.0.1', 9502);
if ($res) {
$this->put($client);
return $this->get();
}
}
}
新的问题
但是这里同样遇到个问题,当我把数据加到足够大的时候,有个问题,连接既不进入池子,也不关闭,后来跟踪了代码, 发现在入口的时候会有个小问题, 入口前会去判断连接池长度,没满入口,但是协程的问题,在高并发下,会有几率在判断池子没慢的情况下,刚准备push进池子,其他协程被唤醒,刚好push到了池子,如果池子满了,这个链接是进不去的,然后又没关闭,就导致了有个连接无法管理。
怎么半呢?做了一系列尝试,用了锁等等,效果都不理想,但是一想,其实这个速度已经足够快了,一般情况下,只要池子设置足够合理,是可以规避这个问题的。 况且还可以通过重载服务的方式,主动通知这边清空连接池。这个也就不算问题。
请求数据过滤
做这层转发很大一层的意义在于这里可以拦截所有数据,然后根据我们一些规则,处理完后,再转发给相应的server处理。比如授权,比如限流,比如请求日志和添加请求唯一id等等。
因为之前加入了事件,所以在不改动主流程的情况下,用事件监听来完成扩展再合适不过了,当然,你也可以在app/config/listener里面注册一些自己的监听器做更多的扩展
public function onReceive(\Swoole\Server $serv, $fd, $reactor_id, $data) {
//由于这里可能会更改data,所以没有用go创建协程处理
EventManager::get()->trigger('tcp.request', null, ['data' => &$data]);
if(!$data){
$serv->close($fd);
return;
}
$out = $this->proxy->dispatch($serv, $fd, $reactor_id, $data);
go(function () use ($data, $out){
EventManager::get()->trigger('tcp.response.after', null, [
'data' => $data,
'out' => $out,
]);
});
}
这样我们就可以在监听了tcp.request的地方进行处理:
class TcpListener implements ListenerAggregateInterface {
use ListenerAggregateTrait;
/**
* 注册事件
* @param EventManagerInterface $events
* @param int $priority
*/
public function attach(EventManagerInterface $events, $priority = 1) {
$this->listeners[] = $events->attach(Events::E_TCP_REQUEST, [$this, 'request'], $priority);
}
/**
* 请求事件
* @param EventInterface $e
*/
public function request(EventInterface $e) {
$data = $e->getParams();
$request = \Zend\Http\Request::fromString($data['data']);
if($request->getUri() == '/favicon.ico'){
$data['data'] = null;
return;
}
$headers = $request->getHeaders();
$headers->addHeaderLine('request_id', uniqid());
$headers->addHeaderLine('uid', rand(1, 9999));
$data['data'] = $request . '';
}
...
}
这样我们在manage层就可以通过传过来的token或者其他认证方式,换取用户uid,后端server直接在header里面获取uid即可,不用每个server里再次去验证token是否有效,这在微服务化场景下,真是个福利。
request_id同样,一个请求下转发到一个server,而这个server需要调用其他服务rpc接口,带上这个id,然后在统一收集日志,就可以追踪出一个请求调用的所有服务的日志,方便追踪排查问题。
后端服务接受的数据如下:
看! uid和request_id已经在后端的header里出现。 这个处理对性能影响怎么样呢?ab一下
ab -c 5000 -n 10000 -k http://127.0.0.1:9501/
后续
至此请求转发和连接池大致开发到这里,后面接着会处理连接池注册的问题,需要把各个后端服务注册到连接池,通过转发规则,自动分发到不同的server.