Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/attestation/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ pub async fn attest_handler(
attestation_sig = %sig,
"SAS attestation issued"
);

state.metrics.increment_attestations();

Ok(Json(AttestResponse {
success: true,
attestation_tx: Some(sig),
Expand Down
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod listener;
mod relayer;
mod server;
mod solana;
mod status;

use std::sync::Arc;
use tracing_subscriber::EnvFilter;
Expand Down Expand Up @@ -100,6 +101,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracker,
commitment_registry,
sas_attestor,
metrics: Arc::new(status::status_metrics::StatusMetrics::new()),
};

let app = create_router(state, &config.cors_origins);
Expand Down
2 changes: 2 additions & 0 deletions src/relayer/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ pub async fn verify_handler(
"Re-verification completed"
);

state.metrics.increment_verifications();

Ok(Json(VerifyResponse {
success: true,
tx_signature: Some(outcome.signature),
Expand Down
4 changes: 4 additions & 0 deletions src/relayer/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,8 @@ impl RelayerTransaction {
is_valid: true,
})
}

pub async fn get_balance(&self) -> Result<u64, AppError> {
self.client.get_balance().await
}
}
4 changes: 4 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use crate::attestation::handler::attest_handler;
use crate::attestation::sas::SasAttestor;
use crate::relayer::handler::{health_handler, verify_handler};
use crate::relayer::transaction::RelayerTransaction;
use crate::status::handler::status_handler;
use crate::status::status_metrics::StatusMetrics;

#[derive(Clone)]
pub struct AppState {
Expand All @@ -25,6 +27,7 @@ pub struct AppState {
pub tracker: Arc<IntegratorTracker>,
pub commitment_registry: Arc<CommitmentRegistry>,
pub sas_attestor: Option<Arc<SasAttestor>>,
pub metrics: Arc<StatusMetrics>,
}

async fn auth_middleware(
Expand Down Expand Up @@ -92,6 +95,7 @@ pub fn create_router(state: AppState, cors_origins: &[String]) -> Router {

Router::new()
.route("/health", get(health_handler))
.route("/status", get(status_handler))
.merge(verify_routes)
.layer(DefaultBodyLimit::max(4096))
.layer(cors)
Expand Down
67 changes: 67 additions & 0 deletions src/status/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use std::time::{SystemTime, UNIX_EPOCH};

use axum::extract::State;
use axum::http::HeaderMap;
use axum::Json;
use serde::Serialize;
use subtle::ConstantTimeEq;

use crate::error::AppError;
use crate::server::AppState;

const BALANCE_CACHE_TTL_SECONDS: u64 = 30;

#[derive(Serialize)]
pub struct StatusResponse {
pub uptime_seconds: u64,
pub verifications_relayed: u64,
pub attestations_issued: u64,
pub relayer_balance_lamports: Option<u64>,
pub sas_configured: bool,
}

pub async fn status_handler(
State(state): State<AppState>,
headers: HeaderMap,
) -> Result<Json<StatusResponse>, AppError> {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();

let relayer_balance_lamports = match headers
.get("X-API-Key")
.and_then(|value| value.to_str().ok())
{
Some(key) => {
let key_bytes = key.as_bytes();
let is_valid = state.api_keys.iter().any(|candidate| {
candidate.len() == key_bytes.len() && candidate.as_bytes().ct_eq(key_bytes).into()
});

if is_valid {
let balance_fetched_at = state.metrics.balance_fetched_at();
let cached_balance = state.metrics.cached_balance();

if now.saturating_sub(balance_fetched_at) < BALANCE_CACHE_TTL_SECONDS {
Some(cached_balance)
} else {
let balance = state.relayer_tx.get_balance().await?;
state.metrics.update_cached_balance(balance, now);
Some(balance)
}
} else {
None
}
}
_ => None,
};

Ok(Json(StatusResponse {
uptime_seconds: now.saturating_sub(state.metrics.start_time()),
verifications_relayed: state.metrics.verifications_relayed(),
attestations_issued: state.metrics.attestations_issued(),
relayer_balance_lamports,
sas_configured: state.sas_attestor.is_some(),
}))
}
2 changes: 2 additions & 0 deletions src/status/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod status_metrics;
pub mod handler;
131 changes: 131 additions & 0 deletions src/status/status_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
use std::sync::atomic::{AtomicU64, Ordering};

pub struct StatusMetrics {
total_verifications_relayed: AtomicU64,
total_attestations_issued: AtomicU64,
start_time: u64,
cached_balance: AtomicU64,
balance_fetched_at: AtomicU64,
}

impl StatusMetrics {
pub fn new() -> Self {
Self {
total_verifications_relayed: AtomicU64::new(0),
total_attestations_issued: AtomicU64::new(0),
start_time: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
cached_balance: AtomicU64::new(0),
balance_fetched_at: AtomicU64::new(0),
}
}

// increase total_verifications_relayed by 1
pub fn increment_verifications(&self) {
self.total_verifications_relayed
.fetch_add(1, Ordering::Relaxed);
}

// increase total_attestations_issued by 1
pub fn increment_attestations(&self) {
self.total_attestations_issued
.fetch_add(1, Ordering::Relaxed);
}

// getters for the metrics
pub fn verifications_relayed(&self) -> u64 {
self.total_verifications_relayed.load(Ordering::Relaxed)
}

pub fn attestations_issued(&self) -> u64 {
self.total_attestations_issued.load(Ordering::Relaxed)
}

pub fn start_time(&self) -> u64 {
self.start_time
}

pub fn cached_balance(&self) -> u64 {
self.cached_balance.load(Ordering::Relaxed)
}

pub fn balance_fetched_at(&self) -> u64 {
self.balance_fetched_at.load(Ordering::Relaxed)
}

pub fn update_cached_balance(&self, balance: u64, fetched_at: u64) {
self.cached_balance.store(balance, Ordering::Relaxed);
self.balance_fetched_at.store(fetched_at, Ordering::Relaxed);
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn counters_start_at_zero() {
let m = StatusMetrics::new();
assert_eq!(m.verifications_relayed(), 0);
assert_eq!(m.attestations_issued(), 0);
}

#[test]
fn increment_verifications_counts_correctly() {
let m = StatusMetrics::new();
m.increment_verifications();
m.increment_verifications();
assert_eq!(m.verifications_relayed(), 2);
assert_eq!(m.attestations_issued(), 0);
}

#[test]
fn increment_attestations_counts_correctly() {
let m = StatusMetrics::new();
m.increment_attestations();
assert_eq!(m.attestations_issued(), 1);
assert_eq!(m.verifications_relayed(), 0);
}

#[test]
fn counters_are_independent() {
let m = StatusMetrics::new();
m.increment_verifications();
m.increment_verifications();
m.increment_attestations();
assert_eq!(m.verifications_relayed(), 2);
assert_eq!(m.attestations_issued(), 1);
}

#[test]
fn start_time_is_recent() {
let before = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let m = StatusMetrics::new();
let after = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
assert!(m.start_time() >= before);
assert!(m.start_time() <= after);
}

#[test]
fn balance_cache_starts_empty() {
let m = StatusMetrics::new();
assert_eq!(m.cached_balance(), 0);
assert_eq!(m.balance_fetched_at(), 0);
}

#[test]
fn balance_cache_updates_together() {
let m = StatusMetrics::new();
m.update_cached_balance(123, 456);
assert_eq!(m.cached_balance(), 123);
assert_eq!(m.balance_fetched_at(), 456);
}
}