model.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. from dataclasses import dataclass
  2. import datetime
  3. from passlib.apps import custom_app_context as pwd
  4. import peewee
  5. import playhouse.shortcuts
  6. from typing import List, Optional, Tuple
  7. import lc.config as c
  8. import lc.error as e
  9. import lc.request as r
  10. class Model(peewee.Model):
  11. class Meta:
  12. database = c.db
  13. def to_dict(self) -> dict:
  14. return playhouse.shortcuts.model_to_dict(self)
  15. @dataclass
  16. class Pagination:
  17. current: int
  18. last: int
  19. def previous(self) -> Optional[dict]:
  20. if self.current > 1:
  21. return {"page": self.current - 1}
  22. return None
  23. def next(self) -> Optional[dict]:
  24. if self.current < self.last:
  25. return {"page": self.current + 1}
  26. return None
  27. @classmethod
  28. def from_total(cls, current, total) -> "Pagination":
  29. return cls(current=current, last=((total - 1) // c.per_page) + 1,)
  30. class User(Model):
  31. """
  32. A user! you know tf this is about
  33. """
  34. name = peewee.TextField(unique=True)
  35. passhash = peewee.TextField()
  36. is_admin = peewee.BooleanField(default=False)
  37. @staticmethod
  38. def from_request(user: r.User) -> "User":
  39. passhash = pwd.hash(user.password)
  40. try:
  41. return User.create(name=user.name, passhash=passhash,)
  42. except peewee.IntegrityError:
  43. raise e.UserExists(name=user.name)
  44. @staticmethod
  45. def from_invite(user: r.User, token: str) -> "User":
  46. invite = UserInvite.by_code(token)
  47. if invite.claimed_by is not None or invite.claimed_at is not None:
  48. raise e.AlreadyUsedInvite(invite=token)
  49. u = User.from_request(user)
  50. invite.claimed_at = datetime.datetime.now()
  51. invite.claimed_by = u
  52. invite.save()
  53. return u
  54. def authenticate(self, password: str) -> bool:
  55. return pwd.verify(password, self.passhash)
  56. def set_as_admin(self):
  57. self.is_admin = True
  58. self.save()
  59. @staticmethod
  60. def login(user: r.User) -> Tuple["User", str]:
  61. u = User.by_slug(user.name)
  62. if not u.authenticate(user.password):
  63. raise e.BadPassword(name=user.name)
  64. return u, user.to_token()
  65. @staticmethod
  66. def by_slug(slug: str) -> "User":
  67. u = User.get_or_none(name=slug)
  68. if u is None:
  69. raise e.NoSuchUser(name=slug)
  70. return u
  71. def base_url(self) -> str:
  72. return f"/u/{self.name}"
  73. def get_links(self, as_user: r.User, page: int) -> Tuple[List["Link"], Pagination]:
  74. links = (
  75. Link.select()
  76. .where((Link.user == self) & ((self == as_user) | (Link.private == False)))
  77. .order_by(-Link.created)
  78. .paginate(page, c.per_page)
  79. )
  80. pagination = Pagination.from_total(page, Link.select().count())
  81. return links, pagination
  82. def get_link(self, link_id: int) -> "Link":
  83. return Link.get((Link.user == self) & (Link.id == link_id))
  84. def get_tag(self, tag_name: str) -> "Tag":
  85. return Tag.get((Tag.user == self) & (Tag.name == tag_name))
  86. def to_dict(self) -> dict:
  87. return {"id": self.id, "name": self.name}
  88. def get_config(self) -> dict:
  89. admin_pane = None
  90. if self.is_admin:
  91. user_invites = [
  92. {
  93. "claimed": ui.claimed_by is not None,
  94. "claimant": ui.claimed_by and ui.claimed_by.name,
  95. "token": ui.token,
  96. }
  97. for ui in UserInvite.select().where(UserInvite.created_by == self)
  98. ]
  99. admin_pane = {"invites": user_invites}
  100. return {
  101. "username": self.name,
  102. "admin_pane": admin_pane,
  103. }
  104. class Link(Model):
  105. """
  106. A link as stored in the database
  107. """
  108. url = peewee.TextField()
  109. name = peewee.TextField()
  110. description = peewee.TextField()
  111. # TODO: do we need to track modified time?
  112. created = peewee.DateTimeField()
  113. # is the field entirely private?
  114. private = peewee.BooleanField()
  115. # owned by
  116. user = peewee.ForeignKeyField(User, backref="links")
  117. def link_url(self) -> str:
  118. return f"/u/{self.user.name}/l/{self.id}"
  119. @staticmethod
  120. def by_id(id: int) -> Optional["Link"]:
  121. return Link.get_or_none(id=id)
  122. @staticmethod
  123. def from_request(user: User, link: r.Link) -> "Link":
  124. l = Link.create(
  125. url=link.url,
  126. name=link.name,
  127. description=link.description,
  128. private=link.private,
  129. created=link.created or datetime.datetime.now(),
  130. user=user,
  131. )
  132. for tag_name in link.tags:
  133. t = Tag.get_or_create_tag(user, tag_name)
  134. HasTag.create(
  135. link=l, tag=t,
  136. )
  137. return l
  138. def update_from_request(self, link: r.Link):
  139. self.url = link.url
  140. self.name = link.name
  141. self.description = link.description
  142. self.private = link.private
  143. self.tags = link.tags
  144. self.save()
  145. class Tag(Model):
  146. """
  147. A tag. This just indicates that a user has used this tag at some point.
  148. """
  149. name = peewee.TextField()
  150. parent = peewee.ForeignKeyField("self", null=True, backref="children")
  151. user = peewee.ForeignKeyField(User, backref="tags")
  152. def url(self) -> str:
  153. return f"/u/{self.user.name}/t/{self.name}"
  154. def get_links(self, as_user: r.User, page: int) -> Tuple[List[Link], Pagination]:
  155. links = [
  156. ht.link
  157. for ht in HasTag.select()
  158. .join(Link)
  159. .where(
  160. (HasTag.tag == self)
  161. & ((HasTag.link.user == as_user) | (HasTag.link.private == False))
  162. )
  163. .order_by(-Link.created)
  164. .paginate(page, c.per_page)
  165. ]
  166. pagination = Pagination.from_total(
  167. page, HasTag.select().where((HasTag.tag == self)).count(),
  168. )
  169. return links, pagination
  170. @staticmethod
  171. def get_or_create_tag(user: User, tag_name: str):
  172. if (t := Tag.get_or_none(name=tag_name, user=user)) :
  173. return t
  174. parent = None
  175. if "/" in tag_name:
  176. parent_name = tag_name[: tag_name.rindex("/")]
  177. parent = Tag.get_or_create_tag(user, parent_name)
  178. return Tag.create(name=tag_name, parent=parent, user=user)
class HasTag(Model):
    """
    Establishes that a link is tagged with a given tag.
    """

    # the tagged link; these rows are exposed on Link through the "tags" backref
    link = peewee.ForeignKeyField(Link, backref="tags")
    # the applied tag; these rows are exposed on Tag through the "models" backref
    tag = peewee.ForeignKeyField(Tag, backref="models")
  185. class UserInvite(Model):
  186. token = peewee.TextField(unique=True)
  187. created_by = peewee.ForeignKeyField(User, backref="invites")
  188. created_at = peewee.DateTimeField()
  189. claimed_by = peewee.ForeignKeyField(User, null=True)
  190. claimed_at = peewee.DateTimeField(null=True)
  191. @staticmethod
  192. def by_code(token: str) -> "UserInvite":
  193. if (u := UserInvite.get_or_none(token=token)) :
  194. return u
  195. raise e.NoSuchInvite(invite=token)
  196. @staticmethod
  197. def manufacture(creator: User) -> "UserInvite":
  198. now = datetime.datetime.now()
  199. token = c.serializer.dumps(
  200. {"created_at": now.timestamp(), "created_by": creator.name,}
  201. )
  202. return UserInvite.create(
  203. token=token,
  204. created_by=creator,
  205. created_at=now,
  206. claimed_by=None,
  207. claimed_at=None,
  208. )
# the model classes whose tables create_tables() asks the database to create
MODELS = [
    User,
    Link,
    Tag,
    HasTag,
    UserInvite,
]
  216. def create_tables():
  217. c.db.create_tables(MODELS, safe=True)