Skip to content

Commit

Permalink
Merge pull request #1 from IamZhenHong/milestone-6
Browse files Browse the repository at this point in the history
Milestone 6
  • Loading branch information
IamZhenHong authored Nov 7, 2024
2 parents 578d65f + c4145b6 commit f924c8f
Show file tree
Hide file tree
Showing 1,031 changed files with 71,557 additions and 6 deletions.
139 changes: 139 additions & 0 deletions backend/collaboration-service/consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
const { Mistral } = require('@mistralai/mistralai');

const amqp = require('amqplib/callback_api');
const { sendWsMessage, broadcastToRoom } = require('./ws');
const axios = require('axios');
const dotenv = require('dotenv');
dotenv.config();

const CLOUDAMQP_URL = process.env.CLOUDAMQP_URL;
const COLLAB_SERVICE_URL = "http://localhost:8003";

function arrayEquals(a, b) {
return Array.isArray(a) &&
Array.isArray(b) &&
a.length === b.length &&
a.every((val, index) => val === b[index]);
}

function checkSubset(parentArray, subsetArray) {
return subsetArray.every((el) => {
return parentArray.includes(el)
});
}

// In-memory store to track unmatched users
let unmatchedUsers = [];

// Function to set up RabbitMQ consumer
const setupConsumer = () => {
amqp.connect(CLOUDAMQP_URL, (err, conn) => {
if (err) {
console.error('Connection error in consumer.js:', err);
return;
}

conn.createChannel((err, ch) => {
if (err) throw err;
const queue = 'collab_queue';
ch.assertQueue(queue, { durable: false });

console.log('Listening for messages in RabbitMQ queue for collab...');
ch.consume(queue, async (msg) => {
const userRequest = JSON.parse(msg.content.toString());
console.log('Received user request:', userRequest);
console.log('User request type:', userRequest.type);
if (userRequest.status === 'cancel') {
// Handle cancel request
const userIndex = unmatchedUsers.findIndex(u => u.userId === userRequest.userId);
if (userIndex !== -1) {
console.log(`Cancelling request for user ${userRequest.userId}`);
clearTimeout(unmatchedUsers[userIndex].timeoutId); // Clear any pending timeout
unmatchedUsers.splice(userIndex, 1); // Remove user from unmatched list
sendWsMessage(userRequest.userId, { status: 'CANCELLED' });
console.log(`Cancelled matching request for user ${userRequest.userId}`);
} else {
console.log(`No unmatched request found for user ${userRequest.userId}`);
}
sendWsMessage(userRequest.userId, { status: 'CANCELLED' });
console.log(`Cancelled matching request for user ${userRequest.userId}`);
} else if (userRequest.type === 'ASK_COPILOT') {
// Function to make the API call with retry logic

try {
const apiKey = process.env.MISTRAL_API_KEY;
const client = new Mistral({ apiKey: apiKey });
prompt = userRequest.prompt;
currentCode = userRequest.code;

const chatResponse = await client.chat.complete({
model: 'mistral-large-latest',
messages: [{role: 'user', content: currentCode + '\n' + prompt}],
});
console.log('Asking Copilot:', chatResponse);

broadcastToRoom(userRequest.roomId, { type: 'ASK_COPILOT', response: chatResponse.choices[0].message.content });
} catch (error) {
console.error("Failed to fetch chat response:", error);
broadcastToRoom(userRequest.roomId, { type: 'ASK_COPILOT', response: "Error fetching response from assistant." });
}
}
else {
// Handle match request
const match = unmatchedUsers.find(u =>
checkSubset(u.category, userRequest.category) ||
checkSubset(userRequest.category, u.category)
) || unmatchedUsers.find(u => u.difficulty === userRequest.difficulty);

if (match) {
try {
console.log(`Matched user ${userRequest.userId} with user ${match.userId}`);

// Create room in collaboration service
const response = await axios.post(`${COLLAB_SERVICE_URL}/rooms/create`, {
users: [userRequest.userId, match.userId],
difficulty: userRequest.difficulty,
category: userRequest.category
});
console.log(response.data);
const { roomId } = response.data;

// Notify both users
[userRequest, match].forEach(user => {
sendWsMessage(user.userId, {
status: 'MATCH_FOUND',
roomId,
matchedUserId: user === userRequest ? match.userId : userRequest.userId,
difficulty: userRequest.difficulty,
category: userRequest.category
});
});

// Clear the timeouts for both users
clearTimeout(match.timeoutId);

// Remove matched user from unmatchedUsers
unmatchedUsers = unmatchedUsers.filter(u => u.userId !== match.userId);
} catch (error) {
console.error('Error creating room:', error);
}
} else {
// Set a timeout to remove unmatched users after 30 seconds
const timeoutId = setTimeout(() => {
unmatchedUsers = unmatchedUsers.filter(u => u.userId !== userRequest.userId);
sendWsMessage(userRequest.userId, { status: 'timeout' });
}, 30000); // 30 seconds timeout

// Add the new user with their timeout ID
unmatchedUsers.push({ ...userRequest, timeoutId });
}
}

ch.ack(msg); // Acknowledge message processing
});
});
});
};


module.exports = { setupConsumer };
16 changes: 16 additions & 0 deletions backend/collaboration-service/controllers/copilotControllers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
const { sendToQueue } = require("../../collaboration-service/mq");

const askCopilot = async (req, res) => {

const { code, prompt, type, roomId } = req.body;
console.log(`Received request to ask Copilot for prompt: ${prompt}`);

sendToQueue({ code, prompt, type, roomId });

res.status(200).send({ status: 'Request received. Waiting for Copilot response.' });

};

module.exports = {
askCopilot,
};
10 changes: 10 additions & 0 deletions backend/collaboration-service/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
const dotenv = require('dotenv');
dotenv.config();
const express = require('express');
const cors = require('cors');
const { setupWebSocket } = require('./ws');
const roomRoutes = require('./routes/room');
const { setupConsumer } = require('./consumer');
require('dotenv').config();
const amqp = require('amqplib/callback_api');


const app = express();
const PORT = process.env.PORT || 8003;
Expand All @@ -17,3 +23,7 @@ const server = app.listen(PORT, () => {
app.use('/rooms', roomRoutes);

setupWebSocket(server);

setupConsumer();


33 changes: 33 additions & 0 deletions backend/collaboration-service/mq.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
const amqp = require('amqplib/callback_api');
const dotenv = require('dotenv');
dotenv.config();

const CLOUDAMQP_URL = process.env.CLOUDAMQP_URL;

let channel;

// Establish connection to RabbitMQ and create a channel
amqp.connect(CLOUDAMQP_URL, (err, conn) => {
if (err) throw err;

conn.createChannel((err, ch) => {
if (err) throw err;
channel = ch;
const queue = 'collab_queue';
ch.assertQueue(queue, { durable: false });
console.log('RabbitMQ connected, queue asserted:', queue);
});
});

// Function to send messages to the queue
const sendToQueue = (message) => {
const queue = 'collab_queue';
if (!channel) {
console.error('RabbitMQ channel not initialised');
return;
}
channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)));
console.log('Sent message to RabbitMQ for collab:', message);
};

module.exports = { sendToQueue };
Loading

0 comments on commit f924c8f

Please sign in to comment.