Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# This file is part of the SBK project
2# https://github.com/mbarkhau/sbk
3#
4# Copyright (c) 2019-2021 Manuel Barkhau (mbarkhau@gmail.com) - MIT License
5# SPDX-License-Identifier: MIT
7"""CLI input/output reading/printing functions."""
9import re
10import typing as typ
12import click
14from . import ecc_rs
15from . import params
16from . import enc_util
17from . import mnemonic
18from . import ui_common
21def _echo(msg: str = "") -> bool:
22 """Write message to stdout.
24 The boolean return value is only to pacify mypy. To supress output when using the -y --yes-all,
25 the following idiom is often used: `yes_all or _echo(message)`
26 """
27 click.echo(msg)
28 return True
31def _clear() -> bool:
32 click.clear()
33 return True
36def _prompt(text: str, default: typ.Optional[str] = None) -> str:
37 result = click.prompt(text, default=default, show_default=False)
38 assert isinstance(result, str)
39 return result
42InputType = str
45SECRET_TYPE_SALT = 'salt'
46SECRET_TYPE_SHARE = 'share'
47SECRET_TYPE_BRAINKEY = 'brainkey'
50MESSAGES = {
51 SECRET_TYPE_SALT : {'header': 'Enter "Salt"'},
52 SECRET_TYPE_SHARE : {'header': 'Enter "Share"'},
53 SECRET_TYPE_BRAINKEY: {'header': 'Enter "Brainkey"'},
54}
57MaybeCommand = typ.Optional[str]
60def _parse_command(in_val: str) -> MaybeCommand:
61 in_val = in_val.strip().lower()
62 if in_val in ('a', 'accept'):
63 return 'accept'
64 elif in_val in ('d', 'delete'):
65 return 'delete'
66 elif in_val in ('c', 'cancel', 'e', 'exit'):
67 return 'cancel'
68 elif in_val in ('p', 'prev'):
69 return 'prev'
70 elif in_val in ('n', 'next'):
71 return 'next'
72 else:
73 return None
76# Decoded inputs
77Inputs = typ.List[ui_common.MaybeIntCode]
78DataLen = int
79MaybeInputs = typ.Optional[Inputs]
81# Markers for which inputs were entered/accepted by user
82Accepted = typ.List[bool]
85def _data_len(secret_type: str) -> DataLen:
86 if secret_type == SECRET_TYPE_SALT:
87 return params.SALT_LEN
88 elif secret_type == SECRET_TYPE_SHARE:
89 return params.SHARE_LEN
90 elif secret_type == SECRET_TYPE_BRAINKEY:
91 return params.BRAINKEY_LEN
92 else:
93 errmsg = f"PromtState.data_len not implemented for secret_type={secret_type}"
94 raise NotImplementedError(errmsg)
97def _init_blank_inputs(secret_type: str) -> Inputs:
98 # round up if there are an uneven number of inputs (e.g. for shares)
99 data_len = _data_len(secret_type)
100 num_inputs: int = ((data_len + 1) // 2) * 2
101 assert num_inputs > 0
102 assert num_inputs % 2 == 0
104 return [None] * num_inputs
107def _newline_mod(num_lines: int) -> int:
108 if num_lines in (12, 16):
109 newline_mod = 4
110 elif num_lines in (6, 8, 10, 14):
111 newline_mod = (num_lines + 1) // 2
112 else:
113 newline_mod = 99
114 return newline_mod
117def _line_marker(idx: int) -> str:
118 return f"{idx + 1:02}"
121class PromptState:
123 secret_type: InputType
124 inputs : Inputs
125 accepted : Accepted
126 cursor : int
128 def __init__(
129 self,
130 secret_type: InputType,
131 inputs : Inputs,
132 cursor : int = 0,
133 accepted : typ.Optional[Accepted] = None,
134 ) -> None:
135 assert len(inputs) % 2 == 0
137 self.secret_type = secret_type
139 if accepted is None:
140 _accepted = [False] * len(inputs)
141 else:
142 _accepted = accepted
144 self.inputs = inputs
145 self.cursor = max(0, min(len(self.inputs) - 1, cursor))
146 assert len(_accepted) == len(self.inputs)
147 self.accepted = _accepted
149 @property
150 def is_cursor_at_ecc(self) -> bool:
151 return self.cursor >= len(self.inputs) // 2
153 @property
154 def is_completable(self) -> bool:
155 return all(self.inputs)
157 def is_complete(self) -> bool:
158 return all(self.inputs) and all(self.accepted)
160 def result(self) -> bytes:
161 if self.is_complete():
162 msg_len = _data_len(self.secret_type)
163 return ui_common.maybe_intcodes2bytes(self.inputs, msg_len=msg_len)
164 else:
165 raise RuntimeError("Invalid State")
167 def message(self, key: str) -> str:
168 if key == 'prompt':
169 cursor_marker = _line_marker(idx=self.cursor)
170 if self.is_completable:
171 if self.is_cursor_at_ecc:
172 return f"Enter code at {cursor_marker} (or Enter to Accept)"
173 else:
174 return f"Enter code/words at {cursor_marker}"
175 else:
176 if self.is_cursor_at_ecc:
177 return f"Enter code at {cursor_marker}"
178 else:
179 return f"Enter code/words at {cursor_marker}"
181 return MESSAGES[self.secret_type][key]
183 def _formatted_lines(self) -> typ.List[str]:
184 num_lines = len(self.inputs) // 2
185 lines = [""] * num_lines
187 # data intcodes
188 for line_index, maybe_intcode in enumerate(self.inputs[:num_lines]):
189 if maybe_intcode is None:
190 intcode = "___-___"
191 else:
192 intcode = maybe_intcode
194 marker = _line_marker(line_index)
195 lines[line_index] += marker + ": " + intcode
197 for line_index, maybe_intcode in enumerate(self.inputs[:num_lines]):
198 if maybe_intcode is None:
199 dummy_word = "_" * 9
200 words = dummy_word + " " + dummy_word
201 else:
202 parts = ui_common.intcodes2parts([maybe_intcode], idx_offset=line_index)
203 words = mnemonic.bytes2phrase(b"".join(parts))
205 lines[line_index] += " " + words + " "
207 # ecc intcodes
208 for line_index, maybe_intcode in enumerate(self.inputs[num_lines:]):
209 idx_offset = num_lines + line_index
210 if maybe_intcode is None:
211 intcode = "___-___"
212 else:
213 intcode = maybe_intcode
215 marker = _line_marker(idx_offset)
216 lines[line_index] += marker + ": " + intcode + " "
218 return lines
220 def _iter_out_lines(self, show_cursor: bool) -> typ.Iterator[str]:
221 lines = self._formatted_lines()
222 newline_mod = _newline_mod(len(lines))
224 for line_index, line in enumerate(lines):
225 if line_index > 0 and line_index % newline_mod == 0:
226 yield ""
228 prefix = " "
229 suffix = ""
231 if show_cursor:
232 if line_index == self.cursor:
233 prefix = "=> "
234 elif line_index == (self.cursor % len(lines)):
235 suffix = "<="
237 yield prefix + line + suffix
239 def formatted_input_lines(self, show_cursor: bool = True) -> typ.List[str]:
240 header = f" {'Data':^7} {'Mnemonic':^18} {'ECC':^7}"
241 return [header] + list(self._iter_out_lines(show_cursor))
243 def _copy(self, **overrides) -> 'PromptState':
244 return PromptState(
245 secret_type=overrides.get('secret_type', self.secret_type),
246 cursor=overrides.get('cursor', self.cursor),
247 inputs=overrides.get('inputs', self.inputs),
248 accepted=overrides.get('accepted', self.accepted),
249 )
251 def _eval_cmd(self, cmd: str) -> 'PromptState':
252 if cmd == 'accept':
253 return self._copy(accepted=[True] * len(self.inputs))
254 elif cmd == 'delete':
255 new_inputs = list(self.inputs)
256 new_accepted = list(self.accepted)
257 new_inputs[self.cursor] = None
258 new_accepted[self.cursor] = False
259 return self._copy(cursor=self.cursor + 1, inputs=new_inputs, accepted=new_accepted)
260 elif cmd == 'next':
261 return self._copy(cursor=self.cursor + 1)
262 elif cmd == 'prev':
263 return self._copy(cursor=self.cursor - 1)
264 elif cmd == 'cancel':
265 raise click.Abort()
266 else:
267 raise Exception(f"Invalid command {cmd}")
269 def parse_input(self, in_val: str) -> typ.Optional['PromptState']:
270 in_val, _ = re.subn(r"[^\w\s]", "", in_val.lower().strip())
271 cmd: MaybeCommand = None
273 try:
274 if re.match(r"^[\d\s]+$", in_val):
275 parts = list(re.findall(r"\d{6}", in_val))
276 in_data = b"".join(ui_common.intcodes2parts(parts, idx_offset=self.cursor))
277 else:
278 if len(in_val.strip()) == 0 and self.is_completable and self.is_cursor_at_ecc:
279 cmd = 'accept'
280 else:
281 cmd = _parse_command(in_val)
283 if cmd is None:
284 in_data = mnemonic.phrase2bytes(in_val)
285 else:
286 return self._eval_cmd(cmd)
287 except ValueError as err:
288 _echo()
289 errmsg = getattr(err, 'args', [str(err)])[0]
290 _echo(f" Error - {errmsg}")
291 return None
293 if len(in_data) < 2:
294 _echo("Invalid data length")
295 return None
297 if len(in_data) % 2 != 0:
298 in_data = in_data[:-1]
300 new_inputs, new_accepted = self._updated_input_data(in_data)
301 new_cursor = self.cursor + (len(in_data) // 2)
302 assert isinstance(new_inputs, list)
303 assert all(elem is None or isinstance(elem, str) for elem in new_inputs)
305 return self._copy(cursor=new_cursor, inputs=new_inputs, accepted=new_accepted)
307 def _updated_input_data(self, in_data: bytes) -> typ.Tuple[Inputs, Accepted]:
308 new_accepted = list(self.accepted)
309 new_inputs = [
310 (input_value if accepted else None) for input_value, accepted in zip(self.inputs, self.accepted)
311 ]
312 pairs = [in_data[i : i + 2] for i in range(0, len(in_data), 2)]
313 for i, pair in enumerate(pairs):
314 if self.cursor + i >= len(self.inputs):
315 _echo("Warning, too many inputs.")
316 break
318 in_intcode = ui_common.bytes2incode_part(pair, self.cursor + i)
319 new_inputs[self.cursor + i] = in_intcode
320 new_accepted[self.cursor + i] = True
322 input_data_len = sum(2 for maybe_intcode in new_inputs if maybe_intcode)
323 msg_len = _data_len(self.secret_type)
324 is_recoverable = input_data_len >= msg_len
326 if is_recoverable:
327 try:
328 recovered_data = ui_common.maybe_intcodes2bytes(new_inputs, msg_len=msg_len)
329 recovered_intcodes = ui_common.bytes2intcodes(recovered_data)
331 new_inputs = [
332 (new_input if accepted else recovered)
333 for accepted, new_input, recovered in zip(new_accepted, new_inputs, recovered_intcodes)
334 ]
335 except ecc_rs.ECCDecodeError as err:
336 _echo(f"Recovery failed, possibly invalid inputs. {err}")
338 return (new_inputs, new_accepted)
341def format_secret_lines(secret_type: str, data: bytes) -> typ.Sequence[str]:
342 intcodes = list(ui_common.bytes2intcodes(data))
343 inputs = typ.cast(Inputs, intcodes)
344 prompt_state = PromptState(secret_type, inputs)
345 return prompt_state.formatted_input_lines(show_cursor=False)
348def format_secret(secret_type: str, data: bytes) -> str:
349 return "\n".join(format_secret_lines(secret_type, data))
352def prompt(secret_type: str, header_text: typ.Optional[str] = None) -> bytes:
353 blank_inputs = _init_blank_inputs(secret_type)
354 current_ps = PromptState(secret_type, blank_inputs)
356 if header_text is None:
357 _header_text = current_ps.message('header')
358 else:
359 _header_text = header_text
361 while True:
362 _clear()
363 _echo(f"{_header_text:^50}")
364 _echo()
365 _echo("\n".join(current_ps.formatted_input_lines()))
366 _echo()
367 _echo("Available commands:")
368 _echo()
369 _echo(" C/Cancel: Cancel recovery")
370 _echo(" P/Prev : Move to previous code/words")
371 _echo(" N/Next : Move to next code/words")
373 if current_ps.inputs[current_ps.cursor]:
374 _echo(" D/Delete: Delete current input")
375 if current_ps.is_completable:
376 _echo()
377 _echo(" A/Accept: Accept input and continue")
379 new_ps: typ.Optional[PromptState] = None
380 while new_ps is None:
381 _echo()
382 in_val = _prompt(current_ps.message('prompt'), default="")
383 new_ps = current_ps.parse_input(in_val)
385 if new_ps.is_complete():
386 return new_ps.result()
388 current_ps = new_ps
391def _debug_test() -> None:
392 data = prompt(SECRET_TYPE_SHARE)
393 print("<<<<", enc_util.bytes_repr(data))
395 data = prompt(SECRET_TYPE_SALT)
396 print("<<<<", enc_util.bytes_repr(data))
397 _prompt("...", default="")
399 data = prompt(SECRET_TYPE_BRAINKEY)
400 print("<<<<", enc_util.bytes_repr(data))
401 _prompt("...", default="")
404if __name__ == '__main__':
405 _debug_test()