# Illustrations/web/core/utils.py
from __future__ import annotations
import csv
import io
import re
from datetime import date, datetime
from typing import Dict, List, Optional, Any, Tuple, Iterable
from django.db import transaction, IntegrityError, DataError, DatabaseError
from .models import Entry
# Canonical header order expected from the CSV (and shown in the UI)
EXPECTED_HEADERS: List[str] = [
"Subject", "Illustration", "Application", "Scripture", "Source",
"Talk Title", "Talk Number", "Code", "Date", "Date Edited",
]
# Map CSV header labels -> Entry model field names
HEADER_MAP: Dict[str, str] = {
"Subject": "subject",
"Illustration": "illustration",
"Application": "application",
"Scripture": "scripture_raw",
"Source": "source",
"Talk Title": "talk_title",
"Talk Number": "talk_number",
"Code": "entry_code",
"Date": "date_added",
"Date Edited": "date_edited",
}
# Accept both the pretty labels *and* the actual model field names
# (lets you import older dumps or hand-made files)
ACCEPTABLE_HEADERS: Dict[str, str] = {
**{h.lower(): HEADER_MAP[h] for h in EXPECTED_HEADERS},
# direct model names also OK
"subject": "subject",
"illustration": "illustration",
"application": "application",
"scripture_raw": "scripture_raw",
"source": "source",
"talk_title": "talk_title",
"talk_number": "talk_number",
"entry_code": "entry_code",
"date_added": "date_added",
"date_edited": "date_edited",
}
# ============================
# Search helpers (used by views)
# ============================
def terms(q: str) -> List[str]:
"""Split search query into terms; keep quoted phrases together."""
if not q:
return []
out, buf, in_quote = [], [], False
for ch in q:
if ch == '"':
in_quote = not in_quote
continue
if ch.isspace() and not in_quote:
if buf:
out.append("".join(buf))
buf = []
else:
buf.append(ch)
if buf:
out.append("".join(buf))
return out
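# Example (illustrative): quoted phrases survive splitting as one term:
#     terms('sower "mustard seed"')  ->  ['sower', 'mustard seed']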
def has_wildcards(s: str) -> bool:
return bool(s) and ("*" in s or "?" in s)
def wildcard_to_regex(s: str) -> str:
"""
Convert user wildcards to a Postgres-friendly regex:
* -> .* ? -> . (escape regex meta first)
"""
if s is None:
return ""
s = re.escape(s)
s = s.replace(r"\*", ".*").replace(r"\?", ".")
return f"^{s}$"
# ============================
# CSV import (robust version)
# ============================
# Canonical header names we expect (case-insensitive on input):
CANON_HEADERS = [
"subject", "illustration", "application", "scripture",
"source", "talk title", "talk number", "code", "date", "date edited",
]
EXPECTED_COLS = len(CANON_HEADERS)
# Curly quotes & odd whitespace we normalize
QUOTE_MAP = {
"\u201c": '"', "\u201d": '"', # “ ”
"\u2018": "'", "\u2019": "'", #
}
CTRL_MAP = {
"\x0b": " ", # vertical tab
"\x0c": " ", # form feed
}
def _decode_bytes(b: bytes) -> str:
"""Decode bytes with utf-8-sig, normalize line endings and characters."""
t = b.decode("utf-8-sig", errors="replace")
# normalize curly quotes and control chars
for k, v in QUOTE_MAP.items():
t = t.replace(k, v)
for k, v in CTRL_MAP.items():
t = t.replace(k, v)
# normalize newlines
t = t.replace("\r\n", "\n").replace("\r", "\n")
return t
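# Example (illustrative): curly quotes and CRLF line endings are normalized:
#     _decode_bytes('\u201cHi\u201d\r\n'.encode("utf-8")) -> '"Hi"\n'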
def _sniff_dialect(text: str) -> csv.Dialect:
"""Sniff CSV dialect or default to comma."""
snippet = text[:4096]
try:
return csv.Sniffer().sniff(snippet, delimiters=[",", ";", "\t", "|"])
except Exception:
class D(csv.Dialect):
delimiter = ","
quotechar = '"'
doublequote = True
skipinitialspace = False
lineterminator = "\n"
quoting = csv.QUOTE_MINIMAL
return D()
def _split_lenient(line: str, delimiter: str, expected: int) -> List[str]:
"""
Split a CSV line manually, respecting quotes. Works even if the line
contains inconsistent quoting (e.g., inner quotes not doubled).
    Ensures we return exactly `expected` fields by merging any overflow
    cells into the final field.
"""
out, field = [], []
in_quotes = False
i, n = 0, len(line)
while i < n:
ch = line[i]
if ch == '"':
# doubled quote inside a quoted field -> literal quote
if in_quotes and i + 1 < n and line[i + 1] == '"':
field.append('"')
i += 2
continue
in_quotes = not in_quotes
i += 1
continue
if ch == delimiter and not in_quotes:
out.append("".join(field))
field = []
i += 1
continue
field.append(ch)
i += 1
out.append("".join(field))
    # Repair the cell count to exactly `expected`
    if len(out) < expected:
        out += [""] * (expected - len(out))
    elif len(out) > expected:
        # Fold every overflow cell into the final field so the row
        # still comes back with exactly `expected` columns.
        head = out[:expected - 1]
        tail = out[expected - 1:]
        out = head + [delimiter.join(tail)]
    return out
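# Examples (illustrative): short rows are padded, overflow folds into the
# final field:
#     _split_lenient('a,b,c,d', ',', 3) -> ['a', 'b', 'c,d']
#     _split_lenient('a', ',', 3)       -> ['a', '', '']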
def _build_header_map(headers: List[str]) -> Dict[str, str]:
"""
Map incoming headers (any case) to our canonical keys.
"""
key = {h.lower().strip(): h for h in headers}
mapping: Dict[str, Optional[str]] = {}
for canon in CANON_HEADERS:
if canon in key:
mapping[canon] = key[canon]
else:
aliases = {
"talk title": ["talk_title", "title"],
"talk number": ["talk_no", "talk#", "talknum"],
"date edited": ["edited", "date_edited", "edited date"],
}.get(canon, [])
found = next((a for a in aliases if a in key), None)
mapping[canon] = key.get(found) if found else None
return mapping # type: ignore[return-value]
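# Example (illustrative): an aliased header from an older dump still maps:
#     _build_header_map(["Subject", "talk_no"])["talk number"] -> 'talk_no'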
def _getv(row: Dict[str, str], header_map: Dict[str, str], canon_key: str) -> str:
src = header_map.get(canon_key)
return (row.get(src) if src else "") or ""
def _clip(field_name: str, value: str) -> str:
"""
Clip to model field's max_length if needed, to avoid DB DataError.
"""
try:
f = Entry._meta.get_field(field_name)
max_len = getattr(f, "max_length", None)
if max_len and value and len(value) > max_len:
return value[:max_len]
except Exception:
pass
return value
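# Example (illustrative): if Entry.subject were declared with max_length=200
# (an assumption for this example), _clip("subject", "x" * 500) would return
# the first 200 characters instead of letting the INSERT raise a DataError.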
def _coerce_int(val: str):
val = (val or "").strip()
if not val:
return None
m = re.search(r"(-?\d+)", val.replace(",", ""))
if not m:
return None
try:
return int(m.group(1))
except Exception:
return None
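# Examples (illustrative): thousands separators and surrounding text are tolerated:
#     _coerce_int("Talk #1,234") -> 1234
#     _coerce_int("n/a")         -> None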
def _to_date_or_none(s: str) -> Optional[date]:
    s = (s or "").strip()
    if not s:
        return None
    for fmt in ("%Y-%m-%d", "%m/%d/%Y", "%m/%d/%y"):
        try:
            # note: this module imports the `datetime` class directly,
            # so strptime is called on it, not on the datetime module
            return datetime.strptime(s, fmt).date()
        except Exception:
            pass
    return None  # let caller decide if this is acceptable
def _clean_header_token(s: Any) -> str:
"""
    Make a header token safe/normalized:
    - None -> ""
    - trim spaces
    - drop weird prefixes like r:"Talk Title" or r.'Talk Title'
    - strip surrounding single/double quotes
    - lowercase for matching
    """
    s = "" if s is None else str(s)
    s = s.strip()
    # drop the r: or r. prefix some CSV tools add (do this *before*
    # stripping quotes, so r:"Talk Title" loses its quotes too)
    if s[:2].lower() in ("r:", "r."):
        s = s[2:].lstrip()
    # strip surrounding quotes
    if len(s) >= 2 and s[0] == s[-1] and s[0] in ("'", '"'):
        s = s[1:-1]
    return s.strip().lower()
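# Example (illustrative): _clean_header_token('r:"Talk Title"') -> 'talk title'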
def _to_int_or_none(v: Any) -> Optional[int]:
if v is None:
return None
s = str(v).strip()
if s == "":
return None
try:
return int(float(s)) # tolerate "123.0"
except Exception:
return None
def _clean_header_cell(s: str) -> str:
if s is None:
return ""
s = str(s).strip()
# Handle odd prefixes like r:"Talk Title"
low = s.lower()
if low.startswith("r:") or low.startswith("r="):
s = s[2:].lstrip()
# Strip wrapping quotes
if len(s) >= 2 and s[0] == s[-1] and s[0] in ('"', "'"):
s = s[1:-1]
return s.strip()
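# Example (illustrative): _clean_header_cell('r:"Talk Title"') -> 'Talk Title'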
def _parse_int(x: str) -> Optional[int]:
x = (x or "").strip()
if not x:
return None
try:
return int(x)
except Exception:
return None
def _parse_date(x: str):
"""
Returns a date object or None.
Tries several common formats, then ISO.
"""
x = (x or "").strip()
if not x:
return None
for fmt in ("%Y-%m-%d", "%m/%d/%Y", "%d/%m/%Y"):
try:
return datetime.strptime(x, fmt).date()
except Exception:
pass
try:
return datetime.fromisoformat(x).date()
except Exception:
return None
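# Examples (illustrative):
#     _parse_date("12/31/2024")          -> date(2024, 12, 31)
#     _parse_date("2024-12-31T08:00:00") -> date(2024, 12, 31)  # via fromisoformat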
def import_csv_bytes(content: bytes, dry_run: bool = True, batch_size: int = 1000) -> Dict[str, Any]:
"""
Parse the uploaded CSV (bytes), optionally write to DB.
Returns a report dict the templates expect:
{
"total": <int>,
"created": <int>,
"updated": 0,
"skipped": <int>,
"errors": [ ... ],
"preview": [ [cell,...], ... up to 10 rows ],
"columns": EXPECTED_HEADERS,
}
Notes:
- This implementation always CREATES new rows (no dedupe).
If you want upserts later, we can key on entry_code or (talk_number, entry_code).
"""
report = {
"total": 0,
"created": 0,
"updated": 0,
"skipped": 0,
"errors": [],
"preview": [],
"columns": EXPECTED_HEADERS[:],
}
# Decode once (BOM-safe), sniff dialect, fall back to excel
text = content.decode("utf-8-sig", errors="replace")
try:
first_line = text.splitlines()[0] if text else ""
dialect = csv.Sniffer().sniff(first_line) if first_line else csv.excel
except Exception:
dialect = csv.excel
rows = list(csv.reader(io.StringIO(text), dialect))
if not rows:
return report # empty file
# Header handling (tolerant)
first = rows[0]
norm_first = [_clean_header_cell(c).lower() for c in first]
expected_norm = [h.lower() for h in EXPECTED_HEADERS]
header_ok = (norm_first == expected_norm)
if header_ok:
data_rows = rows[1:]
else:
# If first row isn't a match but the column count matches, treat it as data
if len(first) == len(EXPECTED_HEADERS):
data_rows = rows # treat all rows as data; we'll use EXPECTED order
else:
# Try common alternate delimiters to recover
for delim in (";", "\t"):
rows2 = list(csv.reader(io.StringIO(text), delimiter=delim))
if rows2 and len(rows2[0]) == len(EXPECTED_HEADERS):
rows = rows2
first = rows[0]
norm_first = [_clean_header_cell(c).lower() for c in first]
header_ok = (norm_first == expected_norm)
data_rows = rows[1:] if header_ok else rows
break
else:
# Could not reconcile columns
report["errors"].append(
f"Column mismatch: saw {len(first)} but expected {len(EXPECTED_HEADERS)}."
)
return report
# Normalize rows length (pad/trim) and build preview (first 10)
normalized_rows: List[List[str]] = []
for r in data_rows:
if not r or all((c or "").strip() == "" for c in r):
continue
if len(r) < len(EXPECTED_HEADERS):
r = r + [""] * (len(EXPECTED_HEADERS) - len(r))
elif len(r) > len(EXPECTED_HEADERS):
r = r[:len(EXPECTED_HEADERS)]
normalized_rows.append(r)
report["total"] = len(normalized_rows)
report["preview"] = normalized_rows[:10] # show first 10 rows exactly as seen
if dry_run or report["total"] == 0:
return report # preview only
# Create entries in batches (transactional)
to_create: List[Entry] = []
for r in normalized_rows:
try:
obj = Entry(
subject=(r[0] or "").strip(),
illustration=(r[1] or "").strip(),
application=(r[2] or "").strip(),
scripture_raw=(r[3] or "").strip(),
source=(r[4] or "").strip(),
talk_title=(r[5] or "").strip(),
talk_number=_parse_int(r[6]),
entry_code=(r[7] or "").strip(),
date_added=_parse_date(r[8]),
date_edited=_parse_date(r[9]),
)
to_create.append(obj)
except Exception as e:
report["skipped"] += 1
report["errors"].append(f"Row skipped due to error: {e}")
if len(to_create) >= batch_size:
with transaction.atomic():
Entry.objects.bulk_create(to_create, batch_size=batch_size)
report["created"] += len(to_create)
to_create.clear()
if to_create:
with transaction.atomic():
Entry.objects.bulk_create(to_create, batch_size=batch_size)
report["created"] += len(to_create)
to_create.clear()
return report
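# Usage sketch (illustrative; `uploaded` stands in for any Django UploadedFile):
#     data = uploaded.read()
#     report = import_csv_bytes(data, dry_run=True)       # parse + preview only
#     if not report["errors"]:
#         report = import_csv_bytes(data, dry_run=False)  # actually create rows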
# small no-op context manager (a stand-in for transaction.atomic(); not referenced above)
class _noop_context:
def __enter__(self): return self
def __exit__(self, exc_type, exc, tb): return False