feat(taskflow): add core task API, storage persistence, csv export, stats page, and test coverage

This commit is contained in:
Alexander Kalinovsky
2026-04-01 17:56:03 +03:00
commit 19d659df6b
31 changed files with 4197 additions and 0 deletions

1
tests/__init__.py Normal file
View File

@@ -0,0 +1 @@

216
tests/test_api_stats.py Normal file
View File

@@ -0,0 +1,216 @@
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from uuid import uuid4
from fastapi.testclient import TestClient
from app.api.tasks import get_task_repository
from app.main import app
from app.storage import JsonFileTaskRepository, StoredTask
def make_repo(tmp_path) -> JsonFileTaskRepository:
return JsonFileTaskRepository(tmp_path / "tasks.json")
def make_task(
*,
title: str = "Task",
started: bool = False,
done: bool = False,
) -> StoredTask:
now = datetime.now(UTC)
started_at = now - timedelta(minutes=10) if started else None
done_at = now - timedelta(minutes=1) if done else None
return StoredTask(
id=uuid4(),
title=title,
created_at=now - timedelta(hours=1),
started_at=started_at,
done_at=done_at,
)
def make_client(tmp_path) -> tuple[TestClient, JsonFileTaskRepository]:
repo = make_repo(tmp_path)
app.dependency_overrides[get_task_repository] = lambda: repo
client = TestClient(app)
return client, repo
def teardown_function() -> None:
app.dependency_overrides.clear()
def test_stats_returns_html_page_for_empty_board(tmp_path):
client, _repo = make_client(tmp_path)
response = client.get("/stats")
assert response.status_code == 200
assert "text/html" in response.headers["content-type"]
assert "TaskFlow Stats" in response.text
assert "No task selected" in response.text
assert "Backlog" in response.text
assert "In Progress" in response.text
assert "Done" in response.text
assert response.text.count("No tasks") == 3
def test_stats_renders_tasks_in_all_kanban_columns(tmp_path):
client, repo = make_client(tmp_path)
backlog = make_task(title="Backlog title", started=False, done=False)
in_progress = make_task(title="In progress title", started=True, done=False)
done = make_task(title="Done title", started=True, done=True)
repo.create_task(backlog)
repo.create_task(in_progress)
repo.create_task(done)
response = client.get("/stats")
assert response.status_code == 200
assert "Backlog title" in response.text
assert "In progress title" in response.text
assert "Done title" in response.text
assert "Backlog" in response.text
assert "In Progress" in response.text
assert "Done" in response.text
def test_stats_preselects_first_task_in_details_block(tmp_path):
client, repo = make_client(tmp_path)
first = make_task(title="First task", started=False, done=False)
second = make_task(title="Second task", started=True, done=False)
repo.create_task(first)
repo.create_task(second)
response = client.get("/stats")
assert response.status_code == 200
assert 'id="selected-title"' in response.text
assert 'id="selected-created-at"' in response.text
assert "First task" in response.text
assert first.created_at.isoformat() in response.text
def test_stats_shows_placeholders_for_missing_dates(tmp_path):
client, repo = make_client(tmp_path)
task = make_task(title="Backlog only", started=False, done=False)
repo.create_task(task)
response = client.get("/stats")
assert response.status_code == 200
assert 'id="selected-started-at"' in response.text
assert 'id="selected-done-at"' in response.text
assert 'id="selected-cycle-time"' in response.text
assert ">—<" in response.text
def test_stats_renders_cycle_time_for_completed_task(tmp_path):
client, repo = make_client(tmp_path)
task = make_task(title="Completed task", started=True, done=True)
repo.create_task(task)
response = client.get("/stats")
assert response.status_code == 200
assert 'id="selected-cycle-time"' in response.text
assert "0h 9m" in response.text
assert ">—<" not in response.text.split('id="selected-cycle-time"', 1)[1].split("</div>", 2)[1]
def test_stats_embeds_task_payload_for_client_side_switching(tmp_path):
client, repo = make_client(tmp_path)
task = make_task(title="Payload task", started=True, done=True)
repo.create_task(task)
response = client.get("/stats")
assert response.status_code == 200
assert "data-task='" in response.text
assert str(task.id) in response.text
assert '"title": "Payload task"' in response.text
assert '"status": "done"' in response.text
assert '"created_at":' in response.text
assert '"started_at":' in response.text
assert '"done_at":' in response.text
assert '"cycle_time":' in response.text
def test_stats_includes_js_hooks_for_dynamic_selected_task_update(tmp_path):
client, repo = make_client(tmp_path)
repo.create_task(make_task(title="Clickable"))
response = client.get("/stats")
assert response.status_code == 200
assert 'id="selected-title"' in response.text
assert 'id="selected-status"' in response.text
assert 'id="selected-started-at"' in response.text
assert 'id="selected-done-at"' in response.text
assert 'id="selected-cycle-time"' in response.text
assert 'id="selected-created-at"' in response.text
assert 'document.querySelectorAll(".task-card")' in response.text
assert 'card.addEventListener("click"' in response.text
assert "JSON.parse(card.dataset.task)" in response.text
def test_stats_handles_utf8_titles(tmp_path):
client, repo = make_client(tmp_path)
repo.create_task(make_task(title="Задача пример"))
response = client.get("/stats")
assert response.status_code == 200
assert "Задача пример" in response.text
def test_stats_escapes_html_in_task_title(tmp_path):
client, repo = make_client(tmp_path)
repo.create_task(make_task(title="<script>alert(1)</script>"))
response = client.get("/stats")
assert response.status_code == 200
assert "<script>alert(1)</script>" not in response.text
assert "&lt;script&gt;alert(1)&lt;/script&gt;" in response.text
def test_stats_rejects_unsupported_method(tmp_path):
client, _repo = make_client(tmp_path)
response = client.post("/stats")
assert response.status_code == 405
def test_stats_still_works_after_recovery_from_corrupted_file(tmp_path):
file_path = tmp_path / "tasks.json"
file_path.write_text("{broken", encoding="utf-8")
repo = JsonFileTaskRepository(file_path)
app.dependency_overrides[get_task_repository] = lambda: repo
client = TestClient(app)
response = client.get("/stats")
assert response.status_code == 200
assert "text/html" in response.headers["content-type"]
assert "TaskFlow Stats" in response.text
assert "No task selected" in response.text
def test_stats_renders_created_started_and_done_values_for_completed_task(tmp_path):
client, repo = make_client(tmp_path)
task = make_task(title="Detailed task", started=True, done=True)
repo.create_task(task)
response = client.get("/stats")
assert response.status_code == 200
assert task.created_at.isoformat() in response.text
assert task.started_at.isoformat() in response.text
assert task.done_at.isoformat() in response.text

