Update web/core/utils.py

This commit is contained in:
Joshua Laymon 2025-08-13 05:37:32 +00:00
parent 11f01fd044
commit 4ad562250c

View File

@ -1,51 +1,93 @@
import csv, io, re import csv
import io
import re
from dateutil import parser as dateparser from dateutil import parser as dateparser
from datetime import date from datetime import date
# ---------------------------- from core.models import Entry, ScriptureRef
# Scripture parsing (unchanged)
# ----------------------------
SCR_REF_RE = re.compile( EXPECTED_HEADERS = [h.lower() for h in [
r"""^\s*([1-3]?\s*[A-Za-z\.]+)\s+(\d+)(?::(\d+))?(?:\s*[-–—]\s*(\d+)(?::(\d+))?)?\s*$""", "Subject","Illustration","Application","Scripture","Source",
re.VERBOSE, "Talk Title","Talk Number","Code","Date","Date Edited"
) ]]
BOOK_ALIASES = {
"matt": "Matthew",
"mt": "Matthew", def _sniff(text: str):
"jn": "John", sample = text[:8192]
"john": "John", try:
"lk": "Luke", dialect = csv.Sniffer().sniff(sample, delimiters=",;\t|")
"luke": "Luke", except Exception:
"ps": "Psalms", class _Simple(csv.Dialect):
delimiter = ','
quotechar = '"'
escapechar = None
doublequote = True
skipinitialspace = True
lineterminator = '\n'
quoting = csv.QUOTE_MINIMAL
dialect = _Simple
return dialect
def _as_dictreader(text: str, dialect, fieldnames=None):
sio = io.StringIO(text)
if fieldnames is None:
reader = csv.DictReader(sio, dialect=dialect)
else:
reader = csv.DictReader(sio, dialect=dialect, fieldnames=fieldnames)
first = next(reader, None)
if first is not None:
matches = sum(1 for k, v in first.items() if (v or "").strip().lower() == k.strip().lower())
if matches < 5:
yield first
for row in reader:
yield row
return
for row in reader:
yield row
def parse_scripture(scripture_str):
    """Parse a raw scripture string into a list of reference dicts.

    Placeholder implementation: wraps the raw text in a single dict under
    the key "raw".  Returns an empty list for empty/None input.
    """
    if scripture_str:
        return [{"raw": scripture_str}]
    return []
def import_csv_bytes(b: bytes, dry_run=True):
"""
Robust import:
- Auto-detect delimiter (comma/semicolon/tab/pipe).
- If required headers are missing, re-parse treating file as *headerless*
using the canonical column order.
- Upsert by Code; skip rows that are entirely empty.
"""
text = b.decode("utf-8-sig", errors="replace")
dialect = _sniff(text)
reader1 = csv.DictReader(io.StringIO(text), dialect=dialect)
headers1 = [(h or "").strip().lower() for h in (reader1.fieldnames or [])]
used_headerless = False
if not headers1 or sum(h in EXPECTED_HEADERS for h in headers1) < 5:
used_headerless = True
rows_iter = _as_dictreader(text, dialect, fieldnames=EXPECTED_HEADERS)
else:
rows_iter = (row for row in reader1)
report = {
"rows": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": [],
"scripture_parsed": 0, "scripture_failed": 0,
"dialect_delimiter": getattr(dialect, "delimiter", "?"),
"used_headerless_mode": used_headerless,
"seen_headers": headers1,
} }
def parse_date_safe(v):
def normalize_book(s: str) -> str:
b = re.sub(r"[.\s]", "", s).lower()
return BOOK_ALIASES.get(b, s.strip())
def parse_scripture(s: str):
    """Parse a semicolon-separated scripture string into reference dicts.

    Each non-empty segment is matched against SCR_REF_RE.  A matching
    segment yields a dict with book / chapter / verse range fields
    (missing parts are None); a non-matching segment yields None so the
    caller can count parse failures per segment.
    """
    refs = []
    for raw in (s or "").split(";"):
        part = raw.strip()
        if not part:
            continue
        match = SCR_REF_RE.match(part)
        if match is None:
            # Keep a placeholder so failures stay countable.
            refs.append(None)
            continue
        book, ch_from, vs_from, ch_to, vs_to = match.groups()
        refs.append(
            {
                "book": normalize_book(book),
                "chapter_from": int(ch_from),
                "verse_from": int(vs_from) if vs_from else None,
                "chapter_to": int(ch_to) if ch_to else None,
                "verse_to": int(vs_to) if vs_to else None,
            }
        )
    return refs
