"""A Class for collection of metrics from a Prometheus Host."""
from urllib.parse import urlparse
import bz2
import os
import sys
import json
import logging
from datetime import datetime, timedelta
import requests
from retrying import retry
# set up logging
_LOGGER = logging.getLogger(__name__)
# In case of a connection failure try 2 more times
MAX_REQUEST_RETRIES = 3
# wait 1 second before retrying in case of an error
CONNECTION_RETRY_WAIT_TIME = 1000
[docs]class PrometheusConnect:
"""
A Class for collection of metrics from a Prometheus Host.
:param url: (str) url for the prometheus host
:param headers: (dict) A dictionary of http headers to be used to communicate with
the host. Example: {"Authorization": "bearer my_oauth_token_to_the_host"}
:param disable_ssl: (bool) If set to True, will disable ssl certificate verification
for the http requests made to the prometheus host
"""
def __init__(
self, url: str = "http://127.0.0.1:9090", headers: dict = None, disable_ssl: bool = False
):
"""Constructor for the class PrometheusConnect."""
self.headers = headers
self.url = url
self.prometheus_host = urlparse(self.url).netloc
self._all_metrics = None
self.ssl_verification = not disable_ssl
[docs] @retry(stop_max_attempt_number=MAX_REQUEST_RETRIES, wait_fixed=CONNECTION_RETRY_WAIT_TIME)
def all_metrics(self, params: dict = None):
"""
Get the list of all the metrics that the prometheus host scrapes.
:param params: (dict) Optional dictionary containing GET parameters to be
sent along with the API request, such as "time"
:returns: (list) A list of names of all the metrics available from the
specified prometheus host
:raises: (Http Response error) Raises an exception in case of a connection error
"""
params = params or {}
response = requests.get(
"{0}/api/v1/label/__name__/values".format(self.url),
verify=self.ssl_verification,
headers=self.headers,
params=params,
)
if response.status_code == 200:
self._all_metrics = response.json()["data"]
else:
raise Exception(
"HTTP Status Code {} ({})".format(response.status_code, response.content)
)
return self._all_metrics
[docs] @retry(stop_max_attempt_number=MAX_REQUEST_RETRIES, wait_fixed=CONNECTION_RETRY_WAIT_TIME)
def get_current_metric_value(
self, metric_name: str, label_config: dict = None, params: dict = None
):
r"""
A method to get the current metric value for the specified metric and label configuration.
:param metric_name: (str) The name of the metric
:param label_config: (dict) A dictionary that specifies metric labels and their
values
:param params: (dict) Optional dictionary containing GET parameters to be sent
along with the API request, such as "time"
:returns: (list) A list of current metric values for the specified metric
:raises: (Http Response error) Raises an exception in case of a connection error
Example Usage:
``prom = PrometheusConnect()``
``my_label_config = {'cluster': 'my_cluster_id', 'label_2': 'label_2_value'}``
``prom.get_current_metric_value(metric_name='up', label_config=my_label_config)``
"""
params = params or {}
data = []
if label_config:
label_list = [str(key + "=" + "'" + label_config[key] + "'") for key in label_config]
query = metric_name + "{" + ",".join(label_list) + "}"
else:
query = metric_name
# using the query API to get raw data
response = requests.get(
"{0}/api/v1/query".format(self.url),
params={**{"query": query}, **params},
verify=self.ssl_verification,
headers=self.headers,
)
if response.status_code == 200:
data += response.json()["data"]["result"]
else:
raise Exception(
"HTTP Status Code {} ({})".format(response.status_code, response.content)
)
return data
[docs] @retry(stop_max_attempt_number=MAX_REQUEST_RETRIES, wait_fixed=CONNECTION_RETRY_WAIT_TIME)
def get_metric_range_data(
self,
metric_name: str,
label_config: dict = None,
start_time: datetime = (datetime.now() - timedelta(minutes=10)),
end_time: datetime = datetime.now(),
chunk_size: timedelta = None,
store_locally: bool = False,
params: dict = None,
):
r"""
A method to get the current metric value for the specified metric and label configuration.
:param metric_name: (str) The name of the metric.
:param label_config: (dict) A dictionary specifying metric labels and their
values.
:param start_time: (datetime) A datetime object that specifies the metric range start time.
:param end_time: (datetime) A datetime object that specifies the metric range end time.
:param chunk_size: (timedelta) Duration of metric data downloaded in one request. For
example, setting it to timedelta(hours=3) will download 3 hours worth of data in each
request made to the prometheus host
:param store_locally: (bool) If set to True, will store data locally at,
`"./metrics/hostname/metric_date/name_time.json.bz2"`
:param params: (dict) Optional dictionary containing GET parameters to be
sent along with the API request, such as "time"
:return: (list) A list of metric data for the specified metric in the given time
range
:raises: (Exception) Raises an exception in case of a connection error
"""
params = params or {}
data = []
_LOGGER.debug("start_time: %s", start_time)
_LOGGER.debug("end_time: %s", end_time)
_LOGGER.debug("chunk_size: %s", chunk_size)
if not (isinstance(start_time, datetime) and isinstance(end_time, datetime)):
raise TypeError("start_time and end_time can only be of type datetime.datetime")
if not chunk_size:
chunk_size = end_time - start_time
if not isinstance(chunk_size, timedelta):
raise TypeError("chunk_size can only be of type datetime.timedelta")
start = round(start_time.timestamp())
end = round(end_time.timestamp())
if (end_time - start_time).total_seconds() < chunk_size.total_seconds():
sys.exit("specified chunk_size is too big")
chunk_seconds = round(chunk_size.total_seconds())
if label_config:
label_list = [str(key + "=" + "'" + label_config[key] + "'") for key in label_config]
query = metric_name + "{" + ",".join(label_list) + "}"
else:
query = metric_name
_LOGGER.debug("Prometheus Query: %s", query)
while start < end:
if start + chunk_seconds > end:
chunk_seconds = end - start
# using the query API to get raw data
response = requests.get(
"{0}/api/v1/query".format(self.url),
params={
**{
"query": query + "[" + str(chunk_seconds) + "s" + "]",
"time": start + chunk_seconds,
},
**params,
},
verify=self.ssl_verification,
headers=self.headers,
)
if response.status_code == 200:
data += response.json()["data"]["result"]
else:
raise Exception(
"HTTP Status Code {} ({})".format(response.status_code, response.content)
)
if store_locally:
# store it locally
self._store_metric_values_local(
metric_name,
json.dumps(response.json()["data"]["result"]),
start + chunk_seconds,
)
start += chunk_seconds
return data
def _store_metric_values_local(self, metric_name, values, end_timestamp, compressed=False):
r"""
Store metrics on the local filesystem, optionally with bz2 compression.
:param metric_name: (str) the name of the metric being saved
:param values: (str) metric data in JSON string format
:param end_timestamp: (int) timestamp in any format understood by \
datetime.datetime.fromtimestamp()
:param compressed: (bool) whether or not to apply bz2 compression
:returns: (str) path to the saved metric file
"""
if not values:
_LOGGER.debug("No values for %s", metric_name)
return None
file_path = self._metric_filename(metric_name, end_timestamp)
if compressed:
payload = bz2.compress(str(values).encode("utf-8"))
file_path = file_path + ".bz2"
else:
payload = str(values).encode("utf-8")
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, "wb") as file:
file.write(payload)
return file_path
def _metric_filename(self, metric_name: str, end_timestamp: int):
r"""
Add a timestamp to the filename before it is stored.
:param metric_name: (str) the name of the metric being saved
:param end_timestamp: (int) timestamp in any format understood by \
datetime.datetime.fromtimestamp()
:returns: (str) the generated path
"""
end_timestamp = datetime.fromtimestamp(end_timestamp)
directory_name = end_timestamp.strftime("%Y%m%d")
timestamp = end_timestamp.strftime("%Y%m%d%H%M")
object_path = (
"./metrics/"
+ self.prometheus_host
+ "/"
+ metric_name
+ "/"
+ directory_name
+ "/"
+ timestamp
+ ".json"
)
return object_path
[docs] @retry(stop_max_attempt_number=MAX_REQUEST_RETRIES, wait_fixed=CONNECTION_RETRY_WAIT_TIME)
def custom_query(self, query: str, params: dict = None):
"""
A method to send a custom query to a Prometheus Host.
This method takes as input a string which will be sent as a query to
the specified Prometheus Host. This query is a PromQL query.
:param query: (str) This is a PromQL query, a few examples can be found
at https://prometheus.io/docs/prometheus/latest/querying/examples/
:param params: (dict) Optional dictionary containing GET parameters to be
sent along with the API request, such as "time"
:returns: (list) A list of metric data received in response of the query sent
:raises: (Exception) Raises an exception in case of a connection error
"""
params = params or {}
data = None
query = str(query)
# using the query API to get raw data
response = requests.get(
"{0}/api/v1/query".format(self.url),
params={**{"query": query}, **params},
verify=self.ssl_verification,
headers=self.headers,
)
if response.status_code == 200:
data = response.json()["data"]["result"]
else:
raise Exception(
"HTTP Status Code {} ({})".format(response.status_code, response.content)
)
return data