502
tests/test_api_tasks.py Normal file
View File

@@ -0,0 +1,502 @@
from __future__ import annotations
import csv
from datetime import UTC, datetime, timedelta
from uuid import uuid4
from fastapi.testclient import TestClient
from app.api.tasks import get_task_repository
from app.main import app
from app.storage import JsonFileTaskRepository, StoredTask
def make_repo(tmp_path) -> JsonFileTaskRepository:
return JsonFileTaskRepository(tmp_path / "tasks.json")
def make_task(
*,
title: str = "Task",
started: bool = False,
done: bool = False,
) -> StoredTask:
now = datetime.now(UTC)
started_at = now - timedelta(minutes=10) if started else None
done_at = now - timedelta(minutes=1) if done else None
return StoredTask(
id=uuid4(),
title=title,
created_at=now - timedelta(hours=1),
started_at=started_at,
done_at=done_at,
)
def make_client(tmp_path) -> tuple[TestClient, JsonFileTaskRepository]:
repo = make_repo(tmp_path)
app.dependency_overrides[get_task_repository] = lambda: repo
client = TestClient(app)
return client, repo
def teardown_function() -> None:
app.dependency_overrides.clear()
def test_list_tasks_returns_empty_list_by_default(tmp_path):
client, _repo = make_client(tmp_path)
response = client.get("/api/tasks")
assert response.status_code == 200
assert response.json() == {
"items": [],
"total": 0,
"limit": 100,
"offset": 0,
}
def test_list_tasks_filters_by_status_and_search(tmp_path):
client, repo = make_client(tmp_path)
backlog = make_task(title="Alpha backlog")
in_progress = make_task(title="Alpha in progress", started=True, done=False)
done = make_task(title="Beta done", started=True, done=True)
repo.create_task(backlog)
repo.create_task(in_progress)
repo.create_task(done)
response = client.get(
"/api/tasks",
params={"status": "in_progress", "search": "alpha"},
)
assert response.status_code == 200
data = response.json()
assert data["total"] == 1
assert len(data["items"]) == 1
assert data["items"][0]["id"] == str(in_progress.id)
assert data["items"][0]["status"] == "in_progress"
def test_list_tasks_rejects_invalid_status_filter(tmp_path):
client, _repo = make_client(tmp_path)
response = client.get("/api/tasks", params={"status": "unknown"})
assert response.status_code == 400
assert response.json()["error"] == "invalid_payload"
def test_get_task_returns_existing_task(tmp_path):
client, repo = make_client(tmp_path)
task = make_task(title="Lookup task")
repo.create_task(task)
response = client.get(f"/api/tasks/{task.id}")
assert response.status_code == 200
data = response.json()
assert data["id"] == str(task.id)
assert data["title"] == task.title
assert data["status"] == "backlog"
def test_get_task_returns_done_status_for_completed_task(tmp_path):
client, repo = make_client(tmp_path)
task = make_task(title="Completed", started=True, done=True)
repo.create_task(task)
response = client.get(f"/api/tasks/{task.id}")
assert response.status_code == 200
assert response.json()["status"] == "done"
def test_get_task_returns_invalid_id_for_bad_uuid(tmp_path):
client, _repo = make_client(tmp_path)
response = client.get("/api/tasks/bad-id")
assert response.status_code == 400
assert response.json() == {
"error": "invalid_id",
"message": "Task id must be a valid UUID",
}
def test_create_task_creates_new_backlog_task(tmp_path):
client, repo = make_client(tmp_path)
response = client.post("/api/tasks", json={"title": "Test task"})
assert response.status_code == 201
data = response.json()
assert data["title"] == "Test task"
assert data["started_at"] is None
assert data["done_at"] is None
assert data["status"] == "backlog"
assert repo.get_task(uuid4()) is None
assert len(repo.list_tasks()) == 1
def test_create_task_trims_title(tmp_path):
client, repo = make_client(tmp_path)
response = client.post("/api/tasks", json={"title": " Trim me "})
assert response.status_code == 201
data = response.json()
assert data["title"] == "Trim me"
assert repo.list_tasks()[0].title == "Trim me"
def test_create_task_rejects_invalid_payload(tmp_path):
client, _repo = make_client(tmp_path)
response = client.post("/api/tasks", json={"title": ""})
assert response.status_code == 422
def test_update_task_updates_title(tmp_path):
client, repo = make_client(tmp_path)
task = make_task(title="Old title")
repo.create_task(task)
response = client.patch(
f"/api/tasks/{task.id}",
json={"title": "Updated title"},
)
assert response.status_code == 200
data = response.json()
assert data["title"] == "Updated title"
reloaded = repo.get_task(task.id)
assert reloaded is not None
assert reloaded.title == "Updated title"
assert reloaded.created_at == task.created_at
def test_update_task_keeps_existing_timestamps(tmp_path):
client, repo = make_client(tmp_path)
task = make_task(title="Workflow", started=True, done=True)
repo.create_task(task)
response = client.patch(
f"/api/tasks/{task.id}",
json={"title": "Workflow updated"},
)
assert response.status_code == 200
data = response.json()
assert data["title"] == "Workflow updated"
assert datetime.fromisoformat(data["started_at"]) == task.started_at
assert datetime.fromisoformat(data["done_at"]) == task.done_at
def test_update_task_returns_not_found_for_missing_task(tmp_path):
client, _repo = make_client(tmp_path)
response = client.patch(
f"/api/tasks/{uuid4()}",
json={"title": "Updated"},
)
assert response.status_code == 404
assert response.json()["error"] == "invalid_id"
def test_delete_task_removes_existing_task(tmp_path):
client, repo = make_client(tmp_path)
task = make_task(title="Delete me")
repo.create_task(task)
response = client.delete(f"/api/tasks/{task.id}")
assert response.status_code == 204
assert repo.get_task(task.id) is None
def test_delete_task_only_removes_target_task(tmp_path):
client, repo = make_client(tmp_path)
task_1 = make_task(title="Keep me")
task_2 = make_task(title="Delete me")
repo.create_task(task_1)
repo.create_task(task_2)
response = client.delete(f"/api/tasks/{task_2.id}")
assert response.status_code == 204
remaining = repo.list_tasks()
assert remaining == [task_1]
def test_delete_task_returns_invalid_id_for_bad_uuid(tmp_path):
client, _repo = make_client(tmp_path)
response = client.delete("/api/tasks/bad-id")
assert response.status_code == 400
assert response.json()["error"] == "invalid_id"
def test_start_task_transitions_backlog_to_in_progress(tmp_path):
client, repo = make_client(tmp_path)
task = make_task(title="Start me")
repo.create_task(task)
before_call = datetime.now(UTC)
response = client.post(f"/api/tasks/{task.id}/start")
after_call = datetime.now(UTC)
assert response.status_code == 200
data = response.json()
assert data["status"] == "in_progress"
assert data["done_at"] is None
started_at = datetime.fromisoformat(data["started_at"])
assert before_call <= started_at <= after_call
def test_start_task_is_idempotent(tmp_path):
client, repo = make_client(tmp_path)
task = make_task(title="Already started", started=True, done=False)
repo.create_task(task)
response = client.post(f"/api/tasks/{task.id}/start")
assert response.status_code == 200
data = response.json()
assert data["status"] == "in_progress"
assert datetime.fromisoformat(data["started_at"]) == task.started_at
def test_start_task_rejects_completed_task(tmp_path):
client, repo = make_client(tmp_path)
task = make_task(title="Completed", started=True, done=True)
repo.create_task(task)
response = client.post(f"/api/tasks/{task.id}/start")
assert response.status_code == 409
assert response.json()["error"] == "invalid_transaction"
def test_done_task_transitions_in_progress_to_done(tmp_path):
client, repo = make_client(tmp_path)
task = make_task(title="Complete me", started=True, done=False)
repo.create_task(task)
before_call = datetime.now(UTC)
response = client.post(f"/api/tasks/{task.id}/done")
after_call = datetime.now(UTC)
assert response.status_code == 200
data = response.json()
assert data["status"] == "done"
assert datetime.fromisoformat(data["started_at"]) == task.started_at
done_at = datetime.fromisoformat(data["done_at"])
assert before_call <= done_at <= after_call
assert done_at >= task.started_at
def test_done_task_is_idempotent_for_completed_task(tmp_path):
client, repo = make_client(tmp_path)
task = make_task(title="Already done", started=True, done=True)
repo.create_task(task)
response = client.post(f"/api/tasks/{task.id}/done")
assert response.status_code == 200
data = response.json()
assert data["status"] == "done"
assert datetime.fromisoformat(data["done_at"]) == task.done_at
def test_done_task_rejects_backlog_task(tmp_path):
client, repo = make_client(tmp_path)
task = make_task(title="Backlog")
repo.create_task(task)
response = client.post(f"/api/tasks/{task.id}/done")
assert response.status_code == 409
assert response.json()["error"] == "invalid_transaction"
def test_list_tasks_rejects_invalid_limit(tmp_path):
client, _repo = make_client(tmp_path)
response = client.get("/api/tasks", params={"limit": 0})
assert response.status_code == 422
def test_get_task_returns_not_found_for_missing_uuid(tmp_path):
client, _repo = make_client(tmp_path)
response = client.get(f"/api/tasks/{uuid4()}")
assert response.status_code == 404
assert response.json()["error"] == "invalid_id"
def test_patch_task_rejects_invalid_payload(tmp_path):
client, repo = make_client(tmp_path)
task = make_task(title="Patch me")
repo.create_task(task)
response = client.patch(
f"/api/tasks/{task.id}",
json={"title": ""},
)
assert response.status_code == 422
def test_delete_task_returns_not_found_for_missing_uuid(tmp_path):
client, _repo = make_client(tmp_path)
response = client.delete(f"/api/tasks/{uuid4()}")
assert response.status_code == 404
assert response.json()["error"] == "invalid_id"
def test_start_task_returns_not_found_for_missing_uuid(tmp_path):
client, _repo = make_client(tmp_path)
response = client.post(f"/api/tasks/{uuid4()}/start")
assert response.status_code == 404
assert response.json()["error"] == "invalid_id"
def test_done_task_returns_not_found_for_missing_uuid(tmp_path):
client, _repo = make_client(tmp_path)
response = client.post(f"/api/tasks/{uuid4()}/done")
assert response.status_code == 404
assert response.json()["error"] == "invalid_id"
def test_export_tasks_csv_returns_header_only_for_empty_storage(tmp_path):
client, _repo = make_client(tmp_path)
response = client.get("/api/tasks/export")
assert response.status_code == 200
assert response.headers["content-type"].startswith("text/csv")
assert response.headers["content-disposition"] == 'attachment; filename="tasks.csv"'
assert response.text == "id,title,status,created_at,started_at,done_at\n"
def test_export_tasks_csv_returns_rows_for_multiple_statuses(tmp_path):
client, repo = make_client(tmp_path)
backlog = make_task(title="Backlog task", started=False, done=False)
in_progress = make_task(title="In progress task", started=True, done=False)
done = make_task(title="Done task", started=True, done=True)
repo.create_task(backlog)
repo.create_task(in_progress)
repo.create_task(done)
response = client.get("/api/tasks/export")
assert response.status_code == 200
assert response.headers["content-type"].startswith("text/csv")
rows = list(csv.DictReader(response.text.splitlines()))
assert len(rows) == 3
by_id = {row["id"]: row for row in rows}
assert by_id[str(backlog.id)]["title"] == "Backlog task"
assert by_id[str(backlog.id)]["status"] == "backlog"
assert by_id[str(backlog.id)]["started_at"] == ""
assert by_id[str(backlog.id)]["done_at"] == ""
assert by_id[str(in_progress.id)]["title"] == "In progress task"
assert by_id[str(in_progress.id)]["status"] == "in_progress"
assert datetime.fromisoformat(by_id[str(in_progress.id)]["created_at"]) == in_progress.created_at
assert datetime.fromisoformat(by_id[str(in_progress.id)]["started_at"]) == in_progress.started_at
assert by_id[str(in_progress.id)]["done_at"] == ""
assert by_id[str(done.id)]["title"] == "Done task"
assert by_id[str(done.id)]["status"] == "done"
assert datetime.fromisoformat(by_id[str(done.id)]["created_at"]) == done.created_at
assert datetime.fromisoformat(by_id[str(done.id)]["started_at"]) == done.started_at
assert datetime.fromisoformat(by_id[str(done.id)]["done_at"]) == done.done_at
def test_export_tasks_csv_protects_against_csv_injection(tmp_path):
client, repo = make_client(tmp_path)
task = make_task(title="=2+2")
repo.create_task(task)
response = client.get("/api/tasks/export")
assert response.status_code == 200
rows = list(csv.DictReader(response.text.splitlines()))
assert len(rows) == 1
assert rows[0]["title"] == "'=2+2"
def test_export_tasks_csv_preserves_utf8_titles(tmp_path):
client, repo = make_client(tmp_path)
task = make_task(title="Задача пример")
repo.create_task(task)
response = client.get("/api/tasks/export")
assert response.status_code == 200
assert response.headers["content-type"].startswith("text/csv")
assert "Задача пример" in response.text
def test_export_tasks_csv_uses_empty_strings_for_absent_timestamps(tmp_path):
client, repo = make_client(tmp_path)
task = make_task(title="No timestamps", started=False, done=False)
repo.create_task(task)
response = client.get("/api/tasks/export")
assert response.status_code == 200
rows = list(csv.DictReader(response.text.splitlines()))
assert len(rows) == 1
assert rows[0]["started_at"] == ""
assert rows[0]["done_at"] == ""
def test_export_tasks_csv_is_stable_across_repeated_calls(tmp_path):
client, repo = make_client(tmp_path)
task = make_task(title="Stable export", started=True, done=True)
repo.create_task(task)
response_1 = client.get("/api/tasks/export")
response_2 = client.get("/api/tasks/export")
assert response_1.status_code == 200
assert response_2.status_code == 200
assert response_1.headers["content-type"].startswith("text/csv")
assert response_2.headers["content-type"].startswith("text/csv")
assert response_1.text == response_2.text
def test_export_tasks_csv_rejects_unsupported_method(tmp_path):
client, _repo = make_client(tmp_path)
response = client.post("/api/tasks/export")
assert response.status_code == 405
def test_export_route_is_not_shadowed_by_task_id_route(tmp_path):
client, _repo = make_client(tmp_path)
response = client.get("/api/tasks/export")
assert response.status_code == 200
assert response.headers["content-type"].startswith("text/csv")

