fix timing intervals

fixes #3
This commit is contained in:
Jan Christian Gr??nhage 2020-04-06 20:29:46 +02:00
parent 732733ac9c
commit e97e8f7d70
Signed by: jcgruenhage
GPG key ID: 6594C449C633D10C
3 changed files with 43 additions and 24 deletions

View file

@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Fixed
- fix interval timings
## [v0.2.0] - 2020-04-06
### Added

View file

@ -3,4 +3,3 @@ listener = "[::]:9898"
[hosts]
"1.1.1.1" = 500
"1.0.0.1" = 500

View file

@ -22,8 +22,9 @@ use futures_util::stream::StreamExt;
use lazy_static::lazy_static;
use log::{error, info, trace};
use prometheus::*;
use std::net::IpAddr;
use std::time::Duration;
use tokio::time::delay_for;
use tokio_ping::Pinger;
lazy_static! {
static ref PING_HISTOGRAM: HistogramVec = register_histogram_vec!(
@ -42,7 +43,7 @@ lazy_static! {
pub(crate) async fn start_pinging_hosts(
config: Config,
) -> std::result::Result<(), tokio_ping::Error> {
let pinger = match tokio_ping::Pinger::new().await {
let pinger = match Pinger::new().await {
Ok(pinger) => pinger,
Err(error) => {
error!("Couldn't create pinger: {}", error);
@ -51,27 +52,44 @@ pub(crate) async fn start_pinging_hosts(
};
for (host, interval) in config.hosts.clone() {
info!("Spawn ping task for {}", host);
let pingchain = pinger.chain(host).timeout(Duration::from_secs(3));
let host = host.to_string();
tokio::spawn(pingchain.stream().for_each(move |ping_result| {
match ping_result {
Ok(time) => match time {
Some(time) => {
let ms = time.as_millis();
trace!("Received pong from {} after {} ms", &host, &ms);
PING_HISTOGRAM
.with_label_values(&[&host])
.observe(ms as f64);
}
None => {
trace!("Received no response from {} within timeout", &host);
PING_HISTOGRAM.with_label_values(&[&host]).observe(3000.0);
}
},
Err(error) => error!("Couldn't ping {}: {}", &host, error),
}
delay_for(Duration::from_millis(interval))
}));
tokio::spawn(ping_host(pinger.clone(), host, interval));
}
Ok(())
}
async fn ping_host(pinger: Pinger, host: IpAddr, interval: u64) {
let pingchain = pinger.chain(host).timeout(Duration::from_secs(3));
let host = host.to_string();
let mut stream = pingchain.stream();
let mut interval = tokio::time::interval(Duration::from_millis(interval));
loop {
interval.tick().await;
handle_ping_result(stream.next().await.unwrap(), &host).await;
}
}
async fn handle_ping_result(
result: std::result::Result<Option<Duration>, tokio_ping::Error>,
host: &str,
) {
let pong = match result {
Ok(pong) => pong,
Err(error) => {
error!("Couldn't ping {}: {}", &host, error);
return;
}
};
match pong {
Some(time) => {
let ms = time.as_millis();
trace!("Received pong from {} after {} ms", &host, &ms);
PING_HISTOGRAM
.with_label_values(&[&host])
.observe(ms as f64);
}
None => {
trace!("Received no response from {} within timeout", &host);
PING_HISTOGRAM.with_label_values(&[&host]).observe(3000.0);
}
};
}