Illustrations/web/core/views.py

1303 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import date, timedelta
import csv
import re
from django.contrib import messages
from django.contrib.auth import authenticate, login
from django.contrib.auth.decorators import login_required, user_passes_test
from django.db.models import Q
from django.http import HttpResponse, JsonResponse
from django.shortcuts import render, redirect, get_object_or_404
from django.views.decorators.http import require_http_methods
from django.utils.text import Truncator
from django.urls import reverse
from django.views.decorators.cache import never_cache
from django.views.decorators.http import require_POST
from .models_user import SearchHistory, ViewedIllustration
from .forms import ImportForm, EntryForm
from .models import Entry
from .scripture_normalizer import normalize_scripture_field # NEW
from .source_normalizer import normalize_source_field # NEW
from .subject_normalizer import normalize_subject_field # NEW
from .utils import terms, has_wildcards, wildcard_to_regex, import_csv_bytes
from django.contrib.staticfiles.storage import staticfiles_storage
from django.db import transaction
from . import utils as core_utils
from .models_audit import AuditLog
from .scripture_normalizer import normalize_scripture_field
# Order + labels used in the Search UI.
# Each pair is (field name, human-readable label). search_page iterates this
# list in order to build its per-field checkbox options, and the field names
# are the same keys it uses when building Entry lookups.
FIELD_ORDER = [
    ("subject", "Subject"),
    ("illustration", "Illustration"),
    ("application", "Application"),
    ("scripture_raw", "Scripture"),
    ("source", "Source"),
    ("talk_title", "Talk Title"),
    ("talk_number", "Talk Number"),
    ("entry_code", "Code"),
]
# CSV column headers in their expected order.
# import_wizard declares an identical local list and assigns it to
# utils.EXPECTED_HEADERS before running the importer.
EXPECTED_HEADERS = [
    "Subject",
    "Illustration",
    "Application",
    "Scripture",
    "Source",
    "Talk Title",
    "Talk Number",
    "Code",
    "Date",
    "Date Edited",
]
def login_view(request):
    """
    Login entry point.

    - Already-authenticated users are sent straight to the search page.
    - A plain GET (no ?next=) starts the OIDC flow automatically.
    - A GET carrying ?next= renders the login page instead, so an ongoing
      redirect flow does not loop.
    - A POST tries username/password authentication and re-renders the page
      with an error on failure.
    """
    if request.user.is_authenticated:
        return redirect("search")

    method = request.method
    if method == "GET" and "next" not in request.GET:
        return redirect("oidc_authentication_init")

    if method != "POST":
        # Fallback (rare): show the page so the user can click manually.
        return render(request, "login.html", {})

    username = request.POST.get("username")
    password = request.POST.get("password")
    user = authenticate(request, username=username, password=password)
    if not user:
        return render(request, "login.html", {"error": "Invalid credentials"})

    login(request, user)
    return redirect("search")
def is_admin(user):
    """Return a truthy value when *user* is a superuser or a staff member."""
    superuser_flag = user.is_superuser
    if superuser_flag:
        return superuser_flag
    return user.is_staff
def entry_context(entry, result_ids):
    """
    Assemble the template context shared by the entry pages.

    Returns a dict with the entry itself, its 1-based position inside
    ``result_ids`` (1 when it is not in the list), the size of the result
    list, the subject chips (comma separated), the scripture chips
    (semicolon separated) and the URL of the matching talk PDF, if any.
    """

    def _chips(text, separator):
        # Split on the separator and drop blank pieces.
        return [piece.strip() for piece in (text or "").split(separator) if piece.strip()]

    def _talk_pdf_url():
        # Best effort: any failure (bad talk number, storage error) yields None.
        try:
            if not entry.talk_number:
                return None
            filename = f"talk_pdfs/S-34_E_{int(entry.talk_number):03d}.pdf"
            if staticfiles_storage.exists(filename):
                return staticfiles_storage.url(filename)
        except Exception:
            return None
        return None

    position = 1
    if entry and result_ids and entry.id in result_ids:
        position = 1 + result_ids.index(entry.id)

    subjects = _chips(entry.subject, ",")
    scriptures = _chips(entry.scripture_raw, ";")
    pdf_url = _talk_pdf_url()

    return {
        "entry": entry,
        "locked": True,
        "position": position,
        "count": len(result_ids or []),
        "subject_list": subjects,
        "scripture_list": scriptures,
        "talk_pdf_url": pdf_url,
    }
