手记

深入浅出 Laravel Echo (2)

看源代码,解析一次完整的 public channel 下发流程。

此图来自网上,如有侵权,通知我删除

通过上图,我们至少要知道两件事:

  1. Laravel 和我们的前端 (vue) 没有直接关联,它们通过 Socket.io Server 来做中转,这是怎么做到的呢?
  2. 怎么发送 Brocadcasted Data?

下面来一一解析。

BroadcastServiceProvider

BroadcastServiceProvider 主要包含了 Broadcast 相关的五个驱动器、Broadcast 事件、Broadcast 队列等方法,比较简单就不在解析了,今天主要说说怎么通过 redis 来驱动 Broadcast 的。

首先还是简单配置下 Broadcastconfig

// broadcasting.php
<?php

return [

    /*
    |--------------------------------------------------------------------------
    | Default Broadcaster
    |--------------------------------------------------------------------------
    |
    | This option controls the default broadcaster that will be used by the
    | framework when an event needs to be broadcast. You may set this to
    | any of the connections defined in the "connections" array below.
    |
    | Supported: "pusher", "redis", "log", "null"
    |
    */

    'default' => env('BROADCAST_DRIVER', 'null'),

    /*
    |--------------------------------------------------------------------------
    | Broadcast Connections
    |--------------------------------------------------------------------------
    |
    | Here you may define all of the broadcast connections that will be used
    | to broadcast events to other systems or over websockets. Samples of
    | each available type of connection are provided inside this array.
    |
    */

    'connections' => [

        'pusher' => [
            'driver' => 'pusher',
            'key' => env('PUSHER_APP_KEY'),
            'secret' => env('PUSHER_APP_SECRET'),
            'app_id' => env('PUSHER_APP_ID'),
            'options' => [
                //
            ],
        ],

        'redis' => [
            'driver' => 'redis',
            'connection' => 'default',
        ],

        'log' => [
            'driver' => 'log',
        ],

        'null' => [
            'driver' => 'null',
        ],

    ],

];

// .env
BROADCAST_DRIVER=redis
REDIS_HOST=redis
REDIS_PASSWORD=null
REDIS_PORT=6379

之前了解过 Laravel 的 ServiceProvider 的工作原理,所以我们就不用赘述太多这方面的流程了,我们主要看看 BroadcastServiceProvider 的注册方法:

public function register()
{
    $this->app->singleton(BroadcastManager::class, function ($app) {
        return new BroadcastManager($app);
    });

    $this->app->singleton(BroadcasterContract::class, function ($app) {
        return $app->make(BroadcastManager::class)->connection();
    });

    $this->app->alias(
        BroadcastManager::class, BroadcastingFactory::class
    );
}

我们写一个发送 Broadcast demo:

// routes/console.php
Artisan::command('public_echo', function () {
    event(new RssPublicEvent());
})->describe('echo demo');

// app/Events/RssPublicEvent.php
<?php

namespace App\Events;

use Carbon\Carbon;
use Illuminate\Broadcasting\Channel;
use Illuminate\Queue\SerializesModels;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Broadcasting\PresenceChannel;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;

class RssPublicEvent implements ShouldBroadcast
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    /**
     * Create a new event instance.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }

    /**
     * Get the channels the event should broadcast on.
     *
     * @return \Illuminate\Broadcasting\Channel|array
     */
    public function broadcastOn()
    {
        return new Channel('public_channel');
    }

    /**
     * 指定广播数据。
     *
     * @return array
     */
    public function broadcastWith()
    {
        // 返回当前时间
        return ['name' => 'public_channel_'.Carbon::now()->toDateTimeString()];
    }
}

有了这下发 Event,我们看看它是怎么执行的,主要看 BroadcastEventhandle 方法:

public function handle(Broadcaster $broadcaster)
{
    // 主要看,有没有自定义该 Event 名称,没有的话,直接使用类名
    $name = method_exists($this->event, 'broadcastAs')
            ? $this->event->broadcastAs() : get_class($this->event);

    $broadcaster->broadcast(
        Arr::wrap($this->event->broadcastOn()), $name,
        $this->getPayloadFromEvent($this->event)
    );
}

