# views.py — search, entry CRUD, CSV import/export, and normalizer views.
from datetime import date, timedelta
|
||
import csv
|
||
import re
|
||
|
||
from django.contrib import messages
|
||
from django.contrib.auth import authenticate, login
|
||
from django.contrib.auth.decorators import login_required, user_passes_test
|
||
from django.db.models import Q
|
||
from django.http import HttpResponse, JsonResponse
|
||
from django.shortcuts import render, redirect, get_object_or_404
|
||
from django.views.decorators.http import require_http_methods
|
||
from django.utils.text import Truncator
|
||
from django.urls import reverse
|
||
from django.views.decorators.cache import never_cache
|
||
from django.views.decorators.http import require_POST
|
||
from .models_user import SearchHistory, ViewedIllustration
|
||
|
||
from .forms import ImportForm, EntryForm
|
||
from .models import Entry
|
||
from .scripture_normalizer import normalize_scripture_field # NEW
|
||
from .source_normalizer import normalize_source_field # NEW
|
||
from .subject_normalizer import normalize_subject_field # NEW
|
||
from .utils import terms, has_wildcards, wildcard_to_regex, import_csv_bytes
|
||
from django.contrib.staticfiles.storage import staticfiles_storage
|
||
from django.db import transaction
|
||
from . import utils as core_utils
|
||
from .models_audit import AuditLog
|
||
from .scripture_normalizer import normalize_scripture_field
|
||
|
||
|
||
# Order + labels used in the Search UI
# Each tuple is (Entry model field name, human-readable label); the list
# order drives the order of the field checkboxes rendered by search_page.
FIELD_ORDER = [
    ("subject", "Subject"),
    ("illustration", "Illustration"),
    ("application", "Application"),
    ("scripture_raw", "Scripture"),
    ("source", "Source"),
    ("talk_title", "Talk Title"),
    ("talk_number", "Talk Number"),
    ("entry_code", "Code"),
]
|
||
|
||
# Canonical CSV header row for imports.
# NOTE(review): import_wizard defines its own local EXPECTED copy with the
# same values; presumably these should stay in sync — confirm before editing.
EXPECTED_HEADERS = [
    "Subject",
    "Illustration",
    "Application",
    "Scripture",
    "Source",
    "Talk Title",
    "Talk Number",
    "Code",
    "Date",
    "Date Edited",
]
|
||
|
||
def login_view(request):
    """
    Login endpoint.

    Already-authenticated users are sent straight to the search page.
    A direct GET on /login/ kicks off the OIDC flow immediately; a POST
    is treated as a local username/password fallback.
    """
    if request.user.is_authenticated:
        return redirect("search")

    # Direct browser hits on /login/ start the SSO handshake right away.
    if request.method == "GET" and request.path == "/login/":
        return redirect("oidc_authentication_init")

    ctx = {}

    # Optional local-account fallback for POSTed credentials.
    if request.method == "POST":
        username = request.POST.get("username")
        password = request.POST.get("password")
        account = authenticate(request, username=username, password=password)
        if account:
            login(request, account)
            return redirect("search")
        ctx["error"] = "Invalid credentials"

    return render(request, "login.html", ctx)
|
||
|
||
|
||
def is_admin(user):
    """Return whether *user* may access admin-only views (superuser or staff)."""
    if user.is_superuser:
        return True
    return user.is_staff
|
||
|
||
def entry_context(entry, result_ids):
    """
    Assemble the shared template context for the entry pages: result-set
    navigation (1-based position and total count), subject/scripture chips,
    and an optional static talk-PDF link.
    """
    result_ids = result_ids or []
    count = len(result_ids)

    # Position defaults to 1 when the entry is not part of the result set.
    position = 1
    if entry and entry.id in result_ids:
        position = result_ids.index(entry.id) + 1

    subject_list = [
        part.strip() for part in (entry.subject or "").split(",") if part.strip()
    ]
    scripture_list = [
        part.strip() for part in (entry.scripture_raw or "").split(";") if part.strip()
    ]

    # Resolve a static talk PDF (talk_pdfs/S-34_E_NNN.pdf) when one exists.
    talk_pdf_url = None
    try:
        number = entry.talk_number
        if number:
            pdf_name = f"talk_pdfs/S-34_E_{int(number):03d}.pdf"
            if staticfiles_storage.exists(pdf_name):
                talk_pdf_url = staticfiles_storage.url(pdf_name)
    except Exception:
        # Best effort only: a malformed talk number must never break the page.
        pass

    return {
        "entry": entry,
        "locked": True,
        "position": position,
        "count": count,
        "subject_list": subject_list,
        "scripture_list": scripture_list,
        "talk_pdf_url": talk_pdf_url,
    }
