123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- from dataclasses import dataclass
- from dataclasses_json import dataclass_json
- import datetime
- from passlib.apps import custom_app_context as pwd
- import sqlite3
- import typing
- import lc.config as c
- import lc.error as e
- import lc.request as r
class Model:
    """Base class for the record types in this module.

    Provides access to a cursor on the connection held in ``c.db`` and a
    ``to_dict`` hook that subclasses may override.
    """

    @staticmethod
    def cursor() -> sqlite3.Cursor:
        """Return a new cursor obtained from ``c.db``."""
        connection = c.db
        return connection.cursor()

    def to_dict(self) -> dict:
        """Placeholder hook; this base implementation does nothing and returns ``None``."""
@dataclass
class Pagination:
    """Position within a paged listing: the current page and the last page."""

    current: int
    last: int

    def previous(self) -> typing.Optional[dict]:
        """Return ``{"page": current - 1}``, or ``None`` when ``current`` is not above 1."""
        has_previous = self.current > 1
        return {"page": self.current - 1} if has_previous else None

    def next(self) -> typing.Optional[dict]:
        """Return ``{"page": current + 1}``, or ``None`` when ``current`` is not below ``last``."""
        has_next = self.current < self.last
        return {"page": self.current + 1} if has_next else None

    @classmethod
    def from_total(cls, current, total) -> "Pagination":
        """Build a ``Pagination`` from an item count, with ``c.per_page`` items per page."""
        # Ceiling division written as floor((total - 1) / per_page) + 1.
        last_page = (total - 1) // c.per_page + 1
        return cls(current=current, last=last_page)
@dataclass_json
@dataclass
class User(Model):
    """
    An account stored in the ``user`` table.

    Fields are declared in the order in which ``by_slug`` unpacks a
    ``SELECT *`` row.
    """

    id: int
    name: str
    passhash: str
    is_admin: bool

    @staticmethod
    def create(name: str, passhash: str) -> "User":
        """Insert a new non-admin row into ``user`` and return the matching ``User``.

        ``passhash`` is stored as given; ``from_request`` does the hashing.
        """
        cur = Model.cursor()
        cur.execute(
            "INSERT INTO user (name, passhash, is_admin) VALUES (?, ?, ?)",
            (name, passhash, False),
        )
        return User(
            id=cur.lastrowid,
            name=name,
            passhash=passhash,
            is_admin=False,
        )

    @staticmethod
    def from_request(user: r.User) -> "User":
        """Hash the requested password and create the user.

        Raises:
            e.UserExists: when the insert fails with ``sqlite3.IntegrityError``.
        """
        passhash = pwd.hash(user.password)
        try:
            return User.create(name=user.name, passhash=passhash)
        except sqlite3.IntegrityError as err:
            # This translation was commented out when the ORM exception it
            # caught went away; sqlite3.IntegrityError is the raw-driver
            # equivalent.
            # NOTE(review): e.UserExists is taken from the previously
            # commented-out handler — confirm it still exists in lc.error.
            raise e.UserExists(name=user.name) from err

    def authenticate(self, password: str) -> bool:
        """Return whether ``password`` verifies against the stored hash."""
        return pwd.verify(password, self.passhash)

    def set_as_admin(self):
        """Mark this user as an administrator, in memory and in the ``user`` table."""
        self.is_admin = True
        # Neither Model nor User defines save(), so write the flag directly.
        # NOTE(review): assumes the key column is named "id" — confirm against
        # the schema.
        Model.cursor().execute(
            "UPDATE user SET is_admin = ? WHERE id = ?",
            (True, self.id),
        )

    @staticmethod
    def login(user: r.User) -> typing.Tuple["User", str]:
        """Look up the user named in the request and check its password.

        Returns:
            The stored ``User`` and the value of ``user.to_token()``.

        Raises:
            e.NoSuchUser: from ``by_slug`` when no row has that name.
            e.BadPassword: when the password does not verify.
        """
        u = User.by_slug(user.name)
        if not u.authenticate(user.password):
            raise e.BadPassword(name=user.name)
        return u, user.to_token()

    @staticmethod
    def by_slug(slug: str) -> "User":
        """Return the user whose ``name`` equals ``slug``.

        Raises:
            e.NoSuchUser: when no row matches.
        """
        cur = Model.cursor()
        cur.execute("SELECT * FROM user WHERE name = ?", (slug,))
        if (row := cur.fetchone()):
            return User(*row)
        raise e.NoSuchUser(name=slug)

    def base_url(self) -> str:
        """Return the URL path of this user's page."""
        return f"/u/{self.name}"

    def get_links(self, page: int) -> typing.Tuple[typing.List["Link"], Pagination]:
        """Return one page of this user's links, newest first, plus pagination info.

        ``page`` is 1-based, matching ``Pagination.previous``, which treats
        page 1 as the first page.
        """
        cur = Model.cursor()
        # Page N starts after (N - 1) full pages; clamp so page <= 0 reads
        # from the start.
        offset = max(page - 1, 0) * c.per_page
        cur.execute(
            """SELECT l.id, l.url, l.name, l.description, l.created, l.private
               FROM links l WHERE l.user == ?
               ORDER BY l.created DESC LIMIT ? OFFSET ?""",
            (self.id, c.per_page, offset),
        )
        # Every selected row belongs to this user, so pass `self` as the owner
        # rather than the raw id stored in the `user` column.
        # NOTE(review): Link must accept (id, url, name, description, created,
        # private, user) positionally — confirm its field list.
        links = [Link(*row, self) for row in cur]
        cur.execute("SELECT count(*) FROM links l WHERE l.user == ?", (self.id,))
        pagination = Pagination.from_total(page, cur.fetchone()[0])
        return links, pagination

    def get_link(self, link_id: int) -> "Link":
        """Return this user's link with id ``link_id``."""
        # NOTE(review): Link.get and the class-level Link.user / Link.id
        # comparisons are ORM-style calls that nothing in the visible part of
        # this module defines; this still needs porting to a raw query.
        return Link.get((Link.user == self) & (Link.id == link_id))

    def get_tag(self, tag_name: str) -> "Tag":
        """Return this user's tag named ``tag_name``."""
        # NOTE(review): Tag.get is an ORM-style call that nothing in the
        # visible part of this module defines; this still needs porting to a
        # raw query.
        return Tag.get((Tag.user == self) & (Tag.name == tag_name))

    def to_dict(self) -> dict:
        """Return the public fields (``id`` and ``name``) as a dict."""
        return {"id": self.id, "name": self.name}
