model.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. from dataclasses import dataclass
  2. import datetime
  3. from passlib.apps import custom_app_context as pwd
  4. import peewee
  5. import playhouse.shortcuts
  6. from typing import List, Optional, Tuple
  7. import lc.config as c
  8. import lc.error as e
  9. import lc.request as r
  10. import lc.view as v
  11. class Model(peewee.Model):
  12. class Meta:
  13. database = c.db
  14. def to_dict(self) -> dict:
  15. return playhouse.shortcuts.model_to_dict(self)
  16. class User(Model):
  17. """
  18. A user! you know tf this is about
  19. """
  20. name = peewee.TextField(unique=True)
  21. passhash = peewee.TextField()
  22. is_admin = peewee.BooleanField(default=False)
  23. @staticmethod
  24. def from_request(user: r.User) -> "User":
  25. passhash = pwd.hash(user.password)
  26. try:
  27. return User.create(name=user.name, passhash=passhash,)
  28. except peewee.IntegrityError:
  29. raise e.UserExists(name=user.name)
  30. @staticmethod
  31. def from_invite(user: r.User, token: str) -> "User":
  32. invite = UserInvite.by_code(token)
  33. if invite.claimed_by is not None or invite.claimed_at is not None:
  34. raise e.AlreadyUsedInvite(invite=token)
  35. u = User.from_request(user)
  36. invite.claimed_at = datetime.datetime.now()
  37. invite.claimed_by = u
  38. invite.save()
  39. return u
  40. def authenticate(self, password: str) -> bool:
  41. return pwd.verify(password, self.passhash)
  42. def set_as_admin(self):
  43. self.is_admin = True
  44. self.save()
  45. @staticmethod
  46. def login(user: r.User) -> Tuple["User", str]:
  47. u = User.by_slug(user.name)
  48. if not u.authenticate(user.password):
  49. raise e.BadPassword(name=user.name)
  50. return u, user.to_token()
  51. @staticmethod
  52. def by_slug(slug: str) -> "User":
  53. u = User.get_or_none(name=slug)
  54. if u is None:
  55. raise e.NoSuchUser(name=slug)
  56. return u
  57. def base_url(self) -> str:
  58. return f"/u/{self.name}"
  59. def get_links(
  60. self, as_user: r.User, page: int
  61. ) -> Tuple[List[v.Link], v.Pagination]:
  62. links = (
  63. Link.select()
  64. .where((Link.user == self) & ((self == as_user) | (Link.private == False)))
  65. .order_by(-Link.created)
  66. .paginate(page, c.per_page)
  67. )
  68. link_views = [l.to_view(as_user) for l in links]
  69. pagination = v.Pagination.from_total(page, Link.select().count())
  70. return link_views, pagination
  71. def get_link(self, link_id: int) -> "Link":
  72. try:
  73. return Link.get((Link.user == self) & (Link.id == link_id))
  74. except Link.DoesNotExist:
  75. raise e.NoSuchLink(link_id)
  76. def get_tag(self, tag_name: str) -> "Tag":
  77. return Tag.get((Tag.user == self) & (Tag.name == tag_name))
  78. def to_dict(self) -> dict:
  79. return {"id": self.id, "name": self.name}
  80. def get_config(self) -> v.Config:
  81. admin_pane = None
  82. if self.is_admin:
  83. user_invites = [
  84. v.UserInvite(
  85. claimed=ui.claimed_by is not None,
  86. claimant=ui.claimed_by and ui.claimed_by.name,
  87. token=ui.token,
  88. )
  89. for ui in UserInvite.select().where(UserInvite.created_by == self)
  90. ]
  91. admin_pane = v.AdminPane(invites=user_invites)
  92. return v.Config(username=self.name, admin_pane=admin_pane,)
  93. class Link(Model):
  94. """
  95. A link as stored in the database
  96. """
  97. url = peewee.TextField()
  98. name = peewee.TextField()
  99. description = peewee.TextField()
  100. # TODO: do we need to track modified time?
  101. created = peewee.DateTimeField()
  102. # is the field entirely private?
  103. private = peewee.BooleanField()
  104. # owned by
  105. user = peewee.ForeignKeyField(User, backref="links")
  106. def link_url(self) -> str:
  107. return f"/u/{self.user.name}/l/{self.id}"
  108. @staticmethod
  109. def by_id(id: int) -> Optional["Link"]:
  110. return Link.get_or_none(id=id)
  111. @staticmethod
  112. def from_request(user: User, link: r.Link) -> "Link":
  113. l = Link.create(
  114. url=link.url,
  115. name=link.name,
  116. description=link.description,
  117. private=link.private,
  118. created=link.created or datetime.datetime.now(),
  119. user=user,
  120. )
  121. for tag_name in link.tags:
  122. t = Tag.get_or_create_tag(user, tag_name)
  123. HasTag.create(
  124. link=l, tag=t,
  125. )
  126. return l
  127. def update_from_request(self, user: User, link: r.Link):
  128. with c.db.atomic():
  129. req_tags = set(link.tags)
  130. for hastag in self.tags: # type: ignore
  131. name = hastag.tag.name
  132. if name not in req_tags:
  133. hastag.delete_instance()
  134. else:
  135. req_tags.remove(name)
  136. for tag_name in req_tags:
  137. t = Tag.get_or_create_tag(user, tag_name)
  138. HasTag.get_or_create(link=self, tag=t)
  139. self.url = link.url
  140. self.name = link.name
  141. self.description = link.description
  142. self.private = link.private
  143. self.save()
  144. def to_view(self, as_user: User) -> v.Link:
  145. return v.Link(
  146. id=self.id,
  147. url=self.url,
  148. name=self.name,
  149. description=self.description,
  150. private=self.private,
  151. tags=self.get_tags_view(),
  152. created=self.created,
  153. is_mine=self.user.id == as_user.id if as_user else False,
  154. link_url=self.link_url(),
  155. )
  156. def get_tags_view(self) -> List[v.Tag]:
  157. return [
  158. v.Tag(url=f"/u/{self.user.name}/t/{ht.tag.name}", name=ht.tag.name)
  159. for ht in HasTag().select(Tag.name).join(Tag).where(HasTag.link == self)
  160. ]
  161. class Tag(Model):
  162. """
  163. A tag. This just indicates that a user has used this tag at some point.
  164. """
  165. name = peewee.TextField()
  166. parent = peewee.ForeignKeyField("self", null=True, backref="children")
  167. user = peewee.ForeignKeyField(User, backref="tags")
  168. def url(self) -> str:
  169. return f"/u/{self.user.name}/t/{self.name}"
  170. def get_links(self, as_user: r.User, page: int) -> Tuple[List[Link], v.Pagination]:
  171. links = [
  172. ht.link.to_view(as_user)
  173. for ht in HasTag.select()
  174. .join(Link)
  175. .where(
  176. (HasTag.tag == self)
  177. & ((HasTag.link.user == as_user) | (HasTag.link.private == False))
  178. )
  179. .order_by(-Link.created)
  180. .paginate(page, c.per_page)
  181. ]
  182. pagination = v.Pagination.from_total(
  183. page, HasTag.select().where((HasTag.tag == self)).count(),
  184. )
  185. return links, pagination
  186. @staticmethod
  187. def get_or_create_tag(user: User, tag_name: str):
  188. if (t := Tag.get_or_none(name=tag_name, user=user)) :
  189. return t
  190. parent = None
  191. if "/" in tag_name:
  192. parent_name = tag_name[: tag_name.rindex("/")]
  193. parent = Tag.get_or_create_tag(user, parent_name)
  194. return Tag.create(name=tag_name, parent=parent, user=user)
  195. def to_view(self) -> v.Tag:
  196. return v.Tag(url=self.url(), name=self.name)
  197. class HasTag(Model):
  198. """
  199. Establishes that a link is tagged with a given tag.
  200. """
  201. link = peewee.ForeignKeyField(Link, backref="tags")
  202. tag = peewee.ForeignKeyField(Tag, backref="models")
  203. @staticmethod
  204. def get_or_create(link: Link, tag: Tag):
  205. res = HasTag.get_or_none(link=link, tag=tag)
  206. if res is None:
  207. res = HasTag.create(link=link, tag=tag)
  208. return res
  209. class UserInvite(Model):
  210. token = peewee.TextField(unique=True)
  211. created_by = peewee.ForeignKeyField(User, backref="invites")
  212. created_at = peewee.DateTimeField()
  213. claimed_by = peewee.ForeignKeyField(User, null=True)
  214. claimed_at = peewee.DateTimeField(null=True)
  215. @staticmethod
  216. def by_code(token: str) -> "UserInvite":
  217. if (u := UserInvite.get_or_none(token=token)) :
  218. return u
  219. raise e.NoSuchInvite(invite=token)
  220. @staticmethod
  221. def manufacture(creator: User) -> "UserInvite":
  222. now = datetime.datetime.now()
  223. token = c.serializer.dumps(
  224. {"created_at": now.timestamp(), "created_by": creator.name,}
  225. )
  226. return UserInvite.create(
  227. token=token,
  228. created_by=creator,
  229. created_at=now,
  230. claimed_by=None,
  231. claimed_at=None,
  232. )
  233. MODELS = [
  234. User,
  235. Link,
  236. Tag,
  237. HasTag,
  238. UserInvite,
  239. ]
  240. def create_tables():
  241. c.db.create_tables(MODELS, safe=True)