#!/usr/bin/env python3 """ Clean and normalize `illustrations_seed.csv` -> `illustrations_clean.csv`. Key behaviors: - Robustly parse CSV with many commas and embedded quotes/newlines. - Prepass to normalize smart quotes/non-breaking spaces before parsing. - Quote ALL fields on output to guarantee importer-friendly CSV. - Subject: trim parts, drop empties, remove trailing comma, rejoin with ", ". - Scripture: remove trailing semicolons, normalize common book abbreviations. - Dates: accept flexible M/D/YY, M/D/YYYY, etc.; output ISO YYYY-MM-DD. - Talk Number / Code: numeric if possible; blank if invalid. - Write rejects to `illustrations_rejects.csv` with a reason, if any. - Post-write self-check: verifies column count of every row. Usage: python3 clean_illustrations_csv.py \ --in illustrations_seed.csv \ --out illustrations_clean.csv \ --rejects illustrations_rejects.csv """ import argparse import csv import io import os import re from datetime import datetime HEADER = [ "Subject", "Illustration", "Application", "Scripture", "Source", "Talk Title", "Talk Number", "Code", "Date", "Date Edited", ] # Common scripture book abbreviation normalization. BOOK_MAP = { # New Testament (common abbreviations) "rom": "Romans", "romans": "Romans", "eph": "Ephesians", "ephesians": "Ephesians", "col": "Colossians", "colossians": "Colossians", "1 cor": "1 Corinthians", "2 cor": "2 Corinthians", "1 thess": "1 Thessalonians", "2 thess": "2 Thessalonians", "1 tim": "1 Timothy", "2 tim": "2 Timothy", "1 pet": "1 Peter", "2 pet": "2 Peter", "1 john": "1 John", "2 john": "2 John", "3 john": "3 John", "heb": "Hebrews", "rev": "Revelation", "revelation": "Revelation", "acts": "Acts", "matt": "Matthew", "mt": "Matthew", "mark": "Mark", "mk": "Mark", "luke": "Luke", "lk": "Luke", "john": "John", "jn": "John", "jude": "Jude", "phil": "Philippians", "php": "Philippians", "philem": "Philemon", "titus": "Titus", "gal": "Galatians", "galatians": "Galatians", "james": "James", "jas": "James", # Old Testament (examples + the ones in your sample) "eccl": "Ecclesiastes", "eccles": "Ecclesiastes", "ecclesiastes": "Ecclesiastes", "ps": "Psalms", "psalm": "Psalms", "psalms": "Psalms", "prov": "Proverbs", "proverbs": "Proverbs", "gen": "Genesis", "genesis": "Genesis", "ex": "Exodus", "exod": "Exodus", "exodus": "Exodus", "isa": "Isaiah", "isaiah": "Isaiah", "jer": "Jeremiah", "jeremiah": "Jeremiah", "dan": "Daniel", "daniel": "Daniel", } DATE_FORMATS = [ "%m/%d/%y", "%m/%d/%Y", "%-m/%-d/%y", # on Linux/mac "%-m/%-d/%Y", "%m/%-d/%y", "%-m/%d/%y", "%m/%-d/%Y", "%-m/%d/%Y", ] NBSP = "\u00A0" SMARTS = { "\u201C": '"', # left double "\u201D": '"', # right double "\u2018": "'", # left single "\u2019": "'", # right single / apostrophe "\u00AB": '"', # « "\u00BB": '"', # » } def pre_normalize_text(raw: str) -> str: """Prepass to remove non-breaking spaces and normalize smart quotes.""" if raw is None: return "" s = str(raw).replace(NBSP, " ") for k, v in SMARTS.items(): s = s.replace(k, v) # Normalize any stray CR-only line endings s = s.replace("\r\n", "\n").replace("\r", "\n") return s def parse_date(value: str) -> str: """Parse flexible US-style dates; return ISO YYYY-MM-DD or '' if empty/invalid.""" if not value: return "" v = value.strip() if not v: return "" # Common separators already handled; try multiple formats. for fmt in DATE_FORMATS: try: dt = datetime.strptime(v, fmt) # Heuristic for 2-digit years if needed (datetime handles 1900s/2000s defaults) # We trust strptime here; user examples are 2000s. return dt.strftime("%Y-%m-%d") except Exception: pass # Try to interpret like M/D/YY or M/D/YYYY with flexible spacing m = re.match(r"^\s*(\d{1,2})[/-](\d{1,2})[/-](\d{2,4})\s*$", v) if m: mth, day, yr = int(m.group(1)), int(m.group(2)), m.group(3) if len(yr) == 2: # Assume 20xx for 00-69, 19xx for 70-99 (datetime default logic) year = 2000 + int(yr) if int(yr) <= 69 else 1900 + int(yr) else: year = int(yr) try: return datetime(year, mth, day).strftime("%Y-%m-%d") except Exception: return "" return "" BOOK_RE = re.compile(r"^\s*([1-3]?\s*[A-Za-z\.]+)\s*(.*)$") def normalize_scripture(value: str) -> str: """Normalize scripture strings: strip trailing semicolons/spaces, normalize book name if easily detectable.""" if not value: return "" s = value.strip() # Remove trailing semicolons and excess punctuation/spaces. s = re.sub(r"[;,\s]+$", "", s) # Try to normalize the *first* book token if identifiable. m = BOOK_RE.match(s) if not m: return s book_raw, rest = m.group(1), m.group(2) # normalize book key key = book_raw.lower().replace(".", "") key = re.sub(r"\s+", " ", key).strip() # normalize ordinal spacing: "1cor" -> "1 cor" key = re.sub(r"^([1-3])([a-z])", r"\1 \2", key) book = BOOK_MAP.get(key, None) if not book: # Title-case fallback (basic) book = " ".join(w.capitalize() for w in key.split()) rest = rest.strip() # Normalize spacing in the chapter/verse segment (e.g., "14:13, 19") rest = re.sub(r"\s*,\s*", ", ", rest) rest = re.sub(r"\s*;\s*", "; ", rest) rest = re.sub(r"\s+", " ", rest) out = (book + (" " + rest if rest else "")).strip() return out def clean_subject(value: str) -> str: """Trim parts, drop empty entries, remove trailing commas/spaces, re-join with ', '.""" if not value: return "" # Strip external quotes handled by csv; here we just process content s = value.strip() # Split by comma, trim each token, drop empties parts = [p.strip() for p in s.split(",")] parts = [p for p in parts if p] # drop empty tokens # Re-join return ", ".join(parts) def to_int_or_blank(value: str) -> str: if value is None: return "" v = str(value).strip() if v == "": return "" # Strip non-digit chars (but keep minus? not needed here) v2 = re.sub(r"[^0-9-]+", "", v) if v2 in ("", "-", "--"): return "" try: int(v2) return v2 except Exception: return "" def normalize_row(row_dict, stats, rownum): """Return (cleaned_row_dict, reject_reason_or_None).""" clean = {} # Subject subject = row_dict.get("Subject", "") subject = clean_subject(subject) clean["Subject"] = subject # Illustration ill = pre_normalize_text(row_dict.get("Illustration", "")).strip() clean["Illustration"] = ill # Application app = pre_normalize_text(row_dict.get("Application", "")).strip() clean["Application"] = app # Scripture scr = pre_normalize_text(row_dict.get("Scripture", "")).strip() scr_norm = normalize_scripture(scr) if scr and scr != scr_norm: stats["scripture_changed"] += 1 clean["Scripture"] = scr_norm # Source src = pre_normalize_text(row_dict.get("Source", "")).strip() clean["Source"] = src # Talk Title ttitle = pre_normalize_text(row_dict.get("Talk Title", "")).strip() clean["Talk Title"] = ttitle # Talk Number tnum = to_int_or_blank(row_dict.get("Talk Number", "")) if tnum == "" and str(row_dict.get("Talk Number", "")).strip() not in ("",): stats["invalid_ints"] += 1 clean["Talk Number"] = tnum # Code code = to_int_or_blank(row_dict.get("Code", "")) if code == "" and str(row_dict.get("Code", "")).strip() not in ("",): stats["invalid_ints"] += 1 clean["Code"] = code # Date date_raw = pre_normalize_text(row_dict.get("Date", "")).strip() date_norm = parse_date(date_raw) if date_raw and not date_norm: stats["invalid_dates"] += 1 elif date_norm: stats["dates_normalized"] += 1 clean["Date"] = date_norm # Date Edited datee_raw = pre_normalize_text(row_dict.get("Date Edited", "")).strip() datee_norm = parse_date(datee_raw) if datee_raw and not datee_norm: stats["invalid_dates"] += 1 elif datee_norm: stats["dates_normalized"] += 1 clean["Date Edited"] = datee_norm # Reject logic: If the row is completely empty across all known fields, skip. if not any(clean.get(h, "").strip() for h in HEADER): return clean, "empty_row" return clean, None def read_with_prepass(path): """Read entire file, pre-normalize text, then parse CSV via csv.DictReader.""" with open(path, "r", encoding="utf-8-sig", newline="") as f: raw = f.read() normalized = pre_normalize_text(raw) buf = io.StringIO(normalized) reader = csv.DictReader(buf, delimiter=",", quotechar='"', doublequote=True, escapechar="\\", strict=False) return reader def write_csv(rows, out_path): """Write rows (list of dicts) with QUOTE_ALL to ensure commas/newlines are safe.""" with open(out_path, "w", encoding="utf-8", newline="") as f: writer = csv.DictWriter( f, fieldnames=HEADER, delimiter=",", quotechar='"', quoting=csv.QUOTE_ALL, doublequote=True, escapechar="\\", lineterminator="\n", ) writer.writeheader() for r in rows: writer.writerow({k: r.get(k, "") for k in HEADER}) def self_check_csv(path): """Verify column count on every row equals header length.""" problems = [] with open(path, "r", encoding="utf-8", newline="") as f: reader = csv.reader(f, delimiter=",", quotechar='"', doublequote=True, escapechar="\\", strict=False) expected = None rownum = 0 for row in reader: rownum += 1 if rownum == 1: expected = len(row) continue if len(row) != expected: problems.append((rownum, len(row), expected)) return problems def main(): ap = argparse.ArgumentParser() ap.add_argument("--in", dest="in_path", default="illustrations_seed.csv") ap.add_argument("--out", dest="out_path", default="illustrations_clean.csv") ap.add_argument("--rejects", dest="rejects_path", default="illustrations_rejects.csv") args = ap.parse_args() in_path = args.in_path out_path = args.out_path rejects_path = args.rejects_path if not os.path.exists(in_path): raise SystemExit(f"Input file not found: {in_path}") stats = { "total_rows": 0, "written_rows": 0, "reject_rows": 0, "scripture_changed": 0, "invalid_ints": 0, "invalid_dates": 0, "dates_normalized": 0, } rejects = [] cleaned_rows = [] reader = read_with_prepass(in_path) # Validate header presence/shape missing = [h for h in HEADER if h not in reader.fieldnames] if missing: raise SystemExit(f"Input CSV missing expected headers: {missing}\nFound headers: {reader.fieldnames}") for idx, row in enumerate(reader, start=2): # start=2 because header is line 1 stats["total_rows"] += 1 try: cleaned, reason = normalize_row(row, stats, idx) if reason: stats["reject_rows"] += 1 r = {k: row.get(k, "") for k in HEADER} r["reason"] = reason rejects.append(r) continue cleaned_rows.append(cleaned) except Exception as e: stats["reject_rows"] += 1 r = {k: row.get(k, "") for k in HEADER} r["reason"] = f"exception@row {idx}: {e}" rejects.append(r) # Write outputs write_csv(cleaned_rows, out_path) # Write rejects if any if rejects: # Ensure 'reason' is the last column for readability rej_header = HEADER + ["reason"] with open(rejects_path, "w", encoding="utf-8", newline="") as f: writer = csv.DictWriter( f, fieldnames=rej_header, delimiter=",", quotechar='"', quoting=csv.QUOTE_ALL, doublequote=True, escapechar="\\", lineterminator="\n", ) writer.writeheader() for r in rejects: writer.writerow({k: r.get(k, "") for k in rej_header}) # Self check the written CSV problems = self_check_csv(out_path) # Summary print("=== Clean Summary ===") print(f"Input rows (excluding header): {stats['total_rows']}") print(f"Written rows: {stats['written_rows'] + len(cleaned_rows)}") print(f"Reject rows: {stats['reject_rows']}") print(f"Scripture normalized: {stats['scripture_changed']}") print(f"Dates normalized: {stats['dates_normalized']}") print(f"Invalid ints blanked: {stats['invalid_ints']}") print(f"Invalid dates (left blank): {stats['invalid_dates']}") if problems: print("\nSelf-check found column count issues on these line numbers in the OUTPUT (1-based):") for line_no, got, exp in problems: print(f" line {line_no}: columns={got}, expected={exp}") else: print("\nSelf-check: all rows have the expected column count.") if __name__ == "__main__": main()