model.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459
  1. from contextlib import contextmanager
  2. import datetime
  3. import json
  4. from passlib.apps import custom_app_context as pwd
  5. import peewee
  6. import playhouse.shortcuts
  7. from typing import Iterator, List, Optional, Tuple
  8. import lc.config as c
  9. import lc.error as e
  10. import lc.request as r
  11. import lc.view as v
  class Model(peewee.Model):
      """
      Common base class for the application's tables.

      Its inner `Meta` points at `c.app.db`, and it adds two small helpers
      on top of `peewee.Model`.
      """

      class Meta:
          database = c.app.db

      @contextmanager
      def atomic(self):
          """
          Context manager that runs the enclosed block inside
          `c.app.db.atomic()`. Nothing is yielded to the caller.
          """
          transaction = c.app.db.atomic()
          with transaction:
              yield

      def to_dict(self) -> dict:
          """
          Convert this row into a dictionary via
          `playhouse.shortcuts.model_to_dict`.
          """
          as_dict = playhouse.shortcuts.model_to_dict(self)
          return as_dict
  class Meta(Model):
      """
      Bookkeeping table holding an integer `version`.

      NOTE(review): presumably the schema version of the database; confirm
      against the code that reads and bumps it.
      """

      version = peewee.IntegerField(default=0)

      @staticmethod
      def fetch():
          """
          Return the row with id 0, creating it when it does not exist yet.

          Only a missing row triggers the creation. Any other failure (for
          example a connection or schema error) propagates to the caller
          instead of being hidden behind a second query.
          """
          try:
              return Meta.get(id=0)
          except Meta.DoesNotExist:
              return Meta.create(id=0)
  class User(Model):
      """
      A registered account: a unique name, a password hash and an admin flag.
      """

      name = peewee.TextField(unique=True)
      passhash = peewee.TextField()
      is_admin = peewee.BooleanField(default=False)

      @staticmethod
      def from_request(user: r.User) -> "User":
          """
          Create a user from a signup request, hashing its password.

          Raises `e.UserExists` when the insert fails with an integrity
          error (the `name` column is unique).
          """
          passhash = pwd.hash(user.password)
          try:
              return User.create(
                  name=user.name,
                  passhash=passhash,
              )
          except peewee.IntegrityError:
              raise e.UserExists(name=user.name)

      def change_password(self, req: r.PasswordChange):
          """
          Replace the stored hash with the hash of `req.n1`.

          Raises `e.BadPassword` when `req.old` does not verify against the
          current hash.
          """
          if not pwd.verify(req.old, self.passhash):
              raise e.BadPassword(name=self.name)
          self.passhash = pwd.hash(req.n1)
          self.save()

      @staticmethod
      def from_invite(user: r.User, token: str) -> "User":
          """
          Create a user by redeeming the invite identified by `token`.

          Raises `e.AlreadyUsedInvite` when the invite already has a
          claimant or a claim time; `UserInvite.by_code` raises when the
          token is unknown. On success the invite is stamped with the new
          user and the current time.
          """
          invite = UserInvite.by_code(token)
          if invite.claimed_by is not None or invite.claimed_at is not None:
              raise e.AlreadyUsedInvite(invite=token)
          u = User.from_request(user)
          invite.claimed_at = datetime.datetime.now()
          invite.claimed_by = u
          invite.save()
          return u

      def authenticate(self, password: str) -> bool:
          """
          Return whether `password` verifies against the stored hash.
          """
          return pwd.verify(password, self.passhash)

      def set_as_admin(self):
          """
          Set the admin flag and save the row.
          """
          self.is_admin = True
          self.save()

      @staticmethod
      def login(user: r.User) -> Tuple["User", str]:
          """
          Look the user up by name and check the supplied password.

          Returns the stored user together with `user.to_token()`.
          Raises `e.NoSuchUser` (via `by_slug`) or `e.BadPassword`.
          """
          u = User.by_slug(user.name)
          if not u.authenticate(user.password):
              raise e.BadPassword(name=user.name)
          return u, user.to_token()

      @staticmethod
      def by_slug(slug: str) -> "User":
          """
          Return the user named `slug`, raising `e.NoSuchUser` if absent.
          """
          u = User.get_or_none(name=slug)
          if u is None:
              raise e.NoSuchUser(name=slug)
          return u

      def base_url(self) -> str:
          """
          Path of this user's page.
          """
          return f"/u/{self.name}"

      def config_url(self) -> str:
          """
          Path of this user's configuration page.
          """
          return f"/u/{self.name}/config"

      @staticmethod
      def _link_page(
          query, as_user: Optional["User"], page: int
      ) -> Tuple[List[v.Link], v.Pagination]:
          """
          Render one page of `query` (newest first) as link views, together
          with pagination computed from the size of the whole query.
          """
          links = query.order_by(-Link.created).paginate(page, c.app.per_page)
          link_views = [link.to_view(as_user) for link in links]
          pagination = v.Pagination.from_total(page, query.count())
          return link_views, pagination

      def get_links(
          self, as_user: Optional["User"], page: int
      ) -> Tuple[List[v.Link], v.Pagination]:
          """
          Return page `page` of this user's links as seen by `as_user`.

          Private links are only included when `as_user` is this user.
          """
          query = Link.select().where(
              (Link.user == self)
              & ((self == as_user) | (Link.private == False))  # noqa: E712
          )
          return User._link_page(query, as_user, page)

      def get_link(self, link_id: int) -> "Link":
          """
          Return this user's link with id `link_id`.

          Raises `e.NoSuchLink` when no such link belongs to this user.
          """
          try:
              return Link.get((Link.user == self) & (Link.id == link_id))
          except Link.DoesNotExist:
              raise e.NoSuchLink(link_id)

      def get_tag(self, tag_name: str) -> "Tag":
          """
          Return this user's tag called `tag_name`.

          A missing tag surfaces as peewee's `Tag.DoesNotExist`.
          """
          return Tag.get((Tag.user == self) & (Tag.name == tag_name))

      def to_dict(self) -> dict:
          """
          Dictionary form exposing only `id` and `name`.
          """
          return {"id": self.id, "name": self.name}

      def get_config(self, status_msg: Optional[int]) -> v.Config:
          """
          Build the configuration view for this user.

          Admins additionally get an admin pane listing the invites they
          created; for everyone else `admin_pane` is `None`. `status_msg`
          is passed through as `msg`.
          """
          admin_pane = None
          if self.is_admin:
              user_invites = [
                  v.UserInvite(
                      claimed=ui.claimed_by is not None,
                      claimant=ui.claimed_by and ui.claimed_by.name,
                      token=ui.token,
                  )
                  for ui in UserInvite.select().where(UserInvite.created_by == self)
              ]
              admin_pane = v.AdminPane(invites=user_invites)
          return v.Config(username=self.name, admin_pane=admin_pane, msg=status_msg)

      def import_pinboard_data(self, stream):
          """
          Import links from a JSON export read from `stream`.

          The document must be a list of objects carrying the keys `href`,
          `description`, `extended`, `shared`, `time` (formatted as
          `%Y-%m-%dT%H:%M:%SZ`) and `tags` (a whitespace-separated string).

          Tags and links are created inside a single transaction, so a
          malformed entry leaves nothing behind. Every problem with the
          uploaded data is reported as `e.BadFileUpload`.
          """
          try:
              links = json.load(stream)
          except json.decoder.JSONDecodeError:
              raise e.BadFileUpload("could not parse file as JSON")
          if not isinstance(links, list):
              raise e.BadFileUpload("expected a list")

          with self.atomic():
              # create and (for this import) cache the tags
              tags = {}
              for link in links:
                  if not isinstance(link, dict):
                      raise e.BadFileUpload("expected a list of objects")
                  if "tags" not in link:
                      raise e.BadFileUpload("missing key tags")
                  if not isinstance(link["tags"], str):
                      raise e.BadFileUpload("expected tags to be a string")
                  for t in link["tags"].split():
                      if t not in tags:
                          tags[t] = Tag.get_or_create_tag(self, t)

              for link in links:
                  # Only the key lookups live in this `try`, so a KeyError
                  # raised elsewhere is never misreported as a missing key.
                  try:
                      raw_time = link["time"]
                      fields = dict(
                          url=link["href"],
                          name=link["description"],
                          description=link["extended"],
                          private=link["shared"] == "no",
                      )
                  except KeyError as exn:
                      raise e.BadFileUpload(f"missing key {exn.args[0]}") from exn
                  try:
                      created = datetime.datetime.strptime(
                          raw_time, "%Y-%m-%dT%H:%M:%SZ"
                      )
                  except (TypeError, ValueError) as exn:
                      raise e.BadFileUpload(f"bad timestamp {raw_time!r}") from exn
                  ln = Link.create(created=created, user=self, **fields)
                  for t in link["tags"].split():
                      HasTag.get_or_create(link=ln, tag=tags[t])

      def get_tags(self) -> List[v.Tag]:
          """
          Return views of all of this user's tags, sorted by name
          ignoring case.
          """
          return sorted(
              (t.to_view() for t in self.tags),  # type: ignore
              key=lambda t: t.name.upper(),
          )

      def get_related_tags(self, tag: "Tag") -> List[v.Tag]:
          """
          Return views of the tags that share at least one link with `tag`,
          sorted by name.
          """
          # SELECT * from has_tag t1, has_tag t2, link l
          #   WHERE t1.link_id == l.id AND t2.link_id == l.id
          #   AND t1.id != t2.id AND t1 = self
          SelfTag = HasTag.alias()
          query = (
              HasTag.select(HasTag.tag)
              .join(Link, on=(HasTag.link == Link.id))
              .join(SelfTag, on=(SelfTag.link == Link.id))
              .where((SelfTag.tag == tag) & (SelfTag.id != HasTag.id))
              .group_by(HasTag.tag)
          )
          return sorted(
              (t.tag.to_view() for t in query),
              key=lambda t: t.name,
          )

      def get_string_search(
          self, needle: str, as_user: Optional["User"], page: int
      ) -> Tuple[List[v.Link], v.Pagination]:
          """
          Find this user's links whose name or description contains the
          string `needle`, as seen by `as_user`.

          Private links are only included when `as_user` is this user.
          """
          query = Link.select().where(
              (Link.user == self)
              & ((self == as_user) | (Link.private == False))  # noqa: E712
              & (Link.name.contains(needle) | Link.description.contains(needle))
          )
          return User._link_page(query, as_user, page)
  class Link(Model):
      """
      A link as stored in the database
      """

      url = peewee.TextField()
      name = peewee.TextField()
      description = peewee.TextField()
      # TODO: do we need to track modified time?
      created = peewee.DateTimeField()
      # is the link private? (listing queries only show private links to
      # their owner)
      private = peewee.BooleanField()
      # owned by
      user = peewee.ForeignKeyField(User, backref="links")

      def link_url(self) -> str:
          """
          Path of this link's own page.
          """
          return f"/u/{self.user.name}/l/{self.id}"

      @staticmethod
      def by_id(id: int) -> Optional["Link"]:
          """
          Return the link with the given id, or `None` when there is none.
          """
          return Link.get_or_none(id=id)

      @staticmethod
      def get_all(
          as_user: Optional[User], page: int
      ) -> Tuple[List[v.Link], v.Pagination]:
          """
          Return page `page` of every link visible to `as_user`, newest
          first: all public links plus `as_user`'s own.

          The pagination total is taken from the same filtered query, so it
          never counts links the viewer is not allowed to see.
          """
          query = Link.select().where(
              (Link.user == as_user) | (Link.private == False)  # noqa: E712
          )
          links = query.order_by(-Link.created).paginate(page, c.app.per_page)
          link_views = [link.to_view(as_user) for link in links]
          pagination = v.Pagination.from_total(page, query.count())
          return link_views, pagination

      @staticmethod
      def from_request(user: User, link: r.Link) -> "Link":
          """
          Create a link owned by `user` from a request, together with its
          tags.

          `created` defaults to the current time when the request carries
          none. Everything happens in one transaction, so a failure while
          tagging (for example `e.BadTagName`) does not leave a
          half-created link behind.
          """
          with c.app.db.atomic():
              new_link = Link.create(
                  url=link.url,
                  name=link.name,
                  description=link.description,
                  private=link.private,
                  created=link.created or datetime.datetime.now(),
                  user=user,
              )
              for tag_name in link.tags:
                  tag = Tag.get_or_create_tag(user, tag_name)
                  HasTag.get_or_create(link=new_link, tag=tag)
          return new_link

      def update_from_request(self, user: User, link: r.Link):
          """
          Overwrite this link's fields and tag set from a request.

          Tag associations missing from the request are removed, new ones
          are added, unused tags are cleaned up and the row is saved, all
          inside one transaction.
          """
          with self.atomic():
              req_tags = set(link.tags)

              # Materialize first so rows are not deleted while the result
              # of the query is still being read.
              for hastag in list(self.tags):  # type: ignore
                  name = hastag.tag.name
                  if name not in req_tags:
                      hastag.delete_instance()
                  else:
                      req_tags.remove(name)

              # whatever is left was requested but not yet attached
              for tag_name in req_tags:
                  t = Tag.get_or_create_tag(user, tag_name)
                  HasTag.get_or_create(link=self, tag=t)
              Tag.clean()

              self.url = link.url
              self.name = link.name
              self.description = link.description
              self.private = link.private
              self.save()

      def to_view(self, as_user: Optional[User]) -> v.Link:
          """
          Build the view of this link for the viewer `as_user`.

          `is_mine` is true only when a viewer is given and owns the link.
          """
          return v.Link(
              id=self.id,
              url=self.url,
              name=self.name,
              description=self.description,
              private=self.private,
              tags=self.get_tags_view(),
              created=self.created,
              is_mine=self.user.id == as_user.id if as_user else False,
              link_url=self.link_url(),
              user=self.user.name,
          )

      def get_tags_view(self) -> List[v.Tag]:
          """
          Return a view for every tag attached to this link.
          """
          return [
              v.Tag(url=f"/u/{self.user.name}/t/{ht.tag.name}", name=ht.tag.name)
              for ht in HasTag.select(Tag.name).join(Tag).where(HasTag.link == self)
          ]

      def full_delete(self):
          """
          Delete this link together with its dependent rows, then remove
          tags that are no longer used.
          """
          self.delete_instance(recursive=True)
          Tag.clean()
  class Tag(Model):
      """
      A tag. This just indicates that a user has used this tag at some point.
      """

      name = peewee.TextField()
      # the enclosing tag for hierarchical names such as "a/b" (parent "a")
      parent = peewee.ForeignKeyField("self", null=True, backref="children")
      user = peewee.ForeignKeyField(User, backref="tags")

      def url(self) -> str:
          """
          Path of this tag's page.
          """
          return f"/u/{self.user.name}/t/{self.name}"

      def get_links(
          self, as_user: Optional[User], page: int
      ) -> Tuple[List[v.Link], v.Pagination]:
          """
          Return page `page` of the links carrying this tag that are visible
          to `as_user` (public links plus the viewer's own), newest first.
          """
          query = (
              HasTag.select()
              .join(Link)
              .where(
                  (HasTag.tag == self)
                  & (
                      (HasTag.link.user == as_user)
                      | (HasTag.link.private == False)  # noqa: E712
                  )
              )
          )
          links = [
              ht.link.to_view(as_user)
              for ht in query.order_by(-Link.created).paginate(page, c.app.per_page)
          ]
          pagination = v.Pagination.from_total(page, query.count())
          return links, pagination

      def get_family(self) -> Iterator["Tag"]:
          """
          Yield this tag followed by each of its ancestors, nearest first.
          """
          yield self
          p = self
          while p := p.parent:
              yield p

      BAD_TAG_CHARS = set("{}\\#")

      @staticmethod
      def is_valid_tag_name(tag_name: str) -> bool:
          """
          Return whether `tag_name` is free of the characters in
          `BAD_TAG_CHARS`.
          """
          return all(ch not in Tag.BAD_TAG_CHARS for ch in tag_name)

      @staticmethod
      def get_or_create_tag(user: User, tag_name: str) -> "Tag":
          """
          Return `user`'s tag called `tag_name`, creating it if necessary.

          For a name containing "/", the tag named by everything before the
          last "/" is fetched or created first and becomes the parent.
          Raises `e.BadTagName` when a tag that has to be created contains
          a forbidden character.
          """
          if t := Tag.get_or_none(name=tag_name, user=user):
              return t

          if not Tag.is_valid_tag_name(tag_name):
              raise e.BadTagName(tag_name)

          parent = None
          if "/" in tag_name:
              parent_name = tag_name[: tag_name.rindex("/")]
              parent = Tag.get_or_create_tag(user, parent_name)

          return Tag.create(name=tag_name, parent=parent, user=user)

      def to_view(self) -> v.Tag:
          """
          Build the view of this tag.
          """
          return v.Tag(url=self.url(), name=self.name)

      @staticmethod
      def clean():
          """
          Delete every tag that is not attached to any link.

          Rows are grouped per tag id: tag names are only unique per user,
          so grouping by name would merge different users' tags into one
          group.
          """
          unused = (
              Tag.select(Tag.id)  # type: ignore
              .join(HasTag, peewee.JOIN.LEFT_OUTER)
              .group_by(Tag.id)  # type: ignore
              .having(peewee.fn.COUNT(HasTag.id) == 0)
          )
          Tag.delete().where(Tag.id.in_(unused)).execute()  # type: ignore
  class HasTag(Model):
      """
      Join table recording that a given link carries a given tag.
      """

      link = peewee.ForeignKeyField(Link, backref="tags")
      tag = peewee.ForeignKeyField(Tag, backref="models")

      @staticmethod
      def get_or_create(link: Link, tag: Tag):
          """
          Return the association between `link` and `tag`, inserting it when
          it is missing, and do the same for every ancestor of `tag`.

          NOTE(review): this replaces the `get_or_create` inherited from
          peewee and returns only the row; confirm no caller expects
          peewee's own return shape.
          """
          existing = HasTag.get_or_none(link=link, tag=tag)
          if existing is None:
              row = HasTag.create(link=link, tag=tag)
          else:
              row = existing

          # walk up the hierarchy so the link also carries the parent tag
          parent = tag.parent
          if parent:
              HasTag.get_or_create(link, parent)
          return row
  class UserInvite(Model):
      """
      An invitation token created by one user and claimable by a new one.
      """

      token = peewee.TextField(unique=True)
      created_by = peewee.ForeignKeyField(User, backref="invites")
      created_at = peewee.DateTimeField()
      # both stay NULL until the invite has been redeemed
      claimed_by = peewee.ForeignKeyField(User, null=True)
      claimed_at = peewee.DateTimeField(null=True)

      @staticmethod
      def by_code(token: str) -> "UserInvite":
          """
          Look up the invite whose token is `token`.

          Raises `e.NoSuchInvite` when there is no such invite.
          """
          invite = UserInvite.get_or_none(token=token)
          if not invite:
              raise e.NoSuchInvite(invite=token)
          return invite

      @staticmethod
      def manufacture(creator: User) -> "UserInvite":
          """
          Create and store a fresh, unclaimed invite issued by `creator`.

          The token is produced by `c.app.serialize_token` from the creation
          timestamp and the creator's name.
          """
          issued_at = datetime.datetime.now()
          payload = {
              "created_at": issued_at.timestamp(),
              "created_by": creator.name,
          }
          serialized = c.app.serialize_token(payload)
          return UserInvite.create(
              token=serialized,
              created_by=creator,
              created_at=issued_at,
              claimed_by=None,
              claimed_at=None,
          )
  # Every model class defined in this module, as handed to `create_tables`.
  MODELS = [Meta, User, Link, Tag, HasTag, UserInvite]
  def create_tables():
      """
      Create the tables for every model listed in `MODELS` on the
      application database, passing `safe=True` to peewee.
      """
      database = c.app.db
      database.create_tables(MODELS, safe=True)