Reintroduce commands from angel, and take some fixes too

Signed-off-by: Izuru Yakumo <yakumo.izuru@chaotic.ninja>

git-svn-id: file:///srv/svn/repo/chen/trunk@34 32723744-9b23-0b4a-b1da-9b2e968f9461
This commit is contained in:
yakumo.izuru 2024-04-01 01:23:38 +00:00
parent eacd51b957
commit 56be85d57c
4 changed files with 92 additions and 13 deletions

10
Makefile Normal file
View File

@ -0,0 +1,10 @@
# $TheSupernovaDuo$
all:
@echo "Commands available"
@echo "=================="
@echo "deps -- fetch and install dependencies"
@echo "format -- format code using python-black"
deps:
pip install --user -r requirements.txt
format:
black main.py

5
README
View File

@ -4,6 +4,11 @@ chen
XMPP bot to preview links and file contents. Shikigami of the Shikigami of the Gap Youkai XMPP bot to preview links and file contents. Shikigami of the Shikigami of the Gap Youkai
Based on Angel[1], without the sed(1) and YT redirect features. Based on Angel[1], without the sed(1) and YT redirect features.
Requirements
------------
* Python >= 3.7
Run Run
--- ---

View File

@ -2,4 +2,5 @@
jid = chen@example.com jid = chen@example.com
password = b0TPA55W0rD password = b0TPA55W0rD
nick = Chen nick = Chen
autojoin = room1@muc.example.com room2@muc.example.com room3@muc.example.com autojoin = room1@muc.example.com room2@muc.example.com room3@muc.example.com
prefix = !

87
main.py
View File