先看怎么获取参数的 $this->getPayloadFromEvent($this->event)

protected function getPayloadFromEvent($event)
{
    if (method_exists($event, 'broadcastWith')) {
        return array_merge(
            $event->broadcastWith(), ['socket' => data_get($event, 'socket')]
        );
    }

    $payload = [];

    foreach ((new ReflectionClass($event))->getProperties(ReflectionProperty::IS_PUBLIC) as $property) {
        $payload[$property->getName()] = $this->formatProperty($property->getValue($event));
    }

    unset($payload['broadcastQueue']);

    return $payload;
}

主要传入我们自定义的数组,见函数 $event->broadcastWith()、[‘socket’ => data_get($event, ‘socket’)] 和 Event 中定义的所有 public 属性。

最后就是执行方法了:

$broadcaster->broadcast(
    Arr::wrap($this->event->broadcastOn()), $name,
    $this->getPayloadFromEvent($this->event)
);

看上面的例子,$this->event->broadcastOn() 对应的是:

return new Channel('public_channel');

好了,该是看看接口 Broadcaster 了。

<?php

namespace Illuminate\Contracts\Broadcasting;

interface Broadcaster
{
    /**
     * Authenticate the incoming request for a given channel.
     *
     * @param  \Illuminate\Http\Request  $request
     * @return mixed
     */
    public function auth($request);

    /**
     * Return the valid authentication response.
     *
     * @param  \Illuminate\Http\Request  $request
     * @param  mixed  $result
     * @return mixed
     */
    public function validAuthenticationResponse($request, $result);

    /**
     * Broadcast the given event.
     *
     * @param  array  $channels
     * @param  string  $event
     * @param  array  $payload
     * @return void
     */
    public function broadcast(array $channels, $event, array $payload = []);
}

这里主要提供三个函数,我们暂时看目前最关心的 broadcast(),通过「PhpStorm」IDE,我们也能看出,继承这个接口的,主要就是平台 config 配置提供的几个驱动器:

我们开始往下走,看 redis 驱动器:

public function broadcast(array $channels, $event, array $payload = [])
{
    $connection = $this->redis->connection($this->connection);

    $payload = json_encode([
        'event' => $event,
        'data' => $payload,
        'socket' => Arr::pull($payload, 'socket'),
    ]);

    foreach ($this->formatChannels($channels) as $channel) {
        $connection->publish($channel, $payload);
    }
}

这就简单的,无非就是创建 redis 连接,然后将数据 (包含 eventdatasocket 构成的数组),利用 redis publish 出去,等着 laravel-echo-server 监听接收!

注:redis 有发布 (publish),就会有订阅,如:Psubscribe

好了,我们开始研究 laravel-echo-server,看它怎么订阅的。

laravel-echo-server

