#! /usr/bin/env python
"""RotBot (pyRot version): an IRC bot configured from a PostgreSQL database.

Run as ``python bot.py database_network_id``.  The network row, its hosts and
the bot's nickname/password are read from the database; IRC events are
dispatched to the ``events.*`` and ``commands.*`` modules.
"""
import functools
import random
import ssl
import string
import sys

import irc.bot  # , irc.strings  # https://python-irc.readthedocs.io/en/latest/irc.html#
# from irc.client import ip_numstr_to_quad  # , ip_quad_to_numstr
from jaraco.stream import buffer
from postgres import Postgres  # https://postgres-py.readthedocs.io/en/latest/

import commands.public
import commands.admin
import commands.games
import commands.statistics
import events.on_welcome
import events.on_join
import events.on_kick
import events.on_mode
import events.on_pubmsg
import events.on_action
import events.on_whoreply
import events.on_nick
from common import log, font, queries
from common.networkservices import NickServ
from events.common import Inform  # , MessageStatistics  # Lastact,


class PyRot(irc.bot.SingleServerIRCBot):
    """Single-network IRC bot.

    Each ``on_<event>`` method is an event handler in the style of
    ``irc.bot.SingleServerIRCBot``; most of them log the event and delegate
    to a module under ``events`` or ``commands``.
    """

    def __init__(self, network, db, webgui):
        """Pick a host for *network* and initialise the underlying bot.

        Args:
            network: Row from ``rotbot_network``; this class reads ``id``,
                ``nickname``, ``username``, ``password`` and ``home_channel``.
            db: Connected ``Postgres`` instance.
            webgui: Dict with the web GUI settings (stored, not used here).

        Exits the program when the network has no hosts or when the bot
        cannot be initialised because of a server connection error.
        """
        # Globals, so it's not needed to pass network and db with every call.
        self.network = network
        self.db = db
        self.webgui = webgui

        # Pick a random host from the network.
        hosts = db.all("SELECT * FROM rotbot_host WHERE network_id=%(network)s", network=network.id)
        if not hosts:  # random.choice() would raise a bare IndexError.
            sys.exit('No hosts found for network %s. Terminating program.' % (network.id))
        host = random.choice(hosts)
        connect_string = (host.address, host.port)

        irc.client.ServerConnection.buffer_class = buffer.LenientDecodingLineBuffer  # Set buffer class.
        # self.protectees = {}
        # self.channelkeys = {}

        # Log and record to database.
        log.info('Connecting to %s:%s/%s' % (connect_string[0], connect_string[1], network.home_channel))
        # Count the attempt against the host that was picked, not against the
        # row whose id happens to equal the network id.
        db.run("UPDATE rotbot_host SET connection_attempts = connection_attempts + 1 WHERE id=%s", [host.id])

        # Create connect factory with correct scheme.
        if host.ssl:
            # ssl.wrap_socket() was removed in Python 3.12; wrap through an
            # SSLContext instead.
            # NOTE(review): the default context verifies certificates and
            # hostnames, which ssl.wrap_socket() did not. Confirm all
            # configured hosts present valid certificates.
            context = ssl.create_default_context()
            wrapper = functools.partial(context.wrap_socket, server_hostname=host.address)
            factory = irc.connection.Factory(wrapper=wrapper)
        else:
            factory = irc.connection.Factory()

        try:
            irc.bot.SingleServerIRCBot.__init__(
                self,
                [connect_string],
                network.nickname,
                network.username,
                connect_factory=factory,
            )  # Connect.
        except irc.client.ServerConnectionError as error:  # Connection failure.
            sys.exit(error)  # Exit.

    ## Events.

    def on_nicknameinuse(self, connection, event):
        """Take a temporary nickname and ask NickServ to recover the real one."""
        log.info('Nickname %s in use, attempting to recover: %s' % (self.network.nickname, connection.nickname))
        # Take temporary nick. In this state recovering via NickServ won't
        # work without changing nickname.
        suffix = ''.join(random.choice(string.digits) for _ in range(3))
        connection.nick(connection.nickname + suffix)
        NickServ.recover_nick(connection, self.network.password)

    def on_welcome(self, connection, event):
        """Delegate the welcome event to ``events.on_welcome``."""
        events.on_welcome.process_event(self, connection, event)

    def on_error(self, connection, event):
        """Log a server error and report it to the home channel and owners."""
        log.notice(str(event))
        connection.privmsg(self.network.home_channel, 'ERROR: %s' % (event))
        Inform.notice_owners(self, connection, 'Received error from server: %s' % (event))

    # def on_nick(self, connection, event):
    #     events.on_nick.process_event(self, connection, event)

    def on_join(self, connection, event):
        """Delegate the join event to ``events.on_join``."""
        events.on_join.process_event(self, connection, event)

    def on_mode(self, connection, event):
        """Delegate the mode event to ``events.on_mode``."""
        events.on_mode.process_event(self, connection, event)

    def on_kick(self, connection, event):
        """Delegate the kick event to ``events.on_kick``."""
        events.on_kick.process_event(self, connection, event)

    # def on_part(self, connection, event):
    #     log.info(event)
    #
    #     # Update protectees.
    #     if event.target == self.homechannel and event.source.nick in self.protectees:  # Protectee parted home channel.
    #         del self.protectees[event.source.nick]  # Delete from protectees.
    #
    # def on_quit(self, connection, event):
    #     log.info(event)
    #
    #     # Update protectees.
    #     if event.source.nick in self.protectees:  # Protectee parted home channel.
    #         del self.protectees[event.source.nick]  # Delete from protectees.

    def on_invite(self, connection, event):
        """Log an invitation and tell the operators when the bot was invited."""
        log.notice(event)
        if event.target == connection.get_nickname():  # Bot invited.
            Inform.operators(
                self,
                connection,
                'Received invitation to %s %s %s form %s %s %s.'
                % (font.red, event.arguments[0], font.reset, font.red, event.source.nick, font.reset),
            )

    # def on_topic(self, connection, event):
    #     log.info(event)

    def on_pubmsg(self, connection, event):  # Channel message
        """Delegate a channel message to ``events.on_pubmsg``."""
        events.on_pubmsg.process_event(self, connection, event)

    def on_action(self, connection, event):  # Is this both channel and private actions?
        """Delegate an action event to ``events.on_action``."""
        events.on_action.process_event(self, connection, event)

    def on_privmsg(self, connection, event):  # Private message
        """Record a private message, forward it to owners and run commands."""
        log.info(event)
        user = queries.create_or_get_and_update_last_event(
            self, 'user', 'pm', user_name=event.source.nick, event_subject_name=event.target
        )
        channel = None  # Private messages have no channel.

        if not event.source.nick == connection.get_nickname():  # Message is not from myself.
            # Forward message to users with owner rights of home channel.
            Inform.owners(
                self,
                connection,
                'PM from %s%s%s: %s%s' % (font.red, event.source.nick, font.grey, font.green, event.arguments[0]),
            )

        commands.public.do_command(self, connection, event, user, channel)
        commands.admin.do_command(self, connection, event, user, channel)
        commands.statistics.do_command(self, connection, event, user, channel)
        commands.games.do_command(self, connection, event, user, channel)

    def on_pubnotice(self, connection, event):  # Channel notice
        """Record a channel notice for both the user and the channel."""
        log.info(event)
        # The return values are not needed; the calls are made for the
        # records they create or update.
        queries.create_or_get_and_update_last_event(
            self, 'user', 'cn',
            channel_name=event.target, user_name=event.source.nick, event_content=event.arguments[0],
        )
        queries.create_or_get_and_update_last_event(
            self, 'channel', 'cn',
            channel_name=event.target, user_name=event.source.nick, event_content=event.arguments[0],
        )

    def on_privnotice(self, connection, event):  # Private notice.
        """Record a private notice, run commands and handle NickServ replies.

        Notices from NickServ drive identification and registration of the
        bot's nickname.  Every notice that does not come from "Global" is
        also passed on to the owners.
        """
        log.info(event)
        user = queries.create_or_get_and_update_last_event(
            self, 'user', 'pn', user_name=event.source.nick, event_subject_name=event.target
        )
        channel = None  # Private notices have no channel.

        commands.public.do_command(self, connection, event, user, channel)
        commands.admin.do_command(self, connection, event, user, channel)
        commands.statistics.do_command(self, connection, event, user, channel)
        commands.games.do_command(self, connection, event, user, channel)

        if event.source.nick == connection.get_nickname():  # Message came from myself.
            return
        elif event.source.nick == "NickServ":
            message = event.arguments[0]
            if message.startswith("This nickname is registered"):
                # Identify with NickServ.
                connection.privmsg('NickServ', 'identify %s %s' % (self.network.nickname, self.network.password))
            if message.startswith("You are already identified."):
                return
            # Example: Nick \x02RotBot\x02 isn't registered.
            if (
                message.endswith(' is not a registered nickname.')
                or (message.startswith('Nick ') and message.endswith(' isn\'t registered.'))
                or message == 'Your nick isn\'t registered.'
            ):  # Username from database is not registered.
                # Register with NickServ.
                connection.privmsg('NickServ', 'register %s spamtBK@xs4all.nl' % (self.network.password))
                # Announce the registration in the home channel.
                connection.privmsg(
                    self.network.home_channel,
                    'Regisring %s%s%s with %sNickServ%s.'
                    % (font.red, self.network.nickname, font.reset, font.red, font.reset),
                )
                log.info('Registerring with NickServ.')
            if message.startswith('Nickname ') and message.endswith(' registered.'):
                Inform.home_channel(
                    self,
                    connection,
                    'Registerred nickname %s%s%s with NickServ.' % (font.red, self.network.nickname, font.reset),
                )
        # elif event.source.nick == "ChanServ":
        #     if event.arguments[0].startswith("Key for channel ") and len(event.arguments[0]) > 5:  # Received channel key.
        #         self.channelkeys[event.arguments[0].split(' ')[3]] = event.arguments[0].split(' ')[5][:-1]
        #         connection.join(event.arguments[0].split(' ')[3], event.arguments[0].split(' ')[5][:-1])
        #         Inform.owners(self, connection, "Received " + red + event.arguments[0].split(" ")[3] + reset + " key: " + event.arguments.split(" ")[5][:-1])
        #     if event.arguments[0] == "Password authentication required for that command.":  # Not authenticated with NickServ.
        #         Inform.notice_owners(self, connection, "Not authenticated with NickServ. See " + blue + self.helpchar + "recovernick " + reset + "and " + blue + self.helpchar + "registernick" + reset + ".")
        #         return
        #     if event.arguments[0].startswith("You have been unbanned from ") or event.arguments[0].endswith(" autokick list is empty.") or event.arguments[0].startswith("You are already in ") or event.arguments[0] == "Syntax: UNBAN channel [nick]" or event.arguments[0] == "/msg ChanServ HELP UNBAN for more information":
        #         return
        #     if event.arguments[0].startswith("Channel ") and event.arguments[0].endswith(" has no key."):
        #         return

        if event.source.nick != "Global":
            Inform.notice_owners(
                self,
                connection,
                'Notice from %s %s %s %s: %s %s'
                % (font.red, font.red, event.source.nick, font.grey, font.reset, event.arguments[0]),
            )

    # def on_whoreply(self, connection, event):
    #     events.on_whoreply.process_event(self, connection, event)

    def on_keyset(self, connection, event):
        """Log a keyset event and report it to the owners."""
        log.info(event)
        Inform.owners(
            self,
            connection,
            "keyset " + font.green + event.arguments[0] + font.reset + " from " + font.red + event.source,
        )

    def on_yourhost(self, connection, event):
        """Log the yourhost event."""
        log.info(event)

    def on_yourebannedcreep(self, connection, event):
        """Tell the operators that the bot is banned, then log a warning."""
        Inform.operators(
            self,
            connection,
            "I am banned " + font.red + event.arguments[0] + font.reset + " from " + font.red + event.source,
        )
        log.warning(event)

    def on_youwillbebanned(self, connection, event):
        """Log a warning and tell the operators that the bot will be banned."""
        log.warning(event)
        Inform.operators(
            self,
            connection,
            "I will be banned " + font.red + event.arguments[0] + font.reset + " from " + font.red + event.source,
        )

    # DCC stuff from original example file.
    def on_dccmsg(self, c, e):
        """Log a DCC message."""
        log.info(e)
        # non-chat DCC messages are raw bytes; decode as text
        # text = e.arguments[0].decode('utf-8')
        # c.privmsg("You said: " + text)

    def on_dccchat(self, c, e):
        """Log a DCC chat request."""
        log.info(e)
        # if len(e.arguments) != 2:
        #     return
        # args = e.arguments[1].split()
        # if len(args) == 4:
        #     try:
        #         address = ip_numstr_to_quad(args[2])
        #         port = int(args[3])
        #     except ValueError:
        #         return
        #     self.dcc_connect(address, port)


