"""Views for the illustrations app.

Covers login, search and entry navigation, entry create/edit/delete,
CSV import/export, the field normalizers, statistics, and the
superuser-only tools (announcements, login attempts, audit log, themes).
"""
import csv
import io
import logging
import re
from calendar import month_abbr
from collections import Counter
from datetime import date, timedelta

from django.contrib import messages
from django.contrib.auth import authenticate, login
from django.contrib.auth.decorators import login_required, user_passes_test
from django.contrib.staticfiles.storage import staticfiles_storage
from django.db import transaction
from django.db.models import Q
from django.http import HttpResponse, HttpResponseBadRequest, JsonResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse
from django.utils import timezone
from django.utils.text import Truncator
from django.views.decorators.cache import never_cache
from django.views.decorators.http import require_http_methods, require_POST

from . import utils as core_utils
from .forms import AnnouncementForm, EntryForm, ImportForm
from .models import Entry
from .models_ann import Announcement, AnnouncementDismissal
from .models_audit import AuditLog
from .models_login import LoginAttempt
from .models_user import SearchHistory, ViewedIllustration
from .scripture_normalizer import normalize_scripture_field
from .source_normalizer import normalize_source_field
from .subject_normalizer import normalize_subject_field
from .utils import has_wildcards, import_csv_bytes, terms, wildcard_to_regex

logger = logging.getLogger(__name__)

# Order + labels used in the Search UI
FIELD_ORDER = [
    ("subject", "Subject"),
    ("illustration", "Illustration"),
    ("application", "Application"),
    ("scripture_raw", "Scripture"),
    ("source", "Source"),
    ("talk_title", "Talk Title"),
    ("talk_number", "Talk Number"),
    ("entry_code", "Code"),
]

# Column headers (and their order) that the import wizard accepts and that
# the CSV export writes.
EXPECTED_HEADERS = [
    "Subject",
    "Illustration",
    "Application",
    "Scripture",
    "Source",
    "Talk Title",
    "Talk Number",
    "Code",
    "Date",
    "Date Edited",
]

# Fields searched when the user has not ticked any checkbox yet.
DEFAULT_SEARCH_FIELDS = {
    "subject": True,
    "illustration": True,
    "application": True,
    "scripture_raw": False,
    "source": False,
    "talk_title": False,
    "talk_number": False,
    "entry_code": False,
}

# Entry attributes that EntryForm edits (used to pre-fill the edit form).
_ENTRY_FORM_FIELDS = (
    "subject",
    "illustration",
    "application",
    "scripture_raw",
    "source",
    "talk_number",
    "talk_title",
    "entry_code",
    "date_added",
    "date_edited",
)

_NORMALIZE_BATCH_SIZE = 500
_NORMALIZE_PREVIEW_LIMIT = 100


# ========= Access helpers =========

def is_admin(user):
    """Return True when *user* is a superuser or staff member."""
    return user.is_superuser or user.is_staff


def _user_is_superuser(user):
    """Return True when *user* is a superuser."""
    return user.is_superuser


# Decorator restricting a view to superusers (others are sent to login).
superuser_required = user_passes_test(_user_is_superuser)

# Backward-compatible alias: this module historically exposed the decorator
# under the name ``is_superuser``.
is_superuser = superuser_required


def _int_param(raw, default=0):
    """Parse a query-string value as int, returning *default* if invalid."""
    try:
        return int(raw)
    except (TypeError, ValueError):
        return default


# ========= Login =========

def login_view(request):
    """Render the login form; on valid POST credentials log in and redirect."""
    if request.user.is_authenticated:
        return redirect("search")

    ctx = {}
    if request.method == "POST":
        username = request.POST.get("username")
        password = request.POST.get("password")
        user = authenticate(request, username=username, password=password)
        if user:
            login(request, user)
            return redirect("search")
        ctx["error"] = "Invalid credentials"
    return render(request, "login.html", ctx)


# ========= Entry context =========