# ---------------------------------------------------------------------------
# Scripture validation helpers for the 'invalidscripture' power search.
# Python port of the JS ScriptureValidator.isValidSingleRef logic. The tables
# and the compiled regex are built once at import time instead of on every
# request.
# ---------------------------------------------------------------------------
_SV_FULL_TO_CODE = {
    # OT
    "genesis":"Ge","exodus":"Ex","leviticus":"Le","numbers":"Nu","deuteronomy":"De",
    "joshua":"Jos","judges":"Jg","ruth":"Ru",
    "1 samuel":"1Sa","2 samuel":"2Sa","1 kings":"1Ki","2 kings":"2Ki",
    "1 chronicles":"1Ch","2 chronicles":"2Ch",
    "ezra":"Ezr","nehemiah":"Ne","esther":"Es","job":"Job","psalms":"Ps","psalm":"Ps",
    "proverbs":"Pr","ecclesiastes":"Ec","song of solomon":"Ca","song of songs":"Ca",
    "isaiah":"Isa","jeremiah":"Jer","lamentations":"La","ezekiel":"Eze","daniel":"Da",
    "hosea":"Ho","joel":"Joe","amos":"Am","obadiah":"Ob","jonah":"Jon","micah":"Mic",
    "nahum":"Na","habakkuk":"Hab","zephaniah":"Zep","haggai":"Hag","zechariah":"Zec","malachi":"Mal",
    # NT
    "matthew":"Mt","mark":"Mr","luke":"Lu","john":"Joh","acts":"Ac","romans":"Ro",
    "1 corinthians":"1Co","2 corinthians":"2Co",
    "galatians":"Ga","ephesians":"Eph","philippians":"Php","colossians":"Col",
    "1 thessalonians":"1Th","2 thessalonians":"2Th",
    "1 timothy":"1Ti","2 timothy":"2Ti",
    "titus":"Tit","philemon":"Phm","hebrews":"Heb","james":"Jas",
    "1 peter":"1Pe","2 peter":"2Pe",
    "1 john":"1Jo","2 john":"2Jo","3 john":"3Jo",
    "jude":"Jude","revelation":"Re",
}
_SV_ALIAS_TO_CODE = {
    # OT
    "gen":"Ge","exod":"Ex","lev":"Le","num":"Nu","deut":"De",
    "josh":"Jos","judg":"Jg","ps":"Ps","prov":"Pr","eccl":"Ec","song":"Ca","cant":"Ca",
    "isa":"Isa","jer":"Jer","lam":"La","ezek":"Eze","dan":"Da","hos":"Ho","joel":"Joe",
    "amos":"Am","obad":"Ob","jon":"Jon","mic":"Mic","nah":"Na","hab":"Hab","zeph":"Zep",
    "hag":"Hag","zech":"Zec","mal":"Mal",
    # NT
    "matt":"Mt","mark":"Mr","luke":"Lu","john":"Joh","acts":"Ac","rom":"Ro",
    "gal":"Ga","eph":"Eph","phil":"Php","col":"Col","heb":"Heb","jas":"Jas",
    "jude":"Jude","rev":"Re",
}
_SV_CODE_TO_NUM = {
    # OT
    "Ge":1,"Ex":2,"Le":3,"Nu":4,"De":5,"Jos":6,"Jg":7,"Ru":8,"1Sa":9,"2Sa":10,
    "1Ki":11,"2Ki":12,"1Ch":13,"2Ch":14,"Ezr":15,"Ne":16,"Es":17,"Job":18,
    "Ps":19,"Pr":20,"Ec":21,"Ca":22,"Isa":23,"Jer":24,"La":25,"Eze":26,"Da":27,"Ho":28,
    "Joe":29,"Am":30,"Ob":31,"Jon":32,"Mic":33,"Na":34,"Hab":35,"Zep":36,"Hag":37,"Zec":38,"Mal":39,
    # NT
    "Mt":40,"Mr":41,"Lu":42,"Joh":43,"Ac":44,"Ro":45,"1Co":46,"2Co":47,"Ga":48,"Eph":49,
    "Php":50,"Col":51,"1Th":52,"2Th":53,"1Ti":54,"2Ti":55,"Tit":56,"Phm":57,
    "Heb":58,"Jas":59,"1Pe":60,"2Pe":61,"1Jo":62,"2Jo":63,"3Jo":64,"Jude":65,"Re":66,
}
# Numbered book families: accepted (lower-case) name prefixes -> code per number.
_SV_SERIES = [
    {"prefixes": ("sam", "samu", "samuel"), "codes": {1: "1Sa", 2: "2Sa"}},
    {"prefixes": ("ki", "king", "kings", "kgs"), "codes": {1: "1Ki", 2: "2Ki"}},
    {"prefixes": ("chron", "chr", "ch", "chronicles"), "codes": {1: "1Ch", 2: "2Ch"}},
    {"prefixes": ("cor", "corin", "corinth", "corinthians", "co", "c"), "codes": {1: "1Co", 2: "2Co"}},
    {"prefixes": ("thes", "thess", "thessalon", "thessalonians", "th"), "codes": {1: "1Th", 2: "2Th"}},
    {"prefixes": ("tim", "ti", "timothy", "t"), "codes": {1: "1Ti", 2: "2Ti"}},
    {"prefixes": ("pet", "pe", "peter", "pt", "p"), "codes": {1: "1Pe", 2: "2Pe"}},
    {"prefixes": ("jo", "jn", "joh", "john", "jno", "jhn"), "codes": {1: "1Jo", 2: "2Jo", 3: "3Jo"}},
]
_SV_WOL_ABBR = frozenset(_SV_CODE_TO_NUM)
_SV_VERSES_RE = re.compile(r"""
    ^
    (?:
        (\d{1,3})                                   # chapter only
        |
        (\d{1,3})\s*:\s*
        (
            \d{1,3}                                 # v1
            (?:\s*-\s*(?:\d{1,3}|\d{1,3}:\d{1,3}))? # -v2 OR -ch:vs
            (?:\s*,\s*\d{1,3}(?:\s*-\s*(?:\d{1,3}|\d{1,3}:\d{1,3}))?)* # ,vN[-…]
        )
    )
    $
""", re.VERBOSE)
_SV_BOOK_REST_RE = re.compile(r"^(.+?)\s+(\d{1,3}(?:\s*:\s*.*)?)$")


def _sv_lookup_book_code(book_raw):
    """Map a raw book name/abbreviation to its short code, or None if unknown."""
    # Drop trailing dots first, then collapse runs of whitespace.
    book = re.sub(r"\s+", " ", re.sub(r"\.+$", "", book_raw or "").strip())
    lowered = book.lower()
    # Full names, then common aliases.
    code = _SV_FULL_TO_CODE.get(lowered) or _SV_ALIAS_TO_CODE.get(lowered)
    if code:
        return code
    # Short codes (allow a space after the leading number, and arbitrary spaces).
    tightened = re.sub(r"^([1-3])\s+([A-Za-z].*)$", r"\1\2", book)
    if tightened in _SV_WOL_ABBR:
        return tightened
    no_space = re.sub(r"\s+", "", book)
    if no_space in _SV_WOL_ABBR:
        return no_space
    # Numbered prose (e.g., "2 Sam", "1 Chron", "3 Jo").
    numbered = re.match(r"^([1-3])\s*([A-Za-z]+)$", lowered)
    if numbered:
        number = int(numbered.group(1))
        base = numbered.group(2)
        for family in _SV_SERIES:
            if base.startswith(family["prefixes"]):
                code = family["codes"].get(number)
                if code:
                    return code
    return None


def _sv_is_valid_single_ref(ref):
    """Return True when *ref* is one well-formed '<book> <chapter>[:<verses>]' reference."""
    text = (ref or "").strip()
    if not text:
        return False
    split = _SV_BOOK_REST_RE.match(text)
    if not split:
        return False
    if not _sv_lookup_book_code(split.group(1)):
        return False
    rest = (split.group(2) or "").strip()
    if not rest:
        return False
    return bool(_SV_VERSES_RE.match(rest))


def _sv_field_is_valid(text):
    """Return True when every ';'-separated reference in *text* is valid.

    An empty field is reported as not valid; callers exclude empty fields
    before asking.
    """
    pieces = [p.strip() for p in (text or "").split(";") if p.strip()]
    if not pieces:
        return False
    return all(_sv_is_valid_single_ref(p) for p in pieces)


def _sv_invalid_scripture_ids():
    """Return ids of entries with an invalid Scripture field, newest first.

    The rows are scanned once, already in display order, so no follow-up
    ``id__in=[...]`` query is needed (a very large IN list can exceed the
    database's bound-parameter limit).
    """
    rows = (
        Entry.objects.exclude(scripture_raw="")
        .order_by("-date_added", "-id")
        .values_list("id", "scripture_raw")
    )
    return [
        pk
        for pk, raw in rows.iterator(chunk_size=1000)
        if not _sv_field_is_valid(raw or "")
    ]


