model.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. from dataclasses import dataclass
  2. import datetime
  3. from passlib.apps import custom_app_context as pwd
  4. import peewee
  5. import playhouse.shortcuts
  6. import typing
  7. import lc.config as c
  8. import lc.error as e
  9. import lc.request as r
  10. class Model(peewee.Model):
  11. class Meta:
  12. database = c.DB
  13. def to_dict(self) -> dict:
  14. return playhouse.shortcuts.model_to_dict(self)
  15. @dataclass
  16. class Pagination:
  17. current: int
  18. last: int
  19. def previous(self) -> typing.Optional[dict]:
  20. if self.current > 1:
  21. return {"page": self.current - 1}
  22. return None
  23. def next(self) -> typing.Optional[dict]:
  24. if self.current < self.last:
  25. return {"page": self.current + 1}
  26. return None
  27. @classmethod
  28. def from_total(cls, current, total) -> "Pagination":
  29. return cls(
  30. current=current,
  31. last=((total-1)//c.PER_PAGE)+1,
  32. )
  33. # TODO: figure out authorization for users (oauth? passwd?)
  34. class User(Model):
  35. """
  36. A user! you know tf this is about
  37. """
  38. name = peewee.TextField(unique=True)
  39. passhash = peewee.TextField()
  40. is_admin = peewee.BooleanField(default=False)
  41. @staticmethod
  42. def from_request(user: r.User) -> "User":
  43. passhash = pwd.hash(user.password)
  44. try:
  45. return User.create(name=user.name, passhash=passhash,)
  46. except peewee.IntegrityError:
  47. raise e.UserExists(name=user.name)
  48. def authenticate(self, password: str) -> bool:
  49. return pwd.verify(password, self.passhash)
  50. def set_as_admin(self):
  51. self.is_admin = True
  52. self.save()
  53. @staticmethod
  54. def login(user: r.User) -> typing.Tuple["User", str]:
  55. u = User.by_slug(user.name)
  56. if not u.authenticate(user.password):
  57. raise e.BadPassword(name=user.name)
  58. return u, c.SERIALIZER.dumps({
  59. "name": user.name,
  60. "password": user.password,
  61. })
  62. @staticmethod
  63. def by_slug(slug: str) -> "User":
  64. u = User.get_or_none(name=slug)
  65. if u is None:
  66. raise e.NoSuchUser(name=slug)
  67. return u
  68. def base_url(self) -> str:
  69. return f"/u/{self.name}"
  70. def get_links(self, page: int) -> typing.Tuple[typing.List["Link"], Pagination]:
  71. links = Link.select().where(Link.user == self).order_by(-Link.created).paginate(page, c.PER_PAGE)
  72. pagination = Pagination.from_total(page, Link.select().count())
  73. return links, pagination
  74. def get_link(self, link_id: int) -> "Link":
  75. return Link.get((Link.user == self) & (Link.id == link_id))
  76. def get_tag(self, tag_name: str) -> "Tag":
  77. return Tag.get((Tag.user == self) & (Tag.name == tag_name))
  78. def to_dict(self) -> dict:
  79. return {"id": self.id, "name": self.name}
  80. class Link(Model):
  81. """
  82. A link as stored in the database
  83. """
  84. url = peewee.TextField()
  85. name = peewee.TextField()
  86. description = peewee.TextField()
  87. # TODO: do we need to track modified time?
  88. created = peewee.DateTimeField()
  89. # is the field entirely private?
  90. private = peewee.BooleanField()
  91. # owned by
  92. user = peewee.ForeignKeyField(User, backref="links")
  93. def link_url(self) -> str:
  94. return f"/u/{self.user.name}/l/{self.id}"
  95. @staticmethod
  96. def from_request(user: User, link: r.Link) -> "Link":
  97. l = Link.create(
  98. url=link.url,
  99. name=link.name,
  100. description=link.description,
  101. private=link.private,
  102. created=link.created or datetime.datetime.now(),
  103. user=user,
  104. )
  105. for tag_name in link.tags:
  106. t = Tag.get_or_create_tag(user, tag_name)
  107. HasTag.create(
  108. link=l, tag=t,
  109. )
  110. return l
  111. class Tag(Model):
  112. """
  113. A tag. This just indicates that a user has used this tag at some point.
  114. """
  115. name = peewee.TextField()
  116. parent = peewee.ForeignKeyField("self", null=True, backref="children")
  117. user = peewee.ForeignKeyField(User, backref="tags")
  118. def url(self) -> str:
  119. return f"/u/{self.user.name}/t/{self.name}"
  120. def get_links(self, page: int) -> typing.Tuple[typing.List[Link], Pagination]:
  121. links = [
  122. ht.link
  123. for ht in HasTag.select()
  124. .join(Link)
  125. .where((HasTag.tag == self))
  126. .order_by(-Link.created)
  127. .paginate(page, c.PER_PAGE)
  128. ]
  129. pagination = Pagination.from_total(
  130. page,
  131. HasTag.select().where((HasTag.tag == self)).count(),
  132. )
  133. return links, pagination
  134. @staticmethod
  135. def get_or_create_tag(user: User, tag_name: str):
  136. if (t := Tag.get_or_none(name=tag_name, user=user)) :
  137. return t
  138. parent = None
  139. if "/" in tag_name:
  140. parent_name = tag_name[: tag_name.rindex("/")]
  141. parent = Tag.get_or_create_tag(user, parent_name)
  142. return Tag.create(name=tag_name, parent=parent, user=user)
  143. class HasTag(Model):
  144. """
  145. Establishes that a link is tagged with a given tag.
  146. """
  147. link = peewee.ForeignKeyField(Link, backref="tags")
  148. tag = peewee.ForeignKeyField(Tag, backref="models")
  149. class UserInvite(Model):
  150. token: str
  151. MODELS = [
  152. User,
  153. Link,
  154. Tag,
  155. HasTag,
  156. ]
  157. def create_tables():
  158. c.DB.create_tables(MODELS, safe=True)