130 lines
5.5 KiB
Rust
130 lines
5.5 KiB
Rust
/********************************************************************************
 *   Prometheus exporter for monitoring network connectivity using icmp pings   *
 *                                                                              *
 *   Copyright (C) 2019-2022 Jan Christian Grünhage                             *
 *   Copyright (C) 2020-2021 Famedly GmbH                                       *
 *                                                                              *
 *   This program is free software: you can redistribute it and/or modify       *
 *   it under the terms of the GNU Affero General Public License as             *
 *   published by the Free Software Foundation, either version 3 of the         *
 *   License, or (at your option) any later version.                            *
 *                                                                              *
 *   This program is distributed in the hope that it will be useful,            *
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of             *
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the               *
 *   GNU Affero General Public License for more details.                        *
 *                                                                              *
 *   You should have received a copy of the GNU Affero General Public License   *
 *   along with this program. If not, see <https://www.gnu.org/licenses/>.      *
 ********************************************************************************/
|
|
use crate::config::{Config, Host};
|
|
use anyhow::{Context, Result};
|
|
use arc_swap::ArcSwap;
|
|
use async_anyhow_logger::catch;
|
|
use log::{info, trace};
|
|
use metrics::histogram;
|
|
use once_cell::sync::OnceCell as SyncOnceCell;
|
|
use tokio::sync::OnceCell as AsyncOnceCell;
|
|
use std::{net::IpAddr, sync::Arc, time::Duration, collections::HashMap};
|
|
use tokio_icmp_echo::{PingFuture, Pinger};
|
|
use trust_dns_resolver::{TokioAsyncResolver, lookup_ip::LookupIp};
|
|
|
|
/// Addresses each configured host is currently pinged at, keyed by host.
///
/// The map itself is set once by `start_pinging_hosts`; afterwards only the
/// per-host `ArcSwap` values change, when `refresh_host_addresses` stores a
/// fresh lookup result. `ping_host` loads the current list on every tick.
static PING_IP_TARGETS: SyncOnceCell<HashMap<Host, ArcSwap<Vec<IpAddr>>>> = SyncOnceCell::new();
|
|
|
|
async fn pinger() -> Result<&'static Pinger> {
|
|
static INSTANCE: AsyncOnceCell<Pinger> = AsyncOnceCell::const_new();
|
|
INSTANCE.get_or_try_init(|| async {
|
|
Pinger::new().await.context("Couldn't create pinger!")
|
|
}).await
|
|
}
|
|
|
|
fn resolver() -> Result<&'static TokioAsyncResolver> {
|
|
static INSTANCE: SyncOnceCell<TokioAsyncResolver> = SyncOnceCell::new();
|
|
INSTANCE.get_or_try_init(|| {
|
|
TokioAsyncResolver::tokio_from_system_conf().context("Couldn't start resolver!")
|
|
})
|
|
}
|
|
|
|
|
|
pub(crate) async fn start_pinging_hosts(config: &Config) -> Result<()> {
|
|
let mut handles = vec![];
|
|
let mut map = HashMap::new();
|
|
for (host, _interval) in config.ping.hosts.clone().into_iter() {
|
|
match host.clone() {
|
|
Host::IpAddr(addr) => map.insert(host, ArcSwap::new(Arc::new(vec![addr]))),
|
|
Host::Domain(domain) => {
|
|
let lookup = get_host_addresses(&domain).await?;
|
|
map.insert(host, ArcSwap::new(Arc::new(lookup.iter().collect())))
|
|
},
|
|
};
|
|
}
|
|
PING_IP_TARGETS.set(map).unwrap();
|
|
for (host, interval) in config.ping.hosts.clone().into_iter() {
|
|
if let Host::Domain(domain) = host.clone() {
|
|
tokio::spawn(catch(refresh_host_addresses(&PING_IP_TARGETS.get().unwrap()[&host], domain.clone())));
|
|
}
|
|
info!("Spawn ping task for {}", host);
|
|
handles.push(tokio::spawn(ping_host(
|
|
host,
|
|
interval,
|
|
config.ping.timeout,
|
|
)));
|
|
}
|
|
let (result, _, _) = futures::future::select_all(handles).await;
|
|
result??;
|
|
Ok(())
|
|
}
|
|
|
|
async fn ping_host(host: Host, interval: u64, timeout: Duration) -> Result<()> {
|
|
let pinger = pinger().await?;
|
|
let name: String = match host.clone() {
|
|
Host::IpAddr(addr) => addr.to_string(),
|
|
Host::Domain(name) => name,
|
|
};
|
|
let targets = &PING_IP_TARGETS.get().unwrap()[&host];
|
|
let mut interval = tokio::time::interval(Duration::from_millis(interval));
|
|
let ident = rand::random();
|
|
let mut seq_cnt = 0;
|
|
loop {
|
|
interval.tick().await;
|
|
for target in &**targets.load() {
|
|
tokio::spawn(catch(handle_ping_result(
|
|
pinger.ping(*target, ident, seq_cnt, timeout),
|
|
name.clone(),
|
|
target.to_string(),
|
|
timeout,
|
|
)));
|
|
seq_cnt = seq_cnt.wrapping_add(1);
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn refresh_host_addresses(targets: &ArcSwap<Vec<IpAddr>>, name: String) -> Result<()> {
|
|
loop {
|
|
let lookup = get_host_addresses(&name).await?;
|
|
targets.store(Arc::new(lookup.iter().collect()));
|
|
tokio::time::sleep_until(lookup.valid_until().into()).await;
|
|
}
|
|
}
|
|
|
|
async fn get_host_addresses(name: &str) -> Result<LookupIp> {
|
|
Ok(resolver()?.lookup_ip(name).await?)
|
|
}
|
|
|
|
async fn handle_ping_result(result: PingFuture, name: String, ip: String, timeout: Duration) -> Result<()> {
|
|
let pong = result.await.context(format!("Couldn't ping {}", &name))?;
|
|
match pong {
|
|
Some(time) => {
|
|
let ms = time.as_millis();
|
|
trace!("Received pong from {} after {} ms", &name, &ms);
|
|
histogram!("ping_rtt_milliseconds", ms as f64, "target" => name, "ip" => ip);
|
|
}
|
|
None => {
|
|
trace!("Received no response from {} within timeout", &name);
|
|
histogram!("ping_rtt_milliseconds", timeout.as_millis() as f64, "target" => name, "ip" => ip);
|
|
}
|
|
};
|
|
|
|
Ok(())
|
|
}
|