# core/utils.py
from __future__ import annotations

import csv
import io
import re
from datetime import datetime
from typing import Any, Dict, List, Optional

from django.db import transaction

from core.models import Entry

# =============================================================================
# Search helpers (used by views)
# =============================================================================
def terms(q: str) -> List[str]:
    """
    Split a query string into tokens.

    - Quoted phrases are kept together: "good shepherd"
    - Unquoted text splits on whitespace.
    """
    if not q:
        return []
    rx = re.compile(r'"([^"]+)"|(\S+)')
    out: List[str] = []
    for m in rx.finditer(q):
        piece = m.group(1) if m.group(1) is not None else m.group(2)
        t = (piece or "").strip()
        if t:
            out.append(t)
    return out
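
# A quick hand-checked illustration of the tokenizer above (not a test suite):
#   terms('lost sheep "good shepherd"') -> ['lost', 'sheep', 'good shepherd']
#   terms('')                           -> []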


def has_wildcards(s: Optional[str]) -> bool:
    """True if the user supplied wildcard characters (*, ?, % or _)."""
    if not s:
        return False
    return any(ch in s for ch in ("*", "?", "%", "_"))


def wildcard_to_regex(s: Optional[str]) -> str:
    r"""
    Convert FileMaker-style wildcards to a regex fragment suitable for Django's
    iregex lookup.

    Rules:
    - Escape regex meta first, then replace \* -> .* and \? -> .
    - Wrap with '.*' so it matches anywhere (like icontains).
    """
    if s is None:
        s = ""
    pat = re.escape(s)
    pat = pat.replace(r"\*", ".*").replace(r"\?", ".")
    pat = f".*{pat}.*"
    pat = re.sub(r"(?:\.\*){2,}", ".*", pat)  # collapse repeated '.*' runs
    return pat
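
# Worked examples (values traced by hand through the steps above):
#   wildcard_to_regex("jo?n") -> ".*jo.n.*"    ('?' matches exactly one char)
#   wildcard_to_regex("a**b") -> ".*a.*b.*"    (adjacent '.*' runs collapse)
#   wildcard_to_regex("c++")  -> ".*c\+\+.*"   (regex metacharacters stay escaped)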


# =============================================================================
# CSV import utilities
# =============================================================================
def _decode_bytes(b: bytes) -> str:
    # BOM-safe decode: utf-8-sig strips a leading BOM if one is present.
    return b.decode("utf-8-sig", errors="replace")


def _sniff_dialect(txt: str):
    try:
        return csv.Sniffer().sniff(txt[:4096], delimiters=[",", ";", "\t", "|"])
    except Exception:
        return csv.excel  # sniffing failed; fall back to plain comma-separated


def _norm_header(h: str) -> str:
    """
    Normalize a header name in a forgiving way:
    - lower-case
    - treat underscores as spaces
    - collapse spaces
    - drop non-alphanumerics
    """
    if not h:
        return ""
    h = h.strip().lower().replace("_", " ")
    h = re.sub(r"\s+", " ", h)
    h = re.sub(r"[^a-z0-9 ]+", "", h)
    return h.replace(" ", "")
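
# All of these collapse to the same canonical key (worked by hand):
#   _norm_header("Talk Title")    -> "talktitle"
#   _norm_header("talk_title")    -> "talktitle"
#   _norm_header(" TALK  TITLE ") -> "talktitle"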


def _build_header_map(headers: List[str]) -> Dict[str, str]:
    """
    Map original header -> canonical key the importer expects.

    Canonical keys we use internally:
        subject, illustration, application, scripture, source,
        talk_title, talk_number, code, date, date_edited
    """
    canon_targets = {
        "subject": "subject",
        "illustration": "illustration",
        "application": "application",
        "scripture": "scripture",
        "source": "source",
        "talktitle": "talk_title",
        "title": "talk_title",
        "talknumber": "talk_number",
        "number": "talk_number",
        "code": "code",
        "date": "date",
        "dateedited": "date_edited",
        "edited": "date_edited",
    }
    out: Dict[str, str] = {}
    for h in headers:
        norm = _norm_header(h)
        out[h] = canon_targets.get(norm, norm)  # unknowns map to their normalized name
    return out
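
# Example mapping for a typical export (the "Notes" column is hypothetical):
#   _build_header_map(["Subject", "Talk Title", "Date_Edited", "Notes"])
#   -> {"Subject": "subject", "Talk Title": "talk_title",
#       "Date_Edited": "date_edited", "Notes": "notes"}
# Unknown columns keep their normalized name and are simply never read.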


def _getv(row: Dict[str, Any], hdr_map: Dict[str, str], canon: str) -> str:
    """Case/spacing-insensitive value lookup."""
    for original, mapped in hdr_map.items():
        if mapped == canon:
            return (row.get(original) or "").strip()
    return ""


def _clip(s: str, n: int) -> str:
    """Strip, then clip to at most n characters (n of 0 disables clipping)."""
    s = (s or "").strip()
    return s[:n] if n and len(s) > n else s


def _parse_date(s: str):
    """Try a handful of common date formats; return a date, or None."""
    s = (s or "").strip()
    if not s:
        return None
    for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%m/%d/%Y", "%m/%d/%y", "%Y.%m.%d", "%m-%d-%Y"):
        try:
            return datetime.strptime(s, fmt).date()
        except ValueError:
            continue
    return None
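
# Formats accepted above, for reference (everything else returns None):
#   "2023-04-01", "2023/04/01", "04/01/2023", "4/1/23", "2023.04.01", "04-01-2023"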


def _parse_int(s: str) -> Optional[int]:
    """Return an int from a string (tolerates commas), else None."""
    s = (s or "").strip()
    if not s:
        return None
    m = re.match(r"^-?\d+", s.replace(",", ""))
    return int(m.group(0)) if m else None
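
# Hand-checked examples:
#   _parse_int("1,234")  -> 1234  (commas stripped before matching)
#   _parse_int("No. 42") -> None  (must start with an optional sign and digits)
#   _parse_int("12b")    -> 12    (trailing junk is ignored)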


def import_csv_bytes(
    csv_bytes: bytes,
    dry_run: bool = False,
    *,
    # tune these if you changed model field sizes
    max_source=255,
    max_code=128,
    max_talk_number=128,  # only clips BEFORE the int parse; non-numeric still becomes None
    max_talk_title=512,
    max_scripture=512,
):
    """
    Import CSV seed in an idempotent/upsert fashion.

    Expected headers (case/spacing-insensitive):
        Subject, Illustration, Application, Scripture, Source,
        Talk Title, Talk Number, Code, Date, Date Edited

    Upsert rule:
        1) Prefer Code if present (treat it as an external key).
        2) Else fall back to the triple (subject, illustration, application).
    """
    text = _decode_bytes(csv_bytes)
    dialect = _sniff_dialect(text)
    f = io.StringIO(text)
    rdr = csv.DictReader(f, dialect=dialect)
    seen_headers = [h.strip() for h in (rdr.fieldnames or [])]
    header_map = _build_header_map(seen_headers)

    inserted = updated = skipped = 0
    errors: List[str] = []
    scripture_parsed = 0

    for idx, row in enumerate(rdr, start=2):  # data starts at line 2
        try:
            with transaction.atomic():
                subject = _getv(row, header_map, "subject")
                illustration = _getv(row, header_map, "illustration")
                application = _getv(row, header_map, "application")
                scripture_raw = _clip(_getv(row, header_map, "scripture"), max_scripture)
                source = _clip(_getv(row, header_map, "source"), max_source)
                talk_title = _clip(_getv(row, header_map, "talk_title"), max_talk_title)

                # Safe talk number parse (non-numeric -> None)
                talk_number_raw = _clip(_getv(row, header_map, "talk_number"), max_talk_number)
                talk_number = _parse_int(talk_number_raw)

                entry_code = _clip(_getv(row, header_map, "code"), max_code)
                date_added = _parse_date(_getv(row, header_map, "date"))
                date_edited = _parse_date(_getv(row, header_map, "date_edited"))

                # Guard: rows with no identifying content (e.g. delimiter-only
                # filler lines) would otherwise match or create a blank Entry
                # via the empty triple below, so count them as skipped instead.
                if not (subject or illustration or application or entry_code):
                    skipped += 1
                    continue

                # Find existing
                obj: Optional[Entry] = None
                if entry_code:
                    obj = Entry.objects.filter(entry_code=entry_code).first()
                if obj is None:
                    obj = Entry.objects.filter(
                        subject=subject,
                        illustration=illustration,
                        application=application,
                    ).first()

                created = obj is None
                if created:
                    obj = Entry()

                # Assign
                obj.subject = subject
                obj.illustration = illustration
                obj.application = application
                obj.scripture_raw = scripture_raw
                obj.source = source
                obj.talk_title = talk_title
                obj.talk_number = talk_number  # None is fine for IntegerField
                obj.entry_code = entry_code
                if date_added:
                    obj.date_added = date_added
                if date_edited:
                    obj.date_edited = date_edited

                if not dry_run:
                    obj.save()

                if created:
                    inserted += 1
                else:
                    updated += 1
                if scripture_raw:
                    scripture_parsed += 1
        except Exception as e:
            skipped += 1
            errors.append(f"line {idx}: {type(e).__name__}: {e}")

    return {
        "rows": inserted + updated + skipped,
        "inserted": inserted,
        "updated": updated,
        "skipped": skipped,
        "errors": errors[:200],  # cap output
        "scripture_parsed": scripture_parsed,
        "scripture_failed": 0,  # raw scripture is stored as-is, so nothing can fail
        "dialect_delimiter": dialect.delimiter,
        "used_headerless_mode": False,
        "seen_headers": [h.lower() for h in seen_headers],
    }
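
# A minimal usage sketch (the seed path below is hypothetical; the summary keys
# are the ones returned above):
#
#   from pathlib import Path
#   from core.utils import import_csv_bytes
#
#   data = Path("seed/illustrations.csv").read_bytes()
#   summary = import_csv_bytes(data, dry_run=True)   # validate without writing
#   print(summary["inserted"], summary["updated"], summary["skipped"])
#   if not summary["errors"]:
#       import_csv_bytes(data)                       # real import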