Source code for object_storage.test_object_formpost_negative

# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import hashlib
import hmac
import time
from urllib import parse as urlparse

from tempest.api.object_storage import base
from tempest.common import utils
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc

CONF = config.CONF


[docs] class ObjectFormPostNegativeTest(base.BaseObjectTest): """Negative tests of object post with form""" metadata = {} containers = [] @classmethod def resource_setup(cls): super(ObjectFormPostNegativeTest, cls).resource_setup() cls.container_name = cls.create_container() cls.object_name = data_utils.rand_name( prefix=CONF.resource_name_prefix, name='ObjectTemp') cls.key = 'Meta' cls.metadata = {'Temp-URL-Key': cls.key} cls.account_client.create_update_or_delete_account_metadata( create_update_metadata=cls.metadata) def setUp(self): super(ObjectFormPostNegativeTest, self).setUp() # make sure the metadata has been set account_client_metadata, _ = \ self.account_client.list_account_metadata() self.assertIn('x-account-meta-temp-url-key', account_client_metadata) self.assertEqual( account_client_metadata['x-account-meta-temp-url-key'], self.key) @classmethod def resource_cleanup(cls): cls.account_client.create_update_or_delete_account_metadata( delete_metadata=cls.metadata) cls.delete_containers() super(ObjectFormPostNegativeTest, cls).resource_cleanup() def get_multipart_form(self, expires=600): path = "%s/%s/%s" % ( urlparse.urlparse(self.container_client.base_url).path, self.container_name, self.object_name) redirect = '' max_file_size = 104857600 max_file_count = 10 expires += int(time.time()) hmac_body = '%s\n%s\n%s\n%s\n%s' % (path, redirect, max_file_size, max_file_count, expires) signature = hmac.new( self.key.encode(), hmac_body.encode(), hashlib.sha1 ).hexdigest() fields = {'redirect': redirect, 'max_file_size': str(max_file_size), 'max_file_count': str(max_file_count), 'expires': str(expires), 'signature': signature} boundary = '--boundary--' data = [] for (key, value) in fields.items(): data.append('--' + boundary) data.append('Content-Disposition: form-data; name="%s"' % key) data.append('') data.append(value) data.append('--' + boundary) data.append('Content-Disposition: form-data; ' 'name="file1"; filename="testfile"') data.append('Content-Type: application/octet-stream') data.append('') data.append('hello world') data.append('--' + boundary + '--') data.append('') body = '\r\n'.join(data) content_type = 'multipart/form-data; boundary=%s' % boundary return body, content_type
[docs] @decorators.idempotent_id('d3fb3c4d-e627-48ce-9379-a1631f21336d') @utils.requires_ext(extension='formpost', service='object') @decorators.attr(type=['negative']) def test_post_object_using_form_expired(self): """Test posting object using expired form""" body, content_type = self.get_multipart_form(expires=1) time.sleep(2) headers = {'Content-Type': content_type, 'Content-Length': str(len(body))} url = "%s/%s" % (self.container_name, self.object_name) exc = self.assertRaises( lib_exc.Unauthorized, self.object_client.post, url, body, headers=headers) self.assertIn('FormPost: Form Expired', str(exc))
[docs] @decorators.idempotent_id('b277257f-113c-4499-b8d1-5fead79f7360') @utils.requires_ext(extension='formpost', service='object') @decorators.attr(type=['negative']) def test_post_object_using_form_invalid_signature(self): """Test posting object using form with invalid signature""" self.key = "Wrong" body, content_type = self.get_multipart_form() headers = {'Content-Type': content_type, 'Content-Length': str(len(body))} url = "%s/%s" % (self.container_name, self.object_name) exc = self.assertRaises( lib_exc.Unauthorized, self.object_client.post, url, body, headers=headers) self.assertIn('FormPost: Invalid Signature', str(exc))