def main():
    """Load the network named on the command line and start the bot.

    Expects exactly one command line argument: the database id of the
    network.  Exits with a message when the arguments are wrong, the database
    is unreachable, or the network is missing or disabled.
    """
    log.info('Starting RotBot: pyRot version.')

    # Validate system arguments and warn if needed.
    if len(sys.argv) != 2:  # Not 2 arguments.
        sys.exit('To run type "python bot.py database_network_id", not: %s. Terminating program.' % (sys.argv))  # Terminate program.
    instance = sys.argv[1]  # Instance is the database network id.

    ### SETTINGS
    webgui = {
        'base_url': 'https://h0v1n8.nl/rotbot/',  # The django rotbot app url.
        'register_url': '',
    }

    # Database credentials
    # TODO(review): credentials are hard-coded in source; consider moving
    # them to configuration outside the repository.
    username = 'pyrot'
    password = 'oGPnbiqh55QKLhmnKQgS92h74j0e9d6LE58cSsD1'
    db_connect_template = 'postgres://%s:%s@localhost/website'
    db_connect_string = db_connect_template % (username, password)

    try:
        db = Postgres(db_connect_string)  # Connect to database.
    except Exception:
        # Report the connection string with the password masked, so the
        # secret does not end up on the terminal or in logs.
        masked = db_connect_template % (username, '***')
        sys.exit('Database connection %s failed: %s' % (masked, sys.exc_info()))  # Exit.

    try:
        network = db.one("SELECT * FROM rotbot_network WHERE id=%(instance)s", instance=instance)  # Get network from database.
    except Exception:
        sys.exit('Could not retrieve network %s from database: %s' % (instance, sys.exc_info()))  # Exit.

    if network is None:  # No data returned.
        sys.exit('Network %s not found, terminating program.' % (instance))  # Exit.
    if not network.enabled:  # Network disabled in database.
        sys.exit('Network %s marked as disabled in database, use webgui to enable. Terminating program.' % (network.name))  # Exit

    bot = PyRot(network, db, webgui)
    bot.start()


if __name__ == "__main__":
    try:
        main()  # Run the program.
    except KeyboardInterrupt:  # User presses CTRL+C
        sys.exit('Interrupted by keyboard.')  # Exit.