"""Core utilities: scripture-reference parsing, CSV entry import, search helpers.

Reconstructed from the post-patch version of ``web/core/utils.py``
(commit e67834a5e88d, "Update web/core/utils.py").
"""

import csv
import io
import re
from datetime import date  # noqa: F401  kept: part of the module's original import surface

from dateutil import parser as dateparser

# ----------------------------
# Scripture parsing
# ----------------------------
# Matches e.g. "John 3", "Jn 3:16", "1 Cor 13:4", "Ps 23 - 24".
# Group layout: (book)(chapter)(:verse)?( dash (number)(:verse)?)?
# The dash class covers ASCII hyphen plus en/em dashes pasted from documents.
SCR_REF_RE = re.compile(
    r"""^\s*([1-3]?\s*[A-Za-z\.]+)\s+(\d+)(?::(\d+))?(?:\s*[-–—]\s*(\d+)(?::(\d+))?)?\s*$""",
    re.VERBOSE,
)

# Lowercased, dot/space-stripped abbreviation -> canonical book name.
BOOK_ALIASES = {
    "matt": "Matthew",
    "mt": "Matthew",
    "jn": "John",
    "john": "John",
    "lk": "Luke",
    "luke": "Luke",
    "ps": "Psalms",
}


def normalize_book(s: str) -> str:
    """Return the canonical book name for an abbreviation.

    Dots and internal whitespace are ignored when looking up the alias;
    unknown names are returned stripped but otherwise unchanged.
    """
    key = re.sub(r"[.\s]", "", s).lower()
    return BOOK_ALIASES.get(key, s.strip())


def parse_scripture(s: str):
    """Parse a semicolon-separated list of scripture references.

    Returns one item per reference: a dict with ``book`` /
    ``chapter_from`` / ``verse_from`` / ``chapter_to`` / ``verse_to`` for
    parseable references, or ``None`` as a placeholder for unparseable
    ones (the importer counts the ``None``s as failures).

    NOTE(review): for "John 3:16-18" the trailing 18 is captured by the
    fourth regex group and therefore lands in ``chapter_to``, not
    ``verse_to``.  Preserved as-is because ScriptureRef rows are created
    from exactly these keys — confirm the intended range semantics.
    """
    items = []
    for part in [x.strip() for x in (s or "").split(";") if x.strip()]:
        m = SCR_REF_RE.match(part)
        if not m:
            items.append(None)
            continue
        book, ch1, v1, ch2, v2 = m.groups()
        items.append(
            {
                "book": normalize_book(book),
                "chapter_from": int(ch1),
                "verse_from": int(v1) if v1 else None,
                "chapter_to": int(ch2) if ch2 else None,
                "verse_to": int(v2) if v2 else None,
            }
        )
    return items


def parse_date(v):
    """Best-effort date parsing; returns a ``datetime.date`` or ``None``.

    Blank/``None`` input and anything dateutil cannot parse both yield
    ``None`` — the CSV importer treats bad dates as missing, not fatal.
    """
    if not v or not str(v).strip():
        return None
    try:
        return dateparser.parse(str(v)).date()
    except Exception:
        return None


# -------------------------------------
# CSV import (headers/format as agreed)
# -------------------------------------
EXPECTED_HEADERS = [
    h.lower()
    for h in [
        "Subject",
        "Illustration",
        "Application",
        "Scripture",
        "Source",
        "Talk Title",
        "Talk Number",
        "Code",
        "Date",
        "Date Edited",
    ]
]


