| import argparse |
| import ast |
| import json |
| import random |
| from pathlib import Path |
| from typing import Any, Dict, Optional |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| from modelling.utils import load_json |
|
|
|
|
def to_jsonable(value: Any) -> Any:
    """Convert a single cell value into something ``json`` can serialise.

    Args:
        value: Any scalar taken from a DataFrame row (Python or NumPy type).

    Returns:
        ``None`` for ``None`` and for float NaN; the native Python equivalent
        for NumPy scalars; every other value unchanged.
    """
    if value is None:
        return None

    # Unwrap NumPy scalars *before* the NaN test. Only ``np.float64`` subclasses
    # Python ``float``; narrower types such as ``np.float32`` would otherwise
    # skip the NaN check and leak ``nan`` into the JSON output.
    if isinstance(value, np.generic):
        value = value.item()

    if isinstance(value, float) and pd.isna(value):
        return None
    return value
|
|
|
|
def parse_optional_int(value: Optional[str]) -> Optional[int]:
    """Parse a CLI string into an int, treating "no value" spellings as ``None``.

    The input is stringified, stripped and lower-cased; an empty string or one
    of ``none`` / ``null`` / ``random`` yields ``None``. Anything else is passed
    to ``int`` and may raise ``ValueError``.
    """
    if value is None:
        return None

    text = str(value).strip().lower()
    unset_spellings = {"", "none", "null", "random"}
    return None if text in unset_spellings else int(text)
|
|
|
|
def choose_row_index(num_rows: int, row_index: Optional[int], seed: int) -> int:
    """Resolve which row to use, drawing a seeded random one when unspecified.

    Args:
        num_rows: Number of rows available.
        row_index: Explicit zero-based index, or ``None`` for a random pick.
        seed: Seed for the random pick (only used when ``row_index`` is ``None``).

    Raises:
        RuntimeError: If there are no rows at all.
        IndexError: If an explicit ``row_index`` is outside ``[0, num_rows)``.
    """
    if num_rows <= 0:
        raise RuntimeError("CSV has no rows")

    if row_index is None:
        # A dedicated generator keeps the pick reproducible for a given seed.
        picker = random.Random(seed)
        return picker.randrange(num_rows)

    out_of_range = row_index < 0 or row_index >= num_rows
    if out_of_range:
        raise IndexError(f"row_index out of range: {row_index}; num_rows={num_rows}")
    return row_index
|
|
|
|
def validate_ratio(name: str, value: float) -> float:
    """Coerce ``value`` to float and ensure it lies in the closed range [0, 1].

    Args:
        name: Label used in the error message.
        value: Candidate ratio; anything ``float()`` accepts.

    Returns:
        The ratio as a ``float``.

    Raises:
        ValueError: If the coerced value is outside [0, 1] (NaN included, since
            it fails the range comparison).
    """
    ratio = float(value)
    if 0.0 <= ratio <= 1.0:
        return ratio
    raise ValueError(f"{name} must be in [0, 1], got {ratio}")
|
|
|
|
def load_json_if_exists(path: Optional[str]) -> Optional[Dict[str, Any]]:
    """Load a JSON file via ``load_json`` when the path names an existing file.

    Returns ``None`` when ``path`` is empty/``None`` or does not point at a
    regular file; otherwise returns whatever ``load_json`` produces.
    """
    if not path:
        return None

    candidate = Path(path)
    # ``is_file()`` is already False for paths that do not exist, so one check
    # covers both "missing" and "not a regular file".
    if candidate.is_file():
        return load_json(str(candidate))
    return None
|
|
|
|
def get_categorical_columns(config_data: Dict[str, Any]) -> list[str]:
    """Return the categorical column names listed in the categorical vocab file.

    The names are the top-level keys of the JSON file at
    ``config_data["cat_vocab_path"]``. An empty list is returned when the file
    is unavailable or its content is not a dict.
    """
    vocab = load_json_if_exists(config_data.get("cat_vocab_path"))
    return list(vocab) if isinstance(vocab, dict) else []
|
|
|
|
def get_numeric_columns(config_data: Dict[str, Any]) -> list[str]:
    """Return the numeric column names listed in the numeric vocab file.

    The JSON file at ``config_data["numeric_vocab_path"]`` is read; the result
    is every entry of each group's ``feature_names`` list (under the top-level
    ``groups`` key), stringified and in file order. An empty list is returned
    when the file is unavailable or its content is not a dict.
    """
    vocab = load_json_if_exists(config_data.get("numeric_vocab_path"))
    if not isinstance(vocab, dict):
        return []

    return [
        str(feature)
        for group in vocab.get("groups", [])
        for feature in group.get("feature_names", [])
    ]