def entry_context(entry, result_ids):
    """
    Build the navigation + chips context for the entry pages.

    ``position`` is the 1-based index of *entry* inside *result_ids*
    (1 when the entry is not part of the result list). ``talk_pdf_url`` is
    set only when the matching static PDF exists.
    """
    count = len(result_ids or [])
    if entry and result_ids and entry.id in result_ids:
        position = result_ids.index(entry.id) + 1
    else:
        position = 1

    subject_list = [
        t.strip() for t in (entry.subject or "").split(",") if t.strip()
    ]
    scripture_list = [
        t.strip() for t in (entry.scripture_raw or "").split(";") if t.strip()
    ]

    # Compute the talk PDF URL if a talk number is present and the file exists.
    talk_pdf_url = None
    try:
        if entry.talk_number:
            filename = f"talk_pdfs/S-34_E_{int(entry.talk_number):03d}.pdf"
            if staticfiles_storage.exists(filename):
                talk_pdf_url = staticfiles_storage.url(filename)
    except Exception:
        # Deliberately best-effort: a bad talk number or a storage problem
        # must not break the entry page; leave talk_pdf_url as None.
        logger.debug(
            "Could not resolve talk PDF for entry %s", entry.id, exc_info=True
        )

    return {
        "entry": entry,
        "locked": True,
        "position": position,
        "count": count,
        "subject_list": subject_list,
        "scripture_list": scripture_list,
        "talk_pdf_url": talk_pdf_url,
    }


# ========= Search =========

def _matching_entry_ids(tokens, fields, *, use_regex):
    """
    Return ids of entries matching every token in at least one of *fields*.

    With ``use_regex`` wildcard tokens are matched via ``__iregex``; without
    it every token is matched via ``__icontains`` after dropping ``*``/``?``.
    """
    qs = Entry.objects.all()
    for tok in tokens:
        clause = Q()
        if use_regex and has_wildcards(tok):
            pattern = wildcard_to_regex(tok)
            for f in fields:
                clause |= Q(**{f + "__iregex": pattern})
        else:
            needle = tok if use_regex else tok.replace("*", "").replace("?", "")
            for f in fields:
                clause |= Q(**{f + "__icontains": needle})
        qs = qs.filter(clause)
    return list(qs.order_by("-date_added", "-id").values_list("id", flat=True))


@login_required
def search_page(request):
    """
    Search-first landing. Defaults to Subject, Illustration, Application.
    Supports:
      - quoted phrases
      - * and ? wildcards (regex); if regex returns zero, falls back to icontains
      - AND across tokens, OR across the selected fields
    """
    form_submitted = ("q" in request.GET) or any(
        k in request.GET for k in DEFAULT_SEARCH_FIELDS
    )
    if form_submitted:
        selected = {k: (k in request.GET) for k in DEFAULT_SEARCH_FIELDS}
    else:
        selected = DEFAULT_SEARCH_FIELDS.copy()

    field_options = [
        {"name": k, "label": label, "checked": bool(selected.get(k))}
        for k, label in FIELD_ORDER
    ]

    q = (request.GET.get("q") or "").strip()
    page_ctx = {
        "q": q,
        "selected": selected,
        "field_options": field_options,
    }

    if not q:
        page_ctx.update({"total": Entry.objects.count(), "ran_search": False})
        return render(request, "search.html", page_ctx)

    tokens = terms(q)
    fields = [f for f, sel in selected.items() if sel] or ["subject"]
    used_regex = any(has_wildcards(tok) for tok in tokens)

    ids = _matching_entry_ids(tokens, fields, use_regex=True)
    if used_regex and not ids:
        # Regex found nothing: retry as plain substring search.
        ids = _matching_entry_ids(tokens, fields, use_regex=False)

    logger.info(
        "[search] q=%r tokens=%s fields=%s count=%d", q, tokens, fields, len(ids)
    )

    request.session["result_ids"] = ids
    # Ensure highlighter data is available BEFORE navigating to entry_view
    request.session["last_search"] = {"q": q, "fields": fields}
    request.session.modified = True  # be explicit so it's flushed

    if ids:
        entry = Entry.objects.get(pk=ids[0])
        ctx = entry_context(entry, ids)
        ctx.update({"from_search": True})
        if request.user.is_staff:
            ctx["tts_url"] = reverse("api_tts_for_entry", args=[entry.id])
        return render(request, "entry_view.html", ctx)

    page_ctx.update(
        {"total": Entry.objects.count(), "ran_search": True, "result_count": 0}
    )
    return render(request, "search.html", page_ctx)


# ========= Result navigation =========

def _render_nav_step(request, step):
    """
    Render the entry *step* positions away from ``?i=`` in the session results.

    A missing or non-numeric ``i`` is treated as 0 and the target index is
    clamped to the valid range, so a bad query string cannot raise.
    """
    ids = request.session.get("result_ids", [])
    if not ids:
        return redirect("search")

    current = _int_param(request.GET.get("i"), 0)
    target = max(0, min(current + step, len(ids) - 1))
    entry = get_object_or_404(Entry, pk=ids[target])

    ctx = entry_context(entry, ids)
    ctx["tts_url"] = (
        reverse("api_tts_for_entry", args=[entry.id])
        if request.user.is_staff
        else ""
    )
    return render(request, "entry_view.html", ctx)


@login_required
def nav_next(request):
    """Show the next entry in the current search results."""
    return _render_nav_step(request, 1)


@login_required
def nav_prev(request):
    """Show the previous entry in the current search results."""
    return _render_nav_step(request, -1)


# ========= Entry CRUD =========

@never_cache
@login_required
def entry_view(request, entry_id):
    """Show a single entry, with the last search passed on for highlighting."""
    ids = request.session.get("result_ids", [])
    entry = get_object_or_404(Entry, pk=entry_id)
    ctx = entry_context(entry, ids)

    # Pass last search to the template for the highlighter
    last = request.session.get("last_search") or {}
    ctx["last_search_q"] = last.get("q", "")
    ctx["last_search_fields"] = last.get("fields", [])
    ctx["tts_url"] = (
        reverse("api_tts_for_entry", args=[entry.id])
        if request.user.is_staff
        else None
    )
    return render(request, "entry_view.html", ctx)


def _save_form_to_entry(form, entry):
    """Copy every cleaned form value onto *entry* and save it."""
    for name, value in form.cleaned_data.items():
        setattr(entry, name, value)
    entry.save()


@login_required
def entry_add(request):
    """
    Create a brand new Entry using the same EntryForm used for editing.
    Since EntryForm is a regular Form (not a ModelForm), fields are copied
    manually.
    """
    if request.method == "POST":
        form = EntryForm(request.POST)
        if form.is_valid():
            entry = Entry()
            _save_form_to_entry(form, entry)
            messages.success(request, "New entry added.")
            return redirect("entry_view", entry_id=entry.id)
    else:
        form = EntryForm()
    return render(request, "entry_add.html", {"form": form})


@login_required
def entry_edit(request, entry_id):
    """Edit an existing entry; GET pre-fills the form from the entry."""
    ids = request.session.get("result_ids", [])
    entry = get_object_or_404(Entry, pk=entry_id)

    if request.method == "POST":
        form = EntryForm(request.POST)
        if form.is_valid():
            _save_form_to_entry(form, entry)
            messages.success(request, "Entry saved.")
            return redirect("entry_view", entry_id=entry.id)
    else:
        form = EntryForm(
            initial={name: getattr(entry, name) for name in _ENTRY_FORM_FIELDS}
        )

    ctx = {"entry": entry, "form": form}
    ctx.update(entry_context(entry, ids))
    return render(request, "entry_edit.html", ctx)


@login_required
def entry_delete(request, entry_id):
    """Confirm (GET) and perform (POST) deletion of a single entry."""
    entry = get_object_or_404(Entry, pk=entry_id)
    if request.method == "POST":
        entry.delete()
        messages.success(request, "Entry deleted.")
        return redirect("search")
    return render(request, "entry_delete_confirm.html", {"entry": entry})


# ========= CSV import / export =========

