#!/usr/bin/python
from __future__ import unicode_literals
import re
try:
import StringIO
except ImportError:
# Python 3
import io as StringIO
from . import core
def text_string_to_metric_families(text):
    """Parse Prometheus text format held in a unicode string.

    The string is wrapped in an in-memory file object and every metric
    family produced by text_fd_to_metric_families for it is re-yielded.
    See text_fd_to_metric_families.
    """
    text_buffer = StringIO.StringIO(text)
    for family in text_fd_to_metric_families(text_buffer):
        yield family
ESCAPE_SEQUENCES = {
'\\\\': '\\',
'\\n': '\n',
'\\"': '"',
}
def replace_escape_sequence(match):
return ESCAPE_SEQUENCES[match.group(0)]
HELP_ESCAPING_RE = re.compile(r'\\[\\n]')
ESCAPING_RE = re.compile(r'\\[\\n"]')
def _replace_help_escaping(s):
return HELP_ESCAPING_RE.sub(replace_escape_sequence, s)
def _replace_escaping(s):
return ESCAPING_RE.sub(replace_escape_sequence, s)
def _is_character_escaped(s, charpos):
num_bslashes = 0
while (charpos > num_bslashes and
s[charpos - 1 - num_bslashes] == '\\'):
num_bslashes += 1
return num_bslashes % 2 == 1
def _parse_labels(labels_string):
labels = {}
# Return if we don't have valid labels
if "=" not in labels_string:
return labels
escaping = False
if "\\" in labels_string:
escaping = True
# Copy original labels
sub_labels = labels_string
try:
# Process one label at a time
while sub_labels:
# The label name is before the equal
value_start = sub_labels.index("=")
label_name = sub_labels[:value_start]
sub_labels = sub_labels[value_start + 1:].lstrip()
# Find the first quote after the equal
quote_start = sub_labels.index('"') + 1
value_substr = sub_labels[quote_start:]
# Find the last unescaped quote
i = 0
while i < len(value_substr):
i = value_substr.index('"', i)
if not _is_character_escaped(value_substr, i):
break
i += 1
# The label value is inbetween the first and last quote
quote_end = i + 1
label_value = sub_labels[quote_start:quote_end]
# Replace escaping if needed
if escaping:
label_value = _replace_escaping(label_value)
labels[label_name.strip()] = label_value
# Remove the processed label from the sub-slice for next iteration
sub_labels = sub_labels[quote_end + 1:]
next_comma = sub_labels.find(",") + 1
sub_labels = sub_labels[next_comma:].lstrip()
return labels
except ValueError:
raise ValueError("Invalid labels: %s" % labels_string)
# If we have multiple values only consider the first
def _parse_value(s):
s = s.lstrip()
separator = " "
if separator not in s:
separator = "\t"
i = s.find(separator)
if i == -1:
return s
return s[:i]
def _parse_sample(text):
# Detect the labels in the text
try:
label_start, label_end = text.index("{"), text.rindex("}")
# The name is before the labels
name = text[:label_start].strip()
# We ignore the starting curly brace
label = text[label_start + 1:label_end]
# The value is after the label end (ignoring curly brace and space)
value = float(_parse_value(text[label_end + 2:]))
return name, _parse_labels(label), value
# We don't have labels
except ValueError:
# Detect what separator is used
separator = " "
if separator not in text:
separator = "\t"
name_end = text.index(separator)
name = text[:name_end]
# The value is after the name
value = float(_parse_value(text[name_end:]))
return name, {}, value
def text_fd_to_metric_families(fd):
    """Parse Prometheus text format from a file descriptor.

    This is a laxer parser than the main Go parser,
    so successful parsing does not imply that the parsed
    text meets the specification.

    *fd* is iterated line by line. ``# HELP`` and ``# TYPE`` comments
    open (or extend) the current metric family, other comments and blank
    lines are ignored, and every other line is parsed as a sample. A
    sample whose name does not belong to the current family is yielded
    immediately as an untyped family of its own.

    Yields core.Metric's.

    Raises:
        ValueError: if a ``# HELP`` line has no metric name, a ``# TYPE``
            line has no metric name or type, or a sample line cannot be
            parsed.
    """
    # State of the metric family currently being collected
    name = ''
    documentation = ''
    typ = 'untyped'
    samples = []
    # Sample names that belong to the current family
    allowed_names = []

    def build_metric(name, documentation, typ, samples):
        metric = core.Metric(name, documentation, typ)
        metric.samples = samples
        return metric

    for line in fd:
        line = line.strip()

        if line.startswith('#'):
            # '#', keyword, metric name, rest of the line
            parts = line.split(None, 3)
            if len(parts) < 2:
                continue
            if parts[1] == 'HELP':
                # The metric name is mandatory, the help text is not
                if len(parts) < 3:
                    raise ValueError("Invalid line: %s" % line)
                if parts[2] != name:
                    if name != '':
                        yield build_metric(name, documentation, typ, samples)
                    # New metric
                    name = parts[2]
                    typ = 'untyped'
                    samples = []
                    allowed_names = [parts[2]]
                if len(parts) == 4:
                    documentation = _replace_help_escaping(parts[3])
                else:
                    documentation = ''
            elif parts[1] == 'TYPE':
                # Both the metric name and the type are mandatory
                if len(parts) < 4:
                    raise ValueError("Invalid line: %s" % line)
                if parts[2] != name:
                    if name != '':
                        yield build_metric(name, documentation, typ, samples)
                    # New metric
                    name = parts[2]
                    documentation = ''
                    samples = []
                typ = parts[3]
                # Suffixes a sample name may carry for each metric type;
                # unknown types only allow the bare metric name
                allowed_names = {
                    'counter': [''],
                    'gauge': [''],
                    'summary': ['_count', '_sum', ''],
                    'histogram': ['_count', '_sum', '_bucket'],
                }.get(typ, [''])
                allowed_names = [name + n for n in allowed_names]
            else:
                # Ignore other comment tokens
                pass
        elif line == '':
            # Ignore blank lines
            pass
        else:
            sample = _parse_sample(line)
            if sample[0] not in allowed_names:
                if name != '':
                    yield build_metric(name, documentation, typ, samples)
                # New metric, yield immediately as untyped singleton
                name = ''
                documentation = ''
                typ = 'untyped'
                samples = []
                allowed_names = []
                yield build_metric(sample[0], documentation, typ, [sample])
            else:
                samples.append(sample)

    # Flush the family that was still being collected at end of input
    if name != '':
        yield build_metric(name, documentation, typ, samples)