sbk.sys_info

src/sbk/sys_info.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# This file is part of the sbk project
# https://github.com/mbarkhau/sbk
#
# Copyright (c) 2019-2021 Manuel Barkhau (mbarkhau@gmail.com) - MIT License
# SPDX-License-Identifier: MIT
"""Evaluate memory available on system (for kdf parameters)."""

# Some notes on parameter choices.
# https://tools.ietf.org/html/draft-irtf-cfrg-argon2-04#section-4
#
# parallelism: RFC recommends 2x the number of cores.
#
# time_cost: As the time constraint is not such an issue for the
# intended use cases of SBK, you should be able to dedicate a few
# minutes of computation time to derive a secure key from a relatively
# low amount of secret entropy (the brainkey).
#
# hash_type: Theoretically you should only use SBK on a trusted system
# in a trusted environment, so side channel attacks shouldn't be an
# issue and the benefits of using argon2id are questionable.
# But the argument is similar to the one for time_cost: even if the
# extra time spent is pointless, it's not too much of a loss.
#
# memory_cost: The main constraint here is that later reconstruction
# of the secret will require a machine with at least as much memory as
# the one used during the initial derivation. Otherwise it should be
# chosen as large as possible.
import os
import re
import json
import time
import typing as typ
import logging
import pathlib as pl
import subprocess as sp

from . import kdf

logger = logging.getLogger("sbk.sys_info")


# Durations are plain floats, measured in seconds.
Seconds = float

# Multiplier applied to the detected core count to get the initial number
# of kdf threads (see init_sys_info).
DEFAULT_KDF_THREADS_RATIO = 2
# Fraction of the total memory used as the initial memory budget; can be
# configured as a percentage via the SBK_MEM_PERCENT environment variable.
DEFAULT_KDF_MEM_RATIO     = int(os.getenv('SBK_MEM_PERCENT', "90")) / 100

# Fallback value for systems on which total memory cannot be detected
FALLBACK_MEM_TOTAL_MB = int(os.getenv("SBK_FALLBACK_MEM_TOTAL_MB", "1024"))

DEFAULT_XDG_CONFIG_HOME = str(pl.Path("~").expanduser() / ".config")
# An empty XDG_CONFIG_HOME has to be treated like an unset one (XDG Base
# Directory Specification). Passing "" to pl.Path would silently select the
# current working directory instead.
XDG_CONFIG_HOME         = pl.Path(os.environ.get('XDG_CONFIG_HOME') or DEFAULT_XDG_CONFIG_HOME)

# SBK_APP_DIR (if set and non-empty) overrides the default "<config home>/sbk".
SBK_APP_DIR_STR     = os.getenv('SBK_APP_DIR')
SBK_APP_DIR         = pl.Path(SBK_APP_DIR_STR) if SBK_APP_DIR_STR else XDG_CONFIG_HOME / "sbk"
# Location of the file in which dump_sys_info persists its data.
SYSINFO_CACHE_FNAME = "sys_info_measurements.json"
SYSINFO_CACHE_FPATH = SBK_APP_DIR / SYSINFO_CACHE_FNAME


def mem_total() -> kdf.MebiBytes:
    """Get total system memory in MiB.

    Reads the ``MemTotal`` entry of ``/proc/meminfo`` (Linux). If that file
    does not exist, contains no ``MemTotal`` entry, or the entry cannot be
    parsed, ``FALLBACK_MEM_TOTAL_MB`` is returned instead (parse errors are
    logged).
    """

    # Linux
    meminfo_path = pl.Path("/proc/meminfo")
    if meminfo_path.exists():
        try:
            with meminfo_path.open(mode="rb") as fobj:
                data = fobj.read()
            for line in data.splitlines():
                fields = line.decode("ascii").split()
                # Not every entry has three fields (e.g. "HugePages_Total: 0"
                # has no unit), so only the MemTotal entry is inspected
                # rather than unpacking every line.
                if not fields or fields[0] != "MemTotal:":
                    continue
                # Explicit check instead of an assert, which would be
                # stripped when running with -O.
                if len(fields) != 3 or fields[2] != "kB":
                    raise ValueError(f"Unexpected MemTotal entry: {fields}")
                return int(fields[1]) // 1024
        except Exception:
            logger.error("Error while evaluating system memory", exc_info=True)

    return FALLBACK_MEM_TOTAL_MB


