Skip to content

Commit 74defbd

Browse files
author
Suhem Parack
committed
Create media_upload_v2.py
Added python script for media upload v2
1 parent 8c63446 commit 74defbd

File tree

1 file changed

+229
-0
lines changed

1 file changed

+229
-0
lines changed

Media Upload/media_upload_v2.py

+229
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
import os
2+
import sys
3+
import base64
4+
import hashlib
5+
import re
6+
import time
7+
import requests
8+
from requests_oauthlib import OAuth2Session
9+
10+
MEDIA_ENDPOINT_URL = 'https://api.x.com/2/media/upload'
11+
POST_TO_X_URL = 'https://api.x.com/2/tweets'
12+
13+
# Replace with path to file
14+
VIDEO_FILENAME = 'REPLACE_ME'
15+
16+
# You will need to enable OAuth 2.0 in your App’s auth settings in the Developer Portal to get your client ID.
17+
# Inside your terminal you will need to set an enviornment variable
18+
# export CLIENT_ID='your-client-id'
19+
client_id = os.environ.get("CLIENT_ID")
20+
21+
# If you have selected a type of App that is a confidential client you will need to set a client secret.
22+
# Confidential Clients securely authenticate with the authorization server.
23+
24+
# Inside your terminal you will need to set an enviornment variable
25+
# export CLIENT_SECRET='your-client-secret'
26+
27+
# Remove the comment on the following line if you are using a confidential client
28+
# client_secret = os.environ.get("CLIENT_SECRET")
29+
30+
# Replace the following URL with your callback URL, which can be obtained from your App's auth settings.
31+
redirect_uri = "https://www.example.com"
32+
33+
# Set the scopes
34+
scopes = ["media.write", "users.read", "tweet.read", "tweet.write", "offline.access"]
35+
36+
# Create a code verifier
37+
code_verifier = base64.urlsafe_b64encode(os.urandom(30)).decode("utf-8")
38+
code_verifier = re.sub("[^a-zA-Z0-9]+", "", code_verifier)
39+
40+
# Create a code challenge
41+
code_challenge = hashlib.sha256(code_verifier.encode("utf-8")).digest()
42+
code_challenge = base64.urlsafe_b64encode(code_challenge).decode("utf-8")
43+
code_challenge = code_challenge.replace("=", "")
44+
45+
# Start and OAuth 2.0 session
46+
oauth = OAuth2Session(client_id, redirect_uri=redirect_uri, scope=scopes)
47+
48+
# Create an authorize URL
49+
auth_url = "https://x.com/i/oauth2/authorize"
50+
authorization_url, state = oauth.authorization_url(
51+
auth_url, code_challenge=code_challenge, code_challenge_method="S256"
52+
)
53+
54+
# Visit the URL to authorize your App to make requests on behalf of a user
55+
print(
56+
"Visit the following URL to authorize your App on behalf of your X handle in a browser:"
57+
)
58+
print(authorization_url)
59+
60+
# Paste in your authorize URL to complete the request
61+
authorization_response = input(
62+
"Paste in the full URL after you've authorized your App:\n"
63+
)
64+
65+
# Fetch your access token
66+
token_url = "https://api.x.com/2/oauth2/token"
67+
68+
# The following line of code will only work if you are using a type of App that is a public client
69+
auth = False
70+
71+
# If you are using a confidential client you will need to pass in basic encoding of your client ID and client secret.
72+
73+
# Please remove the comment on the following line if you are using a type of App that is a confidential client
74+
# auth = HTTPBasicAuth(client_id, client_secret)
75+
76+
token = oauth.fetch_token(
77+
token_url=token_url,
78+
authorization_response=authorization_response,
79+
auth=auth,
80+
client_id=client_id,
81+
include_client_id=True,
82+
code_verifier=code_verifier,
83+
)
84+
85+
# Your access token
86+
access = token["access_token"]
87+
88+
headers = {
89+
"Authorization": "Bearer {}".format(access),
90+
"Content-Type": "application/json",
91+
"User-Agent": "MediaUploadSampleCode",
92+
}
93+
94+
95+
class VideoPost(object):
96+
97+
def __init__(self, file_name):
98+
# Defines video Post properties
99+
self.video_filename = file_name
100+
self.total_bytes = os.path.getsize(self.video_filename)
101+
self.media_id = None
102+
self.processing_info = None
103+
104+
def upload_init(self):
105+
# Initializes Upload
106+
print('INIT')
107+
108+
request_data = {
109+
'command': 'INIT',
110+
'media_type': 'video/mp4',
111+
'total_bytes': self.total_bytes,
112+
'media_category': 'tweet_video'
113+
}
114+
115+
req = requests.post(url=MEDIA_ENDPOINT_URL, params=request_data, headers=headers)
116+
print(req.status_code)
117+
print(req.text)
118+
media_id = req.json()['data']['id']
119+
120+
self.media_id = media_id
121+
122+
print('Media ID: %s' % str(media_id))
123+
124+
def upload_append(self):
125+
segment_id = 0
126+
bytes_sent = 0
127+
with open(self.video_filename, 'rb') as file:
128+
while bytes_sent < self.total_bytes:
129+
chunk = file.read(4 * 1024 * 1024) # 4MB chunk size
130+
131+
print('APPEND')
132+
133+
files = {'media': ('chunk', chunk, 'application/octet-stream')}
134+
135+
data = {
136+
'command': 'APPEND',
137+
'media_id': self.media_id,
138+
'segment_index': segment_id
139+
}
140+
141+
headers = {
142+
"Authorization": f"Bearer {access}",
143+
"User-Agent": "MediaUploadSampleCode",
144+
}
145+
146+
req = requests.post(url=MEDIA_ENDPOINT_URL, data=data, files=files, headers=headers)
147+
148+
if req.status_code < 200 or req.status_code > 299:
149+
print(req.status_code)
150+
print(req.text)
151+
sys.exit(0)
152+
153+
segment_id += 1
154+
bytes_sent = file.tell()
155+
156+
print(f'{bytes_sent} of {self.total_bytes} bytes uploaded')
157+
158+
print('Upload chunks complete.')
159+
160+
def upload_finalize(self):
161+
162+
# Finalizes uploads and starts video processing
163+
print('FINALIZE')
164+
165+
request_data = {
166+
'command': 'FINALIZE',
167+
'media_id': self.media_id
168+
}
169+
170+
req = requests.post(url=MEDIA_ENDPOINT_URL, params=request_data, headers=headers)
171+
172+
print(req.json())
173+
174+
self.processing_info = req.json()['data'].get('processing_info', None)
175+
self.check_status()
176+
177+
def check_status(self):
178+
# Checks video processing status
179+
if self.processing_info is None:
180+
return
181+
182+
state = self.processing_info['state']
183+
184+
print('Media processing status is %s ' % state)
185+
186+
if state == u'succeeded':
187+
return
188+
189+
if state == u'failed':
190+
sys.exit(0)
191+
192+
check_after_secs = self.processing_info['check_after_secs']
193+
194+
print('Checking after %s seconds' % str(check_after_secs))
195+
time.sleep(check_after_secs)
196+
197+
print('STATUS')
198+
199+
request_params = {
200+
'command': 'STATUS',
201+
'media_id': self.media_id
202+
}
203+
204+
req = requests.get(url=MEDIA_ENDPOINT_URL, params=request_params, headers=headers)
205+
206+
self.processing_info = req.json()['data'].get('processing_info', None)
207+
self.check_status()
208+
209+
def post(self):
210+
211+
# Publishes Post with attached video
212+
payload = {
213+
'text': 'I just uploaded a video with the media upload v2 @XDevelopers API.',
214+
'media': {
215+
'media_ids': [self.media_id]
216+
}
217+
}
218+
219+
req = requests.post(url=POST_TO_X_URL, json=payload, headers=headers)
220+
221+
print(req.json())
222+
223+
224+
if __name__ == '__main__':
225+
videoPost = VideoPost(VIDEO_FILENAME)
226+
videoPost.upload_init()
227+
videoPost.upload_append()
228+
videoPost.upload_finalize()
229+
videoPost.post()

0 commit comments

Comments
 (0)