]> asedeno.scripts.mit.edu Git - vt_decor.git/commitdiff
initial commit of module
authorAlejandro R. Sedeño <asedeno@mit.edu>
Sun, 13 Feb 2022 20:57:50 +0000 (15:57 -0500)
committerAlejandro R. Sedeño <asedeno@mit.edu>
Sun, 13 Feb 2022 20:57:50 +0000 (15:57 -0500)
vt_decor.py [new file with mode: 0644]

diff --git a/vt_decor.py b/vt_decor.py
new file mode 100644 (file)
index 0000000..abb3d22
--- /dev/null
@@ -0,0 +1,221 @@
+import base64
+import inspect
+import ipaddress
+import re
+import textwrap
+import time
+import sys
+import weakref
+
+from .. import chunks, messages, roost, util, zcode
+
+## Config
+# I'm not sure why I can't just use the standard form of
+# _enabled = util.Configurable(...) here, but this works.
+
+SETTINGS = {
+    'enabled': True,
+    'min_width': 100,
+    'time': 'receiveTime',
+    'show_seconds': False,
+    'show_addr': False,
+    'show_signatures': False,
+}
+
+def _gen_setter(key):
+    def setter(_context, value):
+        SETTINGS[key] = value
+    return setter
+
+util.Configurable(
+    'user.vt_decor.enabled',
+    True,
+    action=_gen_setter('enabled'),
+    oneof=('True', 'False'),
+    validate=lambda val: isinstance(val, bool),
+    coerce=util.coerce_bool,
+    )
+
+util.Configurable(
+    'user.vt_decor.time',
+    'receiveTime',
+    action=_gen_setter('time'),
+    oneof=('time', 'receiveTime'),
+    )
+
+util.Configurable(
+    'user.vt_decor.show_seconds',
+    False,
+    action=_gen_setter('show_seconds'),
+    oneof=('True', 'False'),
+    validate=lambda val: isinstance(val, bool),
+    coerce=util.coerce_bool,
+    )
+
+util.Configurable(
+    'user.vt_decor.show_addr',
+    False,
+    action=_gen_setter('show_addr'),
+    oneof=('True', 'False'),
+    validate=lambda val: isinstance(val, bool),
+    coerce=util.coerce_bool,
+    )
+
+util.Configurable(
+    'user.vt_decor.show_signatures',
+    False,
+    action=_gen_setter('show_signatures'),
+    oneof=('True', 'False'),
+    validate=lambda val: isinstance(val, bool),
+    coerce=util.coerce_bool,
+    )
+
+
+## Utils
+
+UI = lambda: None
+
+def _get_width():
+    global UI
+    try:
+        return UI().maxx
+    except AttributeError:
+        pass
+
+    frame = inspect.currentframe()
+    try:
+        while True:
+            frame = frame.f_back
+            try:
+                ui = frame.f_locals['self'].ui
+                UI = weakref.ref(ui)
+                return ui.maxx
+            except (KeyError, AttributeError):
+                pass
+    finally:
+        del frame
+
+
+## The fun part
+
+MSG_NO_WRAP_PATTERNS = [re.compile(x) for x in (
+    r'\n(?: |>)',
+    r'\t',
+    r'[^.]  ',
+)]
+
+REALM_MAP = {
+    'ANDREW.CMU.EDU': 'AN',
+    'CS.CMU.EDU': 'CS',
+    'IASTATE.EDU': 'IA',
+}
+
+class RoostVTDecor(roost.RoostMessage.Decor):
+
+
+    @classmethod
+    def decorate(cls, msg, decoration):
+        try:
+            if SETTINGS['enabled'] and _get_width() >= SETTINGS['min_width']:
+                return cls.vt_decorate(msg, decoration)
+        except Exception:
+            pass
+        return super().decorate(msg, decoration)
+
+    @staticmethod
+    def rewrap_p(msg):
+        force_wrap_tuples = {
+            ('moira', 'incremental'),
+            ('scripts', 'nagios.multivalue-key.mysql-s'),
+            ('scripts', 'nagios.unique-key.mysql-s'),
+        }
+
+        msg_tuples = {
+            (msg.data['classKey'], '*'),
+            (msg.data['classKey'], msg.data['instanceKey']),
+        }
+
+        return (bool(force_wrap_tuples & msg_tuples) or
+                not any(x.search(msg.body) for x in MSG_NO_WRAP_PATTERNS))
+
+
+    @classmethod
+    def vt_decorate(cls, msg, decoration):
+        width = _get_width()
+
+        tags = cls.decotags(decoration)
+
+        realm = msg.backend.realm
+        sender = msg.data['sender']
+        recipient = msg.data['recipient']
+        if msg.data['isPersonal'] and msg.data['isOutgoing']:
+            sender = f'➤{recipient}'
+        if sender.endswith(realm):
+            sender = sender[:sender.index('@')]
+        klass = msg.data['class'] if msg.data['classKey'] != 'message' else ''
+        inst = msg.data['instance'] or "''"
+        if klass and inst.lower() == 'personal':
+            dest = klass
+        else:
+            dest = f'{klass}[{inst}]'
+        auth = '+' if msg.data['auth'] else '-'
+        if not msg.data['auth'] and  msg.data['opcode'] == 'mattermost':
+            auth = '¤'
+        t = time.strftime(
+            '%H:%M:%S' if SETTINGS['show_seconds'] else '%H:%M',
+            time.localtime(msg.data[SETTINGS['time']] / 1000))
+
+        if recipient.startswith('@'):
+            mrealm = REALM_MAP.get(recipient[1:], '??')
+            dest = f'{mrealm} {dest}'
+
+        dest_width = 18 - (util.glyphwidth(dest) - len(dest))
+
+        prefix = f'{sender:10.10} {t} {dest:{dest_width}.{dest_width}} {auth} '
+        indent = ' ' * util.glyphwidth(prefix)
+
+        body = msg.body.rstrip()
+        if cls.rewrap_p(msg):
+            body = textwrap.fill(body, width,
+                                 initial_indent=indent,
+                                 subsequent_indent=indent,
+                                 break_long_words=False,
+                                 break_on_hyphens=False).lstrip()
+        else:
+            body = textwrap.indent(body, indent).lstrip()
+
+        # body = f'{list(decoration.keys())}\n'
+        if msg.backend.format_body == 'format':
+            cbody = zcode.tag(body, frozenset(tags))
+        elif msg.backend.format_body == 'clear':
+            cbody = chunks.Chunk([(tags, '')])
+        else:
+            if msg.backend.format_body == 'strip':
+                body = zcode.strip(body)
+            cbody = chunks.Chunk([(tags, body)])
+
+
+        chunk = chunks.Chunk([(tags, f'{prefix}')]) + cbody
+
+        if SETTINGS['show_signatures'] and msg.backend.format_zsig != 'clear':
+            zsig = '\n' + textwrap.fill(
+                msg.data['signature'], width,
+                initial_indent='                  -- ',
+                subsequent_indent='                     ',
+                break_long_words=False,
+                break_on_hyphens=False)
+
+            if msg.backend.format_zsig == 'format':
+                chunk += zcode.tag(zsig, frozenset(tags))
+            elif msg.backend.format_zsig == 'strip':
+                chunk.append((tags, zcode.strip(zsig)))
+            else:
+                chunk.append((tags, zsig))
+
+        chunk.append(
+            (tags | {'right'},
+             f'## {ipaddress.ip_address(base64.b64decode(msg.data["uid"])[:4])}'
+             if SETTINGS['show_addr'] else ''
+            ))
+
+        return chunk