diff --git a/imports/cleaner.py b/imports/cleaner.py
index e01c191..b8134fa 100644
--- a/imports/cleaner.py
+++ b/imports/cleaner.py
@@ -1,33 +1,22 @@
 #!/usr/bin/env python3
 """
-Clean and normalize `illustrations_seed.csv` -> `illustrations_clean.csv`.
+Content-preserving CSV sanitizer for 'illustrations_seed.csv' -> 'illustrations_clean.csv'.
 
-Key behaviors:
-- Robustly parse CSV with many commas and embedded quotes/newlines.
-- Prepass to normalize smart quotes/non-breaking spaces before parsing.
-- Quote ALL fields on output to guarantee importer-friendly CSV.
-- Subject: trim parts, drop empties, remove trailing comma, rejoin with ", ".
-- Scripture: remove trailing semicolons, normalize common book abbreviations.
-- Dates: accept flexible M/D/YY, M/D/YYYY, etc.; output ISO YYYY-MM-DD.
-- Talk Number / Code: numeric if possible; blank if invalid.
-- Write rejects to `illustrations_rejects.csv` with a reason, if any.
-- Post-write self-check: verifies column count of every row.
+What it does (and ONLY this by default):
+- Parses the CSV safely (embedded commas, quotes, and newlines are handled).
+- Writes a new CSV where EVERY FIELD is QUOTED and any inner " becomes "".
+- Keeps the exact text of every field (no trimming, no subject/scripture tweaking, no punctuation edits).
+- Keeps the exact column order expected by your importer.
 
-Usage:
-    python3 clean_illustrations_csv.py \
-        --in illustrations_seed.csv \
-        --out illustrations_clean.csv \
-        --rejects illustrations_rejects.csv
+Optional flag --normalize-dates converts Date and Date Edited to ISO YYYY-MM-DD.
 """
 
 import argparse
 import csv
-import io
 import os
-import re
 from datetime import datetime
 
-HEADER = [
+EXPECTED_HEADER = [
     "Subject",
     "Illustration",
     "Application",
@@ -40,280 +29,76 @@ HEADER = [
     "Date Edited",
 ]
 
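The quoting guarantee in the new docstring is exactly what csv.QUOTE_ALL with the default doublequote=True produces; a minimal standalone sketch:

    import csv, io

    buf = io.StringIO()
    w = csv.writer(buf, quoting=csv.QUOTE_ALL, lineterminator="\n")
    w.writerow(['He said "hi"', "a, b", "line1\nline2"])
    print(buf.getvalue())
    # "He said ""hi""","a, b","line1
    # line2"

Every field is wrapped in quotes, inner quotes are doubled, and the embedded newline stays inside one logical field.
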
-# Common scripture book abbreviation normalization.
-BOOK_MAP = {
-    # New Testament (common abbreviations)
-    "rom": "Romans",
-    "romans": "Romans",
-    "eph": "Ephesians",
-    "ephesians": "Ephesians",
-    "col": "Colossians",
-    "colossians": "Colossians",
-    "1 cor": "1 Corinthians",
-    "2 cor": "2 Corinthians",
-    "1 thess": "1 Thessalonians",
-    "2 thess": "2 Thessalonians",
-    "1 tim": "1 Timothy",
-    "2 tim": "2 Timothy",
-    "1 pet": "1 Peter",
-    "2 pet": "2 Peter",
-    "1 john": "1 John",
-    "2 john": "2 John",
-    "3 john": "3 John",
-    "heb": "Hebrews",
-    "rev": "Revelation",
-    "revelation": "Revelation",
-    "acts": "Acts",
-    "matt": "Matthew",
-    "mt": "Matthew",
-    "mark": "Mark",
-    "mk": "Mark",
-    "luke": "Luke",
-    "lk": "Luke",
-    "john": "John",
-    "jn": "John",
-    "jude": "Jude",
-    "phil": "Philippians",
-    "php": "Philippians",
-    "philem": "Philemon",
-    "titus": "Titus",
-    "gal": "Galatians",
-    "galatians": "Galatians",
-    "james": "James",
-    "jas": "James",
-
-    # Old Testament (examples + the ones in your sample)
-    "eccl": "Ecclesiastes",
-    "eccles": "Ecclesiastes",
-    "ecclesiastes": "Ecclesiastes",
-    "ps": "Psalms",
-    "psalm": "Psalms",
-    "psalms": "Psalms",
-    "prov": "Proverbs",
-    "proverbs": "Proverbs",
-    "gen": "Genesis",
-    "genesis": "Genesis",
-    "ex": "Exodus",
-    "exod": "Exodus",
-    "exodus": "Exodus",
-    "isa": "Isaiah",
-    "isaiah": "Isaiah",
-    "jer": "Jeremiah",
-    "jeremiah": "Jeremiah",
-    "dan": "Daniel",
-    "daniel": "Daniel",
-}
-
-DATE_FORMATS = [
-    "%m/%d/%y",
-    "%m/%d/%Y",
-    "%-m/%-d/%y",  # on Linux/mac
-    "%-m/%-d/%Y",
-    "%m/%-d/%y",
-    "%-m/%d/%y",
-    "%m/%-d/%Y",
-    "%-m/%d/%Y",
-]
-
-NBSP = "\u00A0"
-SMARTS = {
-    "\u201C": '"',  # left double
-    "\u201D": '"',  # right double
-    "\u2018": "'",  # left single
-    "\u2019": "'",  # right single / apostrophe
-    "\u00AB": '"',  # «
-    "\u00BB": '"',  # »
-}
-
-def pre_normalize_text(raw: str) -> str:
-    """Prepass to remove non-breaking spaces and normalize smart quotes."""
-    if raw is None:
-        return ""
-    s = str(raw).replace(NBSP, " ")
-    for k, v in SMARTS.items():
-        s = s.replace(k, v)
-    # Normalize any stray CR-only line endings
-    s = s.replace("\r\n", "\n").replace("\r", "\n")
-    return s
-
-def parse_date(value: str) -> str:
-    """Parse flexible US-style dates; return ISO YYYY-MM-DD or '' if empty/invalid."""
-    if not value:
-        return ""
-    v = value.strip()
+def parse_date_flex(v: str) -> str:
+    """Very permissive date parser; returns ISO or the original string if parsing fails."""
+    v = (v or "").strip()
     if not v:
         return ""
-    # Common separators already handled; try multiple formats.
-    for fmt in DATE_FORMATS:
+    fmts = ["%m/%d/%y", "%m/%d/%Y"]  # %m and %d accept unpadded values; "%-m" is not a valid strptime directive
+    for fmt in fmts:
         try:
-            dt = datetime.strptime(v, fmt)
-            # Heuristic for 2-digit years if needed (datetime handles 1900s/2000s defaults)
-            # We trust strptime here; user examples are 2000s.
-            return dt.strftime("%Y-%m-%d")
+            return datetime.strptime(v, fmt).strftime("%Y-%m-%d")
         except Exception:
             pass
-    # Try to interpret like M/D/YY or M/D/YYYY with flexible spacing
-    m = re.match(r"^\s*(\d{1,2})[/-](\d{1,2})[/-](\d{2,4})\s*$", v)
-    if m:
-        mth, day, yr = int(m.group(1)), int(m.group(2)), m.group(3)
-        if len(yr) == 2:
-            # Assume 20xx for 00-69, 19xx for 70-99 (datetime default logic)
-            year = 2000 + int(yr) if int(yr) <= 69 else 1900 + int(yr)
-        else:
-            year = int(yr)
-        try:
-            return datetime(year, mth, day).strftime("%Y-%m-%d")
-        except Exception:
-            return ""
-    return ""
-
-BOOK_RE = re.compile(r"^\s*([1-3]?\s*[A-Za-z\.]+)\s*(.*)$")
-
-def normalize_scripture(value: str) -> str:
-    """Normalize scripture strings: strip trailing semicolons/spaces, normalize book name if easily detectable."""
-    if not value:
-        return ""
-    s = value.strip()
-    # Remove trailing semicolons and excess punctuation/spaces.
-    s = re.sub(r"[;,\s]+$", "", s)
-
-    # Try to normalize the *first* book token if identifiable.
-    m = BOOK_RE.match(s)
-    if not m:
-        return s
-    book_raw, rest = m.group(1), m.group(2)
-
-    # normalize book key
-    key = book_raw.lower().replace(".", "")
-    key = re.sub(r"\s+", " ", key).strip()
-    # normalize ordinal spacing: "1cor" -> "1 cor"
-    key = re.sub(r"^([1-3])([a-z])", r"\1 \2", key)
-
-    book = BOOK_MAP.get(key, None)
-    if not book:
-        # Title-case fallback (basic)
-        book = " ".join(w.capitalize() for w in key.split())
-
-    rest = rest.strip()
-    # Normalize spacing in the chapter/verse segment (e.g., "14:13, 19")
-    rest = re.sub(r"\s*,\s*", ", ", rest)
-    rest = re.sub(r"\s*;\s*", "; ", rest)
-    rest = re.sub(r"\s+", " ", rest)
-
-    out = (book + (" " + rest if rest else "")).strip()
-    return out
-
-def clean_subject(value: str) -> str:
-    """Trim parts, drop empty entries, remove trailing commas/spaces, re-join with ', '."""
-    if not value:
-        return ""
-    # Strip external quotes handled by csv; here we just process content
-    s = value.strip()
-    # Split by comma, trim each token, drop empties
-    parts = [p.strip() for p in s.split(",")]
-    parts = [p for p in parts if p]  # drop empty tokens
-    # Re-join
-    return ", ".join(parts)
-
-def to_int_or_blank(value: str) -> str:
-    if value is None:
-        return ""
-    v = str(value).strip()
-    if v == "":
-        return ""
-    # Strip non-digit chars (but keep minus? not needed here)
-    v2 = re.sub(r"[^0-9-]+", "", v)
-    if v2 in ("", "-", "--"):
-        return ""
+    # last-ditch: split by / or -
     try:
-        int(v2)
-        return v2
+        parts = [p.strip() for p in v.replace("-", "/").split("/")]
+        if len(parts) == 3:
+            m, d, y = parts
+            y = int(y)
+            m = int(m)
+            d = int(d)
+            return datetime(y if y > 99 else (2000 + y if y <= 69 else 1900 + y), m, d).strftime("%Y-%m-%d")
     except Exception:
-        return ""
+        pass
+    return v  # preserve original if we can't confidently parse
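
A few spot checks of the parser's fallback chain (module name `cleaner` is an assumption based on the file path; run from the imports/ directory):

    from cleaner import parse_date_flex

    assert parse_date_flex("3/5/07") == "2007-03-05"      # strptime, 2-digit year
    assert parse_date_flex("12/31/1999") == "1999-12-31"  # strptime, 4-digit year
    assert parse_date_flex("3-5-07") == "2007-03-05"      # last-ditch split on "-"
    assert parse_date_flex("not a date") == "not a date"  # unparseable -> preserved
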
-def normalize_row(row_dict, stats, rownum):
-    """Return (cleaned_row_dict, reject_reason_or_None)."""
-    clean = {}
+def main():
+    ap = argparse.ArgumentParser()
+    ap.add_argument("--in", dest="in_path", default="illustrations_seed.csv")
+    ap.add_argument("--out", dest="out_path", default="illustrations_clean.csv")
+    ap.add_argument("--normalize-dates", action="store_true",
+                    help="If set, convert Date and Date Edited to ISO YYYY-MM-DD. Otherwise leave as-is.")
+    args = ap.parse_args()
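
With the flags defined above, typical runs look like this (script path per this diff; all arguments are optional because of the defaults):

    python3 imports/cleaner.py
    python3 imports/cleaner.py --in illustrations_seed.csv --out illustrations_clean.csv --normalize-dates
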
Otherwise leave as-is.") + args = ap.parse_args() - # Subject - subject = row_dict.get("Subject", "") - subject = clean_subject(subject) - clean["Subject"] = subject + if not os.path.exists(args.in_path): + raise SystemExit(f"Input file not found: {args.in_path}") - # Illustration - ill = pre_normalize_text(row_dict.get("Illustration", "")).strip() - clean["Illustration"] = ill - - # Application - app = pre_normalize_text(row_dict.get("Application", "")).strip() - clean["Application"] = app - - # Scripture - scr = pre_normalize_text(row_dict.get("Scripture", "")).strip() - scr_norm = normalize_scripture(scr) - if scr and scr != scr_norm: - stats["scripture_changed"] += 1 - clean["Scripture"] = scr_norm - - # Source - src = pre_normalize_text(row_dict.get("Source", "")).strip() - clean["Source"] = src - - # Talk Title - ttitle = pre_normalize_text(row_dict.get("Talk Title", "")).strip() - clean["Talk Title"] = ttitle - - # Talk Number - tnum = to_int_or_blank(row_dict.get("Talk Number", "")) - if tnum == "" and str(row_dict.get("Talk Number", "")).strip() not in ("",): - stats["invalid_ints"] += 1 - clean["Talk Number"] = tnum - - # Code - code = to_int_or_blank(row_dict.get("Code", "")) - if code == "" and str(row_dict.get("Code", "")).strip() not in ("",): - stats["invalid_ints"] += 1 - clean["Code"] = code - - # Date - date_raw = pre_normalize_text(row_dict.get("Date", "")).strip() - date_norm = parse_date(date_raw) - if date_raw and not date_norm: - stats["invalid_dates"] += 1 - elif date_norm: - stats["dates_normalized"] += 1 - clean["Date"] = date_norm - - # Date Edited - datee_raw = pre_normalize_text(row_dict.get("Date Edited", "")).strip() - datee_norm = parse_date(datee_raw) - if datee_raw and not datee_norm: - stats["invalid_dates"] += 1 - elif datee_norm: - stats["dates_normalized"] += 1 - clean["Date Edited"] = datee_norm - - # Reject logic: If the row is completely empty across all known fields, skip. - if not any(clean.get(h, "").strip() for h in HEADER): - return clean, "empty_row" - - return clean, None - -def read_with_prepass(path): - """Read entire file, pre-normalize text, then parse CSV via csv.DictReader.""" - with open(path, "r", encoding="utf-8-sig", newline="") as f: - raw = f.read() - normalized = pre_normalize_text(raw) - buf = io.StringIO(normalized) - reader = csv.DictReader(buf, delimiter=",", quotechar='"', doublequote=True, escapechar="\\", strict=False) - return reader - -def write_csv(rows, out_path): - """Write rows (list of dicts) with QUOTE_ALL to ensure commas/newlines are safe.""" - with open(out_path, "w", encoding="utf-8", newline="") as f: - writer = csv.DictWriter( + # Read using DictReader; accept whatever header is present but verify shape. + with open(args.in_path, "r", encoding="utf-8-sig", newline="") as f: + reader = csv.DictReader( f, - fieldnames=HEADER, + delimiter=",", + quotechar='"', + doublequote=True, + escapechar="\\", + strict=False, + ) + input_header = reader.fieldnames or [] + missing = [h for h in EXPECTED_HEADER if h not in input_header] + if missing: + print(f"[WARN] Input CSV missing columns: {missing}") + print(f" Found columns: {input_header}") + + rows_out = [] + total = 0 + for row in reader: + total += 1 + # Build output row strictly in EXPECTED_HEADER order, preserving raw strings. 
+    # Write with QUOTE_ALL so commas/newlines/quotes never break columns.
+    with open(args.out_path, "w", encoding="utf-8", newline="") as f:
+        w = csv.DictWriter(
+            f,
+            fieldnames=EXPECTED_HEADER,
+            delimiter=",",
+            quotechar='"',
+            quoting=csv.QUOTE_ALL,
@@ -321,113 +106,27 @@ def write_csv(rows, out_path):
             escapechar="\\",
             lineterminator="\n",
         )
-        writer.writeheader()
-        for r in rows:
-            writer.writerow({k: r.get(k, "") for k in HEADER})
+        w.writeheader()
+        for r in rows_out:
+            w.writerow(r)
 
-def self_check_csv(path):
-    """Verify column count on every row equals header length."""
+    # Quick self-check: re-read output and ensure fixed column count
     problems = []
-    with open(path, "r", encoding="utf-8", newline="") as f:
-        reader = csv.reader(f, delimiter=",", quotechar='"', doublequote=True, escapechar="\\", strict=False)
-        expected = None
-        rownum = 0
-        for row in reader:
-            rownum += 1
-            if rownum == 1:
-                expected = len(row)
+    with open(args.out_path, "r", encoding="utf-8", newline="") as f:
+        rdr = csv.reader(f, delimiter=",", quotechar='"', doublequote=True, escapechar="\\", strict=False)
+        expected_cols = None
+        for i, row in enumerate(rdr, start=1):
+            if i == 1:
+                expected_cols = len(row)
                 continue
-            if len(row) != expected:
-                problems.append((rownum, len(row), expected))
-    return problems
+            if len(row) != expected_cols:
+                problems.append((i, len(row), expected_cols))
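
For intuition, this is the failure mode the self-check reports: an unquoted delimiter silently adds a column, so a re-read with csv.reader shows a field count that no longer matches the header (a standalone sketch, independent of the script):

    import csv, io

    bad = 'A,B\n"x,y",2\nx,y,2\n'  # row 3 has an unquoted comma -> 3 columns
    for i, row in enumerate(csv.reader(io.StringIO(bad)), start=1):
        print(i, len(row), row)
    # 1 2 ['A', 'B']
    # 2 2 ['x,y', '2']
    # 3 3 ['x', 'y', '2']
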
-def main():
-    ap = argparse.ArgumentParser()
-    ap.add_argument("--in", dest="in_path", default="illustrations_seed.csv")
-    ap.add_argument("--out", dest="out_path", default="illustrations_clean.csv")
-    ap.add_argument("--rejects", dest="rejects_path", default="illustrations_rejects.csv")
-    args = ap.parse_args()
-
-    in_path = args.in_path
-    out_path = args.out_path
-    rejects_path = args.rejects_path
-
-    if not os.path.exists(in_path):
-        raise SystemExit(f"Input file not found: {in_path}")
-
-    stats = {
-        "total_rows": 0,
-        "written_rows": 0,
-        "reject_rows": 0,
-        "scripture_changed": 0,
-        "invalid_ints": 0,
-        "invalid_dates": 0,
-        "dates_normalized": 0,
-    }
-
-    rejects = []
-    cleaned_rows = []
-
-    reader = read_with_prepass(in_path)
-
-    # Validate header presence/shape
-    missing = [h for h in HEADER if h not in reader.fieldnames]
-    if missing:
-        raise SystemExit(f"Input CSV missing expected headers: {missing}\nFound headers: {reader.fieldnames}")
-
-    for idx, row in enumerate(reader, start=2):  # start=2 because header is line 1
-        stats["total_rows"] += 1
-        try:
-            cleaned, reason = normalize_row(row, stats, idx)
-            if reason:
-                stats["reject_rows"] += 1
-                r = {k: row.get(k, "") for k in HEADER}
-                r["reason"] = reason
-                rejects.append(r)
-                continue
-            cleaned_rows.append(cleaned)
-        except Exception as e:
-            stats["reject_rows"] += 1
-            r = {k: row.get(k, "") for k in HEADER}
-            r["reason"] = f"exception@row {idx}: {e}"
-            rejects.append(r)
-
-    # Write outputs
-    write_csv(cleaned_rows, out_path)
-
-    # Write rejects if any
-    if rejects:
-        # Ensure 'reason' is the last column for readability
-        rej_header = HEADER + ["reason"]
-        with open(rejects_path, "w", encoding="utf-8", newline="") as f:
-            writer = csv.DictWriter(
-                f,
-                fieldnames=rej_header,
-                delimiter=",",
-                quotechar='"',
-                quoting=csv.QUOTE_ALL,
-                doublequote=True,
-                escapechar="\\",
-                lineterminator="\n",
-            )
-            writer.writeheader()
-            for r in rejects:
-                writer.writerow({k: r.get(k, "") for k in rej_header})
-
-    # Self check the written CSV
-    problems = self_check_csv(out_path)
-
-    # Summary
     print("=== Clean Summary ===")
-    print(f"Input rows (excluding header): {stats['total_rows']}")
-    print(f"Written rows: {stats['written_rows'] + len(cleaned_rows)}")
-    print(f"Reject rows: {stats['reject_rows']}")
-    print(f"Scripture normalized: {stats['scripture_changed']}")
-    print(f"Dates normalized: {stats['dates_normalized']}")
-    print(f"Invalid ints blanked: {stats['invalid_ints']}")
-    print(f"Invalid dates (left blank): {stats['invalid_dates']}")
+    print(f"Input rows (excluding header): {total}")
+    print(f"Written rows: {len(rows_out)}")
     if problems:
-        print("\nSelf-check found column count issues on these line numbers in the OUTPUT (1-based):")
+        print("\n[WARNING] Column count issues detected in OUTPUT:")
         for line_no, got, exp in problems:
             print(f"  line {line_no}: columns={got}, expected={exp}")
     else: