From bf084fc13c9077a8758bca7cdca6c2060c4aa4bb Mon Sep 17 00:00:00 2001 From: Joshua Laymon Date: Wed, 13 Aug 2025 15:08:26 +0000 Subject: [PATCH] Update web/core/utils.py --- web/core/utils.py | 237 ++++++++++++++++++++++++---------------------- 1 file changed, 124 insertions(+), 113 deletions(-) diff --git a/web/core/utils.py b/web/core/utils.py index a14face..6045e6a 100644 --- a/web/core/utils.py +++ b/web/core/utils.py @@ -1,68 +1,69 @@ +# core/utils.py +from __future__ import annotations + import csv import io import re from datetime import datetime -from typing import Dict, Any +from typing import Any, Dict, List, Optional from django.db import transaction from core.models import Entry -# --- Search helpers restored ------------------------------------------------- -from typing import List + +# ============================================================================= +# Search helpers (used by views) +# ============================================================================= def terms(q: str) -> List[str]: """ - Split a query string into search terms. - - Quoted phrases are kept together: `"good shepherd"` + Split a query string into tokens. + - Quoted phrases are kept together: "good shepherd" - Unquoted text splits on whitespace. - - Empty/whitespace-only input returns []. """ if not q: return [] - # capture "quoted phrases" OR bare tokens rx = re.compile(r'"([^"]+)"|(\S+)') - out = [] + out: List[str] = [] for m in rx.finditer(q): - phrase = m.group(1) if m.group(1) is not None else m.group(2) - t = (phrase or "").strip() + piece = m.group(1) if m.group(1) is not None else m.group(2) + t = (piece or "").strip() if t: out.append(t) return out -def has_wildcards(s: str) -> bool: - """ - True if user supplied * or ? wildcards (FileMaker-style). - We also treat SQL wildcards % and _ as wildcards if present. - """ + +def has_wildcards(s: Optional[str]) -> bool: + """True if user supplied wildcard characters (*, ?, % or _).""" if not s: return False return any(ch in s for ch in ("*", "?", "%", "_")) -def wildcard_to_regex(s: str) -> str: - """ - Convert * and ? to a case-insensitive regex fragment suitable for Django's iregex. - - Escapes regex meta first, then replaces \* -> .* and \? -> . - - Wraps with '.*' so it matches anywhere (like icontains). - Example: 'lov* you?' -> '(?i).*lov.* you..*' - (The view should use iregex so (?i) or case-insensitive flag applies.) + +def wildcard_to_regex(s: Optional[str]) -> str: + r""" + Convert FileMaker-style wildcards to a regex fragment suitable for Django's + iregex lookup. + + Rules: + - Escape regex meta first, then replace \* -> .* and \? -> . + - Wrap with '.*' so it matches anywhere (like icontains). """ if s is None: s = "" - # Escape regex specials, then un-escape our wildcards into regex pat = re.escape(s) pat = pat.replace(r"\*", ".*").replace(r"\?", ".") - # Match anywhere by default pat = f".*{pat}.*" - # collapse consecutive ".*.*" - pat = re.sub(r"(?:\.\*){2,}", ".*", pat) + pat = re.sub(r"(?:\.\*){2,}", ".*", pat) # collapse repeats return pat -# ----------------------------------------------------------------------------- -# ============================== -# Helpers -# ============================== + + +# ============================================================================= +# CSV import utilities +# ============================================================================= def _decode_bytes(b: bytes) -> str: - # Keep BOM-safe decoding + # BOM-safe decode return b.decode("utf-8-sig", errors="replace") @@ -76,27 +77,54 @@ def _sniff_dialect(txt: str): def _norm_header(h: str) -> str: """ - Normalize headers in a forgiving way: - - lower-case - - remove all non-alphanumerics - - collapse spaces/underscores + Normalize a header name in a forgiving way: + - lower-case + - treat underscores as spaces + - collapse spaces + - drop non-alphanumerics """ if not h: return "" - h = h.strip().lower() - h = h.replace("_", " ") + h = h.strip().lower().replace("_", " ") h = re.sub(r"\s+", " ", h) - # drop everything non-alnum h = re.sub(r"[^a-z0-9 ]+", "", h) return h.replace(" ", "") +def _build_header_map(headers: List[str]) -> Dict[str, str]: + """ + Map original header -> canonical key the importer expects. + Canonical keys we use internally: + subject, illustration, application, scripture, source, + talk_title, talk_number, code, date, date_edited + """ + canon_targets = { + "subject": "subject", + "illustration": "illustration", + "application": "application", + "scripture": "scripture", + "source": "source", + "talktitle": "talk_title", + "title": "talk_title", + "talknumber": "talk_number", + "number": "talk_number", + "code": "code", + "date": "date", + "dateedited": "date_edited", + "edited": "date_edited", + } + out: Dict[str, str] = {} + for h in headers: + norm = _norm_header(h) + out[h] = canon_targets.get(norm, norm) # unknowns map to their normalized name + return out + + def _getv(row: Dict[str, Any], hdr_map: Dict[str, str], canon: str) -> str: - # Look up using canonical key -> original header - for orig, can in hdr_map.items(): - if can == canon: - v = row.get(orig, "") - return (v or "").strip() + """Case/spacing-insensitive value lookup.""" + for original, mapped in hdr_map.items(): + if mapped == canon: + return (row.get(original) or "").strip() return "" @@ -117,9 +145,14 @@ def _parse_date(s: str): return None -# ============================== -# Public: import_csv_bytes -# ============================== +def _parse_int(s: str) -> Optional[int]: + """Return an int from a string (tolerates commas), else None.""" + s = (s or "").strip() + if not s: + return None + m = re.match(r"^-?\d+", s.replace(",", "")) + return int(m.group(0)) if m else None + def import_csv_bytes( csv_bytes: bytes, @@ -128,16 +161,20 @@ def import_csv_bytes( # tune these if you changed model field sizes max_source=255, max_code=128, - max_talk_number=128, + max_talk_number=128, # only affects clipping BEFORE int parse; int parse handles None max_talk_title=512, max_scripture=512, ): """ Import CSV seed in an idempotent/upsert fashion. - Mapping (case/spacing-insensitive): + Expected headers (case/spacing-insensitive): Subject, Illustration, Application, Scripture, Source, Talk Title, Talk Number, Code, Date, Date Edited + + Upsert rule: + 1) Prefer Code if present (treat as external key). + 2) Else fall back to the triple (subject, illustration, application). """ text = _decode_bytes(csv_bytes) dialect = _sniff_dialect(text) @@ -145,108 +182,82 @@ def import_csv_bytes( rdr = csv.DictReader(f, dialect=dialect) seen_headers = [h.strip() for h in (rdr.fieldnames or [])] - - # Build header normalization map - # Canonical keys we expect: - # subject illustration application scripture source talktitle talknumber code date dateedited - canon_targets = { - "subject": "subject", - "illustration": "illustration", - "application": "application", - "scripture": "scripture", - "source": "source", - "talktitle": "talk_title", - "title": "talk_title", - "talknumber": "talk_number", - "number": "talk_number", - "code": "code", - "date": "date", - "dateedited": "date_edited", - "edited": "date_edited", - } - header_map = {} - for h in seen_headers: - header_map[h] = canon_targets.get(_norm_header(h), _norm_header(h)) # unknowns still map to their norm + header_map = _build_header_map(seen_headers) inserted = updated = skipped = 0 - errors = [] + errors: List[str] = [] scripture_parsed = 0 - with transaction.atomic(): - for idx, row in enumerate(rdr, start=2): # data starts at line 2 - try: - subject = _getv(row, header_map, "subject") - illustration = _getv(row, header_map, "illustration") - application = _getv(row, header_map, "application") + for idx, row in enumerate(rdr, start=2): # data starts at line 2 + try: + with transaction.atomic(): + subject = _getv(row, header_map, "subject") + illustration = _getv(row, header_map, "illustration") + application = _getv(row, header_map, "application") + + scripture_raw = _clip(_getv(row, header_map, "scripture"), max_scripture) + source = _clip(_getv(row, header_map, "source"), max_source) + talk_title = _clip(_getv(row, header_map, "talk_title"), max_talk_title) + + # Safe talk number parse (non-numeric -> None) + talk_number_raw = _clip(_getv(row, header_map, "talk_number"), max_talk_number) + talk_number = _parse_int(talk_number_raw) - scripture_raw = _clip(_getv(row, header_map, "scripture"), max_scripture) - source = _clip(_getv(row, header_map, "source"), max_source) - talk_title = _clip(_getv(row, header_map, "talk_title"), max_talk_title) - talk_number = _clip(_getv(row, header_map, "talk_number"), max_talk_number) entry_code = _clip(_getv(row, header_map, "code"), max_code) date_added = _parse_date(_getv(row, header_map, "date")) date_edited = _parse_date(_getv(row, header_map, "date_edited")) - # Decide how to find an existing row: - # 1) Prefer Code if present (treat as external key) - # 2) Else fall back to (subject, illustration, application) - obj = None + # Find existing + obj: Optional[Entry] = None if entry_code: obj = Entry.objects.filter(entry_code=entry_code).first() if obj is None: obj = Entry.objects.filter( - subject=subject, illustration=illustration, application=application + subject=subject, + illustration=illustration, + application=application, ).first() created = obj is None if created: obj = Entry() - # Assign fields - obj.subject = subject - obj.illustration = illustration - obj.application = application + # Assign + obj.subject = subject + obj.illustration = illustration + obj.application = application obj.scripture_raw = scripture_raw - obj.source = source - obj.talk_title = talk_title - obj.talk_number = talk_number - obj.entry_code = entry_code + obj.source = source + obj.talk_title = talk_title + obj.talk_number = talk_number # None is fine for IntegerField + obj.entry_code = entry_code if date_added: obj.date_added = date_added if date_edited: obj.date_edited = date_edited - if dry_run: - updated += 1 if not created else 0 - inserted += 1 if created else 0 - else: + if not dry_run: obj.save() - if created: - inserted += 1 - else: - updated += 1 - # (Optional) quick scripture counter — we’re not parsing here, - # but keep a metric like your previous report + if created: + inserted += 1 + else: + updated += 1 + if scripture_raw: scripture_parsed += 1 - except Exception as e: - skipped += 1 - # keep error list compact - msg = str(e) - if "value too long for type" in msg and max(msg.count("\n"), 0) == 0: - errors.append("value too long for type character varying(...)") - else: - errors.append(msg) + except Exception as e: + skipped += 1 + errors.append(f"line {idx}: {type(e).__name__}: {e}") return { "rows": inserted + updated + skipped, "inserted": inserted, "updated": updated, "skipped": skipped, - "errors": errors[:200], # cap to avoid huge output + "errors": errors[:200], # cap output "scripture_parsed": scripture_parsed, "scripture_failed": 0, "dialect_delimiter": dialect.delimiter,