update dependencies, replace oping with tokio_ping

This commit is contained in:
Jan Christian Gr??nhage 2020-04-06 12:56:39 +02:00
parent b60f2f8290
commit c6d5505f2a
7 changed files with 531 additions and 674 deletions

985
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -6,15 +6,16 @@ edition = "2018"
description = "Pings configured hosts in a configurable intervals and exposes metrics for prometheus."
[dependencies]
prometheus = "0.5.0"
oping = "0.3.3"
toml = "0.4.10"
hyper = "0.12.23"
lazy_static = "1.2.0"
futures = "0.1.25"
tokio = "0.1.15"
clap = "2.32.0"
fern = "0.5.7"
log = "0.4.6"
chrono = "0.4.6"
serde = { version = "1.0.87", features = ["derive"] }
prometheus = "0.8.0"
toml = "0.5.6"
hyper = "0.13.4"
lazy_static = "1.4.0"
futures = "0.3.4"
tokio = { version = "0.2.16", features = ["rt-threaded", "macros", "time"] }
clap = "2.33.0"
fern = "0.6.0"
log = "0.4.8"
chrono = "0.4.11"
serde = { version = "1.0.106", features = ["derive"] }
tokio-ping = { git = "https://github.com/usbalbin/tokio-ping", branch = "new_futures" }
futures-util = "0.3.4"

View file

@ -1,6 +1,6 @@
listener = "[::]:9898"
[hosts]
"1.1.1.1" = 5
"1.0.0.1" = 5
"1.1.1.1" = 500
"1.0.0.1" = 500

View file

