Coverage for node / src / stigmem_node / routes / federation / peers.py: 71%
22 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-25 01:49 +0000
1"""Federation peer registration and listing routes."""
3from __future__ import annotations
5import json
6from typing import Annotated, Any
8from fastapi import BackgroundTasks, Depends, HTTPException, status
10from ...auth import Identity, resolve_identity
11from ...db import db
12from ...models.federation import (
13 PeerApprovalRequest,
14 PeerApprovalResponse,
15 PeerRegisterRequest,
16 PeerRegisterResponse,
17)
18from .._federation_impl import approve_peer_impl, register_peer_impl
19from .common import router
22@router.post(
23 "/v1/federation/peers",
24 response_model=PeerRegisterResponse,
25 status_code=status.HTTP_201_CREATED,
26)
27async def register_peer(
28 req: PeerRegisterRequest,
29 background_tasks: BackgroundTasks,
30 identity: Annotated[Identity, Depends(resolve_identity)],
31) -> PeerRegisterResponse:
32 """Register a peer.
34 Fetches its well-known doc and verifies declaration_sig
35 (Spec-05-Federation-Trust).
36 """
37 # Implementation lives in _federation_impl.register_peer_impl.
38 return await register_peer_impl(req, background_tasks, identity)
41@router.post(
42 "/v1/federation/peers/{peer_id}/approve",
43 response_model=PeerApprovalResponse,
44)
45def approve_peer(
46 peer_id: str,
47 req: PeerApprovalRequest,
48 background_tasks: BackgroundTasks,
49 identity: Annotated[Identity, Depends(resolve_identity)],
50) -> PeerApprovalResponse:
51 """Approve a pending peer after out-of-band public-key confirmation."""
52 return approve_peer_impl(peer_id, req.pubkey_fingerprint, background_tasks, identity)
55# ---------------------------------------------------------------------------
56# GET /v1/federation/peers — list peers (§5.7)
57# ---------------------------------------------------------------------------
60@router.get("/v1/federation/peers")
61def list_peers(
62 identity: Annotated[Identity, Depends(resolve_identity)],
63) -> dict[str, Any]:
64 if not identity.can_federate():
65 raise HTTPException(status_code=403, detail="federate permission required")
66 with db() as conn:
67 rows = conn.execute(
68 "SELECT id, node_id, node_url, status, allowed_scopes, established_at FROM peers"
69 ).fetchall()
70 return {
71 "peers": [
72 {
73 "peer_id": r["id"],
74 "node_id": r["node_id"],
75 "node_url": r["node_url"],
76 "status": r["status"],
77 "allowed_scopes": json.loads(r["allowed_scopes"]),
78 "established_at": r["established_at"],
79 }
80 for r in rows
81 ]
82 }