import base64 import inspect import ipaddress import re import textwrap import time import weakref from .. import chunks, messages, roost, util, zcode ## Config # I'm not sure why I can't just use the standard form of # _enabled = util.Configurable(...) here, but this works. SETTINGS = { 'enabled': True, 'min_width': 100, 'time': 'receiveTime', 'show_seconds': False, 'show_addr': False, 'show_signatures': False, } def _gen_setter(key): def setter(_context, value): SETTINGS[key] = value return setter util.Configurable( 'user.vt_decor.enabled', True, action=_gen_setter('enabled'), oneof=('True', 'False'), validate=lambda val: isinstance(val, bool), coerce=util.coerce_bool, ) util.Configurable( 'user.vt_decor.time', 'receiveTime', action=_gen_setter('time'), oneof=('time', 'receiveTime'), ) util.Configurable( 'user.vt_decor.show_seconds', False, action=_gen_setter('show_seconds'), oneof=('True', 'False'), validate=lambda val: isinstance(val, bool), coerce=util.coerce_bool, ) util.Configurable( 'user.vt_decor.show_addr', False, action=_gen_setter('show_addr'), oneof=('True', 'False'), validate=lambda val: isinstance(val, bool), coerce=util.coerce_bool, ) util.Configurable( 'user.vt_decor.show_signatures', False, action=_gen_setter('show_signatures'), oneof=('True', 'False'), validate=lambda val: isinstance(val, bool), coerce=util.coerce_bool, ) ## Utils UI = lambda: None def _get_width(): global UI try: return UI().maxx except AttributeError: pass frame = inspect.currentframe() try: while True: frame = frame.f_back try: ui = frame.f_locals['self'].ui UI = weakref.ref(ui) return ui.maxx except (KeyError, AttributeError): pass finally: del frame ## The fun part MSG_NO_WRAP_PATTERNS = [re.compile(x, re.MULTILINE) for x in ( r'^(?: |>)', r'\t', r'[^.] ', )] MSG_LIST_WRAP_PATTERN = re.compile(r'(^\s*[-*]\s+)', re.MULTILINE) REALM_MAP = { 'ANDREW.CMU.EDU': 'AN', 'CS.CMU.EDU': 'CS', 'IASTATE.EDU': 'IA', } class RoostVTDecor(roost.RoostMessage.Decor): @classmethod def decorate(cls, msg, decoration): try: if SETTINGS['enabled'] and _get_width() >= SETTINGS['min_width']: return cls.vt_decorate(msg, decoration) except Exception: pass return super().decorate(msg, decoration) @staticmethod def rewrap_p(msg): force_wrap_tuples = { ('moira', 'incremental'), ('scripts', 'nagios.multivalue-key.mysql-s'), ('scripts', 'nagios.unique-key.mysql-s'), } msg_tuples = { (msg.data['classKey'], '*'), (msg.data['classKey'], msg.data['instanceKey']), } return (bool(force_wrap_tuples & msg_tuples) or not any(x.search(msg.body) for x in MSG_NO_WRAP_PATTERNS)) @staticmethod def list_rewrap_p(msg): return len(MSG_LIST_WRAP_PATTERN.findall(msg.body)) >= 2 @classmethod def partial_rewrap(cls, body, width, indent): try: ret = '' bindent = '' for m in MSG_LIST_WRAP_PATTERN.split(body): if not m: continue if MSG_LIST_WRAP_PATTERN.match(m): # Bullet if ret: ret += indent ret += m bindent = ' ' * util.glyphwidth(m) else: # Content # collapse whitespace and wrap. ret += textwrap.fill(re.sub(r'\s+', ' ', m), width, initial_indent=indent+bindent, subsequent_indent=indent+bindent, break_long_words=False, break_on_hyphens=False).lstrip() ret += '\n' return ret.rstrip() except Exception as e: return str(e) @classmethod def vt_decorate(cls, msg, decoration): width = _get_width() tags = cls.decotags(decoration) realm = msg.backend.realm sender = msg.data['sender'] recipient = msg.data['recipient'] if msg.data['isPersonal'] and msg.data['isOutgoing']: sender = f'➤{recipient}' if sender.endswith(realm): sender = sender[:sender.index('@')] klass = msg.data['class'] if msg.data['classKey'] != 'message' else '' inst = msg.data['instance'] or "''" if klass and inst.lower() == 'personal': dest = klass else: dest = f'{klass}[{inst}]' auth = '+' if msg.data['auth'] else '-' if not msg.data['auth'] and msg.data['opcode'] == 'mattermost': auth = '¤' t = time.strftime( '%H:%M:%S' if SETTINGS['show_seconds'] else '%H:%M', time.localtime(msg.data[SETTINGS['time']] / 1000)) if recipient.startswith('@'): mrealm = REALM_MAP.get(recipient[1:], '??') dest = f'{mrealm} {dest}' dest_width = 18 - (util.glyphwidth(dest) - len(dest)) prefix = f'{sender:10.10} {t} {dest:{dest_width}.{dest_width}} {auth} ' indent = ' ' * util.glyphwidth(prefix) body = msg.body.rstrip() if cls.list_rewrap_p(msg): body = cls.partial_rewrap(body, width, indent) elif cls.rewrap_p(msg): body = textwrap.fill(body, width, initial_indent=indent, subsequent_indent=indent, break_long_words=False, break_on_hyphens=False).lstrip() else: body = textwrap.indent(body, indent).lstrip() # body = f'{list(decoration.keys())}\n' if msg.backend.format_body == 'format': cbody = zcode.tag(body, frozenset(tags)) elif msg.backend.format_body == 'clear': cbody = chunks.Chunk([(tags, '')]) else: if msg.backend.format_body == 'strip': body = zcode.strip(body) cbody = chunks.Chunk([(tags, body)]) chunk = chunks.Chunk([(tags, f'{prefix}')]) + cbody if SETTINGS['show_signatures'] and msg.backend.format_zsig != 'clear': zsig = '\n' + textwrap.fill( msg.data['signature'], width, initial_indent=' -- ', subsequent_indent=' ', break_long_words=False, break_on_hyphens=False) if msg.backend.format_zsig == 'format': chunk += zcode.tag(zsig, frozenset(tags)) elif msg.backend.format_zsig == 'strip': chunk.append((tags, zcode.strip(zsig))) else: chunk.append((tags, zsig)) chunk.append( (tags | {'right'}, f'## {ipaddress.ip_address(base64.b64decode(msg.data["uid"])[:4])}' if SETTINGS['show_addr'] else '' )) return chunk