Skip to content

Commit dcaa4cb

Browse files
[search online] 新增联网搜索插件代码以及移动 code 节点代码 (#474)
* [search online] 新增联网搜索插件代码以及移动 code 节点代码 * [search online] 联网搜索插件内置 * [search online] 修改检视意见
1 parent 83d4c2c commit dcaa4cb

File tree

15 files changed

+925
-0
lines changed

15 files changed

+925
-0
lines changed

app-builder/plugins/fit_py_code_node_tools/conf/__init__.py

Whitespace-only changes.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
user:
2+
function:
3+
entrypoint: 'main'
4+
5+
code:
6+
import:
7+
whitelist:
8+
- 'json'
9+
- 'typing'
10+
- 'pandas'
11+
- 'numpy'
12+
- 're'
13+
- 'requests'
14+
- 'httpx'
15+
- 'datetime'
16+
- 'time'
17+
- 'base64'
18+
- 'hashlib'
19+
blacklist:
20+
- 'os'
21+
- 'sys'
22+
- 'cmd'
23+
- 'subprocess'
24+
- 'multiprocessing'
25+
- 'timeit'
26+
- 'platform'
27+
- 'asyncio'
28+
timeout: 10
29+
max_pool: 4
30+
mem_limit: 189792256 # 181*1024*1024
31+
verbose: False
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
category: "system"
2+
level: 4
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
# -- encoding: utf-8 --
2+
# Copyright (c) 2024 Huawei Technologies Co., Ltd. All Rights Reserved.
3+
# This file is a part of the ModelEngine Project.
4+
# Licensed under the MIT License. See License.txt in the project root for license information.
5+
# ======================================================================================================================
6+
import multiprocessing
7+
import os
8+
from typing import Dict
9+
import threading
10+
11+
from fitframework.api.decorators import fitable, value as FitConfigValue
12+
from fitframework.api.logging import fit_logger
13+
from fitframework.core.exception.fit_exception import FitException, InternalErrorCode
14+
from fitframework.utils.tools import to_list
15+
16+
from .python_repl_impl import execute_node_impl, GLOBAL_CONFIG
17+
18+
19+
@FitConfigValue(
    key='user.function.entrypoint',
    default_value='main',
)
def _read_entrypoint_from_config():
    """Placeholder bound to the ``user.function.entrypoint`` setting (default ``'main'``).

    The body is intentionally empty; the value is presumably supplied by the
    ``FitConfigValue`` decorator when this function is called.
    """
22+
23+
24+
@FitConfigValue(
    key='code.import.whitelist',
    default_value=['asyncio', 'json', 'numpy', 'typing'],
    converter=to_list,
)
def _read_import_whitelist_from_config():
    """Placeholder bound to the ``code.import.whitelist`` setting.

    The configured value is passed through ``to_list``; the body is
    intentionally empty because the value is presumably supplied by the
    ``FitConfigValue`` decorator.
    """
27+
28+
29+
@FitConfigValue(
    key='code.import.blacklist',
    default_value=['os', 'sys', 'cmd', 'subprocess', 'multiprocessing', 'timeit', 'platform'],
    converter=to_list,
)
def _read_import_blacklist_from_config():
    """Placeholder bound to the ``code.import.blacklist`` setting.

    The configured value is passed through ``to_list``; the body is
    intentionally empty because the value is presumably supplied by the
    ``FitConfigValue`` decorator.
    """
34+
35+
36+
@FitConfigValue(
    key='code.timeout',
    default_value=10,
    converter=int,
)
def _timeout():
    """Placeholder bound to the ``code.timeout`` setting (default ``10``, converted with ``int``).

    The body is intentionally empty; the value is presumably supplied by the
    ``FitConfigValue`` decorator.
    """
39+
40+
41+
@FitConfigValue(
    key='code.max_pool',
    default_value=4,
    converter=int,
)
def _max_pool():
    """Placeholder bound to the ``code.max_pool`` setting (default ``4``, converted with ``int``).

    The body is intentionally empty; the value is presumably supplied by the
    ``FitConfigValue`` decorator.
    """
44+
45+
46+
@FitConfigValue(
    key='code.mem_limit',
    default_value=181 * 1024 * 1024,
    converter=int,
)
def _mem_limit():
    """Placeholder bound to the ``code.mem_limit`` setting (default 181 MiB in bytes, converted with ``int``).

    The body is intentionally empty; the value is presumably supplied by the
    ``FitConfigValue`` decorator.
    """
49+
50+
51+
def _to_bool(raw):
    """Convert a configuration value to ``bool`` without treating ``"False"`` as true.

    Plain ``bool`` maps every non-empty string, including ``"False"`` and
    ``"0"``, to ``True``. Strings are therefore compared against the usual
    affirmative spellings; every other type keeps normal truthiness.
    """
    if isinstance(raw, str):
        return raw.strip().lower() in ('1', 'true', 'yes', 'on')
    return bool(raw)


@FitConfigValue(key='code.verbose', default_value=False, converter=_to_bool)
def _verbose():
    """Placeholder bound to the ``code.verbose`` setting (default ``False``).

    The body is intentionally empty; the value is presumably supplied by the
    ``FitConfigValue`` decorator.
    """
    pass
54+
55+
56+
def _init_config():
    """Copy every plugin setting from its config reader into ``GLOBAL_CONFIG``.

    Readers are invoked in the order listed and each result is stored as soon
    as it is obtained.
    """
    readers = (
        ("entrypoint", _read_entrypoint_from_config),
        ("whitelist", _read_import_whitelist_from_config),
        ("blacklist", _read_import_blacklist_from_config),
        ("timeout", _timeout),
        ("max_pool", _max_pool),
        ("mem_limit", _mem_limit),
        ("verbose", _verbose),
    )
    for config_key, read_value in readers:
        GLOBAL_CONFIG[config_key] = read_value()
64+
65+
66+
class Singleton(type):
    """Metaclass that hands out one shared instance per class.

    The first call constructs the instance; every later call returns that same
    object and ignores its arguments. Creation uses double-checked locking on
    a lock that is shared by every class using this metaclass.
    """
    _lock = threading.Lock()

    def __init__(cls, *args, **kwargs):
        # Each class gets its own slot so subclasses do not share an instance.
        cls._instance = None
        super().__init__(*args, **kwargs)

    def __call__(cls, *args, **kwargs):
        # Compare against None rather than relying on truthiness: an instance
        # that defines __bool__ or __len__ may be falsy and must not be
        # rebuilt on every call.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__call__(*args, **kwargs)
        return cls._instance
82+
83+
84+
class CodeExecutor(metaclass=Singleton):
    """Process-wide holder of the single-worker process pools used to run code nodes.

    Construction refreshes ``GLOBAL_CONFIG`` and creates ``max_pool`` slots,
    each pairing a ``threading.Lock`` with a one-process ``multiprocessing.Pool``.
    """

    def __init__(self):
        _init_config()
        self.pools = [
            (threading.Lock(), multiprocessing.Pool(processes=1))
            for _ in range(GLOBAL_CONFIG["max_pool"])
        ]
        self.index = 0
        self.index_lock = threading.Lock()
        self.config = GLOBAL_CONFIG

    def get_and_increment(self) -> int:
        """Return the current slot index and advance it, wrapping to 0 after the last slot."""
        with self.index_lock:
            current = self.index
            last_slot = self.config["max_pool"] - 1
            if current < last_slot:
                self.index = current + 1
            else:
                self.index = 0
        return current
101+
102+
103+
def _print_process_usage():
    """Log CPU and memory usage of the current process and the pid of each descendant.

    ``cpu_percent(interval=1.0)`` is called with a one-second sampling interval.
    """
    import psutil

    bytes_per_mb = 1024 * 1024

    # Inspect the process this function is running in.
    own_process = psutil.Process(os.getpid())
    cpu_usage = own_process.cpu_percent(interval=1.0)
    mem = own_process.memory_info()

    # rss and vms are converted to MB for the log line.
    memory_usage = mem.rss / bytes_per_mb
    virtual_memory = mem.vms / bytes_per_mb

    fit_logger.info(f"CPU Usage: {cpu_usage}%, Memory Usage (RSS): {memory_usage:.2f} MB, "
                    f"Virtual Memory Usage (VMS): {virtual_memory:.2f} MB")

    # List every descendant process, not only direct children.
    for descendant in psutil.Process().children(recursive=True):
        fit_logger.info('Child pid is {}'.format(descendant.pid))
128+
129+
130+
@fitable("CodeNode.tool", "Python_REPL")
def execute_code(args: Dict[str, object], code: str) -> object:
    """Run a code node through the shared executor and return its value.

    Args:
        args: Argument mapping forwarded to the user code.
        code: Source text of the code node.

    Returns:
        ``value`` of the result produced by ``execute_node_impl``.

    Raises:
        FitException: With the result's ``error_code`` and ``msg`` when the
            result is not ok.
    """
    # Plugin initialisation runs in a daemon process, which cannot start the
    # pool's worker processes, so the pool is created on the first call instead.
    executor = CodeExecutor()
    if GLOBAL_CONFIG["verbose"]:
        _print_process_usage()
    slot = executor.get_and_increment()
    outcome = execute_node_impl(executor.pools, slot, args, code, GLOBAL_CONFIG)
    if not outcome.isOk:
        raise FitException(outcome.error_code, outcome.msg)
    return outcome.value
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
# -- encoding: utf-8 --
2+
# Copyright (c) 2024 Huawei Technologies Co., Ltd. All Rights Reserved.
3+
# This file is a part of the ModelEngine Project.
4+
# Licensed under the MIT License. See License.txt in the project root for license information.
5+
# ======================================================================================================================
6+
import asyncio
7+
import importlib
8+
import inspect
9+
import json
10+
import multiprocessing
11+
import platform
12+
import re
13+
from typing import Any, Dict, List, Tuple
14+
from pydantic import BaseModel
15+
16+
# On Windows a local stand-in for InternalErrorCode is defined instead of
# importing it from fitframework; on every other platform the framework's
# own enum is used.
if platform.system() == 'Windows':
    from enum import IntEnum

    class InternalErrorCode(IntEnum):
        EXCEPTION_FROM_USER_CODE_OCCURRED = 0x7F000105
        TIME_OUT_EXCEPTION_FROM_USER_CODE_OCCURRED = 0x7F000106  # does not exist in Java
else:
    from fitframework.core.exception.fit_exception import InternalErrorCode
24+
try:
25+
import resource
26+
except ImportError:
27+
resource = None
28+
try:
29+
from .safe_global import safe_builtins
30+
except ImportError as e:
31+
from safe_global import safe_builtins
32+
33+
# Source text placed in front of every user snippet before it is executed.
_PYTHON_REPL_HEADER = '''
import json
from typing import Any

Output = Any


'''

# Default settings for the code node.
GLOBAL_CONFIG = {
    "header": _PYTHON_REPL_HEADER,
    # Number of lines the header occupies. The header ends with a newline, so
    # user code starts on line header_len + 1 and subtracting header_len from
    # a reported line number yields the 1-based line within the user's code.
    # (len(split('\n')) would be one too many and report the first line as 0.)
    "header_len": _PYTHON_REPL_HEADER.count('\n'),
    "entrypoint": 'main',
    "whitelist": ['json', 'typing'],
    "blacklist": ['os', 'sys', 'cmd', 'subprocess', 'multiprocessing', 'timeit', 'platform'],
    "timeout": 10,
    "max_pool": 4,
    "mem_limit": 181 * 1024 * 1024,
    "verbose": False,
}
54+
55+
56+
class Result(BaseModel):
    """Outcome of running a code node: either a value or an error code with a message."""

    # True when execution succeeded.
    isOk: bool
    # Payload of a successful run; left as None for errors.
    value: Any = None
    # 0 for success (see ``ok``); otherwise the code passed to ``err``.
    error_code: int
    # Error description; left as None for successful runs.
    msg: str = None

    @staticmethod
    def ok(data: Any) -> 'Result':
        """Build a successful result carrying ``data`` with error code 0."""
        success = Result(error_code=0, value=data, isOk=True)
        return success

    @staticmethod
    def err(err_code: int, err_msg: str) -> 'Result':
        """Build a failed result with the given error code and message."""
        failure = Result(msg=err_msg, error_code=err_code, isOk=False)
        return failure
69+
70+
71+
# Create a restricted execution environment.
def _create_restricted_exec_env(config: Dict[str, object]):
    """Build the globals mapping used to ``exec`` user code.

    ``__builtins__`` is ``safe_builtins`` extended with a guarded ``__import__``
    and the name ``Args`` (bound to ``typing.Dict``).

    Args:
        config: Settings providing the ``whitelist`` and ``blacklist`` of module names.

    Returns:
        A dict with the single key ``'__builtins__'``.
    """
    def safer_import(name, my_globals=None, my_locals=None, fromlist=(), level=0):
        # A module is importable only when whitelisted and not blacklisted.
        permitted = name in config['whitelist'] and name not in config['blacklist']
        if not permitted:
            raise NameError(f'model {name} is not valid')
        return importlib.import_module(name)

    restricted_builtins = dict(safe_builtins)
    restricted_builtins['__import__'] = safer_import
    restricted_builtins['Args'] = Dict
    return {'__builtins__': restricted_builtins}
86+
87+
88+
# Get memory usage (unit: kB).
def _get_current_memory_usage():
    """Return the number in the ``VmPeak`` entry of ``/proc/self/status`` as an int.

    Raises:
        IndexError: If the file contains no ``VmPeak:`` entry.
    """
    with open('/proc/self/status') as status_file:
        status_text = status_file.read()
    after_label = status_text.split('VmPeak:')[1]
    peak_entry = after_label.split('\n')[0].strip()
    # The entry is "<number> <unit>"; keep only the number.
    return int(peak_entry.split()[0].strip())
93+
94+
95+
# Execute restricted code.
def _execute_code_with_restricted_python(args: Dict[str, object], code: str, config: Dict[str, object]):
    """Run user code inside the restricted environment and wrap the outcome in a ``Result``.

    The code is prefixed with ``config['header']`` and executed; the function
    named by ``config['entrypoint']`` must be a coroutine function and is
    awaited with ``config['timeout']`` as its time limit.

    Args:
        args: Argument mapping passed to the entrypoint.
        code: User source text.
        config: Settings (``header``, ``entrypoint``, ``timeout``, ``mem_limit``,
            ``whitelist``, ``blacklist``, ``header_len``).

    Returns:
        ``Result.ok`` with the JSON-encoded return value, or ``Result.err``
        describing a timeout, a non-async entrypoint, or any exception raised
        while compiling or running the code.
    """
    if resource:
        # Take the limit from the config passed by the caller, like every
        # other setting; fall back to the module default when the key is absent.
        mem_limit = config.get("mem_limit", GLOBAL_CONFIG["mem_limit"])
        resource.setrlimit(resource.RLIMIT_AS, (mem_limit, mem_limit))
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        full_python_code = (f"{config['header']}"
                            f'{code}\n\n')

        safer_globals = _create_restricted_exec_env(config)
        # NOTE(security): exec runs untrusted code; containment depends on the
        # restricted builtins, the import filter and the caller's pre-checks.
        exec(full_python_code, safer_globals)
        entrypoint_name = config['entrypoint']
        # .get returns None for a missing name, which is not a function either.
        entrypoint = safer_globals.get(entrypoint_name)
        if not inspect.isfunction(entrypoint):
            # Name the configured entrypoint; identical to the previous text
            # for the default 'main'.
            raise NameError(f"{entrypoint_name} function not defined")
        if not inspect.iscoroutinefunction(entrypoint):
            return Result.err(InternalErrorCode.EXCEPTION_FROM_USER_CODE_OCCURRED.value,
                              "Unable to execute non-asynchronous function")
        ret = loop.run_until_complete(asyncio.wait_for(entrypoint(args), config['timeout']))
        return Result.ok(json.dumps(ret))
    except asyncio.TimeoutError:
        return Result.err(InternalErrorCode.TIME_OUT_EXCEPTION_FROM_USER_CODE_OCCURRED.value,
                          "[TimeoutError] Execution timed out")
    except Exception as err:
        return Result.err(InternalErrorCode.EXCEPTION_FROM_USER_CODE_OCCURRED.value, _get_except_msg(err, config))
    finally:
        loop.close()
125+
126+
127+
def _get_except_msg(error: Any, config: Dict[str, object]) -> str:
    """Format an exception as ``"[ClassName] message"``.

    Args:
        error: The exception to describe.
        config: Settings; ``header_len`` is subtracted from a syntax error's
            line number so it refers to the user's code rather than to the
            header-prefixed source.

    Returns:
        The formatted message. Syntax errors with location information include
        line, column and source text; key errors name the missing key.
    """
    # A SyntaxError raised without location details (e.g. ``raise
    # SyntaxError("x")`` in user code) has lineno None; subtracting from it
    # would raise TypeError inside the error handler, so use the plain message.
    if isinstance(error, SyntaxError) and error.lineno is not None:
        error_msg = f"{error.msg} at line {error.lineno - config['header_len']}, column {error.offset}: {error.text}"
    elif isinstance(error, KeyError):
        error_msg = f"key {str(error)} do not exist"
    else:
        error_msg = str(error)
    return f"[{error.__class__.__name__}] {error_msg}"
135+
136+
137+
def _get_free_process_pool(pools: List[Tuple[multiprocessing.Lock, multiprocessing.Pool]], index):
    """Acquire and return the lock of the pool slot at ``index``.

    Args:
        pools: Sequence of ``(lock, pool)`` slots.
        index: Position of the slot to claim.

    Returns:
        The slot's lock, held by the caller.

    Raises:
        multiprocessing.TimeoutError: If ``acquire`` reports failure.
    """
    slot_lock = pools[index][0]
    acquired = slot_lock.acquire()
    if not acquired:
        raise multiprocessing.TimeoutError()
    return slot_lock
142+
143+
144+
def execute_node_impl(pools: List[Tuple[multiprocessing.Lock, multiprocessing.Pool]], index: int,
                      args: Dict[str, object], code: str, config: Dict[str, object]):
    """Run user code in the pool slot at ``index`` and return its ``Result``.

    Code rejected by ``_validate_escape`` is refused without being executed.
    Otherwise the slot's lock is held while the code runs in the slot's pool;
    if no result arrives within ``config['timeout']`` the pool is terminated
    and replaced by a new single-process pool.

    Args:
        pools: ``(lock, pool)`` slots; the slot is replaced in place after a timeout.
        index: Slot to use.
        args: Argument mapping forwarded to the user code.
        code: User source text.
        config: Settings forwarded to the worker; ``timeout`` bounds the wait.

    Returns:
        The worker's ``Result``, or an error ``Result`` for rejected code or a timeout.
    """
    forbidden = _validate_escape(code)
    if forbidden is not None:
        return Result.err(InternalErrorCode.EXCEPTION_FROM_USER_CODE_OCCURRED.value,
                          f'{forbidden.group()} is not allowed in code node')

    slot_lock = _get_free_process_pool(pools, index)
    worker_pool = pools[index][1]
    try:
        pending = worker_pool.apply_async(_execute_code_with_restricted_python, args=[args, code, config])
        return pending.get(config['timeout'])
    except multiprocessing.TimeoutError:
        # Terminate the pool whose task did not finish and give the slot a new one.
        slot = pools.index((slot_lock, worker_pool))
        worker_pool.terminate()
        pools[slot] = (slot_lock, multiprocessing.Pool(processes=1))
        return Result.err(InternalErrorCode.TIME_OUT_EXCEPTION_FROM_USER_CODE_OCCURRED.value,
                          "[TimeoutError] Execution timed out")
    finally:
        slot_lock.release()
163+
164+
165+
def _validate_escape(code: str) -> "re.Match | None":
    """Search user code for names that could be used to escape the sandbox.

    Rejected names are the frame attributes of generators, tracebacks,
    coroutines and async generators (``gi_frame``, ``tb_frame``, ``cr_frame``,
    ``ag_frame``) and any double-underscore name.

    Args:
        code: User source text.

    Returns:
        The first match, or ``None`` when the code contains none of the names.
    """
    # The frame names are matched anywhere rather than behind an unescaped
    # '.', which was a wildcard and missed a name at the very start of the
    # code. \w+ also covers dunders containing underscores or digits, such as
    # __reduce_ex__ and __init_subclass__, which [a-zA-Z]+ let through.
    pattern = r'(?:gi|tb|cr|ag)_frame|__\w+__'
    return re.search(pattern, code)

0 commit comments

Comments
 (0)