chen/main.py
yakumo.izuru 46fd6ab38e Lady Yukari proceeds to smack Chen with her umbrella
Signed-off-by: Izuru Yakumo <yakumo.izuru@chaotic.ninja>

git-svn-id: file:///srv/svn/repo/chen/trunk@36 32723744-9b23-0b4a-b1da-9b2e968f9461
2024-04-01 17:52:39 +00:00

327 lines
9.9 KiB
Python

import requests
import bs4
import random
import configparser
import re
import io
import os
import mimetypes
import asyncio
from collections import defaultdict
from slixmpp import ClientXMPP
from urllib.parse import urlparse, parse_qs, urlunparse
from pantomime import normalize_mimetype
import cgi
# Parser backend name handed to BeautifulSoup when extracting page titles.
parser = "html.parser"
# Browser-like User-Agent sent with every outgoing HTTP request.  The two
# literals must be parenthesised to be concatenated; without the
# parentheses the second literal is a separate no-op statement and the
# header silently loses its " Gecko/..." suffix.
user_agent = (
    "Mozilla/5.0 (X11; Linux x86_64; rv:10.0)"
    " Gecko/20100101 Firefox/10.0"
)
accept_lang = "en-US"
# Upper bound on how much of a single response the bot reads.
data_limit = 786400000
# Default headers passed to requests.get().
headers = {
    "user-agent": user_agent,
    "Accept-Language": accept_lang,
    "Cache-Control": "no-cache",
}
# Hosts whose links are never fetched.
block_list = (
    "localhost",
    "127.0.0.1",
    "0.0.0.0",
    "youtu.be",
    "www.youtube.com",
    "youtube.com",
    "m.youtube.com",
    "music.youtube.com",
)
# Substrings that mark a word of a message as a link.
req_list = (
    "http://",
    "https://",
)
# Content types treated as HTML pages (title preview) rather than files.
html_files = (
    "text/html",
    "application/xhtml+xml",
)
class Lifo(list):
    """
    Size-capped list used to remember recent messages and urls.

    New items are inserted at the front; once the list holds more than
    ``size`` items the oldest ones (at the end) are discarded.
    """

    def __init__(self, size):
        """Create an empty list that keeps at most ``size`` items."""
        super().__init__()
        self.size = size

    def add(self, item):
        """Insert ``item`` at the front and drop anything beyond ``size``."""
        self.insert(0, item)
        # Trim with a slice instead of a single pop() so the cap is restored
        # even if the list was grown through inherited list methods such as
        # append()/extend().  max() keeps a negative size behaving like 0.
        del self[max(self.size, 0):]
# Cheeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeen
class ChenBot(ClientXMPP):
    """XMPP bot that previews links and answers simple word commands.

    For every incoming chat or groupchat message the bot looks for http(s)
    links.  HTML pages get their ``<title>`` sent back; any other content is
    downloaded and re-uploaded through HTTP File Upload (xep_0363).  The
    first word of a message is then looked up in a command registry.
    """

    # Command registries keyed by trigger word: ``commands`` serves direct
    # chats, ``muc_commands`` serves group chats.  They are class attributes
    # and therefore shared by every instance.
    commands = {}
    muc_commands = {}
    # Per-conversation history keyed by the sender's bare JID.
    messages = defaultdict(
        lambda: {
            "messages": Lifo(100),
            "links": Lifo(10),
            "previews": Lifo(10),
        }
    )

    def get_urls(self, msg):
        """Return the words of the message body that contain a link scheme."""
        words = msg["body"].strip().split()
        return [w for w in words if any(scheme in w for scheme in req_list)]

    async def parse_uri(self, uri, sender, mtype):
        """Process a parsed URI unless its host is on the block list."""
        # ``hostname`` is lower-cased and stripped of userinfo, port and IPv6
        # brackets, so "LOCALHOST:80" or "user@127.0.0.1" cannot slip past
        # the block list the way a raw netloc split could.
        host = uri.hostname or ""
        if host in block_list:
            return
        await self.process_link(uri, sender, mtype)

    async def process_link(self, uri, sender, mtype):
        """Fetch a link and send a title preview or a re-uploaded file.

        Exceptions raised by the HTTP request itself propagate to the
        caller; errors while re-uploading a file are printed.
        """
        url = urlunparse(uri)
        # NOTE(review): requests is synchronous, so the event loop is blocked
        # while the request and download are in progress.
        # The ``with`` block closes the streamed response on every exit path;
        # breaking out of iter_content() early would otherwise leave the
        # connection open.
        with requests.get(
            url, stream=True, headers=headers, timeout=5
        ) as r:
            if not r.ok:
                return
            ftype = normalize_mimetype(r.headers.get("content-type"))
            if ftype in html_files:
                self._send_title_preview(r, sender, mtype)
            else:
                await self._reupload_file(r, uri, url, sender, mtype, ftype)

    def _send_title_preview(self, r, sender, mtype):
        """Read the page head from ``r`` and send its title to ``sender``."""
        marker = b"</head>"
        raw = bytearray()
        for chunk in r.iter_content(chunk_size=1024, decode_unicode=False):
            raw += chunk
            # Only the newly added bytes (plus enough overlap for a marker
            # split across two chunks) need to be searched.
            tail = raw[-(len(chunk) + len(marker) - 1):]
            if len(raw) > data_limit or marker in tail.lower():
                break
        # Decode once at the end: decoding chunk by chunk drops multibyte
        # characters that straddle a chunk boundary.
        data = raw.decode("utf-8", errors="ignore")
        soup = bs4.BeautifulSoup(data, parser)
        title = soup.find("title")
        if not title:
            return
        output = title.text.strip()
        if not output:
            return
        # Single-line titles are wrapped in asterisks.
        if "\n" not in output:
            output = f"*{output}*"
        previews = self.messages[sender]["previews"]
        if output in previews:
            return
        previews.add(output)
        # After a redirect, also report the final URL.
        if r.history:
            self.send_message(mto=sender, mbody=r.url, mtype=mtype)
        self.send_message(mto=sender, mbody=output, mtype=mtype)

    async def _reupload_file(self, r, uri, url, sender, mtype, ftype):
        """Download a non-HTML response body and re-upload it for ``sender``."""
        try:
            length = 0
            outfile = io.BytesIO()
            for chunk in r.iter_content(chunk_size=512, decode_unicode=False):
                # Count what was actually received; the final chunk is
                # usually shorter than chunk_size.
                length += len(chunk)
                if length >= data_limit:
                    return
                outfile.write(chunk)
            fname = self._pick_filename(
                r.headers.get("content-disposition"), uri.path
            )
            await self.embed_file(url, sender, mtype, ftype, fname, outfile)
        except Exception as e:
            print(e)

    @staticmethod
    def _pick_filename(content_disposition, path):
        """Choose an upload filename from the header, URL path or a default."""
        # Imported locally: the ``cgi`` module this replaces is removed in
        # Python 3.13, and email.message is its documented substitute.
        from email.message import Message

        filename = None
        if content_disposition:
            header = Message()
            header["Content-Disposition"] = content_disposition
            filename = header.get_filename()
        if not filename:
            filename = path
        # The name comes from the remote server, so keep only the final
        # path component.
        return os.path.basename(filename) or "file.txt"

    async def embed_file(self, url, sender, mtype, ftype, fname, outfile):
        """Upload ``outfile`` via xep_0363 and send the resulting URL."""
        furl = await self.plugin["xep_0363"].upload_file(
            fname, content_type=ftype, input_file=outfile
        )
        message = self.make_message(sender)
        message["body"] = furl
        message["type"] = mtype
        message["oob"]["url"] = furl
        message.send()

    async def parse_urls(self, msg, urls, sender, mtype):
        """Process each new URL unless the message opts out of previews."""
        body = msg["body"].lower()
        if any(tag in body for tag in ("nsfl", "nsfw", "#nospoil")):
            return
        seen = self.messages[sender]["links"]
        for u in urls:
            if u in seen:
                continue
            seen.add(u)
            try:
                await self.parse_uri(urlparse(u), sender, mtype)
            except (requests.RequestException, ValueError) as e:
                # One unreachable or malformed link must not stop the
                # remaining links of the same message from being handled.
                print(e)

    def muc_word(self, func):
        """Register ``func`` as a group-chat command under its own name."""
        name = func.__name__
        self.muc_commands[name] = func
        return func

    def muc_command(self, func):
        """Register ``func`` as a group-chat command with the prefix."""
        name = self.prefix + func.__name__
        self.muc_commands[name] = func
        return func

    def word(self, func):
        """Register ``func`` as a direct-chat command under its own name."""
        name = func.__name__
        self.commands[name] = func
        return func

    def command(self, func):
        """Register ``func`` as a direct-chat command with the prefix."""
        name = self.prefix + func.__name__
        self.commands[name] = func
        return func

    def __init__(self, jid, password, nick, prefix, autojoin=None):
        """Set up plugins, event handlers and the built-in commands."""
        ClientXMPP.__init__(self, jid, password)
        self.jid = jid
        # The prefix is concatenated with function names in command() and
        # muc_command(), so the fallback has to be a string.
        self.prefix = prefix or ""
        # NOTE(review): the fallback for a missing nick is an empty list,
        # kept as-is; confirm a nick is always configured.
        self.nick = nick or []
        self.autojoin = autojoin or []
        for plugin in (
            "xep_0030",
            "xep_0060",
            "xep_0054",
            "xep_0045",
            "xep_0066",
            "xep_0084",
            "xep_0153",
            "xep_0363",
        ):
            self.register_plugin(plugin)
        self.add_event_handler("session_start", self.session_start)
        self.add_event_handler("message", self.message)
        self.add_event_handler("groupchat_message", self.muc_message)
        # Reconnect whenever the connection drops.
        self.add_event_handler("disconnected", lambda _: self.connect())
        self._register_builtin_commands()

    def _register_builtin_commands(self):
        """Register the commands that ship with the bot."""
        self.muc_commands["repo"] = self._repo_muc
        self.commands["repo"] = self._repo_chat

    def _repo_muc(self, msg, ctx, sender):
        """Answer a bare ``repo`` in a group chat with the repository URL."""
        if ctx:
            return
        self.send_message(
            mto=sender,
            mbody=f"{msg['mucnick']}: https://git.chaotic.ninja/usr/yakumo_izuru/chen",
            mtype="groupchat",
        )

    def _repo_chat(self, msg, ctx, sender):
        """Answer a bare ``repo`` in a direct chat with the repository URL."""
        if ctx:
            return
        self.send_message(
            mto=sender,
            mbody=f"{msg.get_from().bare}: https://git.chaotic.ninja/usr/yakumo_izuru/chen",
            mtype="chat",
        )

    async def session_start(self, event):
        """Start the bot."""
        self.send_presence()
        await self.get_roster()
        await self.update_info()
        for channel in self.autojoin:
            try:
                self.plugin["xep_0045"].join_muc(channel, self.nick)
            except Exception as e:
                print(e)

    async def update_info(self):
        """Update the bot info."""
        with open("avatar.png", "rb") as avatar_file:
            avatar = avatar_file.read()
        avatar_type = "image/png"
        avatar_id = self.plugin["xep_0084"].generate_id(avatar)
        avatar_bytes = len(avatar)
        # NOTE(review): the gather() results below are not awaited, so a
        # failed publish is not reported from this method.
        asyncio.gather(self.plugin["xep_0084"].publish_avatar(avatar))
        asyncio.gather(
            self.plugin["xep_0153"].set_avatar(
                avatar=avatar,
                mtype=avatar_type,
            )
        )
        info = {
            "id": avatar_id,
            "type": avatar_type,
            "bytes": avatar_bytes,
        }
        asyncio.gather(self.plugin["xep_0084"].publish_avatar_metadata([info]))
        vcard = self.plugin["xep_0054"].make_vcard()
        vcard["URL"] = "git://git.chaotic.ninja/yakumo_izuru/chen"
        vcard["DESC"] = "Shikigami of the Shikigami of the Gap Youkai"
        vcard["NICKNAME"] = "Chen"
        vcard["FN"] = "Chen"
        asyncio.gather(self.plugin["xep_0054"].publish_vcard(vcard))

    async def _preview_links(self, msg, sender, mtype):
        """Preview the links of ``msg`` unless it already carries an OOB url."""
        try:
            if not msg["oob"]["url"]:
                if urls := self.get_urls(msg):
                    await self.parse_urls(msg, urls, sender, mtype)
        except Exception as e:
            # Link previews are best-effort; report the error and carry on
            # so command handling still runs.
            print(e)

    @staticmethod
    def _dispatch(registry, msg, sender):
        """Run the handler registered for the first word of the body."""
        ctx = msg["body"].strip().split()
        if not ctx:
            # Empty or whitespace-only body: there is no command word.
            return
        cm = ctx.pop(0)
        if cm in registry:
            registry[cm](msg, ctx, sender)

    async def message(self, msg):
        """Process a message."""
        if msg["type"] not in ("chat", "normal"):
            return
        sender = msg["from"].bare
        await self._preview_links(msg, sender, "chat")
        # Direct chats use the ``commands`` registry, not the MUC one.
        self._dispatch(self.commands, msg, sender)

    async def muc_message(self, msg):
        """Process a groupchat message."""
        if msg["type"] not in ("groupchat", "normal"):
            return
        # Ignore messages sent under the bot's own nick.
        if msg["mucnick"] == self.nick:
            return
        sender = msg["from"].bare
        await self._preview_links(msg, sender, "groupchat")
        self._dispatch(self.muc_commands, msg, sender)
2
if __name__ == "__main__":
    # Read the account settings from the [chen] section of config.ini.
    config = configparser.ConfigParser()
    config.read("config.ini")
    section = config["chen"]
    jid, password = section["jid"], section["password"]
    prefix, nick = section["prefix"], section["nick"]
    # "autojoin" is a whitespace-separated list of rooms.
    autojoin = section["autojoin"].split()

    # Create the bot, connect and keep processing events.
    bot = ChenBot(jid, password, nick, prefix, autojoin=autojoin)
    bot.connect()
    bot.process(forever=True)