def _clean_header(cell):
    """Normalize one header cell for comparison with EXPECTED_HEADERS."""
    s = "" if cell is None else str(cell)
    s = s.strip()
    # Strip r: or r= prefixes and wrapping quotes
    if s.lower().startswith(("r:", "r=")):
        s = s[2:].lstrip()
    if len(s) >= 2 and s[0] == s[-1] and s[0] in ("'", '"'):
        s = s[1:-1]
    return s.strip().lower()


def _prepare_import_rows(text):
    """
    Parse uploaded CSV text and make sure it starts with the expected header.

    Returns ``(rows, header_ok, header_injected)``. ``header_ok`` tells
    whether the file's own first row matched EXPECTED_HEADERS;
    ``header_injected`` tells whether a clean header row was prepended.
    Raises ValueError when the file contains no rows.
    """
    # --- Try to sniff dialect ---
    try:
        first_line = text.splitlines()[0] if text else ""
        dialect = csv.Sniffer().sniff(first_line) if first_line else csv.excel
    except csv.Error:
        dialect = csv.excel

    rows = list(csv.reader(io.StringIO(text), dialect))
    if not rows:
        raise ValueError("The CSV file appears to be empty.")

    expected_norm = [h.lower() for h in EXPECTED_HEADERS]
    width = len(EXPECTED_HEADERS)

    first = rows[0]
    header_ok = [_clean_header(c) for c in first] == expected_norm
    header_injected = False

    if not header_ok and len(first) == width:
        # No header but same column count: inject the expected one.
        rows.insert(0, list(EXPECTED_HEADERS))
        header_injected = True
    elif not header_ok:
        # Column count is off: retry with alternate delimiters.
        for delim in (";", "\t"):
            retry = list(csv.reader(io.StringIO(text), delimiter=delim))
            if retry and len(retry[0]) == width:
                rows = retry
                header_ok = [_clean_header(c) for c in rows[0]] == expected_norm
                if not header_ok:
                    rows.insert(0, list(EXPECTED_HEADERS))
                    header_injected = True
                break

    return rows, header_ok, header_injected


@login_required
@user_passes_test(is_admin)
def import_wizard(request):
    """
    Upload a CSV, sanitize its header/delimiter, and run the importer.

    GET shows the empty form. A valid POST renders the import report; an
    invalid form or any import failure re-renders the form with a message.
    """
    if request.method != "POST":
        return render(request, "import_wizard.html", {"form": ImportForm()})

    form = ImportForm(request.POST, request.FILES)
    if form.is_valid():
        dry_run = form.cleaned_data["dry_run"]
        try:
            raw = form.cleaned_data["file"].read()
            text = raw.decode("utf-8-sig", errors="replace")
            rows, header_ok, header_injected = _prepare_import_rows(text)

            # Re-encode sanitized CSV for importer
            out = io.StringIO()
            csv.writer(out).writerows(rows)
            fixed_raw = out.getvalue().encode("utf-8")

            # Keep utils in sync
            core_utils.EXPECTED_HEADERS = list(EXPECTED_HEADERS)

            # Run robust importer
            report = import_csv_bytes(fixed_raw, dry_run=dry_run) or {}

            # Normalize preview: dict rows become lists in column order
            report.setdefault("columns", list(EXPECTED_HEADERS))
            preview = report.get("preview") or []
            if preview and isinstance(preview[0], dict):
                cols = report["columns"]
                report["preview"] = [
                    [row.get(c, "") for c in cols] for row in preview
                ]

            report["header_ok"] = header_ok
            if header_injected:
                messages.warning(
                    request,
                    "The first row didn’t match the expected header; a clean header was injected automatically."
                )
            elif not header_ok:
                # Neither the header nor the column count could be matched,
                # so nothing was injected; say so instead of claiming a fix.
                messages.warning(
                    request,
                    "The first row didn’t match the expected header and the column count is different; the file was passed to the importer unchanged."
                )

            return render(
                request,
                "import_result.html",
                {"report": report, "dry_run": dry_run},
            )
        except Exception as e:
            # Top-level boundary: report any importer failure to the user.
            logger.exception("CSV import failed")
            messages.error(request, f"Import failed: {e}")

    # Invalid form or exception -> show form again
    return render(request, "import_wizard.html", {"form": form})


