-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserve.py
More file actions
157 lines (118 loc) · 4.46 KB
/
serve.py
File metadata and controls
157 lines (118 loc) · 4.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import os
import time
import logging
import socket
import json
import click
from pathlib import Path
from flask import Flask, send_from_directory, jsonify, request, redirect
# Flask application. The built-in static route is disabled (static_folder=None);
# files are served by the catch-all GET route defined further down instead.
app = Flask(__name__, static_folder=None)

# Directory the process was started from; config.json is looked up here.
current_files_dir = Path.cwd()
# Directory holding everything that is served or sent (pages, images, payloads).
static_files_dir = current_files_dir.joinpath("static")

# Runtime settings. These are defaults only: main() overwrites them from the
# command line (is_debug, redirect_url) and from config.json (ums_delay).
is_debug = False      # when True, hardware/system side effects are skipped
redirect_url = None   # target for requests whose Host contains "playstation.net"
ums_delay = 15.0      # default seconds the USB mass-storage image stays attached
def enable_mass_storage(blkdev: str, ro: bool = True):
    """Attach ``blkdev`` as the backing file of the USB mass-storage gadget LUN.

    The read-only flag is written first, then the backing file path (followed
    by a newline). Nothing is written when the module-level ``is_debug`` flag
    is set.

    Args:
        blkdev: Path of the image or block device to expose; it is formatted
            into a string, so path-like objects work as well.
        ro: Expose the medium read-only (``"1"``) or writable (``"0"``).

    Raises:
        OSError: if either gadget attribute file cannot be opened or written.
    """
    # Only works in Luckfox
    if not is_debug:
        read_only_flag = "1" if ro else "0"
        with open("/sys/kernel/config/usb_gadget/rockchip/functions/mass_storage.0/lun.0/ro", "w") as ro_attr:
            ro_attr.write(read_only_flag)
        with open("/sys/kernel/config/usb_gadget/rockchip/functions/mass_storage.0/lun.0/file", "w") as file_attr:
            file_attr.write(f"{blkdev}\n")
def disable_mass_storage():
    """Detach the backing file from the USB mass-storage gadget LUN.

    Nothing is written when the module-level ``is_debug`` flag is set.

    Raises:
        OSError: if the gadget attribute file cannot be opened or written.
    """
    # Only works in Luckfox
    if is_debug:
        return
    path = "/sys/kernel/config/usb_gadget/rockchip/functions/mass_storage.0/lun.0/file"
    with open(path, "w") as f:
        # Write a bare newline instead of "": writing an empty string buffers
        # zero bytes, so Python never issues a write() call and the kernel
        # attribute handler would not run at all. The mass-storage gadget
        # strips a trailing newline and treats the resulting empty name as
        # "remove the medium" (same effect as `echo "" > .../lun.0/file`).
        f.write("\n")
def send_payload(path: str, host: str, port: int, timeout: float = 3000):
    """Send the file at ``path`` to ``host:port`` over a plain TCP connection.

    Netcat-like behaviour: connect, stream the whole file, close.

    Args:
        path: File to send; opened in binary mode (path-like objects work).
        host: Destination IPv4 address or host name.
        port: Destination TCP port.
        timeout: Socket timeout in seconds, applied to connect and send.
            NOTE(review): the default of 3000 s (50 minutes) is kept from the
            original hard-coded value; it may have been meant as
            milliseconds -- confirm before lowering it.

    Raises:
        OSError: if the file cannot be opened, or if connecting or sending
            fails or times out.
    """
    # The context manager closes the socket on every exit path, including a
    # failed connect(), which previously happened outside the try/finally.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM, proto=0) as conn:
        conn.settimeout(timeout)
        conn.connect((host, port))
        with open(path, "rb") as fp:
            conn.sendfile(fp)
