Напиши мессенджер на питоне как тг
Created on: December 14, 2025
Answered using GPT-5.2 Thinking by Chat01
Created on: December 14, 2025
Answered using GPT-5.2 Thinking by Chat01
Напиши мессенджер на питоне как тг
pythonserver_code = r''' from __future__ import annotations import os import json from datetime import datetime, timedelta, timezone from typing import Dict, Set, Optional, List import aiosqlite from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jose import jwt, JWTError from passlib.context import CryptContext from pydantic import BaseModel, Field # ========================= # Config # ========================= DB_PATH = os.getenv("TG_LITE_DB", "tg_lite.db") JWT_SECRET = os.getenv("TG_LITE_JWT_SECRET", "CHANGE_ME_TO_A_LONG_RANDOM_SECRET
Ниже — минимальный “TG-like” мессенджер на Python (MVP): регистрация/логин, чаты, участники, история, реалтайм через WebSocket. Это не “полный Telegram”, но рабочая база, которую легко расширять.
```bash
python -m venv .venv
source .venv/bin/activate   # Windows: .venv\Scripts\activate
pip install -r requirements.txt
```
server.pypythonfrom __future__ import annotations import os import json from datetime import datetime, timedelta, timezone from typing import Dict, Set, Optional, List import aiosqlite from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jose import jwt, JWTError from passlib.context import CryptContext from pydantic import BaseModel, Field # ========================= # Config # ========================= DB_PATH = os.getenv("TG_LITE_DB", "tg_lite.db") JWT_SECRET = os.getenv("TG_LITE_JWT_SECRET", "CHANGE_ME_TO_A_LONG_RANDOM_SECRET") JWT_ALG = "HS256" JWT_TTL_MIN = 60 * 24 * 7 # 7 days pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login") app = FastAPI(title="TG-Lite (Educational MVP)") # ========================= # DB init # ========================= CREATE_SQL = """ PRAGMA foreign_keys = ON; CREATE TABLE IF NOT EXISTS users( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, pwd_hash TEXT NOT NULL, created_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS chats( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL, created_by INTEGER NOT NULL, created_at TEXT NOT NULL, FOREIGN KEY(created_by) REFERENCES users(id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS chat_members( chat_id INTEGER NOT NULL, user_id INTEGER NOT NULL, PRIMARY KEY(chat_id, user_id), FOREIGN KEY(chat_id) REFERENCES chats(id) ON DELETE CASCADE, FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS messages( id INTEGER PRIMARY KEY AUTOINCREMENT, chat_id INTEGER NOT NULL, user_id INTEGER NOT NULL, text TEXT NOT NULL, ts TEXT NOT NULL, FOREIGN KEY(chat_id) REFERENCES chats(id) ON DELETE CASCADE, FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE ); """ @app.on_event("startup") async def startup() -> None: async with aiosqlite.connect(DB_PATH) as 
db: await db.executescript(CREATE_SQL) await db.commit() # ========================= # Helpers # ========================= def now_iso() -> str: return datetime.now(timezone.utc).isoformat() def hash_password(password: str) -> str: return pwd_context.hash(password) def verify_password(password: str, pwd_hash: str) -> bool: return pwd_context.verify(password, pwd_hash) def create_token(user_id: int, username: str) -> str: exp = datetime.now(timezone.utc) + timedelta(minutes=JWT_TTL_MIN) payload = {"sub": str(user_id), "username": username, "exp": exp} return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALG) async def get_user_by_username(db: aiosqlite.Connection, username: str) -> Optional[dict]: cur = await db.execute("SELECT id, username, pwd_hash FROM users WHERE username = ?", (username,)) row = await cur.fetchone() await cur.close() if not row: return None return {"id": row[0], "username": row[1], "pwd_hash": row[2]} async def get_user_by_id(db: aiosqlite.Connection, user_id: int) -> Optional[dict]: cur = await db.execute("SELECT id, username FROM users WHERE id = ?", (user_id,)) row = await cur.fetchone() await cur.close() if not row: return None return {"id": row[0], "username": row[1]} async def current_user(token: str = Depends(oauth2_scheme)) -> dict: try: data = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALG]) user_id = int(data["sub"]) username = data["username"] except (JWTError, KeyError, ValueError): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") async with aiosqlite.connect(DB_PATH) as db: user = await get_user_by_id(db, user_id) if not user or user["username"] != username: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") return user async def ws_auth_user(token: str) -> dict: try: data = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALG]) user_id = int(data["sub"]) except Exception: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") 
async with aiosqlite.connect(DB_PATH) as db: user = await get_user_by_id(db, user_id) if not user: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") return user async def ensure_chat_member(db: aiosqlite.Connection, chat_id: int, user_id: int) -> bool: cur = await db.execute( "SELECT 1 FROM chat_members WHERE chat_id = ? AND user_id = ?", (chat_id, user_id), ) row = await cur.fetchone() await cur.close() return row is not None # ========================= # Schemas # ========================= class RegisterIn(BaseModel): username: str = Field(min_length=3, max_length=32, pattern=r"^[a-zA-Z0-9_]+$") password: str = Field(min_length=6, max_length=128) class TokenOut(BaseModel): access_token: str token_type: str = "bearer" class ChatCreateIn(BaseModel): title: str = Field(min_length=1, max_length=80) members: List[str] = Field(default_factory=list, description="Usernames to add (besides you)") class ChatOut(BaseModel): id: int title: str class MessageOut(BaseModel): id: int chat_id: int from_user: str text: str ts: str # ========================= # REST API # ========================= @app.post("/register", response_model=TokenOut) async def register(payload: RegisterIn) -> TokenOut: async with aiosqlite.connect(DB_PATH) as db: existing = await get_user_by_username(db, payload.username) if existing: raise HTTPException(status_code=400, detail="Username already taken") cur = await db.execute( "INSERT INTO users(username, pwd_hash, created_at) VALUES(?, ?, ?)", (payload.username, hash_password(payload.password), now_iso()), ) await db.commit() user_id = cur.lastrowid await cur.close() return TokenOut(access_token=create_token(user_id, payload.username)) @app.post("/login", response_model=TokenOut) async def login(form: OAuth2PasswordRequestForm = Depends()) -> TokenOut: async with aiosqlite.connect(DB_PATH) as db: user = await get_user_by_username(db, form.username) if not user or not verify_password(form.password, user["pwd_hash"]): 
raise HTTPException(status_code=401, detail="Bad username or password") return TokenOut(access_token=create_token(user["id"], user["username"])) @app.post("/chats", response_model=ChatOut) async def create_chat(payload: ChatCreateIn, user: dict = Depends(current_user)) -> ChatOut: async with aiosqlite.connect(DB_PATH) as db: cur = await db.execute( "INSERT INTO chats(title, created_by, created_at) VALUES(?, ?, ?)", (payload.title, user["id"], now_iso()), ) chat_id = cur.lastrowid await cur.close() member_ids = {user["id"]} for uname in payload.members: u = await get_user_by_username(db, uname) if u: member_ids.add(u["id"]) for uid in member_ids: await db.execute( "INSERT OR IGNORE INTO chat_members(chat_id, user_id) VALUES(?, ?)", (chat_id, uid), ) await db.commit() return ChatOut(id=chat_id, title=payload.title) @app.get("/chats", response_model=List[ChatOut]) async def list_chats(user: dict = Depends(current_user)) -> List[ChatOut]: async with aiosqlite.connect(DB_PATH) as db: cur = await db.execute( """ SELECT c.id, c.title FROM chats c JOIN chat_members m ON m.chat_id = c.id WHERE m.user_id = ? ORDER BY c.id DESC """, (user["id"],), ) rows = await cur.fetchall() await cur.close() return [ChatOut(id=r[0], title=r[1]) for r in rows] @app.get("/chats/{chat_id}/history", response_model=List[MessageOut]) async def chat_history(chat_id: int, limit: int = 50, user: dict = Depends(current_user)) -> List[MessageOut]: limit = max(1, min(200, int(limit))) async with aiosqlite.connect(DB_PATH) as db: if not await ensure_chat_member(db, chat_id, user["id"]): raise HTTPException(status_code=403, detail="Not a member of this chat") cur = await db.execute( """ SELECT msg.id, msg.chat_id, u.username, msg.text, msg.ts FROM messages msg JOIN users u ON u.id = msg.user_id WHERE msg.chat_id = ? ORDER BY msg.id DESC LIMIT ? 
""", (chat_id, limit), ) rows = await cur.fetchall() await cur.close() rows.reverse() return [MessageOut(id=r[0], chat_id=r[1], from_user=r[2], text=r[3], ts=r[4]) for r in rows] # ========================= # WebSocket connections # ========================= class ConnectionManager: def __init__(self) -> None: self.chat_sockets: Dict[int, Set[WebSocket]] = {} async def connect(self, chat_id: int, websocket: WebSocket) -> None: await websocket.accept() self.chat_sockets.setdefault(chat_id, set()).add(websocket) def disconnect(self, chat_id: int, websocket: WebSocket) -> None: if chat_id in self.chat_sockets: self.chat_sockets[chat_id].discard(websocket) if not self.chat_sockets[chat_id]: del self.chat_sockets[chat_id] async def broadcast(self, chat_id: int, payload: dict) -> None: sockets = list(self.chat_sockets.get(chat_id, set())) data = json.dumps(payload, ensure_ascii=False) for ws in sockets: try: await ws.send_text(data) except Exception: self.disconnect(chat_id, ws) manager = ConnectionManager() @app.websocket("/ws/{chat_id}") async def ws_chat(websocket: WebSocket, chat_id: int, token: str) -> None: """ Connect: ws://localhost:8000/ws/1?token=... 
Send: {"type":"message","text":"hi"} """ try: user = await ws_auth_user(token) except HTTPException: await websocket.close(code=4401) return async with aiosqlite.connect(DB_PATH) as db: if not await ensure_chat_member(db, chat_id, user["id"]): await websocket.close(code=4403) return await manager.connect(chat_id, websocket) await manager.broadcast(chat_id, {"type": "system", "text": f"{user['username']} joined", "ts": now_iso()}) try: while True: raw = await websocket.receive_text() try: msg = json.loads(raw) except json.JSONDecodeError: await websocket.send_text(json.dumps({"type": "error", "error": "Bad JSON"})) continue if msg.get("type") != "message": await websocket.send_text(json.dumps({"type": "error", "error": "Unknown message type"})) continue text = (msg.get("text") or "").strip() if not text: continue if len(text) > 4000: await websocket.send_text(json.dumps({"type": "error", "error": "Message too long"})) continue ts = now_iso() async with aiosqlite.connect(DB_PATH) as db: cur = await db.execute( "INSERT INTO messages(chat_id, user_id, text, ts) VALUES(?, ?, ?, ?)", (chat_id, user["id"], text, ts), ) await db.commit() msg_id = cur.lastrowid await cur.close() payload = { "type": "message", "id": msg_id, "chat_id": chat_id, "from": user["username"], "text": text, "ts": ts, } await manager.broadcast(chat_id, payload) except WebSocketDisconnect: manager.disconnect(chat_id, websocket) await manager.broadcast(chat_id, {"type": "system", "text": f"{user['username']} left", "ts": now_iso()}) except Exception: manager.disconnect(chat_id, websocket) try: await websocket.close(code=1011) except Exception: pass
# client_cli.py
"""Command-line client for the TG-Lite server (REST + WebSocket)."""
import asyncio
import json
import sys
from getpass import getpass

import requests
import websockets

BASE = "http://127.0.0.1:8000"


def post_json(path: str, payload: dict) -> dict:
    """POST a JSON body without authentication and return the decoded reply."""
    r = requests.post(BASE + path, json=payload, timeout=10)
    r.raise_for_status()
    return r.json()


def login(username: str, password: str) -> str:
    """Log in with form-encoded credentials and return the access token."""
    r = requests.post(
        BASE + "/login",
        data={"username": username, "password": password},
        timeout=10,
    )
    r.raise_for_status()
    return r.json()["access_token"]


def auth_get(path: str, token: str):
    """GET with the bearer token and return the decoded JSON reply."""
    r = requests.get(BASE + path, headers={"Authorization": f"Bearer {token}"}, timeout=10)
    r.raise_for_status()
    return r.json()


def auth_post(path: str, token: str, payload: dict):
    """POST a JSON body with the bearer token and return the decoded reply."""
    r = requests.post(BASE + path, json=payload, headers={"Authorization": f"Bearer {token}"}, timeout=10)
    r.raise_for_status()
    return r.json()


async def chat_loop(chat_id: int, token: str) -> None:
    """Open the chat WebSocket, print history, then relay stdin <-> socket.

    Returns when the user types /quit or /exit, stdin reaches EOF, or the
    server closes the connection.
    """
    # Replace only the scheme ("http" -> "ws", "https" -> "wss"), not every
    # occurrence of "http" in the URL.
    ws_url = BASE.replace("http", "ws", 1) + f"/ws/{chat_id}?token={token}"
    print(f"Connecting to {ws_url}")

    async with websockets.connect(ws_url) as ws:
        # history
        try:
            hist = auth_get(f"/chats/{chat_id}/history?limit=30", token)
            for m in hist:
                print(f"[{m['ts']}] {m['from_user']}: {m['text']}")
        except Exception as e:
            print("Could not load history:", e)

        async def recv_task():
            async for raw in ws:
                try:
                    msg = json.loads(raw)
                except Exception:
                    print(raw)
                    continue
                if msg.get("type") == "message":
                    print(f"[{msg['ts']}] {msg['from']}: {msg['text']}")
                elif msg.get("type") == "system":
                    print(f"* {msg.get('text')}")
                else:
                    print(msg)

        async def send_task():
            loop = asyncio.get_running_loop()
            while True:
                # readline blocks, so run it in the default executor.
                text = await loop.run_in_executor(None, sys.stdin.readline)
                if not text:
                    return
                text = text.rstrip("\n")
                if text in ("/quit", "/exit"):
                    return
                await ws.send(json.dumps({"type": "message", "text": text}, ensure_ascii=False))

        # Stop as soon as either side finishes. With gather() the receiver kept
        # running after /quit, so the chat could never be left.
        tasks = [asyncio.ensure_future(recv_task()), asyncio.ensure_future(send_task())]
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
        for task in done:
            task.result()  # re-raise an error from the finished task, if any


def main():
    """Interactive entry point: authenticate, then list/create/open chats."""
    print("TG-Lite CLI client")
    print("1) register 2) login")
    mode = input("> ").strip()
    username = input("username: ").strip()
    password = getpass("password: ")

    if mode == "1":
        token = post_json("/register", {"username": username, "password": password})["access_token"]
    else:
        token = login(username, password)

    while True:
        chats = auth_get("/chats", token)
        print("\nYour chats:")
        for c in chats:
            print(f"  {c['id']}: {c['title']}")
        print("\nCommands: /new /open <id> /quit")
        cmd = input("> ").strip()

        if cmd == "/quit":
            return

        if cmd == "/new":
            title = input("title: ")
            members = input("members (comma usernames): ").strip()
            arr = [m.strip() for m in members.split(",") if m.strip()]
            c = auth_post("/chats", token, {"title": title, "members": arr})
            print("Created chat", c)
            continue

        if cmd.startswith("/open"):
            try:
                chat_id = int(cmd.split()[1])
            except Exception:
                print("Usage: /open <id>")
                continue
            print("Type messages. /quit to exit chat.")
            try:
                asyncio.run(chat_loop(chat_id, token))
            except Exception as e:
                # A refused or dropped connection should return to the menu
                # instead of terminating the whole client.
                print("Chat error:", e)
            continue


if __name__ == "__main__":
    main()
requirements.txt
```txt
fastapi==0.115.6
uvicorn[standard]==0.32.1
aiosqlite==0.20.0
python-jose==3.3.0
passlib[bcrypt]==1.7.4
requests==2.32.3
websockets==13.1
```
Сервер:
```bash
export TG_LITE_JWT_SECRET="вставь_длинный_случайный_секрет"
uvicorn server:app --reload
```
Клиент (в другом терминале):
```bash
python client_cli.py
```
Если скажешь, что именно “как тг” тебе нужно первым (лички, группы, каналы, медиа, E2E, GUI/веб/мобилка), я сразу накину следующую версию под твой сценарий.
Добавь
личные диалоги (1 на 1) + поиск пользователей
статусы «печатает…», «прочитано»
вложения (файлы/фото) через S3/локальное хранилище
push-уведомления (мобильные устройства) или веб-push (браузер)
Отлично — ниже расширение TG-Lite до “почти как Telegram”. Я покажу архитектуру + конкретные куски кода, которые можно прямо добавить к твоему проекту, не переписывая всё с нуля.
is_private = true
```sql
ALTER TABLE chats ADD COLUMN is_private INTEGER DEFAULT 0;
```
@app.get("/users/search")
async def search_users(q: str, user=Depends(current_user)):
    """Return up to 20 users whose username contains ``q``.

    An empty (or whitespace-only) query returns an empty list instead of
    listing arbitrary users.
    """
    q = (q or "").strip()
    if not q:
        return []
    # "%" and "_" are LIKE wildcards, and "_" is a legal username character.
    # Escape them so that the query text is matched literally.
    escaped = q.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    async with aiosqlite.connect(DB_PATH) as db:
        cur = await db.execute(
            "SELECT id, username FROM users WHERE username LIKE ? ESCAPE '\\' LIMIT 20",
            (f"%{escaped}%",),
        )
        rows = await cur.fetchall()
        await cur.close()
    return [{"id": r[0], "username": r[1]} for r in rows]
@app.post("/dialogs/{username}")
async def create_private_dialog(username: str, user=Depends(current_user)):
    """Return the 1:1 dialog between the caller and ``username``, creating it if needed.

    Raises:
        HTTPException: 404 when the user does not exist, 400 when the caller
            tries to open a dialog with themselves.
    """
    async with aiosqlite.connect(DB_PATH) as db:
        other = await get_user_by_username(db, username)
        if not other:
            raise HTTPException(404, "User not found")
        # Without this guard the lookup below would match ANY private chat of
        # the caller (both joins hit the same membership row).
        if other["id"] == user["id"]:
            raise HTTPException(400, "Cannot dialog with yourself")

        # Check for an existing dialog between the two users
        cur = await db.execute(
            """
            SELECT c.id FROM chats c
            JOIN chat_members m1 ON m1.chat_id = c.id
            JOIN chat_members m2 ON m2.chat_id = c.id
            WHERE c.is_private = 1
              AND m1.user_id = ?
              AND m2.user_id = ?
            LIMIT 1
            """,
            (user["id"], other["id"]),
        )
        row = await cur.fetchone()
        await cur.close()
        if row:
            return {"chat_id": row[0]}

        # Create the dialog
        cur = await db.execute(
            "INSERT INTO chats(title, created_by, created_at, is_private) VALUES(?,?,?,1)",
            ("dialog", user["id"], now_iso()),
        )
        chat_id = cur.lastrowid
        await cur.close()

        # Name the columns explicitly so the insert keeps working if
        # chat_members gains more columns later.
        await db.executemany(
            "INSERT INTO chat_members(chat_id, user_id) VALUES(?,?)",
            [(chat_id, user["id"]), (chat_id, other["id"])],
        )
        await db.commit()
    return {"chat_id": chat_id}
Статус «печатает…» передаётся через WebSocket и не сохраняется в БД — сервер просто рассылает его (broadcast) участникам чата.
json{"type":"typing","value":true}
ws_chat)pythonif msg["type"] == "typing": await manager.broadcast(chat_id, { "type": "typing", "from": user["username"] }) continue
sqlALTER TABLE messages ADD COLUMN read_at TEXT;
json{"type":"read","message_id":123}
pythonif msg["type"] == "read": mid = int(msg["message_id"]) async with aiosqlite.connect(DB_PATH) as db: await db.execute( "UPDATE messages SET read_at=? WHERE id=?", (now_iso(), mid) ) await db.commit() await manager.broadcast(chat_id, { "type": "read", "message_id": mid, "by": user["username"] })
```bash
pip install python-multipart
mkdir uploads
```
import os
import uuid

from fastapi import UploadFile, File


@app.post("/upload")
async def upload(file: UploadFile = File(...), user=Depends(current_user)):
    """Store an uploaded file under ``uploads/`` with a random name.

    Returns:
        ``{"url": "/files/<name>", "type": <content type reported by the client>}``
    """
    # Only a short ASCII alphanumeric extension is kept from the client's
    # filename. Anything else (no extension, "a.b/../../x", missing filename)
    # is dropped so the stored path cannot leave the uploads directory.
    _, ext = os.path.splitext(os.path.basename(file.filename or ""))
    ext = ext.lower()
    suffix = ext[1:]
    if not (suffix and len(suffix) <= 16 and suffix.isascii() and suffix.isalnum()):
        ext = ""

    name = f"{uuid.uuid4()}{ext}"
    os.makedirs("uploads", exist_ok=True)
    path = os.path.join("uploads", name)

    # Copy in chunks instead of loading the whole upload into memory.
    with open(path, "wb") as f:
        while True:
            chunk = await file.read(1024 * 1024)
            if not chunk:
                break
            f.write(chunk)

    return {"url": f"/files/{name}", "type": file.content_type}
import os

from fastapi import HTTPException
from fastapi.responses import FileResponse


@app.get("/files/{name}")
def get_file(name: str):
    """Serve a previously uploaded file from ``uploads/``.

    Raises:
        HTTPException: 404 when the name is not a plain file name or the file
            does not exist.
    """
    # Accept only a bare file name; reject anything containing path parts.
    if name in ("", ".", "..") or name != os.path.basename(name):
        raise HTTPException(404, "File not found")
    path = os.path.join("uploads", name)
    if not os.path.isfile(path):
        raise HTTPException(404, "File not found")
    return FileResponse(path)
json{ "type": "message", "text": "", "file": { "url": "/files/abc.jpg", "mime": "image/jpeg" } }
Этот JSON сохраняешь в поле text или в отдельном поле payload.
```bash
pip install pywebpush
```
@app.post("/push/subscribe")
async def push_subscribe(data: dict, user=Depends(current_user)):
    """Store a Web Push subscription (the browser's PushSubscription JSON) for the caller.

    Raises:
        HTTPException: 400 when the body has no ``endpoint`` or ``keys``.
    """
    if not isinstance(data.get("endpoint"), str) or not isinstance(data.get("keys"), dict):
        raise HTTPException(400, "Bad subscription")

    async with aiosqlite.connect(DB_PATH) as db:
        # The base schema has no push_subs table, so create it on demand.
        await db.execute(
            """
            CREATE TABLE IF NOT EXISTS push_subs(
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                data TEXT NOT NULL,
                created_at TEXT NOT NULL,
                FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
            )
            """
        )
        await db.execute(
            "INSERT INTO push_subs(user_id, data, created_at) VALUES (?,?,?)",
            (user["id"], json.dumps(data), now_iso()),
        )
        await db.commit()
    return {"ok": True}
import logging

from pywebpush import webpush, WebPushException


def send_push(sub, title, body):
    """Send one Web Push notification to a stored subscription.

    Args:
        sub: subscription info as saved by /push/subscribe (decoded JSON).
        title: notification title.
        body: notification text.

    Returns:
        True when the push was handed to the push service, False when it failed.
    """
    # NOTE(review): VAPID_PRIVATE must be defined elsewhere in the module
    # (it is not part of this snippet), and the mailto: address below is a
    # placeholder to replace with a real contact address.
    try:
        webpush(
            subscription_info=sub,
            data=json.dumps({"title": title, "body": body}),
            vapid_private_key=VAPID_PRIVATE,
            vapid_claims={"sub": "mailto:admin@example.com"},
        )
    except WebPushException:
        # A dead or expired subscription must not break message delivery.
        logging.getLogger(__name__).warning("Web push failed", exc_info=True)
        return False
    return True
Вызывай send_push при новом сообщении, если получатель офлайн.
device_token (могу дать готовый код для FCM отдельно)
✔ Личные диалоги
✔ Поиск пользователей
✔ typing / read receipts
✔ Фото и файлы
✔ Push-уведомления
Могу следующим шагом:
Скажи, что делаем следующим и под какую платформу.
Каналы как в Telegram и объедини код и дополнения в единое приложение
Ниже — единое приложение (один server.py), в котором есть:
```txt
fastapi==0.115.6
uvicorn[standard]==0.32.1
aiosqlite==0.20.0
python-jose==3.3.0
passlib[bcrypt]==1.7.4
requests==2.32.3
websockets==13.1
python-multipart==0.0.9
boto3==1.35.90
pywebpush==1.14.0
```
```bash
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
export TG_LITE_JWT_SECRET="очень-длинный-случайный-секрет"
uvicorn server:app --reload
```
pythonfrom __future__ import annotations import os import json import uuid from datetime import datetime, timedelta, timezone from typing import Dict, Set, Optional, List, Any import aiosqlite from fastapi import ( FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException, status, UploadFile, File ) from fastapi.responses import FileResponse from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jose import jwt, JWTError from passlib.context import CryptContext # Optional deps (S3 / WebPush) import boto3 from botocore.exceptions import BotoCoreError, ClientError from pywebpush import webpush, WebPushException # ========================= # Config # ========================= DB_PATH = os.getenv("TG_LITE_DB", "tg_lite.db") JWT_SECRET = os.getenv("TG_LITE_JWT_SECRET", "CHANGE_ME_TO_A_LONG_RANDOM_SECRET") JWT_ALG = "HS256" JWT_TTL_MIN = int(os.getenv("TG_LITE_JWT_TTL_MIN", str(60 * 24 * 7))) # 7 days UPLOAD_DIR = os.getenv("TG_LITE_UPLOAD_DIR", "uploads") PUBLIC_BASE_URL = os.getenv("TG_LITE_PUBLIC_BASE_URL", "") # e.g. 
https://your-domain.com # S3 (optional) USE_S3 = os.getenv("TG_LITE_USE_S3", "0") == "1" S3_BUCKET = os.getenv("TG_LITE_S3_BUCKET", "") S3_REGION = os.getenv("TG_LITE_S3_REGION", "") S3_PREFIX = os.getenv("TG_LITE_S3_PREFIX", "tg-lite/") # AWS creds via env/instance profile # WebPush (optional) ENABLE_WEBPUSH = os.getenv("TG_LITE_ENABLE_WEBPUSH", "0") == "1" VAPID_PRIVATE_KEY = os.getenv("TG_LITE_VAPID_PRIVATE_KEY", "") VAPID_SUBJECT = os.getenv("TG_LITE_VAPID_SUBJECT", "mailto:[email protected]") pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login") app = FastAPI(title="TG-Lite Unified (Dialogs/Groups/Channels + Files + WebPush)") # ========================= # DB init # ========================= CREATE_SQL = """ PRAGMA foreign_keys = ON; CREATE TABLE IF NOT EXISTS users( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, pwd_hash TEXT NOT NULL, created_at TEXT NOT NULL ); -- chats.type: group | dialog | channel CREATE TABLE IF NOT EXISTS chats( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL, type TEXT NOT NULL, created_by INTEGER NOT NULL, created_at TEXT NOT NULL, channel_slug TEXT UNIQUE, FOREIGN KEY(created_by) REFERENCES users(id) ON DELETE CASCADE ); -- membership for group/dialog; for channels we'll also use subscriptions table, -- but admins are stored here too (so one place for privileges). 
CREATE TABLE IF NOT EXISTS chat_members( chat_id INTEGER NOT NULL, user_id INTEGER NOT NULL, role TEXT NOT NULL DEFAULT 'member', -- member | admin PRIMARY KEY(chat_id, user_id), FOREIGN KEY(chat_id) REFERENCES chats(id) ON DELETE CASCADE, FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE ); -- channel subscriptions CREATE TABLE IF NOT EXISTS channel_subs( chat_id INTEGER NOT NULL, user_id INTEGER NOT NULL, PRIMARY KEY(chat_id, user_id), FOREIGN KEY(chat_id) REFERENCES chats(id) ON DELETE CASCADE, FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE ); -- messages store JSON payload (text/file/etc) CREATE TABLE IF NOT EXISTS messages( id INTEGER PRIMARY KEY AUTOINCREMENT, chat_id INTEGER NOT NULL, user_id INTEGER NOT NULL, payload TEXT NOT NULL, -- JSON string: {"text": "...", "file": {...}} ts TEXT NOT NULL, FOREIGN KEY(chat_id) REFERENCES chats(id) ON DELETE CASCADE, FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE ); -- read receipts per user per message CREATE TABLE IF NOT EXISTS message_reads( message_id INTEGER NOT NULL, user_id INTEGER NOT NULL, read_at TEXT NOT NULL, PRIMARY KEY(message_id, user_id), FOREIGN KEY(message_id) REFERENCES messages(id) ON DELETE CASCADE, FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE ); -- WebPush subscriptions (per user) CREATE TABLE IF NOT EXISTS push_subs( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, data TEXT NOT NULL, -- subscription_info JSON created_at TEXT NOT NULL, FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE ); """ @app.on_event("startup") async def startup() -> None: os.makedirs(UPLOAD_DIR, exist_ok=True) async with aiosqlite.connect(DB_PATH) as db: await db.executescript(CREATE_SQL) await db.commit() # ========================= # Helpers # ========================= def now_iso() -> str: return datetime.now(timezone.utc).isoformat() def hash_password(password: str) -> str: return pwd_context.hash(password) def verify_password(password: str, 
pwd_hash: str) -> bool: return pwd_context.verify(password, pwd_hash) def create_token(user_id: int, username: str) -> str: exp = datetime.now(timezone.utc) + timedelta(minutes=JWT_TTL_MIN) payload = {"sub": str(user_id), "username": username, "exp": exp} return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALG) def absolutize_url(path: str) -> str: # If PUBLIC_BASE_URL set, return absolute for push and clients. if PUBLIC_BASE_URL: return PUBLIC_BASE_URL.rstrip("/") + path return path async def get_user_by_username(db: aiosqlite.Connection, username: str) -> Optional[dict]: cur = await db.execute("SELECT id, username, pwd_hash FROM users WHERE username = ?", (username,)) row = await cur.fetchone() await cur.close() if not row: return None return {"id": row[0], "username": row[1], "pwd_hash": row[2]} async def get_user_by_id(db: aiosqlite.Connection, user_id: int) -> Optional[dict]: cur = await db.execute("SELECT id, username FROM users WHERE id = ?", (user_id,)) row = await cur.fetchone() await cur.close() if not row: return None return {"id": row[0], "username": row[1]} async def get_chat(db: aiosqlite.Connection, chat_id: int) -> Optional[dict]: cur = await db.execute( "SELECT id, title, type, created_by, channel_slug FROM chats WHERE id=?", (chat_id,) ) row = await cur.fetchone() await cur.close() if not row: return None return {"id": row[0], "title": row[1], "type": row[2], "created_by": row[3], "channel_slug": row[4]} async def current_user(token: str = Depends(oauth2_scheme)) -> dict: try: data = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALG]) user_id = int(data["sub"]) username = data["username"] except (JWTError, KeyError, ValueError): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") async with aiosqlite.connect(DB_PATH) as db: user = await get_user_by_id(db, user_id) if not user or user["username"] != username: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") return user async 
def ws_auth_user(token: str) -> dict: try: data = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALG]) user_id = int(data["sub"]) except Exception: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") async with aiosqlite.connect(DB_PATH) as db: user = await get_user_by_id(db, user_id) if not user: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") return user async def ensure_can_read(db: aiosqlite.Connection, chat: dict, user_id: int) -> bool: if chat["type"] in ("group", "dialog"): cur = await db.execute("SELECT 1 FROM chat_members WHERE chat_id=? AND user_id=?", (chat["id"], user_id)) row = await cur.fetchone() await cur.close() return row is not None # channel: subscribed OR admin member can read cur = await db.execute( "SELECT 1 FROM channel_subs WHERE chat_id=? AND user_id=?", (chat["id"], user_id) ) row = await cur.fetchone() await cur.close() if row: return True cur = await db.execute( "SELECT 1 FROM chat_members WHERE chat_id=? AND user_id=?", (chat["id"], user_id) ) row2 = await cur.fetchone() await cur.close() return row2 is not None async def ensure_can_write(db: aiosqlite.Connection, chat: dict, user_id: int) -> bool: if chat["type"] in ("group", "dialog"): return await ensure_can_read(db, chat, user_id) # channel: only admins cur = await db.execute( "SELECT role FROM chat_members WHERE chat_id=? AND user_id=?", (chat["id"], user_id) ) row = await cur.fetchone() await cur.close() return bool(row and row[0] == "admin") async def get_chat_role(db: aiosqlite.Connection, chat_id: int, user_id: int) -> Optional[str]: cur = await db.execute("SELECT role FROM chat_members WHERE chat_id=? 
AND user_id=?", (chat_id, user_id)) row = await cur.fetchone() await cur.close() return row[0] if row else None def safe_json_loads(s: str) -> Any: try: return json.loads(s) except Exception: return {"text": s} # ========================= # WebPush # ========================= async def push_notify_user_ids(user_ids: List[int], title: str, body: str) -> None: if not (ENABLE_WEBPUSH and VAPID_PRIVATE_KEY): return payload = json.dumps({"title": title, "body": body}, ensure_ascii=False) async with aiosqlite.connect(DB_PATH) as db: for uid in set(user_ids): cur = await db.execute("SELECT data FROM push_subs WHERE user_id=? ORDER BY id DESC", (uid,)) subs = await cur.fetchall() await cur.close() for (data_str,) in subs: try: sub = json.loads(data_str) webpush( subscription_info=sub, data=payload, vapid_private_key=VAPID_PRIVATE_KEY, vapid_claims={"sub": VAPID_SUBJECT}, ) except (WebPushException, ValueError, TypeError): # ignore bad/expired subs; production: delete them continue # ========================= # REST API # ========================= @app.post("/register") async def register(payload: dict) -> dict: username = (payload.get("username") or "").strip() password = payload.get("password") or "" if not (3 <= len(username) <= 32) or not username.replace("_", "").isalnum(): raise HTTPException(400, "Bad username") if len(password) < 6: raise HTTPException(400, "Password too short") async with aiosqlite.connect(DB_PATH) as db: existing = await get_user_by_username(db, username) if existing: raise HTTPException(400, "Username already taken") cur = await db.execute( "INSERT INTO users(username, pwd_hash, created_at) VALUES(?,?,?)", (username, hash_password(password), now_iso()) ) await db.commit() user_id = cur.lastrowid await cur.close() return {"access_token": create_token(user_id, username), "token_type": "bearer"} @app.post("/login") async def login(form: OAuth2PasswordRequestForm = Depends()) -> dict: async with aiosqlite.connect(DB_PATH) as db: user = await 
get_user_by_username(db, form.username) if not user or not verify_password(form.password, user["pwd_hash"]): raise HTTPException(401, "Bad username or password") return {"access_token": create_token(user["id"], user["username"]), "token_type": "bearer"} # ---- Users search ---- @app.get("/users/search") async def search_users(q: str, user=Depends(current_user)) -> List[dict]: q = (q or "").strip() if not q: return [] async with aiosqlite.connect(DB_PATH) as db: cur = await db.execute( "SELECT id, username FROM users WHERE username LIKE ? ORDER BY username LIMIT 20", (f"%{q}%",) ) rows = await cur.fetchall() await cur.close() return [{"id": r[0], "username": r[1]} for r in rows] # ---- Create group chat ---- @app.post("/chats/group") async def create_group(payload: dict, user=Depends(current_user)) -> dict: title = (payload.get("title") or "").strip() members = payload.get("members") or [] if not title: raise HTTPException(400, "Title required") async with aiosqlite.connect(DB_PATH) as db: cur = await db.execute( "INSERT INTO chats(title, type, created_by, created_at) VALUES(?,?,?,?)", (title, "group", user["id"], now_iso()) ) chat_id = cur.lastrowid await cur.close() # creator as admin await db.execute( "INSERT OR IGNORE INTO chat_members(chat_id, user_id, role) VALUES(?,?,?)", (chat_id, user["id"], "admin") ) for uname in members: u = await get_user_by_username(db, (uname or "").strip()) if u: await db.execute( "INSERT OR IGNORE INTO chat_members(chat_id, user_id, role) VALUES(?,?,?)", (chat_id, u["id"], "member") ) await db.commit() return {"id": chat_id, "title": title, "type": "group"} # ---- Create dialog (1:1) ---- @app.post("/dialogs/{username}") async def create_dialog(username: str, user=Depends(current_user)) -> dict: username = (username or "").strip() if not username: raise HTTPException(400, "Username required") if username == user["username"]: raise HTTPException(400, "Cannot dialog with yourself") async with aiosqlite.connect(DB_PATH) as db: other = 
await get_user_by_username(db, username) if not other: raise HTTPException(404, "User not found") # find existing dialog between two users cur = await db.execute(""" SELECT c.id FROM chats c JOIN chat_members a ON a.chat_id=c.id JOIN chat_members b ON b.chat_id=c.id WHERE c.type='dialog' AND a.user_id=? AND b.user_id=? LIMIT 1 """, (user["id"], other["id"])) row = await cur.fetchone() await cur.close() if row: return {"chat_id": row[0]} cur = await db.execute( "INSERT INTO chats(title, type, created_by, created_at) VALUES(?,?,?,?)", ("dialog", "dialog", user["id"], now_iso()) ) chat_id = cur.lastrowid await cur.close() # both are members; optionally make both admin (not critical) await db.execute( "INSERT OR IGNORE INTO chat_members(chat_id, user_id, role) VALUES(?,?,?)", (chat_id, user["id"], "member") ) await db.execute( "INSERT OR IGNORE INTO chat_members(chat_id, user_id, role) VALUES(?,?,?)", (chat_id, other["id"], "member") ) await db.commit() return {"chat_id": chat_id} # ---- Channels (Telegram-like) ---- @app.post("/channels") async def create_channel(payload: dict, user=Depends(current_user)) -> dict: title = (payload.get("title") or "").strip() slug = (payload.get("slug") or "").strip().lower() if not title: raise HTTPException(400, "Title required") if not slug or any(c for c in slug if not (c.isalnum() or c in ("_", "-"))): raise HTTPException(400, "Bad slug (use a-z0-9 _ -)") async with aiosqlite.connect(DB_PATH) as db: # create try: cur = await db.execute( "INSERT INTO chats(title, type, created_by, created_at, channel_slug) VALUES(?,?,?,?,?)", (title, "channel", user["id"], now_iso(), slug) ) except aiosqlite.IntegrityError: raise HTTPException(400, "Slug already taken") chat_id = cur.lastrowid await cur.close() # creator admin await db.execute( "INSERT OR IGNORE INTO chat_members(chat_id, user_id, role) VALUES(?,?,?)", (chat_id, user["id"], "admin") ) await db.commit() return {"id": chat_id, "title": title, "type": "channel", "slug": slug} 
@app.get("/channels/by-slug/{slug}") async def get_channel_by_slug(slug: str, user=Depends(current_user)) -> dict: slug = (slug or "").strip().lower() async with aiosqlite.connect(DB_PATH) as db: cur = await db.execute( "SELECT id, title, channel_slug FROM chats WHERE type='channel' AND channel_slug=?", (slug,) ) row = await cur.fetchone() await cur.close() if not row: raise HTTPException(404, "Channel not found") # show if subscribed cur = await db.execute( "SELECT 1 FROM channel_subs WHERE chat_id=? AND user_id=?", (row[0], user["id"]) ) is_sub = (await cur.fetchone()) is not None await cur.close() return {"id": row[0], "title": row[1], "slug": row[2], "subscribed": is_sub} @app.post("/channels/{chat_id}/subscribe") async def channel_subscribe(chat_id: int, user=Depends(current_user)) -> dict: async with aiosqlite.connect(DB_PATH) as db: chat = await get_chat(db, chat_id) if not chat or chat["type"] != "channel": raise HTTPException(404, "Channel not found") await db.execute( "INSERT OR IGNORE INTO channel_subs(chat_id, user_id) VALUES(?,?)", (chat_id, user["id"]) ) await db.commit() return {"ok": True} @app.post("/channels/{chat_id}/unsubscribe") async def channel_unsubscribe(chat_id: int, user=Depends(current_user)) -> dict: async with aiosqlite.connect(DB_PATH) as db: await db.execute( "DELETE FROM channel_subs WHERE chat_id=? 
AND user_id=?", (chat_id, user["id"]) ) await db.commit() return {"ok": True} @app.post("/channels/{chat_id}/admins/add/{username}") async def channel_add_admin(chat_id: int, username: str, user=Depends(current_user)) -> dict: async with aiosqlite.connect(DB_PATH) as db: chat = await get_chat(db, chat_id) if not chat or chat["type"] != "channel": raise HTTPException(404, "Channel not found") role = await get_chat_role(db, chat_id, user["id"]) if role != "admin": raise HTTPException(403, "Only admin can add admins") other = await get_user_by_username(db, (username or "").strip()) if not other: raise HTTPException(404, "User not found") await db.execute( "INSERT OR REPLACE INTO chat_members(chat_id, user_id, role) VALUES(?,?,?)", (chat_id, other["id"], "admin") ) await db.commit() return {"ok": True} # ---- List chats: groups/dialogs you are member + channels you subscribed/admin ---- @app.get("/chats") async def list_chats(user=Depends(current_user)) -> List[dict]: async with aiosqlite.connect(DB_PATH) as db: # groups+dialogs membership cur = await db.execute(""" SELECT c.id, c.title, c.type, c.channel_slug FROM chats c JOIN chat_members m ON m.chat_id=c.id WHERE m.user_id=? ORDER BY c.id DESC """, (user["id"],)) rows1 = await cur.fetchall() await cur.close() # channel subscriptions (avoid duplicates) cur = await db.execute(""" SELECT c.id, c.title, c.type, c.channel_slug FROM chats c JOIN channel_subs s ON s.chat_id=c.id WHERE s.user_id=? 
AND c.type='channel' ORDER BY c.id DESC """, (user["id"],)) rows2 = await cur.fetchall() await cur.close() seen = set() out = [] for r in rows1 + rows2: if r[0] in seen: continue seen.add(r[0]) out.append({"id": r[0], "title": r[1], "type": r[2], "slug": r[3]}) return out # ---- History ---- @app.get("/chats/{chat_id}/history") async def chat_history(chat_id: int, limit: int = 50, user=Depends(current_user)) -> List[dict]: limit = max(1, min(200, int(limit))) async with aiosqlite.connect(DB_PATH) as db: chat = await get_chat(db, chat_id) if not chat: raise HTTPException(404, "Chat not found") if not await ensure_can_read(db, chat, user["id"]): raise HTTPException(403, "No access") cur = await db.execute(""" SELECT msg.id, msg.chat_id, u.username, msg.payload, msg.ts FROM messages msg JOIN users u ON u.id=msg.user_id WHERE msg.chat_id=? ORDER BY msg.id DESC LIMIT ? """, (chat_id, limit)) rows = await cur.fetchall() await cur.close() # read marks for current user ids = [r[0] for r in rows] read_map = {} if ids: qmarks = ",".join(["?"] * len(ids)) cur = await db.execute( f"SELECT message_id, read_at FROM message_reads WHERE user_id=? AND message_id IN ({qmarks})", (user["id"], *ids) ) rr = await cur.fetchall() await cur.close() read_map = {mid: ts for (mid, ts) in rr} rows.reverse() out = [] for r in rows: payload = safe_json_loads(r[3]) out.append({ "id": r[0], "chat_id": r[1], "from_user": r[2], "payload": payload, "ts": r[4], "read_at": read_map.get(r[0]), }) return out # ---- Files: Local by default; optional S3 ---- def s3_client(): return boto3.client("s3", region_name=S3_REGION or None) async def store_file_and_get_url(file: UploadFile) -> dict: content = await file.read() mime = file.content_type or "application/octet-stream" ext = (file.filename.split(".")[-1] if file.filename and "." 
in file.filename else "bin").lower() key = f"{uuid.uuid4().hex}.{ext}" if USE_S3: if not S3_BUCKET: raise HTTPException(500, "S3 bucket not configured") s3_key = f"{S3_PREFIX}{key}" try: s3_client().put_object( Bucket=S3_BUCKET, Key=s3_key, Body=content, ContentType=mime, ) except (BotoCoreError, ClientError) as e: raise HTTPException(500, f"S3 upload failed: {e}") # public URL: depends on your bucket policy/CDN; simplest: url = f"https://{S3_BUCKET}.s3.amazonaws.com/{s3_key}" return {"url": url, "mime": mime, "name": file.filename, "storage": "s3"} # local path = os.path.join(UPLOAD_DIR, key) with open(path, "wb") as f: f.write(content) url = absolutize_url(f"/files/{key}") return {"url": url, "mime": mime, "name": file.filename, "storage": "local"} @app.post("/upload") async def upload(file: UploadFile = File(...), user=Depends(current_user)) -> dict: return await store_file_and_get_url(file) @app.get("/files/{name}") def get_file(name: str): path = os.path.join(UPLOAD_DIR, name) if not os.path.exists(path): raise HTTPException(404, "Not found") return FileResponse(path) # ---- WebPush subscribe ---- @app.post("/push/subscribe") async def push_subscribe(payload: dict, user=Depends(current_user)) -> dict: # payload is subscription_info from browser Push API async with aiosqlite.connect(DB_PATH) as db: await db.execute( "INSERT INTO push_subs(user_id, data, created_at) VALUES(?,?,?)", (user["id"], json.dumps(payload), now_iso()) ) await db.commit() return {"ok": True} # ========================= # WebSocket # ========================= class ConnectionManager: def __init__(self) -> None: self.chat_sockets: Dict[int, Set[WebSocket]] = {} self.chat_online_users: Dict[int, Set[int]] = {} # chat_id -> user_ids async def connect(self, chat_id: int, user_id: int, websocket: WebSocket) -> None: await websocket.accept() self.chat_sockets.setdefault(chat_id, set()).add(websocket) self.chat_online_users.setdefault(chat_id, set()).add(user_id) def disconnect(self, chat_id: 
int, user_id: int, websocket: WebSocket) -> None: if chat_id in self.chat_sockets: self.chat_sockets[chat_id].discard(websocket) if not self.chat_sockets[chat_id]: del self.chat_sockets[chat_id] if chat_id in self.chat_online_users: self.chat_online_users[chat_id].discard(user_id) if not self.chat_online_users[chat_id]: del self.chat_online_users[chat_id] async def broadcast(self, chat_id: int, payload: dict) -> None: sockets = list(self.chat_sockets.get(chat_id, set())) data = json.dumps(payload, ensure_ascii=False) for ws in sockets: try: await ws.send_text(data) except Exception: # best-effort cleanup happens on disconnect pass def online_user_ids(self, chat_id: int) -> Set[int]: return set(self.chat_online_users.get(chat_id, set())) manager = ConnectionManager() @app.websocket("/ws/{chat_id}") async def ws_chat(websocket: WebSocket, chat_id: int, token: str) -> None: """ Connect: ws://localhost:8000/ws/1?token=... Send types: - {"type":"message","payload":{"text":"hi"}} - {"type":"typing","value":true} - {"type":"read","message_id":123} """ # auth try: user = await ws_auth_user(token) except HTTPException: await websocket.close(code=4401) return async with aiosqlite.connect(DB_PATH) as db: chat = await get_chat(db, chat_id) if not chat: await websocket.close(code=4404) return if not await ensure_can_read(db, chat, user["id"]): await websocket.close(code=4403) return await manager.connect(chat_id, user["id"], websocket) await manager.broadcast(chat_id, {"type": "system", "text": f"{user['username']} joined", "ts": now_iso()}) try: while True: raw = await websocket.receive_text() try: msg = json.loads(raw) except json.JSONDecodeError: await websocket.send_text(json.dumps({"type": "error", "error": "Bad JSON"})) continue mtype = msg.get("type") # typing (ephemeral) if mtype == "typing": await manager.broadcast(chat_id, {"type": "typing", "from": user["username"], "value": bool(msg.get("value"))}) continue # read receipt if mtype == "read": try: mid = 
int(msg.get("message_id")) except Exception: await websocket.send_text(json.dumps({"type": "error", "error": "Bad message_id"})) continue async with aiosqlite.connect(DB_PATH) as db: # validate message belongs to chat & user can read cur = await db.execute("SELECT chat_id FROM messages WHERE id=?", (mid,)) row = await cur.fetchone() await cur.close() if not row or row[0] != chat_id: continue await db.execute( "INSERT OR REPLACE INTO message_reads(message_id, user_id, read_at) VALUES(?,?,?)", (mid, user["id"], now_iso()) ) await db.commit() await manager.broadcast(chat_id, {"type": "read", "message_id": mid, "by": user["username"], "ts": now_iso()}) continue # message if mtype != "message": await websocket.send_text(json.dumps({"type": "error", "error": "Unknown type"})) continue payload = msg.get("payload") if not isinstance(payload, dict): await websocket.send_text(json.dumps({"type": "error", "error": "payload must be object"})) continue # normalize text = (payload.get("text") or "").strip() file_obj = payload.get("file") # {"url": "...", "mime": "...", ...} if not text and not file_obj: continue if len(text) > 4000: await websocket.send_text(json.dumps({"type": "error", "error": "Message too long"})) continue # permissions: channels only admins can post async with aiosqlite.connect(DB_PATH) as db: chat = await get_chat(db, chat_id) if not chat or not await ensure_can_write(db, chat, user["id"]): await websocket.send_text(json.dumps({"type": "error", "error": "No write access"})) continue ts = now_iso() cur = await db.execute( "INSERT INTO messages(chat_id, user_id, payload, ts) VALUES(?,?,?,?)", (chat_id, user["id"], json.dumps(payload, ensure_ascii=False), ts) ) await db.commit() msg_id = cur.lastrowid await cur.close() event = { "type": "message", "id": msg_id, "chat_id": chat_id, "from": user["username"], "payload": payload, "ts": ts, } await manager.broadcast(chat_id, event) # WebPush notify (best-effort): notify users who are NOT online in this chat # For 
group/dialog: members; for channel: subscribers if ENABLE_WEBPUSH and (text or file_obj): async with aiosqlite.connect(DB_PATH) as db: chat = await get_chat(db, chat_id) online = manager.online_user_ids(chat_id) recipients: List[int] = [] if chat and chat["type"] in ("group", "dialog"): cur = await db.execute("SELECT user_id FROM chat_members WHERE chat_id=?", (chat_id,)) recipients = [r[0] for r in await cur.fetchall()] await cur.close() elif chat and chat["type"] == "channel": cur = await db.execute("SELECT user_id FROM channel_subs WHERE chat_id=?", (chat_id,)) recipients = [r[0] for r in await cur.fetchall()] await cur.close() # exclude sender and online users recipients = [uid for uid in recipients if uid != user["id"] and uid not in online] if recipients: title = f"{chat['title']}" if chat else "New message" body = text if text else ("📎 " + (file_obj.get("name") or "file")) await push_notify_user_ids(recipients, title=title, body=body) except WebSocketDisconnect: manager.disconnect(chat_id, user["id"], websocket) await manager.broadcast(chat_id, {"type": "system", "text": f"{user['username']} left", "ts": now_iso()}) except Exception: manager.disconnect(chat_id, user["id"], websocket) try: await websocket.close(code=1011) except Exception: pass
POST /channels (Bearer token)
json{"title":"My News","slug":"my_news"}
GET /channels/by-slug/my_news
POST /channels/{chat_id}/subscribe
Сообщения в канал отправляются через ws://.../ws/{chat_id}?token=... как обычные "type":"message".
Загрузка файла: POST /upload (multipart) → вернёт {url, mime, name, storage}
Дальше отправляешь в WS:
json{"type":"message","payload":{"text":"","file":{"url":"/files/..","mime":"image/jpeg","name":"pic.jpg"}}}
Включить S3 (хранение файлов):
bashexport TG_LITE_USE_S3=1 export TG_LITE_S3_BUCKET="your-bucket" export TG_LITE_S3_PREFIX="tg-lite/" # AWS creds стандартно (env/role)
Включить WebPush:
export TG_LITE_ENABLE_WEBPUSH=1
export TG_LITE_VAPID_PRIVATE_KEY="...private key..."
export TG_LITE_VAPID_SUBJECT="mailto:admin@example.com"
export TG_LITE_PUBLIC_BASE_URL="https://your-domain.com"
Клиент (браузер) должен отправить subscription object в POST /push/subscribe.
Если хочешь, следующим сообщением дам:
client_cli.py обновлённый (диалоги/каналы/typing/read/upload),готовый client_cli.py обновлённый (диалоги/каналы/набор текста/чтение/загрузка) и веб клиент
client_cli.py (диалоги/каналы/typing/read/upload). Сохрани как client_cli.py рядом с server.py.
"""client_cli.py - TG-Lite command-line client.

Talks to the TG-Lite server over HTTP (auth, chat management, history,
uploads) and over a WebSocket (live messages, typing pulses, read receipts).
The server address is read from the TG_LITE_BASE environment variable.
"""
import asyncio
import json
import os
import sys
from getpass import getpass
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import quote

import requests
import websockets

BASE = os.getenv("TG_LITE_BASE", "http://127.0.0.1:8000").rstrip("/")


# ---------- HTTP helpers ----------
def _auth_headers(token: str) -> dict:
    """Return the bearer Authorization header for *token*."""
    return {"Authorization": f"Bearer {token}"}


def _json_or_raise(r: "requests.Response") -> Any:
    """Return the decoded JSON body; raise RuntimeError for HTTP status >= 400."""
    if r.status_code >= 400:
        raise RuntimeError(f"{r.status_code}: {r.text}")
    return r.json()


def post_json(path: str, payload: dict, token: Optional[str] = None) -> Any:
    """POST *payload* as JSON to BASE + *path*, authenticated when *token* is given."""
    headers = _auth_headers(token) if token else {}
    r = requests.post(f"{BASE}{path}", json=payload, headers=headers, timeout=20)
    return _json_or_raise(r)


def get_json(path: str, token: Optional[str] = None) -> Any:
    """GET BASE + *path* and return the decoded JSON body."""
    headers = _auth_headers(token) if token else {}
    r = requests.get(f"{BASE}{path}", headers=headers, timeout=20)
    return _json_or_raise(r)


def post_form(path: str, data: dict) -> Any:
    """POST *data* form-encoded (the /login endpoint expects a form, not JSON)."""
    r = requests.post(f"{BASE}{path}", data=data, timeout=20)
    return _json_or_raise(r)


def upload_file(path: str, token: str) -> dict:
    """Upload the file at *path* via POST /upload and return the server's answer.

    Raises RuntimeError when the file does not exist or the server rejects it.
    """
    p = Path(path)
    if not p.is_file():
        raise RuntimeError("File not found")
    with p.open("rb") as f:
        r = requests.post(
            f"{BASE}/upload",
            files={"file": (p.name, f)},
            headers=_auth_headers(token),
            timeout=60,
        )
    return _json_or_raise(r)


# ---------- Auth ----------
def register(username: str, password: str) -> str:
    """Create an account and return its access token."""
    data = post_json("/register", {"username": username, "password": password})
    return data["access_token"]


def login(username: str, password: str) -> str:
    """Log in and return the access token."""
    data = post_form("/login", {"username": username, "password": password})
    return data["access_token"]


# ---------- Chat ops ----------
def list_chats(token: str) -> List[dict]:
    return get_json("/chats", token)


def search_users(token: str, q: str) -> List[dict]:
    return get_json(f"/users/search?q={quote(q, safe='')}", token)


def create_group(token: str, title: str, members: List[str]) -> dict:
    return post_json("/chats/group", {"title": title, "members": members}, token)


def create_dialog(token: str, username: str) -> dict:
    return post_json(f"/dialogs/{quote(username, safe='')}", {}, token)


def create_channel(token: str, title: str, slug: str) -> dict:
    return post_json("/channels", {"title": title, "slug": slug}, token)


def channel_by_slug(token: str, slug: str) -> dict:
    return get_json(f"/channels/by-slug/{quote(slug, safe='')}", token)


def channel_subscribe(token: str, chat_id: int) -> dict:
    return post_json(f"/channels/{chat_id}/subscribe", {}, token)


def channel_unsubscribe(token: str, chat_id: int) -> dict:
    return post_json(f"/channels/{chat_id}/unsubscribe", {}, token)


def history(token: str, chat_id: int, limit: int = 50) -> List[dict]:
    return get_json(f"/chats/{chat_id}/history?limit={limit}", token)


# ---------- WS chat loop ----------
def _ws_url(chat_id: int, token: str) -> str:
    """Build the WebSocket URL for *chat_id*.

    Only the URL scheme is rewritten (http -> ws, https -> wss); a plain
    ``str.replace`` would also rewrite "http" inside the host or path.
    The token is percent-encoded because it travels in the query string.
    """
    ws_base = "ws" + BASE[len("http"):] if BASE.startswith("http") else BASE
    return f"{ws_base}/ws/{chat_id}?token={quote(token, safe='')}"


def _render_payload(payload: Any) -> str:
    """Return the printable body of a message payload: text plus a file note."""
    if not isinstance(payload, dict):
        return ""
    text = payload.get("text") or ""
    file_obj = payload.get("file")
    if isinstance(file_obj, dict):
        return f"{text} [file: {file_obj.get('name') or file_obj.get('url')}]"
    return text


async def chat_loop(chat_id: int, token: str, username: Optional[str] = None) -> None:
    """Run an interactive chat session until /exit, EOF or connection loss.

    *username* is the caller's own name; when given, read receipts are not
    sent for the caller's own messages.
    """
    ws_url = _ws_url(chat_id, token)
    print(f"\n[WS] {ws_url}")
    print("Commands inside chat:")
    print(" /file <path> upload and send file")
    print(" /typing send typing pulse")
    print(" /exit leave chat\n")

    async with websockets.connect(ws_url, ping_interval=20) as ws:
        loop = asyncio.get_running_loop()

        # Show history. `requests` is blocking, so run it in a worker thread
        # to keep the event loop (and WebSocket keep-alive) responsive.
        try:
            hist = await loop.run_in_executor(None, history, token, chat_id, 30)
            if hist:
                print("---- history ----")
                for m in hist:
                    body = _render_payload(m.get("payload"))
                    ra = f" (read_at={m['read_at']})" if m.get("read_at") else ""
                    if body:
                        print(f"[{m['ts']}] {m['from_user']}: {body}{ra}")
                print("---- live ----")
        except Exception as e:
            print("Could not load history:", e)

        async def recv_task() -> None:
            """Print every event the server pushes over the socket."""
            async for raw in ws:
                try:
                    msg = json.loads(raw)
                except ValueError:
                    print(raw)
                    continue
                if not isinstance(msg, dict):
                    print(msg)
                    continue
                t = msg.get("type")
                if t == "message":
                    frm = msg.get("from")
                    print(f"[{msg.get('ts')}] {frm}: {_render_payload(msg.get('payload'))}")
                    # Auto "read" receipt, only for messages that have an id
                    # and were not sent by us.
                    mid = msg.get("id")
                    if mid is not None and frm != username:
                        try:
                            await ws.send(json.dumps({"type": "read", "message_id": mid}))
                        except Exception:
                            pass  # best-effort: a lost receipt must not end the session
                elif t == "typing":
                    frm = msg.get("from")
                    if frm:
                        print(f"* {frm} is typing..." if msg.get("value") else f"* {frm} stopped typing")
                elif t == "read":
                    print(f"* read: message {msg.get('message_id')} by {msg.get('by')}")
                elif t == "system":
                    print(f"* {msg.get('text')}")
                elif t == "error":
                    print(f"! error: {msg.get('error')}")
                else:
                    print(msg)

        async def send_task() -> None:
            """Read lines from stdin and send them as commands or messages."""
            last_typing_sent = 0.0
            while True:
                line = await loop.run_in_executor(None, sys.stdin.readline)
                if not line:  # EOF on stdin
                    return
                line = line.rstrip("\n")
                command = line.strip()
                if command == "/exit":
                    return
                if not command:
                    continue  # the server ignores empty messages anyway
                if line.startswith("/file "):
                    path = line[len("/file "):].strip().strip('"')
                    try:
                        info = await loop.run_in_executor(None, upload_file, path, token)
                        payload = {"text": "", "file": info}
                        await ws.send(json.dumps({"type": "message", "payload": payload}, ensure_ascii=False))
                    except Exception as e:
                        print("Upload failed:", e)
                    continue
                if command == "/typing":
                    await ws.send(json.dumps({"type": "typing", "value": True}))
                    await asyncio.sleep(0.5)
                    await ws.send(json.dumps({"type": "typing", "value": False}))
                    continue

                # Send a typing pulse just before the message (a CLI cannot
                # detect keystrokes). The flag is local to this task so that
                # incoming traffic cannot clear it before "stopped" is sent.
                typing_sent = False
                now = loop.time()
                if (now - last_typing_sent) > 1.5:
                    try:
                        await ws.send(json.dumps({"type": "typing", "value": True}))
                        last_typing_sent = now
                        typing_sent = True
                    except Exception:
                        pass
                await ws.send(json.dumps({"type": "message", "payload": {"text": line}}, ensure_ascii=False))
                if typing_sent:
                    try:
                        await ws.send(json.dumps({"type": "typing", "value": False}))
                    except Exception:
                        pass

        # Stop as soon as either side finishes: /exit or EOF ends the sender,
        # a closed connection ends the receiver. Waiting for both (gather)
        # would keep the session open after /exit.
        tasks = {asyncio.create_task(recv_task()), asyncio.create_task(send_task())}
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
        for task in done:
            task.result()  # re-raise an error from the finished task, if any


def main() -> None:
    """Interactive entry point: authenticate, then run the command menu."""
    print("TG-Lite CLI")
    print(f"Server: {BASE}")
    print("1) register 2) login")
    mode = input("> ").strip()
    username = input("username: ").strip()
    password = getpass("password: ")
    try:
        token = register(username, password) if mode == "1" else login(username, password)
    except Exception as e:
        print("Auth failed:", e)
        return

    while True:
        try:
            chats = list_chats(token)
        except Exception as e:
            print("Failed to list chats:", e)
            return

        print("\nYour chats:")
        if not chats:
            print(" (none)")
        for c in chats:
            slug = f" @{c['slug']}" if c.get("slug") else ""
            print(f" {c['id']}: [{c['type']}] {c['title']}{slug}")

        print("\nCommands:")
        print(" /search <q> search users")
        print(" /dialog <username> create/open 1:1")
        print(" /group create group")
        print(" /channel create channel")
        print(" /findchan <slug> find channel by slug")
        print(" /sub <chat_id> subscribe to channel")
        print(" /unsub <chat_id> unsubscribe")
        print(" /open <chat_id> open chat")
        print(" /quit")

        cmd = input("> ").strip()
        if cmd == "/quit":
            return
        try:
            if cmd.startswith("/search "):
                q = cmd[len("/search "):].strip()
                for u in search_users(token, q):
                    print(f" {u['username']} (id={u['id']})")
            elif cmd.startswith("/dialog "):
                u = cmd[len("/dialog "):].strip()
                r = create_dialog(token, u)
                print("Dialog chat_id:", int(r["chat_id"]))
            elif cmd == "/group":
                title = input("title: ").strip()
                members = input("members (comma usernames): ").strip()
                arr = [m.strip() for m in members.split(",") if m.strip()]
                print("Created:", create_group(token, title, arr))
            elif cmd == "/channel":
                title = input("title: ").strip()
                slug = input("slug (unique): ").strip()
                print("Created:", create_channel(token, title, slug))
            elif cmd.startswith("/findchan "):
                slug = cmd[len("/findchan "):].strip()
                print("Channel:", channel_by_slug(token, slug))
            elif cmd.startswith("/sub "):
                print(channel_subscribe(token, int(cmd.split()[1])))
            elif cmd.startswith("/unsub "):
                print(channel_unsubscribe(token, int(cmd.split()[1])))
            elif cmd.startswith("/open "):
                asyncio.run(chat_loop(int(cmd.split()[1]), token, username))
            else:
                print("Unknown command")
        except Exception as e:
            print("Error:", e)


if __name__ == "__main__":
    main()
Создай папку web/:
web/index.html, web/app.js, web/sw.js. Запусти любой статический сервер (важно: Service Worker работает по HTTPS или на localhost):
cd web
python -m http.server 5173  # открой http://127.0.0.1:5173
В app.js укажи API_BASE (по умолчанию http://127.0.0.1:8000).
<!doctype html>
<!-- web/index.html: single-page UI for TG-Lite Web; all behaviour lives in app.js -->
<html lang="ru">
<head>
  <meta charset="utf-8" />
  <meta name="viewport" content="width=device-width,initial-scale=1" />
  <title>TG-Lite Web</title>
  <style>
    /* Two-column layout: sidebar (auth, chat list) and the open chat. */
    body { font-family: system-ui, sans-serif; margin: 0; display: grid; grid-template-columns: 320px 1fr; height: 100vh; }
    #left { border-right: 1px solid #ddd; padding: 12px; overflow: auto; }
    #right { padding: 12px; display: grid; grid-template-rows: auto 1fr auto; gap: 10px; }
    input, button { padding: 8px; }
    .row { display: flex; gap: 8px; align-items: center; flex-wrap: wrap; }
    #msgs { border: 1px solid #ddd; padding: 8px; overflow: auto; }
    .msg { margin: 6px 0; }
    .meta { color: #666; font-size: 12px; }
    .typing { color: #0a7; font-size: 12px; }
    .chat { padding: 8px; border: 1px solid #eee; margin: 6px 0; cursor: pointer; }
    .chat:hover { background: #fafafa; }
    code { background: #f6f6f6; padding: 2px 4px; }
  </style>
</head>
<body>
  <!-- Sidebar: the element ids below are looked up by app.js -->
  <div id="left">
    <h3>Auth</h3>
    <div class="row">
      <!-- aria-label gives each control an accessible name; a placeholder alone is not one -->
      <input id="u" placeholder="username" aria-label="username" autocomplete="username" />
      <input id="p" placeholder="password" aria-label="password" type="password" autocomplete="current-password" />
    </div>
    <div class="row">
      <button id="btnReg">Register</button>
      <button id="btnLogin">Login</button>
    </div>
    <hr />
    <h3>Chats</h3>
    <div class="row">
      <button id="btnRefresh">Refresh</button>
      <button id="btnPush">Enable WebPush</button>
    </div>
    <h4>Dialogs</h4>
    <div class="row">
      <input id="searchQ" placeholder="search users..." aria-label="search users" />
      <button id="btnSearch">Search</button>
    </div>
    <div id="searchRes"></div>
    <h4>Create</h4>
    <div class="row">
      <button id="btnNewGroup">New Group</button>
      <button id="btnNewChannel">New Channel</button>
    </div>
    <h4>Find channel</h4>
    <div class="row">
      <input id="chanSlug" placeholder="channel slug" aria-label="channel slug" />
      <button id="btnFindChan">Find</button>
    </div>
    <div id="chanInfo"></div>
    <h4>List</h4>
    <div id="chats"></div>
  </div>

  <!-- Main pane: header, message log, composer -->
  <div id="right">
    <div class="row">
      <div>Open chat: <code id="chatId">none</code></div>
      <div class="typing" id="typing" aria-live="polite"></div>
    </div>
    <div id="msgs"></div>
    <div class="row">
      <input id="msg" placeholder="message..." aria-label="message" style="flex: 1;" />
      <input id="file" type="file" aria-label="attachment" />
      <button id="btnSend">Send</button>
    </div>
    <div class="meta">
      Канал: писать могут только админы. Read receipts/typing работают через WebSocket.
    </div>
  </div>
  <script src="./app.js"></script>
</body>
</html>
// web/sw.js
// Service worker for TG-Lite Web: shows incoming push messages as
// notifications and brings the app to the front when one is clicked.

self.addEventListener("push", (event) => {
  // The handler reads `title` and `body` from a JSON payload. A payload that
  // is not JSON is shown as the notification body instead of being dropped.
  let data = {};
  if (event.data) {
    try {
      data = event.data.json();
    } catch {
      data = { body: event.data.text() };
    }
  }
  if (data === null || typeof data !== "object") {
    data = { body: String(data) };
  }
  const title = data.title || "TG-Lite";
  const body = data.body || "New message";
  // waitUntil keeps the worker alive until the notification is displayed.
  event.waitUntil(self.registration.showNotification(title, { body, data }));
});

self.addEventListener("notificationclick", (event) => {
  event.notification.close();
  // Focus an already open window of the app if there is one; otherwise open
  // a new one, so repeated clicks do not pile up tabs.
  event.waitUntil(
    self.clients
      .matchAll({ type: "window", includeUncontrolled: true })
      .then((windows) => {
        for (const client of windows) {
          if ("focus" in client) return client.focus();
        }
        return self.clients.openWindow("/");
      })
  );
});
// web/app.js
// TG-Lite web client: auth, chat list, history, live chat over WebSocket,
// file upload and WebPush opt-in. Expects the element ids of index.html.

const API_BASE = (localStorage.getItem("API_BASE") || "http://127.0.0.1:8000").replace(/\/+$/, "");
let token = localStorage.getItem("token") || "";
let ws = null;             // socket of the currently open chat
let currentChatId = null;
let typingTimer = null;
let typingActive = false;

// WebPush needs the VAPID public key (base64url); it is read from localStorage.
const VAPID_PUBLIC_KEY = localStorage.getItem("VAPID_PUBLIC_KEY") || ""; // e.g. "BOr...."

const $ = (id) => document.getElementById(id);

// Wrap an async UI handler so a failure is shown to the user instead of
// ending up as an unhandled promise rejection.
const guarded = (fn) => async (...args) => {
  try {
    return await fn(...args);
  } catch (err) {
    alert(err && err.message ? err.message : String(err));
  }
};

function headers() {
  return token ? { "Authorization": `Bearer ${token}` } : {};
}

// JSON request helper; throws Error("<status>: <body>") on a non-2xx answer.
async function http(method, path, body) {
  const r = await fetch(`${API_BASE}${path}`, {
    method,
    headers: { ...(body ? { "Content-Type": "application/json" } : {}), ...headers() },
    body: body ? JSON.stringify(body) : undefined
  });
  if (!r.ok) throw new Error(`${r.status}: ${await r.text()}`);
  return r.json();
}

async function uploadFile(file) {
  const fd = new FormData();
  fd.append("file", file, file.name);
  const r = await fetch(`${API_BASE}/upload`, { method: "POST", headers: headers(), body: fd });
  if (!r.ok) throw new Error(`${r.status}: ${await r.text()}`);
  return r.json(); // {url,mime,name,storage}
}

// ---- DOM helpers ----
// Titles and slugs come from other users, so they are inserted with
// textContent only; putting them into innerHTML would allow HTML injection.
function chatCard(title, metaText) {
  const card = document.createElement("div");
  card.className = "chat";
  const name = document.createElement("b");
  name.textContent = title;
  const meta = document.createElement("span");
  meta.className = "meta";
  meta.textContent = metaText;
  card.append(name, " ", meta);
  return card;
}

function makeButton(label, onClick) {
  const b = document.createElement("button");
  b.textContent = label;
  b.onclick = onClick;
  return b;
}

function makeRow(...children) {
  const row = document.createElement("div");
  row.className = "row";
  row.append(...children);
  return row;
}

function formatPayload(payload) {
  const p = payload || {};
  const f = p.file;
  return `${p.text || ""}${f ? ` [file: ${f.name || f.url}]` : ""}`;
}

function renderChats(items) {
  const box = $("chats");
  box.textContent = "";
  items.forEach((c) => {
    const card = chatCard(c.title, `[${c.type}] ${c.slug ? "@" + c.slug : ""}`);
    card.onclick = guarded(() => openChat(c.id));
    if (c.type === "channel") {
      // stopPropagation keeps a button click from also opening the chat.
      const sub = makeButton("Subscribe", guarded(async (e) => {
        e.stopPropagation();
        await http("POST", `/channels/${c.id}/subscribe`, {});
        alert("Subscribed");
      }));
      const unsub = makeButton("Unsubscribe", guarded(async (e) => {
        e.stopPropagation();
        await http("POST", `/channels/${c.id}/unsubscribe`, {});
        alert("Unsubscribed");
      }));
      card.appendChild(makeRow(sub, unsub));
    }
    box.appendChild(card);
  });
}

function addMsg(line) {
  const box = $("msgs");
  const div = document.createElement("div");
  div.className = "msg";
  div.textContent = line;
  box.appendChild(div);
  box.scrollTop = box.scrollHeight;
}

async function refreshChats() {
  renderChats(await http("GET", "/chats"));
}

async function loadHistory(chatId) {
  $("msgs").textContent = "";
  const items = await http("GET", `/chats/${chatId}/history?limit=50`);
  items.forEach((m) => {
    addMsg(`[${m.ts}] ${m.from_user}: ${formatPayload(m.payload)}${m.read_at ? " ✓" : ""}`);
  });
}

// Send an object over the open socket; returns false when nothing was sent.
function wsSend(obj) {
  if (ws && ws.readyState === WebSocket.OPEN) {
    ws.send(JSON.stringify(obj));
    return true;
  }
  return false;
}

function setTyping(text) {
  $("typing").textContent = text || "";
}

function handleWsEvent(ev) {
  let msg = {};
  try {
    msg = JSON.parse(ev.data);
  } catch {
    addMsg(ev.data);
    return;
  }
  if (msg.type === "message") {
    addMsg(`[${msg.ts}] ${msg.from}: ${formatPayload(msg.payload)}`);
    wsSend({ type: "read", message_id: msg.id }); // auto read receipt
  } else if (msg.type === "typing") {
    setTyping(msg.value ? `${msg.from} печатает…` : "");
  } else if (msg.type === "read") {
    addMsg(`* read: message ${msg.message_id} by ${msg.by}`);
  } else if (msg.type === "system") {
    addMsg(`* ${msg.text}`);
  } else if (msg.type === "error") {
    addMsg(`! ${msg.error}`);
  } else {
    addMsg(JSON.stringify(msg));
  }
}

async function openChat(chatId) {
  currentChatId = chatId;
  $("chatId").textContent = String(chatId);
  setTyping("");
  clearTimeout(typingTimer);
  typingActive = false;

  if (ws) {
    // Detach the handlers before closing: otherwise late events of the old
    // socket would be written into the log of the newly opened chat.
    const old = ws;
    ws = null;
    old.onopen = old.onclose = old.onerror = old.onmessage = null;
    try { old.close(); } catch {}
  }

  await loadHistory(chatId);
  if (currentChatId !== chatId) return; // another chat was opened meanwhile

  // Only the scheme is rewritten: http -> ws, https -> wss.
  const wsUrl = API_BASE.replace(/^http/, "ws") + `/ws/${chatId}?token=${encodeURIComponent(token)}`;
  ws = new WebSocket(wsUrl);
  ws.onopen = () => addMsg("* connected");
  ws.onclose = () => addMsg("* disconnected");
  ws.onerror = () => addMsg("! ws error");
  ws.onmessage = handleWsEvent;
}

// Report "typing" once, then "stopped" after 900 ms without further input.
function typingPulse() {
  if (!ws || ws.readyState !== WebSocket.OPEN) return;
  if (!typingActive) {
    typingActive = true;
    wsSend({ type: "typing", value: true });
  }
  clearTimeout(typingTimer);
  typingTimer = setTimeout(() => {
    typingActive = false;
    wsSend({ type: "typing", value: false });
  }, 900);
}

// ---- WebPush helpers ----
// Decode a base64url string into the Uint8Array pushManager.subscribe expects.
function urlBase64ToUint8Array(base64String) {
  const padding = "=".repeat((4 - (base64String.length % 4)) % 4);
  const base64 = (base64String + padding).replace(/-/g, "+").replace(/_/g, "/");
  const rawData = atob(base64);
  const outputArray = new Uint8Array(rawData.length);
  for (let i = 0; i < rawData.length; ++i) outputArray[i] = rawData.charCodeAt(i);
  return outputArray;
}

async function enableWebPush() {
  if (!VAPID_PUBLIC_KEY) {
    alert("Нужно задать VAPID public key в localStorage key 'VAPID_PUBLIC_KEY' (base64url).");
    return;
  }
  if (!("serviceWorker" in navigator)) {
    alert("Service Worker не поддерживается.");
    return;
  }
  const reg = await navigator.serviceWorker.register("./sw.js");
  const perm = await Notification.requestPermission();
  if (perm !== "granted") {
    alert("Notification permission denied");
    return;
  }
  const sub = await reg.pushManager.subscribe({
    userVisibleOnly: true,
    applicationServerKey: urlBase64ToUint8Array(VAPID_PUBLIC_KEY)
  });
  await http("POST", "/push/subscribe", sub.toJSON());
  alert("WebPush enabled!");
}

// ---- UI wiring ----
function saveToken(value) {
  token = value;
  localStorage.setItem("token", token);
}

$("btnReg").onclick = guarded(async () => {
  const u = $("u").value.trim();
  const p = $("p").value;
  const r = await http("POST", "/register", { username: u, password: p });
  saveToken(r.access_token);
  await refreshChats();
});

$("btnLogin").onclick = guarded(async () => {
  // OAuth2PasswordRequestForm -> form-encoded
  const form = new URLSearchParams();
  form.set("username", $("u").value.trim());
  form.set("password", $("p").value);
  const r = await fetch(`${API_BASE}/login`, { method: "POST", body: form });
  if (!r.ok) throw new Error(`${r.status}: ${await r.text()}`);
  const data = await r.json();
  saveToken(data.access_token);
  await refreshChats();
});

$("btnRefresh").onclick = guarded(refreshChats);

$("btnSearch").onclick = guarded(async () => {
  const q = $("searchQ").value.trim();
  const res = await http("GET", `/users/search?q=${encodeURIComponent(q)}`);
  const box = $("searchRes");
  box.textContent = "";
  res.forEach((u) => {
    const open = makeButton(`Dialog: ${u.username}`, guarded(async () => {
      const r = await http("POST", `/dialogs/${encodeURIComponent(u.username)}`, {});
      await refreshChats();
      await openChat(r.chat_id);
    }));
    box.appendChild(makeRow(open));
  });
});

$("btnNewGroup").onclick = guarded(async () => {
  const title = prompt("Group title?");
  if (!title) return;
  const members = (prompt("Members (comma usernames) ?") || "")
    .split(",").map((s) => s.trim()).filter(Boolean);
  await http("POST", "/chats/group", { title, members });
  await refreshChats();
});

$("btnNewChannel").onclick = guarded(async () => {
  const title = prompt("Channel title?");
  if (!title) return;
  const slug = prompt("Channel slug (unique)?");
  if (!slug) return;
  await http("POST", "/channels", { title, slug });
  await refreshChats();
});

$("btnFindChan").onclick = guarded(async () => {
  const slug = $("chanSlug").value.trim();
  if (!slug) return;
  const r = await http("GET", `/channels/by-slug/${encodeURIComponent(slug)}`);
  const card = chatCard(r.title, `@${r.slug} id=${r.id}`);
  const sub = makeButton("Subscribe", guarded(async () => {
    await http("POST", `/channels/${r.id}/subscribe`, {});
    alert("Subscribed");
    await refreshChats();
  }));
  const open = makeButton("Open", guarded(() => openChat(r.id)));
  card.appendChild(makeRow(sub, open));
  const box = $("chanInfo");
  box.textContent = "";
  box.appendChild(card);
});

$("btnSend").onclick = guarded(async () => {
  if (!currentChatId) return alert("Open a chat first");
  if (!ws || ws.readyState !== WebSocket.OPEN) return alert("Not connected to the chat");
  const text = $("msg").value.trim();
  const file = $("file").files && $("file").files[0];
  const payload = {};
  if (file) payload.file = await uploadFile(file);
  if (text) payload.text = text;
  if (!payload.text && !payload.file) return;
  // Keep the draft when the socket closed during the upload.
  if (!wsSend({ type: "message", payload })) return alert("Not connected to the chat");
  $("msg").value = "";
  $("file").value = "";
});

$("msg").addEventListener("input", () => typingPulse());
$("btnPush").onclick = guarded(enableWebPush);

// ---- bootstrap ----
(async () => {
  if (token) {
    try {
      await refreshChats();
    } catch {
      // A stored token may have expired; the user can simply log in again.
    }
  }
})();
export TG_LITE_ENABLE_WEBPUSH=1
export TG_LITE_VAPID_PRIVATE_KEY="...private..."
export TG_LITE_VAPID_SUBJECT="mailto:admin@example.com"
jslocalStorage.setItem("VAPID_PUBLIC_KEY", "ТУТ_ТВОЙ_PUBLIC_KEY");
Если хочешь, я могу:
добавить GET /push/vapid-public, чтобы веб-клиент сам подхватывал public key без ручного localStorage;