Source code for ceilometer.transformer.conversions
#
# Copyright 2013 Red Hat, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import collections
import re

from oslo_log import log
from oslo_utils import timeutils
import six

from ceilometer.i18n import _, _LW
from ceilometer import sample
from ceilometer import transformer

LOG = log.getLogger(__name__)


class BaseConversionTransformer(transformer.TransformerBase):
    """Transformer to derive conversion."""

    grouping_keys = ['resource_id']

    def __init__(self, source=None, target=None, **kwargs):
        """Initialize transformer with configured parameters.

        :param source: dict containing source sample unit
        :param target: dict containing target sample name, type,
                       unit and scaling factor (a missing value
                       connotes no change)
        """
        self.source = source or {}
        self.target = target or {}
        super(BaseConversionTransformer, self).__init__(**kwargs)

    def _map(self, s, attr):
        """Apply the name or unit mapping if configured."""
        mapped = None
        from_ = self.source.get('map_from')
        to_ = self.target.get('map_to')
        if from_ and to_:
            if from_.get(attr) and to_.get(attr):
                try:
                    mapped = re.sub(from_[attr], to_[attr], getattr(s, attr))
                except Exception:
                    # Ignore a bad pattern or substitution and fall back
                    # to the unmapped default below.
                    pass
        return mapped or self.target.get(attr, getattr(s, attr))
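
# A hedged sketch of the map_from/map_to configuration that _map() consumes,
# mirroring the unit_conversion example in Ceilometer's pipeline
# documentation (treat the exact meter names as illustrative):
#
#     source:
#         map_from:
#             name: "disk\\.device\\.(read|write)\\.bytes"
#     target:
#         map_to:
#             name: "disk.device.\\1.kilobytes"
#
# With this in place, _map(s, 'name') rewrites "disk.device.read.bytes" to
# "disk.device.read.kilobytes" via re.sub(); if no mapping is configured or
# the substitution fails, the literal target value (or the sample's own
# attribute) is used instead.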


class DeltaTransformer(BaseConversionTransformer):
    """Transformer based on the delta of a sample volume."""

    def __init__(self, target=None, growth_only=False, **kwargs):
        """Initialize transformer with configured parameters.

        :param growth_only: capture only positive deltas
        """
        super(DeltaTransformer, self).__init__(target=target, **kwargs)
        self.growth_only = growth_only
        self.cache = {}

    def handle_sample(self, s):
        """Handle a sample, converting if necessary."""
        key = s.name + s.resource_id
        prev = self.cache.get(key)
        timestamp = timeutils.parse_isotime(s.timestamp)
        self.cache[key] = (s.volume, timestamp)
        if prev:
            prev_volume = prev[0]
            prev_timestamp = prev[1]
            time_delta = timeutils.delta_seconds(prev_timestamp, timestamp)
            # disallow violations of the arrow of time
            if time_delta < 0:
                LOG.warning(_LW('Dropping out of time order sample: %s'), (s,))
                # Reset the cache to the newer sample.
                self.cache[key] = prev
                return None
            volume_delta = s.volume - prev_volume
            if self.growth_only and volume_delta < 0:
                LOG.warning(_LW('Negative delta detected, dropping value'))
                s = None
            else:
                s = self._convert(s, volume_delta)
                LOG.debug('Converted to: %s', s)
        else:
            LOG.warning(_LW('Dropping sample with no predecessor: %s'), (s,))
            s = None
        return s

    def _convert(self, s, delta):
        """Transform the appropriate sample fields."""
        return sample.Sample(
            name=self._map(s, 'name'),
            unit=s.unit,
            type=sample.TYPE_DELTA,
            volume=delta,
            user_id=s.user_id,
            project_id=s.project_id,
            resource_id=s.resource_id,
            timestamp=s.timestamp,
            resource_metadata=s.resource_metadata
        )
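
# Hypothetical usage sketch for DeltaTransformer; the sample volumes are
# invented for illustration. Consecutive readings for the same resource are
# reduced to their difference:
#
#     t = DeltaTransformer(growth_only=True)
#     t.handle_sample(s1)  # volume=100 -> None (no predecessor cached yet)
#     t.handle_sample(s2)  # volume=160 -> TYPE_DELTA sample with volume=60
#     t.handle_sample(s3)  # volume=150 -> None (negative delta, growth_only)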


class ScalingTransformer(BaseConversionTransformer):
    """Transformer to apply a scaling conversion."""

    def __init__(self, source=None, target=None, **kwargs):
        """Initialize transformer with configured parameters.

        :param source: dict containing source sample unit
        :param target: dict containing target sample name, type,
                       unit and scaling factor (a missing value
                       connotes no change)
        """
        super(ScalingTransformer, self).__init__(source=source, target=target,
                                                 **kwargs)
        self.scale = self.target.get('scale')
        self.max = self.target.get('max')
        LOG.debug('scaling conversion transformer with source:'
                  ' %(source)s target: %(target)s', {'source': self.source,
                                                     'target': self.target})

    def _scale(self, s):
        """Apply the scaling factor.

        Either a straight multiplicative factor or else a string to be eval'd.
        """
        ns = transformer.Namespace(s.as_dict())
        scale = self.scale
        return ((eval(scale, {}, ns) if isinstance(scale, six.string_types)
                 else s.volume * scale) if scale else s.volume)
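
    # A hedged sketch of the two accepted forms of 'scale' (values are
    # hypothetical):
    #
    #     scale: 1024                   -> s.volume * 1024
    #     scale: "volume * 1.0 / 1024"  -> eval'd with the sample's fields
    #                                      (volume, unit, resource_metadata,
    #                                      ...) bound via transformer.Namespace
    #
    # Note that a string scale replaces the volume outright, so the
    # expression must itself reference 'volume' if the result should
    # depend on it.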

    def _convert(self, s, growth=1):
        """Transform the appropriate sample fields."""
        volume = self._scale(s) * growth
        return sample.Sample(
            name=self._map(s, 'name'),
            unit=self._map(s, 'unit'),
            type=self.target.get('type', s.type),
            volume=min(volume, self.max) if self.max else volume,
            user_id=s.user_id,
            project_id=s.project_id,
            resource_id=s.resource_id,
            timestamp=s.timestamp,
            resource_metadata=s.resource_metadata
        )

    def handle_sample(self, s):
        """Handle a sample, converting if necessary."""
        LOG.debug('handling sample %s', s)
        if self.source.get('unit', s.unit) == s.unit:
            s = self._convert(s)
            LOG.debug('converted to: %s', s)
        return s
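
# A sketch of how this transformer is typically wired up in pipeline.yaml,
# based on the unit_conversion example in Ceilometer's documentation (names
# and values are illustrative):
#
#     transformers:
#         - name: "unit_conversion"
#           parameters:
#               source:
#                   unit: "B"
#               target:
#                   unit: "KB"
#                   scale: "volume * 1.0 / 1024"
#
# Because handle_sample() only converts when source['unit'] matches the
# sample's unit, samples in other units pass through unchanged.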


class RateOfChangeTransformer(ScalingTransformer):
    """Transformer based on the rate of change of a sample volume.

    For example, taking the current and previous volumes of a cumulative
    sample and producing a gauge value based on the proportion of some
    maximum used.
    """

    def __init__(self, **kwargs):
        """Initialize transformer with configured parameters."""
        super(RateOfChangeTransformer, self).__init__(**kwargs)
        self.cache = {}
        self.scale = self.scale or '1'

    def handle_sample(self, s):
        """Handle a sample, converting if necessary."""
        LOG.debug('handling sample %s', s)
        key = s.name + s.resource_id
        prev = self.cache.get(key)
        timestamp = timeutils.parse_isotime(s.timestamp)
        self.cache[key] = (s.volume, timestamp, s.monotonic_time)
        if prev:
            prev_volume = prev[0]
            prev_timestamp = prev[1]
            prev_monotonic_time = prev[2]
            if (prev_monotonic_time is not None and
                    s.monotonic_time is not None):
                # NOTE(sileht): Prefer high precision timer
                time_delta = s.monotonic_time - prev_monotonic_time
            else:
                time_delta = timeutils.delta_seconds(prev_timestamp, timestamp)
            # disallow violations of the arrow of time
            if time_delta < 0:
                LOG.warning(_('dropping out of time order sample: %s'), (s,))
                # Reset the cache to the newer sample.
                self.cache[key] = prev
                return None
            # we only allow negative volume deltas for noncumulative
            # samples, whereas for cumulative we assume that a reset has
            # occurred in the interim so that the current volume gives a
            # lower bound on growth
            volume_delta = (s.volume - prev_volume
                            if (prev_volume <= s.volume or
                                s.type != sample.TYPE_CUMULATIVE)
                            else s.volume)
            rate_of_change = ((1.0 * volume_delta / time_delta)
                              if time_delta else 0.0)
            s = self._convert(s, rate_of_change)
            LOG.debug('converted to: %s', s)
        else:
            LOG.warning(_('dropping sample with no predecessor: %s'),
                        (s,))
            s = None
        return s
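
# The canonical use of this transformer is deriving cpu_util from the
# cumulative cpu meter, as in the rate_of_change example from Ceilometer's
# pipeline documentation (exact values are deployment-specific):
#
#     transformers:
#         - name: "rate_of_change"
#           parameters:
#               target:
#                   name: "cpu_util"
#                   unit: "%"
#                   type: "gauge"
#                   scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
#
# Worked example: two cpu samples 60s apart whose cumulative nanoseconds
# grew by 30 * 10**9 on a single-CPU instance give a rate of 5 * 10**8 ns/s,
# which the scale expression converts to a gauge of 50.0 (%).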


class AggregatorTransformer(ScalingTransformer):
    """Transformer that aggregates samples.

    Aggregation goes on until a size threshold and/or a retention_time is
    reached, and then flushes the accumulated samples out into the wild.

    Example:
      To aggregate samples by resource_metadata and keep the
      resource_metadata of the latest received sample:

        AggregatorTransformer(retention_time=60, resource_metadata='last')

      To aggregate samples by user_id and resource_metadata, keep the
      user_id of the first received sample and drop the resource_metadata:

        AggregatorTransformer(size=15, user_id='first',
                              resource_metadata='drop')

      To keep the timestamp of the last received sample rather
      than the first:

        AggregatorTransformer(timestamp="last")
    """

    def __init__(self, size=1, retention_time=None,
                 project_id=None, user_id=None, resource_metadata="last",
                 timestamp="first", **kwargs):
        super(AggregatorTransformer, self).__init__(**kwargs)
        self.samples = {}
        self.counts = collections.defaultdict(int)
        self.size = int(size) if size else None
        self.retention_time = float(retention_time) if retention_time else None
        if not (self.size or self.retention_time):
            self.size = 1
        if timestamp in ["first", "last"]:
            self.timestamp = timestamp
        else:
            self.timestamp = "first"
        self.initial_timestamp = None
        self.aggregated_samples = 0
        self.key_attributes = []
        self.merged_attribute_policy = {}
        self._init_attribute('project_id', project_id)
        self._init_attribute('user_id', user_id)
        self._init_attribute('resource_metadata', resource_metadata,
                             is_droppable=True, mandatory=True)

    def _init_attribute(self, name, value, is_droppable=False,
                        mandatory=False):
        drop = ['drop'] if is_droppable else []
        if value or mandatory:
            if value not in ['last', 'first'] + drop:
                LOG.warning('%s is unknown (%s), using last' % (name, value))
                value = 'last'
            self.merged_attribute_policy[name] = value
        else:
            self.key_attributes.append(name)

    def _get_unique_key(self, s):
        # NOTE(arezmerita): in samples generated by ceilometer middleware,
        # when publicly readable/writable swift containers are accessed
        # without authentication, the project_id and user_id are missing.
        # They are replaced by <undefined> for unique key construction.
        keys = ['<undefined>' if getattr(s, f) is None else getattr(s, f)
                for f in self.key_attributes]
        non_aggregated_keys = "-".join(keys)
        # NOTE(sileht): this assumes a meter always has the same unit/type
        return "%s-%s-%s" % (s.name, s.resource_id, non_aggregated_keys)

    def handle_sample(self, sample_):
        if not self.initial_timestamp:
            self.initial_timestamp = timeutils.parse_isotime(sample_.timestamp)
        self.aggregated_samples += 1
        key = self._get_unique_key(sample_)
        self.counts[key] += 1
        if key not in self.samples:
            self.samples[key] = self._convert(sample_)
            if self.merged_attribute_policy[
                    'resource_metadata'] == 'drop':
                self.samples[key].resource_metadata = {}
        else:
            if self.timestamp == "last":
                self.samples[key].timestamp = sample_.timestamp
            if sample_.type == sample.TYPE_CUMULATIVE:
                self.samples[key].volume = self._scale(sample_)
            else:
                self.samples[key].volume += self._scale(sample_)
            for field in self.merged_attribute_policy:
                if self.merged_attribute_policy[field] == 'last':
                    setattr(self.samples[key], field,
                            getattr(sample_, field))

    def flush(self):
        if not self.initial_timestamp:
            return []
        expired = (self.retention_time and
                   timeutils.is_older_than(self.initial_timestamp,
                                           self.retention_time))
        full = self.size and self.aggregated_samples >= self.size
        if full or expired:
            x = list(self.samples.values())
            # gauge aggregates need to be averages
            for s in x:
                if s.type == sample.TYPE_GAUGE:
                    key = self._get_unique_key(s)
                    s.volume /= self.counts[key]
            self.samples.clear()
            self.counts.clear()
            self.aggregated_samples = 0
            self.initial_timestamp = None
            return x
        return []
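
# Hypothetical usage sketch for AggregatorTransformer; the samples are
# invented for illustration. Three gauge samples for one resource are
# accumulated, then averaged on flush once the size threshold is reached:
#
#     agg = AggregatorTransformer(size=3, user_id='first')
#     for s in (s1, s2, s3):  # gauge samples with volumes 10, 20, 30
#         agg.handle_sample(s)
#     out = agg.flush()       # [one sample]; gauge volumes are averaged,
#                             # so out[0].volume == 20.0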