# -*- coding: utf-8 -*- # Copyright © 2018, 2019 Damir Jelić # Copyright © 2018, 2019 Denis Kasak # # Permission to use, copy, modify, and/or distribute this software for # any purpose with or without fee is hereby granted, provided that the # above copyright notice and this permission notice appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY # SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER # RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF # CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. from __future__ import unicode_literals import argparse import os import re from builtins import str from future.moves.itertools import zip_longest from collections import defaultdict from functools import partial from nio import EncryptionError, LocalProtocolError from . import globals as G from .colors import Formatted from .globals import SERVERS, W, UPLOADS, SCRIPT_NAME from .server import MatrixServer from .utf import utf8_decode from .utils import key_from_value, parse_redact_args from .uploads import UploadsBuffer, Upload try: from urllib.parse import urlparse except ImportError: from urlparse import urlparse # type: ignore class ParseError(Exception): pass class WeechatArgParse(argparse.ArgumentParser): def print_usage(self, file=None): pass def error(self, message): message = ( "{prefix}Error: {message} for command {command} " "(see /help {command})" ).format(prefix=W.prefix("error"), message=message, command=self.prog) W.prnt("", message) raise ParseError class WeechatCommandParser(object): @staticmethod def _run_parser(parser, args): try: parsed_args = parser.parse_args(args.split()) return parsed_args except ParseError: return None @staticmethod def topic(args): parser = WeechatArgParse(prog="topic") parser.add_argument("-delete", action="store_true") parser.add_argument("topic", nargs="*") return WeechatCommandParser._run_parser(parser, args) @staticmethod def kick(args): parser = WeechatArgParse(prog="kick") parser.add_argument("user_id") parser.add_argument("reason", nargs="*") return WeechatCommandParser._run_parser(parser, args) @staticmethod def invite(args): parser = WeechatArgParse(prog="invite") parser.add_argument("user_id") return WeechatCommandParser._run_parser(parser, args) @staticmethod def join(args): parser = WeechatArgParse(prog="join") parser.add_argument("room_id") return WeechatCommandParser._run_parser(parser, args) @staticmethod def part(args): parser = WeechatArgParse(prog="part") parser.add_argument("room_id", nargs="?") return WeechatCommandParser._run_parser(parser, args) @staticmethod def devices(args): parser = WeechatArgParse(prog="devices") subparsers = parser.add_subparsers(dest="subcommand") subparsers.add_parser("list") delete_parser = subparsers.add_parser("delete") delete_parser.add_argument("device_id") name_parser = subparsers.add_parser("set-name") name_parser.add_argument("device_id") name_parser.add_argument("device_name", nargs="*") return WeechatCommandParser._run_parser(parser, args) @staticmethod def olm(args): parser = WeechatArgParse(prog="olm") subparsers = parser.add_subparsers(dest="subcommand") info_parser = subparsers.add_parser("info") info_parser.add_argument( "category", nargs="?", default="private", choices=[ "all", "blacklisted", "private", "unverified", "verified", "ignored" ]) info_parser.add_argument("filter", nargs="?") verify_parser = subparsers.add_parser("verify") verify_parser.add_argument("user_filter") verify_parser.add_argument("device_filter", nargs="?") unverify_parser = subparsers.add_parser("unverify") unverify_parser.add_argument("user_filter") unverify_parser.add_argument("device_filter", nargs="?") blacklist_parser = subparsers.add_parser("blacklist") blacklist_parser.add_argument("user_filter") blacklist_parser.add_argument("device_filter", nargs="?") unblacklist_parser = subparsers.add_parser("unblacklist") unblacklist_parser.add_argument("user_filter") unblacklist_parser.add_argument("device_filter", nargs="?") ignore_parser = subparsers.add_parser("ignore") ignore_parser.add_argument("user_filter") ignore_parser.add_argument("device_filter", nargs="?") unignore_parser = subparsers.add_parser("unignore") unignore_parser.add_argument("user_filter") unignore_parser.add_argument("device_filter", nargs="?") export_parser = subparsers.add_parser("export") export_parser.add_argument("file") export_parser.add_argument("passphrase") import_parser = subparsers.add_parser("import") import_parser.add_argument("file") import_parser.add_argument("passphrase") sas_parser = subparsers.add_parser("verification") sas_parser.add_argument( "action", choices=[ "start", "accept", "confirm", "cancel", ]) sas_parser.add_argument("user_id") sas_parser.add_argument("device_id") return WeechatCommandParser._run_parser(parser, args) @staticmethod def room(args): parser = WeechatArgParse(prog="room") subparsers = parser.add_subparsers(dest="subcommand") typing_notification = subparsers.add_parser("typing-notifications") typing_notification.add_argument( "state", choices=["enable", "disable", "toggle"] ) read_markers = subparsers.add_parser("read-markers") read_markers.add_argument( "state", choices=["enable", "disable", "toggle"] ) return WeechatCommandParser._run_parser(parser, args) @staticmethod def uploads(args): parser = WeechatArgParse(prog="uploads") subparsers = parser.add_subparsers(dest="subcommand") subparsers.add_parser("list") subparsers.add_parser("listfull") subparsers.add_parser("up") subparsers.add_parser("down") return WeechatCommandParser._run_parser(parser, args) @staticmethod def upload(args): parser = WeechatArgParse(prog="upload") parser.add_argument("file") return WeechatCommandParser._run_parser(parser, args) def grouper(iterable, n, fillvalue=None): "Collect data into fixed-length chunks or blocks" # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx" args = [iter(iterable)] * n return zip_longest(*args, fillvalue=fillvalue) def partition_key(key): groups = grouper(key, 4, " ") return ' '.join(''.join(g) for g in groups) def hook_commands(): W.hook_command( # Command name and short description "matrix", "Matrix chat protocol command", # Synopsis ( "server add [:] ||" "server delete|list|listfull ||" "connect ||" "disconnect ||" "reconnect ||" "help " ), # Description ( " server: list, add, or remove Matrix servers\n" " connect: connect to Matrix servers\n" "disconnect: disconnect from one or all Matrix servers\n" " reconnect: reconnect to server(s)\n" " help: show detailed command help\n\n" "Use /matrix help [command] to find out more.\n" ), # Completions ( "server %(matrix_server_commands)|%* ||" "connect %(matrix_servers) ||" "disconnect %(matrix_servers) ||" "reconnect %(matrix_servers) ||" "help %(matrix_commands)" ), # Function name "matrix_command_cb", "", ) W.hook_command( # Command name and short description "redact", "redact messages", # Synopsis ('[:""] []'), # Description ( " event-id: event id of the message that will be redacted\n" "message-part: an initial part of the message (ignored, only " "used\n" " as visual feedback when using completion)\n" " reason: the redaction reason\n" ), # Completions ("%(matrix_messages)"), # Function name "matrix_redact_command_cb", "", ) W.hook_command( # Command name and short description "reply-matrix", "reply to a message", # Synopsis ('[:""] []'), # Description ( " event-id: event id of the message that will be replied to\n" "message-part: an initial part of the message (ignored, only " "used\n" " as visual feedback when using completion)\n" " reply: the reply\n" ), # Completions ("%(matrix_messages)"), # Function name "matrix_reply_command_cb", "", ) W.hook_command( # Command name and short description "topic", "get/set the room topic", # Synopsis ("[|-delete]"), # Description (" topic: topic to set\n" "-delete: delete room topic"), # Completions "", # Callback "matrix_topic_command_cb", "", ) W.hook_command( # Command name and short description "me", "send an emote message to the current room", # Synopsis (""), # Description ("message: message to send"), # Completions "", # Callback "matrix_me_command_cb", "", ) W.hook_command( # Command name and short description "kick", "kick a user from the current room", # Synopsis (" []"), # Description ( "user-id: user-id to kick\n" " reason: reason why the user was kicked" ), # Completions ("%(matrix_users)"), # Callback "matrix_kick_command_cb", "", ) W.hook_command( # Command name and short description "invite", "invite a user to the current room", # Synopsis (""), # Description ("user-id: user-id to invite"), # Completions ("%(matrix_users)"), # Callback "matrix_invite_command_cb", "", ) W.hook_command( # Command name and short description "join", "join a room", # Synopsis ("|"), # Description ( " room-id: room-id of the room to join\n" "room-alias: room alias of the room to join" ), # Completions "", # Callback "matrix_join_command_cb", "", ) W.hook_command( # Command name and short description "part", "leave a room", # Synopsis ("[]"), # Description (" room-name: room name of the room to leave"), # Completions "", # Callback "matrix_part_command_cb", "", ) W.hook_command( # Command name and short description "devices", "list, delete or rename matrix devices", # Synopsis ("list ||" "delete ||" "set-name " ), # Description ("device-id: device id of the device to delete\n" " name: new device name to set\n"), # Completions ("list ||" "delete %(matrix_own_devices) ||" "set-name %(matrix_own_devices)"), # Callback "matrix_devices_command_cb", "", ) W.hook_command( # Command name and short description "olm", "Matrix olm encryption configuration command", # Synopsis ("info all|blacklisted|ignored|private|unverified|verified ||" "blacklist ||" "unverify ||" "verify ||" "verification start|accept|cancel|confirm ||" "ignore ||" "unignore ||" "export ||" "import " ), # Description (" info: show info about known devices and their keys\n" " blacklist: blacklist a device\n" "unblacklist: unblacklist a device\n" " unverify: unverify a device\n" " verify: verify a device\n" " ignore: ignore an unverifiable but non-blacklist-worthy device\n" " unignore: unignore a device\n" "verification: manage interactive device verification\n" " export: export encryption keys\n" " import: import encryption keys\n\n" "Examples:" "\n /olm verify @example:example.com *" "\n /olm info all example*" ), # Completions ('info all|blacklisted|ignored|private|unverified|verified ||' 'blacklist %(olm_user_ids) %(olm_devices) ||' 'unblacklist %(olm_user_ids) %(olm_devices) ||' 'unverify %(olm_user_ids) %(olm_devices) ||' 'verify %(olm_user_ids) %(olm_devices) ||' 'verification start|accept|cancel|confirm %(olm_user_ids) %(olm_devices) ||' 'ignore %(olm_user_ids) %(olm_devices) ||' 'unignore %(olm_user_ids) %(olm_devices) ||' 'export %(filename) ||' 'import %(filename)' ), # Function name 'matrix_olm_command_cb', '') W.hook_command( # Command name and short description "room", "change room state", # Synopsis ("typing-notifications ||" "read-markers " ), # Description ("state: one of enable, disable or toggle\n"), # Completions ("typing-notifications enable|disable|toggle||" "read-markers enable|disable|toggle" ), # Callback "matrix_room_command_cb", "", ) # W.hook_command( # # Command name and short description # "uploads", # "Open the uploads buffer or list uploads in the core buffer", # # Synopsis # ("list||" # "listfull" # ), # # Description # (""), # # Completions # ("list ||" # "listfull"), # # Callback # "matrix_uploads_command_cb", # "", # ) W.hook_command( # Command name and short description "upload", "Upload a file to a room", # Synopsis (""), # Description (""), # Completions ("%(filename)"), # Callback "matrix_upload_command_cb", "", ) W.hook_command( # Command name and short description "send-anyways", "Send the last message in a room ignorin unverified devices.", # Synopsis "", # Description "Send the last message in a room despite there being unverified " "devices. The unverified devices will be marked as ignored after " "running this command.", # Completions "", # Callback "matrix_send_anyways_cb", "", ) W.hook_command_run("/buffer clear", "matrix_command_buf_clear_cb", "") if G.CONFIG.network.fetch_backlog_on_pgup: hook_page_up() def hook_key_bindings(): W.hook_hsignal("matrix_cursor_reply", "matrix_cursor_reply_signal_cb", "") binding = "@chat(python.{}*):r".format(G.BUFFER_NAME_PREFIX) W.key_bind("cursor", { binding: "hsignal:matrix_cursor_reply", }) def format_device(device_id, fp_key, display_name): fp_key = partition_key(fp_key) message = (" - Device ID: {device_color}{device_id}{ncolor}\n" " - Display name: {device_color}{display_name}{ncolor}\n" " - Device key: {key_color}{fp_key}{ncolor}").format( device_color=W.color("chat_channel"), device_id=device_id, ncolor=W.color("reset"), display_name=display_name, key_color=W.color("chat_server"), fp_key=fp_key) return message def olm_info_command(server, args): def print_devices( device_store, filter_regex, device_category="All", predicate=None, ): user_strings = [] try: filter_regex = re.compile(args.filter) if args.filter else None except re.error as e: server.error("Invalid regular expression: {}.".format(e.args[0])) return for user_id in sorted(device_store.users): device_strings = [] for device in device_store.active_user_devices(user_id): if filter_regex: if (not filter_regex.search(user_id) and not filter_regex.search(device.id)): continue if predicate: if not predicate(device): continue device_strings.append(format_device( device.id, device.ed25519, device.display_name )) if not device_strings: continue d_string = "\n".join(device_strings) message = (" - User: {user_color}{user}{ncolor}\n").format( user_color=W.color("chat_nick"), user=user_id, ncolor=W.color("reset")) message += d_string user_strings.append(message) if not user_strings: message = ("{prefix}matrix: No matching devices " "found.").format(prefix=W.prefix("error")) W.prnt(server.server_buffer, message) return server.info("{} devices:\n".format(device_category)) W.prnt(server.server_buffer, "\n".join(user_strings)) olm = server.client.olm if not hasattr(args, 'category') or args.category == "private": fp_key = partition_key(olm.account.identity_keys["ed25519"]) message = ("Identity keys:\n" " - User: {user_color}{user}{ncolor}\n" " - Device ID: {device_color}{device_id}{ncolor}\n" " - Device key: {key_color}{fp_key}{ncolor}\n" "").format( user_color=W.color("chat_self"), ncolor=W.color("reset"), user=olm.user_id, device_color=W.color("chat_channel"), device_id=olm.device_id, key_color=W.color("chat_server"), fp_key=fp_key) server.info(message) elif args.category == "all": print_devices(olm.device_store, args.filter) elif args.category == "verified": print_devices( olm.device_store, args.filter, "Verified", olm.is_device_verified ) elif args.category == "unverified": def predicate(device): return not olm.is_device_verified(device) print_devices( olm.device_store, args.filter, "Unverified", predicate ) elif args.category == "blacklisted": print_devices( olm.device_store, args.filter, "Blacklisted", olm.is_device_blacklisted ) elif args.category == "ignored": print_devices( olm.device_store, args.filter, "Ignored", olm.is_device_ignored ) def olm_action_command(server, args, category, error_category, prefix, action): device_store = server.client.olm.device_store users = [] if args.user_filter == "*": users = device_store.users else: users = [x for x in device_store.users if args.user_filter in x] user_devices = { user: device_store.active_user_devices(user) for user in users } if args.device_filter and args.device_filter != "*": filtered_user_devices = {} for user, device_list in user_devices.items(): filtered_devices = filter( lambda x: args.device_filter in x.id, device_list ) filtered_user_devices[user] = list(filtered_devices) user_devices = filtered_user_devices changed_devices = defaultdict(list) for user, device_list in user_devices.items(): for device in device_list: if action(device): changed_devices[user].append(device) if not changed_devices: message = ("{prefix}matrix: No matching {error_category} devices " "found.").format( prefix=W.prefix("error"), error_category=error_category ) W.prnt(server.server_buffer, message) return user_strings = [] for user_id, device_list in changed_devices.items(): device_strings = [] message = (" - User: {user_color}{user}{ncolor}\n").format( user_color=W.color("chat_nick"), user=user_id, ncolor=W.color("reset")) for device in device_list: device_strings.append(format_device( device.id, device.ed25519, device.display_name )) if not device_strings: continue d_string = "\n".join(device_strings) message += d_string user_strings.append(message) W.prnt(server.server_buffer, "{}matrix: {} key(s):\n".format(W.prefix("prefix"), category)) W.prnt(server.server_buffer, "\n".join(user_strings)) pass def olm_verify_command(server, args): olm_action_command( server, args, "Verified", "unverified", "join", server.client.verify_device ) def olm_unverify_command(server, args): olm_action_command( server, args, "Unverified", "verified", "quit", server.client.unverify_device ) def olm_blacklist_command(server, args): olm_action_command( server, args, "Blacklisted", "unblacklisted", "join", server.client.blacklist_device ) def olm_unblacklist_command(server, args): olm_action_command( server, args, "Unblacklisted", "blacklisted", "join", server.client.unblacklist_device ) def olm_ignore_command(server, args): olm_action_command( server, args, "Ignored", "ignored", "join", server.client.ignore_device ) def olm_unignore_command(server, args): olm_action_command( server, args, "Unignored", "unignored", "join", server.client.unignore_device ) def olm_export_command(server, args): file_path = os.path.expanduser(args.file) try: server.client.export_keys(file_path, args.passphrase) except (OSError, IOError) as e: server.error("Error exporting keys: {}".format(str(e))) server.info("Successfully exported keys") def olm_import_command(server, args): file_path = os.path.expanduser(args.file) try: server.client.import_keys(file_path, args.passphrase) except (OSError, IOError, EncryptionError) as e: server.error("Error importing keys: {}".format(str(e))) server.info("Successfully imported keys") def olm_sas_command(server, args): try: device_store = server.client.device_store except LocalProtocolError: server.error("The device store is not loaded") return W.WEECHAT_RC_OK try: device = device_store[args.user_id][args.device_id] except KeyError: server.error("Device {} of user {} not found".format( args.device_id, args.user_id )) return W.WEECHAT_RC_OK if device.deleted: server.error("Device {} of user {} is deleted.".format( args.device_id, args.user_id )) return W.WEECHAT_RC_OK if args.action == "start": server.start_verification(device) elif args.action in ["accept", "confirm", "cancel"]: sas = server.client.get_active_sas(args.user_id, args.device_id) if not sas: server.error("No active key verification found for " "device {} of user {}.".format( args.device_id, args.user_id )) return W.WEECHAT_RC_OK try: if args.action == "accept": server.accept_sas(sas) elif args.action == "confirm": server.confirm_sas(sas) elif args.action == "cancel": server.cancel_sas(sas) except LocalProtocolError as e: server.error(str(e)) @utf8_decode def matrix_olm_command_cb(data, buffer, args): def command(server, data, buffer, args): parsed_args = WeechatCommandParser.olm(args) if not parsed_args: return W.WEECHAT_RC_OK if not server.client.olm: W.prnt(server.server_buffer, "{}matrix: Olm account isn't " "loaded.".format(W.prefix("error"))) return W.WEECHAT_RC_OK if not parsed_args.subcommand or parsed_args.subcommand == "info": olm_info_command(server, parsed_args) elif parsed_args.subcommand == "export": olm_export_command(server, parsed_args) elif parsed_args.subcommand == "import": olm_import_command(server, parsed_args) elif parsed_args.subcommand == "verify": olm_verify_command(server, parsed_args) elif parsed_args.subcommand == "unverify": olm_unverify_command(server, parsed_args) elif parsed_args.subcommand == "blacklist": olm_blacklist_command(server, parsed_args) elif parsed_args.subcommand == "unblacklist": olm_unblacklist_command(server, parsed_args) elif parsed_args.subcommand == "verification": olm_sas_command(server, parsed_args) elif parsed_args.subcommand == "ignore": olm_ignore_command(server, parsed_args) elif parsed_args.subcommand == "unignore": olm_unignore_command(server, parsed_args) else: message = ("{prefix}matrix: Command not implemented.".format( prefix=W.prefix("error"))) W.prnt(server.server_buffer, message) W.bar_item_update("buffer_modes") W.bar_item_update("matrix_modes") return W.WEECHAT_RC_OK for server in SERVERS.values(): if buffer in server.buffers.values(): return command(server, data, buffer, args) elif buffer == server.server_buffer: return command(server, data, buffer, args) W.prnt("", "{prefix}matrix: command \"olm\" must be executed on a " "matrix buffer (server or channel)".format( prefix=W.prefix("error") )) return W.WEECHAT_RC_OK @utf8_decode def matrix_devices_command_cb(data, buffer, args): for server in SERVERS.values(): if buffer in server.buffers.values() or buffer == server.server_buffer: parsed_args = WeechatCommandParser.devices(args) if not parsed_args: return W.WEECHAT_RC_OK if not parsed_args.subcommand or parsed_args.subcommand == "list": server.devices() elif parsed_args.subcommand == "delete": server.delete_device(parsed_args.device_id) elif parsed_args.subcommand == "set-name": new_name = " ".join(parsed_args.device_name).strip("\"") server.rename_device(parsed_args.device_id, new_name) return W.WEECHAT_RC_OK W.prnt("", "{prefix}matrix: command \"devices\" must be executed on a " "matrix buffer (server or channel)".format( prefix=W.prefix("error") )) return W.WEECHAT_RC_OK @utf8_decode def matrix_me_command_cb(data, buffer, args): for server in SERVERS.values(): if buffer in server.buffers.values(): if not server.connected: message = ( "{prefix}matrix: you are not connected to " "the server" ).format(prefix=W.prefix("error")) W.prnt(server.server_buffer, message) return W.WEECHAT_RC_ERROR room_buffer = server.find_room_from_ptr(buffer) if not server.client.logged_in: room_buffer.error("You are not logged in.") return W.WEECHAT_RC_ERROR if not args: return W.WEECHAT_RC_OK formatted_data = Formatted.from_input_line(args) server.room_send_message(room_buffer, formatted_data, "m.emote") return W.WEECHAT_RC_OK if buffer == server.server_buffer: message = ( '{prefix}matrix: command "me" must be ' "executed on a Matrix channel buffer" ).format(prefix=W.prefix("error")) W.prnt("", message) return W.WEECHAT_RC_OK return W.WEECHAT_RC_OK @utf8_decode def matrix_topic_command_cb(data, buffer, args): parsed_args = WeechatCommandParser.topic(args) if not parsed_args: return W.WEECHAT_RC_OK for server in SERVERS.values(): if buffer == server.server_buffer: server.error( 'command "topic" must be ' "executed on a Matrix room buffer" ) return W.WEECHAT_RC_OK room = server.find_room_from_ptr(buffer) if not room: continue if not parsed_args.topic and not parsed_args.delete: # TODO print the current topic return W.WEECHAT_RC_OK if parsed_args.delete and parsed_args.topic: # TODO error message return W.WEECHAT_RC_OK topic = "" if parsed_args.delete else " ".join(parsed_args.topic) content = {"topic": topic} server.room_send_state(room, content, "m.room.topic") return W.WEECHAT_RC_OK def matrix_fetch_old_messages(server, room_id): room_buffer = server.find_room_from_id(room_id) room = room_buffer.room if room_buffer.backlog_pending: return prev_batch = room.prev_batch if not prev_batch: return raise NotImplementedError def check_server_existence(server_name, servers): if server_name not in servers: message = "{prefix}matrix: No such server: {server}".format( prefix=W.prefix("error"), server=server_name ) W.prnt("", message) return False return True def hook_page_up(): G.CONFIG.page_up_hook = W.hook_command_run( "/window page_up", "matrix_command_pgup_cb", "" ) @utf8_decode def matrix_command_buf_clear_cb(data, buffer, command): for server in SERVERS.values(): if buffer in server.buffers.values(): room_buffer = server.find_room_from_ptr(buffer) room_buffer.room.prev_batch = server.next_batch return W.WEECHAT_RC_OK return W.WEECHAT_RC_OK @utf8_decode def matrix_command_pgup_cb(data, buffer, command): # TODO the highlight status of a line isn't allowed to be updated/changed # via hdata, therefore the highlight status of a messages can't be # reoredered this would need to be fixed in weechat # TODO we shouldn't fetch and print out more messages than # max_buffer_lines_number or older messages than max_buffer_lines_minutes for server in SERVERS.values(): if buffer in server.buffers.values(): window = W.window_search_with_buffer(buffer) first_line_displayed = bool( W.window_get_integer(window, "first_line_displayed") ) room_buffer = server.find_room_from_ptr(buffer) if first_line_displayed or room_buffer.weechat_buffer.num_lines == 0: room_id = key_from_value(server.buffers, buffer) server.room_get_messages(room_id) return W.WEECHAT_RC_OK return W.WEECHAT_RC_OK @utf8_decode def matrix_join_command_cb(data, buffer, args): parsed_args = WeechatCommandParser.join(args) if not parsed_args: return W.WEECHAT_RC_OK for server in SERVERS.values(): if buffer in server.buffers.values() or buffer == server.server_buffer: server.room_join(parsed_args.room_id) break return W.WEECHAT_RC_OK @utf8_decode def matrix_part_command_cb(data, buffer, args): parsed_args = WeechatCommandParser.part(args) if not parsed_args: return W.WEECHAT_RC_OK for server in SERVERS.values(): if buffer in server.buffers.values() or buffer == server.server_buffer: room_id = parsed_args.room_id if not room_id: if buffer == server.server_buffer: server.error( 'command "part" must be ' "executed on a Matrix room buffer or a room " "name needs to be given" ) return W.WEECHAT_RC_OK room_buffer = server.find_room_from_ptr(buffer) room_id = room_buffer.room.room_id server.room_leave(room_id) break return W.WEECHAT_RC_OK @utf8_decode def matrix_invite_command_cb(data, buffer, args): parsed_args = WeechatCommandParser.invite(args) if not parsed_args: return W.WEECHAT_RC_OK for server in SERVERS.values(): if buffer == server.server_buffer: server.error( 'command "invite" must be ' "executed on a Matrix room buffer" ) return W.WEECHAT_RC_OK room = server.find_room_from_ptr(buffer) if not room: continue user_id = parsed_args.user_id user_id = user_id if user_id.startswith("@") else "@" + user_id server.room_invite(room, user_id) break return W.WEECHAT_RC_OK @utf8_decode def matrix_room_command_cb(data, buffer, args): parsed_args = WeechatCommandParser.room(args) if not parsed_args: return W.WEECHAT_RC_OK for server in SERVERS.values(): if buffer == server.server_buffer: server.error( 'command "room" must be ' "executed on a Matrix room buffer" ) return W.WEECHAT_RC_OK room = server.find_room_from_ptr(buffer) if not room: continue if not parsed_args.subcommand or parsed_args.subcommand == "list": server.error("command no subcommand found") return W.WEECHAT_RC_OK if parsed_args.subcommand == "typing-notifications": if parsed_args.state == "enable": room.typing_enabled = True elif parsed_args.state == "disable": room.typing_enabled = False elif parsed_args.state == "toggle": room.typing_enabled = not room.typing_enabled break elif parsed_args.subcommand == "read-markers": if parsed_args.state == "enable": room.read_markers_enabled = True elif parsed_args.state == "disable": room.read_markers_enabled = False elif parsed_args.state == "toggle": room.read_markers_enabled = not room.read_markers_enabled break return W.WEECHAT_RC_OK @utf8_decode def matrix_uploads_command_cb(data, buffer, args): if not args: if not G.CONFIG.upload_buffer: G.CONFIG.upload_buffer = UploadsBuffer() G.CONFIG.upload_buffer.display() return W.WEECHAT_RC_OK parsed_args = WeechatCommandParser.uploads(args) if not parsed_args: return W.WEECHAT_RC_OK if parsed_args.subcommand == "list": pass elif parsed_args.subcommand == "listfull": pass elif parsed_args.subcommand == "up": if G.CONFIG.upload_buffer: G.CONFIG.upload_buffer.move_line_up() elif parsed_args.subcommand == "down": if G.CONFIG.upload_buffer: G.CONFIG.upload_buffer.move_line_down() return W.WEECHAT_RC_OK @utf8_decode def matrix_upload_command_cb(data, buffer, args): parsed_args = WeechatCommandParser.upload(args) if not parsed_args: return W.WEECHAT_RC_OK for server in SERVERS.values(): if buffer == server.server_buffer: server.error( 'command "upload" must be ' "executed on a Matrix room buffer" ) return W.WEECHAT_RC_OK room_buffer = server.find_room_from_ptr(buffer) if not room_buffer: continue upload = Upload( server.name, server.config.address, server.client.access_token, room_buffer.room.room_id, parsed_args.file, room_buffer.room.encrypted ) UPLOADS[upload.uuid] = upload if G.CONFIG.upload_buffer: G.CONFIG.upload_buffer.render() break return W.WEECHAT_RC_OK @utf8_decode def matrix_kick_command_cb(data, buffer, args): parsed_args = WeechatCommandParser.kick(args) if not parsed_args: return W.WEECHAT_RC_OK for server in SERVERS.values(): if buffer == server.server_buffer: server.error( 'command "kick" must be ' "executed on a Matrix room buffer" ) return W.WEECHAT_RC_OK room = server.find_room_from_ptr(buffer) if not room: continue user_id = parsed_args.user_id user_id = user_id if user_id.startswith("@") else "@" + user_id reason = " ".join(parsed_args.reason) if parsed_args.reason else None server.room_kick(room, user_id, reason) break return W.WEECHAT_RC_OK @utf8_decode def matrix_redact_command_cb(data, buffer, args): def already_redacted(line): if SCRIPT_NAME + "_redacted" in line.tags: return True return False def predicate(event_id, line): event_tag = SCRIPT_NAME + "_id_{}".format(event_id) tags = line.tags if event_tag in tags: return True return False for server in SERVERS.values(): if buffer in server.buffers.values(): room_buffer = server.find_room_from_ptr(buffer) event_id, reason = parse_redact_args(args) if not event_id: message = ( "{prefix}matrix: Invalid command " "arguments (see /help redact)" ).format(prefix=W.prefix("error")) W.prnt("", message) return W.WEECHAT_RC_ERROR lines = room_buffer.weechat_buffer.find_lines( partial(predicate, event_id), max_lines=1 ) if not lines: room_buffer.error( "No such message with event id " "{event_id} found.".format(event_id=event_id)) return W.WEECHAT_RC_OK if already_redacted(lines[0]): room_buffer.error("Event already redacted.") return W.WEECHAT_RC_OK server.room_send_redaction(room_buffer, event_id, reason) return W.WEECHAT_RC_OK if buffer == server.server_buffer: message = ( '{prefix}matrix: command "redact" must be ' "executed on a Matrix channel buffer" ).format(prefix=W.prefix("error")) W.prnt("", message) return W.WEECHAT_RC_OK return W.WEECHAT_RC_OK @utf8_decode def matrix_reply_command_cb(data, buffer, args): def predicate(event_id, line): event_tag = SCRIPT_NAME + "_id_{}".format(event_id) tags = line.tags if event_tag in tags: return True return False for server in SERVERS.values(): if buffer in server.buffers.values(): room_buffer = server.find_room_from_ptr(buffer) # Intentional use of `parse_redact_args` which serves the # necessary purpose event_id, reply = parse_redact_args(args) if not event_id or not reply: message = ( "{prefix}matrix: Invalid command " "arguments (see /help reply)" ).format(prefix=W.prefix("error")) W.prnt("", message) return W.WEECHAT_RC_ERROR lines = room_buffer.weechat_buffer.find_lines( partial(predicate, event_id), max_lines=1 ) if not lines: room_buffer.error( "No such message with event id " "{event_id} found.".format(event_id=event_id)) return W.WEECHAT_RC_OK formatted_data = Formatted.from_input_line(reply) server.room_send_message( room_buffer, formatted_data, "m.text", in_reply_to_event_id=event_id, ) room_buffer.last_message = None return W.WEECHAT_RC_OK if buffer == server.server_buffer: message = ( '{prefix}matrix: command "reply" must be ' "executed on a Matrix channel buffer" ).format(prefix=W.prefix("error")) W.prnt("", message) return W.WEECHAT_RC_OK return W.WEECHAT_RC_OK def matrix_command_help(args): if not args: message = ( "{prefix}matrix: Too few arguments for command " '"/matrix help" (see /matrix help help)' ).format(prefix=W.prefix("error")) W.prnt("", message) return for command in args: message = "" if command == "connect": message = ( "{delimiter_color}[{ncolor}matrix{delimiter_color}] " "{ncolor}{cmd_color}/connect{ncolor} " " [...]" "\n\n" "connect to Matrix server(s)" "\n\n" "server-name: server to connect to" "(internal name)" ).format( delimiter_color=W.color("chat_delimiters"), cmd_color=W.color("chat_buffer"), ncolor=W.color("reset"), ) elif command == "disconnect": message = ( "{delimiter_color}[{ncolor}matrix{delimiter_color}] " "{ncolor}{cmd_color}/disconnect{ncolor} " " [...]" "\n\n" "disconnect from Matrix server(s)" "\n\n" "server-name: server to disconnect" "(internal name)" ).format( delimiter_color=W.color("chat_delimiters"), cmd_color=W.color("chat_buffer"), ncolor=W.color("reset"), ) elif command == "reconnect": message = ( "{delimiter_color}[{ncolor}matrix{delimiter_color}] " "{ncolor}{cmd_color}/reconnect{ncolor} " " [...]" "\n\n" "reconnect to Matrix server(s)" "\n\n" "server-name: server to reconnect" "(internal name)" ).format( delimiter_color=W.color("chat_delimiters"), cmd_color=W.color("chat_buffer"), ncolor=W.color("reset"), ) elif command == "server": message = ( "{delimiter_color}[{ncolor}matrix{delimiter_color}] " "{ncolor}{cmd_color}/server{ncolor} " "add [:]" "\n " "delete|list|listfull " "\n\n" "list, add, or remove Matrix servers" "\n\n" " list: list servers (without argument, this " "list is displayed)\n" " listfull: list servers with detailed info for each " "server\n" " add: add a new server\n" " delete: delete a server\n" "server-name: server to reconnect (internal name)\n" " hostname: name or IP address of server\n" " port: port of server (default: 443)\n" "\n" "Examples:" "\n /matrix server listfull" "\n /matrix server add matrix matrix.org:80" "\n /matrix server delete matrix" ).format( delimiter_color=W.color("chat_delimiters"), cmd_color=W.color("chat_buffer"), ncolor=W.color("reset"), ) elif command == "help": message = ( "{delimiter_color}[{ncolor}matrix{delimiter_color}] " "{ncolor}{cmd_color}/help{ncolor} " " [...]" "\n\n" "display help about Matrix commands" "\n\n" "matrix-command: a Matrix command name" "(internal name)" ).format( delimiter_color=W.color("chat_delimiters"), cmd_color=W.color("chat_buffer"), ncolor=W.color("reset"), ) else: message = ( '{prefix}matrix: No help available, "{command}" ' "is not a matrix command" ).format(prefix=W.prefix("error"), command=command) W.prnt("", "") W.prnt("", message) return def matrix_server_command_listfull(args): def get_value_string(value, default_value): if value == default_value: if not value: value = "''" value_string = " ({value})".format(value=value) else: value_string = "{color}{value}{ncolor}".format( color=W.color("chat_value"), value=value, ncolor=W.color("reset"), ) return value_string for server_name in args: if server_name not in SERVERS: continue server = SERVERS[server_name] connected = "" W.prnt("", "") if server.connected: connected = "connected" else: connected = "not connected" message = ( "Server: {server_color}{server}{delimiter_color}" " [{ncolor}{connected}{delimiter_color}]" "{ncolor}" ).format( server_color=W.color("chat_server"), server=server.name, delimiter_color=W.color("chat_delimiters"), connected=connected, ncolor=W.color("reset"), ) W.prnt("", message) option = server.config._option_ptrs["autoconnect"] default_value = W.config_string_default(option) value = W.config_string(option) value_string = get_value_string(value, default_value) message = " autoconnect. : {value}".format(value=value_string) W.prnt("", message) option = server.config._option_ptrs["address"] default_value = W.config_string_default(option) value = W.config_string(option) value_string = get_value_string(value, default_value) message = " address. . . : {value}".format(value=value_string) W.prnt("", message) option = server.config._option_ptrs["port"] default_value = str(W.config_integer_default(option)) value = str(W.config_integer(option)) value_string = get_value_string(value, default_value) message = " port . . . . : {value}".format(value=value_string) W.prnt("", message) option = server.config._option_ptrs["username"] default_value = W.config_string_default(option) value = W.config_string(option) value_string = get_value_string(value, default_value) message = " username . . : {value}".format(value=value_string) W.prnt("", message) option = server.config._option_ptrs["password"] value = W.config_string(option) if value: value = "(hidden)" value_string = get_value_string(value, "") message = " password . . : {value}".format(value=value_string) W.prnt("", message) def matrix_server_command_delete(args): for server_name in args: if check_server_existence(server_name, SERVERS): server = SERVERS[server_name] if server.connected: message = ( "{prefix}matrix: you can not delete server " "{color}{server}{ncolor} because you are " 'connected to it. Try "/matrix disconnect ' '{color}{server}{ncolor}" before.' ).format( prefix=W.prefix("error"), color=W.color("chat_server"), ncolor=W.color("reset"), server=server.name, ) W.prnt("", message) return for buf in list(server.buffers.values()): W.buffer_close(buf) if server.server_buffer: W.buffer_close(server.server_buffer) for option in server.config._option_ptrs.values(): W.config_option_free(option) if server.timer_hook: W.unhook(server.timer_hook) server.timer_hook = None message = ( "matrix: server {color}{server}{ncolor} has been " "deleted" ).format( server=server.name, color=W.color("chat_server"), ncolor=W.color("reset"), ) del SERVERS[server.name] server = None W.prnt("", message) def matrix_server_command_add(args): if len(args) < 2: message = ( "{prefix}matrix: Too few arguments for command " '"/matrix server add" (see /matrix help server)' ).format(prefix=W.prefix("error")) W.prnt("", message) return if len(args) > 4: message = ( "{prefix}matrix: Too many arguments for command " '"/matrix server add" (see /matrix help server)' ).format(prefix=W.prefix("error")) W.prnt("", message) return def remove_server(server): for option in server.config._option_ptrs.values(): W.config_option_free(option) del SERVERS[server.name] server_name = args[0] if server_name in SERVERS: message = ( "{prefix}matrix: server {color}{server}{ncolor} " "already exists, can't add it" ).format( prefix=W.prefix("error"), color=W.color("chat_server"), server=server_name, ncolor=W.color("reset"), ) W.prnt("", message) return server = MatrixServer(server_name, G.CONFIG._ptr) SERVERS[server.name] = server if len(args) >= 2: if args[1].startswith("http"): homeserver= urlparse(args[1]) host = homeserver.hostname port = str(homeserver.port) if homeserver.port else None else: try: host, port = args[1].split(":", 1) except ValueError: host, port = args[1], None return_code = W.config_option_set( server.config._option_ptrs["address"], host, 1 ) if return_code == W.WEECHAT_CONFIG_OPTION_SET_ERROR: remove_server(server) message = ( "{prefix}Failed to set address for server " "{color}{server}{ncolor}, failed to add " "server." ).format( prefix=W.prefix("error"), color=W.color("chat_server"), server=server.name, ncolor=W.color("reset"), ) W.prnt("", message) server = None return if port: return_code = W.config_option_set( server.config._option_ptrs["port"], port, 1 ) if return_code == W.WEECHAT_CONFIG_OPTION_SET_ERROR: remove_server(server) message = ( "{prefix}Failed to set port for server " "{color}{server}{ncolor}, failed to add " "server." ).format( prefix=W.prefix("error"), color=W.color("chat_server"), server=server.name, ncolor=W.color("reset"), ) W.prnt("", message) server = None return if len(args) >= 3: user = args[2] return_code = W.config_option_set( server.config._option_ptrs["username"], user, 1 ) if return_code == W.WEECHAT_CONFIG_OPTION_SET_ERROR: remove_server(server) message = ( "{prefix}Failed to set user for server " "{color}{server}{ncolor}, failed to add " "server." ).format( prefix=W.prefix("error"), color=W.color("chat_server"), server=server.name, ncolor=W.color("reset"), ) W.prnt("", message) server = None return if len(args) == 4: password = args[3] return_code = W.config_option_set( server.config._option_ptrs["password"], password, 1 ) if return_code == W.WEECHAT_CONFIG_OPTION_SET_ERROR: remove_server(server) message = ( "{prefix}Failed to set password for server " "{color}{server}{ncolor}, failed to add " "server." ).format( prefix=W.prefix("error"), color=W.color("chat_server"), server=server.name, ncolor=W.color("reset"), ) W.prnt("", message) server = None return message = ( "matrix: server {color}{server}{ncolor} " "has been added" ).format( server=server.name, color=W.color("chat_server"), ncolor=W.color("reset"), ) W.prnt("", message) def matrix_server_command(command, args): def list_servers(_): if SERVERS: W.prnt("", "\nAll matrix servers:") for server in SERVERS: W.prnt( "", " {color}{server}".format( color=W.color("chat_server"), server=server ), ) # TODO the argument for list and listfull is used as a match word to # find/filter servers, we're currently match exactly to the whole name if command == "list": list_servers(args) elif command == "listfull": matrix_server_command_listfull(args) elif command == "add": matrix_server_command_add(args) elif command == "delete": matrix_server_command_delete(args) else: message = ( "{prefix}matrix: Error: unknown matrix server command, " '"{command}" (type /matrix help server for help)' ).format(prefix=W.prefix("error"), command=command) W.prnt("", message) @utf8_decode def matrix_command_cb(data, buffer, args): def connect_server(args): for server_name in args: if check_server_existence(server_name, SERVERS): server = SERVERS[server_name] server.connect() def disconnect_server(args): for server_name in args: if check_server_existence(server_name, SERVERS): server = SERVERS[server_name] if server.connected or server.reconnect_time: # W.unhook(server.timer_hook) # server.timer_hook = None server.access_token = "" server.disconnect(reconnect=False) split_args = list(filter(bool, args.split(" "))) if len(split_args) < 1: message = ( "{prefix}matrix: Too few arguments for command " '"/matrix" ' "(see /help matrix)" ).format(prefix=W.prefix("error")) W.prnt("", message) return W.WEECHAT_RC_ERROR command, args = split_args[0], split_args[1:] if command == "connect": connect_server(args) elif command == "disconnect": disconnect_server(args) elif command == "reconnect": disconnect_server(args) connect_server(args) elif command == "server": if len(args) >= 1: subcommand, args = args[0], args[1:] matrix_server_command(subcommand, args) else: matrix_server_command("list", "") elif command == "help": matrix_command_help(args) else: message = ( "{prefix}matrix: Error: unknown matrix command, " '"{command}" (type /help matrix for help)' ).format(prefix=W.prefix("error"), command=command) W.prnt("", message) return W.WEECHAT_RC_OK @utf8_decode def matrix_send_anyways_cb(data, buffer, args): for server in SERVERS.values(): if buffer in server.buffers.values(): room_buffer = server.find_room_from_ptr(buffer) if not server.connected: room_buffer.error("Server is disconnected") break if not server.client.logged_in: room_buffer.error("You are not logged in.") return W.WEECHAT_RC_ERROR if not room_buffer.last_message: room_buffer.error("No previously sent message found.") break server.room_send_message( room_buffer, room_buffer.last_message, "m.text", ignore_unverified_devices=True ) room_buffer.last_message = None break else: message = ( "{prefix}matrix: The 'send-anyways' command needs to be " "run on a matrix room buffer" ).format(prefix=W.prefix("error")) W.prnt("", message) return W.WEECHAT_RC_ERROR @utf8_decode def matrix_cursor_reply_signal_cb(data, signal, ht): tags = ht["_chat_line_tags"].split(",") W.command("", "/cursor stop") if "matrix_message" in tags: for tag in tags: if tag.startswith("matrix_id_"): matrix_id = tag[10:] break else: return W.WEECHAT_RC_OK buffer_name = ht["_buffer_full_name"] bufptr = W.buffer_search("==", buffer_name) current_input = W.buffer_get_string(bufptr, "input") input_pos = W.buffer_get_integer(bufptr, "input_pos") new_prefix = "/reply-matrix {} ".format(matrix_id) W.buffer_set(bufptr, "input", new_prefix + current_input) W.buffer_set(bufptr, "input_pos", str(len(new_prefix) + input_pos)) return W.WEECHAT_RC_OK