def _search_matching_ids(tokens, fields, allow_regex):
    """Return entry ids matching every token (AND) in any selected field (OR).

    With ``allow_regex`` wildcard tokens use a regex lookup; without it every
    token is matched with a plain substring lookup after removing '*' and '?'.
    """
    qs = Entry.objects.all()
    for tok in tokens:
        if not allow_regex:
            lookup, needle = "__icontains", tok.replace("*", "").replace("?", "")
        elif has_wildcards(tok):
            lookup, needle = "__iregex", wildcard_to_regex(tok)
        else:
            lookup, needle = "__icontains", tok
        clause = Q()
        for f in fields:
            clause |= Q(**{f + lookup: needle})
        qs = qs.filter(clause)
    return list(qs.order_by("-date_added", "-id").values_list("id", flat=True))


def _search_form_response(request, q, selected, field_options, ran_search):
    """Render search.html; ``result_count`` is only present after a search ran."""
    ctx = {
        "q": q,
        "selected": selected,
        "field_options": field_options,
        "total": Entry.objects.count(),
        "ran_search": ran_search,
    }
    if ran_search:
        ctx["result_count"] = 0
    return render(request, "search.html", ctx)


@login_required
def search_page(request):
    """
    Search-first landing. Defaults to Subject, Illustration, Application.
    Supports:
    - quoted phrases
    - * and ? wildcards (regex); if regex returns zero, falls back to icontains
    - AND across tokens, OR across the selected fields
    Special power term:
    - 'invalidscripture' -> entries whose Scripture would be INVALID per the JS validator

    The matching ids and the last query are stored in the session
    ("result_ids", "last_search"). When there is at least one hit the first
    entry is rendered directly; otherwise the search form is shown again.
    """
    default_fields = {
        "subject": True,
        "illustration": True,
        "application": True,
        "scripture_raw": False,
        "source": False,
        "talk_title": False,
        "talk_number": False,
        "entry_code": False,
    }
    # The form counts as submitted when a query or any field checkbox is present.
    form_submitted = ("q" in request.GET) or any(k in request.GET for k in default_fields)
    if form_submitted:
        selected = {k: (k in request.GET) for k in default_fields}
    else:
        selected = default_fields.copy()
    field_options = [
        {"name": k, "label": label, "checked": bool(selected.get(k))}
        for k, label in FIELD_ORDER
    ]

    q = (request.GET.get("q") or "").strip()
    if not q:
        return _search_form_response(request, q, selected, field_options, ran_search=False)

    if q.lower() == "invalidscripture":
        # Special power term (JS-compatible scripture validation).
        fields = ["scripture_raw"]
        ids = _sv_invalid_scripture_ids()
        log_line = f"[search] q='invalidscripture' count={len(ids)}"
    else:
        tokens = terms(q)
        fields = [f for f, sel in selected.items() if sel] or ["subject"]
        ids = _search_matching_ids(tokens, fields, allow_regex=True)
        # Wildcard search found nothing: retry with plain substring matching.
        if not ids and any(has_wildcards(tok) for tok in tokens):
            ids = _search_matching_ids(tokens, fields, allow_regex=False)
        log_line = f"[search] q={q!r} tokens={tokens} fields={fields} count={len(ids)}"

    # Best-effort diagnostics; a failing stdout must never break the search.
    try:
        print(log_line)
    except Exception:
        pass

    request.session["result_ids"] = ids
    request.session["last_search"] = {"q": q, "fields": fields}
    request.session.modified = True

    if not ids:
        return _search_form_response(request, q, selected, field_options, ran_search=True)

    entry = Entry.objects.get(pk=ids[0])
    ctx = entry_context(entry, ids)
    ctx.update({"from_search": True})
    if request.user.is_staff:
        ctx["tts_url"] = reverse("api_tts_for_entry", args=[entry.id])
    return render(request, "entry_view.html", ctx)
@login_required
def nav_next(request):
    """
    Show the entry after position ``?i=`` in the session's result list.

    Redirects to the search page when there is no stored result list. The
    index comes from the query string, so it is parsed defensively: a
    non-numeric value counts as 0 and the target position is clamped to the
    bounds of the list.
    """
    ids = request.session.get("result_ids", [])
    if not ids:
        return redirect("search")
    try:
        idx = int(request.GET.get("i", "0"))
    except (TypeError, ValueError):
        idx = 0
    # Clamp on both sides: a negative index must not wrap to the end of the list.
    idx = max(0, min(idx + 1, len(ids) - 1))
    entry = get_object_or_404(Entry, pk=ids[idx])
    ctx = entry_context(entry, ids)
    # The TTS endpoint is only offered to staff users.
    ctx["tts_url"] = reverse("api_tts_for_entry", args=[entry.id]) if request.user.is_staff else ""
    return render(request, "entry_view.html", ctx)
@login_required
def nav_prev(request):
    """
    Show the entry before position ``?i=`` in the session's result list.

    Redirects to the search page when there is no stored result list. The
    index comes from the query string, so it is parsed defensively: a
    non-numeric value counts as 0 and the target position is clamped to the
    bounds of the list.
    """
    ids = request.session.get("result_ids", [])
    if not ids:
        return redirect("search")
    try:
        idx = int(request.GET.get("i", "0"))
    except (TypeError, ValueError):
        idx = 0
    # Clamp on both sides: an index past the end must not raise IndexError.
    idx = min(max(idx - 1, 0), len(ids) - 1)
    entry = get_object_or_404(Entry, pk=ids[idx])
    ctx = entry_context(entry, ids)
    # The TTS endpoint is only offered to staff users.
    ctx["tts_url"] = reverse("api_tts_for_entry", args=[entry.id]) if request.user.is_staff else ""
    return render(request, "entry_view.html", ctx)
@never_cache
@login_required
def entry_view(request, entry_id):
    """
    Display a single entry (404 if it does not exist).

    Requires login like every other entry view in this module; previously
    this was the only one reachable anonymously. The last search stored in
    the session is passed to the template for the highlighter, and the TTS
    URL is only provided to staff users.
    """
    ids = request.session.get("result_ids", [])
    entry = get_object_or_404(Entry, pk=entry_id)
    ctx = entry_context(entry, ids)
    # Pass last search to the template for the highlighter
    last = request.session.get("last_search") or {}
    ctx["last_search_q"] = last.get("q", "")
    ctx["last_search_fields"] = last.get("fields", [])
    ctx["tts_url"] = reverse("api_tts_for_entry", args=[entry.id]) if request.user.is_staff else None
    return render(request, "entry_view.html", ctx)
