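# -*- coding: utf-8 -*-
import hmac

from .__load__ import *


class Auth(object):
    """Verify signed API requests against the stored ``site`` record.

    A request carries ``appid``, ``timestamp``, ``nonce`` and ``signature``.
    The expected signature is the MD5 (via ``Demeter.md5``) of the sorted
    ``k=v`` pairs joined with ``&``, with the site's ``appsecret`` added.

    NOTE(review): MD5 over parameters with the secret mixed in is not a MAC
    construction. HMAC-SHA-256 keyed with the appsecret would be the usual
    replacement, but that needs a matching client change and is not done here.
    """

    def init(self, param=param, request=False):
        """Validate a signed request.

        Args:
            param: mapping of request parameters; must contain ``appid``,
                ``timestamp``, ``nonce`` and ``signature``.
            request: the incoming request object, or a falsy value to skip
                the Referer checks.

        Returns:
            The site record on success, otherwise a human-readable error
            string. Callers must tell the two apart by type.
        """
        for required in ('appid', 'timestamp', 'nonce', 'signature'):
            if required not in param:
                return '参数错误:' + required

        model = Demeter.model('site')
        model.appid = param['appid']
        site = model.select(type='fetchone')
        if not site:
            return '站点信息不存在'

        now = Demeter.time()
        if now < site['sdate'] or now > site['edate']:
            return '授权已失效'

        # The timestamp comes from the client: reject non-numeric values
        # instead of letting int() raise.
        try:
            stamp = int(param['timestamp'])
        except (TypeError, ValueError):
            return '参数错误:timestamp'
        # abs() so that a timestamp far in the future is rejected too;
        # otherwise such a request would stay valid until that time + 600s.
        if abs(now - stamp) > 600:
            return '签名已过期'

        if request:
            # Whitelist by domain. The Referer header is client-supplied, so
            # this only constrains browser-originated requests; the signature
            # check below is the actual authentication.
            referer = request.headers.get("Referer")
            if not referer:
                return '验证失败:来源错误'
            host = Demeter.host(site['link'])
            if host != Demeter.host(referer):
                return '验证失败:来源错误'
            uri = self.getHost(request) + request.uri
            if referer == uri:
                return '验证失败:来源错误'

        # Sign a copy so the appsecret is never written into the caller's
        # dict (which may be logged or echoed later). The received signature
        # is left out because a signature cannot cover itself.
        # NOTE(review): confirm the exclusion against the client-side signing
        # routine.
        signed = {k: v for k, v in param.items() if k != 'signature'}
        signed['appsecret'] = site['appsecret']

        # Assumes Demeter.md5 returns hex text -- TODO confirm.
        expected = str(self.signature(signed)).encode('utf-8')
        received = str(param['signature']).encode('utf-8')
        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str input.
        if not hmac.compare_digest(expected, received):
            return '验签失败'

        # NOTE(review): check() is not called here, so a valid signature can
        # be replayed for as long as its timestamp stays inside the window.
        return site

    def getHost(self, request):
        """Return ``protocol://host`` for *request*, with ``:8088`` removed."""
        host = request.host.replace(':8088', '')
        return request.protocol + "://" + host

    def signature(self, param):
        """Return ``Demeter.md5`` of the sorted ``k=v`` pairs joined by ``&``.

        Every item in *param* is included, with values converted by ``str``.
        """
        all_params = {k: str(v) for k, v in param.items()}
        sorted_items = sorted(all_params.items(), key=lambda x: x[0])
        param_str = "&".join(f"{k}={v}" for k, v in sorted_items)
        return Demeter.md5(param_str)

    # A signature may only be used once -- to be wired in later.
    def check(self, param):
        """Record a signature so that it can be accepted only once.

        Returns:
            False if this appid/signature pair is already stored, True after
            storing it.
        """
        model = Demeter.model('signature')
        model.appid = param['appid']
        model.signature = param['signature']
        info = model.select(type='fetchone')
        if info:
            return False

        # NOTE(review): the lookup filters on ``appid`` while the insert sets
        # ``site_id``; confirm both refer to the same column value.
        model.site_id = param['appid']
        model.signature = param['signature']
        model.insert()
        return True

    def clear(self):
        """Delete stored signatures whose ``cdate`` is at least 24 hours old."""
        num = Demeter.time() - 3600 * 24
        model = Demeter.model('signature')
        model.cdate.assign(num, exp='<=')
        model.delete()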