Merge upstream/main: resolve conflicts with OAuth support

This commit is contained in:
pinhua33
2026-02-09 15:13:11 +08:00
13 changed files with 1462 additions and 692 deletions

View File

@@ -1,12 +1,19 @@
"""CLI commands for nanobot."""
"""CLI commands for nanobot."""
import asyncio
import atexit
import os
import signal
import sys
from pathlib import Path
import select
import typer
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from nanobot import __version__, __logo__
@@ -17,698 +24,314 @@ app = typer.Typer(
)
console = Console()
EXIT_COMMANDS = {"exit", "quit", "/exit", "/quit", ":q"}
# ---------------------------------------------------------------------------
# Lightweight CLI input: readline for arrow keys / history, termios for flush
# ---------------------------------------------------------------------------
_READLINE = None
_HISTORY_FILE: Path | None = None
_HISTORY_HOOK_REGISTERED = False
_USING_LIBEDIT = False
_SAVED_TERM_ATTRS = None # original termios settings, restored on exit
def _safe_print(text: str) -> None:
encoding = sys.stdout.encoding or "utf-8"
safe_text = text.encode(encoding, errors="replace").decode(encoding, errors="replace")
console.print(safe_text)
def _flush_pending_tty_input() -> None:
"""Drop unread keypresses typed while the model was generating output."""
try:
fd = sys.stdin.fileno()
if not os.isatty(fd):
return
except Exception:
return
try:
import termios
termios.tcflush(fd, termios.TCIFLUSH)
return
except Exception:
pass
try:
while True:
ready, _, _ = select.select([fd], [], [], 0)
if not ready:
break
if not os.read(fd, 4096):
break
except Exception:
return
def version_callback(value: bool):
if value:
console.print(f"{__logo__} nanobot v{__version__}")
raise typer.Exit()
def _save_history() -> None:
if _READLINE is None or _HISTORY_FILE is None:
return
try:
_READLINE.write_history_file(str(_HISTORY_FILE))
except Exception:
return
@app.callback()
def main(
version: bool = typer.Option(
None, "--version", "-v", callback=version_callback, is_eager=True
),
):
"""nanobot - Personal AI Assistant."""
pass
def _restore_terminal() -> None:
"""Restore terminal to its original state (echo, line buffering, etc.)."""
if _SAVED_TERM_ATTRS is None:
return
try:
import termios
termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, _SAVED_TERM_ATTRS)
except Exception:
pass
# ============================================================================
# Onboard / Setup
# ============================================================================
def _enable_line_editing() -> None:
"""Enable readline for arrow keys, line editing, and persistent history."""
global _READLINE, _HISTORY_FILE, _HISTORY_HOOK_REGISTERED, _USING_LIBEDIT, _SAVED_TERM_ATTRS
# Save terminal state before readline touches it
try:
import termios
_SAVED_TERM_ATTRS = termios.tcgetattr(sys.stdin.fileno())
except Exception:
pass
@app.command()
def onboard():
"""Initialize nanobot configuration and workspace."""
from nanobot.config.loader import get_config_path, save_config
from nanobot.config.schema import Config
from nanobot.utils.helpers import get_workspace_path
config_path = get_config_path()
if config_path.exists():
console.print(f"[yellow]Config already exists at {config_path}[/yellow]")
if not typer.confirm("Overwrite?"):
raise typer.Exit()
# Create default config
config = Config()
save_config(config)
console.print(f"[green]✓[/green] Created config at {config_path}")
# Create workspace
workspace = get_workspace_path()
console.print(f"[green]✓[/green] Created workspace at {workspace}")
# Create default bootstrap files
_create_workspace_templates(workspace)
console.print(f"\n{__logo__} nanobot is ready!")
console.print("\nNext steps:")
console.print(" 1. Add your API key to [cyan]~/.nanobot/config.json[/cyan]")
console.print(" Get one at: https://openrouter.ai/keys")
console.print(" 2. Chat: [cyan]nanobot agent -m \"Hello!\"[/cyan]")
console.print("\n[dim]Want Telegram/WhatsApp? See: https://github.com/HKUDS/nanobot#-chat-apps[/dim]")
try:
import readline as _READLINE
import atexit
# Detect libedit (macOS) vs GNU readline (Linux)
if hasattr(_READLINE, "__doc__") and _READLINE.__doc__ and "libedit" in _READLINE.__doc__:
_USING_LIBEDIT = True
@app.command("login")
def login(
provider: str = typer.Option("openai-codex", "--provider", "-p", help="Auth provider"),
):
"""Login to an auth provider (e.g. openai-codex)."""
if provider != "openai-codex":
console.print(f"[red]Unsupported provider: {provider}[/red]")
raise typer.Exit(1)
from oauth_cli_kit import login_oauth_interactive as login_codex_oauth_interactive
console.print("[green]Starting OpenAI Codex OAuth login...[/green]")
login_codex_oauth_interactive(
print_fn=console.print,
prompt_fn=typer.prompt,
)
console.print("[green]Login successful. Credentials saved.[/green]")
def _create_workspace_templates(workspace: Path):
"""Create default workspace template files."""
templates = {
"AGENTS.md": """# Agent Instructions
You are a helpful AI assistant. Be concise, accurate, and friendly.
## Guidelines
- Always explain what you're doing before taking actions
- Ask for clarification when the request is ambiguous
- Use tools to help accomplish tasks
- Remember important information in your memory files
""",
"SOUL.md": """# Soul
I am nanobot, a lightweight AI assistant.
## Personality
- Helpful and friendly
- Concise and to the point
- Curious and eager to learn
## Values
- Accuracy over speed
- User privacy and safety
- Transparency in actions
""",
"USER.md": """# User
Information about the user goes here.
## Preferences
- Communication style: (casual/formal)
- Timezone: (your timezone)
- Language: (your preferred language)
""",
}
for filename, content in templates.items():
file_path = workspace / filename
if not file_path.exists():
file_path.write_text(content)
console.print(f" [dim]Created {filename}[/dim]")
# Create memory directory and MEMORY.md
memory_dir = workspace / "memory"
memory_dir.mkdir(exist_ok=True)
memory_file = memory_dir / "MEMORY.md"
if not memory_file.exists():
memory_file.write_text("""# Long-term Memory
This file stores important information that should persist across sessions.
## User Information
(Important facts about the user)
## Preferences
(User preferences learned over time)
## Important Notes
(Things to remember)
""")
console.print(" [dim]Created memory/MEMORY.md[/dim]")
def _make_provider(config):
"""Create provider from config. Exits if no credentials found."""
from nanobot.providers.litellm_provider import LiteLLMProvider
from nanobot.providers.openai_codex_provider import OpenAICodexProvider
from nanobot.providers.registry import PROVIDERS
from oauth_cli_kit import get_token as get_oauth_token
model = config.agents.defaults.model
model_lower = model.lower()
# Check for OAuth-based providers first (registry-driven)
for spec in PROVIDERS:
if spec.is_oauth and any(kw in model_lower for kw in spec.keywords):
# OAuth provider matched
try:
_ = get_oauth_token(spec.oauth_provider or spec.name)
except Exception:
console.print(f"Please run: [cyan]nanobot login --provider {spec.name}[/cyan]")
raise typer.Exit(1)
# Return appropriate OAuth provider class
if spec.name == "openai_codex":
return OpenAICodexProvider(default_model=model)
# Future OAuth providers can be added here
console.print(f"[red]Error: OAuth provider '{spec.name}' not fully implemented.[/red]")
raise typer.Exit(1)
# Standard API key-based providers
p = config.get_provider()
if not (p and p.api_key) and not model.startswith("bedrock/"):
console.print("[red]Error: No API key configured.[/red]")
console.print("Set one in ~/.nanobot/config.json under providers section")
raise typer.Exit(1)
return LiteLLMProvider(
api_key=p.api_key if p else None,
api_base=config.get_api_base(),
default_model=model,
extra_headers=p.extra_headers if p else None,
)
# ============================================================================
# Gateway / Server
# ============================================================================
@app.command()
def gateway(
port: int = typer.Option(18790, "--port", "-p", help="Gateway port"),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Verbose output"),
):
"""Start the nanobot gateway."""
from nanobot.config.loader import load_config, get_data_dir
from nanobot.bus.queue import MessageBus
from nanobot.agent.loop import AgentLoop
from nanobot.channels.manager import ChannelManager
from nanobot.session.manager import SessionManager
from nanobot.cron.service import CronService
from nanobot.cron.types import CronJob
from nanobot.heartbeat.service import HeartbeatService
if verbose:
import logging
logging.basicConfig(level=logging.DEBUG)
console.print(f"{__logo__} Starting nanobot gateway on port {port}...")
config = load_config()
bus = MessageBus()
provider = _make_provider(config)
session_manager = SessionManager(config.workspace_path)
# Create cron service first (callback set after agent creation)
cron_store_path = get_data_dir() / "cron" / "jobs.json"
cron = CronService(cron_store_path)
# Create agent with cron service
agent = AgentLoop(
bus=bus,
provider=provider,
workspace=config.workspace_path,
model=config.agents.defaults.model,
max_iterations=config.agents.defaults.max_tool_iterations,
brave_api_key=config.tools.web.search.api_key or None,
exec_config=config.tools.exec,
cron_service=cron,
restrict_to_workspace=config.tools.restrict_to_workspace,
session_manager=session_manager,
)
# Set cron callback (needs agent)
async def on_cron_job(job: CronJob) -> str | None:
"""Execute a cron job through the agent."""
response = await agent.process_direct(
job.payload.message,
session_key=f"cron:{job.id}",
channel=job.payload.channel or "cli",
chat_id=job.payload.to or "direct",
)
if job.payload.deliver and job.payload.to:
from nanobot.bus.events import OutboundMessage
await bus.publish_outbound(OutboundMessage(
channel=job.payload.channel or "cli",
chat_id=job.payload.to,
content=response or ""
))
return response
cron.on_job = on_cron_job
# Create heartbeat service
async def on_heartbeat(prompt: str) -> str:
"""Execute heartbeat through the agent."""
return await agent.process_direct(prompt, session_key="heartbeat")
heartbeat = HeartbeatService(
workspace=config.workspace_path,
on_heartbeat=on_heartbeat,
interval_s=30 * 60, # 30 minutes
enabled=True
)
# Create channel manager
channels = ChannelManager(config, bus, session_manager=session_manager)
if channels.enabled_channels:
console.print(f"[green]✓[/green] Channels enabled: {', '.join(channels.enabled_channels)}")
else:
console.print("[yellow]Warning: No channels enabled[/yellow]")
cron_status = cron.status()
if cron_status["jobs"] > 0:
console.print(f"[green]✓[/green] Cron: {cron_status['jobs']} scheduled jobs")
console.print(f"[green]✓[/green] Heartbeat: every 30m")
async def run():
hist_file = Path.home() / ".nanobot_history"
_HISTORY_FILE = hist_file
try:
await cron.start()
await heartbeat.start()
await asyncio.gather(
agent.run(),
channels.start_all(),
)
except KeyboardInterrupt:
console.print("\nShutting down...")
heartbeat.stop()
cron.stop()
agent.stop()
await channels.stop_all()
_READLINE.read_history_file(str(hist_file))
except FileNotFoundError:
pass
# Enable common readline settings
_READLINE.parse_and_bind("bind -v" if _USING_LIBEDIT else "set editing-mode vi")
_READLINE.parse_and_bind("set show-all-if-ambiguous on")
_READLINE.parse_and_bind("set colored-completion-prefix on")
if not _HISTORY_HOOK_REGISTERED:
atexit.register(_save_history)
_HISTORY_HOOK_REGISTERED = True
except Exception:
return
async def _read_interactive_input_async() -> str:
"""Async wrapper around synchronous input() (runs in thread pool)."""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, lambda: input(f"{__logo__} "))
def _is_exit_command(text: str) -> bool:
return text.strip().lower() in EXIT_COMMANDS
# ---------------------------------------------------------------------------
# OAuth and Authentication helpers
# ---------------------------------------------------------------------------
def _handle_oauth_login(provider: str) -> None:
"""Handle OAuth login flow for supported providers."""
from nanobot.providers.registry import get_oauth_handler
asyncio.run(run())
oauth_handler = get_oauth_handler(provider)
if oauth_handler is None:
console.print(f"[red]OAuth is not supported for provider: {provider}[/red]")
console.print("[yellow]Supported OAuth providers: github-copilot[/yellow]")
raise typer.Exit(1)
try:
result = oauth_handler.authenticate()
if result.success:
console.print(f"[green]✓ {result.message}[/green]")
if result.token_path:
console.print(f"[dim]Token saved to: {result.token_path}[/dim]")
else:
console.print(f"[red]✗ {result.message}[/red]")
raise typer.Exit(1)
except Exception as e:
console.print(f"[red]OAuth authentication failed: {e}[/red]")
raise typer.Exit(1)
# ---------------------------------------------------------------------------
# @agent decorator and public API helpers
# ---------------------------------------------------------------------------
_agent_registry: dict[str, callable] = {}
# ============================================================================
# Agent Commands
# ============================================================================
def _get_agent(name: str | None = None) -> callable | None:
"""Retrieve a registered agent function by name."""
if name is None:
# Return the first registered agent if no name specified
return next(iter(_agent_registry.values())) if _agent_registry else None
return _agent_registry.get(name)
def agent(name: str | None = None, model: str | None = None, prompt: str | None = None):
"""Decorator to register an agent function.
Args:
name: Optional name for the agent (defaults to function name)
model: Optional model override (e.g., "gpt-4o", "claude-3-opus")
prompt: Optional system prompt for the agent
"""
def decorator(func):
agent_name = name or func.__name__
_agent_registry[agent_name] = func
func._agent_config = {"model": model, "prompt": prompt}
return func
return decorator
# ---------------------------------------------------------------------------
# Built-in CLI commands
# ---------------------------------------------------------------------------
@app.command()
def login(
provider: str = typer.Argument(..., help="Provider to authenticate with (e.g., 'github-copilot')"),
):
"""Authenticate with an OAuth provider."""
_handle_oauth_login(provider)
@app.command()
def agent(
message: str = typer.Option(None, "--message", "-m", help="Message to send to the agent"),
session_id: str = typer.Option("cli:default", "--session", "-s", help="Session ID"),
def version():
"""Show version information."""
console.print(f"{__logo__} nanobot {__version__}")
@app.command(name="agent")
def run_agent(
name: str | None = typer.Argument(None, help="Name of the agent to run"),
message: str = typer.Option(None, "--message", "-m", help="Single message to send to the agent"),
model: str = typer.Option(None, "--model", help="Override the model for this run"),
markdown: bool = typer.Option(True, "--markdown/--no-markdown", help="Render response as markdown"),
session_id: str = typer.Option("cli", "--session", "-s", help="Session ID for this conversation"),
):
"""Interact with the agent directly."""
from nanobot.config.loader import load_config
from nanobot.bus.queue import MessageBus
"""Run an interactive AI agent session."""
import asyncio
from nanobot.agent.loop import AgentLoop
config = load_config()
# Get the agent function
agent_func = _get_agent(name)
if agent_func is None:
if name:
console.print(f"[red]Agent '{name}' not found[/red]")
else:
console.print("[yellow]No agents registered. Use @agent decorator to register agents.[/yellow]")
raise typer.Exit(1)
bus = MessageBus()
provider = _make_provider(config)
# Initialize agent loop
agent_config = getattr(agent_func, '_agent_config', {})
agent_model = model or agent_config.get('model')
agent_prompt = agent_config.get('prompt')
agent_loop = AgentLoop(
bus=bus,
provider=provider,
workspace=config.workspace_path,
brave_api_key=config.tools.web.search.api_key or None,
exec_config=config.tools.exec,
restrict_to_workspace=config.tools.restrict_to_workspace,
)
agent_loop = AgentLoop(model=agent_model, system_prompt=agent_prompt)
if message:
# Single message mode
async def run_once():
response = await agent_loop.process_direct(message, session_id)
_safe_print(f"\n{__logo__} {response}")
with _thinking_ctx():
response = await agent_loop.process_direct(message, session_id)
_print_agent_response(response, render_markdown=markdown)
asyncio.run(run_once())
else:
# Interactive mode
console.print(f"{__logo__} Interactive mode (Ctrl+C to exit)\n")
_enable_line_editing()
console.print(f"{__logo__} Interactive mode (type [bold]exit[/bold] or [bold]Ctrl+C[/bold] to quit)\n")
# input() runs in a worker thread that can't be cancelled.
# Without this handler, asyncio.run() would hang waiting for it.
def _exit_on_sigint(signum, frame):
_save_history()
_restore_terminal()
console.print("\nGoodbye!")
os._exit(0)
signal.signal(signal.SIGINT, _exit_on_sigint)
async def run_interactive():
while True:
try:
user_input = console.input("[bold blue]You:[/bold blue] ")
if not user_input.strip():
_flush_pending_tty_input()
user_input = await _read_interactive_input_async()
command = user_input.strip()
if not command:
continue
if _is_exit_command(command):
_save_history()
_restore_terminal()
console.print("\nGoodbye!")
break
response = await agent_loop.process_direct(user_input, session_id)
_safe_print(f"\n{__logo__} {response}\n")
with _thinking_ctx():
response = await agent_loop.process_direct(user_input, session_id)
_print_agent_response(response, render_markdown=markdown)
except KeyboardInterrupt:
_save_history()
_restore_terminal()
console.print("\nGoodbye!")
break
asyncio.run(run_interactive())
# ============================================================================
# Channel Commands
# ============================================================================
channels_app = typer.Typer(help="Manage channels")
app.add_typer(channels_app, name="channels")
@channels_app.command("status")
def channels_status():
"""Show channel status."""
from nanobot.config.loader import load_config
config = load_config()
table = Table(title="Channel Status")
table.add_column("Channel", style="cyan")
table.add_column("Enabled", style="green")
table.add_column("Configuration", style="yellow")
# WhatsApp
wa = config.channels.whatsapp
table.add_row(
"WhatsApp",
"" if wa.enabled else "",
wa.bridge_url
)
dc = config.channels.discord
table.add_row(
"Discord",
"" if dc.enabled else "",
dc.gateway_url
)
def _thinking_ctx():
"""Context manager for showing thinking indicator."""
from rich.live import Live
from rich.spinner import Spinner
# Telegram
tg = config.channels.telegram
tg_config = f"token: {tg.token[:10]}..." if tg.token else "[dim]not configured[/dim]"
table.add_row(
"Telegram",
"" if tg.enabled else "",
tg_config
)
console.print(table)
def _get_bridge_dir() -> Path:
"""Get the bridge directory, setting it up if needed."""
import shutil
import subprocess
# User's bridge location
user_bridge = Path.home() / ".nanobot" / "bridge"
# Check if already built
if (user_bridge / "dist" / "index.js").exists():
return user_bridge
# Check for npm
if not shutil.which("npm"):
console.print("[red]npm not found. Please install Node.js >= 18.[/red]")
raise typer.Exit(1)
# Find source bridge: first check package data, then source dir
pkg_bridge = Path(__file__).parent.parent / "bridge" # nanobot/bridge (installed)
src_bridge = Path(__file__).parent.parent.parent / "bridge" # repo root/bridge (dev)
source = None
if (pkg_bridge / "package.json").exists():
source = pkg_bridge
elif (src_bridge / "package.json").exists():
source = src_bridge
if not source:
console.print("[red]Bridge source not found.[/red]")
console.print("Try reinstalling: pip install --force-reinstall nanobot")
raise typer.Exit(1)
console.print(f"{__logo__} Setting up bridge...")
# Copy to user directory
user_bridge.parent.mkdir(parents=True, exist_ok=True)
if user_bridge.exists():
shutil.rmtree(user_bridge)
shutil.copytree(source, user_bridge, ignore=shutil.ignore_patterns("node_modules", "dist"))
# Install and build
try:
console.print(" Installing dependencies...")
subprocess.run(["npm", "install"], cwd=user_bridge, check=True, capture_output=True)
class ThinkingSpinner:
def __enter__(self):
self.live = Live(Spinner("dots", text="Thinking..."), console=console, refresh_per_second=10)
self.live.start()
return self
console.print(" Building...")
subprocess.run(["npm", "run", "build"], cwd=user_bridge, check=True, capture_output=True)
console.print("[green]✓[/green] Bridge ready\n")
except subprocess.CalledProcessError as e:
console.print(f"[red]Build failed: {e}[/red]")
if e.stderr:
console.print(f"[dim]{e.stderr.decode()[:500]}[/dim]")
raise typer.Exit(1)
def __exit__(self, exc_type, exc_val, exc_tb):
self.live.stop()
return False
return user_bridge
return ThinkingSpinner()
@channels_app.command("login")
def channels_login():
"""Link device via QR code."""
import subprocess
bridge_dir = _get_bridge_dir()
console.print(f"{__logo__} Starting bridge...")
console.print("Scan the QR code to connect.\n")
try:
subprocess.run(["npm", "start"], cwd=bridge_dir, check=True)
except subprocess.CalledProcessError as e:
console.print(f"[red]Bridge failed: {e}[/red]")
except FileNotFoundError:
console.print("[red]npm not found. Please install Node.js.[/red]")
# ============================================================================
# Cron Commands
# ============================================================================
cron_app = typer.Typer(help="Manage scheduled tasks")
app.add_typer(cron_app, name="cron")
@cron_app.command("list")
def cron_list(
all: bool = typer.Option(False, "--all", "-a", help="Include disabled jobs"),
):
"""List scheduled jobs."""
from nanobot.config.loader import get_data_dir
from nanobot.cron.service import CronService
store_path = get_data_dir() / "cron" / "jobs.json"
service = CronService(store_path)
jobs = service.list_jobs(include_disabled=all)
if not jobs:
console.print("No scheduled jobs.")
return
table = Table(title="Scheduled Jobs")
table.add_column("ID", style="cyan")
table.add_column("Name")
table.add_column("Schedule")
table.add_column("Status")
table.add_column("Next Run")
import time
for job in jobs:
# Format schedule
if job.schedule.kind == "every":
sched = f"every {(job.schedule.every_ms or 0) // 1000}s"
elif job.schedule.kind == "cron":
sched = job.schedule.expr or ""
else:
sched = "one-time"
# Format next run
next_run = ""
if job.state.next_run_at_ms:
next_time = time.strftime("%Y-%m-%d %H:%M", time.localtime(job.state.next_run_at_ms / 1000))
next_run = next_time
status = "[green]enabled[/green]" if job.enabled else "[dim]disabled[/dim]"
table.add_row(job.id, job.name, sched, status, next_run)
console.print(table)
@cron_app.command("add")
def cron_add(
name: str = typer.Option(..., "--name", "-n", help="Job name"),
message: str = typer.Option(..., "--message", "-m", help="Message for agent"),
every: int = typer.Option(None, "--every", "-e", help="Run every N seconds"),
cron_expr: str = typer.Option(None, "--cron", "-c", help="Cron expression (e.g. '0 9 * * *')"),
at: str = typer.Option(None, "--at", help="Run once at time (ISO format)"),
deliver: bool = typer.Option(False, "--deliver", "-d", help="Deliver response to channel"),
to: str = typer.Option(None, "--to", help="Recipient for delivery"),
channel: str = typer.Option(None, "--channel", help="Channel for delivery (e.g. 'telegram', 'whatsapp')"),
):
"""Add a scheduled job."""
from nanobot.config.loader import get_data_dir
from nanobot.cron.service import CronService
from nanobot.cron.types import CronSchedule
# Determine schedule type
if every:
schedule = CronSchedule(kind="every", every_ms=every * 1000)
elif cron_expr:
schedule = CronSchedule(kind="cron", expr=cron_expr)
elif at:
import datetime
dt = datetime.datetime.fromisoformat(at)
schedule = CronSchedule(kind="at", at_ms=int(dt.timestamp() * 1000))
def _print_agent_response(response: str, render_markdown: bool = True):
"""Print agent response with optional markdown rendering."""
if render_markdown:
console.print(Markdown(response))
else:
console.print("[red]Error: Must specify --every, --cron, or --at[/red]")
raise typer.Exit(1)
store_path = get_data_dir() / "cron" / "jobs.json"
service = CronService(store_path)
job = service.add_job(
name=name,
schedule=schedule,
message=message,
deliver=deliver,
to=to,
channel=channel,
)
console.print(f"[green]✓[/green] Added job '{job.name}' ({job.id})")
@cron_app.command("remove")
def cron_remove(
job_id: str = typer.Argument(..., help="Job ID to remove"),
):
"""Remove a scheduled job."""
from nanobot.config.loader import get_data_dir
from nanobot.cron.service import CronService
store_path = get_data_dir() / "cron" / "jobs.json"
service = CronService(store_path)
if service.remove_job(job_id):
console.print(f"[green]✓[/green] Removed job {job_id}")
else:
console.print(f"[red]Job {job_id} not found[/red]")
@cron_app.command("enable")
def cron_enable(
job_id: str = typer.Argument(..., help="Job ID"),
disable: bool = typer.Option(False, "--disable", help="Disable instead of enable"),
):
"""Enable or disable a job."""
from nanobot.config.loader import get_data_dir
from nanobot.cron.service import CronService
store_path = get_data_dir() / "cron" / "jobs.json"
service = CronService(store_path)
job = service.enable_job(job_id, enabled=not disable)
if job:
status = "disabled" if disable else "enabled"
console.print(f"[green]✓[/green] Job '{job.name}' {status}")
else:
console.print(f"[red]Job {job_id} not found[/red]")
@cron_app.command("run")
def cron_run(
job_id: str = typer.Argument(..., help="Job ID to run"),
force: bool = typer.Option(False, "--force", "-f", help="Run even if disabled"),
):
"""Manually run a job."""
from nanobot.config.loader import get_data_dir
from nanobot.cron.service import CronService
store_path = get_data_dir() / "cron" / "jobs.json"
service = CronService(store_path)
async def run():
return await service.run_job(job_id, force=force)
if asyncio.run(run()):
console.print(f"[green]✓[/green] Job executed")
else:
console.print(f"[red]Failed to run job {job_id}[/red]")
# ============================================================================
# Status Commands
# ============================================================================
console.print(response)
console.print()
@app.command()
def status():
"""Show nanobot status."""
from nanobot.config.loader import load_config, get_config_path
from oauth_cli_kit import get_token as get_codex_token
def setup():
"""Interactive setup wizard for nanobot."""
console.print(Panel.fit(
f"{__logo__} Welcome to nanobot setup!\n\n"
"This wizard will help you configure nanobot.",
title="Setup",
border_style="green"
))
# TODO: Implement setup wizard
console.print("[yellow]Setup wizard coming soon![/yellow]")
config_path = get_config_path()
config = load_config()
workspace = config.workspace_path
console.print(f"{__logo__} nanobot Status\n")
def main():
"""Main entry point for the CLI."""
app()
console.print(f"Config: {config_path} {'[green]✓[/green]' if config_path.exists() else '[red]✗[/red]'}")
console.print(f"Workspace: {workspace} {'[green]✓[/green]' if workspace.exists() else '[red]✗[/red]'}")
if config_path.exists():
from nanobot.providers.registry import PROVIDERS
console.print(f"Model: {config.agents.defaults.model}")
# Check API keys from registry
for spec in PROVIDERS:
p = getattr(config.providers, spec.name, None)
if p is None:
continue
if spec.is_local:
# Local deployments show api_base instead of api_key
if p.api_base:
console.print(f"{spec.label}: [green]✓ {p.api_base}[/green]")
else:
console.print(f"{spec.label}: [dim]not set[/dim]")
else:
has_key = bool(p.api_key)
console.print(f"{spec.label}: {'[green]✓[/green]' if has_key else '[dim]not set[/dim]'}")
try:
_ = get_codex_token()
codex_status = "[green]logged in[/green]"
except Exception:
codex_status = "[dim]not logged in[/dim]"
console.print(f"Codex Login: {codex_status}")
if __name__ == "__main__":
app()
main()