This commit is contained in:
2025-05-26 22:23:08 +02:00
parent 3013e13163
commit b5db2cd99e
5 changed files with 351 additions and 7 deletions

3
.gitignore vendored
View File

@ -1,3 +1,4 @@
run.sh
.env/
__pycache__/
__pycache__/
active_users.json

View File

@ -1,11 +1,130 @@
# This cog manages user activity - Keeps "active" users in a json and gives them an "active" role
# User activity is defined by (in the last 30 days):
# - Sending messages
# - Joining voice channels
import os
from discord.ext import commands
import logging
import json
import datetime
from discord.ext import tasks
role_id = os.getenv("ACTIVE_ROLE_ID")
if not role_id:
raise ValueError("ACTIVE_ROLE_ID mnot set")
class ActivityMonitorCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.active_users = {}
self.activity_file = "active_users.json"
self.load_active_users()
logging.info("ActivityMonitorCog initialized")
self.check_activity.start()
@tasks.loop(hours=24)
async def check_activity(self):
"""Check activity daily and update roles"""
await self.purge_inactive_users()
logging.info("Completed daily activity check")
@check_activity.before_loop
async def before_check_activity(self):
await self.bot.wait_until_ready()
def load_active_users(self):
"""Load active users from JSON file"""
try:
with open(self.activity_file, 'r') as f:
data = json.load(f)
self.active_users = data.get('users', {})
except FileNotFoundError:
self.active_users = {}
logging.info("No existing active users file found, starting fresh")
def save_active_users(self):
"""Save active users to JSON file"""
data = {'users': self.active_users}
with open(self.activity_file, 'w') as f:
json.dump(data, f)
async def update(self, user_id, guild_id):
"""Add user to active set and update their role"""
logging.info(f"Updating activity for user {user_id} in guild {guild_id}")
current_time = datetime.datetime.now().isoformat()
self.active_users[str(user_id)] = current_time
self.save_active_users()
# Get the server and user
server = self.bot.get_guild(guild_id)
user = server.get_member(user_id)
if user:
role = server.get_role(int(role_id))
if role and role not in user.roles:
await user.add_roles(role)
logging.info(f"Added active role to {user.display_name}")
async def purge_inactive_users(self):
"""Remove active role from users inactive for 30+ days"""
cutoff_date = datetime.datetime.now() - datetime.timedelta(days=30)
inactive_users = []
for user_id, last_active in self.active_users.items():
last_active_date = datetime.datetime.fromisoformat(last_active)
if last_active_date < cutoff_date:
inactive_users.append(user_id)
for user_id in inactive_users:
del self.active_users[user_id]
# Remove role from all guilds
for s in self.bot.guilds:
u = s.get_member(int(user_id))
if u:
role = s.get_role(int(role_id))
if role and role in u.roles:
await u.remove_roles(role)
logging.info(f"Removed active role from {u.display_name} due to inactivity")
if inactive_users:
self.save_active_users()
# all members in all guilds
for guild in self.bot.guilds:
role = guild.get_role(int(role_id))
if role:
for member in guild.members:
if not member.bot and str(member.id) not in self.active_users:
if role in member.roles:
await member.remove_roles(role)
logging.info(f"Removed active role from {member.display_name} - not in active users list")
@commands.Cog.listener()
async def on_message(self, message):
"""Track message activity"""
if not message.author.bot:
await self.update(message.author.id, message.guild.id)
@commands.Cog.listener()
async def on_voice_state_update(self, member, before, after):
"""Track voice channel activity"""
if after.channel and not member.bot:
await self.update(member.id, member.guild.id)
@commands.Cog.listener()
async def on_reaction_add(self, reaction, user):
"""Track reaction activity"""
if not user.bot:
await self.update(user.id, reaction.message.guild.id)
@commands.command()
@commands.has_permissions(administrator=True)
async def purge_inactive(self, ctx):
"""Manually trigger purge of inactive users"""
await self.purge_inactive_users()
await ctx.send("Purged inactive users.")
async def setup(bot):
await bot.add_cog(ActivityMonitorCog(bot))

View File

@ -1,11 +1,24 @@
# This cog gives every user a general role when they join the server
import os
from discord.ext import commands
import logging
role_id = os.getenv("FREN_ROLE_ID")
if not role_id:
raise ValueError("ACTIVE_ROLE_ID not set")
class FrenGiverCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
@commands.Cog.listener()
async def on_member_join(self, member):
try:
role = member.guild.get_role(int(role_id))
await member.add_roles(role)
logging.info(f"Added role to {member.display_name}")
except Exception as e:
logging.error(f"Failed to add role to {member.display_name}: {e}")
async def setup(bot):
await bot.add_cog(FrenGiverCog(bot))

View File

@ -1,11 +1,213 @@
# This cog takes in a /poll <discord_username> and creates a poll in the channel asking whether the user should be invited to the server.
# It takes the active users list and determines how many votes should be reached (50% of active users vote for an option)
# The cog will send an invite link to the user if the poll passes, and if it fails (due to user not having direct messages on) it will send the invite link in the voting channel.
# Invite links are 1 time use only, and will expire after 24 hours.
import os
import discord
from discord.ext import commands
import logging
from datetime import datetime, timedelta
HOURS = 24
voting_channel_id = int(os.getenv("VOTING_CHANNEL_ID"))
if not voting_channel_id:
raise ValueError("VOTING_CHANNEL_ID not set")
class PollCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.active_polls = {}
@discord.app_commands.command(
name="poll", description="Create a poll to invite a user to the server"
)
async def poll(self, interaction: discord.Interaction, username: str):
"""Create a poll for inviting a user to the server"""
# Check if we're in the voting channel
if interaction.channel_id != voting_channel_id:
await interaction.response.send_message(
f"This command can only be used in <#{voting_channel_id}>",
ephemeral=True,
)
return
active_members = [
member
for member in interaction.guild.members
if not member.bot and member.status != discord.Status.offline
]
required_votes = max(1, len(active_members) // 2)
question = f"Should we invite {username} to the server?"
duration = timedelta(hours=HOURS)
poll = discord.Poll(question=question, duration=duration)
poll.add_answer(text="Yes", emoji="👍")
poll.add_answer(text="No", emoji="👎")
embed = discord.Embed(
title=f"Voting for the invitation of {username}",
description=f"Required votes: **{required_votes}** (50% of {len(active_members)} active members)\n"
f"Poll will expire in {HOURS} hours.",
color=discord.Color.blue(),
)
await interaction.response.send_message(embed=embed)
# message = await interaction.original_response()
# await message.edit(content="", embed=embed, poll=poll)
# Send separate message with poll
channel = self.bot.get_channel(voting_channel_id)
poll_message = await channel.send(
poll=poll,
)
self.active_polls[poll_message.id] = {
"username": username,
"required_votes": required_votes,
"channel_id": interaction.channel_id,
"creator": interaction.user.id,
"poll": poll,
"message_id": poll_message.id,
}
def __get_msg_id(self, poll: discord.Poll):
"""Get the message ID for a given poll"""
for msg_id, data in self.active_polls.items():
if data.get("poll").question == poll.question and data.get("poll").duration == poll.duration:
return msg_id
return None
@commands.Cog.listener()
async def on_poll_vote_add(self, user: discord.User, answer):
"""Handle poll vote additions"""
await self._handle_poll_vote_change(answer.poll)
@commands.Cog.listener()
async def on_poll_vote_remove(self, user: discord.User, answer):
"""Handle poll vote removals"""
await self._handle_poll_vote_change(answer.poll)
async def _handle_poll_vote_change(self, poll: discord.Poll):
"""Handle poll voting updates"""
message_id = self.__get_msg_id(poll)
logging.info(f"Handling poll vote change for message ID: {message_id}")
if not message_id:
return
poll_data = self.active_polls[message_id]
required_votes = poll_data["required_votes"]
# these are inverted for some reason
yes_answer = poll.get_answer(1)
no_answer = poll.get_answer(0)
yes_count = yes_answer.vote_count if yes_answer else 0
no_count = no_answer.vote_count if no_answer else 0
if yes_count >= required_votes:
await self._end_poll(message_id, poll_data, True)
elif no_count >= required_votes:
await self._end_poll(message_id, poll_data, False)
@commands.Cog.listener()
async def on_poll_end(self, poll: discord.Poll):
"""When a poll times out"""
message_id = self.__get_msg_id(poll)
if not message_id:
return
poll_data = self.active_polls[message_id]
# Determine if poll passed
yes_answer = poll.get_answer(0)
no_answer = poll.get_answer(1)
yes_count = yes_answer.vote_count if yes_answer else 0
no_count = no_answer.vote_count if no_answer else 0
passed = yes_count > no_count
# End the poll
await self._end_poll(message_id, poll_data, passed)
def __result_embed(self, username, passed, invite_url=None):
"""Create result embed for poll completion"""
title = f"Poll Results for {username}"
if passed:
description = ""
color = discord.Color.green()
if invite_url:
description += f" Their DMs are off, so send them this!\n\nInvite link: {invite_url}"
else:
description += " Manual invite required"
else:
description = "❌ Invitation denied"
color = discord.Color.red()
embed = discord.Embed(
title=title,
description=description,
color=color,
timestamp=datetime.utcnow(),
)
embed.set_footer(text="Poll completed")
return embed
async def _end_poll(self, message_id, poll_data, passed):
"""End the poll and handle the result"""
username = poll_data["username"]
channel = self.bot.get_channel(poll_data["channel_id"])
if not channel:
return
message = await channel.fetch_message(message_id)
if not message:
return
if message.poll and not message.poll.is_finalised():
try:
await message.poll.end()
except Exception as e:
logging.error(f"Failed to end poll: {e}")
del self.active_polls[message_id]
invite_url = None
if passed:
# Create invite link to os.env("INVITE_CHANNEL")
try:
# invite = await channel.create_invite(
# max_uses=1,
# max_age=timedelta(hours=HOURS),
# reason=f"Poll passed for {username}",
# )
inv_channel = self.bot.get_channel(int(os.getenv("INVITE_CHANNEL")))
if not inv_channel:
logging.error("INVITE_CHANNEL not found")
return
invite = await inv_channel.create_invite(
max_uses=1,
max_age=int(timedelta(hours=HOURS).total_seconds()),
reason=f"Poll passed for {username}",
)
invite_url = invite.url
except discord.Forbidden:
logging.error("Bot does not have permission to create invites.")
except Exception as e:
logging.error(f"Failed to create invite: {e}")
invite_url = None
result_embed = self.__result_embed(username, passed, invite_url)
await message.edit(embed=result_embed)
async def setup(bot):
await bot.add_cog(PollCog(bot))

View File

@ -10,8 +10,13 @@ logging.basicConfig(
datefmt="%Y-%m-%d %H:%M:%S",
)
intents = discord.Intents.default()
intents = discord.Intents.all()
intents.message_content = True
intents.members = True
intents.guilds = True
intents.reactions = True
intents.voice_states = True
bot = commands.Bot(command_prefix="!", intents=intents)
async def load_cogs():
@ -28,6 +33,8 @@ async def load_cogs():
@bot.event
async def on_ready():
logging.info(f"Logged in as {bot.user} (ID: {bot.user.id})")
synced = await bot.tree.sync()
logging.info(f"Synced {len(synced)} command(s)")
logging.info("Bot is ready.")
async def main():
@ -37,5 +44,7 @@ async def main():
# Run the bot
if __name__ == "__main__":
asyncio.run(main())
try:
asyncio.run(main())
except KeyboardInterrupt:
logging.info("...")