From 7442ff2595bad497685034bd5471e7e6f1b2b85e Mon Sep 17 00:00:00 2001 From: hoang Date: Wed, 3 Dec 2025 14:17:21 +0700 Subject: [PATCH] Add Merge Users action to Admin Site We have introduced a new feature to the User model on the Administrator site that allows Promgen's admins to merge multiple users into one. After the merge, the selected user will assume ownership of all associated objects. User roles will be consolidated, retaining the highest level of permissions for each object. The other users will be permanently removed. --- promgen/actions.py | 109 ++++++++++++++++- promgen/admin.py | 10 ++ promgen/templates/promgen/user_merge.html | 36 ++++++ promgen/tests/test_actions.py | 138 ++++++++++++++++++++++ 4 files changed, 292 insertions(+), 1 deletion(-) create mode 100644 promgen/templates/promgen/user_merge.html create mode 100644 promgen/tests/test_actions.py diff --git a/promgen/actions.py b/promgen/actions.py index f3047ddc6..e54ed5b95 100644 --- a/promgen/actions.py +++ b/promgen/actions.py @@ -1,6 +1,14 @@ from django.contrib import admin, messages +from django.db import transaction +from django.http import HttpResponseRedirect +from django.template.response import TemplateResponse +from django.utils.translation import gettext as _ +from guardian.models import UserObjectPermission +from guardian.shortcuts import assign_perm +from social_django.models import UserSocialAuth -from promgen import tasks +from promgen import models, tasks +from promgen.notification.user import NotificationUser @admin.action(description="Clear Tombstones") @@ -54,3 +62,102 @@ def shard_rules(modeladmin, request, queryset): def shard_urls(modeladmin, request, queryset): for shard in queryset: prometheus_urls(modeladmin, request, shard.prometheus_set.all()) + + +@admin.action(description="Merge selected users") +def merge_users_action(modeladmin, request, queryset): + if "new_user_id" not in request.POST: + return TemplateResponse( + request, + "promgen/user_merge.html", + context={ + "title": _("Merge Users"), + "users": queryset, + }, + ) + + new_user_id = int(request.POST["new_user_id"]) + new_user = queryset.get(id=new_user_id) + old_users = queryset.exclude(id=new_user_id) + count = old_users.count() + + merge_users(old_users, new_user) + messages.success(request, f"Merged {count} user(s) into {new_user.username}") + + return HttpResponseRedirect(request.get_full_path()) + + +@transaction.atomic +def merge_users(old_users, new_user): + # Merge social auth accounts + UserSocialAuth.objects.filter(user__in=old_users).update(user=new_user) + + # Update owner fields + models.Sender.objects.filter(owner__in=old_users).update(owner=new_user) + models.Project.objects.filter(owner__in=old_users).update(owner=new_user) + models.Service.objects.filter(owner__in=old_users).update(owner=new_user) + + # Update sender value fields for notification.user type + models.Sender.objects.filter( + sender=NotificationUser.__module__, value__in=old_users.values_list("id", flat=True) + ).update(value=str(new_user.id)) + + # Copy group memberships and user permissions + for old_user in old_users: + for group in old_user.groups.all(): + new_user.groups.add(group) + for perm in old_user.user_permissions.all(): + new_user.user_permissions.add(perm) + + # Update object permissions. If all users have many permissions on the same object, we want to + # keep the highest level of permission to the new user. See map_obj_perm_by_perm_rank function. + user_ids = list(old_users.values_list("id", flat=True)) + [new_user.id] + + service_perms = UserObjectPermission.objects.filter( + user_id__in=user_ids, content_type__app_label="promgen", content_type__model="service" + ).values("object_pk", "permission__codename") + service_perm_map = map_obj_perm_by_perm_rank( + service_perms, + {"service_admin": 3, "service_editor": 2, "service_viewer": 1}, + ) + for service_id, codename in service_perm_map.items(): + assign_perm(codename, new_user, models.Service.objects.get(pk=service_id)) + + project_perms = UserObjectPermission.objects.filter( + user_id__in=user_ids, content_type__app_label="promgen", content_type__model="project" + ).values("object_pk", "permission__codename") + project_perm_map = map_obj_perm_by_perm_rank( + project_perms, + {"project_admin": 3, "project_editor": 2, "project_viewer": 1}, + ) + for project_id, codename in project_perm_map.items(): + assign_perm(codename, new_user, models.Project.objects.get(pk=project_id)) + + group_perms = UserObjectPermission.objects.filter( + user_id__in=user_ids, content_type__app_label="auth", content_type__model="group" + ).values("object_pk", "permission__codename") + group_perm_map = map_obj_perm_by_perm_rank( + group_perms, + {"group_admin": 2, "group_member": 1}, + ) + for group_id, codename in group_perm_map.items(): + assign_perm(codename, new_user, models.Group.objects.get(pk=group_id)) + + # Delete the old users, related objects will be cascade deleted + old_users.delete() + + +def map_obj_perm_by_perm_rank(user_obj_perms, perm_rank): + permission_map = {} + for perm in user_obj_perms: + obj_id = perm["object_pk"] + codename = perm["permission__codename"] + if permission_map.get(obj_id, None): + current_rank = perm_rank[permission_map[obj_id]] + new_rank = perm_rank[codename] + if new_rank > current_rank: + permission_map[obj_id] = codename + else: + permission_map[obj_id] = codename + + return permission_map diff --git a/promgen/admin.py b/promgen/admin.py index 79bee7b5a..5c4f98fb1 100644 --- a/promgen/admin.py +++ b/promgen/admin.py @@ -5,6 +5,8 @@ import guardian.admin from django import forms from django.contrib import admin +from django.contrib.auth.admin import UserAdmin +from django.contrib.auth.models import User from django.utils.html import format_html from promgen import actions, models, plugins @@ -173,3 +175,11 @@ def has_add_permission(self, request, obj=None): def has_change_permission(self, request, obj=None): return False + + +class PromgenUserAdmin(UserAdmin): + actions = [actions.merge_users_action] + + +admin.site.unregister(User) +admin.site.register(User, PromgenUserAdmin) diff --git a/promgen/templates/promgen/user_merge.html b/promgen/templates/promgen/user_merge.html new file mode 100644 index 000000000..16b48ec0c --- /dev/null +++ b/promgen/templates/promgen/user_merge.html @@ -0,0 +1,36 @@ +{% extends "admin/base_site.html" %} +{% load i18n static %} +{% load admin_urls %} +{% block breadcrumbs %} + +{% endblock %} +{% block content %} +

