markdown_katex.wrapper

src/markdown_katex/wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
# This file is part of the markdown-katex project
# https://github.com/mbarkhau/markdown-katex
#
# Copyright (c) 2019-2021 Manuel Barkhau (mbarkhau@gmail.com) - MIT License
# SPDX-License-Identifier: MIT

# NOTE (mb 2019-05-16): This module is substantially shared with the
#   markdown-svgbob package and meaningful changes should be
#   replicated there also.

import os
import re
import time
import signal
import typing as typ
import hashlib
import platform
import tempfile
import subprocess as sp

import pathlib2 as pl

# Lookup table from signal number to signal name (e.g. 15 -> 'SIGTERM').
# Only names that start with 'SIG' but not with 'SIG_' are included.
# The names are visited in reverse alphabetical order, so when several
# names share one number, the alphabetically first name is written last
# and is the one that ends up in the table.
SIG_NAME_BY_NUM = {
    sig_num: sig_name
    for sig_name, sig_num in sorted(vars(signal).items(), reverse=True)
    if sig_name.startswith('SIG') and not sig_name.startswith('SIG_')
}

# Sanity check that the table was built the right way around.
assert SIG_NAME_BY_NUM[15] == 'SIGTERM'


# Directory below the system temp dir that holds the temporary .tex input
# files, the cached .html output files and the cached command location.
TMP_DIR = pl.Path(tempfile.gettempdir()) / "mdkatex"

# Directory that contains this module; PKG_BIN_DIR is its "bin" subdirectory,
# which is searched for a packaged binary matching the current platform.
LIBDIR: pl.Path = pl.Path(__file__).parent
PKG_BIN_DIR      = LIBDIR / "bin"
# Directory that is searched for a katex command in addition to the
# directories listed in the PATH environment variable.
FALLBACK_BIN_DIR = pl.Path("/") / "usr" / "local" / "bin"
FALLBACK_BIN_DIR = FALLBACK_BIN_DIR.expanduser()

# Base name of the command; platform specific suffixes and the npx
# variants are derived from it.
CMD_NAME = "katex"

# https://pymotw.com/3/platform/
# OSNAME and MACHINE are used to select the packaged binary and to choose
# the command name candidates.
OSNAME  = platform.system()
MACHINE = platform.machine()


# NOTE (mb 2020-06-19): I have no idea if this is true and have not found a good
#   way to test it, especially not in any cross platform way. Maybe KaTeX doesn't
#   care and just uses the same encoding for input as for output.
KATEX_INPUT_ENCODING  = "UTF-8"
KATEX_OUTPUT_ENCODING = "UTF-8"

# Local cache so we don't have to validate the command every time.
# The file stores the parts of the validated command, one part per line.
TMP_LOCAL_CMD_CACHE = TMP_DIR / "local_katex_cmd.txt"


def _get_env_paths() -> typ.Iterable[pl.Path]:
    """Yield the directories in which to search for a katex command.

    The entries of the PATH environment variable are yielded first, in
    their original order. FALLBACK_BIN_DIR is yielded last, unless it
    is already one of the PATH entries.
    """
    env_path = os.environ.get('PATH')
    fallback_in_path = False
    if env_path:
        for path_str in env_path.split(os.pathsep):
            path = pl.Path(path_str)
            # Compare whole entries rather than doing a substring test on
            # the PATH string: an entry such as "/usr/local/bin2" or
            # "/opt/usr/local/bin" must not count as the fallback dir.
            if path == FALLBACK_BIN_DIR:
                fallback_in_path = True
            yield path

    # search in fallback bin dir regardless of path
    if not fallback_in_path:
        yield FALLBACK_BIN_DIR


def _get_local_bin_candidates() -> typ.List[str]:
    """Return the command lines to try for a locally installed katex.

    Each entry is a whitespace separated command: either the katex
    executable itself or an invocation through npx with --no-install.
    The candidates are listed in the order in which they should be tried.
    """
    npx_args = " --no-install " + CMD_NAME

    if OSNAME != 'Windows':
        return [CMD_NAME, "npx" + npx_args]

    # whackamole: on Windows the launcher may carry any of these suffixes
    return [
        CMD_NAME + ".cmd",
        CMD_NAME + ".exe",
        "npx.cmd" + npx_args,
        "npx.exe" + npx_args,
        CMD_NAME + ".ps1",
        "npx.ps1" + npx_args,
    ]


