486 lines
15 KiB
Python
486 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""Agave validator entrypoint — snapshot management, arg construction, liveness probe.
|
|
|
|
Two subcommands:
    entrypoint.py serve (default) — snapshot freshness check + exec agave-validator
    entrypoint.py probe — liveness probe (slot lag check, exits 0/1)

Replaces the bash entrypoint.sh / start-rpc.sh / start-validator.sh with a single
Python module. Test mode still dispatches to start-test.sh.

All configuration comes from environment variables — same vars as the original
bash scripts. See compose files for defaults.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
from pathlib import Path
|
|
from urllib.request import Request
|
|
|
|
# Logger shared by every function in this module (logger name: "entrypoint").
log: logging.Logger = logging.getLogger("entrypoint")

# Directories — absolute paths handed to agave-validator or created at startup.
CONFIG_DIR = "/data/config"
LEDGER_DIR = "/data/ledger"
ACCOUNTS_DIR = "/data/accounts"
SNAPSHOTS_DIR = "/data/snapshots"
LOG_DIR = "/data/log"
# Keypair file passed to agave-validator via --identity.
IDENTITY_FILE = f"{CONFIG_DIR}/validator-identity.json"

# Snapshot filename pattern: "snapshot-<slot>-<alphanumeric>.tar.zst" or
# "....tar.bz2". Group 1 captures the slot digits, group 2 the suffix.
FULL_SNAP_RE: re.Pattern[str] = re.compile(
    r"^snapshot-(\d+)-[A-Za-z0-9]+\.tar\.(zst|bz2)$"
)

# RPC endpoint queried for the current mainnet slot.
MAINNET_RPC = "https://api.mainnet-beta.solana.com"
|
|
|
|
|
|
# -- Helpers -------------------------------------------------------------------
|
|
|
|
|
|
def env(name: str, default: str = "") -> str:
    """Return environment variable *name*, or *default* when it is unset.

    A variable that is set to the empty string is returned as-is (the
    default only applies to variables absent from the environment).
    """
    return os.getenv(name, default)
|
|
|
|
|
|
def env_required(name: str) -> str:
    """Return the value of a mandatory environment variable.

    An unset or empty variable is treated as a fatal configuration error:
    the problem is logged and the process exits with status 1.
    """
    value = os.environ.get(name)
    if value:
        return value
    log.error("%s is required but not set", name)
    sys.exit(1)
|
|
|
|
|
|
def env_bool(name: str, default: bool = False) -> bool:
    """Read a boolean environment variable.

    The value is compared case-insensitively after surrounding whitespace
    has been stripped. ``true``, ``1`` and ``yes`` are truthy; any other
    non-empty value is falsy. An unset, empty, or whitespace-only variable
    yields *default*.
    """
    # Strip first so values such as "true " (trailing space from an env
    # file) are recognised and blank values fall back to the default.
    val = os.environ.get(name, "").strip().lower()
    if not val:
        return default
    return val in ("true", "1", "yes")