@login_required
def entry_add(request):
    """
    Create a new Entry from a submitted EntryForm.

    EntryForm is a plain Form (not a ModelForm), so the cleaned values are
    copied onto the new Entry one attribute at a time. GET shows an empty
    form; an invalid POST re-renders the bound form.
    """
    if request.method != "POST":
        return render(request, "entry_add.html", {"form": EntryForm()})

    form = EntryForm(request.POST)
    if not form.is_valid():
        return render(request, "entry_add.html", {"form": form})

    new_entry = Entry()
    for field_name, value in form.cleaned_data.items():
        setattr(new_entry, field_name, value)
    new_entry.save()
    messages.success(request, "New entry added.")
    return redirect("entry_view", entry_id=new_entry.id)
@login_required
def entry_edit(request, entry_id):
    """
    Edit an existing Entry (404 if it does not exist).

    GET pre-fills the form from the entry; a valid POST copies the cleaned
    values onto the entry, saves it and redirects to the entry page. An
    invalid POST falls through and re-renders the bound form.
    """
    result_ids = request.session.get("result_ids", [])
    entry = get_object_or_404(Entry, pk=entry_id)

    if request.method == "POST":
        form = EntryForm(request.POST)
        if form.is_valid():
            for field_name, value in form.cleaned_data.items():
                setattr(entry, field_name, value)
            entry.save()
            messages.success(request, "Entry saved.")
            return redirect("entry_view", entry_id=entry.id)
    else:
        prefilled = (
            "subject",
            "illustration",
            "application",
            "scripture_raw",
            "source",
            "talk_number",
            "talk_title",
            "entry_code",
            "date_added",
            "date_edited",
        )
        form = EntryForm(initial={name: getattr(entry, name) for name in prefilled})

    # Navigation/chip context is layered on top of the basic entry + form pair.
    ctx = {"entry": entry, "form": form, **entry_context(entry, result_ids)}
    return render(request, "entry_edit.html", ctx)
@login_required
def entry_delete(request, entry_id):
    """
    Delete one Entry after confirmation (404 if it does not exist).

    Any non-POST request shows the confirmation page; POST deletes the entry
    and returns to the search page.
    """
    target = get_object_or_404(Entry, pk=entry_id)
    if request.method != "POST":
        return render(request, "entry_delete_confirm.html", {"entry": target})
    target.delete()
    messages.success(request, "Entry deleted.")
    return redirect("search")
@login_required
@user_passes_test(is_admin)
def import_wizard(request):
    """
    Admin-only CSV import wizard.

    GET shows the upload form. POST reads the uploaded file, sanitises its
    header row and hands a re-encoded CSV to ``import_csv_bytes`` (honouring
    the form's ``dry_run`` flag), then renders the import report.

    Header handling:
    - a first row matching the expected headers (case-insensitive, after
      stripping ``r:``/``r=`` prefixes and wrapping quotes) is kept as is;
    - a first row containing the expected headers in a different order is
      accepted and every row is re-ordered to the expected column order;
    - any other first row with the expected column count is treated as data
      and the expected header is injected above it;
    - if the column count is wrong, ';' and tab are tried as delimiters.

    Any exception raised while processing is reported as a message and the
    form is shown again.
    """
    import io

    EXPECTED = [
        "Subject", "Illustration", "Application", "Scripture", "Source",
        "Talk Title", "Talk Number", "Code", "Date", "Date Edited",
    ]

    if request.method != "POST":
        # GET → show empty form
        return render(request, "import_wizard.html", {"form": ImportForm()})

    form = ImportForm(request.POST, request.FILES)
    if form.is_valid():
        try:
            raw = form.cleaned_data["file"].read()
            text = raw.decode("utf-8-sig", errors="replace")

            # --- Try to sniff the dialect from the first line ---
            try:
                first_line = text.splitlines()[0] if text else ""
                dialect = csv.Sniffer().sniff(first_line) if first_line else csv.excel
            except csv.Error:
                dialect = csv.excel

            rows = list(csv.reader(io.StringIO(text), dialect))
            if not rows:
                raise ValueError("The CSV file appears to be empty.")

            expected_norm = [h.lower() for h in EXPECTED]

            def _clean_header(s):
                s = "" if s is None else str(s)
                s = s.strip()
                # Strip r: or r= prefixes and wrapping quotes
                if s.lower().startswith(("r:", "r=")):
                    s = s[2:].lstrip()
                if len(s) >= 2 and s[0] == s[-1] and s[0] in ("'", '"'):
                    s = s[1:-1]
                return s.strip().lower()

            norm_first = [_clean_header(c) for c in rows[0]]
            header_ok = norm_first == expected_norm
            header_injected = False
            columns_reordered = False

            if not header_ok and len(rows[0]) != len(EXPECTED):
                # Wrong column count: retry with alternate delimiters.
                for delim in (";", "\t"):
                    rows2 = list(csv.reader(io.StringIO(text), delimiter=delim))
                    if rows2 and len(rows2[0]) == len(EXPECTED):
                        rows = rows2
                        norm_first = [_clean_header(c) for c in rows[0]]
                        header_ok = norm_first == expected_norm
                        break

            if not header_ok and len(rows[0]) == len(EXPECTED):
                if sorted(norm_first) == sorted(expected_norm):
                    # Valid header, different column order (e.g. a file whose
                    # Talk Number / Talk Title columns are swapped): move every
                    # column to its expected position instead of treating the
                    # header as a data row.
                    order = [norm_first.index(h) for h in expected_norm]
                    rows = [list(EXPECTED)] + [
                        [r[i] if i < len(r) else "" for i in order] + r[len(order):]
                        for r in rows[1:]
                    ]
                    header_ok = True
                    columns_reordered = True
                else:
                    # No header but same column count: inject the expected one.
                    rows.insert(0, EXPECTED)
                    header_injected = True

            # Re-encode sanitized CSV for importer
            out = io.StringIO()
            csv.writer(out).writerows(rows)
            fixed_raw = out.getvalue().encode("utf-8")

            # Keep utils in sync
            core_utils.EXPECTED_HEADERS = EXPECTED

            # Run robust importer
            dry_run = form.cleaned_data["dry_run"]
            report = import_csv_bytes(fixed_raw, dry_run=dry_run) or {}

            # Normalize preview: dict rows become lists in column order.
            report.setdefault("columns", EXPECTED)
            preview = report.get("preview") or []
            if preview and isinstance(preview[0], dict):
                cols = report["columns"]
                report["preview"] = [[row.get(c, "") for c in cols] for row in preview]
            report["header_ok"] = header_ok

            if header_injected:
                messages.warning(
                    request,
                    "The first row didnt match the expected header; a clean header was injected automatically."
                )
            elif columns_reordered:
                messages.info(
                    request,
                    "The header columns were in a different order; they were rearranged to the expected order."
                )
            elif not header_ok:
                messages.warning(
                    request,
                    "The first row did not match the expected header and the column count differs; "
                    "the file was passed to the importer unchanged."
                )

            return render(
                request,
                "import_result.html",
                {"report": report, "dry_run": dry_run},
            )
        except Exception as e:
            messages.error(request, f"Import failed: {e}")

    # Invalid form or exception → show form again
    return render(request, "import_wizard.html", {"form": form})
