#!/usr/bin/env python3
"""
CouchDB to Filesystem Sync Daemon

Monitors CouchDB changes and writes updated files to disk in real-time.
Documents are expected in the Obsidian-LiveSync style: a "file" document
holds a ``children`` list of chunk-document ids; each chunk carries the
text in its ``data`` (or ``content``) field.
"""
import couchdb
import os
import time
import json
from urllib.parse import quote
from pathlib import Path

# Configuration.
# SECURITY NOTE(review): hard-coded fallback credentials are a liability in
# source control — prefer setting the COUCHDB_* environment variables.
# The os.environ.get() fallbacks keep behavior identical when none are set.
USER = os.environ.get("COUCHDB_USER", "admin")
PASSWORD = os.environ.get("COUCHDB_PASSWORD", "DonCucarach0!?")
IP_ADDRESS = os.environ.get("COUCHDB_HOST", "100.100.112.48")
PORT = os.environ.get("COUCHDB_PORT", "5984")
DB_NAME = os.environ.get("COUCHDB_DB", "obsidiandb")
TARGET_FOLDER = os.path.expanduser("~/ObsidianVault")  # Change this to your Obsidian vault path


class CouchDBSyncDaemon:
    """One-way sync: CouchDB file documents -> local filesystem tree.

    Performs a full initial sync, then follows the CouchDB ``_changes``
    feed (longpoll) and rewrites affected files as they change.
    Deletions are detected but not yet mirrored to disk.
    """

    def __init__(self, url, username, password, db_name, target_folder):
        """Store connection settings; no network I/O happens here.

        :param url: base server URL, e.g. ``http://host:5984/``
        :param username: CouchDB user (will be percent-encoded)
        :param password: CouchDB password (will be percent-encoded)
        :param db_name: database to mirror
        :param target_folder: local directory that receives the files
        """
        self.url = url
        self.username = username
        self.password = password
        self.db_name = db_name
        self.target_folder = target_folder
        self.server = None
        self.db = None

    def connect(self):
        """Connect to CouchDB"""
        # Percent-encode credentials (safe="") so characters like '!' and '?'
        # in the password do not corrupt the authority part of the URL.
        safe_password = quote(self.password, safe="")
        safe_username = quote(self.username, safe="")
        scheme, _, host_part = self.url.partition("://")
        full_url = f"{scheme}://{safe_username}:{safe_password}@{host_part}"
        self.server = couchdb.Server(full_url)
        self.db = self.server[self.db_name]
        print(f"✅ Connected to CouchDB: {self.db_name}")

    def get_all_docs_dict(self):
        """Fetch all documents into a dictionary for efficient lookup.

        Design documents (ids starting with ``_design``) are skipped.
        :return: dict mapping doc id -> document body
        """
        all_docs = {}
        for row in self.db.view('_all_docs', include_docs=True):
            doc = row.doc
            if not doc['_id'].startswith("_design"):
                all_docs[doc['_id']] = doc
        return all_docs

    def _resolve_local_path(self, relative_path):
        """Map a document path to a safe absolute path inside the vault.

        Replaces characters that are illegal/ambiguous on common
        filesystems and refuses paths that would escape the target folder
        (e.g. via ``..`` components).

        :return: absolute path string, or ``None`` if the path is unsafe
        """
        safe_path = relative_path.replace(":", "-").replace("|", "-")
        root = os.path.abspath(self.target_folder)
        candidate = os.path.abspath(os.path.join(root, safe_path))
        # Path-traversal guard: the final path must stay under the vault root.
        if candidate != root and not candidate.startswith(root + os.sep):
            return None
        return candidate

    def write_file_to_disk(self, doc_id, all_docs):
        """Reconstruct and write a single file to disk.

        :param doc_id: id of the file-metadata document
        :param all_docs: pre-fetched id -> doc mapping (for chunk lookup)
        :return: True if a file was written, False otherwise
        """
        doc = all_docs.get(doc_id)
        if not doc:
            print(f"⚠️ Document {doc_id} not found")
            return False

        # Only file-metadata documents carry a "children" chunk-id list;
        # chunk documents and other types are skipped here.
        if "children" not in doc or not isinstance(doc.get("children"), list):
            return False

        relative_path = doc.get("path", doc_id)
        full_local_path = self._resolve_local_path(relative_path)
        if full_local_path is None:
            print(f"⚠️ Refusing unsafe path: {relative_path}")
            return False

        # Reconstruct content from chunks, in the order listed by "children".
        full_content = []
        is_encrypted = False
        for chunk_id in doc["children"]:
            chunk = all_docs.get(chunk_id)
            if not chunk:
                # Best-effort: a missing chunk is logged but does not abort
                # the whole file (matches previous behavior).
                print(f"⚠️ Chunk {chunk_id} not found for {relative_path}")
                continue
            chunk_data = chunk.get("data") or chunk.get("content") or ""
            # Heuristic: LiveSync-encrypted chunks start with "%" or carry
            # an "e_" marker — presumably; confirm against the plugin docs.
            if str(chunk_data).startswith("%") or chunk.get("e_"):
                is_encrypted = True
                break
            full_content.append(str(chunk_data))

        if is_encrypted:
            print(f"🔒 Skipping encrypted file: {relative_path}")
            return False

        note_text = "".join(full_content)

        # Create parent directory if needed, then write atomically enough
        # for a single-writer daemon.
        target = Path(full_local_path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(note_text, encoding="utf-8")
        print(f"📝 Wrote: {relative_path}")
        return True

    def handle_change(self, change, all_docs):
        """Handle a single change notification from the _changes feed."""
        doc_id = change['id']
        is_deleted = change.get('deleted', False)

        if is_deleted:
            print(f"🗑️ Deleted: {doc_id}")
            # TODO: Could implement file deletion here
            return

        # Try to write the file
        self.write_file_to_disk(doc_id, all_docs)

    def initial_sync(self):
        """Perform initial full sync.

        :return: number of files written
        """
        print("🔄 Starting initial sync...")
        all_docs = self.get_all_docs_dict()
        count = 0
        for doc_id, doc in all_docs.items():
            if "children" in doc and isinstance(doc.get("children"), list):
                if self.write_file_to_disk(doc_id, all_docs):
                    count += 1
                    if count % 10 == 0:
                        print(f"   Synced {count} files...")
        print(f"✅ Initial sync complete: {count} files")
        return count

    def watch_changes(self):
        """Watch for changes and sync in real-time (blocks forever).

        Uses longpoll on the _changes feed. On any error the daemon sleeps
        5 seconds and reconnects; Ctrl-C exits cleanly.
        """
        print("👀 Watching for changes...")
        since = self.db.info()['update_seq']

        while True:
            try:
                # Longpoll blocks up to 30s waiting for new changes.
                changes = self.db.changes(since=since, feed='longpoll',
                                          timeout=30000, include_docs=True)
                if changes['results']:
                    # Fetch all docs so chunk lookups during reconstruction
                    # don't need one round-trip per chunk.
                    all_docs = self.get_all_docs_dict()
                    for change in changes['results']:
                        self.handle_change(change, all_docs)
                # Advance the cursor only after the batch was processed, so
                # a crash mid-batch retries it instead of silently dropping
                # changes (file writes are idempotent, so replays are safe).
                since = changes['last_seq']
                time.sleep(0.5)
            except KeyboardInterrupt:
                print("\n⛔ Stopping sync daemon...")
                break
            except Exception as e:
                # Daemon boundary: log, back off, and try to reconnect.
                print(f"❌ Error: {e}")
                print("   Retrying in 5 seconds...")
                time.sleep(5)
                try:
                    self.connect()  # Reconnect
                except Exception as reconnect_error:
                    print(f"❌ Reconnection failed: {reconnect_error}")


def main():
    """Wire up configuration, run the initial sync, then watch forever."""
    # Ensure target folder exists
    os.makedirs(TARGET_FOLDER, exist_ok=True)

    print("=" * 60)
    print("CouchDB → Filesystem Sync Daemon")
    print("=" * 60)
    print(f"Database: {DB_NAME}")
    print(f"Target: {TARGET_FOLDER}")
    print("=" * 60)

    # Build URL
    url = f"http://{IP_ADDRESS}:{PORT}/"

    # Create daemon
    daemon = CouchDBSyncDaemon(url, USER, PASSWORD, DB_NAME, TARGET_FOLDER)

    # Connect
    daemon.connect()

    # Initial sync
    daemon.initial_sync()

    # Watch for changes
    daemon.watch_changes()


if __name__ == "__main__":
    main()