Illustrations/web/core/utils.py
Joshua Laymon 2fb9e7c39c Update
2025-08-12 21:53:03 -05:00

168 lines
6.2 KiB
Python

import csv, io, re, calendar
from datetime import datetime, timedelta, date
from dateutil import parser as dateparser
from collections import Counter, defaultdict
from django.db.models.functions import TruncMonth
from django.db.models import Count
from .models import Entry, ScriptureRef
# Abbreviation -> canonical book name.
# Keys are in the form normalize_book() looks up: lower case, with periods
# and whitespace removed (so "1 Sam." is found under "1sam").
BOOK_ALIASES = {
    # Old Testament
    "gen": "Genesis",
    "ex": "Exodus",
    "lev": "Leviticus",
    "num": "Numbers",
    "deut": "Deuteronomy",
    "josh": "Joshua",
    "judg": "Judges",
    "rut": "Ruth",
    "1sam": "1 Samuel",
    "2sam": "2 Samuel",
    "1kings": "1 Kings",
    "2kings": "2 Kings",
    "1chron": "1 Chronicles",
    "2chron": "2 Chronicles",
    "ezra": "Ezra",
    "neh": "Nehemiah",
    "esth": "Esther",
    "job": "Job",
    "ps": "Psalms",
    "prov": "Proverbs",
    "eccl": "Ecclesiastes",
    "song": "Song of Solomon",
    "isa": "Isaiah",
    "jer": "Jeremiah",
    "lam": "Lamentations",
    "ezek": "Ezekiel",
    "dan": "Daniel",
    "hos": "Hosea",
    "joel": "Joel",
    "amos": "Amos",
    "obad": "Obadiah",
    "jon": "Jonah",
    "mic": "Micah",
    "nah": "Nahum",
    "hab": "Habakkuk",
    "zeph": "Zephaniah",
    "hag": "Haggai",
    "zech": "Zechariah",
    "mal": "Malachi",
    # New Testament
    "matt": "Matthew",
    "mt": "Matthew",
    "mark": "Mark",
    "lk": "Luke",
    "luke": "Luke",
    "jn": "John",
    "john": "John",
    "acts": "Acts",
    "rom": "Romans",
    "1cor": "1 Corinthians",
    "2cor": "2 Corinthians",
    "gal": "Galatians",
    "eph": "Ephesians",
    "phil": "Philippians",
    "col": "Colossians",
    "1thess": "1 Thessalonians",
    "2thess": "2 Thessalonians",
    "1tim": "1 Timothy",
    "2tim": "2 Timothy",
    "titus": "Titus",
    "phlm": "Philemon",
    "heb": "Hebrews",
    "jas": "James",
    "1pet": "1 Peter",
    "2pet": "2 Peter",
    "1john": "1 John",
    "2john": "2 John",
    "3john": "3 John",
    "jude": "Jude",
    "rev": "Revelation",
}
# Matches one scripture reference such as "John 3:16", "1 Cor. 13:4-7",
# "Gen 1:1 – 2:3" or "Song of Solomon 2:1".
# Groups: (book, chapter, verse, range_end_first, range_end_second).
# The pattern is a raw string so that \s, \d and \. reach the regex engine
# unchanged instead of being treated as (invalid) string escapes.
SCR_REF_RE = re.compile(
    r"""
    ^\s*
    (                               # 1: book name
        [1-3]?\s*[A-Za-z\.]+        #    optional ordinal, then the first word
        (?:\s+[A-Za-z\.]+)*         #    further words ("Song of Solomon")
    )
    \s+
    (\d+)                           # 2: starting chapter
    (?::(\d+))?                     # 3: optional starting verse
    (?:\s*[-–—]\s*(\d+)(?::(\d+))?)?  # 4, 5: optional range end after a dash
    \s*$
    """,
    re.VERBOSE,
)
def normalize_book(book_raw: str) -> str:
    """Return the canonical book name for a raw book string.

    Periods and whitespace are removed and the result is lower-cased to form
    the lookup key into BOOK_ALIASES. When the key is not a known alias the
    input is returned with only its surrounding whitespace trimmed.
    """
    lookup_key = re.sub(r"[\.\s]", "", book_raw).lower()
    if lookup_key in BOOK_ALIASES:
        return BOOK_ALIASES[lookup_key]
    return book_raw.strip()
