# core/utils.py from __future__ import annotations import csv import io import re import unicodedata from datetime import datetime from typing import Dict, List, Optional, Tuple from django.db import transaction from django.db.models import Model from .models import Entry # ============================ # Search helpers (used by views) # ============================ _WORD_RE = re.compile(r"[^\s]+") def terms(q: str) -> List[str]: """Split search query into terms; keep quoted phrases together.""" if not q: return [] out, buf, in_quote = [], [], False for ch in q: if ch == '"': in_quote = not in_quote continue if ch.isspace() and not in_quote: if buf: out.append("".join(buf)) buf = [] else: buf.append(ch) if buf: out.append("".join(buf)) return out def has_wildcards(s: str) -> bool: return bool(s) and ("*" in s or "?" in s) def wildcard_to_regex(s: str) -> str: """ Convert user wildcards to a Postgres-friendly regex: * -> .* ? -> . escape regex meta first """ if s is None: return "" # Escape regex meta, then translate wildcards s = re.escape(s) s = s.replace(r"\*", ".*").replace(r"\?", ".") return f"^{s}$" # ============================ # CSV import – robust version # ============================ # Canonical header names we expect (case-insensitive on input): CANON_HEADERS = [ "subject", "illustration", "application", "scripture", "source", "talk title", "talk number", "code", "date", "date edited" ] EXPECTED_COLS = len(CANON_HEADERS) # Curly quotes & odd whitespace we normalize QUOTE_MAP = { "\u201c": '"', "\u201d": '"', # “ ” "\u2018": "'", "\u2019": "'", # ‘ ’ } CTRL_MAP = { "\x0b": " ", # vertical tab "\x0c": " ", # form feed } def _decode_bytes(b: bytes) -> str: """Decode bytes with utf-8-sig, normalize line endings and characters.""" t = b.decode("utf-8-sig", errors="replace") # normalize curly quotes and control chars for k, v in QUOTE_MAP.items(): t = t.replace(k, v) for k, v in CTRL_MAP.items(): t = t.replace(k, v) # normalize newlines t = t.replace("\r\n", "\n").replace("\r", "\n") return t def _sniff_dialect(text: str) -> csv.Dialect: """Sniff CSV dialect or default to comma.""" snippet = text[:4096] try: return csv.Sniffer().sniff(snippet, delimiters=[",", ";", "\t", "|"]) except Exception: class D(csv.Dialect): delimiter = "," quotechar = '"' doublequote = True skipinitialspace = False lineterminator = "\n" quoting = csv.QUOTE_MINIMAL return D() def _split_lenient(line: str, delimiter: str, expected: int) -> List[str]: """ Split a CSV line manually, respecting quotes. Works even if the line contains inconsistent quoting (e.g., inner quotes not doubled). Ensures we return exactly `expected` fields by merging overflow cells into the current text field (typically Illustration/Application/Scripture). """ out, field = [], [] in_quotes = False i, n = 0, len(line) while i < n: ch = line[i] if ch == '"': # If we see a doubled quote, treat as a literal quote and skip one if in_quotes and i + 1 < n and line[i + 1] == '"': field.append('"') i += 2 continue in_quotes = not in_quotes i += 1 continue if ch == delimiter and not in_quotes: out.append("".join(field)) field = [] i += 1 continue field.append(ch) i += 1 out.append("".join(field)) # If we ended with quotes unbalanced, we still got something. Now repair count. if len(out) < expected: out += [""] * (expected - len(out)) elif len(out) > expected: # Merge overflow columns into the last texty field before we hit short fields. # Strategy: merge extras into the last non-empty field before Date columns. head = out[:expected - 1] tail = out[expected - 1:] head[-1] = head[-1] + delimiter + delimiter.join(tail) out = head return out def _build_header_map(headers: List[str]) -> Dict[str, str]: """ Map incoming headers (any case) to our canonical keys. """ key = {h.lower().strip(): h for h in headers} mapping = {} for canon in CANON_HEADERS: # exact match first (case-insensitive) if canon in key: mapping[canon] = key[canon] else: # fallback: try common variants aliases = { "talk title": ["talk_title", "title"], "talk number": ["talk_no", "talk#", "talk number", "talknum"], "date edited": ["edited", "date_edited", "edited date"], }.get(canon, []) found = next((a for a in aliases if a in key), None) mapping[canon] = key.get(found, None) return mapping def _getv(row: Dict[str, str], header_map: Dict[str, str], canon_key: str) -> str: src = header_map.get(canon_key) return (row.get(src) if src else "") or "" def _parse_date(val: str) -> Optional[datetime.date]: val = (val or "").strip() if not val: return None # Try common formats: m/d/Y, Y-m-d for fmt in ("%m/%d/%Y", "%-m/%-d/%Y", "%Y-%m-%d"): try: return datetime.strptime(val, fmt).date() except Exception: pass # Try letting dateutil if available (optional), else skip try: from dateutil import parser # type: ignore return parser.parse(val).date() except Exception: return None def _clip(field_name: str, value: str) -> str: """ Clip to model field's max_length if needed, to avoid DB DataError. """ try: f = Entry._meta.get_field(field_name) max_len = getattr(f, "max_length", None) if max_len and value and len(value) > max_len: return value[:max_len] except Exception: pass return value def _coerce_int(val: str) -> Optional[int]: val = (val or "").strip() if not val: return None # allow like "#35" or "35)" m = re.search(r"(-?\d+)", val) if not m: return None try: return int(m.group(1)) except Exception: return None @transaction.atomic def import_csv_bytes(b: bytes, dry_run: bool = False) -> Dict[str, object]: """ Robust CSV import. Idempotent-ish upsert by (subject, illustration). """ text = _decode_bytes(b) dialect = _sniff_dialect(text) f = io.StringIO(text) reader = csv.reader(f, dialect=dialect) # Read header row try: raw_headers = next(reader) except StopIteration: return {"rows": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": [], "scripture_parsed": 0, "scripture_failed": 0, "dialect_delimiter": dialect.delimiter, "used_headerless_mode": False, "seen_headers": []} # If header count is wrong, repair via lenient split if len(raw_headers) != EXPECTED_COLS: fixed = _split_lenient(",".join(raw_headers), delimiter=dialect.delimiter, expected=EXPECTED_COLS) headers = fixed else: headers = raw_headers header_map = _build_header_map(headers) total = 0 inserted = 0 updated = 0 skipped = 0 errors: List[str] = [] scripture_ok = 0 scripture_bad = 0 # Re-open to iterate rows with the *raw* lines paired to parsed ones f2 = io.StringIO(text) lines = f2.read().splitlines() # first line is header raw_data_lines = lines[1:] # Iterate again with DictReader for convenience f3 = io.StringIO(text) dict_reader = csv.DictReader(f3, fieldnames=headers, dialect=dialect) next(dict_reader, None) # skip header for idx, (raw_line, row) in enumerate(zip(raw_data_lines, dict_reader), start=2): total += 1 # Some rows are mis-split by csv due to bad quotes -> repair if len(row) != EXPECTED_COLS or None in row: cells = _split_lenient(raw_line, delimiter=dialect.delimiter, expected=EXPECTED_COLS) row = dict(zip(headers, cells)) # Extract using canonical keys subject = _getv(row, header_map, "subject").strip() illustration = _getv(row, header_map, "illustration").strip() application = _getv(row, header_map, "application").strip() scripture = _getv(row, header_map, "scripture").strip() source = _getv(row, header_map, "source").strip() talk_title = _getv(row, header_map, "talk title").strip() talk_number = _coerce_int(_getv(row, header_map, "talk number")) entry_code = _getv(row, header_map, "code").strip() date_added = _parse_date(_getv(row, header_map, "date")) date_edited = _parse_date(_getv(row, header_map, "date edited")) # Basic sanity: if all major text fields empty, skip if not (subject or illustration or application): skipped += 1 continue # Clip to DB lengths to avoid DataError (robustness) subject = _clip("subject", subject) illustration = _clip("illustration", illustration) application = _clip("application", application) scripture = _clip("scripture_raw", scripture) source = _clip("source", source) talk_title = _clip("talk_title", talk_title) entry_code = _clip("entry_code", entry_code) if scripture: scripture_ok += 1 else: scripture_bad += 1 # Upsert key: prefer entry_code; else (subject + illustration) lookup: Dict[str, object] = {} if entry_code: lookup["entry_code"] = entry_code else: lookup["subject"] = subject lookup["illustration"] = illustration try: obj = Entry.objects.filter(**lookup).first() if not obj: obj = Entry(**lookup) created = True else: created = False obj.subject = subject obj.illustration = illustration obj.application = application obj.scripture_raw = scripture obj.source = source obj.talk_title = talk_title obj.talk_number = talk_number obj.entry_code = entry_code or obj.entry_code if date_added: obj.date_added = date_added if date_edited: obj.date_edited = date_edited if not dry_run: obj.save() if created: inserted += 1 else: updated += 1 except Exception as e: # Keep importing other rows; capture the first part of the error msg = str(e).splitlines()[0] errors.append(f"line {idx}: {type(e).__name__}: {msg}") skipped += 1 return { "rows": total, "inserted": inserted, "updated": updated, "skipped": skipped, "errors": errors, "scripture_parsed": scripture_ok, "scripture_failed": scripture_bad, "dialect_delimiter": getattr(_sniff_dialect(text), "delimiter", ","), "used_headerless_mode": False, "seen_headers": headers, }