def _get_usr_parts() -> typ.Optional[typ.List[str]]:
    """Locate a working katex command installed by the user.

    A previously validated command is read from TMP_LOCAL_CMD_CACHE and
    reused if its executable is still a file. Otherwise every candidate
    from _get_local_bin_candidates() is tried in every directory from
    _get_env_paths(); a candidate is accepted if running it with
    '--version' succeeds and prints something that starts with a
    version number of the form N.N.N. The accepted command is written
    to the cache file.

    Returns:
        The command as a list of arguments (the first element is the
        path of the executable), or None if no working command is found.
    """
    # The cache file may vanish at any time (files in TMP_DIR are removed
    # periodically), so just try to read it instead of checking first.
    try:
        with TMP_LOCAL_CMD_CACHE.open(mode="r", encoding="utf-8") as fobj:
            cached_cmd = typ.cast(str, fobj.read())
    except (IOError, OSError, UnicodeError):
        cached_cmd = ""

    # Ignore empty parts: an empty or truncated cache file must not
    # produce a command such as [''] (pl.Path('') is the current
    # directory, which always exists).
    cached_cmd_parts = [part for part in cached_cmd.split("\n") if part]
    if cached_cmd_parts and pl.Path(cached_cmd_parts[0]).is_file():
        return cached_cmd_parts

    for path in _get_env_paths():
        for local_cmd in _get_local_bin_candidates():
            local_cmd_parts = local_cmd.split()
            local_bin       = path / local_cmd_parts[0]
            if not local_bin.is_file():
                continue
            local_cmd_parts[0] = str(local_bin)

            try:
                output_data = sp.check_output(local_cmd_parts + ['--version'], stderr=sp.STDOUT)
            except (sp.CalledProcessError, OSError):
                # not executable, or exited with an error
                continue

            # Undecodable bytes only mean that this is not the expected
            # version output; they must not abort the search.
            output_text = output_data.decode("utf-8", "replace")
            if re.match(r"\d+\.\d+\.\d+", output_text.strip()) is None:
                continue

            try:
                TMP_DIR.mkdir(parents=True, exist_ok=True)
                with TMP_LOCAL_CMD_CACHE.open(mode="w", encoding="utf-8") as fobj:
                    fobj.write("\n".join(local_cmd_parts))
            except (IOError, OSError):
                # The cache is only an optimization. The command that
                # was just validated is usable even if it can't be stored.
                pass

            return local_cmd_parts

    return None


def _get_pkg_bin_path(osname: str = OSNAME, machine: str = MACHINE) -> pl.Path:
    """Return the path of the packaged katex binary for a platform.

    Files in PKG_BIN_DIR are matched against the pattern
    "*_<machine>-<osname>*". If several files match, the one that
    sorts last is returned.

    Raises:
        NotImplementedError: if no file matches the platform.
    """
    # 'AMD64' is treated as another name for 'x86_64'
    arch       = 'x86_64' if machine == 'AMD64' else machine
    candidates = sorted(PKG_BIN_DIR.glob(f"*_{arch}-{osname}*"))

    if not candidates:
        err_msg = (
            "Platform not supported, "
            "katex binary not found. "
            "Install manually using "
            "'npm install katex'."
        )
        raise NotImplementedError(err_msg)

    return candidates[-1]


def get_bin_cmd() -> typ.List[str]:
    """Return the command (as a list of arguments) used to invoke katex.

    A command installed by the user takes precedence. The binary
    packaged with this library is only used if no such command is found.

    Raises:
        NotImplementedError: if there is neither a user installed command
            nor a packaged binary for the current platform.
    """
    usr_bin_cmd = _get_usr_parts()
    if usr_bin_cmd is not None:
        return usr_bin_cmd

    # use packaged binary
    pkg_bin_path = _get_pkg_bin_path()
    return [str(pkg_bin_path)]


def _iter_output_lines(buf: typ.IO[bytes]) -> typ.Iterable[bytes]:
    """Yield the lines of buf until readline() returns an empty result."""
    line = buf.readline()
    while line:
        yield line
        line = buf.readline()


def read_output(buf: typ.Optional[typ.IO[bytes]]) -> str:
    """Read buf to its end and return the content decoded as UTF-8.

    buf is declared optional so that the stdout/stderr attributes of a
    process can be passed directly; it must not actually be None.
    """
    assert buf is not None
    raw_output = b"".join(_iter_output_lines(buf))
    return raw_output.decode("utf-8")


# Value of a single command line option. True emits the option as a bare
# flag, False omits it, and any other value is passed as the option's
# argument after conversion with str().
ArgValue = typ.Union[str, int, float, bool]
# Mapping of option name (with or without the leading "--") to its value.
Options  = typ.Dict[str, ArgValue]


class KatexError(Exception):
    """Raised when the katex command fails to process its input."""


def _iter_cmd_parts(options: Options = None) -> typ.Iterable[str]:
    """Yield the katex command followed by the arguments for options.

    Option names get a "--" prefix unless they already have one. A value
    of True yields only the option name (a flag), a value of False
    yields nothing, and any other value yields the option name followed
    by str(value).
    """
    for bin_part in get_bin_cmd():
        yield bin_part

    if not options:
        return

    for name, value in options.items():
        flag = name if name.startswith("--") else "--" + name

        if value is False:
            # a disabled flag is simply left out
            continue

        yield flag
        if value is not True:
            yield str(value)


