Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: store logs as Redis hashes #823

Merged
merged 1 commit into from
Mar 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions wp1/base_db_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
from wp1.environment import Environment
from wp1.models.wp10.selection import Selection

from wp1.redis_db import connect as redis_connect

from wp1.models.wp10.rating import Rating

logger = logging.getLogger(__name__)

try:
Expand Down Expand Up @@ -93,10 +97,28 @@ def _setup_wp_one_db(self):
cursor.execute(stmt)
self.wp10db.commit()

def connect_redis_db(self):
if ENV != Environment.TEST:
raise ValueError(
'Database tests destroy data! They should only be run in the TEST env'
)
return redis_connect()

def _setup_redis_db(self):
self.redis = self.connect_redis_db()
self.redis.ping()
self.redis.flushdb()

def _cleanup_redis_db(self):
self.redis.flushdb()

def setUp(self):
self.addCleanup(self._cleanup_wp_one_db)
self._setup_wp_one_db()

self.addCleanup(self._cleanup_redis_db)
self._setup_redis_db()


class BaseWikiDbTest(WpOneAssertions):

Expand Down Expand Up @@ -144,6 +166,9 @@ def setUp(self):
self.addCleanup(self._cleanup_wp_one_db)
self._setup_wp_one_db()

self.addCleanup(self._cleanup_redis_db)
self._setup_redis_db()


def get_first_selection(wp10db):
with wp10db.cursor() as cursor:
Expand Down
68 changes: 55 additions & 13 deletions wp1/logic/log.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,59 @@
import attr
import datetime

import attr
from redis import Redis
from wp1.redis_db import gen_redis_log_key
from wp1.models.wp10.log import Log

# Redis does not allow None types. However if a log to be stored has a None
# we convert it to this value while storing on Redis and back to None
# when converting from Redis to python object
REDIS_NULL = b"__redis__none__"


def insert_or_update(redis: Redis, log: Log):
log_key = gen_redis_log_key(project=log.l_project,
namespace=log.l_namespace,
action=log.l_action,
article=log.l_article)
with redis.pipeline() as pipe:
mapping = {
k: REDIS_NULL if v is None else v for k, v in attr.asdict(log).items()
}
pipe.hset(log_key, mapping=mapping)
pipe.expire(log_key, datetime.timedelta(days=7))
pipe.execute()


def get_logs(
redis: Redis,
*,
project: str | bytes = "*",
namespace: str | bytes = "*",
action: str | bytes = "*",
article: str | bytes = "*",
start_dt: datetime.datetime | None = None,
) -> list[Log]:
"""Retrieve logs from Redis matching the given filters."""
key = gen_redis_log_key(project=project,
namespace=namespace,
action=action,
article=article)
logs: list[Log] = []
for log_key in redis.scan_iter(match=key, _type="HASH"):
data = redis.hgetall(log_key)
# convert the data according to the field types of the Log object
log_dict = {
k.decode("utf-8"): v if v != REDIS_NULL else None
for k, v in data.items()
}
if log_dict["l_namespace"] is not None:
log_dict["l_namespace"] = int(log_dict["l_namespace"])

log = Log(**log_dict)
# skip logs that are not newer than start_dt
if start_dt is not None and log.timestamp_dt < start_dt:
continue

Check warning on line 56 in wp1/logic/log.py

View check run for this annotation

Codecov / codecov/patch

wp1/logic/log.py#L56

Added line #L56 was not covered by tests
logs.append(log)

def insert_or_update(wp10db, log):
with wp10db.cursor() as cursor:
cursor.execute(
'''
INSERT INTO logging
(l_project, l_namespace, l_article, l_action, l_timestamp, l_old,
l_new, l_revision_timestamp)
VALUES
(%(l_project)s, %(l_namespace)s, %(l_article)s, %(l_action)s,
%(l_timestamp)s, %(l_old)s, %(l_new)s, %(l_revision_timestamp)s)
ON DUPLICATE KEY UPDATE l_article = l_article
''', attr.asdict(log))
return logs
6 changes: 3 additions & 3 deletions wp1/logic/page.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def get_pages_by_category(wikidb, category, ns=None):
yield Page(**result)


