staticweb: Work with prefix-based tempurls
Note that there's a bit of a privilege escalation as prefix-based tempurls can now be used to perform listings -- but only on containers with staticweb enabled. Since having staticweb enabled was previously pretty useless unless the container was both public and publicly-listable, I think it's probably fine. This also allows tempurls to be used at the container level, but only for staticweb responses. Change-Id: I7949185fdd3b64b882df01d54a8bc158ce2d7032
This commit is contained in:
6 changed files with 550 additions and 146 deletions
@@ -15,13 +15,17 @@
# limitations under the License.
import functools
import hashlib
import six
import time
from unittest import SkipTest
from six.moves.urllib.parse import unquote
from swift.common.middleware import tempurl
from swift.common.utils import quote
from swift.common.swob import str_to_wsgi
import test.functional as tf
from test.functional.tests import Utils, Base, Base2, BaseEnv
from test.functional.test_tempurl import tempurl_parms
from test.functional.swift_test_client import Account, Connection, \
ResponseError
@@ -424,3 +428,213 @@ class TestStaticWebUTF8(Base2, TestStaticWeb):
def test_redirect_slash_anon_remap_cont(self):
self.skipTest("Can't remap UTF8 containers")
class TestStaticWebTempurlEnv(BaseEnv):
static_web_enabled = None # tri-state: None initially, then True/False
tempurl_enabled = None # tri-state: None initially, then True/False
@classmethod
def setUp(cls):
cls.conn = Connection(tf.config)
cls.conn.authenticate()
if cls.static_web_enabled is None:
cls.static_web_enabled = 'staticweb' in tf.cluster_info
if not cls.static_web_enabled:
return
if cls.tempurl_enabled is None:
cls.tempurl_enabled = 'tempurl' in tf.cluster_info
if not cls.tempurl_enabled:
return
cls.account = Account(
cls.conn, tf.config.get('account', tf.config['username']))
cls.account.delete_containers()
cls.container = cls.account.container(Utils.create_name())
cls.tempurl_key = Utils.create_name()
if not cls.container.create(
hdrs={'X-Container-Meta-Web-Listings': 'true',
'X-Container-Meta-Temp-URL-Key': cls.tempurl_key}):
raise ResponseError(cls.conn.response)
objects = ['index',
'error',
'listings_css',
'dir/',
'dir/obj',
'dir/subdir/',
'dir/subdir/obj']
cls.objects = {}
for item in sorted(objects):
if '/' in item.rstrip('/'):
parent, _ = item.rstrip('/').rsplit('/', 1)
path = '%s/%s' % (cls.objects[parent + '/'].name,
Utils.create_name())
else:
path = Utils.create_name()
if item[-1] == '/':
cls.objects[item] = cls.container.file(path)
cls.objects[item].write(hdrs={
'Content-Type': 'application/directory'})
else:
cls.objects[item] = cls.container.file(path)
cls.objects[item].write(('%s contents' % item).encode('utf8'))
class TestStaticWebTempurl(Base):
env = TestStaticWebTempurlEnv
set_up = False
def setUp(self):
super(TestStaticWebTempurl, self).setUp()
if self.env.static_web_enabled is False:
raise SkipTest("Static Web not enabled")
elif self.env.static_web_enabled is not True:
# just some sanity checking
raise Exception(
"Expected static_web_enabled to be True/False, got %r" %
(self.env.static_web_enabled,))
if self.env.tempurl_enabled is False:
raise SkipTest("Temp URL not enabled")
elif self.env.tempurl_enabled is not True:
# just some sanity checking
raise Exception(
"Expected tempurl_enabled to be True/False, got %r" %
(self.env.tempurl_enabled,))
self.whole_container_parms = dict(tempurl_parms(
'GET', int(time.time() + 60),
'prefix:%s' % self.env.conn.make_path(
self.env.container.path + ['']),
self.env.tempurl_key, hashlib.sha256,
), temp_url_prefix='')
def link(self, virtual_name, parms=None):
name = self.env.objects[virtual_name].name.rsplit('/', 1)[-1]
if parms is None:
parms = self.whole_container_parms
return (
'<a href="%s?temp_url_prefix=%s&temp_url_expires=%s&'
'temp_url_sig=%s">%s</a>' % (
name,
parms['temp_url_prefix'],
quote(str(parms['temp_url_expires'])),
parms['temp_url_sig'],
name))
def test_unauthed(self):
status = self.env.conn.make_request(
'GET', self.env.container.path, cfg={'no_auth_token': True})
self.assertEqual(status, 401)
def test_staticweb_off(self):
self.env.container.update_metadata(
{'X-Remove-Container-Meta-Web-Listings': 'true'})
status = self.env.conn.make_request(
'GET', self.env.container.path, parms=self.whole_container_parms,
cfg={'no_auth_token': True})
self.assertEqual(status, 401, self.env.conn.response.read())
status = self.env.conn.make_request(
'GET', self.env.container.path + [''],
parms=self.whole_container_parms,
cfg={'no_auth_token': True})
self.assertEqual(status, 401)
status = self.env.conn.make_request(
'GET',
self.env.container.path + [self.env.objects['dir/'].name, ''],
parms=self.whole_container_parms,
cfg={'no_auth_token': True})
self.assertEqual(status, 404)
def test_get_root(self):
status = self.env.conn.make_request(
'GET', self.env.container.path, parms=self.whole_container_parms,
cfg={'no_auth_token': True})
self.assertEqual(status, 301)
status = self.env.conn.make_request(
'GET', self.env.container.path + [''],
parms=self.whole_container_parms,
cfg={'no_auth_token': True})
self.assertEqual(status, 200)
body = self.env.conn.response.read()
if not six.PY2:
body = body.decode('utf-8')
self.assertIn('Listing of /v1/', body)
self.assertNotIn('href="..', body)
self.assertIn(self.link('dir/'), body)
def test_get_dir(self):
status = self.env.conn.make_request(
'GET',
self.env.container.path + [self.env.objects['dir/'].name, ''],
parms=self.whole_container_parms,
cfg={'no_auth_token': True})
self.assertEqual(status, 200)
body = self.env.conn.response.read()
if not six.PY2:
body = body.decode('utf-8')
self.assertIn('Listing of /v1/', body)
self.assertIn('href="..', body)
self.assertIn(self.link('dir/obj'), body)
self.assertIn(self.link('dir/subdir/'), body)
def test_get_dir_with_iso_expiry(self):
iso_expiry = time.strftime(
tempurl.EXPIRES_ISO8601_FORMAT,
time.gmtime(int(self.whole_container_parms['temp_url_expires'])))
iso_parms = dict(self.whole_container_parms,
temp_url_expires=iso_expiry)
status = self.env.conn.make_request(
'GET',
self.env.container.path + [self.env.objects['dir/'].name, ''],
parms=iso_parms,
cfg={'no_auth_token': True})
self.assertEqual(status, 200)
body = self.env.conn.response.read()
if not six.PY2:
body = body.decode('utf-8')
self.assertIn('Listing of /v1/', body)
self.assertIn('href="..', body)
self.assertIn(self.link('dir/obj', iso_parms), body)
self.assertIn(self.link('dir/subdir/', iso_parms), body)
def test_get_limited_dir(self):
parms = dict(tempurl_parms(
'GET', int(time.time() + 60),
'prefix:%s' % self.env.conn.make_path(
self.env.container.path + [self.env.objects['dir/'].name, '']),
self.env.tempurl_key, hashlib.sha256,
), temp_url_prefix=self.env.objects['dir/'].name + '/')
status = self.env.conn.make_request(
'GET',
self.env.container.path + [self.env.objects['dir/'].name, ''],
parms=parms, cfg={'no_auth_token': True})
self.assertEqual(status, 200)
body = self.env.conn.response.read()
if not six.PY2:
body = body.decode('utf-8')
self.assertIn('Listing of /v1/', body)
self.assertNotIn('href="..', body)
self.assertIn(self.link('dir/obj', parms), body)
self.assertIn(self.link('dir/subdir/', parms), body)
status = self.env.conn.make_request(
'GET', self.env.container.path + [
self.env.objects['dir/subdir/'].name, ''],
parms=parms, cfg={'no_auth_token': True})
self.assertEqual(status, 200)
body = self.env.conn.response.read()
if not six.PY2:
body = body.decode('utf-8')
self.assertIn('Listing of /v1/', body)
self.assertIn('href="..', body)
self.assertIn(self.link('dir/subdir/obj', parms), body)
@@ -34,6 +34,19 @@ from test.functional.swift_test_client import Account, Connection, \
ResponseError
def tempurl_parms(method, expires, path, key, digest=None):
path = urllib.parse.unquote(path)
if not six.PY2:
method = method.encode('utf8')
path = path.encode('utf8')
key = key.encode('utf8')
sig = hmac.new(
key,
b'%s\n%d\n%s' % (method, expires, path),
digest or hashlib.sha256).hexdigest()
return {'temp_url_sig': sig, 'temp_url_expires': str(expires)}
def setUpModule():
tf.setup_package()
@@ -119,16 +132,7 @@ class TestTempurl(Base):
self.env.tempurl_key)
def tempurl_parms(self, method, expires, path, key):
path = urllib.parse.unquote(path)
if not six.PY2:
method = method.encode('utf8')
path = path.encode('utf8')
key = key.encode('utf8')
sig = hmac.new(
key,
b'%s\n%d\n%s' % (method, expires, path),
self.digest).hexdigest()
return {'temp_url_sig': sig, 'temp_url_expires': str(expires)}
return tempurl_parms(method, expires, path, key, self.digest)
def test_GET(self):
for e in (str(self.expires), self.expires_8601):
@@ -350,17 +354,12 @@ class TestTempurlPrefix(TestTempurl):
else:
prefix = path_parts[4][:4]
prefix_to_hash = '/'.join(path_parts[0:4]) + '/' + prefix
if not six.PY2:
method = method.encode('utf8')
prefix_to_hash = prefix_to_hash.encode('utf8')
key = key.encode('utf8')
sig = hmac.new(
key,
b'%s\n%d\nprefix:%s' % (method, expires, prefix_to_hash),
self.digest).hexdigest()
return {
'temp_url_sig': sig, 'temp_url_expires': str(expires),
'temp_url_prefix': prefix}
parms = tempurl_parms(
method, expires,
'prefix:' + prefix_to_hash,
key, self.digest)
parms['temp_url_prefix'] = prefix
return parms
def test_empty_prefix(self):
parms = self.tempurl_parms(
Reference in New Issue
Block a user
Blocking a user prevents them from interacting with repositories, such as opening or commenting on pull requests or issues. Learn more about blocking a user.