model.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. from contextlib import contextmanager
  2. from dataclasses import dataclass
  3. import datetime
  4. import json
  5. from passlib.apps import custom_app_context as pwd
  6. import peewee
  7. import playhouse.shortcuts
  8. from typing import Iterator, List, Optional, Tuple
  9. import lc.config as c
  10. import lc.error as e
  11. import lc.request as r
  12. import lc.view as v
  13. class Model(peewee.Model):
  14. class Meta:
  15. database = c.app.db
  16. def to_dict(self) -> dict:
  17. return playhouse.shortcuts.model_to_dict(self)
  18. @contextmanager
  19. def atomic(self):
  20. with c.app.db.atomic():
  21. yield
  22. class Meta(Model):
  23. version = peewee.IntegerField(default=0)
  24. @staticmethod
  25. def fetch():
  26. try:
  27. return Meta.get(id=0)
  28. except:
  29. meta = Meta.create(id=0)
  30. return meta
  31. class User(Model):
  32. """
  33. A user! you know tf this is about
  34. """
  35. name = peewee.TextField(unique=True)
  36. passhash = peewee.TextField()
  37. is_admin = peewee.BooleanField(default=False)
  38. @staticmethod
  39. def from_request(user: r.User) -> "User":
  40. passhash = pwd.hash(user.password)
  41. try:
  42. return User.create(name=user.name, passhash=passhash,)
  43. except peewee.IntegrityError:
  44. raise e.UserExists(name=user.name)
  45. def change_password(self, req: r.PasswordChange):
  46. if not pwd.verify(req.old, self.passhash):
  47. raise e.BadPassword(name=self.name)
  48. self.passhash = pwd.hash(req.n1)
  49. self.save()
  50. @staticmethod
  51. def from_invite(user: r.User, token: str) -> "User":
  52. invite = UserInvite.by_code(token)
  53. if invite.claimed_by is not None or invite.claimed_at is not None:
  54. raise e.AlreadyUsedInvite(invite=token)
  55. u = User.from_request(user)
  56. invite.claimed_at = datetime.datetime.now()
  57. invite.claimed_by = u
  58. invite.save()
  59. return u
  60. def authenticate(self, password: str) -> bool:
  61. return pwd.verify(password, self.passhash)
  62. def set_as_admin(self):
  63. self.is_admin = True
  64. self.save()
  65. @staticmethod
  66. def login(user: r.User) -> Tuple["User", str]:
  67. u = User.by_slug(user.name)
  68. if not u.authenticate(user.password):
  69. raise e.BadPassword(name=user.name)
  70. return u, user.to_token()
  71. @staticmethod
  72. def by_slug(slug: str) -> "User":
  73. u = User.get_or_none(name=slug)
  74. if u is None:
  75. raise e.NoSuchUser(name=slug)
  76. return u
  77. def base_url(self) -> str:
  78. return f"/u/{self.name}"
  79. def config_url(self) -> str:
  80. return f"/u/{self.name}/config"
  81. def get_links(
  82. self, as_user: Optional["User"], page: int
  83. ) -> Tuple[List[v.Link], v.Pagination]:
  84. query = Link.select().where(
  85. (Link.user == self) & ((self == as_user) | (Link.private == False))
  86. )
  87. links = query.order_by(-Link.created).paginate(page, c.app.per_page)
  88. link_views = [l.to_view(as_user) for l in links]
  89. pagination = v.Pagination.from_total(page, query.count())
  90. return link_views, pagination
  91. def get_link(self, link_id: int) -> "Link":
  92. try:
  93. return Link.get((Link.user == self) & (Link.id == link_id))
  94. except Link.DoesNotExist:
  95. raise e.NoSuchLink(link_id)
  96. def get_tag(self, tag_name: str) -> "Tag":
  97. return Tag.get((Tag.user == self) & (Tag.name == tag_name))
  98. def to_dict(self) -> dict:
  99. return {"id": self.id, "name": self.name}
  100. def get_config(self, status_msg: Optional[int]) -> v.Config:
  101. admin_pane = None
  102. if self.is_admin:
  103. user_invites = [
  104. v.UserInvite(
  105. claimed=ui.claimed_by is not None,
  106. claimant=ui.claimed_by and ui.claimed_by.name,
  107. token=ui.token,
  108. )
  109. for ui in UserInvite.select().where(UserInvite.created_by == self)
  110. ]
  111. admin_pane = v.AdminPane(invites=user_invites)
  112. return v.Config(username=self.name, admin_pane=admin_pane, msg=status_msg)
  113. def import_pinboard_data(self, stream):
  114. try:
  115. links = json.load(stream)
  116. except json.decoder.JSONDecodeError as exn:
  117. raise e.BadFileUpload("could not parse file as JSON")
  118. if not isinstance(links, list):
  119. raise e.BadFileUpload(f"expected a list")
  120. # create and (for this session) cache the tags
  121. tags = {}
  122. for l in links:
  123. if "tags" not in l:
  124. raise e.BadFileUpload("missing key {exn.args[0]}")
  125. for t in l["tags"].split():
  126. if t in tags:
  127. continue
  128. tags[t] = Tag.get_or_create_tag(self, t)
  129. with self.atomic():
  130. for l in links:
  131. try:
  132. time = datetime.datetime.strptime(l["time"], "%Y-%m-%dT%H:%M:%SZ")
  133. ln = Link.create(
  134. url=l["href"],
  135. name=l["description"],
  136. description=l["extended"],
  137. private=l["shared"] == "no",
  138. created=time,
  139. user=self,
  140. )
  141. except KeyError as exn:
  142. raise e.BadFileUpload(f"missing key {exn.args[0]}")
  143. for t in l["tags"].split():
  144. HasTag.get_or_create(link=ln, tag=tags[t])
  145. def get_tags(self) -> List[v.Tag]:
  146. return sorted(
  147. (t.to_view() for t in self.tags), # type: ignore
  148. key=lambda t: t.name,
  149. )
  150. def get_related_tags(self, tag: "Tag") -> List[v.Tag]:
  151. # SELECT * from has_tag t1, has_tag t2, link l WHERE t1.link_id == l.id AND t2.link_id == l.id AND t1.id != t2.id AND t1 = self
  152. SelfTag = HasTag.alias()
  153. query = (
  154. HasTag.select(HasTag.tag)
  155. .join(Link, on=(HasTag.link == Link.id))
  156. .join(SelfTag, on=(SelfTag.link == Link.id))
  157. .where((SelfTag.tag == tag) & (SelfTag.id != HasTag.id))
  158. .group_by(HasTag.tag)
  159. )
  160. return sorted((t.tag.to_view() for t in query), key=lambda t: t.name,)
  161. def get_string_search(
  162. self, needle: str, as_user: Optional["User"], page: int
  163. ) -> Tuple[List[v.Link], v.Pagination]:
  164. """
  165. Find all links that contain the string `needle` somewhere in their URL or title
  166. """
  167. query = Link.select().where(
  168. (Link.user == self)
  169. & ((self == as_user) | (Link.private == False))
  170. & (Link.name.contains(needle) | Link.description.contains(needle))
  171. )
  172. links = query.order_by(-Link.created).paginate(page, c.app.per_page)
  173. link_views = [l.to_view(as_user) for l in links]
  174. pagination = v.Pagination.from_total(page, query.count())
  175. return link_views, pagination
  176. class Link(Model):
  177. """
  178. A link as stored in the database
  179. """
  180. url = peewee.TextField()
  181. name = peewee.TextField()
  182. description = peewee.TextField()
  183. # TODO: do we need to track modified time?
  184. created = peewee.DateTimeField()
  185. # is the field entirely private?
  186. private = peewee.BooleanField()
  187. # owned by
  188. user = peewee.ForeignKeyField(User, backref="links")
  189. def link_url(self) -> str:
  190. return f"/u/{self.user.name}/l/{self.id}"
  191. @staticmethod
  192. def by_id(id: int) -> Optional["Link"]:
  193. return Link.get_or_none(id=id)
  194. @staticmethod
  195. def get_all(
  196. as_user: Optional[User], page: int
  197. ) -> Tuple[List["Link"], v.Pagination]:
  198. links = (
  199. Link.select()
  200. .where((Link.user == as_user) | (Link.private == False))
  201. .order_by(-Link.created)
  202. .paginate(page, c.app.per_page)
  203. )
  204. link_views = [l.to_view(as_user) for l in links]
  205. pagination = v.Pagination.from_total(page, Link.select().count())
  206. return link_views, pagination
  207. @staticmethod
  208. def from_request(user: User, link: r.Link) -> "Link":
  209. l = Link.create(
  210. url=link.url,
  211. name=link.name,
  212. description=link.description,
  213. private=link.private,
  214. created=link.created or datetime.datetime.now(),
  215. user=user,
  216. )
  217. for tag_name in link.tags:
  218. tag = Tag.get_or_create_tag(user, tag_name)
  219. HasTag.get_or_create(link=l, tag=tag)
  220. return l
  221. def update_from_request(self, user: User, link: r.Link):
  222. with self.atomic():
  223. req_tags = set(link.tags)
  224. for hastag in self.tags: # type: ignore
  225. name = hastag.tag.name
  226. if name not in req_tags:
  227. hastag.delete_instance()
  228. else:
  229. req_tags.remove(name)
  230. for tag_name in req_tags:
  231. t = Tag.get_or_create_tag(user, tag_name)
  232. HasTag.get_or_create(link=self, tag=t)
  233. Tag.clean()
  234. self.url = link.url
  235. self.name = link.name
  236. self.description = link.description
  237. self.private = link.private
  238. self.save()
  239. def to_view(self, as_user: Optional[User]) -> v.Link:
  240. return v.Link(
  241. id=self.id,
  242. url=self.url,
  243. name=self.name,
  244. description=self.description,
  245. private=self.private,
  246. tags=self.get_tags_view(),
  247. created=self.created,
  248. is_mine=self.user.id == as_user.id if as_user else False,
  249. link_url=self.link_url(),
  250. user=self.user.name,
  251. )
  252. def get_tags_view(self) -> List[v.Tag]:
  253. return [
  254. v.Tag(url=f"/u/{self.user.name}/t/{ht.tag.name}", name=ht.tag.name)
  255. for ht in HasTag().select(Tag.name).join(Tag).where(HasTag.link == self)
  256. ]
  257. class Tag(Model):
  258. """
  259. A tag. This just indicates that a user has used this tag at some point.
  260. """
  261. name = peewee.TextField()
  262. parent = peewee.ForeignKeyField("self", null=True, backref="children")
  263. user = peewee.ForeignKeyField(User, backref="tags")
  264. def url(self) -> str:
  265. return f"/u/{self.user.name}/t/{self.name}"
  266. def get_links(
  267. self, as_user: Optional[User], page: int
  268. ) -> Tuple[List[Link], v.Pagination]:
  269. query = (
  270. HasTag.select()
  271. .join(Link)
  272. .where(
  273. (HasTag.tag == self)
  274. & ((HasTag.link.user == as_user) | (HasTag.link.private == False))
  275. )
  276. )
  277. links = [
  278. ht.link.to_view(as_user)
  279. for ht in query.order_by(-Link.created).paginate(page, c.app.per_page)
  280. ]
  281. pagination = v.Pagination.from_total(page, query.count())
  282. return links, pagination
  283. def get_family(self) -> Iterator["Tag"]:
  284. yield self
  285. p = self
  286. while (p := p.parent) :
  287. yield p
  288. BAD_TAG_CHARS = set("{}[]\\()#?")
  289. @staticmethod
  290. def is_valid_tag_name(tag_name: str) -> bool:
  291. return all((c not in Tag.BAD_TAG_CHARS for c in tag_name))
  292. @staticmethod
  293. def get_or_create_tag(user: User, tag_name: str) -> "Tag":
  294. if (t := Tag.get_or_none(name=tag_name, user=user)) :
  295. return t
  296. if not Tag.is_valid_tag_name(tag_name):
  297. raise e.BadTagName(tag_name)
  298. parent = None
  299. if "/" in tag_name:
  300. parent_name = tag_name[: tag_name.rindex("/")]
  301. parent = Tag.get_or_create_tag(user, parent_name)
  302. return Tag.create(name=tag_name, parent=parent, user=user)
  303. def to_view(self) -> v.Tag:
  304. return v.Tag(url=self.url(), name=self.name)
  305. @staticmethod
  306. def clean():
  307. unused = (
  308. Tag.select(Tag.id) # type: ignore
  309. .join(HasTag, peewee.JOIN.LEFT_OUTER)
  310. .group_by(Tag.name)
  311. .having(peewee.fn.COUNT(HasTag.id) == 0)
  312. )
  313. Tag.delete().where(Tag.id.in_(unused)).execute() # type: ignore
  314. class HasTag(Model):
  315. """
  316. Establishes that a link is tagged with a given tag.
  317. """
  318. link = peewee.ForeignKeyField(Link, backref="tags")
  319. tag = peewee.ForeignKeyField(Tag, backref="models")
  320. @staticmethod
  321. def get_or_create(link: Link, tag: Tag):
  322. res = HasTag.get_or_none(link=link, tag=tag)
  323. if res is None:
  324. res = HasTag.create(link=link, tag=tag)
  325. if tag.parent:
  326. HasTag.get_or_create(link, tag.parent)
  327. return res
  328. class UserInvite(Model):
  329. token = peewee.TextField(unique=True)
  330. created_by = peewee.ForeignKeyField(User, backref="invites")
  331. created_at = peewee.DateTimeField()
  332. claimed_by = peewee.ForeignKeyField(User, null=True)
  333. claimed_at = peewee.DateTimeField(null=True)
  334. @staticmethod
  335. def by_code(token: str) -> "UserInvite":
  336. if (u := UserInvite.get_or_none(token=token)) :
  337. return u
  338. raise e.NoSuchInvite(invite=token)
  339. @staticmethod
  340. def manufacture(creator: User) -> "UserInvite":
  341. now = datetime.datetime.now()
  342. token = c.app.serialize_token(
  343. {"created_at": now.timestamp(), "created_by": creator.name,}
  344. )
  345. return UserInvite.create(
  346. token=token,
  347. created_by=creator,
  348. created_at=now,
  349. claimed_by=None,
  350. claimed_at=None,
  351. )
  352. MODELS = [
  353. Meta,
  354. User,
  355. Link,
  356. Tag,
  357. HasTag,
  358. UserInvite,
  359. ]
  360. def create_tables():
  361. c.app.db.create_tables(MODELS, safe=True)