@login_required
@user_passes_test(is_admin)
def export_csv(request):
    """
    Admin-only: download every Entry as a dated CSV backup.

    Rows are written in id order. A missing talk number or date is exported
    as an empty cell; dates use ISO format.
    """
    # NOTE(review): "Talk Number" precedes "Talk Title" here, whereas
    # EXPECTED_HEADERS lists "Talk Title" first - confirm this is intended.
    header = (
        "Subject",
        "Illustration",
        "Application",
        "Scripture",
        "Source",
        "Talk Number",
        "Talk Title",
        "Code",
        "Date",
        "Date Edited",
    )

    stamp = date.today().strftime("%Y-%m-%d")
    response = HttpResponse(content_type="text/csv")
    response["Content-Disposition"] = f'attachment; filename="illustrations_backup_{stamp}.csv"'

    writer = csv.writer(response)
    writer.writerow(header)
    for entry in Entry.objects.all().order_by("id"):
        row = [
            entry.subject,
            entry.illustration,
            entry.application,
            entry.scripture_raw,
            entry.source,
            "" if entry.talk_number is None else entry.talk_number,
            entry.talk_title,
            entry.entry_code,
            entry.date_added.isoformat() if entry.date_added else "",
            entry.date_edited.isoformat() if entry.date_edited else "",
        ]
        writer.writerow(row)
    return response
#@login_required
#def stats_page(request):
# total = Entry.objects.count()
# today = date.today()
# last30 = Entry.objects.filter(date_added__gte=today - timedelta(days=30)).count()
# last365 = Entry.objects.filter(date_added__gte=today - timedelta(days=365)).count()
#
# from collections import Counter
#
# months = []
# y = today.year
# m = today.month
# for i in range(12):
# mm = m - i
# yy = y
# while mm <= 0:
# mm += 12
# yy -= 1
# from datetime import date as _d
# start = _d(yy, mm, 1)
# end = _d(yy + 1, 1, 1) if mm == 12 else _d(yy, mm + 1, 1)
# label = f"{yy}-{mm:02d}"
# months.append((label, start, end))
# months = list(reversed(months))
#
# series = [
# (label, Entry.objects.filter(date_added__gte=start, date_added__lt=end).count())
# for label, start, end in months
# ]
# peak = max((v for _, v in series), default=1)
# heights = [
# (label, value, 8 + int((value / peak) * 100) if peak else 8)
# for label, value in series
# ]
#
# counts = Counter()
# for subj in Entry.objects.exclude(subject="").values_list("subject", flat=True):
# for tag in [t.strip() for t in subj.split(",") if t.strip()]:
# counts[tag.lower()] += 1
# top_subjects = [{"name": n.title(), "count": c} for n, c in counts.most_common(20)]
#
# return render(
# request,
# "stats.html",
# {
# "total": total,
# "last30": last30,
# "last365": last365,
# "series": series,
# "heights": heights,
# "top_subjects": top_subjects,
# },
# )
# ========= Scripture Normalizer =========
@login_required
@user_passes_test(is_admin)
@require_http_methods(["GET", "POST"])
def normalize_scripture(request):
    """
    GET -> dry-run preview (summary + first 100 examples)
    POST -> apply changes to entries' scripture_raw (saved in batches of 500)
    Optional ?limit= restricts processing to the first N entries (by id);
    a missing, non-numeric or negative value means "no limit".
    """
    apply = request.method == "POST"
    # ?limit= is user input: never let a bad value crash the view.
    try:
        limit = max(int(request.GET.get("limit") or 0), 0)
    except (TypeError, ValueError):
        limit = 0

    qs = Entry.objects.all().order_by("id")
    if limit:
        qs = qs[:limit]

    changed = 0
    warnings_total = 0
    preview = []
    pending = []
    batch_size = 500

    def _flush():
        # One transaction per batch; update_fields keeps the write minimal.
        with transaction.atomic():
            for obj in pending:
                obj.save(update_fields=["scripture_raw"])
        pending.clear()

    for e in qs.iterator():
        original = (e.scripture_raw or "").strip()
        normalized, warns = normalize_scripture_field(original)
        warnings_total += len(warns)
        if normalized == original:
            continue
        changed += 1
        # Only the first 100 examples are shown.
        if len(preview) < 100:
            preview.append((e.id, original, normalized))
        if apply:
            e.scripture_raw = normalized
            pending.append(e)
            if len(pending) >= batch_size:
                _flush()
    if pending:
        _flush()

    messages.info(
        request,
        f"{'Applied' if apply else 'Dry-run'}: {changed} entries "
        f"{'changed' if apply else 'would change'}; {warnings_total} warnings."
    )
    return render(
        request,
        "normalize_result.html",
        {
            "applied": apply,
            "changed": changed,
            "warnings_total": warnings_total,
            "preview": preview,
            "limit": limit,
        },
    )