@ -34,7 +34,6 @@ block_list = (
"m.youtube.com", "m.youtube.com",
"music.youtube.com", "music.youtube.com",
) )
# FIXME: Gopher support when?
req_list = ( req_list = (
"http://", "http://",
"https://", "https://",
@ -62,6 +61,9 @@ class Lifo(list):
# Cheeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeen # Cheeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeen
class ChenBot(ClientXMPP): class ChenBot(ClientXMPP):
commands = {}
muc_commands = {}
messages = defaultdict( messages = defaultdict(
lambda: { lambda: {
"messages": Lifo(100), "messages": Lifo(100),
@ -76,6 +78,7 @@ class ChenBot(ClientXMPP):
return urls return urls
async def parse_uri(self, uri, sender, mtype): async def parse_uri(self, uri, sender, mtype):
"""Parse a URI and send the result to the sender."""
netloc = uri.netloc netloc = uri.netloc
if netloc.split(":")[0] in block_list: if netloc.split(":")[0] in block_list:
return return
@ -83,6 +86,7 @@ class ChenBot(ClientXMPP):
await self.process_link(uri, sender, mtype) await self.process_link(uri, sender, mtype)
async def process_link(self, uri, sender, mtype): async def process_link(self, uri, sender, mtype):
"""Process a link and send the result to the sender."""
url = urlunparse(uri) url = urlunparse(uri)
r = requests.get(url, stream=True, headers=headers, timeout=5) r = requests.get(url, stream=True, headers=headers, timeout=5)
if not r.ok: if not r.ok:
@ -121,17 +125,21 @@ class ChenBot(ClientXMPP):
outfile.write(chunk) outfile.write(chunk)
content_disposition = r.headers.get("content-disposition") content_disposition = r.headers.get("content-disposition")
filename = None
if content_disposition:
_, params = cgi.parse_header(content_disposition)
filename = params.get("filename")
else:
filename = os.path.basename(uri.path)
_, params = cgi.parse_header(content_disposition) ext = os.path.splitext(filename)[1] if filename else ".txt"
fname = filename if filename else f"file{ext}"
filename = params.get("filename")
ext = os.path.splitext(filename)[1] if filename else None or ".bin"
fname = filename or uri.path.strip("/").split("/")[-1] or f"file{ext}"
await self.embed_file(url, sender, mtype, ftype, fname, outfile) await self.embed_file(url, sender, mtype, ftype, fname, outfile)
except Exception: except Exception as e:
... print(e)
async def embed_file(self, url, sender, mtype, ftype, fname, outfile): async def embed_file(self, url, sender, mtype, ftype, fname, outfile):
"""Embed a file and send the result to the sender."""
furl = await self.plugin["xep_0363"].upload_file( furl = await self.plugin["xep_0363"].upload_file(
fname, content_type=ftype, input_file=outfile fname, content_type=ftype, input_file=outfile
) )
@ -158,9 +166,30 @@ class ChenBot(ClientXMPP):
uri = urlparse(u) uri = urlparse(u)
await self.parse_uri(uri, sender, mtype) await self.parse_uri(uri, sender, mtype)
def muc_word(self, func):
name = func.__name__
self.muc_commands[name] = func
return func
def muc_command(self, func):
name = self.prefix + func.__name__
self.muc_commands[name] = func
return func
def word(self, func):
name = func.__name__
self.commands[name] = func
return func
def command(self, func):
name = self.prefix + func.__name__
self.commands[name] = func
return func
def __init__(self, jid, password, nick, autojoin=None): def __init__(self, jid, password, nick, autojoin=None):
ClientXMPP.__init__(self, jid, password) ClientXMPP.__init__(self, jid, password)
self.jid = jid self.jid = jid
self.prefix = prefix or []
self.nick = nick or [] self.nick = nick or []
self.autojoin = autojoin or [] self.autojoin = autojoin or []
self.register_plugin("xep_0030") self.register_plugin("xep_0030")
@ -178,16 +207,18 @@ class ChenBot(ClientXMPP):
self.add_event_handler("disconnected", lambda _: self.connect()) self.add_event_handler("disconnected", lambda _: self.connect())
async def session_start(self, event): async def session_start(self, event):
"""Start the bot."""
self.send_presence() self.send_presence()
await self.get_roster() await self.get_roster()
await self.update_info() await self.update_info()
for channel in self.autojoin: for channel in self.autojoin:
try: try:
self.plugin["xep_0045"].join_muc(channel, self.nick) self.plugin["xep_0045"].join_muc(channel, self.nick)
except: except Exception as e:
... print(e)
async def update_info(self): async def update_info(self):
"""Update the bot info."""
with open("avatar.png", "rb") as avatar_file: with open("avatar.png", "rb") as avatar_file:
avatar = avatar_file.read() avatar = avatar_file.read()
@ -219,24 +250,35 @@ class ChenBot(ClientXMPP):
asyncio.gather(self.plugin["xep_0054"].publish_vcard(vcard)) asyncio.gather(self.plugin["xep_0054"].publish_vcard(vcard))
async def message(self, msg): async def message(self, msg):
"""Process a message."""
if msg["type"] in ("chat", "normal"): if msg["type"] in ("chat", "normal"):
mtype = "chat" mtype = "chat"
sender = msg["from"].bare sender = msg["from"].bare
message = msg["body"]
ctx = message.strip().split()
try: try:
if not msg["oob"]["url"]: if not msg["oob"]["url"]:
if urls := self.get_urls(msg): if urls := self.get_urls(msg):
await self.parse_urls(msg, urls, sender, mtype) await self.parse_urls(msg, urls, sender, mtype)
except Exception: except Exception as e:
... print(e)
cm = ctx.pop(0)
if cm in self.muc_commands:
self.muc_commands[cm](msg, ctx, sender)
async def muc_message(self, msg): async def muc_message(self, msg):
"""Process a groupchat message."""
if msg["type"] in ("groupchat", "normal"): if msg["type"] in ("groupchat", "normal"):
mtype = "groupchat" mtype = "groupchat"
sender = msg["from"].bare sender = msg["from"].bare
if msg["mucnick"] == self.nick: if msg["mucnick"] == self.nick:
return return
ctx = msg["body"].strip().split()
try: try:
if not msg["oob"]["url"]: if not msg["oob"]["url"]:
if urls := self.get_urls(msg): if urls := self.get_urls(msg):
@ -244,11 +286,32 @@ class ChenBot(ClientXMPP):
except Exception: except Exception:
pass pass
cm = ctx.pop(0)
if cm in self.muc_commands:
self.muc_commands[cm](msg, ctx, sender)
@self.muc_word
def repo(msg, ctx, sender):
if ctx:
return
bot.send_message(
mto=sender, mbody=f"{msg['mucnick']}: https://git.chaotic.ninja/usr/yakumo_izuru/chen", mtype="groupchat",
)
@self.word
def repo(msg, ctx, sender):
if ctx:
return
bot.send_message(
mto=sender, mbody=f"{msg.get_from().bare}: https://git.chaotic.ninja/usr/yakumo_izuru/chen", mtype="chat",
)
2
if __name__ == "__main__": if __name__ == "__main__":
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read("config.ini") config.read("config.ini")
jid = config["chen"]["jid"] jid = config["chen"]["jid"]
prefix = config["chen"]["prefix"]
password = config["chen"]["password"] password = config["chen"]["password"]
nick = config["chen"]["nick"] nick = config["chen"]["nick"]
autojoin = config["chen"]["autojoin"].split() autojoin = config["chen"]["autojoin"].split()