Files
mobile-pii-discovery-agent/RQs/RQ3/RQ3_t10_search_reduction.ipynb
2026-02-21 22:53:10 -05:00

424 lines
18 KiB
Plaintext

{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "a30eef73",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"ID Apps CandidateCols ColsScanned Reduc(%)\n",
"----------------------------------------------------------\n",
"A1 WhatsApp 1627 40 97.54%\n",
"A2 Snapchat 842 161 80.88%\n",
"A3 Telegram 1197 0 100.00%\n",
"A4 Google Maps 71 2 97.18%\n",
"A5 Samsung Internet 173 43 75.14%\n",
"I1 WhatsApp 328 44 86.59%\n",
"I2 Contacts 219 17 92.24%\n",
"I3 Apple Messages 181 39 78.45%\n",
"I4 Safari 72 0 100.00%\n",
"I5 Calendar 539 35 93.51%\n",
"\n",
"Validation report:\n",
" Rule: ColsScanned = unique union of source_columns across all PII types for the app\n",
"\n",
" A1: records=5, total_cols=1627, scanned_unique_cols=40, reduction=97.54%\n",
" A2: records=5, total_cols=842, scanned_unique_cols=161, reduction=80.88%\n",
" A3: records=5, total_cols=1197, scanned_unique_cols=0, reduction=100.00%\n",
" A4: records=5, total_cols=71, scanned_unique_cols=2, reduction=97.18%\n",
" A5: records=5, total_cols=173, scanned_unique_cols=43, reduction=75.14%\n",
" I1: records=5, total_cols=328, scanned_unique_cols=44, reduction=86.59%\n",
" I2: records=5, total_cols=219, scanned_unique_cols=17, reduction=92.24%\n",
" I3: records=5, total_cols=181, scanned_unique_cols=39, reduction=78.45%\n",
" I4: records=5, total_cols=72, scanned_unique_cols=0, reduction=100.00%\n",
" I5: records=5, total_cols=539, scanned_unique_cols=35, reduction=93.51%\n",
"\n",
"Wrote LaTeX: I:\\project2026\\llmagent\\RQs\\RQ3\\RQ2_search_space_reduction.tex\n"
]
}
],
"source": [
"import csv\n",
"import json\n",
"import re\n",
"from pathlib import Path\n",
"from collections import OrderedDict, defaultdict\n",
"from typing import Dict, List, Set, Tuple\n",
"\n",
"# -----------------------------\n",
"# INPUTS\n",
"# -----------------------------\n",
"CSV_PATH = Path(r\"app_total_columns.csv\")\n",
"JSONL_PATH = Path(r\"..\\normalized_PII_results\\GPT-5.1\\app_level\\app_level.jsonl\")\n",
"OUT_TEX = Path(\"RQ3_search_space_reduction.tex\")\n",
"\n",
"# Locked app order + labels (table ID and display name)\n",
"APP_NAME_PLAIN = OrderedDict([\n",
" (\"A1\", \"WhatsApp\"),\n",
" (\"A2\", \"Snapchat\"),\n",
" (\"A3\", \"Telegram\"),\n",
" (\"A4\", \"Google Maps\"),\n",
" (\"A5\", \"Samsung Internet\"),\n",
" (\"I1\", \"WhatsApp\"),\n",
" (\"I2\", \"Contacts\"),\n",
" (\"I3\", \"Apple Messages\"),\n",
" (\"I4\", \"Safari\"),\n",
" (\"I5\", \"Calendar\"),\n",
"])\n",
"\n",
"APP_CODE_RE = re.compile(r\"^(A|I)\\d+$\", re.IGNORECASE)\n",
"\n",
"# -----------------------------\n",
"# CORE RULE (CONFIRMED BY YOU)\n",
"# -----------------------------\n",
"# Cols Scanned (Extraction) = unique union of all source_columns across all PII types for that app.\n",
"\n",
"\n",
"def get_app_code_from_db_path(db_path: str) -> str:\n",
" \"\"\"\n",
" Extract app code from db_path.\n",
" Supports:\n",
" - \"selectedDBs\\\\A1\" -> A1\n",
" - \"selectedDBs\\\\A1_msgstore.db\" -> A1\n",
" - \"A1_something\" -> A1\n",
" - \"A1-something\" -> A1\n",
" Hard-fails if app code cannot be derived confidently.\n",
" \"\"\"\n",
" p = Path(db_path)\n",
"\n",
" # Prefer last path component (folder or filename stem)\n",
" last = p.name\n",
" stem = Path(last).stem\n",
"\n",
" # If last component itself is a code (e.g., folder named A1)\n",
" if APP_CODE_RE.match(last.strip()):\n",
" return last.strip().upper()\n",
" if APP_CODE_RE.match(stem.strip()):\n",
" return stem.strip().upper()\n",
"\n",
" # Split stem on separators\n",
" for sep in (\"_\", \"-\"):\n",
" if sep in stem:\n",
" candidate = stem.split(sep, 1)[0].strip()\n",
" if APP_CODE_RE.match(candidate):\n",
" return candidate.upper()\n",
"\n",
" raise ValueError(f\"Cannot derive app_code from db_path={db_path!r} (last={last!r}, stem={stem!r})\")\n",
"\n",
"\n",
"def read_candidate_totals(csv_path: Path) -> Dict[str, int]:\n",
" \"\"\"\n",
" Reads candidate totals. Hard-fails on missing required columns or bad ints.\n",
" Expects headers: app_code, total_columns (app_name is allowed but unused here).\n",
" \"\"\"\n",
" if not csv_path.exists():\n",
" raise FileNotFoundError(f\"CSV not found: {csv_path}\")\n",
"\n",
" totals: Dict[str, int] = {}\n",
" with csv_path.open(\"r\", encoding=\"utf-8\", newline=\"\") as f:\n",
" reader = csv.DictReader(f)\n",
" if reader.fieldnames is None:\n",
" raise ValueError(\"CSV has no header row\")\n",
"\n",
" required = {\"app_code\", \"total_columns\"}\n",
" missing = required - set(h.strip() for h in reader.fieldnames if h)\n",
" if missing:\n",
" raise ValueError(f\"CSV missing required columns: {sorted(missing)}. Found: {reader.fieldnames}\")\n",
"\n",
" for row_no, row in enumerate(reader, start=2):\n",
" code = (row.get(\"app_code\") or \"\").strip().upper()\n",
" tc = (row.get(\"total_columns\") or \"\").strip()\n",
" if not code:\n",
" raise ValueError(f\"CSV row {row_no}: empty app_code\")\n",
" if not APP_CODE_RE.match(code):\n",
" raise ValueError(f\"CSV row {row_no}: invalid app_code={code!r}\")\n",
" if not tc:\n",
" raise ValueError(f\"CSV row {row_no}: empty total_columns for app_code={code}\")\n",
" try:\n",
" total = int(tc)\n",
" except ValueError as e:\n",
" raise ValueError(f\"CSV row {row_no}: total_columns not int for app_code={code}: {tc!r}\") from e\n",
" if total < 0:\n",
" raise ValueError(f\"CSV row {row_no}: total_columns negative for app_code={code}: {total}\")\n",
" if code in totals:\n",
" raise ValueError(f\"CSV row {row_no}: duplicate app_code={code}\")\n",
" totals[code] = total\n",
"\n",
" return totals\n",
"\n",
"\n",
"def read_scanned_cols_from_app_jsonl(jsonl_path: Path) -> Tuple[Dict[str, int], Dict[str, int], Dict[str, Set[str]]]:\n",
" \"\"\"\n",
" Reads JSONL and computes:\n",
" - scanned_counts: app_code -> count(unique union of source_columns across all PII types)\n",
" - record_counts: app_code -> number of JSONL records observed for that app\n",
" - scanned_sets: app_code -> set of unique source_columns (for validation/inspection)\n",
"\n",
" Hard-fails on:\n",
" - malformed JSON\n",
" - missing/invalid db_path\n",
" - source_columns not list (if present)\n",
" - source_columns elements not strings\n",
" - cannot derive app_code\n",
" \"\"\"\n",
" if not jsonl_path.exists():\n",
" raise FileNotFoundError(f\"JSONL not found: {jsonl_path}\")\n",
"\n",
" record_counts: Dict[str, int] = defaultdict(int)\n",
" scanned_sets: Dict[str, Set[str]] = defaultdict(set)\n",
"\n",
" with jsonl_path.open(\"r\", encoding=\"utf-8\") as f:\n",
" for line_no, raw in enumerate(f, start=1):\n",
" line = raw.strip()\n",
" if not line:\n",
" continue\n",
"\n",
" try:\n",
" rec = json.loads(line)\n",
" except json.JSONDecodeError as e:\n",
" raise ValueError(f\"Bad JSON in {jsonl_path} line {line_no}: {e}\") from e\n",
"\n",
" if not isinstance(rec, dict):\n",
" raise ValueError(f\"JSONL line {line_no}: expected object/dict, got {type(rec).__name__}\")\n",
"\n",
" db_path = rec.get(\"db_path\", None)\n",
" if not isinstance(db_path, str) or not db_path.strip():\n",
" raise ValueError(f\"JSONL line {line_no}: missing/invalid db_path\")\n",
"\n",
" app = get_app_code_from_db_path(db_path)\n",
"\n",
" record_counts[app] += 1\n",
"\n",
" cols = rec.get(\"source_columns\", [])\n",
" if cols is None:\n",
" cols = []\n",
" if not isinstance(cols, list):\n",
" raise ValueError(f\"JSONL line {line_no}: source_columns must be a list, got {type(cols).__name__}\")\n",
"\n",
" for c in cols:\n",
" if not isinstance(c, str):\n",
" raise ValueError(f\"JSONL line {line_no}: source_columns contains non-string: {c!r}\")\n",
" s = c.strip()\n",
" if s:\n",
" scanned_sets[app].add(s)\n",
"\n",
" scanned_counts = {app: len(s) for app, s in scanned_sets.items()}\n",
" # Ensure apps with records but no columns show up with 0\n",
" for app in record_counts:\n",
" scanned_counts.setdefault(app, 0)\n",
" scanned_sets.setdefault(app, set())\n",
"\n",
" return scanned_counts, dict(record_counts), dict(scanned_sets)\n",
"\n",
"\n",
"def format_reduction(total: int, scanned: int) -> float:\n",
" \"\"\"\n",
" Reduction (%) = (1 - scanned/total) * 100\n",
" Hard-fails if total <= 0 or scanned < 0 or scanned > total (integrity).\n",
" \"\"\"\n",
" if total <= 0:\n",
" raise ValueError(f\"Invalid total_columns={total}; must be > 0 for reduction computation\")\n",
" if scanned < 0:\n",
" raise ValueError(f\"Invalid scanned={scanned}; must be >= 0\")\n",
" if scanned > total:\n",
" raise ValueError(f\"Integrity error: scanned ({scanned}) > total_columns ({total})\")\n",
" return (1.0 - (scanned / total)) * 100.0\n",
"\n",
"\n",
"def latex_escape(s: str) -> str:\n",
" \"\"\"\n",
" Minimal LaTeX escaping for safety.\n",
" \"\"\"\n",
" repl = {\n",
" \"\\\\\": r\"\\textbackslash{}\",\n",
" \"&\": r\"\\&\",\n",
" \"%\": r\"\\%\",\n",
" \"$\": r\"\\$\",\n",
" \"#\": r\"\\#\",\n",
" \"_\": r\"\\_\",\n",
" \"{\": r\"\\{\",\n",
" \"}\": r\"\\}\",\n",
" \"~\": r\"\\textasciitilde{}\",\n",
" \"^\": r\"\\textasciicircum{}\",\n",
" }\n",
" out = []\n",
" for ch in s:\n",
" out.append(repl.get(ch, ch))\n",
" return \"\".join(out)\n",
"\n",
"\n",
"def build_latex_table(candidate_totals: Dict[str, int], scanned_counts: Dict[str, int]) -> str:\n",
" lines: List[str] = []\n",
" lines.append(r\"\\begin{table}[th]\")\n",
" lines.append(r\"\\centering\")\n",
" lines.append(\n",
" r\"\\caption{Reduction of effective extraction space via hypothesis-driven planning.\"\n",
" r\" Total candidate columns are counted over all tables in the selected databases\"\n",
" r\" for each application, while reductions correspond to columns exhaustively scanned\"\n",
" r\" during row-level PII extraction.}\"\n",
" )\n",
" lines.append(r\"\\label{tab:search_space_reduction}\")\n",
" lines.append(r\"\\small\")\n",
" lines.append(r\"\\begin{tabular}{|l|l|p{1.3cm}|p{1.7cm}|p{1.0cm}|}\")\n",
" lines.append(r\"\\hline\")\n",
" lines.append(\n",
" r\"\\textbf{ID} & \\textbf{Apps} & \\textbf{Candidate Cols (Total)} & \"\n",
" r\"\\textbf{Cols Scanned (Extraction)} & \\textbf{Reduc. (\\%)} \\\\\"\n",
" )\n",
" lines.append(r\"\\hline\")\n",
"\n",
" # Deterministic order: use APP_NAME_PLAIN, then any extras (should not happen, but handled)\n",
" app_order = list(APP_NAME_PLAIN.keys())\n",
" extras = sorted(set(candidate_totals) - set(app_order))\n",
" app_order += extras\n",
"\n",
" for app in app_order:\n",
" if app not in candidate_totals:\n",
" continue\n",
"\n",
" app_name = APP_NAME_PLAIN.get(app, app)\n",
" total = candidate_totals[app]\n",
" scanned = int(scanned_counts.get(app, 0))\n",
"\n",
" reduc = format_reduction(total, scanned)\n",
"\n",
" lines.append(\n",
" f\"{latex_escape(app)} & {latex_escape(app_name)} & {total} & {scanned} & {reduc:.2f}\\\\% \\\\\\\\\"\n",
" )\n",
" lines.append(r\"\\hline\")\n",
"\n",
" lines.append(r\"\\end{tabular}\")\n",
" lines.append(r\"\\end{table}\")\n",
" return \"\\n\".join(lines)\n",
"\n",
"\n",
"def build_plaintext_table(candidate_totals: Dict[str, int], scanned_counts: Dict[str, int]) -> str:\n",
" headers = [\"ID\", \"Apps\", \"CandidateCols\", \"ColsScanned\", \"Reduc(%)\"]\n",
"\n",
" app_order = list(APP_NAME_PLAIN.keys())\n",
" extras = sorted(set(candidate_totals) - set(app_order))\n",
" app_order += extras\n",
"\n",
" rows = [headers]\n",
" for app in app_order:\n",
" if app not in candidate_totals:\n",
" continue\n",
" app_name = APP_NAME_PLAIN.get(app, app)\n",
" total = candidate_totals[app]\n",
" scanned = int(scanned_counts.get(app, 0))\n",
" reduc = format_reduction(total, scanned)\n",
" rows.append([app, app_name, str(total), str(scanned), f\"{reduc:.2f}%\"])\n",
"\n",
" widths = [0] * len(headers)\n",
" for r in rows:\n",
" for i, cell in enumerate(r):\n",
" widths[i] = max(widths[i], len(cell))\n",
"\n",
" def fmt_row(r):\n",
" parts = []\n",
" for i, cell in enumerate(r):\n",
" if i in (0, 1):\n",
" parts.append(cell.ljust(widths[i]))\n",
" else:\n",
" parts.append(cell.rjust(widths[i]))\n",
" return \" \".join(parts)\n",
"\n",
" out = [fmt_row(rows[0]), \"-\" * len(fmt_row(rows[0]))]\n",
" for r in rows[1:]:\n",
" out.append(fmt_row(r))\n",
" return \"\\n\".join(out)\n",
"\n",
"\n",
"def build_validation_report(\n",
" candidate_totals: Dict[str, int],\n",
" scanned_counts: Dict[str, int],\n",
" record_counts: Dict[str, int],\n",
") -> str:\n",
" \"\"\"\n",
" Prints per-app sanity stats.\n",
" Hard-fails already happen in format_reduction (scanned > total).\n",
" \"\"\"\n",
" lines: List[str] = []\n",
" lines.append(\"Validation report:\")\n",
" lines.append(\" Rule: ColsScanned = unique union of source_columns across all PII types for the app\")\n",
" lines.append(\"\")\n",
"\n",
" app_order = list(APP_NAME_PLAIN.keys())\n",
" extras = sorted(set(candidate_totals) - set(app_order))\n",
" app_order += extras\n",
"\n",
" for app in app_order:\n",
" if app not in candidate_totals:\n",
" continue\n",
" total = candidate_totals[app]\n",
" scanned = int(scanned_counts.get(app, 0))\n",
" recs = int(record_counts.get(app, 0))\n",
" reduc = format_reduction(total, scanned)\n",
" lines.append(f\" {app}: records={recs}, total_cols={total}, scanned_unique_cols={scanned}, reduction={reduc:.2f}%\")\n",
"\n",
" # Also warn if JSONL contains apps not in CSV (not fatal, but surfaced)\n",
" extra_jsonl_apps = sorted(set(record_counts) - set(candidate_totals))\n",
" if extra_jsonl_apps:\n",
" lines.append(\"\")\n",
" lines.append(\" WARNING: JSONL contains app_codes not present in CSV:\")\n",
" for a in extra_jsonl_apps:\n",
" lines.append(f\" - {a} (records={record_counts.get(a,0)}, scanned={scanned_counts.get(a,0)})\")\n",
"\n",
" return \"\\n\".join(lines)\n",
"\n",
"\n",
"if __name__ == \"__main__\":\n",
" candidate_totals = read_candidate_totals(CSV_PATH)\n",
" scanned_counts, record_counts, _scanned_sets = read_scanned_cols_from_app_jsonl(JSONL_PATH)\n",
"\n",
" # Ensure every CSV app code is present in scanned_counts (0 if none)\n",
" for app in candidate_totals:\n",
" scanned_counts.setdefault(app, 0)\n",
" record_counts.setdefault(app, 0)\n",
"\n",
" # Build outputs (integrity checks happen inside format_reduction)\n",
" tex = build_latex_table(candidate_totals, scanned_counts)\n",
" OUT_TEX.write_text(tex, encoding=\"utf-8\")\n",
"\n",
" print(build_plaintext_table(candidate_totals, scanned_counts))\n",
" print()\n",
" print(build_validation_report(candidate_totals, scanned_counts, record_counts))\n",
" print(f\"\\nWrote LaTeX: {OUT_TEX.resolve()}\")\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a0a0cd2a",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "bnl",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.18"
}
},
"nbformat": 4,
"nbformat_minor": 5
}