diff --git a/promgen/actions.py b/promgen/actions.py index f3047ddc..e54ed5b9 100644 --- a/promgen/actions.py +++ b/promgen/actions.py @@ -1,6 +1,14 @@ from django.contrib import admin, messages +from django.db import transaction +from django.http import HttpResponseRedirect +from django.template.response import TemplateResponse +from django.utils.translation import gettext as _ +from guardian.models import UserObjectPermission +from guardian.shortcuts import assign_perm +from social_django.models import UserSocialAuth -from promgen import tasks +from promgen import models, tasks +from promgen.notification.user import NotificationUser @admin.action(description="Clear Tombstones") @@ -54,3 +62,102 @@ def shard_rules(modeladmin, request, queryset): def shard_urls(modeladmin, request, queryset): for shard in queryset: prometheus_urls(modeladmin, request, shard.prometheus_set.all()) + + +@admin.action(description="Merge selected users") +def merge_users_action(modeladmin, request, queryset): + if "new_user_id" not in request.POST: + return TemplateResponse( + request, + "promgen/user_merge.html", + context={ + "title": _("Merge Users"), + "users": queryset, + }, + ) + + new_user_id = int(request.POST["new_user_id"]) + new_user = queryset.get(id=new_user_id) + old_users = queryset.exclude(id=new_user_id) + count = old_users.count() + + merge_users(old_users, new_user) + messages.success(request, f"Merged {count} user(s) into {new_user.username}") + + return HttpResponseRedirect(request.get_full_path()) + + +@transaction.atomic +def merge_users(old_users, new_user): + # Merge social auth accounts + UserSocialAuth.objects.filter(user__in=old_users).update(user=new_user) + + # Update owner fields + models.Sender.objects.filter(owner__in=old_users).update(owner=new_user) + models.Project.objects.filter(owner__in=old_users).update(owner=new_user) + models.Service.objects.filter(owner__in=old_users).update(owner=new_user) + + # Update sender value fields for notification.user type + models.Sender.objects.filter( + sender=NotificationUser.__module__, value__in=old_users.values_list("id", flat=True) + ).update(value=str(new_user.id)) + + # Copy group memberships and user permissions + for old_user in old_users: + for group in old_user.groups.all(): + new_user.groups.add(group) + for perm in old_user.user_permissions.all(): + new_user.user_permissions.add(perm) + + # Update object permissions. If all users have many permissions on the same object, we want to + # keep the highest level of permission to the new user. See map_obj_perm_by_perm_rank function. + user_ids = list(old_users.values_list("id", flat=True)) + [new_user.id] + + service_perms = UserObjectPermission.objects.filter( + user_id__in=user_ids, content_type__app_label="promgen", content_type__model="service" + ).values("object_pk", "permission__codename") + service_perm_map = map_obj_perm_by_perm_rank( + service_perms, + {"service_admin": 3, "service_editor": 2, "service_viewer": 1}, + ) + for service_id, codename in service_perm_map.items(): + assign_perm(codename, new_user, models.Service.objects.get(pk=service_id)) + + project_perms = UserObjectPermission.objects.filter( + user_id__in=user_ids, content_type__app_label="promgen", content_type__model="project" + ).values("object_pk", "permission__codename") + project_perm_map = map_obj_perm_by_perm_rank( + project_perms, + {"project_admin": 3, "project_editor": 2, "project_viewer": 1}, + ) + for project_id, codename in project_perm_map.items(): + assign_perm(codename, new_user, models.Project.objects.get(pk=project_id)) + + group_perms = UserObjectPermission.objects.filter( + user_id__in=user_ids, content_type__app_label="auth", content_type__model="group" + ).values("object_pk", "permission__codename") + group_perm_map = map_obj_perm_by_perm_rank( + group_perms, + {"group_admin": 2, "group_member": 1}, + ) + for group_id, codename in group_perm_map.items(): + assign_perm(codename, new_user, models.Group.objects.get(pk=group_id)) + + # Delete the old users, related objects will be cascade deleted + old_users.delete() + + +def map_obj_perm_by_perm_rank(user_obj_perms, perm_rank): + permission_map = {} + for perm in user_obj_perms: + obj_id = perm["object_pk"] + codename = perm["permission__codename"] + if permission_map.get(obj_id, None): + current_rank = perm_rank[permission_map[obj_id]] + new_rank = perm_rank[codename] + if new_rank > current_rank: + permission_map[obj_id] = codename + else: + permission_map[obj_id] = codename + + return permission_map diff --git a/promgen/admin.py b/promgen/admin.py index 79bee7b5..5c4f98fb 100644 --- a/promgen/admin.py +++ b/promgen/admin.py @@ -5,6 +5,8 @@ import guardian.admin from django import forms from django.contrib import admin +from django.contrib.auth.admin import UserAdmin +from django.contrib.auth.models import User from django.utils.html import format_html from promgen import actions, models, plugins @@ -173,3 +175,11 @@ def has_add_permission(self, request, obj=None): def has_change_permission(self, request, obj=None): return False + + +class PromgenUserAdmin(UserAdmin): + actions = [actions.merge_users_action] + + +admin.site.unregister(User) +admin.site.register(User, PromgenUserAdmin) diff --git a/promgen/templates/promgen/user_merge.html b/promgen/templates/promgen/user_merge.html new file mode 100644 index 00000000..16b48ec0 --- /dev/null +++ b/promgen/templates/promgen/user_merge.html @@ -0,0 +1,36 @@ +{% extends "admin/base_site.html" %} +{% load i18n static %} +{% load admin_urls %} +{% block breadcrumbs %} + +{% endblock %} +{% block content %} +

