| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | import asyncio |
| | import os |
| | import random |
| | import shutil |
| | import time |
| | from random import randint |
| |
|
| | from ..configs import Var |
| |
|
| | try: |
| | from pytz import timezone |
| | except ImportError: |
| | timezone = None |
| |
|
| | from telethon.errors import ( |
| | ChannelsTooMuchError, |
| | ChatAdminRequiredError, |
| | MessageIdInvalidError, |
| | MessageNotModifiedError, |
| | UserNotParticipantError, |
| | ) |
| | from telethon.tl.custom import Button |
| | from telethon.tl.functions.channels import ( |
| | CreateChannelRequest, |
| | EditAdminRequest, |
| | EditPhotoRequest, |
| | InviteToChannelRequest, |
| | ) |
| | from telethon.tl.functions.contacts import UnblockRequest |
| | from telethon.tl.types import ( |
| | ChatAdminRights, |
| | ChatPhotoEmpty, |
| | InputChatUploadedPhoto, |
| | InputMessagesFilterDocument, |
| | ) |
| | from telethon.utils import get_peer_id |
| | from decouple import config, RepositoryEnv |
| | from .. import LOGS, ULTConfig |
| | from ..fns.helper import download_file, inline_mention, updater |
| |
|
| | db_url = 0 |
| |
|
| |
|
async def autoupdate_local_database():
    """Back up the local ``database.json`` file to Telegram.

    The link of a previous backup message is looked up under ``TGDB_URL``
    (database, then ``Var``, then the user client's cache).  If one is found
    the assistant edits that message in place with the current file.  When no
    link is known, or the linked message id is no longer valid, the file is
    sent as a new message to ``LOG_CHANNEL`` (falling back to ``"me"``) and
    the new message link is stored under ``TGDB_URL``.

    Errors while sending the new message are logged, not raised.
    """
    from .. import Var, asst, udB, ultroid_bot

    global db_url
    db_url = (
        udB.get_key("TGDB_URL") or Var.TGDB_URL or ultroid_bot._cache.get("TGDB_URL")
    )
    if db_url:
        # The link is expected to end in ".../<channel>/<message id>".
        _split = db_url.split("/")
        _channel = _split[-2]
        _id = _split[-1]
        try:
            await asst.edit_message(
                int(_channel) if _channel.isdigit() else _channel,
                # The id comes out of the URL as text; Telethon resolves
                # message ids from ints (or Message objects), not strings.
                message=int(_id) if _id.isdigit() else _id,
                file="database.json",
                text="**Do not delete this file.**",
            )
        except MessageNotModifiedError:
            # The stored backup is already identical to the local file.
            return
        except MessageIdInvalidError:
            # The old backup message is gone: fall through and upload anew.
            pass
        else:
            # Edited in place successfully; do not post a duplicate copy.
            return
    try:
        LOG_CHANNEL = (
            udB.get_key("LOG_CHANNEL")
            or Var.LOG_CHANNEL
            or asst._cache.get("LOG_CHANNEL")
            or "me"
        )
        msg = await asst.send_message(
            LOG_CHANNEL, "**Do not delete this file.**", file="database.json"
        )
        asst._cache["TGDB_URL"] = msg.message_link
        udB.set_key("TGDB_URL", msg.message_link)
    except Exception as ex:
        LOGS.error(f"Error on autoupdate_local_database: {ex}")
| |
|
| |
|
def update_envs():
    """Copy selected environment settings into udB.

    Candidate names are every key in ``os.environ`` plus, when a ``.env``
    file is present in the current directory, every key parsed from it.  A
    name is copied when it is one of the always-synced keys below or already
    exists in udB.  A non-empty process environment value wins; otherwise the
    value is resolved through ``decouple.config``.
    """
    from .. import udB

    _envs = list(os.environ)
    if ".env" in os.listdir("."):
        _envs.extend(RepositoryEnv(config._find_file(".")).data)
    for envs in _envs:
        if (
            envs in ["LOG_CHANNEL", "BOT_TOKEN", "BOTMODE", "DUAL_MODE", "language"]
            or envs in udB.keys()
        ):
            if _value := os.environ.get(envs):
                udB.set_key(envs, _value)
            else:
                # Go through the public call: it loads the repository on
                # first use instead of assuming ``config.config`` is set.
                udB.set_key(envs, config(envs))
