#!/usr/bin/env python3
|
|
"""
|
|
Cleaner for `illustrations_seed.csv` -> `illustrations_clean.csv`
|
|
|
|
What it does:
|
|
- Reads your CSV safely (commas/newlines/quotes inside fields supported).
|
|
- For every field value, replaces inner *double quotes* with a single quote:
|
|
" “ ” « » „ --> '
|
|
(Existing single quotes like Jehovah's are preserved.)
|
|
- Writes a new CSV with QUOTE_ALL so commas/newlines never break columns.
|
|
- Optional: --normalize-dates converts Date / Date Edited to ISO YYYY-MM-DD.
|
|
- Prints a short summary and verifies the output column count.
|
|
|
|
Usage:
|
|
python3 cleaner.py
|
|
python3 cleaner.py --in my_in.csv --out my_out.csv --normalize-dates
|
|
"""
|
|
|
|
import argparse
|
|
import csv
|
|
import os
|
|
from datetime import datetime
|
|
|
|
# Canonical output column order; the cleaner rebuilds every row strictly in
# this order (columns missing from the input become empty strings).
EXPECTED_HEADER = [
    "Subject",
    "Illustration",
    "Application",
    "Scripture",
    "Source",
    "Talk Title",
    "Talk Number",
    "Code",
    "Date",
    "Date Edited",
]
|
|
|
|
# Map various double-quote characters to a single straight apostrophe
DOUBLE_QUOTES_TO_SINGLE = {
    '"': "'",  # U+0022 QUOTATION MARK
    '“': "'",  # U+201C LEFT DOUBLE QUOTATION MARK
    '”': "'",  # U+201D RIGHT DOUBLE QUOTATION MARK
    '„': "'",  # U+201E DOUBLE LOW-9 QUOTATION MARK
    '«': "'",  # U+00AB LEFT-POINTING DOUBLE ANGLE QUOTATION MARK
    '»': "'",  # U+00BB RIGHT-POINTING DOUBLE ANGLE QUOTATION MARK
}

# Precomputed translation table: str.translate does one C-level pass over the
# string, instead of six chained str.replace() passes per field per row.
_QUOTE_TRANSLATION = str.maketrans(DOUBLE_QUOTES_TO_SINGLE)


def to_single_quotes(s: str) -> str:
    """Replace inner double-quote characters with a single quote.

    Existing single quotes (e.g. Jehovah's) are preserved, and non-string
    values are returned unchanged.

    Args:
        s: The field value to normalize.

    Returns:
        ``s`` with every character in DOUBLE_QUOTES_TO_SINGLE replaced by
        an apostrophe, or ``s`` untouched if it is not a str.
    """
    if not isinstance(s, str):
        return s
    return s.translate(_QUOTE_TRANSLATION)
|
|
|
|
def parse_date_flex(v: str) -> str:
    """Parse common US date formats and return ISO ``YYYY-MM-DD``.

    Args:
        v: A date string such as "1/2/25", "01/02/2025", or "01-02-2025".

    Returns:
        The ISO-formatted date, "" for empty/None input, or the original
        string unchanged when it cannot be confidently parsed.
    """
    v = (v or "").strip()
    if not v:
        return ""
    # %m and %d accept non-zero-padded components ("1/2/25"), so the
    # platform-specific %-m/%-d variants the original tried are unnecessary.
    for fmt in ("%m/%d/%y", "%m/%d/%Y"):
        try:
            return datetime.strptime(v, fmt).strftime("%Y-%m-%d")
        except ValueError:
            continue
    # Last-ditch tolerant parse: normalize "-" separators to "/" and split
    # (handles e.g. "01-02-2025"). int(), unpacking, and the datetime
    # constructor all raise ValueError on bad input.
    try:
        m, d, y = (int(p.strip()) for p in v.replace("-", "/").split("/"))
        if y < 100:
            # Two-digit year pivot: 00-69 -> 2000s, 70-99 -> 1900s.
            y = 2000 + y if y <= 69 else 1900 + y
        return datetime(y, m, d).strftime("%Y-%m-%d")
    except ValueError:
        return v  # preserve original if not confidently parsed
|
|
|
|
def _clean_row(row: dict, normalize_dates: bool) -> dict:
    """Build one output row strictly in EXPECTED_HEADER order.

    Missing/None fields become "", every field has its double quotes
    normalized to apostrophes, and date columns are optionally ISO-ified.
    """
    out = {}
    for h in EXPECTED_HEADER:
        value = row.get(h, "")
        out[h] = to_single_quotes(value if value is not None else "")
    if normalize_dates:
        out["Date"] = parse_date_flex(out.get("Date", ""))
        out["Date Edited"] = parse_date_flex(out.get("Date Edited", ""))
    return out


def _verify_column_counts(path: str) -> list:
    """Re-read the written CSV and list rows whose column count differs.

    Returns a list of (line_no, got, expected) tuples; the header row's
    width defines the expected count.
    """
    problems = []
    with open(path, "r", encoding="utf-8", newline="") as f:
        rdr = csv.reader(f, delimiter=",", quotechar='"', doublequote=True, strict=False)
        expected_cols = None
        for i, row in enumerate(rdr, start=1):
            if expected_cols is None:
                expected_cols = len(row)
            elif len(row) != expected_cols:
                problems.append((i, len(row), expected_cols))
    return problems


def main():
    """CLI entry point: clean the seed CSV and write a QUOTE_ALL output CSV."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--in", dest="in_path", default="illustrations_seed.csv", help="Input CSV path")
    ap.add_argument("--out", dest="out_path", default="illustrations_clean.csv", help="Output CSV path")
    ap.add_argument("--normalize-dates", action="store_true",
                    help="Convert Date and Date Edited to ISO YYYY-MM-DD.")
    args = ap.parse_args()

    if not os.path.exists(args.in_path):
        raise SystemExit(f"Input file not found: {args.in_path}")

    # Read input with a tolerant DictReader. NOTE: no escapechar here -- with
    # doublequote=True a backslash is ordinary data. The previous
    # escapechar="\\" made the reader silently swallow literal backslashes
    # inside fields.
    with open(args.in_path, "r", encoding="utf-8-sig", newline="") as f:
        reader = csv.DictReader(f, delimiter=",", quotechar='"', doublequote=True, strict=False)
        input_header = reader.fieldnames or []
        missing = [h for h in EXPECTED_HEADER if h not in input_header]
        if missing:
            print(f"[WARN] Missing expected columns: {missing}")
            print(f"       Found columns: {input_header}")

        rows_out = [_clean_row(row, args.normalize_dates) for row in reader]
    total = len(rows_out)

    # Write output with QUOTE_ALL so embedded commas/newlines remain safe.
    # escapechar is unnecessary on write too: doublequote=True already
    # escapes quote characters by doubling them.
    with open(args.out_path, "w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(
            f,
            fieldnames=EXPECTED_HEADER,
            delimiter=",",
            quotechar='"',
            quoting=csv.QUOTE_ALL,
            doublequote=True,
            lineterminator="\n",
        )
        writer.writeheader()
        writer.writerows(rows_out)

    # Self-check the written CSV for a stable column count.
    problems = _verify_column_counts(args.out_path)

    print("=== Clean Summary ===")
    print(f"Input rows (excluding header): {total}")
    print(f"Written rows: {len(rows_out)}")
    if problems:
        print("\n[WARNING] Column count issues detected in OUTPUT:")
        for line_no, got, exp in problems:
            print(f"  line {line_no}: columns={got}, expected={exp}")
    else:
        print("\nSelf-check: all rows have the expected column count.")


if __name__ == "__main__":
    main()