在 Laravel 项目没有专门提供该 Server,很多项目都是使用 tlaverdure/laravel-echo-server (https://github.com/tlaverdure/laravel-echo-server),其中我们的偶像 Laradock 也集成了该工具。

所以我们就拿 Laradock 配置来说一说。

.
|____Dockerfile
|____laravel-echo-server.json
|____package.json

主要包含三个文件,一个 Dockerfile 文件,用来创建容器;package.json 主要是安装 tlaverdure/laravel-echo-server 插件;laravel-echo-server.json 文件就是与 Laravel 交互的配置文件。

看看 Dockfile 内容:

FROM node:alpine

RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories

# Create app directory
RUN mkdir -p /usr/src/app
WORKDIR /usr/src/app

# Install app dependencies
COPY package.json /usr/src/app/

RUN apk add --update \
    python \
    python-dev \
    py-pip \
    build-base

RUN npm install

# Bundle app source
COPY laravel-echo-server.json /usr/src/app/laravel-echo-server.json

EXPOSE 3000
CMD [ "npm", "start" ]

主要是以 node:alpine 为底,将项目部署在路径 /usr/src/app/,执行命令 npm install 安装插件,参考文件 package.json:

{
  "name": "laravel-echo-server-docker",
  "description": "Docker container for running laravel-echo-server",
  "version": "0.0.1",
  "license": "MIT",
  "dependencies": {
    "laravel-echo-server": "^1.3.9"
  },
  "scripts": {
    "start": "laravel-echo-server start"
  }
}

然后,在将配置文件加载进该路径下,最后执行 npm start,也就是执行命令 laravel-echo-server start,并且放出 3000 端口。

我们通过启动容器,然后进入容器看看文件结构:

执行 docker-compose up laravel-echo-server 后就可以看到 server 启动:

同样的,我们也可以下载它的源代码,来运行达到效果。

tlaverdure/laravel-echo-server

Laravel Echo Node JS Server for Socket.io

下载源代码:

git clone https://github.com/tlaverdure/laravel-echo-server.git

进入项目安装插件:

npm install

执行后,直接生成 dist 文件夹:

.
|____api
| |____http-api.js
| |____index.js
|____channels
| |____channel.js
| |____index.js
| |____presence-channel.js
| |____private-channel.js
|____cli
| |____cli.js
| |____index.js
|____database
| |____database-driver.js
| |____database.js
| |____index.js
| |____redis.js
| |____sqlite.js
|____echo-server.js
|____index.js
|____log.js
|____server.js
|____subscribers
| |____http-subscriber.js
| |____index.js
| |____redis-subscriber.js
| |____subscriber.js

通过提供的 example 可以知道执行的入口在于 EchoServerrun 方法,简单修改下 options 配置:

var echo = require('../dist/index.js');

var options = {
  "authHost": "http://lrss.learning.test",
  "authEndpoint": "/broadcasting/auth",
  "clients": [],
  "database": "redis",
  "databaseConfig": {
    "redis": {
      "port": "63794",
      "host": "0.0.0.0"
    }
  },
  "devMode": true,
  "host": null,
  "port": "6001",
  "protocol": "http",
  "socketio": {},
  "sslCertPath": "",
  "sslKeyPath": ""
};

echo.run(options);

测试一下看看,是否和 Laravel 服务连接到位:

Laravel-echo-server 打印结果:

说明连接上了。

刚才的 dist 文件夹是通过 TypeScript 生成的结果,当然,我们需要通过它的源代码来解读:

.
|____api
| |____http-api.ts
| |____index.ts
|____channels
| |____channel.ts
| |____index.ts
| |____presence-channel.ts
| |____private-channel.ts
|____cli
| |____cli.ts
| |____index.ts
|____database
| |____database-driver.ts
| |____database.ts
| |____index.ts
| |____redis.ts
| |____sqlite.ts
|____echo-server.ts
|____index.ts
|____log.ts
|____server.ts
|____subscribers
| |____http-subscriber.ts
| |____index.ts
| |____redis-subscriber.ts
| |____subscriber.ts

主要包含:接口 (api)、频道 (channels)、 数据库 (database)、订阅 (subscribers) 等,我们会一个个来说的。

我们先看 echo-server.tslisten 函数:

/**
 * Listen for incoming event from subscibers.
 *
 * @return {void}
 */
listen(): Promise<any> {
    return new Promise((resolve, reject) => {
        let http = this.httpSub.subscribe((channel, message) => {
            return this.broadcast(channel, message);
        });

        let redis = this.redisSub.subscribe((channel, message) => {
            return this.broadcast(channel, message);
        });

        Promise.all([http, redis]).then(() => resolve());
    });
}

我们主要看 this.redisSub.subscribe() 无非就是通过 redis 订阅,然后再把 channelmessage 广播出去,好了,我们看看怎么做到订阅的,看 redis-subscribersubscribe() 函数:

/**
 * Subscribe to events to broadcast.
 *
 * @return {Promise<any>}
 */
subscribe(callback): Promise<any> {

    return new Promise((resolve, reject) => {
        this._redis.on('pmessage', (subscribed, channel, message) => {
            try {
                message = JSON.parse(message);

                if (this.options.devMode) {
                    Log.info("Channel: " + channel);
                    Log.info("Event: " + message.event);
                }

                callback(channel, message);
            } catch (e) {
                if (this.options.devMode) {
                    Log.info("No JSON message");
                }
            }
        });

        this._redis.psubscribe('*', (err, count) => {
            if (err) {
                reject('Redis could not subscribe.')
            }

            Log.success('Listening for redis events...');

            resolve();
        });
    });
}

这里我们就可以看到之前提到的 redis 订阅函数了:

this._redis.psubscribe('*', (err, count) => {
    if (err) {
        reject('Redis could not subscribe.')
    }

    Log.success('Listening for redis events...');

    resolve();
});

好了,只要获取信息,就可以广播出去了:

this._redis.on('pmessage', (subscribed, channel, message) => {
    try {
        message = JSON.parse(message);

        if (this.options.devMode) {
            Log.info("Channel: " + channel);
            Log.info("Event: " + message.event);
        }

        // callback(channel, message);
        // return this.broadcast(channel, message);
        if (message.socket && this.find(message.socket)) {
            this.server.io.sockets.connected[message.socket](channel)
            .emit(message.event, channel, message.data);

            return true
        } else {
            this.server.io.to(channel)
            .emit(message.event, channel, message.data);

            return true
        }
    } catch (e) {
        if (this.options.devMode) {
            Log.info("No JSON message");
        }
    }
});

到此,我们已经知道 Laravel 是怎么和 Laravel-echo-server 利用 redis 订阅和发布消息的。同时,也知道是用 socket.io 和前端 emit/on 交互的。

下面我们看看前端是怎么接收消息的。

laravel-echo

前端需要安装两个插件:laravel-echosocket.io-client,除了做配置外,监听一个公开的 channel,写法还是比较简单的:

window.Echo.channel('public_channel')
.listen('RssPublicEvent', (e) => {
    that.names.push(e.name)
});

达到的效果就是,只要接收到服务器发出的在公开频道 public_channel 的事件 RssPublicEvent,就会把消息内容显示出来:

我们开始看看这个 Laravel-echo 源代码了:

先看配置信息:

window.Echo = new Echo({
    broadcaster: 'socket.io',
    host: window.location.hostname + ':6001',
    auth:
        {
            headers:
                {
                'authorization': 'Bearer ' + store.getters.token
                }
        }
});

配置的 broadcaster 是: socket.io,所有用的是:

// echo.ts
constructor(options: any) {
    this.options = options;

    if (typeof Vue === 'function' && Vue.http) {
        this.registerVueRequestInterceptor();
    }

    if (typeof axios === 'function') {
        this.registerAxiosRequestInterceptor();
    }

    if (typeof jQuery === 'function') {
        this.registerjQueryAjaxSetup();
    }

    if (this.options.broadcaster == 'pusher') {
        this.connector = new PusherConnector(this.options);
    } else if (this.options.broadcaster == 'socket.io') {
        this.connector = new SocketIoConnector(this.options);
    } else if (this.options.broadcaster == 'null') {
        this.connector = new NullConnector(this.options);
    }
}

接着看 channel 函数:

// echo.ts
channel(channel: string): Channel {
    return this.connector.channel(channel);
}

// socketio-connector.ts
channel(name: string): SocketIoChannel {
    if (!this.channels[name]) {
        this.channels[name] = new SocketIoChannel(
            this.socket,
            name,
            this.options
        );
    }

    return this.channels[name];
}

主要是创建 SocketIoChannel,我们看看怎么做 listen

// socketio-connector.ts
listen(event: string, callback: Function): SocketIoChannel {
    this.on(this.eventFormatter.format(event), callback);

    return this;
}

继续看 on()

on(event: string, callback: Function): void {
    let listener = (channel, data) => {
        if (this.name == channel) {
            callback(data);
        }
    };

    this.socket.on(event, listener);
    this.bind(event, listener);
}

到这就比较清晰了,只用利用 this.socket.on(event, listener);

注:更多有关 socketio/socket.io-client,可以看官网:https://github.com/socketio/socket.io-client

总结

到目前为止,通过解读这几个插件和源代码,我们基本跑通了一个 public channel 流程。

这过程主要参考:

下一步主要看看怎么解析一个 private channel

未完待续

1人推荐
随时随地看视频
慕课网APP

热门评论

老师、为什么我广播出去的数据可以写入日志、写入不了redis 、驱动也改、烦恼了很久了。

查看全部评论