#!/usr/bin/env python3
"""
Cleaner for `illustrations_seed.csv` -> `illustrations_clean.csv`

What it does:
- Reads your CSV safely (commas/newlines/quotes inside fields supported).
- For every field value, replaces inner *double quotes* with a single quote:
      " “ ” « » „  -->  '
  (Existing single quotes like Jehovah's are preserved.)
- Writes a new CSV with QUOTE_ALL so commas/newlines never break columns.
- Optional: --normalize-dates converts Date / Date Edited to ISO YYYY-MM-DD.
- Prints a short summary and verifies the output column count.

Usage:
  python3 cleaner.py
  python3 cleaner.py --in my_in.csv --out my_out.csv --normalize-dates
"""

import argparse
import csv
import os
from datetime import datetime
from typing import Dict, List, Optional, Sequence, Tuple

# Output columns, in the order they are written.
EXPECTED_HEADER = [
    "Subject",
    "Illustration",
    "Application",
    "Scripture",
    "Source",
    "Talk Title",
    "Talk Number",
    "Code",
    "Date",
    "Date Edited",
]

# Map various double-quote characters to a single straight apostrophe
DOUBLE_QUOTES_TO_SINGLE = {
    '"': "'",  # U+0022
    "“": "'",  # U+201C
    "”": "'",  # U+201D
    "„": "'",  # U+201E
    "«": "'",  # U+00AB
    "»": "'",  # U+00BB
}

# Pre-built translation table so each value is rewritten in a single pass.
_QUOTE_TABLE = str.maketrans(DOUBLE_QUOTES_TO_SINGLE)

# strptime's %m / %d already accept non-padded values ("1/2/25"), so the
# platform-specific "%-m" / "%-d" variants are unnecessary (and are not valid
# strptime directives in the first place).
_DATE_FORMATS = ("%m/%d/%y", "%m/%d/%Y")

# Columns rewritten by --normalize-dates.
_DATE_COLUMNS = ("Date", "Date Edited")

# Dialect options shared by the input reader, the writer and the self-check.
_CSV_DIALECT = {
    "delimiter": ",",
    "quotechar": '"',
    "doublequote": True,
    "escapechar": "\\",
}


def to_single_quotes(s: str) -> str:
    """Replace inner double-quote characters with a single quote.

    Every character listed in ``DOUBLE_QUOTES_TO_SINGLE`` becomes ``'``;
    existing single quotes are left as-is. Non-string values are returned
    unchanged.
    """
    if not isinstance(s, str):
        return s
    return s.translate(_QUOTE_TABLE)


def parse_date_flex(v: str) -> str:
    """Parse common US date formats and return ISO ``YYYY-MM-DD``.

    Accepts ``M/D/YY`` and ``M/D/YYYY`` with ``/`` or ``-`` separators and
    optional whitespace around the parts. Two-digit years follow the same
    pivot ``strptime`` uses for ``%y`` (00-68 -> 2000-2068, 69-99 ->
    1969-1999), so the result does not depend on which separator was used.

    Returns ``""`` for empty/None input, and the stripped original text when
    the value cannot be parsed as a valid calendar date.
    """
    v = (v or "").strip()
    if not v:
        return ""

    for fmt in _DATE_FORMATS:
        try:
            # date.isoformat() always zero-pads the year to four digits,
            # unlike strftime("%Y") on some platforms.
            return datetime.strptime(v, fmt).date().isoformat()
        except ValueError:
            continue

    # Last-ditch tolerant parse (handles 01-02-2025, "1 / 2 / 25", etc.)
    try:
        month, day, year = (
            int(part.strip()) for part in v.replace("-", "/").split("/")
        )
        if year < 100:
            # Same century pivot as strptime's %y.
            year += 2000 if year <= 68 else 1900
        return datetime(year, month, day).date().isoformat()
    except (ValueError, OverflowError):
        return v  # preserve original if not confidently parsed


