model.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. from dataclasses import dataclass
  2. import datetime
  3. import json
  4. from passlib.apps import custom_app_context as pwd
  5. import peewee
  6. import playhouse.shortcuts
  7. from typing import Iterator, List, Optional, Tuple
  8. import lc.config as c
  9. import lc.error as e
  10. import lc.request as r
  11. import lc.view as v
  12. class Model(peewee.Model):
  13. class Meta:
  14. database = c.db
  15. def to_dict(self) -> dict:
  16. return playhouse.shortcuts.model_to_dict(self)
  17. class User(Model):
  18. """
  19. A user! you know tf this is about
  20. """
  21. name = peewee.TextField(unique=True)
  22. passhash = peewee.TextField()
  23. is_admin = peewee.BooleanField(default=False)
  24. @staticmethod
  25. def from_request(user: r.User) -> "User":
  26. passhash = pwd.hash(user.password)
  27. try:
  28. return User.create(name=user.name, passhash=passhash,)
  29. except peewee.IntegrityError:
  30. raise e.UserExists(name=user.name)
  31. @staticmethod
  32. def from_invite(user: r.User, token: str) -> "User":
  33. invite = UserInvite.by_code(token)
  34. if invite.claimed_by is not None or invite.claimed_at is not None:
  35. raise e.AlreadyUsedInvite(invite=token)
  36. u = User.from_request(user)
  37. invite.claimed_at = datetime.datetime.now()
  38. invite.claimed_by = u
  39. invite.save()
  40. return u
  41. def authenticate(self, password: str) -> bool:
  42. return pwd.verify(password, self.passhash)
  43. def set_as_admin(self):
  44. self.is_admin = True
  45. self.save()
  46. @staticmethod
  47. def login(user: r.User) -> Tuple["User", str]:
  48. u = User.by_slug(user.name)
  49. if not u.authenticate(user.password):
  50. raise e.BadPassword(name=user.name)
  51. return u, user.to_token()
  52. @staticmethod
  53. def by_slug(slug: str) -> "User":
  54. u = User.get_or_none(name=slug)
  55. if u is None:
  56. raise e.NoSuchUser(name=slug)
  57. return u
  58. def base_url(self) -> str:
  59. return f"/u/{self.name}"
  60. def get_links(
  61. self, as_user: Optional["User"], page: int
  62. ) -> Tuple[List[v.Link], v.Pagination]:
  63. links = (
  64. Link.select()
  65. .where((Link.user == self) & ((self == as_user) | (Link.private == False)))
  66. .order_by(-Link.created)
  67. .paginate(page, c.per_page)
  68. )
  69. link_views = [l.to_view(as_user) for l in links]
  70. pagination = v.Pagination.from_total(page, Link.select().count())
  71. return link_views, pagination
  72. def get_link(self, link_id: int) -> "Link":
  73. try:
  74. return Link.get((Link.user == self) & (Link.id == link_id))
  75. except Link.DoesNotExist:
  76. raise e.NoSuchLink(link_id)
  77. def get_tag(self, tag_name: str) -> "Tag":
  78. return Tag.get((Tag.user == self) & (Tag.name == tag_name))
  79. def to_dict(self) -> dict:
  80. return {"id": self.id, "name": self.name}
  81. def get_config(self) -> v.Config:
  82. admin_pane = None
  83. if self.is_admin:
  84. user_invites = [
  85. v.UserInvite(
  86. claimed=ui.claimed_by is not None,
  87. claimant=ui.claimed_by and ui.claimed_by.name,
  88. token=ui.token,
  89. )
  90. for ui in UserInvite.select().where(UserInvite.created_by == self)
  91. ]
  92. admin_pane = v.AdminPane(invites=user_invites)
  93. return v.Config(username=self.name, admin_pane=admin_pane,)
  94. def import_pinboard_data(self, stream):
  95. try:
  96. links = json.load(stream)
  97. except json.decoder.JSONDecodeError as exn:
  98. raise e.BadFileUpload("could not parse file as JSON")
  99. if not isinstance(links, list):
  100. raise e.BadFileUpload(f"expected a list")
  101. # create and (for this session) cache the tags
  102. tags = {}
  103. for l in links:
  104. if "tags" not in l:
  105. raise e.BadFileUpload("missing key {exn.args[0]}")
  106. for t in l["tags"].split():
  107. if t in tags:
  108. continue
  109. tags[t] = Tag.get_or_create_tag(self, t)
  110. with c.db.atomic():
  111. for l in links:
  112. try:
  113. time = datetime.datetime.strptime(l["time"], "%Y-%m-%dT%H:%M:%SZ")
  114. ln = Link.create(
  115. url=l["href"],
  116. name=l["description"],
  117. description=l["extended"],
  118. private=l["shared"] == "no",
  119. created=time,
  120. user=self,
  121. )
  122. except KeyError as exn:
  123. raise e.BadFileUpload(f"missing key {exn.args[0]}")
  124. for t in l["tags"].split():
  125. HasTag.get_or_create(link=ln, tag=tags[t])
  126. def get_tags(self) -> List[v.Tag]:
  127. return sorted(
  128. (t.to_view() for t in self.tags), # type: ignore
  129. key=lambda t: t.name,
  130. )
  131. class Link(Model):
  132. """
  133. A link as stored in the database
  134. """
  135. url = peewee.TextField()
  136. name = peewee.TextField()
  137. description = peewee.TextField()
  138. # TODO: do we need to track modified time?
  139. created = peewee.DateTimeField()
  140. # is the field entirely private?
  141. private = peewee.BooleanField()
  142. # owned by
  143. user = peewee.ForeignKeyField(User, backref="links")
  144. def link_url(self) -> str:
  145. return f"/u/{self.user.name}/l/{self.id}"
  146. @staticmethod
  147. def by_id(id: int) -> Optional["Link"]:
  148. return Link.get_or_none(id=id)
  149. @staticmethod
  150. def from_request(user: User, link: r.Link) -> "Link":
  151. l = Link.create(
  152. url=link.url,
  153. name=link.name,
  154. description=link.description,
  155. private=link.private,
  156. created=link.created or datetime.datetime.now(),
  157. user=user,
  158. )
  159. for tag_name in link.tags:
  160. tag = Tag.get_or_create_tag(user, tag_name)
  161. HasTag.get_or_create(link=l, tag=tag)
  162. return l
  163. def update_from_request(self, user: User, link: r.Link):
  164. with c.db.atomic():
  165. req_tags = set(link.tags)
  166. for hastag in self.tags: # type: ignore
  167. name = hastag.tag.name
  168. if name not in req_tags:
  169. hastag.delete_instance()
  170. else:
  171. req_tags.remove(name)
  172. for tag_name in req_tags:
  173. t = Tag.get_or_create_tag(user, tag_name)
  174. HasTag.get_or_create(link=self, tag=t)
  175. self.url = link.url
  176. self.name = link.name
  177. self.description = link.description
  178. self.private = link.private
  179. self.save()
  180. def to_view(self, as_user: Optional[User]) -> v.Link:
  181. return v.Link(
  182. id=self.id,
  183. url=self.url,
  184. name=self.name,
  185. description=self.description,
  186. private=self.private,
  187. tags=self.get_tags_view(),
  188. created=self.created,
  189. is_mine=self.user.id == as_user.id if as_user else False,
  190. link_url=self.link_url(),
  191. )
  192. def get_tags_view(self) -> List[v.Tag]:
  193. return [
  194. v.Tag(url=f"/u/{self.user.name}/t/{ht.tag.name}", name=ht.tag.name)
  195. for ht in HasTag().select(Tag.name).join(Tag).where(HasTag.link == self)
  196. ]
  197. class Tag(Model):
  198. """
  199. A tag. This just indicates that a user has used this tag at some point.
  200. """
  201. name = peewee.TextField()
  202. parent = peewee.ForeignKeyField("self", null=True, backref="children")
  203. user = peewee.ForeignKeyField(User, backref="tags")
  204. def url(self) -> str:
  205. return f"/u/{self.user.name}/t/{self.name}"
  206. def get_links(
  207. self, as_user: Optional[User], page: int
  208. ) -> Tuple[List[Link], v.Pagination]:
  209. links = [
  210. ht.link.to_view(as_user)
  211. for ht in HasTag.select()
  212. .join(Link)
  213. .where(
  214. (HasTag.tag == self)
  215. & ((HasTag.link.user == as_user) | (HasTag.link.private == False))
  216. )
  217. .order_by(-Link.created)
  218. .paginate(page, c.per_page)
  219. ]
  220. pagination = v.Pagination.from_total(
  221. page, HasTag.select().where((HasTag.tag == self)).count(),
  222. )
  223. return links, pagination
  224. def get_family(self) -> Iterator["Tag"]:
  225. yield self
  226. p = self
  227. while (p := p.parent) :
  228. yield p
  229. BAD_TAG_CHARS = set("{}[]\\()#?")
  230. @staticmethod
  231. def is_valid_tag_name(tag_name: str) -> bool:
  232. return all((c not in Tag.BAD_TAG_CHARS for c in tag_name))
  233. @staticmethod
  234. def get_or_create_tag(user: User, tag_name: str) -> "Tag":
  235. if (t := Tag.get_or_none(name=tag_name, user=user)) :
  236. return t
  237. if not Tag.is_valid_tag_name(tag_name):
  238. raise e.BadTagName(tag_name)
  239. parent = None
  240. if "/" in tag_name:
  241. parent_name = tag_name[: tag_name.rindex("/")]
  242. parent = Tag.get_or_create_tag(user, parent_name)
  243. return Tag.create(name=tag_name, parent=parent, user=user)
  244. def to_view(self) -> v.Tag:
  245. return v.Tag(url=self.url(), name=self.name)
  246. class HasTag(Model):
  247. """
  248. Establishes that a link is tagged with a given tag.
  249. """
  250. link = peewee.ForeignKeyField(Link, backref="tags")
  251. tag = peewee.ForeignKeyField(Tag, backref="models")
  252. @staticmethod
  253. def get_or_create(link: Link, tag: Tag):
  254. res = HasTag.get_or_none(link=link, tag=tag)
  255. if res is None:
  256. res = HasTag.create(link=link, tag=tag)
  257. if tag.parent:
  258. HasTag.get_or_create(link, tag.parent)
  259. return res
  260. class UserInvite(Model):
  261. token = peewee.TextField(unique=True)
  262. created_by = peewee.ForeignKeyField(User, backref="invites")
  263. created_at = peewee.DateTimeField()
  264. claimed_by = peewee.ForeignKeyField(User, null=True)
  265. claimed_at = peewee.DateTimeField(null=True)
  266. @staticmethod
  267. def by_code(token: str) -> "UserInvite":
  268. if (u := UserInvite.get_or_none(token=token)) :
  269. return u
  270. raise e.NoSuchInvite(invite=token)
  271. @staticmethod
  272. def manufacture(creator: User) -> "UserInvite":
  273. now = datetime.datetime.now()
  274. token = c.serializer.dumps(
  275. {"created_at": now.timestamp(), "created_by": creator.name,}
  276. )
  277. return UserInvite.create(
  278. token=token,
  279. created_by=creator,
  280. created_at=now,
  281. claimed_by=None,
  282. claimed_at=None,
  283. )
  284. MODELS = [
  285. User,
  286. Link,
  287. Tag,
  288. HasTag,
  289. UserInvite,
  290. ]
  291. def create_tables():
  292. c.db.create_tables(MODELS, safe=True)