@ -1,15 +1,15 @@
use clap::{clap_app, crate_authors, crate_description, crate_name, crate_version};
use log::info;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use log::{error, warn, info, debug, trace};
use serde::{Serialize, Deserialize};
use clap::{clap_app, crate_name, crate_version, crate_description, crate_authors};
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub(crate) struct Config {
pub(crate) listener: std::net::SocketAddr,
pub(crate) hosts: HashMap<String, u64>,
pub(crate) hosts: HashMap<std::net::IpAddr, u64>,
}
pub(crate) fn setup_clap() -> (clap::ArgMatches<'static>) {
pub(crate) fn setup_clap() -> clap::ArgMatches<'static> {
clap_app!(myapp =>
(name: crate_name!())
(version: crate_version!())
@ -17,7 +17,8 @@ pub(crate) fn setup_clap() -> (clap::ArgMatches<'static>) {
(about: crate_description!())
(@arg config: +required "Set config file")
(@arg v: -v --verbose ... "Be verbose (you can add this up to 4 times for more logs)")
).get_matches()
)
.get_matches()
}
pub(crate) fn setup_fern(level: u64) {
@ -26,7 +27,7 @@ pub(crate) fn setup_fern(level: u64) {
1 => log::LevelFilter::Warn,
2 => log::LevelFilter::Info,
3 => log::LevelFilter::Debug,
_ => log::LevelFilter::Trace
_ => log::LevelFilter::Trace,
};
match fern::Dispatch::new()
.format(|out, message, record| {
@ -39,7 +40,8 @@ pub(crate) fn setup_fern(level: u64) {
})
.level(level)
.chain(std::io::stdout())
.apply() {
.apply()
{
Err(_) => {
eprintln!("error setting up logging!");
}
@ -56,18 +58,12 @@ pub(crate) struct Error {}
impl std::convert::From<std::io::Error> for Error {
fn from(_: std::io::Error) -> Self {
Error{}
Error {}
}
}
impl std::convert::From<toml::de::Error> for Error {
fn from(_: toml::de::Error) -> Self {
Error{}
}
}
impl std::convert::From<oping::PingError> for Error {
fn from(_: oping::PingError) -> Self {
Error{}
Error {}
}
}

View file

@ -1,27 +1,25 @@
use log::{error, warn, info, debug, trace};
use futures::future::lazy;
use log::error;
mod config;
mod metrics;
mod ping;
use crate::config::{Config, read_config, setup_clap, setup_fern};
use crate::config::{read_config, setup_clap, setup_fern};
use crate::metrics::start_serving_metrics;
use crate::ping::start_pinging_hosts;
fn main() {
#[tokio::main]
async fn main() -> Result<(), ()> {
let clap = setup_clap();
setup_fern(clap.occurrences_of("v"));
let config = match read_config(clap.value_of("config").unwrap()) {
Ok(config) => config,
Err(_) => {
error!("Couldn't read config file!");
return;
return Err(());
}
};
tokio::run(lazy(move || {
start_serving_metrics(&config);
start_pinging_hosts(&config);
Ok(())
}));
tokio::spawn(start_pinging_hosts(config.clone()));
start_serving_metrics(config.clone()).await;
Ok(())
}

View file

@ -1,10 +1,12 @@
use crate::config::{Config};
use crate::config::Config;
use hyper::{
header::CONTENT_TYPE,
service::{make_service_fn, service_fn},
Body, Request, Response, Server,
};
use lazy_static::lazy_static;
use hyper::{Server, Response, Body, header::CONTENT_TYPE, service::service_fn_ok};
use prometheus::{TextEncoder, Counter, Gauge, HistogramVec};
use prometheus::*;
use futures::future::Future;
use prometheus::{Counter, Gauge, HistogramVec, TextEncoder};
lazy_static! {
static ref HTTP_COUNTER: Counter = register_counter!(opts!(
@ -27,25 +29,34 @@ lazy_static! {
.unwrap();
}
pub(crate) fn start_serving_metrics(config: &Config) {
let serve_metrics = || {
service_fn_ok(|_req| {
HTTP_COUNTER.inc();
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["all"]).start_timer();
let metric_families = prometheus::gather();
let mut buffer = vec![];
let encoder = TextEncoder::new();
encoder.encode(&metric_families, &mut buffer).unwrap();
HTTP_BODY_GAUGE.set(buffer.len() as f64);
let mut res = Response::new(Body::from(buffer));
res.headers_mut().insert(CONTENT_TYPE, encoder.format_type().parse().unwrap());
timer.observe_duration();
res
})
};
println!("Listening on {}", &config.listener);
let server = Server::bind(&config.listener)
.serve(serve_metrics)
.map_err(|err| eprintln!("server error: {}", err));
tokio::spawn(server);
}
async fn serve_req(_req: Request<Body>) -> std::result::Result<Response<Body>, hyper::Error> {
let encoder = TextEncoder::new();
HTTP_COUNTER.inc();
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["all"]).start_timer();
let metric_families = prometheus::gather();
let mut buffer = vec![];
encoder.encode(&metric_families, &mut buffer).unwrap();
HTTP_BODY_GAUGE.set(buffer.len() as f64);
let response = Response::builder()
.status(200)
.header(CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))
.unwrap();
timer.observe_duration();
Ok(response)
}
pub(crate) async fn start_serving_metrics(config: Config) {
let serve_future = Server::bind(&config.listener).serve(make_service_fn(|_| async {
Ok::<_, hyper::Error>(service_fn(serve_req))
}));
if let Err(err) = serve_future.await {
eprintln!("server error: {}", err);
}
}

View file

@ -1,55 +1,47 @@
use crate::config::{Config, Error};
use std::time::{Duration, Instant};
use tokio::timer::{Interval};
use futures::{future::{lazy, Future}, stream::Stream};
use oping::{Ping};
use log::{trace, debug, info, warn, error};
use prometheus::*;
use crate::config::Config;
use futures_util::stream::StreamExt;
use lazy_static::lazy_static;
use log::{error, info};
use prometheus::*;
use std::time::Duration;
use tokio::time::delay_for;
lazy_static! {
static ref PING_HISTOGRAM : HistogramVec = register_histogram_vec!(
static ref PING_HISTOGRAM: HistogramVec = register_histogram_vec!(
"ping_rtt_milliseconds",
"The ping round trip time in milliseconds",
&["target"],
vec![0.5, 1.0, 5.0, 10.0, 15.0, 20.0, 25.0, 50.0, 75.0, 100.0, 150.0, 200.0, 250.0,
300.0, 350.0, 400.0, 450.0, 500.0, 550.0, 600.0, 650.0, 700.0, 750.0, 800.0, 900.0,
1000.0, 1250.0, 1500.0, 1750.0, 2000.0]
).unwrap();
vec![
0.5, 1.0, 5.0, 10.0, 15.0, 20.0, 25.0, 50.0, 75.0, 100.0, 150.0, 200.0, 250.0, 300.0,
350.0, 400.0, 450.0, 500.0, 550.0, 600.0, 650.0, 700.0, 750.0, 800.0, 900.0, 1000.0,
1250.0, 1500.0, 1750.0, 2000.0
]
)
.unwrap();
}
pub(crate) fn start_pinging_hosts(config: &Config) {
pub(crate) async fn start_pinging_hosts(
config: Config,
) -> std::result::Result<(), tokio_ping::Error> {
let pinger = tokio_ping::Pinger::new().await?;
for (host, interval) in config.hosts.clone() {
info!("Spawn ping task for {}", host);
tokio::spawn(
Interval::new(Instant::now(), Duration::from_millis(interval))
.for_each(move |_| {
let mut ping = Ping::new();
ping.set_timeout(2.5);
ping.add_host(&host);
for response in match ping.send() {
Ok(iterator) => iterator,
Err(e) => {
error!("Something went wrong sending the ping: {:?}", e);
return Ok(());
}
}{
if response.dropped > 0 {
debug!("No response from host: {}", response.hostname);
PING_HISTOGRAM
.with_label_values(&[&host])
.observe(2500.0)
} else {
debug!("Response from host {} (address {}): latency {} ms",
response.hostname, response.address, response.latency_ms);
trace!(" all details: {:?}", response);
PING_HISTOGRAM
.with_label_values(&[&host])
.observe(response.latency_ms);
}
let pingchain = pinger.chain(host).timeout(Duration::from_secs(3));
let host = host.to_string();
tokio::spawn(pingchain.stream().for_each(move |ping_result| {
match ping_result {
Ok(time) => match time {
Some(time) => {
PING_HISTOGRAM
.with_label_values(&[&host])
.observe(time.as_millis() as f64);
}
Ok(())
}).map_err(|_| ())
);
None => PING_HISTOGRAM.with_label_values(&[&host]).observe(3000.0),
},
Err(error) => error!("Couldn't ping {}: {}", &host, error),
}
delay_for(Duration::from_millis(interval))
}));
}
}
Ok(())
}