# core/utils.py
from __future__ import annotations

import csv
import io
import re
from datetime import datetime
from typing import Dict, List, Optional

from django.db import transaction, IntegrityError, DataError, DatabaseError

from .models import Entry


# ============================
# Search helpers (used by views)
# ============================

def terms(q: str) -> List[str]:
    """Split search query into terms; keep quoted phrases together."""
    if not q:
        return []
    out, buf, in_quote = [], [], False
    for ch in q:
        if ch == '"':
            in_quote = not in_quote
            continue
        if ch.isspace() and not in_quote:
            if buf:
                out.append("".join(buf))
                buf = []
        else:
            buf.append(ch)
    if buf:
        out.append("".join(buf))
    return out
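

# Illustrative example (comment only, not executed): terms() keeps a quoted
# phrase together as a single search term.
#   terms('alpha "bravo charlie" delta')  ->  ['alpha', 'bravo charlie', 'delta']
#   terms("")                             ->  []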


def has_wildcards(s: str) -> bool:
    """True if the string contains shell-style wildcards (* or ?)."""
    return bool(s) and ("*" in s or "?" in s)


def wildcard_to_regex(s: str) -> str:
    """
    Convert user wildcards to a Postgres-friendly regex:
      * -> .*   ? -> .   (escape regex meta first)
    """
    if s is None:
        return ""
    s = re.escape(s)
    s = s.replace(r"\*", ".*").replace(r"\?", ".")
    return f"^{s}$"
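

# Illustrative example: wildcard_to_regex("gen*") returns "^gen.*$". A view
# might feed this into a case-insensitive regex lookup such as
# Entry.objects.filter(subject__iregex=wildcard_to_regex(term)); that exact
# queryset usage is an assumption here, since views.py is not part of this module.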


# ============================
# CSV import – robust version
# ============================

# Canonical header names we expect (case-insensitive on input):
CANON_HEADERS = [
    "subject", "illustration", "application", "scripture",
    "source", "talk title", "talk number", "code", "date", "date edited",
]
EXPECTED_COLS = len(CANON_HEADERS)

# Curly quotes & odd whitespace we normalize
QUOTE_MAP = {
    "\u201c": '"', "\u201d": '"',  # left/right double quotation marks
    "\u2018": "'", "\u2019": "'",  # left/right single quotation marks
}
CTRL_MAP = {
    "\x0b": " ",  # vertical tab
    "\x0c": " ",  # form feed
}


def _decode_bytes(b: bytes) -> str:
    """Decode bytes with utf-8-sig, normalize line endings and characters."""
    t = b.decode("utf-8-sig", errors="replace")
    # normalize curly quotes and control chars
    for k, v in QUOTE_MAP.items():
        t = t.replace(k, v)
    for k, v in CTRL_MAP.items():
        t = t.replace(k, v)
    # normalize newlines
    t = t.replace("\r\n", "\n").replace("\r", "\n")
    return t
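

# Illustrative example: a UTF-8 BOM, curly quotes, and CRLF line endings are
# all normalized before the csv module ever sees the text.
#   _decode_bytes('\ufeff\u201cHi\u201d\r\n'.encode("utf-8"))  ->  '"Hi"\n'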


def _sniff_dialect(text: str) -> csv.Dialect:
    """Sniff the CSV dialect from a sample, or fall back to comma-delimited."""
    snippet = text[:4096]
    try:
        # Sniffer expects the candidate delimiters as a single string.
        return csv.Sniffer().sniff(snippet, delimiters=",;\t|")
    except Exception:
        class D(csv.Dialect):
            delimiter = ","
            quotechar = '"'
            doublequote = True
            skipinitialspace = False
            lineterminator = "\n"
            quoting = csv.QUOTE_MINIMAL
        return D()


def _split_lenient(line: str, delimiter: str, expected: int) -> List[str]:
    """
    Split a CSV line manually, respecting quotes. Works even if the line
    contains inconsistent quoting (e.g., inner quotes not doubled).
    Always returns exactly `expected` fields: short rows are padded with empty
    strings, and overflow cells are folded back into the last field.
    """
    out, field = [], []
    in_quotes = False
    i, n = 0, len(line)
    while i < n:
        ch = line[i]
        if ch == '"':
            # doubled quote inside a quoted field -> literal quote
            if in_quotes and i + 1 < n and line[i + 1] == '"':
                field.append('"')
                i += 2
                continue
            in_quotes = not in_quotes
            i += 1
            continue
        if ch == delimiter and not in_quotes:
            out.append("".join(field))
            field = []
            i += 1
            continue
        field.append(ch)
        i += 1
    out.append("".join(field))

    # Repair count to exactly `expected`
    if len(out) < expected:
        out += [""] * (expected - len(out))
    elif len(out) > expected:
        # Fold the overflow cells back into the last kept field so the result
        # still has exactly `expected` entries.
        head = out[:expected]
        tail = out[expected:]
        head[-1] = head[-1] + delimiter + delimiter.join(tail)
        out = head

    return out
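

# Illustrative examples with expected=3:
#   _split_lenient('a,"b, still b",c', ",", 3)  ->  ['a', 'b, still b', 'c']
#   _split_lenient('a,b', ",", 3)               ->  ['a', 'b', '']
#   _split_lenient('a,b,c,d', ",", 3)           ->  ['a', 'b', 'c,d']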


def _build_header_map(headers: List[str]) -> Dict[str, Optional[str]]:
    """
    Map incoming headers (any case) to our canonical keys. Keys with no match
    map to None, which _getv treats as an empty value.
    """
    key = {h.lower().strip(): h for h in headers}
    mapping: Dict[str, Optional[str]] = {}
    for canon in CANON_HEADERS:
        if canon in key:
            mapping[canon] = key[canon]
        else:
            aliases = {
                "talk title": ["talk_title", "title"],
                "talk number": ["talk_no", "talk#", "talknum"],
                "date edited": ["edited", "date_edited", "edited date"],
            }.get(canon, [])
            found = next((a for a in aliases if a in key), None)
            mapping[canon] = key.get(found) if found else None
    return mapping
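

# Illustrative example: aliases let differently labelled exports map onto the
# canonical keys.
#   _build_header_map(["Subject", "Title", "Talk_No"]) maps
#   "talk title" -> "Title" and "talk number" -> "Talk_No"; unmatched keys
#   (e.g. "scripture") map to None.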


def _getv(row: Dict[str, str], header_map: Dict[str, Optional[str]], canon_key: str) -> str:
    """Read a canonical field from a parsed row, returning "" when absent."""
    src = header_map.get(canon_key)
    return (row.get(src) if src else "") or ""


def _parse_date(val: str):
    val = (val or "").strip()
    if not val:
        return None
    # Common formats: m/d/Y and Y-m-d. strptime already tolerates single-digit
    # month/day with %m/%d, so no platform-specific directives are needed.
    for fmt in ("%m/%d/%Y", "%Y-%m-%d"):
        try:
            return datetime.strptime(val, fmt).date()
        except Exception:
            pass
    # Fallback to dateutil if present
    try:
        from dateutil import parser  # type: ignore
        return parser.parse(val).date()
    except Exception:
        return None
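

# Illustrative examples:
#   _parse_date("3/7/2021")    ->  date(2021, 3, 7)
#   _parse_date("2021-03-07")  ->  date(2021, 3, 7)
#   _parse_date("garbage")     ->  None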


def _clip(field_name: str, value: str) -> str:
    """
    Clip to model field's max_length if needed, to avoid DB DataError.
    """
    try:
        f = Entry._meta.get_field(field_name)
        max_len = getattr(f, "max_length", None)
        if max_len and value and len(value) > max_len:
            return value[:max_len]
    except Exception:
        pass
    return value
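

# Illustrative example: assuming Entry.subject is a CharField(max_length=200),
# _clip("subject", 5000 * "x") returns the first 200 characters instead of
# letting the database raise a DataError. (The max_length of 200 is assumed,
# not taken from the actual model.)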


def _coerce_int(val: str):
    """Pull the first integer out of a messy cell value, or return None."""
    val = (val or "").strip()
    if not val:
        return None
    m = re.search(r"(-?\d+)", val.replace(",", ""))
    if not m:
        return None
    try:
        return int(m.group(1))
    except Exception:
        return None
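

# Illustrative examples:
#   _coerce_int("Talk #12")  ->  12
#   _coerce_int("1,234")     ->  1234
#   _coerce_int("n/a")       ->  None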


def import_csv_bytes(b: bytes, dry_run: bool = False, commit_every: int = 500) -> Dict[str, object]:
    """
    Robust CSV import. Each row is saved in its own transaction so that one bad
    row does not poison the entire import (avoids TransactionManagementError cascades).
    ``commit_every`` is accepted from callers but currently unused, since rows
    commit individually.

    Returns a report dict with counts and first-line error messages.
    """
    text = _decode_bytes(b)
    dialect = _sniff_dialect(text)
    delimiter = getattr(dialect, "delimiter", ",")

    # --- headers ---
    f = io.StringIO(text)
    reader = csv.reader(f, dialect=dialect)
    try:
        raw_headers = next(reader)
    except StopIteration:
        # Empty file: return an all-zero report.
        return {
            "rows": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": [],
            "scripture_parsed": 0, "scripture_failed": 0,
            "dialect_delimiter": delimiter, "used_headerless_mode": False,
            "seen_headers": [],
        }

    # If the header row did not split into the expected column count, re-split
    # it leniently using the sniffed delimiter.
    headers = raw_headers if len(raw_headers) == EXPECTED_COLS else _split_lenient(
        delimiter.join(raw_headers), delimiter=delimiter, expected=EXPECTED_COLS
    )
    header_map = _build_header_map(headers)

    # Pair raw lines so we can repair rows mis-split by csv. This assumes each
    # record occupies a single physical line (no newlines inside quoted fields).
    raw_lines = text.splitlines()[1:]  # skip header

    dict_reader = csv.DictReader(io.StringIO(text), fieldnames=headers, dialect=dialect)
    next(dict_reader, None)  # skip header

    total = inserted = updated = skipped = 0
    errors: List[str] = []
    scripture_ok = scripture_bad = 0

    # Import loop (row-by-row atomic)
    for idx, (raw_line, row) in enumerate(zip(raw_lines, dict_reader), start=2):
        total += 1

        # Repair if DictReader got the wrong shape (inconsistent quotes in source)
        if len(row) != EXPECTED_COLS or None in row:
            cells = _split_lenient(raw_line, delimiter=delimiter, expected=EXPECTED_COLS)
            row = dict(zip(headers, cells))

        # Extract canonical fields
        subject = _getv(row, header_map, "subject").strip()
        illustration = _getv(row, header_map, "illustration").strip()
        application = _getv(row, header_map, "application").strip()
        scripture = _getv(row, header_map, "scripture").strip()
        source = _getv(row, header_map, "source").strip()
        talk_title = _getv(row, header_map, "talk title").strip()
        talk_number = _coerce_int(_getv(row, header_map, "talk number"))
        entry_code = _getv(row, header_map, "code").strip()
        date_added = _parse_date(_getv(row, header_map, "date"))
        date_edited = _parse_date(_getv(row, header_map, "date edited"))

        # Skip rows with no meaningful text
        if not (subject or illustration or application):
            skipped += 1
            continue

        # Clip to DB lengths
        subject = _clip("subject", subject)
        illustration = _clip("illustration", illustration)
        application = _clip("application", application)
        scripture = _clip("scripture_raw", scripture)
        source = _clip("source", source)
        talk_title = _clip("talk_title", talk_title)
        entry_code = _clip("entry_code", entry_code)

        # Count rows with and without scripture text (reported as
        # scripture_parsed / scripture_failed)
        scripture_ok += 1 if scripture else 0
        scripture_bad += 0 if scripture else 1

        # Upsert key: prefer entry_code; else (subject + illustration)
        lookup: Dict[str, object] = {}
        if entry_code:
            lookup["entry_code"] = entry_code
        else:
            lookup["subject"] = subject
            lookup["illustration"] = illustration

        if dry_run:
            exists = Entry.objects.filter(**lookup).exists()
            inserted += 0 if exists else 1
            updated += 1 if exists else 0
            continue

        try:
            # Isolate each row so a failure rolls back only that row
            with transaction.atomic():
                obj = Entry.objects.filter(**lookup).first()
                created = False
                if not obj:
                    obj = Entry(**lookup)
                    created = True

                obj.subject = subject
                obj.illustration = illustration
                obj.application = application
                obj.scripture_raw = scripture
                obj.source = source
                obj.talk_title = talk_title
                obj.talk_number = talk_number
                if entry_code:
                    obj.entry_code = entry_code
                if date_added:
                    obj.date_added = date_added
                if date_edited:
                    obj.date_edited = date_edited

                obj.save()

            inserted += 1 if created else 0
            updated += 0 if created else 1

        except (IntegrityError, DataError, DatabaseError, ValueError) as e:
            # Record the first line of the error and continue to the next row.
            msg = str(e).splitlines()[0]
            errors.append(f"line {idx}: {type(e).__name__}: {msg}")
            skipped += 1

    return {
        "rows": total,
        "inserted": inserted,
        "updated": updated,
        "skipped": skipped,
        "errors": errors,
        "scripture_parsed": scripture_ok,
        "scripture_failed": scripture_bad,
        "dialect_delimiter": delimiter,
        "used_headerless_mode": False,
        "seen_headers": headers,
    }
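

# Example usage (illustrative sketch, not part of this module): a management
# command could feed a file straight into the importer. The command name and
# argument handling below are hypothetical.
#
#     from django.core.management.base import BaseCommand
#     from core.utils import import_csv_bytes
#
#     class Command(BaseCommand):
#         help = "Import entries from a CSV file"
#
#         def add_arguments(self, parser):
#             parser.add_argument("path")
#             parser.add_argument("--dry-run", action="store_true")
#
#         def handle(self, *args, **opts):
#             with open(opts["path"], "rb") as fh:
#                 report = import_csv_bytes(fh.read(), dry_run=opts["dry_run"])
#             self.stdout.write(f"{report['inserted']} inserted, "
#                               f"{report['updated']} updated, "
#                               f"{report['skipped']} skipped")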