import os import subprocess import json import urllib.request import urllib.error import base64 import shutil REMEMBER_URL = "https://remember.thefoldwithin.earth/api/v1" FORGEJO_LOCAL_URL = "http://forgejo.local/api/v1" AUTH = ("mrhavens", "Aok4y2k!") BACKUP_DIR = "/home/antigravity/scratch/remember_backups" os.makedirs(BACKUP_DIR, exist_ok=True) def _auth_headers(): auth_str = f"{AUTH[0]}:{AUTH[1]}" b64_auth = base64.b64encode(auth_str.encode('ascii')).decode('ascii') return { "Authorization": f"Basic {b64_auth}", "Content-Type": "application/json", "User-Agent": "curl/7.81.0" } def get_all_repos(api_url): repos = [] page = 1 while True: req = urllib.request.Request(f"{api_url}/user/repos?limit=50&page={page}", headers=_auth_headers()) try: with urllib.request.urlopen(req) as resp: if resp.status != 200: print(f"Error fetching repos: {resp.status}") break data = json.loads(resp.read().decode('utf-8')) if not data: break repos.extend(data) page += 1 except Exception as e: print(f"Failed to fetch {api_url}: {e}") break return repos print("Fetching repos from remember...") remember_repos = get_all_repos(REMEMBER_URL) print(f"Found {len(remember_repos)} repos on remember.thefoldwithin.earth") print("Fetching repos from forgejo.local...") local_repos = get_all_repos(FORGEJO_LOCAL_URL) local_repo_names = {r['name'] for r in local_repos} print(f"Found {len(local_repos)} repos on forgejo.local") for repo in remember_repos: name = repo['name'] clone_url = repo['clone_url'].replace("https://", f"https://{AUTH[0]}:{AUTH[1]}@") print(f"\n--- Processing {name} ---") # 1. Create on forgejo.local if it doesn't exist if name not in local_repo_names: print(f"Creating {name} on forgejo.local...") req = urllib.request.Request(f"{FORGEJO_LOCAL_URL}/user/repos", data=json.dumps({"name": name, "private": repo['private']}).encode('utf-8'), headers=_auth_headers(), method="POST") try: urllib.request.urlopen(req) except urllib.error.HTTPError as e: if e.code != 409: print(f"Failed to create repo {name}: {e.code} {e.reason}") continue else: print(f"{name} already exists on forgejo.local.") # 2. Clone and Push repo_dir = os.path.join(BACKUP_DIR, name) if os.path.exists(repo_dir): shutil.rmtree(repo_dir) print(f"Cloning {name} from remember...") try: subprocess.run(["git", "clone", "--mirror", clone_url, repo_dir], check=True, capture_output=True) print(f"Pushing {name} to forgejo.local...") push_url = f"http://{AUTH[0]}:{AUTH[1]}@forgejo.local/mrhavens/{name}.git" subprocess.run(["git", "push", "--mirror", push_url], cwd=repo_dir, check=True, capture_output=True) print(f"Successfully mirrored {name}!") except subprocess.CalledProcessError as e: print(f"Error processing {name}: {e}") if e.stderr: print(e.stderr.decode('utf-8', errors='ignore')) # Clean up to save space if os.path.exists(repo_dir): shutil.rmtree(repo_dir) print("\nFull synchronization complete. All public repositories are now secured on the physical node.")