|
||
|
||
|
||
@login_required
def search_page(request):
    """
    Search-first landing page. Defaults to Subject, Illustration, Application.

    Supports:
      - quoted phrases
      - * and ? wildcards (regex); if the regex pass returns zero rows it
        falls back to a plain icontains search with wildcards stripped
      - AND across tokens, OR across the selected fields

    Special power term:
      - 'invalidscripture' -> entries whose Scripture would be INVALID per
        the client-side JS validator (a Python port of it lives below)

    Side effects: stores the matched id list and the query in the session
    ("result_ids", "last_search") for the nav/highlight views.
    """
    default_fields = {
        "subject": True,
        "illustration": True,
        "application": True,
        "scripture_raw": False,
        "source": False,
        "talk_title": False,
        "talk_number": False,
        "entry_code": False,
    }

    # A submitted form carries "q" and/or at least one field checkbox;
    # otherwise we show the defaults.
    form_submitted = ("q" in request.GET) or any(k in request.GET for k in default_fields)
    if form_submitted:
        selected = {k: (k in request.GET) for k in default_fields}
    else:
        selected = default_fields.copy()

    field_options = [
        {"name": k, "label": label, "checked": bool(selected.get(k))}
        for k, label in FIELD_ORDER
    ]

    q = (request.GET.get("q") or "").strip()
    if q:
        # ===== SPECIAL POWER TERM (JS-compatible scripture validation) =====
        if q.lower() == "invalidscripture":
            # --- JS validator port (same logic as ScriptureValidator.isValidSingleRef) ---
            FULL_TO_CODE = {
                # OT
                "genesis":"Ge","exodus":"Ex","leviticus":"Le","numbers":"Nu","deuteronomy":"De",
                "joshua":"Jos","judges":"Jg","ruth":"Ru",
                "1 samuel":"1Sa","2 samuel":"2Sa","1 kings":"1Ki","2 kings":"2Ki",
                "1 chronicles":"1Ch","2 chronicles":"2Ch",
                "ezra":"Ezr","nehemiah":"Ne","esther":"Es","job":"Job","psalms":"Ps","psalm":"Ps",
                "proverbs":"Pr","ecclesiastes":"Ec","song of solomon":"Ca","song of songs":"Ca",
                "isaiah":"Isa","jeremiah":"Jer","lamentations":"La","ezekiel":"Eze","daniel":"Da",
                "hosea":"Ho","joel":"Joe","amos":"Am","obadiah":"Ob","jonah":"Jon","micah":"Mic",
                "nahum":"Na","habakkuk":"Hab","zephaniah":"Zep","haggai":"Hag","zechariah":"Zec","malachi":"Mal",
                # NT
                "matthew":"Mt","mark":"Mr","luke":"Lu","john":"Joh","acts":"Ac","romans":"Ro",
                "1 corinthians":"1Co","2 corinthians":"2Co",
                "galatians":"Ga","ephesians":"Eph","philippians":"Php","colossians":"Col",
                "1 thessalonians":"1Th","2 thessalonians":"2Th",
                "1 timothy":"1Ti","2 timothy":"2Ti",
                "titus":"Tit","philemon":"Phm","hebrews":"Heb","james":"Jas",
                "1 peter":"1Pe","2 peter":"2Pe",
                "1 john":"1Jo","2 john":"2Jo","3 john":"3Jo",
                "jude":"Jude","revelation":"Re",
            }
            ALIAS_TO_CODE = {
                # OT
                "gen":"Ge","exod":"Ex","lev":"Le","num":"Nu","deut":"De",
                "josh":"Jos","judg":"Jg","ps":"Ps","prov":"Pr","eccl":"Ec","song":"Ca","cant":"Ca",
                "isa":"Isa","jer":"Jer","lam":"La","ezek":"Eze","dan":"Da","hos":"Ho","joel":"Joe",
                "amos":"Am","obad":"Ob","jon":"Jon","mic":"Mic","nah":"Na","hab":"Hab","zeph":"Zep",
                "hag":"Hag","zech":"Zec","mal":"Mal",
                # NT
                "matt":"Mt","mark":"Mr","luke":"Lu","john":"Joh","acts":"Ac","rom":"Ro",
                "gal":"Ga","eph":"Eph","phil":"Php","col":"Col","heb":"Heb","jas":"Jas",
                "jude":"Jude","rev":"Re",
            }
            CODE_TO_NUM = {
                # OT
                "Ge":1,"Ex":2,"Le":3,"Nu":4,"De":5,"Jos":6,"Jg":7,"Ru":8,"1Sa":9,"2Sa":10,
                "1Ki":11,"2Ki":12,"1Ch":13,"2Ch":14,"Ezr":15,"Ne":16,"Es":17,"Job":18,
                "Ps":19,"Pr":20,"Ec":21,"Ca":22,"Isa":23,"Jer":24,"La":25,"Eze":26,"Da":27,"Ho":28,
                "Joe":29,"Am":30,"Ob":31,"Jon":32,"Mic":33,"Na":34,"Hab":35,"Zep":36,"Hag":37,"Zec":38,"Mal":39,
                # NT
                "Mt":40,"Mr":41,"Lu":42,"Joh":43,"Ac":44,"Ro":45,"1Co":46,"2Co":47,"Ga":48,"Eph":49,
                "Php":50,"Col":51,"1Th":52,"2Th":53,"1Ti":54,"2Ti":55,"Tit":56,"Phm":57,
                "Heb":58,"Jas":59,"1Pe":60,"2Pe":61,"1Jo":62,"2Jo":63,"3Jo":64,"Jude":65,"Re":66,
            }
            # Numbered book families: "2 Sam" -> 2Sa, "1 Chron" -> 1Ch, etc.
            SERIES = [
                {"prefixes":["sam","samu","samuel"], "codes":{1:"1Sa",2:"2Sa"}},
                {"prefixes":["ki","king","kings","kgs"], "codes":{1:"1Ki",2:"2Ki"}},
                {"prefixes":["chron","chr","ch","chronicles"], "codes":{1:"1Ch",2:"2Ch"}},
                {"prefixes":["cor","corin","corinth","corinthians","co","c"], "codes":{1:"1Co",2:"2Co"}},
                {"prefixes":["thes","thess","thessalon","thessalonians","th"], "codes":{1:"1Th",2:"2Th"}},
                {"prefixes":["tim","ti","timothy","t"], "codes":{1:"1Ti",2:"2Ti"}},
                {"prefixes":["pet","pe","peter","pt","p"], "codes":{1:"1Pe",2:"2Pe"}},
                {"prefixes":["jo","jn","joh","john","jno","jhn"], "codes":{1:"1Jo",2:"2Jo",3:"3Jo"}},
            ]
            WOL_ABBR = set(CODE_TO_NUM.keys())
            # Chapter-only or chapter:verse[,verse][-range] reference tails.
            versesRe = re.compile(r"""
                ^
                (?:
                    (\d{1,3})                 # chapter only
                  |
                    (\d{1,3})\s*:\s*
                    (
                        \d{1,3}                                                   # v1
                        (?:\s*-\s*(?:\d{1,3}|\d{1,3}:\d{1,3}))?                   # -v2 OR -ch:vs
                        (?:\s*,\s*\d{1,3}(?:\s*-\s*(?:\d{1,3}|\d{1,3}:\d{1,3}))?)*  # ,vN[-…]
                    )
                )
                $
            """, re.VERBOSE)

            def _norm_spaces(s):
                return re.sub(r"\s+", " ", (s or "").strip())

            def _strip_dots(s):
                return re.sub(r"\.+$", "", s or "")

            def _lower(s):
                return (s or "").lower()

            def _lookup_book_code(book_raw: str):
                """Map a book token to its canonical code, or None if unknown."""
                b = _norm_spaces(_strip_dots(book_raw))
                # Full names
                c = FULL_TO_CODE.get(_lower(b))
                if c:
                    return c
                # Aliases
                c = ALIAS_TO_CODE.get(_lower(b))
                if c:
                    return c
                # WOL abbr (allow a space after the number, and arbitrary spaces)
                tightened = re.sub(r"^([1-3])\s+([A-Za-z].*)$", r"\1\2", b)
                if tightened in WOL_ABBR:
                    return tightened
                no_space = re.sub(r"\s+", "", b)
                if no_space in WOL_ABBR:
                    return no_space
                # Numbered prose (e.g., "2 Sam", "1 Chron", "3 Jo")
                m = re.match(r"^([1-3])\s*([A-Za-z]+)$", _lower(b))
                if m:
                    n = int(m.group(1))
                    base = m.group(2)
                    for fam in SERIES:
                        if any(base.startswith(p) for p in fam["prefixes"]):
                            code = fam["codes"].get(n)
                            if code:
                                return code
                return None

            def _split_book_and_rest(s: str):
                """Split 'Book 3:16' into book and numeric tail; None if no tail."""
                m = re.match(r"^(.+?)\s+(\d{1,3}(?:\s*:\s*.*)?)$", s)
                return {"book": m.group(1), "rest": m.group(2)} if m else None

            def _is_valid_single_ref(ref: str) -> bool:
                """True when a single 'Book ch[:v...]' reference parses cleanly."""
                s = (ref or "").strip()
                if not s:
                    return False
                parts = _split_book_and_rest(s)
                if not parts:
                    return False
                if not _lookup_book_code(parts["book"]):
                    return False
                # FIX: the original guarded on hasattr(str, "trim") — a dead
                # JS leftover (str has no trim) — so .strip() always ran.
                rest = (parts.get("rest") or "").strip()
                if not rest:
                    return False
                return bool(versesRe.match(rest))

            def _field_is_valid(text: str) -> bool:
                """All ';'-separated refs must validate; empty fields count invalid."""
                pieces = [p.strip() for p in (text or "").split(";") if p.strip()]
                if not pieces:
                    # empty field considered neutral/invalid? UI treats empty as
                    # neither; we exclude empties anyway
                    return False
                return all(_is_valid_single_ref(p) for p in pieces)
            # --- end JS port ---

            # Scan every non-empty scripture field and collect the failures.
            invalid_ids = []
            qs = Entry.objects.exclude(scripture_raw="").only("id", "scripture_raw", "date_added")
            for e in qs.iterator(chunk_size=1000):
                if not _field_is_valid(e.scripture_raw or ""):
                    invalid_ids.append(e.id)

            # Re-query to get the ids in newest-first display order.
            ids = list(
                Entry.objects.filter(id__in=invalid_ids)
                .order_by("-date_added", "-id")
                .values_list("id", flat=True)
            )

            try:
                print(f"[search] q='invalidscripture' count={len(ids)}")
            except Exception:
                pass

            request.session["result_ids"] = ids
            request.session["last_search"] = {"q": q, "fields": ["scripture_raw"]}
            request.session.modified = True

            if ids:
                entry = Entry.objects.get(pk=ids[0])
                ctx = entry_context(entry, ids)
                ctx.update({"from_search": True})
                if request.user.is_staff:
                    ctx["tts_url"] = reverse("api_tts_for_entry", args=[entry.id])
                return render(request, "entry_view.html", ctx)

            total = Entry.objects.count()
            return render(
                request,
                "search.html",
                {
                    "q": q,
                    "selected": selected,
                    "field_options": field_options,
                    "total": total,
                    "ran_search": True,
                    "result_count": 0,
                },
            )
        # ===== END SPECIAL TERM =====

        # --- existing search flow ---
        tokens = terms(q)
        fields = [f for f, sel in selected.items() if sel] or ["subject"]

        # AND across tokens (successive .filter calls), OR across fields.
        qs = Entry.objects.all()
        used_regex = False
        for tok in tokens:
            clause = Q()
            if has_wildcards(tok):
                used_regex = True
                pattern = wildcard_to_regex(tok)
                for f in fields:
                    clause |= Q(**{f + "__iregex": pattern})
            else:
                for f in fields:
                    clause |= Q(**{f + "__icontains": tok})
            qs = qs.filter(clause)

        ids = list(qs.order_by("-date_added", "-id").values_list("id", flat=True))

        # Regex pass found nothing: retry with wildcards stripped, icontains.
        if used_regex and not ids:
            qs = Entry.objects.all()
            for tok in tokens:
                clause = Q()
                tok_stripped = tok.replace("*", "").replace("?", "")
                for f in fields:
                    clause |= Q(**{f + "__icontains": tok_stripped})
                qs = qs.filter(clause)
            ids = list(qs.order_by("-date_added", "-id").values_list("id", flat=True))

        try:
            print(f"[search] q={q!r} tokens={tokens} fields={fields} count={len(ids)}")
        except Exception:
            pass

        request.session["result_ids"] = ids
        request.session["last_search"] = {"q": q, "fields": fields}
        request.session.modified = True

        if ids:
            entry = Entry.objects.get(pk=ids[0])
            ctx = entry_context(entry, ids)
            ctx.update({"from_search": True})
            if request.user.is_staff:
                ctx["tts_url"] = reverse("api_tts_for_entry", args=[entry.id])
            return render(request, "entry_view.html", ctx)

        total = Entry.objects.count()
        return render(
            request,
            "search.html",
            {
                "q": q,
                "selected": selected,
                "field_options": field_options,
                "total": total,
                "ran_search": True,
                "result_count": 0,
            },
        )

    # No query: plain landing page.
    total = Entry.objects.count()
    return render(
        request,
        "search.html",
        {
            "q": q,
            "selected": selected,
            "field_options": field_options,
            "total": total,
            "ran_search": False,
        },
    )
|
||
|
||
|
||
@login_required
def nav_next(request):
    """Advance to the next entry in the session's stored search results."""
    ids = request.session.get("result_ids", [])
    if not ids:
        return redirect("search")
    # FIX: a malformed ?i= used to raise ValueError (HTTP 500); a very
    # negative i could index from the end of the list. Parse defensively
    # and clamp into [0, len(ids) - 1].
    try:
        idx = int(request.GET.get("i", "0"))
    except (TypeError, ValueError):
        idx = 0
    idx = max(0, min(idx + 1, len(ids) - 1))
    entry = get_object_or_404(Entry, pk=ids[idx])

    ctx = entry_context(entry, ids)
    # TTS endpoint is staff-only; an empty string disables the player.
    ctx["tts_url"] = reverse("api_tts_for_entry", args=[entry.id]) if request.user.is_staff else ""

    return render(request, "entry_view.html", ctx)
|
||
|
||
|
||
@login_required
def nav_prev(request):
    """Step back to the previous entry in the session's stored search results."""
    ids = request.session.get("result_ids", [])
    if not ids:
        return redirect("search")
    # FIX: a malformed ?i= used to raise ValueError (HTTP 500); an
    # out-of-range i could raise IndexError. Parse defensively and clamp
    # into [0, len(ids) - 1].
    try:
        idx = int(request.GET.get("i", "0"))
    except (TypeError, ValueError):
        idx = 0
    idx = min(max(idx - 1, 0), len(ids) - 1)
    entry = get_object_or_404(Entry, pk=ids[idx])

    ctx = entry_context(entry, ids)
    # TTS endpoint is staff-only; an empty string disables the player.
    ctx["tts_url"] = reverse("api_tts_for_entry", args=[entry.id]) if request.user.is_staff else ""

    return render(request, "entry_view.html", ctx)
