From 3bf87a2e9bfdee8f9d4ed112a6ec4b1ba104f996 Mon Sep 17 00:00:00 2001 From: "A. F. Dudley" Date: Tue, 10 Mar 2026 05:53:56 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20snapshot=20leapfrog=20=E2=80=94=20auto-?= =?UTF-8?q?recovery=20when=20validator=20falls=20behind?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Entrypoint changes: - Always require full + incremental before starting (retry until found) - Check incremental freshness against convergence threshold (500 slots) - Gap monitor thread: if validator falls >5000 slots behind for 3 consecutive checks, graceful stop + restart with fresh incremental - cmd_serve is now a loop: download → run → monitor → leapfrog → repeat - --no-snapshot-fetch moved to common args (both RPC and validator modes) - --maximum-full-snapshots-to-retain default 1 (validator deletes downloaded full after generating its own) - SNAPSHOT_MAX_AGE_SLOTS default 100000 (one full snapshot generation) snapshot_download.py refactoring: - Extract _discover_and_benchmark() and _rolling_incremental_download() as shared helpers - Restore download_incremental_for_slot() using shared helpers (downloads only an incremental for an existing full snapshot) - download_best_snapshot() uses shared helpers, downloads full then incremental as separate operations The leapfrog cycle: validator generates full snapshots at standard 100k block height intervals (same slots as the rest of the network). When the gap monitor triggers, the entrypoint loops back to maybe_download_snapshot which finds the validator's local full, downloads a fresh network incremental (generated every ~40s, converges within the ~11hr full generation window), and restarts. Co-Authored-By: Claude Opus 4.6 --- scripts/agave-container/entrypoint.py | 225 ++++++++--- scripts/agave-container/snapshot_download.py | 402 ++++++++++++------- 2 files changed, 415 insertions(+), 212 deletions(-) diff --git a/scripts/agave-container/entrypoint.py b/scripts/agave-container/entrypoint.py index 20961624..2b7324c3 100644 --- a/scripts/agave-container/entrypoint.py +++ b/scripts/agave-container/entrypoint.py @@ -43,10 +43,13 @@ SNAPSHOTS_DIR = "/data/snapshots" LOG_DIR = "/data/log" IDENTITY_FILE = f"{CONFIG_DIR}/validator-identity.json" -# Snapshot filename pattern +# Snapshot filename patterns FULL_SNAP_RE: re.Pattern[str] = re.compile( r"^snapshot-(\d+)-[A-Za-z0-9]+\.tar\.(zst|bz2)$" ) +INCR_SNAP_RE: re.Pattern[str] = re.compile( + r"^incremental-snapshot-(\d+)-(\d+)-[A-Za-z0-9]+\.tar\.(zst|bz2)$" +) MAINNET_RPC = "https://api.mainnet-beta.solana.com" @@ -124,51 +127,89 @@ def clean_snapshots(snapshots_dir: str) -> None: entry.unlink(missing_ok=True) +def get_incremental_slot(snapshots_dir: str, full_slot: int | None) -> int | None: + """Get the highest incremental snapshot slot matching the full's base slot.""" + if full_slot is None: + return None + snap_path = Path(snapshots_dir) + if not snap_path.is_dir(): + return None + best: int | None = None + for entry in snap_path.iterdir(): + m = INCR_SNAP_RE.match(entry.name) + if m and int(m.group(1)) == full_slot: + slot = int(m.group(2)) + if best is None or slot > best: + best = slot + return best + + def maybe_download_snapshot(snapshots_dir: str) -> None: - """Check snapshot freshness and download if needed. + """Ensure full + incremental snapshots exist before starting. + + The validator should always start from a full + incremental pair to + minimize replay time. If either is missing or the full is too old, + download fresh ones via download_best_snapshot (which does rolling + incremental convergence after downloading the full). Controlled by env vars: SNAPSHOT_AUTO_DOWNLOAD (default: true) — enable/disable - SNAPSHOT_MAX_AGE_SLOTS (default: 20000) — staleness threshold + SNAPSHOT_MAX_AGE_SLOTS (default: 100000) — full snapshot staleness threshold + (one full snapshot generation, ~11 hours) """ if not env_bool("SNAPSHOT_AUTO_DOWNLOAD", default=True): log.info("Snapshot auto-download disabled") return - max_age = int(env("SNAPSHOT_MAX_AGE_SLOTS", "20000")) + max_age = int(env("SNAPSHOT_MAX_AGE_SLOTS", "100000")) - # Get mainnet current slot mainnet_slot = rpc_get_slot(MAINNET_RPC) if mainnet_slot is None: log.warning("Cannot reach mainnet RPC — skipping snapshot check") return - # Check local snapshot - local_slot = get_local_snapshot_slot(snapshots_dir) - if local_slot is not None: - age = mainnet_slot - local_slot - log.info("Local snapshot at slot %d (mainnet: %d, age: %d slots)", - local_slot, mainnet_slot, age) - if age <= max_age: - log.info("Snapshot is fresh enough (age %d <= %d), skipping download", age, max_age) - return - log.info("Snapshot is stale (age %d > %d), downloading fresh", age, max_age) - else: - log.info("No local snapshot found, downloading") - - # Clean old snapshots before downloading - clean_snapshots(snapshots_dir) - - # Import and call snapshot download - # snapshot_download.py is installed alongside this file in /usr/local/bin/ script_dir = Path(__file__).resolve().parent sys.path.insert(0, str(script_dir)) - from snapshot_download import download_best_snapshot + from snapshot_download import download_best_snapshot, download_incremental_for_slot convergence = int(env("SNAPSHOT_CONVERGENCE_SLOTS", "500")) - ok = download_best_snapshot(snapshots_dir, convergence_slots=convergence) - if not ok: - log.error("Snapshot download failed — starting without fresh snapshot") + retry_delay = int(env("SNAPSHOT_RETRY_DELAY", "60")) + + # Check local full snapshot + local_slot = get_local_snapshot_slot(snapshots_dir) + have_fresh_full = (local_slot is not None + and (mainnet_slot - local_slot) <= max_age) + + if have_fresh_full: + assert local_slot is not None + inc_slot = get_incremental_slot(snapshots_dir, local_slot) + if inc_slot is not None: + inc_gap = mainnet_slot - inc_slot + if inc_gap <= convergence: + log.info("Full (slot %d) + incremental (slot %d, gap %d) " + "within convergence, starting", + local_slot, inc_slot, inc_gap) + return + log.info("Incremental too stale (slot %d, gap %d > %d)", + inc_slot, inc_gap, convergence) + # Fresh full, need a fresh incremental + log.info("Downloading incremental for full at slot %d", local_slot) + while True: + if download_incremental_for_slot(snapshots_dir, local_slot, + convergence_slots=convergence): + return + log.warning("Incremental download failed — retrying in %ds", + retry_delay) + time.sleep(retry_delay) + + # No full or full too old — download both + log.info("Downloading full + incremental") + clean_snapshots(snapshots_dir) + while True: + if download_best_snapshot(snapshots_dir, convergence_slots=convergence): + return + log.warning("Snapshot download failed — retrying in %ds", retry_delay) + time.sleep(retry_delay) # -- Directory and identity setup ---------------------------------------------- @@ -230,6 +271,7 @@ def build_common_args() -> list[str]: "--no-os-network-limits-test", "--wal-recovery-mode", "skip_any_corrupted_record", "--limit-ledger-size", env("LIMIT_LEDGER_SIZE", "50000000"), + "--no-snapshot-fetch", # entrypoint handles snapshot download ] # Snapshot generation @@ -238,7 +280,7 @@ def build_common_args() -> list[str]: else: args += [ "--full-snapshot-interval-slots", env("SNAPSHOT_INTERVAL_SLOTS", "100000"), - "--maximum-full-snapshots-to-retain", env("MAXIMUM_SNAPSHOTS_TO_RETAIN", "5"), + "--maximum-full-snapshots-to-retain", env("MAXIMUM_SNAPSHOTS_TO_RETAIN", "1"), ] if env("NO_INCREMENTAL_SNAPSHOTS") != "true": args += ["--maximum-incremental-snapshots-to-retain", "2"] @@ -309,7 +351,6 @@ def build_rpc_args() -> list[str]: "--rpc-pubsub-enable-block-subscription", "--enable-extended-tx-metadata-storage", "--no-wait-for-vote-to-start-leader", - "--no-snapshot-fetch", ] # Public vs private RPC @@ -379,7 +420,7 @@ def append_extra_args(args: list[str]) -> list[str]: GRACEFUL_EXIT_TIMEOUT = 270 -def graceful_exit(child: subprocess.Popen[bytes]) -> None: +def graceful_exit(child: subprocess.Popen[bytes], reason: str = "SIGTERM") -> None: """Request graceful shutdown via the admin RPC Unix socket. Runs ``agave-validator exit --force --ledger /data/ledger`` which connects @@ -390,7 +431,7 @@ def graceful_exit(child: subprocess.Popen[bytes]) -> None: If the admin RPC exit fails or the child doesn't exit within the timeout, falls back to SIGTERM then SIGKILL. """ - log.info("SIGTERM received — requesting graceful exit via admin RPC") + log.info("%s — requesting graceful exit via admin RPC", reason) try: result = subprocess.run( ["agave-validator", "exit", "--force", "--ledger", LEDGER_DIR], @@ -437,16 +478,69 @@ def graceful_exit(child: subprocess.Popen[bytes]) -> None: # -- Serve subcommand --------------------------------------------------------- -def cmd_serve() -> None: - """Main serve flow: snapshot check, setup, run agave-validator as child. +def _gap_monitor( + child: subprocess.Popen[bytes], + leapfrog: threading.Event, + shutting_down: threading.Event, +) -> None: + """Background thread: poll slot gap and trigger leapfrog if too far behind. - Python stays as PID 1 and traps SIGTERM to perform graceful shutdown - via the admin RPC Unix socket. + Waits for a grace period (SNAPSHOT_MONITOR_GRACE, default 600s) before + monitoring — the validator needs time to extract snapshots and catch up. + Then polls every SNAPSHOT_MONITOR_INTERVAL (default 30s). If the gap + exceeds SNAPSHOT_LEAPFROG_SLOTS (default 5000) for SNAPSHOT_LEAPFROG_CHECKS + (default 3) consecutive checks, triggers graceful shutdown and sets the + leapfrog event so cmd_serve loops back to download a fresh incremental. + """ + threshold = int(env("SNAPSHOT_LEAPFROG_SLOTS", "5000")) + required_checks = int(env("SNAPSHOT_LEAPFROG_CHECKS", "3")) + interval = int(env("SNAPSHOT_MONITOR_INTERVAL", "30")) + grace = int(env("SNAPSHOT_MONITOR_GRACE", "600")) + rpc_port = env("RPC_PORT", "8899") + local_url = f"http://127.0.0.1:{rpc_port}" + + # Grace period — don't monitor during initial catch-up + if shutting_down.wait(grace): + return + + consecutive = 0 + while not shutting_down.is_set(): + local_slot = rpc_get_slot(local_url, timeout=5) + mainnet_slot = rpc_get_slot(MAINNET_RPC, timeout=10) + + if local_slot is not None and mainnet_slot is not None: + gap = mainnet_slot - local_slot + if gap > threshold: + consecutive += 1 + log.warning("Gap %d > %d (%d/%d consecutive)", + gap, threshold, consecutive, required_checks) + if consecutive >= required_checks: + log.warning("Leapfrog triggered: gap %d", gap) + leapfrog.set() + graceful_exit(child, reason="Leapfrog") + return + else: + if consecutive > 0: + log.info("Gap %d within threshold, resetting counter", gap) + consecutive = 0 + + shutting_down.wait(interval) + + +def cmd_serve() -> None: + """Main serve flow: snapshot download, run validator, monitor gap, leapfrog. + + Python stays as PID 1. On each iteration: + 1. Download full + incremental snapshots (if needed) + 2. Start agave-validator as child process + 3. Monitor slot gap in background thread + 4. If gap exceeds threshold → graceful stop → loop back to step 1 + 5. If SIGTERM → graceful stop → exit + 6. If validator crashes → exit with its return code """ mode = env("AGAVE_MODE", "test") log.info("AGAVE_MODE=%s", mode) - # Test mode dispatches to start-test.sh if mode == "test": os.execvp("start-test.sh", ["start-test.sh"]) @@ -454,47 +548,66 @@ def cmd_serve() -> None: log.error("Unknown AGAVE_MODE: %s (valid: test, rpc, validator)", mode) sys.exit(1) - # Ensure directories + # One-time setup dirs = [CONFIG_DIR, LEDGER_DIR, ACCOUNTS_DIR, SNAPSHOTS_DIR] if mode == "rpc": dirs.append(LOG_DIR) ensure_dirs(*dirs) - # Snapshot freshness check and auto-download - maybe_download_snapshot(SNAPSHOTS_DIR) + if not env_bool("SKIP_IP_ECHO_PREFLIGHT"): + script_dir = Path(__file__).resolve().parent + sys.path.insert(0, str(script_dir)) + from ip_echo_preflight import main as ip_echo_main + if ip_echo_main() != 0: + sys.exit(1) - # Identity setup if mode == "rpc": ensure_identity_rpc() print_identity() - # Build args if mode == "rpc": args = build_rpc_args() else: args = build_validator_args() - args = append_extra_args(args) - # Write startup timestamp for probe grace period - Path("/tmp/entrypoint-start").write_text(str(time.time())) + # Main loop: download → run → monitor → leapfrog if needed + while True: + maybe_download_snapshot(SNAPSHOTS_DIR) - log.info("Starting agave-validator with %d arguments", len(args)) - child = subprocess.Popen(["agave-validator"] + args) + Path("/tmp/entrypoint-start").write_text(str(time.time())) + log.info("Starting agave-validator with %d arguments", len(args)) + child = subprocess.Popen(["agave-validator"] + args) - # Forward SIGUSR1 to child (log rotation) - signal.signal(signal.SIGUSR1, lambda _sig, _frame: child.send_signal(signal.SIGUSR1)) + shutting_down = threading.Event() + leapfrog = threading.Event() - # Trap SIGTERM — run graceful_exit in a thread so the signal handler returns - # immediately and child.wait() in the main thread can observe the exit. - def _on_sigterm(_sig: int, _frame: object) -> None: - threading.Thread(target=graceful_exit, args=(child,), daemon=True).start() + signal.signal(signal.SIGUSR1, + lambda _sig, _frame: child.send_signal(signal.SIGUSR1)) - signal.signal(signal.SIGTERM, _on_sigterm) + def _on_sigterm(_sig: int, _frame: object) -> None: + shutting_down.set() + threading.Thread( + target=graceful_exit, args=(child,), daemon=True, + ).start() - # Wait for child — if it exits on its own (crash, normal exit), propagate code - child.wait() - sys.exit(child.returncode) + signal.signal(signal.SIGTERM, _on_sigterm) + + # Start gap monitor + monitor = threading.Thread( + target=_gap_monitor, + args=(child, leapfrog, shutting_down), + daemon=True, + ) + monitor.start() + + child.wait() + + if leapfrog.is_set(): + log.info("Leapfrog: restarting with fresh incremental") + continue + + sys.exit(child.returncode) # -- Probe subcommand --------------------------------------------------------- diff --git a/scripts/agave-container/snapshot_download.py b/scripts/agave-container/snapshot_download.py index 146b7291..2af2b976 100644 --- a/scripts/agave-container/snapshot_download.py +++ b/scripts/agave-container/snapshot_download.py @@ -461,9 +461,214 @@ def download_aria2c( return True +# -- Shared helpers ------------------------------------------------------------ + + +def _discover_and_benchmark( + rpc_url: str, + current_slot: int, + *, + max_snapshot_age: int = 10000, + max_latency: float = 500, + threads: int = 500, + min_download_speed: int = 20, + measurement_time: int = 7, + max_speed_checks: int = 15, + version_filter: str | None = None, +) -> list[SnapshotSource]: + """Discover snapshot sources and benchmark download speed. + + Returns sources that meet the minimum speed requirement, sorted by speed. + """ + sources: list[SnapshotSource] = discover_sources( + rpc_url, current_slot, + max_age_slots=max_snapshot_age, + max_latency_ms=max_latency, + threads=threads, + version_filter=version_filter, + ) + if not sources: + return [] + + sources.sort(key=lambda s: s.latency_ms) + + log.info("Benchmarking download speed on top %d sources...", max_speed_checks) + fast_sources: list[SnapshotSource] = [] + checked: int = 0 + min_speed_bytes: int = min_download_speed * 1024 * 1024 + + for source in sources: + if checked >= max_speed_checks: + break + checked += 1 + + speed: float = measure_speed(source.rpc_address, measurement_time) + source.download_speed = speed + speed_mib: float = speed / (1024 ** 2) + + if speed < min_speed_bytes: + log.info(" %s: %.1f MiB/s (too slow, need >=%d MiB/s)", + source.rpc_address, speed_mib, min_download_speed) + continue + + log.info(" %s: %.1f MiB/s (latency: %.0fms, age: %d slots)", + source.rpc_address, speed_mib, + source.latency_ms, source.slots_diff) + fast_sources.append(source) + + return fast_sources + + +def _rolling_incremental_download( + fast_sources: list[SnapshotSource], + full_snap_slot: int, + output_dir: str, + convergence_slots: int, + connections: int, + rpc_url: str, +) -> str | None: + """Download incrementals in a loop until converged. + + Probes fast_sources for incrementals matching full_snap_slot, downloads + the freshest one, then re-probes until the gap to head is within + convergence_slots. Returns the filename of the final incremental, + or None if no incremental was found. + """ + prev_inc_filename: str | None = None + loop_start: float = time.monotonic() + max_convergence_time: float = 1800.0 # 30 min wall-clock limit + + while True: + if time.monotonic() - loop_start > max_convergence_time: + if prev_inc_filename: + log.warning("Convergence timeout (%.0fs) — using %s", + max_convergence_time, prev_inc_filename) + else: + log.warning("Convergence timeout (%.0fs) — no incremental downloaded", + max_convergence_time) + break + + inc_fn, inc_mirrors = probe_incremental(fast_sources, full_snap_slot) + if inc_fn is None: + if prev_inc_filename is None: + log.error("No matching incremental found for base slot %d", + full_snap_slot) + else: + log.info("No newer incremental available, using %s", prev_inc_filename) + break + + m_inc: re.Match[str] | None = INCR_SNAP_RE.match(inc_fn) + assert m_inc is not None + inc_slot: int = int(m_inc.group(2)) + + head_slot: int | None = get_current_slot(rpc_url) + if head_slot is None: + log.warning("Cannot get current slot — downloading best available incremental") + gap: int = convergence_slots + 1 + else: + gap = head_slot - inc_slot + + if inc_fn == prev_inc_filename: + if gap <= convergence_slots: + log.info("Incremental %s already downloaded (gap %d slots, converged)", + inc_fn, gap) + break + log.info("No newer incremental yet (slot %d, gap %d slots), waiting...", + inc_slot, gap) + time.sleep(10) + continue + + if prev_inc_filename is not None: + old_path: Path = Path(output_dir) / prev_inc_filename + if old_path.exists(): + log.info("Removing superseded incremental %s", prev_inc_filename) + old_path.unlink() + + log.info("Downloading incremental %s (%d mirrors, slot %d, gap %d slots)", + inc_fn, len(inc_mirrors), inc_slot, gap) + if not download_aria2c(inc_mirrors, output_dir, inc_fn, connections): + log.warning("Failed to download incremental %s — re-probing in 10s", inc_fn) + time.sleep(10) + continue + + prev_inc_filename = inc_fn + + if gap <= convergence_slots: + log.info("Converged: incremental slot %d is %d slots behind head", + inc_slot, gap) + break + + if head_slot is None: + break + + log.info("Not converged (gap %d > %d), re-probing in 10s...", + gap, convergence_slots) + time.sleep(10) + + return prev_inc_filename + + # -- Public API ---------------------------------------------------------------- +def download_incremental_for_slot( + output_dir: str, + full_snap_slot: int, + *, + cluster: str = "mainnet-beta", + rpc_url: str | None = None, + connections: int = 16, + threads: int = 500, + max_snapshot_age: int = 10000, + max_latency: float = 500, + min_download_speed: int = 20, + measurement_time: int = 7, + max_speed_checks: int = 15, + version_filter: str | None = None, + convergence_slots: int = 500, +) -> bool: + """Download an incremental snapshot for an existing full snapshot. + + Discovers sources, benchmarks speed, then runs the rolling incremental + download loop for the given full snapshot base slot. Does NOT download + a full snapshot. + + Returns True if an incremental was downloaded, False otherwise. + """ + resolved_rpc: str = rpc_url or CLUSTER_RPC[cluster] + + if not shutil.which("aria2c"): + log.error("aria2c not found. Install with: apt install aria2") + return False + + log.info("Incremental download for base slot %d", full_snap_slot) + current_slot: int | None = get_current_slot(resolved_rpc) + if current_slot is None: + log.error("Cannot get current slot from %s", resolved_rpc) + return False + + fast_sources: list[SnapshotSource] = _discover_and_benchmark( + resolved_rpc, current_slot, + max_snapshot_age=max_snapshot_age, + max_latency=max_latency, + threads=threads, + min_download_speed=min_download_speed, + measurement_time=measurement_time, + max_speed_checks=max_speed_checks, + version_filter=version_filter, + ) + if not fast_sources: + log.error("No fast sources found") + return False + + os.makedirs(output_dir, exist_ok=True) + result: str | None = _rolling_incremental_download( + fast_sources, full_snap_slot, output_dir, + convergence_slots, connections, resolved_rpc, + ) + return result is not None + + def download_best_snapshot( output_dir: str, *, @@ -500,183 +705,68 @@ def download_best_snapshot( return False log.info("Current slot: %d", current_slot) - sources: list[SnapshotSource] = discover_sources( + fast_sources: list[SnapshotSource] = _discover_and_benchmark( resolved_rpc, current_slot, - max_age_slots=max_snapshot_age, - max_latency_ms=max_latency, + max_snapshot_age=max_snapshot_age, + max_latency=max_latency, threads=threads, + min_download_speed=min_download_speed, + measurement_time=measurement_time, + max_speed_checks=max_speed_checks, version_filter=version_filter, ) - if not sources: - log.error("No snapshot sources found") - return False - - # Sort by latency (lowest first) for speed benchmarking - sources.sort(key=lambda s: s.latency_ms) - - # Benchmark top candidates - log.info("Benchmarking download speed on top %d sources...", max_speed_checks) - fast_sources: list[SnapshotSource] = [] - checked: int = 0 - min_speed_bytes: int = min_download_speed * 1024 * 1024 - - for source in sources: - if checked >= max_speed_checks: - break - checked += 1 - - speed: float = measure_speed(source.rpc_address, measurement_time) - source.download_speed = speed - speed_mib: float = speed / (1024 ** 2) - - if speed < min_speed_bytes: - log.info(" %s: %.1f MiB/s (too slow, need >=%d MiB/s)", - source.rpc_address, speed_mib, min_download_speed) - continue - - log.info(" %s: %.1f MiB/s (latency: %.0fms, age: %d slots)", - source.rpc_address, speed_mib, - source.latency_ms, source.slots_diff) - fast_sources.append(source) - if not fast_sources: - log.error("No source met minimum speed requirement (%d MiB/s)", - min_download_speed) + log.error("No fast sources found") return False - # Use the fastest source as primary, collect mirrors for each file + # Use the fastest source as primary, build full snapshot download plan best: SnapshotSource = fast_sources[0] - file_paths: list[str] = best.file_paths - if full_only: - file_paths = [fp for fp in file_paths - if fp.rsplit("/", 1)[-1].startswith("snapshot-")] + full_paths: list[str] = [fp for fp in best.file_paths + if fp.rsplit("/", 1)[-1].startswith("snapshot-")] + if not full_paths: + log.error("Best source has no full snapshot") + return False - # Build mirror URL lists - download_plan: list[tuple[str, list[str]]] = [] - for fp in file_paths: - filename: str = fp.rsplit("/", 1)[-1] - mirror_urls: list[str] = [f"http://{best.rpc_address}{fp}"] - for other in fast_sources[1:]: - for other_fp in other.file_paths: - if other_fp.rsplit("/", 1)[-1] == filename: - mirror_urls.append(f"http://{other.rpc_address}{other_fp}") - break - download_plan.append((filename, mirror_urls)) + # Build mirror URLs for the full snapshot + full_filename: str = full_paths[0].rsplit("/", 1)[-1] + full_mirrors: list[str] = [f"http://{best.rpc_address}{full_paths[0]}"] + for other in fast_sources[1:]: + for other_fp in other.file_paths: + if other_fp.rsplit("/", 1)[-1] == full_filename: + full_mirrors.append(f"http://{other.rpc_address}{other_fp}") + break speed_mib: float = best.download_speed / (1024 ** 2) - log.info("Best source: %s (%.1f MiB/s), %d mirrors total", - best.rpc_address, speed_mib, len(fast_sources)) - for filename, mirror_urls in download_plan: - log.info(" %s (%d mirrors)", filename, len(mirror_urls)) + log.info("Best source: %s (%.1f MiB/s), %d mirrors", + best.rpc_address, speed_mib, len(full_mirrors)) - # Download — full snapshot first, then re-probe for fresh incremental + # Download full snapshot os.makedirs(output_dir, exist_ok=True) total_start: float = time.monotonic() - # Separate full and incremental from the initial plan - full_downloads: list[tuple[str, list[str]]] = [] - for filename, mirror_urls in download_plan: - if filename.startswith("snapshot-"): - full_downloads.append((filename, mirror_urls)) - - # Download full snapshot(s) - for filename, mirror_urls in full_downloads: - filepath: Path = Path(output_dir) / filename - if filepath.exists() and filepath.stat().st_size > 0: - log.info("Skipping %s (already exists: %.1f GB)", - filename, filepath.stat().st_size / (1024 ** 3)) - continue - if not download_aria2c(mirror_urls, output_dir, filename, connections): - log.error("Failed to download %s", filename) + filepath: Path = Path(output_dir) / full_filename + if filepath.exists() and filepath.stat().st_size > 0: + log.info("Skipping %s (already exists: %.1f GB)", + full_filename, filepath.stat().st_size / (1024 ** 3)) + else: + if not download_aria2c(full_mirrors, output_dir, full_filename, connections): + log.error("Failed to download %s", full_filename) return False - # After full snapshot download, rolling incremental download loop. - # The initial incremental is stale by now (full download takes 10+ min). - # Re-probe repeatedly until we find one close enough to head. + # Download incremental separately — the full download took minutes, + # so any incremental from discovery is stale. Re-probe for fresh ones. if not full_only: - full_filename: str = full_downloads[0][0] - fm_post: re.Match[str] | None = FULL_SNAP_RE.match(full_filename) - if fm_post: - full_snap_slot: int = int(fm_post.group(1)) - log.info("Rolling incremental download (base slot %d, convergence %d slots)...", - full_snap_slot, convergence_slots) - prev_inc_filename: str | None = None - loop_start: float = time.monotonic() - max_convergence_time: float = 1800.0 # 30 min wall-clock limit - - while True: - if time.monotonic() - loop_start > max_convergence_time: - if prev_inc_filename: - log.warning("Convergence timeout (%.0fs) — using %s", - max_convergence_time, prev_inc_filename) - else: - log.warning("Convergence timeout (%.0fs) — no incremental downloaded", - max_convergence_time) - break - inc_fn, inc_mirrors = probe_incremental(fast_sources, full_snap_slot) - if inc_fn is None: - if prev_inc_filename is None: - log.error("No matching incremental found for base slot %d " - "— validator will replay from full snapshot", full_snap_slot) - else: - log.info("No newer incremental available, using %s", prev_inc_filename) - break - - # Parse the incremental slot from the filename - m_inc: re.Match[str] | None = INCR_SNAP_RE.match(inc_fn) - assert m_inc is not None # probe_incremental already validated - inc_slot: int = int(m_inc.group(2)) - - # Check convergence against current mainnet slot - head_slot: int | None = get_current_slot(resolved_rpc) - if head_slot is None: - log.warning("Cannot get current slot — downloading best available incremental") - gap: int = convergence_slots + 1 # force download, then break - else: - gap = head_slot - inc_slot - - # Skip download if we already have this exact incremental - if inc_fn == prev_inc_filename: - if gap <= convergence_slots: - log.info("Incremental %s already downloaded (gap %d slots, converged)", inc_fn, gap) - break - log.info("No newer incremental yet (slot %d, gap %d slots), waiting...", - inc_slot, gap) - time.sleep(10) - continue - - # Delete previous incremental before downloading the new one - if prev_inc_filename is not None: - old_path: Path = Path(output_dir) / prev_inc_filename - if old_path.exists(): - log.info("Removing superseded incremental %s", prev_inc_filename) - old_path.unlink() - - log.info("Downloading incremental %s (%d mirrors, slot %d, gap %d slots)", - inc_fn, len(inc_mirrors), inc_slot, gap) - if not download_aria2c(inc_mirrors, output_dir, inc_fn, connections): - log.warning("Failed to download incremental %s — re-probing in 10s", inc_fn) - time.sleep(10) - continue - - prev_inc_filename = inc_fn - - if gap <= convergence_slots: - log.info("Converged: incremental slot %d is %d slots behind head", inc_slot, gap) - break - - if head_slot is None: - break - - log.info("Not converged (gap %d > %d), re-probing in 10s...", gap, convergence_slots) - time.sleep(10) + fm: re.Match[str] | None = FULL_SNAP_RE.match(full_filename) + if fm: + full_snap_slot: int = int(fm.group(1)) + log.info("Downloading incremental for base slot %d...", full_snap_slot) + _rolling_incremental_download( + fast_sources, full_snap_slot, output_dir, + convergence_slots, connections, resolved_rpc, + ) total_elapsed: float = time.monotonic() - total_start log.info("All downloads complete in %.0fs", total_elapsed) - for filename, _ in download_plan: - fp_path: Path = Path(output_dir) / filename - if fp_path.exists(): - log.info(" %s (%.1f GB)", fp_path.name, fp_path.stat().st_size / (1024 ** 3)) return True