10 from .. import chunks, messages, roost, util, zcode
13 # I'm not sure why I can't just use the standard form of
14 # _enabled = util.Configurable(...) here, but this works.
19 'time': 'receiveTime',
20 'show_seconds': False,
22 'show_signatures': False,
28 def setter(_context, value):
33 'user.vt_decor.enabled',
35 action=_gen_setter('enabled'),
36 oneof=('True', 'False'),
37 validate=lambda val: isinstance(val, bool),
38 coerce=util.coerce_bool,
44 action=_gen_setter('time'),
45 oneof=('time', 'receiveTime'),
49 'user.vt_decor.show_seconds',
51 action=_gen_setter('show_seconds'),
52 oneof=('True', 'False'),
53 validate=lambda val: isinstance(val, bool),
54 coerce=util.coerce_bool,
58 'user.vt_decor.show_addr',
60 action=_gen_setter('show_addr'),
61 oneof=('True', 'False'),
62 validate=lambda val: isinstance(val, bool),
63 coerce=util.coerce_bool,
67 'user.vt_decor.show_signatures',
69 action=_gen_setter('show_signatures'),
70 oneof=('True', 'False'),
71 validate=lambda val: isinstance(val, bool),
72 coerce=util.coerce_bool,
84 except AttributeError:
87 frame = inspect.currentframe()
92 ui = frame.f_locals['self'].ui
95 except (KeyError, AttributeError):
103 MSG_NO_WRAP_PATTERNS = [re.compile(x, re.MULTILINE) for x in (
# Bullet detector for list-style message bodies: at the start of any line
# (MULTILINE), optional whitespace, then '-' or '*', then at least one
# whitespace character.  The whole marker is a capturing group so that
# MSG_LIST_WRAP_PATTERN.split() keeps the markers in its result.
MSG_LIST_WRAP_PATTERN = re.compile(
    r'(^\s*[-*]\s+)',
    flags=re.MULTILINE,
)
112 'ANDREW.CMU.EDU': 'AN',
117 class RoostVTDecor(roost.RoostMessage.Decor):
121 def decorate(cls, msg, decoration):
123 if SETTINGS['enabled'] and _get_width() >= SETTINGS['min_width']:
124 return cls.vt_decorate(msg, decoration)
127 return super().decorate(msg, decoration)
131 force_wrap_tuples = {
132 ('moira', 'incremental'),
133 ('scripts', 'nagios.multivalue-key.mysql-s'),
134 ('scripts', 'nagios.unique-key.mysql-s'),
138 (msg.data['classKey'], '*'),
139 (msg.data['classKey'], msg.data['instanceKey']),
142 return (bool(force_wrap_tuples & msg_tuples) or
143 not any(x.search(msg.body) for x in MSG_NO_WRAP_PATTERNS))
def list_rewrap_p(msg):
    """Report whether the body of *msg* should be rewrapped as a list.

    True when MSG_LIST_WRAP_PATTERN matches at least twice in
    ``msg.body``; a single match is not enough.
    """
    bullet_markers = MSG_LIST_WRAP_PATTERN.findall(msg.body)
    return len(bullet_markers) >= 2
150 def partial_rewrap(cls, body, width, indent):
154 for m in MSG_LIST_WRAP_PATTERN.split(body):
157 if MSG_LIST_WRAP_PATTERN.match(m):
161 ret += m.lstrip('\n')
162 bindent = ' ' * util.glyphwidth(m)
165 # collapse whitespace and wrap.
166 ret += textwrap.fill(re.sub(r'\s+', ' ', m), width,
167 initial_indent=indent+bindent,
168 subsequent_indent=indent+bindent,
169 break_long_words=False,
170 break_on_hyphens=False).lstrip()
174 except Exception as e:
179 def vt_decorate(cls, msg, decoration):
182 tags = cls.decotags(decoration)
184 realm = msg.backend.realm
185 sender = msg.data['sender']
186 recipient = msg.data['recipient']
187 if msg.data['isPersonal'] and msg.data['isOutgoing']:
188 sender = f'➤{recipient}'
189 if sender.endswith(realm):
190 sender = sender[:sender.index('@')]
191 klass = msg.data['class'] if msg.data['classKey'] != 'message' else ''
192 inst = msg.data['instance'] or "''"
193 if klass and inst.lower() == 'personal':
196 dest = f'{klass}[{inst}]'
197 auth = '+' if msg.data['auth'] else '-'
198 if not msg.data['auth'] and msg.data['opcode'] == 'mattermost':
201 '%H:%M:%S' if SETTINGS['show_seconds'] else '%H:%M',
202 time.localtime(msg.data[SETTINGS['time']] / 1000))
204 if recipient.startswith('@'):
205 mrealm = REALM_MAP.get(recipient[1:], '??')
206 dest = f'{mrealm} {dest}'
208 dest_width = 18 - (util.glyphwidth(dest) - len(dest))
210 prefix = f'{sender:10.10} {t} {dest:{dest_width}.{dest_width}} {auth} '
211 indent = ' ' * util.glyphwidth(prefix)
213 body = msg.body.rstrip()
214 if cls.list_rewrap_p(msg):
215 body = cls.partial_rewrap(body, width, indent)
216 elif cls.rewrap_p(msg):
217 body = textwrap.fill(body, width,
218 initial_indent=indent,
219 subsequent_indent=indent,
220 break_long_words=False,
221 break_on_hyphens=False).lstrip()
223 body = textwrap.indent(body, indent).lstrip()
225 # body = f'{list(decoration.keys())}\n'
226 if msg.backend.format_body == 'format':
227 cbody = zcode.tag(body, frozenset(tags))
228 elif msg.backend.format_body == 'clear':
229 cbody = chunks.Chunk([(tags, '')])
231 if msg.backend.format_body == 'strip':
232 body = zcode.strip(body)
233 cbody = chunks.Chunk([(tags, body)])
236 chunk = chunks.Chunk([(tags, f'{prefix}')]) + cbody
238 if SETTINGS['show_signatures'] and msg.backend.format_zsig != 'clear':
239 zsig = '\n' + textwrap.fill(
240 msg.data['signature'], width,
241 initial_indent=' -- ',
242 subsequent_indent=' ',
243 break_long_words=False,
244 break_on_hyphens=False)
246 if msg.backend.format_zsig == 'format':
247 chunk += zcode.tag(zsig, frozenset(tags))
248 elif msg.backend.format_zsig == 'strip':
249 chunk.append((tags, zcode.strip(zsig)))
251 chunk.append((tags, zsig))
253 if SETTINGS['show_addr']:
254 addr = str(ipaddress.ip_address(base64.b64decode(msg.data["uid"])[:4]))
255 # Hangs snipe while resolving; need to make async somehow.
256 #if addr not in HOSTNAMES:
258 # HOSTNAMES[addr] = socket.gethostbyaddr(addr)[0].upper()
260 # HOSTNAMES[addr] = addr
261 #addr = HOSTNAMES[addr]
262 chunk.append((tags | {'right'}, f'## {addr}'))
264 chunk.append((tags | {'right'}, ''))