@app.route("/api/<path:path>", methods=["POST"])
def catch_api_routes(path: str):
    """Run the device action named by the URL of a POST to ``/api/<path>``.

    Supported actions: ``disable-eth0``, ``shutdown``, ``plug-exfathax``,
    ``unplug-exfathax``, ``send-main-payload``, ``emulate-exfathax`` (optional
    ``delay`` query parameter, in seconds, defaulting to ``ums_delay``) and
    ``server-sleep`` (optional ``seconds`` query parameter, defaulting to 0).
    Any other path performs no action.

    Returns:
        JSON ``{"path": <path>}`` once the action has finished, or a JSON
        error with status 400 when a duration parameter is not a finite
        number.
    """
    params = request.args.to_dict()

    def read_seconds(key, default):
        """Return query parameter ``key`` as seconds >= 0, or None if invalid."""
        try:
            seconds = float(params.get(key, default))
        except (TypeError, ValueError):
            return None
        # NaN and +/-inf parse as floats but make time.sleep() raise.
        if seconds != seconds or seconds in (float("inf"), float("-inf")):
            return None
        # Negative durations are clamped, for user input as well as the default.
        return max(seconds, 0.0)

    def bad_request(key):
        """Build the 400 response for an unusable duration parameter."""
        return jsonify({"error": f"invalid value for '{key}'", "path": path}), 400

    def disable_eth0():
        logging.error("Disable eth0 interface...")
        if not is_debug:
            os.system("ifconfig eth0 down")
            # NOTE(review): assumed to belong to the non-debug branch -- confirm.
            # SystemExit is raised directly because the `exit` helper comes
            # from the `site` module and is not guaranteed to exist.
            raise SystemExit(0)

    def shutdown():
        # TODO Shutting down or rebooting the system not working properly with UMS gadget
        logging.error("Shutting down the system...")
        if not is_debug:
            os.system("halt")
            # NOTE(review): assumed to belong to the non-debug branch -- confirm.
            raise SystemExit(0)

    def plug_exfathax():
        logging.error("Enable UMS (USB Mass Storage)...")
        enable_mass_storage(static_files_dir / "exfathax.img", ro=True)

    def unplug_exfathax():
        logging.error("Disable UMS (USB Mass Storage)...")
        disable_mass_storage()

    def send_main_payload():
        logging.error("Sending main payload...")
        ps4_addr = request.remote_addr
        loader_port = 9020
        time.sleep(5)
        send_payload(static_files_dir / "payload.bin", ps4_addr, loader_port)

    def emulate_exfathax():
        # Validate before touching the gadget so a bad value has no side effects.
        delay = read_seconds("delay", ums_delay)
        if delay is None:
            return bad_request("delay")
        logging.error(f"Emulating USB mass storage for ExFatHax... delay={delay} seconds")
        enable_mass_storage(static_files_dir / "exfathax.img", ro=True)
        try:
            time.sleep(delay)
        finally:
            # Never leave the image attached if the wait is interrupted.
            disable_mass_storage()
        time.sleep(1)

    def server_sleep():
        seconds = read_seconds("seconds", 0)
        if seconds is None:
            return bad_request("seconds")
        time.sleep(seconds)

    actions = {
        "disable-eth0": disable_eth0,
        "shutdown": shutdown,
        "plug-exfathax": plug_exfathax,
        "unplug-exfathax": unplug_exfathax,
        "send-main-payload": send_main_payload,
        "emulate-exfathax": emulate_exfathax,
        "server-sleep": server_sleep,
    }
    handler = actions.get(path)
    if handler is not None:
        # Handlers return None on success or a ready-made error response.
        error_response = handler()
        if error_response is not None:
            return error_response
    return jsonify({"path": path})
@app.route("/", methods=["GET"])
@app.route("/<path:path>", methods=["GET"])
def catch_all_routes(path: str = "index.html"):
    """Serve a file from the static directory, or redirect PlayStation hosts.

    When ``redirect_url`` is set and the request's Host contains
    ``playstation.net``, the client is redirected there. Otherwise ``path`` is
    resolved inside ``static_files_dir``; an empty path or a directory is
    served as its ``index.html``.

    Args:
        path: Request path relative to the static directory.

    Returns:
        A redirect response or the file response produced by
        ``send_from_directory``.
    """
    if redirect_url and "playstation.net" in request.host:
        return redirect(redirect_url)
    if path == "":
        path = "index.html"
    if os.path.isdir(static_files_dir / path):
        path = path.rstrip("/") + "/index.html"
    # Always anchor at static_files_dir and pass the full relative path, so
    # send_from_directory's containment check covers the whole request path.
    # Deriving the directory from the request (parent of the joined path) let
    # "../" segments pick a directory outside the static tree.
    return send_from_directory(static_files_dir, path)
@click.command()
@click.option("--host", default="0.0.0.0", help="Host to run the server on")
@click.option("--port", default=9191, help="Port to run the server on")
@click.option("--debug", is_flag=True, help="Enable debug mode")
@click.option("--redirect-playstation-domain", default=None, type=str, help="Redirect *.playstation.net domain to other URL")
def main(host: str, port: int, debug: bool, redirect_playstation_domain: str):
    """Command-line entry point: apply the settings and start the web server.

    Stores the command-line options in the module-level ``is_debug`` and
    ``redirect_url``, reads ``ums_delay`` from ``config.json`` in the start
    directory (15 seconds when the key is absent), then runs the Flask app
    in threaded mode.

    Raises:
        OSError: if ``config.json`` cannot be opened.
        json.JSONDecodeError: if ``config.json`` is not valid JSON.
    """
    global is_debug, redirect_url, ums_delay

    is_debug, redirect_url = debug, redirect_playstation_domain

    config_path = current_files_dir / "config.json"
    with config_path.open("r") as config_file:
        settings = json.load(config_file)
    ums_delay = settings.get("ums_delay", 15.)

    app.run(host=host, port=port, debug=debug, threaded=True)


# Start the server only when executed as a script, not when imported.
if __name__ == "__main__":
    main()