Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# This file is part of the sbk project 

2# https://github.com/mbarkhau/sbk 

3# 

4# Copyright (c) 2019-2021 Manuel Barkhau (mbarkhau@gmail.com) - MIT License 

5# SPDX-License-Identifier: MIT 

6"""Evaluate memory available on system (for kdf parameters).""" 

7 

8# Some notes on parameter choices. 

9# https://tools.ietf.org/html/draft-irtf-cfrg-argon2-04#section-4 

10# 

11# parallelism: RFC reccomends 2x the number of cores. 

12# 

13# time_cost: As the time constraint is not such an issue for the 

14# intended use cases of SBK, you should be able to dedicate a few 

15# minutes of computation time to derive a secure key from relativly 

16# low amount of secret entropy (the brainkey). 

17# 

18# hash_type: Theoretically you should only use SBK on a trusted system 

19# in a trusted environment, so side channel attacks shouldn't be an 

20# issue and the benefits of using the argon2id are questionable. 

21# But the argument is similar to with time_cost, even if the extra time 

22# spent is pointless, it's not too much of a loss. 

23# 

24# memory_cost: The main constraint here is that later reconstruction 

25# of the secret will require a machine with at least as much memory as 

26# the one used during the initial derivation. Otherwise it should be 

27# chosen as large as possible. 

28import os 

29import re 

30import json 

31import time 

32import typing as typ 

33import logging 

34import pathlib as pl 

35import subprocess as sp 

36 

37from . import kdf 

38 

39logger = logging.getLogger("sbk.sys_info") 

40 

41 

42Seconds = float 

43 

44DEFAULT_KDF_THREADS_RATIO = 2 

45DEFAULT_KDF_MEM_RATIO = int(os.getenv('SBK_MEM_PERCENT', "90")) / 100 

46 

47# Fallback value for systems on which total memory cannot be detected 

48FALLBACK_MEM_TOTAL_MB = int(os.getenv("SBK_FALLBACK_MEM_TOTAL_MB", "1024")) 

49 

50DEFAULT_XDG_CONFIG_HOME = str(pl.Path("~").expanduser() / ".config") 

51XDG_CONFIG_HOME = pl.Path(os.environ.get('XDG_CONFIG_HOME', DEFAULT_XDG_CONFIG_HOME)) 

52 

53SBK_APP_DIR_STR = os.getenv('SBK_APP_DIR') 

54SBK_APP_DIR = pl.Path(SBK_APP_DIR_STR) if SBK_APP_DIR_STR else XDG_CONFIG_HOME / "sbk" 

55SYSINFO_CACHE_FNAME = "sys_info_measurements.json" 

56SYSINFO_CACHE_FPATH = SBK_APP_DIR / SYSINFO_CACHE_FNAME 

57 

58 

59def mem_total() -> kdf.MebiBytes: 

60 """Get total memory.""" 

61 

62 # Linux 

63 meminfo_path = pl.Path("/proc/meminfo") 

64 if meminfo_path.exists(): 

65 try: 

66 with meminfo_path.open(mode="rb") as fobj: 

67 data = fobj.read() 

68 for line in data.splitlines(): 

69 key, num, unit = line.decode("ascii").strip().split() 

70 if key == "MemTotal:": 

71 assert unit == "kB" 

72 return int(num) // 1024 

73 except Exception: 

74 logger.error("Error while evaluating system memory", exc_info=True) 

75 

76 return FALLBACK_MEM_TOTAL_MB 

77 

78 

79class Measurement(typ.NamedTuple): 

80 

81 p: kdf.NumThreads 

82 m: kdf.MebiBytes 

83 t: kdf.Iterations 

84 

85 duration: Seconds 

86 

87 

88def _measure(kdf_params: kdf.KDFParams) -> Measurement: 

89 tzero = time.time() 

90 kdf.digest(b"saltsaltsaltsaltbrainkey", kdf_params, hash_len=16) 

91 duration = round(time.time() - tzero, 5) 

92 

93 logger.debug(f"kdf parameter calibration {kdf_params} -> {round(duration * 1000)}ms") 

94 

95 p, m, t = kdf_params 

96 return Measurement(p=p, m=m, t=t, duration=duration) 

97 

98 

99class SystemInfo(typ.NamedTuple): 

100 

101 num_cores: int 

102 total_mb : kdf.MebiBytes 

103 initial_p: kdf.NumThreads 

104 initial_m: kdf.MebiBytes 

105 

106 

107_SYS_INFO: typ.Optional[SystemInfo] = None 

108 

109 

110def dump_sys_info(sys_info: SystemInfo) -> None: 

111 global _SYS_INFO 

112 _SYS_INFO = sys_info 

113 

114 cache_path = SYSINFO_CACHE_FPATH 

115 try: 

116 cache_path.parent.mkdir(exist_ok=True, parents=True) 

117 except Exception as ex: 

