Python
<pre><code class="language-python">import requests,time,random,hashlib,threading
center_id = 1111; #开发者id
api_password = &#039;123123&#039;; #api密码
software = &#039;test&#039;; #软件名称
card = &#039;testcard&#039;; #卡密
local_storage = {}
def get_net_timestamp(platform=&#039;taobao&#039;):
if platform == &#039;taobao&#039;:
try:
res = requests.get(&#039;https://api.m.taobao.com/rest/api3.do?api=mtop.common.getTimestamp&#039;, timeout=5)
obj = res.json()
except Exception as e:
obj = None
if not obj or &#039;data&#039; not in obj:
print(&#039;获取淘宝时间戳失败,开始获取拼多多时间戳&#039;)
return get_net_timestamp(&#039;pdd&#039;)
else:
print(&#039;获取淘宝时间戳成功&#039;)
return int(str(obj[&#039;data&#039;][&#039;t&#039;])[:10])
elif platform == &#039;pdd&#039;:
try:
res = requests.get(&#039;https://api.pinduoduo.com/api/server/_stm&#039;, timeout=5)
obj = res.json()
except Exception as e:
obj = None
if not obj or &#039;server_time&#039; not in obj:
print(&#039;获取拼多多时间戳失败,开始获取世界时间戳&#039;)
return get_net_timestamp(&#039;world&#039;)
else:
print(&#039;获取拼多多时间戳成功&#039;)
return int(str(obj[&#039;server_time&#039;])[:10])
elif platform == &#039;world&#039;:
try:
res = requests.get(&#039;https://worldtimeapi.org/api/timezone/Asia/Shanghai&#039;, timeout=5)
obj = res.json()
except Exception as e:
obj = None
if not obj or &#039;unixtime&#039; not in obj:
print(&#039;获取世界时间戳失败,开始获取本地时间戳&#039;)
return get_net_timestamp(&#039;local&#039;)
else:
print(&#039;获取世界时间戳成功&#039;)
return int(obj[&#039;unixtime&#039;])
else:
print(&#039;获取本地时间戳&#039;)
return int(time.time())
def hex_md5(s):
return hashlib.md5(s.encode(&#039;utf-8&#039;)).hexdigest()
def ql_request(api, param):
api_list = [
&#039;https://napi.2cccc.cc/&#039;,
&#039;http://api2.2cccc.cc/&#039;,
&#039;http://api3.2cccc.cc/&#039;
]
connect_server_times = 0
server_return_json = &#039;&#039;
# 添加全局参数
param[&#039;center_id&#039;] = center_id
param[&#039;software&#039;] = software
while True:
if connect_server_times &gt; 0:
time.sleep(6)
if connect_server_times &gt; 20:
print(&#039;连接服务器失败,请检查网络或联系管理员&#039;)
return [False, &#039;连接服务器失败,请检查网络或联系管理员&#039;]
api_complete_path = random.choice(api_list) + api
connect_server_times += 1
print(f&#039;尝试第【{connect_server_times}】次连接服务器&#039;)
# 获取时间戳
timestamp = get_net_timestamp(&#039;taobao&#039;)
param[&#039;timestamp&#039;] = timestamp
param[&#039;sign&#039;] = hex_md5(api_password + str(timestamp))
try:
response = requests.post(api_complete_path, data=param, timeout=10)
server_return_json = response.text
except Exception as e:
server_return_json = &#039;&#039;
if server_return_json[2:6] == &quot;code&quot;:
break
try:
server_return_data = response.json()
except Exception:
return [False, &#039;返回数据格式错误&#039;]
if server_return_data.get(&#039;code&#039;) == 0:
return [False, server_return_data.get(&#039;msg&#039;, &#039;未知错误&#039;)]
# 签名校验
server_sign = server_return_data.get(&#039;sign&#039;, &#039;&#039;)
server_timestamp = server_return_data.get(&#039;timestamp&#039;, 0)
valid_sign = hex_md5(str(server_timestamp) + api_password)
time_diff = abs(int(timestamp) - int(server_timestamp))
if server_sign != valid_sign or time_diff &gt; 600:
return [False, &#039;算法验证错误,请联系管理员&#039;]
return [True, server_return_data.get(&#039;data&#039;)]
def card_ping():
while True:
ping_result = ql_request(&#039;apiv3/card_ping&#039;, {
&#039;card&#039;: card,
&#039;needle&#039;: local_storage.get(&#039;card_login_needle&#039;, &#039;&#039;)
})
if ping_result[0]:
print(&#039;最新的卡密到期时间:&#039;, ping_result[1][&#039;endtime&#039;])
print(&#039;最新的卡密剩余时间:&#039;, ping_result[1][&#039;less_time&#039;])
heartbeat_sec = int(ping_result[1].get(&#039;heartbeat_second&#039;, 60))
time.sleep(heartbeat_sec)
else:
print(&#039;心跳失败,失败原因是:&#039;, ping_result[1])
break # 心跳失败后退出循环,线程自然结束
def card_login():
global card
result = ql_request(&#039;apiv3/card_login&#039;, {&#039;card&#039;: card})
if result[0]:
print(&#039;卡密到期时间:&#039;, result[1][&#039;endtime&#039;])
print(&#039;卡密剩余时间:&#039;, result[1][&#039;less_time&#039;])
print(&#039;卡密类型为:&#039;, result[1][&#039;type&#039;])
# 存储 needle 并启动心跳线程
local_storage[&#039;card_login_needle&#039;] = result[1][&#039;needle&#039;]
threading.Thread(target=card_ping, daemon=True).start()
else:
print(&#039;卡密登录失败,失败原因是:&#039;, result[1])
# 执行登录
card_login()
while(True):
time.sleep(1);
print(&#039;主线程&#039;)</code></pre>