Update imports/cleaner.py

This commit is contained in:
Joshua Laymon 2025-08-13 17:05:57 +00:00
parent 54a5ecaf12
commit 65bc45ad0d

View File

@ -1,14 +1,19 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Content-preserving CSV sanitizer for 'illustrations_seed.csv' -> 'illustrations_clean.csv'. Cleaner for `illustrations_seed.csv` -> `illustrations_clean.csv`
What it does (and ONLY this by default): What it does:
- Parses your CSV safely. - Reads your CSV safely (commas/newlines/quotes inside fields supported).
- Writes a new CSV where EVERY FIELD is QUOTED and any inner " become "". - For every field value, replaces inner *double quotes* with a single quote:
- Keeps the exact text of every field (no trimming, no subject/scripture tweaking, no punctuation edits). " “ ” « » „ --> '
- Keeps the exact column order expected by your importer. (Existing single quotes like Jehovah's are preserved.)
- Writes a new CSV with QUOTE_ALL so commas/newlines never break columns.
- Optional: --normalize-dates converts Date / Date Edited to ISO YYYY-MM-DD.
- Prints a short summary and verifies the output column count.
Optional flag --normalize-dates will convert Date and Date Edited to ISO YYYY-MM-DD. Usage:
python3 cleaner.py
python3 cleaner.py --in my_in.csv --out my_out.csv --normalize-dates
""" """
import argparse import argparse
@ -29,42 +34,60 @@ EXPECTED_HEADER = [
"Date Edited", "Date Edited",
] ]
# Map various double-quote characters to a single straight apostrophe.
# NOTE: the curly-quote keys are written as \u escapes so they survive
# copy/paste and encoding round-trips (they were previously lost, leaving
# empty-string keys that never matched anything).
DOUBLE_QUOTES_TO_SINGLE = {
    '"': "'",       # U+0022 straight double quote
    '\u201c': "'",  # left double quotation mark
    '\u201d': "'",  # right double quotation mark
    '\u201e': "'",  # double low-9 quotation mark
    '\u00ab': "'",  # left-pointing guillemet
    '\u00bb': "'",  # right-pointing guillemet
}

# Precomputed table: one C-level pass via str.translate instead of a chain
# of .replace() calls.
_QUOTE_TRANSLATION = str.maketrans(DOUBLE_QUOTES_TO_SINGLE)


def to_single_quotes(s: str) -> str:
    """Replace inner double-quote characters with a single quote.

    Existing single quotes/apostrophes (e.g. Jehovah's) are preserved.
    Non-string values are returned unchanged.
    """
    if not isinstance(s, str):
        return s
    return s.translate(_QUOTE_TRANSLATION)
