""" Authentication API Endpoints """ from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.orm import Session from pydantic import BaseModel, EmailStr from typing import Optional from core.database import get_db from core.models import User, UserRole from core.auth import verify_password, get_password_hash, create_access_token, decode_token router = APIRouter() security = HTTPBearer() # Pydantic models class UserRegister(BaseModel): email: EmailStr name: str password: str role: UserRole = UserRole.ENGINEER class UserLogin(BaseModel): email: EmailStr password: str class Token(BaseModel): access_token: str token_type: str = "bearer" class UserResponse(BaseModel): id: int email: str name: str role: UserRole is_active: bool is_online: bool class Config: from_attributes = True # Dependency to get current user from token async def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db) ) -> User: """Get current authenticated user""" token = credentials.credentials payload = decode_token(token) if payload is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials" ) user_id = payload.get("sub") if user_id is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials" ) user = db.query(User).filter(User.id == int(user_id)).first() if user is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found" ) if not user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="User account is disabled" ) return user @router.post("/register", response_model=UserResponse) async def register(user_data: UserRegister, db: Session = Depends(get_db)): """Register a new user""" # Check if user exists existing_user = db.query(User).filter(User.email == user_data.email).first() if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered" ) # Create new user user = User( email=user_data.email, name=user_data.name, password_hash=get_password_hash(user_data.password), role=user_data.role ) db.add(user) db.commit() db.refresh(user) return user @router.post("/login", response_model=Token) async def login(credentials: UserLogin, db: Session = Depends(get_db)): """Login and get access token""" # Find user user = db.query(User).filter(User.email == credentials.email).first() if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect email or password" ) # Verify password if not verify_password(credentials.password, user.password_hash): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect email or password" ) if not user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="User account is disabled" ) # Create access token access_token = create_access_token(data={"sub": str(user.id)}) # Update user online status user.is_online = True db.commit() return {"access_token": access_token} @router.get("/me", response_model=UserResponse) async def get_me(current_user: User = Depends(get_current_user)): """Get current user info""" return current_user @router.post("/logout") async def logout( current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """Logout user""" current_user.is_online = False db.commit() return {"message": "Logged out successfully"}