diff --git a/.github/workflows/missing_form_check_update.yaml b/.github/workflows/missing_form_check_update.yaml new file mode 100644 index 000000000..96219e1ad --- /dev/null +++ b/.github/workflows/missing_form_check_update.yaml @@ -0,0 +1,133 @@ +name: Create Automated PR +on: + schedule: + # Runs at 00:00 UTC on the first day of every month. + - cron: "0 0 1 * *" + workflow_dispatch: # allow manual trigger + +jobs: + check-repository: + runs-on: ubuntu-latest + outputs: + is_correct_repo: ${{ steps.check.outputs.is_correct_repo }} + steps: + - name: Check repository + id: check + run: | + if [ "$GITHUB_REPOSITORY" = "scribe-org/Scribe-Data" ]; then + echo "is_correct_repo=true" >> "$GITHUB_OUTPUT" + else + echo "is_correct_repo=false" >> "$GITHUB_OUTPUT" + echo "::warning::This workflow should only run in scribe-org/Scribe-Data repository." + fi + + create-pull-request: + needs: check-repository + if: needs.check-repository.outputs.is_correct_repo == 'true' + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: "3.x" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install rich requests tqdm + pip install -e . + + - name: Generate Missing Features Data + run: | + # Set up paths + DUMP_PATH=$(PYTHONPATH=$PYTHONPATH:$(pwd)/src python src/scribe_data/check/check_missing_forms/download_wd.py | grep "DOWNLOAD_PATH=" | cut -d'=' -f2) + QUERY_DIR="$(pwd)/src/scribe_data/wikidata/language_data_extraction" + + echo "Dump path: ${DUMP_PATH}" + echo "Query directory: ${QUERY_DIR}" + + # Check if paths exist + if [ -n "${DUMP_PATH}" ] && [ -d "${QUERY_DIR}" ]; then + # Generate the missing features data with all keys processing. + PYTHONPATH=$PYTHONPATH:$(pwd)/src python src/scribe_data/check/check_missing_forms/check_missing_forms.py "${DUMP_PATH}" "${QUERY_DIR}" --process-all-keys + else + echo "Required paths not found:" + echo "Dump path exists: $([ -n "${DUMP_PATH}" ] && echo "Yes" || echo "No")" + echo "Query directory exists: $([ -d "${QUERY_DIR}" ] && echo "Yes" || echo "No")" + exit 1 + fi + + # Debug steps to understand the state. + - name: Debug Info + run: | + echo "Current branch: $(git branch --show-current)" + echo "List of changes:" + git status + + - name: Make changes + run: | + git add src/scribe_data/wikidata/language_data_extraction/**/*.sparql + git config --global user.email "github-actions[bot]@users.noreply.github.com" + git config --global user.name "github-actions[bot]" + + - name: Debug Missing Features Data + if: always() + run: | + # Print the contents of the missing features JSON file if it exists. + if [ -f missing_features.json ]; then + echo "Contents of missing_features.json:" + cat missing_features.json + else + echo "missing_features.json not found" + fi + + - name: Generate PR Body + id: pr-body + run: | + # Run the pr_body.py script with the missing features data. + PR_BODY_CONTENT=$(python src/scribe_data/check/check_missing_forms/pr_body.py missing_features.json) + + # Debug output. + echo "PR Body Content:" + echo "$PR_BODY_CONTENT" + + # Initialize PR body with delimiter + { + echo "body<> $GITHUB_OUTPUT + + - name: Debug PR Body Output + run: | + # Print the PR body content from the output. 
+ echo "PR Body from GITHUB_OUTPUT:" + cat $GITHUB_OUTPUT + + - name: Create Pull Request + uses: peter-evans/create-pull-request@v5 + with: + token: ${{ secrets.GITHUB_TOKEN }} + title: "Automated PR: Updated Language Data Files" + body: ${{ steps.pr-body.outputs.body }} + base: master + branch: automated-missing-forms-pr + delete-branch: true + draft: false + commit-message: "[create-pull-request] automated change" + committer: GitHub + author: github-actions[bot] + + # Debug step to verify PR creation attempt. + - name: Check PR Creation + run: | + echo "Checking if PR was created..." + gh pr list + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/update_emojis.yaml b/.github/workflows/update_emojis.yaml new file mode 100644 index 000000000..14514cec4 --- /dev/null +++ b/.github/workflows/update_emojis.yaml @@ -0,0 +1,150 @@ +name: Check and Update Emoji Data +on: + schedule: + # Runs at 00:00 UTC on the first day of every month. + - cron: "0 0 1 * *" + workflow_dispatch: # allow manual trigger + +jobs: + check-repository: + runs-on: ubuntu-latest + outputs: + is_correct_repo: ${{ steps.check.outputs.is_correct_repo }} + steps: + - name: Check repository + id: check + run: | + if [ "$GITHUB_REPOSITORY" = "scribe-org/Scribe-Data" ]; then + echo "is_correct_repo=true" >> "$GITHUB_OUTPUT" + else + echo "is_correct_repo=false" >> "$GITHUB_OUTPUT" + echo "::warning::This workflow should only run in scribe-org/Scribe-Data repository." + fi + + check-and-update: + needs: check-repository + if: needs.check-repository.outputs.is_correct_repo == 'true' + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: "3.x" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install requests + sudo apt-get install jq + + - name: Get language list + id: get-langs + run: | + # Fetch language list from GitHub API. + DERIVED_LANGS=$(curl -s https://api.github.com/repos/unicode-org/cldr-json/contents/cldr-json/cldr-annotations-derived-full/annotationsDerived | jq -r '.[].name') + FULL_LANGS=$(curl -s https://api.github.com/repos/unicode-org/cldr-json/contents/cldr-json/cldr-annotations-full/annotations | jq -r '.[].name') + + # Combine and deduplicate language lists. + LANG_LIST=$(echo "$DERIVED_LANGS $FULL_LANGS" | tr ' ' '\n' | sort -u | tr '\n' ' ') + echo "lang_list=${LANG_LIST}" >> $GITHUB_OUTPUT + echo "Detected languages: ${LANG_LIST}" + + - name: Download and check emoji data + id: check-updates + run: | + # Create directories if they don't exist. + mkdir -p src/scribe_data/unicode/cldr-annotations-derived-full + mkdir -p src/scribe_data/unicode/cldr-annotations-full + + CHANGES_EXIST=false + CHANGE_SUMMARY="| Language | Derived Changes | Full Changes |\n|----------|-----------------|--------------|" + + # Use dynamic language list from previous step. + for lang in ${{ steps.get-langs.outputs.lang_list }}; do + DERIVED_CHANGED="No" + FULL_CHANGED="No" + + # Download latest data for each language. 
+ mkdir -p "src/scribe_data/unicode/cldr-annotations-derived-full/annotationsDerived/$lang" + mkdir -p "src/scribe_data/unicode/cldr-annotations-full/annotations/$lang" + + curl -L "https://raw.githubusercontent.com/unicode-org/cldr-json/main/cldr-json/cldr-annotations-derived-full/annotationsDerived/$lang/annotations.json" -o "new_derived_$lang.json" + curl -L "https://raw.githubusercontent.com/unicode-org/cldr-json/main/cldr-json/cldr-annotations-full/annotations/$lang/annotations.json" -o "new_full_$lang.json" + + # Check derived annotations. + if [ -f "src/scribe_data/unicode/cldr-annotations-derived-full/annotationsDerived/$lang/annotations.json" ]; then + if ! cmp -s "new_derived_$lang.json" "src/scribe_data/unicode/cldr-annotations-derived-full/annotationsDerived/$lang/annotations.json"; then + CHANGES_EXIST=true + DERIVED_CHANGED="Yes" + fi + else + CHANGES_EXIST=true + DERIVED_CHANGED="New" + fi + + # Check full annotations. + if [ -f "src/scribe_data/unicode/cldr-annotations-full/annotations/$lang/annotations.json" ]; then + if ! cmp -s "new_full_$lang.json" "src/scribe_data/unicode/cldr-annotations-full/annotations/$lang/annotations.json"; then + CHANGES_EXIST=true + FULL_CHANGED="Yes" + fi + else + CHANGES_EXIST=true + FULL_CHANGED="New" + fi + + # Only add to summary if there are changes. + if [ "$DERIVED_CHANGED" != "No" ] || [ "$FULL_CHANGED" != "No" ]; then + CHANGE_SUMMARY="$CHANGE_SUMMARY\n| $lang | $DERIVED_CHANGED | $FULL_CHANGED |" + fi + done + + echo "changes_exist=${CHANGES_EXIST}" >> $GITHUB_OUTPUT + echo "change_summary<> $GITHUB_OUTPUT + echo -e "$CHANGE_SUMMARY" >> $GITHUB_OUTPUT + echo "EOF" >> $GITHUB_OUTPUT + + - name: Update files if changed + if: steps.check-updates.outputs.changes_exist == 'true' + run: | + # Use dynamic language list. + for lang in ${{ steps.get-langs.outputs.lang_list }}; do + mkdir -p "src/scribe_data/unicode/cldr-annotations-derived-full/annotationsDerived/$lang" + mkdir -p "src/scribe_data/unicode/cldr-annotations-full/annotations/$lang" + + mv "new_derived_$lang.json" "src/scribe_data/unicode/cldr-annotations-derived-full/annotationsDerived/$lang/annotations.json" + mv "new_full_$lang.json" "src/scribe_data/unicode/cldr-annotations-full/annotations/$lang/annotations.json" + done + + git config --global user.email "github-actions[bot]@users.noreply.github.com" + git config --global user.name "github-actions[bot]" + + - name: Create Pull Request + if: steps.check-updates.outputs.changes_exist == 'true' + uses: peter-evans/create-pull-request@v5 + with: + token: ${{ secrets.GITHUB_TOKEN }} + title: "chore: Update emoji annotations data" + body: | + This PR updates the emoji annotations data from CLDR. + + ## Changes Summary + ${{ steps.check-updates.outputs.change_summary }} + + ### Legend: + - Yes: File was updated + - New: File was newly added + - No: No changes + + This is an automated PR created by the emoji data update workflow. 
+ branch: update-emoji-data # branch name + delete-branch: true + commit-message: "chore: Update emoji annotations data" + labels: | + automated pr + emoji-data diff --git a/.gitignore b/.gitignore index 610b9da8f..475ba5045 100644 --- a/.gitignore +++ b/.gitignore @@ -40,7 +40,13 @@ scribe_data_csv_export/* scribe_data_json_export/* scribe_data_sqlite_export/* scribe_data_tsv_export/* +scribe_data_mediawiki_export/* +scribe_data_wikidata_dumps_export/* # MARK: Wiki Dumps *.json.bz2 + +# MARK: GitHub Actions + +missing_features.json diff --git a/src/scribe_data/check/__init__.py b/src/scribe_data/check/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/scribe_data/check/check_missing_forms/__init__.py b/src/scribe_data/check/check_missing_forms/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/scribe_data/check/check_missing_forms/check_missing_forms.py b/src/scribe_data/check/check_missing_forms/check_missing_forms.py new file mode 100644 index 000000000..43039976e --- /dev/null +++ b/src/scribe_data/check/check_missing_forms/check_missing_forms.py @@ -0,0 +1,228 @@ +# SPDX-License-Identifier: GPL-3.0-or-later +""" +Check for missing forms in Wikidata. +""" + +import argparse +import json +import sys +from collections import defaultdict +from pathlib import Path + +from generate_query import generate_query +from get_forms import extract_dump_forms, parse_sparql_files + +from scribe_data.utils import ( + data_type_metadata, + language_metadata, + lexeme_form_metadata, +) + + +def get_all_languages(): + """ + Extract all languages and sub languages from language metadata. + + Returns + ------- + list of str + List of language codes for all languages and sub languages that have + both ISO codes and QIDs defined. + + Notes + ----- + Only includes languages and sub languages that have both 'iso' and 'qid' + fields in their metadata. + """ + languages = [] + + for lang, lang_data in language_metadata.items(): + # Add main language if it has ISO and QID. + if "iso" in lang_data and "qid" in lang_data: + languages.append(lang) + + # Add sub languages. + if "sub_languages" in lang_data: + languages.extend( + sublang + for sublang, sublang_data in lang_data["sub_languages"].items() + if "iso" in sublang_data and "qid" in sublang_data + ) + + return languages + + +def get_missing_features(result_sparql, result_dump): + """ + Compare features between SPARQL results and dump data to find missing ones. + + Parameters + ---------- + result_sparql : dict + Features extracted from SPARQL queries. + Format: {language: {data_type: [features]}} + + result_dump : dict + Features extracted from Wikidata dump. + Format: {language: {data_type: [features]}} + + Returns + ------- + dict or None + Dictionary of missing features by language and data type if any found, + otherwise None. + Format: {language: {data_type: [missing_features]}} + + Notes + ----- + Only includes features that have valid QIDs present in lexeme_form_metadata. + """ + missing_by_lang_type = defaultdict(lambda: defaultdict(list)) + + # Extract all QIDs from the metadata. + all_qids = set() + for category, items in lexeme_form_metadata.items(): + for key, value in items.items(): + all_qids.add(value["qid"]) + + # Compare features for each language and data type. + for lang in result_sparql: + if lang in result_dump: + # Get all unique data types from both sources. 
+ all_data_types = set(result_sparql[lang].keys()) | set( + result_dump[lang].keys() + ) + + for dt in all_data_types: + sparql_values = set() + dump_values = set() + + # Get values from SPARQL if available. + if dt in result_sparql[lang]: + sparql_values = {tuple(item) for item in result_sparql[lang][dt]} + + # Get values from dump if available. + if dt in result_dump[lang]: + dump_values = {tuple(item) for item in result_dump[lang][dt]} + + # Get unique values from both sources. + unique_dump_values = dump_values - sparql_values + unique_sparql_values = sparql_values - dump_values + + # Store valid missing features from dump. + for item in unique_dump_values: + if all(qid in all_qids for qid in item): + missing_by_lang_type[lang][dt].append(list(item)) + + # Store valid missing features from SPARQL. + for item in unique_sparql_values: + if all(qid in all_qids for qid in item): + missing_by_lang_type[lang][dt].append(list(item)) + + return missing_by_lang_type or None + + +def process_missing_features(missing_features, query_dir): + """ + Generate SPARQL queries for missing features by language and data type. + + Parameters + ---------- + missing_features : dict + Dictionary of missing features by language and data type. + Format: {language: {data_type: [features]}} + + query_dir : str or Path + Directory where generated query files should be saved. + + Notes + ----- + Generates separate queries for each data type within each language. + """ + if not missing_features: + return + + for language, data_types in missing_features.items(): + print(f"Processing language: {language}") + print(f"Data types: {list(data_types.keys())}") + + # Create a separate entry for each data type. + for data_type, features in data_types.items(): + language_entry = {language: {data_type: features}} + print(f"Generating query for {language} - {data_type}") + generate_query(language_entry, query_dir) + + +def main(): + """ + Main function to check for missing forms in Wikidata. + + Processes command line arguments, downloads and compares Wikidata dump data + with SPARQL query results to identify missing features, and generates + appropriate SPARQL queries. + + Notes + ----- + Required command line arguments: + - dump_path: Path to the Wikidata dump file + - query_dir: Directory for storing generated queries + + Optional arguments: + - --process-all-keys: Flag to process all nested keys in missing features + """ + parser = argparse.ArgumentParser(description="Check missing forms in Wikidata") + parser.add_argument("dump_path", type=str, help="Path to the dump file") + parser.add_argument("query_dir", type=str, help="Path to the query directory") + parser.add_argument( + "--process-all-keys", + action="store_true", + help="Process all nested keys in the missing features", + ) + + args = parser.parse_args() + + dump_path = Path(args.dump_path) + query_dir = Path(args.query_dir) + + if not dump_path.exists(): + print(f"Error: Dump path does not exist: {dump_path}") + sys.exit(1) + + if not query_dir.exists(): + print(f"Error: Query directory does not exist: {query_dir}") + sys.exit(1) + + # Get all languages including sub languages. 
+ languages = get_all_languages() + + print("Parsing SPARQL files...") + result_sparql = parse_sparql_files() + + print("Extracting Wiki lexeme dump...") + result_dump = extract_dump_forms( + languages=languages, + data_types=list(data_type_metadata.keys()), + file_path=dump_path, + ) + + missing_features = get_missing_features(result_sparql, result_dump) + + try: + print("Generated missing features:", missing_features) + + # Save the missing features to a JSON file. + with open("missing_features.json", "w") as f: + json.dump(missing_features, f, indent=4) + print("Missing features data has been saved to missing_features.json") + + if missing_features: + # Process all data types for each language. + process_missing_features(missing_features, query_dir) + + except Exception as e: + print(f"An error occurred: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/src/scribe_data/check/check_missing_forms/download_wd.py b/src/scribe_data/check/check_missing_forms/download_wd.py new file mode 100644 index 000000000..c8efb1378 --- /dev/null +++ b/src/scribe_data/check/check_missing_forms/download_wd.py @@ -0,0 +1,87 @@ +# SPDX-License-Identifier: GPL-3.0-or-later +""" +Download Wikidata lexeme dump. +""" + +import os +from pathlib import Path + +import requests + +from scribe_data.cli.download import download_wd_lexeme_dump +from scribe_data.utils import DEFAULT_DUMP_EXPORT_DIR + + +def wd_lexeme_dump_download(wikidata_dump=None, output_dir=None): + """ + Download Wikidata lexeme dumps automatically. + + Parameters + ---------- + wikidata_dump : str, optional + Date string in YYYYMMDD format for specific dumps. + If None, downloads the latest dump. + + output_dir : str, optional + Directory path for the downloaded file. + If None, uses DEFAULT_DUMP_EXPORT_DIR. + + Returns + ------- + str or False + Path to downloaded file if successful, False otherwise. + + Notes + ----- + - Downloads are skipped if the file already exists in the output directory. + - Progress is displayed every 50MB during download. + - Creates output directory if it doesn't exist. + """ + dump_url = download_wd_lexeme_dump(wikidata_dump or "latest-lexemes") + + if not dump_url: + print("No dump URL found.") + return False + + output_dir = output_dir or DEFAULT_DUMP_EXPORT_DIR + os.makedirs(output_dir, exist_ok=True) + + filename = dump_url.split("/")[-1] + output_path = str(Path(output_dir) / filename) + + # Check if the file already exists. + if os.path.exists(output_path): + print(f"File already exists: {output_path}. Skipping download.") + return output_path + + # Proceed with the download if the file does not exist. + print(f"Downloading dump to {output_path}...") + + try: + response = requests.get(dump_url, stream=True) + total_size = int(response.headers.get("content-length", 0)) + downloaded_size = 0 + + with open(output_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + downloaded_size += len(chunk) + # Print progress percentage every 50MB. 
+ if total_size and downloaded_size % (50 * 1024 * 1024) < 8192: + progress = (downloaded_size / total_size) * 100 + print(f"Download progress: {progress:.1f}%") + + print("Download completed successfully!") + return output_path + + except requests.exceptions.RequestException as e: + print(f"Error downloading dump: {e}") + + except Exception as e: + print(f"An error occurred: {e}") + + +if __name__ == "__main__": + if output_path := wd_lexeme_dump_download(): + print(f"DOWNLOAD_PATH={output_path}") diff --git a/src/scribe_data/check/check_missing_forms/generate_query.py b/src/scribe_data/check/check_missing_forms/generate_query.py new file mode 100644 index 000000000..42f04eacb --- /dev/null +++ b/src/scribe_data/check/check_missing_forms/generate_query.py @@ -0,0 +1,154 @@ +# SPDX-License-Identifier: GPL-3.0-or-later +""" +Generate SPARQL queries for missing lexeme forms. +""" + +import os +from pathlib import Path + +from scribe_data.utils import ( + LANGUAGE_DATA_EXTRACTION_DIR as language_data_extraction, +) +from scribe_data.utils import ( + data_type_metadata, + language_metadata, + lexeme_form_metadata, +) + + +def generate_query(missing_features, query_dir=None): + """ + Generate SPARQL queries for missing lexeme forms. + + Parameters + ---------- + missing_features : dict + Dictionary containing missing features by language and data type. + Format: {language_qid: {data_type_qid: [[form_qids]]}} + + query_dir : str or Path, optional + Directory where query files should be saved. + If None, uses default language_data_extraction directory. + + Returns + ------- + str + Path to the generated query file. + + Notes + ----- + - Generates a single query file combining all forms for a given language and data type combination. + - Query files are named incrementally if duplicates exist. + - Creates necessary directories if they don't exist. + """ + language_qid = next(iter(missing_features.keys())) + data_type_qid = next(iter(missing_features[language_qid].keys())) + + # Find the language entry by QID. + language_entry = next( + (name, data) + for name, data in language_metadata.items() + if data.get("qid") == language_qid + ) + language = language_entry[0] # the language name + + data_type = next( + name for name, qid in data_type_metadata.items() if qid == data_type_qid + ) + + iso_code = language_metadata[language]["iso"] + + # Create a QID to label mapping from the metadata. + qid_to_label = {} + for category in lexeme_form_metadata.values(): + for item in category.values(): + qid_to_label[item["qid"]] = item["label"] + + # Process all forms at once. + forms_query = [] + all_form_combinations = missing_features[language_qid][data_type_qid] + for form_qids in all_form_combinations: + # Convert QIDs to labels and join them together. + labels = [qid_to_label.get(qid, qid) for qid in form_qids] + concatenated_label = "".join(labels) + + # Make first letter lowercase. + concatenated_label = concatenated_label[0].lower() + concatenated_label[1:] + forms_query.append({"label": concatenated_label, "qids": form_qids}) + + # Generate a single query for all forms. + main_body = f"""# tool: scribe-data +# All {language} ({language_qid}) {data_type} ({data_type_qid}) and their forms. +# Enter this query at https://query.wikidata.org/. 
+ +SELECT + (REPLACE(STR(?lexeme), "http://www.wikidata.org/entity/", "") AS ?lexemeID) + ?{data_type} + """ + "\n ".join(f'?{form["label"]}' for form in forms_query) + + where_clause = f""" + WHERE {{ + ?lexeme dct:language wd:{language_qid} ; + wikibase:lexicalCategory wd:{data_type_qid} ; + wikibase:lemma ?{data_type} . + FILTER(lang(?{data_type}) = "{iso_code}") + """ + + # Generate OPTIONAL clauses for all forms in one query. + optional_clauses = "" + for form in forms_query: + qids = ", ".join(f"wd:{qid}" for qid in form["qids"]) + optional_clauses += f""" + OPTIONAL {{ + ?lexeme ontolex:lexicalForm ?{form['label']}Form . + ?{form['label']}Form ontolex:representation ?{form['label']} ; + wikibase:grammaticalFeature {qids} . + }} +""" + + # Print the complete query. + final_query = main_body + where_clause + optional_clauses + "}" + + def get_available_filename(base_path): + """Helper function to find the next available filename""" + if not os.path.exists(base_path): + return base_path + + base, ext = os.path.splitext(base_path) + counter = 1 + + # If the base already ends with _N, start from that number. + import re + + if match := re.search(r"_(\d+)$", base): + counter = int(match.group(1)) + 1 + base = base[: match.start()] + + while True: + new_path = f"{base}_{counter}{ext}" + if not os.path.exists(new_path): + return new_path + counter += 1 + + # Create base filename using the provided query_dir or default. + if query_dir: + base_file_name = ( + Path(query_dir) / language / data_type / f"query_{data_type}.sparql" + ) + + else: + base_file_name = f"{language_data_extraction}/{language}/{data_type}/query_{data_type}.sparql" + + # Get the next available filename. + file_name = get_available_filename(str(base_file_name)) + + # Create directory if it doesn't exist. + os.makedirs(os.path.dirname(file_name), exist_ok=True) + + # Write the file. + with open(file_name, "w") as file: + file.write(final_query) + + print(f"Query file created: {file_name}") + + return file_name diff --git a/src/scribe_data/check/check_missing_forms/get_forms.py b/src/scribe_data/check/check_missing_forms/get_forms.py new file mode 100644 index 000000000..30208b627 --- /dev/null +++ b/src/scribe_data/check/check_missing_forms/get_forms.py @@ -0,0 +1,160 @@ +# SPDX-License-Identifier: GPL-3.0-or-later +""" +Get forms from Wikidata. +""" + +import re +from collections import defaultdict + +from scribe_data.utils import ( + LANGUAGE_DATA_EXTRACTION_DIR as language_data_extraction, +) +from scribe_data.utils import ( + data_type_metadata, + language_metadata, +) +from scribe_data.wikidata.parse_dump import LexemeProcessor + +iso_to_qid = { + lang_data["iso"]: lang_data["qid"] + for lang, lang_data in language_metadata.items() + if "iso" in lang_data and "qid" in lang_data +} + +all_forms = defaultdict(lambda: defaultdict(list)) + + +def parse_sparql_files(): + """ + Read and parse all SPARQL query files to extract form information. + + Returns + ------- + dict + Accumulated forms for each language and lexical category. + Format: {language: {lexical_category: [forms]}} + + Notes + ----- + Recursively searches through language_data_extraction directory + for .sparql files and accumulates all form information. + """ + for sub_sub_file in language_data_extraction.rglob("*.sparql"): + with open(sub_sub_file, "r", encoding="utf-8") as query_text: + result = parse_sparql_query(query_text.read()) + + # Accumulate forms for each language and lexical category. 
+ for lang, categories in result.items(): + for category, forms in categories.items(): + if forms: + all_forms[lang][category].extend(forms) + + return all_forms + + +def parse_sparql_query(query_text): + """ + Parse a SPARQL query to extract lexical categories and features. + + Parameters + ---------- + query_text : str + Content of the SPARQL query file. + + Returns + ------- + dict + Dictionary containing parsed information. + Format: {language: {lexical_category: [forms]}} + + Notes + ----- + Extracts: + - Language QID + - Lexical category QID + - Grammatical features from OPTIONAL blocks + """ + # Get language and category first. + language = None + lexical_category = None + + # Parse lexical category. + lexical_matches = re.finditer(r"wikibase:lexicalCategory\s+wd:(Q\d+)", query_text) + for match in lexical_matches: + lexical_category = match.group(1) + + # Parse language. + language_matches = re.finditer(r"dct:language\s+wd:(Q\d+)", query_text) + for match in language_matches: + language = match.group(1) + + result = {language: {lexical_category: []}} + + # Parse optional blocks for forms and features. + optional_blocks = re.finditer(r"OPTIONAL\s*{([^}]+)}", query_text) + + for block in optional_blocks: + block_text = block.group(1) + + # Extract grammatical features. + features = re.finditer(r"wd:(Q\d+)", block_text) + if feature_list := [f.group(1) for f in features]: + result[language][lexical_category].append(feature_list) + + return result + + +# Debug line to parsed file. +parse_sparql_files() + + +def extract_dump_forms( + languages=None, data_types=None, file_path="latest-lexemes.json.bz2" +): + """ + Extract unique grammatical features from Wikidata lexeme dump. + + Parameters + ---------- + languages : list of str, optional + List of language ISO codes (e.g., ['en', 'fr']) + + data_types : list of str, optional + List of lexical categories (e.g., ['nouns', 'verbs']) + + file_path : str, optional + Path to the lexeme dump file, by default "latest-lexemes.json.bz2" + + Returns + ------- + dict + Dictionary of unique grammatical features per language and lexical category. + Format: {language_qid: {data_type_qid: features}} + + Notes + ----- + - Converts ISO codes to QIDs in the output + - Converts data type names to their corresponding QIDs + - Only includes languages and data types that have valid QID mappings + """ + processor = LexemeProcessor( + target_iso=languages, parse_type=["form"], data_types=data_types + ) + + processor.process_file(file_path) + + unique_features = dict(processor.unique_forms) + + # Convert ISO codes to QIDs and data types to QIDs. + converted_features = {} + for iso_code, data_types_dict in unique_features.items(): + if iso_code in iso_to_qid: + lang_qid = iso_to_qid[iso_code] + converted_features[lang_qid] = {} + + for data_type, features in data_types_dict.items(): + # Get QID from data_type_metadata. + if data_type_qid := data_type_metadata.get(data_type): + converted_features[lang_qid][data_type_qid] = features + + return converted_features diff --git a/src/scribe_data/check/check_missing_forms/pr_body.py b/src/scribe_data/check/check_missing_forms/pr_body.py new file mode 100644 index 000000000..822ef9ecf --- /dev/null +++ b/src/scribe_data/check/check_missing_forms/pr_body.py @@ -0,0 +1,97 @@ +# SPDX-License-Identifier: GPL-3.0-or-later +""" +Generate a formatted PR body describing missing features for each language. 
+""" + +import json +import sys + +from scribe_data.utils import ( + data_type_metadata, + language_metadata, +) + + +def pr_body(missing_features): + """ + Generate a formatted PR body describing missing features for each language. + + Parameters + ---------- + missing_features : dict + Dictionary mapping language QIDs to their missing features. + Format: {language_qid: {feature_type: [features]}} + + Returns + ------- + str + Formatted PR body content in markdown format containing a table of + missing features grouped by language. + + Notes + ----- + The PR body includes: + - A header indicating this is an automated PR + - A table showing languages and their missing feature types + - Features are grouped by language for better readability + """ + pr_body_content = ( + "## Automated PR: Missing Features\n\n" + + "This PR was automatically created by a GitHub Action.\n\n" + + "### Missing Features Summary\n" + + "| **Language** | **Feature Type** |\n" + + "|--------------|------------------|\n" + ) + + # Create a dictionary to group features by language. + grouped_features = {} + + # Iterate over the missing features to populate the table. + for entity, features in missing_features.items(): + # Check for sub-languages. + language_name = None + for name, data in language_metadata.items(): + if data.get("qid") == entity: + language_name = name + break + + if "sub_languages" in data: + for sub_name, sub_data in data["sub_languages"].items(): + if sub_data.get("qid") == entity: + language_name = f"{name} ({sub_name})" + break + + if language_name: + break + + # Default to entity if no name is found. + language_name = language_name or entity + + # Group features by language. + if language_name not in grouped_features: + grouped_features[language_name] = set() + + for feature in features.keys(): + feature_name = next( + (name for name, qid in data_type_metadata.items() if qid == feature), + feature, + ) + grouped_features[language_name].add(feature_name) + + # Add grouped features to the PR body. + for language, features in sorted(grouped_features.items()): + feature_list = ", ".join(sorted(features)) + pr_body_content += f"| **{language}** | {feature_list} |\n" + + pr_body_content += "\nPlease review the changes and provide feedback.\n" + + print(pr_body_content) + + return pr_body_content + + +if __name__ == "__main__": + with open(sys.argv[1], "r") as f: + missing_features = json.load(f) + + pr_body(missing_features) diff --git a/src/scribe_data/cli/get.py b/src/scribe_data/cli/get.py index b661debb5..c1f47756d 100644 --- a/src/scribe_data/cli/get.py +++ b/src/scribe_data/cli/get.py @@ -3,7 +3,7 @@ Functions for getting languages-data types packs for the Scribe-Data CLI. """ -import os # for removing original JSON files +import os from pathlib import Path from typing import List, Union @@ -14,6 +14,7 @@ from scribe_data.unicode.generate_emoji_keywords import generate_emoji from scribe_data.utils import ( DEFAULT_CSV_EXPORT_DIR, + DEFAULT_DUMP_EXPORT_DIR, DEFAULT_JSON_EXPORT_DIR, DEFAULT_SQLITE_EXPORT_DIR, DEFAULT_TSV_EXPORT_DIR, @@ -93,20 +94,13 @@ def prompt_user_download_all(): Checks with the user if they'd rather use Wikidata lexeme dumps before a download all call. """ return questionary.confirm( - "Do you want to query Wikidata directly? (selecting 'no' will use Wikidata lexeme dumps)", + "Do you want to query Wikidata directly? 
(selecting 'no' will use a Wikidata lexemes dump locally to avoid large Query Service calls)", default=False, ).ask() if all_bool: if language: if prompt_user_download_all(): - parse_wd_lexeme_dump( - language=language, - wikidata_dump_type=["form"], - data_types="all", - type_output_dir=output_dir, - ) - else: language_or_sub_language = language.split(" ")[0] print(f"Updating all data types for language: {language.title()}") query_data( @@ -119,15 +113,18 @@ def prompt_user_download_all(): f"Query completed for all data types for language {language.title()}." ) - elif data_type: - if prompt_user_download_all(): + else: parse_wd_lexeme_dump( - language="all", + language=language, wikidata_dump_type=["form"], - data_types=[data_type], + data_types="all", type_output_dir=output_dir, + wikidata_dump_path=wikidata_dump, + overwrite_all=overwrite, ) - else: + + elif data_type: + if prompt_user_download_all(): print(f"Updating all languages for data type: {data_type.capitalize()}") query_data( languages=None, @@ -139,6 +136,16 @@ def prompt_user_download_all(): f"Query completed for all languages for data type {data_type.capitalize()}." ) + else: + parse_wd_lexeme_dump( + language="all", + wikidata_dump_type=["form"], + data_types=[data_type], + type_output_dir=output_dir, + wikidata_dump_path=wikidata_dump, + overwrite_all=overwrite, + ) + else: print("Updating all languages and data types...") rprint( @@ -150,6 +157,7 @@ def prompt_user_download_all(): data_types="all", type_output_dir=output_dir, wikidata_dump_path=wikidata_dump, + overwrite_all=overwrite, ) # MARK: Emojis @@ -160,25 +168,33 @@ def prompt_user_download_all(): # MARK: Translations elif data_type == "translations": + # If no language specified, use "all". if language is None: language = "all" + parse_wd_lexeme_dump( language=language, wikidata_dump_type=["translations"], type_output_dir=output_dir, wikidata_dump_path=wikidata_dump, + overwrite_all=overwrite, ) return # MARK: Form Dump - elif wikidata_dump: + elif wikidata_dump is not None: + # If wikidata_dump is an empty string, use the default path. + if not wikidata_dump: + wikidata_dump = DEFAULT_DUMP_EXPORT_DIR + parse_wd_lexeme_dump( language=language, wikidata_dump_type=["form"], data_types=data_types, type_output_dir=output_dir, wikidata_dump_path=wikidata_dump, + overwrite_all=overwrite, ) return diff --git a/src/scribe_data/cli/list.py b/src/scribe_data/cli/list.py index 4a1b09a5c..287df0472 100644 --- a/src/scribe_data/cli/list.py +++ b/src/scribe_data/cli/list.py @@ -174,8 +174,10 @@ def list_wrapper( ---------- language : str The language to potentially list data types for. + data_type : str The data type to check for. + all_bool : bool Whether all languages and data types should be listed. diff --git a/src/scribe_data/cli/main.py b/src/scribe_data/cli/main.py index a38eca303..1c08ca527 100644 --- a/src/scribe_data/cli/main.py +++ b/src/scribe_data/cli/main.py @@ -19,6 +19,11 @@ from scribe_data.cli.total import total_wrapper from scribe_data.cli.upgrade import upgrade_cli from scribe_data.cli.version import get_version_message +from scribe_data.utils import ( + DEFAULT_CSV_EXPORT_DIR, + DEFAULT_DUMP_EXPORT_DIR, + DEFAULT_JSON_EXPORT_DIR, +) from scribe_data.wiktionary.parse_mediaWiki import parse_wiktionary_translations LIST_DESCRIPTION = "List languages, data types and combinations of each that Scribe-Data can be used for." 
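
The get.py hunk above treats an empty --wikidata-dump-path value as a request for the default dump directory (DEFAULT_DUMP_EXPORT_DIR), and the cli/main.py hunk that follows registers the flag with nargs="?" and const="" so it can be passed with or without a value. A minimal standalone sketch of the resulting behavior is shown below; the parser here is illustrative only, not Scribe-Data's actual CLI wiring.

import argparse

# Mirrors DEFAULT_DUMP_EXPORT_DIR from scribe_data/utils.py.
DEFAULT_DUMP_EXPORT_DIR = "scribe_data_wikidata_dumps_export"

parser = argparse.ArgumentParser()
parser.add_argument("-wdp", "--wikidata-dump-path", nargs="?", const="")

for argv in ([], ["-wdp"], ["-wdp", "dumps/latest-lexemes.json.bz2"]):
    wikidata_dump = parser.parse_args(argv).wikidata_dump_path

    # As in get.py: a bare flag yields "", which falls back to the default directory.
    if wikidata_dump is not None and not wikidata_dump:
        wikidata_dump = DEFAULT_DUMP_EXPORT_DIR

    print(argv, "->", wikidata_dump)

# [] -> None (no dump requested)
# ['-wdp'] -> scribe_data_wikidata_dumps_export
# ['-wdp', 'dumps/latest-lexemes.json.bz2'] -> dumps/latest-lexemes.json.bz2
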
@@ -115,7 +120,10 @@ def main() -> None: help="The output file type.", ) get_parser.add_argument( - "-od", "--output-dir", type=str, help="The output directory path for results." + "-od", + "--output-dir", + type=str, + help=f"The output directory path for results (default: ./{DEFAULT_JSON_EXPORT_DIR} for JSON, ./{DEFAULT_CSV_EXPORT_DIR} for CSV, etc.).", ) get_parser.add_argument( "-ope", @@ -149,8 +157,9 @@ def main() -> None: get_parser.add_argument( "-wdp", "--wikidata-dump-path", - type=str, - help="Path to a local Wikidata lexemes dump for running with '--all'.", + nargs="?", + const="", + help=f"Path to a local Wikidata lexemes dump. Uses default directory (./{DEFAULT_DUMP_EXPORT_DIR}) if no path provided.", ) get_parser.add_argument( "-t", "--translation", type=str, help="parse a single word using MediaWiki API" @@ -190,7 +199,7 @@ def main() -> None: "--wikidata-dump-path", nargs="?", const=True, - help="Path to a local Wikidata lexemes dump for running with '--all'.", + help=f"Path to a local Wikidata lexemes dump for running with '--all' (default: ./{DEFAULT_DUMP_EXPORT_DIR}).", ) # MARK: Convert @@ -290,7 +299,7 @@ def main() -> None: "-wdp", "--wikidata-dump-path", type=str, - help="The output directory path for the downloaded dump.", + help=f"The output directory path for the downloaded dump (default: ./{DEFAULT_DUMP_EXPORT_DIR}).", ) # MARK: Interactive @@ -346,8 +355,10 @@ def main() -> None: elif args.command in ["get", "g"]: if args.interactive: start_interactive_mode(operation="get") + if args.translation: - parse_wiktionary_translations(args.translation) + parse_wiktionary_translations(args.translation, args.output_dir) + else: get_data( language=args.language.lower() diff --git a/src/scribe_data/cli/total.py b/src/scribe_data/cli/total.py index f2636e09b..c1699832e 100644 --- a/src/scribe_data/cli/total.py +++ b/src/scribe_data/cli/total.py @@ -339,6 +339,7 @@ def total_wrapper( data_type : Union[str, List[str]] The data type(s) to check for. + all_bool : bool Whether all languages and data types should be listed. @@ -348,11 +349,15 @@ def total_wrapper( """ # Handle --all flag if all_bool and wikidata_dump: - language = "all" + if data_type is None: + data_type = "all" + if language is None: + language = "all" if wikidata_dump is True: # flag without a wikidata lexeme dump path parse_wd_lexeme_dump( language=language, + data_types=data_type, wikidata_dump_type=["total"], wikidata_dump_path=None, ) @@ -361,6 +366,7 @@ def total_wrapper( if isinstance(wikidata_dump, str): # if user provided a wikidata lexeme dump path parse_wd_lexeme_dump( language=language, + data_types=[data_type], wikidata_dump_type=["total"], wikidata_dump_path=wikidata_dump, ) diff --git a/src/scribe_data/utils.py b/src/scribe_data/utils.py index 599139df0..3ef45ac50 100644 --- a/src/scribe_data/utils.py +++ b/src/scribe_data/utils.py @@ -25,6 +25,7 @@ DEFAULT_TSV_EXPORT_DIR = "scribe_data_tsv_export" DEFAULT_SQLITE_EXPORT_DIR = "scribe_data_sqlite_export" DEFAULT_DUMP_EXPORT_DIR = "scribe_data_wikidata_dumps_export" +DEFAULT_MEDIAWIKI_EXPORT_DIR = "scribe_data_mediawiki_export" LANGUAGE_DATA_EXTRACTION_DIR = ( Path(__file__).parent / "wikidata" / "language_data_extraction" @@ -695,6 +696,19 @@ def check_lexeme_dump_prompt_download(output_dir: str): rprint("[bold red]No valid dumps found.[/bold red]") return None + elif user_input == "Download new version": + # Rename existing latest dump if it exists. 
+ latest_dump = Path(output_dir) / "latest-lexemes.json.bz2" + if latest_dump.exists(): + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + backup_name = f"old_latest-lexemes_{timestamp}.json.bz2" + latest_dump.rename(Path(output_dir) / backup_name) + rprint( + f"[bold green]Renamed existing dump to {backup_name}[/bold green]" + ) + + return False + else: rprint("[bold blue]Skipping download.[/bold blue]") return True diff --git a/src/scribe_data/wikidata/parse_dump.py b/src/scribe_data/wikidata/parse_dump.py new file mode 100644 index 000000000..e39b1ec30 --- /dev/null +++ b/src/scribe_data/wikidata/parse_dump.py @@ -0,0 +1,688 @@ +# SPDX-License-Identifier: GPL-3.0-or-later +""" +Functions for parsing Wikidata lexeme dumps. +""" + +import bz2 +import time +from collections import Counter, defaultdict +from pathlib import Path +from typing import List, Union + +import orjson +from tqdm import tqdm + +from scribe_data.utils import ( + DEFAULT_DUMP_EXPORT_DIR, + check_index_exists, + check_qid_is_language, + data_type_metadata, + get_language_iso_code, + language_metadata, + lexeme_form_metadata, +) + + +class LexemeProcessor: + def __init__( + self, + target_lang: Union[str, List[str]] = None, + parse_type: List[str] = None, + data_types: List[str] = None, + ): + """ + parse_type can be any combination of: + - 'translations' + - 'form' + - 'total' + data_types is a list of categories (e.g., ["nouns", "adverbs"]) for forms. + """ + # Pre-compute sets for faster lookups. + self.parse_type = set(parse_type or []) + self.data_types = set(data_types or []) + self.target_lang = set( + [target_lang] if isinstance(target_lang, str) else target_lang or [] + ) + + # Pre-compute valid categories and languages. + self._category_lookup = {v: k for k, v in data_type_metadata.items()} + self.valid_categories = set(data_type_metadata.values()) + + # Build optimized language mapping. + self.iso_to_name = self._build_iso_mapping() + self.valid_iso_codes = set(self.iso_to_name.keys()) + + # Separate data structures. + self.translations_index = defaultdict( + lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(dict))) + ) + self.forms_index = defaultdict(lambda: defaultdict(list)) + + # Stats. + self.stats = {"processed_entries": 0, "processing_time": 0} + + # For "total" usage. + self.lexical_category_counts = defaultdict(Counter) + self.translation_counts = defaultdict(Counter) + self.forms_counts = defaultdict(Counter) + + # For "unique_forms" usage. + self.unique_forms = defaultdict(lambda: defaultdict(list)) + + # Cache for feature labels. + self._feature_label_cache = {} + for category, items in lexeme_form_metadata.items(): + for item_data in items.values(): + self._feature_label_cache[item_data["qid"]] = ( + category, + item_data["label"], + ) + + # MARK: Build ISO Mapping + + def _build_iso_mapping(self) -> dict: + """ + Build mapping of ISO codes to language names based on language_metadata. + If self.target_lang is non-null, only include those iso codes. + """ + iso_mapping = {} + for lang_name, data in language_metadata.items(): + # Handle sub-languages if they exist. + if "sub_languages" in data: + for sub_lang, sub_data in data["sub_languages"].items(): + if self.target_lang and sub_lang not in self.target_lang: + continue + + if iso_code := sub_data.get("iso"): + iso_mapping[iso_code] = sub_lang + continue # skip main language if it only has sub-languages + + # Handle main languages. 
+ if self.target_lang and lang_name not in self.target_lang: + continue + + if iso_code := data.get("iso"): + iso_mapping[iso_code] = lang_name + + for language in self.target_lang: + if language.lower().startswith("q") and language[1:].isdigit(): + if qid_to_lang := check_qid_is_language(language): + iso_code = get_language_iso_code(language.upper()) + iso_mapping[iso_code] = qid_to_lang + print(f"ISO code for {language} is {iso_code}") + + return iso_mapping + + # MARK: Process Lines + def process_lines(self, line: str) -> None: + """ + Process one line of data with optimized parsing. + """ + try: + # Use faster exception handling. + lexeme = orjson.loads(line.strip().rstrip(",")) + if not lexeme: + return + + # Combine field checks into single lookup. + required_fields = ("lemmas", "lexicalCategory") + if any(field not in lexeme for field in required_fields): + return + + lexical_category = lexeme["lexicalCategory"] + if lexical_category not in self.valid_categories: + return + + category_name = self._category_lookup.get(lexical_category) + if not category_name: + return + + # Process first valid lemma only. + for lang_code, lemma_data in lexeme["lemmas"].items(): + if lang_code not in self.valid_iso_codes: + continue + + word = lemma_data.get("value", "").lower() + if not word: + continue + + parse_types = self.parse_type + if "translations" in parse_types and lexeme.get("senses"): + self._process_translations(lexeme, word, lang_code, category_name) + + if "form" in parse_types and category_name in self.data_types: + self._process_forms(lexeme, lang_code, category_name) + + if "total" in parse_types: + self._process_totals(lexeme, lang_code, category_name) + + break + + except Exception as e: + print(f"Error processing line: {e}") + + def _process_translations(self, lexeme, word, lang_code, category_name): + """ + Optimized translations processing. + """ + translations = {} + valid_iso_codes = self.valid_iso_codes + lexeme_id = lexeme["id"] + + # Pre-fetch senses to avoid repeated lookups. + for sense in lexeme["senses"]: + if glosses := sense.get("glosses"): + translations.update( + (lang, gloss["value"]) + for lang, gloss in glosses.items() + if lang in valid_iso_codes + ) + + if translations: + self.translations_index[lang_code][category_name][lexeme_id][word] = ( + translations + ) + + def _process_forms(self, lexeme, lang_code, category_name): + """ + Optimized forms processing. + """ + lexeme_id = lexeme["id"] + forms_data = {} + + # Pre-compute form data structure. + forms_dict = forms_data.setdefault(lexeme_id, {}) + lang_dict = forms_dict.setdefault(lang_code, {}) + cat_dict = lang_dict.setdefault(category_name, {}) + + for form in lexeme.get("forms", []): + if not (representations := form.get("representations")): + continue + + for rep_data in representations.values(): + if form_value := rep_data.get("value"): + features = form.get("grammaticalFeatures", []) + + # If features are not empty and not already in the list. + if ( + features + and features not in self.unique_forms[lang_code][category_name] + ): + self.unique_forms[lang_code][category_name].append(features) + + if features := form.get("grammaticalFeatures"): + if form_name := self._get_form_name(features): + cat_dict[form_name] = form_value + + break # only process first representation + + if forms_data: + self.forms_index.update(forms_data) + self.forms_counts[lang_code][category_name] += len(forms_data) + + def _get_form_name(self, features): + """ + Optimized form name generation. 
+ """ + if not features: + return "" + + categorized_features = defaultdict(list) + for feature in features: + if feature_info := self._feature_label_cache.get(feature): + category, label = feature_info + categorized_features[category].append((label, feature)) + + form_parts = [] + is_first = True + for category in sorted(categorized_features.keys()): + for label, _ in sorted(categorized_features[category]): + if is_first: + form_parts.append(label.lower()) + is_first = False + + else: + form_parts.append(label) + + return "".join(form_parts) + + def _process_totals(self, lexeme, lang_code, category_name): + """ + Process totals for statistical counting. + """ + # Skip if we have specific data types and this category isn't in them. + if self.data_types and category_name.lower() not in [ + dt.lower() for dt in self.data_types + ]: + return + + # Increment lexeme count for this language and category. + self.lexical_category_counts[lang_code][category_name] += 1 + + # Count translations if they exist. + if lexeme.get("senses"): + translation_count = sum( + bool( + sense.get("glosses") + and any( + lang in self.valid_iso_codes for lang in sense["glosses"].keys() + ) + ) + for sense in lexeme["senses"] + ) + if translation_count > 0: + self.translation_counts[lang_code][category_name] += translation_count + + # MARK: process file + def process_file(self, file_path: str, batch_size: int = 50000): + """ + Main loop: read lines from file (bz2) in batches, call process_lines on each. + """ + # Use context manager for better resource handling. + with bz2.open(file_path, "rt", encoding="utf-8") as bzfile: + # Skip header if present. + first_line = bzfile.readline() + if not first_line.strip().startswith("["): + bzfile.seek(0) + + # Process in larger batches for better performance. + batch = [] + start_time = time.time() + total_entries = int(Path(file_path).stat().st_size / 263) + + for line in tqdm(bzfile, total=total_entries, desc="Processing entries"): + if line.strip() not in ["[", "]", ",", ""]: + batch.append(line) + + if len(batch) >= batch_size: + self._process_batch(batch) + batch.clear() # more efficient than creating new list + self.stats["processed_entries"] += 1 + + # Process remaining items. + if batch: + self._process_batch(batch) + + # Update stats. + self.stats["processing_time"] = time.time() - start_time + self.stats["unique_words"] = len(self.forms_index) + len( + self.translations_index + ) + + # Print summary if "total" was requested. + if "total" in self.parse_type: + self._print_total_summary() + + def _process_batch(self, batch: list) -> None: + """ + Process a batch of lines. + """ + for line in batch: + self.process_lines(line) + + # MARK: print total summary + def _print_total_summary(self): + """ + Print stats if parse_type == total. 
+ """ + print( + f"{'Language':<20} {'Data Type':<25} {'Total Lexemes':<25} {'Total Translations':<20}" + ) + print("=" * 90) + for lang, counts in self.lexical_category_counts.items(): + lang_name = self.iso_to_name[lang] + first_row = True + + for category, count in counts.most_common(): + trans_count = self.translation_counts[lang][category] + + if first_row: + print( + f"{lang_name:<20} {category:<25} {count:<25,} {trans_count:<20,}" + ) + first_row = False + + else: + print(f"{'':<20} {category:<25} {count:<25,} {trans_count:<20,}") + + if lang != list(self.lexical_category_counts.keys())[-1]: + print("\n" + "=" * 90 + "\n") + + # MARK: export translations + def export_translations_json(self, filepath: str, language_iso: str = None) -> None: + """ + Save translations_index to file, optionally filtering by language_iso. + """ + if language_iso: + if language_iso not in self.iso_to_name: + print( + f"Warning: ISO {language_iso} unknown, skipping translations export..." + ) + return + + # Flatten the category level. + filtered = {} + for category_data in self.translations_index[language_iso].values(): + for lexeme_id, word_data in category_data.items(): + filtered[lexeme_id] = word_data + + # Check if filtered data is empty before saving. + if not filtered: + print(f"No translations found for {language_iso}, skipping export...") + return + + self._save_by_language(filtered, filepath, language_iso, "translations") + + # MARK: Export Forms + + def export_forms_json( + self, filepath: str, language_iso: str = None, data_type: str = None + ) -> None: + """ + Export grammatical forms to a JSON file with readable feature labels. + + Parameters + ---------- + filepath : str + Base path where the JSON file will be saved. + + language_iso : str, optional + ISO code of the language to export. If None, exports all languages. + + data_type : str, optional + Category of forms to export (e.g., "nouns", "verbs"). If None, exports all types. + + Notes + ----- + Creates a directory structure: //.json + Skips export if no forms are found for the specified language and data type. + """ + if language_iso: + if language_iso not in self.iso_to_name: + print(f"Warning: ISO {language_iso} unknown, skipping forms export...") + return + + filtered = {} + for id, lang_data in self.forms_index.items(): + if ( + language_iso in lang_data and data_type + ): # only process if we have a data_type + if ( + data_type in lang_data[language_iso] + ): # Check if this data_type exists. + # Initialize the nested dictionary for this ID if it doesn't exist. + if id not in filtered: + filtered[id] = {} + + form_data = lang_data[language_iso][data_type] + for form_name, word in form_data.items(): + filtered[id][form_name] = word + + lang_name = self.iso_to_name[language_iso] + + # Check if filtered data is empty before saving. + if not filtered: + print( + f"No forms found for {lang_name.capitalize()} {data_type}, skipping export..." + ) + return + + # Create the output directory structure. + # Check if this is a sub-language and get its main language. + main_lang = None + for lang, data in language_metadata.items(): + if "sub_languages" in data: + for sub_lang, sub_data in data["sub_languages"].items(): + if sub_lang == lang_name: + main_lang = lang + break + if main_lang: + break + + # If it's a sub-language, create path like: parent/chinese/mandarin/. 
+ if main_lang: + output_path = Path(filepath).parent / main_lang / lang_name + else: + output_path = Path(filepath).parent / lang_name + + output_path.mkdir(parents=True, exist_ok=True) + + # Create the full output filepath. + output_file = output_path / f"{data_type}.json" + + # Save the filtered data to JSON file. + try: + with open(output_file, "wb") as f: + f.write(orjson.dumps(filtered, option=orjson.OPT_INDENT_2)) + print( + f"Successfully exported forms for {lang_name.capitalize()} {data_type} to {output_file}" + ) + except Exception as e: + print( + f"Error saving forms for {lang_name.capitalize()} {data_type}: {e}" + ) + + def _save_by_language(self, filtered, filepath, language_iso, data_type): + """ + Save filtered data to language-specific directory. + + Parameters + ---------- + filtered : dict + Dictionary with form features as keys and words as values. + + filepath : Path + Base path for saving the file. + + language_iso : str + ISO code of the language. + + data_type : str + Type of data being saved (e.g., "nouns", "verbs"). + + Notes + ----- + Creates directory structure: exports//filename + and saves the filtered data as a JSON file. + """ + base_path = Path(filepath) + lang_name = self.iso_to_name[language_iso] + + # Create language-specific directory. + lang_filepath = base_path.parent / base_path.name + lang_filepath.parent.mkdir(parents=True, exist_ok=True) + + print(f"Saving {lang_name} {data_type} forms to {lang_filepath}...") + + # Save the filtered data with pretty printing. + with open(lang_filepath, "wb") as f: + f.write( + orjson.dumps( + filtered, + option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS, + ) + ) + + +# MARK: parse dump +def parse_dump( + language: Union[str, List[str]] = None, + parse_type: List[str] = None, + data_types: List[str] = None, + file_path: str = "latest-lexemes.json.bz2", + output_dir: str = None, + overwrite_all: bool = False, +): + """ + Parse a Wikidata lexeme dump file and extract linguistic data. + + Parameters + ---------- + language : str or list of str, optional + Language(s) to parse data for. Must match language names in language_metadata. + + parse_type : list of str, optional + Types of parsing to perform. Valid options are: + - 'translations': Extract word translations + - 'form': Extract grammatical forms + - 'total': Gather statistical totals + + data_types : list of str, optional + Categories to parse when using 'form' type (e.g. ["nouns", "adverbs"]). + Only used if 'form' is in parse_type. + + file_path : str, default="latest-lexemes.json.bz2" + Path to the lexeme dump file + + output_dir : str, optional + Directory to save output files. If None, uses DEFAULT_DUMP_EXPORT_DIR. + + overwrite_all : bool, default=False + If True, automatically overwrite existing files without prompting + + Notes + ----- + The function processes a Wikidata lexeme dump and extracts linguistic data based on + the specified parameters. For each language and data type combination, it creates + separate JSON files in the output directory structure: + + If a requested index file already exists, that language/category combination + will be skipped. + """ + # Prepare environment - Use default if output_dir is None. + output_dir = output_dir or DEFAULT_DUMP_EXPORT_DIR + Path(output_dir).mkdir(parents=True, exist_ok=True) + + # Convert single strings to lists. 
+ languages = [language] if isinstance(language, str) else language + parse_type = parse_type or [] + data_types = data_types or [] + + if "total" not in parse_type: + # For translations, we only need to check the translations index. + if "translations" in parse_type: + languages_to_process = [] + for lang in languages: + index_path = Path(output_dir) / lang / "translations.json" + + if not check_index_exists(index_path, overwrite_all): + languages_to_process.append(lang) + + else: + print(f"Skipping {lang}/translations.json - already exists") + + # Update languages list but keep data_types as is. + languages = languages_to_process + + # For forms, check each language/data_type combination. + elif "form" in parse_type: + languages_to_process = [] + data_types_to_process = set() + + for lang in languages: + needs_processing = False + # Check if this is a sub-language + main_lang = None + for lang_name, data in language_metadata.items(): + if "sub_languages" in data: + for sub_lang in data["sub_languages"]: + if sub_lang == lang: + main_lang = lang_name + break + if main_lang: + break + + for data_type in data_types: + # Create appropriate path based on whether it's a sub-language. + if main_lang: + index_path = ( + Path(output_dir) / main_lang / lang / f"{data_type}.json" + ) + + else: + index_path = Path(output_dir) / lang / f"{data_type}.json" + + if not check_index_exists(index_path, overwrite_all): + needs_processing = True + data_types_to_process.add(data_type) + + else: + # Update path display in skip message. + skip_path = ( + f"{main_lang}/{lang}/{data_type}.json" + if main_lang + else f"{lang}/{data_type}.json" + ) + print(f"Skipping {skip_path} - already exists") + + if needs_processing: + languages_to_process.append(lang) + + # Update both lists. + languages = languages_to_process + data_types = list(data_types_to_process) + + if "translations" not in parse_type and (not data_types or not languages): + print("No data types or languages provided. Nothing to process.") + return + + if not languages: + print("All requested data already exists. Nothing to process.") + return + + processor = LexemeProcessor( + target_lang=languages, parse_type=parse_type, data_types=data_types + ) + processor.process_file(file_path) + + # MARK: Handle JSON exports + if "translations" in parse_type: + for language in languages: + if iso_code := next( + ( + iso + for iso, name in processor.iso_to_name.items() + if name.lower() == language.lower() + ), + None, + ): + index_path = Path(output_dir) / language / "translations.json" + # Ensure parent directory exists. + index_path.parent.mkdir(parents=True, exist_ok=True) + # print(f"Exporting translations for {language} to {index_path}"). + processor.export_translations_json(str(index_path), iso_code) + else: + print(f"Warning: Could not find ISO code for {language}") + + # (b) If "form" in parse_type -> export forms for each data_type in data_types. + if "form" in parse_type: + # For each data_type, we create a separate file, e.g. nouns.json. + for dt in data_types: + index_path = Path(output_dir) / f"{dt}.json" + iso_codes = set() + for word_data in processor.forms_index.values(): + iso_codes.update(word_data.keys()) + + for iso_code in iso_codes: + if iso_code in processor.iso_to_name: + processor.export_forms_json( + filepath=str(index_path), language_iso=iso_code, data_type=dt + ) + + # def print_unique_forms(unique_forms): + # """ + # Pretty print unique grammatical feature sets. 
+    #     """
+    #     for lang, lang_data in unique_forms.items():
+    #         print(f"\nLanguage: {lang}")
+    #         for category, features_list in lang_data.items():
+    #             print(f"  Category: {category}")
+    #             print(f"  Total unique feature sets: {len(features_list)}")
+    #             print("  Feature Sets:")
+    #             for i, feature_set in enumerate(features_list, 1):
+    #                 # Convert QIDs to a more readable format
+    #                 readable_features = [f"Q{qid}" for qid in feature_set]
+    #                 print(f"    {i}. {readable_features}")
+
+    # print_unique_forms(processor.unique_forms)
+    # print(processor.unique_forms)
diff --git a/src/scribe_data/wikidata/wikidata_utils.py b/src/scribe_data/wikidata/wikidata_utils.py
index a1a779967..7109620ea 100644
--- a/src/scribe_data/wikidata/wikidata_utils.py
+++ b/src/scribe_data/wikidata/wikidata_utils.py
@@ -12,20 +12,20 @@
 from scribe_data.cli.download import wd_lexeme_dump_download_wrapper
 from scribe_data.utils import data_type_metadata, language_metadata
-from scribe_data.wiktionary.parse_dump import parse_dump
+from scribe_data.wikidata.parse_dump import parse_dump
 
 sparql = SPARQLWrapper("https://query.wikidata.org/sparql")
 sparql.setReturnFormat(JSON)
 sparql.setMethod(POST)
 
 
-def mediaWiki_query(query: str) -> dict:
+def mediawiki_query(word: str) -> dict:
     """
     Query the Wikidata API using a MediaWiki query.
 
     Parameters
     ----------
-    query : str
-        The MediaWiki query to execute.
+    word : str
+        The word whose translations page should be queried.
 
     Returns
@@ -34,8 +34,8 @@
         The JSON response from the API.
     """
     url = (
-        f"https://en.wiktionary.org/w/api.php?"
-        f"action=query&format=json&titles={query}/translations&prop=revisions&rvprop=content"
+        f"https://wikidata.org/w/api.php?"
+        f"action=query&format=json&titles={word}/translations&prop=revisions&rvprop=content"
     )
     response = requests.get(url)
     return response.json()
@@ -47,6 +47,7 @@ def parse_wd_lexeme_dump(
     data_types: List[str] = None,
     type_output_dir: str = None,
     wikidata_dump_path: str = None,
+    overwrite_all: bool = False,
 ):
     """
     Checks for the existence of a Wikidata lexeme dump and parses it if possible.
@@ -67,18 +68,35 @@
     wikidata_dump_path : str, optional
         The local Wikidata lexeme dump directory that should be used to get data.
+
+    overwrite_all : bool, default=False
+        If True, automatically overwrite existing files without prompting
     """
-    # Convert "all" to list of all languages
+    # Convert "all" to list of all languages including sub-languages.
     if isinstance(language, str) and language.lower() == "all":
-        language = list(language_metadata.keys())
+        languages = []
+        for main_lang, lang_data in language_metadata.items():
+            # Add the sub-languages of a language if they exist, otherwise the language itself.
+            if "sub_languages" in lang_data:
+                languages.extend(lang_data["sub_languages"])
+            else:
+                languages.append(main_lang)
+
+        language = languages
+
+    # For processing: exclude translations and emoji-keywords.
if isinstance(data_types, str) and data_types.lower() == "all": - # Exclude translations as it's a separate section data_types = [ dt for dt in data_type_metadata.keys() if dt != "translations" and dt != "emoji-keywords" ] + print(f"Languages to process: {[lang.capitalize() for lang in language]}") + + if "translations" not in wikidata_dump_type: + print(f"Data types to process: {data_types}") + file_path = wd_lexeme_dump_download_wrapper(None, wikidata_dump_path) if isinstance(file_path, (str, Path)): @@ -94,7 +112,6 @@ def parse_wd_lexeme_dump( data_types=data_types, file_path=file_path, output_dir=type_output_dir, + overwrite_all=overwrite_all, ) return - - rprint(f"[bold red]No valid dumps found in {file_path}.[/bold red]") diff --git a/src/scribe_data/wiktionary/parse_dump.py b/src/scribe_data/wiktionary/parse_dump.py deleted file mode 100644 index 58ab0fa21..000000000 --- a/src/scribe_data/wiktionary/parse_dump.py +++ /dev/null @@ -1,493 +0,0 @@ -# SPDX-License-Identifier: GPL-3.0-or-later -""" -Functions for parsing Wikidata lexeme dumps. -""" - -import bz2 -import time -from collections import Counter, defaultdict -from pathlib import Path -from typing import List, Union - -import orjson -import questionary -from scribe_data.utils import ( - DEFAULT_DUMP_EXPORT_DIR, - check_index_exists, - check_qid_is_language, - data_type_metadata, - get_language_iso_code, - language_metadata, -) -from tqdm import tqdm - - -class LexemeProcessor: - def __init__( - self, - target_iso: Union[str, List[str]] = None, - parse_type: List[str] = None, - data_types: List[str] = None, - ): - """ - parse_type can be any combination of: - - 'translations' - - 'form' - - 'total' - data_types is a list of categories (e.g., ["nouns", "adverbs"]) for forms. - """ - # Pre-compute sets for faster lookups. - self.parse_type = set(parse_type or []) - self.data_types = set(data_types or []) - self.target_iso = set( - [target_iso] if isinstance(target_iso, str) else target_iso or [] - ) - - # Pre-compute valid categories and languages. - self._category_lookup = {v: k for k, v in data_type_metadata.items()} - self.valid_categories = set(data_type_metadata.values()) - - # Build optimized language mapping. - self.iso_to_name = self._build_iso_mapping() - self.valid_iso_codes = set(self.iso_to_name.keys()) - - # Separate data structures. - self.translations_index = defaultdict( - lambda: defaultdict(lambda: defaultdict(dict)) - ) - self.forms_index = defaultdict(lambda: defaultdict(lambda: defaultdict(dict))) - - # Stats. - self.stats = {"processed_entries": 0, "unique_words": 0, "processing_time": 0} - - # For category lookups, invert data_type_metadata. - # E.g., {"Q1084": "nouns", "Q24905": "verbs", ...}. - self._category_lookup = {v: k for k, v in data_type_metadata.items()} - - # Build map from ISO to full language name. - self.iso_to_name = self._build_iso_mapping() - # For "total" usage. - self.lexical_category_counts = defaultdict(Counter) - self.translation_counts = defaultdict(Counter) - self.forms_counts = defaultdict(Counter) - - # MARK: build iso mapping - def _build_iso_mapping(self) -> dict: - """ - Build mapping of ISO codes to language names based on language_metadata. - If self.target_iso is non-null, only include those iso codes. 
- """ - iso_mapping = {} - for lang_name, data in language_metadata.items(): - if self.target_iso and lang_name not in self.target_iso: - continue - - if iso_code := data.get("iso"): - iso_mapping[iso_code] = lang_name - - for language in self.target_iso: - if language.lower().startswith("q") and language[1:].isdigit(): - qid_to_lang = check_qid_is_language(language) - if qid_to_lang: - iso_code = get_language_iso_code(language.upper()) - iso_mapping[iso_code] = qid_to_lang - print(f"ISO code for {language} is {iso_code}") - - return iso_mapping - - # MARK: process lines - def process_lines(self, line: str) -> None: - """ - Process one line of data. Depending on parse_type, we do: - - total stats - - translations - - form categories (filtered by data_types) - """ - try: - lexeme = orjson.loads(line.strip().rstrip(",")) - if not lexeme: - return - - # Get common values once. - lemmas = lexeme.get("lemmas", {}) - lexical_category = lexeme.get("lexicalCategory") - - if not (lemmas and lexical_category in self.valid_categories): - return - - category_name = self._category_lookup.get(lexical_category) - if not category_name: - return - - # Process each type in a single pass through the data. - for lang_code, lemma_data in lemmas.items(): - if lang_code not in self.valid_iso_codes: - continue - - word = lemma_data.get("value", "").lower() - if not word: - continue - - if "total" in self.parse_type: - self.lexical_category_counts[lang_code][category_name] += 1 - translation_count = sum( - len(sense.get("glosses", {})) - for sense in lexeme.get("senses", []) - ) - self.translation_counts[lang_code][category_name] += ( - translation_count - ) - - if "translations" in self.parse_type: - if translations := { - lang: gloss["value"] - for sense in lexeme.get("senses", []) - for lang, gloss in sense.get("glosses", {}).items() - if lang in self.valid_iso_codes - }: - self.translations_index[word][lang_code][category_name] = ( - translations - ) - - if "form" in self.parse_type and category_name in self.data_types: - forms_data = defaultdict(list) - for form in lexeme.get("forms", []): - for rep_lang, rep_data in form.get( - "representations", {} - ).items(): - if rep_lang == lang_code: - if form_value := rep_data.get("value"): - forms_data[form_value].extend( - form.get("grammaticalFeatures", []) - ) - - if forms_data: - self.forms_index[word][lang_code][category_name] = dict( - forms_data - ) - self.forms_counts[lang_code][category_name] += len(forms_data) - - break # only process first valid lemma - - except Exception as e: - print(f"Error processing line: {e}") - - # MARK: process file - def process_file(self, file_path: str, batch_size: int = 50000): - """ - Main loop: read lines from file (bz2) in batches, call process_lines on each. - """ - # Use context manager for better resource handling. - with bz2.open(file_path, "rt", encoding="utf-8") as bzfile: - # Skip header if present. - first_line = bzfile.readline() - if not first_line.strip().startswith("["): - bzfile.seek(0) - - # Process in larger batches for better performance. - batch = [] - start_time = time.time() - total_entries = int(Path(file_path).stat().st_size / 263) - - for line in tqdm(bzfile, total=total_entries, desc="Processing entries"): - if line.strip() not in ["[", "]", ",", ""]: - batch.append(line) - - if len(batch) >= batch_size: - self._process_batch(batch) - batch.clear() # more efficient than creating new list - self.stats["processed_entries"] += 1 - - # Process remaining items. 
- if batch: - self._process_batch(batch) - - # Update stats. - self.stats["processing_time"] = time.time() - start_time - self.stats["unique_words"] = len(self.forms_index) + len( - self.translations_index - ) - - # Print summary if "total" was requested. - if "total" in self.parse_type: - self._print_total_summary() - - def _process_batch(self, batch: list) -> None: - """ - Process a batch of lines. - """ - for line in batch: - self.process_lines(line) - - # MARK: print total summary - def _print_total_summary(self): - """ - Print stats if parse_type == total. - """ - print( - f"{'Language':<20} {'Data Type':<25} {'Total Lexemes':<25} {'Total Translations':<20}" - ) - print("=" * 90) - for lang, counts in self.lexical_category_counts.items(): - lang_name = self.iso_to_name[lang] - first_row = True - - for category, count in counts.most_common(): - trans_count = self.translation_counts[lang][category] - - if first_row: - print( - f"{lang_name:<20} {category:<25} {count:<25,} {trans_count:<20,}" - ) - first_row = False - - else: - print(f"{'':<20} {category:<25} {count:<25,} {trans_count:<20,}") - - if lang != list(self.lexical_category_counts.keys())[-1]: - print("\n" + "=" * 90 + "\n") - - # MARK: export translations - def export_translations_json(self, filepath: str, language_iso: str = None) -> None: - """ - Save translations_index to file, optionally filtering by language_iso. - """ - if language_iso: - if language_iso not in self.iso_to_name: - print( - f"Warning: ISO {language_iso} unknown, skipping translations export..." - ) - return - - filtered = { - word: {language_iso: lang_data[language_iso]} - for word, lang_data in self.translations_index.items() - if language_iso in lang_data - } - - # Check if filtered data is empty before saving. - if not filtered: - print(f"No translations found for {language_iso}, skipping export...") - return - - self._save_by_language(filtered, filepath, language_iso, "translations") - - # MARK: export forms - def export_forms_json( - self, filepath: str, language_iso: str = None, data_type: str = None - ) -> None: - """ - Save forms_index to file, optionally filtering by: - - language_iso - - data_type (e.g. "nouns", "adverbs") - - If data_type is given, we only export that one category from forms. - """ - if language_iso: - if language_iso not in self.iso_to_name: - print(f"Warning: ISO {language_iso} unknown, skipping forms export...") - return - - filtered = {} - for word, lang_data in self.forms_index.items(): - if language_iso in lang_data: - # If data_type is given, only keep that category. - if data_type: - if data_type in lang_data[language_iso]: - filtered[word] = { - language_iso: { - data_type: lang_data[language_iso][data_type] - } - } - - else: - filtered[word] = {language_iso: lang_data[language_iso]} - - # Check if filtered data is empty before saving. - if not filtered: - print(f"No forms found for {language_iso}, skipping export...") - return - - self._save_by_language( - filtered, filepath, language_iso, data_type or "forms" - ) - - def _save_by_language(self, data, filepath, language_iso, category_type): - """ - Save data to exports//filename. 
- """ - base_path = Path(filepath) - lang_name = self.iso_to_name[language_iso] - - lang_filepath = base_path.parent / lang_name / base_path.name - lang_filepath.parent.mkdir(parents=True, exist_ok=True) - - print(f"Saving {lang_name} {category_type} index to {lang_filepath}...") - with open(lang_filepath, "wb") as f: - f.write( - orjson.dumps( - self._to_dict(data), - option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS, - ) - ) - - def _to_dict(self, dd): - """ - Recursively convert defaultdict to dict. - """ - if isinstance(dd, defaultdict): - dd = {k: self._to_dict(v) for k, v in dd.items()} - - return dd - - -# MARK: parse dump -def parse_dump( - language: Union[str, List[str]] = None, - parse_type: List[str] = None, - data_types: List[str] = None, - file_path: str = "latest-lexemes.json.bz2", - output_dir: str = None, - overwrite_all: bool = False, -): - """ - Parse a Wikidata lexeme dump file and extract linguistic data. - - Parameters - ---------- - language : str or list of str, optional - Language(s) to parse data for. Must match language names in language_metadata. - - parse_type : list of str, optional - Types of parsing to perform. Valid options are: - - 'translations': Extract word translations - - 'form': Extract grammatical forms - - 'total': Gather statistical totals - - data_types : list of str, optional - Categories to parse when using 'form' type (e.g. ["nouns", "adverbs"]). - Only used if 'form' is in parse_type. - - file_path : str, default="latest-lexemes.json.bz2" - Path to the lexeme dump file - - output_dir : str, optional - Directory to save output files. If None, uses DEFAULT_DUMP_EXPORT_DIR. - - overwrite_all : bool, default=False - If True, automatically overwrite existing files without prompting - - Notes - ----- - The function processes a Wikidata lexeme dump and extracts linguistic data based on - the specified parameters. For each language and data type combination, it creates - separate JSON files in the output directory structure: - - If a requested index file already exists, that language/category combination - will be skipped. - """ - # Prepare environment - Use default if output_dir is None. - output_dir = output_dir or DEFAULT_DUMP_EXPORT_DIR - Path(output_dir).mkdir(parents=True, exist_ok=True) - - # Convert single strings to lists. - languages = [language] if isinstance(language, str) else language - parse_type = parse_type or [] - data_types = data_types or [] - - print(f"Languages: {languages}") - print(f"parse_type: {parse_type}") - if data_types: - print(f"data_types for forms: {data_types}") - - if "total" not in parse_type: - choice = questionary.select( - "Choose an action:", - choices=["Overwrite existing data", "Skip process"], - default="Skip process", - ).ask() - if choice == "Overwrite existing data": - overwrite_all = True - - # For translations, we only need to check the translations index. - if "translations" in parse_type: - languages_to_process = [] - for lang in languages: - index_path = Path(output_dir) / lang / "lexeme_translations.json" - - if not check_index_exists(index_path, overwrite_all): - languages_to_process.append(lang) - - else: - print(f"Skipping {lang}/translations.json - already exists") - - # Update languages list but keep data_types as is. - languages = languages_to_process - - # For forms, check each language/data_type combination. 
- elif "form" in parse_type: - languages_to_process = [] - data_types_to_process = set() - - for lang in languages: - needs_processing = False - for data_type in data_types: - index_path = Path(output_dir) / lang / f"lexeme_{data_type}.json" - - if not check_index_exists(index_path, overwrite_all): - needs_processing = True - data_types_to_process.add(data_type) - - else: - print(f"Skipping {lang}/{data_type}.json - already exists") - - if needs_processing: - languages_to_process.append(lang) - - # Update both lists. - languages = languages_to_process - data_types = list(data_types_to_process) - - print(f"Languages to process: {languages}") - if data_types: - print(f"Data types to process: {data_types}") - - if not languages: - print("All requested data already exists. Nothing to process.") - return - - processor = LexemeProcessor( - target_iso=languages, parse_type=parse_type, data_types=data_types - ) - processor.process_file(file_path) - - # MARK: Handle JSON exports - - # (a) If "translations" in parse_type -> export them. - if "translations" in parse_type: - index_path = Path(output_dir) / "lexeme_translations.json" - - # Export translations for each ISO found. - iso_codes = set() - for word_data in processor.translations_index.values(): - iso_codes.update(word_data.keys()) - for iso_code in iso_codes: - if iso_code in processor.iso_to_name: - processor.export_translations_json(str(index_path), iso_code) - - # (b) If "form" in parse_type -> export forms for each data_type in data_types. - if "form" in parse_type: - # For each data_type, we create a separate file, e.g. lexeme_nouns.json. - for dt in data_types: - index_path = Path(output_dir) / f"lexeme_{dt}.json" - print(f"Exporting forms for {dt} to {index_path}...") - - iso_codes = set() - for word_data in processor.forms_index.values(): - iso_codes.update(word_data.keys()) - - for iso_code in iso_codes: - if iso_code in processor.iso_to_name: - processor.export_forms_json( - filepath=str(index_path), language_iso=iso_code, data_type=dt - ) diff --git a/src/scribe_data/wiktionary/parse_mediaWiki.py b/src/scribe_data/wiktionary/parse_mediaWiki.py index 2350fec2d..e451830de 100644 --- a/src/scribe_data/wiktionary/parse_mediaWiki.py +++ b/src/scribe_data/wiktionary/parse_mediaWiki.py @@ -5,13 +5,17 @@ import json import re +from pathlib import Path -from scribe_data.utils import get_language_from_iso -from scribe_data.wikidata.wikidata_utils import mediaWiki_query +from scribe_data.utils import DEFAULT_MEDIAWIKI_EXPORT_DIR, get_language_from_iso +from scribe_data.wikidata.wikidata_utils import mediawiki_query -def fetch_translation_page(word): - data = mediaWiki_query(word) +def fetch_translation_page(word: str): + """ + Fetches the translation for a given word via the Wiktionary MediaWiki API. + """ + data = mediawiki_query(word=word) pages = data.get("query", {}).get("pages", {}) # Extract page object from dictionary. @@ -104,16 +108,54 @@ def build_json_format(word, translations_by_lang): return book_translations -def parse_wiktionary_translations(word): +def parse_wiktionary_translations(word, output_dir=DEFAULT_MEDIAWIKI_EXPORT_DIR): """ - Parse the translations of a word from Wiktionary. + Parse translations from Wiktionary and save them to a JSON file. + + Fetches the Wiktionary page for the given word, extracts translations + across different languages, and saves them in a structured JSON format. + + Parameters + ---------- + word : str + The word to fetch translations for. 
+ + output_dir : str or Path, optional + Directory to save JSON output (default is DEFAULT_MEDIAWIKI_EXPORT_DIR). + Will be created if it doesn't exist. + + Notes + ----- + The output JSON structure follows the format: + { + "word": { + "language": { + "part_of_speech": { + "1": { + "description": "context", + "translations": "translated_text" + } + } + } + } + } """ - wikitext = fetch_translation_page(word) - translations_by_lang = parse_wikitext_for_translations(wikitext) + output_dir = output_dir or DEFAULT_MEDIAWIKI_EXPORT_DIR + output_path = Path(output_dir) + output_path.mkdir(parents=True, exist_ok=True) + translations_by_lang = parse_wikitext_for_translations(fetch_translation_page(word)) if not translations_by_lang: print("No translations found") return - final_json = build_json_format(word, translations_by_lang) - print(json.dumps(final_json, indent=4, ensure_ascii=False)) + json_path = output_path / f"{word}.json" + with open(json_path, "w", encoding="utf-8") as file: + json.dump( + build_json_format(word, translations_by_lang), + file, + indent=4, + ensure_ascii=False, + ) + + print(f"JSON file saved to {json_path}") diff --git a/tests/cli/test_get.py b/tests/cli/test_get.py index 21357603a..2cb52061e 100644 --- a/tests/cli/test_get.py +++ b/tests/cli/test_get.py @@ -48,16 +48,16 @@ def test_invalid_arguments(self): @patch("scribe_data.cli.get.query_data") @patch("scribe_data.cli.get.parse_wd_lexeme_dump") @patch("scribe_data.cli.get.questionary.confirm") - def test_get_all_data_types_for_language_user_says_yes( + def test_get_all_data_types_for_language_user_says_no( self, mock_questionary_confirm, mock_parse, mock_query_data ): """ - Test the behavior when the user agrees to query Wikidata directly. + Test the behavior when the user agrees to use Wikidata lexeme dumps. This test checks that `parse_wd_lexeme_dump` is called with the correct parameters - when the user confirms they want to query Wikidata. + when the user confirms they don't want to query Wikidata. 
""" - mock_questionary_confirm.return_value.ask.return_value = True + mock_questionary_confirm.return_value.ask.return_value = False get_data(all_bool=True, language="English") @@ -66,6 +66,8 @@ def test_get_all_data_types_for_language_user_says_yes( wikidata_dump_type=["form"], data_types="all", # because if only language given, data_types is None type_output_dir="scribe_data_json_export", # default for JSON + wikidata_dump_path=None, # explicitly set to None + overwrite_all=False, ) mock_query_data.assert_not_called() @@ -84,6 +86,7 @@ def test_get_all_languages_and_data_types(self, mock_parse): data_types="all", type_output_dir="scribe_data_json_export", wikidata_dump_path=None, + overwrite_all=False, ) # MARK: Language and Data Type @@ -264,8 +267,9 @@ def test_get_translations_no_language_specified(self, mock_parse): mock_parse.assert_called_once_with( language="all", wikidata_dump_type=["translations"], - type_output_dir="scribe_data_json_export", # default output dir for JSON + type_output_dir="scribe_data_json_export", wikidata_dump_path=None, + overwrite_all=False, ) @patch("scribe_data.cli.get.parse_wd_lexeme_dump") @@ -282,6 +286,7 @@ def test_get_translations_with_specific_language(self, mock_parse): wikidata_dump_type=["translations"], type_output_dir="./test_output", wikidata_dump_path=None, + overwrite_all=False, ) @patch("scribe_data.cli.get.parse_wd_lexeme_dump") @@ -297,6 +302,61 @@ def test_get_translations_with_dump(self, mock_parse): mock_parse.assert_called_once_with( language="German", wikidata_dump_type=["translations"], - type_output_dir="scribe_data_json_export", # default for JSON + type_output_dir="scribe_data_json_export", wikidata_dump_path="./wikidump.json", + overwrite_all=False, + ) + + # MARK: Use QID as language + + @patch("scribe_data.cli.get.parse_wd_lexeme_dump") + @patch("scribe_data.cli.get.questionary.confirm") + def test_get_data_with_wikidata_identifier( + self, mock_questionary_confirm, mock_parse + ): + """ + Test retrieving data with a Wikidata identifier as language. + + Ensures that `parse_wd_lexeme_dump` is called with the correct parameters + when a Wikidata identifier is used. + """ + # Mock the user confirmation to return True (query Wikidata directly). + mock_questionary_confirm.return_value.ask.return_value = False + + get_data( + language="Q9217", + wikidata_dump="scribe", + output_dir="exported_json", + all_bool=True, + ) + mock_parse.assert_called_once_with( + language="Q9217", + wikidata_dump_type=["form"], + data_types="all", + type_output_dir="exported_json", + wikidata_dump_path="scribe", + overwrite_all=False, + ) + + @patch("scribe_data.cli.get.parse_wd_lexeme_dump") + def test_get_data_with_wikidata_identifier_and_data_type(self, mock_parse): + """ + Test retrieving a specific data type with a Wikidata identifier. + + Ensures that `parse_wd_lexeme_dump` is called with the correct parameters + when a Wikidata identifier and specific data type are used. 
+ """ + get_data( + language="Q9217", + data_type="nouns", + wikidata_dump="scribe", + output_dir="exported_json", + ) + mock_parse.assert_called_once_with( + language="Q9217", + wikidata_dump_type=["form"], + data_types=["nouns"], + type_output_dir="exported_json", + wikidata_dump_path="scribe", + overwrite_all=False, ) diff --git a/tests/cli/test_total.py b/tests/cli/test_total.py index 4f4a51071..2d6f56e95 100644 --- a/tests/cli/test_total.py +++ b/tests/cli/test_total.py @@ -257,3 +257,95 @@ def test_total_wrapper_language_and_data_type(self, mock_get_total_lexemes): def test_total_wrapper_invalid_input(self): with self.assertRaises(ValueError): total_wrapper() + + # MARK: Using Dump + + @patch("scribe_data.cli.total.parse_wd_lexeme_dump") + def test_total_wrapper_wikidata_dump_flag(self, mock_parse_dump): + """Test when wikidata_dump is True (flag without path)""" + total_wrapper(wikidata_dump=True) + mock_parse_dump.assert_called_once_with( + language=None, + data_types=None, + wikidata_dump_type=["total"], + wikidata_dump_path=None, + ) + + @patch("scribe_data.cli.total.parse_wd_lexeme_dump") + def test_total_wrapper_wikidata_dump_path(self, mock_parse_dump): + """Test when wikidata_dump is a file path""" + dump_path = "/path/to/dump.json" + total_wrapper(wikidata_dump=dump_path) + mock_parse_dump.assert_called_once_with( + language=None, + data_types=[None], + wikidata_dump_type=["total"], + wikidata_dump_path=dump_path, + ) + + @patch("scribe_data.cli.total.parse_wd_lexeme_dump") + def test_total_wrapper_wikidata_dump_with_all(self, mock_parse_dump): + """Test when both wikidata_dump and all_bool are True""" + total_wrapper(wikidata_dump=True, all_bool=True) + mock_parse_dump.assert_called_once_with( + language="all", + data_types="all", + wikidata_dump_type=["total"], + wikidata_dump_path=None, + ) + + @patch("scribe_data.cli.total.parse_wd_lexeme_dump") + def test_total_wrapper_wikidata_dump_with_language_and_type(self, mock_parse_dump): + """Test wikidata_dump with specific language and data type""" + total_wrapper( + language="English", data_type="nouns", wikidata_dump="/path/to/dump.json" + ) + mock_parse_dump.assert_called_once_with( + language="English", + data_types=["nouns"], + wikidata_dump_type=["total"], + wikidata_dump_path="/path/to/dump.json", + ) + + # MARK: Using QID + + @patch("scribe_data.cli.total.check_qid_is_language") + @patch("scribe_data.cli.total.print_total_lexemes") + def test_total_wrapper_with_qid(self, mock_print_total, mock_check_qid): + """ + Test when language is provided as a QID + """ + mock_check_qid.return_value = "Thai" + total_wrapper(language="Q9217") + mock_print_total.assert_called_once_with(language="Q9217") + + @patch("scribe_data.cli.total.check_qid_is_language") + @patch("scribe_data.cli.total.get_total_lexemes") + def test_total_wrapper_with_qid_and_datatype(self, mock_get_total, mock_check_qid): + """ + Test when language QID and data type are provided + """ + mock_check_qid.return_value = "Thai" + total_wrapper(language="Q9217", data_type="nouns") + mock_get_total.assert_called_once_with(language="Q9217", data_type="nouns") + + @patch("scribe_data.cli.total.parse_wd_lexeme_dump") + def test_total_wrapper_qid_with_wikidata_dump(self, mock_parse_dump): + """ + Test QID with wikidata dump + """ + total_wrapper(language="Q9217", wikidata_dump=True, all_bool=True) + mock_parse_dump.assert_called_once_with( + language="Q9217", + data_types="all", + wikidata_dump_type=["total"], + wikidata_dump_path=None, + ) + + 
@patch("scribe_data.cli.total.get_total_lexemes")
+    def test_get_total_lexemes_with_qid(self, mock_get_total):
+        """
+        Test get_total_lexemes when both the language and the data type are given as QIDs.
+        """
+        total_wrapper(language="Q9217", data_type="Q1084")  # Q1084 is the QID for nouns.
+        mock_get_total.assert_called_once_with(language="Q9217", data_type="Q1084")
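As a usage sketch of the APIs exercised above (illustrative only: the language names, dump file path, and output directory are example values; the functions and keyword arguments are those shown in the diff and asserted in the tests):

# Illustrative sketch, assuming a Scribe-Data checkout with the relocated parser installed.
from scribe_data.wikidata.parse_dump import parse_dump
from scribe_data.wikidata.wikidata_utils import parse_wd_lexeme_dump

# High-level entry point: resolves a lexeme dump via wd_lexeme_dump_download_wrapper,
# then delegates to parse_dump with the same arguments.
parse_wd_lexeme_dump(
    language="English",
    wikidata_dump_type=["form"],
    data_types=["nouns"],
    type_output_dir="scribe_data_json_export",  # default JSON export dir used in the tests
    wikidata_dump_path=None,  # None defers dump resolution to the download wrapper
    overwrite_all=False,  # existing index files are skipped rather than overwritten
)

# Lower-level call against an already downloaded dump file.
parse_dump(
    language=["english", "german"],  # names must match entries in language_metadata
    parse_type=["form"],  # "translations" and "total" are also supported
    data_types=["nouns", "adverbs"],
    file_path="latest-lexemes.json.bz2",
    output_dir="scribe_data_json_export",
    overwrite_all=False,
)

The overwrite_all=False value mirrors the default asserted throughout tests/cli/test_get.py: with it unset, any language and data type combination whose JSON index already exists is skipped rather than regenerated.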