From 7c6d02342d2567db3cd200899f2cc3c06a812eff Mon Sep 17 00:00:00 2001 From: Oliver Fritz Date: Wed, 1 Oct 2025 16:48:21 +0200 Subject: [PATCH 01/12] feat:add osm ref to results (wip) --- .../firebase_to_postgres/transfer_results.py | 15 +++++- .../generate_stats/project_stats.py | 47 ++++++++++++++++++- mapswipe_workers/requirements.txt | 1 + .../add_ref_to_results_for_conflation.sql | 2 + 4 files changed, 63 insertions(+), 2 deletions(-) create mode 100644 postgres/scripts/add_ref_to_results_for_conflation.sql diff --git a/mapswipe_workers/mapswipe_workers/firebase_to_postgres/transfer_results.py b/mapswipe_workers/mapswipe_workers/firebase_to_postgres/transfer_results.py index d3856618e..547fa61b8 100644 --- a/mapswipe_workers/mapswipe_workers/firebase_to_postgres/transfer_results.py +++ b/mapswipe_workers/mapswipe_workers/firebase_to_postgres/transfer_results.py @@ -1,5 +1,6 @@ import csv import io +import json from typing import List, Tuple import dateutil.parser @@ -269,6 +270,10 @@ def results_to_file( if type(result_data["results"]) is dict: for taskId, result in result_data["results"].items(): + + ref_data = result_data.get("ref", {}).get(taskId, {}) + ref_json = json.dumps(ref_data) if ref_data else None + if result_type == "geometry": result = geojson.dumps(geojson.GeometryCollection(result)) w.writerow( @@ -283,6 +288,7 @@ def results_to_file( result, app_version, client_type, + ref_json, ] ) elif type(result_data["results"]) is list: @@ -292,6 +298,10 @@ def results_to_file( # if first key (list index) is 5 # list indicies 0-4 will have value None for taskId, result in enumerate(result_data["results"]): + + ref_data = result_data.get("ref", {}).get(taskId, {}) + ref_json = json.dumps(ref_data) if ref_data else None + if result is None: continue else: @@ -309,6 +319,7 @@ def results_to_file( result, app_version, client_type, + ref_json, ] ) else: @@ -369,6 +380,7 @@ def save_results_to_postgres( "result", "app_version", "client_type", + "ref", ] 
p_con.copy_from(results_file, result_temp_table, columns) results_file.close() @@ -439,7 +451,8 @@ def save_results_to_postgres( SELECT ms.mapping_session_id, r.task_id, - {result_sql} + {result_sql}, + r.ref FROM {result_temp_table} r JOIN mapping_sessions ms ON ms.project_id = r.project_id diff --git a/mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py b/mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py index 45804eae6..a7f0cd9cb 100644 --- a/mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py +++ b/mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py @@ -98,6 +98,7 @@ def get_results( Parse timestamp as datetime object and add attribute "day" for each result. Return None if there are no results for this project. Otherwise, return dataframe. + Include the 'ref' JSON field in integer results if it exists. Parameters ---------- @@ -108,7 +109,10 @@ def get_results( if result_table == "mapping_sessions_results_geometry": result_sql = "ST_AsGeoJSON(msr.result) as result" else: - result_sql = "msr.result" + result_sql = """ + (msr.result->>'result')::int as result, + msr.result->'ref' as ref + """ sql_query = sql.SQL( f""" @@ -504,6 +508,44 @@ def get_statistics_for_geometry_result_project(project_id: str): return project_stats_dict +def unify_refs(ref_list): + if not ref_list: + return None + first_ref = json.dumps(ref_list[0], sort_keys=True) + for r in ref_list[1:]: + if json.dumps(r, sort_keys=True) != first_ref: + return "multiple" + return ref_list[0] + + +def add_ref_to_agg_results( + results_df: pd.DataFrame, agg_results_df: pd.DataFrame +) -> pd.DataFrame: + """ + Add a 'ref' column to agg_results_df. + If all user refs for a task are identical, use that ref. + If refs differ, set ref to 'multiple'. 
+ """ + + # collect refs per task + refs_per_task = ( + results_df.groupby(["project_id", "group_id", "task_id"])["ref"] + .apply(list) + .reset_index() + ) + + refs_per_task["ref"] = refs_per_task["ref"].apply(unify_refs) + + # merge into agg_results_df + agg_results_df = agg_results_df.merge( + refs_per_task[["project_id", "group_id", "task_id", "ref"]], + on=["project_id", "group_id", "task_id"], + how="left", + ) + + return agg_results_df + + def get_statistics_for_integer_result_project( project_id: str, project_info: pd.Series, generate_hot_tm_geometries: bool ) -> dict: @@ -550,6 +592,9 @@ def get_statistics_for_integer_result_project( tasks_df, project_info["custom_options"], ) + + agg_results_df = add_ref_to_agg_results(results_df, agg_results_df) + agg_results_df.to_csv(agg_results_filename, index_label="idx") geojson_functions.gzipped_csv_to_gzipped_geojson( diff --git a/mapswipe_workers/requirements.txt b/mapswipe_workers/requirements.txt index 588754060..f2b425a48 100644 --- a/mapswipe_workers/requirements.txt +++ b/mapswipe_workers/requirements.txt @@ -6,6 +6,7 @@ flake8==3.8.3 geojson==3.0.1 mapswipe-workers==3.0 pandas==1.5.2 +numpy==1.26.4 pre-commit==2.9.2 psycopg2-binary==2.9.3 python-dateutil==2.8.1 diff --git a/postgres/scripts/add_ref_to_results_for_conflation.sql b/postgres/scripts/add_ref_to_results_for_conflation.sql new file mode 100644 index 000000000..d6b0f1007 --- /dev/null +++ b/postgres/scripts/add_ref_to_results_for_conflation.sql @@ -0,0 +1,2 @@ +ALTER TABLE mapping_sessions_results +ADD COLUMN ref jsonb; From 25d9de7a1e9d2845a7bbb63df91b77ce3114a573 Mon Sep 17 00:00:00 2001 From: ofritz Date: Thu, 16 Oct 2025 17:24:51 +0200 Subject: [PATCH 02/12] feat(conflation): adjust sql script to add ref column to results_temp table --- postgres/scripts/add_ref_to_results_for_conflation.sql | 3 +++ 1 file changed, 3 insertions(+) diff --git a/postgres/scripts/add_ref_to_results_for_conflation.sql 
b/postgres/scripts/add_ref_to_results_for_conflation.sql index d6b0f1007..aa7e00e94 100644 --- a/postgres/scripts/add_ref_to_results_for_conflation.sql +++ b/postgres/scripts/add_ref_to_results_for_conflation.sql @@ -1,2 +1,5 @@ ALTER TABLE mapping_sessions_results ADD COLUMN ref jsonb; + +ALTER TABLE results_temp +ADD COLUMN ref jsonb; From 10b03c2cb2e9bb27d300ade1b0d1706d065782c6 Mon Sep 17 00:00:00 2001 From: ofritz Date: Thu, 16 Oct 2025 18:08:50 +0200 Subject: [PATCH 03/12] fix: replace empty string with null --- .../mapswipe_workers/firebase_to_postgres/transfer_results.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mapswipe_workers/mapswipe_workers/firebase_to_postgres/transfer_results.py b/mapswipe_workers/mapswipe_workers/firebase_to_postgres/transfer_results.py index 547fa61b8..8b3b154da 100644 --- a/mapswipe_workers/mapswipe_workers/firebase_to_postgres/transfer_results.py +++ b/mapswipe_workers/mapswipe_workers/firebase_to_postgres/transfer_results.py @@ -272,7 +272,7 @@ def results_to_file( for taskId, result in result_data["results"].items(): ref_data = result_data.get("ref", {}).get(taskId, {}) - ref_json = json.dumps(ref_data) if ref_data else None + ref_json = json.dumps(ref_data) if ref_data else r"\N" if result_type == "geometry": result = geojson.dumps(geojson.GeometryCollection(result)) @@ -300,7 +300,7 @@ def results_to_file( for taskId, result in enumerate(result_data["results"]): ref_data = result_data.get("ref", {}).get(taskId, {}) - ref_json = json.dumps(ref_data) if ref_data else None + ref_json = json.dumps(ref_data) if ref_data else r"\N" if result is None: continue From afec6a89186331d0aa266f53b4800ffeda86e94e Mon Sep 17 00:00:00 2001 From: ofritz Date: Mon, 20 Oct 2025 17:19:56 +0200 Subject: [PATCH 04/12] fix(conflation): ref column in agg_results --- .../firebase_to_postgres/transfer_results.py | 4 +- .../generate_stats/project_stats.py | 58 +++++++------------ .../tests/unittests/test_project_stats.py | 26 
+++++++++ 3 files changed, 49 insertions(+), 39 deletions(-) diff --git a/mapswipe_workers/mapswipe_workers/firebase_to_postgres/transfer_results.py b/mapswipe_workers/mapswipe_workers/firebase_to_postgres/transfer_results.py index 8b3b154da..9ae15bcdc 100644 --- a/mapswipe_workers/mapswipe_workers/firebase_to_postgres/transfer_results.py +++ b/mapswipe_workers/mapswipe_workers/firebase_to_postgres/transfer_results.py @@ -271,7 +271,7 @@ def results_to_file( if type(result_data["results"]) is dict: for taskId, result in result_data["results"].items(): - ref_data = result_data.get("ref", {}).get(taskId, {}) + ref_data = result_data.get("reference", {}).get(taskId, {}) ref_json = json.dumps(ref_data) if ref_data else r"\N" if result_type == "geometry": @@ -299,7 +299,7 @@ def results_to_file( # list indicies 0-4 will have value None for taskId, result in enumerate(result_data["results"]): - ref_data = result_data.get("ref", {}).get(taskId, {}) + ref_data = result_data.get("reference", {}).get(taskId, {}) ref_json = json.dumps(ref_data) if ref_data else r"\N" if result is None: diff --git a/mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py b/mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py index a7f0cd9cb..b3a69b50e 100644 --- a/mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py +++ b/mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py @@ -5,6 +5,7 @@ import os import tempfile import typing +import csv import pandas as pd from pandas.api.types import is_numeric_dtype @@ -109,10 +110,7 @@ def get_results( if result_table == "mapping_sessions_results_geometry": result_sql = "ST_AsGeoJSON(msr.result) as result" else: - result_sql = """ - (msr.result->>'result')::int as result, - msr.result->'ref' as ref - """ + result_sql = "msr.result as result, msr.ref as ref" sql_query = sql.SQL( f""" @@ -431,6 +429,9 @@ def get_agg_results_by_task_id( :, ~agg_results_df.columns.str.contains("Unnamed") ] + # Add ref column 
+ agg_results_df = add_ref_to_agg_results(results_df, agg_results_df) + return agg_results_df @@ -508,41 +509,26 @@ def get_statistics_for_geometry_result_project(project_id: str): return project_stats_dict -def unify_refs(ref_list): - if not ref_list: - return None - first_ref = json.dumps(ref_list[0], sort_keys=True) - for r in ref_list[1:]: - if json.dumps(r, sort_keys=True) != first_ref: - return "multiple" - return ref_list[0] - - -def add_ref_to_agg_results( - results_df: pd.DataFrame, agg_results_df: pd.DataFrame -) -> pd.DataFrame: +def add_ref_to_agg_results(results_df: pd.DataFrame, agg_results_df: pd.DataFrame) -> pd.DataFrame: """ - Add a 'ref' column to agg_results_df. - If all user refs for a task are identical, use that ref. - If refs differ, set ref to 'multiple'. + Adds a 'ref' column to agg_results_df for writing to CSV """ - # collect refs per task - refs_per_task = ( - results_df.groupby(["project_id", "group_id", "task_id"])["ref"] - .apply(list) - .reset_index() - ) - - refs_per_task["ref"] = refs_per_task["ref"].apply(unify_refs) + refs_per_task = results_df.groupby("task_id")["ref"].apply(list) - # merge into agg_results_df - agg_results_df = agg_results_df.merge( - refs_per_task[["project_id", "group_id", "task_id", "ref"]], - on=["project_id", "group_id", "task_id"], - how="left", - ) + ref_values = {} + for task_id, refs in refs_per_task.items(): + # Filter out None or empty dicts + refs = [r for r in refs if r not in (None, {}, "") and not pd.isna(r)] + if not refs: + continue + elif all(r == refs[0] for r in refs): + ref_values[task_id] = refs[0] + else: + ref_values[task_id] = refs + if ref_values: + agg_results_df["ref"] = agg_results_df["task_id"].map(ref_values).fillna("") return agg_results_df @@ -593,9 +579,7 @@ def get_statistics_for_integer_result_project( project_info["custom_options"], ) - agg_results_df = add_ref_to_agg_results(results_df, agg_results_df) - - agg_results_df.to_csv(agg_results_filename, index_label="idx") + 
agg_results_df.to_csv(agg_results_filename, index_label="idx", quotechar='"', quoting=csv.QUOTE_MINIMAL) geojson_functions.gzipped_csv_to_gzipped_geojson( filename=agg_results_filename, diff --git a/mapswipe_workers/tests/unittests/test_project_stats.py b/mapswipe_workers/tests/unittests/test_project_stats.py index 1bbe85162..7fa07ffb2 100644 --- a/mapswipe_workers/tests/unittests/test_project_stats.py +++ b/mapswipe_workers/tests/unittests/test_project_stats.py @@ -3,6 +3,7 @@ import pandas as pd from mapswipe_workers.generate_stats.project_stats import ( + add_ref_to_agg_results, add_missing_result_columns, calc_agreement, calc_count, @@ -172,6 +173,31 @@ def test_calc_parent_option_count(self): assert list(compared["other"].index) == updated_index assert list(compared["other"]) == updated_value + def test_add_ref_single_ref(self): + # All results have the same ref + results_df = pd.DataFrame({ + "task_id": ["t1", "t1"], + "ref": [{"osmId": 123, "osmType": "ways_poly"}, {"osmId": 123, "osmType": "ways_poly"}] + }) + agg_results_df = pd.DataFrame({"task_id": ["t1"]}) + updated_df = add_ref_to_agg_results(results_df, agg_results_df.copy()) + self.assertIn("ref", updated_df.columns) + self.assertEqual(updated_df["ref"].iloc[0], {"osmId": 123, "osmType": "ways_poly"}) + + def test_add_ref_multiple_refs(self): + # Different refs for same task + results_df = pd.DataFrame({ + "task_id": ["t1", "t1"], + "ref": [{"osmId": 123}, {"osmId": 456}] + }) + agg_results_df = pd.DataFrame({"task_id": ["t1"]}) + updated_df = add_ref_to_agg_results(results_df, agg_results_df.copy()) + self.assertIn("ref", updated_df.columns) + self.assertEqual( + updated_df["ref"].iloc[0], + [{"osmId": 123}, {"osmId": 456}] + ) + if __name__ == "__main__": unittest.main() From 8fa1bba93203232f69b5f1bcc7fdac8555c8a7f4 Mon Sep 17 00:00:00 2001 From: ofritz Date: Mon, 20 Oct 2025 17:26:38 +0200 Subject: [PATCH 05/12] style(conflation): formatting --- .../generate_stats/project_stats.py | 13 
++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py b/mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py index b3a69b50e..d849ce65c 100644 --- a/mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py +++ b/mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py @@ -1,11 +1,11 @@ import ast +import csv import datetime import gzip import json import os import tempfile import typing -import csv import pandas as pd from pandas.api.types import is_numeric_dtype @@ -509,7 +509,9 @@ def get_statistics_for_geometry_result_project(project_id: str): return project_stats_dict -def add_ref_to_agg_results(results_df: pd.DataFrame, agg_results_df: pd.DataFrame) -> pd.DataFrame: +def add_ref_to_agg_results( + results_df: pd.DataFrame, agg_results_df: pd.DataFrame +) -> pd.DataFrame: """ Adds a 'ref' column to agg_results_df for writing to CSV """ @@ -579,7 +581,12 @@ def get_statistics_for_integer_result_project( project_info["custom_options"], ) - agg_results_df.to_csv(agg_results_filename, index_label="idx", quotechar='"', quoting=csv.QUOTE_MINIMAL) + agg_results_df.to_csv( + agg_results_filename, + index_label="idx", + quotechar='"', + quoting=csv.QUOTE_MINIMAL, + ) geojson_functions.gzipped_csv_to_gzipped_geojson( filename=agg_results_filename, From a562833e9ee3a0ad306917f2093a86b8c64ddd6b Mon Sep 17 00:00:00 2001 From: ofritz Date: Tue, 21 Oct 2025 10:21:19 +0200 Subject: [PATCH 06/12] fix: remove numpy from requirements --- mapswipe_workers/requirements.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/mapswipe_workers/requirements.txt b/mapswipe_workers/requirements.txt index f2b425a48..588754060 100644 --- a/mapswipe_workers/requirements.txt +++ b/mapswipe_workers/requirements.txt @@ -6,7 +6,6 @@ flake8==3.8.3 geojson==3.0.1 mapswipe-workers==3.0 pandas==1.5.2 -numpy==1.26.4 pre-commit==2.9.2 psycopg2-binary==2.9.3 python-dateutil==2.8.1 From 
bb0d0122a3526d5730185210986961d2722fd690 Mon Sep 17 00:00:00 2001 From: ofritz Date: Tue, 21 Oct 2025 10:41:52 +0200 Subject: [PATCH 07/12] feat(conflation): add ref column to postgres results tables --- postgres/initdb.sql | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/postgres/initdb.sql b/postgres/initdb.sql index f954d3a8c..296ac0c2e 100644 --- a/postgres/initdb.sql +++ b/postgres/initdb.sql @@ -93,7 +93,8 @@ CREATE TABLE IF NOT EXISTS results_temp ( end_time timestamp, result int, app_version varchar, - client_type varchar + client_type varchar, + ref jsonb ); -- create table for results import through csv @@ -192,6 +193,7 @@ CREATE TABLE IF NOT EXISTS mapping_sessions_results ( mapping_session_id int8, task_id varchar, result int2 not null, + ref jsonb, PRIMARY KEY (mapping_session_id, task_id), FOREIGN KEY (mapping_session_id) references mapping_sessions (mapping_session_id) From 2658c06b50d2c619993741663538976c3eee14cf Mon Sep 17 00:00:00 2001 From: ofritz Date: Tue, 21 Oct 2025 10:45:03 +0200 Subject: [PATCH 08/12] feat(conflation): add ref columns in postgres test setup --- mapswipe_workers/tests/integration/set_up_db.sql | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/mapswipe_workers/tests/integration/set_up_db.sql b/mapswipe_workers/tests/integration/set_up_db.sql index f954d3a8c..296ac0c2e 100644 --- a/mapswipe_workers/tests/integration/set_up_db.sql +++ b/mapswipe_workers/tests/integration/set_up_db.sql @@ -93,7 +93,8 @@ CREATE TABLE IF NOT EXISTS results_temp ( end_time timestamp, result int, app_version varchar, - client_type varchar + client_type varchar, + ref jsonb ); -- create table for results import through csv @@ -192,6 +193,7 @@ CREATE TABLE IF NOT EXISTS mapping_sessions_results ( mapping_session_id int8, task_id varchar, result int2 not null, + ref jsonb, PRIMARY KEY (mapping_session_id, task_id), FOREIGN KEY (mapping_session_id) references mapping_sessions (mapping_session_id) From 
73b53ac55d508b21575093dd3c3c8347f1684d9f Mon Sep 17 00:00:00 2001 From: ofritz Date: Tue, 21 Oct 2025 15:02:54 +0200 Subject: [PATCH 09/12] feat(conflation): use separate mapping_sessions_refs table --- .../firebase_to_postgres/transfer_results.py | 23 +++++++-- .../generate_stats/project_stats.py | 35 +++++++------- .../tests/integration/set_up_db.sql | 10 +++- .../tests/unittests/test_project_stats.py | 47 +++++++++++++------ postgres/initdb.sql | 10 +++- .../add_ref_to_results_for_conflation.sql | 12 +++-- 6 files changed, 99 insertions(+), 38 deletions(-) diff --git a/mapswipe_workers/mapswipe_workers/firebase_to_postgres/transfer_results.py b/mapswipe_workers/mapswipe_workers/firebase_to_postgres/transfer_results.py index 9ae15bcdc..4d92c812b 100644 --- a/mapswipe_workers/mapswipe_workers/firebase_to_postgres/transfer_results.py +++ b/mapswipe_workers/mapswipe_workers/firebase_to_postgres/transfer_results.py @@ -432,6 +432,8 @@ def save_results_to_postgres( query_insert_mapping_sessions = f""" BEGIN; + + -- Create or ensure mapping_sessions exist INSERT INTO mapping_sessions SELECT project_id, @@ -445,21 +447,36 @@ def save_results_to_postgres( client_type FROM {result_temp_table} GROUP BY project_id, group_id, user_id, app_version, client_type - ON CONFLICT (project_id,group_id,user_id) + ON CONFLICT (project_id, group_id, user_id) + DO NOTHING; + + INSERT INTO {result_table} (mapping_session_id, task_id, result) + SELECT + ms.mapping_session_id, + r.task_id, + {result_sql} + FROM {result_temp_table} r + JOIN mapping_sessions ms ON + ms.project_id = r.project_id + AND ms.group_id = r.group_id + AND ms.user_id = r.user_id + ON CONFLICT (mapping_session_id, task_id) DO NOTHING; - INSERT INTO {result_table} + + INSERT INTO mapping_sessions_refs (mapping_session_id, task_id, ref) SELECT ms.mapping_session_id, r.task_id, - {result_sql}, r.ref FROM {result_temp_table} r JOIN mapping_sessions ms ON ms.project_id = r.project_id AND ms.group_id = r.group_id AND 
ms.user_id = r.user_id + WHERE r.ref IS NOT NULL ON CONFLICT (mapping_session_id, task_id) DO NOTHING; + COMMIT; """ p_con.query(query_insert_mapping_sessions) diff --git a/mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py b/mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py index d849ce65c..d2b45c26e 100644 --- a/mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py +++ b/mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py @@ -110,7 +110,7 @@ def get_results( if result_table == "mapping_sessions_results_geometry": result_sql = "ST_AsGeoJSON(msr.result) as result" else: - result_sql = "msr.result as result, msr.ref as ref" + result_sql = "msr.result as result" sql_query = sql.SQL( f""" @@ -126,6 +126,7 @@ def get_results( ms.app_version, ms.client_type, {result_sql}, + refs.ref as ref, -- the username for users which login to MapSwipe with their -- OSM account is not defined or ''. -- We capture this here as it will cause problems @@ -138,7 +139,10 @@ def get_results( LEFT JOIN mapping_sessions ms ON ms.mapping_session_id = msr.mapping_session_id LEFT JOIN users U USING (user_id) - WHERE project_id = {"{}"} + LEFT JOIN mapping_sessions_refs refs + ON msr.mapping_session_id = refs.mapping_session_id + AND msr.task_id = refs.task_id + WHERE ms.project_id = {"{}"} ) TO STDOUT WITH CSV HEADER """ ).format(sql.Literal(project_id)) @@ -513,24 +517,23 @@ def add_ref_to_agg_results( results_df: pd.DataFrame, agg_results_df: pd.DataFrame ) -> pd.DataFrame: """ - Adds a 'ref' column to agg_results_df for writing to CSV + Adds a 'ref' column to agg_results_df if it exists in results_df. + For each task_id, all unique non-empty refs are collected into a list. + If no refs exist for a task, the corresponding value is empty string. + If results_df has no 'ref' column, agg_results_df is returned unchanged. 
""" + if "ref" not in results_df.columns: + return agg_results_df - refs_per_task = results_df.groupby("task_id")["ref"].apply(list) + refs_per_task = ( + results_df.groupby("task_id")["ref"] + .apply(lambda x: list({r for r in x if pd.notna(r) and r not in ({}, "")})) + .apply(lambda lst: json.dumps([json.loads(r) for r in lst]) if lst else "") + ) - ref_values = {} - for task_id, refs in refs_per_task.items(): - # Filter out None or empty dicts - refs = [r for r in refs if r not in (None, {}, "") and not pd.isna(r)] - if not refs: - continue - elif all(r == refs[0] for r in refs): - ref_values[task_id] = refs[0] - else: - ref_values[task_id] = refs + if refs_per_task.apply(lambda x: len(x) > 0).any(): + agg_results_df["ref"] = agg_results_df["task_id"].map(refs_per_task).fillna("") - if ref_values: - agg_results_df["ref"] = agg_results_df["task_id"].map(ref_values).fillna("") return agg_results_df diff --git a/mapswipe_workers/tests/integration/set_up_db.sql b/mapswipe_workers/tests/integration/set_up_db.sql index 296ac0c2e..9b0cb5a85 100644 --- a/mapswipe_workers/tests/integration/set_up_db.sql +++ b/mapswipe_workers/tests/integration/set_up_db.sql @@ -193,7 +193,6 @@ CREATE TABLE IF NOT EXISTS mapping_sessions_results ( mapping_session_id int8, task_id varchar, result int2 not null, - ref jsonb, PRIMARY KEY (mapping_session_id, task_id), FOREIGN KEY (mapping_session_id) references mapping_sessions (mapping_session_id) @@ -208,6 +207,15 @@ CREATE TABLE IF NOT EXISTS mapping_sessions_results_geometry ( references mapping_sessions (mapping_session_id) ); +CREATE TABLE IF NOT EXISTS mapping_sessions_refs ( + mapping_session_id int8, + task_id varchar, + ref JSONB not null, + PRIMARY KEY (mapping_session_id, task_id), + FOREIGN KEY (mapping_session_id) + references mapping_sessions (mapping_session_id) +); + CREATE OR REPLACE FUNCTION mapping_sessions_results_constraint() RETURNS trigger LANGUAGE plpgsql AS $$ diff --git 
a/mapswipe_workers/tests/unittests/test_project_stats.py b/mapswipe_workers/tests/unittests/test_project_stats.py index 7fa07ffb2..00d4272a9 100644 --- a/mapswipe_workers/tests/unittests/test_project_stats.py +++ b/mapswipe_workers/tests/unittests/test_project_stats.py @@ -1,10 +1,11 @@ +import json import unittest import pandas as pd from mapswipe_workers.generate_stats.project_stats import ( - add_ref_to_agg_results, add_missing_result_columns, + add_ref_to_agg_results, calc_agreement, calc_count, calc_parent_option_count, @@ -175,28 +176,46 @@ def test_calc_parent_option_count(self): def test_add_ref_single_ref(self): # All results have the same ref - results_df = pd.DataFrame({ - "task_id": ["t1", "t1"], - "ref": [{"osmId": 123, "osmType": "ways_poly"}, {"osmId": 123, "osmType": "ways_poly"}] - }) + results_df = pd.DataFrame( + { + "task_id": ["t1", "t1"], + "ref": [ + json.dumps({"osmId": 123, "osmType": "ways_poly"}), + json.dumps({"osmId": 123, "osmType": "ways_poly"}), + ], + } + ) agg_results_df = pd.DataFrame({"task_id": ["t1"]}) updated_df = add_ref_to_agg_results(results_df, agg_results_df.copy()) + self.assertIn("ref", updated_df.columns) - self.assertEqual(updated_df["ref"].iloc[0], {"osmId": 123, "osmType": "ways_poly"}) + ref_value = json.loads(updated_df["ref"].iloc[0]) + self.assertEqual(ref_value, [{"osmId": 123, "osmType": "ways_poly"}]) def test_add_ref_multiple_refs(self): # Different refs for same task - results_df = pd.DataFrame({ - "task_id": ["t1", "t1"], - "ref": [{"osmId": 123}, {"osmId": 456}] - }) + results_df = pd.DataFrame( + { + "task_id": ["t1", "t1"], + "ref": [json.dumps({"osmId": 123}), json.dumps({"osmId": 456})], + } + ) agg_results_df = pd.DataFrame({"task_id": ["t1"]}) updated_df = add_ref_to_agg_results(results_df, agg_results_df.copy()) + self.assertIn("ref", updated_df.columns) - self.assertEqual( - updated_df["ref"].iloc[0], - [{"osmId": 123}, {"osmId": 456}] - ) + ref_value = json.loads(updated_df["ref"].iloc[0]) + 
self.assertCountEqual(ref_value, [{"osmId": 123}, {"osmId": 456}]) + + def test_add_ref_no_refs_column(self): + # results_df has no 'ref' column + results_df = pd.DataFrame({"task_id": ["t1", "t2"], "result": [1, 2]}) + agg_results_df = pd.DataFrame({"task_id": ["t1", "t2"]}) + + updated_df = add_ref_to_agg_results(results_df, agg_results_df.copy()) + + self.assertNotIn("ref", updated_df.columns) + pd.testing.assert_frame_equal(updated_df, agg_results_df) if __name__ == "__main__": diff --git a/postgres/initdb.sql b/postgres/initdb.sql index 296ac0c2e..9b0cb5a85 100644 --- a/postgres/initdb.sql +++ b/postgres/initdb.sql @@ -193,7 +193,6 @@ CREATE TABLE IF NOT EXISTS mapping_sessions_results ( mapping_session_id int8, task_id varchar, result int2 not null, - ref jsonb, PRIMARY KEY (mapping_session_id, task_id), FOREIGN KEY (mapping_session_id) references mapping_sessions (mapping_session_id) @@ -208,6 +207,15 @@ CREATE TABLE IF NOT EXISTS mapping_sessions_results_geometry ( references mapping_sessions (mapping_session_id) ); +CREATE TABLE IF NOT EXISTS mapping_sessions_refs ( + mapping_session_id int8, + task_id varchar, + ref JSONB not null, + PRIMARY KEY (mapping_session_id, task_id), + FOREIGN KEY (mapping_session_id) + references mapping_sessions (mapping_session_id) +); + CREATE OR REPLACE FUNCTION mapping_sessions_results_constraint() RETURNS trigger LANGUAGE plpgsql AS $$ diff --git a/postgres/scripts/add_ref_to_results_for_conflation.sql b/postgres/scripts/add_ref_to_results_for_conflation.sql index aa7e00e94..9981502f7 100644 --- a/postgres/scripts/add_ref_to_results_for_conflation.sql +++ b/postgres/scripts/add_ref_to_results_for_conflation.sql @@ -1,5 +1,11 @@ -ALTER TABLE mapping_sessions_results -ADD COLUMN ref jsonb; - ALTER TABLE results_temp ADD COLUMN ref jsonb; + +CREATE TABLE IF NOT EXISTS public.mapping_sessions_refs ( + mapping_session_id int8, + task_id varchar, + ref JSONB not null, + PRIMARY KEY (mapping_session_id, task_id), + FOREIGN KEY 
(mapping_session_id) + references mapping_sessions (mapping_session_id) +); From c0dd1a313491bcea2e6719490406f2b978c2a422 Mon Sep 17 00:00:00 2001 From: ofritz Date: Tue, 21 Oct 2025 15:46:05 +0200 Subject: [PATCH 10/12] fix(conflation): add ref to results_geometry_temp --- mapswipe_workers/tests/integration/set_up_db.sql | 3 ++- postgres/initdb.sql | 3 ++- postgres/scripts/add_ref_to_results_for_conflation.sql | 3 +++ 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/mapswipe_workers/tests/integration/set_up_db.sql b/mapswipe_workers/tests/integration/set_up_db.sql index 9b0cb5a85..e32265e9a 100644 --- a/mapswipe_workers/tests/integration/set_up_db.sql +++ b/mapswipe_workers/tests/integration/set_up_db.sql @@ -108,7 +108,8 @@ CREATE TABLE IF NOT EXISTS results_geometry_temp ( end_time timestamp, result varchar, app_version varchar, - client_type varchar + client_type varchar, + ref jsonb ); diff --git a/postgres/initdb.sql b/postgres/initdb.sql index 9b0cb5a85..e32265e9a 100644 --- a/postgres/initdb.sql +++ b/postgres/initdb.sql @@ -108,7 +108,8 @@ CREATE TABLE IF NOT EXISTS results_geometry_temp ( end_time timestamp, result varchar, app_version varchar, - client_type varchar + client_type varchar, + ref jsonb ); diff --git a/postgres/scripts/add_ref_to_results_for_conflation.sql b/postgres/scripts/add_ref_to_results_for_conflation.sql index 9981502f7..13c02864d 100644 --- a/postgres/scripts/add_ref_to_results_for_conflation.sql +++ b/postgres/scripts/add_ref_to_results_for_conflation.sql @@ -1,6 +1,9 @@ ALTER TABLE results_temp ADD COLUMN ref jsonb; +ALTER TABLE results_geometry_temp +ADD COLUMN ref jsonb; + CREATE TABLE IF NOT EXISTS public.mapping_sessions_refs ( mapping_session_id int8, task_id varchar, From fb074d66a1924b89916c6d4b20a2855061635afc Mon Sep 17 00:00:00 2001 From: ofritz Date: Tue, 21 Oct 2025 16:19:51 +0200 Subject: [PATCH 11/12] fix(conflation): add conflation project draft to integration fixtures, reflect ref column in results 
temp in tests --- .../conflation/projectDrafts/conflation.json | 25 +++++++++++++++++++ .../tests/integration/test_get_results.py | 1 + .../test_get_results_real_project.py | 1 + 3 files changed, 27 insertions(+) create mode 100644 mapswipe_workers/tests/integration/fixtures/conflation/projectDrafts/conflation.json diff --git a/mapswipe_workers/tests/integration/fixtures/conflation/projectDrafts/conflation.json b/mapswipe_workers/tests/integration/fixtures/conflation/projectDrafts/conflation.json new file mode 100644 index 000000000..1ace4d890 --- /dev/null +++ b/mapswipe_workers/tests/integration/fixtures/conflation/projectDrafts/conflation.json @@ -0,0 +1,25 @@ +{ + "createdBy": "atCSosZACaN0qhcVjtMO1tq9d1G3", + "geometry": "https://firebasestorage.googleapis.com/v0/b/dev-mapswipe.appspot.com/o/all_predictions_192.geojson?alt=media&token=b7a85e56-6ab1-4e0d-a734-a772025a88b8", + "filter": "way['building']", + "groupSize": 25, + "image": "https://firebasestorage.googleapis.com/v0/b/dev-mapswipe.appspot.com/o/projectImages%2F1742895229710-project-image-1x1.png?alt=media&token=26cf1956-9ab7-4348-b529-9952f2f8424e", + "lookFor": "Buildings", + "manualUrl": "https://fair-dev.hotosm.org/start-mapping/358", + "name": "Conflate fAIr buildings - Kathmandu (1)\nHOT", + "projectDetails": "This is a test.", + "projectNumber": 1, + "projectRegion": "Kathmandu", + "projectTopic": "Conflate fAIr buildings", + "projectTopicKey": "conflate fair buildings - kathmandu (1) hot", + "projectType": 8, + "requestingOrganisation": "HOT", + "tileServer": { + "credits": "Please add imagery credits here.", + "name": "custom", + "url": "https://2glp8ghj65.execute-api.us-east-1.amazonaws.com/cog/tiles/WebMercatorQuad/{z}/{x}/{y}@1x?url=https%3A%2F%2Foin-hotosm-temp.s3.us-east-1.amazonaws.com%2F62d85d11d8499800053796c1%2F0%2F62d85d11d8499800053796c2.tif", + "wmtsLayerName": "-" + }, + "tutorialId": "tutorial_-MQsj5VWpNcJxCTVTOyH", + "verificationNumber": 3 +} diff --git 
a/mapswipe_workers/tests/integration/test_get_results.py b/mapswipe_workers/tests/integration/test_get_results.py index b2cd58954..0eb9d4c77 100644 --- a/mapswipe_workers/tests/integration/test_get_results.py +++ b/mapswipe_workers/tests/integration/test_get_results.py @@ -38,6 +38,7 @@ def test_get_results_df_from_postgres(self): "app_version", "client_type", "result", + "ref", "username", "day", ], diff --git a/mapswipe_workers/tests/integration/test_get_results_real_project.py b/mapswipe_workers/tests/integration/test_get_results_real_project.py index 5fe1b346b..ae5848618 100644 --- a/mapswipe_workers/tests/integration/test_get_results_real_project.py +++ b/mapswipe_workers/tests/integration/test_get_results_real_project.py @@ -71,6 +71,7 @@ def test_get_results_df_from_postgres(self): "app_version", "client_type", "result", + "ref", "username", "day", ], From 004ac56a3e4893001676284796a33250296693b6 Mon Sep 17 00:00:00 2001 From: ofritz Date: Wed, 22 Oct 2025 17:39:33 +0200 Subject: [PATCH 12/12] remove unnecessary comments --- .../mapswipe_workers/firebase_to_postgres/transfer_results.py | 1 - .../mapswipe_workers/generate_stats/project_stats.py | 1 - 2 files changed, 2 deletions(-) diff --git a/mapswipe_workers/mapswipe_workers/firebase_to_postgres/transfer_results.py b/mapswipe_workers/mapswipe_workers/firebase_to_postgres/transfer_results.py index 4d92c812b..bc0072575 100644 --- a/mapswipe_workers/mapswipe_workers/firebase_to_postgres/transfer_results.py +++ b/mapswipe_workers/mapswipe_workers/firebase_to_postgres/transfer_results.py @@ -433,7 +433,6 @@ def save_results_to_postgres( query_insert_mapping_sessions = f""" BEGIN; - -- Create or ensure mapping_sessions exist INSERT INTO mapping_sessions SELECT project_id, diff --git a/mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py b/mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py index d2b45c26e..7d5f0147f 100644 --- 
a/mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py +++ b/mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py @@ -433,7 +433,6 @@ def get_agg_results_by_task_id( :, ~agg_results_df.columns.str.contains("Unnamed") ] - # Add ref column agg_results_df = add_ref_to_agg_results(results_df, agg_results_df) return agg_results_df