def _cmd_digest(tex: str, cmd_parts: typ.List[str]) -> str:
    """Return the SHA-256 hex digest of tex followed by all cmd_parts.

    The strings are UTF-8 encoded and hashed back to back, without any
    separator between them.
    """
    payload = b"".join(text.encode("utf-8") for text in [tex] + list(cmd_parts))
    return hashlib.sha256(payload).hexdigest()


def _write_tex2html(cmd_parts: typ.List[str], tex: str, tmp_output_file: pl.Path) -> None:
    """Run katex on tex and have it write the html to tmp_output_file.

    The tex source is written to a temporary input file in TMP_DIR
    (named like the output file, with ".html" replaced by ".tex"). The
    input file is removed again afterwards, whether or not the command
    succeeded.

    Args:
        cmd_parts: The katex command and its options. Not modified.
        tex: The source to render.
        tmp_output_file: The file katex is told to write its output to.

    Raises:
        KatexError: if the process is terminated by a signal or exits
            with a non-zero code.
    """
    # pylint: disable=consider-using-with ; not supported on py27
    tmp_input_file = TMP_DIR / tmp_output_file.name.replace(".html", ".tex")
    input_data     = tex.encode(KATEX_INPUT_ENCODING)

    TMP_DIR.mkdir(parents=True, exist_ok=True)
    with tmp_input_file.open(mode="wb") as fobj:
        fobj.write(input_data)

    # Build a new list instead of extending the list of the caller.
    io_args  = ["--input", str(tmp_input_file), "--output", str(tmp_output_file)]
    full_cmd = list(cmd_parts) + io_args

    proc = None
    try:
        proc = sp.Popen(full_cmd, stdout=sp.PIPE, stderr=sp.PIPE)
        # communicate() drains both pipes while it waits. A plain wait()
        # can block forever if the process fills a pipe buffer.
        stdout_data, stderr_data = proc.communicate()
        ret_code = proc.returncode
        if ret_code < 0:
            # a negative code is the negated number of the signal
            signame = SIG_NAME_BY_NUM.get(abs(ret_code), "unknown signal")
            err_msg = (
                f"Error processing '{tex}': "
                + "katex_cli process ended with "
                + f"code {ret_code} ({signame})"
            )
            raise KatexError(err_msg)
        elif ret_code > 0:
            stdout  = stdout_data.decode("utf-8", "replace")
            errout  = stderr_data.decode("utf-8", "replace")
            output  = (stdout + "\n" + errout).strip()
            err_msg = f"Error processing '{tex}': {output}"
            raise KatexError(err_msg)
    finally:
        if proc is not None:
            # It might be reasonable that Popen itself raises an
            # exception. In such a case, proc would still be None
            # and there is nothing to close.
            if proc.stdout is not None:
                proc.stdout.close()
            if proc.stderr is not None:
                proc.stderr.close()

        try:
            tmp_input_file.unlink()
        except (IOError, OSError):
            # A leftover input file is harmless, and raising here would
            # hide the actual error of a failed run.
            pass


def tex2html(tex: str, options: Options = None) -> str:
    """Render tex to html with katex and return the stripped html.

    The output is cached in TMP_DIR in a file named after the digest of
    tex and the complete command. If that file already exists it is
    reused and katex is not run. Old files in TMP_DIR are cleaned up at
    the end of every call, also when rendering fails.

    Raises:
        KatexError: if katex fails to process tex.
    """
    cmd_parts = list(_iter_cmd_parts(options))
    html_path = TMP_DIR / (_cmd_digest(tex, cmd_parts) + ".html")

    try:
        if not html_path.exists():
            _write_tex2html(cmd_parts, tex, html_path)
        else:
            # give cached file a life extension (update mtime)
            html_path.touch()

        with html_path.open(mode="r", encoding=KATEX_OUTPUT_ENCODING) as fobj:
            html = typ.cast(str, fobj.read())
        return html.strip()
    finally:
        _cleanup_tmp_dir()


def _cleanup_tmp_dir() -> None:
    """Remove files in TMP_DIR that were last modified over 24 hours ago.

    The cleanup is best effort and never raises for a missing directory
    or for files that can't be inspected or removed: it is invoked from
    a finally clause, where an error would hide the original exception.
    """
    min_mtime = time.time() - 24 * 60 * 60

    try:
        fpaths = list(TMP_DIR.iterdir())
    except (IOError, OSError):
        # TMP_DIR doesn't exist or can't be listed: nothing to clean up
        return

    for fpath in fpaths:
        try:
            if fpath.is_file() and fpath.stat().st_mtime < min_mtime:
                fpath.unlink()
        except (IOError, OSError):
            # e.g. the file was removed in the meantime
            continue


