10 from .. import chunks, messages, roost, util, zcode
13 # I'm not sure why I can't just use the standard form of
14 # _enabled = util.Configurable(...) here, but this works.
19 'time': 'receiveTime',
20 'show_seconds': False,
22 'show_signatures': False,
26 def setter(_context, value):
31 'user.vt_decor.enabled',
33 action=_gen_setter('enabled'),
34 oneof=('True', 'False'),
35 validate=lambda val: isinstance(val, bool),
36 coerce=util.coerce_bool,
42 action=_gen_setter('time'),
43 oneof=('time', 'receiveTime'),
47 'user.vt_decor.show_seconds',
49 action=_gen_setter('show_seconds'),
50 oneof=('True', 'False'),
51 validate=lambda val: isinstance(val, bool),
52 coerce=util.coerce_bool,
56 'user.vt_decor.show_addr',
58 action=_gen_setter('show_addr'),
59 oneof=('True', 'False'),
60 validate=lambda val: isinstance(val, bool),
61 coerce=util.coerce_bool,
65 'user.vt_decor.show_signatures',
67 action=_gen_setter('show_signatures'),
68 oneof=('True', 'False'),
69 validate=lambda val: isinstance(val, bool),
70 coerce=util.coerce_bool,
82 except AttributeError:
85 frame = inspect.currentframe()
90 ui = frame.f_locals['self'].ui
93 except (KeyError, AttributeError):
101 MSG_NO_WRAP_PATTERNS = [re.compile(x) for x in (
108 'ANDREW.CMU.EDU': 'AN',
113 class RoostVTDecor(roost.RoostMessage.Decor):
117 def decorate(cls, msg, decoration):
119 if SETTINGS['enabled'] and _get_width() >= SETTINGS['min_width']:
120 return cls.vt_decorate(msg, decoration)
123 return super().decorate(msg, decoration)
127 force_wrap_tuples = {
128 ('moira', 'incremental'),
129 ('scripts', 'nagios.multivalue-key.mysql-s'),
130 ('scripts', 'nagios.unique-key.mysql-s'),
134 (msg.data['classKey'], '*'),
135 (msg.data['classKey'], msg.data['instanceKey']),
138 return (bool(force_wrap_tuples & msg_tuples) or
139 not any(x.search(msg.body) for x in MSG_NO_WRAP_PATTERNS))
143 def vt_decorate(cls, msg, decoration):
146 tags = cls.decotags(decoration)
148 realm = msg.backend.realm
149 sender = msg.data['sender']
150 recipient = msg.data['recipient']
151 if msg.data['isPersonal'] and msg.data['isOutgoing']:
152 sender = f'➤{recipient}'
153 if sender.endswith(realm):
154 sender = sender[:sender.index('@')]
155 klass = msg.data['class'] if msg.data['classKey'] != 'message' else ''
156 inst = msg.data['instance'] or "''"
157 if klass and inst.lower() == 'personal':
160 dest = f'{klass}[{inst}]'
161 auth = '+' if msg.data['auth'] else '-'
162 if not msg.data['auth'] and msg.data['opcode'] == 'mattermost':
165 '%H:%M:%S' if SETTINGS['show_seconds'] else '%H:%M',
166 time.localtime(msg.data[SETTINGS['time']] / 1000))
168 if recipient.startswith('@'):
169 mrealm = REALM_MAP.get(recipient[1:], '??')
170 dest = f'{mrealm} {dest}'
172 dest_width = 18 - (util.glyphwidth(dest) - len(dest))
174 prefix = f'{sender:10.10} {t} {dest:{dest_width}.{dest_width}} {auth} '
175 indent = ' ' * util.glyphwidth(prefix)
177 body = msg.body.rstrip()
178 if cls.rewrap_p(msg):
179 body = textwrap.fill(body, width,
180 initial_indent=indent,
181 subsequent_indent=indent,
182 break_long_words=False,
183 break_on_hyphens=False).lstrip()
185 body = textwrap.indent(body, indent).lstrip()
187 # body = f'{list(decoration.keys())}\n'
188 if msg.backend.format_body == 'format':
189 cbody = zcode.tag(body, frozenset(tags))
190 elif msg.backend.format_body == 'clear':
191 cbody = chunks.Chunk([(tags, '')])
193 if msg.backend.format_body == 'strip':
194 body = zcode.strip(body)
195 cbody = chunks.Chunk([(tags, body)])
198 chunk = chunks.Chunk([(tags, f'{prefix}')]) + cbody
200 if SETTINGS['show_signatures'] and msg.backend.format_zsig != 'clear':
201 zsig = '\n' + textwrap.fill(
202 msg.data['signature'], width,
203 initial_indent=' -- ',
204 subsequent_indent=' ',
205 break_long_words=False,
206 break_on_hyphens=False)
208 if msg.backend.format_zsig == 'format':
209 chunk += zcode.tag(zsig, frozenset(tags))
210 elif msg.backend.format_zsig == 'strip':
211 chunk.append((tags, zcode.strip(zsig)))
213 chunk.append((tags, zsig))
217 f'## {ipaddress.ip_address(base64.b64decode(msg.data["uid"])[:4])}'
218 if SETTINGS['show_addr'] else ''