# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
from django.utils.translation import ugettext_lazy as _
import re
from castellan.common.objects import x_509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.x509 import load_pem_x509_certificate
from horizon import exceptions
from horizon import forms
from horizon import messages
from castellan_ui.api import client
NAME_REGEX = re.compile(r"^\w+(?:[- ]\w+)*$", re.UNICODE)
ERROR_MESSAGES = {
'invalid': _('Name may only contain letters, '
'numbers, underscores, spaces, and hyphens '
'and may not be white space.')}
[docs]class ImportX509Certificate(forms.SelfHandlingForm):
name = forms.RegexField(required=False,
max_length=255,
label=_("Certificate Name"),
regex=NAME_REGEX,
error_messages=ERROR_MESSAGES)
source_type = forms.ChoiceField(
label=_('Source'),
required=False,
choices=[('file', _('Import File')),
('raw', _('Direct Input'))],
widget=forms.ThemableSelectWidget(
attrs={'class': 'switchable', 'data-slug': 'source'}))
cert_file = forms.FileField(
label=_("Choose file"),
widget=forms.FileInput(
attrs={'class': 'switched', 'data-switch-on': 'source',
'data-source-file': _('PEM Certificate File')}),
required=False)
direct_input = forms.CharField(
label=_('PEM Certificate'),
widget=forms.widgets.Textarea(
attrs={'class': 'switched', 'data-switch-on': 'source',
'data-source-raw': _('PEM Certificate')}),
required=False)
[docs] def clean(self):
data = super(ImportX509Certificate, self).clean()
# The cert can be missing based on particular upload
# conditions. Code defensively for it here...
cert_file = data.get('cert_file', None)
cert_raw = data.get('direct_input', None)
if cert_raw and cert_file:
raise forms.ValidationError(
_("Cannot specify both file and direct input."))
if not cert_raw and not cert_file:
raise forms.ValidationError(
_("No input was provided for the certificate value."))
try:
if cert_file:
cert_pem = self.files['cert_file'].read()
else:
cert_pem = str(data['direct_input'])
cert_obj = load_pem_x509_certificate(
cert_pem.encode('utf-8'), default_backend())
cert_der = cert_obj.public_bytes(Encoding.DER)
except Exception as e:
msg = _('There was a problem loading the certificate: %s. '
'Is the certificate valid and in PEM format?') % e
raise forms.ValidationError(msg)
data['cert_data'] = base64.b64encode(cert_der).decode('utf-8')
return data
[docs] def handle(self, request, data):
try:
cert_pem = data.get('cert_data')
cert_uuid = client.import_object(
request,
data=cert_pem,
name=data['name'],
object_type=x_509.X509)
if data['name']:
identifier = data['name']
else:
identifier = cert_uuid
messages.success(request,
_('Successfully imported certificate: %s')
% identifier)
return cert_uuid
except Exception as e:
msg = _('Unable to import certificate: %s')
messages.error(request, msg % e)
exceptions.handle(request, ignore=True)
self.api_error(_('Unable to import certificate.'))
return False
Except where otherwise noted, this document is licensed under Creative Commons Attribution 3.0 License. See all OpenStack Legal Documents.