伴随10.5.0的发布,Node.js 新增了对多线程的实验性支持(worker_threads模块)。
为什么需要多线程?
Node.js由于JS的执行在单一线程,导致CPU密集计算的任务可能会使主线程会处于繁忙的状态,进而影响服务的性能,虽然可以通过child_process模块创建子进程的方式来解决,但是一方面进程之间无法共享内存,另一方面创建进程的开销也不小。所以在10.5.0版本中Node.js提供了worker_threads模块来支持多线程,一直以来被人所诟病的不擅长CPU密集计算有望成为历史。
如何启用多线程?
多线程目前仍然处于实验阶段,所以启动时需要增加--experimental-worker
flag才能生效。
如何创建多线程?
worker_threads
模块中比较重要的几个类:
MessageChannel: 用于创建异步、双向通信的通道实例。MessageChannel实例包含两个属性port1和port2,这两个属性都是MessagePort的实例。
MessagePort: 用于表示MessageChannel通道的终端,用于Worker之间传输结构化数据、内存区域和其他的MessagePort。MessagePort继承了EventEmitter,因此可以使用postMessage和on方法实现消息的传递与接收。
Worker: 用于创建单独的JS线程。
worker_threads
模块中比较重要的几个属性:
parentPort: 子线程中的parentPort指向可以与主线程进行通信的MessagePort。
子线程向父线程发送消息
parentPort.postMessage(...)
子线程接受来自父线程的消息
parentPort.on('message', (msg) => ...)
isMainThread: 用于区分当前文件是否在主线程中执行
workerData: 用于传递给Worker构造函数的data副本,在子线程中可以通过workerData获取到父进程传入的数据。
了解常用类与属性之后再来看一下代码示例
const { Worker, parentPort, isMainThread } = require('worker_threads'); if (isMainThread) { const w = new Worker(__filename, { workerData: { name: 'Randal' } }); w.postMessage(1e10); const startTime = Date.now(); w.on('message', function(msg) { console.log('main thread get message: ' + msg); console.log('compute time ellapsed: ' + (Date.now() - startTime) / 1000); }); console.log('main thread executing'); } else { const longComputation = (val) => { let sum = 0; for (let i = 0; i < val; i++) { sum += i; }; return sum; }; parentPort.on('message', (msg) => { console.log(`${workerData.name} worker get message: ` + msg); parentPort.postMessage(longComputation(msg)); }); } // 执行结果 main thread executing Randal worker get message: 10000000000 main thread get message: 49999999990067860000 compute time ellapsed: 14.954
线程间如何传输数据?
port.postMessag(value[, transferList])
除了value之外,postMessage方法还支持传入transferList参数,transferList是一个List,支持的数据类型包括ArrayBuffer和MessagePort对象,transferList中的对象在传输完成后,在发送对象的线程中就不可以继续使用了。
const { Worker, isMainThread, parentPort } = require('worker_threads'); // 主线程 if (isMainThread) { const sab = new ArrayBuffer(Int32Array.BYTES_PER_ELEMENT * 100); const ia = new Int32Array(sab); for (let i = 0; i < ia.length; i++) { ia[i] = i; } console.log("this is the main thread"); for (let i = 0; i < 1; i++) { let w = new Worker(__filename); console.log('before transfer: ', sab); w.postMessage(null, [ sab ]); setTimeout(() => { console.log('after transfer: ', sab); }, 1000); } } else { console.log("this isn't main thread"); } // 输出结果 this is the main thread before transfer: ArrayBuffer { byteLength: 400 } this isn't main thread after transfer: ArrayBuffer { byteLength: 0 }
如果ArrayBuffer是通过value传输的(且在transferList中不存在),则传输过去的是副本,如下所示:
w.postMessage(sab); // 输出结果 this is the main thread before transfer: ArrayBuffer { byteLength: 400 } this isn't main thread after transfer: ArrayBuffer { byteLength: 400 }
线程间如何共享内存?
轮到SharedArrayBuffer出场了,如果postMessage中的value是SharedArrayBuffer的话,则线程之间就可以共享内存,如下面例子所示:
const { Worker, isMainThread, parentPort } = require('worker_threads'); // 主线程 if (isMainThread) { const sab = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * 5); const ia = new Int32Array(sab); for (let i = 0; i < ia.length; i++) { ia[i] = i; } for (let i = 0; i < 2; i++) { let w = new Worker(__filename); w.postMessage(sab); w.on('message', () => { console.log(ia); }); } } else { parentPort.on('message', (msg) => { const ia = new Int32Array(msg, 0, 1); ia[0] = ia[0] + 1; parentPort.postMessage('done'); }); } // 输出结果 Int32Array [ 1, 1, 2, 3, 4 ] Int32Array [ 2, 1, 2, 3, 4 ]