# model.py (5.5 KB)
  1. from dataclasses import dataclass
  2. from dataclasses_json import dataclass_json
  3. import datetime
  4. from passlib.apps import custom_app_context as pwd
  5. import sqlite3
  6. import typing
  7. import lc.config as c
  8. import lc.error as e
  9. import lc.request as r
  10. class Model:
  11. @staticmethod
  12. def cursor() -> sqlite3.Cursor:
  13. return c.db.cursor()
  14. def to_dict(self) -> dict:
  15. pass
  16. @dataclass
  17. class Pagination:
  18. current: int
  19. last: int
  20. def previous(self) -> typing.Optional[dict]:
  21. if self.current > 1:
  22. return {"page": self.current - 1}
  23. return None
  24. def next(self) -> typing.Optional[dict]:
  25. if self.current < self.last:
  26. return {"page": self.current + 1}
  27. return None
  28. @classmethod
  29. def from_total(cls, current, total) -> "Pagination":
  30. return cls(current=current, last=((total - 1) // c.per_page) + 1,)
  31. @dataclass_json
  32. @dataclass
  33. class User(Model):
  34. """
  35. A user! you know tf this is about
  36. """
  37. id: int
  38. name: str
  39. passhash: str
  40. is_admin: bool
  41. @staticmethod
  42. def create(name: str, passhash: str) -> "User":
  43. cur = Model.cursor()
  44. cur.execute('INSERT INTO user (name, passhash, is_admin) VALUES (?, ?, ?)',
  45. (name, passhash, False))
  46. return User(
  47. id=cur.lastrowid,
  48. name=name,
  49. passhash=passhash,
  50. is_admin=False,
  51. )
  52. @staticmethod
  53. def from_request(user: r.User) -> "User":
  54. passhash = pwd.hash(user.password)
  55. # try:
  56. return User.create(name=user.name, passhash=passhash,)
  57. # except peewee.IntegrityError:
  58. # raise e.UserExists(name=user.name)
  59. def authenticate(self, password: str) -> bool:
  60. return pwd.verify(password, self.passhash)
  61. def set_as_admin(self):
  62. self.is_admin = True
  63. self.save()
  64. @staticmethod
  65. def login(user: r.User) -> typing.Tuple["User", str]:
  66. u = User.by_slug(user.name)
  67. if not u.authenticate(user.password):
  68. raise e.BadPassword(name=user.name)
  69. return u, user.to_token()
  70. @staticmethod
  71. def by_slug(slug: str) -> "User":
  72. cur = Model.cursor()
  73. cur.execute("SELECT * FROM user WHERE name = ?", (slug,))
  74. if (row := cur.fetchone()):
  75. return User(*row)
  76. raise e.NoSuchUser(name=slug)
  77. def base_url(self) -> str:
  78. return f"/u/{self.name}"
  79. def get_links(self, page: int) -> typing.Tuple[typing.List["Link"], Pagination]:
  80. cur = Model.cursor()
  81. cur.execute("""SELECT (l.id, l.url, l.name, l.description, l.created, l.private, l.user
  82. FROM links l WHERE l.user == ?
  83. ORDER BY created DESC LIMIT ? OFFSET ?""",
  84. (self.id, c.per_page, page))
  85. links = [Link(*rows) for rows in cur]
  86. cur.execute("SELECT count(*) FROM links l WHERE l.user == ?", (self.id,))
  87. pagination = Pagination.from_total(page, cur.fetchone()[0])
  88. return links, pagination
  89. def get_link(self, link_id: int) -> "Link":
  90. cur = Model.cursor()
  91. return Link.get((Link.user == self) & (Link.id == link_id))
  92. def get_tag(self, tag_name: str) -> "Tag":
  93. return Tag.get((Tag.user == self) & (Tag.name == tag_name))
  94. def to_dict(self) -> dict:
  95. return {"id": self.id, "name": self.name}
  96. @dataclass_json
  97. @dataclass
  98. class Link(Model):
  99. """
  100. A link as stored in the database
  101. """
  102. url: str
  103. name: str
  104. description: str
  105. # TODO: do we need to track modified time?
  106. created: datetime.datetime
  107. # is the field entirely private?
  108. private: bool
  109. # owned by
  110. user: User
  111. def link_url(self) -> str:
  112. return f"/u/{self.user.name}/l/{self.id}"
  113. @staticmethod
  114. def from_request(user: User, link: r.Link) -> "Link":
  115. l = Link.create(
  116. url=link.url,
  117. name=link.name,
  118. description=link.description,
  119. private=link.private,
  120. created=link.created or datetime.datetime.now(),
  121. user=user,
  122. )
  123. for tag_name in link.tags:
  124. t = Tag.get_or_create_tag(user, tag_name)
  125. HasTag.create(
  126. link=l, tag=t,
  127. )
  128. return l
  129. @dataclass_json
  130. @dataclass
  131. class Tag(Model):
  132. """
  133. A tag. This just indicates that a user has used this tag at some point.
  134. """
  135. name: str
  136. parent: "Tag"
  137. user: User
  138. def url(self) -> str:
  139. return f"/u/{self.user.name}/t/{self.name}"
  140. def get_links(self, page: int) -> typing.Tuple[typing.List[Link], Pagination]:
  141. links = [
  142. ht.link
  143. for ht in HasTag.select()
  144. .join(Link)
  145. .where((HasTag.tag == self))
  146. .order_by(-Link.created)
  147. .paginate(page, c.per_page)
  148. ]
  149. pagination = Pagination.from_total(
  150. page, HasTag.select().where((HasTag.tag == self)).count(),
  151. )
  152. return links, pagination
  153. @staticmethod
  154. def get_or_create_tag(user: User, tag_name: str):
  155. if (t := Tag.get_or_none(name=tag_name, user=user)) :
  156. return t
  157. parent = None
  158. if "/" in tag_name:
  159. parent_name = tag_name[: tag_name.rindex("/")]
  160. parent = Tag.get_or_create_tag(user, parent_name)
  161. return Tag.create(name=tag_name, parent=parent, user=user)
  162. class UserInvite(Model):
  163. token: str
  164. MODELS = [
  165. User,
  166. Link,
  167. Tag,
  168. HasTag,
  169. ]
  170. # def create_tables():
  171. # c.DB.create_tables(MODELS, safe=True)