136 lines
4.7 KiB
Python
136 lines
4.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
Content-preserving CSV sanitizer for 'illustrations_seed.csv' -> 'illustrations_clean.csv'.

What it does (and ONLY this by default):
- Parses your CSV safely.
- Writes a new CSV where EVERY FIELD is QUOTED and any inner " becomes "".
- Keeps the exact text of every field (no trimming, no subject/scripture tweaking, no punctuation edits).
- Keeps the exact column order expected by your importer.

Optional flag --normalize-dates will convert Date and Date Edited to ISO YYYY-MM-DD.
"""
|
|
|
|
import argparse
|
|
import csv
|
|
import os
|
|
from datetime import datetime
|
|
|
|
# Column names of the sanitized CSV, in the order they are written.
# Grouped here as: content fields, talk metadata, then the two date fields.
EXPECTED_HEADER = [
    "Subject", "Illustration", "Application", "Scripture", "Source",
    "Talk Title", "Talk Number", "Code",
    "Date", "Date Edited",
]
|
|
|
|
def parse_date_flex(v: str) -> str:
    """Convert a loosely formatted date string to ISO ``YYYY-MM-DD``.

    Accepted shapes, after trimming surrounding whitespace:

    * ``M/D/YY`` and ``M/D/YYYY`` (zero padding optional);
    * the same with ``-`` as separator and/or spaces around each part;
    * year-first ``YYYY-M-D`` / ``YYYY/M/D`` (four-digit year only).

    Two-digit years use the same pivot as ``strptime``'s ``%y`` on every
    path: 00-68 become 2000-2068 and 69-99 become 1969-1999.

    Args:
        v: Raw cell text. ``None`` and empty or whitespace-only text are
            tolerated.

    Returns:
        The ISO date when the text is a valid calendar date in one of the
        accepted shapes, ``""`` for empty input, and otherwise the trimmed
        input text unchanged.
    """
    text = (v or "").strip()
    if not text:
        return ""

    # strptime's %m and %d already accept non-padded numbers, so these two
    # formats are sufficient. The "%-m/%-d" variants are strftime-only
    # platform extensions that strptime rejects, so they are not listed.
    for fmt in ("%m/%d/%y", "%m/%d/%Y"):
        try:
            return datetime.strptime(text, fmt).date().isoformat()
        except ValueError:
            continue

    # Fallback: tolerate "-" separators and stray spaces around each part.
    parts = [p.strip() for p in text.replace("-", "/").split("/")]
    if len(parts) != 3:
        return text

    try:
        first, second, third = (int(p) for p in parts)
        if len(parts[0]) == 4 and parts[0].isdigit():
            # A four-digit leading component can only be a year.
            year, month, day = first, second, third
        else:
            month, day, year = first, second, third
            if year <= 99:
                # Same two-digit pivot as strptime's %y, so "1-2-69" and
                # "1/2/69" resolve to the same year.
                year += 2000 if year <= 68 else 1900
        # isoformat() zero-pads the year identically on every platform.
        return datetime(year, month, day).date().isoformat()
    except (ValueError, OverflowError):
        # Not a number, or not a real calendar date: keep the text as-is.
        return text
|
|
|
|
def _load_rows(in_path, normalize_dates):
    """Read the input CSV and shape every record to EXPECTED_HEADER.

    Args:
        in_path: Path of the CSV to read (UTF-8, optional BOM).
        normalize_dates: When true, "Date" and "Date Edited" are passed
            through parse_date_flex; otherwise they are copied verbatim.

    Returns:
        A ``(rows, ragged)`` pair. ``rows`` is a list of dicts keyed by
        EXPECTED_HEADER. ``ragged`` is a list of ``(line_number, detail)``
        tuples for records whose field count differs from the input header.
    """
    rows = []
    ragged = []
    with open(in_path, "r", encoding="utf-8-sig", newline="") as f:
        # NOTE(review): with escapechar set, the reader treats a backslash in
        # the input as an escape rather than literal text - confirm the seed
        # file really relies on backslash escapes.
        reader = csv.DictReader(
            f,
            delimiter=",",
            quotechar='"',
            doublequote=True,
            escapechar="\\",
            strict=False,
        )
        input_header = reader.fieldnames or []
        missing = [h for h in EXPECTED_HEADER if h not in input_header]
        if missing:
            print(f"[WARN] Input CSV missing columns: {missing}")
            print(f" Found columns: {input_header}")

        for row in reader:
            # DictReader stores surplus fields under the key None and fills
            # absent trailing fields with None. Neither reaches the output
            # as-is, so record both cases instead of hiding them.
            surplus = row.get(None)
            if surplus:
                ragged.append(
                    (reader.line_num, f"{len(surplus)} extra field(s) ignored")
                )
            absent = sum(1 for h in input_header if row.get(h) is None)
            if absent:
                ragged.append(
                    (reader.line_num, f"{absent} field(s) missing, written as empty")
                )

            # Strict EXPECTED_HEADER order; absent or None values become "".
            out = {h: row.get(h) or "" for h in EXPECTED_HEADER}

            if normalize_dates:
                for dh in ("Date", "Date Edited"):
                    out[dh] = parse_date_flex(out[dh])

            rows.append(out)
    return rows, ragged


def _write_rows(out_path, rows):
    """Write *rows* to *out_path* with every field quoted."""
    with open(out_path, "w", encoding="utf-8", newline="") as f:
        # QUOTE_ALL so commas/newlines/quotes never break columns.
        w = csv.DictWriter(
            f,
            fieldnames=EXPECTED_HEADER,
            delimiter=",",
            quotechar='"',
            quoting=csv.QUOTE_ALL,
            doublequote=True,
            escapechar="\\",
            lineterminator="\n",
        )
        w.writeheader()
        w.writerows(rows)


def _find_column_mismatches(out_path):
    """Re-read *out_path* and list records whose width differs from the header.

    Returns:
        A list of ``(record_index, found_columns, expected_columns)`` tuples,
        where the header is record 1. Empty when every record matches.
    """
    problems = []
    with open(out_path, "r", encoding="utf-8", newline="") as f:
        rdr = csv.reader(
            f,
            delimiter=",",
            quotechar='"',
            doublequote=True,
            escapechar="\\",
            strict=False,
        )
        header = next(rdr, None)
        if header is None:
            return problems
        expected_cols = len(header)
        for i, row in enumerate(rdr, start=2):
            if len(row) != expected_cols:
                problems.append((i, len(row), expected_cols))
    return problems


def main():
    """Command-line entry point: sanitize the input CSV and self-check the result.

    Options:
        --in PATH            input CSV (default "illustrations_seed.csv")
        --out PATH           output CSV (default "illustrations_clean.csv")
        --normalize-dates    rewrite Date / Date Edited as ISO YYYY-MM-DD

    Raises:
        SystemExit: if the input path does not exist.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--in", dest="in_path", default="illustrations_seed.csv")
    ap.add_argument("--out", dest="out_path", default="illustrations_clean.csv")
    ap.add_argument(
        "--normalize-dates",
        action="store_true",
        help="If set, convert Date and Date Edited to ISO YYYY-MM-DD. Otherwise leave as-is.",
    )
    args = ap.parse_args()

    if not os.path.exists(args.in_path):
        raise SystemExit(f"Input file not found: {args.in_path}")

    # All rows are held in memory before writing, so --in and --out may even
    # name the same file without truncating the input mid-read.
    rows_out, ragged = _load_rows(args.in_path, args.normalize_dates)
    _write_rows(args.out_path, rows_out)
    problems = _find_column_mismatches(args.out_path)

    print("=== Clean Summary ===")
    print(f"Input rows (excluding header): {len(rows_out)}")
    print(f"Written rows: {len(rows_out)}")

    if ragged:
        print("\n[WARNING] Input rows whose field count differs from the header:")
        for line_no, detail in ragged:
            print(f" line {line_no}: {detail}")

    if problems:
        print("\n[WARNING] Column count issues detected in OUTPUT:")
        for line_no, got, exp in problems:
            print(f" line {line_no}: columns={got}, expected={exp}")
    else:
        print("\nSelf-check: all rows have the expected column count.")
|
|
|
|
# Run the sanitizer only when executed as a script, not when imported.
if __name__ == "__main__":
    main()