Coverage for promplate/prompt/utils.py: 72%

102 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-08 04:34 +0800

1from functools import cache, cached_property, wraps 

2from inspect import currentframe, isclass 

3from pathlib import Path 

4from re import compile 

5from typing import Any, Callable, ParamSpec, TypeVar 

6 

# Splits template source on its three tag forms: ``{% ... %}``, ``{{ ... }}``
# and ``{# ... #}``. A tag may open with ``{%-`` / ``{{-`` / ``{#-`` preceded by
# one whitespace character, and may close with ``-%}`` / ``-}}`` / ``-#}``
# followed by one whitespace character; that whitespace is part of the match.
# Each tag form is its own capturing group, so ``re.split`` keeps the matched
# tag in the result and yields ``None`` for the two groups that did not match.
# Note that the ``{% %}`` body uses ``.`` (no newlines) while the other two
# use ``[\s\S]`` (newlines allowed).
split_template_tokens = compile(
    r"((?:\s{%-|{%).*?(?:%}|-%}\s))|((?:\s{{-|{{)[\s\S]*?(?:}}|-}}\s))|((?:\s{#-|{#)[\s\S]*?(?:#}|-#}\s))"
).split


# Matches a name made of an ASCII letter or underscore followed by word
# characters. Anchored with ``\Z`` rather than ``$``: ``$`` also matches just
# before a trailing newline, which let ``"name\n"`` pass ``.match()``.
var_name_checker = compile(r"[_a-zA-Z]\w*\Z")

# Compiled pattern (not a predicate) for a ``<|role name|>`` marker.
# Group 1 is the role (user / system / assistant); group 2 is an optional
# name of 1-64 word characters.
is_message_start = compile(r"<\|\s?(user|system|assistant)\s?(\w{1,64})?\s?\|>")

15 

16 

def is_not_valid(name: str):
    """Return True when *name* is rejected by ``var_name_checker``."""
    matched = var_name_checker.match(name)
    return matched is None

19 

20 

def ensure_valid(name):
    """Raise ``NameError(name)`` if ``is_not_valid`` rejects *name*; otherwise return None."""
    if not is_not_valid(name):
        return
    raise NameError(name)

24 

25 

class AutoNaming:
    """Give an instance a ``name`` taken from the local variable it is bound to.

    At construction the current frame is recorded. The first time the name is
    needed, the locals of the frame that invoked the class are searched for a
    variable that *is* this instance. If none is found, ``fallback_name``
    (the class name by default) is used. The name can also be assigned or
    deleted explicitly through the ``name`` property.
    """

    def __new__(cls, *args, **kwargs):
        obj = super().__new__(cls)
        obj._bind_frame()
        return obj

    def _bind_frame(self):
        # Frame chain from here: this method -> __new__ -> code that invoked the class.
        self._frame = currentframe()

    @cached_property
    def _name(self):
        """Return the first caller-local name bound to this instance, or None.

        The lookup runs on first access and its result is cached on the
        instance, so the outcome reflects the caller's locals at that moment.
        """
        f = self._frame
        # f.f_back is __new__; f.f_back.f_back is whoever invoked the class.
        if f and f.f_back and (frame := f.f_back.f_back):
            for name, var in frame.f_locals.items():
                if var is self:
                    return name
        return None

    @property
    def class_name(self):
        """Name of the instance's concrete class."""
        return self.__class__.__name__

    # Used by ``name`` when no variable name was resolved or assigned.
    fallback_name = class_name

    @property
    def name(self):
        """The resolved or assigned name, or ``fallback_name`` if there is none."""
        return self._name or self.fallback_name

    @name.setter
    def name(self, name):
        # Assigning stores the value in the instance dict, which takes
        # precedence over the cached_property; the frame is no longer needed.
        self._name = name
        self._frame = None

    @name.deleter
    def name(self):
        # The cached value only exists in the instance dict after it has been
        # computed or assigned. A plain ``del self._name`` raises
        # AttributeError before that, so drop it only if present.
        self.__dict__.pop("_name", None)

    def __repr__(self):
        if self._name:
            return f"<{self.class_name} {self.name}>"
        else:
            return f"<{self.class_name}>"

    def __str__(self):
        return f"<{self.name}>"

70 

71 

72P = ParamSpec("P") 

73T = TypeVar("T") 

74 

75 

76def cache_once(func: Callable[P, T]) -> Callable[P, T]: 

77 result = None 

78 

79 @wraps(func) 

80 def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: 

81 nonlocal result 

82 if result is None: 

83 result = func(*args, **kwargs) 

84 return result 

85 

86 return wrapper 

87 

88 

