基于 EventEmitter 的双向数据 pipeline 实现
想法来源
Gulp
Gulp 是前端工具链中常用的流式任务执行器,适用于许多小型库的编译打包任务。它的设计思想其实很像 Linux 命令行里面的 Pipe(管道):
1 | gulp.src(paths.scripts.src, { sourcemaps: true }) |
gulp 是单向的,即对于同一个 pipeline,数据一般不能被逆向还原。
TCP/IP stack
我们知道,计算机网络协议是分层设计的,每层分别为数据赋予不同的含义、完成不同的使命。源主机采用网络协议栈将原始二进制流 层层编码(encode) 后送往目的主机,目的主机采用同样的协议栈将数据 层层解码(decode) 后得到原始数据。典型的 HTTP 协议将请求数据通过 TCP/IP 协议栈自上而下编码后送出,之后自下而上解码后得到响应数据:
1 | +---------+ |
TCP/IP 协议栈是双向的,即对于同一套协议,数据既可以被编码也可以被解码。
那么问题来了,是否可以抽象一种轻量的 Pipeline,实现类似网络协议栈双向数据流的处理能力,并且能够让用户定制化每层的处理逻辑?
数据流
设计之前,先根据数据流划分功能模块,这里 PIPE
是数据和各个数据处理单元的调度者,PIPE_UNIT_x
是每层数据的处理单元,可以有多个,并且按顺序前后串联。
1 | +------------------ PIPE -------------------+ |
用户可以实现自己的 PIPE_UNIT
来达到定制化处理逻辑的功能,也可以任意调换 PIPE_UNIT
的顺序来达到不同的处理效果。
Pipe 设计
Pipe
需要提供一个数据入口来启动链式处理流程:
1 | const EventEmitter = require('events'); |
PipeUnit 接口设计
PipeUnit
需要暴露编码(encode)和解码(decode)两个接口,考虑到处理单元可能异步执行,因此使用 async
黑膜法:
1 | class PipeUnit extends EventEmitter { |
实现 PipeUnit
首先实现一个提供压缩、解压缩功能的 PipeUnit
:
1 | const zlib = require('zlib'); |
下面再实现一个提供 AES
对称加解密功能的 PipeUnit
,这次采用同步执行:
1 | const crypto = require('crypto'); |
实际运行
1 | // 自由组合处理单元 |
可以看到,通过对 EventEmitter 简单的封装就可以实现双向数据 pipeline,同时支持异步单元操作。
性能测试
功能实现了,性能又如何呢?抛开 PipeUnit
的业务实现,简单分析一下链式 EventEmitter 结构的性能影响因素,理论上很大程度取决于 EventEmitter 本身的性能,Pipe::feed
只在第一次被调用时构建响应链,之后的调用几乎不会有性能损失。
测试用例
Node.js 版本如下:
1 | > process.versions |
下面分别考察 0 ~ 30000(每次递增 1000) 个 PipeUnit
实例的执行时间,来评估上述设计的性能表现:
1 | const { performance } = require('perf_hooks'); |
执行4次,可以将结果绘制到一张图中:
可以看到每次运行的结果高度一致,由上万个 PipeUnit
构成的链式 EventEmitter 能够以令人满意的效率完成运行。
不过出人意料的是,在特定数量的 PipeUnit
上总会出现尖峰,这可能和 V8 引擎的优化机制有关,作者能力有限,感兴趣的同学可以深挖原因。