class Measurement(typ.NamedTuple):
    """KDF parameters together with the time a digest took with them."""

    # The three kdf parameters (unpacked from a kdf.KDFParams in _measure).
    p: kdf.NumThreads
    m: kdf.MebiBytes
    t: kdf.Iterations

    # Elapsed time of the kdf.digest call in seconds (rounded in _measure).
    duration: Seconds


def _measure(kdf_params: kdf.KDFParams) -> Measurement:
    """Time a single ``kdf.digest`` call using the given parameters.

    The digest is computed over a fixed dummy input; its result is discarded.

    Returns:
        A Measurement with the p, m, t values of kdf_params and the elapsed
        time in seconds, rounded to 5 decimal places.
    """
    # perf_counter is monotonic; time.time() follows the wall clock, which
    # can be adjusted (e.g. by NTP) while the digest is running and would
    # then distort the measured duration.
    tzero = time.perf_counter()
    kdf.digest(b"saltsaltsaltsaltbrainkey", kdf_params, hash_len=16)
    duration = round(time.perf_counter() - tzero, 5)

    logger.debug(f"kdf parameter calibration {kdf_params} -> {round(duration * 1000)}ms")

    p, m, t = kdf_params
    return Measurement(p=p, m=m, t=t, duration=duration)


class SystemInfo(typ.NamedTuple):
    """Detected system resources and the initial kdf parameters based on them."""

    # Core count and total memory as detected by num_cores() and mem_total().
    num_cores: int
    total_mb : kdf.MebiBytes
    # Thread count and memory with which init_sys_info completed a test digest.
    initial_p: kdf.NumThreads
    initial_m: kdf.MebiBytes


# In-process cache of the system info; assigned by dump_sys_info and
# load_sys_info, None until one of them has run.
_SYS_INFO: typ.Optional[SystemInfo] = None


def dump_sys_info(sys_info: SystemInfo) -> None:
    """Remember sys_info for this process and write it to the cache file.

    The in-memory value is always updated. Persisting to
    SYSINFO_CACHE_FPATH is best effort: if the directory cannot be created
    or the file cannot be written, a warning is logged and nothing is raised.
    """
    global _SYS_INFO
    _SYS_INFO = sys_info

    target    = SYSINFO_CACHE_FPATH
    cache_dir = target.parent
    try:
        cache_dir.mkdir(exist_ok=True, parents=True)
    except Exception as ex:
        logger.warning(f"Unable to create cache dir {cache_dir}: {ex}")
        return

    # Serialized as a flat json object, one key per SystemInfo field.
    field_names = ('num_cores', 'total_mb', 'initial_p', 'initial_m')
    payload     = {name: getattr(sys_info, name) for name in field_names}

    try:
        with target.open(mode="w", encoding="utf-8") as fobj:
            json.dump(payload, fobj, indent=4)
    except Exception as ex:
        logger.warning(f"Error writing cache file {target}: {ex}")


def _load_cached_sys_info() -> SystemInfo:
    """Read the SystemInfo stored in SYSINFO_CACHE_FPATH.

    If the file cannot be read, parsed or converted to a SystemInfo, a
    warning is logged and the result of init_sys_info() is returned instead.
    """
    cache_file = SYSINFO_CACHE_FPATH
    try:
        with cache_file.open(mode="rb") as fobj:
            fields = json.load(fobj)
        return SystemInfo(**fields)
    except Exception as ex:
        logger.warning(f"Error reading cache file {cache_file}: {ex}")
        return init_sys_info()


def load_sys_info(use_cache: bool = True) -> SystemInfo:
    """Return the SystemInfo, computing or loading it on first use.

    A value already held in memory is returned as is. Otherwise it is read
    from the cache file (if use_cache is set and the file exists) or
    determined via init_sys_info(), and then kept in memory.
    """
    global _SYS_INFO
    if _SYS_INFO:
        return _SYS_INFO

    read_from_disk = use_cache and SYSINFO_CACHE_FPATH.exists()
    provider       = _load_cached_sys_info if read_from_disk else init_sys_info

    result    = provider()
    _SYS_INFO = result
    return result


def num_cores() -> int:
    """Return the number of cores available for use.

    Uses the size of the scheduler affinity set of the current process where
    the platform provides it, otherwise os.cpu_count() (or 1 if that is
    unknown).
    """
    affinity = getattr(os, 'sched_getaffinity', None)
    if affinity is None:
        # e.g. macos doesn't have sched_getaffinity
        return os.cpu_count() or 1
    return len(affinity(0))