@login_required
@user_passes_test(is_admin)
def export_csv(request):
    """
    Download every entry as a CSV backup.

    Columns follow EXPECTED_HEADERS so that the file passes the import
    wizard's strict header check and can be re-imported unchanged.
    """
    ts = date.today().strftime("%Y-%m-%d")
    response = HttpResponse(content_type="text/csv")
    response["Content-Disposition"] = (
        f'attachment; filename="illustrations_backup_{ts}.csv"'
    )

    w = csv.writer(response)
    w.writerow(EXPECTED_HEADERS)
    for e in Entry.objects.all().order_by("id").iterator():
        w.writerow(
            [
                e.subject,
                e.illustration,
                e.application,
                e.scripture_raw,
                e.source,
                e.talk_title,
                e.talk_number if e.talk_number is not None else "",
                e.entry_code,
                e.date_added.isoformat() if e.date_added else "",
                e.date_edited.isoformat() if e.date_edited else "",
            ]
        )
    return response


# ========= Field normalizers =========

def _save_normalized_batch(pending, field):
    """Save one batch of entries, writing only *field*, in one transaction."""
    with transaction.atomic():
        for obj in pending:
            obj.save(update_fields=[field])


def _run_normalizer(request, *, field, normalizer, template):
    """
    Shared implementation of the normalizer views.

    GET  -> dry-run preview (summary + first 100 examples)
    POST -> apply changes to the entries' *field* (saved in batches)
    Optional ?limit= restricts the run to the first N entries by id; a
    missing, non-numeric or negative value means "no limit".
    """
    apply = request.method == "POST"
    limit = max(_int_param(request.GET.get("limit"), 0), 0)

    qs = Entry.objects.all().order_by("id")
    if limit:
        qs = qs[:limit]

    changed = 0
    warnings_total = 0
    preview = []
    pending = []

    for e in qs.iterator():
        original = (getattr(e, field) or "").strip()
        normalized, warns = normalizer(original)
        warnings_total += len(warns)
        if normalized == original:
            continue

        changed += 1
        # Only the first examples are ever shown; don't collect the rest.
        if len(preview) < _NORMALIZE_PREVIEW_LIMIT:
            preview.append((e.id, original, normalized))

        if apply:
            setattr(e, field, normalized)
            pending.append(e)
            if len(pending) >= _NORMALIZE_BATCH_SIZE:
                _save_normalized_batch(pending, field)
                pending.clear()

    if pending:
        _save_normalized_batch(pending, field)

    messages.info(
        request,
        f"{'Applied' if apply else 'Dry-run'}: {changed} entries "
        f"{'changed' if apply else 'would change'}; {warnings_total} warnings."
    )
    return render(
        request,
        template,
        {
            "applied": apply,
            "changed": changed,
            "warnings_total": warnings_total,
            "preview": preview,
            "limit": limit,
        },
    )


@login_required
@user_passes_test(is_admin)
@require_http_methods(["GET", "POST"])
def normalize_scripture(request):
    """
    GET  -> dry-run preview (summary + first 100 examples)
    POST -> apply changes to all entries' scripture_raw (batched)
    Optional ?limit= for preview subset.
    """
    return _run_normalizer(
        request,
        field="scripture_raw",
        normalizer=normalize_scripture_field,
        template="normalize_result.html",
    )


@login_required
@user_passes_test(is_admin)
@require_http_methods(["GET", "POST"])
def normalize_source(request):
    """
    GET  -> dry-run preview (summary + first 100 examples)
    POST -> apply changes to all entries' source (batched)
    Optional ?limit= for preview subset.
    """
    return _run_normalizer(
        request,
        field="source",
        normalizer=normalize_source_field,
        template="normalize_source_result.html",
    )


