Coverage for src/markdown_katex/wrapper.py: 84%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

199 statements  

1# This file is part of the markdown-katex project 

2# https://github.com/mbarkhau/markdown-katex 

3# 

4# Copyright (c) 2019-2021 Manuel Barkhau (mbarkhau@gmail.com) - MIT License 

5# SPDX-License-Identifier: MIT 

6 

7# NOTE (mb 2019-05-16): This module is substantially shared with the 

8# markdown-svgbob package and meaningful changes should be 

9# replicated there also. 

10 

11import os 

12import re 

13import time 

14import signal 

15import typing as typ 

16import hashlib 

17import platform 

18import tempfile 

19import subprocess as sp 

20 

21import pathlib2 as pl 

22 

# Lookup table from signal number to symbolic name (e.g. 15 -> 'SIGTERM').
# Only real signals are included: the 'SIG_*' names (SIG_DFL, SIG_IGN, ...)
# are handler/mask constants, not signals. Names are visited in reverse
# alphabetical order, so when several names share one number the
# alphabetically first name is the one that ends up in the table.
SIG_NAME_BY_NUM = dict(
    (sig_num, sig_name)
    for sig_name, sig_num in sorted(vars(signal).items(), reverse=True)
    if sig_name[:3] == 'SIG' and sig_name[3:4] != '_'
)

# import-time sanity check of the table
assert SIG_NAME_BY_NUM[15] == 'SIGTERM'

30 

31 

# Scratch directory for rendered output and the command cache.
TMP_DIR = pl.Path(tempfile.gettempdir(), "mdkatex")

# Directory of this module and the directory of the bundled binaries.
LIBDIR: pl.Path = pl.Path(__file__).parent
PKG_BIN_DIR = LIBDIR.joinpath("bin")

# Searched in addition to the entries of PATH.
FALLBACK_BIN_DIR = pl.Path("/", "usr", "local", "bin").expanduser()

CMD_NAME = "katex"

# https://pymotw.com/3/platform/
OSNAME = platform.system()
MACHINE = platform.machine()

# NOTE (mb 2020-06-19): I have no idea if this is true and have not found a good
#   way to test it, especially not in any cross platform way. Maybe KaTeX doesn't
#   care and just uses the same encoding for input as for output.
KATEX_INPUT_ENCODING = "UTF-8"
KATEX_OUTPUT_ENCODING = "UTF-8"

# local cache so we don't have to validate the command every time
TMP_LOCAL_CMD_CACHE = TMP_DIR.joinpath("local_katex_cmd.txt")

54 

55 

def _get_env_paths() -> typ.Iterable[pl.Path]:
    """Yield the directories in which to look for a katex executable.

    Yields every entry of the PATH environment variable (in order), followed
    by FALLBACK_BIN_DIR unless that directory is already one of the entries.
    """
    env_path = os.environ.get('PATH')
    env_dirs = [pl.Path(path_str) for path_str in env_path.split(os.pathsep)] if env_path else []

    for env_dir in env_dirs:
        yield env_dir

    # search in fallback bin dir regardless of path
    # NOTE: compare whole entries; a substring test on the raw PATH string
    #   would wrongly treat e.g. "/opt/usr/local/bin" as the fallback dir.
    if FALLBACK_BIN_DIR not in env_dirs:
        yield FALLBACK_BIN_DIR

66 

67 

def _get_local_bin_candidates() -> typ.List[str]:
    """Return candidate command lines for a locally installed katex.

    Each entry is a whitespace separated command whose first word is the
    file name of the executable to look for. Entries are ordered by
    preference.
    """
    if OSNAME != 'Windows':
        return [CMD_NAME, f"npx --no-install {CMD_NAME}"]

    # whackamole
    direct_cmd = f"{CMD_NAME}.cmd"
    direct_exe = f"{CMD_NAME}.exe"
    direct_ps1 = f"{CMD_NAME}.ps1"
    npx_cmd = f"npx.cmd --no-install {CMD_NAME}"
    npx_exe = f"npx.exe --no-install {CMD_NAME}"
    npx_ps1 = f"npx.ps1 --no-install {CMD_NAME}"
    return [direct_cmd, direct_exe, npx_cmd, npx_exe, direct_ps1, npx_ps1]

81 

82 