def import_csv_bytes(b: bytes, dry_run=True):
    """Import entries from an uploaded CSV.

    Validates headers case-insensitively, parses each row, and — unless
    ``dry_run`` is true — inserts or updates ``core.models.Entry`` rows
    (matched by ``entry_code``) and rebuilds their ScriptureRef children.

    Returns a report dict with row/insert/update/skip counts, per-row
    error messages, and scripture parse statistics.  In dry-run mode only
    the row count and scripture statistics are populated; no database
    access occurs.

    Raises:
        ValueError: when any required header is missing.
    """
    text = b.decode("utf-8-sig")  # tolerate an Excel-style UTF-8 BOM
    reader = csv.DictReader(io.StringIO(text))

    headers = [(h or "").strip().lower() for h in (reader.fieldnames or [])]
    missing = [h for h in EXPECTED_HEADERS if h not in headers]
    if missing:
        raise ValueError(f"Missing required headers: {missing}")

    # BUGFIX: header validation is case-insensitive, but DictReader keeps the
    # original capitalization in row keys, so row.get("subject") was None for
    # a "Subject" column and every field imported blank.  Re-key each row
    # with stripped, lowercased headers before any lookups.
    rows = [
        {(k or "").strip().lower(): v for k, v in raw.items()}
        for raw in reader
    ]

    report = {
        "rows": len(rows),
        "inserted": 0,
        "updated": 0,
        "skipped": 0,
        "errors": [],
        "scripture_parsed": 0,
        "scripture_failed": 0,
    }

    if not dry_run:
        # Imported lazily so a dry-run validation pass does not require a
        # configured Django application registry.
        from core.models import Entry, ScriptureRef

    for row in rows:
        try:
            code = (row.get("code") or "").strip()

            # "Talk Number" may be blank or non-numeric; both become None.
            talk_raw = row.get("talk number")
            talk = None
            if talk_raw is not None and str(talk_raw).strip():
                try:
                    talk = int(str(talk_raw).strip())
                except ValueError:
                    talk = None

            data = dict(
                subject=row.get("subject") or "",
                illustration=row.get("illustration") or "",
                application=row.get("application") or "",
                # NOTE(review): the middle of this dict was elided from the
                # diff hunk context; the field names below are inferred from
                # EXPECTED_HEADERS and the uses of data["scripture_raw"] and
                # entry_code elsewhere — verify against core.models.Entry.
                scripture_raw=row.get("scripture") or "",
                source=row.get("source") or "",
                talk_title=row.get("talk title") or "",
                talk_number=talk,
                entry_code=code,
                date_added=parse_date(row.get("date")),
                date_edited=parse_date(row.get("date edited")),
            )

            parsed = parse_scripture(data["scripture_raw"])
            for item in parsed:
                if item:
                    report["scripture_parsed"] += 1
                else:
                    report["scripture_failed"] += 1

            if not dry_run:
                obj = None
                if code:
                    try:
                        obj = Entry.objects.get(entry_code=code)
                    except Entry.DoesNotExist:
                        obj = None

                if obj:
                    for field, value in data.items():
                        setattr(obj, field, value)
                    obj.save()
                    # Scripture refs are rebuilt from scratch on every update.
                    obj.scripture_refs.all().delete()
                    report["updated"] += 1
                else:
                    obj = Entry.objects.create(**data)
                    report["inserted"] += 1

                for item in parsed:
                    if item:
                        ScriptureRef.objects.create(entry=obj, **item)

        except Exception as e:
            # A bad row never aborts the whole import; it is reported instead.
            report["skipped"] += 1
            report["errors"].append(str(e))

    return report


# --------------------------------------------
# Search helpers: tokens & wildcard-to-regex
# --------------------------------------------
# Either a double-quoted phrase (quotes stripped) or a bare whitespace-free word.
_QUOTED_OR_WORD = re.compile(r'"([^"]+)"|(\S+)')


def terms(q: str):
    """Split *q* into search tokens, preserving quoted phrases.

    Wildcard characters (``*``/``?``) are kept in the token; callers use
    ``has_wildcards`` to decide whether to route a token through
    ``wildcard_to_regex`` instead of a plain ``icontains`` lookup.
    """
    out = []
    for m in _QUOTED_OR_WORD.finditer(q or ""):
        token = (m.group(1) or m.group(2) or "").strip()
        if token:
            out.append(token)
    return out


def has_wildcards(token: str) -> bool:
    """True when the token contains a ``*`` or ``?`` wildcard."""
    return "*" in token or "?" in token


def wildcard_to_regex(token: str) -> str:
    """Convert a user wildcard token to a safe regex fragment.

    ``*`` becomes ``.*`` and ``?`` becomes ``.``; every other character is
    escaped literally.  Case-insensitive matching is left to Django's
    ``__iregex`` lookup.

    BUGFIX: escapes character-by-character instead of via placeholder
    substitution — the previous sentinel approach turned a literal
    ``__STAR__``/``__QMARK__`` in user input into ``.*``/``.``.
    """
    parts = []
    for ch in token:
        if ch == "*":
            parts.append(".*")
        elif ch == "?":
            parts.append(".")
        else:
            parts.append(re.escape(ch))
    return "".join(parts)