@login_required
@user_passes_test(is_admin)
@require_http_methods(["GET", "POST"])
def normalize_subjects(request):
    """
    GET  -> dry-run preview (summary + first 100 examples)
    POST -> apply changes to all entries' subject (batched)
    Optional ?limit= for preview subset.
    """
    return _run_normalizer(
        request,
        field="subject",
        normalizer=normalize_subject_field,
        template="normalize_subjects_result.html",
    )


# ========= API: Recently Viewed (for 20-word snippet + correct link) =========

@login_required
def api_get_recent_views(request):
    """
    Return the current user's recently viewed entries (up to 50), including a
    precomputed 20-word snippet from illustration (or a sensible fallback).
    """
    # Imported lazily, as in the original code, so that this module still
    # loads if RecentView is unavailable.
    # NOTE(review): confirm RecentView still lives in .models.
    from .models import RecentView

    recents = (
        RecentView.objects
        .filter(user=request.user)
        .select_related("entry")
        .order_by("-viewed_at")[:50]
    )

    def make_snippet(e):
        # First non-empty of illustration / application / subject.
        base = (
            (e.illustration or "").strip()
            or (e.application or "").strip()
            or (e.subject or "").strip()
        )
        if not base:
            return ""
        return Truncator(" ".join(base.split())).words(20, truncate="…")

    items = [
        {
            "entry_id": rv.entry_id,
            "viewed_at": rv.viewed_at.isoformat(),
            "illustration": rv.entry.illustration or "",
            "snippet": make_snippet(rv.entry),
        }
        for rv in recents
    ]
    return JsonResponse({"ok": True, "items": items})


@login_required
def settings_home(request):
    """Render the settings landing page."""
    return render(request, "settings/home.html")


# ========= Statistics =========

# A light normalizer so common abbreviations map to canonical book names.
# Keys are lower-case with dots and ALL whitespace removed, because that is
# the form _normalize_book() looks up (so "1 Cor." is looked up as "1cor").
_BOOK_MAP = {
    # OT (examples; extend as needed)
    "gen": "Genesis", "ge": "Genesis", "gn": "Genesis",
    "ex": "Exodus", "exo": "Exodus",
    "lev": "Leviticus", "le": "Leviticus",
    "num": "Numbers", "nu": "Numbers",
    "de": "Deuteronomy", "deut": "Deuteronomy",
    "jos": "Joshua", "josh": "Joshua",
    "jdg": "Judges", "judg": "Judges",
    "ru": "Ruth", "rut": "Ruth",
    "ps": "Psalms", "psalm": "Psalms", "psalms": "Psalms",
    "pr": "Proverbs", "pro": "Proverbs",
    "ec": "Ecclesiastes", "ecc": "Ecclesiastes",
    "isa": "Isaiah", "is": "Isaiah",
    "jer": "Jeremiah", "je": "Jeremiah",
    "eze": "Ezekiel", "ez": "Ezekiel",
    "da": "Daniel", "dan": "Daniel",
    "ho": "Hosea", "hos": "Hosea",
    # NT (examples; extend as needed)
    "mt": "Matthew", "matt": "Matthew",
    "mr": "Mark", "mk": "Mark",
    "lu": "Luke", "lk": "Luke",
    "joh": "John", "john": "John", "jn": "John",
    "ac": "Acts", "acts": "Acts",
    "rom": "Romans", "ro": "Romans",
    "1co": "1 Corinthians", "1cor": "1 Corinthians",
    "2co": "2 Corinthians", "2cor": "2 Corinthians",
}

_BOOK_RE = re.compile(
    r"""
    ^\s*
    (?P<book>(?:[1-3]\s*)?[A-Za-z\.]+)   # optional 1/2/3 prefix + word
    [\s\.]+
    (?P<ref>\d+[:\.]\d+.*)?              # 3:16 or 3.16 etc (optional tail)
    """,
    re.X,
)


def _normalize_book(raw):
    """Map a raw book token to its canonical name (title-cased if unknown)."""
    key = raw.strip().lower().replace(".", "")
    key = re.sub(r"\s+", "", key)  # "1 john" -> "1john"
    return _BOOK_MAP.get(key, raw.strip().title())


