-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbilibili.py
410 lines (378 loc) · 16.4 KB
/
bilibili.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
import json, os, random,requests,rsa, sys, time
# Runtime configuration read from the environment.
# Both values are read leniently: an unset variable becomes an empty string
# instead of aborting the import with KeyError.
# SEND_KEY is not referenced anywhere else in the visible file
# (presumably a push-notification key -- TODO confirm before removing).
SEND_KEY = os.environ.get('SEND_KEY', '')
# Cookie header of the form "name=value;name=value" with every space removed.
BILI_COOKIE = os.environ.get('BILI_COOKIE', '').replace(" ", "")
class BiliBiliCheckIn(object):
    """Run the daily Bilibili tasks for one account and build a text report.

    The account is identified by a cookie dict (it must contain ``bili_jct``,
    which is sent as the csrf token on every POST).  ``main`` reads three
    optional environment variables:

    * ``BILI_NUM``  - how many coins to throw per day (default 0).
    * ``BILI_TYPE`` - coin mode (default 1): 1 prefers videos of followed
      users, 0 uses region videos only.  If followed users do not provide
      enough videos, the remainder falls back to region videos.
    * ``BILI_S2C``  - silver-to-coin exchange is enabled only while this is
      unset or empty; any other value disables it.
    """

    # Untested: claiming the VIP benefits needs a premium account to verify.

    # Experience value the report counts down to.
    # NOTE(review): presumably the level-6 threshold -- confirm.
    LEVEL_TARGET_EXP = 28800

    def __init__(self, bilibili_cookie_list):
        # Despite the name this is a single cookie dict (name -> value).
        self.bilibili_cookie_list = bilibili_cookie_list

    @staticmethod
    def get_nav(session):
        """Fetch the account summary.

        Returns a tuple ``(uname, uid, is_login, coin, vip_type, current_exp)``.
        Any field missing from the response (including a null ``data``
        object) is returned as ``None``.
        """
        url = "https://api.bilibili.com/x/web-interface/nav"
        ret = session.get(url=url).json()
        # "data" may be absent or null; treat both as empty.
        data = ret.get("data") or {}
        level_info = data.get("level_info") or {}
        return (
            data.get("uname"),
            data.get("mid"),
            data.get("isLogin"),
            data.get("money"),
            data.get("vipType"),
            level_info.get("current_exp"),
        )

    @staticmethod
    def reward(session) -> dict:
        """Fetch today's experience/reward status and return the raw JSON."""
        url = "https://account.bilibili.com/home/reward"
        return session.get(url=url).json()

    @staticmethod
    def live_sign(session) -> str:
        """Do the live-streaming daily sign-in and return a status message.

        Never raises: request and parsing errors are folded into the message
        so one failing task does not abort the whole run.
        """
        try:
            url = "https://api.live.bilibili.com/xlive/web-ucenter/v1/sign/DoSign"
            ret = session.get(url=url).json()
            if ret["code"] == 0:
                msg = f'签到成功,{ret["data"]["text"]},特别信息:{ret["data"]["specialText"]},本月已签到{ret["data"]["hadSignDays"]}天'
            elif ret["code"] == 1011040:
                # Already signed in today.
                msg = "今日已签到过,无法重复签到"
            else:
                msg = f'签到失败,信息为: {ret["message"]}'
        except Exception as e:  # deliberate best-effort boundary
            msg = f"签到异常,原因为{str(e)}"
        return msg

    @staticmethod
    def manga_sign(session, platform="android") -> str:
        """Do the manga client daily clock-in and return a status message.

        ``platform`` is sent as the ``platform`` form field.  Never raises:
        errors are folded into the returned message.
        """
        try:
            url = "https://manga.bilibili.com/twirp/activity.v1.Activity/ClockIn"
            post_data = {"platform": platform}
            ret = session.post(url=url, data=post_data).json()
            if ret["code"] == 0:
                msg = "签到成功"
            elif ret["msg"] == "clockin clockin is duplicate":
                # The server reports a repeated clock-in with this exact text.
                msg = "今天已经签到过了"
            else:
                msg = f'签到失败,信息为({ret["msg"]})'
        except Exception as e:  # deliberate best-effort boundary
            msg = f"签到异常,原因为: {str(e)}"
        return msg

    @staticmethod
    def manga_book(session) -> str:
        """Record a read of one fixed manga chapter and return a status message.

        The comic and episode ids are hard-coded.  Never raises: errors are
        folded into the returned message.
        """
        try:
            url = "https://manga.bilibili.com/twirp/bookshelf.v1.Bookshelf/AddHistory"
            post_data = {
                "device": "pc",
                "platform": "web",
                "comic_id": "27355",
                "ep_id": "381662",
            }
            ret = session.post(url=url, data=post_data).json()
            if ret["code"] == 0:
                msg = "本日漫画自动阅读1章节成功!,阅读漫画为:堀与宫村"
            else:
                msg = f'出错,信息为({ret["msg"]})'
        except Exception as e:  # deliberate best-effort boundary
            msg = f"看书异常,原因为: {str(e)}"
        return msg

    @staticmethod
    def vip_privilege_receive(session, bili_jct, receive_type: int = 1) -> dict:
        """Claim a VIP privilege and return the raw JSON response.

        receive_type: privilege type, 1 for a B-coin voucher, 2 for a coupon.
        """
        url = "https://api.bilibili.com/x/vip/privilege/receive"
        post_data = {"type": receive_type, "csrf": bili_jct}
        return session.post(url=url, data=post_data).json()

    @staticmethod
    def vip_manga_reward(session) -> dict:
        """Claim the manga VIP reward and return the raw JSON response."""
        url = "https://manga.bilibili.com/twirp/user.v1.User/GetVipReward"
        return session.post(url=url, json={"reason_id": 1}).json()

    @staticmethod
    def report_task(session, bili_jct, aid: int, cid: int, progres: int = 300) -> dict:
        """Report video watch progress and return the raw JSON response.

        aid: video av id.
        cid: video cid.
        progres: seconds watched; 0-59 random seconds are added on top.

        NOTE(review): the form field is spelled "progres" as in the original
        code -- confirm against the API whether "progress" is expected.
        """
        # HTTPS: this request carries the session cookies and the csrf token.
        url = "https://api.bilibili.com/x/v2/history/report"
        watched = progres + random.randint(0, 59)
        post_data = {"aid": aid, "cid": cid, "progres": watched, "csrf": bili_jct}
        return session.post(url=url, data=post_data).json()

    @staticmethod
    def share_task(session, bili_jct, aid) -> dict:
        """Share the video with the given av id and return the raw JSON response."""
        url = "https://api.bilibili.com/x/web-interface/share/add"
        post_data = {"aid": aid, "csrf": bili_jct}
        return session.post(url=url, data=post_data).json()

    @staticmethod
    def get_followings(
        session, uid: int, pn: int = 1, ps: int = 50, order: str = "desc", order_type: str = "attention"
    ) -> dict:
        """Fetch the users followed by ``uid`` and return the raw JSON response.

        uid: account uid whose followings are listed.
        pn: page number, default 1.
        ps: page size, default 50.
        order: sort direction, default "desc".
        order_type: sort key, default "attention".
        """
        params = {
            "vmid": uid,
            "pn": pn,
            "ps": ps,
            "order": order,
            "order_type": order_type,
        }
        url = "https://api.bilibili.com/x/relation/followings"
        return session.get(url=url, params=params).json()

    @staticmethod
    def space_arc_search(
        session, uid: int, pn: int = 1, ps: int = 100, tid: int = 0, order: str = "pubdate", keyword: str = ""
    ) -> list:
        """List the videos uploaded by ``uid``.

        uid: uploader uid.
        pn: page number, default 1.
        ps: page size, default 100.
        tid: region id, default 0.
        order: sort key, default "pubdate".
        keyword: search keyword, default empty.

        Returns a list of dicts with keys ``aid``, ``cid`` (always 0 here),
        ``title`` and ``owner``; an empty list when the response carries no
        video list.
        """
        params = {
            "mid": uid,
            "pn": pn,
            "ps": ps,
            "tid": tid,
            "order": order,
            "keyword": keyword,
        }
        url = "https://api.bilibili.com/x/space/arc/search"
        ret = session.get(url=url, params=params).json()
        # Any level of data -> list -> vlist may be missing or null.
        videos = ((ret.get("data") or {}).get("list") or {}).get("vlist") or []
        return [
            {"aid": one.get("aid"), "cid": 0, "title": one.get("title"), "owner": one.get("author")}
            for one in videos
        ]

    @staticmethod
    def elec_pay(session, bili_jct, uid: int, num: int = 50) -> dict:
        """Charge ("elec") an uploader with B-coins and return the raw JSON response.

        uid: uploader uid.
        num: number of batteries to send.
        """
        url = "https://api.bilibili.com/x/ugcpay/trade/elec/pay/quick"
        post_data = {"elec_num": num, "up_mid": uid, "otype": "up", "oid": uid, "csrf": bili_jct}
        return session.post(url=url, data=post_data).json()

    @staticmethod
    def coin_add(session, bili_jct, aid: int, num: int = 1, select_like: int = 1) -> dict:
        """Throw coins at a video and return the raw JSON response.

        aid: video av id.
        num: number of coins.
        select_like: whether to also like the video.
        """
        url = "https://api.bilibili.com/x/web-interface/coin/add"
        post_data = {
            "aid": aid,
            "multiply": num,
            "select_like": select_like,
            "cross_domain": "true",
            "csrf": bili_jct,
        }
        return session.post(url=url, data=post_data).json()

    @staticmethod
    def live_status(session) -> str:
        """Fetch silver/gold/coin balances and return them as a report string.

        Balances missing from the response are reported as 0.
        """
        url = "https://api.live.bilibili.com/pay/v1/Exchange/getStatus"
        ret = session.get(url=url).json()
        # "data" is null on error responses; fall back to an empty dict.
        data = ret.get("data") or {}
        silver = data.get("silver", 0)
        gold = data.get("gold", 0)
        coin = data.get("coin", 0)
        return f"银瓜子数量: {silver}\n金瓜子数量: {gold}\n硬币数量: {coin}"

    @staticmethod
    def silver2coin(session, bili_jct) -> dict:
        """Exchange silver for a coin and return the raw JSON response."""
        url = "https://api.live.bilibili.com/pay/v1/Exchange/silver2coin"
        post_data = {
            "csrf_token": bili_jct,
            "csrf": bili_jct,
        }
        return session.post(url=url, data=post_data).json()

    @staticmethod
    def get_region(session, rid=1, num=6) -> list:
        """List recent videos of a region.

        rid: region id.
        num: number of videos to fetch.

        Returns a list of dicts with keys ``aid``, ``cid``, ``title`` and
        ``owner``; an empty list when the response carries no archives.
        """
        url = "https://api.bilibili.com/x/web-interface/dynamic/region"
        ret = session.get(url=url, params={"ps": num, "rid": rid}).json()
        archives = (ret.get("data") or {}).get("archives") or []
        return [
            {
                "aid": one.get("aid"),
                "cid": one.get("cid"),
                "title": one.get("title"),
                "owner": (one.get("owner") or {}).get("name"),
            }
            for one in archives
        ]

    @staticmethod
    def _env_int(name, default):
        """Return the integer in env var ``name``; ``default`` if unset or blank.

        A non-numeric value still raises ValueError so a misconfiguration is
        not silently ignored.
        """
        raw = os.environ.get(name, "")
        return int(raw) if raw.strip() else default

    @staticmethod
    def _pause(low=7, high=19):
        """Sleep a random whole number of seconds in [low, high] between requests."""
        time.sleep(random.randint(low, high))

    def _throw_coins(self, session, bili_jct, aid_list, coin_num):
        """Throw up to ``coin_num`` coins, one per video; return the success count.

        Videos are tried from the end of ``aid_list`` backwards, so entries
        appended last (followed users' videos) are used before the region
        videos at the front.
        """
        success_count = 0
        if coin_num <= 0:
            return success_count
        for video in reversed(aid_list):
            ret = self.coin_add(session=session, aid=video.get("aid"), bili_jct=bili_jct)
            code = ret.get("code")
            if code == 0:
                success_count += 1
                if success_count >= coin_num:
                    break
            elif code == 34005:
                # 34005: coin limit reached for this video -- try the next one.
                continue
            else:
                # e.g. -104 not enough coins, -111 csrf failure: stop trying.
                break
        return success_count

    def main(self):
        """Run all daily tasks and return the report text.

        Returns an empty string when the cookie is not logged in.  Sleeps a
        random number of seconds between most tasks.
        """
        msg_all = ""
        bilibili_cookie = self.bilibili_cookie_list
        bili_jct = bilibili_cookie.get("bili_jct")

        coin_target = self._env_int("BILI_NUM", 0)  # coins to throw per day
        coin_type = self._env_int("BILI_TYPE", 1)  # 1: followed users first, 0: region only
        # Exchange is on only while BILI_S2C is unset/empty.
        silver2coin = os.environ.get("BILI_S2C", "") == ""

        session = requests.session()
        requests.utils.add_dict_to_cookiejar(session.cookies, bilibili_cookie)
        session.headers.update(
            {
                "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/63.0.3239.108",
                "Referer": "https://www.bilibili.com/",
                "Connection": "keep-alive",
            }
        )

        uname, uid, is_login, coin, _, _ = self.get_nav(session=session)
        if not is_login:
            return msg_all

        manhua_msg = self.manga_sign(session=session)
        self._pause()
        # Result is not part of the report; the call is made for its effect.
        self.manga_book(session=session)
        self._pause()
        live_msg = self.live_sign(session=session)
        self._pause()
        aid_list = self.get_region(session=session)
        self._pause()

        reward_data = self.reward(session=session).get("data") or {}
        # coins_av is divided by 10 to turn it into a count of coins already thrown.
        coins_av_count = (reward_data.get("coins_av") or 0) // 10
        # Coins still to throw: never more than the whole coins in the wallet.
        coin_num = min(coin_target - coins_av_count, int(coin or 0))

        if coin_type == 1 and coin_num > 0:
            following_list = self.get_followings(session=session, uid=uid)
            for following in (following_list.get("data") or {}).get("list") or []:
                mid = following.get("mid")
                if mid:
                    aid_list += self.space_arc_search(session=session, uid=mid)

        success_count = self._throw_coins(
            session=session, bili_jct=bili_jct, aid_list=aid_list, coin_num=coin_num
        )
        # Report progress against the configured daily target.
        coin_msg = f"今日成功投币{success_count + coins_av_count}/{coin_target}个"

        if aid_list:
            # The first entry comes from get_region and carries a real cid.
            aid = aid_list[0].get("aid")
            cid = aid_list[0].get("cid")
            title = aid_list[0].get("title")
            report_ret = self.report_task(session=session, bili_jct=bili_jct, aid=aid, cid=cid)
            if report_ret.get("code") == 0:
                report_msg = f"观看《{title}》300+秒"
            else:
                report_msg = "任务失败"
            share_ret = self.share_task(session=session, bili_jct=bili_jct, aid=aid)
            if share_ret.get("code") == 0:
                share_msg = f"分享《{title}》成功"
            else:
                share_msg = "分享失败"
        else:
            # No video available to watch or share.
            report_msg = "任务失败"
            share_msg = "分享失败"

        if silver2coin:
            silver2coin_ret = self.silver2coin(session=session, bili_jct=bili_jct)
            if silver2coin_ret.get("code") == 0:
                silver2coin_msg = "成功将银瓜子兑换为1个硬币"
            else:
                silver2coin_msg = silver2coin_ret.get("message")
            print(silver2coin_msg)
        else:
            silver2coin_msg = "未开启银瓜子兑换硬币功能"
        self._pause(10, 19)

        live_stats = self.live_status(session=session)
        uname, _, _, _, _, new_current_exp = self.get_nav(session=session)

        reward_data = self.reward(session=session).get("data") or {}
        done_flags = (
            reward_data.get("login"),
            reward_data.get("watch_av"),
            reward_data.get("share_av"),
        )
        # 5 exp for each completed login/watch/share task plus the coin exp.
        today_exp = 5 * sum(1 for flag in done_flags if flag)
        today_exp += reward_data.get("coins_av") or 0
        # Clamp at 0 so accounts already past the target do not show negative days.
        remaining_exp = max(0, self.LEVEL_TARGET_EXP - (new_current_exp or 0))
        update_data = remaining_exp // (today_exp if today_exp else 1)

        msg = (
            f"【Bilibili任务简报】\n帐号信息: {uname}\n漫画签到: {manhua_msg}\n直播签到: {live_msg}\n"
            f"登陆任务: 今日已登陆\n观看视频: {report_msg}\n分享任务: {share_msg}\n投币任务: {coin_msg}\n"
            f"银瓜子兑换硬币: {silver2coin_msg}\n今日获得经验: {today_exp}\n当前经验: {new_current_exp}\n"
            f"按当前速度升级还需: {update_data}天\n{live_stats}"
        )
        msg_all += msg + "\n\n"
        return msg_all
