Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 108 additions & 1 deletion promgen/actions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
from django.contrib import admin, messages
from django.db import transaction
from django.http import HttpResponseRedirect
from django.template.response import TemplateResponse
from django.utils.translation import gettext as _
from guardian.models import UserObjectPermission
from guardian.shortcuts import assign_perm
from social_django.models import UserSocialAuth

from promgen import tasks
from promgen import models, tasks
from promgen.notification.user import NotificationUser


@admin.action(description="Clear Tombstones")
Expand Down Expand Up @@ -54,3 +62,102 @@ def shard_rules(modeladmin, request, queryset):
def shard_urls(modeladmin, request, queryset):
for shard in queryset:
prometheus_urls(modeladmin, request, shard.prometheus_set.all())


@admin.action(description="Merge selected users")
def merge_users_action(modeladmin, request, queryset):
if "new_user_id" not in request.POST:
return TemplateResponse(
request,
"promgen/user_merge.html",
context={
"title": _("Merge Users"),
"users": queryset,
},
)

new_user_id = int(request.POST["new_user_id"])
new_user = queryset.get(id=new_user_id)
old_users = queryset.exclude(id=new_user_id)
count = old_users.count()

merge_users(old_users, new_user)
messages.success(request, f"Merged {count} user(s) into {new_user.username}")

return HttpResponseRedirect(request.get_full_path())


@transaction.atomic
def merge_users(old_users, new_user):
# Merge social auth accounts
UserSocialAuth.objects.filter(user__in=old_users).update(user=new_user)

# Update owner fields
models.Sender.objects.filter(owner__in=old_users).update(owner=new_user)
models.Project.objects.filter(owner__in=old_users).update(owner=new_user)
models.Service.objects.filter(owner__in=old_users).update(owner=new_user)

# Update sender value fields for notification.user type
models.Sender.objects.filter(
sender=NotificationUser.__module__, value__in=old_users.values_list("id", flat=True)
).update(value=str(new_user.id))

# Copy group memberships and user permissions
for old_user in old_users:
for group in old_user.groups.all():
new_user.groups.add(group)
for perm in old_user.user_permissions.all():
new_user.user_permissions.add(perm)

# Update object permissions. If all users have many permissions on the same object, we want to
# keep the highest level of permission to the new user. See map_obj_perm_by_perm_rank function.
user_ids = list(old_users.values_list("id", flat=True)) + [new_user.id]

service_perms = UserObjectPermission.objects.filter(
user_id__in=user_ids, content_type__app_label="promgen", content_type__model="service"
).values("object_pk", "permission__codename")
service_perm_map = map_obj_perm_by_perm_rank(
service_perms,
{"service_admin": 3, "service_editor": 2, "service_viewer": 1},
)
for service_id, codename in service_perm_map.items():
assign_perm(codename, new_user, models.Service.objects.get(pk=service_id))

project_perms = UserObjectPermission.objects.filter(
user_id__in=user_ids, content_type__app_label="promgen", content_type__model="project"
).values("object_pk", "permission__codename")
project_perm_map = map_obj_perm_by_perm_rank(
project_perms,
{"project_admin": 3, "project_editor": 2, "project_viewer": 1},
)
for project_id, codename in project_perm_map.items():
assign_perm(codename, new_user, models.Project.objects.get(pk=project_id))

group_perms = UserObjectPermission.objects.filter(
user_id__in=user_ids, content_type__app_label="auth", content_type__model="group"
).values("object_pk", "permission__codename")
group_perm_map = map_obj_perm_by_perm_rank(
group_perms,
{"group_admin": 2, "group_member": 1},
)
for group_id, codename in group_perm_map.items():
assign_perm(codename, new_user, models.Group.objects.get(pk=group_id))

# Delete the old users, related objects will be cascade deleted
old_users.delete()


def map_obj_perm_by_perm_rank(user_obj_perms, perm_rank):
permission_map = {}
for perm in user_obj_perms:
obj_id = perm["object_pk"]
codename = perm["permission__codename"]
if permission_map.get(obj_id, None):
current_rank = perm_rank[permission_map[obj_id]]
new_rank = perm_rank[codename]
if new_rank > current_rank:
permission_map[obj_id] = codename
else:
permission_map[obj_id] = codename

return permission_map
10 changes: 10 additions & 0 deletions promgen/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import guardian.admin
from django import forms
from django.contrib import admin
from django.contrib.auth.admin import UserAdmin
from django.contrib.auth.models import User
from django.utils.html import format_html

from promgen import actions, models, plugins
Expand Down Expand Up @@ -173,3 +175,11 @@ def has_add_permission(self, request, obj=None):

def has_change_permission(self, request, obj=None):
return False


class PromgenUserAdmin(UserAdmin):
actions = [actions.merge_users_action]


admin.site.unregister(User)
admin.site.register(User, PromgenUserAdmin)
36 changes: 36 additions & 0 deletions promgen/templates/promgen/user_merge.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{% extends "admin/base_site.html" %}
{% load i18n static %}
{% load admin_urls %}
{% block breadcrumbs %}
<div class="breadcrumbs">
<a href="{% url 'admin:index' %}">{% translate "Home" %}</a>
&rsaquo; <a href="{% url 'admin:auth_user_changelist' %}">{% translate "Users" %}</a>
&rsaquo; {% translate "Merge users" %}
</div>
{% endblock %}
{% block content %}
<h2>The following changes will be applied:</h2>
<ul>— Social Auth accounts will be linked to the kept user.</ul>
<ul>— All ownerships of existing objects will be transferred to the kept user.</ul>
<ul>— All memberships in groups will be transferred to the kept user.</ul>
<ul>— Permissions will be transferred to the kept user.
If there are many users with different permissions on the same object,
the kept user will get the highest level of permission.
</ul>
<ul>— Other users will be deleted.</ul>