View File

@@ -0,0 +1,96 @@
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from uuid import uuid4
import pytest
from app.domain import InvalidTransitionError, complete_task, start_task
from app.storage import StoredTask
def make_task(
*,
title: str = "Task",
started_at: datetime | None = None,
done_at: datetime | None = None,
) -> StoredTask:
return StoredTask(
id=uuid4(),
title=title,
created_at=datetime.now(UTC),
started_at=started_at,
done_at=done_at,
)
def test_start_task_from_backlog_sets_started_at():
task = make_task(started_at=None, done_at=None)
before_call = datetime.now(UTC)
result = start_task(task)
after_call = datetime.now(UTC)
assert result.id == task.id
assert result.title == task.title
assert result.created_at == task.created_at
assert result.done_at is None
assert result.started_at is not None
assert before_call <= result.started_at <= after_call
def test_start_task_is_idempotent_for_already_started_task():
started_at = datetime.now(UTC) - timedelta(minutes=5)
task = make_task(started_at=started_at, done_at=None)
result = start_task(task)
assert result == task
assert result.started_at == started_at
def test_start_task_raises_for_completed_task():
started_at = datetime.now(UTC) - timedelta(minutes=10)
done_at = datetime.now(UTC) - timedelta(minutes=1)
task = make_task(started_at=started_at, done_at=done_at)
with pytest.raises(InvalidTransitionError, match="invalid_transaction"):
start_task(task)
def test_complete_task_from_in_progress_sets_done_at():
started_at = datetime.now(UTC) - timedelta(minutes=10)
task = make_task(started_at=started_at, done_at=None)
before_call = datetime.now(UTC)
result = complete_task(task)
after_call = datetime.now(UTC)
assert result.id == task.id
assert result.title == task.title
assert result.created_at == task.created_at
assert result.started_at == started_at
assert result.done_at is not None
assert result.started_at <= result.done_at
assert before_call <= result.done_at <= after_call
def test_complete_task_is_idempotent_for_already_completed_task():
started_at = datetime.now(UTC) - timedelta(minutes=10)
done_at = datetime.now(UTC) - timedelta(minutes=1)
task = make_task(started_at=started_at, done_at=done_at)
result = complete_task(task)
assert result == task
assert result.done_at == done_at
def test_complete_task_raises_for_backlog_task():
task = make_task(started_at=None, done_at=None)
with pytest.raises(InvalidTransitionError, match="invalid_transaction"):
complete_task(task)