def _split_refs(text):
    """Split a scripture field into its individual references."""
    if not text:
        return []
    # References are separated by semicolons only; commas are left alone
    # because they also occur inside a reference.
    return [p.strip() for p in re.split(r"[;]+", text) if p.strip()]


def _parse_piece(piece):
    """Return ``(book, full_reference)`` for one reference, or (None, None)."""
    m = _BOOK_RE.match(piece)
    if not m:
        return None, None
    book = _normalize_book(m.group("book"))
    ref = (m.group("ref") or "").strip()
    return book, (f"{book} {ref}" if ref else book)


def _last_twelve_months(today):
    """Return ``(label, start, end)`` for the 12 months ending with *today*'s.

    The list is oldest first; ``end`` is the first day of the following month.
    """
    months = []
    for offset in range(11, -1, -1):
        mm = today.month - offset
        yy = today.year
        while mm <= 0:
            mm += 12
            yy -= 1
        start = date(yy, mm, 1)
        end = date(yy + 1, 1, 1) if mm == 12 else date(yy, mm + 1, 1)
        label = f"{month_abbr[mm]} {str(yy)[2:]}"
        months.append((label, start, end))
    return months


@login_required
def stats_page(request):
    """Render totals, a 12-month sparkline, top subjects and scripture stats."""
    total = Entry.objects.count()
    today = date.today()
    last30 = Entry.objects.filter(
        date_added__gte=today - timedelta(days=30)
    ).count()
    last365 = Entry.objects.filter(
        date_added__gte=today - timedelta(days=365)
    ).count()

    # ---- Sparkline (last 12 months) ----
    series = [
        (
            label,
            Entry.objects.filter(
                date_added__gte=start, date_added__lt=end
            ).count(),
        )
        for label, start, end in _last_twelve_months(today)
    ]
    peak = max((v for _, v in series), default=1)
    # Bar height: 8 baseline plus up to 100 relative to the busiest month;
    # just the baseline when every month is empty (peak == 0).
    heights = [
        (label, value, (8 + int((value / peak) * 100)) if peak else 8)
        for label, value in series
    ]

    # ---- Top subjects ----
    subject_counts = Counter()
    subjects = Entry.objects.exclude(subject="").values_list("subject", flat=True)
    for subj in subjects:
        for tag in (t.strip() for t in subj.split(",")):
            if tag:
                subject_counts[tag.lower()] += 1
    top_subjects = [
        {"name": n.title(), "count": c}
        for n, c in subject_counts.most_common(10)
    ]

    # ---- Scripture analytics (from scripture_raw) ----
    book_counts = Counter()
    ref_counts = Counter()
    refs_per_entry = []

    entries_with_script = (
        Entry.objects
        .exclude(scripture_raw__isnull=True)
        .exclude(scripture_raw__exact="")
    )
    for e in entries_with_script.iterator():
        entry_ref_count = 0
        for piece in _split_refs(e.scripture_raw):
            book, full = _parse_piece(piece)
            if not book:
                continue
            book_counts[book] += 1
            # Only references with a chapter:verse part count as "refs".
            if full and full != book:
                ref_counts[full] += 1
                entry_ref_count += 1
        if entry_ref_count:
            refs_per_entry.append(entry_ref_count)

    if refs_per_entry:
        avg_refs_per_entry = round(sum(refs_per_entry) / len(refs_per_entry), 2)
    else:
        avg_refs_per_entry = 0

    return render(
        request,
        "stats.html",
        {
            "total": total,
            "last30": last30,
            "last365": last365,
            "series": series,
            "heights": heights,
            "top_subjects": top_subjects,
            "avg_refs_per_entry": avg_refs_per_entry,
            "top_books": book_counts.most_common(10),  # list of (book, count)
            "top_refs": ref_counts.most_common(10),    # list of (ref, count)
        },
    )


# ========= Superuser tools =========

