|
1 #!/usr/bin/env python3 |
|
2 |
|
3 import json |
|
4 from decimal import Decimal |
|
5 from datetime import datetime |
|
6 from collections import OrderedDict, defaultdict |
|
7 from operator import itemgetter |
|
8 |
|
9 class Transaction: |
|
10 description = "" |
|
11 amount = Decimal(0) |
|
12 |
|
13 def __init__(self, **args): |
|
14 self.budget = defaultdict(Decimal) |
|
15 self.expenses = defaultdict(Decimal) |
|
16 self.date = datetime.utcnow().replace(microsecond = 0) |
|
17 balance = None |
|
18 for arg, val in args.items(): |
|
19 if arg == "splits": |
|
20 self.add_splits(val) |
|
21 elif arg == "budget": |
|
22 for env, amount in val: |
|
23 self.add_budget(env, amount) |
|
24 elif arg == "expenses": |
|
25 for env, amount in val: |
|
26 self.add_expense(env, amount) |
|
27 elif arg in ("balance", "envelop"): |
|
28 balance = val |
|
29 else: |
|
30 setattr(self, arg, val) |
|
31 if balance is not None: |
|
32 self.rebalance(balance) |
|
33 |
|
34 def add_budget(self, envelop, amount): |
|
35 assert(amount > 0) |
|
36 self.budget[envelop] += amount |
|
37 |
|
38 def add_expense(self, envelop, amount): |
|
39 assert(amount > 0) |
|
40 self.expenses[envelop] += amount |
|
41 |
|
42 def add_splits(self, *splits): |
|
43 for env, amount in splits: |
|
44 if amount < 0: |
|
45 self.add_expense(env, -amount) |
|
46 else: |
|
47 self.add_budget(env, amount) |
|
48 |
|
49 def rebalance(self, envelop): |
|
50 b = self.balance |
|
51 if b < 0: |
|
52 self.add_expense(envelop, -b) |
|
53 else: |
|
54 self.add_budget(envelop, b) |
|
55 assert(self.balance == 0) |
|
56 |
|
57 @property |
|
58 def balance(self): |
|
59 return self.amount \ |
|
60 - sum(v for v in self.budget.values()) \ |
|
61 + sum(v for v in self.expenses.values()) |
|
62 |
|
63 @property |
|
64 def splits(self): |
|
65 for env, amount in self.budget.items(): |
|
66 yield (env, amount) |
|
67 for env, amount in self.expenses.items(): |
|
68 yield (env, -amount) |
|
69 |
|
70 def rename_envelop(self, oldname, newname): |
|
71 if oldname in self.budget: |
|
72 self.budget[newname] += self.budget[oldname] |
|
73 del self.budget[oldname] |
|
74 if oldname in self.expenses: |
|
75 self.expenses[newname] += self.expenses[oldname] |
|
76 del self.expenses[oldname] |
|
77 |
|
78 def as_dict(self): |
|
79 def serialize(values): |
|
80 for e, v in sorted(values.items(), key = itemgetter(0)): |
|
81 d = OrderedDict.fromkeys(["envelop", "value"]) |
|
82 d.update(envelop = e, value = float(v)) |
|
83 yield d |
|
84 d = OrderedDict.fromkeys(["description", "date", "amount", "budget", "expenses"]) |
|
85 d.update( |
|
86 description = self.description, |
|
87 date = self.date.isoformat(), |
|
88 amount = float(self.amount), |
|
89 budget = list(serialize(self.budget)), |
|
90 expenses = list(serialize(self.expenses))) |
|
91 return d |
|
92 |
|
93 @classmethod |
|
94 def from_dict(cls, d): |
|
95 try: |
|
96 date = datetime.strptime("%Y-%m-%dT%H:%M:%S", d["date"]) |
|
97 except: |
|
98 date = datetime.utcnow().replace(microsecond = 0) |
|
99 return Transaction( |
|
100 description = d.get("description", ""), |
|
101 date = date, |
|
102 amount = Decimal(d.get("amount", Decimal(0))), |
|
103 budget = ((v["envelop"], v["value"]) for v in d.get("budget", [])), |
|
104 expenses = ((v["envelop"], v["value"]) for v in d.get("expenses", []))) |
|
105 |
|
106 class Envelop: |
|
107 name = "" |
|
108 description = "" |
|
109 budget = Decimal(0) |
|
110 expenses = Decimal(0) |
|
111 |
|
112 def __init__(self, name = "", description = ""): |
|
113 self.name = name |
|
114 self.description = description |
|
115 |
|
116 @property |
|
117 def balance(self): |
|
118 return self.budget - self.expenses |
|
119 |
|
120 def update(self, amount): |
|
121 if (amount < 0): |
|
122 self.expenses -= amount |
|
123 else: |
|
124 self.budget += amount |
|
125 |
|
126 class Budget: |
|
127 name = "Budget" |
|
128 date = datetime.utcnow().replace(microsecond = 0) |
|
129 |
|
130 def __init__(self): |
|
131 self.metaenvelops = defaultdict(str) |
|
132 self.transactions = [] |
|
133 |
|
134 def load(self, fp): |
|
135 d = json.load(fp, parse_float = Decimal) |
|
136 self.metaenvelops = dict((e["name"], e["description"]) for e in d.get("envelops",[])) |
|
137 self.transactions = [] |
|
138 add = self.add_transaction |
|
139 for t in d.get("transactions",[]): add(Transaction.from_dict(t)) |
|
140 |
|
141 def save(self, fp, **args): |
|
142 d = OrderedDict.fromkeys(["name", "date", "envelops", "transactions"]) |
|
143 envelops = list() |
|
144 for name, desc in sorted(self.metaenvelops.items(), key = itemgetter(0)): |
|
145 if name != "": |
|
146 env = OrderedDict.fromkeys(["name", "description"]) |
|
147 env.update(name = name, description = desc) |
|
148 envelops.append(env) |
|
149 d.update(name = self.name, |
|
150 date = self.date.isoformat(), |
|
151 envelops = envelops, |
|
152 transactions = list(t.as_dict() for t in self.transactions)) |
|
153 json.dump(d, fp, allow_nan=False, **args) |
|
154 |
|
155 def add_transaction(self, t): |
|
156 for e in (s[0] for s in t.splits): |
|
157 if e not in self.metaenvelops: |
|
158 self.metaenvelops[e] = "" |
|
159 self.transactions.append(t) |
|
160 return t |
|
161 |
|
162 def set_envelop_description(self, name, description): |
|
163 self.metaenvelops[name] = description |
|
164 |
|
165 def rename_envelop(self, oldname, newname): |
|
166 for t in self.transactions: |
|
167 t.rename_envelop(oldname, newname) |
|
168 if oldname in self.metaenvelops: |
|
169 self.metaenvelops.setdefault(newname, self.metaenvelops[oldname]) |
|
170 del self.metaenvelops[oldname] |
|
171 |
|
172 @property |
|
173 def envelops(self): |
|
174 total = Envelop("Total") |
|
175 unassigned = Envelop("Unassigned", "Unassigned balance") |
|
176 envelops = defaultdict(Envelop) |
|
177 for t in self.transactions: |
|
178 for name, value in t.splits: |
|
179 assert(name) |
|
180 envelops[name].update(value) |
|
181 unassigned.update(t.balance) |
|
182 for name, env in envelops.items(): |
|
183 env.name = name |
|
184 env.description = self.metaenvelops.get(name,"") |
|
185 total.budget += env.budget |
|
186 total.expenses += env.expenses |
|
187 yield env |
|
188 yield total |
|
189 yield unassigned |
|
190 |
|
191 def report(self): |
|
192 for env in self.envelops: |
|
193 yield "| {:<20} | {:<50} | {:> 10.2f} | {:> 10.2f} | {:> 10.2f} |".format(env.name, env.description, env.budget, env.expenses, env.balance) |
|
194 |
|
195 def read_transaction(self, reader = input): |
|
196 name = reader("Description: ") |
|
197 amount = Decimal(reader("Amount: ")) |
|
198 t = Transaction(name = name, amount = amount) |
|
199 |
|
200 while t.balance != 0: |
|
201 envelop = reader("Balance: {:> 10.2f}\nEnvelop: ".format(t.balance)) |
|
202 if not envelop: |
|
203 break |
|
204 amount = reader("Amount:" ) |
|
205 if amount == "": |
|
206 t.rebalance(envelop) |
|
207 else: |
|
208 amount = Decimal(amount) |
|
209 t.add_expense(envelop, -amount) if amount < 0 else t.add_budget(envelop, amount) |
|
210 keep = reader("Do you want to keep the transaction [y/N]? ") |
|
211 if keep and keep[0].upper() == 'Y': |
|
212 return t |
|
213 return None |
|
214 |
|
215 |
|
216 def main(filename, action): |
|
217 import os |
|
218 import sys |
|
219 b = Budget() |
|
220 if action == "init": |
|
221 with open(filename, "w") as fp: |
|
222 b.save(fp) |
|
223 elif action == "report": |
|
224 with open(filename, "r") as fp: |
|
225 b.load(fp) |
|
226 for l in b.report(): |
|
227 print(l) |
|
228 elif action == "add": |
|
229 with open(filename, "r") as fp: |
|
230 b.load(fp) |
|
231 t = b.read_transaction() |
|
232 if t: |
|
233 b.add_transaction(t) |
|
234 os.rename(os.path.realpath(filename), os.path.realpath(filename) + ".bak") |
|
235 with open(filename, "w") as fp: |
|
236 b.save(fp) |
|
237 else: |
|
238 print("bad command line option") |
|
239 |
|
240 if __name__ == '__main__': |
|
241 import sys |
|
242 import os |
|
243 filename = sys.argv[1] |
|
244 action = sys.argv[2] |
|
245 main(filename, action) |