45
tests/test_health.py Normal file
View File

@@ -0,0 +1,45 @@
from datetime import datetime
import pytest
from fastapi.testclient import TestClient
from app.main import app
client = TestClient(app)
def test_health_ok():
response = client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert "server_time" in data
# Validate ISO-8601 format (basic check via fromisoformat)
parsed = datetime.fromisoformat(data["server_time"])
assert isinstance(parsed, datetime)
def test_health_idempotent_time_increases():
response1 = client.get("/health")
response2 = client.get("/health")
assert response1.status_code == 200
assert response2.status_code == 200
data1 = response1.json()
data2 = response2.json()
t1 = datetime.fromisoformat(data1["server_time"])
t2 = datetime.fromisoformat(data2["server_time"])
assert t2 >= t1
def test_health_method_not_allowed():
response = client.post("/health")
assert response.status_code == 405

View File

@@ -0,0 +1,83 @@
from __future__ import annotations
import json
from datetime import UTC, datetime
from pathlib import Path
from uuid import uuid4
import pytest
from app.storage import DATA_FORMAT_VERSION, StoredTask
from app.storage.repository import JsonFileTaskRepository
def make_task(title: str = "Task") -> StoredTask:
return StoredTask(
id=uuid4(),
title=title,
created_at=datetime.now(UTC),
started_at=None,
done_at=None,
)
def test_interrupted_save_before_rename_keeps_original_data_intact(tmp_path, monkeypatch):
file_path = tmp_path / "tasks.json"
repo = JsonFileTaskRepository(file_path)
original_task = make_task("Original")
repo.create_task(original_task)
replacement_called = False
real_replace = Path.replace
def failing_replace(self: Path, target: Path):
nonlocal replacement_called
replacement_called = True
raise RuntimeError("simulated crash before rename")
monkeypatch.setattr(Path, "replace", failing_replace)
with pytest.raises(RuntimeError, match="simulated crash before rename"):
repo.create_task(make_task("New task"))
assert replacement_called is True
reloaded_repo = JsonFileTaskRepository(file_path)
assert reloaded_repo.list_tasks() == [original_task]
tmp_file = tmp_path / "tasks.json.tmp"
assert tmp_file.exists()
monkeypatch.setattr(Path, "replace", real_replace)
def test_normal_save_replaces_file_with_complete_payload(tmp_path):
file_path = tmp_path / "tasks.json"
repo = JsonFileTaskRepository(file_path)
task_1 = make_task("One")
task_2 = make_task("Two")
repo.create_task(task_1)
repo.create_task(task_2)
payload = json.loads(file_path.read_text(encoding="utf-8"))
assert payload["version"] == DATA_FORMAT_VERSION
assert len(payload["tasks"]) == 2
assert {item["id"] for item in payload["tasks"]} == {str(task_1.id), str(task_2.id)}
def test_stale_tmp_file_does_not_break_startup(tmp_path):
file_path = tmp_path / "tasks.json"
repo_1 = JsonFileTaskRepository(file_path)
task = make_task("Persisted")
repo_1.create_task(task)
stale_tmp = tmp_path / "tasks.json.tmp"
stale_tmp.write_text('{"version": 999, "tasks": "broken"}', encoding="utf-8")
repo_2 = JsonFileTaskRepository(file_path)
assert repo_2.list_tasks() == [task]
assert stale_tmp.exists()