The following changes will be applied:

+ + + + + + +

Please choose the user that you want to keep:

+
{% csrf_token %} + + {% for user in users %} + +
+ {% endfor %} + + Cancel +
+{% endblock %} diff --git a/promgen/tests/test_actions.py b/promgen/tests/test_actions.py new file mode 100644 index 00000000..ef568249 --- /dev/null +++ b/promgen/tests/test_actions.py @@ -0,0 +1,138 @@ +# Copyright (c) 2024 LINE Corporation +# These sources are released under the terms of the MIT license: see LICENSE +from django.contrib.auth.models import User +from django.urls import reverse +from guardian.models import UserObjectPermission +from guardian.shortcuts import assign_perm + +from promgen import models, tests +from promgen.notification.email import NotificationEmail +from promgen.notification.user import NotificationUser + + +class ActionTests(tests.PromgenTest): + fixtures = ["testcases.yaml", "extras.yaml"] + + def test_merge_users(self): + test_shard = models.Shard.objects.get(pk=1) + test_service = models.Service.objects.get(pk=1) + + old_user_1 = User.objects.create_user(username="old_user_1") + old_user_2 = User.objects.create_user(username="old_user_2") + new_user = User.objects.create_user(username="new_user") + + service = models.Service.objects.create(name="Service", owner=old_user_1) + project = models.Project.objects.create( + name="Project", owner=old_user_1, service=service, shard=test_shard + ) + sender_email = NotificationEmail.create( + obj=service, value="example@example.com", owner=old_user_1 + ) + sender_user = NotificationUser.create(obj=service, value=old_user_1.pk, owner=old_user_1) + assign_perm("service_viewer", old_user_1, test_service) + + group = models.Group.objects.create(name="Test Group") + old_user_1.groups.add(group) + assign_perm("group_admin", old_user_1, group) + assign_perm("group_member", old_user_2, group) + + self.force_login(username="demo") + response = self.client.post( + reverse("admin:auth_user_changelist"), + { + "action": "merge_users_action", + "_selected_action": [old_user_1.pk, old_user_2.pk, new_user.pk], + "new_user_id": new_user.pk, + }, + ) + + # Check non-admin user cannot perform merge action + self.assertEqual(response.status_code, 302) + self.assertEqual( + response.url, + reverse("admin:login") + "?next=" + reverse("admin:auth_user_changelist"), + "Non-admin user redirected to login page", + ) + + # Check admin user can perform merge action + self.force_login(username="admin") + response = self.client.post( + reverse("admin:auth_user_changelist"), + { + "action": "merge_users_action", + "_selected_action": [old_user_1.pk, old_user_2.pk, new_user.pk], + "new_user_id": new_user.pk, + }, + ) + self.assertEqual(response.status_code, 302) + self.assertEqual( + response.url, + reverse("admin:auth_user_changelist"), + "Redirected back to user changelist after merge", + ) + + service.refresh_from_db() + project.refresh_from_db() + sender_email.refresh_from_db() + sender_user.refresh_from_db() + self.assertTrue(service.owner == new_user, "Service owner changed to new user") + self.assertTrue(project.owner == new_user, "Project owner changed to new user") + self.assertTrue(sender_email.owner == new_user, "Sender owner changed to new user") + self.assertTrue(sender_user.owner == new_user, "Sender owner changed to new user") + self.assertTrue(sender_user.value == str(new_user.pk), "Sender value changed to new user") + self.assertEqual( + UserObjectPermission.objects.filter( + user=new_user, + object_pk=service.pk, + content_type__app_label="promgen", + content_type__model="service", + ).count(), + 1, + "New owner has only one permission on service", + ) + self.assertTrue( + new_user.has_perm("service_admin", service), "New owner has ADMIN permission" + ) + self.assertEqual( + UserObjectPermission.objects.filter( + user=new_user, + object_pk=project.pk, + content_type__app_label="promgen", + content_type__model="project", + ).count(), + 1, + "New owner has only one permission on project", + ) + self.assertTrue( + new_user.has_perm("project_admin", project), "New owner has ADMIN permission" + ) + self.assertTrue( + new_user.has_perm("service_viewer", test_service), + "New user inherit existing permissions", + ) + self.assertTrue( + new_user.groups.filter(pk=group.pk).exists(), + "New user inherit group membership", + ) + self.assertEqual( + UserObjectPermission.objects.filter( + user=new_user, + object_pk=group.pk, + content_type__app_label="auth", + content_type__model="group", + ).count(), + 1, + "New owner has only one permission on group", + ) + self.assertTrue( + new_user.has_perm("group_admin", group), + "New user inherit the highest permission on group", + ) + self.assertFalse( + UserObjectPermission.objects.filter( + user_id__in=[old_user_1.pk, old_user_2.pk] + ).exists(), + "Old users' object permissions deleted", + ) + self.assertFalse(User.objects.filter(pk=old_user_1.pk).exists(), "Old user 1 deleted") + self.assertFalse(User.objects.filter(pk=old_user_2.pk).exists(), "Old user 2 deleted")