# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import eventlet
from oslo_log import log
from watcher.common import context
from watcher.common import exception
from watcher.common import scheduling
from watcher.decision_engine.model.collector import manager
from watcher import objects
from watcher import conf
LOG = log.getLogger(__name__)
CONF = conf.CONF
class DecisionEngineSchedulingService(scheduling.BackgroundSchedulerService):
    """Background scheduler service for the decision engine.

    On :meth:`start` it registers three kinds of periodic work:

    * one synchronization job per cluster data model collector, bounded
      by an ``eventlet.Timeout`` equal to the collector's period;
    * a job that expires stale action plans (unless expiry is disabled);
    * a one-off cleanup cancelling ONESHOT audits left ONGOING on this
      host by a previous (interrupted) decision-engine run.
    """

    def __init__(self, gconfig=None, **options):
        # Bug fix: the original read ``gconfig = None or {}``, which
        # silently discarded a caller-supplied gconfig. Honor it and
        # only fall back to an empty mapping when none was given.
        gconfig = gconfig or {}
        super(DecisionEngineSchedulingService, self).__init__(
            gconfig, **options)
        self.collector_manager = manager.CollectorManager()

    @property
    def collectors(self):
        """Mapping of collector name -> cluster data model collector."""
        return self.collector_manager.get_collectors()

    def add_sync_jobs(self):
        """Register one periodic, timeout-bounded sync job per collector."""
        for name, collector in self.collectors.items():
            timed_task = self._wrap_collector_sync_with_timeout(
                collector, name)
            self.add_job(timed_task,
                         trigger='interval',
                         seconds=collector.config.period,
                         next_run_time=datetime.datetime.now())

    def _as_timed_sync_func(self, sync_func, name, timeout):
        # Wrap sync_func so it is aborted with a
        # ClusterDataModelCollectionError after `timeout` seconds.
        def _timed_sync():
            with eventlet.Timeout(
                timeout,
                exception=exception.ClusterDataModelCollectionError(cdm=name)
            ):
                sync_func()

        return _timed_sync

    def _wrap_collector_sync_with_timeout(self, collector, name):
        """Add an execution timeout constraint on a function"""
        # The timeout equals the collector's period: a sync that has not
        # finished by the time the next run is due is abandoned.
        timeout = collector.config.period

        def _sync():
            try:
                timed_sync = self._as_timed_sync_func(
                    collector.synchronize, name, timeout)
                timed_sync()
            except Exception as exc:
                LOG.exception(exc)
                # Mark the model stale so consumers know it may be outdated
                # rather than silently using a half-synchronized model.
                collector.set_cluster_data_model_as_stale()

        return _sync

    def add_checkstate_job(self):
        """Register the periodic expired-action-plan check job.

        Skipped entirely when ``action_plan_expiry`` is 0 (expiry disabled).
        """
        # 30 minutes interval
        interval = CONF.watcher_decision_engine.check_periodic_interval
        ap_manager = objects.action_plan.StateManager()
        if CONF.watcher_decision_engine.action_plan_expiry != 0:
            self.add_job(ap_manager.check_expired, 'interval',
                         args=[context.make_context()],
                         seconds=interval,
                         next_run_time=datetime.datetime.now())

    def cancel_ongoing_audits(self):
        """Cancel ONESHOT audits left ONGOING on this host.

        Such audits were interrupted when the decision engine was last
        stopped and can never complete, so they are moved to CANCELLED.
        """
        audit_filters = {
            'audit_type': objects.audit.AuditType.ONESHOT.value,
            'state': objects.audit.State.ONGOING,
            'hostname': CONF.host
        }
        local_context = context.make_context()
        ongoing_audits = objects.Audit.list(
            local_context,
            filters=audit_filters)
        for audit in ongoing_audits:
            audit.state = objects.audit.State.CANCELLED
            audit.save()
            LOG.info("Audit %(uuid)s has been cancelled because it was in "
                     "%(state)s state when Decision Engine had been stopped "
                     "on %(hostname)s host.",
                     {'uuid': audit.uuid,
                      'state': objects.audit.State.ONGOING,
                      'hostname': audit.hostname})

    def start(self):
        """Start service."""
        self.add_sync_jobs()
        self.add_checkstate_job()
        self.cancel_ongoing_audits()
        super(DecisionEngineSchedulingService, self).start()

    def reset(self):
        """Reset service.

        Called in case service running in daemon mode receives SIGHUP.
        """
# NOTE(review): the line below is a docs-site license footer that was
# accidentally captured when this module was scraped from rendered HTML;
# commented out so the module parses. It is not part of the program.
# Except where otherwise noted, this document is licensed under Creative
# Commons Attribution 3.0 License. See all OpenStack Legal Documents.