Illustrations/web/core/utils.py

import csv
import io
import re
from datetime import datetime
from typing import Dict, Any

from django.db import transaction

from core.models import Entry


# ==============================
# Helpers
# ==============================

def _decode_bytes(b: bytes) -> str:
    # Keep BOM-safe decoding
    return b.decode("utf-8-sig", errors="replace")
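
# Example (illustrative): a UTF-8 BOM is consumed by the "utf-8-sig" codec
# instead of leaking into the first header name:
#   _decode_bytes(b"\xef\xbb\xbfSubject,Code\n")  -> "Subject,Code\n"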


def _sniff_dialect(txt: str):
    try:
        # csv.Sniffer takes the candidate delimiters as a single string.
        return csv.Sniffer().sniff(txt[:4096], delimiters=",;\t|")
    except Exception:
        # Sniffing failed (e.g. a single-column file): fall back to the
        # standard comma dialect.
        return csv.excel
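
# Example (illustrative): sniffing a semicolon-separated sample picks the
# right delimiter without any caller configuration:
#   _sniff_dialect("a;b;c\n1;2;3\n").delimiter  -> ";"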


def _norm_header(h: str) -> str:
    """
    Normalize headers in a forgiving way:
    - lower-case
    - collapse spaces/underscores
    - remove all other non-alphanumerics
    """
    if not h:
        return ""
    h = h.strip().lower()
    h = h.replace("_", " ")
    h = re.sub(r"\s+", " ", h)
    # drop everything non-alnum
    h = re.sub(r"[^a-z0-9 ]+", "", h)
    return h.replace(" ", "")
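
# Examples (illustrative):
#   _norm_header("Talk Title")    -> "talktitle"
#   _norm_header(" date_edited ") -> "dateedited"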


def _getv(row: Dict[str, Any], hdr_map: Dict[str, str], canon: str) -> str:
    # Look up using canonical key -> original header
    for orig, can in hdr_map.items():
        if can == canon:
            v = row.get(orig, "")
            return (v or "").strip()
    return ""


def _clip(s: str, n: int) -> str:
    s = (s or "").strip()
    return s[:n] if n and len(s) > n else s


def _parse_date(s: str):
    s = (s or "").strip()
    if not s:
        return None
    for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%m/%d/%Y", "%m/%d/%y", "%Y.%m.%d", "%m-%d-%Y"):
        try:
            return datetime.strptime(s, fmt).date()
        except ValueError:
            continue
    return None
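
# Examples (illustrative):
#   _parse_date("2025-08-13") -> date(2025, 8, 13)   (matched "%Y-%m-%d")
#   _parse_date("08/13/25")   -> date(2025, 8, 13)   (matched "%m/%d/%y")
#   _parse_date("13.08.2025") -> None                (no listed format fits)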


# ==============================
# Public: import_csv_bytes
# ==============================

def import_csv_bytes(
    csv_bytes: bytes,
    dry_run: bool = False,
    *,
    # tune these if you changed model field sizes
    max_source=255,
    max_code=128,
    max_talk_number=128,
    max_talk_title=512,
    max_scripture=512,
):
    """
    Import a CSV seed in an idempotent/upsert fashion.

    Mapping (case/spacing-insensitive):
        Subject, Illustration, Application, Scripture, Source,
        Talk Title, Talk Number, Code, Date, Date Edited
    """
    text = _decode_bytes(csv_bytes)
    dialect = _sniff_dialect(text)
    f = io.StringIO(text)
    rdr = csv.DictReader(f, dialect=dialect)
    seen_headers = [h.strip() for h in (rdr.fieldnames or [])]

    # Build header normalization map.
    # Canonical keys we expect:
    #   subject illustration application scripture source talktitle
    #   talknumber code date dateedited
    canon_targets = {
        "subject": "subject",
        "illustration": "illustration",
        "application": "application",
        "scripture": "scripture",
        "source": "source",
        "talktitle": "talk_title",
        "title": "talk_title",
        "talknumber": "talk_number",
        "number": "talk_number",
        "code": "code",
        "date": "date",
        "dateedited": "date_edited",
        "edited": "date_edited",
    }
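
    # Example (illustrative) of one header flowing through the mapping:
    #   "Talk Title" --_norm_header--> "talktitle" --canon_targets--> "talk_title"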
    header_map = {}
    # Map the *original* (unstripped) fieldnames: DictReader uses them verbatim
    # as row keys, so stripping here would make _getv miss any column whose
    # header carries stray whitespace.
    for h in rdr.fieldnames or []:
        header_map[h] = canon_targets.get(_norm_header(h), _norm_header(h))  # unknowns still map to their norm

    inserted = updated = skipped = 0
    errors = []
    scripture_parsed = 0

    with transaction.atomic():
        for idx, row in enumerate(rdr, start=2):  # data starts at line 2
            try:
                # Per-row savepoint: without it, one failed INSERT/UPDATE marks
                # the outer atomic block broken and every row after it would
                # raise TransactionManagementError.
                with transaction.atomic():
                    subject = _getv(row, header_map, "subject")
                    illustration = _getv(row, header_map, "illustration")
                    application = _getv(row, header_map, "application")
                    scripture_raw = _clip(_getv(row, header_map, "scripture"), max_scripture)
                    source = _clip(_getv(row, header_map, "source"), max_source)
                    talk_title = _clip(_getv(row, header_map, "talk_title"), max_talk_title)
                    talk_number = _clip(_getv(row, header_map, "talk_number"), max_talk_number)
                    entry_code = _clip(_getv(row, header_map, "code"), max_code)
                    date_added = _parse_date(_getv(row, header_map, "date"))
                    date_edited = _parse_date(_getv(row, header_map, "date_edited"))

                    # Decide how to find an existing row:
                    #   1) Prefer Code if present (treat as external key)
                    #   2) Else fall back to (subject, illustration, application)
                    obj = None
                    if entry_code:
                        obj = Entry.objects.filter(entry_code=entry_code).first()
                    if obj is None:
                        obj = Entry.objects.filter(
                            subject=subject, illustration=illustration, application=application
                        ).first()
                    created = obj is None
                    if created:
                        obj = Entry()

                    # Assign fields
                    obj.subject = subject
                    obj.illustration = illustration
                    obj.application = application
                    obj.scripture_raw = scripture_raw
                    obj.source = source
                    obj.talk_title = talk_title
                    obj.talk_number = talk_number
                    obj.entry_code = entry_code
                    if date_added:
                        obj.date_added = date_added
                    if date_edited:
                        obj.date_edited = date_edited

                    if not dry_run:
                        obj.save()
                    if created:
                        inserted += 1
                    else:
                        updated += 1

                    # (Optional) quick scripture counter: we're not parsing
                    # scripture here, but keep a metric like the previous report.
                    if scripture_raw:
                        scripture_parsed += 1
            except Exception as e:
                skipped += 1
                # Keep the error list compact, and record the source row number
                # so bad lines are easy to find.
                msg = str(e)
                if "value too long for type" in msg and "\n" not in msg:
                    errors.append(f"row {idx}: value too long for type character varying(...)")
                else:
                    errors.append(f"row {idx}: {msg}")

    return {
        "rows": inserted + updated + skipped,
        "inserted": inserted,
        "updated": updated,
        "skipped": skipped,
        "errors": errors[:200],  # cap to avoid huge output
        "scripture_parsed": scripture_parsed,
        "scripture_failed": 0,
        "dialect_delimiter": dialect.delimiter,
        "used_headerless_mode": False,
        "seen_headers": [h.lower() for h in seen_headers],
    }
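

# ------------------------------------------------------------------
# Usage sketch (illustrative; the file name and surrounding plumbing are
# assumptions, not part of this module). A management command or upload
# view can feed bytes straight through, using dry_run to preview counts:
#
#   with open("seed.csv", "rb") as fh:
#       report = import_csv_bytes(fh.read(), dry_run=True)
#   print(report["inserted"], report["updated"], report["skipped"])
#
# Re-running with dry_run=False performs the actual upserts; rows carrying
# a Code are matched on it, so the import is safe to repeat.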