# core/utils.py
from __future__ import annotations
import csv
import io
import re
from datetime import datetime
from typing import Dict, List, Optional
from django.db import transaction, IntegrityError, DataError, DatabaseError
from .models import Entry
# ============================
# Search helpers (used by views)
# ============================
def terms(q: str) -> List[str]:
    """Split search query into terms; keep quoted phrases together."""
    if not q:
        return []
    out, buf, in_quote = [], [], False
    for ch in q:
        if ch == '"':
            in_quote = not in_quote
            continue
        if ch.isspace() and not in_quote:
            if buf:
                out.append("".join(buf))
                buf = []
        else:
            buf.append(ch)
    if buf:
        out.append("".join(buf))
    return out
def has_wildcards(s: str) -> bool:
    return bool(s) and ("*" in s or "?" in s)
def wildcard_to_regex(s: str) -> str:
    """
    Convert user wildcards to a Postgres-friendly regex:
        * -> .*    ? -> .    (escape regex meta first)
    """
    if s is None:
        return ""
    s = re.escape(s)
    s = s.replace(r"\*", ".*").replace(r"\?", ".")
    return f"^{s}$"
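# Example (illustrative): user wildcards become anchored regex patterns, suitable for a
# case-insensitive regex lookup such as Entry.objects.filter(subject__iregex=...):
#   has_wildcards("dav*")        ->  True
#   wildcard_to_regex("dav*")    ->  "^dav.*$"
#   wildcard_to_regex("p?alms")  ->  "^p.alms$"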
# ============================
# CSV import (robust version)
# ============================
# Canonical header names we expect (case-insensitive on input):
CANON_HEADERS = [
    "subject", "illustration", "application", "scripture",
    "source", "talk title", "talk number", "code", "date", "date edited",
]
EXPECTED_COLS = len(CANON_HEADERS)
# Curly quotes & odd whitespace we normalize
QUOTE_MAP = {
    "\u201c": '"', "\u201d": '"',  # “ ”
    "\u2018": "'", "\u2019": "'",  # ‘ ’
}
CTRL_MAP = {
    "\x0b": " ",  # vertical tab
    "\x0c": " ",  # form feed
}
def _decode_bytes(b: bytes) -> str:
    """Decode bytes with utf-8-sig, normalize line endings and characters."""
    t = b.decode("utf-8-sig", errors="replace")
    # normalize curly quotes and control chars
    for k, v in QUOTE_MAP.items():
        t = t.replace(k, v)
    for k, v in CTRL_MAP.items():
        t = t.replace(k, v)
    # normalize newlines
    t = t.replace("\r\n", "\n").replace("\r", "\n")
    return t
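# Example (illustrative): BOM, curly quotes and Windows line endings are normalized, e.g.
#   _decode_bytes('\ufeff“Noah’s ark”\r\n'.encode("utf-8"))  ->  '"Noah\'s ark"\n'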
def _sniff_dialect(text: str) -> csv.Dialect:
    """Sniff CSV dialect or default to comma."""
    snippet = text[:4096]
    try:
        return csv.Sniffer().sniff(snippet, delimiters=[",", ";", "\t", "|"])
    except Exception:
        class D(csv.Dialect):
            delimiter = ","
            quotechar = '"'
            doublequote = True
            skipinitialspace = False
            lineterminator = "\n"
            quoting = csv.QUOTE_MINIMAL
        return D()
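# Note: only the first 4 KiB of the text are sampled; if the sniffer cannot decide
# (e.g. a single-column file with no delimiter at all), the comma default above is used.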
def _split_lenient(line: str, delimiter: str, expected: int) -> List[str]:
    """
    Split a CSV line manually, respecting quotes. Works even if the line
    contains inconsistent quoting (e.g., inner quotes not doubled).
    Ensures we return exactly `expected` fields by padding short rows and
    folding any overflow cells back into the final field.
    """
    out, field = [], []
    in_quotes = False
    i, n = 0, len(line)
    while i < n:
        ch = line[i]
        if ch == '"':
            # doubled quote inside a quoted field -> literal quote
            if in_quotes and i + 1 < n and line[i + 1] == '"':
                field.append('"')
                i += 2
                continue
            in_quotes = not in_quotes
            i += 1
            continue
        if ch == delimiter and not in_quotes:
            out.append("".join(field))
            field = []
            i += 1
            continue
        field.append(ch)
        i += 1
    out.append("".join(field))
    # Repair count to exactly `expected`
    if len(out) < expected:
        out += [""] * (expected - len(out))
    elif len(out) > expected:
        # Keep the first `expected - 1` cells and fold everything else back into
        # the last cell so the result always has exactly `expected` fields.
        head = out[:expected - 1]
        head.append(delimiter.join(out[expected - 1:]))
        out = head
    return out
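# Examples (illustrative):
#   _split_lenient('a,"b,c"', ",", expected=2)  ->  ['a', 'b,c']      (quotes respected)
#   _split_lenient('a', ",", expected=3)        ->  ['a', '', '']     (short row padded)
#   _split_lenient('a,b,c,d', ",", expected=3)  ->  ['a', 'b', 'c,d'] (overflow folded)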
def _build_header_map(headers: List[str]) -> Dict[str, str]:
    """
    Map incoming headers (any case) to our canonical keys.
    """
    key = {h.lower().strip(): h for h in headers}
    mapping: Dict[str, Optional[str]] = {}
    for canon in CANON_HEADERS:
        if canon in key:
            mapping[canon] = key[canon]
        else:
            aliases = {
                "talk title": ["talk_title", "title"],
                "talk number": ["talk_no", "talk#", "talknum"],
                "date edited": ["edited", "date_edited", "edited date"],
            }.get(canon, [])
            found = next((a for a in aliases if a in key), None)
            mapping[canon] = key.get(found) if found else None
    return mapping  # type: ignore[return-value]
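# Example (illustrative): alias headers are resolved; missing columns map to None:
#   _build_header_map(["Subject", "Title"])["talk title"]  ->  "Title"
#   _build_header_map(["Subject", "Title"])["source"]      ->  None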
def _getv(row: Dict[str, str], header_map: Dict[str, str], canon_key: str) -> str:
    src = header_map.get(canon_key)
    return (row.get(src) if src else "") or ""
def _parse_date(val: str):
    val = (val or "").strip()
    if not val:
        return None
    # Common formats: m/d/Y and Y-m-d. strptime's %m and %d already accept
    # single-digit values, so no platform-specific "%-m" directives are needed.
    for fmt in ("%m/%d/%Y", "%Y-%m-%d"):
        try:
            return datetime.strptime(val, fmt).date()
        except Exception:
            pass
    # Fallback to dateutil if present
    try:
        from dateutil import parser  # type: ignore
        return parser.parse(val).date()
    except Exception:
        return None
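# Examples (illustrative):
#   _parse_date("3/7/2024")    ->  datetime.date(2024, 3, 7)
#   _parse_date("2024-03-07")  ->  datetime.date(2024, 3, 7)
#   _parse_date("7 Mar 2024")  ->  parsed via dateutil if installed, otherwise None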
def _clip(field_name: str, value: str) -> str:
    """
    Clip to model field's max_length if needed, to avoid DB DataError.
    """
    try:
        f = Entry._meta.get_field(field_name)
        max_len = getattr(f, "max_length", None)
        if max_len and value and len(value) > max_len:
            return value[:max_len]
    except Exception:
        pass
    return value
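# Example (illustrative): if Entry.subject is a CharField(max_length=200), a 500-character
# value is truncated to 200 characters; unknown field names are passed through unchanged.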
def _coerce_int(val: str):
    val = (val or "").strip()
    if not val:
        return None
    m = re.search(r"(-?\d+)", val.replace(",", ""))
    if not m:
        return None
    try:
        return int(m.group(1))
    except Exception:
        return None
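# Examples (illustrative):
#   _coerce_int("No. 42")  ->  42
#   _coerce_int("1,234")   ->  1234
#   _coerce_int("n/a")     ->  None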
def import_csv_bytes(b: bytes, dry_run: bool = False, commit_every: int = 500) -> Dict[str, object]:
    """
    Robust CSV import. Commits each row in its own transaction so that one bad
    row does not poison the entire import (avoids TransactionManagementError cascades).
    `commit_every` is currently unused: every row is committed individually.
    Returns a report dict with counts and first-line error messages.
    """
    text = _decode_bytes(b)
    dialect = _sniff_dialect(text)
    delimiter = getattr(dialect, "delimiter", ",")
    # --- headers ---
    f = io.StringIO(text)
    reader = csv.reader(f, dialect=dialect)
    try:
        raw_headers = next(reader)
    except StopIteration:
        return {
            "rows": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": [],
            "scripture_parsed": 0, "scripture_failed": 0,
            "dialect_delimiter": delimiter, "used_headerless_mode": False,
            "seen_headers": [],
        }
    # If csv.reader produced the wrong number of header cells, re-split the raw
    # header line leniently using the sniffed delimiter.
    headers = raw_headers if len(raw_headers) == EXPECTED_COLS else _split_lenient(
        delimiter.join(raw_headers), delimiter=delimiter, expected=EXPECTED_COLS
    )
    header_map = _build_header_map(headers)
    # Pair raw lines so we can repair rows mis-split by csv (note: rows containing
    # quoted newlines span several raw lines and will not line up exactly).
    raw_lines = text.splitlines()[1:]  # skip header
    dict_reader = csv.DictReader(io.StringIO(text), fieldnames=headers, dialect=dialect)
    next(dict_reader, None)  # skip header
    total = inserted = updated = skipped = 0
    errors: List[str] = []
    scripture_ok = scripture_bad = 0
    # Import loop (row-by-row atomic)
    for idx, (raw_line, row) in enumerate(zip(raw_lines, dict_reader), start=2):
        total += 1
        # Repair if DictReader got the wrong shape (inconsistent quotes in source)
        if len(row) != EXPECTED_COLS or None in row:
            cells = _split_lenient(raw_line, delimiter=delimiter, expected=EXPECTED_COLS)
            row = dict(zip(headers, cells))
        # Extract canonical fields
        subject = _getv(row, header_map, "subject").strip()
        illustration = _getv(row, header_map, "illustration").strip()
        application = _getv(row, header_map, "application").strip()
        scripture = _getv(row, header_map, "scripture").strip()
        source = _getv(row, header_map, "source").strip()
        talk_title = _getv(row, header_map, "talk title").strip()
        talk_number = _coerce_int(_getv(row, header_map, "talk number"))
        entry_code = _getv(row, header_map, "code").strip()
        date_added = _parse_date(_getv(row, header_map, "date"))
        date_edited = _parse_date(_getv(row, header_map, "date edited"))
        # Skip rows with no meaningful text
        if not (subject or illustration or application):
            skipped += 1
            continue
        # Clip to DB lengths
        subject = _clip("subject", subject)
        illustration = _clip("illustration", illustration)
        application = _clip("application", application)
        scripture = _clip("scripture_raw", scripture)
        source = _clip("source", source)
        talk_title = _clip("talk_title", talk_title)
        entry_code = _clip("entry_code", entry_code)
        scripture_ok += 1 if scripture else 0
        scripture_bad += 0 if scripture else 1
        # Upsert key: prefer entry_code; else (subject + illustration)
        lookup: Dict[str, object] = {}
        if entry_code:
            lookup["entry_code"] = entry_code
        else:
            lookup["subject"] = subject
            lookup["illustration"] = illustration
        if dry_run:
            exists = Entry.objects.filter(**lookup).exists()
            inserted += 0 if exists else 1
            updated += 1 if exists else 0
            continue
        try:
            # Isolate each row so a failure rolls back only that row
            with transaction.atomic():
                obj = Entry.objects.filter(**lookup).first()
                created = False
                if not obj:
                    obj = Entry(**lookup)
                    created = True
                obj.subject = subject
                obj.illustration = illustration
                obj.application = application
                obj.scripture_raw = scripture
                obj.source = source
                obj.talk_title = talk_title
                obj.talk_number = talk_number
                if entry_code:
                    obj.entry_code = entry_code
                if date_added:
                    obj.date_added = date_added
                if date_edited:
                    obj.date_edited = date_edited
                obj.save()
            inserted += 1 if created else 0
            updated += 0 if created else 1
        except (IntegrityError, DataError, DatabaseError, ValueError) as e:
            msg = str(e).splitlines()[0]
            errors.append(f"line {idx}: {type(e).__name__}: {msg}")
            skipped += 1
            # continue to next row
    return {
        "rows": total,
        "inserted": inserted,
        "updated": updated,
        "skipped": skipped,
        "errors": errors,
        "scripture_parsed": scripture_ok,
        "scripture_failed": scripture_bad,
        "dialect_delimiter": delimiter,
        "used_headerless_mode": False,
        "seen_headers": headers,
    }
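# Usage sketch (illustrative; the real upload view may differ): a Django view handling
# an uploaded CSV could call the importer and surface the report, e.g.
#
#   report = import_csv_bytes(request.FILES["file"].read(), dry_run=False)
#   messages.info(request, f"{report['inserted']} inserted, {report['updated']} updated, "
#                          f"{report['skipped']} skipped")
#
# Passing dry_run=True walks every row and counts would-be inserts/updates without
# writing to the database, which is useful for previewing a file before importing it.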