напиши мне инвайтер для группы телеграм
Создано: 2 августа 2025 г.
Создано: 2 августа 2025 г.
напиши мне инвайтер для группы телеграм
Конечно! Ниже — готовые варианты инвайта для группы в Telegram. Выбирайте и подставьте ссылку/название. Могу дописать под вашу тему (о чём группа, кто аудитория, тон).
1) Коротко и по-делу
Присоединяйтесь к [Название группы] — обсуждаем [тема], делимся опытом и полезностями каждый день. Ждём вас 👉 [ссылка]
2) Дружелюбно
Привет! Если вам близка тема [тема], приходите в нашу уютную группу [Название]. Без спама, только практика, ответы на вопросы и живое общение. Вход здесь: [ссылка]
3) С пользой (упор на ценность)
В [Название группы] вы получите:
• чек-листы и гайды по [тема]
• разборы кейсов каждую неделю
• ответы на вопросы от участников
Подключайтесь: [ссылка]
4) Деловой стиль
Приглашаем в профессиональное сообщество [Название] по [тема]. Стандарты, лучшие практики, обмен опытом и нетворкинг. Присоединиться: [ссылка]
5) Динамично/маркетингово
Хватит листать — пора прокачать [тема] вместе с нами! В [Название]: быстрые советы, разборы, встречи и поддержка единомышленников. Жми: [ссылка] 🚀
6) Для закрытой/камерной группы
Открываем доступ в маленький закрытый круг по [тема]. Акцент на практику, фидбек и взаимопомощь. Если вам по пути — заходите: [ссылка] (мест немного)
7) С призывом к первому действию
Сегодня в [Название] разбираем [конкретная тема/вопрос]. Хотите участвовать и получить материалы? Подключайтесь сейчас: [ссылка]
Советы по оформлению:
Если подскажете: тематику, аудиторию, тон (дружелюбный/строгий/игровой), и ключевые преимущества, я соберу один точный инвайт под вашу группу.
инвайтер на python нужен
Ниже два основных варианта «инвайтера» на Python — для личного аккаунта через Telethon и для бота через библиотеку python-telegram-bot. Выберите тот, который подходит под ваши права и задачи.
Требуется: личный аккаунт-админ в целевой группе (+ право Invite Users),
api_id/api_hash
с my.telegram.org и файлusers.csv
со списком юзернеймов.
pythonimport asyncio, csv from telethon import TelegramClient from telethon.tl.functions.channels import InviteToChannelRequest API_ID = 123456 # ваше api_id API_HASH = "YOUR_API_HASH" SESSION = "inviter" # имя .session-файла TARGET = "https://t.me/+XXXXXXXXXXXX" # @username или приватная ссылка группы BATCH_SIZE = 5 # приглашать по N человек за раз PAUSE = 10 # сек между пачками (уменьшает FloodWait) async def main(): client = TelegramClient(SESSION, API_ID, API_HASH) await client.start() channel = await client.get_entity(TARGET) users = [] with open("users.csv") as f: for row in csv.reader(f): try: users.append(await client.get_input_entity(row[0].strip())) except Exception as e: print("Пропуск", row[0], "→", e) for i in range(0, len(users), BATCH_SIZE): batch = users[i:i+BATCH_SIZE] try: await client(InviteToChannelRequest(channel, batch)) print(f"➕ Добавлено {len(batch)}") except Exception as e: print("Flood/ошибка:", e) await asyncio.sleep(60) # переждать FloodWait await asyncio.sleep(PAUSE) await client.disconnect() if __name__ == "__main__": asyncio.run(main())
Как это работает
api_id/api_hash
и создаём (или переиспользуем) .session
.channel
-entity.InputUser
.InviteToChannelRequest
— именно этот RPC отвечает за приглашение в чаты/каналы. (tl.telethon.dev)Подойдёт, если у вас бот-админ и вы не хотите «насильно» добавлять людей, а предпочитаете отправлять им ссылку-приглашение.
pythonfrom telegram import Bot BOT_TOKEN = "YOUR_BOT_TOKEN" CHAT_ID = -1001234567890 # ID супер-группы или канала USER_ID = 987654321 # кому отправляем ссылку bot = Bot(BOT_TOKEN) # создаём одноразовую ссылку со сроком действия 1 день и лимитом 1 invite = bot.create_chat_invite_link( chat_id=CHAT_ID, expire_date=int(time.time()) + 86400, member_limit=1, creates_join_request=False, ) bot.send_message(USER_ID, f"Присоединяйтесь: {invite.invite_link}")
Метод create_chat_invite_link
(Bot API 9.1) генерирует ссылку; затем бот отправляет её пользователю любым удобным способом. (Telegram)
Что учесть | Почему важно |
---|---|
Право invite | Аккаунт/бот должен иметь право приглашать участников. |
FloodWait | Telegram вводит паузу при массе инвайтов; ловите исключение и ждите указанное время. |
Лимиты | В сутки ≈ 50 «жёстких» инвайтов на аккаунт; ссылки через бота менее ограничены. |
GDPR и спам | Убедитесь, что у вас есть согласие пользователей; иначе аккаунт рискует получить лимит. |
Логи | Пишите в консоль или файл, кто успешно добавлен, кто нет. |
Если нужен разбор под ваш конкретный кейс (как формировать CSV, обработка FloodWait, прокси, Docker и т. д.) — напишите детали, дополним скрипт.
инвайтер на python нужен мощный
Ниже — расширенный «мощный» инвайтер на Python.
Вкратце: скрипт умеет работать с несколькими аккаунтами Telethon, автоматически ловит Flood-Wait/PeerFlood ошибки, дозирует приглашения (по умолчанию ≤ 50 в сутки на аккаунт, что соответствует текущим лимитам Telegram), поддерживает прокси, паузы и логирование. Он не обходит официальные ограничения — это невозможно — но помогает использовать их максимально эффективно и безопасно. (docs.telethon.dev, Stack Overflow, GitHub, telethon-01914.readthedocs.io, Pyrogram, Virtual User Software, ITGeared, Stack Overflow, tl.telethon.dev, limits.tginfo.me)
Возможность | Зачем нужна |
---|---|
Ротация нескольких аккаунтов | Каждому пользователю разрешено пригласить ~50 человек в сутки; переключение аккаунтов повышает общий лимит. (ITGeared, Virtual User Software) |
Обработка FloodWaitError / PeerFloodError | Telegram выдаёт таймауты вплоть до нескольких часов; скрипт автоматически ждёт нужное время и продолжает. (Stack Overflow, GitHub) |
Регулируемые паузы и семафор | Даже без ошибок API стоит ограничивать частоту запросов (рекомендуется ≥ 5–10 с между батчами). (Pyrogram, Stack Overflow) |
Прокси и перезапуск | Позволяет работать из-за фаерволлов и снижать шанс блокировки IP. |
Асинхронность asyncio | Конкурентно обслуживает десятки аккаунтов, не блокируя CPU. (docs.telethon.dev, Stack Overflow) |
bashpip install telethon==1.40 PyYAML tqdm
api_id
, api_hash
) для каждого аккаунта на my.telegram.org и войдите через telethon
(появятся <session>.session
файлы).config.yaml
:yamltarget: "https://t.me/+XXXXXXXXXXX" # цель (публичный @username или пригласительная ссылка) batch_size: 5 # сколько юзеров за раз pause: 8 # секунд между пачками accounts: - session: "acc1" - session: "acc2" - session: "acc3" proxy: # при необходимости host: "127.0.0.1" port: 1080 username: "user" password: "pass"
users.csv
— список username/ID/чат-invite-ссылок.inviter.py
pythonimport asyncio, csv, yaml, logging, time from pathlib import Path from typing import List from functools import wraps from telethon import TelegramClient, errors, functions from telethon.errors import FloodWaitError, PeerFloodError from telethon.sessions import StringSession from tqdm.asyncio import tqdm_asyncio logging.basicConfig( level=logging.INFO, format="%(asctime)s | %(name)s | %(levelname)s | %(message)s", ) def retry_flood(func): """Декоратор: пережидает FloodWait/PeerFlood и повторяет.""" @wraps(func) async def wrapper(*args, **kwargs): while True: try: return await func(*args, **kwargs) except (FloodWaitError, errors.FloodWaitError) as e: wait = e.seconds + 5 logging.warning("⏳ FloodWait %s сек – ждём.", wait) await asyncio.sleep(wait) except PeerFloodError as e: logging.error("🚫 PeerFlood: %s – пригласительный лимит исчерпан.", e) raise return wrapper class Inviter: def __init__(self, cfg: dict): self.cfg = cfg self.semaphore = asyncio.Semaphore(1) # 1 аккаунт приглашает одновременно self.clients: List[TelegramClient] = [] async def __aenter__(self): proxy = None if self.cfg.get("proxy"): proxy = ( self.cfg["proxy"]["host"], self.cfg["proxy"]["port"], self.cfg["proxy"]["username"], self.cfg["proxy"]["password"], ) for acc in self.cfg["accounts"]: client = TelegramClient( session=acc["session"], api_id=acc.get("api_id"), api_hash=acc.get("api_hash"), proxy=proxy, ) await client.start() self.clients.append(client) return self async def __aexit__(self, exc_type, exc, tb): for c in self.clients: await c.disconnect() @retry_flood async def invite_batch(self, client: TelegramClient, channel, batch): await client(functions.channels.InviteToChannelRequest(channel, batch)) async def process_account(self, client: TelegramClient, users): channel = await client.get_entity(self.cfg["target"]) for i in range(0, len(users), self.cfg["batch_size"]): batch = users[i : i + self.cfg["batch_size"]] async with self.semaphore: await self.invite_batch(client, channel, batch) logging.info("✅ %s: добавлено %s", client.session.filename, len(batch)) await asyncio.sleep(self.cfg["pause"]) async def run(self): users = [] with open("users.csv") as f: for row in csv.reader(f): users.append(row[0].strip()) # делим пользователей между аккаунтами chunks = [users[i::len(self.clients)] for i in range(len(self.clients))] await tqdm_asyncio.gather( *[self.process_account(c, chunk) for c, chunk in zip(self.clients, chunks)] ) async def main(): cfg = yaml.safe_load(Path("config.yaml").read_text()) async with Inviter(cfg) as inviter: await inviter.run() if __name__ == "__main__": asyncio.run(main())
Почему так:
retry_flood
ловит оба типа ошибок и ждётe.seconds
+ 5 с. Telegram сам сообщает точное число секунд, которое необходимо выждать. (Stack Overflow, GitHub)- Семафор
self.semaphore
гарантирует, что в моменте приглашает только один аккаунт, снижая суммарный RPS.- Пользователи делятся между аккаунтами циклически
users[i::len(self.clients)]
, чтобы нагрузка распределялась равномерно.batch_size
= 5 иpause
= 8 с — стартовые параметры; поправьте, если ваши аккаунты «возрастные» и Telegram разрешает больше. (fbsee.com, Virtual User Software)
bashpython inviter.py
Логи будут вида:
text2025-08-02 12:34:01 | root | INFO | ✅ acc1.session: добавлено 5 2025-08-02 12:34:10 | root | WARNING | ⏳ FloodWait 32 сек – ждём.
PEER_FLOOD
и ограничат аккаунт насовсем. (tl.telethon.dev)ChatInviteLink
через бота и рассылайте приватно; лимитов там меньше. (Python Telegram Bot)Опция | Идея реализации |
---|---|
Капча-распознавание | При логине иногда приходит «captcha»; можно интегрировать RuCaptcha/2Captcha. |
БД-учёт | Сохранять, кого уже пригласили, в SQLite или Redis, чтобы не дублировать. |
Автопринятие join-request | Для request-only групп можно парсить UpdateChatJoinRequest и одобрять. |
С этим шаблоном вы получаете полнофункциональный, устойчивый к FloodWait, много-аккаунтный инвайтер на Python, соответствующий актуальным ограничениям Telegram по состоянию на август 2025 г.
ошибка File "C:\Users\shunk\AppData\Local\Programs\Python\Python311\Lib\site-packages\telethon\client\telegrambaseclient.py", line 267, in init
raise ValueError(
ValueError: Your API ID or Hash cannot be empty or None. Refer to telethon.rtfd.io for more information.
Ошибка означает, что в момент создания TelegramClient(...)
библиотека получила пустые (None
/пустая строка) значения для api_id
и/или api_hash
. По документации Telethon эти параметры обязательны при создании клиента (они не «берутся из .session») — их нужно передать каждый раз, когда вы создаёте TelegramClient
. (docs.telethon.dev, docs.telethon.dev)
Ниже — как быстро исправить.
config.yaml
В вашем «мощном инвайтере» клиент создаётся так:
pythonclient = TelegramClient( session=acc["session"], api_id=acc.get("api_id"), # <- здесь у вас выходит None api_hash=acc.get("api_hash"), # <- и здесь None proxy=proxy, )
А в примере config.yaml
я не добавил api_id
/api_hash
(моя недосмотренность), поэтому acc.get(...)
возвращает None
, и Telethon бросает ValueError
. Исправьте конфиг одним из способов:
yamlapi_id: 123456 api_hash: "abcd1234abcd1234abcd1234abcd1234" target: "https://t.me/+XXXXXXXXXXX" batch_size: 5 pause: 8 accounts: - session: "acc1" - session: "acc2" - session: "acc3"
yamltarget: "https://t.me/+XXXXXXXXXXX" batch_size: 5 pause: 8 accounts: - session: "acc1" api_id: 111111 api_hash: "hash_for_acc1" - session: "acc2" api_id: 222222 api_hash: "hash_for_acc2"
Примечание: Телеграм требует api_id/api_hash («приложение») при использовании MTProto. Получить их можно на my.telegram.org → API development tools (это официальный способ). (Telegram)
Сделайте глобальные значения из конфига дефолтными для каждого аккаунта и проверьте, что они не пустые:
python# внутри __aenter__ или перед созданием клиента global_api_id = self.cfg.get("api_id") global_api_hash = self.cfg.get("api_hash") for acc in self.cfg["accounts"]: api_id = acc.get("api_id", global_api_id) api_hash = acc.get("api_hash", global_api_hash) if not api_id or not api_hash: raise ValueError("Missing api_id/api_hash: добавьте в config.yaml глобально или для аккаунта.") client = TelegramClient( session=acc["session"], api_id=int(api_id), api_hash=str(api_hash), # уберите proxy, если не настраивали его корректно ) await client.start() self.clients.append(client)
Почему этого недостаточно просто «иметь .session»: по документации, файл сессии хранит ключ авторизации, кеш сущностей и т.п., но не заменяет необходимость передавать api_id/api_hash при создании клиента. (docs.telethon.dev)
Создайте тестовый файл smoke.py
:
pythonfrom telethon import TelegramClient api_id = 123456 api_hash = "abcd1234abcd1234abcd1234abcd1234" with TelegramClient("test", api_id, api_hash) as client: me = client.loop.run_until_complete(client.get_me()) print("OK:", me.id)
Если скрипт выводит ваш id — пара корректна и читается правильно. Сигнатура клиента требует именно эти аргументы (api_id
, api_hash
), это видно в официальной справке по TelegramClient
. (docs.telethon.dev)
config.yaml
. (Telegram).env
(в Docker/cron это встречается часто: процесс не видит env). Симптом — в код приходит None
, и вы видите именно это ValueError
. (Stack Overflow)apiId
вместо api_id
, и т.п.).session
, но не указали api_id/api_hash
(ваш случай).Если пришлёте ваш config.yaml
без секретов (замажьте числа/строки) и кусок кода создания клиента, проверю и подскажу точечно. Также, если вы используете бот-токен, он передаётся в client.start(bot_token=...)
, но api_id/api_hash всё равно нужны на этапе создания TelegramClient
— это требование Telethon. (docs.telethon.dev)
Уточните, пожалуйста:
api_id/api_hash
для всех аккаунтов или разные?^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\shunk\AppData\Local\Programs\Python\Python311\Lib\site-packages\telethon\client\users.py", line 564, in _get_entity_from_string
raise ValueError('No user has "{}" as username'
ValueError: No user has "jdjzbkdj" as username
0%|
ошибка сделай чтобы он просто пропускал и все
вот данный момент код
import asyncio, csv, yaml, logging, time
from pathlib import Path
from typing import List
from functools import wraps
from telethon import TelegramClient, errors, functions
from telethon.errors import FloodWaitError, PeerFloodError
from telethon.sessions import StringSession
from tqdm.asyncio import tqdm_asyncio
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(name)s | %(levelname)s | %(message)s",
)
def retry_flood(func):
"""Декоратор: пережидает FloodWait/PeerFlood и повторяет."""
@wraps(func)
async def wrapper(*args, **kwargs):
while True:
try:
return await func(*args, **kwargs)
except (FloodWaitError, errors.FloodWaitError) as e:
wait = e.seconds + 5
logging.warning("⏳ FloodWait %s сек – ждём.", wait)
await asyncio.sleep(wait)
except PeerFloodError as e:
logging.error("🚫 PeerFlаood: %s – пригласительный лимит исчерпан.", e)
raise
return wrapper
class Inviter:
def init(self, cfg: dict):
self.cfg = cfg
self.semaphore = asyncio.Semaphore(1) # 1 аккаунт приглашает одновременно
self.clients: List[TelegramClient] = []
textasync def __aenter__(self): proxy = None if self.cfg.get("proxy"): proxy = ( self.cfg["proxy"]["host"], self.cfg["proxy"]["port"], self.cfg["proxy"]["username"], self.cfg["proxy"]["password"], ) for acc in self.cfg["accounts"]: client = TelegramClient( session=acc["session"], api_id=acc.get("api_id"), api_hash=acc.get("api_hash"), proxy=proxy, ) await client.start() self.clients.append(client) return self async def __aexit__(self, exc_type, exc, tb): for c in self.clients: await c.disconnect() @retry_flood async def invite_batch(self, client: TelegramClient, channel, batch): await client(functions.channels.InviteToChannelRequest(channel, batch)) async def process_account(self, client: TelegramClient, users): channel = await client.get_entity(self.cfg["target"]) for i in range(0, len(users), self.cfg["batch_size"]): batch = users[i : i + self.cfg["batch_size"]] async with self.semaphore: await self.invite_batch(client, channel, batch) logging.info("✅ %s: добавлено %s", client.session.filename, len(batch)) await asyncio.sleep(self.cfg["pause"]) async def run(self): users = [] with open("users.csv") as f: for row in csv.reader(f): users.append(row[0].strip()) # делим пользователей между аккаунтами chunks = [users[i::len(self.clients)] for i in range(len(self.clients))] await tqdm_asyncio.gather( *[self.process_account(c, chunk) for c, chunk in zip(self.clients, chunks)] )
async def main():
cfg = yaml.safe_load(Path("config.yaml").read_text())
async with Inviter(cfg) as inviter:
await inviter.run()
if name == "main":
asyncio.run(main())
Коротко: Telethon
падает, когда вы передаёте строку-«юзернейм», которого нет — метод get_input_entity
внутри клиента выбрасывает UsernameNotOccupiedError
, который оборачивается в ValueError
(«No user has … as username») (Stack Overflow).
Самый надёжный способ «просто пропускать» такие строки — попытаться предварительно (!) резолвить каждого пользователя и игнорировать все исключения, которые означают «пользователь/канал не найден» (Stack Overflow, Stack Overflow). Затем в InviteToChannelRequest
передавать только успешно полученные InputUser
-объекты (метод требует именно их, а не строки) (Telethon API).
Шаг | Что делает Telethon | Что идёт не так |
---|---|---|
Вы передаёте строку jdjzbkdj в InviteToChannelRequest . | Для каждой строки библиотека вызывает скрытый _get_entity_from_string (docs.telethon.dev). | Поскольку ни в кэше, ни в сети такого @username нет, Telegram API отвечает USERNAME_NOT_OCCUPIED (Stack Overflow). |
Исключение UsernameNotOccupiedError оборачивается в ValueError со словами «No user has …». | Ваш цикл никак его не ловит — скрипт аварийно завершается на первой «битой» записи. |
Ниже минимальный diff (Python 3.8+).
Меняем только две функции: process_account
и новую resolve_users
.
pythonfrom telethon.errors import ( UsernameNotOccupiedError, UsernameInvalidError, UsernameNotModifiedError # на всякий случай ) async def resolve_users(client, usernames): """Вернуть список InputUser, пропустив всех 'битых'.""" result = [] for name in usernames: try: ent = await client.get_input_entity(name) result.append(ent) # успех except (ValueError, UsernameNotOccupiedError, UsernameInvalidError, UsernameNotModifiedError) as e: logging.warning("⏭️ Пропуск '%s': %s", name, e) except Exception as e: # любые другие ошибки логируем, но не падаем logging.error("❗ Не смог обработать '%s': %s", name, e) return result async def process_account(self, client: TelegramClient, users): channel = await client.get_entity(self.cfg["target"]) users = await resolve_users(client, users) # <-- новую функцию вызвали for i in range(0, len(users), self.cfg["batch_size"]): batch = users[i : i + self.cfg["batch_size"]] if not batch: continue async with self.semaphore: await self.invite_batch(client, channel, batch) logging.info("✅ %s: добавлено %s", client.session.filename, len(batch)) await asyncio.sleep(self.cfg["pause"])
Предварительное резолвинг
get_input_entity
(дешёвый вызов, кешируется).В InviteToChannelRequest
попадают только InputUser
Это ровно то, что требует RPC-метод (Telethon API).
Семантика остальных ошибок сохраняется
Для FloodWaitError
и PeerFloodError
по-прежнему срабатывает декоратор-ретрай (docs.telethon.dev). Другие исключения мы логируем, но не падаем.
pythonimport asyncio, csv, yaml, logging from pathlib import Path from typing import List from functools import wraps from telethon import TelegramClient, errors, functions from telethon.errors import ( FloodWaitError, PeerFloodError, UsernameNotOccupiedError, UsernameInvalidError, UsernameNotModifiedError ) from tqdm.asyncio import tqdm_asyncio logging.basicConfig( level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s", ) def retry_flood(func): @wraps(func) async def wrapper(*args, **kwargs): while True: try: return await func(*args, **kwargs) except (FloodWaitError, errors.FloodWaitError) as e: wait = e.seconds + 5 logging.warning("⏳ FloodWait %s сек — ждём.", wait) await asyncio.sleep(wait) except PeerFloodError as e: logging.error("🚫 PeerFlood: %s", e) raise return wrapper class Inviter: def __init__(self, cfg: dict): self.cfg = cfg self.semaphore = asyncio.Semaphore(1) self.clients: List[TelegramClient] = [] async def __aenter__(self): proxy = tuple(self.cfg["proxy"].values()) if self.cfg.get("proxy") else None for acc in self.cfg["accounts"]: client = TelegramClient( session=acc["session"], api_id=acc.get("api_id", self.cfg.get("api_id")), api_hash=acc.get("api_hash", self.cfg.get("api_hash")), proxy=proxy, ) await client.start() self.clients.append(client) return self async def __aexit__(self, *exc): await asyncio.gather(*(c.disconnect() for c in self.clients)) @retry_flood async def invite_batch(self, client, channel, batch): await client(functions.channels.InviteToChannelRequest(channel, batch)) async def resolve_users(self, client, usernames): ok = [] for name in usernames: try: ok.append(await client.get_input_entity(name)) except (ValueError, UsernameNotOccupiedError, UsernameInvalidError, UsernameNotModifiedError) as e: logging.warning("⏭️ Пропуск '%s': %s", name, e) except Exception as e: logging.error("❗ Неизвестно для '%s': %s", name, e) return ok async def process_account(self, client, users): channel = await client.get_entity(self.cfg["target"]) users = await self.resolve_users(client, users) for i in range(0, len(users), self.cfg["batch_size"]): batch = users[i:i+self.cfg["batch_size"]] if not batch: continue async with self.semaphore: await self.invite_batch(client, channel, batch) logging.info("✅ %s: добавлено %d", client.session.filename, len(batch)) await asyncio.sleep(self.cfg["pause"]) async def run(self): with open("users.csv") as f: raw = [row[0].strip() for row in csv.reader(f) if row] chunks = [raw[i::len(self.clients)] for i in range(len(self.clients))] await tqdm_asyncio.gather( *(self.process_account(c, chunk) for c, chunk in zip(self.clients, chunks)) ) async def main(): cfg = yaml.safe_load(Path("config.yaml").read_text()) async with Inviter(cfg) as inviter: await inviter.run() if __name__ == "__main__": asyncio.run(main())
get_input_entity
сущность кэшируется локально, поэтому повторные вызовы почти бесплатны (docs.telethon.dev).Теперь «битые» имена будут просто логироваться и игнорироваться — скрипт пройдёт весь список до конца, не падая.
TypeError: Не удается привести InputPeerChannel к какому-либо типу InputUser.
import asyncio, csv, yaml, logging
from pathlib import Path
from typing import List
from functools import wraps
from telethon import TelegramClient, errors, functions
from telethon.errors import (
FloodWaitError, PeerFloodError,
UsernameNotOccupiedError, UsernameInvalidError, UsernameNotModifiedError
)
from tqdm.asyncio import tqdm_asyncio
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(message)s",
)
def retry_flood(func):
@wraps(func)
async def wrapper(*args, **kwargs):
while True:
try:
return await func(*args, **kwargs)
except (FloodWaitError, errors.FloodWaitError) as e:
wait = e.seconds + 5
logging.warning("⏳ FloodWait %s сек — ждём.", wait)
await asyncio.sleep(wait)
except PeerFloodError as e:
logging.error("🚫 PeerFlood: %s", e)
raise
return wrapper
class Inviter:
def init(self, cfg: dict):
self.cfg = cfg
self.semaphore = asyncio.Semaphore(1)
self.clients: List[TelegramClient] = []
textasync def __aenter__(self): proxy = tuple(self.cfg["proxy"].values()) if self.cfg.get("proxy") else None for acc in self.cfg["accounts"]: client = TelegramClient( session=acc["session"], api_id=acc.get("api_id", self.cfg.get("api_id")), api_hash=acc.get("api_hash", self.cfg.get("api_hash")), proxy=proxy, ) await client.start() self.clients.append(client) return self async def __aexit__(self, *exc): await asyncio.gather(*(c.disconnect() for c in self.clients)) @retry_flood async def invite_batch(self, client, channel, batch): await client(functions.channels.InviteToChannelRequest(channel, batch)) async def resolve_users(self, client, usernames): ok = [] for name in usernames: try: ok.append(await client.get_input_entity(name)) except (ValueError, UsernameNotOccupiedError, UsernameInvalidError, UsernameNotModifiedError) as e: logging.warning("⏭️ Пропуск '%s': %s", name, e) except Exception as e: logging.error("❗ Неизвестно для '%s': %s", name, e) return ok async def process_account(self, client, users): channel = await client.get_entity(self.cfg["target"]) users = await self.resolve_users(client, users) for i in range(0, len(users), self.cfg["batch_size"]): batch = users[i:i+self.cfg["batch_size"]] if not batch: continue async with self.semaphore: await self.invite_batch(client, channel, batch) logging.info("✅ %s: добавлено %d", client.session.filename, len(batch)) await asyncio.sleep(self.cfg["pause"]) async def run(self): with open("users.csv") as f: raw = [row[0].strip() for row in csv.reader(f) if row] chunks = [raw[i::len(self.clients)] for i in range(len(self.clients))] await tqdm_asyncio.gather( *(self.process_account(c, chunk) for c, chunk in zip(self.clients, chunks)) )
async def main():
cfg = yaml.safe_load(Path("config.yaml").read_text())
async with Inviter(cfg) as inviter:
await inviter.run()
if name == "main":
asyncio.run(main())
Новая ошибка возникает, когда в список «приглашаемых» попадает не человек, а канал/чат – у get_input_entity()
получается объект InputPeerChannel
, который InviteToChannelRequest
принять не может, потому что этому RPC-методу нужны только элементы типа InputUser
/ InputPeerUser
(tl.telethon.dev, docs.telethon.dev, GitHub).
Исправляем так:
pythonfrom telethon.tl import types # импорт в начало файла async def resolve_users(self, client, identifiers): """Возвращает только InputUser; все остальные сущности пропускаем.""" ok = [] for ident in identifiers: try: ent = await client.get_input_entity(ident) if isinstance(ent, (types.InputUser, types.InputPeerUser)): ok.append(ent) else: # канал, чат, бот-чат и т.п. logging.warning("⏭️ '%s' — это %s, пропуск.", ident, ent.__class__.__name__) except ( ValueError, UsernameNotOccupiedError, UsernameInvalidError, UsernameNotModifiedError, ) as e: logging.warning("⏭️ Пропуск '%s': %s", ident, e) except Exception as e: logging.error("❗ Неизвестно для '%s': %s", ident, e) return ok
users: Vector<InputUser>
» — ничего кроме пользователей (tl.telethon.dev).get_input_entity()
возвращает разные подклассы InputPeer*
; нужно отсеять всё, что не …User
(docs.telethon.dev).process_account
pythonasync def process_account(self, client, raw_users): channel = await client.get_entity(self.cfg["target"]) users = await self.resolve_users(client, raw_users) # теперь фильтрован ...
Частая причина ошибки — в users.csv
случайно попадают:
https://t.me/+…
, https://t.me/c/…
),Оставьте в файле только usernames или ID реальных пользователей; можно с «@» — Telethon корректно понимает оба варианта (Stack Overflow).
После правки скрипт:
InviteToChannelRequest
только корректные InputUser
-объекты, исключая TypeError: Cannot cast InputPeerChannel…
.Теперь «мощный» инвайтер обходит обе категории ошибок:
При возникновении новых вопросов – пишите!