#! /usr/bin/env python import sys, random, string, ssl import irc.bot#, irc.strings from irc.client import ip_numstr_to_quad#, ip_quad_to_numstr from jaraco.stream import buffer from postgres import Postgres import commands.public, commands.admin, commands.games, commands.statistics import events.on_welcome, events.on_join, events.on_kick, events.on_mode, events.on_pubmsg, events.on_action, events.on_whoreply, events.on_nick from common import log from common.networkservices import NickServ from events.common import Lastact, MessageStatistics, Inform bold = "\x02" italic = "\x1D" underline = "\x1F" reverse = "\x16" # swap background and foreground colors ("reverse video") reset = "\x0F" blue = "\x0302" green = "\x0303" red = "\x0304" grey = "\x0314" class PyRot(irc.bot.SingleServerIRCBot): def __init__(self, network, db, homechannel, nickname, username, password, host, port=6667, usessl=False, cmdchar="!", helpchar="@"): self.network = network self.db = db self.homechannel = homechannel self.password = password self.cmdchar = cmdchar self.helpchar = helpchar self.protectees = {} self.channelkeys = {} log.info("Starting pyRot, the third RotBot by tBkwtWS.") log.info("Connecting to " + host + ":" + str(port) + "/" + self.homechannel) if usessl: factory = irc.connection.Factory(wrapper=ssl.wrap_socket) else: factory = irc.connection.Factory() irc.client.ServerConnection.buffer_class = buffer.LenientDecodingLineBuffer try: irc.bot.SingleServerIRCBot.__init__(self, [(host, port)], nickname, username, connect_factory=factory) except irc.client.ServerConnectionError: sys.stderr.write(sys.exc_info()[1]) # Events. def on_nicknameinuse(self, connection, event): log.info("Nickname in use, attempting to recover: " + connection.nickname) connection.nick(connection.nickname + ''.join(random.choice(string.digits) for _ in range(3))) # Take temporary nick. Without this recovering via NickServ won't work. NickServ.recover_nick(connection, self.password) def on_welcome(self, connection, event): events.on_welcome.process_event(self, connection, event) def on_error(self, connection, event): log.notice(str(event)) connection.privmsg(self.homechannel, "ERROR: " + str(event)) Inform.owners(self, connection, "Received error from server: " + str(event)) def on_nick(self, connection, event): events.on_nick.process_event(self, connection, event) def on_join(self, connection, event): events.on_join.process_event(self, connection, event) def on_kick(self, connection, event): events.on_kick.process_event(self, connection, event) def on_mode(self, connection, event): events.on_mode.process_event(self, connection, event) def on_part(self, connection, event): log.info(event) # Update protectees. if event.target == self.homechannel and event.source.nick in self.protectees: # Protectee parted home channel. del self.protectees[event.source.nick] # Delete from protectees. # Update last act. if event.arguments: Lastact.update(self, event.source.nick, "part", channel=event.target, lastact=event.arguments[0]) else: Lastact.update(self, event.source.nick, "part", channel=event.target) def on_quit(self, connection, event): log.info(event) # Update protectees. if event.source.nick in self.protectees: # Protectee parted home channel. del self.protectees[event.source.nick] # Delete from protectees. # Update last act. if event.arguments: Lastact.update(self, event.source.nick, "quit", lastact=event.arguments[0]) else: Lastact.update(self, event.source.nick, "quit") def on_invite(self, connection, event): log.notice(event) if event.target == connection.get_nickname(): # Bot invited. Inform.operators(self, connection, "Received invitation to " + red + event.arguments[0] + reset + " from " + red + event.source.nick + reset + ".") def on_topic(self, connection, event): log.info(event) # Update last act. Lastact.update(self, event.source.nick, "topic", channel=event.target, lastact=event.arguments[0]) def on_pubmsg(self, connection, event): events.on_pubmsg.process_event(self, connection, event) commands.public.do_command(self, connection, event) commands.admin.do_command(self, connection, event) commands.statistics.do_command(self, connection, event) commands.games.do_command(self, connection, event) def on_privmsg(self, connection, event): log.info(event) commands.public.do_command(self, connection, event) commands.admin.do_command(self, connection, event) commands.statistics.do_command(self, connection, event) commands.games.do_command(self, connection, event) if not event.source.nick == connection.get_nickname(): Inform.owners(self, connection, "PM from " + red + red + event.source.nick + grey + ": " + reset + event.arguments[0]) def on_pubnotice(self, connection, event): log.info(event) # Update last act. Lastact.update(self, event.source.nick, "notice", channel=event.target, lastact=event.arguments[0]) # Save statistic to database. MessageStatistics.update(self, event, "notice") def on_privnotice(self, connection, event): log.info(event) commands.public.do_command(self, connection, event) commands.admin.do_command(self, connection, event) commands.statistics.do_command(self, connection, event) commands.games.do_command(self, connection, event) try: if event.source.nick == connection.get_nickname(): return elif event.source.nick == "NickServ": if event.arguments[0].startswith("This nickname is registered"): connection.privmsg("NickServ", "identify " + connection.nickname + " " + self.password) # Identify with NickServ. if event.arguments[0] == "You are already identified.": return elif event.source.nick == "ChanServ": if event.arguments[0].startswith("Key for channel ") and len(event.arguments[0]) > 5: # Received channel key. self.channelkeys[event.arguments[0].split(' ')[3]] = event.arguments[0].split(' ')[5][:-1] connection.join(event.arguments[0].split(' ')[3], event.arguments[0].split(' ')[5][:-1]) Inform.owners(self, connection, "Received " + red + event.arguments[0].split(" ")[3] + reset + " key: " + event.arguments.split(" ")[5][:-1]) if event.arguments[0] == "Password authentication required for that command.": # Not authenticated with NisckServ. Inform.notice_owners(self, connection, "Not authenticated with NickServ. See " + blue + self.helpchar + "recovernick " + reset + "and " + blue + self.helpchar + "registernick" + reset + ".") return if event.arguments[0].startswith("You have been unbanned from ") or event.arguments[0].endswith(" autokick list is empty.") or event.arguments[0].startswith("You are already in ") or event.arguments[0] == "Syntax: UNBAN channel [nick]" or event.arguments[0] == "/msg ChanServ HELP UNBAN for more information": return if event.arguments[0].startswith("Channel ") and event.arguments[0].endswith(" has no key."): return elif event.source.nick == "Global": return Inform.notice_owners(self, connection, "Notice from " + red + red + event.source.nick + grey + ": " + reset + event.arguments[0]) except: pass def on_action(self, connection, event): events.on_action.process_event(self, connection, event) def on_whoreply(self, connection, event): events.on_whoreply.process_event(self, connection, event) def on_keyset(self, connection, event): log.info(event) Inform.owners(self, connection, "keyset " + green + event.arguments[0] + reset + " from " + red + event.source) def on_yourhost(self, connection, event): log.info(event) def on_yourebannedcreep(self, connection, event): log.warning(event) def on_youwillbebanned(self, connection, event): log.warning(event) Inform.operators(self, connection, "I will be banned " + red + event.arguments[0] + reset + " from " + red + event.source) # DCC stuff from originalexample file. def on_dccmsg(self, c, e): log.info(e) # non-chat DCC messages are raw bytes; decode as text text = e.arguments[0].decode('utf-8') c.privmsg("You said: " + text) def on_dccchat(self, c, e): log.info(e) if len(e.arguments) != 2: return args = e.arguments[1].split() if len(args) == 4: try: address = ip_numstr_to_quad(args[2]) port = int(args[3]) except ValueError: return self.dcc_connect(address, port) def main(): # Check system arguments. if len(sys.argv) != 2: print(sys.argv) print("Usage: rotbot ") sys.exit(1) instance = sys.argv[1] # Instance is the database network id. # Database. db = Postgres("postgres://pyRot:4h8q(.@localhost/pyRot") # Get network from database. try: network = db.one("SELECT * FROM networks WHERE id=" + str(instance)) except: print("Invalid network ID.") sys.exit(1) if network == None: print("Invalid network ID.") sys.exit(1) bot = PyRot(network.name, db, network.home_channel, network.nickname, network.username, network.password, network.host, network.port, network.use_ssl, network.command_character, network.help_character) bot.start() if __name__ == "__main__": try: main() except KeyboardInterrupt: log.info('Interrupted by keyboard.') sys.exit(0)