Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

基于Node.js的优先异步队列 #40

Open
Checkson opened this issue Sep 2, 2019 · 0 comments
Open

基于Node.js的优先异步队列 #40

Checkson opened this issue Sep 2, 2019 · 0 comments

Comments

@Checkson
Copy link
Owner

Checkson commented Sep 2, 2019

前言

想不到我在日常开发中,竟然遇到“优先异步队列”的需求。github上有类似功能,并且集成redis的库有KueBull等。但是,对于追求“简、易、轻”的我来说,其实并不满意。根据“二八原则”,我决定,自己来实现一个只有上述两者“两成”功能的轻量级开源库:priority-async-queue

开源库的命名为什么这么长呢?原因是,我认为开发者只需看一眼库名就知道其功能,一点都不含糊。但是,为了行文流畅,priority-async-queue下面统一简称为“paq”。

你可能不需要paq

按照 redux 作者的套路,首先,我们需要明确的是,你可能不需要 paq

只有遇到 N 个异步任务不能并行执行,并且只能顺序执行时,你才需要使用 paq。

简单来说,如果你需要执行的 N 个异步任务,并不存在资源争夺和占用、数据共享、严格的逻辑顺序、优先权对比等,paq 可能是没必要的,用了反而降低执行效率、影响程序性能。

paq 设计思路

paq 的设计思路非常简单,一共3个类:

  • Task 是描述每个待执行(异步/同步)任务的执行逻辑以及配置参数。
  • PriorityQueue 是控制每个待执行(异步/同步)任务的优先级队列、具有队列的基本属性和操作。
  • AsyncQueue 是控制每个待执行(异步/同步)任务能严格顺序地执行的队列。

下面是 paq 的程序流程图:

paq

paq 基本概念和API

1. addTask

addTask 是核心方法,它可以创建一个任务,并且添加到 paq 队列中。

paq.addTask([options, ]callback);

options 是一个可选对象,包含以下属性:

{
  id: undefined,          // 任务id
  priority: 'normal',     // 任务权重,例如: low, normal, mid, high, urgent
  context: null,          // 执行任务的上下文
  start: (ctx, options) => {}, // 任务将要被执行的回调
  completed: (ctx, res) => {}, // 任务执行完成后的回调
  failed: (ctx, err) => {},    // 任务执行失败后的回调
  remove: (ctx, options) => {} // 任务被删除后的回调
}

callback 是一个描述执行任务的逻辑的函数,它包含两个参数:ctxoptions

  • ctx 是任务所属的paq实例。
  • options 是此任务的options参数的最终值。
paq.addTask((ctx, options) => {
  console.log(ctx === paq); // true
});

2. removeTask

removeTask 方法是根据任务 id 来删除等待对列中的任务。

paq.removeTask(taskId);

如果成功删除任务,它将返回true。否则,它将返回false。

3. pause

pause 方法是暂停 paq 继续执行任务。

paq.pause();

注意: 但是,您无法暂停当前正在执行的任务,因为无法暂时检测到异步任务的进度。

4. isPause

isPause 属性,返回当前队列是否处于暂停状态。

paq.isPause; // return true or false.

5. resume

resume 方法,用来重新启动 paq 队列执行任务。

paq.resume();

6. clearTask

cleartTask 方法,用来清除 paq 等待队列中所有的任务。

paq.clearTask();

paq 用法

1. 基本用法

只要向 paq 添加任务,该任务就会自动被执行。

const PAQ = require('priority-async-queue');
const paq = new PAQ();

paq.addTask((ctx, options) => {
  console.log('This is a simple task!');
});

// This is a simple task!

2. 同步任务

你可以使用 paq 执行一系列同步任务,例如:

const syncTask = (n) => {
  for (let i = 0; i < n; i++) {
    paq.addTask(() => {
      console.log('Step', i, 'sync');
      return i;
    });
  }
};

syncTask(3);

// Step 0 sync
// Step 1 sync
// Step 2 sync

3. 异步任务

你还可以使用 paq 执行一系列的异步任务,例如:

const asyncTask = (n) => {
  for (let i = 0; i < n; i++) {
    paq.addTask(() => {
      return new Promise(resolve => {
        setTimeout(() => {
          console.log('Step', i, 'async');
          resolve(i);
        }, i * 1000);
      });
    });
  }
};

asyncTask(3);

// Step 0 async
// Step 1 async
// Step 2 async

