Skip to content

Commit

Permalink
tmp update
Browse files Browse the repository at this point in the history
  • Loading branch information
pearone committed Dec 21, 2023
1 parent cc1a6f9 commit 6ebe9f5
Show file tree
Hide file tree
Showing 7 changed files with 475 additions and 0 deletions.
38 changes: 38 additions & 0 deletions packages/event/__tests__/community/epoll_queue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// import { MessageQueue, Producer, Consumer } from '../../src/community/mutiple';
import {
MessageQueue,
Producer,
Consumer
} from '../../src/community/epoll_queue';

test('mutiple_queue', async () => {
// 创建消息队列实例
const messageQueue = new MessageQueue();
// 创建生产者实例
const producer1 = new Producer('Producer 1', messageQueue);
const producer2 = new Producer('Producer 2', messageQueue);

// 创建消费者实例
const consumer1 = new Consumer('Consumer 1', messageQueue);
const consumer2 = new Consumer('Consumer 2', messageQueue);

// 将生产者和消费者添加到消息队列
messageQueue.addProducer(producer1);
messageQueue.addProducer(producer2);
await messageQueue.addConsumer(consumer1);
await messageQueue.addConsumer(consumer2);

// 生产者广播消息
producer1.sendMessage('Hello from producer 1!');
producer2.sendMessage('Hello from producer 2!');
// producer1.sendMessage('Hello from producer 3!');
// producer2.sendMessage('Hello from producer 4!');
// producer2.sendMessage('Hello from producer 5!');
// producer1.sendMessage('Hello from producer 6!');
// producer2.sendMessage('Hello from producer 7!');
// producer1.sendMessage('Hello from producer 8!');
// producer2.sendMessage('Hello from producer 9!');
// producer2.sendMessage('Hello from producer 10!');
// producer1.sendMessage('Hello from producer 11!');
// producer2.sendMessage('Hello from producer 12!');
});
73 changes: 73 additions & 0 deletions packages/event/__tests__/community/priority_queue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import {
Task,
MessageQueue,
CommunicationFormatterMessage
} from '../../src/community';

test('queue', () => {
// 创建一个执行器函数,用于模拟执行任务
function executor(task: Task<CommunicationFormatterMessage>) {
console.log(`Executing task with priority ${task.message.data.max}`);
}

// 创建一个任务队列实例
const priorityQueue = new MessageQueue({
executor,
/**
* 如果有优先级(is_priority_queue = true)
* 需要在message里增加priority
*/
is_priority_queue: true
});

// 创建一些任务对象
const task1 = new Task(
new CommunicationFormatterMessage({
sender: 'filter_id',
receiver: 'chart_id',
data: {
max: 11
},
priority: 3
})
);
const task2 = new Task(
new CommunicationFormatterMessage({
sender: 'filter_id',
receiver: 'chart_id',
data: {
max: 12
},
priority: 1
})
);
const task3 = new Task(
new CommunicationFormatterMessage({
sender: 'filter_id',
receiver: 'chart_id',
data: {
max: 13
},
priority: 2
})
);
const task4 = new Task(
new CommunicationFormatterMessage({
sender: 'filter_id',
receiver: 'chart_id',
data: {
max: 14
},
priority: 2
})
);

// 添加任务到队列
priorityQueue.add(task1);
priorityQueue.add(task2);
priorityQueue.add(task3);
priorityQueue.add(task4);

// 运行任务队列
priorityQueue.run();
});
8 changes: 8 additions & 0 deletions packages/event/src/community/all.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/**
* 消息队列支持多个生产者和多个消费者,
* 规则1:生产者生产的内容只有部分的消费者是它的受众。
* 规则2:一些生产者生产的内容会让消费者消费后,消费者会转化为生产者,再促进其他消费者消费。
* 规则3:消费者想消费的内容需要等待多个生产者生产环节都生产完成后,再消费,生产行为中有规则2中的转化过程。
*/

export {};
130 changes: 130 additions & 0 deletions packages/event/src/community/epoll_queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import { genAutoIdString } from '..';

/**
* 消息队列对象
* 场景:生产者生产一条数据通知所有消费者,消费者中选择一个消费者消费,
* 消费有时长,在等待过程中,生产者会生产内容,其他的消费者消费。
* 内容不针对特定消费者,消费者不存在差异。
* 场景:线程池
*/
export class MessageQueue<T> {
// 存储消息的数组
messages: Map<string, T> = new Map();
// 存储生产者的数组
producers: Producer<T>[];
// 存储消费者的数组
consumers: Consumer<T>[];

constructor() {
this.messages = new Map();
this.producers = [];
this.consumers = [];
}

// 添加生产者到队列
addProducer(producer: Producer<T>) {
this.producers.push(producer);
}

// 添加消费者到队列
async addConsumer(consumer: Consumer<T>) {
this.consumers.push(consumer);
/** 通知消费者来消费 */
await this.noticeMessage();
}

// 从队列中移除消费者
removeConsumer(consumer: Consumer<T>) {
const index = this.consumers.indexOf(consumer);
if (index > -1) {
this.consumers.splice(index, 1);
}
}

// 生产者向队列发送消息
async sendMessage(message: T) {
const token = genAutoIdString();
// 将消息添加到消息队列
this.messages.set(token, message);
/** 通知消费者来消费 */
await this.noticeMessage();
}

// 通知所有存在的消费者有新消息到达
async noticeMessage() {
for (const consumer of this.consumers) {
const res = await consumer.receiveMessage();
if (res.success) {
break;
}
}
}

// 消费者从消息列表中接收消息
async receiveMessage(consumer: Consumer<T>) {
const token = this.messages.keys().next().value;
const message = this.messages.get(token);

/** 如果消息存在,说明没人消费可以处理 */
if (message) {
this.removeConsumer(consumer);
const res = await consumer.consumeMessage(message);
// @ts-ignore
console.log(res.message);
this.messages.delete(token);
this.addConsumer(consumer);

return { success: true };
} else {
return { success: false };
}
}
}

// 创建生产者对象
export class Producer<T> {
// 消息队列名称
name: string;
// 消息队列对象
messageQueue: MessageQueue<T>;

constructor(name: string, messageQueue: MessageQueue<T>) {
this.name = name;
this.messageQueue = messageQueue;
}

// 发送消息
sendMessage(message: T) {
this.messageQueue.sendMessage(message);
}
}

// 创建消费者对象
export class Consumer<T> {
// 消息队列名称
name: string;
// 消息队列对象
messageQueue: MessageQueue<T>;

constructor(name: string, messageQueue: MessageQueue<T>) {
this.name = name;
this.messageQueue = messageQueue;
}
// 接收消息
async receiveMessage() {
return await this.messageQueue.receiveMessage(this);
}

// 处理接收到的消息
consumeMessage(message: T) {
const time = Math.random() * 1000;

return new Promise((resolve) => {
setTimeout(() => {
resolve({
message: `Consumer ${this.name} \n received message: ${message}`
});
}, time);
});
}
}
66 changes: 66 additions & 0 deletions packages/event/src/community/message.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { Task, MessageQueue } from './priority_queue';
/**
* 单向通信
* 有接收方和发送方的概念
*/
export class CommunicationFormatterMessage {
/** 发送方 */
sender: string;
/** 接收方 */
receiver: string;
/** 参数 */
data: Record<string, any>;
/** 发送时间 */
timestamp = new Date().getTime();
/** 优先级 */
priority = Infinity;

constructor({
sender,
receiver,
data,
priority
}: {
sender: string;
receiver: string;
data: Record<string, any>;
priority?: number;
}) {
this.sender = sender;
this.receiver = receiver;
this.data = data;
if (priority !== undefined) {
this.priority = priority;
}
}
}

/**
* 双向通信 —— 信道和信息
*/
export class ChannelFormatterMessage {
/** 管道 */
channel: string;
/** 行为 */
action: string;
/** 参数 */
data: Record<string, any>;
/** 发送时间 */
timestamp = new Date().getTime();

constructor({
channel,
action,
data
}: {
channel: string;
action: string;
data: Record<string, any>;
}) {
this.channel = channel;
this.action = action;
this.data = data;
}
}

export { Task, MessageQueue };
Loading

0 comments on commit 6ebe9f5

Please sign in to comment.