Update web/core/utils.py

This commit is contained in:
Joshua Laymon 2025-08-13 05:11:36 +00:00
parent a266dd9ea2
commit e67834a5e8

View File

@ -2,47 +2,110 @@ import csv, io, re
from dateutil import parser as dateparser from dateutil import parser as dateparser
from datetime import date from datetime import date
# ----------------------------
# Scripture parsing (unchanged)
# ----------------------------
# Matches e.g. "John 3:16", "1 Cor 10:1-5", "Ps 23": optional leading
# ordinal (1-3), book name, chapter[:verse], then an optional range to
# chapter[:verse] separated by -, – or —.
SCR_REF_RE = re.compile(
    r"^\s*([1-3]?\s*[A-Za-z\.]+)\s+(\d+)(?::(\d+))?"
    r"(?:\s*[-–—]\s*(\d+)(?::(\d+))?)?\s*$",
    re.VERBOSE,
)

# Common abbreviations mapped to canonical book names.
BOOK_ALIASES = dict(
    matt="Matthew",
    mt="Matthew",
    jn="John",
    john="John",
    lk="Luke",
    luke="Luke",
    ps="Psalms",
)
def normalize_book(s: str) -> str:
    """Canonicalize a book-name token.

    Strips dots and whitespace, lowercases, and looks the result up in
    BOOK_ALIASES; unknown names fall back to the stripped original.
    """
    key = re.sub(r"[.\s]", "", s).lower()
    return BOOK_ALIASES.get(key, s.strip())
def parse_scripture(s: str):
    """Parse a ';'-separated scripture string into a list of ref dicts.

    Each unparsable segment contributes None so callers can count
    failures; parsable segments yield a dict with book, chapter_from,
    verse_from, chapter_to and verse_to (the last three may be None).
    """
    refs = []
    for part in (s or "").split(";"):
        part = part.strip()
        if not part:
            continue
        match = SCR_REF_RE.match(part)
        if match is None:
            refs.append(None)
            continue
        book, ch_from, v_from, ch_to, v_to = match.groups()
        refs.append(
            {
                "book": normalize_book(book),
                "chapter_from": int(ch_from),
                "verse_from": int(v_from) if v_from else None,
                "chapter_to": int(ch_to) if ch_to else None,
                "verse_to": int(v_to) if v_to else None,
            }
        )
    return refs
def parse_date(v):
    """Best-effort conversion of *v* to a datetime.date.

    Returns None for falsy or whitespace-only input, and None for
    anything dateutil cannot parse (deliberate best-effort: a bad cell
    must not abort a CSV import).
    """
    if v and str(v).strip():
        try:
            return dateparser.parse(str(v)).date()
        except Exception:
            return None
    return None