|
||
|
||
|
||
@never_cache
def entry_view(request, entry_id):
    """Render a single entry, preserving the session's result navigation."""
    entry = get_object_or_404(Entry, pk=entry_id)
    result_ids = request.session.get("result_ids", [])

    ctx = entry_context(entry, result_ids)

    # The client-side highlighter needs the last query and the field list.
    last_search = request.session.get("last_search") or {}
    ctx["last_search_q"] = last_search.get("q", "")
    ctx["last_search_fields"] = last_search.get("fields", [])

    # TTS endpoint is staff-only; None hides the player entirely.
    if request.user.is_staff:
        ctx["tts_url"] = reverse("api_tts_for_entry", args=[entry.id])
    else:
        ctx["tts_url"] = None
    return render(request, "entry_view.html", ctx)
|
||
|
||
|
||
|
||
@login_required
def entry_add(request):
    """
    Create a brand new Entry using the same EntryForm used for editing.

    EntryForm is a plain Form (not a ModelForm), so the cleaned fields are
    copied onto a fresh Entry instance by hand.
    """
    if request.method != "POST":
        return render(request, "entry_add.html", {"form": EntryForm()})

    form = EntryForm(request.POST)
    if not form.is_valid():
        return render(request, "entry_add.html", {"form": form})

    entry = Entry()
    for field_name, value in form.cleaned_data.items():
        setattr(entry, field_name, value)
    entry.save()
    messages.success(request, "New entry added.")
    return redirect("entry_view", entry_id=entry.id)
|
||
|
||
|
||
@login_required
def entry_edit(request, entry_id):
    """
    Edit an existing entry.

    POST validates and copies the cleaned data onto the model; GET pre-fills
    the (plain, non-Model) EntryForm from the current field values.
    """
    entry = get_object_or_404(Entry, pk=entry_id)
    result_ids = request.session.get("result_ids", [])

    if request.method == "POST":
        form = EntryForm(request.POST)
        if form.is_valid():
            for field_name, value in form.cleaned_data.items():
                setattr(entry, field_name, value)
            entry.save()
            messages.success(request, "Entry saved.")
            return redirect("entry_view", entry_id=entry.id)
    else:
        initial = {
            "subject": entry.subject,
            "illustration": entry.illustration,
            "application": entry.application,
            "scripture_raw": entry.scripture_raw,
            "source": entry.source,
            "talk_number": entry.talk_number,
            "talk_title": entry.talk_title,
            "entry_code": entry.entry_code,
            "date_added": entry.date_added,
            "date_edited": entry.date_edited,
        }
        form = EntryForm(initial=initial)

    ctx = {"entry": entry, "form": form}
    ctx.update(entry_context(entry, result_ids))
    return render(request, "entry_edit.html", ctx)
|
||
|
||
|
||
@login_required
def entry_delete(request, entry_id):
    """Confirm (GET) and perform (POST) deletion of a single entry."""
    entry = get_object_or_404(Entry, pk=entry_id)
    if request.method != "POST":
        return render(request, "entry_delete_confirm.html", {"entry": entry})
    entry.delete()
    messages.success(request, "Entry deleted.")
    return redirect("search")
|
||
|
||
|
||
@login_required
@user_passes_test(is_admin)
def import_wizard(request):
    """
    Admin-only CSV import wizard.

    POST with a valid ImportForm: read the uploaded file, repair common
    CSV problems (missing header row, wrong delimiter, quoted / prefixed
    header cells), then hand the sanitized bytes to import_csv_bytes and
    render its report. Any exception is surfaced as a message and the
    form is re-shown. GET renders an empty form.
    """
    # Local canonical header; also pushed into core_utils below.
    EXPECTED = [
        "Subject", "Illustration", "Application", "Scripture", "Source",
        "Talk Title", "Talk Number", "Code", "Date", "Date Edited",
    ]

    if request.method == "POST":
        form = ImportForm(request.POST, request.FILES)
        if form.is_valid():
            try:
                raw = form.cleaned_data["file"].read()

                import io, csv as _csv
                # utf-8-sig strips a BOM if present; errors="replace" keeps
                # the import alive on stray bytes.
                text = raw.decode("utf-8-sig", errors="replace")

                # --- Try to sniff dialect ---
                # Only the first line is sniffed; any sniff failure falls
                # back to the standard excel dialect.
                try:
                    first_line = text.splitlines()[0] if text else ""
                    dialect = _csv.Sniffer().sniff(first_line) if first_line else _csv.excel
                except Exception:
                    dialect = _csv.excel

                rdr = _csv.reader(io.StringIO(text), dialect)
                rows = list(rdr)
                if not rows:
                    raise ValueError("The CSV file appears to be empty.")

                expected_norm = [h.lower() for h in EXPECTED]

                def _clean_header(s):
                    # Normalize a header cell for comparison: strip
                    # whitespace, "r:"/"r=" prefixes, wrapping quotes,
                    # and lowercase.
                    s = "" if s is None else str(s)
                    s = s.strip()
                    # Strip r: or r= prefixes and wrapping quotes
                    if s.lower().startswith("r:") or s.lower().startswith("r="):
                        s = s[2:].lstrip()
                    if len(s) >= 2 and s[0] == s[-1] and s[0] in ("'", '"'):
                        s = s[1:-1]
                    return s.strip().lower()

                first = rows[0]
                norm_first = [_clean_header(c) for c in first]
                header_ok = (norm_first == expected_norm)

                # If no header but same column count, inject expected
                if not header_ok and len(first) == len(EXPECTED):
                    rows.insert(0, EXPECTED)
                elif not header_ok and len(first) != len(EXPECTED):
                    # Retry with alternate delimiters
                    # (semicolon and tab); first delimiter that yields the
                    # right column count wins.
                    for delim in (";", "\t"):
                        rows2 = list(_csv.reader(io.StringIO(text), delimiter=delim))
                        if rows2 and len(rows2[0]) == len(EXPECTED):
                            rows = rows2
                            first = rows[0]
                            norm_first = [_clean_header(c) for c in first]
                            header_ok = (norm_first == expected_norm)
                            if not header_ok:
                                rows.insert(0, EXPECTED)
                            break

                # Re-encode sanitized CSV for importer
                out = io.StringIO()
                w = _csv.writer(out)
                for r in rows:
                    w.writerow(r)
                fixed_raw = out.getvalue().encode("utf-8")

                # Keep utils in sync
                # NOTE(review): mutates the module-level constant in
                # core_utils for the lifetime of the process.
                from . import utils as core_utils
                core_utils.EXPECTED_HEADERS = EXPECTED

                # Run robust importer
                report = import_csv_bytes(fixed_raw, dry_run=form.cleaned_data["dry_run"]) or {}

                # Normalize preview
                # (importer may return dict rows; template expects lists).
                report.setdefault("columns", EXPECTED)
                preview = report.get("preview") or []
                if preview and isinstance(preview[0], dict):
                    cols = report["columns"]
                    report["preview"] = [[row.get(c, "") for c in cols] for row in preview]

                report["header_ok"] = header_ok

                if not header_ok:
                    messages.warning(
                        request,
                        "The first row didn’t match the expected header; a clean header was injected automatically."
                    )

                return render(
                    request,
                    "import_result.html",
                    {"report": report, "dry_run": form.cleaned_data["dry_run"]},
                )

            except Exception as e:
                messages.error(request, f"Import failed: {e}")

        # Invalid form or exception → show form again
        return render(request, "import_wizard.html", {"form": form})

    # GET → show empty form
    form = ImportForm()
    return render(request, "import_wizard.html", {"form": form})