def parse_date_flex(v: str) -> str: def parse_date_flex(v: str) -> str:
"""Very permissive date parser; returns ISO or original string if parsing fails.""" """Parse common US formats and return ISO YYYY-MM-DD. If unclear, leave original."""
v = (v or "").strip() v = (v or "").strip()
if not v: if not v:
return "" return ""
fmts = ["%m/%d/%y", "%m/%d/%Y", "%-m/%-d/%y", "%-m/%-d/%Y"] fmts = ["%m/%d/%y", "%m/%d/%Y"]
for fmt in fmts: # Some systems support %-m; try them but ignore if unsupported
fmts_platform = fmts + ["%-m/%-d/%y", "%-m/%-d/%Y"]
for fmt in fmts_platform:
try: try:
return datetime.strptime(v, fmt).strftime("%Y-%m-%d") return datetime.strptime(v, fmt).strftime("%Y-%m-%d")
except Exception: except Exception:
pass continue
# last-ditch: split by / or - # last-ditch tolerant parse (handles 1/2/25, 01-02-2025, etc.)
try: try:
parts = [p.strip() for p in v.replace("-", "/").split("/")] m, d, y = [p.strip() for p in v.replace("-", "/").split("/")]
if len(parts) == 3: m, d, y = int(m), int(d), int(y)
m, d, y = parts if y < 100:
y = int(y) y = 2000 + y if y <= 69 else 1900 + y
m = int(m) return datetime(y, m, d).strftime("%Y-%m-%d")
d = int(d)
return datetime(y if y > 99 else (2000 + y if y <= 69 else 1900 + y), m, d).strftime("%Y-%m-%d")
except Exception: except Exception:
pass return v # preserve original if not confidently parsed
return v # preserve original if we can't confidently parse
def main(): def main():
ap = argparse.ArgumentParser() ap = argparse.ArgumentParser()
ap.add_argument("--in", dest="in_path", default="illustrations_seed.csv") ap.add_argument("--in", dest="in_path", default="illustrations_seed.csv", help="Input CSV path")
ap.add_argument("--out", dest="out_path", default="illustrations_clean.csv") ap.add_argument("--out", dest="out_path", default="illustrations_clean.csv", help="Output CSV path")
ap.add_argument("--normalize-dates", action="store_true", ap.add_argument("--normalize-dates", action="store_true",
help="If set, convert Date and Date Edited to ISO YYYY-MM-DD. Otherwise leave as-is.") help="Convert Date and Date Edited to ISO YYYY-MM-DD.")
args = ap.parse_args() args = ap.parse_args()
if not os.path.exists(args.in_path): if not os.path.exists(args.in_path):
raise SystemExit(f"Input file not found: {args.in_path}") raise SystemExit(f"Input file not found: {args.in_path}")
# Read using DictReader; accept whatever header is present but verify shape. # Read input with a tolerant DictReader
with open(args.in_path, "r", encoding="utf-8-sig", newline="") as f: with open(args.in_path, "r", encoding="utf-8-sig", newline="") as f:
reader = csv.DictReader( reader = csv.DictReader(
f, f,
@ -77,26 +100,30 @@ def main():
input_header = reader.fieldnames or [] input_header = reader.fieldnames or []
missing = [h for h in EXPECTED_HEADER if h not in input_header] missing = [h for h in EXPECTED_HEADER if h not in input_header]
if missing: if missing:
print(f"[WARN] Input CSV missing columns: {missing}") print(f"[WARN] Missing expected columns: {missing}")
print(f" Found columns: {input_header}") print(f" Found columns: {input_header}")
rows_out = [] rows_out = []
total = 0 total = 0
for row in reader: for row in reader:
total += 1 total += 1
# Build output row strictly in EXPECTED_HEADER order, preserving raw strings. # Build row strictly in EXPECTED_HEADER order; default to ""
out = {h: (row.get(h, "") if row.get(h, "") is not None else "") for h in EXPECTED_HEADER} out = {h: (row.get(h, "") if row.get(h, "") is not None else "") for h in EXPECTED_HEADER}
# Optional date normalization (ONLY dates; no commas involved) # Replace inner double quotes with single quotes for every field
for k, v in out.items():
out[k] = to_single_quotes(v)
# Optional: normalize date fields (safe; doesn't affect commas/quotes)
if args.normalize_dates: if args.normalize_dates:
for dh in ("Date", "Date Edited"): out["Date"] = parse_date_flex(out.get("Date", ""))
out[dh] = parse_date_flex(out.get(dh, "")) out["Date Edited"] = parse_date_flex(out.get("Date Edited", ""))
rows_out.append(out) rows_out.append(out)
# Write with QUOTE_ALL so commas/newlines/quotes never break columns. # Write output with QUOTE_ALL so commas/newlines remain safe
with open(args.out_path, "w", encoding="utf-8", newline="") as f: with open(args.out_path, "w", encoding="utf-8", newline="") as f:
w = csv.DictWriter( writer = csv.DictWriter(
f, f,
fieldnames=EXPECTED_HEADER, fieldnames=EXPECTED_HEADER,
delimiter=",", delimiter=",",
@ -106,11 +133,11 @@ def main():
escapechar="\\", escapechar="\\",
lineterminator="\n", lineterminator="\n",
) )
w.writeheader() writer.writeheader()
for r in rows_out: for r in rows_out:
w.writerow(r) writer.writerow(r)
# Quick self-check: re-read output and ensure fixed column count # Self-check the written CSV for a stable column count
problems = [] problems = []
with open(args.out_path, "r", encoding="utf-8", newline="") as f: with open(args.out_path, "r", encoding="utf-8", newline="") as f:
rdr = csv.reader(f, delimiter=",", quotechar='"', doublequote=True, escapechar="\\", strict=False) rdr = csv.reader(f, delimiter=",", quotechar='"', doublequote=True, escapechar="\\", strict=False)