Files
ProxmoxVE_Scripts/BOT_FAZANA/test.py

117 lines
4.9 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
from telebot.async_telebot import AsyncTeleBot
import os
api_key = '7901893687:AAGKRhisrTwKIX8oLIsDsZ5T7HPbyXV9FMY'
# Список разрешенных пользователей (добавьте свои ID)
ALLOWED_USERS = [1199424310, 1798467387, 1905354346] # Замените на реальные ID пользователей
bot = AsyncTeleBot(api_key)
helpa = ('\n/start - запуск бота\n/info - вывод данной справки\n/rollback - сброс стендов до снапшота DEFAULT\n/on - включить учетки m1 для демо\n/off - выключить учетки m1 для демо\n/demo - сброс стендов зоны DEMO до DEFAULT\n/fuf - сброс зоны FUF до DEFAULT\n/polka - сброс зоны POLKA до DEFAULT')
def is_user_allowed(user_id):
"""Проверяет, есть ли пользователь в списке разрешенных"""
return user_id in ALLOWED_USERS
@bot.message_handler(commands=['start'])
async def start_command(message):
if not is_user_allowed(message.from_user.id):
await bot.reply_to(message, "⛔ Доступ запрещен")
return
await bot.reply_to(message, f'Бот работает!\nСписок комманд:{helpa}')
async def run_script(args_str):
"""Асинхронное выполнение bash скрипта"""
process = await asyncio.create_subprocess_shell(
f'bash /root/bot/rollback.sh -t {args_str}',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
# Ожидаем завершение процесса и получаем результат
stdout, stderr = await process.communicate()
return stdout.decode().strip(), stderr.decode().strip(), process.returncode
@bot.message_handler(commands=['on'])
async def on_user(message):
if not is_user_allowed(message.from_user.id):
await bot.reply_to(message, "⛔ Доступ запрещен")
return
os.system('bash /root/bot/on.sh')
await bot.reply_to('Учетки m1 активированны')
@bot.message_handler(commands=['off'])
async def off_user(message):
if not is_user_allowed(message.from_user.id):
await bot.reply_to(message, "⛔ Доступ запрещен")
return
else:
os.system('bash /root/bot/off.sh')
await bot.reply_to('Учетки m1 отключенны')
@bot.message_handler(commands=['demo'])
async def off_user(message):
if not is_user_allowed(message.from_user.id):
await bot.reply_to(message, "⛔ Доступ запрещен")
return
else:
os.system('bash /root/bot/ROLLBACKS/rollback_demo.sh')
await bot.reply_to('Зона DEMO сброшенна!')
@bot.message_handler(commands=['polka'])
async def off_user(message):
if not is_user_allowed(message.from_user.id):
await bot.reply_to(message, "⛔ Доступ запрещен")
return
else:
os.system('bash /root/bot/ROLLBACKS/rollback_polka.sh')
await bot.reply_to('Зона POLKA сброшенна!')
@bot.message_handler(commands=['fuf'])
async def off_user(message):
if not is_user_allowed(message.from_user.id):
await bot.reply_to(message, "⛔ Доступ запрещен")
return
else:
os.system('bash /root/bot/ROLLBACKS/rollback_fuf.sh')
await bot.reply_to('Зона FUF сброшенна!')
@bot.message_handler(commands=['info'])
async def info_command(message):
if not is_user_allowed(message.from_user.id):
await bot.reply_to(message, "⛔ Доступ запрещен")
return
await bot.reply_to(message, helpa)
@bot.message_handler(commands=['rollback'])
async def rollback_command(message):
# Проверка доступа
if not is_user_allowed(message.from_user.id):
await bot.reply_to(message, "У вас нет прав на выполнение этой команды")
return
parts = message.text.split()
if len(parts) > 1:
arguments = parts[1:]
args_str = ' '.join(arguments)
try:
# Запускаем скрипт асинхронно
stdout, stderr, returncode = await run_script(args_str)
if returncode == 0:
await bot.reply_to(message, f"✅ Команда выполнена:стенды {args_str} были сброшены!")
else:
await bot.reply_to(message, f"❌ Ошибка выполнения ({returncode}):\n{stderr}")
except Exception as e:
await bot.reply_to(message, f"⚠️ Критическая ошибка: {str(e)}")
else:
await bot.reply_to(message, " Использование: /rollback <vmid1> [<vmid2> ...]")
async def main():
await bot.infinity_polling()
if __name__ == '__main__':
print("Бот запущен...")
asyncio.run(main())