270 lines
10 KiB
Python
270 lines
10 KiB
Python
import reflex as rx
|
|
import asyncio
|
|
import schedule
|
|
import os
|
|
from sqlmodel import select
|
|
from ..models import Configuration
|
|
from ..agent import ObsidianAgent
|
|
from ..note_server import FileSystemServer, CouchDBNoteServer
|
|
from ..couch_sync import CouchDBSync
|
|
|
|
# Strong references to the scheduler loop task(s). asyncio only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected
# mid-run. The set also acts as a process-wide "already running" guard so
# that several browser sessions do not each register the daily job on the
# global `schedule` scheduler.
_SCHEDULER_TASKS: set = set()


class AppState(rx.State):
    """Reflex state backing the agent UI.

    Holds the live log/status shown to the user, a copy of the persisted
    ``Configuration`` row, and the event handlers that run the CouchDB sync,
    the AI agent and the scheduler loop.

    The long-running handlers are background events: they only touch state
    inside ``async with self`` and do their blocking work in worker threads.
    """

    # Live Status
    logs: list[str] = ["Waiting for agent..."]
    is_running: bool = False
    # Backend-only flag: this state instance already requested the scheduler.
    _scheduler_task: bool = False

    # Configuration (Loaded from DB)
    api_key: str = ""
    input_folder: str = ""  # Used as Prefix for CouchDB mode
    philosophy: str = ""
    rewrite_tag: str = ""

    # CouchDB Config
    couch_url: str = ""
    couch_user: str = ""
    couch_pass: str = ""
    couch_passphrase: str = ""
    couch_db: str = ""

    # Subfolder Selection
    available_subfolders: list[str] = []
    selected_subfolder: str = ""

    def _get_active_server(self):
        """Returns the appropriate NoteServer based on config.

        A CouchDB-backed server is used when ``couch_url`` is set; otherwise
        the file-system server is used.
        """
        if self.couch_url:
            return CouchDBNoteServer(
                self.couch_url, self.couch_user, self.couch_pass, self.couch_db
            )
        return FileSystemServer()

    def load_config(self):
        """Loads configuration from the database into State.

        If no ``Configuration`` row exists the current values are kept.
        The subfolder list is refreshed afterwards.
        """
        with rx.session() as session:
            config = session.exec(select(Configuration)).first()
            if config:
                # Coerce missing values to "" so the str-typed vars (and the
                # path handling that relies on them) never see None.
                self.api_key = config.gemini_api_key or ""
                self.input_folder = config.input_folder or ""
                self.philosophy = config.philosophy or ""
                self.rewrite_tag = config.rewrite_tag or ""

                # CouchDB
                self.couch_url = config.couchdb_url or ""
                self.couch_user = config.couchdb_user or ""
                self.couch_pass = config.couchdb_password or ""
                self.couch_passphrase = config.couchdb_passphrase or ""
                self.couch_db = config.couchdb_db_name or "obsidian_livesync"

        # Auto-refresh folders on load. refresh_subfolders() reports its own
        # failures, so no extra try/except is needed here.
        self.refresh_subfolders()

    def refresh_subfolders(self):
        """Scans the input folder for subdirectories.

        On success ``available_subfolders`` becomes "(All Notes)" followed by
        the sorted folder names, and "(All Notes)" is selected when nothing
        is selected yet. On any error the list becomes ["(Error)"] and the
        error is printed to the server output.
        """
        try:
            server = self._get_active_server()
            # If using CouchDB, input_folder acts as a root prefix (e.g. "Inbox")
            # If using FS, it's a path.
            root = self.input_folder or ""

            folders = server.list_subfolders(root)
            self.available_subfolders = ["(All Notes)"] + sorted(folders)
            if not self.selected_subfolder:
                self.selected_subfolder = "(All Notes)"
        except Exception as e:
            print(f"Error refreshing folders: {e}")
            self.available_subfolders = ["(Error)"]

    def save_config(self):
        """Persists current state config back to DB.

        Creates the ``Configuration`` row if none exists, refreshes the
        subfolder list and returns a browser alert event.
        """
        with rx.session() as session:
            config = session.exec(select(Configuration)).first()
            if not config:
                config = Configuration()

            config.gemini_api_key = self.api_key
            config.input_folder = self.input_folder
            config.philosophy = self.philosophy
            config.rewrite_tag = self.rewrite_tag

            # CouchDB
            config.couchdb_url = self.couch_url
            config.couchdb_user = self.couch_user
            config.couchdb_password = self.couch_pass
            config.couchdb_passphrase = self.couch_passphrase
            config.couchdb_db_name = self.couch_db

            session.add(config)
            session.commit()

        # refresh_subfolders() handles and reports its own errors.
        self.refresh_subfolders()

        return rx.window_alert("Configuration saved.")

    @rx.event(background=True)
    async def run_couch_sync(self):
        """Triggers the CouchDB download.

        Runs as a background event: state is only touched inside
        ``async with self`` and the download itself runs in a worker thread.
        Does nothing if another run is already in progress.
        """
        async with self:
            if self.is_running:
                return
            self.is_running = True
            self.logs.append("--- Starting CouchDB Sync ---")

        try:
            async with self:
                self.load_config()
                if not self.couch_url:
                    self.logs.append("Error: CouchDB URL not configured.")
                    return
                # Snapshot the settings while holding the state lock.
                couch_args = (
                    self.couch_url,
                    self.couch_user,
                    self.couch_pass,
                    self.couch_db,
                    self.couch_passphrase,
                )
                prefix = self.input_folder

            syncer = CouchDBSync(*couch_args)
            sync_logs = await asyncio.to_thread(syncer.fetch_notes, prefix)

            async with self:
                self.logs.extend(sync_logs)
                self.logs.append("--- CouchDB Sync Complete ---")
                self.refresh_subfolders()  # Auto-refresh after sync
        except Exception as e:
            async with self:
                self.logs.append(f"Sync Error: {str(e)}")
        finally:
            # Always release the running flag, including on early return.
            async with self:
                self.is_running = False

    def reset_state(self):
        """Force reset running state on load."""
        self.is_running = False

    @rx.event(background=True)
    async def run_agent_process(self):
        """Runs the agent and provides real-time feedback.

        Runs as a background event. Each ``async with self`` block pushes its
        log lines to the UI when it exits, so progress appears note by note.
        Note listing, reading and processing all run in worker threads.
        """
        async with self:
            already_running = self.is_running
            if not already_running:
                self.is_running = True
                self.logs.append("--- Starting AI Agent ---")

        if already_running:
            yield rx.window_alert("Agent is already running!")
            return

        try:
            async with self:
                self.load_config()

                # Strict check only for API Key. Input folder is optional for
                # CouchDB (root scan)
                if not self.api_key:
                    self.logs.append("Error: Missing API Key.")
                    return

                # If FS mode, input_folder is required.
                if not self.couch_url and not self.input_folder:
                    self.logs.append(
                        "Error: Input Folder required for File System mode."
                    )
                    return

                # Determine target path
                target_path = self.input_folder
                if self.selected_subfolder and self.selected_subfolder != "(All Notes)":
                    target_path = os.path.join(
                        self.input_folder, self.selected_subfolder
                    )
                    self.logs.append(
                        f"Targeting specific folder: {self.selected_subfolder}"
                    )

                # Use active server (CouchDB or FS)
                server = self._get_active_server()

                # For CouchDB, we don't need a vault_root calculator like FS,
                # because paths are relative to the DB root already.
                # But agent expects one. We can pass "" for CouchDB.
                vault_root = ""
                if isinstance(server, FileSystemServer):
                    # Vault root is the parent of the configured Inbox.
                    vault_root = os.path.dirname(self.input_folder.rstrip(os.sep))

                # Snapshot values needed outside the state lock.
                api_key = self.api_key
                philosophy = self.philosophy
                rewrite_tag = self.rewrite_tag

            agent = ObsidianAgent(api_key=api_key, server=server)

            # 1. Get the list of notes first
            notes = await asyncio.to_thread(server.list_notes, target_path)

            async with self:
                self.logs.append(f"Found {len(notes)} notes to process.")
                if not notes:
                    self.logs.append("No notes found in that scope.")
            if not notes:
                return

            # 2. Process them one by one in the thread, but updating state in between
            total = len(notes)
            for i, note_path in enumerate(notes):
                # Rate Limit Sleep (except for the first one)
                if i > 0:
                    async with self:
                        self.logs.append(" ...Waiting 15s (Free Tier Rate Limit)...")
                    await asyncio.sleep(15)

                filename = os.path.basename(note_path)
                # Read in a worker thread so the event loop is not blocked.
                content = await asyncio.to_thread(server.read_note, note_path)
                preview = (
                    (content[:50].replace("\n", " ") + "...")
                    if content
                    else "[EMPTY NOTE]"
                )

                async with self:
                    self.logs.append(f"[{i+1}/{total}] Analyzing: {filename}")
                    self.logs.append(f" Context: \"{preview}\"")

                # Process single note in thread
                result = await asyncio.to_thread(
                    agent._process_single_note_with_retry,
                    note_path,
                    content,
                    philosophy,
                    rewrite_tag,
                    vault_root,
                )

                async with self:
                    self.logs.append(f" -> {result}")

            async with self:
                self.logs.append("--- All Notes Processed ---")

        except Exception as e:
            async with self:
                self.logs.append(f"Critical Error: {str(e)}")
        finally:
            # Always release the running flag, including on early return.
            async with self:
                self.is_running = False

    @rx.event(background=True)
    async def start_scheduler(self):
        """Starts the scheduler loop once.

        Logs activation once per state instance, and starts at most one
        scheduler loop per server process.
        """
        async with self:
            if self._scheduler_task:
                return
            self._scheduler_task = True
            self.logs.append("Scheduler active (Daily @ 02:00).")

        if _SCHEDULER_TASKS:
            # A loop is already running in this process; starting another
            # would register the daily job a second time.
            return

        task = asyncio.create_task(self._run_scheduler_loop())
        # Keep a strong reference until the task finishes.
        _SCHEDULER_TASKS.add(task)
        task.add_done_callback(_SCHEDULER_TASKS.discard)

    async def _run_scheduler_loop(self):
        """Registers the daily 02:00 job and polls ``schedule`` every minute.

        Runs until cancelled. The job currently only prints a marker line.
        """

        def job():
            # In a real deployment, we'd need a way to trigger the async task from here
            # For now, we print to stdout which will show in the server logs
            print("Scheduler Triggered")

        schedule.every().day.at("02:00").do(job)
        while True:
            schedule.run_pending()
            await asyncio.sleep(60)