Commit c1c8e22e authored by stevanradakovic's avatar stevanradakovic Committed by Rémi Duraffort

Fix authorization API errors and add unit tests.

Update authorization XMLRPC API so it does not use
abstract class to add/remove permissions.
parent f231d80e
Pipeline #5271 passed with stages
in 10 minutes
......@@ -15,5 +15,5 @@ else
PYTHONPATH=. pytest-3 --cache-clear -W ignore::DeprecationWarning -v --junitxml=common.xml lava_common/tests
PYTHONPATH=. pytest-3 --cache-clear -W ignore::DeprecationWarning \
--ds lava_server.settings.development -v --junitxml=server.xml \
lava_scheduler_app/tests lava_results_app/tests linaro_django_xmlrpc/tests.py lava_rest_app/tests.py
lava_scheduler_app/tests lava_results_app/tests linaro_django_xmlrpc/tests.py lava_rest_app/tests.py lava_server/tests.py
fi
......@@ -17,5 +17,5 @@ then
else
set -x
PYTHONPATH=. pytest-3 --cache-clear -v --junitxml=common.xml lava_common/tests
PYTHONPATH=. pytest-3 --cache-clear --ds lava_server.settings.development -v --junitxml=server.xml lava_scheduler_app/tests lava_results_app/tests linaro_django_xmlrpc/tests.py lava_rest_app/tests.py
PYTHONPATH=. pytest-3 --cache-clear --ds lava_server.settings.development -v --junitxml=server.xml lava_scheduler_app/tests lava_results_app/tests linaro_django_xmlrpc/tests.py lava_rest_app/tests.py lava_server/tests.py
fi
......@@ -104,5 +104,5 @@ if [ -n "${SERVER}" ]
then
echo "Testing server"
echo
PYTHONPATH=. pytest-3 --cache-clear --ds lava_server.settings.development -v --reuse-db lava_scheduler_app/tests lava_results_app/tests linaro_django_xmlrpc/tests.py lava_rest_app/tests.py
PYTHONPATH=. pytest-3 --cache-clear --ds lava_server.settings.development -v --reuse-db lava_scheduler_app/tests lava_results_app/tests linaro_django_xmlrpc/tests.py lava_rest_app/tests.py lava_server/tests.py
fi
......@@ -52,7 +52,9 @@ class GroupObjectPermissionManager(models.Manager):
ctype = ContentType.objects.get_for_model(queryset.model)
try:
permission = Permission.objects.get(content_type=ctype, codename=perm)
permission = Permission.objects.get(
content_type=ctype, codename=perm.split(".", 1)[-1]
)
except Permission.DoesNotExist:
raise PermissionNameError("Please use existing permission codename")
......@@ -70,7 +72,9 @@ class GroupObjectPermissionManager(models.Manager):
"""
ctype = ContentType.objects.get_for_model(obj)
try:
permission = Permission.objects.get(content_type=ctype, codename=perm)
permission = Permission.objects.get(
content_type=ctype, codename=perm.split(".", 1)[-1]
)
except Permission.DoesNotExist:
raise PermissionNameError("Please use existing permission codename")
......@@ -95,7 +99,7 @@ class GroupObjectPermissionManager(models.Manager):
ctype = ContentType.objects.get_for_model(obj)
kwargs = {
"group": group,
"permission__codename": perm,
"permission__codename": perm.split(".", 1)[-1],
"permission__content_type": ctype,
ctype.model: obj,
}
......
......@@ -133,6 +133,7 @@ class ManagersTest(TestCaseWithFactory):
GroupDevicePermission.objects.assign_perm(
"admin_device", self.group1, self.qemu_device1
)
self.assertTrue(self.user1.has_perm(Device.ADMIN_PERMISSION, self.qemu_device1))
GroupDevicePermission.objects.remove_perm(
"admin_device", self.group1, self.qemu_device1
)
......
......@@ -28,8 +28,14 @@ from django.contrib.auth.models import Group
from django.core.exceptions import PermissionDenied
from lava_common.utils import debian_package_version
from lava_scheduler_app.api import check_perm
from lava_scheduler_app.views import get_restricted_job
from lava_scheduler_app.models import Device, DeviceType, GroupObjectPermission
from lava_scheduler_app.models import (
Device,
DeviceType,
GroupDevicePermission,
GroupDeviceTypePermission,
)
from linaro_django_xmlrpc.models import errors, Mapper, SystemAPI
......@@ -515,6 +521,7 @@ class LavaSystemAPI(SystemAPI):
)
return yaml.dump(network_map)
@check_perm("lava_scheduler_app.admin_devicetype")
def assign_perm_device_type(self, perm, device_type, group):
"""
Name
......@@ -544,13 +551,6 @@ class LavaSystemAPI(SystemAPI):
------------
No return value.
"""
self._authenticate()
if not self.user.has_perm(DeviceType.ADMIN_PERMISSION, device_type):
raise xmlrpc.client.Fault(
errors.FORBIDDEN,
"Permission denied for user '%s' to assign permissions. Needs administrator privileges."
% self.user,
)
if not isinstance(device_type, str):
raise xmlrpc.client.Fault(
errors.BAD_REQUEST, "device_type name must be a string"
......@@ -569,8 +569,9 @@ class LavaSystemAPI(SystemAPI):
errors.BAD_REQUEST, "please use existing device type name"
)
GroupObjectPermission.objects.assign_perm(perm, group, device_type)
GroupDeviceTypePermission.objects.assign_perm(perm, group, device_type)
@check_perm("lava_scheduler_app.admin_devicetype")
def revoke_perm_device_type(self, perm, device_type, group):
"""
Name
......@@ -600,13 +601,6 @@ class LavaSystemAPI(SystemAPI):
------------
No return value.
"""
self._authenticate()
if not self.user.has_perm(DeviceType.ADMIN_PERMISSION, device_type):
raise xmlrpc.client.Fault(
errors.FORBIDDEN,
"Permission denied for user '%s' to revoke permissions. Needs administrator privileges."
% self.user,
)
if not isinstance(device_type, str):
raise xmlrpc.client.Fault(
errors.BAD_REQUEST, "device_type name must be a string"
......@@ -625,8 +619,9 @@ class LavaSystemAPI(SystemAPI):
errors.BAD_REQUEST, "please use existing device type name"
)
GroupObjectPermission.objects.remove_perm(perm, group, device_type)
GroupDeviceTypePermission.objects.remove_perm(perm, group, device_type)
@check_perm("lava_scheduler_app.admin_device")
def assign_perm_device(self, perm, device, group):
"""
Name
......@@ -656,13 +651,6 @@ class LavaSystemAPI(SystemAPI):
------------
No return value.
"""
self._authenticate()
if not self.user.has_perm(Device.ADMIN_PERMISSION, device):
raise xmlrpc.client.Fault(
errors.FORBIDDEN,
"Permission denied for user '%s' to assign permissions. Needs administrator privileges."
% self.user,
)
if not isinstance(device, str):
raise xmlrpc.client.Fault(
errors.BAD_REQUEST, "device argument must be a string"
......@@ -681,8 +669,9 @@ class LavaSystemAPI(SystemAPI):
errors.BAD_REQUEST, "please use existing device hostname"
)
GroupObjectPermission.objects.assign_perm(perm, group, device)
GroupDevicePermission.objects.assign_perm(perm, group, device)
@check_perm("lava_scheduler_app.admin_device")
def revoke_perm_device(self, perm, device, group):
"""
Name
......@@ -712,13 +701,6 @@ class LavaSystemAPI(SystemAPI):
------------
No return value.
"""
self._authenticate()
if not self.user.has_perm(Device.ADMIN_PERMISSION, device):
raise xmlrpc.client.Fault(
errors.FORBIDDEN,
"Permission denied for user '%s' to revoke permissions. Needs administrator privileges."
% self.user,
)
if not isinstance(device, str):
raise xmlrpc.client.Fault(
errors.BAD_REQUEST, "device argument must be a string"
......@@ -737,7 +719,7 @@ class LavaSystemAPI(SystemAPI):
errors.BAD_REQUEST, "please use existing device hostname"
)
GroupObjectPermission.objects.remove_perm(perm, group, device)
GroupDevicePermission.objects.remove_perm(perm, group, device)
class LavaMapper(Mapper):
......
# -*- coding: utf-8 -*-
# Copyright (C) 2019 Linaro Limited
#
# Author: Stevan Radaković <stevan.radakovic@linaro.org>
#
# This file is part of LAVA.
#
# LAVA is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License version 3
# as published by the Free Software Foundation
#
# LAVA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with LAVA. If not, see <http://www.gnu.org/licenses/>.
import pytest
import xmlrpc.client
from django.contrib.auth.models import Group, User
from nose.tools import nottest
from lava_scheduler_app.models import Device, DeviceType, Worker
from lava_scheduler_app.tests.test_api import TestTransport
class TestLavaServerApi:
@nottest
def ensure_user(
self, username, email, password, is_superuser=False
): # pylint: disable=no-self-use
if User.objects.filter(username=username):
user = User.objects.get(username=username)
else:
user = User.objects.create_user(
username, email, password, is_superuser=is_superuser
)
user.save()
return user
@nottest
def server_proxy(self, user=None, password=None): # pylint: disable=no-self-use
return xmlrpc.client.ServerProxy(
"http://localhost/RPC2/",
transport=TestTransport(user=user, password=password),
)
@pytest.fixture(autouse=True)
def setUp(self, db):
# create group
self.group = Group.objects.create(name="group1")
# create users
self.user1 = User.objects.create(username="user1")
self.user2 = User.objects.create(username="user2")
self.user1.groups.add(self.group)
# Create workers
self.worker1 = Worker.objects.create(
hostname="worker1", state=Worker.STATE_ONLINE, health=Worker.HEALTH_ACTIVE
)
# create devicetype
self.device_type1 = DeviceType.objects.create(name="device_type1")
# create device
self.device1 = Device.objects.create(
hostname="public01", device_type=self.device_type1, worker_host=self.worker1
)
def test_assign_perm_devicetype_unauthorized(self):
user = self.ensure_user("test", "test@mail.net", "test")
server = self.server_proxy("test", "test")
try:
server.system.assign_perm_device_type(
"lava_scheduler_app.view_devicetype",
self.device_type1.name,
self.group.name,
)
except xmlrpc.client.Fault as f:
assert f.faultCode == 403 # nosec
else:
print("fault not raised")
assert False # nosec
def test_assign_perm_devicetype(self):
user = self.ensure_user("test", "test@mail.net", "test", True)
server = self.server_proxy("test", "test")
assert ( # nosec
self.user1.has_perm("lava_scheduler_app.view_devicetype", self.device_type1)
== False
)
assert ( # nosec
self.user2.has_perm("lava_scheduler_app.view_devicetype", self.device_type1)
== False
)
server.system.assign_perm_device_type(
"lava_scheduler_app.view_devicetype",
self.device_type1.name,
self.group.name,
)
assert ( # nosec
self.user1.has_perm("lava_scheduler_app.view_devicetype", self.device_type1)
== True
)
assert ( # nosec
self.user2.has_perm("lava_scheduler_app.view_devicetype", self.device_type1)
== False
)
def test_revoke_perm_devicetype_unauthorized(self):
user = self.ensure_user("test", "test@mail.net", "test")
server = self.server_proxy("test", "test")
try:
server.system.revoke_perm_device_type(
"lava_scheduler_app.view_devicetype",
self.device_type1.name,
self.group.name,
)
except xmlrpc.client.Fault as f:
assert f.faultCode == 403 # nosec
else:
print("fault not raised")
assert False # nosec
def test_revoke_perm_devicetype(self):
user = self.ensure_user("test", "test@mail.net", "test", True)
server = self.server_proxy("test", "test")
server.system.assign_perm_device_type(
"lava_scheduler_app.view_devicetype",
self.device_type1.name,
self.group.name,
)
assert ( # nosec
self.user1.has_perm("lava_scheduler_app.view_devicetype", self.device_type1)
== True
)
assert ( # nosec
self.user2.has_perm("lava_scheduler_app.view_devicetype", self.device_type1)
== False
)
server.system.revoke_perm_device_type(
"lava_scheduler_app.view_devicetype",
self.device_type1.name,
self.group.name,
)
assert ( # nosec
self.user1.has_perm("lava_scheduler_app.view_devicetype", self.device_type1)
== False
)
assert ( # nosec
self.user2.has_perm("lava_scheduler_app.view_devicetype", self.device_type1)
== False
)
def test_assign_perm_device_unauthorized(self):
user = self.ensure_user("test", "test@mail.net", "test")
server = self.server_proxy("test", "test")
try:
server.system.assign_perm_device(
"lava_scheduler_app.view_device", self.device1.hostname, self.group.name
)
except xmlrpc.client.Fault as f:
assert f.faultCode == 403 # nosec
else:
print("fault not raised")
assert False # nosec
def test_assign_perm_device(self):
user = self.ensure_user("test", "test@mail.net", "test", True)
server = self.server_proxy("test", "test")
assert ( # nosec
self.user1.has_perm("lava_scheduler_app.view_device", self.device1) == False
)
assert ( # nosec
self.user2.has_perm("lava_scheduler_app.view_device", self.device1) == False
)
server.system.assign_perm_device(
"lava_scheduler_app.view_device", self.device1.hostname, self.group.name
)
assert ( # nosec
self.user1.has_perm("lava_scheduler_app.view_device", self.device1) == True
)
assert ( # nosec
self.user2.has_perm("lava_scheduler_app.view_device", self.device1) == False
)
def test_revoke_perm_device_unauthorized(self):
user = self.ensure_user("test", "test@mail.net", "test")
server = self.server_proxy("test", "test")
try:
server.system.revoke_perm_device(
"lava_scheduler_app.view_device", self.device1.hostname, self.group.name
)
except xmlrpc.client.Fault as f:
assert f.faultCode == 403 # nosec
else:
print("fault not raised")
assert False # nosec
def test_revoke_perm_device(self):
user = self.ensure_user("test", "test@mail.net", "test", True)
server = self.server_proxy("test", "test")
server.system.assign_perm_device(
"lava_scheduler_app.view_device", self.device1.hostname, self.group.name
)
assert ( # nosec
self.user1.has_perm("lava_scheduler_app.view_device", self.device1) == True
)
assert ( # nosec
self.user2.has_perm("lava_scheduler_app.view_device", self.device1) == False
)
server.system.revoke_perm_device(
"lava_scheduler_app.view_device", self.device1.hostname, self.group.name
)
assert ( # nosec
self.user1.has_perm("lava_scheduler_app.view_device", self.device1) == False
)
assert ( # nosec
self.user2.has_perm("lava_scheduler_app.view_device", self.device1) == False
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment