Files
Birthday-Pool-Bot/birthday_pool_bot/telegram_bot/ui/logic.py
2026-01-15 09:47:43 +03:00

684 lines
22 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from aiogram import types
from aiogram.enums import ParseMode
from aiogram.fsm.context import FSMContext
from aiogram.utils.keyboard import InlineKeyboardBuilder, ReplyKeyboardBuilder
from pydantic_filters import PagePagination
from birthday_pool_bot.dto import User
from birthday_pool_bot.repositories.repositories import UsersRepository
from . import constants
from .callback_data import (
AddSubscriptionCallbackData,
AddSubscriptionConfirmCallbackData,
ConfirmAnswerEnum,
MenuCallbackData,
SubscriptionActionEnum,
SubscriptionCallbackData,
SubscriptionsCallbackData,
)
from .exceptions import FlowInternalError
async def show_menu(message: types.Message) -> None:
    """Reply to ``message`` with the greeting and the main-menu keyboard.

    The reply keyboard holds the profile and subscriptions buttons,
    laid out one per row.
    """
    builder = ReplyKeyboardBuilder()
    builder.button(text=constants.PROFILE_BUTTON)
    builder.button(text=constants.SUBSCRIPTIONS_BUTTON)
    builder.adjust(1)
    markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)

    greeting = (
        "Привет! Я Birthday Pool Bot. "
        "Я помогаю собирать подарки на дни рождения.\n"
    )
    await message.reply(
        text=greeting,
        parse_mode=ParseMode.MARKDOWN,
        reply_markup=markup,
    )
async def show_profile(
    message: types.Message,
    user: User,
):
    """Reply to ``message`` with the user's profile card and edit keyboard.

    The card lists name, phone and birthday. When ``user.gift_payment_data``
    is set, the bank title and phone used for receiving gifts are appended.

    Args:
        message: Message to reply to.
        user: User whose profile data is rendered.
    """
    # The text is sent as legacy Markdown: a user-chosen name containing
    # "_", "*", "`" or "[" would otherwise open an unterminated entity and
    # Telegram would reject the whole message.
    markdown_escapes = str.maketrans(
        {"_": "\\_", "*": "\\*", "`": "\\`", "[": "\\["},
    )
    name_str = (user.name or 'Не указано').translate(markdown_escapes)
    birthday_str = user.birthday.strftime("%d.%m.%Y") if user.birthday else 'Не указано'

    text = (
        f"👤 *Имя*: {name_str}\n\n"
        f"📱 *Телефон*: {user.phone}\n\n"
        f"🎂 *День рождения*: {birthday_str}\n\n"
    )
    if user.gift_payment_data is not None:
        bank_title = constants.BANKS_TITLE_MAP[user.gift_payment_data.bank]
        text += (
            "💳️ *Получать подарки на*:\n"
            f"🏦 _Банк_: {bank_title}\n"
            f"📱 _Телефон_: {user.gift_payment_data.phone}\n\n"
        )

    keyboard = ReplyKeyboardBuilder()
    keyboard.button(text=constants.SET_PROFILE_NAME)
    keyboard.button(text=constants.SET_PROFILE_PHONE)
    keyboard.button(text=constants.SET_PROFILE_BIRTHDATE_BUTTON)
    keyboard.button(text=constants.SET_PROFILE_GIFT_PAYMENT_DATA_BUTTON)
    keyboard.button(text=constants.BACK_BUTTON)
    keyboard.adjust(2)
    reply_markup = keyboard.as_markup(
        resize_keyboard=True,
        one_time_keyboard=True,
    )

    await message.reply(
        text=text,
        parse_mode=ParseMode.MARKDOWN,
        reply_markup=reply_markup,
    )
async def ask_profile_name(message: types.Message):
    """Ask the user to type their name; the keyboard offers only "back"."""
    builder = ReplyKeyboardBuilder()
    builder.button(text=constants.BACK_BUTTON)
    markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)

    await message.reply(
        text="👤 Напишите своё имя",
        parse_mode=ParseMode.MARKDOWN,
        reply_markup=markup,
    )
async def ask_profile_phone(message: types.Message):
    """Ask the user to share their contact so the phone can be confirmed.

    The keyboard has a contact-request button and a back button, one per row.
    """
    builder = ReplyKeyboardBuilder()
    # request_contact=True makes the button send the user's own contact.
    builder.button(text=constants.SHARE_CONTACT_BUTTON, request_contact=True)
    builder.button(text=constants.BACK_BUTTON)
    builder.adjust(1)
    markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)

    await message.reply(
        text="📱 Отправь свой контакт для подтверждения телефона",
        parse_mode=ParseMode.MARKDOWN,
        reply_markup=markup,
    )
async def ask_profile_gift_payment_phone(message: types.Message):
    """Ask for the phone number on which the user wants to receive gifts.

    The prompt offers typing the number or sharing the user's own contact;
    the keyboard has a contact-request button and a back button.
    """
    builder = ReplyKeyboardBuilder()
    builder.button(text=constants.SHARE_CONTACT_BUTTON, request_contact=True)
    builder.button(text=constants.BACK_BUTTON)
    builder.adjust(1)
    markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)

    prompt = (
        "*Введите номер телефона для получения подарков*\n"
        "Пример: `+79123456789`\n\n"
        "ИЛИ\n\n"
        "*Просто поделитесь своим контактом*"
    )
    await message.reply(
        text=prompt,
        parse_mode=ParseMode.MARKDOWN,
        reply_markup=markup,
    )
async def ask_profile_gift_payment_bank(
    message: types.Message,
):
    """Ask the user to pick the bank used for receiving gift payments.

    One button is created per key of ``constants.BANKS_MAP`` (arranged three
    per row), then the back button is appended via ``row()``.
    """
    builder = ReplyKeyboardBuilder()
    for bank_title, _bank in constants.BANKS_MAP.items():
        builder.button(text=bank_title)
    builder.adjust(3)
    # Appended after adjust() so the back button is not regrouped with banks.
    builder.row(types.KeyboardButton(text=constants.BACK_BUTTON))
    markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)

    await message.reply(
        text="🏦 Выберите банк для приёма платежей",
        parse_mode=ParseMode.MARKDOWN,
        reply_markup=markup,
    )
async def ask_profile_birthdate(message: types.Message):
    """Ask the user for their birth date in ``dd.mm.yyyy`` format."""
    builder = ReplyKeyboardBuilder()
    builder.button(text=constants.BACK_BUTTON)
    markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)

    prompt = (
        "🎂 Отправьте свою дату рождения в формате dd.mm.yyyy\n"
        "Пример: `26.05.1995`."
    )
    await message.reply(
        text=prompt,
        parse_mode=ParseMode.MARKDOWN,
        reply_markup=markup,
    )
async def show_subscriptions(
    user: User,
    subscriptions_repository: SubscriptionsRepository,
    message: types.Message | None = None,
    callback_query: types.CallbackQuery | None = None,
):
    """Show one page of the user's subscriptions as an inline keyboard.

    The page number is read from the callback data when ``callback_query``
    is given, otherwise the first page is shown. With ``message`` the list
    is sent as a reply; with only ``callback_query`` the message attached to
    the callback is edited in place.

    Args:
        user: Owner of the subscriptions.
        subscriptions_repository: Source of the subscriptions and their count.
        message: Message to reply to, if any.
        callback_query: Callback whose message is edited, if any.
    """
    per_page = 5
    page = 1
    if callback_query is not None:
        page = SubscriptionsCallbackData.unpack(callback_query.data).page

    async with subscriptions_repository.transaction():
        total = await subscriptions_repository.get_user_subscriptions_count(user_id=user.id)
        subscriptions = []
        async for item in subscriptions_repository.get_user_subscriptions(
            user_id=user.id,
            pagination=PagePagination(page=page, per_page=per_page),
        ):
            subscriptions.append(item)

    # Ceiling division: a partially filled last page still counts as a page.
    full_pages, remainder = divmod(total, per_page)
    pages_count = full_pages + (1 if remainder else 0)

    builder = InlineKeyboardBuilder()
    for item in subscriptions:
        builder.button(
            text=item.name,
            callback_data=SubscriptionCallbackData(
                to_user_id=item.to_user_id,
            ).pack(),
        )

    # "<", "page/pages" and ">" are each added only when they make sense.
    pager = []
    if page > 1:
        previous_page = SubscriptionsCallbackData(page=page - 1).pack()
        pager.append(types.InlineKeyboardButton(text="<", callback_data=previous_page))
    if pages_count > 1:
        pager.append(types.InlineKeyboardButton(
            text=f"{page}/{pages_count}",
            callback_data="null",
        ))
    if page < pages_count:
        next_page = SubscriptionsCallbackData(page=page + 1).pack()
        pager.append(types.InlineKeyboardButton(text=">", callback_data=next_page))
    builder.row(*pager)

    add_button = types.InlineKeyboardButton(
        text="Добавить",
        callback_data=AddSubscriptionCallbackData().pack(),
    )
    back_button = types.InlineKeyboardButton(
        text="Назад",
        callback_data=MenuCallbackData().pack(),
    )
    builder.row(add_button, back_button)
    markup = builder.as_markup()

    caption = "Мои подписки:" if subscriptions else "Нет подписок"
    if message is not None:
        await message.reply(
            text=caption,
            parse_mode=ParseMode.MARKDOWN,
            reply_markup=markup,
        )
        return
    if callback_query is not None:
        # Text and keyboard are updated with two separate edit calls.
        await callback_query.message.edit_text(text=caption)
        await callback_query.message.edit_reply_markup(reply_markup=markup)
async def show_subscription(
    from_user_id: uuid.UUID,
    to_user_id: uuid.UUID,
    subscriptions_repository: SubscriptionsRepository,
    callback_query: types.CallbackQuery | None = None,
    message: types.Message | None = None,
):
    """Show the card of a single subscription with delete/back buttons.

    With ``message`` the card is sent as a reply; with only
    ``callback_query`` the message attached to the callback is edited.

    Args:
        from_user_id: Subscriber's user id.
        to_user_id: Id of the user the subscription points to.
        subscriptions_repository: Repository used to load the subscription.
        callback_query: Callback whose message is edited, if any.
        message: Message to reply to, if any.

    Raises:
        FlowInternalError: If the subscription does not exist.
    """
    async with subscriptions_repository.transaction():
        subscription = await subscriptions_repository.get_subscription(
            from_user_id=from_user_id,
            to_user_id=to_user_id,
            with_to_user=True,
            with_pool=True,
            with_pool_owner=True,
        )
        if subscription is None:
            raise FlowInternalError()

    # The birthday is optional on a user (show_profile handles the same
    # case), so fall back to a placeholder instead of failing on None.
    birthday = subscription.to_user.birthday
    birthday_str = birthday.strftime("%d.%m.%Y") if birthday else 'Не указано'

    # The text is sent as legacy Markdown: escape the user-chosen name so
    # that "_", "*", "`" or "[" cannot open an unterminated entity.
    markdown_escapes = str.maketrans(
        {"_": "\\_", "*": "\\*", "`": "\\`", "[": "\\["},
    )
    name_str = subscription.name.translate(markdown_escapes)

    text = (
        f"*Имя*: {name_str}\n\n"
        f"🎂 *День рождения*: {birthday_str}\n\n"
    )
    if subscription.pool is not None:
        if subscription.pool.owner_id == from_user_id:
            text += "Вы собираете деньги\n\n"
        else:
            text += "Вы участвуете в сборе денег\n\n"

    keyboard = InlineKeyboardBuilder()
    # TODO: an "edit" button (SubscriptionActionEnum.EDIT) is not offered yet.
    keyboard.button(
        text="Удалить",
        callback_data=SubscriptionCallbackData(
            to_user_id=subscription.to_user_id,
            action=SubscriptionActionEnum.DELETE,
        ).pack(),
    )
    keyboard.button(
        text="Назад",
        callback_data=SubscriptionsCallbackData().pack(),
    )
    keyboard.adjust(2)
    reply_markup = keyboard.as_markup()

    if message is not None:
        await message.reply(
            text=text,
            parse_mode=ParseMode.MARKDOWN,
            reply_markup=reply_markup,
        )
    elif callback_query is not None:
        await callback_query.message.edit_text(
            text=text,
            parse_mode=ParseMode.MARKDOWN,
        )
        await callback_query.message.edit_reply_markup(reply_markup=reply_markup)
async def delete_subscription(
    from_user_id: uuid.UUID,
    to_user_id: uuid.UUID,
    subscriptions_repository: SubscriptionsRepository,
    callback_query: types.CallbackQuery | None = None,
    message: types.Message | None = None,
):
    """Delete a subscription and confirm the deletion to the user.

    The confirmation is sent as a reply when ``message`` is given, otherwise
    as an answer to ``callback_query`` when that is given.
    """
    async with subscriptions_repository.transaction():
        await subscriptions_repository.delete_subscription(
            from_user_id=from_user_id,
            to_user_id=to_user_id,
        )

    if message is not None:
        await message.reply(text="Подписка успешно удалена")
        return
    if callback_query is not None:
        await callback_query.answer(text="Подписка успешно удалена")
async def ask_add_subscription_user(
    callback_query: types.CallbackQuery | None = None,
    message: types.Message | None = None,
):
    """Ask who the new subscription is for.

    The prompt offers three options: type a phone number, forward a message
    from the person, or share their contact. The prompt is sent as a reply
    to ``message`` when given, otherwise to the message attached to
    ``callback_query``; nothing is sent when both are ``None``.
    """
    text = (
        "*Введите номер телефона будущего именинника*\n"
        "Пример: `+79123456789`\n\n"
        "ИЛИ\n\n"
        "*Перешлите сообщение пользователя*\n\n"
        "ИЛИ\n\n"
        "*Просто поделитесь его контактом*"
    )
    keyboard = ReplyKeyboardBuilder()
    keyboard.button(text=constants.BACK_BUTTON)
    keyboard.adjust(1)
    reply_markup = keyboard.as_markup(
        resize_keyboard=True,
        one_time_keyboard=True,
    )

    # Both entry points send the same reply; only the target message differs.
    if message is not None:
        target = message
    elif callback_query is not None:
        target = callback_query.message
    else:
        return
    await target.reply(
        text=text,
        parse_mode=ParseMode.MARKDOWN,
        reply_markup=reply_markup,
    )
async def ask_add_subscription_user_birthday(
    message: types.Message,
):
    """Ask for the birth date of the user being subscribed to (dd.mm.yyyy)."""
    builder = ReplyKeyboardBuilder()
    builder.button(text=constants.BACK_BUTTON)
    markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)

    prompt = (
        "Отправьте дату рождения пользователя в формате dd.mm.yyyy\n"
        "Пример: `26.05.1995`."
    )
    await message.reply(
        text=prompt,
        parse_mode=ParseMode.MARKDOWN,
        reply_markup=markup,
    )
async def ask_add_subscription_name(
    message: types.Message,
    users_repository: UsersRepository,
    subscription_user_id: uuid.UUID,
):
    """Ask under which name the subscription should be stored.

    When the target user has a profile name, it is suggested in the prompt
    and offered as a keyboard button above the back button.

    Raises:
        FlowInternalError: If the target user is missing or has no birthday.
    """
    async with users_repository.transaction():
        target_user = await users_repository.get_user_by_id(user_id=subscription_user_id)
        if target_user is None:
            raise FlowInternalError()
        if target_user.birthday is None:
            raise FlowInternalError()

    prompt = "Введите имя человека, на которого хотите подписаться\n\n"
    builder = ReplyKeyboardBuilder()
    if target_user.name is not None:
        prompt += (
            "ИЛИ\n\n"
            f"Используйте имя из профиля пользователя: `{target_user.name}`\n\n"
        )
        builder.button(text=target_user.name)
    builder.button(text=constants.BACK_BUTTON)
    builder.adjust(1)
    markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)

    await message.reply(
        text=prompt,
        parse_mode=ParseMode.MARKDOWN,
        reply_markup=markup,
    )
async def ask_add_subscription_pool_decision(
    message: types.Message,
    pools_repository: PoolsRepository,
    users_repository: UsersRepository,
    subscription_user_id: uuid.UUID,
):
    """Ask whether the user wants to take part in a money pool.

    The "join existing pool" button is offered only when the pools
    repository reports at least one pool for the target user.

    Raises:
        FlowInternalError: If the target user is missing or has no birthday.
    """
    async with users_repository.transaction():
        target_user = await users_repository.get_user_by_id(user_id=subscription_user_id)
        if target_user is None:
            raise FlowInternalError()
        if target_user.birthday is None:
            raise FlowInternalError()

    async with pools_repository.transaction():
        existing_pools = await pools_repository.get_pools_by_birthday_user_id_count(
            birthday_user_id=subscription_user_id,
        )

    builder = ReplyKeyboardBuilder()
    builder.button(text=constants.CREATE_POOL_BUTTON)
    if existing_pools > 0:
        builder.button(text=constants.JOIN_EXISTING_POOL_BUTTON)
    builder.button(text=constants.DECLINE_POOL_BUTTON)
    builder.button(text=constants.BACK_BUTTON)
    builder.adjust(1)
    markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)

    await message.reply(
        text="Хотите ли вы участвовать в сборе денег для этого пользователя?",
        parse_mode=ParseMode.MARKDOWN,
        reply_markup=markup,
    )
async def show_add_subscription_pools(
    user: User,
    pools_repository: PoolsRepository,
    message: types.Message | None = None,
    callback_query: types.CallbackQuery | None = None,
):
    """Show a paginated inline list titled "Сборы:" (pools).

    NOTE(review): the body reads ``subscriptions_repository``, which is not
    a parameter of this function and is not defined in the visible part of
    the module, so a call presumably fails with ``NameError`` — confirm.
    ``pools_repository`` is accepted but never used.

    NOTE(review): apart from the two captions, the body matches
    ``show_subscriptions`` statement for statement (it lists the user's
    subscriptions and reuses the subscription callback data). This looks
    like an unfinished copy that should list pools instead — confirm the
    intended repository call and callback data before relying on it.
    """
    page = 1
    per_page = 5
    if callback_query is not None:
        callback_data = SubscriptionsCallbackData.unpack(callback_query.data)
        page = callback_data.page
    # NOTE(review): `subscriptions_repository` is undefined in this scope.
    async with subscriptions_repository.transaction():
        total = await subscriptions_repository.get_user_subscriptions_count(user_id=user.id)
        # Ceiling division: a partially filled last page counts as a page.
        pages_count = total // per_page + int(bool(total % per_page))
        subscriptions = [
            subscription
            async for subscription in subscriptions_repository.get_user_subscriptions(
                user_id=user.id,
                pagination=PagePagination(page=page, per_page=per_page),
            )
        ]
    text = "Сборы:" if subscriptions else "Нет сборов"
    keyboard = InlineKeyboardBuilder()
    for subscription in subscriptions:
        keyboard.button(
            text=subscription.name,
            callback_data=SubscriptionCallbackData(
                to_user_id=subscription.to_user_id,
            ).pack(),
        )
    # Pager: "<", "page/pages" and ">" are each added only when applicable.
    navigation_row = []
    if page > 1:
        navigation_row.append(types.InlineKeyboardButton(
            text="<",
            callback_data=SubscriptionsCallbackData(page=page - 1).pack(),
        ))
    if pages_count > 1:
        navigation_row.append(types.InlineKeyboardButton(
            text=f"{page}/{pages_count}",
            callback_data="null",
        ))
    if page < pages_count:
        navigation_row.append(types.InlineKeyboardButton(
            text=">",
            callback_data=SubscriptionsCallbackData(page=page + 1).pack(),
        ))
    keyboard.row(*navigation_row)
    keyboard.row(
        types.InlineKeyboardButton(
            text="Добавить",
            callback_data=AddSubscriptionCallbackData().pack(),
        ),
        types.InlineKeyboardButton(
            text="Назад",
            callback_data=MenuCallbackData().pack(),
        ),
    )
    reply_markup = keyboard.as_markup()
    if message is not None:
        # Message entry point: send the list as a new reply.
        await message.reply(
            text=text,
            parse_mode=ParseMode.MARKDOWN,
            reply_markup=reply_markup,
        )
    elif callback_query is not None:
        # Callback entry point: edit the existing message, text first and
        # keyboard second, as two separate calls.
        await callback_query.message.edit_text(
            text=text,
            parse_mode=ParseMode.MARKDOWN,
        )
        await callback_query.message.edit_reply_markup(
            reply_markup=reply_markup,
        )
async def ask_add_subscription_pool_description(
    message: types.Message,
):
    """Ask for a pool description; the keyboard offers "skip" and "back"."""
    builder = ReplyKeyboardBuilder()
    for caption in (constants.SKIP_BUTTON, constants.BACK_BUTTON):
        builder.button(text=caption)
    builder.adjust(1)
    markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)

    await message.reply(
        text="Введи описание для сбора",
        parse_mode=ParseMode.MARKDOWN,
        reply_markup=markup,
    )
async def ask_add_subscription_pool_payment_phone(
    message: types.Message,
    user: User,
):
    """Ask for the phone number on which the pool money will be collected.

    The user can type a number or share their contact. When
    ``user.gift_payment_data`` is set, the prompt also shows the profile's
    phone and bank, and an extra button for reusing them is added.
    """
    profile_payment = user.gift_payment_data

    prompt = (
        "*Введите номер телефона, на который будете принимать сбор денег*\n\n"
        "ИЛИ\n\n"
        "*Отправьте свой контакт*\n\n"
    )
    builder = ReplyKeyboardBuilder()
    builder.button(text=constants.SHARE_CONTACT_BUTTON, request_contact=True)
    if profile_payment is not None:
        bank_title = constants.BANKS_TITLE_MAP[profile_payment.bank]
        prompt += (
            "ИЛИ\n\n"
            "*Используйте данные из своего профиля*:\n"
            f"📱 _Телефон_: {profile_payment.phone}\n"
            f"🏦 _Банк_: {bank_title}\n\n"
        )
        builder.button(text=constants.USE_PROFILE_GIFT_PAYMENT_DATA)
    builder.button(text=constants.BACK_BUTTON)
    builder.adjust(1)
    markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)

    await message.reply(
        text=prompt,
        parse_mode=ParseMode.MARKDOWN,
        reply_markup=markup,
    )
async def ask_add_subscription_pool_payment_bank(
    message: types.Message,
):
    """Ask the user to pick the bank on which the pool accepts payments.

    One button is created per key of ``constants.BANKS_MAP`` (arranged three
    per row), then the back button is appended via ``row()``.
    """
    builder = ReplyKeyboardBuilder()
    for bank_title, _bank in constants.BANKS_MAP.items():
        builder.button(text=bank_title)
    builder.adjust(3)
    # Appended after adjust() so the back button is not regrouped with banks.
    builder.row(types.KeyboardButton(text=constants.BACK_BUTTON))
    markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)

    await message.reply(
        text="Выберите банк для приёма платежей",
        parse_mode=ParseMode.MARKDOWN,
        reply_markup=markup,
    )
async def ask_add_subscription_confirmation(
    message: types.Message,
    users_repository: UsersRepository,
    pools_repository: PoolsRepository,
    subscription_name: str,
    subscription_user_id: uuid.UUID,
    subscription_pool_id: uuid.UUID | None = None,
    subscription_pool_description: str | None = None,
    subscription_pool_phone: str | None = None,
    subscription_pool_bank: BankEnum | None = None,
):
    """Show a summary of the pending subscription and ask for yes/no.

    The summary contains the subscription name and the target user's
    birthday. When a pool phone and bank are given, the payment details of
    the pool being created are shown (plus the description, if any);
    otherwise, when ``subscription_pool_id`` is given, a note that the user
    joins a pool is shown.

    Args:
        message: Message to reply to.
        users_repository: Repository used to load the target user.
        pools_repository: Repository used to load the pool to join.
        subscription_name: Name under which the subscription is stored.
        subscription_user_id: Id of the user being subscribed to.
        subscription_pool_id: Id of an existing pool to join, if any.
        subscription_pool_description: Description of a new pool, if any.
        subscription_pool_phone: Payment phone of a new pool, if any.
        subscription_pool_bank: Payment bank of a new pool, if any.

    Raises:
        FlowInternalError: If the target user is missing or has no birthday,
            or if ``subscription_pool_id`` is given but the pool is missing.
    """
    async with users_repository.transaction():
        subscription_user = await users_repository.get_user_by_id(
            user_id=subscription_user_id,
        )
        # A missing birthday is treated like the other add-subscription
        # steps treat it, instead of failing later on ``None.strftime``.
        if subscription_user is None or subscription_user.birthday is None:
            raise FlowInternalError()

    subscription_pool = None
    if subscription_pool_id is not None:
        async with pools_repository.transaction():
            subscription_pool = await pools_repository.get_pool_by_id(
                pool_id=subscription_pool_id,
                with_owner=True,
            )
            if subscription_pool is None:
                raise FlowInternalError()

    # The text is sent as legacy Markdown: escape the user-typed name so
    # that "_", "*", "`" or "[" cannot open an unterminated entity.
    markdown_escapes = str.maketrans(
        {"_": "\\_", "*": "\\*", "`": "\\`", "[": "\\["},
    )
    name_str = subscription_name.translate(markdown_escapes)
    birthday_str = subscription_user.birthday.strftime('%d.%m.%Y')

    text = (
        f"Вы хотите подписаться на пользователя {name_str}?\n\n"
        f"🎂 *День рождения*: {birthday_str}\n\n"
    )
    # Phone and bank identify a pool being created. The description is
    # optional (it can be skipped), so it must not gate this section.
    if subscription_pool_phone and subscription_pool_bank:
        bank_title = constants.BANKS_TITLE_MAP[subscription_pool_bank]
        text += (
            "💳️ *Вы собираете деньги на*:\n"
            f"📱 _Телефон_: {subscription_pool_phone}\n"
            f"🏦 _Банк_: {bank_title}\n\n"
        )
        if subscription_pool_description:
            text += (
                "_Описание_:\n"
                "```\n"
                f"{subscription_pool_description}"
                "```\n\n"
            )
    elif subscription_pool is not None:
        text += "💳️ *Вы участвуете в сборе денег*\n\n"

    keyboard = InlineKeyboardBuilder()
    keyboard.button(
        text="Да",
        callback_data=AddSubscriptionConfirmCallbackData(
            answer=ConfirmAnswerEnum.YES,
        ).pack(),
    )
    keyboard.button(
        text="Нет",
        callback_data=AddSubscriptionConfirmCallbackData(
            answer=ConfirmAnswerEnum.NO,
        ).pack(),
    )
    keyboard.adjust(2)
    reply_markup = keyboard.as_markup()

    await message.reply(
        text=text,
        parse_mode=ParseMode.MARKDOWN,
        reply_markup=reply_markup,
    )