Files
task_flow/app/api/tasks.py

339 lines
8.8 KiB
Python

from __future__ import annotations
import csv
from io import StringIO
from datetime import UTC, datetime
from functools import lru_cache
from uuid import UUID, uuid4
from fastapi import APIRouter, Body, Depends, Path, Query, Response, status
from fastapi.responses import JSONResponse
from app.api.dto.tasks import (
ErrorDTO,
TaskCreateDTO,
TaskDTO,
TaskListDTO,
TaskListItemDTO,
TaskQueryDTO,
TaskUpdateDTO,
)
from app.domain import InvalidTransitionError, complete_task, start_task
from app.storage import JsonFileTaskRepository, StoredTask, create_task_repository
# Router for the task endpoints; every route registered on it is served
# under the "/api/tasks" prefix and tagged "tasks".
router = APIRouter(prefix="/api/tasks", tags=["tasks"])
@lru_cache(maxsize=1)
def get_task_repository() -> JsonFileTaskRepository:
    """Return the task repository injected into the endpoints of this module.

    The value produced by ``create_task_repository`` is memoized by
    ``lru_cache``: once a call succeeds, later calls reuse that same object
    instead of building a new one.
    """
    repository = create_task_repository()
    return repository
def error_response(
    *,
    status_code: int,
    error: str,
    message: str,
) -> JSONResponse:
    """Build a JSON error response whose body is a serialized ``ErrorDTO``.

    Args:
        status_code: HTTP status code of the response.
        error: Machine-readable error code stored in the ``error`` field.
        message: Human-readable description stored in the ``message`` field.

    Returns:
        A ``JSONResponse`` carrying the JSON-mode dump of the ``ErrorDTO``.
    """
    body = ErrorDTO(error=error, message=message).model_dump(mode="json")
    return JSONResponse(status_code=status_code, content=body)
def parse_task_id(task_id: str) -> UUID | None:
    """Parse *task_id* into a ``UUID``, or return ``None`` if it is not one.

    Args:
        task_id: Raw identifier, normally the path parameter of a request.

    Returns:
        The parsed ``UUID``, or ``None`` when the value cannot be parsed.
        The function never raises for unparseable input.
    """
    try:
        return UUID(task_id)
    except (AttributeError, TypeError, ValueError):
        # ValueError: malformed string. TypeError: ``None`` was passed.
        # AttributeError: a non-string value (e.g. an int), which ``UUID()``
        # rejects by failing on a string method rather than with TypeError.
        return None
def to_task_dto(task: StoredTask) -> TaskDTO:
    """Convert a stored task into a ``TaskDTO``.

    The task is dumped to plain Python values and then validated by
    ``TaskDTO``, so any validation the DTO defines runs on every conversion.
    """
    raw_fields = task.model_dump(mode="python")
    return TaskDTO.model_validate(raw_fields)
def to_task_list_item_dto(task: StoredTask) -> TaskListItemDTO:
    """Convert a stored task into a ``TaskListItemDTO``.

    The task is dumped to plain Python values and then validated by
    ``TaskListItemDTO``.
    """
    raw_fields = task.model_dump(mode="python")
    return TaskListItemDTO.model_validate(raw_fields)
def protect_csv_cell(value: str) -> str:
    """Guard a CSV cell against being read as a spreadsheet formula.

    If the text, ignoring leading whitespace, begins with ``=``, ``+``, ``-``
    or ``@``, the original value (leading whitespace included) is returned
    with a single quote prepended. Any other value is returned unchanged.
    """
    formula_prefixes = ("=", "+", "-", "@")
    looks_like_formula = value.lstrip().startswith(formula_prefixes)
    return f"'{value}" if looks_like_formula else value
def build_tasks_csv(tasks: list[StoredTask]) -> str:
    """Render *tasks* as CSV text, one row per task after a header row.

    Columns are ``id``, ``title``, ``status``, ``created_at``, ``started_at``
    and ``done_at``. Rows are terminated with ``"\\n"``. The title goes
    through ``protect_csv_cell``; the status is taken from the task's
    ``TaskListItemDTO``; timestamps are written in ISO 8601 form, with an
    empty cell when ``started_at`` or ``done_at`` is ``None``.
    """

    def optional_iso(moment) -> str:
        # Unset timestamps become an empty cell.
        return "" if moment is None else moment.isoformat()

    def as_row(task: StoredTask) -> list:
        status_value = to_task_list_item_dto(task).status
        return [
            str(task.id),
            protect_csv_cell(task.title),
            status_value,
            task.created_at.isoformat(),
            optional_iso(task.started_at),
            optional_iso(task.done_at),
        ]

    header = ["id", "title", "status", "created_at", "started_at", "done_at"]
    out = StringIO()
    csv_writer = csv.writer(out, lineterminator="\n")
    csv_writer.writerow(header)
    csv_writer.writerows(as_row(task) for task in tasks)
    return out.getvalue()
def get_task_or_error(
    *,
    repo: JsonFileTaskRepository,
    task_id: str,
) -> tuple[StoredTask | None, JSONResponse | None]:
    """Look up a task by its raw id, returning either the task or an error.

    Args:
        repo: Repository queried via ``get_task``.
        task_id: Raw identifier taken from the request path.

    Returns:
        ``(task, None)`` when the task exists. Otherwise ``(None, response)``,
        where the response is a 400 if the id is not a valid UUID or a 404 if
        the repository has no such task.
    """
    task_uuid = parse_task_id(task_id)
    if task_uuid is None:
        malformed = error_response(
            status_code=status.HTTP_400_BAD_REQUEST,
            error="invalid_id",
            message="Task id must be a valid UUID",
        )
        return None, malformed

    found = repo.get_task(task_uuid)
    if found is not None:
        return found, None

    # NOTE(review): the 404 reuses the "invalid_id" error code of the 400
    # branch; confirm against the API contract whether that is intended.
    missing = error_response(
        status_code=status.HTTP_404_NOT_FOUND,
        error="invalid_id",
        message="Task was not found",
    )
    return None, missing
@router.get(
    "",
    response_model=TaskListDTO,
    responses={
        400: {"model": ErrorDTO},
    },
)
def list_tasks(
    limit: int = Query(default=100, ge=1, le=1000),
    offset: int = Query(default=0, ge=0),
    status_filter: str | None = Query(default=None, alias="status"),
    search: str | None = Query(default=None, max_length=100),
    repo: JsonFileTaskRepository = Depends(get_task_repository),
) -> TaskListDTO | JSONResponse:
    """List tasks with optional status filter, title search and pagination.

    Args:
        limit: Maximum number of items to return (1-1000, default 100).
        offset: Number of matching tasks to skip (default 0).
        status_filter: Value of the ``status`` query parameter; when given,
            only tasks whose list-item status equals it are kept.
        search: When given, only tasks whose title contains this text
            (compared case-insensitively via ``casefold``) are kept.
        repo: Task repository dependency.

    Returns:
        A ``TaskListDTO`` with the requested page, where ``total`` counts the
        tasks left after filtering and before pagination. A 400 error
        response is returned when the query parameters fail validation.
    """
    try:
        query = TaskQueryDTO(
            limit=limit,
            offset=offset,
            status=status_filter,
            search=search,
        )
    except ValueError:
        # Only validation failures become a 400. pydantic's ValidationError
        # is a ValueError subclass; any other exception is a server-side bug
        # and must propagate instead of being blamed on the client.
        return error_response(
            status_code=status.HTTP_400_BAD_REQUEST,
            error="invalid_payload",
            message="Invalid query parameters",
        )

    tasks = repo.list_tasks()
    if query.status is not None:
        tasks = [
            task for task in tasks if to_task_list_item_dto(task).status == query.status
        ]
    if query.search is not None:
        needle = query.search.casefold()
        tasks = [task for task in tasks if needle in task.title.casefold()]

    # Count after filtering so clients can page through the filtered set.
    total = len(tasks)
    items = tasks[query.offset : query.offset + query.limit]
    return TaskListDTO(
        items=[to_task_list_item_dto(task) for task in items],
        total=total,
        limit=query.limit,
        offset=query.offset,
    )
@router.get("/export")
def export_tasks_csv(
    repo: JsonFileTaskRepository = Depends(get_task_repository),
) -> Response:
    """Export every task in the repository as a downloadable CSV file.

    The response has media type ``text/csv`` and a ``Content-Disposition``
    header that offers the body as an attachment named ``tasks.csv``.
    """
    download_headers = {
        "Content-Disposition": 'attachment; filename="tasks.csv"',
    }
    body = build_tasks_csv(repo.list_tasks())
    return Response(content=body, media_type="text/csv", headers=download_headers)
@router.get(
    "/{task_id}",
    response_model=TaskDTO,
    responses={400: {"model": ErrorDTO}, 404: {"model": ErrorDTO}},
)
def get_task(
    task_id: str = Path(...),
    repo: JsonFileTaskRepository = Depends(get_task_repository),
) -> TaskDTO | JSONResponse:
    """Return a single task by id.

    Responds with 400 when the id is not a valid UUID and with 404 when no
    task with that id exists.
    """
    found, failure = get_task_or_error(repo=repo, task_id=task_id)
    return failure if failure is not None else to_task_dto(found)
