Illustrations/imports/cleaner.py

163 lines
5.6 KiB
Python

#!/usr/bin/env python3
"""
Cleaner for `illustrations_seed.csv` -> `illustrations_clean.csv`
What it does:
- Reads your CSV safely (commas/newlines/quotes inside fields supported).
- For every field value, replaces inner *double quotes* with a single quote:
" “ ” « » „ --> '
(Existing single quotes like Jehovah's are preserved.)
- Writes a new CSV with QUOTE_ALL so commas/newlines never break columns.
- Optional: --normalize-dates converts Date / Date Edited to ISO YYYY-MM-DD.
- Prints a short summary and verifies the output column count.
Usage:
python3 cleaner.py
python3 cleaner.py --in my_in.csv --out my_out.csv --normalize-dates
"""
import argparse
import csv
import os
from datetime import datetime
# Canonical output columns, in the exact order they are written to the cleaned CSV.
EXPECTED_HEADER = [
    # Content of the illustration itself
    "Subject", "Illustration", "Application", "Scripture",
    # Where it came from
    "Source", "Talk Title", "Talk Number", "Code",
    # Bookkeeping dates (optionally normalized to ISO by --normalize-dates)
    "Date", "Date Edited",
]
# Map various double-quote characters to a single straight apostrophe.
# The typographic quotes are spelled as \u escapes so the keys cannot be lost
# to copy/paste or re-encoding: an empty-string key would make str.replace
# insert an apostrophe between every character of every field.
DOUBLE_QUOTES_TO_SINGLE = {
    '"': "'",       # U+0022 quotation mark
    "\u201c": "'",  # U+201C left double quotation mark
    "\u201d": "'",  # U+201D right double quotation mark
    "\u201e": "'",  # U+201E double low-9 quotation mark
    "\u00ab": "'",  # U+00AB left-pointing guillemet
    "\u00bb": "'",  # U+00BB right-pointing guillemet
}

# Translation table built once so each value is cleaned in a single pass.
_DOUBLE_QUOTE_TABLE = str.maketrans(DOUBLE_QUOTES_TO_SINGLE)


def to_single_quotes(s: str) -> str:
    """Replace every double-quote character in *s* with a straight apostrophe.

    Existing single quotes (e.g. ``Jehovah's``) are left untouched.

    Args:
        s: The field value to clean. Non-string values are tolerated.

    Returns:
        The cleaned string, or *s* unchanged when it is not a ``str``.
    """
    if not isinstance(s, str):
        return s
    return s.translate(_DOUBLE_QUOTE_TABLE)
def parse_date_flex(v: str) -> str:
    """Parse common US date formats and return ISO ``YYYY-MM-DD``.

    Accepts month/day/year with ``/`` or ``-`` separators, with or without
    zero padding, and two- or four-digit years. Two-digit years use the same
    pivot as ``strptime``'s ``%y`` on both code paths:
    00-68 -> 2000-2068 and 69-99 -> 1969-1999.

    Args:
        v: Raw date text. ``None`` and blank strings are treated as empty.

    Returns:
        The ISO date string, ``""`` for empty input, or the stripped original
        text when it cannot be parsed confidently.
    """
    v = (v or "").strip()
    if not v:
        return ""
    # %m and %d already accept unpadded numbers in strptime, so the
    # platform-specific "%-m"-style directives (which strptime rejects
    # outright) are not needed.
    for fmt in ("%m/%d/%y", "%m/%d/%Y"):
        try:
            parsed = datetime.strptime(v, fmt)
        except ValueError:
            continue
        # isoformat() always zero-pads the year, unlike strftime("%Y") on
        # some platforms.
        return parsed.date().isoformat()
    # Last-ditch tolerant parse (handles 01-02-2025, "1 / 2 / 25", etc.).
    try:
        m, d, y = (int(part) for part in v.replace("-", "/").split("/"))
        if y < 100:
            # Keep the century pivot identical to strptime's %y.
            y += 2000 if y <= 68 else 1900
        return datetime(y, m, d).date().isoformat()
    except (ValueError, OverflowError):
        # Wrong number of parts, non-numeric parts, or an impossible date.
        return v  # preserve original if not confidently parsed