def _get_usr_parts() -> typ.Optional[typ.List[str]]:
    """Locate a user installed katex command.

    Returns the command as a list of argv parts (path of the executable
    first), or None if no working installation was found. A successful
    lookup is written to TMP_LOCAL_CMD_CACHE (one argv part per line) so
    that later calls can skip the validation.
    """
    if TMP_LOCAL_CMD_CACHE.exists():
        with TMP_LOCAL_CMD_CACHE.open(mode="r", encoding="utf-8") as fobj:
            cached_cmd = typ.cast(str, fobj.read())

        # Drop empty entries: an empty or truncated cache file would otherwise
        # produce [""], and pl.Path("") is the current directory, which always
        # exists. A trailing newline would likewise add an empty argument.
        cached_cmd_parts = [part for part in cached_cmd.split("\n") if part.strip()]
        if cached_cmd_parts and pl.Path(cached_cmd_parts[0]).is_file():
            return cached_cmd_parts

    for path in _get_env_paths():
        for local_cmd in _get_local_bin_candidates():
            local_cmd_parts = local_cmd.split()
            bin_name = local_cmd_parts[0]
            local_bin = path / bin_name
            if not local_bin.is_file():
                continue
            local_cmd_parts[0] = str(local_bin)

            # Only accept the candidate if it runs and reports a version.
            try:
                output_data = sp.check_output(local_cmd_parts + ['--version'], stderr=sp.STDOUT)
            except sp.CalledProcessError:
                continue
            except OSError:
                continue

            # errors="replace": unexpected bytes in the output should fail the
            # version check below rather than raise UnicodeDecodeError.
            output_text = output_data.decode("utf-8", errors="replace")
            if re.match(r"\d+\.\d+\.\d+", output_text.strip()) is None:
                continue

            TMP_DIR.mkdir(parents=True, exist_ok=True)
            with TMP_LOCAL_CMD_CACHE.open(mode="w", encoding="utf-8") as fobj:
                fobj.write("\n".join(local_cmd_parts))

            return local_cmd_parts

    return None

118 

119 

def _get_pkg_bin_path(osname: str = OSNAME, machine: str = MACHINE) -> pl.Path:
    """Return the path of the bundled katex binary for a platform.

    Looks in PKG_BIN_DIR for files matching "*_{machine}-{osname}*" and
    returns the greatest matching path. 'AMD64' is treated as 'x86_64'.

    Raises:
        NotImplementedError: if no bundled binary matches the platform.
    """
    arch = 'x86_64' if machine == 'AMD64' else machine
    candidates = sorted(PKG_BIN_DIR.glob(f"*_{arch}-{osname}*"))
    if candidates:
        return candidates[-1]

    raise NotImplementedError(
        "Platform not supported, katex binary not found. "
        "Install manually using 'npm install katex'."
    )

136 

137 

def get_bin_cmd() -> typ.List[str]:
    """Return the argv prefix used to invoke katex.

    A user installed command takes precedence; otherwise the binary that is
    bundled with the package is used.
    """
    usr_bin_cmd = _get_usr_parts()
    if usr_bin_cmd is not None:
        return usr_bin_cmd

    # use packaged binary
    pkg_bin_path = _get_pkg_bin_path()
    return [str(pkg_bin_path)]

145 

146 

def _iter_output_lines(buf: typ.IO[bytes]) -> typ.Iterable[bytes]:
    """Yield the lines of buf until readline() returns an empty result."""
    line = buf.readline()
    while line:
        yield line
        line = buf.readline()

154 

155 

def read_output(buf: typ.Optional[typ.IO[bytes]]) -> str:
    """Read buf to its end and return the content decoded as UTF-8.

    buf must not be None (checked with an assertion).
    """
    assert buf is not None
    raw_output = b"".join(_iter_output_lines(buf))
    return raw_output.decode("utf-8")

159 

160 

# Value of a single katex option. Booleans act as switches: True emits
# only the flag and False omits the option (see _iter_cmd_parts).
ArgValue = typ.Union[str, int, float, bool]
# Mapping of option name (with or without a leading "--") to its value.
Options = typ.Dict[str, ArgValue]

163 

164 

class KatexError(Exception):
    """Raised when the katex command exits with a non-zero return code."""

167 

168 

def _iter_cmd_parts(options: Options = None) -> typ.Iterable[str]:
    """Yield the argv parts of a katex invocation.

    The parts of get_bin_cmd() come first, followed by the options. Option
    names are prefixed with "--" if they don't have the prefix already. A
    value of True yields only the flag, a value of False yields nothing and
    any other value yields the flag followed by str(value).
    """
    for bin_part in get_bin_cmd():
        yield bin_part

    for option_name, option_value in (options or {}).items():
        flag = option_name if option_name.startswith("--") else "--" + option_name

        if option_value is False:
            continue

        yield flag
        if option_value is not True:
            yield str(option_value)

188 

189 