|
||
|
||
@login_required
@user_passes_test(is_admin)
def export_csv(request):
    """Stream every Entry as a dated CSV backup attachment (admin only)."""
    stamp = date.today().strftime("%Y-%m-%d")
    response = HttpResponse(content_type="text/csv")
    response["Content-Disposition"] = (
        f'attachment; filename="illustrations_backup_{stamp}.csv"'
    )
    writer = csv.writer(response)
    writer.writerow([
        "Subject",
        "Illustration",
        "Application",
        "Scripture",
        "Source",
        "Talk Number",
        "Talk Title",
        "Code",
        "Date",
        "Date Edited",
    ])
    for entry in Entry.objects.all().order_by("id"):
        writer.writerow([
            entry.subject,
            entry.illustration,
            entry.application,
            entry.scripture_raw,
            entry.source,
            "" if entry.talk_number is None else entry.talk_number,
            entry.talk_title,
            entry.entry_code,
            entry.date_added.isoformat() if entry.date_added else "",
            entry.date_edited.isoformat() if entry.date_edited else "",
        ])
    return response
|
||
|
||
|
||
#@login_required
|
||
#def stats_page(request):
|
||
# total = Entry.objects.count()
|
||
# today = date.today()
|
||
# last30 = Entry.objects.filter(date_added__gte=today - timedelta(days=30)).count()
|
||
# last365 = Entry.objects.filter(date_added__gte=today - timedelta(days=365)).count()
|
||
#
|
||
# from collections import Counter
|
||
#
|
||
# months = []
|
||
# y = today.year
|
||
# m = today.month
|
||
# for i in range(12):
|
||
# mm = m - i
|
||
# yy = y
|
||
# while mm <= 0:
|
||
# mm += 12
|
||
# yy -= 1
|
||
# from datetime import date as _d
|
||
# start = _d(yy, mm, 1)
|
||
# end = _d(yy + 1, 1, 1) if mm == 12 else _d(yy, mm + 1, 1)
|
||
# label = f"{yy}-{mm:02d}"
|
||
# months.append((label, start, end))
|
||
# months = list(reversed(months))
|
||
#
|
||
# series = [
|
||
# (label, Entry.objects.filter(date_added__gte=start, date_added__lt=end).count())
|
||
# for label, start, end in months
|
||
# ]
|
||
# peak = max((v for _, v in series), default=1)
|
||
# heights = [
|
||
# (label, value, 8 + int((value / peak) * 100) if peak else 8)
|
||
# for label, value in series
|
||
# ]
|
||
#
|
||
# counts = Counter()
|
||
# for subj in Entry.objects.exclude(subject="").values_list("subject", flat=True):
|
||
# for tag in [t.strip() for t in subj.split(",") if t.strip()]:
|
||
# counts[tag.lower()] += 1
|
||
# top_subjects = [{"name": n.title(), "count": c} for n, c in counts.most_common(20)]
|
||
#
|
||
# return render(
|
||
# request,
|
||
# "stats.html",
|
||
# {
|
||
# "total": total,
|
||
# "last30": last30,
|
||
# "last365": last365,
|
||
# "series": series,
|
||
# "heights": heights,
|
||
# "top_subjects": top_subjects,
|
||
# },
|
||
# )
|
||
|
||
|
||
# ========= Scripture Normalizer =========
|
||
|
||
@login_required
@user_passes_test(is_admin)
@require_http_methods(["GET", "POST"])
def normalize_scripture(request):
    """
    Normalize every entry's scripture_raw field.

    GET  -> dry-run preview (summary + first 100 examples)
    POST -> apply changes to all entries' scripture_raw (batched saves)
    Optional ?limit= restricts how many entries are examined.
    """
    apply_changes = request.method == "POST"
    limit = int(request.GET.get("limit", "0") or "0")

    qs = Entry.objects.all().order_by("id")
    if limit:
        qs = qs[:limit]

    changed = 0
    warnings_total = 0
    preview = []   # (id, before, after) for each entry that would change
    pending = []   # dirty objects awaiting a batched save (apply mode only)
    BATCH = 500

    def _flush():
        # Persist the current batch atomically so a crash can't half-apply it.
        if pending:
            with transaction.atomic():
                for obj in pending:
                    obj.save(update_fields=["scripture_raw"])
            pending.clear()

    # FIX: the apply and dry-run paths were two near-identical loops; a
    # single loop with a save branch keeps the logic in one place.
    for e in qs.iterator():
        original = (e.scripture_raw or "").strip()
        normalized, warns = normalize_scripture_field(original)
        warnings_total += len(warns)
        if normalized == original:
            continue
        changed += 1
        preview.append((e.id, original, normalized))
        if apply_changes:
            e.scripture_raw = normalized
            pending.append(e)
            if len(pending) >= BATCH:
                _flush()
    _flush()

    # Cap the rendered examples at 100.
    preview = preview[:100]

    messages.info(
        request,
        f"{'Applied' if apply_changes else 'Dry-run'}: {changed} entries "
        f"{'changed' if apply_changes else 'would change'}; {warnings_total} warnings."
    )
    return render(
        request,
        "normalize_result.html",
        {
            "applied": apply_changes,
            "changed": changed,
            "warnings_total": warnings_total,
            "preview": preview,
            "limit": limit,
        },
    )