View File

@@ -0,0 +1,205 @@
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from uuid import uuid4
import pytest
from pydantic import ValidationError
from app.storage import StoredTask
from app.storage.repository import JsonFileTaskRepository, StorageTaskNotFoundError
def make_task(
*,
title: str = "Task",
started: bool = False,
done: bool = False,
) -> StoredTask:
now = datetime.now(UTC)
started_at = now + timedelta(seconds=1) if started else None
done_at = now + timedelta(seconds=2) if done else None
return StoredTask(
id=uuid4(),
title=title,
created_at=now,
started_at=started_at,
done_at=done_at,
)
def make_repo(tmp_path) -> JsonFileTaskRepository:
return JsonFileTaskRepository(tmp_path / "tasks.json")
def test_create_task_in_empty_storage(tmp_path):
repo = make_repo(tmp_path)
task = make_task(title="First task")
created = repo.create_task(task)
tasks = repo.list_tasks()
assert created == task
assert len(tasks) == 1
assert tasks[0] == task
def test_create_task_sequentially_preserves_insertion_order(tmp_path):
repo = make_repo(tmp_path)
task_1 = make_task(title="First")
task_2 = make_task(title="Second")
repo.create_task(task_1)
repo.create_task(task_2)
tasks = repo.list_tasks()
assert [task.id for task in tasks] == [task_1.id, task_2.id]
def test_create_task_invalid_model_rejected_before_repository_call(tmp_path):
repo = make_repo(tmp_path)
with pytest.raises(ValidationError):
StoredTask(
id=uuid4(),
title="",
created_at=datetime.now(UTC),
)
assert repo.list_tasks() == []
def test_list_tasks_returns_empty_list_for_fresh_storage(tmp_path):
repo = make_repo(tmp_path)
assert repo.list_tasks() == []
def test_list_tasks_returns_persisted_tasks(tmp_path):
repo = make_repo(tmp_path)
task_1 = make_task(title="One")
task_2 = make_task(title="Two")
repo.create_task(task_1)
repo.create_task(task_2)
tasks = repo.list_tasks()
assert tasks == [task_1, task_2]
def test_get_task_returns_existing_task(tmp_path):
repo = make_repo(tmp_path)
task = make_task(title="Lookup me")
repo.create_task(task)
found = repo.get_task(task.id)
assert found == task
def test_get_task_returns_none_for_missing_id(tmp_path):
repo = make_repo(tmp_path)
repo.create_task(make_task(title="Other task"))
found = repo.get_task(uuid4())
assert found is None
def test_get_task_with_wrong_runtime_type_returns_none_and_does_not_change_storage(tmp_path):
repo = make_repo(tmp_path)
task = make_task(title="Type safety")
repo.create_task(task)
found = repo.get_task("not-a-uuid") # type: ignore[arg-type]
assert found is None
assert repo.list_tasks() == [task]
def test_update_task_replaces_existing_fields(tmp_path):
repo = make_repo(tmp_path)
original = make_task(title="Old title")
repo.create_task(original)
updated = StoredTask(
id=original.id,
title="New title",
created_at=original.created_at,
started_at=original.started_at,
done_at=original.done_at,
)
result = repo.update_task(updated)
assert result == updated
assert repo.get_task(original.id) == updated
def test_update_task_persists_transition_related_fields(tmp_path):
repo = make_repo(tmp_path)
original = make_task(title="Workflow")
repo.create_task(original)
now = datetime.now(UTC)
updated = StoredTask(
id=original.id,
title=original.title,
created_at=original.created_at,
started_at=now,
done_at=now + timedelta(minutes=5),
)
repo.update_task(updated)
reloaded = repo.get_task(original.id)
assert reloaded is not None
assert reloaded.started_at == updated.started_at
assert reloaded.done_at == updated.done_at
def test_update_task_raises_for_missing_task(tmp_path):
repo = make_repo(tmp_path)
missing = make_task(title="Missing")
with pytest.raises(StorageTaskNotFoundError):
repo.update_task(missing)
assert repo.list_tasks() == []
def test_delete_task_removes_existing_task(tmp_path):
repo = make_repo(tmp_path)
task = make_task(title="Delete me")
repo.create_task(task)
deleted = repo.delete_task(task.id)
assert deleted is True
assert repo.list_tasks() == []
def test_delete_task_only_affects_requested_task(tmp_path):
repo = make_repo(tmp_path)
task_1 = make_task(title="Keep me")
task_2 = make_task(title="Delete me")
repo.create_task(task_1)
repo.create_task(task_2)
deleted = repo.delete_task(task_2.id)
assert deleted is True
assert repo.list_tasks() == [task_1]
def test_delete_task_returns_false_for_missing_task(tmp_path):
repo = make_repo(tmp_path)
task = make_task(title="Existing")
repo.create_task(task)
deleted = repo.delete_task(uuid4())
assert deleted is False
assert repo.list_tasks() == [task]

