@@ -0,0 +1,145 @@
+import json
+from operator import attrgetter
+import time
+from threading import Thread
+from random import random
+# ========================================================
+# Contact Model
+# ========================================================
+PAGE_SIZE = 100
+class Contact:
+ # mock contacts database
+ db = {}
+ def __init__(self, id_=None, first=None, last=None, phone=None, email=None):
+ self.id = id_
+ self.first = first
+ self.last = last
+ self.phone = phone
+ self.email = email
+ self.errors = {}
+ def __str__(self):
+ return json.dumps(self.__dict__, ensure_ascii=False)
+ def update(self, first, last, phone, email):
+ self.first = first
+ self.last = last
+ self.phone = phone
+ self.email = email
+ def validate(self):
+ if not self.email:
+ self.errors['email'] = "Email Required"
+ existing_contact = next(filter(lambda c: c.id != self.id and c.email == self.email, Contact.db.values()), None)
+ if existing_contact:
+ self.errors['email'] = "Email Must Be Unique"
+ return len(self.errors) == 0
+ def save(self):
+ if not self.validate():
+ return False
+ if self.id is None:
+ if len(Contact.db) == 0:
+ max_id = 1
+ else:
+ max_id = max(contact.id for contact in Contact.db.values())
+ self.id = max_id + 1
+ Contact.db[self.id] = self
+ Contact.save_db()
+ return True
+ def delete(self):
+ del Contact.db[self.id]
+ Contact.save_db()
+ @classmethod
+ def count(cls):
+ time.sleep(2)
+ return len(cls.db)
+ @classmethod
+ def all(cls, page=1):
+ page = int(page)
+ start = (page - 1) * PAGE_SIZE
+ end = start + PAGE_SIZE
+ return list(cls.db.values())[start:end]
+ @classmethod
+ def search(cls, text):
+ result = []
+ for c in cls.db.values():
+ match_first = c.first is not None and text in c.first
+ match_last = c.last is not None and text in c.last
+ match_email = c.email is not None and text in c.email
+ match_phone = c.phone is not None and text in c.phone
+ if match_first or match_last or match_email or match_phone:
+ result.append(c)
+ return result
+ @classmethod
+ def load_db(cls):
+ with open('contacts.json', 'r') as contacts_file:
+ contacts = json.load(contacts_file)
+ cls.db.clear()
+ for c in contacts:
+ cls.db[c['id']] = Contact(c['id'], c['first'], c['last'], c['phone'], c['email'])
+ @staticmethod
+ def save_db():
+ out_arr = [c.__dict__ for c in Contact.db.values()]
+ with open("contacts.json", "w") as f:
+ json.dump(out_arr, f, indent=2)
+ @classmethod
+ def find(cls, id_):
+ id_ = int(id_)
+ c = cls.db.get(id_)
+ if c is not None:
+ c.errors = {}
+ return c
+class Archiver:
+ archive_status = "Waiting"
+ archive_progress = 0
+ thread = None
+ def status(self):
+ return Archiver.archive_status
+ def progress(self):
+ return Archiver.archive_progress
+ def run(self):
+ if Archiver.archive_status == "Waiting":
+ Archiver.archive_status = "Running"
+ Archiver.archive_progress = 0
+ Archiver.thread = Thread(target=self.run_impl)
+ Archiver.thread.start()
+ def run_impl(self):
+ for i in range(10):
+ time.sleep(1 * random())
+ if Archiver.archive_status != "Running":
+ return
+ Archiver.archive_progress = (i + 1) / 10
+ print("Here... " + str(Archiver.archive_progress))
+ time.sleep(1)
+ if Archiver.archive_status != "Running":
+ return
+ Archiver.archive_status = "Complete"
+ def archive_file(self):
+ return "contacts.json"
+ def reset(self):
+ Archiver.archive_status = "Waiting"
+ @classmethod
+ def get(cls):
+ return Archiver()