From 4ad562250c7e92a2a2e60f7634e88254e4a5825e Mon Sep 17 00:00:00 2001 From: Joshua Laymon Date: Wed, 13 Aug 2025 05:37:32 +0000 Subject: [PATCH] Update web/core/utils.py --- web/core/utils.py | 280 +++++++++++++++++++++------------------------- 1 file changed, 126 insertions(+), 154 deletions(-) diff --git a/web/core/utils.py b/web/core/utils.py index 0c5941a..206305a 100644 --- a/web/core/utils.py +++ b/web/core/utils.py @@ -1,192 +1,164 @@ -import csv, io, re +import csv +import io +import re from dateutil import parser as dateparser from datetime import date -# ---------------------------- -# Scripture parsing (unchanged) -# ---------------------------- -SCR_REF_RE = re.compile( - r"""^\s*([1-3]?\s*[A-Za-z\.]+)\s+(\d+)(?::(\d+))?(?:\s*[-–—]\s*(\d+)(?::(\d+))?)?\s*$""", - re.VERBOSE, -) -BOOK_ALIASES = { - "matt": "Matthew", - "mt": "Matthew", - "jn": "John", - "john": "John", - "lk": "Luke", - "luke": "Luke", - "ps": "Psalms", -} +from core.models import Entry, ScriptureRef -def normalize_book(s: str) -> str: - b = re.sub(r"[.\s]", "", s).lower() - return BOOK_ALIASES.get(b, s.strip()) +EXPECTED_HEADERS = [h.lower() for h in [ + "Subject","Illustration","Application","Scripture","Source", + "Talk Title","Talk Number","Code","Date","Date Edited" +]] -def parse_scripture(s: str): - items = [] - for p in [x.strip() for x in (s or "").split(";") if x.strip()]: - m = SCR_REF_RE.match(p) - if not m: - items.append(None) - continue - br, ch1, v1, ch2, v2 = m.groups() - items.append( - { - "book": normalize_book(br), - "chapter_from": int(ch1), - "verse_from": int(v1) if v1 else None, - "chapter_to": int(ch2) if ch2 else None, - "verse_to": int(v2) if v2 else None, - } - ) - return items - - -def parse_date(v): - if not v or not str(v).strip(): - return None +def _sniff(text: str): + sample = text[:8192] try: - return dateparser.parse(str(v)).date() + dialect = csv.Sniffer().sniff(sample, delimiters=",;\t|") except Exception: - return None + class _Simple(csv.Dialect): + delimiter = ',' + quotechar = '"' + escapechar = None + doublequote = True + skipinitialspace = True + lineterminator = '\n' + quoting = csv.QUOTE_MINIMAL + dialect = _Simple + return dialect -# ------------------------------------- -# CSV import (headers/format as agreed) -# ------------------------------------- -EXPECTED_HEADERS = [ - h.lower() - for h in [ - "Subject", - "Illustration", - "Application", - "Scripture", - "Source", - "Talk Title", - "Talk Number", - "Code", - "Date", - "Date Edited", - ] -] +def _as_dictreader(text: str, dialect, fieldnames=None): + sio = io.StringIO(text) + if fieldnames is None: + reader = csv.DictReader(sio, dialect=dialect) + else: + reader = csv.DictReader(sio, dialect=dialect, fieldnames=fieldnames) + first = next(reader, None) + if first is not None: + matches = sum(1 for k, v in first.items() if (v or "").strip().lower() == k.strip().lower()) + if matches < 5: + yield first + for row in reader: + yield row + return + for row in reader: + yield row + + +def parse_scripture(scripture_str): + """ + Placeholder scripture parser — adjust as needed. + """ + if not scripture_str: + return [] + # Very basic parsing, could be replaced with real logic + return [{"raw": scripture_str}] def import_csv_bytes(b: bytes, dry_run=True): - text = b.decode("utf-8-sig") - reader = csv.DictReader(io.StringIO(text)) - headers = [(h or "").strip().lower() for h in (reader.fieldnames or [])] - missing = [h for h in EXPECTED_HEADERS if h not in headers] - if missing: - raise ValueError(f"Missing required headers: {missing}") + """ + Robust import: + - Auto-detect delimiter (comma/semicolon/tab/pipe). + - If required headers are missing, re-parse treating file as *headerless* + using the canonical column order. + - Upsert by Code; skip rows that are entirely empty. + """ + text = b.decode("utf-8-sig", errors="replace") + dialect = _sniff(text) + + reader1 = csv.DictReader(io.StringIO(text), dialect=dialect) + headers1 = [(h or "").strip().lower() for h in (reader1.fieldnames or [])] + + used_headerless = False + if not headers1 or sum(h in EXPECTED_HEADERS for h in headers1) < 5: + used_headerless = True + rows_iter = _as_dictreader(text, dialect, fieldnames=EXPECTED_HEADERS) + else: + rows_iter = (row for row in reader1) report = { - "rows": 0, - "inserted": 0, - "updated": 0, - "skipped": 0, - "errors": [], - "scripture_parsed": 0, - "scripture_failed": 0, + "rows": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": [], + "scripture_parsed": 0, "scripture_failed": 0, + "dialect_delimiter": getattr(dialect, "delimiter", "?"), + "used_headerless_mode": used_headerless, + "seen_headers": headers1, } - rows = list(reader) - report["rows"] = len(rows) - from core.models import Entry, ScriptureRef - - for row in rows: + def parse_date_safe(v): + if not v or not str(v).strip(): + return None try: - code = (row.get("code") or "").strip() - talk = row.get("talk number") + return dateparser.parse(str(v)).date() + except Exception: + return None + + for row in rows_iter: + report["rows"] += 1 + try: + row_lc = {(k or "").strip().lower(): (v or "") for k, v in row.items()} + + subj = (row_lc.get("subject") or "").strip() + illu = (row_lc.get("illustration") or "").strip() + appl = (row_lc.get("application") or "").strip() + scr = (row_lc.get("scripture") or "").strip() + src = (row_lc.get("source") or "").strip() + tt = (row_lc.get("talk title") or "").strip() + tnum = (row_lc.get("talk number") or "").strip() + code = (row_lc.get("code") or "").strip() + dadd = parse_date_safe(row_lc.get("date")) + ded = parse_date_safe(row_lc.get("date edited")) + try: - talk = int(talk) if str(talk).strip() else None + tnum = int(tnum) if tnum else None except Exception: - talk = None + tnum = None + + if not any([subj, illu, appl, scr, src, tt, code, tnum, dadd, ded]): + report["skipped"] += 1 + continue data = dict( - subject=row.get("subject") or "", - illustration=row.get("illustration") or "", - application=row.get("application") or "", - scripture_raw=row.get("scripture") or "", - source=row.get("source") or "", - talk_number=talk, - talk_title=row.get("talk title") or "", - entry_code=code, - date_added=parse_date(row.get("date")), - date_edited=parse_date(row.get("date edited")), + subject=subj, illustration=illu, application=appl, + scripture_raw=scr, source=src, talk_number=tnum, + talk_title=tt, entry_code=code, date_added=dadd, date_edited=ded ) - parsed = parse_scripture(data["scripture_raw"]) + parsed = parse_scripture(scr) for it in parsed: if it: report["scripture_parsed"] += 1 else: report["scripture_failed"] += 1 - if not dry_run: - obj = None - if code: - try: - obj = Entry.objects.get(entry_code=code) - except Entry.DoesNotExist: - obj = None + if dry_run: + continue - if obj: - for k, v in data.items(): - setattr(obj, k, v) - obj.save() - obj.scripture_refs.all().delete() - report["updated"] += 1 - else: - obj = Entry.objects.create(**data) - report["inserted"] += 1 + obj = None + if code: + try: + obj = Entry.objects.get(entry_code=code) + except Entry.DoesNotExist: + obj = None - for it in parsed: - if it: - ScriptureRef.objects.create(entry=obj, **it) + if obj: + for k, v in data.items(): + setattr(obj, k, v) + obj.save() + obj.scripture_refs.all().delete() + report["updated"] += 1 + else: + obj = Entry.objects.create(**data) + report["inserted"] += 1 + + for it in parsed: + if it: + ScriptureRef.objects.create(entry=obj, **it) except Exception as e: report["skipped"] += 1 report["errors"].append(str(e)) - return report - - -# -------------------------------------------- -# Search helpers: tokens & wildcard-to-regex -# -------------------------------------------- -_QUOTED_OR_WORD = re.compile(r'"([^"]+)"|(\S+)') - - -def terms(q: str): - """ - Split into tokens while preserving quoted phrases. - """ - out = [] - for m in _QUOTED_OR_WORD.finditer(q or ""): - token = (m.group(1) or m.group(2) or "").strip() - if token: - out.append(token) - return out - - -def has_wildcards(token: str) -> bool: - return "*" in token or "?" in token - - -def wildcard_to_regex(token: str) -> str: - """ - Convert user wildcard token to a safe regex: - * -> .* - ? -> . - Everything else is escaped. - We rely on Django's `__iregex` for case-insensitive matching. - """ - STAR = "__STAR__" - QMARK = "__QMARK__" - s = token.replace("*", STAR).replace("?", QMARK) - s = re.escape(s) - s = s.replace(STAR, ".*").replace(QMARK, ".") - return s \ No newline at end of file + return report \ No newline at end of file