Files
grimlock/backend/api/channels.py
JA a0fc800f57 Final fixes: Channel response model and deployment documentation
- Fixed create_channel to return proper ChannelResponse object
- Added DEPLOYMENT_STATUS.md with complete system status
- All core functionality working
- Ready for WebSocket debugging and end-to-end testing

Next: Fix WebSocket auth (403) and test full message flow
2026-02-14 04:47:32 +00:00

193 lines
5.1 KiB
Python

"""
Channels API - Channel management and operations
"""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from pydantic import BaseModel
from typing import List, Optional
from core.database import get_db
from core.models import Channel, User, ChannelType
from api.auth import get_current_user
# Router on which the channel endpoints declared in this module are registered;
# how it is mounted on the application is decided elsewhere (not visible here).
router = APIRouter()
# Pydantic models
class ChannelCreate(BaseModel):
    # Request body accepted by the channel-creation endpoint.

    # Name of the channel; the create endpoint rejects names already in use.
    name: str

    # Optional free-text description; omitted means None.
    description: Optional[str] = None

    # Channel visibility; a channel is public unless stated otherwise.
    type: ChannelType = ChannelType.PUBLIC
class ChannelResponse(BaseModel):
    # Detailed view of a single channel, as returned by the create and
    # get-by-id endpoints.

    id: int
    name: str
    description: Optional[str]
    type: ChannelType

    # Number of users currently in the channel.
    member_count: int

    # Creation timestamp as a string; the endpoints in this module fill it
    # with ``datetime.isoformat()`` output.
    created_at: str

    class Config:
        # Pydantic setting that allows the model to be populated from object
        # attributes (e.g. ORM instances) as well as from dicts.
        from_attributes = True
class ChannelListResponse(BaseModel):
    # Compact per-channel entry used by the channel listing endpoint.

    id: int
    name: str
    description: Optional[str]
    type: ChannelType

    # Placeholder value until unread tracking exists.
    unread_count: int = 0  # TODO: Implement actual unread counting

    class Config:
        # Pydantic setting that allows the model to be populated from object
        # attributes (e.g. ORM instances) as well as from dicts.
        from_attributes = True
@router.post("/", response_model=ChannelResponse)
async def create_channel(
    channel_data: ChannelCreate,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Create a new channel and enrol its creator as the first member.

    Args:
        channel_data: Name, optional description and type of the new channel.
        current_user: Authenticated user resolved by ``get_current_user``;
            stored as ``created_by`` and appended to the channel's members.
        db: SQLAlchemy session for this request.

    Returns:
        A ``ChannelResponse`` describing the stored channel.

    Raises:
        HTTPException: 400 if a channel with the same name already exists.
    """
    # Reject duplicates up front so the client gets a clear 400.
    existing = db.query(Channel).filter(Channel.name == channel_data.name).first()
    if existing is not None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Channel name already exists"
        )

    channel = Channel(
        name=channel_data.name,
        description=channel_data.description,
        type=channel_data.type,
        created_by=current_user.id
    )
    db.add(channel)

    # Attach the creator before committing so the channel row and its
    # membership row are written in ONE transaction. Committing them
    # separately could leave a persisted channel with no members if the
    # second commit failed.
    channel.members.append(current_user)
    db.commit()

    # Reload server-generated values (id, created_at) after the commit.
    db.refresh(channel)

    return ChannelResponse(
        id=channel.id,
        name=channel.name,
        description=channel.description,
        type=channel.type,
        member_count=len(channel.members),
        created_at=channel.created_at.isoformat()
    )
@router.get("/", response_model=List[ChannelListResponse])
async def list_channels(
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Return a summary entry for every channel the current user belongs to.

    The channels are read from ``current_user.channels``; ``db`` is accepted
    as a dependency but not queried directly here.
    """
    summaries = []
    for membership in current_user.channels:
        entry = {
            "id": membership.id,
            "name": membership.name,
            "description": membership.description,
            "type": membership.type,
            "unread_count": 0  # TODO: Implement
        }
        summaries.append(entry)
    return summaries
@router.get("/{channel_id}", response_model=ChannelResponse)
async def get_channel(
    channel_id: int,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Get the details of a single channel.

    Non-private channels are returned to any authenticated user; private
    channels are only returned to their members.

    Args:
        channel_id: Primary key of the channel to look up.
        current_user: Authenticated user resolved by ``get_current_user``.
        db: SQLAlchemy session for this request.

    Returns:
        A ``ChannelResponse`` describing the channel.

    Raises:
        HTTPException: 404 if no channel has this id; 403 if the channel is
            private and the current user is not one of its members.
    """
    channel = db.query(Channel).filter(Channel.id == channel_id).first()
    if channel is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Channel not found"
        )

    # Check if user is member (for private channels)
    if channel.type == ChannelType.PRIVATE and current_user not in channel.members:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not a member of this channel"
        )

    # Build the response field by field instead of spreading
    # ``channel.__dict__``: the instance dict also carries SQLAlchemy's
    # internal state and loaded relationships, and omits expired columns.
    return ChannelResponse(
        id=channel.id,
        name=channel.name,
        description=channel.description,
        type=channel.type,
        member_count=len(channel.members),
        created_at=channel.created_at.isoformat()
    )
@router.post("/{channel_id}/join")
async def join_channel(
    channel_id: int,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Add the current user to a channel that is not private.

    Raises:
        HTTPException: 404 if the channel does not exist, 403 if it is
            private, 400 if the user already belongs to it.
    """
    by_id = db.query(Channel).filter(Channel.id == channel_id)
    target = by_id.first()
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Channel not found"
        )

    # Private channels are invitation-only, so self-service joining is refused.
    is_private = target.type == ChannelType.PRIVATE
    if is_private:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Cannot join private channel"
        )

    # Joining twice is reported as a client error.
    already_member = current_user in target.members
    if already_member:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Already a member"
        )

    target.members.append(current_user)
    db.commit()

    confirmation = f"Joined channel #{target.name}"
    return {"message": confirmation}
@router.post("/{channel_id}/leave")
async def leave_channel(
    channel_id: int,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Remove the current user from a channel they belong to.

    Raises:
        HTTPException: 404 if the channel does not exist, 400 if the user is
            not currently a member of it.
    """
    by_id = db.query(Channel).filter(Channel.id == channel_id)
    target = by_id.first()
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Channel not found"
        )

    # Only an existing membership can be dropped.
    is_member = current_user in target.members
    if not is_member:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Not a member of this channel"
        )

    target.members.remove(current_user)
    db.commit()

    confirmation = f"Left channel #{target.name}"
    return {"message": confirmation}