Update web/core/views.py

This commit is contained in:
Joshua Laymon 2025-08-22 03:17:59 +00:00
parent af2de1e779
commit a32d58eec2

View File

@ -329,103 +329,101 @@ def entry_delete(request, entry_id):
@login_required @login_required
@user_passes_test(is_admin) @user_passes_test(is_admin)
def import_wizard(request): def import_wizard(request):
# Safety: expected header list # Safety: expected header list (matches DB/order the importer expects)
_EXPECTED_HEADERS = [ _EXPECTED_HEADERS = [
"Subject", "Illustration", "Application", "Scripture", "Source", "Subject", "Illustration", "Application", "Scripture", "Source",
"Talk Title", "Talk Number", "Code", "Date", "Date Edited", "Talk Title", "Talk Number", "Code", "Date", "Date Edited",
] ]
if request.method == "POST": if request.method == "POST":
form = ImportForm(request.POST, request.FILES) form = ImportForm(request.POST, request.FILES)
if form.is_valid(): if form.is_valid():
try:
raw = form.cleaned_data["file"].read()
import io, csv as _csv
# Decode once (BOM-safe)
text = raw.decode("utf-8-sig", errors="replace")
# Try to sniff a dialect; fall back to Excel-style CSV
try: try:
first_line = text.splitlines()[0] if text else "" raw = form.cleaned_data["file"].read()
dialect = _csv.Sniffer().sniff(first_line) if first_line else _csv.excel
except Exception:
dialect = _csv.excel
rdr = _csv.reader(io.StringIO(text), dialect) import io
rows = list(rdr) import csv as _csv
if not rows:
raise ValueError("The CSV file appears to be empty.")
# Expected header (DB field order) # Decode once (BOMsafe)
expected = [ text = raw.decode("utf-8-sig", errors="replace")
"Subject", "Illustration", "Application", "Scripture", "Source",
"Talk Title", "Talk Number", "Code", "Date", "Date Edited",
]
expected_norm = [h.lower() for h in expected]
# Header cleaner: fixes r:"Talk Title", stray quotes, spaces, case # Try to sniff a dialect; fall back to Excel-style CSV
def _clean_header(s): try:
s = "" if s is None else str(s) first_line = text.splitlines()[0] if text else ""
s = s.strip() dialect = _csv.Sniffer().sniff(first_line) if first_line else _csv.excel
if s.lower().startswith("r:") or s.lower().startswith("r="): except Exception:
s = s[2:].lstrip() dialect = _csv.excel
if (len(s) >= 2) and (s[0] == s[-1]) and s[0] in ('"', "'"):
s = s[1:-1]
return s.strip().lower()
first = rows[0] rdr = _csv.reader(io.StringIO(text), dialect)
norm_first = [_clean_header(c) for c in first] rows = list(rdr)
if not rows:
raise ValueError("The CSV file appears to be empty.")
# If first row isnt our header but length matches, inject one expected = _EXPECTED_HEADERS
header_ok = (norm_first == expected_norm) expected_norm = [h.lower() for h in expected]
if not header_ok and len(first) == len(expected):
rows.insert(0, expected)
elif not header_ok and len(first) != len(expected):
# Try common alternate delimiters if column count is off
for delim in (";", "\t"):
rdr2 = _csv.reader(io.StringIO(text), delimiter=delim)
test_rows = list(rdr2)
if test_rows and len(test_rows[0]) == len(expected):
rows = test_rows
first = rows[0]
norm_first = [_clean_header(c) for c in first]
header_ok = (norm_first == expected_norm)
if not header_ok:
rows.insert(0, expected)
break
# Re-encode a sanitized CSV for the existing importer # Header cleaner: fixes r:"Talk Title", stray quotes, spaces, case
out = io.StringIO() def _clean_header(s):
w = _csv.writer(out) s = "" if s is None else str(s)
for r in rows: s = s.strip()
w.writerow(r) if s.lower().startswith("r:") or s.lower().startswith("r="):
fixed_raw = out.getvalue().encode("utf-8") s = s[2:].lstrip()
if (len(s) >= 2) and (s[0] == s[-1]) and s[0] in ('"', "'"):
s = s[1:-1]
return s.strip().lower()
# Keep utils in sync for importer variants that read EXPECTED_HEADERS first = rows[0]
from . import utils as core_utils norm_first = [_clean_header(c) for c in first]
core_utils.EXPECTED_HEADERS = expected
# Hand off to the robust importer you already have # If first row isnt our header but length matches, inject one
report = import_csv_bytes(fixed_raw, dry_run=form.cleaned_data["dry_run"]) or {} header_ok = (norm_first == expected_norm)
report["header_ok"] = header_ok if not header_ok and len(first) == len(expected):
if not header_ok: rows.insert(0, expected)
messages.warning( elif not header_ok and len(first) != len(expected):
# Try common alternate delimiters if column count is off
for delim in (";", "\t"):
rdr2 = _csv.reader(io.StringIO(text), delimiter=delim)
test_rows = list(rdr2)
if test_rows and len(test_rows[0]) == len(expected):
rows = test_rows
first = rows[0]
norm_first = [_clean_header(c) for c in first]
header_ok = (norm_first == expected_norm)
if not header_ok:
rows.insert(0, expected)
break
# Re-encode a sanitized CSV for the existing importer
out = io.StringIO()
w = _csv.writer(out)
for r in rows:
w.writerow(r)
fixed_raw = out.getvalue().encode("utf-8")
# Keep utils in sync for importer variants that read EXPECTED_HEADERS
from . import utils as core_utils
core_utils.EXPECTED_HEADERS = expected
# Hand off to the robust importer you already have
report = import_csv_bytes(fixed_raw, dry_run=form.cleaned_data["dry_run"]) or {}
report["header_ok"] = header_ok
if not header_ok:
messages.warning(
request,
"The first row didnt match the expected header; a clean header was injected automatically."
)
return render(
request, request,
"The first row didnt match the expected header; a clean header was injected automatically." "import_result.html",
{"report": report, "dry_run": form.cleaned_data["dry_run"]},
) )
except Exception as e:
messages.error(request, f"Import failed: {e}")
else:
form = ImportForm()
return render( return render(request, "import_wizard.html", {"form": form})
request,
"import_result.html",
{"report": report, "dry_run": form.cleaned_data["dry_run"]},
)
except Exception as e:
messages.error(request, f"Import failed: {e}")
else:
form = ImportForm()
return render(request, "import_wizard.html", {"form": form})
@login_required @login_required
@user_passes_test(is_admin) @user_passes_test(is_admin)