4. 混合任务

你甚至可以使用 paq 执行一系列同步和异步交错的任务,例如:

const mixTask = (n) => {
  asyncTask(n);
  syncTask(n);
  asyncTask(n);
};

mixTask(2);

// Step 0 async
// Step 1 async
// Step 0 sync
// Step 1 sync
// Step 0 async
// Step 1 async

5. 绑定执行上下文

有时如果你需要指定上下文来执行任务,例如:

const contextTask = (n) => {
  var testObj = {
    name: 'foo',
    sayName: (name) => {
      console.log(name);
    }
  };
  for (let i = 0; i < n; i++) {
    paq.addTask({ context: testObj }, function () {
      this.sayName(this.name + i);
    });
  }
};

contextTask(3);

// foo0
// foo1
// foo2

注意: this 在箭头函数中并不存在,或者说它是指向其定义处的上下文。

6. 延迟执行

paq 还支持延迟执行任务,例如:

const delayTask = (n) => {
  for (let i = 0; i < n; i++) {
    paq.addTask({ delay: 1000 * i }, () => {
      console.log('Step', i, 'sync');
      return i;
    });
  }
};

delayTask(3);

// Step 0 sync
// Step 1 sync
// Step 2 sync

7. 优先权

如果需要执行的任务具有权重,例如:

const priorityTask = (n) => {
  for (let i = 0; i < n; i++) {
    paq.addTask({ priority: i === n - 1 ? 'high' : 'normal' }, () => {
      return new Promise(resolve => {
        setTimeout(() => {
          console.log('Step', i, 'async');
          resolve(i);
        }, i * 1000);
      });
    });
  }
};

priorityTask(5);

// Step 0 async
// Step 4 async
// Step 1 async
// Step 2 async
// Step 3 async

默认优先级映射如下:

{
  "low": 1,
  "normal": 0, // default
  "mid": -1,
  "high": -2,
  "urgent": -3
}

8. 回调函数

有时,你希望能够在任务的开始、完成、失败、被删除时做点事情,例如

const callbackTask = (n) => {
  for (let i = 0; i < n; i++) {
    paq.addTask({
      id: i,
      start: (ctx, options) => {
        console.log('start running task id is', options.id);
      },
      completed: (ctx, res) => {
        console.log('complete, result is', res);
      },
      failed: (ctx, err) => {
        console.log(err);
      }
    }, () => {
      if (i < n / 2) {
        throw new Error(i + ' is too small!');
      }
      return i;
    });
  }
};

callbackTask(5);

// start running task id is 0
// Error: 0 is too small!
// start running task id is 1
// Error: 1 is too small!
// start running task id is 2
// Error: 2 is too small!
// start running task id is 3
// complete, result is 3
// start running task id is 4
// complete, result is 4

9. 删除任务

有时,你需要删除一些任务,例如:

const removeTask = (n) => {
  for (let i = 0; i < n; i++) {
    paq.addTask({
      id: i,
      remove: (ctx, options) => {
        console.log('remove task id is', options.id);
      }
    }, () => {
      return new Promise(resolve => {
        setTimeout(() => {
          console.log('Step', i, 'async');
          resolve(i);
        }, i * 1000);
      });
    });
  }
  console.log(paq.removeTask(3));
  console.log(paq.removeTask(5));
};

removeTask(5);

// remove task id is 3
// true
// false
// Step 0 async
// Step 1 async
// Step 2 async
// Step 4 async

注意: 你必须在创建任务时分配id,并根据id删除任务。

paq 事件

如果需要监视 paq 队列的状态,paq 提供以下事件侦听器:

1. addTask

paq.on('addTask', (options) => {
  // Triggered when the queue adds a task.
});

2. startTask

paq.on('startTask', (options) => {
  // Triggered when a task in the queue is about to execute.
});

3. changeTask

paq.on('changeTask', (options) => {
  // Triggered when a task in the queue changes.
});

4. removeTask

paq.on('removeTask', (options) => {
  // Triggered when the queue remove a task.
});

5. completed

paq.on('completed', (options, result) => {
  // Triggered when the task execution in the queue is complete.
});

6. failed

paq.on('failed', (options, err) => {
  // Triggered when a task in the queue fails to execute.
});

最后,想补充的是:若遇到 Promise.allPromise.race 解决不了的需求,可以考虑一下 paq

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant