#! /usr/bin/env python
"""pyRot IRC bot module.

Only the imports and the IRC formatting constants below are active code.
The ``PyRot`` bot class, ``main()`` and the ``__main__`` guard are kept
further down as commented-out (disabled) code.
"""

import sys
import random
import string
import ssl

import irc.bot  # , irc.strings
from irc.client import ip_numstr_to_quad  # , ip_quad_to_numstr
from jaraco.stream import buffer
from postgres import Postgres

import commands.public
import commands.admin
import commands.games
import commands.statistics
import events.on_welcome
import events.on_join
import events.on_kick
import events.on_mode
import events.on_pubmsg
import events.on_action
import events.on_whoreply
import events.on_nick

# Disabled together with the bot class below, which is their only user here.
# from common import log
# from common.networkservices import NickServ
# from events.common import Lastact, MessageStatistics, Inform

# IRC text-formatting control codes, prepended to message text.
bold = "\x02"
italic = "\x1D"
underline = "\x1F"
reverse = "\x16"  # swap background and foreground colors ("reverse video")
reset = "\x0F"

# Colours: the \x03 colour control character followed by a two-digit colour number.
blue = "\x0302"
green = "\x0303"
red = "\x0304"
grey = "\x0314"

# ---------------------------------------------------------------------------
# Disabled code: the bot class and the script entry point.
#
# NOTE(review): the indentation below was restored from a copy that had lost
# its line breaks -- verify the nesting (especially in on_privnotice) against
# version control before re-enabling.
#
# NOTE(review): issues to resolve before re-enabling:
#   * log, NickServ, Lastact, MessageStatistics and Inform come from the
#     imports that are commented out above.
#   * __init__: sys.stderr.write() is handed the exception object itself
#     (sys.exc_info()[1]) rather than a string.
#   * __init__: ssl.wrap_socket was removed in Python 3.12; an SSLContext is
#     needed on newer interpreters.
#   * on_privnotice: the ChanServ key branch calls event.arguments.split(" ")
#     although every other access in the method uses event.arguments[0].
#   * on_privnotice: the bare "except: pass" hides every failure, including
#     the one above.
#   * on_dccmsg: c.privmsg() is called with a single argument, while every
#     other privmsg call in this file passes (target, text).
#   * main(): the SQL statement is built by concatenating sys.argv[1], and
#     the database credentials are embedded in the connection string.
# ---------------------------------------------------------------------------

# class PyRot(irc.bot.SingleServerIRCBot):
#     def __init__(self, network, db, homechannel, nickname, username, password, host, port=6667, usessl=False, cmdchar="!", helpchar="@"):
#         self.network = network
#         self.db = db
#         self.homechannel = homechannel
#         self.password = password
#         self.cmdchar = cmdchar
#         self.helpchar = helpchar
#         self.protectees = {}
#         self.channelkeys = {}
#
#         log.info("Starting pyRot, by tBKwtWS.")
#         log.info("Connecting to " + host + ":" + str(port) + "/" + self.homechannel)
#
#         if usessl:
#             factory = irc.connection.Factory(wrapper=ssl.wrap_socket)
#         else:
#             factory = irc.connection.Factory()
#
#         irc.client.ServerConnection.buffer_class = buffer.LenientDecodingLineBuffer
#         try:
#             irc.bot.SingleServerIRCBot.__init__(self, [(host, port)], nickname, username, connect_factory=factory)
#         except irc.client.ServerConnectionError:
#             sys.stderr.write(sys.exc_info()[1])
#
#     # Events.
#     def on_nicknameinuse(self, connection, event):
#         log.info("Nickname in use, attempting to recover: " + connection.nickname)
#         connection.nick(connection.nickname + ''.join(random.choice(string.digits) for _ in range(3)))  # Take temporary nick. Without this recovering via NickServ won't work.
#         NickServ.recover_nick(connection, self.password)
#
#     def on_welcome(self, connection, event):
#         events.on_welcome.process_event(self, connection, event)
#
#     def on_error(self, connection, event):
#         log.notice(str(event))
#         connection.privmsg(self.homechannel, "ERROR: " + str(event))
#         Inform.owners(self, connection, "Received error from server: " + str(event))
#
#     def on_nick(self, connection, event):
#         events.on_nick.process_event(self, connection, event)
#
#     def on_join(self, connection, event):
#         events.on_join.process_event(self, connection, event)
#
#     def on_kick(self, connection, event):
#         events.on_kick.process_event(self, connection, event)
#
#     def on_mode(self, connection, event):
#         events.on_mode.process_event(self, connection, event)
#
#     def on_part(self, connection, event):
#         log.info(event)
#
#         # Update protectees.
#         if event.target == self.homechannel and event.source.nick in self.protectees:  # Protectee parted home channel.
#             del self.protectees[event.source.nick]  # Delete from protectees.
#
#         # Update last act.
#         if event.arguments:
#             Lastact.update(self, event.source.nick, "part", channel=event.target, lastact=event.arguments[0])
#         else:
#             Lastact.update(self, event.source.nick, "part", channel=event.target)
#
#     def on_quit(self, connection, event):
#         log.info(event)
#
#         # Update protectees.
#         if event.source.nick in self.protectees:  # Protectee parted home channel.
#             del self.protectees[event.source.nick]  # Delete from protectees.
#
#         # Update last act.
#         if event.arguments:
#             Lastact.update(self, event.source.nick, "quit", lastact=event.arguments[0])
#         else:
#             Lastact.update(self, event.source.nick, "quit")
#
#     def on_invite(self, connection, event):
#         log.notice(event)
#         if event.target == connection.get_nickname():  # Bot invited.
#             Inform.operators(self, connection, "Received invitation to " + red + event.arguments[0] + reset + " from " + red + event.source.nick + reset + ".")
#
#     def on_topic(self, connection, event):
#         log.info(event)
#
#         # Update last act.
#         Lastact.update(self, event.source.nick, "topic", channel=event.target, lastact=event.arguments[0])
#
#     def on_pubmsg(self, connection, event):
#         events.on_pubmsg.process_event(self, connection, event)
#         commands.public.do_command(self, connection, event)
#         commands.admin.do_command(self, connection, event)
#         commands.statistics.do_command(self, connection, event)
#         commands.games.do_command(self, connection, event)
#
#     def on_privmsg(self, connection, event):
#         log.info(event)
#         commands.public.do_command(self, connection, event)
#         commands.admin.do_command(self, connection, event)
#         commands.statistics.do_command(self, connection, event)
#         commands.games.do_command(self, connection, event)
#         if not event.source.nick == connection.get_nickname():
#             Inform.owners(self, connection, "PM from " + red + red + event.source.nick + grey + ": " + reset + event.arguments[0])
#
#     def on_pubnotice(self, connection, event):
#         log.info(event)
#
#         # Update last act.
#         Lastact.update(self, event.source.nick, "notice", channel=event.target, lastact=event.arguments[0])
#
#         # Save statistic to database.
#         MessageStatistics.update(self, event, "notice")
#
#     def on_privnotice(self, connection, event):
#         log.info(event)
#         commands.public.do_command(self, connection, event)
#         commands.admin.do_command(self, connection, event)
#         commands.statistics.do_command(self, connection, event)
#         commands.games.do_command(self, connection, event)
#         try:
#             if event.source.nick == connection.get_nickname():
#                 return
#             elif event.source.nick == "NickServ":
#                 if event.arguments[0].startswith("This nickname is registered"):
#                     connection.privmsg("NickServ", "identify " + connection.nickname + " " + self.password)  # Identify with NickServ.
#                 if event.arguments[0] == "You are already identified.":
#                     return
#             elif event.source.nick == "ChanServ":
#                 if event.arguments[0].startswith("Key for channel ") and len(event.arguments[0]) > 5:  # Received channel key.
#                     self.channelkeys[event.arguments[0].split(' ')[3]] = event.arguments[0].split(' ')[5][:-1]
#                     connection.join(event.arguments[0].split(' ')[3], event.arguments[0].split(' ')[5][:-1])
#                     Inform.owners(self, connection, "Received " + red + event.arguments[0].split(" ")[3] + reset + " key: " + event.arguments.split(" ")[5][:-1])
#                 if event.arguments[0] == "Password authentication required for that command.":  # Not authenticated with NisckServ.
#                     Inform.notice_owners(self, connection, "Not authenticated with NickServ. See " + blue + self.helpchar + "recovernick " + reset + "and " + blue + self.helpchar + "registernick" + reset + ".")
#                     return
#                 if event.arguments[0].startswith("You have been unbanned from ") or event.arguments[0].endswith(" autokick list is empty.") or event.arguments[0].startswith("You are already in ") or event.arguments[0] == "Syntax: UNBAN channel [nick]" or event.arguments[0] == "/msg ChanServ HELP UNBAN for more information":
#                     return
#                 if event.arguments[0].startswith("Channel ") and event.arguments[0].endswith(" has no key."):
#                     return
#             elif event.source.nick == "Global":
#                 return
#             Inform.notice_owners(self, connection, "Notice from " + red + red + event.source.nick + grey + ": " + reset + event.arguments[0])
#         except:
#             pass
#
#     def on_action(self, connection, event):
#         events.on_action.process_event(self, connection, event)
#
#
#     def on_whoreply(self, connection, event):
#         events.on_whoreply.process_event(self, connection, event)
#
#
#     def on_keyset(self, connection, event):
#         log.info(event)
#         Inform.owners(self, connection, "keyset " + green + event.arguments[0] + reset + " from " + red + event.source)
#
#     def on_yourhost(self, connection, event):
#         log.info(event)
#
#     def on_yourebannedcreep(self, connection, event):
#         log.warning(event)
#
#     def on_youwillbebanned(self, connection, event):
#         log.warning(event)
#         Inform.operators(self, connection, "I will be banned " + red + event.arguments[0] + reset + " from " + red + event.source)
#
#
#
#     # DCC stuff from originalexample file.
#     def on_dccmsg(self, c, e):
#         log.info(e)
#         # non-chat DCC messages are raw bytes; decode as text
#         text = e.arguments[0].decode('utf-8')
#         c.privmsg("You said: " + text)
#
#     def on_dccchat(self, c, e):
#         log.info(e)
#         if len(e.arguments) != 2:
#             return
#         args = e.arguments[1].split()
#         if len(args) == 4:
#             try:
#                 address = ip_numstr_to_quad(args[2])
#                 port = int(args[3])
#             except ValueError:
#                 return
#             self.dcc_connect(address, port)
#
# def main():
#
#     # Check system arguments.
#     if len(sys.argv) != 2:
#         print(sys.argv)
#         print("Usage: rotbot ")
#         sys.exit(1)
#     instance = sys.argv[1]  # Instance is the database network id.
#
#     # Database.
#     db = Postgres("postgres://pyRot:4h8q(.@localhost/pyRot")
#
#     # Get network from database.
#     try:
#         network = db.one("SELECT * FROM networks WHERE id=" + str(instance))
#     except:
#         print("Invalid network ID.")
#         sys.exit(1)
#     if network == None:
#         print("Invalid network ID.")
#         sys.exit(1)
#
#     bot = PyRot(network.name, db, network.home_channel, network.nickname, network.username, network.password, network.host, network.port, network.use_ssl, network.command_character, network.help_character)
#     bot.start()
#
# if __name__ == "__main__":
#     try:
#         main()
#     except KeyboardInterrupt:
#         log.info('Interrupted by keyboard.')
#         sys.exit(0)