From 4e7cfbdb56fceae34473367759449bce9d5b75ba Mon Sep 17 00:00:00 2001 From: sinasezza Date: Sat, 3 Aug 2024 20:57:05 +0330 Subject: [PATCH] feat(socket): join rooms and send messages in public and private rooms completed. --- chatApp/config/config.py | 6 +- chatApp/config/database.py | 5 + chatApp/main.py | 6 +- chatApp/models/message.py | 42 ++++++- chatApp/models/private_room.py | 42 +++++++ chatApp/models/public_room.py | 38 ++++++ chatApp/routes/chat.py | 36 +++++- chatApp/schemas/public_room.py | 17 +++ chatApp/sockets.py | 205 ++++++++++++++++++++++++++++++++- 9 files changed, 385 insertions(+), 12 deletions(-) diff --git a/chatApp/config/config.py b/chatApp/config/config.py index 4891c7e..c62a495 100644 --- a/chatApp/config/config.py +++ b/chatApp/config/config.py @@ -25,10 +25,10 @@ class Settings(BaseSettings): refresh_token_expire_days: int = Field(default=14) # CORS settings - cors_allow_origins: list[str] = Field(default=["*"]) + cors_allow_origins: list[str] | str = Field(default=["*"]) cors_allow_credentials: bool = Field(default=True) - cors_allow_methods: list[str] = Field(default=["*"]) - cors_allow_headers: list[str] = Field(default=["*"]) + cors_allow_methods: list[str] | str = Field(default=["*"]) + cors_allow_headers: list[str] | str = Field(default=["*"]) # Trusted hosts settings trusted_hosts: list[str] = Field(default=["127.0.0.1", "localhost"]) diff --git a/chatApp/config/database.py b/chatApp/config/database.py index c6b5e02..26181f6 100644 --- a/chatApp/config/database.py +++ b/chatApp/config/database.py @@ -1,4 +1,5 @@ import logging +from functools import lru_cache from motor.motor_asyncio import ( AsyncIOMotorClient, @@ -59,6 +60,7 @@ async def close_mongodb_connection(self) -> None: mongo_db = MongoDB() +@lru_cache def get_users_collection() -> AsyncIOMotorCollection: """ Retrieve the users collection from the MongoDB database. @@ -72,6 +74,7 @@ def get_users_collection() -> AsyncIOMotorCollection: return users_collection +@lru_cache def get_messages_collection() -> AsyncIOMotorCollection: """ Retrieve the messages collection from the MongoDB database. @@ -85,6 +88,7 @@ def get_messages_collection() -> AsyncIOMotorCollection: return messages_collection +@lru_cache def get_public_rooms_collection() -> AsyncIOMotorCollection: """ Retrieve the public rooms collection from the MongoDB database. @@ -98,6 +102,7 @@ def get_public_rooms_collection() -> AsyncIOMotorCollection: return rooms_collection +@lru_cache def get_private_rooms_collection() -> AsyncIOMotorCollection: """ Retrieve the private rooms collection from the MongoDB database. diff --git a/chatApp/main.py b/chatApp/main.py index f116898..99bef5d 100644 --- a/chatApp/main.py +++ b/chatApp/main.py @@ -48,9 +48,9 @@ async def shutdown_event(): ) # Include your routers for API endpoints -app.include_router(auth.router, prefix="/auth") -app.include_router(chat.router, prefix="/chat") -app.include_router(user.router, prefix="/user") +app.include_router(auth.router, prefix="/auth", tags=["auth"]) +app.include_router(chat.router, prefix="/chat", tags=["chat"]) +app.include_router(user.router, prefix="/user", tags=["user"]) @app.get("/") diff --git a/chatApp/models/message.py b/chatApp/models/message.py index 6691fdc..d08e237 100644 --- a/chatApp/models/message.py +++ b/chatApp/models/message.py @@ -1,17 +1,55 @@ +from collections.abc import Mapping from datetime import datetime +from typing import Any from pydantic import BaseModel, Field +from chatApp.config.database import get_messages_collection from chatApp.utils.object_id import PydanticObjectId +from .public_room import fetch_public_room_by_id + class Message(BaseModel): user_id: PydanticObjectId room_id: PydanticObjectId - content: str = Field(default=None) - media: str = Field(default=None) + room_type: str + content: str | None = Field(default=None) + media: str | None = Field(default=None) created_at: datetime = Field(default_factory=lambda: datetime.now()) class MessageInDB(Message): id: PydanticObjectId = Field(alias="_id", serialization_alias="id") + + +# async def insert_message(message: str): +# message_collection = get_messages_collection() +# result = await message_collection.insert_one(message.dict()) +# return result.inserted_id + + +async def get_public_messages( + room_id: str, +) -> tuple[bool, list[Mapping[str, Any]]]: + """ + Fetch public messages from a specific room. + + :param room_id: The ID of the public room to fetch messages from. + :return: A tuple where the first element is a boolean indicating success, + and the second element is a list of messages (each message represented as a dictionary). + """ + messages_collection = get_messages_collection() + + # Fetch the public room by ID + room = await fetch_public_room_by_id(room_id) + if room is None: + return False, [] + + # Fetch messages from the messages collection + cursor = messages_collection.find({"room_id": room_id}) + messages = await cursor.to_list( + length=None + ) # Await the cursor to list conversion + + return True, messages diff --git a/chatApp/models/private_room.py b/chatApp/models/private_room.py index d4d03cc..ea1cbb3 100644 --- a/chatApp/models/private_room.py +++ b/chatApp/models/private_room.py @@ -2,6 +2,7 @@ from pydantic import BaseModel, Field +from chatApp.config.database import get_private_rooms_collection from chatApp.utils.object_id import PydanticObjectId @@ -13,3 +14,44 @@ class PrivateRoom(BaseModel): class PrivateRoomInDB(PrivateRoom): id: PydanticObjectId = Field(alias="_id") + + +async def fetch_private_room_by_id(id: str): + room_collection = get_private_rooms_collection() + return await room_collection.find_one({"_id": PydanticObjectId(id)}) + + +async def check_members_in_room(user1_id: str, user2_id: str): + rooms_collection = get_private_rooms_collection() + room = await rooms_collection.find_one( + { + "$or": [ + { + "member1": PydanticObjectId(user1_id), + "member2": PydanticObjectId(user2_id), + }, + { + "member1": PydanticObjectId(user2_id), + "member2": PydanticObjectId(user1_id), + }, + ], + } + ) + return str(room["_id"]) if room is not None else None + + +async def join_private_room(user1_id: str, user2_id: str) -> str: + rooms_collection = get_private_rooms_collection() + + room_id = await check_members_in_room(user1_id, user2_id) + + if room_id is not None: + return room_id + else: + room = await rooms_collection.insert_one( + { + "member1": PydanticObjectId(user1_id), + "member2": PydanticObjectId(user2_id), + } + ) + return str(room.inserted_id) diff --git a/chatApp/models/public_room.py b/chatApp/models/public_room.py index 34cdd22..66d8d15 100644 --- a/chatApp/models/public_room.py +++ b/chatApp/models/public_room.py @@ -2,6 +2,7 @@ from pydantic import BaseModel, Field +from chatApp.config.database import get_public_rooms_collection from chatApp.utils.object_id import PydanticObjectId @@ -41,3 +42,40 @@ class PublicRoom(BaseModel): class PublicRoomInDB(PublicRoom): id: PydanticObjectId = Field(alias="_id", serialization_alias="id") + + +async def fetch_public_room_by_id(id: str): + room_collection = get_public_rooms_collection() + return await room_collection.find_one({"_id": PydanticObjectId(id)}) + + +async def join_public_room(room_id: str, user_id: str) -> bool: + rooms_collection = get_public_rooms_collection() + room = await rooms_collection.find_one({"_id": PydanticObjectId(room_id)}) + + # Ensure room is not None + if room is None: + return False + + # Define the expected structure of the room dictionary + # Adjust this according to the actual structure + ban_list: list[PydanticObjectId] = room.get("ban_list", []) + members: list[PydanticObjectId] = room.get("members", []) + + userObjId = PydanticObjectId(user_id) + if userObjId not in ban_list: + if userObjId not in members: + members.append(userObjId) + + # Update the room in the database + result = await rooms_collection.update_one( + {"_id": PydanticObjectId(room_id)}, + {"$set": {"members": members}}, # Update only members + ) + + if result.modified_count == 1: + return True + else: + return False # Failed to update the room in the database + return True # User is already a member + return False # User is banned diff --git a/chatApp/routes/chat.py b/chatApp/routes/chat.py index 5831f4b..f61c6ec 100644 --- a/chatApp/routes/chat.py +++ b/chatApp/routes/chat.py @@ -16,7 +16,7 @@ from chatApp.models.public_room import PublicRoom, PublicRoomInDB from chatApp.models.user import UserInDB from chatApp.schemas.private_room import CreatePrivateRoom -from chatApp.schemas.public_room import CreatePublicRoom +from chatApp.schemas.public_room import CreatePublicRoom, GetPulbicRoomsSchema from chatApp.utils.object_id import PydanticObjectId, is_valid_object_id router = APIRouter() @@ -75,6 +75,40 @@ async def join_public_room( return PublicRoomInDB(**room) +@router.get("/get-public-rooms/", response_model=Mapping[str, Any]) +async def get_public_rooms( + page: int = 1, + per_page: int = 10, +): + rooms_collection: AsyncIOMotorCollection = get_public_rooms_collection() + total_count = await rooms_collection.count_documents({}) + rooms = ( + await rooms_collection.find( + {}, + { + "_id": 1, + "name": 1, + "description": 1, + "owner": 1, + "created_at": 1, + }, + ) + .skip((page - 1) * per_page) + .limit(per_page) + .to_list(None) + ) + + data_to_return = { + "data": [GetPulbicRoomsSchema(**room) for room in rooms], + "meta": { + "total_count": total_count, + "page": page, + "per_page": per_page, + }, + } + return data_to_return + + @router.post( "/create-private-room/{person_id}", response_model=PrivateRoomInDB ) diff --git a/chatApp/schemas/public_room.py b/chatApp/schemas/public_room.py index 13682ae..70086b0 100644 --- a/chatApp/schemas/public_room.py +++ b/chatApp/schemas/public_room.py @@ -1,5 +1,9 @@ +from datetime import datetime + from pydantic import BaseModel, Field +from chatApp.utils.object_id import PydanticObjectId + class CreatePublicRoom(BaseModel): name: str = Field(..., description="Name of the public room") @@ -20,3 +24,16 @@ class CreatePublicRoom(BaseModel): max_latest_messages_access: int | None = Field( None, description="Maximum number of latest messages to access" ) + + +class GetPulbicRoomsSchema(BaseModel): + id: PydanticObjectId = Field( + ..., + description="id of the room", + alias="_id", + serialization_alias="id", + ) + owner: PydanticObjectId = Field(..., description="id of owner of the room") + name: str + description: str | None = None + created_at: datetime diff --git a/chatApp/sockets.py b/chatApp/sockets.py index 4388915..4768e9f 100644 --- a/chatApp/sockets.py +++ b/chatApp/sockets.py @@ -1,29 +1,228 @@ +from typing import Any + import socketio from chatApp.config.config import get_settings +from chatApp.config.database import get_messages_collection +from chatApp.config.logs import get_logger +from chatApp.models.message import Message +from chatApp.models.private_room import ( + check_members_in_room, + join_private_room, +) +from chatApp.models.public_room import ( + fetch_public_room_by_id, + join_public_room, +) +from chatApp.models.user import fetch_user_by_id +from chatApp.utils.object_id import PydanticObjectId settings = get_settings() # Define the Socket.IO server sio_server = socketio.AsyncServer( async_mode="asgi", - cors_allowed_origins=settings.cors_allow_origins, + cors_allowed_origins=[], + logger=get_logger("socket.io"), ) # Create the ASGI app using the defined server sio_app = socketio.ASGIApp( socketio_server=sio_server, socketio_path="/socket.io/", - other_asgi_app="main:app", ) -# Event handlers +# Global state management +class GlobalState: + all_clients: int = 0 + rooms_client_count: dict[str, int] = {} + + +global_state = GlobalState() + + @sio_server.event async def connect(sid: str, environ: dict, auth: dict) -> None: + """Handle a new client connection.""" + global_state.all_clients += 1 print(f"Client connected: {sid}") + print(f"Number of clients connected: {global_state.all_clients}") + await sio_server.emit("client_count", data=global_state.all_clients) @sio_server.event async def disconnect(sid: str) -> None: + """Handle client disconnection.""" + global_state.all_clients -= 1 print(f"Client disconnected: {sid}") + print(f"Number of clients connected: {global_state.all_clients}") + await sio_server.emit("client_count", data=global_state.all_clients) + + +@sio_server.event +async def joining_public_room(sid: str, data: dict[str, Any]) -> None: + """Handle user joining a public room.""" + room_id = data.get("room_id") + user_id = data.get("user_id") + + if not isinstance(room_id, str) or not isinstance(user_id, str): + await sio_server.emit( + "error", data="Invalid room_id or user_id", room=sid + ) + return + + room_joined: bool = await join_public_room(room_id, user_id) + + if room_joined: + await sio_server.enter_room(sid, room_id) + global_state.rooms_client_count[room_id] = ( + global_state.rooms_client_count.get(room_id, 0) + 1 + ) + room_members = global_state.rooms_client_count[room_id] + print(f"User {user_id} joined room {room_id}") + print(f"Number of users in the room {room_id}: {room_members}") + await sio_server.emit("room_count", data=room_members, room=room_id) + await sio_server.emit("user_joined", data=user_id, room=room_id) + else: + await sio_server.emit("error", data="Error joining room", room=sid) + + +@sio_server.event +async def joining_private_room(sid: str, data: dict[str, Any]) -> None: + """Handle user joining a private room.""" + user1 = data.get("user1") + user2 = data.get("user2") + + if not isinstance(user1, str) or not isinstance(user2, str): + await sio_server.emit("error", data="Invalid user1 or user2", room=sid) + return + + room_id = await join_private_room(user1, user2) + await sio_server.enter_room(sid, room_id) + print(f"User {user1} joined private room {room_id} with {user2}") + await sio_server.emit("user_joined", data=user1, room=room_id) + + +@sio_server.event +async def leave_room(sid: str, data: dict[str, Any]) -> None: + """Handle user leaving a room.""" + room_id = data.get("room_id") + user_id = data.get("user_id") + + if not isinstance(room_id, str) or not isinstance(user_id, str): + await sio_server.emit( + "error", data="Invalid room_id or user_id", room=sid + ) + return + + await sio_server.leave_room(sid, room_id) + global_state.rooms_client_count[room_id] = max( + global_state.rooms_client_count.get(room_id, 0) - 1, 0 + ) + room_members = global_state.rooms_client_count[room_id] + print(f"User {user_id} left room {room_id}") + print(f"Number of users in the room {room_id}: {room_members}") + await sio_server.emit("room_count", data=room_members, room=room_id) + await sio_server.emit("user_left", data=user_id, room=room_id) + + +@sio_server.event +async def send_public_message(sid: str, data: dict[str, Any]) -> None: + """Handle sending a message to a public group.""" + room_id = data.get("room_id") + message = data.get("message") + user_id = data.get("user_id") + + if ( + not isinstance(room_id, str) + or not isinstance(message, str) + or not isinstance(user_id, str) + ): + await sio_server.emit( + "error", data="Invalid room_id, message, or user_id", room=sid + ) + return + + print(f"Sending message to room {room_id}: {message} from user {user_id}") + + room = await fetch_public_room_by_id(room_id) + + if not room: + await sio_server.emit( + "error", data={"error": "Room not found"}, room=sid + ) + return + + messages_collection = get_messages_collection() + user = await fetch_user_by_id(user_id) + + if user and user["_id"] in room["members"]: + new_message = Message( + user_id=PydanticObjectId(user_id), + room_id=PydanticObjectId(room_id), + content=message, + room_type="public", + ) + await messages_collection.insert_one(new_message.model_dump()) + await sio_server.emit( + "message", + {"sid": sid, "message": message, "user_id": user_id}, + room=room_id, + ) + print(f"Message sent to room {room_id}: {message}") + else: + print("User is not a member of the room or user not found") + + +@sio_server.event +async def send_private_message(sid: str, data: dict[str, Any]) -> None: + """Handle sending a private message.""" + room_id = data.get("room_id") + user1_id = data.get("user1") + user2_id = data.get("user2") + message = data.get("message") + + if ( + not isinstance(room_id, str) + or not isinstance(user1_id, str) + or not isinstance(user2_id, str) + or not isinstance(message, str) + ): + await sio_server.emit( + "error", + data={"error": "Invalid room_id, user1, user2, or message"}, + room=sid, + ) + return + + user1 = await fetch_user_by_id(user1_id) + user2 = await fetch_user_by_id(user2_id) + if not user1 or not user2: + await sio_server.emit( + "error", data={"error": "Users not found"}, room=sid + ) + return + + room_id = await check_members_in_room(user1_id, user2_id) + if not room_id: + await sio_server.emit( + "error", data={"error": "Room not found"}, room=sid + ) + return + + messages_collection = get_messages_collection() + + new_message = Message( + user_id=PydanticObjectId(user1_id), + room_id=PydanticObjectId(room_id), + content=message, + room_type="private", + ) + await messages_collection.insert_one(new_message.model_dump()) + await sio_server.emit( + "message", + {"sid": sid, "message": message, "user_id": user1_id}, + room=room_id, + ) + print(f"Private message sent from {user1_id} to room {room_id}: {message}")