|
||
|
||
|
||
# ========= Source Normalizer =========
|
||
|
||
@login_required
@user_passes_test(is_admin)
@require_http_methods(["GET", "POST"])
def normalize_source(request):
    """
    Normalize every entry's source field.

    GET  -> dry-run preview (summary + first 100 examples)
    POST -> apply changes to all entries' source (batched saves)
    Optional ?limit= restricts how many entries are examined.
    """
    apply_changes = request.method == "POST"
    limit = int(request.GET.get("limit", "0") or "0")

    qs = Entry.objects.all().order_by("id")
    if limit:
        qs = qs[:limit]

    changed = 0
    warnings_total = 0
    preview = []   # (id, before, after) for each entry that would change
    pending = []   # dirty objects awaiting a batched save (apply mode only)
    BATCH = 500

    def _flush():
        # Persist the current batch atomically so a crash can't half-apply it.
        if pending:
            with transaction.atomic():
                for obj in pending:
                    obj.save(update_fields=["source"])
            pending.clear()

    # FIX: the apply and dry-run paths were two near-identical loops; a
    # single loop with a save branch keeps the logic in one place.
    for e in qs.iterator():
        original = (e.source or "").strip()
        normalized, warns = normalize_source_field(original)
        warnings_total += len(warns)
        if normalized == original:
            continue
        changed += 1
        preview.append((e.id, original, normalized))
        if apply_changes:
            e.source = normalized
            pending.append(e)
            if len(pending) >= BATCH:
                _flush()
    _flush()

    # Cap the rendered examples at 100.
    preview = preview[:100]

    messages.info(
        request,
        f"{'Applied' if apply_changes else 'Dry-run'}: {changed} entries "
        f"{'changed' if apply_changes else 'would change'}; {warnings_total} warnings."
    )
    return render(
        request,
        "normalize_source_result.html",
        {
            "applied": apply_changes,
            "changed": changed,
            "warnings_total": warnings_total,
            "preview": preview,
            "limit": limit,
        },
    )
|
||
|
||
|
||
# ========= Subject Normalizer =========
|
||
|
||
@login_required
@user_passes_test(is_admin)
@require_http_methods(["GET", "POST"])
def normalize_subjects(request):
    """
    Normalize every entry's subject field.

    GET  -> dry-run preview (summary + first 100 examples)
    POST -> apply changes to all entries' subject (batched saves)
    Optional ?limit= restricts how many entries are examined.
    """
    apply_changes = request.method == "POST"
    limit = int(request.GET.get("limit", "0") or "0")

    qs = Entry.objects.all().order_by("id")
    if limit:
        qs = qs[:limit]

    changed = 0
    warnings_total = 0
    preview = []   # (id, before, after) for each entry that would change
    pending = []   # dirty objects awaiting a batched save (apply mode only)
    BATCH = 500

    def _flush():
        # Persist the current batch atomically so a crash can't half-apply it.
        if pending:
            with transaction.atomic():
                for obj in pending:
                    obj.save(update_fields=["subject"])
            pending.clear()

    # FIX: the apply and dry-run paths were two near-identical loops; a
    # single loop with a save branch keeps the logic in one place.
    for e in qs.iterator():
        original = (e.subject or "").strip()
        normalized, warns = normalize_subject_field(original)
        warnings_total += len(warns)
        if normalized == original:
            continue
        changed += 1
        preview.append((e.id, original, normalized))
        if apply_changes:
            e.subject = normalized
            pending.append(e)
            if len(pending) >= BATCH:
                _flush()
    _flush()

    # Cap the rendered examples at 100.
    preview = preview[:100]

    messages.info(
        request,
        f"{'Applied' if apply_changes else 'Dry-run'}: {changed} entries "
        f"{'changed' if apply_changes else 'would change'}; {warnings_total} warnings."
    )
    return render(
        request,
        "normalize_subjects_result.html",
        {
            "applied": apply_changes,
            "changed": changed,
            "warnings_total": warnings_total,
            "preview": preview,
            "limit": limit,
        },
    )
|
||
|
||
|
||
# ========= API: Recently Viewed (for 20-word snippet + correct link) =========
|
||
|
||
@login_required
def api_get_recent_views(request):
    """
    Return the current user's recently viewed entries (up to 50) as JSON,
    each with a precomputed 20-word snippet taken from the illustration
    (falling back to application, then subject).
    """
    from .models import RecentView

    recents = (
        RecentView.objects
        .filter(user=request.user)
        .select_related("entry")
        .order_by("-viewed_at")[:50]
    )

    def _snippet(entry):
        # First non-empty of illustration/application/subject, squashed to
        # single spaces and truncated to 20 words.
        for candidate in (entry.illustration, entry.application, entry.subject):
            text = (candidate or "").strip()
            if text:
                return Truncator(" ".join(text.split())).words(20, truncate="…")
        return ""

    items = [
        {
            "entry_id": rv.entry_id,
            "viewed_at": rv.viewed_at.isoformat(),
            "illustration": rv.entry.illustration or "",
            "snippet": _snippet(rv.entry),
        }
        for rv in recents
    ]

    return JsonResponse({"ok": True, "items": items})
|
||
|
||
@login_required
def settings_home(request):
    """Render the landing page for user settings."""
    template_name = "settings/home.html"
    return render(request, template_name)
