# ObsidianAI/obsidian_automator/couch_manager.py
# Python source, 217 lines, 8.1 KiB
# Retrieved from repository web view: 2026-01-03 10:23:05 -06:00
import couchdb
import os
import json
import time
from urllib.parse import quote
class CouchDBManager:
    """Manage an Obsidian LiveSync vault stored in CouchDB.

    LiveSync keeps each note as one metadata document (carrying ``path``,
    ``mtime``, ``size`` and a ``children`` list of chunk-document ids) plus
    chunk documents that hold the actual text.  This class reconstructs
    note content from chunks, moves notes, and appends "rewrite" flags
    while keeping the metadata LiveSync relies on (size/mtime) consistent.
    """

    def __init__(self, url, username, password, db_name):
        """Store connection settings; no network I/O happens here.

        Args:
            url: Server base URL; a trailing "/" is removed.
            username: Login name (surrounding whitespace stripped).
            password: Password (surrounding whitespace stripped).
            db_name: Database (vault) name opened by :meth:`connect`.
        """
        self.url = url.rstrip("/")
        self.username = username.strip()
        self.password = password.strip()
        self.db_name = db_name

    def connect(self):
        """Open and return the configured couchdb database object.

        Credentials are percent-encoded and embedded into the URL so that
        special characters (e.g. "@" or ":") cannot break URL parsing.

        Raises:
            Exception: wrapping any underlying connection/lookup error.
        """
        try:
            if self.username and self.password:
                # Split off the scheme so credentials land between
                # "scheme://" and the host part.
                if "://" in self.url:
                    protocol, rest = self.url.split("://", 1)
                else:
                    # No scheme supplied; assume plain HTTP.
                    protocol, rest = "http", self.url
                safe_user = quote(self.username, safe="")
                safe_pass = quote(self.password, safe="")
                full_url = f"{protocol}://{safe_user}:{safe_pass}@{rest}"
            else:
                full_url = self.url
            server = couchdb.Server(full_url)
            return server[self.db_name]
        except Exception as e:
            # Preserve the cause chain for debugging.
            raise Exception(f"CouchDB Connection Error: {e}") from e

    def _calculate_size(self, db, children):
        """Calculate actual content size from chunks (LiveSync compatible).

        Args:
            db: Mapping of doc id -> document (couchdb.Database or dict).
            children: Iterable of chunk-document ids.

        Returns:
            Total character length of the payloads of the chunks that
            exist in ``db``; missing chunk ids are skipped silently.
        """
        size = 0
        for chunk_id in children:
            if chunk_id in db:
                chunk = db[chunk_id]
                # Some chunk revisions store the payload under "content".
                data = chunk.get("data", chunk.get("content", ""))
                size += len(str(data))
        return size

    def list_files(self, prefix_filter=""):
        """Return a dict {doc_id: path} for all 'file' documents.

        A document counts as file metadata when it carries a ``children``
        list.  ``prefix_filter`` restricts results to paths under a
        folder; "\\" separators are normalized to "/" on both sides
        before the (case-sensitive) prefix comparison.
        """
        db = self.connect()
        files = {}
        # Use _all_docs to scan quickly
        for row in db.view('_all_docs', include_docs=True):
            doc = row.doc
            # Identify file metadata docs (has children array)
            if "children" in doc and isinstance(doc["children"], list):
                path = doc.get("path", doc["_id"])
                # Normalize path separators to "/" so Windows-style
                # paths still match a "/"-style filter.
                norm_path = path.replace("\\", "/")
                norm_prefix = prefix_filter.replace("\\", "/")
                # Debug print for first few items to diagnose filtering mismatch
                if len(files) < 3:
                    print(f"DEBUG: Found path '{norm_path}'. Filter: '{norm_prefix}'")
                if not prefix_filter or norm_path.startswith(norm_prefix):
                    files[doc["_id"]] = path
        print(f"DEBUG: list_files returning {len(files)} files after filtering.")
        return files

    def read_file_content(self, doc_id):
        """Reconstruct file content by concatenating its chunks in order.

        Returns "[ENCRYPTED_CONTENT]" as soon as any chunk looks
        encrypted (payload starts with "%" or the chunk carries an "e_"
        flag).  Missing chunks are skipped.
        """
        db = self.connect()
        doc = db[doc_id]
        content = []
        for chunk_id in doc.get("children", []):
            if chunk_id in db:
                chunk = db[chunk_id]
                data = chunk.get("data") or chunk.get("content") or ""
                # Check encryption before exposing any payload.
                if str(data).startswith("%") or chunk.get("e_"):
                    return "[ENCRYPTED_CONTENT]"
                content.append(str(data))
        return "".join(content)

    def move_file(self, doc_id, target_folder):
        """
        LiveSync-compatible file move:
        Updates the path and _id in place without breaking chunk references.
        NOTE: This approach updates the document but LiveSync may still need
        to reconcile the change. For best results, use Obsidian's built-in move.

        Returns a human-readable status string (also on failure).
        """
        db = self.connect()
        print(f"DEBUG: Attempting to move doc_id: {repr(doc_id)} to '{target_folder}'")
        doc = None
        actual_doc_id = None
        try:
            # Try direct lookup first (faster)
            if doc_id in db:
                doc = db[doc_id]
                actual_doc_id = doc_id
            else:
                # Fallback: case-insensitive scan
                doc_id_lower = doc_id.lower()
                for row in db.view('_all_docs', include_docs=True):
                    if row.id.lower() == doc_id_lower:
                        doc = row.doc
                        actual_doc_id = row.id
                        break
            if not doc:
                return f"Error: Document {doc_id} not found in DB (Scan failed)."
        except Exception as e:
            return f"Error fetching document {doc_id}: {e}"
        old_path = doc.get("path", actual_doc_id)
        filename = os.path.basename(old_path)
        # Construct new path, keeping the original file name under the
        # target folder.  BUGFIX: this previously interpolated a literal
        # placeholder instead of `filename`, so every move produced the
        # same bogus path.  An empty target folder moves to vault root.
        target_folder = target_folder.strip("/")
        new_path = f"{target_folder}/{filename}" if target_folder else filename
        new_doc_id = new_path.lower()
        # Check if already at target
        if actual_doc_id == new_doc_id:
            return f"Already at {new_path}"
        # IMPORTANT: For LiveSync compatibility, we need to:
        # 1. Update path field (tells LiveSync where file should be)
        # 2. Update mtime (triggers sync)
        # 3. Keep all other metadata intact
        # Update in place first
        doc["path"] = new_path
        doc["mtime"] = int(time.time() * 1000)
        db.save(doc)
        # Guard: treat a missing children array as an empty note.
        children = doc.get("children", [])
        # If the doc_id needs to change, create new doc and delete old
        if actual_doc_id != new_doc_id:
            # Check if target exists
            if new_doc_id in db:
                # Target exists - merge/update it instead
                target_doc = db[new_doc_id]
                target_doc["path"] = new_path
                target_doc["mtime"] = int(time.time() * 1000)
                target_doc["children"] = children  # Use latest chunks
                target_doc["size"] = self._calculate_size(db, children)  # Recalculate size
                target_doc["ctime"] = doc.get("ctime", target_doc.get("ctime"))
                target_doc["type"] = doc.get("type", "plain")
                target_doc["eden"] = doc.get("eden", {})
                db.save(target_doc)
                # Delete old (db.save above refreshed doc["_rev"] in place)
                db.delete(doc)
                return f"Merged and moved to {new_path}"
            else:
                # Create new doc with new ID
                new_doc = doc.copy()
                new_doc["_id"] = new_doc_id
                new_doc["path"] = new_path
                new_doc["size"] = self._calculate_size(db, children)  # Recalculate size
                del new_doc["_rev"]
                db.save(new_doc)
                # Delete old
                old_doc = db[actual_doc_id]  # Refresh to get latest _rev
                db.delete(old_doc)
                return f"Moved to {new_path}"

    def flag_rewrite(self, doc_id, reason, tag):
        """
        Appends a rewrite tag.

        Creates a new chunk containing "{tag} {reason}", appends its id to
        the note's children, and refreshes mtime/size so LiveSync picks up
        the change.  Returns a human-readable status string (also on
        failure).
        """
        db = self.connect()
        print(f"DEBUG: Flagging rewrite for doc_id: {repr(doc_id)}")
        doc = None
        try:
            # Try direct lookup first (faster)
            if doc_id in db:
                doc = db[doc_id]
            else:
                # Fallback: case-insensitive scan
                doc_id_lower = doc_id.lower()
                for row in db.view('_all_docs', include_docs=True):
                    if row.id.lower() == doc_id_lower:
                        doc = row.doc
                        break
            if not doc:
                return f"Error: Document {doc_id} not found (Scan failed)."
        except Exception as e:
            return f"Error fetching doc for rewrite {doc_id}: {e}"
        # Create new chunk.  BUGFIX: use a nanosecond timestamp so that
        # two flags within the same second no longer collide on one id
        # (the old int(time.time()) had second granularity).
        chunk_id = f"h:{time.time_ns()}"  # Simple ID generation
        chunk_content = f"\n\n{tag} {reason}\n"
        chunk_doc = {
            "_id": chunk_id,
            "data": chunk_content,
            "type": "chunk"  # LiveSync convention
        }
        db.save(chunk_doc)
        # Update metadata to point to new chunk (create the children
        # array if this doc somehow lacked one).
        doc.setdefault("children", []).append(chunk_id)
        doc["mtime"] = int(time.time() * 1000)
        # CRITICAL: Update size to match actual content size
        doc["size"] = self._calculate_size(db, doc["children"])
        db.save(doc)
        return f"Flagged {doc.get('path')} for rewrite."