@cache_once
def get_builtins() -> dict[str, Any]:
    """Return the namespace dictionary of the ``builtins`` module.

    The ``builtins`` module is used instead of the ``__builtins__`` global,
    which is an implementation detail that may be either that module or its
    dictionary depending on how the code was loaded.
    """
    import builtins

    return vars(builtins)

92 

93 

@cache
def version(package: str):
    """Return the installed version string of *package*, or None if it is not installed.

    Results are memoized per package name.
    """
    # Import the module rather than its ``version`` function so the helper
    # does not shadow this function's own name.
    from importlib import metadata

    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None

102 

103 

@cache_once
def get_user_agent(self, *additional_packages: tuple[str, str]):
    """Build a User-Agent style string describing Promplate and its runtime.

    The string has the form
    ``Promplate/<ver> (<owner>) <name>/<v> ... HTTPX/<ver> Python/<ver>``.
    ``<owner>`` is the class name of *self*, or the name of *self* when it is
    a class. Each ``additional_packages`` item is a ``(name, version)`` pair
    inserted between the Promplate and HTTPX entries. A version that cannot
    be found is written as ``-``.

    NOTE(review): ``cache_once`` ignores arguments, so the string built by the
    first call is returned for every later call, whatever *self* or
    *additional_packages* are. Confirm this is intended.
    """
    from sys import version as py_version

    owner = self.__name__ if isclass(self) else self.__class__.__name__

    return " ".join(
        (
            # Same "-" fallback as the HTTPX entry, so a missing distribution
            # does not render as "Promplate/None".
            f"Promplate/{version('promplate') or '-'} ({owner})",
            *(f"{name}/{v}" for name, v in additional_packages),
            f"HTTPX/{version('httpx') or '-'}",
            f"Python/{py_version.split()[0]}",
        )
    )

116 

117 

@cache_once
def _is_http2_available():
    """Report whether the optional ``h2`` package can be imported.

    The answer is computed on the first call and reused afterwards.
    """
    try:
        import h2  # type: ignore
    except ImportError:
        available = False
    else:
        available = True
    return available

126 

127 

@cache_once
def _get_client():
    """Create the shared synchronous ``httpx.Client`` on first use.

    Redirects are followed, and HTTP/2 is enabled only when
    ``_is_http2_available()`` says so.
    """
    import httpx

    use_http2 = _is_http2_available()
    return httpx.Client(follow_redirects=True, http2=use_http2)

133 

134 

@cache_once
def _get_aclient():
    """Create the shared ``httpx.AsyncClient`` on first use.

    Redirects are followed, and HTTP/2 is enabled only when
    ``_is_http2_available()`` says so.
    """
    import httpx

    use_http2 = _is_http2_available()
    return httpx.AsyncClient(follow_redirects=True, http2=use_http2)

140 

141 

def add_linecache(filename: str, source_getter: Callable[[], str]):
    """Register *source_getter* in ``linecache`` under *filename*.

    The entry is stored as a one-element tuple holding the callable, the form
    ``linecache`` uses for lazily loaded entries, so the source is only
    fetched when something actually asks for the lines.
    """
    from linecache import cache as line_cache

    lazy_entry = (source_getter,)
    line_cache[filename] = lazy_entry

146 

147 

def save_tempfile(filename: str, source: str, auto_deletion: bool):
    """Write *source* to a fresh ``promplate-*`` temp directory and return the path.

    Any file called *filename* left in an earlier ``promplate-*`` directory
    under the system temp dir is removed first, together with its directory
    if that leaves it empty.

    Args:
        filename: Name of the file to create inside the new directory.
        source: Text to write, encoded as UTF-8.
        auto_deletion: When true, register an ``atexit`` hook that removes
            the file and its directory at interpreter exit.

    Returns:
        ``pathlib.Path`` of the file that was written.
    """
    from contextlib import suppress
    from glob import escape
    from tempfile import gettempdir, mkdtemp

    # Escape glob metacharacters so a name such as "a[1].py" matches itself
    # instead of being read as a character class.
    for stale in Path(gettempdir()).glob(f"promplate-*/{escape(filename)}"):
        # The file may already be gone if something else cleaned it up.
        stale.unlink(missing_ok=True)
        # Remove the directory too so it does not pile up; rmdir fails
        # harmlessly when the directory still holds other files.
        with suppress(OSError):
            stale.parent.rmdir()

    file = Path(mkdtemp(prefix="promplate-")) / filename
    # Explicit encoding: the platform default may be unable to represent the
    # template text.
    file.write_text(source, encoding="utf-8")

    if auto_deletion:
        import atexit

        @atexit.register
        def _():
            file.unlink(missing_ok=True)
            # Never let cleanup raise during interpreter shutdown.
            with suppress(OSError):
                file.parent.rmdir()

    return file