| |
|
| |
|
async def startup_stuff():
    """Prepare working directories and apply settings stored in udB.

    - creates ``resources/auth`` and ``resources/downloads`` if missing;
    - downloads ``CUSTOM_THUMBNAIL`` to ``resources/extras/thumbnail.jpg`` and
      points ``ULTConfig.thumb`` at it (or clears the thumb when the key is
      exactly ``False``);
    - writes ``GDRIVE_AUTH_TOKEN`` to ``resources/auth/gdrive_creds.json``;
    - deletes the ``AUTH_TOKEN`` key if present;
    - writes ``.megarc`` when both ``MEGA_MAIL`` and ``MEGA_PASS`` are set;
    - applies ``TIMEZONE`` to the process, falling back to UTC if it is
      rejected.
    """
    from .. import udB

    for folder in ("resources/auth", "resources/downloads"):
        # makedirs also creates the parent "resources" directory if needed.
        os.makedirs(folder, exist_ok=True)

    CT = udB.get_key("CUSTOM_THUMBNAIL")
    if CT:
        path = "resources/extras/thumbnail.jpg"
        ULTConfig.thumb = path
        try:
            await download_file(CT, path)
        except Exception as er:
            LOGS.exception(er)
    elif CT is False:
        ULTConfig.thumb = None

    GT = udB.get_key("GDRIVE_AUTH_TOKEN")
    if GT:
        with open("resources/auth/gdrive_creds.json", "w") as t_file:
            t_file.write(GT)

    if udB.get_key("AUTH_TOKEN"):
        udB.del_key("AUTH_TOKEN")

    MM = udB.get_key("MEGA_MAIL")
    MP = udB.get_key("MEGA_PASS")
    if MM and MP:
        with open(".megarc", "w") as mega:
            mega.write(f"[Login]\nUsername = {MM}\nPassword = {MP}")

    TZ = udB.get_key("TIMEZONE")
    # ``timezone`` is None when pytz could not be imported.
    if TZ and timezone:
        try:
            # Called only to validate the name; the result is not used.
            timezone(TZ)
            os.environ["TZ"] = TZ
            time.tzset()
        except AttributeError as er:
            LOGS.debug(er)
        except Exception:
            LOGS.critical(
                "Incorrect Timezone ,\nCheck Available Timezone From Here https://graph.org/Ultroid-06-18-2\nSo Time is Default UTC"
            )
            os.environ["TZ"] = "UTC"
            # Guarded so the fallback itself cannot raise where tzset is
            # not available.
            if hasattr(time, "tzset"):
                time.tzset()
| |
|
| |
|
async def autobot():
    """Create an assistant bot through @BotFather when no token is stored.

    Returns immediately if ``BOT_TOKEN`` is already set.  Otherwise the user
    client starts, sends ``/newbot`` to @BotFather followed by a display name
    and a username, and reads BotFather's latest message after each step.
    On a "Done!" reply the text between the first pair of backticks is stored
    as ``BOT_TOKEN`` and inline mode is enabled for the new bot.  Every
    failure path logs a hint and exits the process with status 1.
    """
    import sys

    from .. import udB, ultroid_bot

    if udB.get_key("BOT_TOKEN"):
        return

    botfather = "@BotFather"
    manual_hint = "Please make a Bot from @BotFather and add it's token in BOT_TOKEN, as an env var and restart me."

    async def _say(text):
        # Send one line to BotFather, then wait a second for its answer.
        await ultroid_bot.send_message(botfather, text)
        await asyncio.sleep(1)

    async def _latest_text():
        # Text of the newest message in the BotFather chat.
        return (await ultroid_bot.get_messages(botfather, limit=1))[0].text

    def _give_up():
        LOGS.critical(manual_hint)
        sys.exit(1)

    await ultroid_bot.start()
    LOGS.info("MAKING A TELEGRAM BOT FOR YOU AT @BotFather, Kindly Wait")
    me = ultroid_bot.me
    display_name = me.first_name + "'s Bot"
    handle = (
        me.username + "_bot" if me.username else "ultroid_" + str(me.id)[5:] + "_bot"
    )

    await ultroid_bot(UnblockRequest(botfather))
    await _say("/cancel")
    await _say("/newbot")
    answer = await _latest_text()
    if answer.startswith("That I cannot do.") or "20 bots" in answer:
        _give_up()

    await _say(display_name)
    answer = await _latest_text()
    if not answer.startswith("Good."):
        # Retry once with a fixed display name.
        await _say("My Assistant Bot")
        answer = await _latest_text()
        if not answer.startswith("Good."):
            _give_up()

    await _say(handle)
    answer = await _latest_text()
    await ultroid_bot.send_read_acknowledge("botfather")
    if answer.startswith("Sorry,"):
        # Retry once with a username built from the account id plus a
        # random number.
        suffix = randint(1, 100)
        handle = "ultroid_" + str(me.id)[6:] + str(suffix) + "_bot"
        await _say(handle)
        answer = await _latest_text()

    if not answer.startswith("Done!"):
        LOGS.info(
            "Please Delete Some Of your Telegram bots at @Botfather or Set Var BOT_TOKEN with token of a bot"
        )
        sys.exit(1)

    token = answer.split("`")[1]
    udB.set_key("BOT_TOKEN", token)
    await enable_inline(ultroid_bot, handle)
    LOGS.info(
        f"Done. Successfully created @{handle} to be used as your assistant bot!"
    )