# Report text produced by the last run; stays empty until the script runs.
BILIBILI_MSG = ''


def parse_cookie_string(raw_cookie):
    """Turn a "name=value;name=value" cookie header into a dict.

    Each pair is split on the FIRST "=" only, so values that themselves
    contain "=" are kept intact.  Empty segments (for example after a
    trailing ";") are skipped.
    """
    cookies = {}
    for pair in raw_cookie.split(';'):
        name, _, value = pair.partition('=')
        name = name.strip()
        if name:
            cookies[name] = value.strip()
    return cookies


if __name__ == "__main__":
    _user = os.environ.get('BILI_USER', "")
    _password = os.environ.get('BILI_PASS', "")
    # Abort when neither a complete username/password pair nor a cookie is
    # configured.  The space-stripped cookie is checked so this guard agrees
    # with the branch below.
    if (_user == "" or _password == "") and BILI_COOKIE == "":
        print("未填写哔哩哔哩账号密码或COOKIE取消运行")
        sys.exit(0)
    if BILI_COOKIE == "":
        # NOTE(review): `Bilibili` is not defined in the visible part of this
        # file; unless it is provided elsewhere this path raises NameError.
        b = Bilibili()
        login = b.login(username=_user, password=_password)
        # The return contract of login() is not visible here, so the
        # original comparison is kept as is.
        if login == False:  # noqa: E712
            print('登录失败 账号或密码错误,详情前往Github查看')
            sys.exit(0)
        _bilibili_cookie_list = b.get_cookies()
    else:
        _bilibili_cookie_list = parse_cookie_string(BILI_COOKIE)
    BILIBILI_MSG = BiliBiliCheckIn(bilibili_cookie_list=_bilibili_cookie_list).main()
    print(BILIBILI_MSG)