def init_sys_info() -> SystemInfo:
    """Determine the system info and initial kdf parameters, then cache them.

    Starts with DEFAULT_KDF_THREADS_RATIO threads per core and the
    configured share of total memory divided by that thread count, then
    runs a test digest. While argon2 reports a memory allocation error, the
    memory parameter is reduced to 2/3 and the digest is retried.

    Returns:
        The SystemInfo, which has also been passed to dump_sys_info.

    Raises:
        argon2.exceptions.HashingError: for any hashing error other than a
            memory allocation error, or if reducing the memory parameter no
            longer makes progress.
    """
    import argon2

    total_mb  = mem_total()
    cores     = num_cores()

    initial_p = int(cores * DEFAULT_KDF_THREADS_RATIO)
    initial_m = int(total_mb * DEFAULT_KDF_MEM_RATIO) // initial_p

    # Memory value of the most recent failed attempt, used to detect when
    # the reduction below no longer makes progress.
    failed_m: typ.Optional[int] = None

    while True:
        try:
            kdf_params = kdf.init_kdf_params(p=initial_p, m=initial_m, t=1)
            # Continue with the values as held by kdf_params, which are not
            # necessarily the ones that were requested.
            initial_p  = kdf_params.p
            initial_m  = kdf_params.m
            logger.debug(f"testing initial_p={initial_p}, initial_m={initial_m}")
            _measure(kdf_params)
            logger.debug(f"using initial_p={initial_p}, initial_m={initial_m}")
            break  # success
        except argon2.exceptions.HashingError as err:
            if "Memory allocation error" not in str(err):
                raise
            # Without this guard the loop would never terminate once the
            # value stops shrinking (e.g. after it has reached 0).
            if failed_m is not None and initial_m >= failed_m:
                raise
            failed_m  = initial_m
            initial_m = (2 * initial_m) // 3

    nfo = SystemInfo(cores, total_mb, initial_p, initial_m)
    dump_sys_info(nfo)
    return nfo


# NOTE (mb 2021-06-06):
# SBK tries to be as non-region specific as possible.
#   no en_US, en_GB, en_AU etc.etc. just en
#
# I'm also not sure we'll ever support non-phonetic systems,
# especially for the wordlists.

# initially
SUPPORTED_LANGUAGES = {'en', 'de'}

# next (PR welcome)
# SUPPORTED_LANGUAGES |= {'es', 'pt', 'ru', 'fr', 'de', 'it', 'tr'}

# eventually/maybe (non-phonetic systems may be a design issue for wordlists)
# SUPPORTED_LANGUAGES |= {'ar', 'ko', 'cn', 'jp'}

# Maps an X11 keyboard layout name to a language code (see detect_lang).
LAYOUT_TO_LANG = {'us': 'en', 'de': 'de'}

# Language codes are plain strings, e.g. "en".
LangCode = str


def detect_lang() -> LangCode:
    """Guess the language of the user from the output of ``localectl``.

    The ``LANG=`` setting is checked first, the X11 keyboard layout (mapped
    through LAYOUT_TO_LANG) second. The first of the two that is an entry of
    SUPPORTED_LANGUAGES is returned.

    Returns:
        A supported language code, or "en" if none was detected or the
        detection failed for any reason (e.g. ``localectl`` is missing).
    """
    try:
        output_data = sp.check_output("localectl")
        output_text = output_data.decode("utf-8")

        # We only parse the first portion on purpose.
        lang_match = re.search(r"LANG=([a-z]+)", output_text)
        if lang_match is None:
            lang = "default"
        else:
            lang = lang_match.group(1)

        if lang != 'default' and lang in SUPPORTED_LANGUAGES:
            return lang

        keyboard_match = re.search(r"X11 Layout: ([a-z]+)", output_text)
        if keyboard_match is None:
            layout = "default"
        else:
            layout = keyboard_match.group(1)

        # Unknown layouts are tried as language codes themselves.
        layout_lang = LAYOUT_TO_LANG.get(layout, layout)

        if layout_lang != 'default' and layout_lang in SUPPORTED_LANGUAGES:
            return layout_lang

    except Exception:
        # Detection is best effort, so failures must not propagate. They are
        # logged at debug level so that they can still be diagnosed.
        logger.debug("Unable to detect language using localectl", exc_info=True)

    return "en"


# When run as a script, print the detected language (manual check).
if __name__ == '__main__':
    print("lang:", detect_lang())