| |
|
| |
|
async def autopilot():
    """Make sure a log channel exists and is set up for the assistant.

    Steps, in order:

    1. Resolve the stored ``LOG_CHANNEL``; if that fails the key is deleted
       and the channel is treated as missing.
    2. When no channel is available, create a megagroup and store its peer id
       as ``LOG_CHANNEL``.  If that is not possible (the client is a bot, or
       the create request fails) the client's own id is cached as
       ``LOG_CHANNEL``, the reason is sent to that id by the assistant, and
       the function returns.
    3. Check the assistant's permissions in the channel and invite it when it
       is not a participant.
    4. For a channel created in this call, promote the assistant to admin if
       it has no admin rights yet.
    5. Upload a default photo when the channel has none.

    Failures in steps 3-5 are logged and do not abort the remaining steps.
    """
    from .. import asst, udB, ultroid_bot

    channel = udB.get_key("LOG_CHANNEL")
    new_channel = None
    if channel:
        try:
            chat = await ultroid_bot.get_entity(channel)
        except Exception as err:
            LOGS.exception(err)
            udB.del_key("LOG_CHANNEL")
            channel = None
    if not channel:

        async def _save(exc):
            # Fall back to the client's own chat and report why.
            udB._cache["LOG_CHANNEL"] = ultroid_bot.me.id
            await asst.send_message(
                ultroid_bot.me.id, f"Failed to Create Log Channel due to {exc}.."
            )

        if ultroid_bot._bot:
            msg_ = "'LOG_CHANNEL' not found! Add it in order to use 'BOTMODE'"
            LOGS.error(msg_)
            return await _save(msg_)
        LOGS.info("Creating a Log Channel for You!")
        try:
            r = await ultroid_bot(
                CreateChannelRequest(
                    title="My Ultroid Logs",
                    about="My Ultroid Log Group\n\n Join @TeamUltroid",
                    megagroup=True,
                ),
            )
        except ChannelsTooMuchError as er:
            LOGS.critical(
                "You Are in Too Many Channels & Groups , Leave some And Restart The Bot"
            )
            return await _save(str(er))
        except Exception as er:
            LOGS.exception(er)
            LOGS.info(
                "Something Went Wrong , Create A Group and set its id on config var LOG_CHANNEL."
            )
            return await _save(str(er))
        new_channel = True
        chat = r.chats[0]
        channel = get_peer_id(chat)
        udB.set_key("LOG_CHANNEL", channel)

    # ``assistant`` stays True only while the assistant is known to be (or
    # has just been made) a member of the channel.
    assistant = True
    try:
        await ultroid_bot.get_permissions(int(channel), asst.me.username)
    except UserNotParticipantError:
        try:
            await ultroid_bot(InviteToChannelRequest(int(channel), [asst.me.username]))
        except Exception as er:
            LOGS.info("Error while Adding Assistant to Log Channel")
            LOGS.exception(er)
            assistant = False
    except Exception as er:
        assistant = False
        LOGS.exception(er)

    if assistant and new_channel:
        try:
            achat = await asst.get_entity(int(channel))
        except Exception as er:
            achat = None
            LOGS.info("Error while getting Log channel from Assistant")
            LOGS.exception(er)
        if achat and not achat.admin_rights:
            rights = ChatAdminRights(
                add_admins=True,
                invite_users=True,
                change_info=True,
                ban_users=True,
                delete_messages=True,
                pin_messages=True,
                anonymous=False,
                manage_call=True,
            )
            try:
                await ultroid_bot(
                    EditAdminRequest(
                        int(channel), asst.me.username, rights, "Assistant"
                    )
                )
            except ChatAdminRequiredError:
                LOGS.info(
                    "Failed to promote 'Assistant Bot' in 'Log Channel' due to 'Admin Privileges'"
                )
            except Exception as er:
                LOGS.info("Error while promoting assistant in Log Channel..")
                LOGS.exception(er)

    if isinstance(chat.photo, ChatPhotoEmpty):
        photo, _ = await download_file(
            "https://graph.org/file/27c6812becf6f376cbb10.jpg", "channelphoto.jpg"
        )
        try:
            # An upload failure still propagates to the caller, but the
            # downloaded file is removed either way.
            ll = await ultroid_bot.upload_file(photo)
            try:
                await ultroid_bot(
                    EditPhotoRequest(int(channel), InputChatUploadedPhoto(ll))
                )
            except Exception as er:
                LOGS.exception(er)
        finally:
            os.remove(photo)