|
||
|
||
@login_required
def stats_page(request):
    """
    Render the statistics dashboard.

    Context provided to stats.html:
      - total / last30 / last365: headline entry counts
      - series, heights: 12-month sparkline data
      - top_subjects: ten most common subject tags
      - avg_refs_per_entry, top_books, top_refs: scripture analytics parsed
        from Entry.scripture_raw
    """
    from collections import Counter  # OrderedDict was imported here but never used
    from calendar import month_abbr

    total = Entry.objects.count()
    today = date.today()
    last30 = Entry.objects.filter(date_added__gte=today - timedelta(days=30)).count()
    last365 = Entry.objects.filter(date_added__gte=today - timedelta(days=365)).count()

    # ---- Sparkline (last 12 months, oldest first) ----
    # Uses the module-level `date`; the original re-ran
    # `from datetime import date as _d` on every loop iteration.
    months = []
    for offset in range(12):
        mm = today.month - offset
        yy = today.year
        while mm <= 0:  # roll back across year boundaries
            mm += 12
            yy -= 1
        start = date(yy, mm, 1)
        end = date(yy + 1, 1, 1) if mm == 12 else date(yy, mm + 1, 1)
        label = f"{month_abbr[mm]} {str(yy)[2:]}"
        months.append((label, start, end))
    months.reverse()  # chronological order for the chart

    series = [
        (label, Entry.objects.filter(date_added__gte=start, date_added__lt=end).count())
        for label, start, end in months
    ]
    peak = max((v for _, v in series), default=1)
    # Bar height: 8px floor plus up to 100px proportional to the month's share
    # of the peak; if every month is zero (peak == 0) use the floor alone.
    heights = [
        (label, value, (8 + int((value / peak) * 100)) if peak else 8)
        for label, value in series
    ]

    # ---- Top subjects ----
    # Subjects are comma-separated tags; count them case-insensitively.
    subject_counts = Counter()
    for subj in Entry.objects.exclude(subject="").values_list("subject", flat=True):
        for tag in [t.strip() for t in subj.split(",") if t.strip()]:
            subject_counts[tag.lower()] += 1
    top_subjects = [{"name": n.title(), "count": c} for n, c in subject_counts.most_common(10)]

    # ===============================
    # Scripture analytics (from scripture_raw)
    # ===============================
    # A light normalizer so common abbreviations map to canonical book names.
    BOOK_MAP = {
        # OT (examples; extend as needed)
        "gen":"Genesis","ge":"Genesis","gn":"Genesis",
        "ex":"Exodus","exo":"Exodus",
        "lev":"Leviticus","le":"Leviticus",
        "num":"Numbers","nu":"Numbers",
        "de":"Deuteronomy","deut":"Deuteronomy",
        "jos":"Joshua","josh":"Joshua",
        "jdg":"Judges","judg":"Judges",
        "ru":"Ruth","rut":"Ruth",
        "ps":"Psalms","psalm":"Psalms","psalms":"Psalms",
        "pr":"Proverbs","pro":"Proverbs",
        "ec":"Ecclesiastes","ecc":"Ecclesiastes",
        "isa":"Isaiah","is":"Isaiah",
        "jer":"Jeremiah","je":"Jeremiah",
        "eze":"Ezekiel","ez":"Ezekiel",
        "da":"Daniel","dan":"Daniel",
        "ho":"Hosea","hos":"Hosea",
        # NT (examples; extend as needed)
        "mt":"Matthew","matt":"Matthew",
        "mr":"Mark","mk":"Mark",
        "lu":"Luke","lk":"Luke",
        "joh":"John","john":"John","jn":"John",
        "ac":"Acts","acts":"Acts",
        "rom":"Romans","ro":"Romans",
        "1cor":"1 Corinthians","1 co":"1 Corinthians","1 cor":"1 Corinthians",
        "2cor":"2 Corinthians","2 co":"2 Corinthians","2 cor":"2 Corinthians",
    }

    BOOK_RE = re.compile(r"""
        ^\s*
        (?P<book>(?:[1-3]\s*)?[A-Za-z\.]+)   # optional 1/2/3 prefix + word
        [\s\.]+
        (?P<ref>\d+[:\.]\d+.*)?              # 3:16 or 3.16 etc (optional tail)
    """, re.X)

    def normalize_book(raw):
        """Map an abbreviation to its canonical book name, else Title Case it."""
        b = raw.strip().lower().replace('.', '')
        b = re.sub(r'\s+', '', b)  # "1 john" -> "1john"
        return BOOK_MAP.get(b, raw.strip().title())

    def split_refs(text):
        """Split a scripture_raw string into individual references (semicolon-separated)."""
        if not text:
            return []
        parts = re.split(r'[;]+', text)
        return [p.strip() for p in parts if p.strip()]

    def parse_piece(piece):
        """Return (book, "Book ch:vs") for one reference, or (None, None) if unparseable."""
        m = BOOK_RE.match(piece)
        if not m:
            return None, None
        book = normalize_book(m.group('book'))
        ref = (m.group('ref') or '').strip()
        return book, (f"{book} {ref}" if ref else book)

    book_counts = Counter()
    ref_counts = Counter()
    refs_per_entry = []

    entries_with_script = (Entry.objects
                           .exclude(scripture_raw__isnull=True)
                           .exclude(scripture_raw__exact=""))
    for e in entries_with_script.iterator():
        entry_ref_count = 0
        for piece in split_refs(e.scripture_raw):
            book, full = parse_piece(piece)
            if not book:
                continue
            book_counts[book] += 1
            # Only chapter:verse references (not bare book names) count as "refs".
            if full and full != book:
                ref_counts[full] += 1
                entry_ref_count += 1
        if entry_ref_count:
            refs_per_entry.append(entry_ref_count)

    avg_refs_per_entry = round(sum(refs_per_entry) / len(refs_per_entry), 2) if refs_per_entry else 0
    top_books = list(book_counts.most_common(10))
    top_refs = list(ref_counts.most_common(10))

    return render(
        request,
        "stats.html",
        {
            "total": total,
            "last30": last30,
            "last365": last365,
            "series": series,
            "heights": heights,
            "top_subjects": top_subjects,
            "avg_refs_per_entry": avg_refs_per_entry,
            "top_books": top_books,  # iterable of (book, count)
            "top_refs": top_refs,    # iterable of (ref, count)
        },
    )
|
||
def is_superuser(user):
    """Predicate for user_passes_test: True only for Django superusers."""
    return user.is_superuser
|
||
|
||
@login_required
@user_passes_test(is_superuser)
def delete_all_entries(request):
    """
    Confirmation screen + POST to delete ALL Entry records.
    Mirrors the style of the single-entry delete page.

    GET  -> render the confirmation template.
    POST -> delete everything only when the confirm field equals "yes",
            otherwise report cancellation; either way redirect to settings.
    """
    if request.method == "POST":
        # extra safeguard: only delete if the form had the confirm field
        if request.POST.get("confirm") == "yes":
            with transaction.atomic():
                # Entry is imported at module level; the previous local
                # re-import here was redundant.
                deleted, _ = Entry.objects.all().delete()
                messages.success(request, f"Deleted all illustrations ({deleted} rows).")
                return redirect("settings_home")
        messages.info(request, "Deletion cancelled.")
        return redirect("settings_home")

    return render(request, "settings/delete_all_confirm.html", {})
