refactor(api): centralize neo client lifecycle in skills route

extract a shared `_with_neo_client` wrapper to handle neo client
setup, teardown, and error responses in one place.

reduce duplicated try/except and `BayClient` context boilerplate across
neo skills endpoints while preserving existing request validation and
response payloads.
This commit is contained in:
RC-CHN
2026-02-17 17:06:11 +08:00
parent cf9a7235f7
commit b48919246d
+183 -231
View File
@@ -1,6 +1,6 @@
import os
import traceback
from typing import Any
from typing import Any, Awaitable, Callable
from quart import request
@@ -68,6 +68,24 @@ class SkillsRoute(Route):
)
return endpoint, access_token
async def _with_neo_client(
self,
operation: Callable[[Any], Awaitable[dict]],
) -> dict:
try:
endpoint, access_token = self._get_neo_client_config()
from shipyard_neo import BayClient
async with BayClient(
endpoint_url=endpoint,
access_token=access_token,
) as client:
return await operation(client)
except Exception as e:
logger.error(traceback.format_exc())
return Response().error(str(e)).__dict__
async def get_skills(self):
try:
provider_settings = self.core_lifecycle.astrbot_config.get(
@@ -178,85 +196,61 @@ class SkillsRoute(Route):
return Response().error(str(e)).__dict__
async def get_neo_candidates(self):
try:
logger.info("[Neo] GET /skills/neo/candidates requested.")
endpoint, access_token = self._get_neo_client_config()
status = request.args.get("status")
skill_key = request.args.get("skill_key")
limit = int(request.args.get("limit", 100))
offset = int(request.args.get("offset", 0))
logger.info("[Neo] GET /skills/neo/candidates requested.")
status = request.args.get("status")
skill_key = request.args.get("skill_key")
limit = int(request.args.get("limit", 100))
offset = int(request.args.get("offset", 0))
from shipyard_neo import BayClient
async def _do(client):
candidates = await client.skills.list_candidates(
status=status,
skill_key=skill_key,
limit=limit,
offset=offset,
)
result = _to_jsonable(candidates)
total = result.get("total", "?") if isinstance(result, dict) else "?"
logger.info(f"[Neo] Candidates fetched: total={total}")
return Response().ok(result).__dict__
async with BayClient(
endpoint_url=endpoint,
access_token=access_token,
) as client:
candidates = await client.skills.list_candidates(
status=status,
skill_key=skill_key,
limit=limit,
offset=offset,
)
result = _to_jsonable(candidates)
total = result.get("total", "?") if isinstance(result, dict) else "?"
logger.info(f"[Neo] Candidates fetched: total={total}")
return Response().ok(result).__dict__
except Exception as e:
logger.error(traceback.format_exc())
return Response().error(str(e)).__dict__
return await self._with_neo_client(_do)
async def get_neo_releases(self):
try:
logger.info("[Neo] GET /skills/neo/releases requested.")
endpoint, access_token = self._get_neo_client_config()
skill_key = request.args.get("skill_key")
stage = request.args.get("stage")
active_only = _to_bool(request.args.get("active_only"), False)
limit = int(request.args.get("limit", 100))
offset = int(request.args.get("offset", 0))
logger.info("[Neo] GET /skills/neo/releases requested.")
skill_key = request.args.get("skill_key")
stage = request.args.get("stage")
active_only = _to_bool(request.args.get("active_only"), False)
limit = int(request.args.get("limit", 100))
offset = int(request.args.get("offset", 0))
from shipyard_neo import BayClient
async def _do(client):
releases = await client.skills.list_releases(
skill_key=skill_key,
active_only=active_only,
stage=stage,
limit=limit,
offset=offset,
)
result = _to_jsonable(releases)
total = result.get("total", "?") if isinstance(result, dict) else "?"
logger.info(f"[Neo] Releases fetched: total={total}")
return Response().ok(result).__dict__
async with BayClient(
endpoint_url=endpoint,
access_token=access_token,
) as client:
releases = await client.skills.list_releases(
skill_key=skill_key,
active_only=active_only,
stage=stage,
limit=limit,
offset=offset,
)
result = _to_jsonable(releases)
total = result.get("total", "?") if isinstance(result, dict) else "?"
logger.info(f"[Neo] Releases fetched: total={total}")
return Response().ok(result).__dict__
except Exception as e:
logger.error(traceback.format_exc())
return Response().error(str(e)).__dict__
return await self._with_neo_client(_do)
async def get_neo_payload(self):
try:
logger.info("[Neo] GET /skills/neo/payload requested.")
endpoint, access_token = self._get_neo_client_config()
payload_ref = request.args.get("payload_ref", "")
if not payload_ref:
return Response().error("Missing payload_ref").__dict__
logger.info("[Neo] GET /skills/neo/payload requested.")
payload_ref = request.args.get("payload_ref", "")
if not payload_ref:
return Response().error("Missing payload_ref").__dict__
from shipyard_neo import BayClient
async def _do(client):
payload = await client.skills.get_payload(payload_ref)
logger.info(f"[Neo] Payload fetched: ref={payload_ref}")
return Response().ok(_to_jsonable(payload)).__dict__
async with BayClient(
endpoint_url=endpoint,
access_token=access_token,
) as client:
payload = await client.skills.get_payload(payload_ref)
logger.info(f"[Neo] Payload fetched: ref={payload_ref}")
return Response().ok(_to_jsonable(payload)).__dict__
except Exception as e:
logger.error(traceback.format_exc())
return Response().error(str(e)).__dict__
return await self._with_neo_client(_do)
async def evaluate_neo_candidate(self):
if DEMO_MODE:
@@ -265,36 +259,26 @@ class SkillsRoute(Route):
.error("You are not permitted to do this operation in demo mode")
.__dict__
)
try:
logger.info("[Neo] POST /skills/neo/evaluate requested.")
endpoint, access_token = self._get_neo_client_config()
data = await request.get_json()
candidate_id = data.get("candidate_id")
passed_value = data.get("passed")
if not candidate_id or passed_value is None:
return Response().error("Missing candidate_id or passed").__dict__
passed = _to_bool(passed_value, False)
logger.info("[Neo] POST /skills/neo/evaluate requested.")
data = await request.get_json()
candidate_id = data.get("candidate_id")
passed_value = data.get("passed")
if not candidate_id or passed_value is None:
return Response().error("Missing candidate_id or passed").__dict__
passed = _to_bool(passed_value, False)
from shipyard_neo import BayClient
async def _do(client):
result = await client.skills.evaluate_candidate(
candidate_id,
passed=passed,
score=data.get("score"),
benchmark_id=data.get("benchmark_id"),
report=data.get("report"),
)
logger.info(f"[Neo] Candidate evaluated: id={candidate_id}, passed={passed}")
return Response().ok(_to_jsonable(result)).__dict__
async with BayClient(
endpoint_url=endpoint,
access_token=access_token,
) as client:
result = await client.skills.evaluate_candidate(
candidate_id,
passed=passed,
score=data.get("score"),
benchmark_id=data.get("benchmark_id"),
report=data.get("report"),
)
logger.info(
f"[Neo] Candidate evaluated: id={candidate_id}, passed={passed}"
)
return Response().ok(_to_jsonable(result)).__dict__
except Exception as e:
logger.error(traceback.format_exc())
return Response().error(str(e)).__dict__
return await self._with_neo_client(_do)
async def promote_neo_candidate(self):
if DEMO_MODE:
@@ -303,82 +287,66 @@ class SkillsRoute(Route):
.error("You are not permitted to do this operation in demo mode")
.__dict__
)
try:
logger.info("[Neo] POST /skills/neo/promote requested.")
endpoint, access_token = self._get_neo_client_config()
data = await request.get_json()
candidate_id = data.get("candidate_id")
stage = data.get("stage", "canary")
sync_to_local = _to_bool(data.get("sync_to_local"), True)
if not candidate_id:
return Response().error("Missing candidate_id").__dict__
if stage not in {"canary", "stable"}:
return Response().error("Invalid stage, must be canary/stable").__dict__
logger.info("[Neo] POST /skills/neo/promote requested.")
data = await request.get_json()
candidate_id = data.get("candidate_id")
stage = data.get("stage", "canary")
sync_to_local = _to_bool(data.get("sync_to_local"), True)
if not candidate_id:
return Response().error("Missing candidate_id").__dict__
if stage not in {"canary", "stable"}:
return Response().error("Invalid stage, must be canary/stable").__dict__
from shipyard_neo import BayClient
async def _do(client):
release = await client.skills.promote_candidate(candidate_id, stage=stage)
release_json = _to_jsonable(release)
logger.info(f"[Neo] Candidate promoted: id={candidate_id}, stage={stage}")
async with BayClient(
endpoint_url=endpoint,
access_token=access_token,
) as client:
release = await client.skills.promote_candidate(
candidate_id, stage=stage
)
release_json = _to_jsonable(release)
logger.info(
f"[Neo] Candidate promoted: id={candidate_id}, stage={stage}"
)
sync_json = None
if stage == "stable" and sync_to_local:
sync_mgr = NeoSkillSyncManager()
try:
sync_result = await sync_mgr.sync_release(
client,
release_id=str(release_json.get("id", "")),
require_stable=True,
)
sync_json = {
"skill_key": sync_result.skill_key,
"local_skill_name": sync_result.local_skill_name,
"release_id": sync_result.release_id,
"candidate_id": sync_result.candidate_id,
"payload_ref": sync_result.payload_ref,
"map_path": sync_result.map_path,
"synced_at": sync_result.synced_at,
}
logger.info(
f"[Neo] Stable release synced to local: skill={sync_result.local_skill_name}"
)
except Exception as sync_err:
logger.error(
f"[Neo] Stable sync failed, rolling back: {sync_err}"
)
rollback_result = await client.skills.rollback_release(
str(release_json.get("id", ""))
)
resp = Response().error(
"Stable promote synced failed and has been rolled back. "
f"sync_error={sync_err}"
)
resp.data = {
"release": release_json,
"rollback": _to_jsonable(rollback_result),
}
return resp.__dict__
# Try to push latest local skills to all active sandboxes.
sync_json = None
if stage == "stable" and sync_to_local:
sync_mgr = NeoSkillSyncManager()
try:
await sync_skills_to_active_sandboxes()
except Exception:
logger.warning("Failed to sync skills to active sandboxes.")
sync_result = await sync_mgr.sync_release(
client,
release_id=str(release_json.get("id", "")),
require_stable=True,
)
sync_json = {
"skill_key": sync_result.skill_key,
"local_skill_name": sync_result.local_skill_name,
"release_id": sync_result.release_id,
"candidate_id": sync_result.candidate_id,
"payload_ref": sync_result.payload_ref,
"map_path": sync_result.map_path,
"synced_at": sync_result.synced_at,
}
logger.info(
f"[Neo] Stable release synced to local: skill={sync_result.local_skill_name}"
)
except Exception as sync_err:
logger.error(f"[Neo] Stable sync failed, rolling back: {sync_err}")
rollback_result = await client.skills.rollback_release(
str(release_json.get("id", ""))
)
resp = Response().error(
"Stable promote synced failed and has been rolled back. "
f"sync_error={sync_err}"
)
resp.data = {
"release": release_json,
"rollback": _to_jsonable(rollback_result),
}
return resp.__dict__
return (
Response().ok({"release": release_json, "sync": sync_json}).__dict__
)
except Exception as e:
logger.error(traceback.format_exc())
return Response().error(str(e)).__dict__
# Try to push latest local skills to all active sandboxes.
try:
await sync_skills_to_active_sandboxes()
except Exception:
logger.warning("Failed to sync skills to active sandboxes.")
return Response().ok({"release": release_json, "sync": sync_json}).__dict__
return await self._with_neo_client(_do)
async def rollback_neo_release(self):
if DEMO_MODE:
@@ -387,26 +355,18 @@ class SkillsRoute(Route):
.error("You are not permitted to do this operation in demo mode")
.__dict__
)
try:
logger.info("[Neo] POST /skills/neo/rollback requested.")
endpoint, access_token = self._get_neo_client_config()
data = await request.get_json()
release_id = data.get("release_id")
if not release_id:
return Response().error("Missing release_id").__dict__
logger.info("[Neo] POST /skills/neo/rollback requested.")
data = await request.get_json()
release_id = data.get("release_id")
if not release_id:
return Response().error("Missing release_id").__dict__
from shipyard_neo import BayClient
async def _do(client):
result = await client.skills.rollback_release(release_id)
logger.info(f"[Neo] Release rolled back: id={release_id}")
return Response().ok(_to_jsonable(result)).__dict__
async with BayClient(
endpoint_url=endpoint,
access_token=access_token,
) as client:
result = await client.skills.rollback_release(release_id)
logger.info(f"[Neo] Release rolled back: id={release_id}")
return Response().ok(_to_jsonable(result)).__dict__
except Exception as e:
logger.error(traceback.format_exc())
return Response().error(str(e)).__dict__
return await self._with_neo_client(_do)
async def sync_neo_release(self):
if DEMO_MODE:
@@ -415,48 +375,40 @@ class SkillsRoute(Route):
.error("You are not permitted to do this operation in demo mode")
.__dict__
)
try:
logger.info("[Neo] POST /skills/neo/sync requested.")
endpoint, access_token = self._get_neo_client_config()
data = await request.get_json()
release_id = data.get("release_id")
skill_key = data.get("skill_key")
require_stable = _to_bool(data.get("require_stable"), True)
if not release_id and not skill_key:
return Response().error("Missing release_id or skill_key").__dict__
logger.info("[Neo] POST /skills/neo/sync requested.")
data = await request.get_json()
release_id = data.get("release_id")
skill_key = data.get("skill_key")
require_stable = _to_bool(data.get("require_stable"), True)
if not release_id and not skill_key:
return Response().error("Missing release_id or skill_key").__dict__
from shipyard_neo import BayClient
async def _do(client):
sync_mgr = NeoSkillSyncManager()
result = await sync_mgr.sync_release(
client,
release_id=release_id,
skill_key=skill_key,
require_stable=require_stable,
)
logger.info(
f"[Neo] Release synced to local: skill={result.local_skill_name}, "
f"release_id={result.release_id}"
)
return (
Response()
.ok(
{
"skill_key": result.skill_key,
"local_skill_name": result.local_skill_name,
"release_id": result.release_id,
"candidate_id": result.candidate_id,
"payload_ref": result.payload_ref,
"map_path": result.map_path,
"synced_at": result.synced_at,
}
)
.__dict__
)
async with BayClient(
endpoint_url=endpoint,
access_token=access_token,
) as client:
sync_mgr = NeoSkillSyncManager()
result = await sync_mgr.sync_release(
client,
release_id=release_id,
skill_key=skill_key,
require_stable=require_stable,
)
logger.info(
f"[Neo] Release synced to local: skill={result.local_skill_name}, "
f"release_id={result.release_id}"
)
return (
Response()
.ok(
{
"skill_key": result.skill_key,
"local_skill_name": result.local_skill_name,
"release_id": result.release_id,
"candidate_id": result.candidate_id,
"payload_ref": result.payload_ref,
"map_path": result.map_path,
"synced_at": result.synced_at,
}
)
.__dict__
)
except Exception as e:
logger.error(traceback.format_exc())
return Response().error(str(e)).__dict__
return await self._with_neo_client(_do)