""" WebSocket Server - Real-time messaging and presence """ import socketio import logging from typing import Dict, Set from core.models import User from core.auth import decode_token logger = logging.getLogger(__name__) # Create Socket.IO server sio = socketio.AsyncServer( async_mode='asgi', cors_allowed_origins='*', logger=True, engineio_logger=True ) # Track connected users: {user_id: set(session_ids)} connected_users: Dict[int, Set[str]] = {} # Track user channels: {session_id: user_id} session_to_user: Dict[str, int] = {} @sio.event async def connect(sid, environ, auth): """Handle client connection""" logger.info(f"Client connecting: {sid}") # Authenticate via token token = auth.get('token') if auth else None if not token: logger.warning(f"Connection rejected - no token: {sid}") return False # Verify token payload = decode_token(token) if not payload: logger.warning(f"Connection rejected - invalid token: {sid}") return False user_id = int(payload.get('sub')) # Track connection if user_id not in connected_users: connected_users[user_id] = set() connected_users[user_id].add(sid) session_to_user[sid] = user_id logger.info(f"User {user_id} connected: {sid}") # Notify others user is online await sio.emit('user_online', {'user_id': user_id}, skip_sid=sid) return True @sio.event async def disconnect(sid): """Handle client disconnect""" user_id = session_to_user.get(sid) if not user_id: return # Remove session if user_id in connected_users: connected_users[user_id].discard(sid) if not connected_users[user_id]: del connected_users[user_id] # User fully offline await sio.emit('user_offline', {'user_id': user_id}) del session_to_user[sid] logger.info(f"User {user_id} disconnected: {sid}") @sio.event async def join_channel(sid, data): """Join a channel room""" channel_id = data.get('channel_id') if not channel_id: return await sio.enter_room(sid, f"channel_{channel_id}") logger.info(f"Session {sid} joined channel {channel_id}") @sio.event async def leave_channel(sid, data): """Leave a channel room""" channel_id = data.get('channel_id') if not channel_id: return await sio.leave_room(sid, f"channel_{channel_id}") logger.info(f"Session {sid} left channel {channel_id}") @sio.event async def typing_start(sid, data): """User started typing""" channel_id = data.get('channel_id') user_id = session_to_user.get(sid) if channel_id and user_id: await sio.emit( 'user_typing', {'user_id': user_id, 'channel_id': channel_id}, room=f"channel_{channel_id}", skip_sid=sid ) @sio.event async def typing_stop(sid, data): """User stopped typing""" channel_id = data.get('channel_id') user_id = session_to_user.get(sid) if channel_id and user_id: await sio.emit( 'user_stopped_typing', {'user_id': user_id, 'channel_id': channel_id}, room=f"channel_{channel_id}", skip_sid=sid ) async def broadcast_new_message(channel_id: int, message_data: dict): """Broadcast new message to channel""" await sio.emit( 'new_message', message_data, room=f"channel_{channel_id}" ) async def broadcast_message_update(channel_id: int, message_data: dict): """Broadcast message update to channel""" await sio.emit( 'message_updated', message_data, room=f"channel_{channel_id}" ) async def send_dm_notification(user_id: int, message_data: dict): """Send DM notification to user""" if user_id in connected_users: for sid in connected_users[user_id]: await sio.emit('new_dm', message_data, room=sid) def get_connected_users() -> list: """Get list of online user IDs""" return list(connected_users.keys()) def is_user_online(user_id: int) -> bool: """Check if user is online""" return user_id in connected_users