def parse_scripture(s: str):
    """Parse a semicolon-separated string of scripture references.

    Args:
        s: Text such as "John 3:16; Ps 23; Gen 1:1-2:3". Empty or None input
            yields an empty list.

    Returns:
        A list with one item per non-empty segment, in order. Each item is
        either a dict with the keys "book", "chapter_from", "verse_from",
        "chapter_to" and "verse_to", or None when the segment does not match
        SCR_REF_RE.

    Range handling:
        "John 3:16"     -> chapter_to=None, verse_to=None
        "John 3:16-18"  -> chapter_to=3,    verse_to=18  (verse range)
        "Gen 1:1-2:3"   -> chapter_to=2,    verse_to=3
        "Ps 1-3"        -> chapter_to=3,    verse_to=None (chapter range)
    """
    results = []
    for segment in (s or "").split(";"):
        segment = segment.strip()
        if not segment:
            continue
        match = SCR_REF_RE.match(segment)
        if match is None:
            results.append(None)
            continue

        book_raw, ch1, v1, end_first, end_second = match.groups()
        chapter_from = int(ch1)
        verse_from = int(v1) if v1 else None

        if end_first is None:
            # No range given.
            chapter_to = None
            verse_to = None
        elif end_second is not None:
            # Fully qualified end, e.g. "-2:3".
            chapter_to = int(end_first)
            verse_to = int(end_second)
        elif verse_from is not None:
            # A bare number after a verse ("3:16-18") is the ending verse in
            # the same chapter, not an ending chapter.
            chapter_to = chapter_from
            verse_to = int(end_first)
        else:
            # Chapter-only range, e.g. "Ps 1-3".
            chapter_to = int(end_first)
            verse_to = None

        results.append({
            "book": normalize_book(book_raw),
            "chapter_from": chapter_from,
            "verse_from": verse_from,
            "chapter_to": chapter_to,
            "verse_to": verse_to,
        })
    return results
def parse_date(value):
    """Best-effort conversion of a loosely formatted value to a date.

    Returns None when the value is falsy, when its string form is blank, or
    when dateutil cannot turn it into a date; otherwise returns the date part
    of the parsed datetime.
    """
    if not value:
        return None
    text = str(value)
    if not text.strip():
        return None
    try:
        return dateparser.parse(text).date()
    except Exception:
        # Deliberately lenient: any parsing problem means "no date".
        return None
