WebSocket Real-Time: - Socket.IO server integrated - Real-time message delivery - User online/offline status - Typing indicators - Channel room management - Auto-join on channel access Direct Messages: - 1-on-1 chat API - DM history and conversations - @grimlock in DMs (AI responds) - Read receipts - Unread count tracking - WebSocket notifications File Management: - Upload files to channels - Download with streaming - File metadata tracking - File listing by channel - Delete with permissions Integration: - Messages broadcast via WebSocket - DM notifications via WebSocket - All APIs updated for real-time BACKEND IS FEATURE COMPLETE! - Auth ✅ - Channels ✅ - Messages ✅ - DMs ✅ - Files ✅ - WebSocket ✅ - @grimlock AI ✅ Ready for frontend development in next session!
154 lines
4.1 KiB
Python
154 lines
4.1 KiB
Python
"""
|
|
WebSocket Server - Real-time messaging and presence
|
|
"""
|
|
|
|
import socketio
|
|
import logging
|
|
from typing import Dict, Set
|
|
from core.models import User
|
|
from core.auth import decode_token
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Create Socket.IO server
|
|
sio = socketio.AsyncServer(
|
|
async_mode='asgi',
|
|
cors_allowed_origins='*',
|
|
logger=True,
|
|
engineio_logger=True
|
|
)
|
|
|
|
# Track connected users: {user_id: set(session_ids)}
|
|
connected_users: Dict[int, Set[str]] = {}
|
|
|
|
# Track user channels: {session_id: user_id}
|
|
session_to_user: Dict[str, int] = {}
|
|
|
|
@sio.event
|
|
async def connect(sid, environ, auth):
|
|
"""Handle client connection"""
|
|
logger.info(f"Client connecting: {sid}")
|
|
|
|
# Authenticate via token
|
|
token = auth.get('token') if auth else None
|
|
if not token:
|
|
logger.warning(f"Connection rejected - no token: {sid}")
|
|
return False
|
|
|
|
# Verify token
|
|
payload = decode_token(token)
|
|
if not payload:
|
|
logger.warning(f"Connection rejected - invalid token: {sid}")
|
|
return False
|
|
|
|
user_id = int(payload.get('sub'))
|
|
|
|
# Track connection
|
|
if user_id not in connected_users:
|
|
connected_users[user_id] = set()
|
|
connected_users[user_id].add(sid)
|
|
session_to_user[sid] = user_id
|
|
|
|
logger.info(f"User {user_id} connected: {sid}")
|
|
|
|
# Notify others user is online
|
|
await sio.emit('user_online', {'user_id': user_id}, skip_sid=sid)
|
|
|
|
return True
|
|
|
|
@sio.event
|
|
async def disconnect(sid):
|
|
"""Handle client disconnect"""
|
|
user_id = session_to_user.get(sid)
|
|
if not user_id:
|
|
return
|
|
|
|
# Remove session
|
|
if user_id in connected_users:
|
|
connected_users[user_id].discard(sid)
|
|
if not connected_users[user_id]:
|
|
del connected_users[user_id]
|
|
# User fully offline
|
|
await sio.emit('user_offline', {'user_id': user_id})
|
|
|
|
del session_to_user[sid]
|
|
logger.info(f"User {user_id} disconnected: {sid}")
|
|
|
|
@sio.event
|
|
async def join_channel(sid, data):
|
|
"""Join a channel room"""
|
|
channel_id = data.get('channel_id')
|
|
if not channel_id:
|
|
return
|
|
|
|
await sio.enter_room(sid, f"channel_{channel_id}")
|
|
logger.info(f"Session {sid} joined channel {channel_id}")
|
|
|
|
@sio.event
|
|
async def leave_channel(sid, data):
|
|
"""Leave a channel room"""
|
|
channel_id = data.get('channel_id')
|
|
if not channel_id:
|
|
return
|
|
|
|
await sio.leave_room(sid, f"channel_{channel_id}")
|
|
logger.info(f"Session {sid} left channel {channel_id}")
|
|
|
|
@sio.event
|
|
async def typing_start(sid, data):
|
|
"""User started typing"""
|
|
channel_id = data.get('channel_id')
|
|
user_id = session_to_user.get(sid)
|
|
|
|
if channel_id and user_id:
|
|
await sio.emit(
|
|
'user_typing',
|
|
{'user_id': user_id, 'channel_id': channel_id},
|
|
room=f"channel_{channel_id}",
|
|
skip_sid=sid
|
|
)
|
|
|
|
@sio.event
|
|
async def typing_stop(sid, data):
|
|
"""User stopped typing"""
|
|
channel_id = data.get('channel_id')
|
|
user_id = session_to_user.get(sid)
|
|
|
|
if channel_id and user_id:
|
|
await sio.emit(
|
|
'user_stopped_typing',
|
|
{'user_id': user_id, 'channel_id': channel_id},
|
|
room=f"channel_{channel_id}",
|
|
skip_sid=sid
|
|
)
|
|
|
|
async def broadcast_new_message(channel_id: int, message_data: dict):
|
|
"""Broadcast new message to channel"""
|
|
await sio.emit(
|
|
'new_message',
|
|
message_data,
|
|
room=f"channel_{channel_id}"
|
|
)
|
|
|
|
async def broadcast_message_update(channel_id: int, message_data: dict):
|
|
"""Broadcast message update to channel"""
|
|
await sio.emit(
|
|
'message_updated',
|
|
message_data,
|
|
room=f"channel_{channel_id}"
|
|
)
|
|
|
|
async def send_dm_notification(user_id: int, message_data: dict):
|
|
"""Send DM notification to user"""
|
|
if user_id in connected_users:
|
|
for sid in connected_users[user_id]:
|
|
await sio.emit('new_dm', message_data, room=sid)
|
|
|
|
def get_connected_users() -> list:
|
|
"""Get list of online user IDs"""
|
|
return list(connected_users.keys())
|
|
|
|
def is_user_online(user_id: int) -> bool:
|
|
"""Check if user is online"""
|
|
return user_id in connected_users
|