Giter Site home page Giter Site logo

theanarkh / nodejs-threadpool Goto Github PK

View Code? Open in Web Editor NEW
72.0 72.0 22.0 37 KB

基于nodejs worker_threads的线程池。耗时操作或nodejs没有提供异步模式的api(例如解密、同步的文件api)都可以在线程池中执行,业务代码只需要返回一个Promise或async函数给线程池库,至于业务逻辑做什么操作,其实都可以,比如setTimeout,异步操作,async await等

License: MIT License

JavaScript 100.00%

nodejs-threadpool's Introduction

nodejs-threadpool

基于nodejs worker_threads的线程池。耗时操作或nodejs没有提供异步模式的api(例如解密、同步的文件api)都可以在线程池中执行,业务代码只需要返回一个Promise或async函数给线程池库,至于业务逻辑做什么操作,其实都可以,比如setTimeout,异步操作,async await等(设计文档https://zhuanlan.zhihu.com/p/266656697)。

支持文件和字符串模式,需要导出一个函数。

1 提供的线程池类型

// 同步处理任务
class ThreadPool extends ThreadPool {
    constructor(options) {
        super({...options, sync: true});
    }
}
// cpu型的线程池,线程数和cpu核数一样,不支持动态扩容
class CPUThreadPool extends ThreadPool {
    constructor(options) {
        super({...options, coreThreads: cores, expansion: false});
    }
}
// 只有一个线程的线程池,不支持动态扩容
class SingleThreadPool extends ThreadPool {
    constructor(options) {
        super({...options, coreThreads: 1, expansion: false });
    }
}
// 固定线程数的线程池,不支持动态扩容线程数
class FixedThreadPool extends ThreadPool {
    constructor(options) {
        super({ ...options, expansion: false });
    }
}

const defaultThreadPool = new ThreadPool();
const defaultCpuThreadPool = new CPUThreadPool();
const defaultFixedThreadPool = new FixedThreadPool();
const defaultSingleThreadPool = new SingleThreadPool();
module.exports = {
    ThreadPool,
    CPUThreadPool,
    FixedThreadPool,
    SingleThreadPool,
    defaultThreadPool, 
    defaultCpuThreadPool,
    defaultFixedThreadPool,
    defaultSingleThreadPool,
}

2 用户可以自定义线程池类型和参数

1 coreThreads:核心线程数,默认102 maxThreads:最大线程数,默认50,只在支持动态扩容的情况下,该参数有效,否则该参数等于核心线程数
3 timeout:任务执行的超时时间,全局配置,可针对单个任务设置
4 discardPolicy:任务超过阈值时的处理策略,策略如下
 	// 报错
    ABORT: 1,
    // 在主线程里执行
    CALLER_RUN: 2,
    // 丢弃最老的的任务
    DISCARD_OLDEST: 3,
    // 丢弃
    DISCARD: 4,
    // 不丢弃
    NOT_DISCARD: 5,
5 preCreate:是否预创建线程池
6 maxIdleTime:线程空闲多久后自动退出
7 maxWork:线程池最大任务数 
8 expansion:是否支持动态扩容线程,阈值是最大线程数

3 线程池给用户侧返回的是UserWork类的对象 支持的api

设置任务的超时时间
setTimeout
// 取消之前的超时时间设置
clearTimeout
// 取消任务的执行
cancel

UserWork类继承EventEmitter 支持的事件有

// 任务超时
timeout
// 任务执行完成,执行结果由用户的业务代码决定,在回调里可以拿到
done
// 任务执行出错,具体原因在回调里可以拿到
error
// 任务过载,当前任务被取消

4 使用 例子1 index.js

const { defaultThreadPool } = require('nodejs-thread-pool').threadPool;
const path = require('path');
async function test() {
	const worker = await defaultThreadPool.submit(path.resolve(__dirname, 'event.js'));
	worker.on('done', function() {
        console.log(...arguments)
    })

    worker.on('error', function() {
        console.log(...arguments)
    })
}
test()

event.js

module.exports = async function() {
    return await new Promise((resolve, reject) => {
        setTimeout(() => {
            resolve({type: 'async event'});
            console.log(1)
        },3000)
    })
} 

例子2

const { defaultThreadPool } = require('nodejs-thread-pool').threadPool;
const path = require('path');
async function test() {
    const work1 = await defaultThreadPool.submit('async function({a, b}) { return a + b; }', {a: 1, b: 1});
    work1.on('done',  function() {
        console.log(...arguments);
    })
    const work = await defaultThreadPool.submit(`async function(params) { return await new Promise((resolve) => {console.log(params); setTimeout(() => {resolve(1)}, 3000)})  }`, {name: 22}); 
    work.on('done', function() {
        console.log(...arguments);
    });
}

test()

nodejs-threadpool's People

Contributors

theanarkh avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar

nodejs-threadpool's Issues

用了半天总算调试好了,目前用起来还不错,作者可能比较懒,没有写清楚用法,例子中还有一点错误,我完善一些用法吧。

安装模块:
npm install -S nodejs-threadpool

使用:
模块中主要对象threadPool一共开放了8个接口,前4个分别是:
ThreadPool 基础类,可以自由配置线程数;
CPUThreadPool类 以CPU核心数为线程数;
SingleThreadPool类 单线程;
FixedThreadPool类 没研究;
后面4个default开头的是前4个类的默认配置对象。

基本上常用的就是ThreadPool基础类和CPUThreadPool类。
除了threadPool对象外还有2个config和constants,分别是常量和类型,不用关心。

例子:
------创建线程的js代码-----------------------------------------------------------
const threadPool=require('nodejs-threadpool').threadPool; //引入模块,作者的例子中require('nodejs-thread-pool')是错的
//自定义配置
const options={
coreThreads:10, // 核心线程数
expansion:false, //是否支持动态线程,如果false最大线程数为coreThreads,最大线程数默认50,在config.js中MAX_THREADS修改
discardPolicy:5, //超过任务队列长度时的处理策略,constants.js中DISCARD_POLICY结构,默认不丢弃
preCreate:false, //是否预创建子线程
maxIdleTime:10 * 60 * 1000, //线程最大空闲时间,达到后自动退出,毫秒。
maxWork:Infinity, //最大任务数
// timeout: //任务超时时间
}
//创建一个多线程对象
const wokerObj=new threadPool.ThreadPool(options);
//如果是线程执行js文件需要path模块
const path = require('path');
//submit方法提交任务到线程池,因为是异步操作所以需要用await等待返回一个线程对象,为了定义后面的on事件,
//第一个参数可以是js文件或者是js代码,如果是js文件必须是绝对路径,如果是js代码必须是一个函数,
//第二个参数是传递给线程的参数,因为只能传一个值,所以最方便的就是把所有参数装进一个对象
const worker =await wokerObj.submit(path.resolve(__dirname, './src/makeDisp.js'),{op1:123,op2:'ab'});
worker.on('done', function() {
console.log(...arguments); //打印了被执行的任务rerurn回来的值。
});
worker.on('error', function() {
console.log(...arguments); //如果出错,打印错误信息
});
...其他代码
-----创建线程js结束----------------------------------------------------------------
-----makeDisp.js内容---------------------------------------------------------------
module.exports = function (op) {
const {op1,op2}=op; //解构参数
...其他代码
return op1+op2; //这个会回传到创建线程
}
-----makeDisp.js结束----------------------------------------------------------------

引入脚本希望能加入对es6方式的支持

worker.js 中的代码
if (!isFunction(aFunction)) {
throw new Error('work type error: js file or string');
}
work.data = await aFunction(options);
parentPort.postMessage({event: 'done', work});

我目前改成这样在用的
if (isFunction(aFunction)) {
work.data = await aFunction(options);
parentPort.postMessage({ event: 'done', work });
return;
}
if (isFunction(aFunction.default)) {
work.data = await aFunction.default(options);
parentPort.postMessage({ event: 'done', work });
return;
}
throw new Error('work type error: js file or string');

worker退出有bug

` // 创建线程
newThread() {
const worker = new Worker(workerPath);
const thread = new Thread({worker});
this.workerQueue.push(thread);
const threadId = worker.threadId;
worker.on('exit', () => {
// 找到该线程对应的数据结构,然后删除该线程的数据结构
const position = this.workerQueue.findIndex(({worker}) => {
return worker.threadId === threadId;
});
const exitedThread = this.workerQueue.splice(position, 1);
// 退出时状态是BUSY说明还在处理任务(非正常退出)
this.totalWork -= exitedThread.state === THREAD_STATE.BUSY ? 1 : 0;
});
// 和子线程通信
worker.on('message', (result) => {
const {
work,
event,
} = result;
const { data, error, workId } = work;
// 通过workId拿到对应的userWork
const userWork = this.workPool[workId];
// 不存在说明任务被取消了
if (!userWork) {
return;
}
// 修改线程池数据结构
this.endWork(userWork);

        // 修改线程数据结构
        thread.setLastWorkTime(Date.now());
        
        // 还有任务则通知子线程处理,否则修改子线程状态为空闲
        if (this.queue.length) {
            // 从任务队列拿到一个任务交给子线程
            this.submitWorkToThread(thread, this.queue.shift());
        } else {
            thread.setState(THREAD_STATE.IDLE);
        }
       
        switch(event) {
            case 'done':
                // 通知用户,任务完成
                userWork.emit('done', data);
                break;
            case 'error':
                // 通知用户,任务出错
                if (EventEmitter.listenerCount(userWork, 'error')) {
                    userWork.emit('error', error);
                }
                break;
            default: break;
        }
    });
    worker.on('error', (...rest) => {
        console.error(...rest);
    });
    return thread;
}`

在这个方法中 worker.on('exit', ()=>{
//在这里 worker.theadId已经变成-1了 ,但是在外边 给这个赋值为 const threadId = worker.threadId; 还是原来的值,这两个值两个是不可能相等的,因此不会从workerQueue中移出,下次调用,postmessage就会无效
},

maxThreads参数无效

src/threadPool.js文件,在ThreadPool的构造函数中初始化参数 这个逻辑根本没用到options中的maxThreads
// 线程池最大线程数,如果不支持动态扩容则最大线程数等于核心线程数
this.maxThreads = options.expansion !== false ? Math.max(this.coreThreads, config.MAX_THREADS) : this.coreThreads;

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.