| |
|
| |
|
| | |
| |
|
| |
|
async def customize():
    """Give the assistant bot a picture, about text and description.

    Skipped when the assistant already has a profile photo.  Otherwise the
    user client drives @BotFather through ``/setuserpic``, ``/setabouttext``
    and ``/setdescription`` for the assistant's username, posting a progress
    message in ``LOG_CHANNEL`` that is edited on completion.

    All errors are logged, never raised.  A picture downloaded for the run is
    removed afterwards whether or not the run completed.
    """
    from .. import asst, udB, ultroid_bot

    downloaded = None  # temporary picture file to delete when we are done
    try:
        try:
            chat_id = udB.get_key("LOG_CHANNEL")
            if asst.me.photo:
                return
            LOGS.info("Customising Ur Assistant Bot in @BOTFATHER")
            UL = f"@{asst.me.username}"
            if not ultroid_bot.me.username:
                sir = ultroid_bot.me.first_name
            else:
                sir = f"@{ultroid_bot.me.username}"
            file = random.choice(
                [
                    "https://graph.org/file/92cd6dbd34b0d1d73a0da.jpg",
                    "https://graph.org/file/a97973ee0425b523cdc28.jpg",
                    "resources/extras/ultroid_assistant.jpg",
                ]
            )
            # Anything that is not an existing local path is fetched first.
            if not os.path.exists(file):
                file, _ = await download_file(file, "profile.jpg")
                downloaded = file
            msg = await asst.send_message(
                chat_id, "**Auto Customisation** Started on @Botfather"
            )
            await asyncio.sleep(1)
            await ultroid_bot.send_message("botfather", "/cancel")
            await asyncio.sleep(1)
            await ultroid_bot.send_message("botfather", "/setuserpic")
            await asyncio.sleep(1)
            isdone = (await ultroid_bot.get_messages("botfather", limit=1))[0].text
            if isdone.startswith("Invalid bot"):
                LOGS.info("Error while trying to customise assistant, skipping...")
                return
            await ultroid_bot.send_message("botfather", UL)
            await asyncio.sleep(1)
            await ultroid_bot.send_file("botfather", file)
            await asyncio.sleep(2)
            await ultroid_bot.send_message("botfather", "/setabouttext")
            await asyncio.sleep(1)
            await ultroid_bot.send_message("botfather", UL)
            await asyncio.sleep(1)
            # NOTE(review): the "β¨" sequences below look like a mis-decoded
            # emoji; the text is kept as found -- confirm the intended glyph.
            await ultroid_bot.send_message(
                "botfather", f"β¨ Hello β¨!! I'm Assistant Bot of {sir}"
            )
            await asyncio.sleep(2)
            await ultroid_bot.send_message("botfather", "/setdescription")
            await asyncio.sleep(1)
            await ultroid_bot.send_message("botfather", UL)
            await asyncio.sleep(1)
            await ultroid_bot.send_message(
                "botfather",
                f"β¨ Powerful Ultroid Assistant Bot β¨\nβ¨ Master ~ {sir} β¨\n\nβ¨ Powered By ~ @TeamUltroid β¨",
            )
            await asyncio.sleep(2)
            await msg.edit("Completed **Auto Customisation** at @BotFather.")
            LOGS.info("Customisation Done")
        finally:
            # Runs on success, on the early "Invalid bot" return and on any
            # error, so the temporary picture never stays behind.
            if downloaded:
                os.remove(downloaded)
    except Exception as e:
        LOGS.exception(e)
| |
|
| |
|
async def plug(plugin_channels):
    """Download and load ``.py`` plugins posted in the given chats.

    Args:
        plugin_channels: iterable of chats to scan for plugin documents.

    Not available when the client is a bot.  An ``addons`` directory that is
    not a git checkout is wiped and recreated first.  Each matching document
    that is not already present under ``addons/`` is downloaded and passed to
    ``load_addons``; a plugin that fails to load is deleted again.  Messages
    whose text is ``#IGNORE`` are skipped.  Errors are logged per chat.
    """
    from .. import ultroid_bot
    from .utils import load_addons

    if ultroid_bot._bot:
        LOGS.info("Plugin Channels can't be used in 'BOTMODE'")
        return
    if os.path.exists("addons") and not os.path.exists("addons/.git"):
        shutil.rmtree("addons")
    if not os.path.exists("addons"):
        os.mkdir("addons")
    if not os.path.exists("addons/__init__.py"):
        with open("addons/__init__.py", "w") as f:
            f.write("from plugins import *\n\nbot = ultroid_bot")
    # NOTE(review): "β’" here and below looks like a mis-decoded bullet
    # character; kept as found -- confirm the intended glyph.
    LOGS.info("β’ Loading Plugins from Plugin Channel(s) β’")
    for chat in plugin_channels:
        LOGS.info(f"{'β’'*4} {chat}")
        try:
            async for x in ultroid_bot.iter_messages(
                chat, search=".py", filter=InputMessagesFilterDocument, wait_time=10
            ):
                raw_name = getattr(x.file, "name", None)
                if not raw_name:
                    # A document without a file name cannot be stored; skip
                    # it rather than abandoning the rest of the chat.
                    continue
                # The name is chosen by whoever uploaded the file: keep only
                # its last path component so it cannot escape "addons/".
                safe_name = os.path.basename(
                    raw_name.replace("_", "-").replace("|", "-")
                )
                if not safe_name:
                    continue
                plugin = "addons/" + safe_name
                if not os.path.exists(plugin):
                    await asyncio.sleep(0.6)
                    if x.text == "#IGNORE":
                        continue
                    plugin = await x.download_media(plugin)
                    try:
                        load_addons(plugin)
                    except Exception as e:
                        LOGS.info(f"Ultroid - PLUGIN_CHANNEL - ERROR - {plugin}")
                        LOGS.exception(e)
                        os.remove(plugin)
        except Exception as er:
            LOGS.exception(er)