# NOTE: in order to not have to update the code
#   of the extension any time an option is added,
#   we parse the help text of the katex command.


# Built-in copy of the options section of the katex help output (with
# shortened descriptions). It provides the known options when the help
# text of the actual command is not available.
#
# NOTE: The line breaks must be kept as they are. The option pattern
#   uses the end of the line to delimit the description of an option;
#   if the text were joined into a single line, the description of the
#   first option would swallow all the options after it.
DEFAULT_HELP_TEXT = r"""
Options:
  -V, --version              output the version number
  -d, --display-mode         Render math in display...
  --leqno                    Render display math in...
  --fleqn                    Render display math fl...
  -t, --no-throw-on-error    Render errors (in the ...
  -c, --error-color <color>  A color string given i...
  -b, --color-is-text-color  Makes \color behave li...
  -S, --strict               Turn on strict / LaTeX...
  -s, --max-size <n>         If non-zero, all user-...
  -e, --max-expand <n>       Limit the number of ma...
  -m, --macro <def>          Define custom macro of...
  -f, --macro-file <path>    Read macro definitions...
  -i, --input <path>         Read LaTeX input from ...
  -o, --output <path>        Write html output to t...
  -h, --help                 output usage information
"""


def _get_cmd_help_text() -> str:
    """Run the katex command with '--help' and return what it prints.

    Returns:
        The standard output of the command, decoded as UTF-8.

    Raises:
        NotImplementedError: if no katex command is available.
    """
    # pylint: disable=consider-using-with ; not supported on py27
    cmd_parts = get_bin_cmd() + ['--help']
    proc      = None
    try:
        proc = sp.Popen(cmd_parts, stdout=sp.PIPE)
        # communicate() reads stdout to its end and also waits for the
        # process to exit, so that it doesn't linger after the read.
        stdout_data, _ = proc.communicate()
    finally:
        if proc is not None and proc.stdout is not None:
            proc.stdout.close()
    return stdout_data.decode("utf-8")


# Mapping of option name (without the leading "--") to its help text.
OptionsHelp = typ.Dict[str, str]

# Pattern for one option of the help text. It matches, in this order:
#   - the long form of the option ("--" followed by the name, which is
#     made up of lowercase letters and dashes),
#   - an optional argument placeholder such as "<path>",
#   - the description: the rest of the line, plus any directly following
#     characters from the listed set. The set includes whitespace, so
#     the description can continue across line breaks; it ends at the
#     first character outside of the set, e.g. the "-" that starts the
#     next option.
# https://regex101.com/r/287NYS/4
OPTION_PATTERN = r"""
    --
    (?P<name>[a-z\-]+)
    \s+(?:<[a-z\-]+>)?
    \s+
    (?P<text>[^\n]*[ \s\w(){},:;.'\\/\[\] ]*)
"""
# VERBOSE is needed because the pattern is laid out over multiple lines.
OPTION_REGEX = re.compile(OPTION_PATTERN, flags=re.VERBOSE | re.DOTALL | re.MULTILINE)


def _parse_options_help_text(help_text: str) -> OptionsHelp:
    """Extract option names and descriptions from a katex help text.

    Only the text after the first "Options:" heading is considered (the
    whole text if there is no such heading). The lines of a description
    are stripped and joined with single spaces. The options version,
    help, input, output and display-mode are left out of the result.
    """
    _, heading, tail = help_text.partition("Options:")
    options_text = tail if heading else help_text

    options: OptionsHelp = {}
    for match in OPTION_REGEX.finditer(options_text):
        text_lines = [line.strip() for line in match.group("text").splitlines()]
        options[match.group("name")] = " ".join(text_lines).strip()

    # options which are not exposed
    for excluded_name in ("version", "help", "input", "output", "display-mode"):
        options.pop(excluded_name, None)

    return options


# Module level cache for the result of parse_options(). It is empty
# until options have been parsed successfully for the first time.
_PARSED_OPTIONS: OptionsHelp = {}


def parse_options() -> OptionsHelp:
    """Return the available katex options and their help texts.

    The options parsed from DEFAULT_HELP_TEXT are the baseline. They
    are updated with the options parsed from the help text of the
    actual command, if one is available. A non-empty result is cached
    in _PARSED_OPTIONS and returned by all later calls.
    """
    if not _PARSED_OPTIONS:
        options = _parse_options_help_text(DEFAULT_HELP_TEXT)
        try:
            options.update(_parse_options_help_text(_get_cmd_help_text()))
        except NotImplementedError:
            # NOTE: no need to fail just for the options
            pass

        _PARSED_OPTIONS.update(options)
        return options

    return _PARSED_OPTIONS