def parse_date(v):
if not v or not str(v).strip(): if not v or not str(v).strip():
return None return None
try: try:
@ -53,79 +95,47 @@ def parse_date(v):
except Exception: except Exception:
return None return None
for row in rows_iter:
# ------------------------------------- report["rows"] += 1
# CSV import (headers/format as agreed)
# -------------------------------------
# Canonical CSV column order, lower-cased for case-insensitive header checks.
EXPECTED_HEADERS = [
    header.lower()
    for header in (
        "Subject",
        "Illustration",
        "Application",
        "Scripture",
        "Source",
        "Talk Title",
        "Talk Number",
        "Code",
        "Date",
        "Date Edited",
    )
]
def import_csv_bytes(b: bytes, dry_run=True):
text = b.decode("utf-8-sig")
reader = csv.DictReader(io.StringIO(text))
headers = [(h or "").strip().lower() for h in (reader.fieldnames or [])]
missing = [h for h in EXPECTED_HEADERS if h not in headers]
if missing:
raise ValueError(f"Missing required headers: {missing}")
report = {
"rows": 0,
"inserted": 0,
"updated": 0,
"skipped": 0,
"errors": [],
"scripture_parsed": 0,
"scripture_failed": 0,
}
rows = list(reader)
report["rows"] = len(rows)
from core.models import Entry, ScriptureRef
for row in rows:
try: try:
code = (row.get("code") or "").strip() row_lc = {(k or "").strip().lower(): (v or "") for k, v in row.items()}
talk = row.get("talk number")
subj = (row_lc.get("subject") or "").strip()
illu = (row_lc.get("illustration") or "").strip()
appl = (row_lc.get("application") or "").strip()
scr = (row_lc.get("scripture") or "").strip()
src = (row_lc.get("source") or "").strip()
tt = (row_lc.get("talk title") or "").strip()
tnum = (row_lc.get("talk number") or "").strip()
code = (row_lc.get("code") or "").strip()
dadd = parse_date_safe(row_lc.get("date"))
ded = parse_date_safe(row_lc.get("date edited"))
try: try:
talk = int(talk) if str(talk).strip() else None tnum = int(tnum) if tnum else None
except Exception: except Exception:
talk = None tnum = None
if not any([subj, illu, appl, scr, src, tt, code, tnum, dadd, ded]):
report["skipped"] += 1
continue
data = dict( data = dict(
subject=row.get("subject") or "", subject=subj, illustration=illu, application=appl,
illustration=row.get("illustration") or "", scripture_raw=scr, source=src, talk_number=tnum,
application=row.get("application") or "", talk_title=tt, entry_code=code, date_added=dadd, date_edited=ded
scripture_raw=row.get("scripture") or "",
source=row.get("source") or "",
talk_number=talk,
talk_title=row.get("talk title") or "",
entry_code=code,
date_added=parse_date(row.get("date")),
date_edited=parse_date(row.get("date edited")),
) )
parsed = parse_scripture(data["scripture_raw"]) parsed = parse_scripture(scr)
for it in parsed: for it in parsed:
if it: if it:
report["scripture_parsed"] += 1 report["scripture_parsed"] += 1
else: else:
report["scripture_failed"] += 1 report["scripture_failed"] += 1
if not dry_run: if dry_run:
continue
obj = None obj = None
if code: if code:
try: try:
@ -152,41 +162,3 @@ def import_csv_bytes(b: bytes, dry_run=True):
report["errors"].append(str(e)) report["errors"].append(str(e))
return report return report
# --------------------------------------------
# Search helpers: tokens & wildcard-to-regex
# --------------------------------------------
# Matches either a double-quoted phrase (group 1) or a bare word (group 2).
_QUOTED_OR_WORD = re.compile(r'"([^"]+)"|(\S+)')


def terms(q: str):
    """Tokenize *q* into search terms, keeping double-quoted phrases intact."""
    candidates = (
        (m.group(1) or m.group(2) or "").strip()
        for m in _QUOTED_OR_WORD.finditer(q or "")
    )
    return [tok for tok in candidates if tok]
def has_wildcards(token: str) -> bool:
    """Return True if *token* contains a shell-style wildcard (* or ?)."""
    return any(ch in token for ch in ("*", "?"))
def wildcard_to_regex(token: str) -> str:
    """
    Convert a user wildcard token to a safe regex fragment:
        * -> .*
        ? -> .
    Every other character is regex-escaped.  Case-insensitive matching is
    left to Django's `__iregex` lookup.

    Built character-by-character instead of the old sentinel-substitution
    scheme ("__STAR__"/"__QMARK__"), which corrupted tokens that literally
    contained those sentinel strings (e.g. "__STAR__" became ".*").
    """
    pieces = []
    for ch in token:
        if ch == "*":
            pieces.append(".*")
        elif ch == "?":
            pieces.append(".")
        else:
            # re.escape is character-wise, so escaping one char at a time
            # is equivalent to escaping the whole literal run.
            pieces.append(re.escape(ch))
    return "".join(pieces)