Source code for scitex_dev._creds._rotate

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Core ecosystem-wide credential rotation logic.

Public surface
--------------
- ``CREDENTIALS_PATH`` — local source (``~/.claude/.credentials.json``)
- ``CREDENTIALS_SLOT`` — un-prefixed GH Actions secret slot name
- ``SHA256_VAR``       — sidecar repo variable name
- ``validate_source``  — read+parse+expiry check; returns SourceState
- ``rotate_all``       — iterate the ECOSYSTEM registry and update each
                         repo whose sha256 sidecar differs from local

Design notes
------------
- ``gh`` is invoked directly via ``subprocess.run`` so we don't pull a
  hard dep on ``scitex-git``. The single-repo sac command can stay on
  scitex-git; this multiplexer is its own concern.
- The credential body is NEVER logged. Only sha256, byte count, and
  the secret slot name are surfaced.
- Errors are returned per-repo (``status='error'``) rather than raising,
  so one broken remote doesn't abort the rotation across 50+ repos.
"""

from __future__ import annotations

import hashlib
import json
import shutil
import subprocess
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Iterable

from .._ecosystem import ECOSYSTEM, get_local_path

CREDENTIALS_PATH = Path.home() / ".claude" / ".credentials.json"
CREDENTIALS_SLOT = "CLAUDE_CODE_CREDENTIALS_JSON"
SHA256_VAR = f"{CREDENTIALS_SLOT}_SHA256"


@dataclass(frozen=True)
class SourceState:
    """Outcome of validating the local credentials file."""

    content: str
    sha256: str
    byte_count: int


[docs] @dataclass class RotateResult: """Per-repo rotation outcome.""" package: str repo: str | None status: str # unchanged | rotated | skipped | error | dry-run message: str = ""
[docs] def is_error(self) -> bool: return self.status == "error"
def _sha256_hex(data: str) -> str: return hashlib.sha256(data.encode("utf-8")).hexdigest() def _now_ms() -> int: return int(time.time() * 1000)
[docs] def validate_source( path: Path = CREDENTIALS_PATH, *, now_ms: Callable[[], int] = _now_ms, ) -> SourceState | None: """Return SourceState, or None if rotation should be a silent no-op. Silent-no-op (returns None) is reserved for the operator-on-a-fresh- -laptop / expired-token cases that should NOT spam every hour: - source file is missing - ``claudeAiOauth.expiresAt`` (or ``expires_at``) is in the past Raises ``ValueError`` for cases that DO deserve a loud error: - file present but doesn't parse as JSON - file parses but has no ``claudeAiOauth`` key """ if not path.is_file(): return None # silent: no creds yet, operator hasn't logged in content = path.read_text() try: payload = json.loads(content) except json.JSONDecodeError as exc: raise ValueError(f"{path} is not valid JSON: {exc}") from exc oauth = payload.get("claudeAiOauth") if not isinstance(oauth, dict): raise ValueError(f"{path} has no .claudeAiOauth key") # Accept either camelCase or snake_case for forward compat with # whatever rev of the Claude Code client wrote the file. expires = oauth.get("expiresAt", oauth.get("expires_at")) if isinstance(expires, (int, float)): # Heuristic: values > 1e12 are ms-since-epoch, otherwise seconds. cutoff_ms = expires if expires > 1e12 else expires * 1000 if cutoff_ms <= now_ms(): return None # silent: token expired, operator must re-login return SourceState( content=content, sha256=_sha256_hex(content), byte_count=len(content.encode("utf-8")), )
def _detect_repo_for_package( name: str, *, ecosystem: dict | None = None, local_path_lookup=None, ) -> str | None: """Return ``owner/repo`` for a package, preferring the local clone. ``ecosystem`` / ``local_path_lookup`` are injection hooks for tests so they don't need to monkey-patch module-level state. """ if local_path_lookup is None: local_path_lookup = get_local_path if ecosystem is None: ecosystem = ECOSYSTEM local = local_path_lookup(name) if local is not None and (local / ".git").exists(): try: out = subprocess.check_output( ["git", "-C", str(local), "remote", "get-url", "origin"], text=True, stderr=subprocess.DEVNULL, timeout=10, ).strip() except ( subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired, ): out = "" if out: return _normalise_remote_url(out) # Fall back to the recorded registry URL. info = ecosystem.get(name) or {} repo = info.get("github_repo") return repo or None def _normalise_remote_url(url: str) -> str: """git@github.com:o/r[.git] or https://github.com/o/r[.git] → o/r.""" if url.startswith("git@"): path = url.split(":", 1)[1] elif "://" in url: path = url.split("://", 1)[1].split("/", 1)[1] else: path = url return path[:-4] if path.endswith(".git") else path def _gh_get_variable(repo: str, name: str) -> str | None: """Return the value of a repo variable, or None if missing.""" try: r = subprocess.run( ["gh", "variable", "get", name, "--repo", repo], capture_output=True, text=True, timeout=30, check=False, ) except FileNotFoundError as exc: raise RuntimeError("'gh' CLI not found on PATH") from exc if r.returncode != 0: # gh returns non-zero both for "missing" and "real error"; treat # all as missing — the caller will then push a fresh value. return None out = r.stdout.strip() return out or None def _gh_set_secret(repo: str, name: str, body: str) -> None: r = subprocess.run( ["gh", "secret", "set", name, "--repo", repo, "--body", body], capture_output=True, text=True, timeout=60, check=False, ) if r.returncode != 0: raise RuntimeError( f"gh secret set failed for {repo}: {r.stderr.strip() or r.stdout.strip()}" ) def _gh_set_variable(repo: str, name: str, value: str) -> None: r = subprocess.run( ["gh", "variable", "set", name, "--repo", repo, "--body", value], capture_output=True, text=True, timeout=30, check=False, ) if r.returncode != 0: raise RuntimeError( f"gh variable set failed for {repo}: {r.stderr.strip() or r.stdout.strip()}" ) def _rotate_one( package: str, source: SourceState, *, dry_run: bool, force: bool, ecosystem: dict | None = None, local_path_lookup=None, ) -> RotateResult: repo = _detect_repo_for_package( package, ecosystem=ecosystem, local_path_lookup=local_path_lookup ) if not repo: return RotateResult( package=package, repo=None, status="skipped", message="no remote" ) try: remote_sha = _gh_get_variable(repo, SHA256_VAR) except RuntimeError as exc: return RotateResult( package=package, repo=repo, status="error", message=str(exc) ) if not force and remote_sha == source.sha256: return RotateResult( package=package, repo=repo, status="unchanged", message=f"sha256={source.sha256[:12]}…", ) if dry_run: return RotateResult( package=package, repo=repo, status="dry-run", message=f"would rotate (local sha={source.sha256[:12]}…)", ) try: _gh_set_secret(repo, CREDENTIALS_SLOT, source.content) _gh_set_variable(repo, SHA256_VAR, source.sha256) except RuntimeError as exc: return RotateResult( package=package, repo=repo, status="error", message=str(exc) ) return RotateResult( package=package, repo=repo, status="rotated", message=f"sha256={source.sha256[:12]}…", )
[docs] def rotate_all( *, packages: Iterable[str] | None = None, only: Iterable[str] | None = None, exclude: Iterable[str] | None = None, source_path: Path = CREDENTIALS_PATH, dry_run: bool = False, force: bool = False, ecosystem: dict | None = None, local_path_lookup=None, ) -> list[RotateResult]: """Rotate the credential across every (or a filtered set of) repo. Returns one ``RotateResult`` per package processed. Returns an empty list if the source is silently absent/expired — that is the "fresh laptop / expired token" exit-0 path, deliberately quiet. """ if shutil.which("gh") is None: raise RuntimeError("'gh' CLI not found on PATH") source = validate_source(source_path) if source is None: return [] eco = ecosystem if ecosystem is not None else ECOSYSTEM pkg_list = list(packages) if packages is not None else list(eco.keys()) if only: only_set = set(only) pkg_list = [p for p in pkg_list if p in only_set] if exclude: excl = set(exclude) pkg_list = [p for p in pkg_list if p not in excl] return [ _rotate_one( pkg, source, dry_run=dry_run, force=force, ecosystem=eco, local_path_lookup=local_path_lookup, ) for pkg in pkg_list ]
# EOF