Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# This file is part of the SBK project 

2# https://github.com/mbarkhau/sbk 

3# 

4# Copyright (c) 2019-2021 Manuel Barkhau (mbarkhau@gmail.com) - MIT License 

5# SPDX-License-Identifier: MIT 

6 

7"""CLI input/output reading/printing functions.""" 

8 

9import re 

10import typing as typ 

11 

12import click 

13 

14from . import ecc_rs 

15from . import params 

16from . import enc_util 

17from . import mnemonic 

18from . import ui_common 

19 

20 

def _echo(msg: str = "") -> bool:
    """Print `msg` to stdout using click and return True.

    The return value carries no information; it exists only to pacify mypy.
    It allows output to be suppressed when -y/--yes-all is used, by way of
    the idiom: `yes_all or _echo(message)`
    """
    click.echo(message=msg)
    return True

29 

30 

def _clear() -> bool:
    """Clear the terminal screen using click and return True.

    The constant return value allows usage in boolean expressions,
    e.g. `condition or _clear()`.
    """
    click.clear()
    return True

34 

35 

def _prompt(text: str, default: typ.Optional[str] = None) -> str:
    """Ask the user for a line of input and return it as a string.

    The default value (if any) is used by click but is not displayed
    as part of the prompt text.
    """
    answer = click.prompt(text, default=default, show_default=False)
    # narrow the type for mypy; click.prompt is not typed to return str
    assert isinstance(answer, str)
    return answer

40 

41 

# Kind of secret being entered; holds one of the SECRET_TYPE_* values below.
InputType = str


SECRET_TYPE_SALT = 'salt'
SECRET_TYPE_SHARE = 'share'
SECRET_TYPE_BRAINKEY = 'brainkey'


# User facing texts, keyed first by secret type and then by message key.
MESSAGES = {
    SECRET_TYPE_SALT : {'header': 'Enter "Salt"'},
    SECRET_TYPE_SHARE : {'header': 'Enter "Share"'},
    SECRET_TYPE_BRAINKEY: {'header': 'Enter "Brainkey"'},
}


# Normalized command name (e.g. 'accept'), or None if the input is not a command.
MaybeCommand = typ.Optional[str]

58 

59 

def _parse_command(in_val: str) -> MaybeCommand:
    """Translate user input to a normalized command name.

    Matching ignores case and surrounding whitespace. Both the single letter
    shortcut and the full word are understood. Returns None if the input is
    not a known command.
    """
    aliases = {
        'a': 'accept',
        'accept': 'accept',
        'd': 'delete',
        'delete': 'delete',
        'c': 'cancel',
        'cancel': 'cancel',
        'e': 'cancel',
        'exit': 'cancel',
        'p': 'prev',
        'prev': 'prev',
        'n': 'next',
        'next': 'next',
    }
    normalized = in_val.strip().lower()
    return aliases.get(normalized)

74 

75 

# Decoded inputs (one entry per intcode; None marks an entry that is still blank)
Inputs = typ.List[ui_common.MaybeIntCode]
# Length of a secret as returned by _data_len
DataLen = int
MaybeInputs = typ.Optional[Inputs]

# Markers for which inputs were entered/accepted by user
Accepted = typ.List[bool]

83 

84 

def _data_len(secret_type: str) -> DataLen:
    """Return the length constant from `params` for the given secret type.

    Raises:
        NotImplementedError: if secret_type is not one of the
            SECRET_TYPE_* constants.
    """
    if secret_type == SECRET_TYPE_SALT:
        return params.SALT_LEN
    if secret_type == SECRET_TYPE_SHARE:
        return params.SHARE_LEN
    if secret_type == SECRET_TYPE_BRAINKEY:
        return params.BRAINKEY_LEN

    # The message names this function, so the origin of the error can be found.
    errmsg = f"_data_len not implemented for secret_type={secret_type}"
    raise NotImplementedError(errmsg)

95 

96 

def _init_blank_inputs(secret_type: str) -> Inputs:
    """Create the list of blank (None) inputs for a secret type.

    The number of inputs is the data length of the secret type,
    rounded up to the next even number.
    """
    length = _data_len(secret_type)
    # round up if there are an uneven number of inputs (e.g. for shares)
    slot_count: int = length + (length % 2)
    assert slot_count > 0
    assert slot_count % 2 == 0

    return [None for _ in range(slot_count)]

105 

106 