@dataclass_json
@dataclass
class Link(Model):
    """
    A link as stored in the database
    """

    # row id; link_url() builds the link's path from it
    id: int
    url: str
    name: str
    description: str
    # TODO: do we need to track modified time?
    created: datetime.datetime
    # is the field entirely private?
    private: bool
    # owned by
    user: User

    def link_url(self) -> str:
        """Return the URL path of this link under its owner's page."""
        return f"/u/{self.user.name}/l/{self.id}"

    @staticmethod
    def create(
        url: str,
        name: str,
        description: str,
        private: bool,
        created: datetime.datetime,
        user: User,
    ) -> "Link":
        """Insert a row into ``links`` and return the matching ``Link``.

        Follows the same pattern as ``User.create``. The ``user`` column
        receives the owner's id, which is what ``User.get_links`` filters on.
        """
        cur = Model.cursor()
        cur.execute(
            "INSERT INTO links (url, name, description, created, private, user)"
            " VALUES (?, ?, ?, ?, ?, ?)",
            (url, name, description, created, private, user.id),
        )
        return Link(
            id=cur.lastrowid,
            url=url,
            name=name,
            description=description,
            created=created,
            private=private,
            user=user,
        )

    @staticmethod
    def from_request(user: User, link: r.Link) -> "Link":
        """Store the requested link for ``user`` and attach each requested tag.

        ``created`` falls back to the current local time when the request
        does not supply one.
        """
        new_link = Link.create(
            url=link.url,
            name=link.name,
            description=link.description,
            private=link.private,
            created=link.created or datetime.datetime.now(),
            user=user,
        )
        # NOTE(review): HasTag is not defined in the visible part of this
        # module, so this loop only works when the request carries no tags.
        for tag_name in link.tags:
            t = Tag.get_or_create_tag(user, tag_name)
            HasTag.create(
                link=new_link, tag=t,
            )
        return new_link
@dataclass_json
@dataclass
class Tag(Model):
    """
    A tag. This just indicates that a user has used this tag at some point.
    """

    name: str
    # enclosing tag for "a/b"-style names; get_or_create_tag passes None when
    # the name contains no "/"
    parent: "Tag"
    user: User

    def url(self) -> str:
        """Return the URL path of this tag under its owner's page."""
        owner = self.user.name
        return f"/u/{owner}/t/{self.name}"

    def get_links(self, page: int) -> typing.Tuple[typing.List[Link], Pagination]:
        """Return one page of links carrying this tag, plus pagination info.

        NOTE(review): HasTag and the query-builder calls used here are not
        defined in the visible part of this module.
        """
        tagged = HasTag.select().join(Link).where((HasTag.tag == self))
        newest_first = tagged.order_by(-Link.created)
        links = [ht.link for ht in newest_first.paginate(page, c.per_page)]
        total = HasTag.select().where((HasTag.tag == self)).count()
        return links, Pagination.from_total(page, total)

    @staticmethod
    def get_or_create_tag(user: User, tag_name: str):
        """Return ``user``'s tag named ``tag_name``, creating it if needed.

        For a name containing "/", the tag for everything before the last
        "/" is fetched or created first and used as the parent.
        """
        existing = Tag.get_or_none(name=tag_name, user=user)
        if existing:
            return existing
        if "/" in tag_name:
            parent_name, _, _ = tag_name.rpartition("/")
            parent = Tag.get_or_create_tag(user, parent_name)
        else:
            parent = None
        return Tag.create(name=tag_name, parent=parent, user=user)
class UserInvite(Model):
    """Invitation model; so far it only declares a ``token`` annotation."""

    # Annotation only: the class is not decorated with @dataclass, so no
    # constructor argument or class attribute is created for it.
    token: str
# Model classes defined in this module.
# HasTag used to be listed here, but it is neither defined nor imported in this
# module as reviewed, so evaluating the list raised NameError at import time.
# TODO: add HasTag back once the link/tag association model exists.
MODELS = [
    User,
    Link,
    Tag,
]
- # def create_tables():
- # c.DB.create_tables(MODELS, safe=True)
|