# core/utils.py from __future__ import annotations import csv import io import re from datetime import datetime from typing import Dict, List, Optional, Any, Tuple, Iterable from django.db import transaction, IntegrityError, DataError, DatabaseError from .models import Entry EXPECTED_HEADERS: List[str] = [ "Subject", "Illustration", "Application", "Scripture", "Source", "Talk Title", "Talk Number", "Code", "Date", "Date Edited", ] # Map CSV header labels -> Entry model field names HEADER_MAP: Dict[str, str] = { "Subject": "subject", "Illustration": "illustration", "Application": "application", "Scripture": "scripture_raw", "Source": "source", "Talk Title": "talk_title", "Talk Number": "talk_number", "Code": "entry_code", "Date": "date_added", "Date Edited": "date_edited", } # Accept both the pretty labels *and* the actual model field names # (lets you import older dumps or hand-made files) ACCEPTABLE_HEADERS: Dict[str, str] = { **{h.lower(): HEADER_MAP[h] for h in EXPECTED_HEADERS}, # direct model names also OK "subject": "subject", "illustration": "illustration", "application": "application", "scripture_raw": "scripture_raw", "source": "source", "talk_title": "talk_title", "talk_number": "talk_number", "entry_code": "entry_code", "date_added": "date_added", "date_edited": "date_edited", } # ============================ # Search helpers (used by views) # ============================ def terms(q: str) -> List[str]: """Split search query into terms; keep quoted phrases together.""" if not q: return [] out, buf, in_quote = [], [], False for ch in q: if ch == '"': in_quote = not in_quote continue if ch.isspace() and not in_quote: if buf: out.append("".join(buf)) buf = [] else: buf.append(ch) if buf: out.append("".join(buf)) return out def has_wildcards(s: str) -> bool: return bool(s) and ("*" in s or "?" in s) def wildcard_to_regex(s: str) -> str: """ Convert user wildcards to a Postgres-friendly regex: * -> .* ? -> . (escape regex meta first) """ if s is None: return "" s = re.escape(s) s = s.replace(r"\*", ".*").replace(r"\?", ".") return f"^{s}$" # ============================ # CSV import – robust version # ============================ # Canonical header names we expect (case-insensitive on input): CANON_HEADERS = [ "subject", "illustration", "application", "scripture", "source", "talk title", "talk number", "code", "date", "date edited", ] EXPECTED_COLS = len(CANON_HEADERS) # Curly quotes & odd whitespace we normalize QUOTE_MAP = { "\u201c": '"', "\u201d": '"', # “ ” "\u2018": "'", "\u2019": "'", # ‘ ’ } CTRL_MAP = { "\x0b": " ", # vertical tab "\x0c": " ", # form feed } def _decode_bytes(b: bytes) -> str: """Decode bytes with utf-8-sig, normalize line endings and characters.""" t = b.decode("utf-8-sig", errors="replace") # normalize curly quotes and control chars for k, v in QUOTE_MAP.items(): t = t.replace(k, v) for k, v in CTRL_MAP.items(): t = t.replace(k, v) # normalize newlines t = t.replace("\r\n", "\n").replace("\r", "\n") return t def _sniff_dialect(text: str) -> csv.Dialect: """Sniff CSV dialect or default to comma.""" snippet = text[:4096] try: return csv.Sniffer().sniff(snippet, delimiters=[",", ";", "\t", "|"]) except Exception: class D(csv.Dialect): delimiter = "," quotechar = '"' doublequote = True skipinitialspace = False lineterminator = "\n" quoting = csv.QUOTE_MINIMAL return D() def _split_lenient(line: str, delimiter: str, expected: int) -> List[str]: """ Split a CSV line manually, respecting quotes. Works even if the line contains inconsistent quoting (e.g., inner quotes not doubled). Ensures we return exactly `expected` fields by merging overflow cells into the current text field (before the trailing short/meta columns). """ out, field = [], [] in_quotes = False i, n = 0, len(line) while i < n: ch = line[i] if ch == '"': # doubled quote inside a quoted field -> literal quote if in_quotes and i + 1 < n and line[i + 1] == '"': field.append('"') i += 2 continue in_quotes = not in_quotes i += 1 continue if ch == delimiter and not in_quotes: out.append("".join(field)) field = [] i += 1 continue field.append(ch) i += 1 out.append("".join(field)) # Repair count to exactly `expected` if len(out) < expected: out += [""] * (expected - len(out)) elif len(out) > expected: head = out[:expected - 1] tail = out[expected - 1:] head[-1] = head[-1] + delimiter + delimiter.join(tail) out = head return out def _build_header_map(headers: List[str]) -> Dict[str, str]: """ Map incoming headers (any case) to our canonical keys. """ key = {h.lower().strip(): h for h in headers} mapping: Dict[str, Optional[str]] = {} for canon in CANON_HEADERS: if canon in key: mapping[canon] = key[canon] else: aliases = { "talk title": ["talk_title", "title"], "talk number": ["talk_no", "talk#", "talknum"], "date edited": ["edited", "date_edited", "edited date"], }.get(canon, []) found = next((a for a in aliases if a in key), None) mapping[canon] = key.get(found) if found else None return mapping # type: ignore[return-value] def _getv(row: Dict[str, str], header_map: Dict[str, str], canon_key: str) -> str: src = header_map.get(canon_key) return (row.get(src) if src else "") or "" def _parse_date(val: str): val = (val or "").strip() if not val: return None # Common formats: m/d/Y, Y-m-d (also tolerate single-digit m/d on Linux) for fmt in ("%m/%d/%Y", "%-m/%-d/%Y", "%Y-%m-%d"): try: return datetime.strptime(val, fmt).date() except Exception: pass # Fallback to dateutil if present try: from dateutil import parser # type: ignore return parser.parse(val).date() except Exception: return None def _clip(field_name: str, value: str) -> str: """ Clip to model field's max_length if needed, to avoid DB DataError. """ try: f = Entry._meta.get_field(field_name) max_len = getattr(f, "max_length", None) if max_len and value and len(value) > max_len: return value[:max_len] except Exception: pass return value def _coerce_int(val: str): val = (val or "").strip() if not val: return None m = re.search(r"(-?\d+)", val.replace(",", "")) if not m: return None try: return int(m.group(1)) except Exception: return None def _to_int_or_none(s: str) -> Optional[int]: s = (s or "").strip() if not s: return None try: return int(s) except Exception: return None def _to_date_or_none(s: str) -> Optional[datetime.date]: s = (s or "").strip() if not s: return None for fmt in ("%Y-%m-%d", "%m/%d/%Y", "%m/%d/%y"): try: return datetime.datetime.strptime(s, fmt).date() except Exception: pass return None # let caller decide if this is acceptable def _clean_header_token(s: Any) -> str: """ Make a header token safe/normalized: - None -> "" - trim spaces - strip surrounding single/double quotes - drop weird prefixes like r:"Talk Title" or r.'Talk Title' - lowercase for matching """ s = "" if s is None else str(s) s = s.strip() # strip surrounding quotes if len(s) >= 2 and s[0] == s[-1] and s[0] in ("'", '"'): s = s[1:-1] # drop r: or r. prefix some CSV tools add if s[:2].lower() in ("r:", "r."): s = s[2:].lstrip() return s.strip().lower() _DATE_FORMATS = ( "%Y-%m-%d", "%m/%d/%Y", "%m/%d/%y", "%d-%b-%Y", # 05-Sep-2024 "%Y/%m/%d", ) def _parse_date(val: str) -> Optional[datetime.date]: if not val: return None txt = str(val).strip() # Accept ISO-like with time: 2024-01-02T00:00:00 if "T" in txt: try: return datetime.fromisoformat(txt).date() except Exception: pass for fmt in _DATE_FORMATS: try: return datetime.strptime(txt, fmt).date() except Exception: continue # as a last resort, try only year-month-day pieces try: parts = [int(p) for p in txt.replace("/", "-").split("-")] if len(parts) >= 3: return datetime(parts[0], parts[1], parts[2]).date() except Exception: pass return None def _to_int_or_none(v: Any) -> Optional[int]: if v is None: return None s = str(v).strip() if s == "": return None try: return int(float(s)) # tolerate "123.0" except Exception: return None import csv import io from datetime import datetime from typing import Optional, List, Dict, Any from django.db import transaction from .models import Entry # Canonical header order expected from the CSV (and shown in the UI) EXPECTED_HEADERS = [ "Subject", "Illustration", "Application", "Scripture", "Source", "Talk Title", "Talk Number", "Code", "Date", "Date Edited", ] def _clean_header_cell(s: str) -> str: if s is None: return "" s = str(s).strip() # Handle odd prefixes like r:"Talk Title" low = s.lower() if low.startswith("r:") or low.startswith("r="): s = s[2:].lstrip() # Strip wrapping quotes if len(s) >= 2 and s[0] == s[-1] and s[0] in ('"', "'"): s = s[1:-1] return s.strip() def _parse_int(x: str) -> Optional[int]: x = (x or "").strip() if not x: return None try: return int(x) except Exception: return None def _parse_date(x: str): """ Returns a date object or None. Tries several common formats, then ISO. """ x = (x or "").strip() if not x: return None for fmt in ("%Y-%m-%d", "%m/%d/%Y", "%d/%m/%Y"): try: return datetime.strptime(x, fmt).date() except Exception: pass try: return datetime.fromisoformat(x).date() except Exception: return None def import_csv_bytes(content: bytes, dry_run: bool = True, batch_size: int = 1000) -> Dict[str, Any]: """ Parse the uploaded CSV (bytes), optionally write to DB. Returns a report dict the templates expect: { "total": , "created": , "updated": 0, "skipped": , "errors": [ ... ], "preview": [ [cell,...], ... up to 10 rows ], "columns": EXPECTED_HEADERS, } Notes: - This implementation always CREATES new rows (no dedupe). If you want upserts later, we can key on entry_code or (talk_number, entry_code). """ report = { "total": 0, "created": 0, "updated": 0, "skipped": 0, "errors": [], "preview": [], "columns": EXPECTED_HEADERS[:], } # Decode once (BOM-safe), sniff dialect, fall back to excel text = content.decode("utf-8-sig", errors="replace") try: first_line = text.splitlines()[0] if text else "" dialect = csv.Sniffer().sniff(first_line) if first_line else csv.excel except Exception: dialect = csv.excel rows = list(csv.reader(io.StringIO(text), dialect)) if not rows: return report # empty file # Header handling (tolerant) first = rows[0] norm_first = [_clean_header_cell(c).lower() for c in first] expected_norm = [h.lower() for h in EXPECTED_HEADERS] header_ok = (norm_first == expected_norm) if header_ok: data_rows = rows[1:] else: # If first row isn't a match but the column count matches, treat it as data if len(first) == len(EXPECTED_HEADERS): data_rows = rows # treat all rows as data; we'll use EXPECTED order else: # Try common alternate delimiters to recover for delim in (";", "\t"): rows2 = list(csv.reader(io.StringIO(text), delimiter=delim)) if rows2 and len(rows2[0]) == len(EXPECTED_HEADERS): rows = rows2 first = rows[0] norm_first = [_clean_header_cell(c).lower() for c in first] header_ok = (norm_first == expected_norm) data_rows = rows[1:] if header_ok else rows break else: # Could not reconcile columns report["errors"].append( f"Column mismatch: saw {len(first)} but expected {len(EXPECTED_HEADERS)}." ) return report # Normalize rows length (pad/trim) and build preview (first 10) normalized_rows: List[List[str]] = [] for r in data_rows: if not r or all((c or "").strip() == "" for c in r): continue if len(r) < len(EXPECTED_HEADERS): r = r + [""] * (len(EXPECTED_HEADERS) - len(r)) elif len(r) > len(EXPECTED_HEADERS): r = r[:len(EXPECTED_HEADERS)] normalized_rows.append(r) report["total"] = len(normalized_rows) report["preview"] = normalized_rows[:10] # show first 10 rows exactly as seen if dry_run or report["total"] == 0: return report # preview only # Create entries in batches (transactional) to_create: List[Entry] = [] for r in normalized_rows: try: obj = Entry( subject=(r[0] or "").strip(), illustration=(r[1] or "").strip(), application=(r[2] or "").strip(), scripture_raw=(r[3] or "").strip(), source=(r[4] or "").strip(), talk_title=(r[5] or "").strip(), talk_number=_parse_int(r[6]), entry_code=(r[7] or "").strip(), date_added=_parse_date(r[8]), date_edited=_parse_date(r[9]), ) to_create.append(obj) except Exception as e: report["skipped"] += 1 report["errors"].append(f"Row skipped due to error: {e}") if len(to_create) >= batch_size: with transaction.atomic(): Entry.objects.bulk_create(to_create, batch_size=batch_size) report["created"] += len(to_create) to_create.clear() if to_create: with transaction.atomic(): Entry.objects.bulk_create(to_create, batch_size=batch_size) report["created"] += len(to_create) to_create.clear() return report # small context manager used above class _noop_context: def __enter__(self): return self def __exit__(self, exc_type, exc, tb): return False