# Column names (lower case) that an import CSV must contain.
EXPECTED_HEADERS = [
    "subject",
    "illustration",
    "application",
    "scripture",
    "source",
    "talk number",
    "talk title",
    "code",
    "date",
    "date edited",
]
def import_csv_bytes(file_bytes: bytes, dry_run: bool = True):
    """Import entries from the raw bytes of a CSV file.

    Args:
        file_bytes: UTF-8 encoded CSV content; a leading BOM is accepted.
        dry_run: When True (the default) rows are validated and scripture
            references are parsed for the report, but nothing is written.

    Returns:
        A report dict with the keys "rows", "inserted", "updated", "skipped",
        "errors" (list of error strings), "scripture_parsed" and
        "scripture_failed". "inserted" and "updated" are only incremented
        when dry_run is False.

    Raises:
        ValueError: If any column listed in EXPECTED_HEADERS is missing.
        UnicodeDecodeError: If file_bytes is not valid UTF-8.
    """
    # Function-scope import keeps this block self-contained.
    from django.db import transaction

    text = file_bytes.decode("utf-8-sig")
    reader = csv.DictReader(io.StringIO(text))
    headers = [h.strip().lower() for h in reader.fieldnames or []]
    missing = [h for h in EXPECTED_HEADERS if h not in headers]
    if missing:
        raise ValueError(f"Missing required headers: {missing}")
    # Key every row by the normalized header. Without this, a file whose
    # header row is "Subject, Code " passes validation above but every
    # row.get("subject") below would return None.
    reader.fieldnames = headers

    report = {
        "rows": 0,
        "inserted": 0,
        "updated": 0,
        "skipped": 0,
        "errors": [],
        "scripture_parsed": 0,
        "scripture_failed": 0,
    }
    rows = list(reader)
    report["rows"] = len(rows)

    for row in rows:
        try:
            entry_code = (row.get("code") or "").strip()

            raw_talk_number = (row.get("talk number") or "").strip()
            try:
                talk_number = int(raw_talk_number) if raw_talk_number else None
            except ValueError:
                # Non-numeric talk numbers are stored as "no talk number".
                talk_number = None

            data = dict(
                subject=row.get("subject") or "",
                illustration=row.get("illustration") or "",
                application=row.get("application") or "",
                scripture_raw=row.get("scripture") or "",
                source=row.get("source") or "",
                talk_number=talk_number,
                talk_title=row.get("talk title") or "",
                entry_code=entry_code,
                date_added=parse_date(row.get("date")),
                date_edited=parse_date(row.get("date edited")),
            )

            # Rows with a code update the entry that already has that code.
            existing = None
            if entry_code:
                try:
                    existing = Entry.objects.get(entry_code=entry_code)
                except Entry.DoesNotExist:
                    existing = None

            # Parse scriptures for reporting (done in dry runs as well).
            parsed_list = parse_scripture(data["scripture_raw"])
            parsed_refs = [item for item in parsed_list if item]
            report["scripture_parsed"] += len(parsed_refs)
            report["scripture_failed"] += len(parsed_list) - len(parsed_refs)

            if dry_run:
                continue

            # Write the entry and its scripture refs as one unit so a failure
            # part-way through does not leave an entry with missing refs.
            with transaction.atomic():
                if existing is not None:
                    obj = existing
                    for field, value in data.items():
                        setattr(obj, field, value)
                    obj.save()
                    obj.scripture_refs.all().delete()
                else:
                    obj = Entry.objects.create(**data)
                for item in parsed_refs:
                    ScriptureRef.objects.create(entry=obj, **item)

            # Count only after the whole row has been persisted, so a failed
            # row is reported as skipped and nothing else.
            if existing is not None:
                report["updated"] += 1
            else:
                report["inserted"] += 1
        except Exception as e:
            # Per-row boundary: one bad row must not abort the whole import.
            report["skipped"] += 1
            report["errors"].append(str(e))
    return report
# Single-pass character map used by wildcard_to_like().
_LIKE_TRANSLATION = str.maketrans({
    "\\": "\\\\",  # literal backslash must not merge with the next character
    "%": "\\%",    # literal percent sign
    "_": "\\_",    # literal underscore
    "*": "%",      # user wildcard: any run of characters
    "?": "_",      # user wildcard: exactly one character
})


def wildcard_to_like(q: str) -> str:
    """Convert a user query with * and ? wildcards into a SQL LIKE pattern.

    "*" becomes "%" and "?" becomes "_". Characters that are special in a
    LIKE pattern ("%", "_" and the backslash escape character itself) are
    prefixed with a backslash so they match literally.

    The translation is done in one pass, so replacement output is never
    re-processed by a later rule.
    """
    return q.translate(_LIKE_TRANSLATION)
def terms(q: str):
    """Split a query string into its whitespace-separated terms.

    str.split() with no separator already drops empty pieces and surrounding
    whitespace, so an empty or blank query gives an empty list.
    """
    return q.split()
def month_buckets_last_12(today: date):
    """Return the 12 calendar months ending with today's month, oldest first.

    Each item is a tuple (label, start, end) where label is "YYYY-MM", start
    is the first day of that month and end is the first day of the following
    month.
    """
    # Count months on a single axis so year roll-over is plain arithmetic.
    current_index = today.year * 12 + (today.month - 1)
    buckets = []
    for month_index in range(current_index - 11, current_index + 1):
        year, month_zero = divmod(month_index, 12)
        next_year, next_month_zero = divmod(month_index + 1, 12)
        month = month_zero + 1
        buckets.append((
            f"{year}-{month:02d}",
            date(year, month, 1),
            date(next_year, next_month_zero + 1, 1),
        ))
    return buckets