| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252 |
- #! /usr/bin/env python
import random
import ssl
import string
import sys

import irc.bot  #, irc.strings  # https://python-irc.readthedocs.io/en/latest/irc.html#
#from irc.client import ip_numstr_to_quad#, ip_quad_to_numstr
from jaraco.stream import buffer
from postgres import Postgres  # https://postgres-py.readthedocs.io/en/latest/

import commands.admin
import commands.games
import commands.public
import commands.statistics
import events.on_action
import events.on_join
import events.on_kick
import events.on_mode
import events.on_nick
import events.on_pubmsg
import events.on_welcome
import events.on_whoreply
from common import font, log, queries
from common.networkservices import NickServ
from events.common import Inform  #, MessageStatistics #Lastact,
class PyRot(irc.bot.SingleServerIRCBot):
    """IRC bot for one network whose settings are read from the database.

    Most event handlers delegate to the matching ``events.*`` module; the
    remaining ones log the event and inform owners/operators via ``Inform``.
    """

    def __init__(self, network, db, webgui):
        """Pick a random host of ``network`` and initialise the bot for it.

        Args:
            network: Network row; this class reads ``id``, ``nickname``,
                ``username``, ``password`` and ``home_channel`` from it.
            db: Database handle; ``all`` and ``run`` are called on it here.
            webgui: Web GUI settings, stored unchanged on the instance.

        Exits the process when the network has no hosts or when the
        underlying bot raises ``irc.client.ServerConnectionError``.
        """
        # Kept on the instance so handlers do not need network and db passed with every call.
        self.network = network
        self.db = db
        self.webgui = webgui

        # Pick a random host from the network.
        hosts = db.all("SELECT * FROM rotbot_host WHERE network_id=%(network)s", network=network.id)
        if not hosts:  # random.choice() would raise IndexError on an empty list.
            sys.exit('No hosts found for network %s, terminating program.' % (network.id,))
        host = random.choice(hosts)
        connect_string = (host.address, host.port)

        irc.client.ServerConnection.buffer_class = buffer.LenientDecodingLineBuffer  # Set buffer class.

        # self.protectees = {}
        # self.channelkeys = {}

        # Log and record to database.
        log.info('Connecting to %s:%s/%s' % (connect_string[0], connect_string[1], network.home_channel))
        # The counter lives on the host row, so it is keyed by the host id (not the network id).
        db.run("UPDATE rotbot_host SET connection_attempts = connection_attempts + 1 WHERE id=%(host)s", host=host.id)

        # Create connect factory with correct scheme.
        if host.ssl:
            # ssl.wrap_socket() was deprecated in Python 3.7 and removed in 3.12;
            # an SSLContext is the supported replacement. The default context
            # also verifies the server certificate and hostname.
            context = ssl.create_default_context()
            factory = irc.connection.Factory(
                wrapper=lambda sock: context.wrap_socket(sock, server_hostname=host.address)
            )
        else:
            factory = irc.connection.Factory()

        try:
            super().__init__([connect_string], network.nickname, network.username, connect_factory=factory)
        except irc.client.ServerConnectionError as error:  # Connection failure.
            sys.exit(str(error))  # Exit.

    ## Events.
    def on_nicknameinuse(self, connection, event):
        """Switch to a temporary nickname and ask NickServ to recover the configured one."""
        log.info('Nickname %s in use, attempting to recover: %s' % (self.network.nickname, connection.nickname))
        # Take temporary nick. In this state recovering via NickServ won't work without changing nickname.
        suffix = ''.join(random.choice(string.digits) for _ in range(3))
        connection.nick(connection.nickname + suffix)
        NickServ.recover_nick(connection, self.network.password)

    def on_welcome(self, connection, event):
        """Delegate to ``events.on_welcome``."""
        events.on_welcome.process_event(self, connection, event)

    def on_error(self, connection, event):
        """Log a server error and report it to the home channel and the owners."""
        log.notice(str(event))
        connection.privmsg(self.network.home_channel, 'ERROR: %s' % (event,))
        Inform.notice_owners(self, connection, 'Received error from server: %s' % (event,))

    # def on_nick(self, connection, event):
    #     events.on_nick.process_event(self, connection, event)

    def on_join(self, connection, event):
        """Delegate to ``events.on_join``."""
        events.on_join.process_event(self, connection, event)

    def on_mode(self, connection, event):
        """Delegate to ``events.on_mode``."""
        events.on_mode.process_event(self, connection, event)

    def on_kick(self, connection, event):
        """Delegate to ``events.on_kick``."""
        events.on_kick.process_event(self, connection, event)

    # def on_part(self, connection, event):
    #     log.info(event)
    #
    #     # Update protectees.
    #     if event.target == self.homechannel and event.source.nick in self.protectees:  # Protectee parted home channel.
    #         del self.protectees[event.source.nick]  # Delete from protectees.
    #
    #
    # def on_quit(self, connection, event):
    #     log.info(event)
    #
    #     # Update protectees.
    #     if event.source.nick in self.protectees:  # Protectee parted home channel.
    #         del self.protectees[event.source.nick]  # Delete from protectees.

    def on_invite(self, connection, event):
        """Log an invitation and tell the operators when the bot itself was invited."""
        log.notice(event)
        if event.target == connection.get_nickname():  # Bot invited.
            # The arguments must be one tuple; without the parentheses only
            # font.red is fed to the format string and formatting fails.
            Inform.operators(
                self,
                connection,
                'Received invitation to %s %s %s form %s %s %s.'
                % (font.red, event.arguments[0], font.reset, font.red, event.source.nick, font.reset),
            )

    # def on_topic(self, connection, event):
    #     log.info(event)

    def on_pubmsg(self, connection, event):  # Channel message
        """Delegate to ``events.on_pubmsg``."""
        events.on_pubmsg.process_event(self, connection, event)

    def on_action(self, connection, event):  # Is this both channel and private actions?
        """Delegate to ``events.on_action``."""
        events.on_action.process_event(self, connection, event)

    def on_privmsg(self, connection, event):  # Private message
        """Record a private message, forward it to the owners and run the command handlers."""
        log.info(event)
        user = queries.create_or_get_and_update_last_event(self, 'user', 'pm', user_name=event.source.nick, event_subject_name=event.target)
        channel = None  # A private message has no channel.

        # NOTE(review): the reviewed copy had lost its indentation; only the
        # forwarding is assumed to be guarded by this check - confirm.
        if event.source.nick != connection.get_nickname():  # Message is not from myself.
            # Forward message to users with owner rights of home channel.
            Inform.owners(self, connection, 'PM from %s%s%s: %s%s' % (font.red, event.source.nick, font.grey, font.green, event.arguments[0]))

        commands.public.do_command(self, connection, event, user, channel)
        commands.admin.do_command(self, connection, event, user, channel)
        commands.statistics.do_command(self, connection, event, user, channel)
        commands.games.do_command(self, connection, event, user, channel)

    def on_pubnotice(self, connection, event):  # Channel notice
        """Record a channel notice as the last event of both the user and the channel."""
        log.info(event)
        # The return values are not needed here; the calls are made for their database update.
        queries.create_or_get_and_update_last_event(self, 'user', 'cn', channel_name=event.target, user_name=event.source.nick, event_content=event.arguments[0])
        queries.create_or_get_and_update_last_event(self, 'channel', 'cn', channel_name=event.target, user_name=event.source.nick, event_content=event.arguments[0])

    def on_privnotice(self, connection, event):  # Private notice.
        """Record a private notice, run the command handlers and react to NickServ replies.

        Notices from anyone but "Global" (and not short-circuited by an
        earlier ``return``) are forwarded to the owners.
        """
        log.info(event)
        user = queries.create_or_get_and_update_last_event(self, 'user', 'pn', user_name=event.source.nick, event_subject_name=event.target)
        channel = None  # A private notice has no channel.

        commands.public.do_command(self, connection, event, user, channel)
        commands.admin.do_command(self, connection, event, user, channel)
        commands.statistics.do_command(self, connection, event, user, channel)
        commands.games.do_command(self, connection, event, user, channel)

        sender = event.source.nick
        message = event.arguments[0]

        # NOTE(review): nesting below was reconstructed from a copy without
        # indentation - confirm against the intended control flow.
        if sender == connection.get_nickname():  # Message came from myself.
            return
        elif sender == "NickServ":
            if message.startswith("This nickname is registered"):
                connection.privmsg('NickServ', 'identify %s %s' % (self.network.nickname, self.network.password))  # Identify with NickServ.
            if message.startswith("You are already identified."):
                return
            # Nick \x02RotBot\x02 isn't registered.
            if (
                message.endswith(' is not a registered nickname.')
                or (message.startswith('Nick ') and message.endswith(' isn\'t registered.'))
                or message == 'Your nick isn\'t registered.'
            ):  # Username from database is not registered.
                connection.privmsg('NickServ', 'register %s spamtBK@xs4all.nl' % (self.network.password))  # Register with NickServ.
                connection.privmsg(self.network.home_channel, 'Regisring %s%s%s with %sNickServ%s.' % (font.red, self.network.nickname, font.reset, font.red, font.reset))
                log.info('Registerring with NickServ.')
            if message.startswith('Nickname ') and message.endswith(' registered.'):
                # Pass the bot itself (was the undefined name "seld") and format with a proper tuple.
                Inform.home_channel(self, connection, 'Registerred nickname %s%s%s with NickServ.' % (font.red, self.network.nickname, font.reset))
        # elif event.source.nick == "ChanServ":
        #     if event.arguments[0].startswith("Key for channel ") and len(event.arguments[0]) > 5:  # Received channel key.
        #         self.channelkeys[event.arguments[0].split(' ')[3]] = event.arguments[0].split(' ')[5][:-1]
        #         connection.join(event.arguments[0].split(' ')[3], event.arguments[0].split(' ')[5][:-1])
        #         Inform.owners(self, connection, "Received " + red + event.arguments[0].split(" ")[3] + reset + " key: " + event.arguments.split(" ")[5][:-1])
        #     if event.arguments[0] == "Password authentication required for that command.":  # Not authenticated with NisckServ.
        #         Inform.notice_owners(self, connection, "Not authenticated with NickServ. See " + blue + self.helpchar + "recovernick " + reset + "and " + blue + self.helpchar + "registernick" + reset + ".")
        #         return
        #     if event.arguments[0].startswith("You have been unbanned from ") or event.arguments[0].endswith(" autokick list is empty.") or event.arguments[0].startswith("You are already in ") or event.arguments[0] == "Syntax: UNBAN channel [nick]" or event.arguments[0] == "/msg ChanServ HELP UNBAN for more information":
        #         return
        #     if event.arguments[0].startswith("Channel ") and event.arguments[0].endswith(" has no key."):
        #         return

        if sender != "Global":
            Inform.notice_owners(self, connection, 'Notice from %s %s %s %s: %s %s' % (font.red, font.red, sender, font.grey, font.reset, message))

    # def on_whoreply(self, connection, event):
    #     events.on_whoreply.process_event(self, connection, event)

    def on_keyset(self, connection, event):
        """Log a keyset event and report it to the owners."""
        log.info(event)
        Inform.owners(self, connection, "keyset " + font.green + event.arguments[0] + font.reset + " from " + font.red + event.source)

    def on_yourhost(self, connection, event):
        """Log the yourhost event."""
        log.info(event)

    def on_yourebannedcreep(self, connection, event):
        """Tell the operators that the bot is banned and log a warning."""
        Inform.operators(self, connection, "I am banned " + font.red + event.arguments[0] + font.reset + " from " + font.red + event.source)
        log.warning(event)

    def on_youwillbebanned(self, connection, event):
        """Log a warning and tell the operators that the bot will be banned."""
        log.warning(event)
        Inform.operators(self, connection, "I will be banned " + font.red + event.arguments[0] + font.reset + " from " + font.red + event.source)

    # DCC stuff from originalexample file.
    def on_dccmsg(self, c, e):
        """Log a DCC message."""
        log.info(e)
        # non-chat DCC messages are raw bytes; decode as text
        #text = e.arguments[0].decode('utf-8')
        #c.privmsg("You said: " + text)

    def on_dccchat(self, c, e):
        """Log a DCC chat request."""
        log.info(e)
        # if len(e.arguments) != 2:
        #     return
        # args = e.arguments[1].split()
        # if len(args) == 4:
        #     try:
        #         address = ip_numstr_to_quad(args[2])
        #         port = int(args[3])
        #     except ValueError:
        #         return
        #     self.dcc_connect(address, port)