View File

@@ -0,0 +1,146 @@
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from uuid import uuid4
from app.storage import DATA_FORMAT_VERSION, StoredTask
from app.storage.repository import JsonFileTaskRepository
def make_task(
*,
title: str = "Task",
started: bool = False,
done: bool = False,
) -> StoredTask:
now = datetime.now(UTC)
started_at = now + timedelta(seconds=1) if started else None
done_at = now + timedelta(seconds=2) if done else None
return StoredTask(
id=uuid4(),
title=title,
created_at=now,
started_at=started_at,
done_at=done_at,
)
def test_replace_all_replaces_empty_state_with_provided_tasks(tmp_path):
repo = JsonFileTaskRepository(tmp_path / "tasks.json")
task_1 = make_task(title="One")
task_2 = make_task(title="Two")
repo.replace_all([task_1, task_2])
assert repo.list_tasks() == [task_1, task_2]
def test_replace_all_fully_overwrites_previous_content(tmp_path):
repo = JsonFileTaskRepository(tmp_path / "tasks.json")
old_task = make_task(title="Old")
new_task = make_task(title="New")
repo.create_task(old_task)
repo.replace_all([new_task])
assert repo.list_tasks() == [new_task]
def test_data_format_version_matches_constant(tmp_path):
repo = JsonFileTaskRepository(tmp_path / "tasks.json")
assert repo.data_format_version == DATA_FORMAT_VERSION
def test_data_format_version_is_stable_across_instances(tmp_path):
file_path = tmp_path / "tasks.json"
repo_1 = JsonFileTaskRepository(file_path)
repo_2 = JsonFileTaskRepository(file_path)
assert repo_1.data_format_version == repo_2.data_format_version == DATA_FORMAT_VERSION
def test_missing_file_is_created_automatically(tmp_path):
file_path = tmp_path / "nested" / "tasks.json"
repo = JsonFileTaskRepository(file_path)
assert repo.list_tasks() == []
assert file_path.exists()
def test_existing_valid_file_is_loaded_on_new_instance(tmp_path):
file_path = tmp_path / "tasks.json"
repo_1 = JsonFileTaskRepository(file_path)
task = make_task(title="Persisted")
repo_1.create_task(task)
repo_2 = JsonFileTaskRepository(file_path)
assert repo_2.list_tasks() == [task]
def test_parent_directory_is_created_if_missing(tmp_path):
file_path = tmp_path / "deep" / "nested" / "data" / "tasks.json"
repo = JsonFileTaskRepository(file_path)
assert file_path.parent.exists()
assert repo.list_tasks() == []
def test_current_version_payload_loads_as_is(tmp_path):
file_path = tmp_path / "tasks.json"
repo_1 = JsonFileTaskRepository(file_path)
task = make_task(title="Versioned")
repo_1.create_task(task)
repo_2 = JsonFileTaskRepository(file_path)
assert repo_2.data_format_version == DATA_FORMAT_VERSION
assert repo_2.list_tasks() == [task]
def test_empty_current_version_payload_loads_without_backup(tmp_path):
file_path = tmp_path / "tasks.json"
repo = JsonFileTaskRepository(file_path)
assert repo.list_tasks() == []
assert not list(tmp_path.glob("tasks.json.corrupted*"))
def test_persistence_between_repository_instances(tmp_path):
file_path = tmp_path / "tasks.json"
repo_1 = JsonFileTaskRepository(file_path)
task = make_task(title="Cross-session")
repo_1.create_task(task)
repo_2 = JsonFileTaskRepository(file_path)
assert repo_2.get_task(task.id) == task
def test_multiple_operations_survive_restart(tmp_path):
file_path = tmp_path / "tasks.json"
repo_1 = JsonFileTaskRepository(file_path)
task_1 = make_task(title="First")
task_2 = make_task(title="Second")
repo_1.create_task(task_1)
repo_1.create_task(task_2)
repo_1.delete_task(task_1.id)
updated_task_2 = StoredTask(
id=task_2.id,
title="Second updated",
created_at=task_2.created_at,
started_at=datetime.now(UTC),
done_at=None,
)
repo_1.update_task(updated_task_2)
repo_2 = JsonFileTaskRepository(file_path)
assert repo_2.list_tasks() == [updated_task_2]

