#!/usr/bin/env python3
"""
CouchDB to Filesystem Sync Daemon

Monitors CouchDB changes and writes updated files to disk in real-time.
"""
import couchdb
import os
import time
import json
from urllib.parse import quote
from pathlib import Path

# Configuration
|
|
USER = "admin"
|
|
PASSWORD = "DonCucarach0!?"
|
|
IP_ADDRESS = "100.100.112.48"
|
|
PORT = "5984"
|
|
DB_NAME = "obsidiandb"
|
|
TARGET_FOLDER = os.path.expanduser("~/ObsidianVault") # Change this to your Obsidian vault path
|
|
|
|
class CouchDBSyncDaemon:
|
|
def __init__(self, url, username, password, db_name, target_folder):
|
|
self.url = url
|
|
self.username = username
|
|
self.password = password
|
|
self.db_name = db_name
|
|
self.target_folder = target_folder
|
|
self.server = None
|
|
self.db = None
|
|
|
|
def connect(self):
|
|
"""Connect to CouchDB"""
|
|
safe_password = quote(self.password, safe="")
|
|
safe_username = quote(self.username, safe="")
|
|
full_url = f"{self.url.split('://')[0]}://{safe_username}:{safe_password}@{self.url.split('://')[1]}"
|
|
|
|
self.server = couchdb.Server(full_url)
|
|
self.db = self.server[self.db_name]
|
|
print(f"✅ Connected to CouchDB: {self.db_name}")
|
|
|
|
def get_all_docs_dict(self):
|
|
"""Fetch all documents into a dictionary for efficient lookup"""
|
|
all_docs = {}
|
|
for row in self.db.view('_all_docs', include_docs=True):
|
|
doc = row.doc
|
|
if not doc['_id'].startswith("_design"):
|
|
all_docs[doc['_id']] = doc
|
|
return all_docs
|
|
|
|
def write_file_to_disk(self, doc_id, all_docs):
|
|
"""Reconstruct and write a single file to disk"""
|
|
doc = all_docs.get(doc_id)
|
|
if not doc:
|
|
print(f"⚠️ Document {doc_id} not found")
|
|
return False
|
|
|
|
# Check if this is a file metadata document
|
|
if "children" not in doc or not isinstance(doc.get("children"), list):
|
|
# This might be a chunk or other document type - skip
|
|
return False
|
|
|
|
relative_path = doc.get("path", doc_id)
|
|
safe_path = relative_path.replace(":", "-").replace("|", "-")
|
|
full_local_path = os.path.join(self.target_folder, safe_path)
|
|
|
|
# Reconstruct content from chunks
|
|
full_content = []
|
|
is_encrypted = False
|
|
|
|
for chunk_id in doc["children"]:
|
|
chunk = all_docs.get(chunk_id)
|
|
if not chunk:
|
|
print(f"⚠️ Chunk {chunk_id} not found for {relative_path}")
|
|
continue
|
|
|
|
chunk_data = chunk.get("data") or chunk.get("content") or ""
|
|
|
|
# Check if encrypted
|
|
if str(chunk_data).startswith("%") or chunk.get("e_"):
|
|
is_encrypted = True
|
|
break
|
|
|
|
full_content.append(str(chunk_data))
|
|
|
|
if is_encrypted:
|
|
print(f"🔒 Skipping encrypted file: {relative_path}")
|
|
return False
|
|
|
|
note_text = "".join(full_content)
|
|
|
|
# Create parent directory if needed
|
|
os.makedirs(os.path.dirname(full_local_path), exist_ok=True)
|
|
|
|
# Write file
|
|
with open(full_local_path, "w", encoding="utf-8") as f:
|
|
f.write(note_text)
|
|
|
|
print(f"📝 Wrote: {relative_path}")
|
|
return True
|
|
|
|
def handle_change(self, change, all_docs):
|
|
"""Handle a single change notification"""
|
|
doc_id = change['id']
|
|
is_deleted = change.get('deleted', False)
|
|
|
|
if is_deleted:
|
|
print(f"🗑️ Deleted: {doc_id}")
|
|
# TODO: Could implement file deletion here
|
|
return
|
|
|
|
# Try to write the file
|
|
self.write_file_to_disk(doc_id, all_docs)
|
|
|
|
def initial_sync(self):
|
|
"""Perform initial full sync"""
|
|
print("🔄 Starting initial sync...")
|
|
all_docs = self.get_all_docs_dict()
|
|
|
|
count = 0
|
|
for doc_id, doc in all_docs.items():
|
|
if "children" in doc and isinstance(doc.get("children"), list):
|
|
if self.write_file_to_disk(doc_id, all_docs):
|
|
count += 1
|
|
if count % 10 == 0:
|
|
print(f" Synced {count} files...")
|
|
|
|
print(f"✅ Initial sync complete: {count} files")
|
|
return count
|
|
|
|
def watch_changes(self):
|
|
"""Watch for changes and sync in real-time"""
|
|
print("👀 Watching for changes...")
|
|
|
|
# Get current sequence
|
|
since = self.db.info()['update_seq']
|
|
|
|
while True:
|
|
try:
|
|
# Use longpoll to wait for changes
|
|
changes = self.db.changes(since=since, feed='longpoll', timeout=30000, include_docs=True)
|
|
since = changes['last_seq']
|
|
|
|
if changes['results']:
|
|
# Fetch all docs for chunk lookup
|
|
all_docs = self.get_all_docs_dict()
|
|
|
|
for change in changes['results']:
|
|
self.handle_change(change, all_docs)
|
|
|
|
time.sleep(0.5)
|
|
|
|
except KeyboardInterrupt:
|
|
print("\n⛔ Stopping sync daemon...")
|
|
break
|
|
except Exception as e:
|
|
print(f"❌ Error: {e}")
|
|
print(" Retrying in 5 seconds...")
|
|
time.sleep(5)
|
|
try:
|
|
self.connect() # Reconnect
|
|
except Exception as reconnect_error:
|
|
print(f"❌ Reconnection failed: {reconnect_error}")
|
|
|
|
def main():
|
|
# Ensure target folder exists
|
|
os.makedirs(TARGET_FOLDER, exist_ok=True)
|
|
|
|
print("=" * 60)
|
|
print("CouchDB → Filesystem Sync Daemon")
|
|
print("=" * 60)
|
|
print(f"Database: {DB_NAME}")
|
|
print(f"Target: {TARGET_FOLDER}")
|
|
print("=" * 60)
|
|
|
|
# Build URL
|
|
url = f"http://{IP_ADDRESS}:{PORT}/"
|
|
|
|
# Create daemon
|
|
daemon = CouchDBSyncDaemon(url, USER, PASSWORD, DB_NAME, TARGET_FOLDER)
|
|
|
|
# Connect
|
|
daemon.connect()
|
|
|
|
# Initial sync
|
|
daemon.initial_sync()
|
|
|
|
# Watch for changes
|
|
daemon.watch_changes()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|