Update web/core/utils.py

Joshua Laymon 2025-08-22 13:57:58 +00:00
parent cc6461ced2
commit 17c9a89848

web/core/utils.py

@@ -336,158 +336,181 @@ def _to_int_or_none(v: Any) -> Optional[int]:
     return None
 
-def import_csv_bytes(data: bytes, dry_run: bool = True) -> Dict[str, Any]:
-    """
-    Robust CSV importer for Entry.
-
-    - Accepts your human-readable header (Subject, Illustration, ...)
-      and/or direct model field names.
-    - Normalizes odd headers like r."Talk Title".
-    - Handles BOM & dialect sniffing.
-    - Returns a report dict: {ok, created, updated, skipped, errors, preview, total_rows, header}
-    """
-    report: Dict[str, Any] = {
-        "ok": False,
-        "created": 0,
-        "updated": 0,
-        "skipped": 0,
-        "errors": [],    # list[str]
-        "preview": [],   # first ~10 rows that would be imported
-        "total_rows": 0,
-        "header": [],
-    }
-
-    # --- decode safely (remove BOM, keep unknowns) ---
-    text = data.decode("utf-8-sig", errors="replace")
-
-    # --- sniff dialect; fall back to excel ---
-    try:
-        sample = "\n".join(text.splitlines()[:10])
-        dialect = csv.Sniffer().sniff(sample) if sample.strip() else csv.excel
-    except Exception:
-        dialect = csv.excel
-
-    rdr = csv.reader(io.StringIO(text), dialect)
-
-    try:
-        raw_header = next(rdr, [])
-    except Exception as e:
-        report["errors"].append(f"Failed reading header: {e}")
-        return report
-
-    # Clean & map header
-    cleaned = [_clean_header_token(h) for h in raw_header]
-    mapped: List[str] = []
-    unknowns: List[str] = []
-    for token in cleaned:
-        target = ACCEPTABLE_HEADERS.get(token)
-        if target:
-            mapped.append(target)
-        else:
-            unknowns.append(token or "(empty)")
-
-    # If header doesn't match expected width but row count does, assume *no* header;
-    # inject expected header so downstream works.
-    has_header = True
-    if unknowns:
-        # Heuristic: if the number of columns equals EXPECTED_HEADERS and *none*
-        # of the cleaned tokens map, it's probably a data row (no header)
-        matches = sum(1 for t in cleaned if t in ACCEPTABLE_HEADERS)
-        if matches == 0 and len(cleaned) == len(EXPECTED_HEADERS):
-            # inject expected header and re-run
-            has_header = False
-            mapped = [HEADER_MAP[h] for h in EXPECTED_HEADERS]
-            # rebuild a reader with the expected header injected
-            sio = io.StringIO(text)
-            rdr_tmp = csv.reader(sio, dialect)
-            rows = list(rdr_tmp)
-            rows.insert(0, EXPECTED_HEADERS)  # inject pretty header for report
-            rdr = iter(rows)  # consume from this list iterator
-            next(rdr, None)   # skip our injected header
-        else:
-            # keep going but warn in the report
-            report["errors"].append(
-                "Some header columns were not recognized: "
-                + ", ".join(unknowns)
-                + " (continuing with best-effort mapping)"
-            )
-
-    report["header"] = mapped
-
-    # Read rows
-    rows = list(rdr)
-    report["total_rows"] = len(rows)
-
-    # Build row dicts
-    def row_to_obj(row_idx: int, row: List[str]) -> Tuple[Optional[Entry], Optional[Dict[str, Any]], Optional[str]]:
-        """
-        Returns (entry_instance_or_None, values_dict_or_None, error_message_or_None)
-        but does not save to DB.
-        """
-        if len(row) < len(mapped):
-            return None, None, f"Row {row_idx}: expected {len(mapped)} columns, found {len(row)}."
-        values: Dict[str, Any] = {}
-        for i, field in enumerate(mapped):
-            raw_val = row[i] if i < len(row) else ""
-            # Coerce types for specific fields
-            if field in ("date_added", "date_edited"):
-                values[field] = _parse_date(raw_val)
-            elif field == "talk_number":
-                values[field] = _to_int_or_none(raw_val)
-            else:
-                values[field] = (raw_val or "").strip()
-
-        # Create (unsaved) Entry instance for preview/validation
-        e = Entry(**{k: v for k, v in values.items() if v not in (None, "")})
-        return e, values, None
-
-    # Preview first few
-    for i, row in enumerate(rows[:10], start=1):
-        e, values, err = row_to_obj(i, row)
-        report["preview"].append({
-            "row": i,
-            "values": values if values else {},
-            "error": err,
-        })
-
-    if dry_run:
-        # Dry run: dont write, just validate basic structure
-        bad = [p for p in report["preview"] if p["error"]]
-        if bad:
-            report["errors"].extend(p["error"] for p in bad if p["error"])
-        report["ok"] = len(report["errors"]) == 0
-        return report
-
-    # Real import (create new rows).
-    # If you want update/merge behavior, add a key strategy here.
-    created = 0
-    updated = 0
-    skipped = 0
-    errors: List[str] = []
-    with transaction.atomic():
-        for idx, row in enumerate(rows, start=1):
-            e, values, err = row_to_obj(idx, row)
-            if err:
-                errors.append(err)
-                skipped += 1
-                continue
-            try:
-                # Simple create-only behavior:
-                Entry.objects.create(**values)
-                created += 1
-            except Exception as ex:
-                errors.append(f"Row {idx}: failed to save ({ex})")
-                skipped += 1
-
-    report.update({
-        "ok": len(errors) == 0,
-        "created": created,
-        "updated": updated,
-        "skipped": skipped,
-        "errors": errors,
-    })
-    return report
+import csv
+import io
+from datetime import datetime
+from typing import Optional, List, Dict, Any
+
+from django.db import transaction
+
+from .models import Entry
+
+# Canonical header order expected from the CSV (and shown in the UI)
+EXPECTED_HEADERS = [
+    "Subject", "Illustration", "Application", "Scripture", "Source",
+    "Talk Title", "Talk Number", "Code", "Date", "Date Edited",
+]
+
+
+def _clean_header_cell(s: str) -> str:
+    if s is None:
+        return ""
+    s = str(s).strip()
+    # Handle odd prefixes like r:"Talk Title"
+    low = s.lower()
+    if low.startswith("r:") or low.startswith("r="):
+        s = s[2:].lstrip()
+    # Strip wrapping quotes
+    if len(s) >= 2 and s[0] == s[-1] and s[0] in ('"', "'"):
+        s = s[1:-1]
+    return s.strip()
+
+
+def _parse_int(x: str) -> Optional[int]:
+    x = (x or "").strip()
+    if not x:
+        return None
+    try:
+        return int(x)
+    except Exception:
+        return None
+
+
+def _parse_date(x: str):
+    """
+    Returns a date object or None.
+    Tries several common formats, then ISO.
+    """
+    x = (x or "").strip()
+    if not x:
+        return None
+    for fmt in ("%Y-%m-%d", "%m/%d/%Y", "%d/%m/%Y"):
+        try:
+            return datetime.strptime(x, fmt).date()
+        except Exception:
+            pass
+    try:
+        return datetime.fromisoformat(x).date()
+    except Exception:
+        return None
+
+
+def import_csv_bytes(content: bytes, dry_run: bool = True, batch_size: int = 1000) -> Dict[str, Any]:
+    """
+    Parse the uploaded CSV (bytes), optionally write to DB.
+    Returns a report dict the templates expect:
+      {
+        "total": <int>,
+        "created": <int>,
+        "updated": 0,
+        "skipped": <int>,
+        "errors": [ ... ],
+        "preview": [ [cell, ...], ... up to 10 rows ],
+        "columns": EXPECTED_HEADERS,
+      }
+    Notes:
+      - This implementation always CREATES new rows (no dedupe).
+        If you want upserts later, we can key on entry_code or (talk_number, entry_code).
+    """
+    report = {
+        "total": 0,
+        "created": 0,
+        "updated": 0,
+        "skipped": 0,
+        "errors": [],
+        "preview": [],
+        "columns": EXPECTED_HEADERS[:],
+    }
+
+    # Decode once (BOM-safe), sniff dialect, fall back to excel
+    text = content.decode("utf-8-sig", errors="replace")
+    try:
+        first_line = text.splitlines()[0] if text else ""
+        dialect = csv.Sniffer().sniff(first_line) if first_line else csv.excel
+    except Exception:
+        dialect = csv.excel
+
+    rows = list(csv.reader(io.StringIO(text), dialect))
+    if not rows:
+        return report  # empty file
+
+    # Header handling (tolerant)
+    first = rows[0]
+    norm_first = [_clean_header_cell(c).lower() for c in first]
+    expected_norm = [h.lower() for h in EXPECTED_HEADERS]
+    header_ok = (norm_first == expected_norm)
+
+    if header_ok:
+        data_rows = rows[1:]
+    else:
+        # If first row isn't a match but the column count matches, treat it as data
+        if len(first) == len(EXPECTED_HEADERS):
+            data_rows = rows  # treat all rows as data; we'll use EXPECTED order
+        else:
+            # Try common alternate delimiters to recover
+            for delim in (";", "\t"):
+                rows2 = list(csv.reader(io.StringIO(text), delimiter=delim))
+                if rows2 and len(rows2[0]) == len(EXPECTED_HEADERS):
+                    rows = rows2
+                    first = rows[0]
+                    norm_first = [_clean_header_cell(c).lower() for c in first]
+                    header_ok = (norm_first == expected_norm)
+                    data_rows = rows[1:] if header_ok else rows
+                    break
+            else:
+                # Could not reconcile columns
+                report["errors"].append(
+                    f"Column mismatch: saw {len(first)} but expected {len(EXPECTED_HEADERS)}."
+                )
+                return report
+
+    # Normalize rows length (pad/trim) and build preview (first 10)
+    normalized_rows: List[List[str]] = []
+    for r in data_rows:
+        if not r or all((c or "").strip() == "" for c in r):
+            continue
+        if len(r) < len(EXPECTED_HEADERS):
+            r = r + [""] * (len(EXPECTED_HEADERS) - len(r))
+        elif len(r) > len(EXPECTED_HEADERS):
+            r = r[:len(EXPECTED_HEADERS)]
+        normalized_rows.append(r)
+
+    report["total"] = len(normalized_rows)
+    report["preview"] = normalized_rows[:10]  # show first 10 rows exactly as seen
+
+    if dry_run or report["total"] == 0:
+        return report  # preview only
+
+    # Create entries in batches (transactional)
+    to_create: List[Entry] = []
+    for r in normalized_rows:
+        try:
+            obj = Entry(
+                subject=(r[0] or "").strip(),
+                illustration=(r[1] or "").strip(),
+                application=(r[2] or "").strip(),
+                scripture_raw=(r[3] or "").strip(),
+                source=(r[4] or "").strip(),
+                talk_title=(r[5] or "").strip(),
+                talk_number=_parse_int(r[6]),
+                entry_code=(r[7] or "").strip(),
+                date_added=_parse_date(r[8]),
+                date_edited=_parse_date(r[9]),
+            )
+            to_create.append(obj)
+        except Exception as e:
+            report["skipped"] += 1
+            report["errors"].append(f"Row skipped due to error: {e}")
+
+        if len(to_create) >= batch_size:
+            with transaction.atomic():
+                Entry.objects.bulk_create(to_create, batch_size=batch_size)
+            report["created"] += len(to_create)
+            to_create.clear()
+
+    if to_create:
+        with transaction.atomic():
+            Entry.objects.bulk_create(to_create, batch_size=batch_size)
+        report["created"] += len(to_create)
+        to_create.clear()
+
+    return report
 
 # small context manager used above
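
A minimal usage sketch (not part of the commit) of how the new importer could be exercised, e.g. from a Django shell. The header order, the dry_run flag, and the report keys come from the code added above; the module path core.utils and the sample row contents are assumptions for illustration.

# Usage sketch (hypothetical): module path and sample data are assumptions;
# header order, dry_run, and report keys come from the commit above.
from core.utils import import_csv_bytes

csv_bytes = (
    "Subject,Illustration,Application,Scripture,Source,"
    "Talk Title,Talk Number,Code,Date,Date Edited\n"
    "Faith,Mustard seed,Keep planting,Matt 13:31,Notebook 3,"
    "Small Beginnings,42,F-001,2025-08-01,2025-08-20\n"
).encode("utf-8")

preview = import_csv_bytes(csv_bytes, dry_run=True)    # no DB writes
# preview["total"] == 1, preview["created"] == 0; the row appears in preview["preview"]

result = import_csv_bytes(csv_bytes, dry_run=False)    # bulk-creates Entry rows
# result["created"] == 1 on success; per-row failures land in "skipped"/"errors"

Note that calling the importer twice with dry_run=False inserts the same rows again: as the docstring says, this implementation is create-only with no dedupe, so any upsert keying (e.g. on entry_code) would have to be added later.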