# core/utils.py
from __future__ import annotations

import csv
import io
import re
from datetime import date, datetime
from typing import Dict, List, Optional, Any, Tuple, Iterable

from django.db import transaction, IntegrityError, DataError, DatabaseError

from .models import Entry

EXPECTED_HEADERS: List[str] = [
    "Subject", "Illustration", "Application", "Scripture", "Source",
    "Talk Title", "Talk Number", "Code", "Date", "Date Edited",
]

# Map CSV header labels -> Entry model field names
HEADER_MAP: Dict[str, str] = {
    "Subject": "subject",
    "Illustration": "illustration",
    "Application": "application",
    "Scripture": "scripture_raw",
    "Source": "source",
    "Talk Title": "talk_title",
    "Talk Number": "talk_number",
    "Code": "entry_code",
    "Date": "date_added",
    "Date Edited": "date_edited",
}

# Accept both the pretty labels *and* the actual model field names
# (lets you import older dumps or hand-made files)
ACCEPTABLE_HEADERS: Dict[str, str] = {
    **{h.lower(): HEADER_MAP[h] for h in EXPECTED_HEADERS},
    # direct model names also OK
    "subject": "subject",
    "illustration": "illustration",
    "application": "application",
    "scripture_raw": "scripture_raw",
    "source": "source",
    "talk_title": "talk_title",
    "talk_number": "talk_number",
    "entry_code": "entry_code",
    "date_added": "date_added",
    "date_edited": "date_edited",
}


# ============================
# Search helpers (used by views)
# ============================

def terms(q: str) -> List[str]:
    """Split search query into terms; keep quoted phrases together."""
    if not q:
        return []
    out, buf, in_quote = [], [], False
    for ch in q:
        if ch == '"':
            in_quote = not in_quote
            continue
        if ch.isspace() and not in_quote:
            if buf:
                out.append("".join(buf))
                buf = []
        else:
            buf.append(ch)
    if buf:
        out.append("".join(buf))
    return out
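
# Illustrative examples (traced from the logic above):
#   terms('fear "of man" snare')  ->  ['fear', 'of man', 'snare']
#   terms("")                     ->  []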


def has_wildcards(s: str) -> bool:
    return bool(s) and ("*" in s or "?" in s)


def wildcard_to_regex(s: str) -> str:
    """
    Convert user wildcards to a Postgres-friendly regex:
      *  ->  .*      ?  ->  .      (escape regex meta first)
    """
    if s is None:
        return ""
    s = re.escape(s)
    s = s.replace(r"\*", ".*").replace(r"\?", ".")
    return f"^{s}$"
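
# Illustrative examples (traced from the logic above):
#   wildcard_to_regex("mark*")  ->  "^mark.*$"
#   wildcard_to_regex("s?n")    ->  "^s.n$"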


# ============================
# CSV import – robust version
# ============================

# Canonical header names we expect (case-insensitive on input):
CANON_HEADERS = [
    "subject", "illustration", "application", "scripture",
    "source", "talk title", "talk number", "code", "date", "date edited",
]
EXPECTED_COLS = len(CANON_HEADERS)

# Curly quotes & odd whitespace we normalize
QUOTE_MAP = {
    "\u201c": '"', "\u201d": '"',  # “ ”
    "\u2018": "'", "\u2019": "'",  # ‘ ’
}
CTRL_MAP = {
    "\x0b": " ",  # vertical tab
    "\x0c": " ",  # form feed
}


def _decode_bytes(b: bytes) -> str:
    """Decode bytes with utf-8-sig, normalize line endings and characters."""
    t = b.decode("utf-8-sig", errors="replace")
    # normalize curly quotes and control chars
    for k, v in QUOTE_MAP.items():
        t = t.replace(k, v)
    for k, v in CTRL_MAP.items():
        t = t.replace(k, v)
    # normalize newlines
    t = t.replace("\r\n", "\n").replace("\r", "\n")
    return t
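
# Illustrative example (traced from the logic above):
#   _decode_bytes('he said \u201chi\u201d\r\n'.encode("utf-8"))  ->  'he said "hi"\n'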


def _sniff_dialect(text: str) -> csv.Dialect:
    """Sniff CSV dialect or default to comma."""
    snippet = text[:4096]
    try:
        return csv.Sniffer().sniff(snippet, delimiters=[",", ";", "\t", "|"])
    except Exception:
        class D(csv.Dialect):
            delimiter = ","
            quotechar = '"'
            doublequote = True
            skipinitialspace = False
            lineterminator = "\n"
            quoting = csv.QUOTE_MINIMAL
        return D()


def _split_lenient(line: str, delimiter: str, expected: int) -> List[str]:
    """
    Split a CSV line manually, respecting quotes. Works even if the line
    contains inconsistent quoting (e.g., inner quotes not doubled).
    Ensures we return exactly `expected` fields by merging any overflow
    cells into the final field.
    """
    out, field = [], []
    in_quotes = False
    i, n = 0, len(line)
    while i < n:
        ch = line[i]
        if ch == '"':
            # doubled quote inside a quoted field -> literal quote
            if in_quotes and i + 1 < n and line[i + 1] == '"':
                field.append('"')
                i += 2
                continue
            in_quotes = not in_quotes
            i += 1
            continue
        if ch == delimiter and not in_quotes:
            out.append("".join(field))
            field = []
            i += 1
            continue
        field.append(ch)
        i += 1
    out.append("".join(field))

    # Repair count to exactly `expected`
    if len(out) < expected:
        out += [""] * (expected - len(out))
    elif len(out) > expected:
        head = out[:expected - 1]
        tail = out[expected - 1:]
        # merge the overflow into a single trailing field so exactly
        # `expected` fields come back
        head.append(delimiter.join(tail))
        out = head

    return out
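
# Illustrative examples (traced from the repair logic above):
#   _split_lenient('a,"b, c",d', ",", 3)  ->  ['a', 'b, c', 'd']
#   _split_lenient('a,b,c,d', ",", 3)     ->  ['a', 'b', 'c,d']   (overflow merged)
#   _split_lenient('a,b', ",", 3)         ->  ['a', 'b', '']      (padded)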


def _build_header_map(headers: List[str]) -> Dict[str, str]:
    """
    Map incoming headers (any case) to our canonical keys.
    """
    key = {h.lower().strip(): h for h in headers}
    mapping: Dict[str, Optional[str]] = {}
    for canon in CANON_HEADERS:
        if canon in key:
            mapping[canon] = key[canon]
        else:
            aliases = {
                "talk title": ["talk_title", "title"],
                "talk number": ["talk_no", "talk#", "talknum"],
                "date edited": ["edited", "date_edited", "edited date"],
            }.get(canon, [])
            found = next((a for a in aliases if a in key), None)
            mapping[canon] = key.get(found) if found else None
    return mapping  # type: ignore[return-value]
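
# Illustrative example (traced from the logic above):
#   _build_header_map(["Subject", "TALK_TITLE", "Edited"])
#   ->  {"subject": "Subject", "talk title": "TALK_TITLE", "date edited": "Edited",
#        ...all other canonical keys map to None}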


def _getv(row: Dict[str, str], header_map: Dict[str, str], canon_key: str) -> str:
    src = header_map.get(canon_key)
    return (row.get(src) if src else "") or ""


def _parse_date(val: str):
    val = (val or "").strip()
    if not val:
        return None
    # Common formats: m/d/Y and Y-m-d (strptime already tolerates single-digit m/d)
    for fmt in ("%m/%d/%Y", "%Y-%m-%d"):
        try:
            return datetime.strptime(val, fmt).date()
        except Exception:
            pass
    # Fallback to dateutil if present
    try:
        from dateutil import parser  # type: ignore
        return parser.parse(val).date()
    except Exception:
        return None


def _clip(field_name: str, value: str) -> str:
    """
    Clip to the model field's max_length if needed, to avoid a DB DataError.
    """
    try:
        f = Entry._meta.get_field(field_name)
        max_len = getattr(f, "max_length", None)
        if max_len and value and len(value) > max_len:
            return value[:max_len]
    except Exception:
        pass
    return value
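
# Illustrative example (assuming, hypothetically, that Entry.subject has max_length=200):
#   _clip("subject", "x" * 250)  ->  "x" * 200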


def _coerce_int(val: str):
    val = (val or "").strip()
    if not val:
        return None
    m = re.search(r"(-?\d+)", val.replace(",", ""))
    if not m:
        return None
    try:
        return int(m.group(1))
    except Exception:
        return None
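
# Illustrative examples (traced from the logic above):
#   _coerce_int("1,234 items")  ->  1234
#   _coerce_int("n/a")          ->  None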


def _to_int_or_none(s: str) -> Optional[int]:
    s = (s or "").strip()
    if not s:
        return None
    try:
        return int(s)
    except Exception:
        return None


def _to_date_or_none(s: str) -> Optional[date]:
    s = (s or "").strip()
    if not s:
        return None
    for fmt in ("%Y-%m-%d", "%m/%d/%Y", "%m/%d/%y"):
        try:
            return datetime.strptime(s, fmt).date()
        except Exception:
            pass
    return None  # let caller decide if this is acceptable


def _clean_header_token(s: Any) -> str:
    """
    Make a header token safe/normalized:
    - None -> ""
    - trim spaces
    - drop weird prefixes like r:"Talk Title" or r.'Talk Title'
    - strip surrounding single/double quotes
    - lowercase for matching
    """
    s = "" if s is None else str(s)
    s = s.strip()
    # drop r: or r. prefix some CSV tools add (before quote-stripping,
    # so r:"Talk Title" ends up fully unquoted)
    if s[:2].lower() in ("r:", "r."):
        s = s[2:].lstrip()
    # strip surrounding quotes
    if len(s) >= 2 and s[0] == s[-1] and s[0] in ("'", '"'):
        s = s[1:-1]
    return s.strip().lower()
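
# Illustrative examples (traced from the logic above):
#   _clean_header_token('r:"Talk Title"')  ->  'talk title'
#   _clean_header_token('  "Subject"  ')   ->  'subject'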


_DATE_FORMATS = (
    "%Y-%m-%d",
    "%m/%d/%Y",
    "%m/%d/%y",
    "%d-%b-%Y",  # 05-Sep-2024
    "%Y/%m/%d",
)


# NOTE: shadows the earlier _parse_date defined above.
def _parse_date(val: str) -> Optional[date]:
    if not val:
        return None
    txt = str(val).strip()
    # Accept ISO-like with time: 2024-01-02T00:00:00
    if "T" in txt:
        try:
            return datetime.fromisoformat(txt).date()
        except Exception:
            pass
    for fmt in _DATE_FORMATS:
        try:
            return datetime.strptime(txt, fmt).date()
        except Exception:
            continue
    # as a last resort, try only year-month-day pieces
    try:
        parts = [int(p) for p in txt.replace("/", "-").split("-")]
        if len(parts) >= 3:
            return datetime(parts[0], parts[1], parts[2]).date()
    except Exception:
        pass
    return None
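
# Illustrative examples (traced from the logic above):
#   _parse_date("2024-01-02T00:00:00")  ->  date(2024, 1, 2)
#   _parse_date("05-Sep-2024")          ->  date(2024, 9, 5)
#   _parse_date("not a date")           ->  None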


# NOTE: shadows the earlier _to_int_or_none; this broader version (which also
# accepts values like "123.0") is the one in effect at runtime.
def _to_int_or_none(v: Any) -> Optional[int]:
    if v is None:
        return None
    s = str(v).strip()
    if s == "":
        return None
    try:
        return int(float(s))  # tolerate "123.0"
    except Exception:
        return None


# Canonical header order expected from the CSV (and shown in the UI)
EXPECTED_HEADERS = [
    "Subject", "Illustration", "Application", "Scripture", "Source",
    "Talk Title", "Talk Number", "Code", "Date", "Date Edited",
]


def _clean_header_cell(s: str) -> str:
    if s is None:
        return ""
    s = str(s).strip()
    # Handle odd prefixes like r:"Talk Title"
    low = s.lower()
    if low.startswith("r:") or low.startswith("r="):
        s = s[2:].lstrip()
    # Strip wrapping quotes
    if len(s) >= 2 and s[0] == s[-1] and s[0] in ('"', "'"):
        s = s[1:-1]
    return s.strip()


def _parse_int(x: str) -> Optional[int]:
    x = (x or "").strip()
    if not x:
        return None
    try:
        return int(x)
    except Exception:
        return None


# NOTE: this is the last _parse_date definition in the module, so it is the
# one import_csv_bytes() below actually uses.
def _parse_date(x: str):
    """
    Returns a date object or None.
    Tries several common formats, then ISO.
    """
    x = (x or "").strip()
    if not x:
        return None
    for fmt in ("%Y-%m-%d", "%m/%d/%Y", "%d/%m/%Y"):
        try:
            return datetime.strptime(x, fmt).date()
        except Exception:
            pass
    try:
        return datetime.fromisoformat(x).date()
    except Exception:
        return None


def import_csv_bytes(content: bytes, dry_run: bool = True, batch_size: int = 1000) -> Dict[str, Any]:
    """
    Parse the uploaded CSV (bytes), optionally write to DB.
    Returns a report dict the templates expect:

        {
          "total": <int>,
          "created": <int>,
          "updated": 0,
          "skipped": <int>,
          "errors": [ ... ],
          "preview": [ [cell, ...], ... up to 10 rows ],
          "columns": EXPECTED_HEADERS,
        }

    Notes:
    - This implementation always CREATES new rows (no dedupe).
      If you want upserts later, we can key on entry_code or (talk_number, entry_code).
    """
    report = {
        "total": 0,
        "created": 0,
        "updated": 0,
        "skipped": 0,
        "errors": [],
        "preview": [],
        "columns": EXPECTED_HEADERS[:],
    }

    # Decode once (BOM-safe), sniff dialect, fall back to excel
    text = content.decode("utf-8-sig", errors="replace")
    try:
        first_line = text.splitlines()[0] if text else ""
        dialect = csv.Sniffer().sniff(first_line) if first_line else csv.excel
    except Exception:
        dialect = csv.excel

    rows = list(csv.reader(io.StringIO(text), dialect))
    if not rows:
        return report  # empty file

    # Header handling (tolerant)
    first = rows[0]
    norm_first = [_clean_header_cell(c).lower() for c in first]
    expected_norm = [h.lower() for h in EXPECTED_HEADERS]
    header_ok = (norm_first == expected_norm)

    if header_ok:
        data_rows = rows[1:]
    else:
        # If first row isn't a match but the column count matches, treat it as data
        if len(first) == len(EXPECTED_HEADERS):
            data_rows = rows  # treat all rows as data; we'll use EXPECTED order
        else:
            # Try common alternate delimiters to recover
            for delim in (";", "\t"):
                rows2 = list(csv.reader(io.StringIO(text), delimiter=delim))
                if rows2 and len(rows2[0]) == len(EXPECTED_HEADERS):
                    rows = rows2
                    first = rows[0]
                    norm_first = [_clean_header_cell(c).lower() for c in first]
                    header_ok = (norm_first == expected_norm)
                    data_rows = rows[1:] if header_ok else rows
                    break
            else:
                # Could not reconcile columns
                report["errors"].append(
                    f"Column mismatch: saw {len(first)} but expected {len(EXPECTED_HEADERS)}."
                )
                return report

    # Normalize row length (pad/trim) and build preview (first 10)
    normalized_rows: List[List[str]] = []
    for r in data_rows:
        if not r or all((c or "").strip() == "" for c in r):
            continue
        if len(r) < len(EXPECTED_HEADERS):
            r = r + [""] * (len(EXPECTED_HEADERS) - len(r))
        elif len(r) > len(EXPECTED_HEADERS):
            r = r[:len(EXPECTED_HEADERS)]
        normalized_rows.append(r)

    report["total"] = len(normalized_rows)
    report["preview"] = normalized_rows[:10]  # show first 10 rows exactly as seen
    if dry_run or report["total"] == 0:
        return report  # preview only

    # Create entries in batches (transactional)
    to_create: List[Entry] = []
    for r in normalized_rows:
        try:
            obj = Entry(
                subject=(r[0] or "").strip(),
                illustration=(r[1] or "").strip(),
                application=(r[2] or "").strip(),
                scripture_raw=(r[3] or "").strip(),
                source=(r[4] or "").strip(),
                talk_title=(r[5] or "").strip(),
                talk_number=_parse_int(r[6]),
                entry_code=(r[7] or "").strip(),
                date_added=_parse_date(r[8]),
                date_edited=_parse_date(r[9]),
            )
            to_create.append(obj)
        except Exception as e:
            report["skipped"] += 1
            report["errors"].append(f"Row skipped due to error: {e}")

        if len(to_create) >= batch_size:
            with transaction.atomic():
                Entry.objects.bulk_create(to_create, batch_size=batch_size)
            report["created"] += len(to_create)
            to_create.clear()

    if to_create:
        with transaction.atomic():
            Entry.objects.bulk_create(to_create, batch_size=batch_size)
        report["created"] += len(to_create)
        to_create.clear()

    return report
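
# Usage sketch (illustrative only; the view and field names below are
# hypothetical, not part of this module):
#
#   def import_view(request):
#       raw = request.FILES["csv_file"].read()             # uploaded bytes
#       report = import_csv_bytes(raw, dry_run=True)       # parse + preview, no writes
#       if request.POST.get("confirm"):
#           report = import_csv_bytes(raw, dry_run=False)  # create Entry rows in batches
#       return render(request, "import.html", {"report": report})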


# Small no-op context manager; not currently referenced above
# (the import path uses transaction.atomic() instead).
class _noop_context:
    def __enter__(self): return self
    def __exit__(self, exc_type, exc, tb): return False