|
|
|
|
|
|
def rpc_get_slot(url: str, timeout: int = 10) -> int | None:
    """Get the current slot from a Solana RPC endpoint.

    Sends a JSON-RPC ``getSlot`` request to *url* and returns the integer
    ``result`` field of the response.

    Args:
        url: RPC endpoint URL.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The slot, or ``None`` when the request fails, the body is not valid
        JSON, or the response is not a JSON object carrying an integer
        ``result``. Those cases never raise, so callers can treat ``None``
        as "slot unknown".
    """
    # Function-scope import keeps this block self-contained; urllib.request
    # already imports http.client, so nothing new is loaded.
    import http.client

    payload = json.dumps({
        "jsonrpc": "2.0", "id": 1,
        "method": "getSlot", "params": [],
    }).encode()
    req = Request(url, data=payload,
                  headers={"Content-Type": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            data = json.loads(resp.read())
    except (OSError, ValueError, http.client.HTTPException):
        # OSError covers URLError, socket errors and timeouts; ValueError
        # covers JSONDecodeError and undecodable bodies; HTTPException
        # covers protocol-level failures such as IncompleteRead.
        return None

    # A valid JSON body need not be an object (e.g. a list or a string).
    if not isinstance(data, dict):
        return None
    result = data.get("result")
    # bool is a subclass of int — a boolean result is not a slot.
    if isinstance(result, int) and not isinstance(result, bool):
        return result
    return None
|
|
|
|
|
|
# -- Snapshot management -------------------------------------------------------
|
|
|
|
|
|
def get_local_snapshot_slot(snapshots_dir: str) -> int | None:
    """Return the highest slot encoded in a local full-snapshot filename.

    Only directory entries whose names match ``FULL_SNAP_RE`` are
    considered. Returns ``None`` when *snapshots_dir* is not a directory or
    holds no matching entry.
    """
    directory = Path(snapshots_dir)
    if not directory.is_dir():
        return None
    matches = (FULL_SNAP_RE.match(item.name) for item in directory.iterdir())
    return max((int(m.group(1)) for m in matches if m), default=None)
|
|
|
|
|
|
def clean_snapshots(snapshots_dir: str) -> None:
    """Remove full and incremental snapshot files from *snapshots_dir*.

    Entries are selected by name prefix (``snapshot-`` or
    ``incremental-snapshot-``). Cleanup is best-effort: a matching
    directory is skipped and a file that cannot be deleted is logged and
    left in place, so a single bad entry does not abort startup. Does
    nothing when *snapshots_dir* is not a directory.
    """
    snap_path = Path(snapshots_dir)
    if not snap_path.is_dir():
        return
    for entry in snap_path.iterdir():
        if not entry.name.startswith(("snapshot-", "incremental-snapshot-")):
            continue
        # unlink() cannot remove a real directory; skip it instead of raising.
        if entry.is_dir() and not entry.is_symlink():
            log.warning("Skipping directory during snapshot cleanup: %s", entry.name)
            continue
        log.info("Removing old snapshot: %s", entry.name)
        try:
            entry.unlink(missing_ok=True)
        except OSError as exc:
            log.warning("Could not remove snapshot %s: %s", entry.name, exc)
|
|
|
|
|
|
def maybe_download_snapshot(snapshots_dir: str) -> None:
    """Check snapshot freshness and download if needed.

    Controlled by env vars:
      SNAPSHOT_AUTO_DOWNLOAD (default: true) — enable/disable
      SNAPSHOT_MAX_AGE_SLOTS (default: 20000) — staleness threshold; a blank
        value means "use the default", a non-integer value is a fatal
        configuration error (exit status 1)

    Nothing is deleted or downloaded when auto-download is disabled, when
    the mainnet slot cannot be fetched, or when the newest local snapshot is
    at most the threshold behind mainnet. Otherwise the existing snapshot
    files are removed and ``snapshot_download.download_best_snapshot`` is
    called; a failed download is logged but does not abort startup.
    """
    if not env_bool("SNAPSHOT_AUTO_DOWNLOAD", default=True):
        log.info("Snapshot auto-download disabled")
        return

    # A variable that is set but blank would make int() raise; treat it as
    # "not configured".
    raw_max_age = env("SNAPSHOT_MAX_AGE_SLOTS").strip() or "20000"
    try:
        max_age = int(raw_max_age)
    except ValueError:
        log.error("SNAPSHOT_MAX_AGE_SLOTS must be an integer, got %r", raw_max_age)
        sys.exit(1)

    # Get mainnet current slot
    mainnet_slot = rpc_get_slot(MAINNET_RPC)
    if mainnet_slot is None:
        log.warning("Cannot reach mainnet RPC — skipping snapshot check")
        return

    # Check local snapshot
    local_slot = get_local_snapshot_slot(snapshots_dir)
    if local_slot is not None:
        age = mainnet_slot - local_slot
        log.info("Local snapshot at slot %d (mainnet: %d, age: %d slots)",
                 local_slot, mainnet_slot, age)
        if age <= max_age:
            log.info("Snapshot is fresh enough (age %d <= %d), skipping download", age, max_age)
            return
        log.info("Snapshot is stale (age %d > %d), downloading fresh", age, max_age)
    else:
        log.info("No local snapshot found, downloading")

    # Clean old snapshots before downloading
    # NOTE(review): the old files are removed before the download is known to
    # have succeeded, so a failed download leaves no local snapshot at all —
    # confirm this ordering is intended.
    clean_snapshots(snapshots_dir)

    # Import and call snapshot download
    # snapshot_download.py is installed alongside this file in /usr/local/bin/
    script_dir = Path(__file__).resolve().parent
    sys.path.insert(0, str(script_dir))
    from snapshot_download import download_best_snapshot

    ok = download_best_snapshot(snapshots_dir)
    if not ok:
        log.error("Snapshot download failed — starting without fresh snapshot")
|
|
|
|
|
|
# -- Directory and identity setup ----------------------------------------------
|
|
|
|
|
|
def ensure_dirs(*dirs: str) -> None:
    """Create each directory and try to hand ownership to the current user.

    Ownership is changed with ``sudo chown -R <uid>:<gid> <dir>``. The
    command's exit status and output are ignored, and a missing ``sudo``
    binary is tolerated.
    """
    owner = f"{os.getuid()}:{os.getgid()}"
    for path in dirs:
        os.makedirs(path, exist_ok=True)
        chown_cmd = ["sudo", "chown", "-R", owner, path]
        try:
            subprocess.run(chown_cmd, check=False, capture_output=True)
        except FileNotFoundError:
            # sudo not available — dirs already owned correctly
            continue
|
|
|
|
|
|
def ensure_identity_rpc() -> None:
    """Create an identity keypair for RPC mode when IDENTITY_FILE is absent.

    Runs ``solana-keygen new`` writing to IDENTITY_FILE; a non-zero exit
    status raises ``subprocess.CalledProcessError``. Does nothing when the
    file already exists.
    """
    if not os.path.isfile(IDENTITY_FILE):
        log.info("Generating RPC node identity keypair...")
        keygen_cmd = [
            "solana-keygen", "new",
            "--no-passphrase", "--silent", "--force",
            "--outfile", IDENTITY_FILE,
        ]
        subprocess.run(keygen_cmd, check=True)
|
|
|
|
|
|
def print_identity() -> None:
    """Log the public key that ``solana-keygen pubkey`` derives from IDENTITY_FILE.

    Nothing is logged when the command exits with a non-zero status.
    """
    proc = subprocess.run(
        ["solana-keygen", "pubkey", IDENTITY_FILE],
        check=False,
        text=True,
        capture_output=True,
    )
    if proc.returncode != 0:
        return
    log.info("Node identity: %s", proc.stdout.strip())
|
|
|
|
|
|
# -- Arg construction ----------------------------------------------------------
|
|
|
|
|
|
def build_common_args() -> list[str]:
    """Assemble the agave-validator flags shared by RPC and validator modes.

    All values come from environment variables. VALIDATOR_ENTRYPOINT and
    KNOWN_VALIDATOR are mandatory (read through env_required); every other
    variable either has a default or simply omits its flag when empty.
    """
    def add_if_set(flag: str, var: str) -> None:
        # Emit "<flag> <value>" only when the variable is non-empty.
        value = env(var)
        if value:
            args.extend((flag, value))

    args: list[str] = [
        "--identity", IDENTITY_FILE,
        "--entrypoint", env_required("VALIDATOR_ENTRYPOINT"),
        "--known-validator", env_required("KNOWN_VALIDATOR"),
        "--ledger", LEDGER_DIR,
        "--accounts", ACCOUNTS_DIR,
        "--snapshots", SNAPSHOTS_DIR,
        "--rpc-port", env("RPC_PORT", "8899"),
        "--rpc-bind-address", env("RPC_BIND_ADDRESS", "127.0.0.1"),
        "--gossip-port", env("GOSSIP_PORT", "8001"),
        "--dynamic-port-range", env("DYNAMIC_PORT_RANGE", "9000-10000"),
        "--no-os-network-limits-test",
        "--wal-recovery-mode", "skip_any_corrupted_record",
        "--limit-ledger-size", env("LIMIT_LEDGER_SIZE", "50000000"),
    ]

    # Snapshot generation
    snapshots_disabled = env("NO_SNAPSHOTS") == "true"
    if not snapshots_disabled:
        args.extend((
            "--full-snapshot-interval-slots", env("SNAPSHOT_INTERVAL_SLOTS", "100000"),
            "--maximum-full-snapshots-to-retain", env("MAXIMUM_SNAPSHOTS_TO_RETAIN", "5"),
        ))
        incrementals_disabled = env("NO_INCREMENTAL_SNAPSHOTS") == "true"
        if not incrementals_disabled:
            args.extend(("--maximum-incremental-snapshots-to-retain", "2"))
    else:
        args.append("--no-snapshots")

    # Account indexes (comma-separated; blank items are ignored)
    for index_name in (part.strip() for part in env("ACCOUNT_INDEXES").split(",")):
        if index_name:
            args.extend(("--account-index", index_name))

    # Additional entrypoints, then additional known validators
    # (whitespace-separated lists)
    repeated_flags = (
        ("--entrypoint", "EXTRA_ENTRYPOINTS"),
        ("--known-validator", "EXTRA_KNOWN_VALIDATORS"),
    )
    for flag, var in repeated_flags:
        for item in env(var).split():
            args.extend((flag, item))

    # Cluster verification
    add_if_set("--expected-genesis-hash", "EXPECTED_GENESIS_HASH")
    add_if_set("--expected-shred-version", "EXPECTED_SHRED_VERSION")

    # Metrics — just needs to be in the environment, agave reads it directly
    # (env var is already set, nothing to pass as arg)

    # Gossip host / TVU address: the TVU address is only used when no gossip
    # host is configured.
    if env("GOSSIP_HOST"):
        add_if_set("--gossip-host", "GOSSIP_HOST")
    else:
        add_if_set("--public-tvu-address", "PUBLIC_TVU_ADDRESS")

    # Jito flags
    if env("JITO_ENABLE") == "true":
        log.info("Jito MEV enabled")
        jito_options = {
            "JITO_TIP_PAYMENT_PROGRAM": "--tip-payment-program-pubkey",
            "JITO_DISTRIBUTION_PROGRAM": "--tip-distribution-program-pubkey",
            "JITO_MERKLE_ROOT_AUTHORITY": "--merkle-root-upload-authority",
            "JITO_COMMISSION_BPS": "--commission-bps",
            "JITO_BLOCK_ENGINE_URL": "--block-engine-url",
            "JITO_SHRED_RECEIVER_ADDR": "--shred-receiver-address",
        }
        for var, flag in jito_options.items():
            add_if_set(flag, var)

    return args
|
|
|
|
|
|
def build_rpc_args() -> list[str]:
    """Return the agave-validator argument list for RPC (non-voting) mode.

    The common arguments come first, followed by the RPC-only flags and
    finally either the public RPC address or the private-RPC flag set.
    """
    common = build_common_args()

    rpc_only = [
        "--no-voting",
        "--log", f"{LOG_DIR}/validator.log",
        "--full-rpc-api",
        "--enable-rpc-transaction-history",
        "--rpc-pubsub-enable-block-subscription",
        "--enable-extended-tx-metadata-storage",
        "--no-wait-for-vote-to-start-leader",
        "--no-snapshot-fetch",
    ]

    # Public vs private RPC
    public_rpc = env("PUBLIC_RPC_ADDRESS")
    if public_rpc:
        visibility = ["--public-rpc-address", public_rpc]
    else:
        visibility = ["--private-rpc", "--allow-private-addr", "--only-known-rpc"]

    # The Jito relayer URL is not added here; build_validator_args handles it.
    return [*common, *rpc_only, *visibility]
|
|
|
|
|
|
def build_validator_args() -> list[str]:
    """Return the agave-validator argument list for voting validator mode.

    Exits with status 1 (after logging) when the identity keypair or the
    vote account keypair file is missing. The vote keypair path comes from
    VOTE_ACCOUNT_KEYPAIR, with a default under /data/config.
    """
    vote_keypair = env(
        "VOTE_ACCOUNT_KEYPAIR",
        "/data/config/vote-account-keypair.json",
    )

    # Validator mode never generates keys: both files must already be mounted.
    identity_present = os.path.isfile(IDENTITY_FILE)
    if not identity_present:
        log.error("Validator identity keypair not found at %s", IDENTITY_FILE)
        log.error("Mount your validator keypair to %s", IDENTITY_FILE)
        sys.exit(1)

    vote_present = os.path.isfile(vote_keypair)
    if not vote_present:
        log.error("Vote account keypair not found at %s", vote_keypair)
        log.error("Mount your vote account keypair or set VOTE_ACCOUNT_KEYPAIR")
        sys.exit(1)

    # Log the vote account pubkey when solana-keygen can derive it.
    pubkey_proc = subprocess.run(
        ["solana-keygen", "pubkey", vote_keypair],
        check=False,
        text=True,
        capture_output=True,
    )
    if pubkey_proc.returncode == 0:
        log.info("Vote account: %s", pubkey_proc.stdout.strip())

    args = build_common_args()
    args.extend(["--vote-account", vote_keypair, "--log", "-"])

    # Jito relayer URL (validator-only)
    relayer_url = env("JITO_RELAYER_URL")
    jito_enabled = env("JITO_ENABLE") == "true"
    if jito_enabled and relayer_url:
        args.extend(["--relayer-url", relayer_url])

    return args
|
|
|
|
|
|
def append_extra_args(args: list[str]) -> list[str]:
    """Extend *args* in place with the EXTRA_ARGS passthrough flags.

    EXTRA_ARGS is split on whitespace; an unset or empty variable adds
    nothing. The same list object is returned for convenience.
    """
    passthrough = env("EXTRA_ARGS").split()
    args.extend(passthrough)
    return args
|
|
|
|
|
|
# -- Serve subcommand ---------------------------------------------------------
|
|
|
|
|
|
def cmd_serve() -> None:
    """Main serve flow: snapshot check, setup, exec agave-validator.

    AGAVE_MODE selects the behaviour: ``test`` (the default) replaces the
    process with start-test.sh; ``rpc`` and ``validator`` prepare
    directories, snapshots and identity, then replace the process with
    agave-validator. Any other mode is a fatal error (exit status 1).

    The probe start marker (/tmp/entrypoint-start) is removed before the
    long-running preparation steps and rewritten just before exec.
    """
    mode = env("AGAVE_MODE", "test")
    log.info("AGAVE_MODE=%s", mode)

    # Test mode dispatches to start-test.sh
    if mode == "test":
        os.execvp("start-test.sh", ["start-test.sh"])

    if mode not in ("rpc", "validator"):
        log.error("Unknown AGAVE_MODE: %s (valid: test, rpc, validator)", mode)
        sys.exit(1)

    # Drop a marker left behind by a previous run. The probe treats a missing
    # marker as "still starting"; a stale timestamp would instead look like an
    # expired grace period while the snapshot is still downloading.
    start_marker = Path("/tmp/entrypoint-start")
    try:
        start_marker.unlink(missing_ok=True)
    except OSError as exc:
        log.warning("Could not remove stale %s: %s", start_marker, exc)

    # Ensure directories
    dirs = [CONFIG_DIR, LEDGER_DIR, ACCOUNTS_DIR, SNAPSHOTS_DIR]
    if mode == "rpc":
        dirs.append(LOG_DIR)
    ensure_dirs(*dirs)

    # Snapshot freshness check and auto-download
    maybe_download_snapshot(SNAPSHOTS_DIR)

    # Identity setup
    if mode == "rpc":
        ensure_identity_rpc()
    print_identity()

    # Build args
    if mode == "rpc":
        args = build_rpc_args()
    else:
        args = build_validator_args()

    args = append_extra_args(args)

    # Write startup timestamp for probe grace period
    start_marker.write_text(str(time.time()))

    log.info("Starting agave-validator with %d arguments", len(args))
    os.execvp("agave-validator", ["agave-validator"] + args)
|
|
|
|
|
|
# -- Probe subcommand ---------------------------------------------------------
|
|
|
|
|
|
def cmd_probe() -> None:
    """Liveness probe: check local RPC slot vs mainnet.

    Exit 0 = healthy, exit 1 = unhealthy.

    Grace period: PROBE_GRACE_SECONDS (default 600) — probe always passes
    during grace period to allow for snapshot unpacking and initial replay.
    PROBE_MAX_SLOT_LAG (default 20000) is the largest tolerated difference
    between the mainnet slot and the local slot.

    A blank or non-integer value for either variable falls back to its
    default rather than raising, because an uncaught exception would end the
    probe with a non-zero status and be reported as unhealthy.
    """
    def int_setting(name: str, default: int) -> int:
        # Parse an integer env var, tolerating blank or malformed values.
        raw = env(name).strip()
        if not raw:
            return default
        try:
            return int(raw)
        except ValueError:
            log.warning("Ignoring invalid %s=%r, using default %d", name, raw, default)
            return default

    grace_seconds = int_setting("PROBE_GRACE_SECONDS", 600)
    max_lag = int_setting("PROBE_MAX_SLOT_LAG", 20000)

    # Check grace period
    start_file = Path("/tmp/entrypoint-start")
    if start_file.exists():
        try:
            start_time = float(start_file.read_text().strip())
            elapsed = time.time() - start_time
            if elapsed < grace_seconds:
                # Within grace period — always healthy
                sys.exit(0)
        except (ValueError, OSError):
            # Unreadable or malformed marker: fall through to the real check.
            pass
    else:
        # No start file — serve hasn't started yet, within grace
        sys.exit(0)

    # Query local RPC
    rpc_port = env("RPC_PORT", "8899")
    local_url = f"http://127.0.0.1:{rpc_port}"
    local_slot = rpc_get_slot(local_url, timeout=5)
    if local_slot is None:
        # Local RPC unreachable after grace period — unhealthy
        log.warning("Probe failed: no slot from local RPC at %s", local_url)
        sys.exit(1)

    # Query mainnet
    mainnet_slot = rpc_get_slot(MAINNET_RPC, timeout=10)
    if mainnet_slot is None:
        # Can't reach mainnet to compare — assume healthy (don't penalize
        # the validator for mainnet RPC being down)
        sys.exit(0)

    lag = mainnet_slot - local_slot
    if lag > max_lag:
        log.warning("Probe failed: slot lag %d exceeds limit %d", lag, max_lag)
        sys.exit(1)

    sys.exit(0)
|
|
|
|
|
|
# -- Main ----------------------------------------------------------------------
|
|
|
|
|
|
def main() -> None:
    """Configure logging and run the subcommand named by the first CLI argument.

    With no argument the ``serve`` subcommand runs. An unrecognised
    subcommand is logged and the process exits with status 1.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
        datefmt="%H:%M:%S",
    )

    subcmd = (sys.argv[1:2] or ["serve"])[0]
    handlers = {"serve": cmd_serve, "probe": cmd_probe}

    handler = handlers.get(subcmd)
    if handler is None:
        log.error("Unknown subcommand: %s (valid: serve, probe)", subcmd)
        sys.exit(1)
    handler()


if __name__ == "__main__":
    main()
|