import csv
import io
import re
from datetime import datetime

from dateutil import parser as dateparser

from django.db.models import Q

from .models import Entry, ScriptureRef
# Scripture parsing --------------------------------------------------

# Lowercase, dot/space-stripped abbreviation -> canonical book name.
# Keys must stay normalized exactly the way normalize_book() normalizes
# its input (dots and whitespace removed, lower-cased).
BOOK_ALIASES = {
    # Old Testament
    "gen": "Genesis", "ge": "Genesis", "ex": "Exodus", "lev": "Leviticus",
    "num": "Numbers", "deut": "Deuteronomy", "josh": "Joshua",
    "judg": "Judges", "rut": "Ruth", "1sam": "1 Samuel", "2sam": "2 Samuel",
    "1kings": "1 Kings", "2kings": "2 Kings",
    "1chron": "1 Chronicles", "2chron": "2 Chronicles",
    "ezra": "Ezra", "neh": "Nehemiah", "esth": "Esther", "job": "Job",
    "ps": "Psalms", "psa": "Psalms", "prov": "Proverbs",
    "eccl": "Ecclesiastes", "song": "Song of Solomon", "isa": "Isaiah",
    "jer": "Jeremiah", "lam": "Lamentations", "ezek": "Ezekiel",
    "dan": "Daniel", "hos": "Hosea", "joel": "Joel", "amos": "Amos",
    "obad": "Obadiah", "jon": "Jonah", "mic": "Micah", "nah": "Nahum",
    "hab": "Habakkuk", "zeph": "Zephaniah", "hag": "Haggai",
    "zech": "Zechariah", "mal": "Malachi",
    # New Testament
    "matt": "Matthew", "mt": "Matthew", "mark": "Mark", "mk": "Mark",
    "lk": "Luke", "luke": "Luke", "jn": "John", "john": "John",
    "acts": "Acts", "rom": "Romans", "1cor": "1 Corinthians",
    "2cor": "2 Corinthians", "gal": "Galatians", "eph": "Ephesians",
    "phil": "Philippians", "col": "Colossians",
    "1thess": "1 Thessalonians", "2thess": "2 Thessalonians",
    "1tim": "1 Timothy", "2tim": "2 Timothy", "titus": "Titus",
    "phlm": "Philemon", "heb": "Hebrews", "jas": "James", "jam": "James",
    "1pet": "1 Peter", "2pet": "2 Peter", "1john": "1 John",
    "2john": "2 John", "3john": "3 John", "jude": "Jude",
    "rev": "Revelation", "re": "Revelation",
}

# Matches one reference such as "John 3", "John 3:16", "Gen 1-3",
# "John 3:16-18" or "John 3:16-4:2".  The range separator may be a
# hyphen, en dash or em dash.
SCR_REF_RE = re.compile(r"""
    ^\s*([1-3]?\s*[A-Za-z\.]+)\s+        # book (optional leading 1-3)
    (\d+)                                # chapter start
    (?::(\d+))?                          # verse start
    (?:\s*[-–—]\s*(\d+)(?::(\d+))?)?     # optional range end
    \s*$
""", re.VERBOSE)


def normalize_book(book_raw: str) -> str:
    """Return the canonical book name for *book_raw*.

    Dots and whitespace are stripped and the result lower-cased before
    the alias lookup, so "1 Sam." and "1sam" both resolve to "1 Samuel".
    Unknown books fall back to the stripped original text.
    """
    key = re.sub(r"[\.\s]", "", book_raw).lower()
    return BOOK_ALIASES.get(key, book_raw.strip())


def parse_scripture(s: str):
    """Parse a semicolon-separated scripture string into reference dicts.

    Returns a list with one item per ";"-separated part: a dict with the
    keys book / chapter_from / verse_from / chapter_to / verse_to for
    parseable parts, or None for parts SCR_REF_RE does not match.
    None or empty input yields an empty list.
    """
    parts = [p.strip() for p in (s or "").split(";") if p.strip()]
    parsed = []
    for p in parts:
        m = SCR_REF_RE.match(p)
        if not m:
            parsed.append(None)
            continue
        book_raw, ch1, v1, ch2, v2 = m.groups()
        chapter_to = int(ch2) if ch2 else None
        verse_to = int(v2) if v2 else None
        # BUGFIX: for "John 3:16-18" the regex captures 18 in the
        # chapter-range group (the range part has no colon), which used
        # to produce chapter_to=18, verse_to=None.  When a start verse
        # exists and the range gives a single number, that number is the
        # end *verse* of the same chapter, not an end chapter.
        if v1 and ch2 and not v2:
            verse_to = int(ch2)
            chapter_to = None
        parsed.append({
            "book": normalize_book(book_raw),
            "chapter_from": int(ch1),
            "verse_from": int(v1) if v1 else None,
            "chapter_to": chapter_to,
            "verse_to": verse_to,
        })
    return parsed
# CSV import ---------------------------------------------------------

# Header row the importer requires; matching is case-insensitive.
EXPECTED_HEADERS = [
    "Subject", "Illustration", "Application", "Scripture", "Source",
    "Talk Title", "Talk Number", "Code", "Date", "Date Edited",
]


def parse_date(value):
    """Best-effort parse of *value* into a date; None on failure.

    Falsy or whitespace-only values yield None; anything dateutil
    cannot parse also yields None rather than raising (deliberate
    best-effort behavior for messy CSV data).
    """
    if not value or not str(value).strip():
        return None
    try:
        return dateparser.parse(str(value)).date()
    except Exception:
        return None


def import_csv(file_bytes: bytes, dry_run: bool = True):
    """Import Entry rows from a CSV export.

    Parameters:
        file_bytes: raw CSV bytes, UTF-8 with optional BOM.
        dry_run: when True nothing is written to the database; the
            report still counts rows and scripture parse successes.

    Returns a report dict with rows / inserted / updated / skipped
    counts, an errors list, and scripture_parsed / scripture_failed
    counters.  Raises ValueError if any expected header is absent
    (headers are matched case-insensitively).
    """
    text = file_bytes.decode("utf-8-sig")
    reader = csv.DictReader(io.StringIO(text))
    headers = reader.fieldnames or []

    # Lower-cased header -> header exactly as it appears in the file, so
    # lookups are case-insensitive while DictReader keys keep the
    # original casing.
    lower_map = {h.lower(): h for h in headers}
    missing = [h for h in EXPECTED_HEADERS if h.lower() not in lower_map]
    if missing:
        raise ValueError(f"Missing required headers: {missing}")

    report = {
        "rows": 0, "inserted": 0, "updated": 0, "skipped": 0,
        "errors": [], "scripture_parsed": 0, "scripture_failed": 0,
    }

    def get(row, name):
        # Fetch a cell by canonical header name; missing/None cells -> "".
        value = row.get(lower_map[name.lower()])
        return value.strip() if value is not None else ""

    rows = list(reader)
    report["rows"] = len(rows)

    for r in rows:
        try:
            entry_code = get(r, "Code")
            data = dict(
                subject=get(r, "Subject"),
                illustration=get(r, "Illustration"),
                application=get(r, "Application"),
                scripture_raw=get(r, "Scripture"),
                source=get(r, "Source"),
                talk_title=get(r, "Talk Title"),
                talk_number=int(get(r, "Talk Number")) if get(r, "Talk Number") else None,
                entry_code=entry_code,
                date_added=parse_date(get(r, "Date")),
                date_edited=parse_date(get(r, "Date Edited")),
            )

            # An existing entry_code means this row updates that entry.
            obj = None
            if entry_code:
                try:
                    obj = Entry.objects.get(entry_code=entry_code)
                except Entry.DoesNotExist:
                    obj = None

            if not dry_run:
                if obj:
                    for k, v in data.items():
                        setattr(obj, k, v)
                    obj.save()
                    # Scripture refs are rebuilt from scratch below.
                    obj.scripture_refs.all().delete()
                    report["updated"] += 1
                else:
                    obj = Entry.objects.create(**data)
                    report["inserted"] += 1
                for pr in parse_scripture(data["scripture_raw"]):
                    if pr:
                        ScriptureRef.objects.create(entry=obj, **pr)
                        report["scripture_parsed"] += 1
                    else:
                        report["scripture_failed"] += 1
            else:
                # Dry run: still exercise scripture parsing so the report
                # shows how many references would import cleanly.
                for pr in parse_scripture(data["scripture_raw"]):
                    if pr:
                        report["scripture_parsed"] += 1
                    else:
                        report["scripture_failed"] += 1
        except Exception as e:
            # Any bad row is skipped and recorded; the import continues.
            report["skipped"] += 1
            report["errors"].append(str(e))
    return report
# Search helpers -----------------------------------------------------

# UI label (as shown on the search form) -> Entry model field name.
SEARCHABLE_FIELDS = {
    "Subject": "subject",
    "Illustration": "illustration",
    "Application": "application",
    "Scripture": "scripture_raw",
    "Source": "source",
    "Talk Title": "talk_title",
    "Talk Number": "talk_number",
    "Code": "entry_code",
}
def wildcard_to_ilike(term: str) -> str:
    """Translate user wildcards (* and ?) into a SQL LIKE/ILIKE pattern.

    Literal '%' and '_' in the input are backslash-escaped first so they
    survive as literals, then '*' becomes '%' and '?' becomes '_'.

    BUGFIX: the original used the string literals '\\%' and '\\_' written
    as "\%" / "\_", which are invalid escape sequences — a SyntaxWarning
    on Python 3.12+ and slated to become an error.  Escapes are now
    written explicitly.
    """
    escaped = term.replace("%", "\\%").replace("_", "\\_")
    return escaped.replace("*", "%").replace("?", "_")
def build_query(selected_fields, query_text):
    """Build a Django Q filter from a free-text search box query.

    *query_text* is split on whitespace, with double quotes grouping
    phrases into single tokens.  Tokens are ANDed together; for each
    token the *selected_fields* (labels that must exist in
    SEARCHABLE_FIELDS) are ORed.

    Per-field matching: for talk_number, a token whose wildcard-stripped
    form is all digits becomes an exact integer match; every other field
    gets a case-insensitive substring match on the wildcard-stripped
    token OR a case-insensitive regex built from the LIKE-style pattern
    ('%' -> '.*', '_' -> '.').
    """
    q = Q()
    for token in _tokenize(query_text):
        pat = wildcard_to_ilike(token)
        token_q = Q()
        # OR across the selected fields for this token.
        for label in selected_fields:
            col = SEARCHABLE_FIELDS[label]
            digits = pat.replace("%", "").replace("_", "")
            if col == "talk_number" and digits.isdigit():
                # isdigit() guarantees int() succeeds, so no try/except
                # is needed (the original had a dead bare `except: pass`).
                token_q |= Q(**{col: int(digits)})
            else:
                plain = token.replace("*", "").replace("?", "")
                regex = pat.replace("%", ".*").replace("_", ".")
                token_q |= (
                    Q(**{f"{col}__icontains": plain})
                    | Q(**{f"{col}__iregex": regex})
                )
        q &= token_q
    return q


def _tokenize(query_text):
    # Split on whitespace, honoring double-quoted phrases; the quote
    # characters themselves are dropped.
    tokens, buf, in_quotes = [], "", False
    for ch in query_text:
        if ch == '"':
            in_quotes = not in_quotes
        elif ch.isspace() and not in_quotes:
            if buf:
                tokens.append(buf)
                buf = ""
        else:
            buf += ch
    if buf:
        tokens.append(buf)
    return tokens