# ========= Source Normalizer =========
@login_required
@user_passes_test(is_admin)
@require_http_methods(["GET", "POST"])
def normalize_source(request):
    """
    GET -> dry-run preview (summary + first 100 examples)
    POST -> apply changes to entries' source (saved in batches of 500)
    Optional ?limit= restricts processing to the first N entries (by id);
    a missing, non-numeric or negative value means "no limit".
    """
    apply = request.method == "POST"
    # ?limit= is user input: never let a bad value crash the view.
    try:
        limit = max(int(request.GET.get("limit") or 0), 0)
    except (TypeError, ValueError):
        limit = 0

    qs = Entry.objects.all().order_by("id")
    if limit:
        qs = qs[:limit]

    changed = 0
    warnings_total = 0
    preview = []
    pending = []
    batch_size = 500

    def _flush():
        # One transaction per batch; update_fields keeps the write minimal.
        with transaction.atomic():
            for obj in pending:
                obj.save(update_fields=["source"])
        pending.clear()

    for e in qs.iterator():
        original = (e.source or "").strip()
        normalized, warns = normalize_source_field(original)
        warnings_total += len(warns)
        if normalized == original:
            continue
        changed += 1
        # Only the first 100 examples are shown.
        if len(preview) < 100:
            preview.append((e.id, original, normalized))
        if apply:
            e.source = normalized
            pending.append(e)
            if len(pending) >= batch_size:
                _flush()
    if pending:
        _flush()

    messages.info(
        request,
        f"{'Applied' if apply else 'Dry-run'}: {changed} entries "
        f"{'changed' if apply else 'would change'}; {warnings_total} warnings."
    )
    return render(
        request,
        "normalize_source_result.html",
        {
            "applied": apply,
            "changed": changed,
            "warnings_total": warnings_total,
            "preview": preview,
            "limit": limit,
        },
    )
# ========= Subject Normalizer =========
@login_required
@user_passes_test(is_admin)
@require_http_methods(["GET", "POST"])
def normalize_subjects(request):
    """
    GET -> dry-run preview (summary + first 100 examples)
    POST -> apply changes to entries' subject (saved in batches of 500)
    Optional ?limit= restricts processing to the first N entries (by id);
    a missing, non-numeric or negative value means "no limit".
    """
    apply = request.method == "POST"
    # ?limit= is user input: never let a bad value crash the view.
    try:
        limit = max(int(request.GET.get("limit") or 0), 0)
    except (TypeError, ValueError):
        limit = 0

    qs = Entry.objects.all().order_by("id")
    if limit:
        qs = qs[:limit]

    changed = 0
    warnings_total = 0
    preview = []
    pending = []
    batch_size = 500

    def _flush():
        # One transaction per batch; update_fields keeps the write minimal.
        with transaction.atomic():
            for obj in pending:
                obj.save(update_fields=["subject"])
        pending.clear()

    for e in qs.iterator():
        original = (e.subject or "").strip()
        normalized, warns = normalize_subject_field(original)
        warnings_total += len(warns)
        if normalized == original:
            continue
        changed += 1
        # Only the first 100 examples are shown.
        if len(preview) < 100:
            preview.append((e.id, original, normalized))
        if apply:
            e.subject = normalized
            pending.append(e)
            if len(pending) >= batch_size:
                _flush()
    if pending:
        _flush()

    messages.info(
        request,
        f"{'Applied' if apply else 'Dry-run'}: {changed} entries "
        f"{'changed' if apply else 'would change'}; {warnings_total} warnings."
    )
    return render(
        request,
        "normalize_subjects_result.html",
        {
            "applied": apply,
            "changed": changed,
            "warnings_total": warnings_total,
            "preview": preview,
            "limit": limit,
        },
    )
# ========= API: Recently Viewed (for 20-word snippet + correct link) =========
@login_required
def api_get_recent_views(request):
    """
    JSON list of the current user's most recently viewed entries (max 50,
    newest first). Each item carries a 20-word snippet taken from the
    illustration, falling back to the application and then the subject.
    """
    from .models import RecentView

    def _snippet_for(entry):
        # First non-blank field wins; whitespace is collapsed before truncating.
        for candidate in (entry.illustration, entry.application, entry.subject):
            text = (candidate or "").strip()
            if text:
                return Truncator(" ".join(text.split())).words(20, truncate="")
        return ""

    recent_views = (
        RecentView.objects.filter(user=request.user)
        .select_related("entry")
        .order_by("-viewed_at")[:50]
    )
    items = [
        {
            "entry_id": view.entry_id,
            "viewed_at": view.viewed_at.isoformat(),
            "illustration": view.entry.illustration or "",
            "snippet": _snippet_for(view.entry),
        }
        for view in recent_views
    ]
    return JsonResponse({"ok": True, "items": items})
@login_required
def settings_home(request):
    """Render the settings landing page for any logged-in user."""
    template_name = "settings/home.html"
    return render(request, template_name)
