12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576 |
- # -*- coding: utf-8 -*-
- from .__load__ import *
- class Auth(object):
- def init(self, param={}, request=False):
- if 'appid' not in param:
- return '参数错误:appid'
- if 'timestamp' not in param:
- return '参数错误:timestamp'
- if 'nonce' not in param:
- return '参数错误:nonce'
- if 'signature' not in param:
- return '参数错误:signature'
- model = Demeter.model('site')
- model.appid = param['appid']
- site = model.select(type='fetchone')
- return site
- if not site:
- return '站点信息不存在'
- time = Demeter.time()
- if time < site['sdate'] or time > site['edate']:
- return '授权已失效'
- if time - int(param['timestamp']) > 600:
- return '签名已过期'
- if request:
- # 针对域名做白名单
- referer = request.headers.get("Referer")
- if not referer:
- return '验证失败:来源错误'
- host = Demeter.host(site['link'])
- if host != Demeter.host(referer):
- return '验证失败:来源错误'
- uri = self.getHost(request) + request.uri
- if referer == uri:
- return '验证失败:来源错误'
- param['appsecret'] = site['appsecret']
- if self.signature(param) != signature:
- return '验签失败'
- return site
- def getHost(self, request):
- host = request.host.replace(':8088', '')
- host = request.protocol + "://" + host
- return host
- def signature(self, param):
- all_params = {}
- for k, v in param.items():
- all_params[k] = str(v)
- sorted_items = sorted(all_params.items(), key=lambda x: x[0])
- param_str = "&".join(f"{k}={v}" for k, v in sorted_items)
- return Demeter.md5(param_str)
- # sign 只能使用一次 以后再说吧
- def check(self, param):
- model = Demeter.model('signature')
- model.appid = param['appid']
- model.signature = param['signature']
- info = model.select(type='fetchone')
- if info:
- return False
- model.site_id = param['appid']
- model.signature = param['signature']
- model.insert()
- def clear(self):
- num = Demeter.time() - 3600*24
- model = Demeter.model('signature')
- model.cdate.assign(num, exp='<=')
- model.delete()
|