123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- import json
- from operator import attrgetter
- import time
- from threading import Thread
- from random import random
- # ========================================================
- # Contact Model
- # ========================================================
- PAGE_SIZE = 100
- class Contact:
- # mock contacts database
- db = {}
- def __init__(self, id_=None, first=None, last=None, phone=None, email=None):
- self.id = id_
- self.first = first
- self.last = last
- self.phone = phone
- self.email = email
- self.errors = {}
- def __str__(self):
- return json.dumps(self.__dict__, ensure_ascii=False)
- def update(self, first, last, phone, email):
- self.first = first
- self.last = last
- self.phone = phone
- self.email = email
- def validate(self):
- if not self.email:
- self.errors['email'] = "Email Required"
- existing_contact = next(filter(lambda c: c.id != self.id and c.email == self.email, Contact.db.values()), None)
- if existing_contact:
- self.errors['email'] = "Email Must Be Unique"
- return len(self.errors) == 0
- def save(self):
- if not self.validate():
- return False
- if self.id is None:
- if len(Contact.db) == 0:
- max_id = 1
- else:
- max_id = max(contact.id for contact in Contact.db.values())
- self.id = max_id + 1
- Contact.db[self.id] = self
- Contact.save_db()
- return True
- def delete(self):
- del Contact.db[self.id]
- Contact.save_db()
- @classmethod
- def count(cls):
- time.sleep(2)
- return len(cls.db)
- @classmethod
- def all(cls, page=1):
- page = int(page)
- start = (page - 1) * PAGE_SIZE
- end = start + PAGE_SIZE
- return list(cls.db.values())[start:end]
- @classmethod
- def search(cls, text):
- result = []
- for c in cls.db.values():
- match_first = c.first is not None and text in c.first
- match_last = c.last is not None and text in c.last
- match_email = c.email is not None and text in c.email
- match_phone = c.phone is not None and text in c.phone
- if match_first or match_last or match_email or match_phone:
- result.append(c)
- return result
- @classmethod
- def load_db(cls):
- with open('contacts.json', 'r') as contacts_file:
- contacts = json.load(contacts_file)
- cls.db.clear()
- for c in contacts:
- cls.db[c['id']] = Contact(c['id'], c['first'], c['last'], c['phone'], c['email'])
- @staticmethod
- def save_db():
- out_arr = [c.__dict__ for c in Contact.db.values()]
- with open("contacts.json", "w") as f:
- json.dump(out_arr, f, indent=2)
- @classmethod
- def find(cls, id_):
- id_ = int(id_)
- c = cls.db.get(id_)
- if c is not None:
- c.errors = {}
- return c
- class Archiver:
- archive_status = "Waiting"
- archive_progress = 0
- thread = None
- def status(self):
- return Archiver.archive_status
- def progress(self):
- return Archiver.archive_progress
- def run(self):
- if Archiver.archive_status == "Waiting":
- Archiver.archive_status = "Running"
- Archiver.archive_progress = 0
- Archiver.thread = Thread(target=self.run_impl)
- Archiver.thread.start()
- def run_impl(self):
- for i in range(10):
- time.sleep(1 * random())
- if Archiver.archive_status != "Running":
- return
- Archiver.archive_progress = (i + 1) / 10
- print("Here... " + str(Archiver.archive_progress))
- time.sleep(1)
- if Archiver.archive_status != "Running":
- return
- Archiver.archive_status = "Complete"
- def archive_file(self):
- return "contacts.json"
- def reset(self):
- Archiver.archive_status = "Waiting"
- @classmethod
- def get(cls):
- return Archiver()
|