@login_required
def stats_page(request):
    """
    Statistics dashboard.

    Provides the template with: total / last-30-day / last-365-day entry
    counts, a 12-month series of new entries (plus bar heights scaled to the
    busiest month), the 10 most common subject tags, and scripture analytics
    derived from ``scripture_raw`` (average references per entry, top 10
    books, top 10 full references).
    """
    from collections import Counter
    from calendar import month_abbr

    total = Entry.objects.count()
    today = date.today()
    last30 = Entry.objects.filter(date_added__gte=today - timedelta(days=30)).count()
    last365 = Entry.objects.filter(date_added__gte=today - timedelta(days=365)).count()

    # ---- Sparkline (last 12 months, oldest first) ----
    months = []
    this_month_index = today.year * 12 + (today.month - 1)
    for back in range(11, -1, -1):
        yy, mm0 = divmod(this_month_index - back, 12)
        mm = mm0 + 1
        start = date(yy, mm, 1)
        end = date(yy + 1, 1, 1) if mm == 12 else date(yy, mm + 1, 1)
        label = f"{month_abbr[mm]} {str(yy)[2:]}"
        months.append((label, start, end))
    series = [
        (label, Entry.objects.filter(date_added__gte=start, date_added__lt=end).count())
        for label, start, end in months
    ]
    peak = max((v for _, v in series), default=1)
    # Bar height: 8 minimum, up to 108 for the busiest month; 8 when all months are empty.
    heights = [
        (label, value, 8 + int((value / peak) * 100) if peak else 8)
        for label, value in series
    ]

    # ---- Top subjects ----
    subject_counts = Counter()
    for subj in Entry.objects.exclude(subject="").values_list("subject", flat=True):
        for tag in [t.strip() for t in subj.split(",") if t.strip()]:
            subject_counts[tag.lower()] += 1
    top_subjects = [{"name": n.title(), "count": c} for n, c in subject_counts.most_common(10)]

    # ===============================
    # Scripture analytics (from scripture_raw)
    # ===============================
    # A light normalizer so common abbreviations map to canonical book names.
    # Keys must be lower-case with dots AND whitespace removed, because that
    # is the form normalize_book() looks up (e.g. "1 Co." -> "1co").
    BOOK_MAP = {
        # OT (examples; extend as needed)
        "gen":"Genesis","ge":"Genesis","gn":"Genesis",
        "ex":"Exodus","exo":"Exodus",
        "lev":"Leviticus","le":"Leviticus",
        "num":"Numbers","nu":"Numbers",
        "de":"Deuteronomy","deut":"Deuteronomy",
        "jos":"Joshua","josh":"Joshua",
        "jdg":"Judges","judg":"Judges",
        "ru":"Ruth","rut":"Ruth",
        "ps":"Psalms","psalm":"Psalms","psalms":"Psalms",
        "pr":"Proverbs","pro":"Proverbs",
        "ec":"Ecclesiastes","ecc":"Ecclesiastes",
        "isa":"Isaiah","is":"Isaiah",
        "jer":"Jeremiah","je":"Jeremiah",
        "eze":"Ezekiel","ez":"Ezekiel",
        "da":"Daniel","dan":"Daniel",
        "ho":"Hosea","hos":"Hosea",
        # NT (examples; extend as needed)
        "mt":"Matthew","matt":"Matthew",
        "mr":"Mark","mk":"Mark",
        "lu":"Luke","lk":"Luke",
        "joh":"John","john":"John","jn":"John",
        "ac":"Acts","acts":"Acts",
        "rom":"Romans","ro":"Romans",
        "1co":"1 Corinthians","1cor":"1 Corinthians",
        "2co":"2 Corinthians","2cor":"2 Corinthians",
    }
    BOOK_RE = re.compile(r"""
        ^\s*
        (?P<book>(?:[1-3]\s*)?[A-Za-z\.]+) # optional 1/2/3 prefix + word
        [\s\.]+
        (?P<ref>\d+[:\.]\d+.*)? # 3:16 or 3.16 etc (optional tail)
    """, re.X)

    def normalize_book(raw):
        # Lookup key: lower-case, no dots, no whitespace ("1 John" -> "1john").
        key = re.sub(r"\s+", "", raw.strip().lower().replace(".", ""))
        return BOOK_MAP.get(key, raw.strip().title())

    def split_refs(text):
        if not text:
            return []
        # References are separated by semicolons only; commas are left alone
        # because they separate verses inside a single reference.
        return [p.strip() for p in re.split(r"[;]+", text) if p.strip()]

    def parse_piece(piece):
        # Returns (book, full reference); (None, None) when nothing is recognised.
        m = BOOK_RE.match(piece)
        if not m:
            return None, None
        book = normalize_book(m.group("book"))
        ref = (m.group("ref") or "").strip()
        return book, (f"{book} {ref}" if ref else book)

    book_counts = Counter()
    ref_counts = Counter()
    refs_per_entry = []
    entries_with_script = (
        Entry.objects
        .exclude(scripture_raw__isnull=True)
        .exclude(scripture_raw__exact="")
    )
    for e in entries_with_script.iterator():
        entry_ref_count = 0
        for piece in split_refs(e.scripture_raw):
            book, full = parse_piece(piece)
            if not book:
                continue
            book_counts[book] += 1
            # Book-only pieces count towards the book but not the reference table.
            if full and full != book:
                ref_counts[full] += 1
            entry_ref_count += 1
        if entry_ref_count:
            refs_per_entry.append(entry_ref_count)

    avg_refs_per_entry = round(sum(refs_per_entry) / len(refs_per_entry), 2) if refs_per_entry else 0
    top_books = list(book_counts.most_common(10))
    top_refs = list(ref_counts.most_common(10))

    return render(
        request,
        "stats.html",
        {
            "total": total,
            "last30": last30,
            "last365": last365,
            "series": series,
            "heights": heights,
            "top_subjects": top_subjects,
            "avg_refs_per_entry": avg_refs_per_entry,
            "top_books": top_books,  # iterable of (book, count)
            "top_refs": top_refs,  # iterable of (ref, count)
        },
    )
def is_superuser(user):
    """Predicate for user_passes_test: the user's ``is_superuser`` flag."""
    flag = user.is_superuser
    return flag
@login_required
@user_passes_test(is_superuser)
def delete_all_entries(request):
    """
    Superuser-only: confirmation page plus POST handler that removes ALL
    Entry records.

    The POST must carry ``confirm=yes``; anything else is treated as a
    cancellation. Both outcomes return to the settings page.
    """
    if request.method != "POST":
        return render(request, "settings/delete_all_confirm.html", {})

    # Extra safeguard: only delete when the form carried the confirm field.
    if request.POST.get("confirm") != "yes":
        messages.info(request, "Deletion cancelled.")
        return redirect("settings_home")

    with transaction.atomic():
        deleted, _ = Entry.objects.all().delete()
    messages.success(request, f"Deleted all illustrations ({deleted} rows).")
    return redirect("settings_home")
# ----- Release announcements (superuser tools + dismiss) -----
from django.contrib.auth.decorators import login_required, user_passes_test
from django.views.decorators.http import require_POST
from django.http import JsonResponse, HttpResponseBadRequest
from .forms import AnnouncementForm
from .models_ann import Announcement, AnnouncementDismissal
# NOTE: this rebinds the module-level name `is_superuser`. From this line on it
# is a view decorator produced by user_passes_test, no longer the plain
# predicate function defined earlier in this file.
is_superuser = user_passes_test(lambda u: u.is_superuser)
@is_superuser
@login_required
def announcement_tools(request):
    """
    Superuser-only: publish an announcement.

    A valid POST saves the announcement with the current user as author and
    returns to the settings page. Otherwise the settings page is rendered
    with the (possibly invalid) form and the 25 most recent announcements.
    """
    posting = request.method == "POST"
    form = AnnouncementForm(request.POST) if posting else AnnouncementForm()

    if posting and form.is_valid():
        announcement = form.save(commit=False)
        announcement.created_by = request.user
        announcement.save()
        messages.success(request, "Announcement published.")
        return redirect("settings_home")

    # The settings template guards decide whether this block is shown.
    context = {
        "announcement_form": form,
        "announcements_recent": Announcement.objects.order_by("-created_at")[:25],
    }
    return render(request, "settings/home.html", context)
@login_required
@require_POST
def dismiss_announcement(request, pk):
    """
    Record that the current user dismissed the announcement with key ``pk``.

    Responds with 404 when no such announcement exists. The dismissal is
    stored via ``get_or_create``, so an existing row for this
    user/announcement pair is reused. Replies with ``{"ok": true}``.
    """
    announcement = get_object_or_404(Announcement, pk=pk)
    dismissal_key = {"user": request.user, "announcement": announcement}
    AnnouncementDismissal.objects.get_or_create(**dismissal_key)
    return JsonResponse({"ok": True})
