# core/utils.py from __future__ import annotations import csv import io import re from datetime import datetime from typing import Dict, List, Optional from django.db import transaction, IntegrityError, DataError, DatabaseError from .models import Entry # ============================ # Search helpers (used by views) # ============================ def terms(q: str) -> List[str]: """Split search query into terms; keep quoted phrases together.""" if not q: return [] out, buf, in_quote = [], [], False for ch in q: if ch == '"': in_quote = not in_quote continue if ch.isspace() and not in_quote: if buf: out.append("".join(buf)) buf = [] else: buf.append(ch) if buf: out.append("".join(buf)) return out def has_wildcards(s: str) -> bool: return bool(s) and ("*" in s or "?" in s) def wildcard_to_regex(s: str) -> str: """ Convert user wildcards to a Postgres-friendly regex: * -> .* ? -> . (escape regex meta first) """ if s is None: return "" s = re.escape(s) s = s.replace(r"\*", ".*").replace(r"\?", ".") return f"^{s}$" # ============================ # CSV import – robust version # ============================ # Canonical header names we expect (case-insensitive on input): CANON_HEADERS = [ "subject", "illustration", "application", "scripture", "source", "talk title", "talk number", "code", "date", "date edited", ] EXPECTED_COLS = len(CANON_HEADERS) # Curly quotes & odd whitespace we normalize QUOTE_MAP = { "\u201c": '"', "\u201d": '"', # “ ” "\u2018": "'", "\u2019": "'", # ‘ ’ } CTRL_MAP = { "\x0b": " ", # vertical tab "\x0c": " ", # form feed } def _decode_bytes(b: bytes) -> str: """Decode bytes with utf-8-sig, normalize line endings and characters.""" t = b.decode("utf-8-sig", errors="replace") # normalize curly quotes and control chars for k, v in QUOTE_MAP.items(): t = t.replace(k, v) for k, v in CTRL_MAP.items(): t = t.replace(k, v) # normalize newlines t = t.replace("\r\n", "\n").replace("\r", "\n") return t def _sniff_dialect(text: str) -> csv.Dialect: """Sniff CSV dialect or default to comma.""" snippet = text[:4096] try: return csv.Sniffer().sniff(snippet, delimiters=[",", ";", "\t", "|"]) except Exception: class D(csv.Dialect): delimiter = "," quotechar = '"' doublequote = True skipinitialspace = False lineterminator = "\n" quoting = csv.QUOTE_MINIMAL return D() def _split_lenient(line: str, delimiter: str, expected: int) -> List[str]: """ Split a CSV line manually, respecting quotes. Works even if the line contains inconsistent quoting (e.g., inner quotes not doubled). Ensures we return exactly `expected` fields by merging overflow cells into the current text field (before the trailing short/meta columns). """ out, field = [], [] in_quotes = False i, n = 0, len(line) while i < n: ch = line[i] if ch == '"': # doubled quote inside a quoted field -> literal quote if in_quotes and i + 1 < n and line[i + 1] == '"': field.append('"') i += 2 continue in_quotes = not in_quotes i += 1 continue if ch == delimiter and not in_quotes: out.append("".join(field)) field = [] i += 1 continue field.append(ch) i += 1 out.append("".join(field)) # Repair count to exactly `expected` if len(out) < expected: out += [""] * (expected - len(out)) elif len(out) > expected: head = out[:expected - 1] tail = out[expected - 1:] head[-1] = head[-1] + delimiter + delimiter.join(tail) out = head return out def _build_header_map(headers: List[str]) -> Dict[str, str]: """ Map incoming headers (any case) to our canonical keys. """ key = {h.lower().strip(): h for h in headers} mapping: Dict[str, Optional[str]] = {} for canon in CANON_HEADERS: if canon in key: mapping[canon] = key[canon] else: aliases = { "talk title": ["talk_title", "title"], "talk number": ["talk_no", "talk#", "talknum"], "date edited": ["edited", "date_edited", "edited date"], }.get(canon, []) found = next((a for a in aliases if a in key), None) mapping[canon] = key.get(found) if found else None return mapping # type: ignore[return-value] def _getv(row: Dict[str, str], header_map: Dict[str, str], canon_key: str) -> str: src = header_map.get(canon_key) return (row.get(src) if src else "") or "" def _parse_date(val: str): val = (val or "").strip() if not val: return None # Common formats: m/d/Y, Y-m-d (also tolerate single-digit m/d on Linux) for fmt in ("%m/%d/%Y", "%-m/%-d/%Y", "%Y-%m-%d"): try: return datetime.strptime(val, fmt).date() except Exception: pass # Fallback to dateutil if present try: from dateutil import parser # type: ignore return parser.parse(val).date() except Exception: return None def _clip(field_name: str, value: str) -> str: """ Clip to model field's max_length if needed, to avoid DB DataError. """ try: f = Entry._meta.get_field(field_name) max_len = getattr(f, "max_length", None) if max_len and value and len(value) > max_len: return value[:max_len] except Exception: pass return value def _coerce_int(val: str): val = (val or "").strip() if not val: return None m = re.search(r"(-?\d+)", val.replace(",", "")) if not m: return None try: return int(m.group(1)) except Exception: return None def import_csv_bytes(b: bytes, dry_run: bool = False, commit_every: int = 500) -> Dict[str, object]: """ Robust CSV import. Commits each row in its own transaction so that one bad row does not poison the entire import (avoids TransactionManagementError cascades). Returns a report dict with counts and first-line error messages. """ text = _decode_bytes(b) dialect = _sniff_dialect(text) delimiter = getattr(dialect, "delimiter", ",") # --- headers --- f = io.StringIO(text) reader = csv.reader(f, dialect=dialect) try: raw_headers = next(reader) except StopIteration: return { "rows": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": [], "scripture_parsed": 0, "scripture_failed": 0, "dialect_delimiter": delimiter, "used_headerless_mode": False, "seen_headers": [] } headers = raw_headers if len(raw_headers) == EXPECTED_COLS else _split_lenient( ",".join(raw_headers), delimiter=delimiter, expected=EXPECTED_COLS ) header_map = _build_header_map(headers) # Pair raw lines so we can repair rows mis-split by csv raw_lines = text.splitlines()[1:] # skip header dict_reader = csv.DictReader(io.StringIO(text), fieldnames=headers, dialect=dialect) next(dict_reader, None) # skip header total = inserted = updated = skipped = 0 errors: List[str] = [] scripture_ok = scripture_bad = 0 # Import loop (row-by-row atomic) for idx, (raw_line, row) in enumerate(zip(raw_lines, dict_reader), start=2): total += 1 # Repair if DictReader got the wrong shape (inconsistent quotes in source) if len(row) != EXPECTED_COLS or None in row: cells = _split_lenient(raw_line, delimiter=delimiter, expected=EXPECTED_COLS) row = dict(zip(headers, cells)) # Extract canonical fields subject = _getv(row, header_map, "subject").strip() illustration = _getv(row, header_map, "illustration").strip() application = _getv(row, header_map, "application").strip() scripture = _getv(row, header_map, "scripture").strip() source = _getv(row, header_map, "source").strip() talk_title = _getv(row, header_map, "talk title").strip() talk_number = _coerce_int(_getv(row, header_map, "talk number")) entry_code = _getv(row, header_map, "code").strip() date_added = _parse_date(_getv(row, header_map, "date")) date_edited = _parse_date(_getv(row, header_map, "date edited")) # Skip rows with no meaningful text if not (subject or illustration or application): skipped += 1 continue # Clip to DB lengths subject = _clip("subject", subject) illustration = _clip("illustration", illustration) application = _clip("application", application) scripture = _clip("scripture_raw", scripture) source = _clip("source", source) talk_title = _clip("talk_title", talk_title) entry_code = _clip("entry_code", entry_code) scripture_ok += 1 if scripture else 0 scripture_bad += 0 if scripture else 1 # Upsert key: prefer entry_code; else (subject + illustration) lookup: Dict[str, object] = {} if entry_code: lookup["entry_code"] = entry_code else: lookup["subject"] = subject lookup["illustration"] = illustration if dry_run: exists = Entry.objects.filter(**lookup).exists() inserted += 0 if exists else 1 updated += 1 if exists else 0 continue try: # Isolate each row so a failure rolls back only that row with transaction.atomic(): obj = Entry.objects.filter(**lookup).first() created = False if not obj: obj = Entry(**lookup) created = True obj.subject = subject obj.illustration = illustration obj.application = application obj.scripture_raw = scripture obj.source = source obj.talk_title = talk_title obj.talk_number = talk_number if entry_code: obj.entry_code = entry_code if date_added: obj.date_added = date_added if date_edited: obj.date_edited = date_edited obj.save() inserted += 1 if created else 0 updated += 0 if created else 1 except (IntegrityError, DataError, DatabaseError, ValueError) as e: msg = str(e).splitlines()[0] errors.append(f"line {idx}: {type(e).__name__}: {msg}") skipped += 1 # continue to next row return { "rows": total, "inserted": inserted, "updated": updated, "skipped": skipped, "errors": errors, "scripture_parsed": scripture_ok, "scripture_failed": scripture_bad, "dialect_delimiter": delimiter, "used_headerless_mode": False, "seen_headers": headers, }