def _cmd_digest(tex: str, cmd_parts: typ.List[str]) -> str:
    """Return the hex SHA-256 digest of tex followed by all cmd_parts.

    The strings are UTF-8 encoded and hashed back to back, without any
    separator between them.
    """
    digest_input = b"".join(part.encode("utf-8") for part in [tex] + list(cmd_parts))
    return hashlib.sha256(digest_input).hexdigest()

195 

196 

def _write_tex2html(cmd_parts: typ.List[str], tex: str, tmp_output_file: pl.Path) -> None:
    """Run katex on tex and have it write the html to tmp_output_file.

    The tex source is passed via a temporary input file in TMP_DIR (named
    like the output file, with ".html" replaced by ".tex"), which is removed
    again once the command has finished.

    Args:
        cmd_parts: argv prefix (executable and options); it is not modified.
        tex: the TeX source to render.
        tmp_output_file: path that katex is told to write its output to.

    Raises:
        KatexError: if the command exits with a non-zero return code.
    """
    # pylint: disable=consider-using-with ; not supported on py27
    tmp_input_file = TMP_DIR / tmp_output_file.name.replace(".html", ".tex")
    input_data = tex.encode(KATEX_INPUT_ENCODING)

    TMP_DIR.mkdir(parents=True, exist_ok=True)
    with tmp_input_file.open(mode="wb") as fobj:
        fobj.write(input_data)

    # Build a new list so the list of the caller is left untouched.
    full_cmd = list(cmd_parts) + ["--input", str(tmp_input_file), "--output", str(tmp_output_file)]
    proc = None
    try:
        proc = sp.Popen(full_cmd, stdout=sp.PIPE, stderr=sp.PIPE)
        # communicate() drains both pipes while waiting. A plain wait() can
        # deadlock if the child blocks on a full stdout/stderr pipe.
        stdout_data, stderr_data = proc.communicate()
        ret_code = proc.returncode
        if ret_code < 0:
            # Not every negative code maps to a known signal name.
            signame = SIG_NAME_BY_NUM.get(abs(ret_code), "unknown signal")
            err_msg = (
                f"Error processing '{tex}': "
                + "katex_cli process ended with "
                + f"code {ret_code} ({signame})"
            )
            raise KatexError(err_msg)
        elif ret_code > 0:
            # errors="replace": undecodable output must not mask the KatexError
            stdout = stdout_data.decode("utf-8", errors="replace")
            errout = stderr_data.decode("utf-8", errors="replace")
            output = (stdout + "\n" + errout).strip()
            err_msg = f"Error processing '{tex}': {output}"
            raise KatexError(err_msg)
    finally:
        if proc is not None:
            # It might be reasonable that Popen itself raises an
            # exception. In such a case, proc would still be None
            # and there is nothing to close.
            if proc.stdout is not None:
                proc.stdout.close()
            if proc.stderr is not None:
                proc.stderr.close()
        tmp_input_file.unlink()

235 

236 

def tex2html(tex: str, options: Options = None) -> str:
    """Render tex to html using katex and return the stripped markup.

    Output is cached in TMP_DIR under a file name derived from the digest of
    tex and the full command. A cache hit only refreshes the mtime of the
    file. Stale files in TMP_DIR are cleaned up after every call.
    """
    cmd_parts = list(_iter_cmd_parts(options))
    tmp_output_file = TMP_DIR / (_cmd_digest(tex, cmd_parts) + ".html")

    try:
        if not tmp_output_file.exists():
            _write_tex2html(cmd_parts, tex, tmp_output_file)
        else:
            # give cached file a life extension (update mtime)
            tmp_output_file.touch()

        with tmp_output_file.open(mode="r", encoding=KATEX_OUTPUT_ENCODING) as fobj:
            html = typ.cast(str, fobj.read())
        return html.strip()
    finally:
        _cleanup_tmp_dir()

255 

256 

def _cleanup_tmp_dir() -> None:
    """Delete files in TMP_DIR that were not modified in the last 24 hours.

    This is best-effort housekeeping which is invoked from a `finally`
    clause, so file system errors are ignored here rather than allowed to
    replace the result (or the actual exception) of the caller.
    """
    min_mtime = time.time() - 24 * 60 * 60
    try:
        fpaths = list(TMP_DIR.iterdir())
    except OSError:
        # e.g. TMP_DIR was never created
        return

    for fpath in fpaths:
        try:
            if fpath.is_file() and fpath.stat().st_mtime < min_mtime:
                fpath.unlink()
        except OSError:
            # e.g. the file was removed concurrently by another process
            continue

264 

265 

266# NOTE: in order to not have to update the code 