|
||
|
||
# ----- Release announcements (superuser tools + dismiss) -----
|
||
from django.contrib.auth.decorators import login_required, user_passes_test
|
||
from django.views.decorators.http import require_POST
|
||
from django.http import JsonResponse, HttpResponseBadRequest
|
||
from .forms import AnnouncementForm
|
||
from .models_ann import Announcement, AnnouncementDismissal
|
||
|
||
# NOTE(review): rebinds the module-level name `is_superuser` (defined earlier as
# a plain predicate function) to a user_passes_test decorator; the @is_superuser
# usages below rely on this binding.
is_superuser = user_passes_test(lambda u: u.is_superuser)
|
||
|
||
@is_superuser
@login_required
def announcement_tools(request):
    """
    Superuser-only: publish an announcement. Users will see it once on next search page load.
    """
    if request.method == "POST":
        form = AnnouncementForm(request.POST)
        if form.is_valid():
            announcement = form.save(commit=False)
            announcement.created_by = request.user
            announcement.save()
            messages.success(request, "Announcement published.")
            return redirect("settings_home")
        # Invalid POST falls through and re-renders with the bound form.
    else:
        form = AnnouncementForm()

    recent = Announcement.objects.order_by("-created_at")[:25]
    context = {
        "announcement_form": form,
        "announcements_recent": recent,
        # Keep the rest of your settings page content intact; template guards will show this block
    }
    return render(request, "settings/home.html", context)
|
||
|
||
@login_required
@require_POST
def dismiss_announcement(request, pk):
    """Record that the current user dismissed announcement *pk* (idempotent)."""
    announcement = get_object_or_404(Announcement, pk=pk)
    AnnouncementDismissal.objects.get_or_create(
        user=request.user,
        announcement=announcement,
    )
    return JsonResponse({"ok": True})
|
||
|
||
# ----- Login attempts (superuser only) -----
|
||
from django.contrib.auth.decorators import user_passes_test, login_required
|
||
from django.utils import timezone
|
||
from datetime import timedelta
|
||
from .models_login import LoginAttempt
|
||
|
||
# NOTE(review): redundant re-binding — `is_superuser` is already this exact
# user_passes_test decorator from the announcements section above; kept so this
# section remains self-contained.
is_superuser = user_passes_test(lambda u: u.is_superuser)
|
||
|
||
@is_superuser
@login_required
def login_attempts(request):
    """
    Show last 7 days of login attempts (success + failure) with username, IP, and timestamp.
    """
    window_start = timezone.now() - timedelta(days=7)
    recent_attempts = (
        LoginAttempt.objects
        .filter(timestamp__gte=window_start)
        .order_by("-timestamp")
    )
    return render(request, "tools/login_attempts.html", {"attempts": recent_attempts})
|
||
|
||
@login_required
@require_POST
def clear_history(request):
    """
    Clear the current user's history shown on the Search page:
      - SearchHistory (last 10 searches)
      - ViewedIllustration (recently viewed entries)
    Also clears any session keys that might be used as UI caches.
    """
    current_user = request.user
    SearchHistory.objects.filter(user=current_user).delete()
    ViewedIllustration.objects.filter(user=current_user).delete()

    # (Harmless) drop possible session-level caches, if present.
    session_keys = ("recent_searches", "recent_entries", "recent_viewed", "recently_viewed")
    for key in session_keys:
        request.session.pop(key, None)
    request.session.modified = True

    return JsonResponse({"ok": True})
|
||
|
||
|
||
# NOTE(review): redundant re-binding — `is_superuser` already holds this exact
# user_passes_test decorator from earlier sections; kept for section
# self-containment.
is_superuser = user_passes_test(lambda u: u.is_superuser)
|
||
|
||
@is_superuser
@login_required
def audit_log(request):
    """Superuser-only: show the 100 most recent audit-log rows."""
    latest_rows = AuditLog.objects.order_by("-timestamp")[:100]
    return render(request, "tools/audit_log.html", {"rows": latest_rows})
|
||
|
||
|
||
# === Theme selection (robust) ================================================
|
||
def _is_valid_theme(name: str) -> bool:
    """True when /static/themes/<name>.css resolves via staticfiles (works with collectstatic)."""
    try:
        return staticfiles_storage.exists(f"themes/{name}.css")
    except Exception:
        # Best-effort: any storage failure is treated as "theme not found".
        return False
|
||
|
||
@login_required
@require_POST
def set_theme(request):
    """Validate the requested UI theme and persist it in the session."""
    theme = (request.POST.get("theme") or "").strip()

    # Guard: nothing selected.
    if not theme:
        messages.error(request, "Pick a theme first.")
        return redirect("settings_home")

    # Guard: selected theme has no matching stylesheet.
    if not _is_valid_theme(theme):
        messages.error(
            request,
            f"Theme “{theme}” not found. Make sure /static/themes/{theme}.css exists (and run collectstatic in production)."
        )
        return redirect("settings_home")

    request.session["theme"] = theme
    messages.success(request, f"Theme set to {theme.title()}.")
    return redirect("settings_home")
|
||
|
||
# web/core/views.py
|
||
import json
|
||
import os
|
||
from django.conf import settings
|
||
from django.contrib.auth.decorators import login_required, user_passes_test
|
||
from django.http import JsonResponse, HttpResponseBadRequest
|
||
from django.views.decorators.http import require_POST
|
||
|
||
@require_POST
@login_required
@user_passes_test(lambda u: u.is_superuser)
def api_update_pub_codes(request):
    """
    Accepts a 'json' field (string) that should parse to {"pub_codes": [..]}.
    Normalizes, de-duplicates, and writes to web/static/data/wol-pub-codes.v1.json.

    Returns {"ok": True, "count": N} on success, or a 400 response with a
    human-readable reason on any validation or write failure.
    """
    payload = request.POST.get("json")
    if not payload and request.body:
        try:
            payload = request.body.decode("utf-8")
        except UnicodeDecodeError as e:
            # Previously an un-decodable body raised uncaught (500); report 400.
            return HttpResponseBadRequest(f"Body is not valid UTF-8: {e}")
    if not payload:
        return HttpResponseBadRequest("Missing 'json'.")

    try:
        data = json.loads(payload)
    except Exception as e:
        return HttpResponseBadRequest(f"Invalid JSON: {e}")

    if not isinstance(data, dict) or "pub_codes" not in data or not isinstance(data["pub_codes"], list):
        return HttpResponseBadRequest('JSON must be an object with a "pub_codes" array.')

    # Normalize to unique, lowercase, trimmed strings (first occurrence wins).
    seen = set()
    codes = []
    for raw in data["pub_codes"]:
        code = str(raw or "").strip().lower()
        if code and code not in seen:
            seen.add(code)
            codes.append(code)

    # Write back to the static data file consumed by the front end.
    target_path = os.path.join(settings.BASE_DIR, "web", "static", "data", "wol-pub-codes.v1.json")
    try:
        with open(target_path, "w", encoding="utf-8") as f:
            json.dump({"pub_codes": codes}, f, ensure_ascii=False, indent=2)
    except Exception as e:
        return HttpResponseBadRequest(f"Could not write file: {e}")

    return JsonResponse({"ok": True, "count": len(codes)})