Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import csv
import io
import json
from typing import List, Tuple

import dateutil.parser
Expand Down Expand Up @@ -269,6 +270,10 @@ def results_to_file(

if type(result_data["results"]) is dict:
for taskId, result in result_data["results"].items():

ref_data = result_data.get("reference", {}).get(taskId, {})
ref_json = json.dumps(ref_data) if ref_data else r"\N"

if result_type == "geometry":
result = geojson.dumps(geojson.GeometryCollection(result))
w.writerow(
Expand All @@ -283,6 +288,7 @@ def results_to_file(
result,
app_version,
client_type,
ref_json,
]
)
elif type(result_data["results"]) is list:
Expand All @@ -292,6 +298,10 @@ def results_to_file(
# if first key (list index) is 5
# list indicies 0-4 will have value None
for taskId, result in enumerate(result_data["results"]):

ref_data = result_data.get("reference", {}).get(taskId, {})
ref_json = json.dumps(ref_data) if ref_data else r"\N"

if result is None:
continue
else:
Expand All @@ -309,6 +319,7 @@ def results_to_file(
result,
app_version,
client_type,
ref_json,
]
)
else:
Expand Down Expand Up @@ -369,6 +380,7 @@ def save_results_to_postgres(
"result",
"app_version",
"client_type",
"ref",
]
p_con.copy_from(results_file, result_temp_table, columns)
results_file.close()
Expand Down Expand Up @@ -420,6 +432,7 @@ def save_results_to_postgres(

query_insert_mapping_sessions = f"""
BEGIN;

INSERT INTO mapping_sessions
SELECT
project_id,
Expand All @@ -433,9 +446,10 @@ def save_results_to_postgres(
client_type
FROM {result_temp_table}
GROUP BY project_id, group_id, user_id, app_version, client_type
ON CONFLICT (project_id,group_id,user_id)
ON CONFLICT (project_id, group_id, user_id)
DO NOTHING;
INSERT INTO {result_table}

INSERT INTO {result_table} (mapping_session_id, task_id, result)
SELECT
ms.mapping_session_id,
r.task_id,
Expand All @@ -447,6 +461,21 @@ def save_results_to_postgres(
AND ms.user_id = r.user_id
ON CONFLICT (mapping_session_id, task_id)
DO NOTHING;

INSERT INTO mapping_sessions_refs (mapping_session_id, task_id, ref)
SELECT
ms.mapping_session_id,
r.task_id,
r.ref
FROM {result_temp_table} r
JOIN mapping_sessions ms ON
ms.project_id = r.project_id
AND ms.group_id = r.group_id
AND ms.user_id = r.user_id
WHERE r.ref IS NOT NULL
ON CONFLICT (mapping_session_id, task_id)
DO NOTHING;

COMMIT;
"""
p_con.query(query_insert_mapping_sessions)
Expand Down
44 changes: 41 additions & 3 deletions mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import ast
import csv
import datetime
import gzip
import json
Expand Down Expand Up @@ -98,6 +99,7 @@ def get_results(
Parse timestamp as datetime object and add attribute "day" for each result.
Return None if there are no results for this project.
Otherwise, return dataframe.
Include the 'ref' JSON field in integer results if it exists.

Parameters
----------
Expand All @@ -108,7 +110,7 @@ def get_results(
if result_table == "mapping_sessions_results_geometry":
result_sql = "ST_AsGeoJSON(msr.result) as result"
else:
result_sql = "msr.result"
result_sql = "msr.result as result"

sql_query = sql.SQL(
f"""
Expand All @@ -124,6 +126,7 @@ def get_results(
ms.app_version,
ms.client_type,
{result_sql},
refs.ref as ref,
-- the username for users which login to MapSwipe with their
-- OSM account is not defined or ''.
-- We capture this here as it will cause problems
Expand All @@ -136,7 +139,10 @@ def get_results(
LEFT JOIN mapping_sessions ms ON
ms.mapping_session_id = msr.mapping_session_id
LEFT JOIN users U USING (user_id)
WHERE project_id = {"{}"}
LEFT JOIN mapping_sessions_refs refs
ON msr.mapping_session_id = refs.mapping_session_id
AND msr.task_id = refs.task_id
WHERE ms.project_id = {"{}"}
) TO STDOUT WITH CSV HEADER
"""
).format(sql.Literal(project_id))
Expand Down Expand Up @@ -427,6 +433,8 @@ def get_agg_results_by_task_id(
:, ~agg_results_df.columns.str.contains("Unnamed")
]

agg_results_df = add_ref_to_agg_results(results_df, agg_results_df)

return agg_results_df


Expand Down Expand Up @@ -504,6 +512,30 @@ def get_statistics_for_geometry_result_project(project_id: str):
return project_stats_dict


def add_ref_to_agg_results(
results_df: pd.DataFrame, agg_results_df: pd.DataFrame
) -> pd.DataFrame:
"""
Adds a 'ref' column to agg_results_df if it exists in results_df.
For each task_id, all unique non-empty refs are collected into a list.
If no refs exist for a task, the corresponding value is empty string.
If results_df has no 'ref' column, agg_results_df is returned unchanged.
"""
if "ref" not in results_df.columns:
return agg_results_df

refs_per_task = (
results_df.groupby("task_id")["ref"]
.apply(lambda x: list({r for r in x if pd.notna(r) and r not in ({}, "")}))
.apply(lambda lst: json.dumps([json.loads(r) for r in lst]) if lst else "")
)

if refs_per_task.apply(lambda x: len(x) > 0).any():
agg_results_df["ref"] = agg_results_df["task_id"].map(refs_per_task).fillna("")

return agg_results_df


def get_statistics_for_integer_result_project(
project_id: str, project_info: pd.Series, generate_hot_tm_geometries: bool
) -> dict:
Expand Down Expand Up @@ -550,7 +582,13 @@ def get_statistics_for_integer_result_project(
tasks_df,
project_info["custom_options"],
)
agg_results_df.to_csv(agg_results_filename, index_label="idx")

agg_results_df.to_csv(
agg_results_filename,
index_label="idx",
quotechar='"',
quoting=csv.QUOTE_MINIMAL,
)

geojson_functions.gzipped_csv_to_gzipped_geojson(
filename=agg_results_filename,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"createdBy": "atCSosZACaN0qhcVjtMO1tq9d1G3",
"geometry": "https://firebasestorage.googleapis.com/v0/b/dev-mapswipe.appspot.com/o/all_predictions_192.geojson?alt=media&token=b7a85e56-6ab1-4e0d-a734-a772025a88b8",
"filter": "way['building']",
"groupSize": 25,
"image": "https://firebasestorage.googleapis.com/v0/b/dev-mapswipe.appspot.com/o/projectImages%2F1742895229710-project-image-1x1.png?alt=media&token=26cf1956-9ab7-4348-b529-9952f2f8424e",
"lookFor": "Buildings",
"manualUrl": "https://fair-dev.hotosm.org/start-mapping/358",
"name": "Conflate fAIr buildings - Kathmandu (1)\nHOT",
"projectDetails": "This is a test.",
"projectNumber": 1,
"projectRegion": "Kathmandu",
"projectTopic": "Conflate fAIr buildings",
"projectTopicKey": "conflate fair buildings - kathmandu (1) hot",
"projectType": 8,
"requestingOrganisation": "HOT",
"tileServer": {
"credits": "Please add imagery credits here.",
"name": "custom",
"url": "https://2glp8ghj65.execute-api.us-east-1.amazonaws.com/cog/tiles/WebMercatorQuad/{z}/{x}/{y}@1x?url=https%3A%2F%2Foin-hotosm-temp.s3.us-east-1.amazonaws.com%2F62d85d11d8499800053796c1%2F0%2F62d85d11d8499800053796c2.tif",
"wmtsLayerName": "-"
},
"tutorialId": "tutorial_-MQsj5VWpNcJxCTVTOyH",
"verificationNumber": 3
}
15 changes: 13 additions & 2 deletions mapswipe_workers/tests/integration/set_up_db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ CREATE TABLE IF NOT EXISTS results_temp (
end_time timestamp,
result int,
app_version varchar,
client_type varchar
client_type varchar,
ref jsonb
);

-- create table for results import through csv
Expand All @@ -107,7 +108,8 @@ CREATE TABLE IF NOT EXISTS results_geometry_temp (
end_time timestamp,
result varchar,
app_version varchar,
client_type varchar
client_type varchar,
ref jsonb
);


Expand Down Expand Up @@ -206,6 +208,15 @@ CREATE TABLE IF NOT EXISTS mapping_sessions_results_geometry (
references mapping_sessions (mapping_session_id)
);

CREATE TABLE IF NOT EXISTS mapping_sessions_refs (
mapping_session_id int8,
task_id varchar,
ref JSONB not null,
PRIMARY KEY (mapping_session_id, task_id),
FOREIGN KEY (mapping_session_id)
references mapping_sessions (mapping_session_id)
);

CREATE OR REPLACE FUNCTION mapping_sessions_results_constraint() RETURNS trigger
LANGUAGE plpgsql AS
$$
Expand Down
1 change: 1 addition & 0 deletions mapswipe_workers/tests/integration/test_get_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def test_get_results_df_from_postgres(self):
"app_version",
"client_type",
"result",
"ref",
"username",
"day",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def test_get_results_df_from_postgres(self):
"app_version",
"client_type",
"result",
"ref",
"username",
"day",
],
Expand Down
45 changes: 45 additions & 0 deletions mapswipe_workers/tests/unittests/test_project_stats.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import json
import unittest

import pandas as pd

from mapswipe_workers.generate_stats.project_stats import (
add_missing_result_columns,
add_ref_to_agg_results,
calc_agreement,
calc_count,
calc_parent_option_count,
Expand Down Expand Up @@ -172,6 +174,49 @@ def test_calc_parent_option_count(self):
assert list(compared["other"].index) == updated_index
assert list(compared["other"]) == updated_value

def test_add_ref_single_ref(self):
# All results have the same ref
results_df = pd.DataFrame(
{
"task_id": ["t1", "t1"],
"ref": [
json.dumps({"osmId": 123, "osmType": "ways_poly"}),
json.dumps({"osmId": 123, "osmType": "ways_poly"}),
],
}
)
agg_results_df = pd.DataFrame({"task_id": ["t1"]})
updated_df = add_ref_to_agg_results(results_df, agg_results_df.copy())

self.assertIn("ref", updated_df.columns)
ref_value = json.loads(updated_df["ref"].iloc[0])
self.assertEqual(ref_value, [{"osmId": 123, "osmType": "ways_poly"}])

def test_add_ref_multiple_refs(self):
# Different refs for same task
results_df = pd.DataFrame(
{
"task_id": ["t1", "t1"],
"ref": [json.dumps({"osmId": 123}), json.dumps({"osmId": 456})],
}
)
agg_results_df = pd.DataFrame({"task_id": ["t1"]})
updated_df = add_ref_to_agg_results(results_df, agg_results_df.copy())

self.assertIn("ref", updated_df.columns)
ref_value = json.loads(updated_df["ref"].iloc[0])
self.assertCountEqual(ref_value, [{"osmId": 123}, {"osmId": 456}])

def test_add_ref_no_refs_column(self):
# results_df has no 'ref' column
results_df = pd.DataFrame({"task_id": ["t1", "t2"], "result": [1, 2]})
agg_results_df = pd.DataFrame({"task_id": ["t1", "t2"]})

updated_df = add_ref_to_agg_results(results_df, agg_results_df.copy())

self.assertNotIn("ref", updated_df.columns)
pd.testing.assert_frame_equal(updated_df, agg_results_df)


if __name__ == "__main__":
unittest.main()
15 changes: 13 additions & 2 deletions postgres/initdb.sql
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ CREATE TABLE IF NOT EXISTS results_temp (
end_time timestamp,
result int,
app_version varchar,
client_type varchar
client_type varchar,
ref jsonb
);

-- create table for results import through csv
Expand All @@ -107,7 +108,8 @@ CREATE TABLE IF NOT EXISTS results_geometry_temp (
end_time timestamp,
result varchar,
app_version varchar,
client_type varchar
client_type varchar,
ref jsonb
);


Expand Down Expand Up @@ -206,6 +208,15 @@ CREATE TABLE IF NOT EXISTS mapping_sessions_results_geometry (
references mapping_sessions (mapping_session_id)
);

CREATE TABLE IF NOT EXISTS mapping_sessions_refs (
mapping_session_id int8,
task_id varchar,
ref JSONB not null,
PRIMARY KEY (mapping_session_id, task_id),
FOREIGN KEY (mapping_session_id)
references mapping_sessions (mapping_session_id)
);

CREATE OR REPLACE FUNCTION mapping_sessions_results_constraint() RETURNS trigger
LANGUAGE plpgsql AS
$$
Expand Down
14 changes: 14 additions & 0 deletions postgres/scripts/add_ref_to_results_for_conflation.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
ALTER TABLE results_temp
ADD COLUMN ref jsonb;

ALTER TABLE results_geometry_temp
ADD COLUMN ref jsonb;

CREATE TABLE IF NOT EXISTS public.mapping_sessions_refs (
mapping_session_id int8,
task_id varchar,
ref JSONB not null,
PRIMARY KEY (mapping_session_id, task_id),
FOREIGN KEY (mapping_session_id)
references mapping_sessions (mapping_session_id)
);
Loading