Illustrations/web/core/utils.py

import csv
import io
import re
from datetime import datetime
from typing import Any, Dict, List

from django.db import transaction

from core.models import Entry

# --- Search helpers restored -------------------------------------------------

def terms(q: str) -> List[str]:
    """
    Split a query string into search terms.
    - Quoted phrases are kept together: `"good shepherd"`
    - Unquoted text splits on whitespace.
    - Empty/whitespace-only input returns [].
    """
    if not q:
        return []
    # capture "quoted phrases" OR bare tokens
    rx = re.compile(r'"([^"]+)"|(\S+)')
    out = []
    for m in rx.finditer(q):
        phrase = m.group(1) if m.group(1) is not None else m.group(2)
        t = (phrase or "").strip()
        if t:
            out.append(t)
    return out
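
# Example (illustrative):
#   terms('find "good shepherd" psalm')  ->  ['find', 'good shepherd', 'psalm']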

def has_wildcards(s: str) -> bool:
    """
    True if user supplied * or ? wildcards (FileMaker-style).
    We also treat SQL wildcards % and _ as wildcards if present.
    """
    if not s:
        return False
    return any(ch in s for ch in ("*", "?", "%", "_"))
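
# Example: has_wildcards("lov*") -> True; has_wildcards("love") -> False.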

def wildcard_to_regex(s: str) -> str:
    r"""
    Convert * and ? to a regex fragment suitable for Django's iregex.
    - Escapes regex meta first, then replaces \* -> .* and \? -> .
    - Wraps with '.*' so it matches anywhere (like icontains).
    Example: 'lov* you?' -> '.*lov.* you..*'
    (Case-insensitivity comes from the iregex lookup itself, so no (?i)
    flag is embedded in the fragment.)
    """
    if s is None:
        s = ""
    # Escape regex specials, then un-escape our wildcards into regex
    pat = re.escape(s)
    pat = pat.replace(r"\*", ".*").replace(r"\?", ".")
    # Match anywhere by default
    pat = f".*{pat}.*"
    # collapse consecutive ".*.*"
    pat = re.sub(r"(?:\.\*){2,}", ".*", pat)
    return pat
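
# Usage sketch (hypothetical view code; Entry's `illustration` field appears
# in the importer below):
#   pattern = wildcard_to_regex("lov* you?")             # '.*lov.* you..*'
#   hits = Entry.objects.filter(illustration__iregex=pattern)
# The iregex lookup supplies case-insensitivity, so no inline flag is needed.
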
# -----------------------------------------------------------------------------
# ==============================
# Helpers
# ==============================
def _decode_bytes(b: bytes) -> str:
    # Keep BOM-safe decoding
    return b.decode("utf-8-sig", errors="replace")

def _sniff_dialect(txt: str):
    try:
        return csv.Sniffer().sniff(txt[:4096], delimiters=",;\t|")
    except Exception:
        # Fallback stand-in; csv fills in defaults for any dialect
        # attributes the object does not define.
        class _D:
            delimiter = ","
        return _D()
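
# e.g. a sample like "a;b;c\n1;2;3\n" should sniff to ";"; anything the
# Sniffer cannot classify falls back to the comma stand-in above.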

def _norm_header(h: str) -> str:
    """
    Normalize headers in a forgiving way:
    - lower-case
    - remove all non-alphanumerics
    - collapse spaces/underscores
    """
    if not h:
        return ""
    h = h.strip().lower()
    h = h.replace("_", " ")
    h = re.sub(r"\s+", " ", h)
    # drop everything non-alnum
    h = re.sub(r"[^a-z0-9 ]+", "", h)
    return h.replace(" ", "")
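
# Examples: _norm_header("Talk Title") -> "talktitle";
#           _norm_header(" Date_Edited ") -> "dateedited".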

def _getv(row: Dict[str, Any], hdr_map: Dict[str, str], canon: str) -> str:
    # Look up using canonical key -> original header
    for orig, can in hdr_map.items():
        if can == canon:
            v = row.get(orig, "")
            return (v or "").strip()
    return ""

def _clip(s: str, n: int) -> str:
    s = (s or "").strip()
    return s[:n] if n and len(s) > n else s

def _parse_date(s: str):
    s = (s or "").strip()
    if not s:
        return None
    for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%m/%d/%Y", "%m/%d/%y", "%Y.%m.%d", "%m-%d-%Y"):
        try:
            return datetime.strptime(s, fmt).date()
        except ValueError:
            continue
    return None
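
# Example: _parse_date("3/14/23") -> datetime.date(2023, 3, 14); strings in
# unlisted formats (e.g. "March 2023") return None rather than raising.
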
# ==============================
# Public: import_csv_bytes
# ==============================
def import_csv_bytes(
    csv_bytes: bytes,
    dry_run: bool = False,
    *,
    # tune these if you changed model field sizes
    max_source=255,
    max_code=128,
    max_talk_number=128,
    max_talk_title=512,
    max_scripture=512,
):
"""
Import CSV seed in an idempotent/upsert fashion.
Mapping (case/spacing-insensitive):
Subject, Illustration, Application, Scripture, Source,
Talk Title, Talk Number, Code, Date, Date Edited
"""
text = _decode_bytes(csv_bytes)
dialect = _sniff_dialect(text)
f = io.StringIO(text)
rdr = csv.DictReader(f, dialect=dialect)
seen_headers = [h.strip() for h in (rdr.fieldnames or [])]
# Build header normalization map
# Canonical keys we expect:
# subject illustration application scripture source talktitle talknumber code date dateedited
canon_targets = {
"subject": "subject",
"illustration": "illustration",
"application": "application",
"scripture": "scripture",
"source": "source",
"talktitle": "talk_title",
"title": "talk_title",
"talknumber": "talk_number",
"number": "talk_number",
"code": "code",
"date": "date",
"dateedited": "date_edited",
"edited": "date_edited",
}
header_map = {}
for h in seen_headers:
header_map[h] = canon_targets.get(_norm_header(h), _norm_header(h)) # unknowns still map to their norm
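
    # e.g. CSV headers "Talk Title,Scripture,Code" map to
    # {"Talk Title": "talk_title", "Scripture": "scripture", "Code": "code"};
    # an unexpected column such as "Notes" (hypothetical) keeps its normalized
    # name "notes" and is simply never requested by _getv.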

    inserted = updated = skipped = 0
    errors = []
    scripture_parsed = 0

    with transaction.atomic():
        for idx, row in enumerate(rdr, start=2):  # data starts at line 2
            try:
                subject = _getv(row, header_map, "subject")
                illustration = _getv(row, header_map, "illustration")
                application = _getv(row, header_map, "application")
                scripture_raw = _clip(_getv(row, header_map, "scripture"), max_scripture)
                source = _clip(_getv(row, header_map, "source"), max_source)
                talk_title = _clip(_getv(row, header_map, "talk_title"), max_talk_title)
                talk_number = _clip(_getv(row, header_map, "talk_number"), max_talk_number)
                entry_code = _clip(_getv(row, header_map, "code"), max_code)
                date_added = _parse_date(_getv(row, header_map, "date"))
                date_edited = _parse_date(_getv(row, header_map, "date_edited"))

                # Decide how to find an existing row:
                #   1) Prefer Code if present (treat as external key)
                #   2) Else fall back to (subject, illustration, application)
                obj = None
                if entry_code:
                    obj = Entry.objects.filter(entry_code=entry_code).first()
                if obj is None:
                    obj = Entry.objects.filter(
                        subject=subject, illustration=illustration, application=application
                    ).first()

                created = obj is None
                if created:
                    obj = Entry()

                # Assign fields
                obj.subject = subject
                obj.illustration = illustration
                obj.application = application
                obj.scripture_raw = scripture_raw
                obj.source = source
                obj.talk_title = talk_title
                obj.talk_number = talk_number
                obj.entry_code = entry_code
                if date_added:
                    obj.date_added = date_added
                if date_edited:
                    obj.date_edited = date_edited

                if not dry_run:
                    obj.save()
                if created:
                    inserted += 1
                else:
                    updated += 1

                # (Optional) quick scripture counter; we're not parsing here,
                # just keeping a metric like the previous report.
                if scripture_raw:
                    scripture_parsed += 1
            except Exception as e:
                skipped += 1
                # keep the error list compact; idx is the 1-based CSV line number
                msg = str(e)
                if "value too long for type" in msg and "\n" not in msg:
                    errors.append(f"row {idx}: value too long for type character varying(...)")
                else:
                    errors.append(f"row {idx}: {msg}")

    return {
        "rows": inserted + updated + skipped,
        "inserted": inserted,
        "updated": updated,
        "skipped": skipped,
        "errors": errors[:200],  # cap to avoid huge output
        "scripture_parsed": scripture_parsed,
        "scripture_failed": 0,
        "dialect_delimiter": dialect.delimiter,
        "used_headerless_mode": False,
        "seen_headers": [h.lower() for h in seen_headers],
    }
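
# Usage sketch (hypothetical call site such as a management command; "seed.csv"
# is an assumed file name, not part of this module):
#   with open("seed.csv", "rb") as fh:
#       report = import_csv_bytes(fh.read(), dry_run=True)   # validate first
#   if not report["errors"]:
#       with open("seed.csv", "rb") as fh:
#           import_csv_bytes(fh.read())                      # then persist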