model.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. from dataclasses import dataclass
  2. import datetime
  3. from passlib.apps import custom_app_context as pwd
  4. import peewee
  5. import playhouse.shortcuts
  6. from typing import List, Optional, Tuple
  7. import lc.config as c
  8. import lc.error as e
  9. import lc.request as r
  10. import lc.view as v
  11. class Model(peewee.Model):
  12. class Meta:
  13. database = c.db
  14. def to_dict(self) -> dict:
  15. return playhouse.shortcuts.model_to_dict(self)
  16. @dataclass
  17. class Pagination:
  18. current: int
  19. last: int
  20. def previous(self) -> Optional[dict]:
  21. if self.current > 1:
  22. return {"page": self.current - 1}
  23. return None
  24. def next(self) -> Optional[dict]:
  25. if self.current < self.last:
  26. return {"page": self.current + 1}
  27. return None
  28. @classmethod
  29. def from_total(cls, current, total) -> "Pagination":
  30. return cls(current=current, last=((total - 1) // c.per_page) + 1,)
  31. class User(Model):
  32. """
  33. A user! you know tf this is about
  34. """
  35. name = peewee.TextField(unique=True)
  36. passhash = peewee.TextField()
  37. is_admin = peewee.BooleanField(default=False)
  38. @staticmethod
  39. def from_request(user: r.User) -> "User":
  40. passhash = pwd.hash(user.password)
  41. try:
  42. return User.create(name=user.name, passhash=passhash,)
  43. except peewee.IntegrityError:
  44. raise e.UserExists(name=user.name)
  45. @staticmethod
  46. def from_invite(user: r.User, token: str) -> "User":
  47. invite = UserInvite.by_code(token)
  48. if invite.claimed_by is not None or invite.claimed_at is not None:
  49. raise e.AlreadyUsedInvite(invite=token)
  50. u = User.from_request(user)
  51. invite.claimed_at = datetime.datetime.now()
  52. invite.claimed_by = u
  53. invite.save()
  54. return u
  55. def authenticate(self, password: str) -> bool:
  56. return pwd.verify(password, self.passhash)
  57. def set_as_admin(self):
  58. self.is_admin = True
  59. self.save()
  60. @staticmethod
  61. def login(user: r.User) -> Tuple["User", str]:
  62. u = User.by_slug(user.name)
  63. if not u.authenticate(user.password):
  64. raise e.BadPassword(name=user.name)
  65. return u, user.to_token()
  66. @staticmethod
  67. def by_slug(slug: str) -> "User":
  68. u = User.get_or_none(name=slug)
  69. if u is None:
  70. raise e.NoSuchUser(name=slug)
  71. return u
  72. def base_url(self) -> str:
  73. return f"/u/{self.name}"
  74. def get_links(self, as_user: r.User, page: int) -> Tuple[List["Link"], Pagination]:
  75. links = (
  76. Link.select()
  77. .where((Link.user == self) & ((self == as_user) | (Link.private == False)))
  78. .order_by(-Link.created)
  79. .paginate(page, c.per_page)
  80. )
  81. pagination = Pagination.from_total(page, Link.select().count())
  82. return links, pagination
  83. def get_link(self, link_id: int) -> "Link":
  84. return Link.get((Link.user == self) & (Link.id == link_id))
  85. def get_tag(self, tag_name: str) -> "Tag":
  86. return Tag.get((Tag.user == self) & (Tag.name == tag_name))
  87. def to_dict(self) -> dict:
  88. return {"id": self.id, "name": self.name}
  89. def get_config(self) -> v.Config:
  90. admin_pane = None
  91. if self.is_admin:
  92. user_invites = [
  93. v.UserInvite(
  94. claimed=ui.claimed_by is not None,
  95. claimant=ui.claimed_by and ui.claimed_by.name,
  96. token=ui.token,
  97. )
  98. for ui in UserInvite.select().where(UserInvite.created_by == self)
  99. ]
  100. admin_pane = v.AdminPane(invites=user_invites)
  101. return v.Config(
  102. username=self.name,
  103. admin_pane=admin_pane,
  104. )
  105. class Link(Model):
  106. """
  107. A link as stored in the database
  108. """
  109. url = peewee.TextField()
  110. name = peewee.TextField()
  111. description = peewee.TextField()
  112. # TODO: do we need to track modified time?
  113. created = peewee.DateTimeField()
  114. # is the field entirely private?
  115. private = peewee.BooleanField()
  116. # owned by
  117. user = peewee.ForeignKeyField(User, backref="links")
  118. def link_url(self) -> str:
  119. return f"/u/{self.user.name}/l/{self.id}"
  120. @staticmethod
  121. def by_id(id: int) -> Optional["Link"]:
  122. return Link.get_or_none(id=id)
  123. @staticmethod
  124. def from_request(user: User, link: r.Link) -> "Link":
  125. l = Link.create(
  126. url=link.url,
  127. name=link.name,
  128. description=link.description,
  129. private=link.private,
  130. created=link.created or datetime.datetime.now(),
  131. user=user,
  132. )
  133. for tag_name in link.tags:
  134. t = Tag.get_or_create_tag(user, tag_name)
  135. HasTag.create(
  136. link=l, tag=t,
  137. )
  138. return l
  139. def update_from_request(self, user: User, link: r.Link):
  140. req_tags = set(link.tags)
  141. for hastag in self.tags:
  142. name = hastag.tag.name
  143. if name not in req_tags:
  144. hastag.delete_instance()
  145. else:
  146. req_tags.remove(name)
  147. for tag_name in req_tags:
  148. t = Tag.get_or_create_tag(user, tag_name)
  149. HasTag.get_or_create(link=self, tag=t)
  150. self.url = link.url
  151. self.name = link.name
  152. self.description = link.description
  153. self.private = link.private
  154. self.save()
  155. class Tag(Model):
  156. """
  157. A tag. This just indicates that a user has used this tag at some point.
  158. """
  159. name = peewee.TextField()
  160. parent = peewee.ForeignKeyField("self", null=True, backref="children")
  161. user = peewee.ForeignKeyField(User, backref="tags")
  162. def url(self) -> str:
  163. return f"/u/{self.user.name}/t/{self.name}"
  164. def get_links(self, as_user: r.User, page: int) -> Tuple[List[Link], Pagination]:
  165. links = [
  166. ht.link
  167. for ht in HasTag.select()
  168. .join(Link)
  169. .where(
  170. (HasTag.tag == self)
  171. & ((HasTag.link.user == as_user) | (HasTag.link.private == False))
  172. )
  173. .order_by(-Link.created)
  174. .paginate(page, c.per_page)
  175. ]
  176. pagination = Pagination.from_total(
  177. page, HasTag.select().where((HasTag.tag == self)).count(),
  178. )
  179. return links, pagination
  180. @staticmethod
  181. def get_or_create_tag(user: User, tag_name: str):
  182. if (t := Tag.get_or_none(name=tag_name, user=user)) :
  183. return t
  184. parent = None
  185. if "/" in tag_name:
  186. parent_name = tag_name[: tag_name.rindex("/")]
  187. parent = Tag.get_or_create_tag(user, parent_name)
  188. return Tag.create(name=tag_name, parent=parent, user=user)
  189. class HasTag(Model):
  190. """
  191. Establishes that a link is tagged with a given tag.
  192. """
  193. link = peewee.ForeignKeyField(Link, backref="tags")
  194. tag = peewee.ForeignKeyField(Tag, backref="models")
  195. @staticmethod
  196. def get_or_create(link: Link, tag: Tag):
  197. res = HasTag.get_or_none(link=link, tag=tag)
  198. if res is None:
  199. res = HasTag.create(link=link, tag=tag)
  200. return res
  201. class UserInvite(Model):
  202. token = peewee.TextField(unique=True)
  203. created_by = peewee.ForeignKeyField(User, backref="invites")
  204. created_at = peewee.DateTimeField()
  205. claimed_by = peewee.ForeignKeyField(User, null=True)
  206. claimed_at = peewee.DateTimeField(null=True)
  207. @staticmethod
  208. def by_code(token: str) -> "UserInvite":
  209. if (u := UserInvite.get_or_none(token=token)) :
  210. return u
  211. raise e.NoSuchInvite(invite=token)
  212. @staticmethod
  213. def manufacture(creator: User) -> "UserInvite":
  214. now = datetime.datetime.now()
  215. token = c.serializer.dumps(
  216. {"created_at": now.timestamp(), "created_by": creator.name,}
  217. )
  218. return UserInvite.create(
  219. token=token,
  220. created_by=creator,
  221. created_at=now,
  222. claimed_by=None,
  223. claimed_at=None,
  224. )
  225. MODELS = [
  226. User,
  227. Link,
  228. Tag,
  229. HasTag,
  230. UserInvite,
  231. ]
  232. def create_tables():
  233. c.db.create_tables(MODELS, safe=True)