|
|
|
|
def get_vision_input(config_data: Dict[str, Any], row: Dict[str, Any]) -> Dict[str, Any]:
    """Build the ``vision`` section of a card from the photo map.

    The sample id is read from ``row`` using ``config_data["id_column"]``
    (default ``"id"``) and looked up in the JSON mapping at
    ``config_data["photo_map_path"]``.

    Returns:
        ``{"image_path_suffix": <path>}``; the path is ``""`` when the photo map
        is unavailable, the row has no id, or the id has no (non-empty) entry.
    """
    photo_map = load_json_if_exists(config_data.get("photo_map_path"))
    sample_id = row.get(str(config_data.get("id_column", "id")))

    if sample_id is None or not isinstance(photo_map, dict):
        return {"image_path_suffix": ""}

    # Try the id as-is first, then its string form as a fallback key.
    suffix = photo_map.get(sample_id)
    if suffix is None:
        suffix = photo_map.get(str(sample_id))

    found = not (suffix is None or suffix == "")
    return {"image_path_suffix": str(suffix) if found else ""}
|
|
|
|
def parse_numeric_value(value: Any) -> Any:
    """
    Turn a numeric CSV cell into a JSON-friendly number (or list of numbers).

    Loader convention:
    - a missing numeric cell is ""
    - a scalar numeric cell looks like "12.3"
    - a vector numeric cell looks like "[1.2, 3.4]"

    Returns "" for missing cells, a float for scalar text, a list of floats
    for bracketed vector text, and the (JSON-normalised) input unchanged when
    it is not a string.

    Raises ValueError (or SyntaxError from ``ast.literal_eval``) when the text
    cannot be parsed.
    """
    cell = to_jsonable(value)

    if cell is None or cell == "":
        return ""

    if not isinstance(cell, str):
        # Already-typed numbers, and any other non-text value (bools
        # included), pass through untouched.
        return cell

    text = cell.strip()
    if not text:
        return ""

    looks_like_vector = text.startswith("[") and text.endswith("]")
    if not looks_like_vector:
        return float(text)

    # literal_eval only accepts Python literals, so no code is executed here.
    items = ast.literal_eval(text)
    if not isinstance(items, (list, tuple)):
        raise ValueError(f"Expected numeric vector list, got: {cell!r}")
    return [float(item) for item in items]
|
|
|
|
def create_unmasked_card(
    row: Dict[str, Any],
    cat_columns: list[str],
    numeric_columns: list[str],
    vision: Dict[str, Any],
) -> Dict[str, Any]:
    """Assemble the full (unmasked) card for one row.

    Args:
        row: Column name -> cell value for the selected row.
        cat_columns: Categorical column names; those absent from ``row`` are skipped.
        numeric_columns: Numeric column names; those absent from ``row`` are
            skipped, the rest are converted with ``parse_numeric_value``.
        vision: Ready-made vision section, stored as given.

    Returns:
        A dict with the keys ``categorical``, ``numeric`` and ``vision``.
    """
    categorical: Dict[str, Any] = {}
    for column in cat_columns:
        if column in row:
            categorical[column] = row[column]

    numeric: Dict[str, Any] = {}
    for column in numeric_columns:
        if column in row:
            numeric[column] = parse_numeric_value(row[column])

    return {"categorical": categorical, "numeric": numeric, "vision": vision}
|
|
|
|
def choose_mask_keys(values: Dict[str, Any], ratio: float, rng: random.Random) -> list[str]:
    """Pick which keys of ``values`` to mask.

    Only keys whose value is neither ``""`` nor ``None`` are eligible. The
    number picked is ``round(eligible * ratio)`` clamped to
    ``[0, eligible]``; the keys themselves are drawn with ``rng.sample``.

    Returns:
        The chosen keys, or an empty list when nothing is to be masked.
    """
    candidates: list[str] = []
    for key, cell in values.items():
        if cell not in ("", None):
            candidates.append(key)

    if ratio <= 0.0 or not candidates:
        return []

    available = len(candidates)
    wanted = int(round(available * ratio))
    wanted = max(0, min(wanted, available))
    return rng.sample(candidates, wanted) if wanted else []
|
|
|
|
def create_masked_card(
    unmasked_card: Dict[str, Any],
    cat_mask_ratio: float,
    num_mask_ratio: float,
    seed: int,
) -> Dict[str, Any]:
    """Return a copy of ``unmasked_card`` with some feature values set to ``None``.

    Args:
        unmasked_card: Card with ``categorical`` and ``numeric`` sections.
        cat_mask_ratio: Ratio of non-missing categorical features to mask.
        num_mask_ratio: Ratio of non-missing numeric features to mask.
        seed: Seed for the single generator shared by both selections
            (categorical keys are drawn first, then numeric keys).

    Returns:
        A new card; the input card is not modified.
    """
    rng = random.Random(seed)
    # A JSON round-trip yields an independent deep copy of the card.
    card = json.loads(json.dumps(unmasked_card, ensure_ascii=False))

    plan = (("categorical", cat_mask_ratio), ("numeric", num_mask_ratio))
    for section, ratio in plan:
        features = card[section]
        for key in choose_mask_keys(features, ratio, rng):
            features[key] = None

    return card
