"""A Class for collection of metrics from a Prometheus Host."""
from urllib.parse import urlparse
import bz2
import os
import sys
import json
import logging
import numpy
from datetime import datetime, timedelta
import requests
from retrying import retry
from .exceptions import PrometheusApiClientException
# Total number of attempts made per request: the first try plus 2 retries.
MAX_REQUEST_RETRIES = 3

# Fixed pause between two attempts, in milliseconds (i.e. 1 second).
CONNECTION_RETRY_WAIT_TIME = 1000

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
class PrometheusConnect:
    """
    A Class for collection of metrics from a Prometheus Host.

    :param url: (str) url for the prometheus host
    :param headers: (dict) A dictionary of http headers to be used to communicate with
        the host. Example: {"Authorization": "bearer my_oauth_token_to_the_host"}
    :param disable_ssl: (bool) If set to True, will disable ssl certificate verification
        for the http requests made to the prometheus host
    """

    def __init__(
        self, url: str = "http://127.0.0.1:9090", headers: dict = None, disable_ssl: bool = False
    ):
        """Constructor for the class PrometheusConnect."""
        self.headers = headers
        self.url = url
        # Network location (host[:port]) of the url; used to build local storage paths.
        self.prometheus_host = urlparse(self.url).netloc
        # Result of the last successful all_metrics() call.
        self._all_metrics = None
        self.ssl_verification = not disable_ssl

    def _api_get(self, endpoint: str, params: dict):
        """
        Send a GET request to ``<url>/api/v1/<endpoint>`` and return the "data" member.

        :param endpoint: (str) API path relative to ``/api/v1/``, e.g. "query"
        :param params: (dict) GET parameters sent along with the request
        :returns: The value stored under the "data" key of the JSON response
        :raises:
            (RequestException) Raises an exception in case of a connection error
            (PrometheusApiClientException) Raises in case of non 200 response status code
        """
        response = requests.get(
            "{0}/api/v1/{1}".format(self.url, endpoint),
            params=params,
            verify=self.ssl_verification,
            headers=self.headers,
        )
        if response.status_code != 200:
            raise PrometheusApiClientException(
                "HTTP Status Code {} ({})".format(response.status_code, response.content)
            )
        return response.json()["data"]

    @staticmethod
    def _build_query(metric_name: str, label_config: dict = None):
        """
        Build a PromQL selector of the form ``metric{label='value',...}``.

        :param metric_name: (str) The name of the metric
        :param label_config: (dict) Metric labels and their values; when empty or None
            the bare metric name is returned
        :returns: (str) The selector string
        """
        if not label_config:
            return metric_name
        # format() coerces non-string label values instead of failing on concatenation.
        label_list = ["{0}='{1}'".format(key, value) for key, value in label_config.items()]
        return metric_name + "{" + ",".join(label_list) + "}"

    @retry(stop_max_attempt_number=MAX_REQUEST_RETRIES, wait_fixed=CONNECTION_RETRY_WAIT_TIME)
    def all_metrics(self, params: dict = None):
        """
        Get the list of all the metrics that the prometheus host scrapes.

        :param params: (dict) Optional dictionary containing GET parameters to be
            sent along with the API request, such as "time"
        :returns: (list) A list of names of all the metrics available from the
            specified prometheus host
        :raises:
            (RequestException) Raises an exception in case of a connection error
            (PrometheusApiClientException) Raises in case of non 200 response status code
        """
        # The cached value is only replaced when the request succeeds.
        self._all_metrics = self._api_get("label/__name__/values", params or {})
        return self._all_metrics

    @retry(stop_max_attempt_number=MAX_REQUEST_RETRIES, wait_fixed=CONNECTION_RETRY_WAIT_TIME)
    def get_current_metric_value(
        self, metric_name: str, label_config: dict = None, params: dict = None
    ):
        """
        Get the current metric value for the specified metric and label configuration.

        :param metric_name: (str) The name of the metric
        :param label_config: (dict) A dictionary that specifies metric labels and their
            values
        :param params: (dict) Optional dictionary containing GET parameters to be sent
            along with the API request, such as "time"
        :returns: (list) A list of current metric values for the specified metric
        :raises:
            (RequestException) Raises an exception in case of a connection error
            (PrometheusApiClientException) Raises in case of non 200 response status code

        Example Usage:
          ``prom = PrometheusConnect()``

          ``my_label_config = {'cluster': 'my_cluster_id', 'label_2': 'label_2_value'}``

          ``prom.get_current_metric_value(metric_name='up', label_config=my_label_config)``
        """
        params = params or {}
        query = self._build_query(metric_name, label_config)
        # Entries of ``params`` are merged last, so they override "query" on a key clash.
        return list(self._api_get("query", {"query": query, **params})["result"])

    @retry(stop_max_attempt_number=MAX_REQUEST_RETRIES, wait_fixed=CONNECTION_RETRY_WAIT_TIME)
    def get_metric_range_data(
        self,
        metric_name: str,
        label_config: dict = None,
        start_time: datetime = None,
        end_time: datetime = None,
        chunk_size: timedelta = None,
        store_locally: bool = False,
        params: dict = None,
    ):
        """
        Get metric data for the specified metric and label configuration over a time range.

        The range is downloaded in consecutive chunks of ``chunk_size``; each chunk is one
        request to the query API using a range selector that ends at the chunk's end time.

        :param metric_name: (str) The name of the metric.
        :param label_config: (dict) A dictionary specifying metric labels and their
            values.
        :param start_time: (datetime) A datetime object that specifies the metric range start
            time. Defaults to 10 minutes before the time of the call.
        :param end_time: (datetime) A datetime object that specifies the metric range end time.
            Defaults to the time of the call.
        :param chunk_size: (timedelta) Duration of metric data downloaded in one request. For
            example, setting it to timedelta(hours=3) will download 3 hours worth of data in each
            request made to the prometheus host. Defaults to the whole range.
        :param store_locally: (bool) If set to True, will also store each chunk locally
            (uncompressed) at
            `"./metrics/<host>/<metric_name>/<YYYYMMDD>/<YYYYMMDDHHMM>.json"`
        :param params: (dict) Optional dictionary containing GET parameters to be
            sent along with the API request. Its entries are merged last, so they override
            the "query" and "time" parameters generated by this method on a key clash.
        :return: (list) A list of metric data for the specified metric in the given time
            range
        :raises:
            (TypeError) Raises if start_time/end_time are not datetime objects or
            chunk_size is not a timedelta
            (ValueError) Raises if chunk_size is larger than the requested range or
            rounds to less than one second
            (RequestException) Raises an exception in case of a connection error
            (PrometheusApiClientException) Raises in case of non 200 response status code
        """
        params = params or {}
        data = []

        # Resolve the time defaults per call: defaults written in the signature are
        # evaluated only once, when the function is defined, and would go stale.
        now = datetime.now()
        if start_time is None:
            start_time = now - timedelta(minutes=10)
        if end_time is None:
            end_time = now

        _LOGGER.debug("start_time: %s", start_time)
        _LOGGER.debug("end_time: %s", end_time)
        _LOGGER.debug("chunk_size: %s", chunk_size)

        if not (isinstance(start_time, datetime) and isinstance(end_time, datetime)):
            raise TypeError("start_time and end_time can only be of type datetime.datetime")

        if not chunk_size:
            chunk_size = end_time - start_time
        if not isinstance(chunk_size, timedelta):
            raise TypeError("chunk_size can only be of type datetime.timedelta")

        start = round(start_time.timestamp())
        end = round(end_time.timestamp())

        if (end_time - start_time).total_seconds() < chunk_size.total_seconds():
            # A library must not terminate the host process; report the bad argument.
            raise ValueError("specified chunk_size is too big")

        chunk_seconds = round(chunk_size.total_seconds())
        if start < end and chunk_seconds < 1:
            # A step of zero (or less) would never advance ``start`` in the loop below.
            raise ValueError("chunk_size (or the requested time range) must be at least one second")

        query = self._build_query(metric_name, label_config)
        _LOGGER.debug("Prometheus Query: %s", query)

        while start < end:
            # Shrink the final chunk so it does not run past the requested end.
            if start + chunk_seconds > end:
                chunk_seconds = end - start
            chunk_end = start + chunk_seconds

            # using the query API to get raw data
            result = self._api_get(
                "query",
                {
                    "query": "{0}[{1}s]".format(query, chunk_seconds),
                    "time": chunk_end,
                    **params,
                },
            )["result"]
            data += result

            if store_locally:
                # store it locally
                self._store_metric_values_local(metric_name, json.dumps(result), chunk_end)

            start += chunk_seconds
        return data

    def _store_metric_values_local(self, metric_name, values, end_timestamp, compressed=False):
        """
        Store metrics on the local filesystem, optionally with bz2 compression.

        :param metric_name: (str) the name of the metric being saved
        :param values: (str) metric data in JSON string format
        :param end_timestamp: (int) timestamp in any format understood by
            datetime.datetime.fromtimestamp()
        :param compressed: (bool) whether or not to apply bz2 compression
        :returns: (str) path to the saved metric file, or None when ``values`` is empty
        """
        if not values:
            _LOGGER.debug("No values for %s", metric_name)
            return None

        file_path = self._metric_filename(metric_name, end_timestamp)
        payload = str(values).encode("utf-8")
        if compressed:
            payload = bz2.compress(payload)
            file_path = file_path + ".bz2"

        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        with open(file_path, "wb") as file:
            file.write(payload)

        return file_path

    def _metric_filename(self, metric_name: str, end_timestamp: int):
        """
        Build the timestamped path under which a chunk of metric data is stored.

        :param metric_name: (str) the name of the metric being saved
        :param end_timestamp: (int) timestamp in any format understood by
            datetime.datetime.fromtimestamp()
        :returns: (str) the generated path,
            "./metrics/<host>/<metric_name>/<YYYYMMDD>/<YYYYMMDDHHMM>.json"
        """
        end_time = datetime.fromtimestamp(end_timestamp)
        return "./metrics/{0}/{1}/{2}/{3}.json".format(
            self.prometheus_host,
            metric_name,
            end_time.strftime("%Y%m%d"),
            end_time.strftime("%Y%m%d%H%M"),
        )

    @retry(stop_max_attempt_number=MAX_REQUEST_RETRIES, wait_fixed=CONNECTION_RETRY_WAIT_TIME)
    def custom_query(self, query: str, params: dict = None):
        """
        A method to send a custom query to a Prometheus Host.

        This method takes as input a string which will be sent as a query to
        the specified Prometheus Host. This query is a PromQL query.

        :param query: (str) This is a PromQL query, a few examples can be found
            at https://prometheus.io/docs/prometheus/latest/querying/examples/
        :param params: (dict) Optional dictionary containing GET parameters to be
            sent along with the API request, such as "time"
        :returns: (list) A list of metric data received in response of the query sent
        :raises:
            (RequestException) Raises an exception in case of a connection error
            (PrometheusApiClientException) Raises in case of non 200 response status code
        """
        params = params or {}
        # using the query API to get raw data
        return self._api_get("query", {"query": str(query), **params})["result"]

    @retry(stop_max_attempt_number=MAX_REQUEST_RETRIES, wait_fixed=CONNECTION_RETRY_WAIT_TIME)
    def custom_query_range(
        self, query: str, start_time: datetime, end_time: datetime, step: str, params: dict = None
    ):
        """
        A method to send a query_range to a Prometheus Host.

        This method takes as input a string which will be sent as a query to
        the specified Prometheus Host. This query is a PromQL query.

        :param query: (str) This is a PromQL query, a few examples can be found
            at https://prometheus.io/docs/prometheus/latest/querying/examples/
        :param start_time: (datetime) A datetime object that specifies the query range start time.
        :param end_time: (datetime) A datetime object that specifies the query range end time.
        :param step: (str) Query resolution step width in duration format or float number of seconds
        :param params: (dict) Optional dictionary containing GET parameters to be
            sent along with the API request, such as "timeout"
        :returns: (list) The "result" member of the response data for the query sent
        :raises:
            (RequestException) Raises an exception in case of a connection error
            (PrometheusApiClientException) Raises in case of non 200 response status code
        """
        start = round(start_time.timestamp())
        end = round(end_time.timestamp())
        params = params or {}
        # using the query_range API to get raw data
        return self._api_get(
            "query_range",
            {"query": str(query), "start": start, "end": end, "step": step, **params},
        )["result"]

    # NOTE(review): custom_query() carries its own @retry, so a failing request is
    # presumably re-attempted by both decorators - confirm this nesting is intended.
    @retry(stop_max_attempt_number=MAX_REQUEST_RETRIES, wait_fixed=CONNECTION_RETRY_WAIT_TIME)
    def get_metric_aggregation(self, query: str, operations: list, params: dict = None):
        """
        A method to get aggregations on metric values received from PromQL query.

        This method takes as input a string which will be sent as a query to
        the specified Prometheus Host. This query is a PromQL query. And, a
        list of operations to perform such as- sum, max, min, deviation, etc.

        The received query is passed to the custom_query method which returns
        the result of the query and the values are extracted from the result.

        Available operations - sum, max, min, variance, nth percentile (written as
        "percentile_<n>", e.g. "percentile_95"), deviation and average.

        :param query: (str) This is a PromQL query, a few examples can be found
            at https://prometheus.io/docs/prometheus/latest/querying/examples/
        :param operations: (list) A list of operations to perform on the values.
            Operations are specified in string type.
        :param params: (dict) Optional dictionary containing GET parameters to be
            sent along with the API request, such as "timeout"
        :returns: (dict) A dict of aggregated values received in response to the operations
            performed on the values for the query sent, keyed by operation name (percentiles
            are keyed as "percentile_<float>", e.g. "percentile_95.0"). Returns None when
            ``operations`` is empty or the query yields no values.
        :raises:
            (TypeError) Raises if operations is not a list or contains an unknown operation

        Example output :
            {
                'sum': 18.05674,
                'max': 6.009373
            }
        """
        if not isinstance(operations, list):
            raise TypeError("Operations can be only of type list")
        if not operations:
            _LOGGER.debug("No operations found to perform")
            return None

        data = self.custom_query(query, params)
        # Each result entry carries its sample as a [timestamp, value] pair under "value".
        values = [float(result["value"][1]) for result in data]
        if not values:
            _LOGGER.debug("No values found for given query.")
            return None

        np_array = numpy.array(values)
        # Operations whose result is stored under the operation's own name.
        simple_operations = {
            "sum": numpy.sum,
            "max": numpy.max,
            "min": numpy.min,
            "average": numpy.average,
            "deviation": numpy.std,
            "variance": numpy.var,
        }

        aggregated_values = {}
        for operation in operations:
            if operation in simple_operations:
                aggregated_values[operation] = simple_operations[operation](np_array)
            elif operation.startswith("percentile"):
                percentile = float(operation.split("_")[1])
                aggregated_values["percentile_" + str(percentile)] = numpy.percentile(
                    np_array, percentile
                )
            else:
                raise TypeError("Invalid operation: " + operation)
        return aggregated_values