The following changes will be applied:

+ + + + + + +

Please choose the user that you want to keep:

+
{% csrf_token %} + + {% for user in users %} + +
+ {% endfor %} + + Cancel +
+{% endblock %} diff --git a/promgen/tests/test_actions.py b/promgen/tests/test_actions.py new file mode 100644 index 000000000..ef5682495 --- /dev/null +++ b/promgen/tests/test_actions.py @@ -0,0 +1,138 @@ +# Copyright (c) 2024 LINE Corporation +# These sources are released under the terms of the MIT license: see LICENSE +from django.contrib.auth.models import User +from django.urls import reverse +from guardian.models import UserObjectPermission +from guardian.shortcuts import assign_perm + +from promgen import models, tests +from promgen.notification.email import NotificationEmail +from promgen.notification.user import NotificationUser + + +class ActionTests(tests.PromgenTest): + fixtures = ["testcases.yaml", "extras.yaml"] + + def test_merge_users(self): + test_shard = models.Shard.objects.get(pk=1) + test_service = models.Service.objects.get(pk=1) + + old_user_1 = User.objects.create_user(username="old_user_1") + old_user_2 = User.objects.create_user(username="old_user_2") + new_user = User.objects.create_user(username="new_user") + + service = models.Service.objects.create(name="Service", owner=old_user_1) + project = models.Project.objects.create( + name="Project", owner=old_user_1, service=service, shard=test_shard + ) + sender_email = NotificationEmail.create( + obj=service, value="example@example.com", owner=old_user_1 + ) + sender_user = NotificationUser.create(obj=service, value=old_user_1.pk, owner=old_user_1) + assign_perm("service_viewer", old_user_1, test_service) + + group = models.Group.objects.create(name="Test Group") + old_user_1.groups.add(group) + assign_perm("group_admin", old_user_1, group) + assign_perm("group_member", old_user_2, group) + + self.force_login(username="demo") + response = self.client.post( + reverse("admin:auth_user_changelist"), + { + "action": "merge_users_action", + "_selected_action": [old_user_1.pk, old_user_2.pk, new_user.pk], + "new_user_id": new_user.pk, + }, + ) + + # Check non-admin user cannot perform merge action + self.assertEqual(response.status_code, 302) + self.assertEqual( + response.url, + reverse("admin:login") + "?next=" + reverse("admin:auth_user_changelist"), + "Non-admin user redirected to login page", + ) + + # Check admin user can perform merge action + self.force_login(username="admin") + response = self.client.post( + reverse("admin:auth_user_changelist"), + { + "action": "merge_users_action", + "_selected_action": [old_user_1.pk, old_user_2.pk, new_user.pk], + "new_user_id": new_user.pk, + }, + ) + self.assertEqual(response.status_code, 302) + self.assertEqual( + response.url, + reverse("admin:auth_user_changelist"), + "Redirected back to user changelist after merge", + ) + + service.refresh_from_db() + project.refresh_from_db() + sender_email.refresh_from_db() + sender_user.refresh_from_db() + self.assertTrue(service.owner == new_user, "Service owner changed to new user") + self.assertTrue(project.owner == new_user, "Project owner changed to new user") + self.assertTrue(sender_email.owner == new_user, "Sender owner changed to new user") + self.assertTrue(sender_user.owner == new_user, "Sender owner changed to new user") + self.assertTrue(sender_user.value == str(new_user.pk), "Sender value changed to new user") + self.assertEqual( + UserObjectPermission.objects.filter( + user=new_user, + object_pk=service.pk, + content_type__app_label="promgen", + content_type__model="service", + ).count(), + 1, + "New owner has only one permission on service", + ) + self.assertTrue( + new_user.has_perm("service_admin", service), "New owner has ADMIN permission" + ) + self.assertEqual( + UserObjectPermission.objects.filter( + user=new_user, + object_pk=project.pk, + content_type__app_label="promgen", + content_type__model="project", + ).count(), + 1, + "New owner has only one permission on project", + ) + self.assertTrue( + new_user.has_perm("project_admin", project), "New owner has ADMIN permission" + ) + self.assertTrue( + new_user.has_perm("service_viewer", test_service), + "New user inherit existing permissions", + ) + self.assertTrue( + new_user.groups.filter(pk=group.pk).exists(), + "New user inherit group membership", + ) + self.assertEqual( + UserObjectPermission.objects.filter( + user=new_user, + object_pk=group.pk, + content_type__app_label="auth", + content_type__model="group", + ).count(), + 1, + "New owner has only one permission on group", + ) + self.assertTrue( + new_user.has_perm("group_admin", group), + "New user inherit the highest permission on group", + ) + self.assertFalse( + UserObjectPermission.objects.filter( + user_id__in=[old_user_1.pk, old_user_2.pk] + ).exists(), + "Old users' object permissions deleted", + ) + self.assertFalse(User.objects.filter(pk=old_user_1.pk).exists(), "Old user 1 deleted") + self.assertFalse(User.objects.filter(pk=old_user_2.pk).exists(), "Old user 2 deleted")