def update_page_moved(wp10db, project, old_ns, old_title, new_ns, new_title,
move_timestamp_dt):
def update_page_moved(wp10db, redis, project, old_ns, old_title, new_ns,
new_title, move_timestamp_dt):
logger.debug('Updating moves table for %s -> %s', old_title.decode('utf-8'),
new_title.decode('utf-8'))
db_timestamp = move_timestamp_dt.strftime(TS_FORMAT).encode('utf-8')
Expand All @@ -62,7 +62,7 @@ def update_page_moved(wp10db, project, old_ns, old_title, new_ns, new_title,
l_old=b'',
l_new=b'',
l_revision_timestamp=db_timestamp)
logic_log.insert_or_update(wp10db, new_log)
logic_log.insert_or_update(redis, new_log)


def _get_redirects_from_db(wikidb, namespace, title, timestamp_dt):
Expand Down
43 changes: 19 additions & 24 deletions wp1/logic/page_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from wp1.constants import TS_FORMAT
from wp1.logic import page as logic_page
from wp1.logic import project as logic_project
from wp1.logic import log as logic_log
from wp1.models.wp10.log import Log
from wp1.models.wp10.move import Move
from wp1.models.wp10.namespace import Namespace, NsType
Expand All @@ -21,10 +22,8 @@ def get_all_moves(wp10db):
return [Move(**db_move) for db_move in cursor.fetchall()]


def get_all_logs(wp10db):
with wp10db.cursor() as cursor:
cursor.execute('SELECT * FROM ' + Log.table_name)
return [Log(**db_log) for db_log in cursor.fetchall()]
def get_all_logs(redis):
return logic_log.get_logs(redis)


class LogicPageCategoryTest(BaseWikiDbTest):
Expand Down Expand Up @@ -251,8 +250,8 @@ def setUp(self):
self.timestamp_db = self.dt.strftime(TS_FORMAT).encode('utf-8')

