Update web/core/utils.py

breaking down imports into chunks
Joshua Laymon 2025-08-13 16:34:40 +00:00
parent 19ac15ec61
commit 885a91701b


@@ -4,12 +4,10 @@ from __future__ import annotations
import csv
import io
import re
import unicodedata
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Optional
from django.db import transaction
from django.db.models import Model
from django.db import transaction, IntegrityError, DataError, DatabaseError
from .models import Entry
@@ -18,8 +16,6 @@ from .models import Entry
# Search helpers (used by views)
# ============================
_WORD_RE = re.compile(r"[^\s]+")
def terms(q: str) -> List[str]:
"""Split search query into terms; keep quoted phrases together."""
if not q:
@@ -39,17 +35,18 @@ def terms(q: str) -> List[str]:
out.append("".join(buf))
return out
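# Illustrative behavior (assuming the quoted-phrase handling above):
#   terms('water "living water" well') -> ['water', 'living water', 'well']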
def has_wildcards(s: str) -> bool:
return bool(s) and ("*" in s or "?" in s)
def wildcard_to_regex(s: str) -> str:
"""
Convert user wildcards to a Postgres-friendly regex:
* -> .* ? -> . escape regex meta first
* -> .* ? -> . (escape regex meta first)
"""
if s is None:
return ""
# Escape regex meta, then translate wildcards
s = re.escape(s)
s = s.replace(r"\*", ".*").replace(r"\?", ".")
return f"^{s}$"
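# e.g. wildcard_to_regex("psa*1?") -> "^psa.*1.$"; the ^...$ anchors make it
# a full-field match, suitable for a Django __iregex lookup.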
@@ -62,7 +59,7 @@ def wildcard_to_regex(s: str) -> str:
# Canonical header names we expect (case-insensitive on input):
CANON_HEADERS = [
"subject", "illustration", "application", "scripture",
"source", "talk title", "talk number", "code", "date", "date edited"
"source", "talk title", "talk number", "code", "date", "date edited",
]
EXPECTED_COLS = len(CANON_HEADERS)
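# i.e. 10 columns expected per data row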
@@ -111,7 +108,7 @@ def _split_lenient(line: str, delimiter: str, expected: int) -> List[str]:
Split a CSV line manually, respecting quotes. Works even if the line
contains inconsistent quoting (e.g., inner quotes not doubled).
Ensures we return exactly `expected` fields by merging overflow cells
into the current text field (typically Illustration/Application/Scripture).
into the current text field (before the trailing short/meta columns).
"""
out, field = [], []
in_quotes = False
@@ -119,7 +116,7 @@ def _split_lenient(line: str, delimiter: str, expected: int) -> List[str]:
while i < n:
ch = line[i]
if ch == '"':
# If we see a doubled quote, treat as a literal quote and skip one
# doubled quote inside a quoted field -> literal quote
if in_quotes and i + 1 < n and line[i + 1] == '"':
field.append('"')
i += 2
@@ -136,12 +133,10 @@ def _split_lenient(line: str, delimiter: str, expected: int) -> List[str]:
i += 1
out.append("".join(field))
# If we ended with quotes unbalanced, we still got something. Now repair count.
# Repair count to exactly `expected`
if len(out) < expected:
out += [""] * (expected - len(out))
elif len(out) > expected:
# Merge overflow columns into the last texty field before we hit short fields.
# Strategy: merge extras into the last non-empty field before Date columns.
head = out[:expected - 1]
tail = out[expected - 1:]
head[-1] = head[-1] + delimiter + delimiter.join(tail)
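# Illustrative (assuming quote chars toggle state and are stripped):
#   _split_lenient('a,"b,c",d', ",", expected=5) -> ['a', 'b,c', 'd', '', '']
#   (quoted comma preserved; short row padded out to 5 fields)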
@@ -155,21 +150,19 @@ def _build_header_map(headers: List[str]) -> Dict[str, str]:
Map incoming headers (any case) to our canonical keys.
"""
key = {h.lower().strip(): h for h in headers}
mapping = {}
mapping: Dict[str, Optional[str]] = {}
for canon in CANON_HEADERS:
# exact match first (case-insensitive)
if canon in key:
mapping[canon] = key[canon]
else:
# fallback: try common variants
aliases = {
"talk title": ["talk_title", "title"],
"talk number": ["talk_no", "talk#", "talk number", "talknum"],
"talk number": ["talk_no", "talk#", "talknum"],
"date edited": ["edited", "date_edited", "edited date"],
}.get(canon, [])
found = next((a for a in aliases if a in key), None)
mapping[canon] = key.get(found, None)
return mapping
mapping[canon] = key.get(found) if found else None
return mapping # type: ignore[return-value]
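# Illustrative: headers ['Subject', 'Title', 'Code', ...] map to
#   {'subject': 'Subject', 'talk title': 'Title', 'code': 'Code', ...};
#   canon keys with no match map to None and read back as "" via _getv.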
def _getv(row: Dict[str, str], header_map: Dict[str, str], canon_key: str) -> str:
@@ -177,17 +170,17 @@ def _getv(row: Dict[str, str], header_map: Dict[str, str], canon_key: str) -> str:
return (row.get(src) if src else "") or ""
def _parse_date(val: str) -> Optional[datetime.date]:
def _parse_date(val: str):
val = (val or "").strip()
if not val:
return None
# Try common formats: m/d/Y, Y-m-d
# Common formats: m/d/Y, Y-m-d (also tolerate single-digit m/d on Linux)
for fmt in ("%m/%d/%Y", "%-m/%-d/%Y", "%Y-%m-%d"):
try:
return datetime.strptime(val, fmt).date()
except Exception:
pass
# Try letting dateutil if available (optional), else skip
# Fallback to dateutil if present
try:
from dateutil import parser # type: ignore
return parser.parse(val).date()
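# e.g. _parse_date("8/13/2025") and _parse_date("2025-08-13") both yield
#   date(2025, 8, 13); unparseable values fall through to None.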
@@ -209,12 +202,11 @@ def _clip(field_name: str, value: str) -> str:
return value
def _coerce_int(val: str) -> Optional[int]:
def _coerce_int(val: str):
val = (val or "").strip()
if not val:
return None
# allow values like "#35" or "35)"
m = re.search(r"(-?\d+)", val)
m = re.search(r"(-?\d+)", val.replace(",", ""))
if not m:
return None
try:
@@ -223,60 +215,55 @@ def _coerce_int(val: str) -> Optional[int]:
return None
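# e.g. _coerce_int("#35") -> 35, _coerce_int("1,234") -> 1234,
#      _coerce_int("n/a") -> None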
@transaction.atomic
def import_csv_bytes(b: bytes, dry_run: bool = False) -> Dict[str, object]:
def import_csv_bytes(b: bytes, dry_run: bool = False, commit_every: int = 500) -> Dict[str, object]:
"""
Robust CSV import. Idempotent-ish upsert by (subject, illustration).
Robust CSV import. Commits each row in its own transaction so that one bad
row does not poison the entire import (avoids TransactionManagementError cascades).
Returns a report dict with counts and first-line error messages.
"""
text = _decode_bytes(b)
dialect = _sniff_dialect(text)
delimiter = getattr(dialect, "delimiter", ",")
# --- headers ---
f = io.StringIO(text)
reader = csv.reader(f, dialect=dialect)
# Read header row
try:
raw_headers = next(reader)
except StopIteration:
return {"rows": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": [], "scripture_parsed": 0, "scripture_failed": 0, "dialect_delimiter": dialect.delimiter, "used_headerless_mode": False, "seen_headers": []}
# If header count is wrong, repair via lenient split
if len(raw_headers) != EXPECTED_COLS:
fixed = _split_lenient(",".join(raw_headers), delimiter=dialect.delimiter, expected=EXPECTED_COLS)
headers = fixed
else:
headers = raw_headers
return {
"rows": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": [],
"scripture_parsed": 0, "scripture_failed": 0,
"dialect_delimiter": delimiter, "used_headerless_mode": False,
"seen_headers": []
}
headers = raw_headers if len(raw_headers) == EXPECTED_COLS else _split_lenient(
",".join(raw_headers), delimiter=delimiter, expected=EXPECTED_COLS
)
header_map = _build_header_map(headers)
total = 0
inserted = 0
updated = 0
skipped = 0
errors: List[str] = []
scripture_ok = 0
scripture_bad = 0
# Pair raw lines so we can repair rows mis-split by csv
raw_lines = text.splitlines()[1:] # skip header
# Re-open to iterate rows with the *raw* lines paired to parsed ones
f2 = io.StringIO(text)
lines = f2.read().splitlines()
# first line is header
raw_data_lines = lines[1:]
# Iterate again with DictReader for convenience
f3 = io.StringIO(text)
dict_reader = csv.DictReader(f3, fieldnames=headers, dialect=dialect)
dict_reader = csv.DictReader(io.StringIO(text), fieldnames=headers, dialect=dialect)
next(dict_reader, None) # skip header
for idx, (raw_line, row) in enumerate(zip(raw_data_lines, dict_reader), start=2):
total = inserted = updated = skipped = 0
errors: List[str] = []
scripture_ok = scripture_bad = 0
# Import loop (row-by-row atomic)
for idx, (raw_line, row) in enumerate(zip(raw_lines, dict_reader), start=2):
total += 1
# Some rows are mis-split by csv due to bad quotes -> repair
# Repair if DictReader got the wrong shape (inconsistent quotes in source)
if len(row) != EXPECTED_COLS or None in row:
cells = _split_lenient(raw_line, delimiter=dialect.delimiter, expected=EXPECTED_COLS)
cells = _split_lenient(raw_line, delimiter=delimiter, expected=EXPECTED_COLS)
row = dict(zip(headers, cells))
# Extract using canonical keys
# Extract canonical fields
subject = _getv(row, header_map, "subject").strip()
illustration = _getv(row, header_map, "illustration").strip()
application = _getv(row, header_map, "application").strip()
@@ -288,12 +275,12 @@ def import_csv_bytes(b: bytes, dry_run: bool = False) -> Dict[str, object]:
date_added = _parse_date(_getv(row, header_map, "date"))
date_edited = _parse_date(_getv(row, header_map, "date edited"))
# Basic sanity: if all major text fields empty, skip
# Skip rows with no meaningful text
if not (subject or illustration or application):
skipped += 1
continue
# Clip to DB lengths to avoid DataError (robustness)
# Clip to DB lengths
subject = _clip("subject", subject)
illustration = _clip("illustration", illustration)
application = _clip("application", application)
@@ -302,10 +289,8 @@ def import_csv_bytes(b: bytes, dry_run: bool = False) -> Dict[str, object]:
talk_title = _clip("talk_title", talk_title)
entry_code = _clip("entry_code", entry_code)
if scripture:
scripture_ok += 1
else:
scripture_bad += 1
scripture_ok += 1 if scripture else 0
scripture_bad += 0 if scripture else 1
# Upsert key: prefer entry_code; else (subject + illustration)
lookup: Dict[str, object] = {}
@@ -315,40 +300,45 @@ def import_csv_bytes(b: bytes, dry_run: bool = False) -> Dict[str, object]:
lookup["subject"] = subject
lookup["illustration"] = illustration
if dry_run:
exists = Entry.objects.filter(**lookup).exists()
inserted += 0 if exists else 1
updated += 1 if exists else 0
continue
try:
obj = Entry.objects.filter(**lookup).first()
if not obj:
obj = Entry(**lookup)
created = True
else:
# Isolate each row so a failure rolls back only that row
with transaction.atomic():
obj = Entry.objects.filter(**lookup).first()
created = False
if not obj:
obj = Entry(**lookup)
created = True
obj.subject = subject
obj.illustration = illustration
obj.application = application
obj.scripture_raw = scripture
obj.source = source
obj.talk_title = talk_title
obj.talk_number = talk_number
obj.entry_code = entry_code or obj.entry_code
if date_added:
obj.date_added = date_added
if date_edited:
obj.date_edited = date_edited
obj.subject = subject
obj.illustration = illustration
obj.application = application
obj.scripture_raw = scripture
obj.source = source
obj.talk_title = talk_title
obj.talk_number = talk_number
if entry_code:
obj.entry_code = entry_code
if date_added:
obj.date_added = date_added
if date_edited:
obj.date_edited = date_edited
if not dry_run:
obj.save()
if created:
inserted += 1
else:
updated += 1
inserted += 1 if created else 0
updated += 0 if created else 1
except Exception as e:
# Keep importing other rows; capture the first part of the error
except (IntegrityError, DataError, DatabaseError, ValueError) as e:
msg = str(e).splitlines()[0]
errors.append(f"line {idx}: {type(e).__name__}: {msg}")
skipped += 1
# continue to next row
return {
"rows": total,
@@ -358,7 +348,7 @@ def import_csv_bytes(b: bytes, dry_run: bool = False) -> Dict[str, object]:
"errors": errors,
"scripture_parsed": scripture_ok,
"scripture_failed": scripture_bad,
"dialect_delimiter": getattr(_sniff_dialect(text), "delimiter", ","),
"dialect_delimiter": delimiter,
"used_headerless_mode": False,
"seen_headers": headers,
}
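
The essential pattern from this commit, as a minimal standalone sketch (import_rows and the save_row callable are hypothetical, not code from this commit):

from typing import Callable, Dict, List

from django.db import DatabaseError, transaction


def import_rows(rows: List[Dict[str, str]],
                save_row: Callable[[Dict[str, str]], None]) -> List[str]:
    errors: List[str] = []
    for lineno, row in enumerate(rows, start=2):  # line 1 is the header
        try:
            # One transaction per row (a savepoint if an outer atomic block
            # is already active): a failing row rolls back alone instead of
            # poisoning the connection and raising TransactionManagementError
            # on every subsequent query.
            with transaction.atomic():
                save_row(row)
        except DatabaseError as e:
            errors.append(f"line {lineno}: {str(e).splitlines()[0]}")
    return errors

Catching DatabaseError (the base class of IntegrityError and DataError) outside the atomic block is what lets the loop keep going; catching it inside the block and continuing to query would leave the transaction in a broken state.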