contacts_model.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. import json
  2. from operator import attrgetter
  3. import time
  4. from threading import Thread
  5. from random import random
  6. # ========================================================
  7. # Contact Model
  8. # ========================================================
  9. PAGE_SIZE = 100
  10. class Contact:
  11. # mock contacts database
  12. db = {}
  13. def __init__(self, id_=None, first=None, last=None, phone=None, email=None):
  14. self.id = id_
  15. self.first = first
  16. self.last = last
  17. self.phone = phone
  18. self.email = email
  19. self.errors = {}
  20. def __str__(self):
  21. return json.dumps(self.__dict__, ensure_ascii=False)
  22. def update(self, first, last, phone, email):
  23. self.first = first
  24. self.last = last
  25. self.phone = phone
  26. self.email = email
  27. def validate(self):
  28. if not self.email:
  29. self.errors['email'] = "Email Required"
  30. existing_contact = next(filter(lambda c: c.id != self.id and c.email == self.email, Contact.db.values()), None)
  31. if existing_contact:
  32. self.errors['email'] = "Email Must Be Unique"
  33. return len(self.errors) == 0
  34. def save(self):
  35. if not self.validate():
  36. return False
  37. if self.id is None:
  38. if len(Contact.db) == 0:
  39. max_id = 1
  40. else:
  41. max_id = max(contact.id for contact in Contact.db.values())
  42. self.id = max_id + 1
  43. Contact.db[self.id] = self
  44. Contact.save_db()
  45. return True
  46. def delete(self):
  47. del Contact.db[self.id]
  48. Contact.save_db()
  49. @classmethod
  50. def count(cls):
  51. time.sleep(2)
  52. return len(cls.db)
  53. @classmethod
  54. def all(cls, page=1):
  55. page = int(page)
  56. start = (page - 1) * PAGE_SIZE
  57. end = start + PAGE_SIZE
  58. return list(cls.db.values())[start:end]
  59. @classmethod
  60. def search(cls, text):
  61. result = []
  62. for c in cls.db.values():
  63. match_first = c.first is not None and text in c.first
  64. match_last = c.last is not None and text in c.last
  65. match_email = c.email is not None and text in c.email
  66. match_phone = c.phone is not None and text in c.phone
  67. if match_first or match_last or match_email or match_phone:
  68. result.append(c)
  69. return result
  70. @classmethod
  71. def load_db(cls):
  72. with open('contacts.json', 'r') as contacts_file:
  73. contacts = json.load(contacts_file)
  74. cls.db.clear()
  75. for c in contacts:
  76. cls.db[c['id']] = Contact(c['id'], c['first'], c['last'], c['phone'], c['email'])
  77. @staticmethod
  78. def save_db():
  79. out_arr = [c.__dict__ for c in Contact.db.values()]
  80. with open("contacts.json", "w") as f:
  81. json.dump(out_arr, f, indent=2)
  82. @classmethod
  83. def find(cls, id_):
  84. id_ = int(id_)
  85. c = cls.db.get(id_)
  86. if c is not None:
  87. c.errors = {}
  88. return c
  89. class Archiver:
  90. archive_status = "Waiting"
  91. archive_progress = 0
  92. thread = None
  93. def status(self):
  94. return Archiver.archive_status
  95. def progress(self):
  96. return Archiver.archive_progress
  97. def run(self):
  98. if Archiver.archive_status == "Waiting":
  99. Archiver.archive_status = "Running"
  100. Archiver.archive_progress = 0
  101. Archiver.thread = Thread(target=self.run_impl)
  102. Archiver.thread.start()
  103. def run_impl(self):
  104. for i in range(10):
  105. time.sleep(1 * random())
  106. if Archiver.archive_status != "Running":
  107. return
  108. Archiver.archive_progress = (i + 1) / 10
  109. print("Here... " + str(Archiver.archive_progress))
  110. time.sleep(1)
  111. if Archiver.archive_status != "Running":
  112. return
  113. Archiver.archive_status = "Complete"
  114. def archive_file(self):
  115. return "contacts.json"
  116. def reset(self):
  117. Archiver.archive_status = "Waiting"
  118. @classmethod
  119. def get(cls):
  120. return Archiver()