def main():
    """Validate the command line, load the network from the database and run the bot.

    Expects exactly one command-line argument: the database id of the network.
    Exits the process with a message when the argument count is wrong, the
    database cannot be reached, or the network is missing or disabled.
    """
    log.info('Starting RotBot: pyRot version.')

    # Validate system arguments and warn if needed..
    if len(sys.argv) != 2:  # Not 2 arguments.
        sys.exit('To run type "python bot.py database_network_id", not: %s. Terminating program.' % (sys.argv,))  # Terminate program.
    instance = sys.argv[1]  # Instance is the database network id.

    ### SETTINGS
    webgui = {
        'base_url': 'https://h0v1n8.nl/rotbot/',  # The django rotbot app url.
        'register_url': '',
    }

    # Database credentials
    # NOTE(review): credentials are hard-coded in the source; consider loading
    # them from the environment or a config file kept out of version control.
    username = 'pyrot'
    password = 'oGPnbiqh55QKLhmnKQgS92h74j0e9d6LE58cSsD1'
    db_target = 'localhost/website'
    db_connect_string = 'postgres://%s:%s@%s' % (username, password, db_target)

    try:
        db = Postgres(db_connect_string)  # Connect to database.
    except Exception as error:
        # Report user and target only: the full connect string contains the password.
        sys.exit('Database connection postgres://%s@%s failed: %s' % (username, db_target, error))  # Exit.

    try:
        network = db.one("SELECT * FROM rotbot_network WHERE id=%(instance)s", instance=instance)  # Get network from database.
    except Exception as error:
        sys.exit('Could not retrieve network %s from database: %s' % (instance, error))  # Exit.

    if network is None:  # No data returned.
        sys.exit('Network %s not found, terminating program.' % (instance,))  # Exit.
    if network.enabled is False:  # Network disabled in database.
        sys.exit('Network %s marked as disabled in database, use webgui to enable. Terminating program.' % (network.name,))  # Exit

    bot = PyRot(network, db, webgui)
    bot.start()
# Script entry point: run the bot and turn CTRL+C into a clean exit message.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Equivalent to sys.exit(...): the message goes to stderr and the exit status is 1.
        raise SystemExit('Interrupted by keyboard.')
|