# core/utils.py
from __future__ import annotations
import csv
import io
import re
from datetime import date, datetime
from typing import Dict, List, Optional, Any, Tuple, Iterable
from django.db import transaction, IntegrityError, DataError, DatabaseError
from .models import Entry
EXPECTED_HEADERS: List[str] = [
"Subject", "Illustration", "Application", "Scripture", "Source",
"Talk Title", "Talk Number", "Code", "Date", "Date Edited",
]
# Map CSV header labels -> Entry model field names
HEADER_MAP: Dict[str, str] = {
"Subject": "subject",
"Illustration": "illustration",
"Application": "application",
"Scripture": "scripture_raw",
"Source": "source",
"Talk Title": "talk_title",
"Talk Number": "talk_number",
"Code": "entry_code",
"Date": "date_added",
"Date Edited": "date_edited",
}
# Accept both the pretty labels *and* the actual model field names
# (lets you import older dumps or hand-made files)
ACCEPTABLE_HEADERS: Dict[str, str] = {
**{h.lower(): HEADER_MAP[h] for h in EXPECTED_HEADERS},
# direct model names also OK
"subject": "subject",
"illustration": "illustration",
"application": "application",
"scripture_raw": "scripture_raw",
"source": "source",
"talk_title": "talk_title",
"talk_number": "talk_number",
"entry_code": "entry_code",
"date_added": "date_added",
"date_edited": "date_edited",
}
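# Illustrative lookups (both the pretty label and the model field name resolve
# to the same Entry field):
#   ACCEPTABLE_HEADERS["talk title"]  ->  "talk_title"
#   ACCEPTABLE_HEADERS["talk_title"]  ->  "talk_title"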
# ============================
# Search helpers (used by views)
# ============================
def terms(q: str) -> List[str]:
"""Split search query into terms; keep quoted phrases together."""
if not q:
return []
out, buf, in_quote = [], [], False
for ch in q:
if ch == '"':
in_quote = not in_quote
continue
if ch.isspace() and not in_quote:
if buf:
out.append("".join(buf))
buf = []
else:
buf.append(ch)
if buf:
out.append("".join(buf))
return out
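# Illustrative examples:
#   terms('moses "red sea" exodus')  ->  ['moses', 'red sea', 'exodus']
#   terms("")                        ->  []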
def has_wildcards(s: str) -> bool:
return bool(s) and ("*" in s or "?" in s)
def wildcard_to_regex(s: str) -> str:
"""
Convert user wildcards to a Postgres-friendly regex:
* -> .* ? -> . (escape regex meta first)
"""
if s is None:
return ""
s = re.escape(s)
s = s.replace(r"\*", ".*").replace(r"\?", ".")
return f"^{s}$"
# ============================
# CSV import (robust version)
# ============================
# Canonical header names we expect (case-insensitive on input):
CANON_HEADERS = [
"subject", "illustration", "application", "scripture",
"source", "talk title", "talk number", "code", "date", "date edited",
]
EXPECTED_COLS = len(CANON_HEADERS)
# Curly quotes & odd whitespace we normalize
QUOTE_MAP = {
"\u201c": '"', "\u201d": '"', # “ ”
"\u2018": "'", "\u2019": "'", #
}
CTRL_MAP = {
"\x0b": " ", # vertical tab
"\x0c": " ", # form feed
}
def _decode_bytes(b: bytes) -> str:
"""Decode bytes with utf-8-sig, normalize line endings and characters."""
t = b.decode("utf-8-sig", errors="replace")
# normalize curly quotes and control chars
for k, v in QUOTE_MAP.items():
t = t.replace(k, v)
for k, v in CTRL_MAP.items():
t = t.replace(k, v)
# normalize newlines
t = t.replace("\r\n", "\n").replace("\r", "\n")
return t
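# Illustrative example (curly quotes and CRLF line endings are normalized):
#   _decode_bytes("\u201cHi\u201d\r\nthere".encode("utf-8"))  ->  '"Hi"\nthere'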
def _sniff_dialect(text: str) -> csv.Dialect:
"""Sniff CSV dialect or default to comma."""
snippet = text[:4096]
try:
        return csv.Sniffer().sniff(snippet, delimiters=",;\t|")
except Exception:
class D(csv.Dialect):
delimiter = ","
quotechar = '"'
doublequote = True
skipinitialspace = False
lineterminator = "\n"
quoting = csv.QUOTE_MINIMAL
return D()
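# Illustrative example:
#   _sniff_dialect("a;b;c\n1;2;3\n").delimiter  ->  ";"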
def _split_lenient(line: str, delimiter: str, expected: int) -> List[str]:
"""
Split a CSV line manually, respecting quotes. Works even if the line
contains inconsistent quoting (e.g., inner quotes not doubled).
    Ensures we return exactly `expected` fields by padding short rows and
    folding overflow cells back into the last kept field.
"""
out, field = [], []
in_quotes = False
i, n = 0, len(line)
while i < n:
ch = line[i]
if ch == '"':
# doubled quote inside a quoted field -> literal quote
if in_quotes and i + 1 < n and line[i + 1] == '"':
field.append('"')
i += 2
continue
in_quotes = not in_quotes
i += 1
continue
if ch == delimiter and not in_quotes:
out.append("".join(field))
field = []
i += 1
continue
field.append(ch)
i += 1
out.append("".join(field))
# Repair count to exactly `expected`
if len(out) < expected:
out += [""] * (expected - len(out))
    elif len(out) > expected:
        head = out[:expected]
        overflow = out[expected:]
        # Fold the overflow cells back into the last kept field so the result
        # still has exactly `expected` entries.
        head[-1] = head[-1] + delimiter + delimiter.join(overflow)
        out = head
return out
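# Illustrative example (a stray comma in the last column is folded back in, so
# the field count stays at `expected`):
#   _split_lenient('Moses,Red Sea, parted', ",", 2)  ->  ['Moses', 'Red Sea, parted']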
def _build_header_map(headers: List[str]) -> Dict[str, Optional[str]]:
"""
Map incoming headers (any case) to our canonical keys.
"""
key = {h.lower().strip(): h for h in headers}
mapping: Dict[str, Optional[str]] = {}
for canon in CANON_HEADERS:
if canon in key:
mapping[canon] = key[canon]
else:
aliases = {
"talk title": ["talk_title", "title"],
"talk number": ["talk_no", "talk#", "talknum"],
"date edited": ["edited", "date_edited", "edited date"],
}.get(canon, [])
found = next((a for a in aliases if a in key), None)
mapping[canon] = key.get(found) if found else None
    return mapping
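# Illustrative example (aliases are honored; unmatched canonical headers map to None):
#   _build_header_map(["Subject", "Talk_Title", "Edited"])
#     ->  {"subject": "Subject", "talk title": "Talk_Title",
#          "date edited": "Edited", "illustration": None, ...}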
def _getv(row: Dict[str, str], header_map: Dict[str, str], canon_key: str) -> str:
src = header_map.get(canon_key)
return (row.get(src) if src else "") or ""
def _clip(field_name: str, value: str) -> str:
"""
Clip to model field's max_length if needed, to avoid DB DataError.
"""
try:
f = Entry._meta.get_field(field_name)
max_len = getattr(f, "max_length", None)
if max_len and value and len(value) > max_len:
return value[:max_len]
except Exception:
pass
return value
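# Illustrative example (assumes Entry.subject is a CharField with max_length=200;
# the real model may differ):
#   _clip("subject", "x" * 500)  ->  the first 200 characters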
def _coerce_int(val: str):
val = (val or "").strip()
if not val:
return None
m = re.search(r"(-?\d+)", val.replace(",", ""))
if not m:
return None
try:
return int(m.group(1))
except Exception:
return None
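# Illustrative examples:
#   _coerce_int("No. 42")  ->  42
#   _coerce_int("1,250")   ->  1250
#   _coerce_int("")        ->  None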
def _to_date_or_none(s: str) -> Optional[date]:
s = (s or "").strip()
if not s:
return None
for fmt in ("%Y-%m-%d", "%m/%d/%Y", "%m/%d/%y"):
try:
            return datetime.strptime(s, fmt).date()
except Exception:
pass
return None # let caller decide if this is acceptable
def _clean_header_token(s: Any) -> str:
"""
Make a header token safe/normalized:
- None -> ""
- trim spaces
- strip surrounding single/double quotes
- drop weird prefixes like r:"Talk Title" or r.'Talk Title'
- lowercase for matching
"""
s = "" if s is None else str(s)
s = s.strip()
    # drop r: or r. prefix some CSV tools add
    if s[:2].lower() in ("r:", "r."):
        s = s[2:].lstrip()
    # strip surrounding quotes (after any prefix has been removed)
    if len(s) >= 2 and s[0] == s[-1] and s[0] in ("'", '"'):
        s = s[1:-1]
    return s.strip().lower()
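# Illustrative examples:
#   _clean_header_token(' "Talk Title" ')  ->  'talk title'
#   _clean_header_token('r:"Talk Title"')  ->  'talk title'
#   _clean_header_token(None)              ->  ''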
_DATE_FORMATS = (
"%Y-%m-%d",
"%m/%d/%Y",
"%m/%d/%y",
"%d-%b-%Y", # 05-Sep-2024
"%Y/%m/%d",
)
def _parse_date(val: str) -> Optional[date]:
    if not val:
        return None
    txt = str(val).strip()
    if not txt:
        return None
    # Accept ISO-like with time: 2024-01-02T00:00:00
    if "T" in txt:
        try:
            return datetime.fromisoformat(txt).date()
        except Exception:
            pass
    for fmt in _DATE_FORMATS:
        try:
            return datetime.strptime(txt, fmt).date()
        except Exception:
            continue
    # as a last resort, try plain year-month-day pieces
    try:
        parts = [int(p) for p in txt.replace("/", "-").split("-")]
        if len(parts) >= 3:
            return date(parts[0], parts[1], parts[2])
    except Exception:
        pass
    # Fall back to dateutil if it happens to be installed
    try:
        from dateutil import parser  # type: ignore
        return parser.parse(txt).date()
    except Exception:
        return None
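# Illustrative examples (each returns a datetime.date, or None on failure):
#   _parse_date("2024-09-05")           ->  date(2024, 9, 5)
#   _parse_date("9/5/2024")             ->  date(2024, 9, 5)
#   _parse_date("05-Sep-2024")          ->  date(2024, 9, 5)
#   _parse_date("2024-09-05T00:00:00")  ->  date(2024, 9, 5)
#   _parse_date("not a date")           ->  None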
def _to_int_or_none(v: Any) -> Optional[int]:
if v is None:
return None
s = str(v).strip()
if s == "":
return None
try:
return int(float(s)) # tolerate "123.0"
except Exception:
return None
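# Illustrative examples:
#   _to_int_or_none("151")    ->  151
#   _to_int_or_none("151.0")  ->  151
#   _to_int_or_none("n/a")    ->  None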
def import_csv_bytes(data: bytes, dry_run: bool = True) -> Dict[str, Any]:
"""
Robust CSV importer for Entry.
- Accepts your human-readable header (Subject, Illustration, ...)
and/or direct model field names.
- Normalizes odd headers like r."Talk Title".
- Handles BOM & dialect sniffing.
- Returns a report dict: {ok, created, updated, skipped, errors, preview, total_rows, header}
"""
report: Dict[str, Any] = {
"ok": False,
"created": 0,
"updated": 0,
"skipped": 0,
"errors": [], # list[str]
"preview": [], # first ~10 rows that would be imported
"total_rows": 0,
"header": [],
}
# --- decode safely (remove BOM, keep unknowns) ---
text = data.decode("utf-8-sig", errors="replace")
# --- sniff dialect; fall back to excel ---
try:
sample = "\n".join(text.splitlines()[:10])
dialect = csv.Sniffer().sniff(sample) if sample.strip() else csv.excel
except Exception:
dialect = csv.excel
rdr = csv.reader(io.StringIO(text), dialect)
try:
raw_header = next(rdr, [])
except Exception as e:
report["errors"].append(f"Failed reading header: {e}")
return report
# Clean & map header
cleaned = [_clean_header_token(h) for h in raw_header]
    mapped: List[Optional[str]] = []
    unknowns: List[str] = []
    for token in cleaned:
        target = ACCEPTABLE_HEADERS.get(token)
        # Keep a None placeholder for unrecognized columns so the positional
        # field mapping below stays aligned with each row's cells.
        mapped.append(target)
        if not target:
            unknowns.append(token or "(empty)")
    # If none of the first row's tokens map to a known header but the column
    # count matches EXPECTED_HEADERS, assume the file has *no* header row and
    # inject the expected header so downstream code works.
has_header = True
if unknowns:
# Heuristic: if the number of columns equals EXPECTED_HEADERS and *none*
# of the cleaned tokens map, it's probably a data row (no header)
matches = sum(1 for t in cleaned if t in ACCEPTABLE_HEADERS)
if matches == 0 and len(cleaned) == len(EXPECTED_HEADERS):
# inject expected header and re-run
has_header = False
mapped = [HEADER_MAP[h] for h in EXPECTED_HEADERS]
# rebuild a reader with the expected header injected
sio = io.StringIO(text)
rdr_tmp = csv.reader(sio, dialect)
rows = list(rdr_tmp)
rows.insert(0, EXPECTED_HEADERS) # inject pretty header for report
rdr = iter(rows) # consume from this list iterator
next(rdr, None) # skip our injected header
else:
# keep going but warn in the report
report["errors"].append(
"Some header columns were not recognized: "
+ ", ".join(unknowns)
+ " (continuing with best-effort mapping)"
)
report["header"] = mapped
# Read rows
rows = list(rdr)
report["total_rows"] = len(rows)
# Build row dicts
def row_to_obj(row_idx: int, row: List[str]) -> Tuple[Optional[Entry], Optional[Dict[str, Any]], Optional[str]]:
"""
Returns (entry_instance_or_None, values_dict_or_None, error_message_or_None)
but does not save to DB.
"""
if len(row) < len(mapped):
return None, None, f"Row {row_idx}: expected {len(mapped)} columns, found {len(row)}."
values: Dict[str, Any] = {}
        for i, field in enumerate(mapped):
            if field is None:
                # Unrecognized column: skip its value rather than shifting
                # every later value into the wrong model field.
                continue
            raw_val = row[i] if i < len(row) else ""
            # Coerce types for specific fields
            if field in ("date_added", "date_edited"):
                values[field] = _parse_date(raw_val)
            elif field == "talk_number":
                values[field] = _to_int_or_none(raw_val)
            else:
                values[field] = (raw_val or "").strip()
# Create (unsaved) Entry instance for preview/validation
e = Entry(**{k: v for k, v in values.items() if v not in (None, "")})
return e, values, None
# Preview first few
for i, row in enumerate(rows[:10], start=1):
e, values, err = row_to_obj(i, row)
report["preview"].append({
"row": i,
"values": values if values else {},
"error": err,
})
if dry_run:
        # Dry run: don't write, just validate basic structure
bad = [p for p in report["preview"] if p["error"]]
if bad:
report["errors"].extend(p["error"] for p in bad if p["error"])
report["ok"] = len(report["errors"]) == 0
return report
# Real import (create new rows).
# If you want update/merge behavior, add a key strategy here.
created = 0
updated = 0
skipped = 0
errors: List[str] = []
with transaction.atomic():
for idx, row in enumerate(rows, start=1):
e, values, err = row_to_obj(idx, row)
if err:
errors.append(err)
skipped += 1
continue
            try:
                # Simple create-only behavior. Each row gets its own savepoint
                # so one failed INSERT does not break the outer transaction
                # for the rows that follow.
                with transaction.atomic():
                    Entry.objects.create(**values)
                created += 1
            except Exception as ex:
                errors.append(f"Row {idx}: failed to save ({ex})")
                skipped += 1
report.update({
"ok": len(errors) == 0,
"created": created,
"updated": updated,
"skipped": skipped,
"errors": errors,
})
return report
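# A view might call the importer roughly like this (sketch only; "csv_file" is
# an assumed form field name and import_view is a hypothetical view):
#
#   def import_view(request):
#       report = import_csv_bytes(
#           request.FILES["csv_file"].read(),
#           dry_run="confirm" not in request.POST,
#       )
#       if not report["ok"]:
#           ...  # surface report["errors"] back to the template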
# No-op context manager (drop-in stand-in for transaction.atomic()).
class _noop_context:
def __enter__(self): return self
def __exit__(self, exc_type, exc, tb): return False