def _clean_row(row: Dict, normalize_dates: bool) -> Dict[str, str]:
    """Build one output row in EXPECTED_HEADER order from a DictReader row."""
    # Missing columns and short rows (DictReader yields None) become "".
    cleaned = {h: to_single_quotes(row.get(h) or "") for h in EXPECTED_HEADER}
    if normalize_dates:
        for column in _DATE_COLUMNS:
            cleaned[column] = parse_date_flex(cleaned[column])
    return cleaned


def _read_rows(path: str, normalize_dates: bool) -> Tuple[List[Dict[str, str]], int]:
    """Read and clean all rows; return (rows, count of rows with extra fields)."""
    rows_out: List[Dict[str, str]] = []
    overflow_rows = 0

    # utf-8-sig transparently drops a BOM if the file has one.
    with open(path, "r", encoding="utf-8-sig", newline="") as f:
        reader = csv.DictReader(f, strict=False, **_CSV_DIALECT)

        input_header = reader.fieldnames or []
        missing = [h for h in EXPECTED_HEADER if h not in input_header]
        if missing:
            print(f"[WARN] Missing expected columns: {missing}")
            print(f"       Found columns: {input_header}")

        for row in reader:
            # DictReader stores fields beyond the header under the key None.
            if None in row:
                overflow_rows += 1
            rows_out.append(_clean_row(row, normalize_dates))

    return rows_out, overflow_rows


def _write_rows(path: str, rows: List[Dict[str, str]]) -> None:
    """Write rows with QUOTE_ALL so commas/newlines remain safe."""
    with open(path, "w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(
            f,
            fieldnames=EXPECTED_HEADER,
            quoting=csv.QUOTE_ALL,
            lineterminator="\n",
            **_CSV_DIALECT,
        )
        writer.writeheader()
        writer.writerows(rows)


def _find_column_problems(path: str) -> List[Tuple[int, int, int]]:
    """Re-read the output; return (record_no, got, expected) for bad rows."""
    problems: List[Tuple[int, int, int]] = []
    with open(path, "r", encoding="utf-8", newline="") as f:
        reader = csv.reader(f, strict=False, **_CSV_DIALECT)
        expected_cols = None
        for record_no, row in enumerate(reader, start=1):
            if expected_cols is None:
                # The header record defines the expected width.
                expected_cols = len(row)
            elif len(row) != expected_cols:
                problems.append((record_no, len(row), expected_cols))
    return problems


def main(argv: Optional[Sequence[str]] = None) -> None:
    """Run the cleaner.

    Args:
        argv: Command-line arguments (without the program name). ``None``
            means "use ``sys.argv[1:]``", which is what the script entry
            point relies on.

    Raises:
        SystemExit: If the input path is not an existing file, or on
            argument-parsing errors.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--in", dest="in_path", default="illustrations_seed.csv",
                    help="Input CSV path")
    ap.add_argument("--out", dest="out_path", default="illustrations_clean.csv",
                    help="Output CSV path")
    ap.add_argument("--normalize-dates", action="store_true",
                    help="Convert Date and Date Edited to ISO YYYY-MM-DD.")
    args = ap.parse_args(argv)

    # isfile (not exists) so a directory path is reported cleanly as well.
    if not os.path.isfile(args.in_path):
        raise SystemExit(f"Input file not found: {args.in_path}")

    # All input is read before the output is opened, so --in and --out may
    # safely point at the same file.
    rows_out, overflow_rows = _read_rows(args.in_path, args.normalize_dates)
    _write_rows(args.out_path, rows_out)

    # Self-check the written CSV for a stable column count
    problems = _find_column_problems(args.out_path)

    print("=== Clean Summary ===")
    print(f"Input rows (excluding header): {len(rows_out)}")
    print(f"Written rows: {len(rows_out)}")
    if overflow_rows:
        print(f"[WARN] {overflow_rows} input row(s) had more fields than the "
              "header; the extra fields were dropped.")
    if problems:
        print("\n[WARNING] Column count issues detected in OUTPUT:")
        for line_no, got, exp in problems:
            print(f"  line {line_no}: columns={got}, expected={exp}")
    else:
        print("\nSelf-check: all rows have the expected column count.")


if __name__ == "__main__":
    main()