# ----- Login attempts (superuser only) -----
from django.contrib.auth.decorators import user_passes_test, login_required
from django.utils import timezone
from datetime import timedelta
from .models_login import LoginAttempt
# Repeats the rebinding of `is_superuser` to a user_passes_test decorator that
# already happens earlier in this module; the resulting decorator behaves the
# same, so this line is redundant but harmless.
is_superuser = user_passes_test(lambda u: u.is_superuser)
@is_superuser
@login_required
def login_attempts(request):
    """
    Superuser-only report of LoginAttempt rows from the last 7 days, newest first.

    The queryset is handed to ``tools/login_attempts.html`` as ``attempts``;
    which fields are displayed is decided by that template.
    """
    lookback = timedelta(days=7)
    since = timezone.now() - lookback
    recent_attempts = (
        LoginAttempt.objects
        .filter(timestamp__gte=since)
        .order_by("-timestamp")
    )
    return render(request, "tools/login_attempts.html", {"attempts": recent_attempts})
@login_required
@require_POST
def clear_history(request):
    """
    Wipe the requesting user's personal history.

    Deletes that user's SearchHistory and ViewedIllustration rows, drops a
    handful of session keys that might have been used as UI caches, marks the
    session as modified, and replies with ``{"ok": true}``.
    """
    current_user = request.user

    # DB-backed recents: only rows belonging to this user are removed.
    for history_model in (SearchHistory, ViewedIllustration):
        history_model.objects.filter(user=current_user).delete()

    # Session-side leftovers; popping with a default makes absent keys a no-op.
    session = request.session
    for cache_key in ("recent_searches", "recent_entries", "recent_viewed", "recently_viewed"):
        session.pop(cache_key, None)
    session.modified = True

    return JsonResponse({"ok": True})
# Repeats the rebinding of `is_superuser` to a user_passes_test decorator that
# already happens earlier in this module; the resulting decorator behaves the
# same, so this line is redundant but harmless.
is_superuser = user_passes_test(lambda u: u.is_superuser)
@is_superuser
@login_required
def audit_log(request):
    """Superuser-only page listing the 100 most recent AuditLog rows, newest first."""
    newest_first = AuditLog.objects.all().order_by("-timestamp")
    context = {"rows": newest_first[:100]}
    return render(request, "tools/audit_log.html", context)
# === Theme selection (robust) ================================================
def _is_valid_theme(name: str) -> bool:
    """
    Return True when ``themes/<name>.css`` exists in static storage.

    ``name`` originates from a POST field, so empty names and names containing
    a path separator or NUL byte are rejected up front: a theme is a single
    file name, never a path. This keeps values such as ``../css/site`` from
    validating a stylesheet that lives outside ``themes/``.

    Any error raised by the storage backend is treated as "not a valid theme"
    (deliberate best effort), so the caller can show a friendly message.
    """
    if not name or any(ch in name for ch in ("/", "\\", "\x00")):
        return False
    # Validate that /static/themes/<name>.css exists (works with collectstatic)
    try:
        return staticfiles_storage.exists(f"themes/{name}.css")
    except Exception:
        return False
@login_required
@require_POST
def set_theme(request):
    """
    Store the chosen theme in the session.

    Reads ``theme`` from the POST data, trims it, and checks it with
    ``_is_valid_theme``. A missing or unknown theme produces an error
    message; a valid one is saved under ``request.session["theme"]`` with a
    success message. Every path ends with a redirect to ``settings_home``.
    """
    theme = (request.POST.get("theme") or "").strip()

    # Work out what (if anything) is wrong before touching the session.
    if not theme:
        problem = "Pick a theme first."
    elif not _is_valid_theme(theme):
        problem = (
            f"Theme “{theme}” not found. Make sure /static/themes/{theme}.css exists (and run collectstatic in production)."
        )
    else:
        problem = None

    if problem is None:
        request.session["theme"] = theme
        messages.success(request, f"Theme set to {theme.title()}.")
    else:
        messages.error(request, problem)
    return redirect("settings_home")
# web/core/views.py
import json
import os
from django.conf import settings
from django.contrib.auth.decorators import login_required, user_passes_test
from django.http import JsonResponse, HttpResponseBadRequest
from django.views.decorators.http import require_POST
def _normalize_pub_codes(raw_codes):
    """Return trimmed, lowercased, non-empty codes, de-duplicated in first-seen order."""
    cleaned = (str(code or "").strip().lower() for code in raw_codes)
    # dict.fromkeys drops duplicates while preserving insertion order.
    return list(dict.fromkeys(code for code in cleaned if code))


@require_POST
@login_required
@user_passes_test(lambda u: u.is_superuser)
def api_update_pub_codes(request):
    """
    Replace the stored publication-code list (superuser-only, POST-only).

    Accepts a 'json' form field (string), or failing that the raw request
    body, which must parse to {"pub_codes": [..]}. The codes are normalized,
    de-duplicated, and written to web/static/data/wol-pub-codes.v1.json under
    ``settings.BASE_DIR``.

    Responses:
      - 400 when the payload is missing, is not valid JSON, or does not have
        the expected shape.
      - 500 when the file cannot be written (a server-side failure, not a
        client error).
      - 200 with ``{"ok": true, "count": <number of codes written>}``.
    """
    from django.http import RawPostDataException

    payload = request.POST.get("json")
    if not payload:
        # Fall back to the raw body. After a multipart form has been parsed
        # Django refuses to hand the body out again, and the body may not be
        # valid UTF-8; both cases simply mean "no usable payload".
        try:
            payload = request.body.decode("utf-8")
        except (RawPostDataException, UnicodeDecodeError):
            payload = ""
    if not payload:
        return HttpResponseBadRequest("Missing 'json'.")

    try:
        data = json.loads(payload)
    except (ValueError, RecursionError) as e:
        # json.JSONDecodeError is a ValueError; RecursionError covers
        # pathologically nested documents.
        return HttpResponseBadRequest(f"Invalid JSON: {e}")

    if not isinstance(data, dict) or not isinstance(data.get("pub_codes"), list):
        return HttpResponseBadRequest('JSON must be an object with a "pub_codes" array.')

    # Normalize to unique, lowercase, trimmed strings
    codes = _normalize_pub_codes(data["pub_codes"])

    # Write back to static data file
    target_path = os.path.join(settings.BASE_DIR, "web", "static", "data", "wol-pub-codes.v1.json")
    try:
        with open(target_path, "w", encoding="utf-8") as f:
            json.dump({"pub_codes": codes}, f, ensure_ascii=False, indent=2)
    except OSError as e:
        return HttpResponse(f"Could not write file: {e}", status=500)

    return JsonResponse({"ok": True, "count": len(codes)})