267# of the extension any time an option is added, 

268# we parse the help text of the katex command. 

269 

270 

# Abbreviated copy of the output of `katex --help`, used as the source of
# default options (see parse_options).
#
# NOTE: The line breaks must be preserved. The text used to be collapsed
#   with .replace("\n", " ").replace("NL", "\n"), but since it contains no
#   "NL" markers that put the whole table on one line, where the
#   description of the first option swallows all the others.
DEFAULT_HELP_TEXT = r"""
Options:
  -V, --version              output the version number
  -d, --display-mode         Render math in display...
      --leqno                Render display math in...
      --fleqn                Render display math fl...
  -t, --no-throw-on-error    Render errors (in the ...
  -c, --error-color <color>  A color string given i...
  -b, --color-is-text-color  Makes \color behave li...
  -S, --strict               Turn on strict / LaTeX...
  -s, --max-size <n>         If non-zero, all user-...
  -e, --max-expand <n>       Limit the number of ma...
  -m, --macro <def>          Define custom macro of...
  -f, --macro-file <path>    Read macro definitions...
  -i, --input <path>         Read LaTeX input from ...
  -o, --output <path>        Write html output to t...
  -h, --help                 output usage information
"""

291 

292 

def _get_cmd_help_text() -> str:
    """Run the katex command with --help and return its stdout as text.

    The output is decoded as UTF-8. Any exception raised by get_bin_cmd()
    or by launching the process is propagated to the caller.
    """
    # pylint: disable=consider-using-with ; not supported on py27
    bin_parts = get_bin_cmd()
    cmd_parts = bin_parts + ['--help']
    proc = None
    try:
        proc = sp.Popen(cmd_parts, stdout=sp.PIPE)
        # communicate() reads stdout to its end and waits for the process to
        # exit, so the child is reaped instead of being left as a zombie.
        stdout_data, _ = proc.communicate()
        help_text = stdout_data.decode("utf-8")
    finally:
        if proc is not None and proc.stdout is not None:
            proc.stdout.close()
    return help_text

305 

306 

# Mapping of option name (without the leading "--") to its help text.
OptionsHelp = typ.Dict[str, str]

# Matches one "--name [<arg>]  description" entry of the help output. The
# description runs to the end of its line and continues over following
# lines for as long as only the characters of the trailing class occur.
# https://regex101.com/r/287NYS/4
OPTION_PATTERN = r"""
    --
    (?P<name>[a-z\-]+)
    \s+(?:<[a-z\-]+>)?
    \s+
    (?P<text>[^\n]*[ \s\w(){},:;.'\\/\[\] ]*)
"""
_OPTION_FLAGS = re.VERBOSE | re.DOTALL | re.MULTILINE
OPTION_REGEX = re.compile(OPTION_PATTERN, _OPTION_FLAGS)

318 

319 

def _parse_options_help_text(help_text: str) -> OptionsHelp:
    """Parse the options table of a katex help text.

    Only the text after the first "Options:" heading is considered (the
    whole text if there is no such heading). Multi-line descriptions are
    joined into a single line. The options version, help, input, output and
    display-mode are removed from the result.
    """
    _, heading, remainder = help_text.partition("Options:")
    options_text = remainder if heading else help_text

    options: OptionsHelp = {
        match.group("name"): " ".join(
            line.strip() for line in match.group("text").splitlines()
        ).strip()
        for match in OPTION_REGEX.finditer(options_text)
    }

    for excluded_name in ("version", "help", "input", "output", "display-mode"):
        options.pop(excluded_name, None)

    return options

338 

339 

# Process wide cache for the result of parse_options().
_PARSED_OPTIONS: OptionsHelp = {}


def parse_options() -> OptionsHelp:
    """Return the options supported by katex, mapped to their help text.

    The options parsed from DEFAULT_HELP_TEXT are updated with those parsed
    from the help output of the katex command, if that can be obtained. A
    non-empty result is cached for subsequent calls. The returned dict is
    never the cache itself, so callers may modify it freely.
    """
    if _PARSED_OPTIONS:
        # hand out a copy so that callers cannot corrupt the cache
        return dict(_PARSED_OPTIONS)

    options = _parse_options_help_text(DEFAULT_HELP_TEXT)
    try:
        help_text = _get_cmd_help_text()
        cmd_options = _parse_options_help_text(help_text)
        options.update(cmd_options)
    except (NotImplementedError, OSError):
        # NOTE: no need to fail just for the options. OSError covers a
        #   command which is found but cannot be launched.
        pass

    _PARSED_OPTIONS.update(options)
    return options