import datetime
import sqlite3
import typing
from dataclasses import dataclass

from dataclasses_json import dataclass_json
from passlib.apps import custom_app_context as pwd

import lc.config as c
import lc.error as e
import lc.request as r


class Model:
    """Base class shared by the model types in this module."""

    @staticmethod
    def cursor() -> sqlite3.Cursor:
        """Return a new cursor on the configured database (`c.db`)."""
        return c.db.cursor()

    def to_dict(self) -> dict:
        """Placeholder serializer; this base implementation returns None."""


@dataclass
class Pagination:
    """Position within a paged listing (page 1 is treated as the first page)."""

    current: int
    last: int

    def previous(self) -> typing.Optional[dict]:
        """Return `{"page": n}` for the preceding page, or None if there is none."""
        if self.current > 1:
            return {"page": self.current - 1}
        return None

    def next(self) -> typing.Optional[dict]:
        """Return `{"page": n}` for the following page, or None if there is none."""
        if self.current < self.last:
            return {"page": self.current + 1}
        return None

    @classmethod
    def from_total(cls, current: int, total: int) -> "Pagination":
        """Build a Pagination for `total` items in pages of `c.per_page`."""
        # Integer ceiling division; note that total == 0 yields last == 0.
        return cls(current=current, last=((total - 1) // c.per_page) + 1)


@dataclass_json
@dataclass
class User(Model):
    """
    A user account, stored in the `user` table.
    """

    id: int
    name: str
    passhash: str
    is_admin: bool

    @staticmethod
    def create(name: str, passhash: str) -> "User":
        """Insert a new non-admin user row and return the matching User."""
        cur = Model.cursor()
        cur.execute(
            "INSERT INTO user (name, passhash, is_admin) VALUES (?, ?, ?)",
            (name, passhash, False),
        )
        return User(
            id=cur.lastrowid,
            name=name,
            passhash=passhash,
            is_admin=False,
        )

    @staticmethod
    def from_request(user: r.User) -> "User":
        """Hash the requested password and create the user."""
        passhash = pwd.hash(user.password)
        # TODO: the duplicate-name handling below is disabled; it still names
        # a peewee exception and needs porting before it can be restored.
        # try:
        return User.create(name=user.name, passhash=passhash)
        # except peewee.IntegrityError:
        #     raise e.UserExists(name=user.name)

    def authenticate(self, password: str) -> bool:
        """Return True if `password` verifies against the stored hash."""
        return pwd.verify(password, self.passhash)

    def set_as_admin(self):
        """Mark this user as an admin, in memory and in the `user` table."""
        self.is_admin = True
        # Previously called self.save(), which no class in this module defines.
        # NOTE(review): assumes the key column is named `id` (consistent with
        # `User(*row)` in by_slug) -- confirm against the schema. Like
        # create(), this does not commit explicitly.
        cur = Model.cursor()
        cur.execute("UPDATE user SET is_admin = ? WHERE id = ?", (True, self.id))

    @staticmethod
    def login(user: r.User) -> typing.Tuple["User", str]:
        """
        Look up the named user and check the supplied password.

        Returns the User together with `user.to_token()`. Raises
        e.NoSuchUser (via by_slug) or e.BadPassword.
        """
        u = User.by_slug(user.name)
        if not u.authenticate(user.password):
            raise e.BadPassword(name=user.name)
        return u, user.to_token()

    @staticmethod
    def by_slug(slug: str) -> "User":
        """Return the user whose name is `slug`, or raise e.NoSuchUser."""
        cur = Model.cursor()
        cur.execute("SELECT * FROM user WHERE name = ?", (slug,))
        # Relies on the table's column order matching the dataclass fields.
        if (row := cur.fetchone()):
            return User(*row)
        raise e.NoSuchUser(name=slug)

    def base_url(self) -> str:
        """Return the URL path for this user."""
        return f"/u/{self.name}"

    def get_links(self, page: int) -> typing.Tuple[typing.List["Link"], Pagination]:
        """
        Return one page of this user's links, newest first, plus pagination.

        `page` is 1-based, matching Pagination.
        """
        cur = Model.cursor()
        # The column list must not be parenthesised (the original had an
        # unbalanced "("), and OFFSET counts rows, not pages.
        cur.execute(
            """SELECT l.id, l.url, l.name, l.description, l.created, l.private
               FROM links l
               WHERE l.user == ?
               ORDER BY created DESC
               LIMIT ? OFFSET ?""",
            (self.id, c.per_page, (page - 1) * c.per_page),
        )
        # Every selected row belongs to this user, so attach `self` instead of
        # the raw id column; link_url() needs a User with a `.name`.
        links = [
            Link(
                url=url,
                name=name,
                description=description,
                created=created,
                private=private,
                user=self,
                id=link_id,
            )
            for link_id, url, name, description, created, private in cur
        ]
        cur.execute("SELECT count(*) FROM links l WHERE l.user == ?", (self.id,))
        pagination = Pagination.from_total(page, cur.fetchone()[0])
        return links, pagination

    def get_link(self, link_id: int) -> "Link":
        """Return this user's link with the given id."""
        # NOTE(review): still written against an ORM-style query API
        # (`Link.get`, class-level field expressions) that nothing in this
        # file defines; needs porting to sqlite3.
        return Link.get((Link.user == self) & (Link.id == link_id))

    def get_tag(self, tag_name: str) -> "Tag":
        """Return this user's tag with the given name."""
        # NOTE(review): same ORM-style leftover as get_link.
        return Tag.get((Tag.user == self) & (Tag.name == tag_name))

    def to_dict(self) -> dict:
        """Return the user's id and name as a dict."""
        return {"id": self.id, "name": self.name}


@dataclass_json
@dataclass
class Link(Model):
    """
    A link as stored in the database
    """

    url: str
    name: str
    description: str
    # TODO: do we need to track modified time?
    created: datetime.datetime
    # is the field entirely private?
    private: bool
    # owned by
    user: User
    # row id in the `links` table; optional and last so that existing
    # constructor calls keep working. Read by link_url().
    id: typing.Optional[int] = None

    def link_url(self) -> str:
        """Return the URL path for this link under its owner."""
        return f"/u/{self.user.name}/l/{self.id}"

    @staticmethod
    def from_request(user: User, link: r.Link) -> "Link":
        """Create a link for `user` from a request, tagging it with `link.tags`."""
        # NOTE(review): `Link.create`, `Tag.get_or_create_tag`'s helpers and
        # `HasTag.create` are ORM-style calls not defined in this file.
        new_link = Link.create(
            url=link.url,
            name=link.name,
            description=link.description,
            private=link.private,
            created=link.created or datetime.datetime.now(),
            user=user,
        )
        for tag_name in link.tags:
            t = Tag.get_or_create_tag(user, tag_name)
            HasTag.create(
                link=new_link,
                tag=t,
            )
        return new_link


@dataclass_json
@dataclass
class Tag(Model):
    """
    A tag. This just indicates that a user has used this tag at some point.
    """

    name: str
    parent: "Tag"
    user: User

    def url(self) -> str:
        """Return the URL path for this tag under its owner."""
        return f"/u/{self.user.name}/t/{self.name}"

    def get_links(self, page: int) -> typing.Tuple[typing.List[Link], Pagination]:
        """Return one page of links carrying this tag, plus pagination."""
        # NOTE(review): ORM-style query chain; `HasTag.select` is not defined
        # in this file and this still needs porting to sqlite3.
        links = [
            ht.link
            for ht in HasTag.select()
            .join(Link)
            .where((HasTag.tag == self))
            .order_by(-Link.created)
            .paginate(page, c.per_page)
        ]
        pagination = Pagination.from_total(
            page, HasTag.select().where((HasTag.tag == self)).count(),
        )
        return links, pagination

    @staticmethod
    def get_or_create_tag(user: User, tag_name: str) -> "Tag":
        """
        Return the user's tag named `tag_name`, creating it if needed.

        For a name containing "/", the tag for everything before the last
        "/" is fetched or created first and used as the parent.
        """
        # NOTE(review): `Tag.get_or_none` / `Tag.create` are ORM-style calls
        # not defined in this file.
        if (t := Tag.get_or_none(name=tag_name, user=user)):
            return t

        parent = None
        if "/" in tag_name:
            parent_name = tag_name[: tag_name.rindex("/")]
            parent = Tag.get_or_create_tag(user, parent_name)

        return Tag.create(name=tag_name, parent=parent, user=user)


@dataclass_json
@dataclass
class HasTag(Model):
    """
    Association between a link and a tag.

    NOTE(review): this name was referenced by MODELS, Link.from_request and
    Tag.get_links but never defined, so importing the module raised
    NameError. The fields mirror how it is used (`link=`, `tag=`); the
    query/create methods those callers expect are still missing.
    """

    link: Link
    tag: Tag


class UserInvite(Model):
    # Bare annotation only: this class is not a dataclass, so `token` is not
    # a constructor argument.
    token: str


MODELS = [
    User,
    Link,
    Tag,
    HasTag,
]


# def create_tables():
#     c.DB.create_tables(MODELS, safe=True)