"""SplitBuddy: a small Flask + SQLite web app that tracks shared bills and
transfers between two people and keeps an audit log of every change."""

from __future__ import annotations
import os, sqlite3, csv, io, json, datetime as dt
from typing import Optional, Dict, Any
from flask import Flask, g, request, redirect, url_for, render_template, send_file, jsonify
# Flask application object; the route, before-request and teardown decorators
# further down in this file register their handlers on it.
app = Flask(__name__)
# ----- Config -----
# Every setting can be overridden through a SPLITBUDDY_* environment variable.
# Environment values always arrive as strings, so the numeric settings are
# converted explicitly: without the conversion, setting
# SPLITBUDDY_DEFAULT_A_SHARE_PCT makes ``DEFAULT_A_SHARE_PCT / 100.0`` raise
# TypeError. A non-numeric value fails fast at import time with ValueError.
DB_PATH = os.environ.get("SPLITBUDDY_DB", "splitbuddy.db")
CURRENCY = os.environ.get("SPLITBUDDY_CURRENCY", "")
PERSON_A = os.environ.get("SPLITBUDDY_ME", "Me")  # you
PERSON_B = os.environ.get("SPLITBUDDY_ROOMIE", "Idan")  # roommate
DEFAULT_ACTOR = os.environ.get("SPLITBUDDY_ACTOR", "anon")  # who is using this device (optional)
DEFAULT_A_SHARE_PCT = float(os.environ.get("SPLITBUDDY_DEFAULT_A_SHARE_PCT", 66.6667))  # <-- you pay 2/3 by default
WEBAPP_PORT = int(os.environ.get("SPLITBUDDY_WEBAPP_PORT", 42069))

# ----- DB helpers -----
def get_db() -> sqlite3.Connection:
    """Return the SQLite connection cached on Flask's ``g``, opening it on first use.

    A new connection is opened against ``DB_PATH`` and configured with
    ``sqlite3.Row`` as its row factory, so rows can be indexed by column name.
    """
    if "db" in g:
        return g.db
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    g.db = conn
    return conn
@app.teardown_appcontext
def close_db(_=None):
    """Close the connection stored on ``g`` (if one was opened) at app-context teardown."""
    conn = g.pop("db", None)
    if conn is None:
        return
    conn.close()
def init_db(db: Optional[sqlite3.Connection] = None):
    """Create the schema if needed and upgrade databases created by older versions.

    Args:
        db: Connection to initialise. Defaults to the request-scoped
            connection returned by ``get_db()``.

    The function is idempotent and is called before every request, so it is
    written to do no writes once the database is up to date:

    * columns are added only when ``PRAGMA table_info`` shows they are missing,
      instead of attempting every ``ALTER TABLE`` and swallowing all
      ``OperationalError``s (which would also hide unrelated failures);
    * the NULL backfill only touches rows that actually contain a NULL.
    """
    if db is None:
        db = get_db()

    # entries
    db.execute("""
        CREATE TABLE IF NOT EXISTS entries (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            created_at TEXT NOT NULL,
            kind TEXT NOT NULL DEFAULT 'bill',        -- 'bill' | 'transfer'
            total REAL NOT NULL DEFAULT 0,
            payer TEXT NOT NULL DEFAULT 'A',          -- 'A' or 'B'
            a_share REAL NOT NULL DEFAULT 0.666667,   -- your share (0..1) for bills
            method TEXT NOT NULL DEFAULT 'cash',
            note TEXT
        )
    """)

    # audits
    db.execute("""
        CREATE TABLE IF NOT EXISTS audits (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            ts TEXT NOT NULL,          -- UTC ISO timestamp
            action TEXT NOT NULL,      -- 'add' | 'edit' | 'delete'
            entry_id INTEGER,          -- affected entry id (may be null for failures)
            actor TEXT,                -- who (env/config or future auth)
            ip TEXT,
            user_agent TEXT,
            device TEXT,               -- parsed (macOS, Windows, iPhone, Android, etc.)
            old_row TEXT,              -- JSON of previous row (if any)
            new_row TEXT               -- JSON of new row (if any)
        )
    """)

    # Migrations for databases whose `entries` table predates these columns.
    # Index 1 of every PRAGMA table_info row is the column name.
    existing = {col[1] for col in db.execute("PRAGMA table_info(entries)")}
    migrations = (
        ("kind", "ALTER TABLE entries ADD COLUMN kind TEXT NOT NULL DEFAULT 'bill'"),
        ("total", "ALTER TABLE entries ADD COLUMN total REAL"),
        ("payer", "ALTER TABLE entries ADD COLUMN payer TEXT"),
        ("a_share", "ALTER TABLE entries ADD COLUMN a_share REAL"),
    )
    for column, ddl in migrations:
        if column in existing:
            continue
        try:
            db.execute(ddl)
        except sqlite3.OperationalError as exc:
            # Another connection may have added the column between the PRAGMA
            # and the ALTER; that is fine. Anything else is a real error.
            if "duplicate column name" not in str(exc).lower():
                raise

    # Columns added by the migrations above are nullable; fill in defaults for
    # legacy rows. The WHERE clause keeps this a no-op on healthy databases.
    db.execute("""
        UPDATE entries
        SET kind    = COALESCE(kind, 'bill'),
            total   = COALESCE(total, 0),
            payer   = COALESCE(payer, 'A'),
            a_share = COALESCE(a_share, 0.666667)
        WHERE kind IS NULL OR total IS NULL OR payer IS NULL OR a_share IS NULL
    """)
    db.commit()

# ----- Utility & models -----
def _now_local_iso_min() -> str:
    """Return the current local (naive) time as ``YYYY-MM-DDTHH:MM``."""
    current = dt.datetime.now()
    # timespec="minutes" truncates seconds and microseconds from the output.
    return current.isoformat(timespec="minutes")
def _now_utc_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision).

    Uses an aware ``datetime.now(timezone.utc)`` rather than
    ``datetime.utcnow()``, which is deprecated since Python 3.12. The tzinfo
    is dropped before formatting so the text keeps the original ``...Z``
    suffix instead of ``+00:00``.
    """
    now = dt.datetime.now(dt.timezone.utc).replace(microsecond=0, tzinfo=None)
    return now.isoformat() + "Z"
def _client_ip() -> str:
    """Return the client address for the current request.

    Prefers the first entry of the ``X-Forwarded-For`` header (so it works
    behind a simple proxy) and otherwise falls back to ``request.remote_addr``,
    or ``""`` when that is unset.
    """
    forwarded = request.headers.get("X-Forwarded-For")
    if not forwarded:
        return request.remote_addr or ""
    first_hop, _, _ = forwarded.partition(",")
    return first_hop.strip()
def _guess_device(ua: str) -> str:
    """Map a User-Agent string to a coarse device label.

    Matching is case-insensitive and first-match-wins, in the order listed
    below; ``"Unknown"`` is returned when nothing matches (including for an
    empty or ``None`` user agent).
    """
    lowered = (ua or "").lower()
    # Order matters: the mobile markers are tested before "mac os x" and "linux".
    rules = (
        (("iphone",), "iPhone"),
        (("ipad",), "iPad"),
        (("android",), "Android"),
        (("mac os x", "macintosh"), "macOS"),
        (("windows",), "Windows"),
        (("linux",), "Linux"),
    )
    for needles, label in rules:
        if any(needle in lowered for needle in needles):
            return label
    return "Unknown"
class ByMethod:
    """Plain holder for three per-method amounts: cash, transfer and other."""

    def __init__(self, cash=0.0, transfer=0.0, other=0.0):
        self.cash = cash
        self.transfer = transfer
        self.other = other
class Summary:
    """Plain holder for the overview figures: total, entry count, latest
    timestamp and the per-method breakdown."""

    def __init__(self, total: float, count: int, latest: Optional[str], by_method: ByMethod):
        self.total = total
        self.count = count
        self.latest = latest
        self.by_method = by_method
def _delta_for_entry(kind: str, total: float, payer: str, a_share: float) -> float:
    """Return how much one entry moves the balance between person A and person B.

    Positive => A owes B. Negative => B owes A.
      - transfer: ``-total`` when A sent the money, ``+total`` when B sent it.
      - anything else (bill): A's share of the total, minus the whole total
        when A is the one who paid.
    ``a_share`` is a fraction (0..1) and is ignored for transfers.
    """
    a_paid = payer == "A"
    if kind == "transfer":
        return -total if a_paid else total
    already_paid = total if a_paid else 0.0
    return a_share * total - already_paid
def _row_to_dict(row: sqlite3.Row | Dict[str, Any] | None) -> Dict[str, Any] | None:
if row is None: return None
return dict(row)
def _audit(action: str, entry_id: Optional[int], old_row: Optional[dict], new_row: Optional[dict]):
    """Append one record to the ``audits`` table and commit it.

    The record captures the UTC timestamp, the action name, the affected entry
    id, the configured ``DEFAULT_ACTOR``, and the requesting client's IP,
    User-Agent and guessed device. ``old_row`` / ``new_row`` are stored as JSON
    text, or as NULL when they are ``None``.
    """

    def _as_json(payload: Optional[dict]) -> Optional[str]:
        if payload is None:
            return None
        return json.dumps(payload, ensure_ascii=False)

    conn = get_db()
    user_agent = request.headers.get("User-Agent", "")
    values = (
        _now_utc_iso(),
        action,
        entry_id,
        DEFAULT_ACTOR,
        _client_ip(),
        user_agent,
        _guess_device(user_agent),
        _as_json(old_row),
        _as_json(new_row),
    )
    conn.execute(
        """
        INSERT INTO audits (ts, action, entry_id, actor, ip, user_agent, device, old_row, new_row)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        values,
    )
    conn.commit()

# ----- Routes -----
@app.before_request
def _ensure_db():
    """Run ``init_db()`` ahead of every request so the schema is always present."""
    init_db()
@app.get("/")
def index():
    """Render the main page: every entry (newest first) plus a balance summary.

    Each entry dict gets a computed ``delta`` (see ``_delta_for_entry``). The
    summary carries the overall balance, the number of entries, the newest
    ``created_at`` and the balance split by the ``cash`` / ``transfer`` /
    ``other`` methods.
    """
    rows = get_db().execute("""
        SELECT id, created_at, kind, total, payer, a_share, method, note
        FROM entries
        ORDER BY datetime(created_at) DESC, id DESC
    """).fetchall()

    entries = [
        {
            **dict(row),
            "delta": _delta_for_entry(row["kind"], row["total"], row["payer"], row["a_share"]),
        }
        for row in rows
    ]

    def method_total(name):
        # Sum of deltas over the entries recorded with the given method.
        return sum(entry["delta"] for entry in entries if entry["method"] == name)

    if entries:
        total_balance = sum(entry["delta"] for entry in entries)
        latest = entries[0]["created_at"]  # rows are already sorted newest-first
    else:
        total_balance = 0.0
        latest = None

    breakdown = ByMethod(
        cash=method_total("cash"),
        transfer=method_total("transfer"),
        other=method_total("other"),
    )
    summary = Summary(total=total_balance, count=len(entries), latest=latest, by_method=breakdown)

    return render_template(
        "index.html",
        entries=entries,
        summary=summary,
        A=PERSON_A,
        B=PERSON_B,
        currency=CURRENCY,
        now_local=_now_local_iso_min(),
        default_a_share_pct=DEFAULT_A_SHARE_PCT,
    )
@app.post("/add")
def add():
    """Create an entry from the submitted form, audit it, and redirect to the index.

    Form fields: ``kind`` (bill|transfer, default bill), ``payer`` (A|B,
    default A), ``total`` (required, finite, >= 0), ``a_share_pct`` (0..100,
    bills only; defaults to ``DEFAULT_A_SHARE_PCT``), ``method`` (default
    cash), ``note`` and ``created_at`` (default: current local minute).

    A missing, unparsable, negative or non-finite ``total`` is ignored: nothing
    is stored and the client is redirected back to the index.
    """
    import math  # function-scope import: only this validation needs it

    form = request.form
    kind = (form.get("kind") or "bill").strip().lower()
    payer = (form.get("payer") or "A").strip().upper()
    total = form.get("total", type=float)
    a_share_pct = form.get("a_share_pct", type=float)
    method = (form.get("method") or "cash").strip().lower()
    note = (form.get("note") or "").strip()
    created_at = form.get("created_at") or _now_local_iso_min()

    # "nan" and "inf" parse as floats and slip past a plain "< 0" test; a
    # non-finite total would poison every balance sum, so reject it too.
    if total is None or not math.isfinite(total) or total < 0:
        return redirect(url_for("index"))

    if payer not in ("A", "B"):
        payer = "A"
    if kind not in ("bill", "transfer"):
        kind = "bill"

    # float(): the configured default may still be a string when it comes
    # straight from the environment.
    default_pct = float(DEFAULT_A_SHARE_PCT)
    if kind == "bill":
        if a_share_pct is None or math.isnan(a_share_pct):
            a_share_pct = default_pct
        a_share = max(0.0, min(100.0, a_share_pct)) / 100.0  # clamp to 0..100 %
    else:
        a_share = default_pct / 100.0  # stored but ignored for transfers

    db = get_db()
    cur = db.execute(
        "INSERT INTO entries (created_at, kind, total, payer, a_share, method, note) VALUES (?, ?, ?, ?, ?, ?, ?)",
        (created_at, kind, total, payer, a_share, method, note),
    )
    db.commit()

    new_id = cur.lastrowid
    new_row = _row_to_dict(db.execute("SELECT * FROM entries WHERE id = ?", (new_id,)).fetchone())
    _audit("add", new_id, old_row=None, new_row=new_row)
    return redirect(url_for("index"))
@app.post("/delete/<int:entry_id>")
def delete(entry_id: int):
    """Delete the entry with the given id, audit the deletion, and redirect to the index.

    The audit record keeps the row as it was before deletion (``None`` when no
    such row existed).
    """
    conn = get_db()
    key = (entry_id,)
    previous = conn.execute("SELECT * FROM entries WHERE id = ?", key).fetchone()
    conn.execute("DELETE FROM entries WHERE id = ?", key)
    conn.commit()
    _audit("delete", entry_id, old_row=_row_to_dict(previous), new_row=None)
    return redirect(url_for("index"))
# Minimal edit endpoint (POST-only). Accepts same fields as /add.
@app.post("/edit/<int:entry_id>")
def edit(entry_id: int):
    """Update an existing entry from the submitted form, audit it, and redirect.

    Fields that are absent (or invalid for ``kind`` / ``payer``) keep their
    stored value. Differences from a naive "value or old value" merge:

    * ``total`` is validated like in ``/add``: a negative or non-finite value
      makes the whole edit a no-op; a missing/unparsable one keeps the old total.
    * ``note`` is taken from the form whenever the field is present, so
      submitting an empty note clears the stored one.

    If the entry does not exist, an ``edit`` audit record with no rows is
    written and the client is redirected.
    """
    import math  # function-scope import: only this validation needs it

    db = get_db()
    old = db.execute("SELECT * FROM entries WHERE id = ?", (entry_id,)).fetchone()
    if not old:
        _audit("edit", entry_id, old_row=None, new_row=None)
        return redirect(url_for("index"))

    form = request.form
    kind = (form.get("kind") or old["kind"]).strip().lower()
    payer = (form.get("payer") or old["payer"]).strip().upper()
    total = form.get("total", type=float)
    a_share_pct = form.get("a_share_pct", type=float)
    method = (form.get("method") or old["method"]).strip().lower()
    created_at = form.get("created_at") or old["created_at"]

    # Presence, not truthiness, decides: an empty submitted note must be able
    # to clear the stored one.
    if "note" in form:
        note = form["note"].strip()
    else:
        note = (old["note"] or "").strip()

    if total is None:
        total = float(old["total"])
    elif not math.isfinite(total) or total < 0:
        # Same rule as /add: never store a negative or non-finite total.
        return redirect(url_for("index"))

    if payer not in ("A", "B"):
        payer = old["payer"]
    if kind not in ("bill", "transfer"):
        kind = old["kind"]

    if kind == "bill" and a_share_pct is not None and not math.isnan(a_share_pct):
        a_share = max(0.0, min(100.0, a_share_pct)) / 100.0  # clamp to 0..100 %
    else:
        # No (usable) share submitted, or a transfer where it is ignored but stored.
        a_share = float(old["a_share"])

    db.execute(
        """
        UPDATE entries
        SET created_at = ?, kind = ?, total = ?, payer = ?, a_share = ?, method = ?, note = ?
        WHERE id = ?
        """,
        (created_at, kind, total, payer, a_share, method, note, entry_id),
    )
    db.commit()

    new = db.execute("SELECT * FROM entries WHERE id = ?", (entry_id,)).fetchone()
    _audit("edit", entry_id, old_row=_row_to_dict(old), new_row=_row_to_dict(new))
    return redirect(url_for("index"))
@app.get("/export.csv")
def export_csv():
    """Send all entries (newest first) as a UTF-8 CSV attachment.

    Columns: id, created_at, kind, payer, a_share_pct, total, method, note,
    delta. The share is written as a percentage, and share, total and delta
    are formatted with two decimals.
    """
    records = get_db().execute("""
        SELECT id, created_at, kind, total, payer, a_share, method, note
        FROM entries
        ORDER BY datetime(created_at) DESC, id DESC
    """).fetchall()

    out = io.StringIO()
    writer = csv.writer(out)
    writer.writerow(["id", "created_at", "kind", "payer", "a_share_pct", "total", "method", "note", "delta"])
    for rec in records:
        share_pct = float(rec["a_share"]) * 100.0
        delta = _delta_for_entry(rec["kind"], rec["total"], rec["payer"], rec["a_share"])
        writer.writerow([
            rec["id"],
            rec["created_at"],
            rec["kind"],
            rec["payer"],
            f"{share_pct:.2f}",
            f"{rec['total']:.2f}",
            rec["method"],
            rec["note"] or "",
            f"{delta:.2f}",
        ])

    payload = io.BytesIO(out.getvalue().encode("utf-8"))
    return send_file(
        payload,
        mimetype="text/csv",
        as_attachment=True,
        download_name="splitbuddy_export.csv",
    )

# ----- Audit viewers -----
@app.get("/audit")
def audit_html():
    """Render the audit log as HTML with optional filters and pagination.

    Query parameters:
        entry_id, action, device: exact-match filters.
        actor: substring match.
        q: substring match against old_row, new_row and user_agent.
        page: 1-based page number (values below 1 are treated as 1).
        per_page: page size, clamped to 1..200 (default 50).

    ``per_page`` is clamped on both ends: 0 would otherwise divide by zero in
    the page-count computation, and a negative LIMIT disables the limit in
    SQLite.
    """
    db = get_db()

    # --- filters from query params ---
    entry_id = request.args.get("entry_id", type=int)
    action = request.args.get("action", type=str)  # add|edit|delete
    actor = request.args.get("actor", type=str)
    device = request.args.get("device", type=str)
    q = request.args.get("q", type=str)  # substring search in old/new JSON
    page = max(1, request.args.get("page", default=1, type=int))
    per_page = max(1, min(200, request.args.get("per_page", default=50, type=int)))
    offset = (page - 1) * per_page

    # --- build WHERE clause dynamically (values are always bound, never inlined) ---
    wh, params = [], []
    if entry_id is not None:
        wh.append("entry_id = ?")
        params.append(entry_id)
    if action:
        wh.append("action = ?")
        params.append(action)
    if actor:
        wh.append("actor LIKE ?")
        params.append(f"%{actor}%")
    if device:
        wh.append("device = ?")
        params.append(device)
    if q:
        wh.append("(COALESCE(old_row,'') LIKE ? OR COALESCE(new_row,'') LIKE ? OR COALESCE(user_agent,'') LIKE ?)")
        params.extend([f"%{q}%", f"%{q}%", f"%{q}%"])
    where_sql = ("WHERE " + " AND ".join(wh)) if wh else ""

    # total count for pagination
    total = db.execute(f"SELECT COUNT(*) FROM audits {where_sql}", params).fetchone()[0]
    rows = db.execute(f"""
        SELECT id, ts, action, entry_id, actor, ip, user_agent, device, old_row, new_row
        FROM audits
        {where_sql}
        ORDER BY datetime(ts) DESC, id DESC
        LIMIT ? OFFSET ?
    """, (*params, per_page, offset)).fetchall()

    # pretty JSON (server-side) to avoid giant lines
    def pretty(s):
        if not s:
            return ""
        try:
            obj = json.loads(s)
            return json.dumps(obj, ensure_ascii=False, indent=2)
        except Exception:
            # Not valid JSON: show the stored text unchanged.
            return s

    # enrich rows for template
    data = []
    for r in rows:
        d = dict(r)
        d["old_pretty"] = pretty(d.get("old_row"))
        d["new_pretty"] = pretty(d.get("new_row"))
        data.append(d)

    # distinct devices/actions for filter dropdowns
    devices = [
        r[0]
        for r in db.execute(
            "SELECT DISTINCT device FROM audits WHERE device IS NOT NULL ORDER BY device"
        ).fetchall()
    ]
    actions = ["add", "edit", "delete"]

    return render_template(
        "audit.html",
        rows=data,
        total=total,
        page=page,
        per_page=per_page,
        pages=max(1, (total + per_page - 1) // per_page),  # ceiling division
        # current filters
        f_entry_id=entry_id, f_action=action, f_actor=actor, f_device=device, f_q=q,
        devices=devices, actions=actions,
    )
@app.get("/audit.csv")
def audit_csv():
    """Send the complete audit log (newest first) as a UTF-8 CSV attachment."""
    records = get_db().execute("""
        SELECT id, ts, action, entry_id, actor, ip, user_agent, device, old_row, new_row
        FROM audits
        ORDER BY datetime(ts) DESC, id DESC
    """).fetchall()

    out = io.StringIO()
    writer = csv.writer(out)
    writer.writerow(["id", "ts", "action", "entry_id", "actor", "ip", "device", "user_agent", "old_row", "new_row"])
    for rec in records:
        # Note the CSV column order: device comes before user_agent.
        writer.writerow([
            rec["id"],
            rec["ts"],
            rec["action"],
            rec["entry_id"],
            rec["actor"],
            rec["ip"],
            rec["device"],
            rec["user_agent"],
            rec["old_row"] or "",
            rec["new_row"] or "",
        ])

    payload = io.BytesIO(out.getvalue().encode("utf-8"))
    return send_file(
        payload,
        mimetype="text/csv",
        as_attachment=True,
        download_name="splitbuddy_audit.csv",
    )
@app.get("/api/audit")
def audit_api():
    """Return the 200 most recent audit records as a JSON array of objects."""
    cursor = get_db().execute("""
        SELECT id, ts, action, entry_id, actor, ip, user_agent, device, old_row, new_row
        FROM audits
        ORDER BY datetime(ts) DESC, id DESC
        LIMIT 200
    """)
    payload = []
    for record in cursor.fetchall():
        payload.append(dict(record))
    return jsonify(payload)

# ----- Main -----
if __name__ == "__main__":
    # Make sure the directory that will hold the SQLite file exists.
    os.makedirs(os.path.dirname(DB_PATH) or ".", exist_ok=True)
    with app.app_context():
        init_db()
    # Flask's debug mode enables the interactive debugger, which can run
    # arbitrary code for anyone who can reach it. It stays on by default (as
    # before) for local use, but can now be switched off with SPLITBUDDY_DEBUG=0.
    debug_flag = os.environ.get("SPLITBUDDY_DEBUG", "1").strip().lower()
    debug_enabled = debug_flag not in ("", "0", "false", "no", "off")
    # int(): the port may be a string when it comes from the environment.
    app.run(debug=debug_enabled, port=int(WEBAPP_PORT))