<h2>Please choose the user that you want to keep:</h2>
<form method="post">{% csrf_token %}
<input type="hidden" name="action" value="merge_users_action">
{% for user in users %}
<label>
<input type="hidden" name="_selected_action" value="{{ user.id }}">
<input type="radio" name="new_user_id" value="{{ user.id }}">
{{ user.username }}
</label>
<br>
{% endfor %}
<input type="submit" name="apply" value="Merge">
<a href="{% url 'admin:auth_user_changelist' %}">Cancel</a>
</form>
{% endblock %}
138 changes: 138 additions & 0 deletions promgen/tests/test_actions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# Copyright (c) 2024 LINE Corporation
# These sources are released under the terms of the MIT license: see LICENSE
from django.contrib.auth.models import User
from django.urls import reverse
from guardian.models import UserObjectPermission
from guardian.shortcuts import assign_perm

from promgen import models, tests
from promgen.notification.email import NotificationEmail
from promgen.notification.user import NotificationUser


class ActionTests(tests.PromgenTest):
fixtures = ["testcases.yaml", "extras.yaml"]

def test_merge_users(self):
test_shard = models.Shard.objects.get(pk=1)
test_service = models.Service.objects.get(pk=1)

old_user_1 = User.objects.create_user(username="old_user_1")
old_user_2 = User.objects.create_user(username="old_user_2")
new_user = User.objects.create_user(username="new_user")

service = models.Service.objects.create(name="Service", owner=old_user_1)
project = models.Project.objects.create(
name="Project", owner=old_user_1, service=service, shard=test_shard
)
sender_email = NotificationEmail.create(
obj=service, value="[email protected]", owner=old_user_1
)
sender_user = NotificationUser.create(obj=service, value=old_user_1.pk, owner=old_user_1)
assign_perm("service_viewer", old_user_1, test_service)

group = models.Group.objects.create(name="Test Group")
old_user_1.groups.add(group)
assign_perm("group_admin", old_user_1, group)
assign_perm("group_member", old_user_2, group)

self.force_login(username="demo")
response = self.client.post(
reverse("admin:auth_user_changelist"),
{
"action": "merge_users_action",
"_selected_action": [old_user_1.pk, old_user_2.pk, new_user.pk],
"new_user_id": new_user.pk,
},
)

# Check non-admin user cannot perform merge action
self.assertEqual(response.status_code, 302)
self.assertEqual(
response.url,
reverse("admin:login") + "?next=" + reverse("admin:auth_user_changelist"),
"Non-admin user redirected to login page",
)

# Check admin user can perform merge action
self.force_login(username="admin")
response = self.client.post(
reverse("admin:auth_user_changelist"),
{
"action": "merge_users_action",
"_selected_action": [old_user_1.pk, old_user_2.pk, new_user.pk],
"new_user_id": new_user.pk,
},
)
self.assertEqual(response.status_code, 302)
self.assertEqual(
response.url,
reverse("admin:auth_user_changelist"),
"Redirected back to user changelist after merge",
)

service.refresh_from_db()
project.refresh_from_db()
sender_email.refresh_from_db()
sender_user.refresh_from_db()
self.assertTrue(service.owner == new_user, "Service owner changed to new user")
self.assertTrue(project.owner == new_user, "Project owner changed to new user")
self.assertTrue(sender_email.owner == new_user, "Sender owner changed to new user")
self.assertTrue(sender_user.owner == new_user, "Sender owner changed to new user")
self.assertTrue(sender_user.value == str(new_user.pk), "Sender value changed to new user")
self.assertEqual(
UserObjectPermission.objects.filter(
user=new_user,
object_pk=service.pk,
content_type__app_label="promgen",
content_type__model="service",
).count(),
1,
"New owner has only one permission on service",
)
self.assertTrue(
new_user.has_perm("service_admin", service), "New owner has ADMIN permission"
)
self.assertEqual(
UserObjectPermission.objects.filter(
user=new_user,
object_pk=project.pk,
content_type__app_label="promgen",
content_type__model="project",
).count(),
1,
"New owner has only one permission on project",
)
self.assertTrue(
new_user.has_perm("project_admin", project), "New owner has ADMIN permission"
)
self.assertTrue(
new_user.has_perm("service_viewer", test_service),
"New user inherit existing permissions",
)
self.assertTrue(
new_user.groups.filter(pk=group.pk).exists(),
"New user inherit group membership",
)
self.assertEqual(
UserObjectPermission.objects.filter(
user=new_user,
object_pk=group.pk,
content_type__app_label="auth",
content_type__model="group",
).count(),
1,
"New owner has only one permission on group",
)
self.assertTrue(
new_user.has_perm("group_admin", group),
"New user inherit the highest permission on group",
)
self.assertFalse(
UserObjectPermission.objects.filter(
user_id__in=[old_user_1.pk, old_user_2.pk]
).exists(),
"Old users' object permissions deleted",
)
self.assertFalse(User.objects.filter(pk=old_user_1.pk).exists(), "Old user 1 deleted")
self.assertFalse(User.objects.filter(pk=old_user_2.pk).exists(), "Old user 2 deleted")