@login_required
@superuser_required
def delete_all_entries(request):
    """
    Confirmation screen + POST to delete ALL Entry records.
    Mirrors the style of the single-entry delete page.
    """
    if request.method != "POST":
        return render(request, "settings/delete_all_confirm.html", {})

    # Extra safeguard: only delete if the form had the confirm field.
    if request.POST.get("confirm") == "yes":
        with transaction.atomic():
            deleted, _ = Entry.objects.all().delete()
        messages.success(request, f"Deleted all illustrations ({deleted} rows).")
        return redirect("settings_home")

    messages.info(request, "Deletion cancelled.")
    return redirect("settings_home")


# ----- Release announcements (superuser tools + dismiss) -----

@superuser_required
@login_required
def announcement_tools(request):
    """
    Superuser-only: publish an announcement.
    Users will see it once on next search page load.
    """
    if request.method == "POST":
        form = AnnouncementForm(request.POST)
        if form.is_valid():
            ann = form.save(commit=False)
            ann.created_by = request.user
            ann.save()
            messages.success(request, "Announcement published.")
            return redirect("settings_home")
    else:
        form = AnnouncementForm()

    recent = Announcement.objects.order_by("-created_at")[:25]
    # Rendered through the settings page; template guards show this block.
    return render(
        request,
        "settings/home.html",
        {
            "announcement_form": form,
            "announcements_recent": recent,
        },
    )


@login_required
@require_POST
def dismiss_announcement(request, pk):
    """Record that the current user dismissed announcement *pk*."""
    ann = get_object_or_404(Announcement, pk=pk)
    AnnouncementDismissal.objects.get_or_create(
        user=request.user, announcement=ann
    )
    return JsonResponse({"ok": True})


# ----- Login attempts (superuser only) -----

@superuser_required
@login_required
def login_attempts(request):
    """
    Show last 7 days of login attempts (success + failure) with username, IP,
    and timestamp.
    """
    cutoff = timezone.now() - timedelta(days=7)
    attempts = (
        LoginAttempt.objects
        .filter(timestamp__gte=cutoff)
        .order_by("-timestamp")
    )
    return render(request, "tools/login_attempts.html", {"attempts": attempts})


@login_required
@require_POST
def clear_history(request):
    """
    Clear the current user's history shown on the Search page:
      - SearchHistory (last 10 searches)
      - ViewedIllustration (recently viewed entries)
    Also clears any session keys that might be used as UI caches.
    """
    # Delete DB-backed recents for this user
    SearchHistory.objects.filter(user=request.user).delete()
    ViewedIllustration.objects.filter(user=request.user).delete()

    # (Harmless) clear possible session caches if present
    for key in ("recent_searches", "recent_entries", "recent_viewed", "recently_viewed"):
        request.session.pop(key, None)
    request.session.modified = True

    return JsonResponse({"ok": True})


@superuser_required
@login_required
def audit_log(request):
    """Show the 100 most recent audit log rows."""
    rows = AuditLog.objects.all().order_by("-timestamp")[:100]
    return render(request, "tools/audit_log.html", {"rows": rows})


# === Theme selection (robust) ================================================

def _is_valid_theme(name: str) -> bool:
    """Return True when /static/themes/<name>.css exists (works with collectstatic)."""
    # The name comes straight from POST data: refuse anything that could
    # address a path outside the themes directory.
    if not name or any(ch in name for ch in ("/", "\\", "\0")):
        return False
    try:
        return staticfiles_storage.exists(f"themes/{name}.css")
    except Exception:
        # Storage backends can raise for unknown files; treat as "not found".
        return False


@login_required
@require_POST
def set_theme(request):
    """Store the chosen theme in the session after checking that it exists."""
    theme = (request.POST.get("theme") or "").strip()
    if not theme:
        messages.error(request, "Pick a theme first.")
        return redirect("settings_home")

    if not _is_valid_theme(theme):
        messages.error(
            request,
            f"Theme “{theme}” not found. Make sure /static/themes/{theme}.css exists (and run collectstatic in production)."
        )
        return redirect("settings_home")

    request.session["theme"] = theme
    messages.success(request, f"Theme set to {theme.title()}.")
    return redirect("settings_home")