| |
|
| |
|
| | |
| |
|
| |
|
async def fetch_ann():
    """Relay announcements not seen before to the log channel.

    Announcements are fetched as a list of single-key mappings; the key
    identifies the announcement and is remembered in ``OLDANN`` once handled.
    A string payload is sent as a message, a mapping with a ``chat`` entry is
    forwarded, and a mapping with a ``lang`` entry is only used (via its
    ``msg``) when it matches the stored ``language`` (default ``"en"``).
    Any error is logged, not raised.
    """
    from .. import asst, udB
    from ..fns.tools import async_searcher

    seen = udB.get_key("OLDANN") or []
    log_chat = udB.get_key("LOG_CHANNEL")
    try:
        announcements = await async_searcher(
            "https://ultroid-api.vercel.app/announcements", post=True, re_json=True
        )
        for entry in announcements:
            ann_id = list(entry.keys())[0]
            if ann_id in seen:
                continue
            payload = entry[ann_id]
            if isinstance(payload, dict) and payload.get("lang"):
                wanted_lang = udB.get_key("language") or "en"
                if payload["lang"] != wanted_lang:
                    # Not for this language; leave it unrecorded.
                    continue
                payload = payload["msg"]
            if isinstance(payload, str):
                await asst.send_message(log_chat, payload)
            elif isinstance(payload, dict) and payload.get("chat"):
                await asst.forward_messages(
                    log_chat, payload["msg_id"], payload["chat"]
                )
            else:
                LOGS.info(payload)
                LOGS.info(
                    "Invalid Type of Announcement Detected!\nMake sure you are on latest version.."
                )
            seen.append(ann_id)
        udB.set_key("OLDANN", seen)
    except Exception as er:
        LOGS.exception(er)