|
|
|
|
def output_paths_from_given_name(given_name: str) -> tuple[Path, Path]:
    """Derive the two output file paths from a user-supplied name.

    A trailing ``.json`` suffix is dropped first; ``__unmasked.json`` and
    ``__masked.json`` are then appended to the file name, keeping the parent
    directory.

    Returns:
        ``(unmasked_path, masked_path)``.
    """
    target = Path(given_name)
    if target.suffix == ".json":
        target = target.with_suffix("")

    stem = target.name
    return (
        target.with_name(f"{stem}__unmasked.json"),
        target.with_name(f"{stem}__masked.json"),
    )
|
|
|
|
def create_cards(
    config_data_path: str,
    row_index: Optional[int],
    seed: int,
    cat_mask_ratio: float,
    num_mask_ratio: float,
) -> tuple[Dict[str, Any], Dict[str, Any]]:
    """Build the unmasked and masked cards for one row of the configured CSV.

    Args:
        config_data_path: Path to the data config JSON; its ``data_csv_path``
            entry names the CSV to read.
        row_index: Zero-based row to use, or ``None`` for a seeded random row.
        seed: Seed used for both the random row pick and the feature masking.
        cat_mask_ratio: Ratio of non-missing categorical features to mask.
        num_mask_ratio: Ratio of non-missing numeric features to mask.

    Returns:
        ``(unmasked_card, masked_card)``.
    """
    config_data = load_json(config_data_path)

    # NA detection is switched off so that empty cells are read as "" rather
    # than NaN.
    frame = pd.read_csv(
        config_data["data_csv_path"],
        keep_default_na=False,
        na_filter=False,
        low_memory=False,
    )

    picked = choose_row_index(len(frame), row_index, seed)

    row: Dict[str, Any] = {}
    for column, cell in frame.iloc[picked].to_dict().items():
        row[str(column)] = to_jsonable(cell)

    unmasked_card = create_unmasked_card(
        row,
        get_categorical_columns(config_data),
        get_numeric_columns(config_data),
        get_vision_input(config_data, row),
    )
    masked_card = create_masked_card(
        unmasked_card,
        cat_mask_ratio,
        num_mask_ratio,
        seed,
    )
    return unmasked_card, masked_card
|
|
|
|
def save_json_pretty(obj: Dict[str, Any], path: Path) -> None:
    """Write ``obj`` to ``path`` as 2-space-indented UTF-8 JSON with a final newline.

    Missing parent directories are created; non-ASCII characters are written
    as-is rather than escaped.
    """
    folder = path.parent
    folder.mkdir(parents=True, exist_ok=True)

    with path.open("w", encoding="utf-8") as handle:
        json.dump(obj, handle, indent=2, ensure_ascii=False)
        handle.write("\n")
|
|
|
|
def main(argv: Optional[list[str]] = None) -> None:
    """Command-line entry point: build one card pair and write both JSON files.

    Parses the arguments, validates the two mask ratios, creates the unmasked
    and masked cards for the selected CSV row, saves them next to each other
    and prints a one-line JSON summary with the two output paths.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``. The default
            ``None`` keeps the normal command-line behaviour.

    Raises:
        ValueError: If a mask ratio is outside [0, 1] or ``--row_index`` is
            not an integer / "random" spelling.
    """
    parser = argparse.ArgumentParser(
        description="Create readable/editable SoilFormer input cards from one CSV row."
    )
    parser.add_argument(
        "--config_data",
        type=str,
        default="config/config_data.json",
        help="Path to config_data.json. Default: config/config_data.json",
    )
    parser.add_argument(
        "--row_index",
        type=str,
        default=None,
        help="CSV row index. Use None/null/random or omit for a random row.",
    )
    parser.add_argument(
        "--output",
        type=str,
        required=True,
        help="Given output name. Writes given_name__unmasked.json and given_name__masked.json.",
    )
    parser.add_argument(
        "--cat_mask_ratio",
        type=float,
        default=0.15,
        help="Ratio of non-missing categorical features to mask. Default: 0.15",
    )
    parser.add_argument(
        "--num_mask_ratio",
        type=float,
        default=0.15,
        help="Ratio of non-missing numeric features to mask. Default: 0.15",
    )
    parser.add_argument(
        "--seed",
        type=int,
        default=0,
        # The help text previously advertised 42 while the real default is 0.
        help="Seed for random row selection and feature masking. Default: 0",
    )
    args = parser.parse_args(argv)

    cat_mask_ratio = validate_ratio("cat_mask_ratio", args.cat_mask_ratio)
    num_mask_ratio = validate_ratio("num_mask_ratio", args.num_mask_ratio)

    unmasked_card, masked_card = create_cards(
        config_data_path=args.config_data,
        row_index=parse_optional_int(args.row_index),
        seed=args.seed,
        cat_mask_ratio=cat_mask_ratio,
        num_mask_ratio=num_mask_ratio,
    )

    unmasked_path, masked_path = output_paths_from_given_name(args.output)
    save_json_pretty(unmasked_card, unmasked_path)
    save_json_pretty(masked_card, masked_path)

    # Machine-readable summary so callers can pick up the output locations.
    print(
        json.dumps(
            {
                "status": "ok",
                "unmasked_output": str(unmasked_path),
                "masked_output": str(masked_path),
            },
            ensure_ascii=False,
        )
    )
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|