Update web/core/views.py

This commit is contained in:
Joshua Laymon 2025-08-22 11:37:03 +00:00
parent 33cfa9b79f
commit d4e912c47a

View File

@ -333,105 +333,107 @@ def import_wizard(request):
"Subject", "Illustration", "Application", "Scripture", "Source", "Subject", "Illustration", "Application", "Scripture", "Source",
"Talk Title", "Talk Number", "Code", "Date", "Date Edited", "Talk Title", "Talk Number", "Code", "Date", "Date Edited",
] ]
EXPECTED_NORM = [h.lower() for h in EXPECTED]
if request.method == "POST": if request.method == "POST":
form = ImportForm(request.POST, request.FILES) form = ImportForm(request.POST, request.FILES)
if form.is_valid(): if form.is_valid():
try: try:
import io, re, csv as _csv
raw = form.cleaned_data["file"].read() raw = form.cleaned_data["file"].read()
text = raw.decode("utf-8-sig", errors="replace") # BOM-safe
# Try to sniff; fall back to excel dialect # --- Decode once (BOM-safe) ---
import io, csv as _csv
text = raw.decode("utf-8-sig", errors="replace")
# --- Try to sniff a dialect; fall back to Excel CSV ---
try: try:
first_line = text.splitlines()[0] if text else "" first_line = text.splitlines()[0] if text else ""
dialect = _csv.Sniffer().sniff(first_line) if first_line else _csv.excel dialect = _csv.Sniffer().sniff(first_line) if first_line else _csv.excel
except Exception: except Exception:
dialect = _csv.excel dialect = _csv.excel
rows = list(_csv.reader(io.StringIO(text), dialect)) rdr = _csv.reader(io.StringIO(text), dialect)
rows = list(rdr)
if not rows: if not rows:
raise ValueError("The CSV file appears to be empty.") raise ValueError("The CSV file appears to be empty.")
# --- header cleaning --- expected_norm = [h.lower() for h in EXPECTED]
# Handles: r."Talk Title", r:'Talk Title', r=Talk Title, r: Talk Title, etc.
_r_prefix = re.compile(r'^[rR]\s*[\.\:\=\-]\s*')
def clean_header_cell(s: str) -> str: def _clean_header(s):
s = "" if s is None else str(s) s = "" if s is None else str(s)
s = s.strip() s = s.strip()
# strip balanced quotes # Strip r: or r= prefixes and wrapping quotes
if len(s) >= 2 and s[0] == s[-1] and s[0] in ('"', "'"): if s.lower().startswith("r:") or s.lower().startswith("r="):
s = s[1:-1].strip() s = s[2:].lstrip()
# strip weird r.<sep> prefix if len(s) >= 2 and s[0] == s[-1] and s[0] in ("'", '"'):
s = _r_prefix.sub("", s) s = s[1:-1]
# final trim + lower for comparison
return s.strip().lower() return s.strip().lower()
first = rows[0] first = rows[0]
norm_first = [clean_header_cell(c) for c in first] norm_first = [_clean_header(c) for c in first]
header_ok = (norm_first == EXPECTED_NORM) header_ok = (norm_first == expected_norm)
# If first row isnt the header but length matches, inject our clean header # If not header but column count matches, inject expected header.
if not header_ok and len(first) == len(EXPECTED): if not header_ok and len(first) == len(EXPECTED):
rows.insert(0, EXPECTED) rows.insert(0, EXPECTED)
elif not header_ok and len(first) != len(EXPECTED): elif not header_ok and len(first) != len(EXPECTED):
# Try common alternate delimiters if the column count is off # Retry with common alternate delimiters
for delim in (";", "\t"): for delim in (";", "\t"):
test = list(_csv.reader(io.StringIO(text), delimiter=delim)) rows2 = list(_csv.reader(io.StringIO(text), delimiter=delim))
if test and len(test[0]) == len(EXPECTED): if rows2 and len(rows2[0]) == len(EXPECTED):
rows = test rows = rows2
first = rows[0] first = rows[0]
norm_first = [clean_header_cell(c) for c in first] norm_first = [_clean_header(c) for c in first]
header_ok = (norm_first == EXPECTED_NORM) header_ok = (norm_first == expected_norm)
if not header_ok: if not header_ok:
rows.insert(0, EXPECTED) rows.insert(0, EXPECTED)
break break
# Reencode a sanitized CSV to feed the existing importer # Re-encode sanitized CSV for the existing importer
out = io.StringIO() out = io.StringIO()
w = _csv.writer(out) w = _csv.writer(out)
for r in rows: for r in rows:
w.writerow(r) w.writerow(r)
fixed_raw = out.getvalue().encode("utf-8") fixed_raw = out.getvalue().encode("utf-8")
# Keep utils in sync for any helpers that read EXPECTED_HEADERS # Keep utils in sync for import_csv_bytes variants
from . import utils as core_utils from . import utils as core_utils
core_utils.EXPECTED_HEADERS = EXPECTED core_utils.EXPECTED_HEADERS = EXPECTED
# Run your robust importer
report = import_csv_bytes(fixed_raw, dry_run=form.cleaned_data["dry_run"]) or {} report = import_csv_bytes(fixed_raw, dry_run=form.cleaned_data["dry_run"]) or {}
report["header_ok"] = header_ok report["header_ok"] = header_ok
# Normalize preview for the template: list of rows + columns list
report.setdefault("columns", EXPECTED)
if report.get("preview"):
if isinstance(report["preview"][0], dict):
cols = report["columns"]
report["preview"] = [
[row.get(c, "") for c in cols] for row in report["preview"]
]
if not header_ok: if not header_ok:
messages.warning( messages.warning(
request, request,
"The first row didnt match the expected header; it was cleaned/normalized automatically." "The first row didnt match the expected header; a clean header was injected automatically."
) )
return render(request, "import_result.html", return render(
{"report": report, "dry_run": form.cleaned_data["dry_run"]}) request,
"import_result.html",
{"report": report, "dry_run": form.cleaned_data["dry_run"]},
)
except Exception as e: except Exception as e:
messages.error(request, f"Import failed: {e}") messages.error(request, f"Import failed: {e}")
else:
form = ImportForm()
# make sure we have the ordered column list the preview expects # If form invalid or error: fall through to show the form again
expected = [ return render(request, "import_wizard.html", {"form": form})
"Subject","Illustration","Application","Scripture","Source",
"Talk Title","Talk Number","Code","Date","Date Edited",
]
report.setdefault("columns", expected)
# if report.preview is a list of dicts, convert to list-of-lists
if report.get("preview") and isinstance(report["preview"][0], dict):
report["preview"] = [[row.get(c, "") for c in report["columns"]]
for row in report["preview"]]
# GET
form = ImportForm()
return render(request, "import_wizard.html", {"form": form}) return render(request, "import_wizard.html", {"form": form})
@login_required @login_required
@user_passes_test(is_admin) @user_passes_test(is_admin)
def export_csv(request): def export_csv(request):