def _newline_mod(num_lines: int) -> int:
    """Return after how many lines a blank separator line is inserted.

    For 12 or 16 lines, the lines are grouped in blocks of four. For
    6, 8, 10 or 14 lines they are split into two halves. For any other
    number of lines, 99 is returned.
    """
    if num_lines in (12, 16):
        return 4
    if num_lines in (6, 8, 10, 14):
        return (num_lines + 1) // 2
    return 99

115 

116 

def _line_marker(idx: int) -> str:
    """Return the 1-based, zero padded (two digit) label for a 0-based index."""
    label_number = idx + 1
    return format(label_number, "02")

119 

120 

class PromptState:
    """State of the interactive input form for a secret.

    The first half of `inputs` holds the data intcodes and the second half
    holds the ecc intcodes. Methods which process user input do not modify
    the state they are called on; they return a new PromptState instead.
    """

    secret_type: InputType
    # Intcodes entered or recovered so far (None for blank entries)
    inputs : Inputs
    # Flags for the entries of `inputs` that were entered/accepted by the user
    accepted : Accepted
    # Index into `inputs` of the entry that is edited next
    cursor : int

    def __init__(
        self,
        secret_type: InputType,
        inputs : Inputs,
        cursor : int = 0,
        accepted : typ.Optional[Accepted] = None,
    ) -> None:
        """Initialize the state.

        The cursor is clamped to a valid index of `inputs`. If `accepted`
        is omitted, no input is marked as accepted.
        """
        assert len(inputs) % 2 == 0

        self.secret_type = secret_type

        if accepted is None:
            _accepted = [False] * len(inputs)
        else:
            _accepted = accepted

        self.inputs = inputs
        self.cursor = max(0, min(len(self.inputs) - 1, cursor))
        assert len(_accepted) == len(self.inputs)
        self.accepted = _accepted

    @property
    def is_cursor_at_ecc(self) -> bool:
        """True if the cursor is in the second (ecc) half of the inputs."""
        return self.cursor >= len(self.inputs) // 2

    @property
    def is_completable(self) -> bool:
        """True if every input has a value (entered or recovered)."""
        return all(self.inputs)

    def is_complete(self) -> bool:
        """True if every input has a value and is marked as accepted."""
        return all(self.inputs) and all(self.accepted)

    def result(self) -> bytes:
        """Return the bytes decoded from the inputs.

        Raises:
            RuntimeError: if the state is not complete.
        """
        if self.is_complete():
            msg_len = _data_len(self.secret_type)
            return ui_common.maybe_intcodes2bytes(self.inputs, msg_len=msg_len)
        else:
            raise RuntimeError("Invalid State")

    def message(self, key: str) -> str:
        """Return the user facing text for `key`.

        The 'prompt' text depends on the cursor position and on whether
        the inputs are completable. Any other key is looked up in MESSAGES
        for the secret type of this state.
        """
        if key == 'prompt':
            cursor_marker = _line_marker(idx=self.cursor)
            if not self.is_cursor_at_ecc:
                return f"Enter code/words at {cursor_marker}"
            if self.is_completable:
                return f"Enter code at {cursor_marker} (or Enter to Accept)"
            return f"Enter code at {cursor_marker}"

        return MESSAGES[self.secret_type][key]

    def _formatted_lines(self) -> typ.List[str]:
        """Build one text line per data intcode.

        Each line consists of the data intcode, its mnemonic words and the
        ecc intcode that is displayed next to it.
        """
        num_lines = len(self.inputs) // 2
        lines = [""] * num_lines

        # data intcodes and their mnemonic words
        for line_index, maybe_intcode in enumerate(self.inputs[:num_lines]):
            if maybe_intcode is None:
                intcode = "___-___"
                dummy_word = "_" * 9
                words = dummy_word + " " + dummy_word
            else:
                intcode = maybe_intcode
                parts = ui_common.intcodes2parts([maybe_intcode], idx_offset=line_index)
                words = mnemonic.bytes2phrase(b"".join(parts))

            marker = _line_marker(line_index)
            lines[line_index] += marker + ": " + intcode
            lines[line_index] += " " + words + " "

        # ecc intcodes
        for line_index, maybe_intcode in enumerate(self.inputs[num_lines:]):
            idx_offset = num_lines + line_index
            if maybe_intcode is None:
                intcode = "___-___"
            else:
                intcode = maybe_intcode

            marker = _line_marker(idx_offset)
            lines[line_index] += marker + ": " + intcode + " "

        return lines

    def _iter_out_lines(self, show_cursor: bool) -> typ.Iterator[str]:
        """Yield the formatted lines, with separator lines and cursor markers."""
        lines = self._formatted_lines()
        newline_mod = _newline_mod(len(lines))

        for line_index, line in enumerate(lines):
            if line_index > 0 and line_index % newline_mod == 0:
                yield ""

            prefix = " "
            suffix = ""

            if show_cursor:
                if line_index == self.cursor:
                    # cursor is on the data intcode of this line
                    prefix = "=> "
                elif line_index == (self.cursor % len(lines)):
                    # cursor is on the ecc intcode displayed on this line
                    suffix = "<="

            yield prefix + line + suffix

    def formatted_input_lines(self, show_cursor: bool = True) -> typ.List[str]:
        """Return the column header followed by all output lines."""
        header = f" {'Data':^7} {'Mnemonic':^18} {'ECC':^7}"
        return [header] + list(self._iter_out_lines(show_cursor))

    def _copy(self, **overrides) -> 'PromptState':
        """Return a new state, with the given attributes replaced."""
        return PromptState(
            secret_type=overrides.get('secret_type', self.secret_type),
            cursor=overrides.get('cursor', self.cursor),
            inputs=overrides.get('inputs', self.inputs),
            accepted=overrides.get('accepted', self.accepted),
        )

    def _eval_cmd(self, cmd: str) -> 'PromptState':
        """Return the state that results from applying a command.

        Raises:
            click.Abort: for the 'cancel' command.
            Exception: for an unknown command.
        """
        if cmd == 'accept':
            return self._copy(accepted=[True] * len(self.inputs))
        elif cmd == 'delete':
            # modify copies, so that this state remains unchanged
            new_inputs = list(self.inputs)
            new_accepted = list(self.accepted)
            new_inputs[self.cursor] = None
            new_accepted[self.cursor] = False
            return self._copy(cursor=self.cursor + 1, inputs=new_inputs, accepted=new_accepted)
        elif cmd == 'next':
            return self._copy(cursor=self.cursor + 1)
        elif cmd == 'prev':
            return self._copy(cursor=self.cursor - 1)
        elif cmd == 'cancel':
            raise click.Abort()
        else:
            raise Exception(f"Invalid command {cmd}")

    def parse_input(self, in_val: str) -> typ.Optional['PromptState']:
        """Process a line of user input.

        The input is either a command, a sequence of intcode digits or a
        mnemonic phrase. Returns the resulting new state, or None if the
        input was invalid (in which case a message has been printed).
        """
        # drop punctuation (e.g. the dash of an intcode)
        in_val, _ = re.subn(r"[^\w\s]", "", in_val.lower().strip())
        cmd: MaybeCommand = None

        try:
            if re.match(r"^[\d\s]+$", in_val):
                parts = list(re.findall(r"\d{6}", in_val))
                in_data = b"".join(ui_common.intcodes2parts(parts, idx_offset=self.cursor))
            else:
                if len(in_val.strip()) == 0 and self.is_completable and self.is_cursor_at_ecc:
                    # empty input (just the Enter key) accepts the inputs
                    cmd = 'accept'
                else:
                    cmd = _parse_command(in_val)

                if cmd is None:
                    in_data = mnemonic.phrase2bytes(in_val)
                else:
                    return self._eval_cmd(cmd)
        except ValueError as err:
            _echo()
            # err.args is empty for a ValueError raised without a message,
            # so fall back to str(err) rather than failing with an IndexError.
            errmsg = err.args[0] if err.args else str(err)
            _echo(f" Error - {errmsg}")
            return None

        if len(in_data) < 2:
            _echo("Invalid data length")
            return None

        if len(in_data) % 2 != 0:
            # each input consists of two bytes, drop the incomplete remainder
            in_data = in_data[:-1]

        new_inputs, new_accepted = self._updated_input_data(in_data)
        new_cursor = self.cursor + (len(in_data) // 2)
        assert isinstance(new_inputs, list)
        assert all(elem is None or isinstance(elem, str) for elem in new_inputs)

        return self._copy(cursor=new_cursor, inputs=new_inputs, accepted=new_accepted)

    def _updated_input_data(self, in_data: bytes) -> typ.Tuple[Inputs, Accepted]:
        """Compute new inputs/accepted lists after entering `in_data` at the cursor.

        Inputs that were not accepted (i.e. previously recovered values) are
        discarded. If enough inputs are available, the remaining blank
        inputs are filled with recovered intcodes.
        """
        new_accepted = list(self.accepted)
        new_inputs = [
            (input_value if accepted else None)
            for input_value, accepted in zip(self.inputs, self.accepted)
        ]
        pairs = [in_data[i : i + 2] for i in range(0, len(in_data), 2)]
        for i, pair in enumerate(pairs):
            if self.cursor + i >= len(self.inputs):
                _echo("Warning, too many inputs.")
                break

            in_intcode = ui_common.bytes2incode_part(pair, self.cursor + i)
            new_inputs[self.cursor + i] = in_intcode
            new_accepted[self.cursor + i] = True

        # each intcode accounts for two bytes
        input_data_len = sum(2 for maybe_intcode in new_inputs if maybe_intcode)
        msg_len = _data_len(self.secret_type)
        is_recoverable = input_data_len >= msg_len

        if is_recoverable:
            try:
                recovered_data = ui_common.maybe_intcodes2bytes(new_inputs, msg_len=msg_len)
                recovered_intcodes = ui_common.bytes2intcodes(recovered_data)

                # keep what the user entered, fill the rest with recovered values
                new_inputs = [
                    (new_input if accepted else recovered)
                    for accepted, new_input, recovered in zip(
                        new_accepted, new_inputs, recovered_intcodes
                    )
                ]
            except ecc_rs.ECCDecodeError as err:
                _echo(f"Recovery failed, possibly invalid inputs. {err}")

        return (new_inputs, new_accepted)

339 

340 

def format_secret_lines(secret_type: str, data: bytes) -> typ.Sequence[str]:
    """Format the intcodes of `data` as lines of text, without cursor markers.

    The first line of the result is the column header.
    """
    all_inputs = typ.cast(Inputs, list(ui_common.bytes2intcodes(data)))
    state = PromptState(secret_type, all_inputs)
    return state.formatted_input_lines(show_cursor=False)

346 

347 

def format_secret(secret_type: str, data: bytes) -> str:
    """Return the output of format_secret_lines as a newline separated string."""
    secret_lines = format_secret_lines(secret_type, data)
    return "\n".join(secret_lines)

350 

351 

def prompt(secret_type: str, header_text: typ.Optional[str] = None) -> bytes:
    """Interactively read a secret of the given type from the user.

    The input form is redrawn after every valid input, until all inputs
    are present and accepted. If `header_text` is omitted, the 'header'
    message for the secret type is displayed.

    Returns the bytes of the entered secret. The 'cancel' command aborts
    by raising click.Abort (see PromptState._eval_cmd).
    """
    state = PromptState(secret_type, _init_blank_inputs(secret_type))
    title = state.message('header') if header_text is None else header_text

    # part of the command overview that is always displayed
    fixed_menu = (
        "Available commands:",
        "",
        " C/Cancel: Cancel recovery",
        " P/Prev : Move to previous code/words",
        " N/Next : Move to next code/words",
    )

    while True:
        _clear()
        _echo(f"{title:^50}")
        _echo()
        _echo("\n".join(state.formatted_input_lines()))
        _echo()

        for menu_line in fixed_menu:
            _echo(menu_line)

        # commands that are only available in some states
        if state.inputs[state.cursor]:
            _echo(" D/Delete: Delete current input")
        if state.is_completable:
            _echo()
            _echo(" A/Accept: Accept input and continue")

        # ask again until the input is valid (parse_input returns a state)
        next_state: typ.Optional[PromptState] = None
        while next_state is None:
            _echo()
            answer = _prompt(state.message('prompt'), default="")
            next_state = state.parse_input(answer)

        if next_state.is_complete():
            return next_state.result()

        state = next_state

389 

390 

def _debug_test() -> None:
    """Manual test: prompt for a share, a salt and a brainkey and print each.

    After the salt and after the brainkey (but not after the share), the
    session is paused until the user presses enter.
    """
    steps = (
        (SECRET_TYPE_SHARE, False),
        (SECRET_TYPE_SALT, True),
        (SECRET_TYPE_BRAINKEY, True),
    )
    for secret_type, pause_after in steps:
        data = prompt(secret_type)
        print("<<<<", enc_util.bytes_repr(data))
        if pause_after:
            _prompt("...", default="")

402 

403 

# Start the interactive debug session when this module is run as a script.
if __name__ == '__main__':
    _debug_test()