Hello! 欢迎来到盒子萌!

Python的exec():在安全环境下执行动态生成的代码


avatar
bgspider 2024-06-03 65

已经部署在线上的测试接口
https://test.python886.com/

欢迎大家测试,如果有问题,请及时联系我。
具体写法以及exec的其他语法,请参见 https://juejin.cn/post/7167978515529203719
懒得写具体实例
下面是接口的完整代码示例,基于RestrictedPython + Flask实现
相关功能有
1、动态执行代码,可自定义允许运行的库,并返回对应值
2、检测危险代码
3、检测代码运行时长,超过指定时间结束运行

import RestrictedPython, ast
import json
from flask import Flask, request, jsonify
import platform

# Names handed to user-submitted code as globals: RestrictedPython's
# `safe_builtins` in place of the real builtins, plus an explicit `print`.
restricted_globals = dict(
    __builtins__=RestrictedPython.safe_builtins,
    print=print,  # allow use of the print function
)
# Start from RestrictedPython's default globals and overlay the entries above
# (a new dict is built, so the library's own `safe_globals` is not mutated).
executing_context = {**RestrictedPython.Guards.safe_globals, **restricted_globals}

def check_input(code):
    """Screen user-submitted source text before it is executed.

    Args:
        code: Source text to validate. Anything that is not a ``str`` (for
            example ``None`` when the form field is missing) is rejected.

    Returns:
        The unchanged ``code`` when it passes both checks. Otherwise one of
        two message strings: '恶意代码检测' when a blacklisted token occurs
        anywhere in the text, or '语法错误,请检查语法' when the input cannot be
        parsed as Python.
    """
    # A missing form field arrives as None; `pattern in None` would raise
    # TypeError, so non-text input is reported as unparseable instead.
    if not isinstance(code, str):
        return '语法错误,请检查语法'

    # NOTE(security): this is a plain substring blacklist. It also rejects
    # harmless identifiers that merely contain a token (e.g. "cost" contains
    # "os"), and it is not a sandbox on its own.
    malicious_patterns = ('os', 'sys', 'import', 'eval', 'exec', 'while')
    if any(pattern in code for pattern in malicious_patterns):
        return '恶意代码检测'

    try:
        ast.parse(code)
    except Exception:
        # Deliberately broad: any parser failure (SyntaxError, ValueError,
        # RecursionError, ...) is reported to the user the same way.
        return '语法错误,请检查语法'
    return code

# Address and port the server is started on (used by both the Flask
# development server and the gunicorn `bind` option in the __main__ section).
API_HOST = '0.0.0.0'
API_PORT = 18000
# Flask application object that the route decorators in this file register on.
app = Flask(__name__)
import multiprocessing

def iteritems(d, **kw):
    """Return an iterator over the ``(key, value)`` pairs of ``d``.

    Any keyword arguments are forwarded unchanged to ``d.items()``.
    """
    pairs = d.items(**kw)
    return iter(pairs)

class TimeoutException(Exception):
    """Exception type reserved for signalling that code ran for too long."""

def wrapper(q, func, *args, **kwargs):
    """Call ``func(*args, **kwargs)`` and report the outcome through ``q``.

    Exactly one item is put on the queue: the return value on success, or
    the exception instance if an ``Exception`` was raised along the way.
    """
    try:
        q.put(func(*args, **kwargs))
    except Exception as failure:
        # Hand the error to the consumer instead of letting it propagate.
        q.put(failure)

def run_with_timeout(func, args=(), kwargs=None, timeout=10):
    """Run ``func(*args, **kwargs)`` in a child process with a time limit.

    Args:
        func: Callable to execute in the child process.
        args: Positional arguments for ``func``.
        kwargs: Keyword arguments for ``func``; ``None`` means no keyword
            arguments.
        timeout: Seconds to wait for the child to deliver a result.

    Returns:
        The value ``func`` returned, or the string "运行时间太久停止运行" when
        no result arrived within ``timeout`` seconds. In that case the child
        is terminated if it is still running. A child that exits without
        delivering anything is reported the same way.

    Raises:
        Exception: If the child sent back an exception instance, it is
            re-raised in the calling process.
    """
    # Local import keeps this block self-contained; only Empty is needed.
    from queue import Empty

    call_kwargs = {} if kwargs is None else kwargs
    q = multiprocessing.Queue()
    p = multiprocessing.Process(
        target=wrapper,
        args=(q, func) + tuple(args),
        kwargs=call_kwargs,
    )
    p.start()

    try:
        # Read the result BEFORE joining: a process that has put data on a
        # multiprocessing.Queue does not exit until that data is flushed, so
        # joining first can misreport a large result as a timeout. The bounded
        # wait also avoids blocking forever if the child dies without
        # putting anything on the queue.
        result = q.get(timeout=timeout)
    except Empty:
        if p.is_alive():
            p.terminate()
        p.join()
        return "运行时间太久停止运行"

    p.join()
    if isinstance(result, Exception):
        raise result
    return result