View File

@@ -0,0 +1,102 @@
from __future__ import annotations
import json
from datetime import UTC, datetime
from uuid import uuid4
from app.storage import DATA_FORMAT_VERSION, StoredTask
from app.storage.repository import JsonFileTaskRepository
def make_task(title: str = "Task") -> StoredTask:
return StoredTask(
id=uuid4(),
title=title,
created_at=datetime.now(UTC),
started_at=None,
done_at=None,
)
def test_corrupted_json_triggers_backup_and_reset(tmp_path):
file_path = tmp_path / "tasks.json"
file_path.write_text('{"version": 1, "tasks": [', encoding="utf-8")
repo = JsonFileTaskRepository(file_path)
assert repo.list_tasks() == []
backups = list(tmp_path.glob("tasks.json.corrupted*"))
assert len(backups) == 1
assert backups[0].read_text(encoding="utf-8") == '{"version": 1, "tasks": ['
restored = json.loads(file_path.read_text(encoding="utf-8"))
assert restored["version"] == DATA_FORMAT_VERSION
assert restored["tasks"] == []
def test_invalid_schema_triggers_backup_and_reset(tmp_path):
file_path = tmp_path / "tasks.json"
file_path.write_text(
json.dumps(
{
"version": 1,
"tasks": [
{
"id": str(uuid4()),
"created_at": datetime.now(UTC).isoformat(),
},
],
},
),
encoding="utf-8",
)
repo = JsonFileTaskRepository(file_path)
assert repo.list_tasks() == []
backups = list(tmp_path.glob("tasks.json.corrupted*"))
assert len(backups) == 1
def test_repository_remains_writable_after_recovery(tmp_path):
file_path = tmp_path / "tasks.json"
file_path.write_text("not valid json", encoding="utf-8")
repo = JsonFileTaskRepository(file_path)
task = make_task("Recovered")
repo.create_task(task)
assert repo.list_tasks() == [task]
def test_unsupported_version_creates_backup_and_resets_state(tmp_path):
file_path = tmp_path / "tasks.json"
file_path.write_text(
json.dumps({"version": 999, "tasks": []}),
encoding="utf-8",
)
repo = JsonFileTaskRepository(file_path)
assert repo.list_tasks() == []
backups = list(tmp_path.glob("tasks.json.corrupted*"))
assert len(backups) == 1
restored = json.loads(file_path.read_text(encoding="utf-8"))
assert restored["version"] == DATA_FORMAT_VERSION
assert restored["tasks"] == []
def test_restart_after_corrupted_persisted_state_does_not_crash(tmp_path):
file_path = tmp_path / "tasks.json"
repo_1 = JsonFileTaskRepository(file_path)
repo_1.create_task(make_task("Initial"))
file_path.write_text("{broken", encoding="utf-8")
repo_2 = JsonFileTaskRepository(file_path)
assert repo_2.list_tasks() == []
assert len(list(tmp_path.glob("tasks.json.corrupted*"))) == 1