import json
import os
import time
from urllib.parse import quote

import couchdb


class CouchDBManager:
    """Read and modify file documents in a CouchDB database.

    The database is expected to use the LiveSync-style layout this class
    works with: a file is a metadata document carrying a ``children`` list
    of chunk ids, and each chunk document holds its text under ``data``
    (or ``content``).
    """

    def __init__(self, url, username, password, db_name):
        """Store connection settings; no network access happens here.

        Args:
            url: Server URL, with or without a scheme. A trailing "/" is
                removed.
            username: Login name; surrounding whitespace is stripped.
            password: Login password; surrounding whitespace is stripped.
            db_name: Name of the database to open in ``connect``.
        """
        self.url = url.rstrip("/")
        self.username = username.strip()
        self.password = password.strip()
        self.db_name = db_name

    def connect(self):
        """Open the configured database and return its handle.

        When both username and password are non-empty they are
        percent-encoded and embedded in the server URL. A URL without a
        scheme is treated as ``http``.

        Raises:
            Exception: Any failure, re-raised with a
                "CouchDB Connection Error" message and the original
                exception chained as its cause.
        """
        try:
            if self.username and self.password:
                if "://" in self.url:
                    protocol, rest = self.url.split("://", 1)
                else:
                    protocol, rest = "http", self.url
                # safe="" so characters such as "/" or "@" in the
                # credentials cannot break the URL structure.
                safe_user = quote(self.username, safe="")
                safe_pass = quote(self.password, safe="")
                full_url = f"{protocol}://{safe_user}:{safe_pass}@{rest}"
            else:
                full_url = self.url
            server = couchdb.Server(full_url)
            return server[self.db_name]
        except Exception as e:
            raise Exception(f"CouchDB Connection Error: {e}") from e

    @staticmethod
    def _chunk_text(chunk):
        """Return a chunk's text: ``data``, else ``content``, else ""."""
        return str(chunk.get("data") or chunk.get("content") or "")

    def _find_doc(self, db, doc_id):
        """Look up a document by id, falling back to a case-insensitive match.

        Returns:
            ``(doc, actual_id)`` when found, otherwise ``(None, None)``.
        """
        # Direct lookup first: a single request.
        doc = db.get(doc_id)
        if doc is not None:
            return doc, doc_id

        # Fallback: compare ids case-insensitively. Only ids are scanned;
        # the body is fetched for the matching row alone.
        wanted = doc_id.lower()
        for row in db.view("_all_docs"):
            if row.id.lower() == wanted:
                doc = db.get(row.id)
                if doc is not None:
                    return doc, row.id
        return None, None

    def _calculate_size(self, db, children):
        """Calculate actual content size from chunks (LiveSync compatible).

        Sums the text length of every chunk id in ``children`` that exists
        in ``db``; missing chunks contribute nothing. Text is extracted the
        same way ``read_file_content`` does, so the size matches the
        reconstructed content.
        """
        size = 0
        for chunk_id in children:
            chunk = db.get(chunk_id)
            if chunk is not None:
                size += len(self._chunk_text(chunk))
        return size

    def list_files(self, prefix_filter=""):
        """Return ``{doc_id: path}`` for every file document.

        A file document is one whose ``children`` value is a list. The path
        is the document's ``path`` field, or its ``_id`` when absent.

        Args:
            prefix_filter: When non-empty, only paths starting with this
                prefix are returned. Backslashes in both the prefix and
                the path are treated as "/" for the comparison; the
                returned path is left unmodified.
        """
        db = self.connect()
        norm_prefix = prefix_filter.replace("\\", "/")
        files = {}

        # Use _all_docs to scan quickly
        for row in db.view("_all_docs", include_docs=True):
            doc = row.doc
            if doc is None:
                continue
            # Identify file metadata docs (has children array)
            if not isinstance(doc.get("children"), list):
                continue

            path = doc.get("path", doc["_id"])
            norm_path = path.replace("\\", "/")

            # Debug print for first few items to diagnose filtering mismatch
            if len(files) < 3:
                print(f"DEBUG: Found path '{norm_path}'. Filter: '{norm_prefix}'")

            if not prefix_filter or norm_path.startswith(norm_prefix):
                files[doc["_id"]] = path

        print(f"DEBUG: list_files returning {len(files)} files after filtering.")
        return files

    def read_file_content(self, doc_id):
        """Reconstruct a file's content by joining its chunks in order.

        Chunks listed in ``children`` but missing from the database are
        skipped. If any chunk's text starts with "%" or the chunk has a
        truthy ``e_`` field, the file is treated as encrypted and the
        placeholder ``"[ENCRYPTED_CONTENT]"`` is returned instead.
        """
        db = self.connect()
        doc = db[doc_id]
        content = []
        for chunk_id in doc.get("children", []):
            chunk = db.get(chunk_id)
            if chunk is None:
                continue
            data = self._chunk_text(chunk)
            # Check encryption
            if data.startswith("%") or chunk.get("e_"):
                return "[ENCRYPTED_CONTENT]"
            content.append(data)
        return "".join(content)

    def move_file(self, doc_id, target_folder):
        """Move a file document into ``target_folder``.

        LiveSync-compatible file move: the metadata document is re-created
        under a new ``_id`` (the lowercased new path) that keeps the same
        chunk references, and the old document is deleted. If a document
        already exists at the new id, it is updated to point at this file's
        chunks instead.

        NOTE: LiveSync may still need to reconcile the change. For best
        results, use Obsidian's built-in move.

        Args:
            doc_id: Id of the file document; matched case-insensitively
                if no exact match exists.
            target_folder: Destination folder. Leading/trailing "/" are
                ignored; an empty folder places the file at the top level.

        Returns:
            A human-readable status or error message.
        """
        db = self.connect()
        print(f"DEBUG: Attempting to move doc_id: {repr(doc_id)} to '{target_folder}'")

        try:
            doc, actual_doc_id = self._find_doc(db, doc_id)
        except Exception as e:
            return f"Error fetching document {doc_id}: {e}"
        if doc is None:
            return f"Error: Document {doc_id} not found in DB (Scan failed)."

        # Refuse to touch documents that are not file metadata, before any
        # write happens.
        children = doc.get("children")
        if not isinstance(children, list):
            return f"Error: Document {actual_doc_id} is not a file document (no children)."

        old_path = doc.get("path", actual_doc_id)
        filename = os.path.basename(old_path)

        # Construct new path; an empty folder must not yield "/filename".
        target_folder = target_folder.strip("/")
        new_path = f"{target_folder}/{filename}" if target_folder else filename
        new_doc_id = new_path.lower()

        # Check if already at target
        if actual_doc_id == new_doc_id:
            return f"Already at {new_path}"

        # IMPORTANT: For LiveSync compatibility, we need to:
        # 1. Update path field (tells LiveSync where file should be)
        # 2. Update mtime (triggers sync)
        # 3. Keep all other metadata intact
        # Update in place first; save() also refreshes doc["_rev"].
        doc["path"] = new_path
        doc["mtime"] = int(time.time() * 1000)
        db.save(doc)

        size = self._calculate_size(db, children)

        target_doc = db.get(new_doc_id)
        if target_doc is not None:
            # Target exists - merge/update it instead
            target_doc["path"] = new_path
            target_doc["mtime"] = int(time.time() * 1000)
            target_doc["children"] = children  # Use latest chunks
            target_doc["size"] = size
            target_doc["ctime"] = doc.get("ctime", target_doc.get("ctime"))
            target_doc["type"] = doc.get("type", "plain")
            target_doc["eden"] = doc.get("eden", {})
            db.save(target_doc)
            # Delete old
            db.delete(doc)
            return f"Merged and moved to {new_path}"

        # Create new doc with new ID; it must not carry the old revision.
        new_doc = dict(doc)
        new_doc["_id"] = new_doc_id
        new_doc["path"] = new_path
        new_doc["size"] = size
        new_doc.pop("_rev", None)
        db.save(new_doc)

        # Delete old
        old_doc = db[actual_doc_id]  # Refresh to get latest _rev
        db.delete(old_doc)
        return f"Moved to {new_path}"

    def flag_rewrite(self, doc_id, reason, tag):
        """Append a rewrite tag to a file as a new trailing chunk.

        A chunk containing ``"\\n\\n{tag} {reason}\\n"`` is saved and its id
        appended to the file's ``children``; ``mtime`` and ``size`` are
        updated to match.

        Args:
            doc_id: Id of the file document; matched case-insensitively
                if no exact match exists.
            reason: Text placed after the tag.
            tag: Marker placed at the start of the appended line.

        Returns:
            A human-readable status or error message.
        """
        db = self.connect()
        print(f"DEBUG: Flagging rewrite for doc_id: {repr(doc_id)}")

        try:
            doc, _ = self._find_doc(db, doc_id)
        except Exception as e:
            return f"Error fetching doc for rewrite {doc_id}: {e}"
        if doc is None:
            return f"Error: Document {doc_id} not found (Scan failed)."

        # Validate before saving the chunk so a non-file document does not
        # leave an orphan chunk behind.
        children = doc.get("children")
        if not isinstance(children, list):
            return f"Error: Document {doc_id} is not a file document (no children)."

        # Create new chunk. The id has one-second resolution, so add a
        # suffix when that id is already taken (e.g. two flags within the
        # same second) instead of failing on a conflicting save.
        base_id = f"h:{int(time.time())}"
        chunk_id = base_id
        attempt = 0
        while chunk_id in db:
            attempt += 1
            chunk_id = f"{base_id}-{attempt}"

        chunk_content = f"\n\n{tag} {reason}\n"
        chunk_doc = {
            "_id": chunk_id,
            "data": chunk_content,
            "type": "chunk",  # LiveSync convention
        }
        db.save(chunk_doc)

        # Update metadata to point to new chunk
        children.append(chunk_id)
        doc["mtime"] = int(time.time() * 1000)
        # CRITICAL: Update size to match actual content size
        doc["size"] = self._calculate_size(db, children)
        db.save(doc)
        return f"Flagged {doc.get('path')} for rewrite."