# -------------------------------------
# CSV import (headers/format as agreed)
# -------------------------------------
# Required CSV column names, pre-lowered for case-insensitive matching
# against DictReader fieldnames.
EXPECTED_HEADERS = [
    "subject",
    "illustration",
    "application",
    "scripture",
    "source",
    "talk title",
    "talk number",
    "code",
    "date",
    "date edited",
]
def import_csv_bytes(b: bytes, dry_run=True): def import_csv_bytes(b: bytes, dry_run=True):
text = b.decode("utf-8-sig") text = b.decode("utf-8-sig")
reader = csv.DictReader(io.StringIO(text)) reader = csv.DictReader(io.StringIO(text))
headers=[(h or '').strip().lower() for h in (reader.fieldnames or [])] headers = [(h or "").strip().lower() for h in (reader.fieldnames or [])]
missing=[h for h in EXPECTED_HEADERS if h not in headers] missing = [h for h in EXPECTED_HEADERS if h not in headers]
if missing: raise ValueError(f"Missing required headers: {missing}") if missing:
report={"rows":0,"inserted":0,"updated":0,"skipped":0,"errors":[],"scripture_parsed":0,"scripture_failed":0} raise ValueError(f"Missing required headers: {missing}")
rows=list(reader); report["rows"]=len(rows)
report = {
"rows": 0,
"inserted": 0,
"updated": 0,
"skipped": 0,
"errors": [],
"scripture_parsed": 0,
"scripture_failed": 0,
}
rows = list(reader)
report["rows"] = len(rows)
from core.models import Entry, ScriptureRef from core.models import Entry, ScriptureRef
for row in rows: for row in rows:
try: try:
code=(row.get("code") or "").strip() code = (row.get("code") or "").strip()
talk=row.get("talk number") talk = row.get("talk number")
try: talk=int(talk) if str(talk).strip() else None try:
except: talk=None talk = int(talk) if str(talk).strip() else None
data=dict( except Exception:
talk = None
data = dict(
subject=row.get("subject") or "", subject=row.get("subject") or "",
illustration=row.get("illustration") or "", illustration=row.get("illustration") or "",
application=row.get("application") or "", application=row.get("application") or "",
@ -54,34 +117,76 @@ def import_csv_bytes(b: bytes, dry_run=True):
date_added=parse_date(row.get("date")), date_added=parse_date(row.get("date")),
date_edited=parse_date(row.get("date edited")), date_edited=parse_date(row.get("date edited")),
) )
parsed=parse_scripture(data["scripture_raw"])
parsed = parse_scripture(data["scripture_raw"])
for it in parsed: for it in parsed:
if it: report["scripture_parsed"]+=1 if it:
else: report["scripture_failed"]+=1 report["scripture_parsed"] += 1
obj=None else:
report["scripture_failed"] += 1
if not dry_run: if not dry_run:
obj = None
if code: if code:
try: try:
obj=Entry.objects.get(entry_code=code) obj = Entry.objects.get(entry_code=code)
except Entry.DoesNotExist: except Entry.DoesNotExist:
obj=None obj = None
if obj: if obj:
for k,v in data.items(): setattr(obj,k,v) for k, v in data.items():
obj.save(); obj.scripture_refs.all().delete(); report["updated"]+=1 setattr(obj, k, v)
obj.save()
obj.scripture_refs.all().delete()
report["updated"] += 1
else: else:
obj=Entry.objects.create(**data); report["inserted"]+=1 obj = Entry.objects.create(**data)
report["inserted"] += 1
for it in parsed: for it in parsed:
if it: ScriptureRef.objects.create(entry=obj, **it) if it:
ScriptureRef.objects.create(entry=obj, **it)
except Exception as e: except Exception as e:
report["skipped"]+=1; report["errors"].append(str(e)) report["skipped"] += 1
report["errors"].append(str(e))
return report return report
# Tokenization with quoted phrases; wildcards tolerated but removed for icontains
# --------------------------------------------
# Search helpers: tokens & wildcard-to-regex
# --------------------------------------------
_QUOTED_OR_WORD = re.compile(r'"([^"]+)"|(\S+)') _QUOTED_OR_WORD = re.compile(r'"([^"]+)"|(\S+)')
def terms(q: str):
    """Split *q* into search tokens, preserving double-quoted phrases.

    Wildcard characters are kept in the token (translated later by
    wildcard_to_regex); empty tokens are dropped.
    """
    tokens = []
    for match in _QUOTED_OR_WORD.finditer(q or ""):
        phrase, word = match.group(1), match.group(2)
        text = (phrase or word or "").strip()
        if text:
            tokens.append(text)
    return tokens
def has_wildcards(token: str) -> bool:
    """Return True when *token* contains a user wildcard ('*' or '?')."""
    return any(ch in token for ch in "*?")
def wildcard_to_regex(token: str) -> str:
    """Convert a user wildcard token to a safe regex fragment.

        * -> .*
        ? -> .

    Everything else is escaped literally.  We rely on Django's
    `__iregex` for case-insensitive matching.

    Fix: the previous sentinel-placeholder approach (replace '*' with
    '__STAR__', escape, replace back) collided with user input — a
    token containing the literal text '__STAR__' or '__QMARK__' was
    silently turned into '.*' / '.'.  Splitting on the wildcard
    characters and escaping each literal run cannot collide.
    """
    # The capturing group makes re.split keep the '*'/'?' delimiters in
    # the result so they can be translated in place.
    pieces = re.split(r"([*?])", token)
    translated = []
    for piece in pieces:
        if piece == "*":
            translated.append(".*")
        elif piece == "?":
            translated.append(".")
        else:
            translated.append(re.escape(piece))
    return "".join(translated)