118 logger.warning(f"Unable to create cache dir {cache_path.parent}: {ex}") 

119 return 

120 

121 sys_info_data = { 

122 'num_cores': sys_info.num_cores, 

123 'total_mb' : sys_info.total_mb, 

124 'initial_p': sys_info.initial_p, 

125 'initial_m': sys_info.initial_m, 

126 } 

127 

128 try: 

129 with cache_path.open(mode="w", encoding="utf-8") as fobj: 

130 json.dump(sys_info_data, fobj, indent=4) 

131 except Exception as ex: 

132 logger.warning(f"Error writing cache file {cache_path}: {ex}") 

133 return 

134 

135 

136def _load_cached_sys_info() -> SystemInfo: 

137 cache_path = SYSINFO_CACHE_FPATH 

138 try: 

139 with cache_path.open(mode="rb") as fobj: 

140 sys_info_data = json.load(fobj) 

141 nfo = SystemInfo(**sys_info_data) 

142 except Exception as ex: 

143 logger.warning(f"Error reading cache file {cache_path}: {ex}") 

144 nfo = init_sys_info() 

145 

146 return nfo 

147 

148 

149def load_sys_info(use_cache: bool = True) -> SystemInfo: 

150 global _SYS_INFO 

151 if _SYS_INFO: 

152 return _SYS_INFO 

153 

154 if use_cache and SYSINFO_CACHE_FPATH.exists(): 

155 nfo = _load_cached_sys_info() 

156 else: 

157 nfo = init_sys_info() 

158 

159 _SYS_INFO = nfo 

160 return nfo 

161 

162 

163def num_cores() -> int: 

164 if hasattr(os, 'sched_getaffinity'): 

165 # pylint: disable=no-member ; macos doesn't have this 

166 return len(os.sched_getaffinity(0)) 

167 else: 

168 return os.cpu_count() or 1 

169 

170 

171def init_sys_info() -> SystemInfo: 

172 import argon2 

173 

174 total_mb = mem_total() 

175 

176 initial_p = int(num_cores() * DEFAULT_KDF_THREADS_RATIO) 

177 initial_m = int(total_mb * DEFAULT_KDF_MEM_RATIO) // initial_p 

178 

179 while True: 

180 try: 

181 kdf_params = kdf.init_kdf_params(p=initial_p, m=initial_m, t=1) 

182 initial_p = kdf_params.p 

183 initial_m = kdf_params.m 

184 logger.debug(f"testing initial_p={initial_p}, initial_m={initial_m}") 

185 _measure(kdf_params) 

186 logger.debug(f"using initial_p={initial_p}, initial_m={initial_m}") 

187 break # success 

188 except argon2.exceptions.HashingError as err: 

189 if "Memory allocation error" not in str(err): 

190 raise 

191 initial_m = (2 * initial_m) // 3 

192 

193 nfo = SystemInfo(num_cores(), total_mb, initial_p, initial_m) 

194 dump_sys_info(nfo) 

195 return nfo 

196 

197 

198# NOTE (mb 2021-06-06): 

199# SBK tries to be as non-region specific as possible. 

200# no en_US, en_GB, en_AU etc.etc. just en 

201# 

202# I'm also not sure we'll ever support non-phonetic systems, 

203# especiall for the wordlists. 

204 

205# initially 

206SUPPORTED_LANGUAGES = {'en', 'de'} 

207 

208# next (PR welcome) 

209# SUPPORTED_LANGUAGES |= {'es', 'pt', 'ru', 'fr', de', 'it', 'tr'} 

210 

211# eventually/maybe (non-phonetic systems may be a design issue for wordlists) 

212# SUPPORTED_LANGUAGES |= {'ar', 'ko', 'cn', 'jp'} 

213 

214LAYOUT_TO_LANG = {'us': 'en', 'de': 'de'} 

215 

216LangCode = str 

217 

218 

219def detect_lang() -> LangCode: 

220 try: 

221 output_data = sp.check_output("localectl") 

222 output_text = output_data.decode("utf-8") 

223 

224 # We only parse the first portion on purpose. 

225 lang_match = re.search(r"LANG=([a-z]+)", output_text) 

226 if lang_match is None: 

227 lang = "default" 

228 else: 

229 lang = lang_match.group(1) 

230 

231 if lang != 'default' and lang in SUPPORTED_LANGUAGES: 

232 return lang 

233 

234 keyboard_match = re.search(r"X11 Layout: ([a-z]+)", output_text) 

235 if keyboard_match is None: 

236 layout = "default" 

237 else: 

238 layout = keyboard_match.group(1) 

239 

240 layout_lang = LAYOUT_TO_LANG.get(layout, layout) 

241 

242 if layout_lang != 'default' and layout_lang in SUPPORTED_LANGUAGES: 

243 return layout_lang 

244 

245 except Exception: 

246 pass 

247 

248 return "en" 

249 

250 

251if __name__ == '__main__': 

252 print("lang:", detect_lang())