mirror of
https://github.com/frankwxu/mobile-pii-discovery-agent.git
synced 2026-04-10 12:13:44 +00:00
add automated process (folder level)
This commit is contained in:
80
sql_utils.py
80
sql_utils.py
@@ -1,7 +1,8 @@
|
||||
import re
|
||||
import json
|
||||
import sys
|
||||
|
||||
from pathlib import Path
|
||||
from datetime import datetime, timezone
|
||||
|
||||
|
||||
def extract_tables_with_aliases(select_sql: str) -> dict[str, str]:
|
||||
@@ -270,4 +271,79 @@ def extract_select_columns(select_sql: str) -> list[str]:
|
||||
# Take the final identifier
|
||||
columns.append(item.split()[-1])
|
||||
|
||||
return columns
|
||||
return columns
|
||||
|
||||
|
||||
def is_sqlite_file(p: Path) -> bool:
|
||||
try:
|
||||
with p.open("rb") as f:
|
||||
return f.read(16) == b"SQLite format 3\x00"
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
from pathlib import Path
|
||||
import importlib.util
|
||||
from typing import List, Tuple
|
||||
|
||||
def load_db_files_list(py_path: Path, var_name: str = "db_files") -> List[str]:
|
||||
"""Load a list variable (default: db_files) from a .py file."""
|
||||
spec = importlib.util.spec_from_file_location(py_path.stem, py_path)
|
||||
if spec is None or spec.loader is None:
|
||||
raise ValueError(f"Cannot load module from {py_path}")
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod) # type: ignore
|
||||
|
||||
if not hasattr(mod, var_name):
|
||||
raise AttributeError(f"{py_path} does not define `{var_name}`")
|
||||
value = getattr(mod, var_name)
|
||||
if not isinstance(value, list):
|
||||
raise TypeError(f"`{var_name}` must be a list, got {type(value)}")
|
||||
return value
|
||||
|
||||
def build_db_paths(
|
||||
db_dir: Path,
|
||||
db_files: List[str],
|
||||
is_sqlite_fn,
|
||||
) -> Tuple[List[Path], List[str], List[str]]:
|
||||
"""
|
||||
Build ordered paths from filenames, skipping missing and non-sqlite.
|
||||
Returns (db_paths, missing, not_sqlite).
|
||||
"""
|
||||
db_paths: List[Path] = []
|
||||
missing: List[str] = []
|
||||
not_sqlite: List[str] = []
|
||||
|
||||
for name in db_files:
|
||||
p = db_dir / name
|
||||
if not p.exists():
|
||||
missing.append(str(p))
|
||||
continue
|
||||
if not is_sqlite_fn(p):
|
||||
not_sqlite.append(str(p))
|
||||
continue
|
||||
db_paths.append(p)
|
||||
|
||||
return db_paths, missing, not_sqlite
|
||||
|
||||
def print_db_path_report(db_paths: List[Path], missing: List[str], not_sqlite: List[str]) -> None:
|
||||
print(f"Will process {len(db_paths)} databases (from db_files list).")
|
||||
if missing:
|
||||
print("Missing files:")
|
||||
for x in missing:
|
||||
print(" -", x)
|
||||
if not_sqlite:
|
||||
print("Not SQLite (bad header):")
|
||||
for x in not_sqlite:
|
||||
print(" -", x)
|
||||
|
||||
def save_jsonl(all_results, out_dir):
|
||||
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
|
||||
out_path = out_dir / f"evidence_{ts}.jsonl"
|
||||
|
||||
with out_path.open("w", encoding="utf-8") as f:
|
||||
for r in all_results:
|
||||
f.write(json.dumps(r, ensure_ascii=False) + "\n")
|
||||
|
||||
print(f"Wrote: {out_path.resolve()}")
|
||||
return out_path
|
||||
Reference in New Issue
Block a user