Update web/core/utils.py

This commit is contained in:
Joshua Laymon 2025-08-22 00:13:31 +00:00
parent e304f29958
commit 0c63721ebb

View File

@ -5,7 +5,7 @@ import csv
import io
import re
from datetime import datetime
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Any, Tuple, Iterable
from django.db import transaction, IntegrityError, DataError, DatabaseError
@ -352,3 +352,124 @@ def import_csv_bytes(b: bytes, dry_run: bool = False, commit_every: int = 500) -
"used_headerless_mode": False,
"seen_headers": headers,
}
# Column names, in order, that the first row of an uploaded CSV must contain.
# Kept as a list because the importer compares it against a list of header cells.
EXPECTED_HEADERS = [
    "Subject",
    "Illustration",
    "Application",
    "Scripture",
    "Source",
    "Talk Title",
    "Talk Number",
    "Code",
    "Date",
    "Date Edited",
]
def _to_int_or_none(s: str) -> Optional[int]:
    """Parse *s* as an integer.

    Surrounding whitespace is ignored. Returns ``None`` when *s* is ``None``,
    blank, or cannot be converted by ``int()``.
    """
    cleaned = (s or "").strip()
    if cleaned:
        try:
            return int(cleaned)
        except Exception:
            # A value that does not convert is treated like a blank one.
            return None
    return None
def _to_date_or_none(s: str) -> Optional[datetime.date]:
    """Parse *s* as a calendar date.

    Accepted formats, tried in order: ``YYYY-MM-DD``, ``MM/DD/YYYY`` and
    ``MM/DD/YY``. Surrounding whitespace is ignored.

    Returns:
        The parsed ``date``, or ``None`` when *s* is ``None``, blank, or matches
        none of the accepted formats. The caller decides whether a missing
        date is acceptable.
    """
    # NOTE(review): this module imports the *class* (``from datetime import
    # datetime``), so ``datetime.date`` in the return annotation is the
    # ``datetime.date()`` method rather than the ``date`` type -- consider
    # importing ``date`` at module level and annotating with it.
    s = (s or "").strip()
    if not s:
        return None
    for fmt in ("%Y-%m-%d", "%m/%d/%Y", "%m/%d/%y"):
        try:
            # ``datetime`` is already the class here; the previous
            # ``datetime.datetime.strptime`` raised AttributeError on every
            # call, which the broad ``except`` hid, so no date ever parsed.
            return datetime.strptime(s, fmt).date()
        except ValueError:
            # Format did not match; try the next one.
            continue
    return None
def import_csv_bytes(data: bytes, dry_run: bool = True, batch_size: int = 1000) -> Dict[str, Any]:
    """Validate a CSV upload of Entries and, unless *dry_run*, save the rows.

    The file must be UTF-8 (a leading BOM is tolerated) and its first row must
    equal ``EXPECTED_HEADERS`` once each cell is stripped of whitespace. Data
    rows shorter than ten columns are padded with empty strings; columns past
    the tenth are ignored. A row is rejected when both Illustration and
    Application are blank.

    Args:
        data: Raw bytes of the uploaded file.
        dry_run: When True nothing is written to the database; the rows are
            only validated and previewed.
        batch_size: Number of accepted rows collected before each
            ``bulk_create`` call (also passed through as its ``batch_size``).

    Returns:
        A dict with three keys:
        ``report`` -- one-line summary string;
        ``rows`` -- up to the first 100 accepted rows as
        ``(row_number, {column: value})`` tuples, the header being row 1;
        ``errors`` -- human-readable messages for rejected rows or a rejected
        file.

    Raises:
        Exception: anything raised by ``Entry.objects.bulk_create`` (typically
            a ``django.db.DatabaseError``) propagates to the caller. When not
            a dry run, all writes happen inside one ``transaction.atomic()``
            block, so such a failure rolls back every batch.
    """
    # NOTE(review): an earlier ``import_csv_bytes(b, dry_run=False,
    # commit_every=500)`` appears to be defined above in this module; this
    # later definition would replace it at import time with different
    # parameter names and a different ``dry_run`` default -- confirm that all
    # callers expect this signature and return shape.

    # Decode up front so a non-UTF-8 upload yields a report instead of an
    # unhandled UnicodeDecodeError part-way through parsing.
    try:
        text = data.decode("utf-8-sig")
    except UnicodeDecodeError as ex:
        return {
            "report": "Could not decode file.",
            "rows": [],
            "errors": [f"File is not valid UTF-8: {ex}"],
        }

    # newline="" leaves line endings untouched so the csv module can handle
    # newlines embedded in quoted fields itself.
    reader = csv.reader(io.StringIO(text, newline=""))

    # Read header row
    try:
        header = next(reader)
    except StopIteration:
        return {"report": "Empty file.", "rows": [], "errors": ["File is empty."]}

    # Strict header check: names and order must match exactly after trimming.
    header_norm = [h.strip() for h in header]
    if header_norm != EXPECTED_HEADERS:
        return {
            "report": "Header mismatch.",
            "rows": [],
            "errors": [
                "Expected header: " + ", ".join(EXPECTED_HEADERS),
                "Found header: " + ", ".join(header_norm),
            ],
        }

    def make_entry(row: List[str]) -> Optional[Entry]:
        """Build an unsaved Entry from one CSV row."""
        # Force length to 10, padding short rows; longer rows are truncated.
        padded = row + [""] * (10 - len(row))
        subj, ill, app, scr, src, talk_title, talk_num, code, d_added, d_edited = padded[:10]
        return Entry(
            subject=(subj or "").strip(),
            illustration=(ill or "").strip(),
            application=(app or "").strip(),
            scripture_raw=(scr or "").strip(),
            source=(src or "").strip(),
            talk_title=(talk_title or "").strip(),
            talk_number=_to_int_or_none(talk_num),
            entry_code=(code or "").strip(),
            date_added=_to_date_or_none(d_added),
            date_edited=_to_date_or_none(d_edited),
        )

    errors: List[str] = []
    preview: List[Tuple[int, Dict[str, Any]]] = []  # first 100 accepted rows for the UI
    pending: List[Entry] = []  # accepted rows not yet written (real import only)
    accepted = 0  # rows that passed validation
    created_total = 0  # rows handed to bulk_create

    with (_noop_context() if dry_run else transaction.atomic()):
        # The header was row 1, so data rows are numbered from 2.
        for rownum, row in enumerate(reader, start=2):
            # Only row construction is guarded: it performs no database work,
            # so a failure here can safely be reported per row.
            try:
                e = make_entry(row)
            except Exception as ex:
                errors.append(f"Row {rownum}: {ex}")
                continue

            # Required-field check: at least one of illustration/application.
            # Both values were already stripped by make_entry.
            if not (e.illustration or e.application):
                errors.append(f"Row {rownum}: missing Illustration and Application")
                continue

            accepted += 1
            if len(preview) < 100:
                preview.append((rownum, {
                    "Subject": e.subject, "Illustration": e.illustration[:120],
                    "Application": e.application[:120], "Scripture": e.scripture_raw,
                    "Source": e.source, "Talk Title": e.talk_title,
                    "Talk Number": e.talk_number, "Code": e.entry_code,
                    "Date": e.date_added, "Date Edited": e.date_edited,
                }))

            if dry_run:
                # Nothing will be written, so do not hold every Entry in memory.
                continue

            pending.append(e)
            if len(pending) >= batch_size:
                # Deliberately NOT wrapped in try/except: swallowing a database
                # error inside atomic() leaves the transaction unusable, so
                # let it propagate and roll everything back.
                Entry.objects.bulk_create(pending, batch_size=batch_size)
                created_total += len(pending)
                pending.clear()

        # Write the final partial batch inside the same transaction.
        if pending:
            Entry.objects.bulk_create(pending, batch_size=batch_size)
            created_total += len(pending)
            pending.clear()

    # Report the exact count; the preview is capped at 100 rows and must not
    # be used as the dry-run total.
    if dry_run:
        report = f"Would import {accepted} rows."
    else:
        report = f"Imported {created_total} rows."
    return {"report": report, "rows": preview, "errors": errors}
# Do-nothing context manager; lets the importer use a single ``with`` statement
# whether or not a real transaction is wanted.
class _noop_context:
    """Context manager with no setup or teardown."""

    def __enter__(self):
        """Enter the block, yielding this object."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Leave the block; returning False lets any exception propagate."""
        return False