def main():
    """Run the cleaner from the command line.

    Steps:
      1. Parse ``--in``, ``--out`` and ``--normalize-dates``.
      2. Read the input CSV, project every row onto EXPECTED_HEADER, replace
         double quotes in every field and optionally normalize the date columns.
      3. Write the rows to the output CSV with every field quoted.
      4. Re-read the output and report any row whose column count differs
         from the header row, then print a short summary.

    Raises:
        SystemExit: If the input file does not exist.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--in", dest="in_path", default="illustrations_seed.csv", help="Input CSV path")
    parser.add_argument("--out", dest="out_path", default="illustrations_clean.csv", help="Output CSV path")
    parser.add_argument(
        "--normalize-dates",
        action="store_true",
        help="Convert Date and Date Edited to ISO YYYY-MM-DD.",
    )
    opts = parser.parse_args()

    if not os.path.exists(opts.in_path):
        raise SystemExit(f"Input file not found: {opts.in_path}")

    # One dialect description shared by the reader, the writer and the
    # verification pass, so all three agree on quoting/escaping rules.
    dialect = {
        "delimiter": ",",
        "quotechar": '"',
        "doublequote": True,
        "escapechar": "\\",
    }

    # --- Read and clean -----------------------------------------------------
    cleaned = []
    total = 0
    with open(opts.in_path, "r", encoding="utf-8-sig", newline="") as src:
        reader = csv.DictReader(src, strict=False, **dialect)
        found = reader.fieldnames or []
        absent = [col for col in EXPECTED_HEADER if col not in found]
        if absent:
            print(f"[WARN] Missing expected columns: {absent}")
            print(f" Found columns: {found}")
        for total, record in enumerate(reader, start=1):
            # Project onto EXPECTED_HEADER order; absent/None values become "".
            entry = {}
            for col in EXPECTED_HEADER:
                raw = record.get(col, "")
                entry[col] = to_single_quotes("" if raw is None else raw)
            # Optional date normalization (does not affect commas/quotes).
            if opts.normalize_dates:
                for date_col in ("Date", "Date Edited"):
                    entry[date_col] = parse_date_flex(entry.get(date_col, ""))
            cleaned.append(entry)

    # --- Write, quoting every field so commas/newlines stay inside cells ----
    with open(opts.out_path, "w", encoding="utf-8", newline="") as dst:
        writer = csv.DictWriter(
            dst,
            fieldnames=EXPECTED_HEADER,
            quoting=csv.QUOTE_ALL,
            lineterminator="\n",
            **dialect,
        )
        writer.writeheader()
        writer.writerows(cleaned)

    # --- Self-check: every record must be as wide as the header row ---------
    mismatches = []
    with open(opts.out_path, "r", encoding="utf-8", newline="") as check:
        records = csv.reader(check, strict=False, **dialect)
        header_row = next(records, None)
        width = None if header_row is None else len(header_row)
        # Record numbering starts at 2 because record 1 is the header.
        for record_no, fields in enumerate(records, start=2):
            if len(fields) != width:
                mismatches.append((record_no, len(fields), width))

    # --- Summary ------------------------------------------------------------
    print("=== Clean Summary ===")
    print(f"Input rows (excluding header): {total}")
    print(f"Written rows: {len(cleaned)}")
    if not mismatches:
        print("\nSelf-check: all rows have the expected column count.")
        return
    print("\n[WARNING] Column count issues detected in OUTPUT:")
    for line_no, got, exp in mismatches:
        print(f" line {line_no}: columns={got}, expected={exp}")
# Run the cleaner only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()