def test_new_move(self):
logic_page.update_page_moved(self.wp10db, self.project, self.old_ns,
self.old_article, self.new_ns,
logic_page.update_page_moved(self.wp10db, self.redis, self.project,
self.old_ns, self.old_article, self.new_ns,
self.new_article, self.dt)

with self.wp10db.cursor() as cursor:
Expand All @@ -271,17 +270,13 @@ def test_new_move(self):
self.assertEqual(self.timestamp_db, move.m_timestamp)

def test_new_move_log(self):
logic_page.update_page_moved(self.wp10db, self.project, self.old_ns,
self.old_article, self.new_ns,
logic_page.update_page_moved(self.wp10db, self.redis, self.project,
self.old_ns, self.old_article, self.new_ns,
self.new_article, self.dt)

with self.wp10db.cursor() as cursor:
cursor.execute(
'''
SELECT * FROM logging
WHERE l_article = %(old_article)s
''', {'old_article': self.old_article})
log = Log(**cursor.fetchone())
logs = logic_log.get_logs(self.redis, article=self.old_article)
self.assertEqual(len(logs), 1)
log = logs[0]

self.assertIsNotNone(log)
self.assertEqual(self.old_ns, log.l_namespace)
Expand All @@ -292,25 +287,25 @@ def test_new_move_log(self):
self.assertEqual(self.timestamp_db, log.l_revision_timestamp)

def test_does_not_add_existing_move(self):
logic_page.update_page_moved(self.wp10db, self.project, self.old_ns,
self.old_article, self.new_ns,
logic_page.update_page_moved(self.wp10db, self.redis, self.project,
self.old_ns, self.old_article, self.new_ns,
self.new_article, self.dt)

logic_page.update_page_moved(self.wp10db, self.project, self.old_ns,
self.old_article, self.new_ns,
logic_page.update_page_moved(self.wp10db, self.redis, self.project,
self.old_ns, self.old_article, self.new_ns,
self.new_article, self.dt)

all_moves = get_all_moves(self.wp10db)
self.assertEqual(1, len(all_moves))

def test_does_not_add_existing_log(self):
logic_page.update_page_moved(self.wp10db, self.project, self.old_ns,
self.old_article, self.new_ns,
logic_page.update_page_moved(self.wp10db, self.redis, self.project,
self.old_ns, self.old_article, self.new_ns,
self.new_article, self.dt)

logic_page.update_page_moved(self.wp10db, self.project, self.old_ns,
self.old_article, self.new_ns,
logic_page.update_page_moved(self.wp10db, self.redis, self.project,
self.old_ns, self.old_article, self.new_ns,
self.new_article, self.dt)

all_logs = get_all_logs(self.wp10db)
all_logs = get_all_logs(self.redis)
self.assertEqual(1, len(all_logs))
29 changes: 16 additions & 13 deletions wp1/logic/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,11 @@ def update_project_by_name(project_name, track_progress=False):
if not project:
project = Project(p_project=project_name,
p_timestamp=GLOBAL_TIMESTAMP_WIKI)

update_project(wikidb,
wp10db,
redis,
project,
redis=redis,
track_progress=track_progress)

if track_progress:
Expand Down Expand Up @@ -338,9 +339,9 @@ def increment_progress_count(redis, project_name):

def update_project_assessments(wikidb,
wp10db,
redis,
project,
extra_assessments,
redis=None,
track_progress=False):
old_ratings = {}
for rating in logic_rating.get_project_ratings(wp10db, project.p_project):
Expand All @@ -365,9 +366,10 @@ def update_project_assessments(wikidb,
seen,
redis=redis,
track_progress=track_progress)
store_new_ratings(wp10db, new_ratings, old_ratings, rating_to_category)
store_new_ratings(wp10db, redis, new_ratings, old_ratings,
rating_to_category)

process_unseen_articles(wikidb, wp10db, project, old_ratings, seen)
process_unseen_articles(wikidb, wp10db, redis, project, old_ratings, seen)


def update_project_assessments_by_kind(wikidb,
Expand All @@ -377,7 +379,7 @@ def update_project_assessments_by_kind(wikidb,
kind,
old_ratings,
seen,
redis=None,
redis,
track_progress=False):
if kind not in (AssessmentKind.QUALITY, AssessmentKind.IMPORTANCE):
raise ValueError('Parameter "kind" was not one of QUALITY or IMPORTANCE')
Expand Down Expand Up @@ -441,7 +443,8 @@ def update_project_assessments_by_kind(wikidb,
return (new_ratings, rating_to_category)


def store_new_ratings(wp10db, new_ratings, old_ratings, rating_to_category):
def store_new_ratings(wp10db, redis, new_ratings, old_ratings,
rating_to_category):

def sort_rating_tuples(rating_tuple):
rating, kind, _ = rating_tuple
Expand All @@ -462,10 +465,10 @@ def sort_rating_tuples(rating_tuple):

if article_ref not in old_ratings or rating_changed:
logic_rating.insert_or_update(wp10db, rating, kind)
logic_rating.add_log_for_rating(wp10db, rating, kind, old_rating_value)
logic_rating.add_log_for_rating(redis, rating, kind, old_rating_value)


def process_unseen_articles(wikidb, wp10db, project, old_ratings, seen):
def process_unseen_articles(wikidb, wp10db, redis, project, old_ratings, seen):
denom = len(old_ratings.keys())
ratio = len(seen) / denom if denom != 0 else 'NaN'

Expand Down Expand Up @@ -499,7 +502,7 @@ def process_unseen_articles(wikidb, wp10db, project, old_ratings, seen):
move_data = logic_page.get_move_data(wp10db, wikidb, ns, title,
project.timestamp_dt)
if move_data is not None:
logic_page.update_page_moved(wp10db, project, ns, title,
logic_page.update_page_moved(wp10db, redis, project, ns, title,
move_data['dest_ns'],
move_data['dest_title'],
move_data['timestamp_dt'])
Expand Down Expand Up @@ -529,10 +532,10 @@ def process_unseen_articles(wikidb, wp10db, project, old_ratings, seen):
logic_rating.insert_or_update(wp10db, rating, kind)

if kind in (AssessmentKind.QUALITY, AssessmentKind.BOTH):
logic_rating.add_log_for_rating(wp10db, rating, AssessmentKind.QUALITY,
logic_rating.add_log_for_rating(redis, rating, AssessmentKind.QUALITY,
old_rating.r_quality)
if kind in (AssessmentKind.IMPORTANCE, AssessmentKind.BOTH):
logic_rating.add_log_for_rating(wp10db, rating, AssessmentKind.IMPORTANCE,
logic_rating.add_log_for_rating(redis, rating, AssessmentKind.IMPORTANCE,
old_rating.r_importance)

n += 1
Expand Down Expand Up @@ -609,14 +612,14 @@ def update_project_record(wp10db, project, metadata):
insert_or_update(wp10db, project)


def update_project(wikidb, wp10db, project, redis=None, track_progress=False):
def update_project(wikidb, wp10db, redis, project, track_progress=False):
extra_assessments = api_project.get_extra_assessments(project.p_project)

update_project_assessments(wikidb,
wp10db,
redis,
project,
extra_assessments,
redis=redis,
track_progress=track_progress)

cleanup_project(wp10db, project)
Expand Down
Loading