def long_running_function(restricted_code):
    """Execute ``restricted_code`` and return the namespace it populated.

    The code runs with ``executing_context`` as its globals and a fresh dict
    as its locals; that locals dict is returned to the caller.
    """
    # NOTE(security): this is the built-in exec() applied to the raw source
    # text. The source is not compiled through RestrictedPython here, so the
    # only limits in effect are the names present in `executing_context`.
    local_scope = {}
    exec(restricted_code, executing_context, local_scope)
    return local_scope

@app.route('/run_code', methods=["POST", "OPTIONS"])
def run_code():
    """Validate and execute the submitted ``code`` form field.

    Returns:
        A JSON response ``{"result": <str>}`` where the string is one of:
        a validation message from ``check_input``, the text of an exception
        raised while running the code, a timeout message, '没有返回值' when
        the code defined nothing, or the JSON-encoded namespace produced by
        the code.
    """
    code = request.form.get('code')
    # The field is absent for OPTIONS requests and malformed POSTs; answer
    # with a message instead of letting validation crash on None.
    if code is None:
        return jsonify({'result': '缺少code参数'})

    restricted_code = check_input(code)
    # Compare by equality: a substring test would mistake valid code that
    # merely contains one of these phrases (e.g. in a string literal) for a
    # validation failure and echo the code back as the result.
    if restricted_code in ('恶意代码检测', '语法错误,请检查语法'):
        return jsonify({'result': restricted_code})

    try:
        outcome = run_with_timeout(
            long_running_function, args=(restricted_code,), timeout=1)
    except TimeoutException:
        return jsonify({'result': '运行时间太久,结束运行'})
    except Exception as e:
        # Errors raised by the submitted code are shown to the user as text.
        return jsonify({'result': str(e)})

    if not outcome:
        return jsonify({'result': '没有返回值'})
    if isinstance(outcome, str):
        # run_with_timeout reports a timeout as a plain message string; pass
        # it through as-is rather than wrapping it in JSON quotes.
        return jsonify({'result': outcome})

    try:
        payload = json.dumps(outcome, ensure_ascii=False)
    except (TypeError, ValueError) as e:
        # Values such as sets or bytes are not JSON-serializable.
        return jsonify({'result': str(e)})
    return jsonify({'result': payload})

@app.route('/', methods=["GET"])
def run_task():
    """Serve the single HTML page used to submit code to ``/run_code``."""
    # NOTE(review): every request's headers and cookies are written to
    # stdout — looks like leftover debugging; confirm before keeping it,
    # since cookies may be sensitive.
    print(request.headers)
    print(request.cookies)
    # The page's script POSTs the textarea content, form-encoded as `code`,
    # to /run_code and shows the `result` field of the JSON reply in the <pre>.
    html = '''
    <!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>Python代码检测</title>
  <script>
    function runPythonCode() {
      const codeInput = document.getElementById('codeInput');
      fetch('/run_code', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/x-www-form-urlencoded'
        },
        body: `code=${encodeURIComponent(codeInput.value)}`
      })
      .then(response => response.json())
      .then(data => {
        document.getElementById('result').innerText = data.result;
      })
      .catch(error => console.error('Error:', error));
    }
  </script>
</head>
<body>
  <h2>在下面输入python代码</h2>
  <textarea id="codeInput" rows="10" cols="50" placeholder="Write your Python code here..."></textarea>
  <button onclick="runPythonCode()">Run Code</button>
  <pre id="result"></pre>
</body>
</html>
'''
    return html

if __name__ == '__main__':
    if platform.system() != "Windows":
        # Everywhere except Windows the app is served by an embedded gunicorn.
        import gunicorn.app.base

        class StandaloneApplication(gunicorn.app.base.BaseApplication):
            """Gunicorn application that serves an already-built WSGI app."""

            def __init__(self, app, options=None):
                # Set attributes first, then run the base initializer.
                self.options = options or {}
                self.application = app
                super().__init__()

            def load_config(self):
                """Apply the recognised, non-None options to ``self.cfg``."""
                accepted = {
                    key: value
                    for key, value in self.options.items()
                    if key in self.cfg.settings and value is not None
                }
                for key, value in accepted.items():
                    self.cfg.set(key.lower(), value)

            def load(self):
                """Return the wrapped WSGI application."""
                return self.application

        _options = {
            'bind': '{}:{}'.format(API_HOST, API_PORT),
            'workers': 4,
            'accesslog': '-',  # log to stdout
            'access_log_format': '%(h)s %(l)s %(t)s "%(r)s" %(s)s "%(a)s"'
        }
        StandaloneApplication(app, _options).run()
    else:
        # On Windows, fall back to Flask's built-in server.
        app.run(host=API_HOST, port=API_PORT)