@router.post(
    "",
    response_model=TaskDTO,
    status_code=status.HTTP_201_CREATED,
    responses={400: {"model": ErrorDTO}},
)
def create_task(
    payload: TaskCreateDTO = Body(...),
    repo: JsonFileTaskRepository = Depends(get_task_repository),
) -> TaskDTO | JSONResponse:
    """Create a new task from the request payload.

    The task gets a fresh random UUID, the title from the payload, the
    current UTC time as ``created_at`` and no ``started_at``/``done_at``.
    The task returned by the repository is what gets sent back.
    """
    new_task = StoredTask(
        id=uuid4(),
        title=payload.title,
        created_at=datetime.now(UTC),
        started_at=None,
        done_at=None,
    )
    return to_task_dto(repo.create_task(new_task))
@router.patch(
    "/{task_id}",
    response_model=TaskDTO,
    responses={400: {"model": ErrorDTO}, 404: {"model": ErrorDTO}},
)
def update_task(
    task_id: str = Path(...),
    payload: TaskUpdateDTO = Body(...),
    repo: JsonFileTaskRepository = Depends(get_task_repository),
) -> TaskDTO | JSONResponse:
    """Update the title of an existing task.

    Only the title can change; when the payload carries no title the current
    one is kept. Responds with 400 when the id is not a valid UUID and with
    404 when no task with that id exists.
    """
    current, failure = get_task_or_error(repo=repo, task_id=task_id)
    if failure is not None:
        return failure

    new_title = current.title if payload.title is None else payload.title
    # Rebuild the record so id and timestamps are carried over untouched.
    replacement = StoredTask(
        id=current.id,
        title=new_title,
        created_at=current.created_at,
        started_at=current.started_at,
        done_at=current.done_at,
    )
    repo.update_task(replacement)
    return to_task_dto(replacement)
@router.delete(
    "/{task_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    response_class=Response,
    responses={400: {"model": ErrorDTO}, 404: {"model": ErrorDTO}},
)
def delete_task(
    task_id: str = Path(...),
    repo: JsonFileTaskRepository = Depends(get_task_repository),
) -> Response:
    """Delete a task by id.

    Responds with 204 and no body on success, with 400 when the id is not a
    valid UUID, and with 404 when the repository reports that nothing was
    deleted.
    """
    task_uuid = parse_task_id(task_id)
    if task_uuid is None:
        return error_response(
            status_code=status.HTTP_400_BAD_REQUEST,
            error="invalid_id",
            message="Task id must be a valid UUID",
        )

    if repo.delete_task(task_uuid):
        return Response(status_code=status.HTTP_204_NO_CONTENT)

    # NOTE(review): the 404 reuses the "invalid_id" error code of the 400
    # branch; confirm against the API contract whether that is intended.
    return error_response(
        status_code=status.HTTP_404_NOT_FOUND,
        error="invalid_id",
        message="Task was not found",
    )
@router.post(
    "/{task_id}/start",
    response_model=TaskDTO,
    responses={
        400: {"model": ErrorDTO},
        404: {"model": ErrorDTO},
        409: {"model": ErrorDTO},
    },
)
def start_task_endpoint(
    task_id: str = Path(...),
    repo: JsonFileTaskRepository = Depends(get_task_repository),
) -> TaskDTO | JSONResponse:
    """Apply the ``start_task`` transition to a task and persist the result.

    Responds with 400 when the id is not a valid UUID, with 404 when no task
    with that id exists, and with 409 when the transition is rejected.
    """
    task, failure = get_task_or_error(repo=repo, task_id=task_id)
    if failure is not None:
        return failure

    try:
        started = start_task(task)
    except InvalidTransitionError as exc:
        # NOTE(review): the code "invalid_transaction" does not match the
        # exception name (InvalidTransitionError); confirm it is intended.
        conflict = error_response(
            status_code=status.HTTP_409_CONFLICT,
            error="invalid_transaction",
            message=str(exc),
        )
        return conflict
    else:
        repo.update_task(started)
        return to_task_dto(started)
@router.post(
    "/{task_id}/done",
    response_model=TaskDTO,
    responses={
        400: {"model": ErrorDTO},
        404: {"model": ErrorDTO},
        409: {"model": ErrorDTO},
    },
)
def complete_task_endpoint(
    task_id: str = Path(...),
    repo: JsonFileTaskRepository = Depends(get_task_repository),
) -> TaskDTO | JSONResponse:
    """Apply the ``complete_task`` transition to a task and persist the result.

    Responds with 400 when the id is not a valid UUID, with 404 when no task
    with that id exists, and with 409 when the transition is rejected.
    """
    task, failure = get_task_or_error(repo=repo, task_id=task_id)
    if failure is not None:
        return failure

    try:
        completed = complete_task(task)
    except InvalidTransitionError as exc:
        # NOTE(review): the code "invalid_transaction" does not match the
        # exception name (InvalidTransitionError); confirm it is intended.
        conflict = error_response(
            status_code=status.HTTP_409_CONFLICT,
            error="invalid_transaction",
            message=str(exc),
        )
        return conflict
    else:
        repo.update_task(completed)
        return to_task_dto(completed)