| |
|
| |
|
async def ready():
    """Post the start-up notice in the log channel.

    On the first run (``INIT_DEPLOY`` unset) a welcome message with a photo
    and a start button is sent and ``INIT_DEPLOY`` is set.  On later runs a
    short status message is sent instead, the previous status message
    recorded in ``LAST_UPDATE_LOG_SPAM`` is deleted, and an update button is
    attached when ``updater()`` reports one.

    The assistant sends the message; on ``ValueError`` the user client first
    posts and deletes a message in the chat before the assistant retries, and
    on any other error the user client sends the plain text itself.  The id
    of a sent message without media is stored in ``LAST_UPDATE_LOG_SPAM``.
    """
    from .. import asst, udB, ultroid_bot

    log_chat = udB.get_key("LOG_CHANNEL")
    sent = None

    # NOTE(review): "π", "β’" and the "β…" runs in the strings below look
    # like mis-decoded symbols; kept as found -- confirm the intended text.
    if udB.get_key("INIT_DEPLOY"):
        text = f"**Ultroid has been deployed!**\nββββββββββ\n**UserMode**: {inline_mention(ultroid_bot.me)}\n**Assistant**: @{asst.me.username}\nββββββββββ\n**Support**: @TeamUltroid\nββββββββββ"
        buttons = None
        photo = None
        old_notice = udB.get_key("LAST_UPDATE_LOG_SPAM")
        if old_notice:
            try:
                await ultroid_bot.delete_messages(log_chat, int(old_notice))
            except Exception as E:
                LOGS.info("Error while Deleting Previous Update Message :" + str(E))
        if await updater():
            buttons = Button.inline("Update Available", "updtavail")
    else:
        text = (
            "π **Thanks for Deploying Ultroid Userbot!**\n"
            "β’ Here, are the Some Basic stuff from, where you can Know, about its Usage."
        )
        photo = "https://graph.org/file/54a917cc9dbb94733ea5f.jpg"
        buttons = Button.inline("β’ Click to Start β’", "initft_2")
        udB.set_key("INIT_DEPLOY", "Done")

    try:
        sent = await asst.send_message(log_chat, text, file=photo, buttons=buttons)
    except ValueError as e:
        try:
            probe = await ultroid_bot.send_message(log_chat, str(e))
            await probe.delete()
            sent = await asst.send_message(
                log_chat, text, file=photo, buttons=buttons
            )
        except Exception as g:
            LOGS.info(g)
    except Exception as el:
        LOGS.info(el)
        try:
            sent = await ultroid_bot.send_message(log_chat, text)
        except Exception as ef:
            LOGS.exception(ef)

    if sent and not sent.media:
        udB.set_key("LAST_UPDATE_LOG_SPAM", sent.id)
| | |
| |
|
| |
|
async def WasItRestart(udb):
    """Confirm a finished restart by editing the message that requested it.

    Args:
        udb: database object providing ``get_key`` and ``del_key``.

    The ``_RESTART`` value is split on ``"_"`` into three parts: which client
    to use (``"bot"`` selects the assistant, anything else the user client),
    the chat id and the message id.  Edit failures are logged; the key is
    deleted afterwards either way.  Nothing happens when the key is unset.
    """
    marker = udb.get_key("_RESTART")
    if marker:
        from .. import asst, ultroid_bot

        try:
            parts = marker.split("_")
            client = ultroid_bot if parts[0] != "bot" else asst
            chat_id = int(parts[1])
            message_id = int(parts[2])
            await client.edit_message(
                chat_id, message_id, "__Restarted Successfully.__"
            )
        except Exception as er:
            LOGS.exception(er)
        udb.del_key("_RESTART")
| |
|
| |
|
| | def _version_changes(udb): |
| | for _ in [ |
| | "BOT_USERS", |
| | "BOT_BLS", |
| | "VC_SUDOS", |
| | "SUDOS", |
| | "CLEANCHAT", |
| | "LOGUSERS", |
| | "PLUGIN_CHANNEL", |
| | "CH_SOURCE", |
| | "CH_DESTINATION", |
| | "BROADCAST", |
| | ]: |
| | key = udb.get_key(_) |
| | if key and str(key)[0] != "[": |
| | key = udb.get(_) |
| | new_ = [ |
| | int(z) if z.isdigit() or (z.startswith("-") and z[1:].isdigit()) else z |
| | for z in key.split() |
| | ] |
| | udb.set_key(_, new_) |
| |
|
| |
|
async def enable_inline(ultroid_bot, username):
    """Turn on inline mode for a bot by messaging BotFather.

    Args:
        ultroid_bot: client used to send the messages.
        username: bot username, without the leading ``@``.

    Sends ``/setinline``, then ``@<username>``, then the text ``Search``,
    pausing one second after each of the first two messages, and finally
    marks the BotFather chat as read.
    """
    botfather = "BotFather"
    for line in ("/setinline", f"@{username}"):
        await ultroid_bot.send_message(botfather, line)
        await asyncio.sleep(1)
    await ultroid_bot.send_message(botfather, "Search")
    await ultroid_bot.send_read_acknowledge(botfather)
| |
|