Skip to content
Snippets Groups Projects
Commit 3918aa5d authored by ale
Browse files

Refactor SMART monitoring

Maintain backwards compatibility, but add metrics covering all SMART /
NVME attributes, as well as self-test status.
parent 312c1c30
Branches
No related tags found
No related merge requests found
#!/usr/bin/env python3
import argparse
import collections
import csv
import datetime
import decimal
import itertools
import json
import re
import os
import shlex
import subprocess
import sys
# Path checked by main(): when this file exists the script exits with
# status 0 instead of collecting metrics.
GUARD_FILE = '/etc/smartmon.disable'
# Splits a "key: value" (or "key is: value") line into the named groups
# `k` and `v`; applied to the lines of `smartctl --info` output.
device_info_re = re.compile(r'^(?P<k>[^:]+?)(?:(?:\sis|):)\s*(?P<v>.*)$')
# Captures the error number from a line starting with
# "Error <n> [<m>] occurred"; searched in the `-l xerror,1` log output.
ata_error_count_re = re.compile(
r'^Error (\d+) \[\d+\] occurred', re.MULTILINE)
# Prefix prepended to every metric name when it is printed.
PREFIX = 'smartmon_'
# Matches a line that starts with "SMART" and ends with PASSED or OK;
# searched in `smartctl --health` output.
self_test_re = re.compile(r'^SMART.*(PASSED|OK)$', re.MULTILINE)
# Maps field names found in `smartctl --info` output to the label names
# used on the device_info metric.
device_info_map = {
'Vendor': 'vendor',
'Product': 'product',
'Revision': 'revision',
'Logical Unit id': 'lun_id',
'Model Family': 'model_family',
'Device Model': 'device_model',
'Serial Number': 'serial_number',
'Firmware Version': 'firmware_version',
}
# NOTE(review): GUARD_FILE is already assigned above with the same value;
# this second assignment is redundant.
GUARD_FILE = '/etc/smartmon.disable'
smart_attributes_whitelist = {
'airflow_temperature_cel',
......@@ -72,25 +55,79 @@ smart_attributes_whitelist = {
'workload_minutes',
}
# Plain metric record: metric name, dict of labels, and sample value.
Metric = collections.namedtuple('Metric', 'name labels value')
# One row of the textual SMART attribute table. Its field names are reused
# as the column names for csv.DictReader in collect_ata_metrics(), with
# the last field ('raw_value') collecting any remaining columns.
SmartAttribute = collections.namedtuple('SmartAttribute', [
'id', 'name', 'flag', 'value', 'worst', 'threshold', 'type', 'updated',
'when_failed', 'raw_value',
])
def to_label_value(s):
    """Return a value suitable for a label.

    The string is lower-cased and every space character is replaced by an
    underscore; all other characters are left untouched.

    Args:
        s: (str) Human-readable text, e.g. a self-test type description.

    Returns:
        (str) The normalized label value.
    """
    return s.lower().replace(' ', '_')
class Device(collections.namedtuple('DeviceBase', 'path opts')):
"""Representation of a device as found by smartctl --scan output."""
def quote(s):
    """Quote a value with double quotes.

    Escapes the characters that the Prometheus text exposition format
    requires to be escaped inside a label value: backslash, double quote
    and line feed.

    Args:
        s: (str) Raw label value.

    Returns:
        (str) The escaped value wrapped in double quotes.
    """
    # The backslash must be escaped first, otherwise the backslashes
    # introduced by the other two replacements would be doubled.
    escaped = (
        s.replace('\\', '\\\\')
        .replace('"', '\\"')
        .replace('\n', '\\n')
    )
    return '"%s"' % escaped
@property
def type(self):
return self.opts.type
# Leading run of decimal digits at the very start of the string.
_smart_value_rx = re.compile(r'^(\d+)')


def smart_value(s):
    """Parse a SMART attribute string representation.

    We need to use string representations because it's the only
    "processed" (transformed) value in the smartctl JSON output.

    Only the leading digits are used, so a string such as
    "36 (Min/Max 24/40)" parses as 36.

    Args:
        s: (str) String representation of the attribute value.

    Returns:
        (int) The leading integer, or 0 when the string does not start
        with a digit.
    """
    m = _smart_value_rx.match(s)
    if m:
        return int(m.group(1))
    return 0
class _Metric(collections.namedtuple('Metric', ['name', 'labels', 'value', 'type'])):
    """Individual metric, with labels.

    Fields:
        name: (str) Metric name, without the module-level PREFIX.
        labels: (dict) Label names mapped to label values. Values must be
            strings, since they are passed to quote().
        value: Sample value; anything accepted by decimal.Decimal().
        type: (str) Metric type written on the "# TYPE" line.
    """

    def print(self):
        """Write the sample line for this metric to standard output."""
        labels = ','.join(
            f'{k}={quote(v)}' for k, v in self.labels.items())
        # Decimal also correctly converts bool values to 0/1.
        value = decimal.Decimal(self.value)
        print(f'{PREFIX}{self.name}{{{labels}}} {value}')

    def print_meta(self):
        """Write the "# HELP" and "# TYPE" header lines for this metric."""
        print(f'# HELP {PREFIX}{self.name} SMART metric {self.name}')
        print(f'# TYPE {PREFIX}{self.name} {self.type}')
def Gauge(name, labels, value):
    """Gauge-type metric.

    Args:
        name: (str) Metric name, without prefix.
        labels: (dict) Label names mapped to label values.
        value: Sample value of the metric.

    Returns:
        (_Metric) A metric whose type field is 'gauge'.
    """
    return _Metric(name, labels, value, 'gauge')
class Collection():
    """Metric collection.

    Groups metrics by name so that the header lines of each metric name
    are printed only once, followed by all samples sharing that name.
    """

    def __init__(self):
        # Metric name -> list of metrics with that name, in insertion order.
        self.metrics = {}

    def add(self, metric):
        """Append a metric to the group matching its name."""
        self.metrics.setdefault(metric.name, []).append(metric)

    def print(self):
        """Print every group: header lines once, then each sample."""
        for metrics in self.metrics.values():
            # All members of a group share the name, so the first one
            # supplies the header lines for the whole group.
            metrics[0].print_meta()
            for m in metrics:
                m.print()
class Device(collections.namedtuple('DeviceBase', ['path', 'type'])):
"""Representation of a device as found by smartctl --scan output."""
@property
def base_labels(self):
def labels(self):
return {'disk': self.path, 'type': self.type}
@property
def smartctl_select(self):
return ['--device', self.type, self.path]
......@@ -103,7 +140,8 @@ class Device(collections.namedtuple('DeviceBase', 'path opts')):
if not tokens:
return None
return Device(tokens[0], parser.parse_known_args(tokens[1:])[0])
args, _ = parser.parse_known_args(tokens[1:])
return Device(tokens[0], args.type)
def __hash__(self):
return hash((self.path, self.type))
......@@ -112,30 +150,6 @@ class Device(collections.namedtuple('DeviceBase', 'path opts')):
return ((self.path, self.type) == (other.path, other.type))
def metric_key(metric, prefix=''):
    """Return the full metric name: `prefix` followed by `metric.name`.

    Args:
        metric: Object with a `name` attribute.
        prefix: (str) Text placed before the metric name.

    Returns:
        (str) The prefixed metric name.
    """
    return f'{prefix}{metric.name}'
def metric_format(metric, prefix=''):
    """Format a metric as one sample line of the text exposition format.

    Label values are escaped as the Prometheus text format requires:
    backslash, double quote and line feed.

    Args:
        metric: Object with `name`, `labels` (dict of str to str) and
            `value` attributes.
        prefix: (str) Text placed before the metric name.

    Returns:
        (str) A line of the form `key{label="value",...} value`.
    """
    def escape(text):
        # Backslash first, so the escapes added afterwards are not doubled.
        return (
            text.replace('\\', '\\\\')
            .replace('"', '\\"')
            .replace('\n', '\\n')
        )

    key = metric_key(metric, prefix)
    labels = ','.join(
        '{k}="{v}"'.format(k=k, v=escape(v))
        for k, v in metric.labels.items())
    # Decimal turns bool values into 0/1 and accepts numeric strings.
    value = decimal.Decimal(metric.value)
    return '{key}{{{labels}}} {value}'.format(
        key=key, labels=labels, value=value)
def metric_print_meta(metric, prefix=''):
    """Print the "# HELP" and "# TYPE" header lines for a metric.

    The type is always reported as `gauge`.

    Args:
        metric: Object with a `name` attribute.
        prefix: (str) Text placed before the metric name.
    """
    key = metric_key(metric, prefix)
    print(f'# HELP {key} SMART metric {metric.name}')
    print(f'# TYPE {key} gauge')
def metric_print(metric, prefix=''):
    """Print the sample line produced by metric_format() for a metric.

    Args:
        metric: Metric object accepted by metric_format().
        prefix: (str) Text placed before the metric name.
    """
    print(metric_format(metric, prefix))
def smart_ctl(*args, check=True):
"""Wrapper around invoking the smartctl binary.
......@@ -150,10 +164,6 @@ def smart_ctl(*args, check=True):
return e.output.decode('utf-8')
def smart_ctl_version():
    """Return the second whitespace-separated token of `smartctl -V`.

    Only the first line of the output is inspected.
    NOTE(review): presumably that token is the smartctl version number;
    confirm against actual `smartctl -V` output.

    Returns:
        (str) The second word on the first output line.
    """
    first_line = smart_ctl('-V').split('\n')[0]
    return first_line.split()[1]
def scan_devices():
"""Find SMART devices by scanning.
......@@ -162,10 +172,8 @@ def scan_devices():
"""
devices = smart_ctl('--scan-open')
devices_nvme = smart_ctl('--scan-open', '-d', 'nvme')
for line in devices.split('\n') + \
devices_nvme.split('\n'):
for line in devices.split('\n'):
device = Device.from_string(line.strip())
if device:
yield device
......@@ -183,206 +191,89 @@ def smartd_devices(config='/etc/smartd.conf'):
yield Device.from_string(line.strip())
def device_is_active(device):
"""Returns whenever the given device is currently active or not.
Args:
device: (Device) Device in question.
Returns:
(bool) True if the device is active and False otherwise.
"""
try:
smart_ctl('--nocheck', 'standby', *device.smartctl_select())
except subprocess.CalledProcessError:
return False
def collect_self_test_status(device, data):
"""Extract SMART self-test status from logs."""
if 'ata_smart_self_test_log' not in data:
return
return True
# Attempt to extract the most recent self test status by type.
most_recent_test_by_type = {}
for test in data['ata_smart_self_test_log']['standard']['table']:
key = test['type']['value']
if (key not in most_recent_test_by_type) or \
(test['lifetime_hours'] > most_recent_test_by_type[key]['lifetime_hours']):
most_recent_test_by_type[key] = test
for test in most_recent_test_by_type.values():
labels = {'test': to_label_value(test['type']['string'])}
labels.update(device.labels)
yield Gauge('self_test_status', labels, test['status']['passed'])
yield Gauge('self_test_hours', labels, test['lifetime_hours'])
def device_info(device):
"""Query device for basic model information.
def collect_ata_attributes(device, data):
"""Parse SMART ATA attributes."""
Args:
device: (Device) Device in question.
if 'ata_smart_attributes' not in data:
return
Returns:
(generator): Generator yielding:
key (str): Key describing the value.
value (str): Actual value.
"""
info_lines = smart_ctl(
'--info', *device.smartctl_select()
).strip().split('\n')[3:]
matches = (device_info_re.match(l) for l in info_lines)
return (m.groups() for m in matches if m is not None)
def device_smart_capabilities(device):
"""Returns SMART capabilities of the given device.
Args:
device: (Device) Device in question.
Returns:
(tuple): tuple containing:
(bool): True whenever SMART is available, False otherwise.
(bool): True whenever SMART is enabled, False otherwise.
"""
try:
subprocess.check_call(
['/usr/sbin/smartctl', '--info'] + device.smartctl_select(),
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return True, True
except subprocess.CalledProcessError:
return False, False
def collect_device_info(device):
"""Collect basic device information.
Args:
device: (Device) Device in question.
Yields:
(Metric) metrics describing general device information.
"""
values = dict(device_info(device))
yield Metric('device_info', {
**device.base_labels,
**{v: values[k] for k, v in device_info_map.items() if k in values}
}, True)
def collect_device_health_self_assessment(device):
"""Collect metric about the device health self assessment.
Args:
device: (Device) Device in question.
Yields:
(Metric) Device health self assessment.
"""
out = smart_ctl('--health', *device.smartctl_select())
if self_test_re.search(out):
self_assessment_passed = True
else:
self_assessment_passed = False
yield Metric(
'device_smart_healthy', device.base_labels, self_assessment_passed)
def collect_ata_metrics(device):
# Fetch SMART attributes for the given device.
attributes = smart_ctl(
'--attributes', *device.smartctl_select()
)
# replace multiple occurrences of whitespace with a single whitespace
# so that the CSV Parser recognizes individual columns properly.
attributes = re.sub(r'[\t\x20]+', ' ', attributes)
# Turn smartctl output into a list of lines and skip to the table of
# SMART attributes.
attribute_lines = attributes.strip().split('\n')[7:]
reader = csv.DictReader(
(l.strip() for l in attribute_lines),
fieldnames=SmartAttribute._fields[:-1],
restkey=SmartAttribute._fields[-1], delimiter=' ')
for entry in reader:
# We're only interested in the SMART attributes that are
# whitelisted here.
entry['name'] = entry['name'].lower()
if entry['name'] not in smart_attributes_whitelist:
for attr in data['ata_smart_attributes']['table']:
name = to_label_value(attr['name'])
if name not in smart_attributes_whitelist:
continue
labels = {'attr': name}
labels.update(device.labels)
value = smart_value(attr['raw']['string'])
yield Gauge('attribute', labels, value)
# Ensure that only the numeric parts are fetched from the raw_value.
# Attributes such as 194 Temperature_Celsius reported by my SSD
# are in the format of "36 (Min/Max 24/40)" which can't be expressed
# properly as a prometheus metric.
m = re.match('^(\d+)', ' '.join(entry['raw_value']))
if not m:
continue
entry['raw_value'] = m.group(1)
if entry['name'] in smart_attributes_whitelist:
labels = {
'name': entry['name'],
**device.base_labels,
}
for col in 'value', 'worst', 'threshold':
yield Metric(
'attr_{col}'.format(name=entry["name"], col=col),
labels, entry[col])
def collect_nvme_attributes(device, data):
"""Parse SMART NVME attributes."""
def collect_ata_error_count(device):
"""Inspect the device error log and report the amount of entries.
Args:
device: (Device) Device in question.
Yields:
(Metric) Device error count.
"""
error_log = smart_ctl(
'-l', 'xerror,1', *device.smartctl_select(), check=False)
m = ata_error_count_re.search(error_log)
if 'nvme_smart_health_information_log' not in data:
return
error_count = m.group(1) if m is not None else 0
yield Metric('device_errors', device.base_labels, error_count)
def collect_disks_smart_metrics():
now = int(datetime.datetime.utcnow().timestamp())
devices = set()
for device in itertools.chain(scan_devices(), smartd_devices()):
if device in devices:
for key, value in data['nvme_smart_health_information_log'].items():
if not isinstance(value, int):
continue
devices.add(device)
yield Metric('smartctl_run', device.base_labels, now)
labels = {'attr': key}
labels.update(device.labels)
yield Gauge('attribute', labels, value)
is_active = device_is_active(device)
yield Metric('device_active', device.base_labels, is_active)
def collect_device_metrics(device):
"""Collect all SMART metrics for a single device."""
data = json.loads(
smart_ctl('-a', '--json', *device.smartctl_select))
# Skip further metrics collection to prevent the disk from
# spinning up.
if not is_active:
continue
yield from collect_device_info(device)
is_available = data['smart_support']['available']
yield Gauge('device_smart_available', device.labels, is_available)
yield Gauge('device_smart_enabled', device.labels, data['smart_support']['enabled'])
if not is_available:
return
smart_available, smart_enabled = device_smart_capabilities(device)
yield Gauge('power_on_hours', device.labels, data['power_on_time']['hours'])
yield Gauge('power_cycle_count', device.labels, data['power_cycle_count'])
yield Metric(
'device_smart_available', device.base_labels, smart_available)
yield Metric(
'device_smart_enabled', device.base_labels, smart_enabled)
device_info_labels = {}
device_info_labels.update(device.labels)
for key in ['model_name', 'model_family', 'serial_number', 'firmware_version']:
if key in data:
device_info_labels[key] = data[key]
yield Gauge('device_info', device_info_labels, 1)
# Skip further metrics collection here if SMART is disabled
# on the device. Further smartctl invocations would fail
# anyways.
if not smart_available:
continue
for metric in itertools.chain(
collect_ata_attributes(device, data),
collect_nvme_attributes(device, data),
collect_self_test_status(device, data),
):
yield metric
yield from collect_device_health_self_assessment(device)
if device.type.startswith('sat'):
yield from collect_ata_metrics(device)
yield from collect_ata_error_count(device)
def collect_metrics(devices):
    """Collect all SMART metrics for all known devices.

    Args:
        devices: Iterable of devices to query.

    Yields:
        Every metric produced by collect_device_metrics() for each
        device, in iteration order.
    """
    for device in devices:
        yield from collect_device_metrics(device)
def main():
......@@ -390,23 +281,15 @@ def main():
if os.path.exists(GUARD_FILE):
sys.exit(0)
version_metric = Metric('smartctl_version', {
'version': smart_ctl_version()
}, True)
metric_print_meta(version_metric, 'smartmon_')
metric_print(version_metric, 'smartmon_')
metrics = list(collect_disks_smart_metrics())
metrics.sort(key=lambda i: i.name)
# Get the list of devices from scanning and/or configuration.
devices = set(itertools.chain(scan_devices(), smartd_devices()))
previous_name = None
for m in metrics:
if m.name != previous_name:
metric_print_meta(m, 'smartmon_')
collection = Collection()
for metric in collect_metrics(devices):
collection.add(metric)
previous_name = m.name
collection.print()
metric_print(m, 'smartmon_')
if __name__ == '__main__':
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment