Update web/core/utils.py

This commit is contained in:
Joshua Laymon 2025-08-13 05:42:08 +00:00
parent 70d12b776a
commit ec6be70805


@ -1,18 +1,62 @@
import csv
import io
import re
from datetime import date
from dateutil import parser as dateparser
from core.models import Entry, ScriptureRef
# ----------------------------
# Search helpers (needed by views)
# ----------------------------
# Split query into tokens while preserving quoted phrases
_QUOTED_OR_WORD = re.compile(r'"([^"]+)"|(\S+)')
def terms(q: str):
    out = []
    for m in _QUOTED_OR_WORD.finditer(q or ""):
        token = (m.group(1) or m.group(2) or "").strip()
        if token:
            out.append(token)
    return out
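# Illustrative example (not in the original file) of how terms() tokenizes:
#   terms('grace "living water" hope')  ->  ['grace', 'living water', 'hope']
# Quoted phrases survive as single tokens; empty tokens are dropped.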
def has_wildcards(token: str) -> bool:
    return "*" in token or "?" in token
def wildcard_to_regex(token: str) -> str:
    """
    Convert user wildcard token to a safe regex:
      * -> .*
      ? -> .
    Everything else is escaped. Suitable for Django __iregex.
    """
    STAR = "__STAR__"
    QMARK = "__QMARK__"
    s = token.replace("*", STAR).replace("?", QMARK)
    s = re.escape(s)
    s = s.replace(STAR, ".*").replace(QMARK, ".")
    return s
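# Illustrative sketch (not in the original file) of how a view might combine
# these helpers; "subject" is an assumed Entry field name:
#   for t in terms(q):
#       if has_wildcards(t):
#           qs = qs.filter(subject__iregex=wildcard_to_regex(t))
#       else:
#           qs = qs.filter(subject__icontains=t)
# wildcard_to_regex("psa*m?") returns 'psa.*m.'. Django's __iregex is
# unanchored, so wrap the pattern in ^...$ if whole-field matches are wanted.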
# ----------------------------
# Scripture parsing (minimal, non-blocking)
# ----------------------------
def parse_scripture(scripture_str: str):
    """
    Minimal placeholder: keep as a list with the raw string so imports never fail.
    Replace with your richer parser when ready.
    """
    if not scripture_str:
        return []
    return [{"raw": scripture_str}]
# ----------------------------
# CSV import (robust)
# ----------------------------
EXPECTED_HEADERS = [h.lower() for h in [
    "Subject", "Illustration", "Application", "Scripture", "Source",
    "Talk Title", "Talk Number", "Code", "Date", "Date Edited",
]]
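# Illustration (not in the original file): in headerless mode each positional
# value is paired with one of these canonical names, roughly:
#   dict(zip(EXPECTED_HEADERS, first_row_values))
#   -> {"subject": ..., "illustration": ..., ..., "date edited": ...}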
def _sniff(text: str):
    sample = text[:8192]
    try:
@ -29,51 +73,49 @@ def _sniff(text: str):
        dialect = _Simple
    return dialect
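# Sketch of the expected contract (the elided body presumably tries
# csv.Sniffer over common delimiters and falls back to _Simple):
#   d = _sniff("a;b;c\n1;2;3\n")
#   next(csv.reader(io.StringIO("1;2;3"), dialect=d))  ->  ['1', '2', '3']
# assuming ';' is among the candidate delimiters.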
def _as_dictreader(text: str, dialect, fieldnames=None):
    """
    Yield rows as dicts. If fieldnames are provided, treat the file as headerless.
    We also peek one row: if it looks like an actual header row, we skip it.
    """
    sio = io.StringIO(text)
    if fieldnames is None:
        reader = csv.DictReader(sio, dialect=dialect)
    else:
        # Headerless mode: apply the canonical field names positionally
        reader = csv.DictReader(sio, dialect=dialect, fieldnames=fieldnames)
    first = next(reader, None)
    if first is not None:
        # If many columns equal their header names, it's probably a header row
        matches = sum(1 for k, v in first.items() if (v or "").strip().lower() == k.strip().lower())
        if matches < 5:
            # Not a header row, so yield it as data
            yield first
    for row in reader:
        yield row
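# Illustration (not in the original file) of the header peek in headerless
# mode: row one below repeats the canonical names, so >= 5 columns equal
# their keys and it is skipped; the data row is still yielded.
#   text = ("Subject,Illustration,Application,Scripture,Source,"
#           "Talk Title,Talk Number,Code,Date,Date Edited\n"
#           "Faith,Story,,John 3:16,,,,A1,,\n")
#   [r["subject"] for r in _as_dictreader(text, csv.excel, EXPECTED_HEADERS)]
#   -> ['Faith']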
def import_csv_bytes(b: bytes, dry_run: bool = True):
    """
    Robust import:
      - Auto-detect delimiter (comma/semicolon/tab/pipe).
      - If required headers are missing, re-parse treating the file as *headerless*
        using the canonical column order.
      - Skip fully empty rows.
      - Upsert by Code (if Code present), else insert.
    Returns a report dict with counts and diagnostics.
    """
    text = b.decode("utf-8-sig", errors="replace")
    dialect = _sniff(text)
    # First attempt: use the file-provided headers
    reader1 = csv.DictReader(io.StringIO(text), dialect=dialect)
    headers1 = [(h or "").strip().lower() for h in (reader1.fieldnames or [])]
    used_headerless = False
    if not headers1 or sum(h in EXPECTED_HEADERS for h in headers1) < 5:
        # Not enough expected headers -> treat as headerless/positional
        used_headerless = True
        rows_iter = _as_dictreader(text, dialect, fieldnames=EXPECTED_HEADERS)
    else:
@ -103,19 +145,20 @@ def import_csv_bytes(b: bytes, dry_run=True):
        subj = (row_lc.get("subject") or "").strip()
        illu = (row_lc.get("illustration") or "").strip()
        appl = (row_lc.get("application") or "").strip()
        scr = (row_lc.get("scripture") or "").strip()
        src = (row_lc.get("source") or "").strip()
        tt = (row_lc.get("talk title") or "").strip()
        tnum = (row_lc.get("talk number") or "").strip()
        code = (row_lc.get("code") or "").strip()
        dadd = parse_date_safe(row_lc.get("date"))
        ded = parse_date_safe(row_lc.get("date edited"))
        try:
            tnum = int(tnum) if tnum else None
        except Exception:
            tnum = None
        # Skip rows that are completely empty across all tracked fields
        if not any([subj, illu, appl, scr, src, tt, code, tnum, dadd, ded]):
            report["skipped"] += 1
            continue
@ -126,12 +169,11 @@ def import_csv_bytes(b: bytes, dry_run=True):
            talk_title=tt, entry_code=code, date_added=dadd, date_edited=ded
        )
        # Scripture parse diagnostics
        parsed_list = parse_scripture(scr)
        for it in parsed_list:
            if it:
                report["scripture_parsed"] += 1
            else:
                report["scripture_failed"] += 1
        if dry_run:
            continue
@ -153,8 +195,12 @@ def import_csv_bytes(b: bytes, dry_run=True):
            obj = Entry.objects.create(**data)
            report["inserted"] += 1
            for it in parsed_list:
                if it and isinstance(it, dict) and "raw" in it:
                    # Keep raw-only refs optional; skip creating ScriptureRef if the schema differs
                    pass
                elif it:
                    # If you switch to a structured parser, create records like:
                    ScriptureRef.objects.create(entry=obj, **it)
        except Exception as e: