93 lines
3.9 KiB
Python
93 lines
3.9 KiB
Python
import csv, io, re
|
|
from dateutil import parser as dateparser
|
|
from datetime import date, timedelta
|
|
from .models import Entry, ScriptureRef
|
|
|
|
# Matches a single scripture citation such as "John 3:16", "1 Jn 4" or
# "Ps. 23:1-6". Capture groups, in order:
#   1. book token: optional leading digit 1-3, then letters and/or periods
#   2. first number after the book
#   3. optional number following a colon
#   4. optional first number after a hyphen, en dash or em dash
#   5. optional number following a colon in the range end
# The pieces below concatenate to one pattern; it contains no literal
# whitespace or "#", so re.VERBOSE does not alter its meaning.
SCR_REF_RE = re.compile(
    r"^\s*"
    r"([1-3]?\s*[A-Za-z\.]+)"
    r"\s+(\d+)"
    r"(?::(\d+))?"
    r"(?:\s*[-–—]\s*(\d+)(?::(\d+))?)?"
    r"\s*$",
    re.VERBOSE,
)

# Abbreviations (lower-case, no periods or spaces) mapped to the book name
# they stand for.
BOOK_ALIASES = {
    "matt": "Matthew",
    "mt": "Matthew",
    "jn": "John",
    "john": "John",
    "lk": "Luke",
    "luke": "Luke",
    "ps": "Psalms",
}
|
|
|
|
def normalize_book(s):
    """Return the canonical book name for ``s`` when it is a known alias.

    Periods and whitespace are removed from ``s`` and the result is
    lower-cased to form the lookup key for ``BOOK_ALIASES``. When the key is
    not present, ``s`` is returned with only surrounding whitespace stripped
    (its case and inner punctuation are left as given).
    """
    alias_key = re.sub(r"[\.\s]", "", s).lower()
    if alias_key in BOOK_ALIASES:
        return BOOK_ALIASES[alias_key]
    return s.strip()
|
|
|
|
def parse_scripture(s):
    """Parse a semicolon-separated string of scripture citations.

    Each non-blank segment of ``s`` is matched against ``SCR_REF_RE``.

    Args:
        s: Raw citation text, e.g. ``"John 3:16-18; Ps 23"``. ``None`` or an
            empty string yields an empty list.

    Returns:
        A list with one element per non-blank segment, in input order. A
        segment that matches becomes a dict with the keys ``book``,
        ``chapter_from``, ``verse_from``, ``chapter_to`` and ``verse_to``
        (numbers are ``int``; absent parts are ``None``). A segment that does
        not match becomes ``None`` so callers can count failures.
    """
    items = []
    for segment in (s or "").split(";"):
        segment = segment.strip()
        if not segment:
            continue

        match = SCR_REF_RE.match(segment)
        if match is None:
            items.append(None)
            continue

        book_raw, ch1, v1, ch2, v2 = match.groups()
        chapter_from = int(ch1)
        verse_from = int(v1) if v1 else None
        chapter_to = int(ch2) if ch2 else None
        verse_to = int(v2) if v2 else None

        # In "John 3:16-18" the regex captures the trailing 18 in the
        # chapter slot of the range end. Because a starting verse was given
        # and the range end has no colon, that number is an ending verse in
        # the same chapter, not a chapter.
        if verse_from is not None and chapter_to is not None and verse_to is None:
            chapter_to, verse_to = chapter_from, chapter_to

        items.append({
            "book": normalize_book(book_raw),
            "chapter_from": chapter_from,
            "verse_from": verse_from,
            "chapter_to": chapter_to,
            "verse_to": verse_to,
        })
    return items
|
|
|
|
def parse_date(v):
    """Convert a loosely formatted value to a ``date``, or ``None``.

    Returns ``None`` when ``v`` is falsy, when its string form is blank, or
    when parsing raises any exception. Otherwise the string form is handed to
    ``dateparser.parse`` and the date part of the result is returned.
    """
    if not v:
        return None

    text = str(v)
    if not text.strip():
        return None

    try:
        parsed = dateparser.parse(text)
        return parsed.date()
    except Exception:
        # Best-effort: an unparseable value is treated as "no date".
        return None
|
|
|
|
# Column names an imported CSV must contain, stored lower-cased.
EXPECTED_HEADERS = list(
    map(
        str.lower,
        (
            "Subject",
            "Illustration",
            "Application",
            "Scripture",
            "Source",
            "Talk Title",
            "Talk Number",
            "Code",
            "Date",
            "Date Edited",
        ),
    )
)
|
|
|
|
def import_csv_bytes(b: bytes, dry_run=True):
    """Import entries from the raw bytes of a CSV file.

    The bytes are decoded as UTF-8 (a leading byte-order mark is tolerated)
    and read with ``csv.DictReader``. Header cells are stripped and
    lower-cased, and every row is keyed by those normalized names, so a
    column titled ``"Talk Title"`` is read with the key ``"talk title"``.

    Args:
        b: Contents of the CSV file.
        dry_run: When true (the default) each row is parsed and its scripture
            references are counted, but nothing is read from or written to
            the database.

    Returns:
        A report dict with the keys ``rows`` (number of data rows),
        ``inserted``, ``updated``, ``skipped``, ``errors`` (one exception
        message per skipped row), ``scripture_parsed`` and
        ``scripture_failed``.

    Raises:
        ValueError: If any name in ``EXPECTED_HEADERS`` is absent from the
            normalized header row.
        UnicodeDecodeError: If ``b`` is not valid UTF-8.
    """
    text = b.decode("utf-8-sig")
    reader = csv.DictReader(io.StringIO(text))

    headers = [(h or "").strip().lower() for h in (reader.fieldnames or [])]
    missing = [h for h in EXPECTED_HEADERS if h not in headers]
    if missing:
        raise ValueError(f"Missing required headers: {missing}")

    # Rows are looked up below with lower-case keys ("subject", "code", ...).
    # The header line has already been consumed by the ``fieldnames`` access
    # above, so assigning the normalized names makes DictReader key every
    # data row by them instead of by the file's original spelling.
    reader.fieldnames = headers

    report = {
        "rows": 0,
        "inserted": 0,
        "updated": 0,
        "skipped": 0,
        "errors": [],
        "scripture_parsed": 0,
        "scripture_failed": 0,
    }
    rows = list(reader)
    report["rows"] = len(rows)

    for row in rows:
        try:
            code = (row.get("code") or "").strip()

            # A blank, missing or non-numeric talk number is stored as None.
            raw_talk = (row.get("talk number") or "").strip()
            try:
                talk = int(raw_talk) if raw_talk else None
            except ValueError:
                talk = None

            data = dict(
                subject=row.get("subject") or "",
                illustration=row.get("illustration") or "",
                application=row.get("application") or "",
                scripture_raw=row.get("scripture") or "",
                source=row.get("source") or "",
                talk_number=talk,
                talk_title=row.get("talk title") or "",
                entry_code=code,
                date_added=parse_date(row.get("date")),
                date_edited=parse_date(row.get("date edited")),
            )

            parsed = parse_scripture(data["scripture_raw"])
            for ref in parsed:
                if ref:
                    report["scripture_parsed"] += 1
                else:
                    report["scripture_failed"] += 1

            if dry_run:
                continue

            # Rows carrying a code update the entry with that code if one
            # exists; all other rows create a new entry.
            obj = None
            if code:
                try:
                    obj = Entry.objects.get(entry_code=code)
                except Entry.DoesNotExist:
                    obj = None

            if obj:
                for field, value in data.items():
                    setattr(obj, field, value)
                obj.save()
                # Existing references are dropped and rebuilt from this row.
                obj.scripture_refs.all().delete()
                report["updated"] += 1
            else:
                obj = Entry.objects.create(**data)
                report["inserted"] += 1

            for ref in parsed:
                if ref:
                    ScriptureRef.objects.create(entry=obj, **ref)
        except Exception as e:
            # One bad row must not abort the import: record it and move on.
            report["skipped"] += 1
            report["errors"].append(str(e))

    return report
|
|
|
|
# Single-pass character mapping used by wildcard_to_like. Backslash is the
# escape character, so it must itself be escaped along with the two LIKE
# metacharacters before "*" and "?" are turned into their LIKE equivalents.
_LIKE_TRANSLATION = str.maketrans({
    "\\": "\\\\",
    "%": "\\%",
    "_": "\\_",
    "*": "%",
    "?": "_",
})


def wildcard_to_like(q: str) -> str:
    """Translate a ``*``/``?`` wildcard query into a SQL ``LIKE`` pattern.

    ``*`` becomes ``%`` and ``?`` becomes ``_``. Literal ``%``, ``_`` and
    backslash characters in ``q`` are prefixed with a backslash so they are
    matched verbatim rather than acting as wildcards or escapes.

    Args:
        q: User-supplied query text.

    Returns:
        The translated pattern; an empty string for empty input.
    """
    # translate() handles every character exactly once, so output produced
    # for one rule can never be rewritten by another.
    return q.translate(_LIKE_TRANSLATION)
|
|
|
|
def terms(q: str):
    """Split ``q`` into its whitespace-separated terms.

    Returns a list of the non-empty tokens of ``q`` in order; an empty or
    all-whitespace query yields an empty list.
    """
    # str.split() with no separator already collapses runs of whitespace and
    # never yields empty tokens, so no additional filtering is required.
    return q.split()
|
|
|
|
def month_buckets_last_12(today: date):
    """Return the twelve calendar months ending with ``today``'s month.

    The result is ordered oldest first. Each element is a tuple
    ``(label, start, end)`` where ``label`` is ``"YYYY-MM"``, ``start`` is the
    first day of that month and ``end`` is the first day of the following
    month.
    """
    # Count months from year 0 so that crossing a year boundary is plain
    # integer arithmetic.
    current_index = today.year * 12 + (today.month - 1)

    buckets = []
    for months_back in range(11, -1, -1):
        index = current_index - months_back
        year, month_zero_based = divmod(index, 12)
        month = month_zero_based + 1
        next_year, next_month_zero_based = divmod(index + 1, 12)

        first_day = date(year, month, 1)
        first_day_of_next = date(next_year, next_month_zero_based + 1, 1)
        buckets.append((f"{year}-{month:02d}", first_day, first_day_of_next))
    return buckets
|