|
@@ -1,8 +1,8 @@
|
|
|
from dataclasses import dataclass
|
|
|
+from dataclasses_json import dataclass_json
|
|
|
import datetime
|
|
|
from passlib.apps import custom_app_context as pwd
|
|
|
-import peewee
|
|
|
-import playhouse.shortcuts
|
|
|
+import sqlite3
|
|
|
import typing
|
|
|
|
|
|
import lc.config as c
|
|
@@ -10,12 +10,13 @@ import lc.error as e
|
|
|
import lc.request as r
|
|
|
|
|
|
|
|
|
-class Model(peewee.Model):
|
|
|
- class Meta:
|
|
|
- database = c.db
|
|
|
+class Model:
|
|
|
+ @staticmethod
|
|
|
+ def cursor() -> sqlite3.Cursor:
|
|
|
+ return c.db.cursor()
|
|
|
|
|
|
def to_dict(self) -> dict:
|
|
|
- return playhouse.shortcuts.model_to_dict(self)
|
|
|
+ pass
|
|
|
|
|
|
|
|
|
@dataclass
|
|
@@ -38,23 +39,36 @@ class Pagination:
|
|
|
return cls(current=current, last=((total - 1) // c.per_page) + 1,)
|
|
|
|
|
|
|
|
|
-# TODO: figure out authorization for users (oauth? passwd?)
|
|
|
+@dataclass_json
|
|
|
+@dataclass
|
|
|
class User(Model):
|
|
|
"""
|
|
|
A user! you know tf this is about
|
|
|
"""
|
|
|
+ id: int
|
|
|
+ name: str
|
|
|
+ passhash: str
|
|
|
+ is_admin: bool
|
|
|
|
|
|
- name = peewee.TextField(unique=True)
|
|
|
- passhash = peewee.TextField()
|
|
|
- is_admin = peewee.BooleanField(default=False)
|
|
|
+ @staticmethod
|
|
|
+ def create(name: str, passhash: str) -> "User":
|
|
|
+ cur = Model.cursor()
|
|
|
+ cur.execute('INSERT INTO user (name, passhash, is_admin) VALUES (?, ?, ?)',
|
|
|
+ (name, passhash, False))
|
|
|
+ return User(
|
|
|
+ id=cur.lastrowid,
|
|
|
+ name=name,
|
|
|
+ passhash=passhash,
|
|
|
+ is_admin=False,
|
|
|
+ )
|
|
|
|
|
|
@staticmethod
|
|
|
def from_request(user: r.User) -> "User":
|
|
|
passhash = pwd.hash(user.password)
|
|
|
- try:
|
|
|
- return User.create(name=user.name, passhash=passhash,)
|
|
|
- except peewee.IntegrityError:
|
|
|
- raise e.UserExists(name=user.name)
|
|
|
+ # try:
|
|
|
+ return User.create(name=user.name, passhash=passhash,)
|
|
|
+ # except peewee.IntegrityError:
|
|
|
+ # raise e.UserExists(name=user.name)
|
|
|
|
|
|
def authenticate(self, password: str) -> bool:
|
|
|
return pwd.verify(password, self.passhash)
|
|
@@ -72,25 +86,28 @@ class User(Model):
|
|
|
|
|
|
@staticmethod
|
|
|
def by_slug(slug: str) -> "User":
|
|
|
- u = User.get_or_none(name=slug)
|
|
|
- if u is None:
|
|
|
- raise e.NoSuchUser(name=slug)
|
|
|
- return u
|
|
|
+ cur = Model.cursor()
|
|
|
+ cur.execute("SELECT * FROM user WHERE name = ?", (slug,))
|
|
|
+ if (row := cur.fetchone()):
|
|
|
+ return User(*row)
|
|
|
+ raise e.NoSuchUser(name=slug)
|
|
|
|
|
|
def base_url(self) -> str:
|
|
|
return f"/u/{self.name}"
|
|
|
|
|
|
def get_links(self, page: int) -> typing.Tuple[typing.List["Link"], Pagination]:
|
|
|
- links = (
|
|
|
- Link.select()
|
|
|
- .where(Link.user == self)
|
|
|
- .order_by(-Link.created)
|
|
|
- .paginate(page, c.per_page)
|
|
|
- )
|
|
|
- pagination = Pagination.from_total(page, Link.select().count())
|
|
|
+ cur = Model.cursor()
|
|
|
+ cur.execute("""SELECT (l.id, l.url, l.name, l.description, l.created, l.private, l.user
|
|
|
+ FROM links l WHERE l.user == ?
|
|
|
+ ORDER BY created DESC LIMIT ? OFFSET ?""",
|
|
|
+ (self.id, c.per_page, page))
|
|
|
+ links = [Link(*rows) for rows in cur]
|
|
|
+ cur.execute("SELECT count(*) FROM links l WHERE l.user == ?", (self.id,))
|
|
|
+ pagination = Pagination.from_total(page, cur.fetchone()[0])
|
|
|
return links, pagination
|
|
|
|
|
|
def get_link(self, link_id: int) -> "Link":
|
|
|
+ cur = Model.cursor()
|
|
|
return Link.get((Link.user == self) & (Link.id == link_id))
|
|
|
|
|
|
def get_tag(self, tag_name: str) -> "Tag":
|
|
@@ -100,20 +117,22 @@ class User(Model):
|
|
|
return {"id": self.id, "name": self.name}
|
|
|
|
|
|
|
|
|
+@dataclass_json
|
|
|
+@dataclass
|
|
|
class Link(Model):
|
|
|
"""
|
|
|
A link as stored in the database
|
|
|
"""
|
|
|
|
|
|
- url = peewee.TextField()
|
|
|
- name = peewee.TextField()
|
|
|
- description = peewee.TextField()
|
|
|
+ url: str
|
|
|
+ name: str
|
|
|
+ description: str
|
|
|
# TODO: do we need to track modified time?
|
|
|
- created = peewee.DateTimeField()
|
|
|
+ created: datetime.datetime
|
|
|
# is the field entirely private?
|
|
|
- private = peewee.BooleanField()
|
|
|
+ private: bool
|
|
|
# owned by
|
|
|
- user = peewee.ForeignKeyField(User, backref="links")
|
|
|
+ user: User
|
|
|
|
|
|
def link_url(self) -> str:
|
|
|
return f"/u/{self.user.name}/l/{self.id}"
|
|
@@ -136,14 +155,16 @@ class Link(Model):
|
|
|
return l
|
|
|
|
|
|
|
|
|
+@dataclass_json
|
|
|
+@dataclass
|
|
|
class Tag(Model):
|
|
|
"""
|
|
|
A tag. This just indicates that a user has used this tag at some point.
|
|
|
"""
|
|
|
|
|
|
- name = peewee.TextField()
|
|
|
- parent = peewee.ForeignKeyField("self", null=True, backref="children")
|
|
|
- user = peewee.ForeignKeyField(User, backref="tags")
|
|
|
+ name: str
|
|
|
+ parent: "Tag"
|
|
|
+ user: User
|
|
|
|
|
|
def url(self) -> str:
|
|
|
return f"/u/{self.user.name}/t/{self.name}"
|
|
@@ -175,15 +196,6 @@ class Tag(Model):
|
|
|
return Tag.create(name=tag_name, parent=parent, user=user)
|
|
|
|
|
|
|
|
|
-class HasTag(Model):
|
|
|
- """
|
|
|
- Establishes that a link is tagged with a given tag.
|
|
|
- """
|
|
|
-
|
|
|
- link = peewee.ForeignKeyField(Link, backref="tags")
|
|
|
- tag = peewee.ForeignKeyField(Tag, backref="models")
|
|
|
-
|
|
|
-
|
|
|
class UserInvite(Model):
|
|
|
token: str
|
|
|
|
|
@@ -196,5 +208,5 @@ MODELS = [
|
|
|
]
|
|
|
|
|
|
|
|
|
-def create_tables():
|
|
|
- c.DB.create_tables(MODELS, safe=True)
|
|
|
+# def create_tables():
|
|
|
+# c.DB.create_tables(MODELS, safe=True)
|