@@ -2,10 +2,12 @@
 
 from __future__ import annotations
 
+import asyncio
 import base64
 from collections.abc import AsyncIterator, Callable, Coroutine, Mapping
 import hashlib
 import logging
+import random
 from typing import Any, Self
 
 from aiohttp import ClientError, ClientTimeout, StreamReader
@@ -26,6 +28,9 @@
 
 _LOGGER = logging.getLogger(__name__)
 _STORAGE_BACKUP = "backup"
+_RETRY_LIMIT = 5
+_RETRY_SECONDS_MIN = 60
+_RETRY_SECONDS_MAX = 600
 
 
 async def _b64md5(stream: AsyncIterator[bytes]) -> str:
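For reference, `_b64md5()` is unchanged by this diff, so only its signature appears as context. It produces the base64-encoded MD5 digest that is later handed to `async_files_upload_details()`, presumably so the storage backend can verify the uploaded payload. A minimal sketch of what such a helper looks like, assuming it simply drains the async iterator using the module's existing hashlib/base64 imports:

async def _b64md5(stream: AsyncIterator[bytes]) -> str:
    """Calculate the base64-encoded MD5 digest of an async byte stream."""
    file_hash = hashlib.md5()
    async for chunk in stream:
        file_hash.update(chunk)
    return base64.b64encode(file_hash.digest()).decode()

Hashing consumes the iterator, which is why the upload path below calls `open_stream()` once for hashing and again for the actual transfer.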
@@ -138,37 +143,34 @@ async def async_download_backup(
             raise BackupAgentError("Failed to get download details") from err
 
         try:
-            resp = await self._cloud.websession.get(details["url"])
+            resp = await self._cloud.websession.get(
+                details["url"],
+                timeout=ClientTimeout(connect=10.0, total=43200.0),  # 43200s == 12h
+            )
+
             resp.raise_for_status()
         except ClientError as err:
             raise BackupAgentError("Failed to download backup") from err
 
         return ChunkAsyncStreamIterator(resp.content)
 
-    async def async_upload_backup(
+    async def _async_do_upload_backup(
         self,
         *,
         open_stream: Callable[[], Coroutine[Any, Any, AsyncIterator[bytes]]],
-        backup: AgentBackup,
-        **kwargs: Any,
+        filename: str,
+        base64md5hash: str,
+        metadata: dict[str, Any],
+        size: int,
     ) -> None:
-        """Upload a backup.
-
-        :param open_stream: A function returning an async iterator that yields bytes.
-        :param backup: Metadata about the backup that should be uploaded.
-        """
-        if not backup.protected:
-            raise BackupAgentError("Cloud backups must be protected")
-
-        base64md5hash = await _b64md5(await open_stream())
-
+        """Upload a backup."""
         try:
             details = await async_files_upload_details(
                 self._cloud,
                 storage_type=_STORAGE_BACKUP,
-                filename=self._get_backup_filename(),
-                metadata=backup.as_dict(),
-                size=backup.size,
+                filename=filename,
+                metadata=metadata,
+                size=size,
                 base64md5hash=base64md5hash,
             )
         except (ClientError, CloudError) as err:
@@ -178,7 +180,7 @@ async def async_upload_backup(
             upload_status = await self._cloud.websession.put(
                 details["url"],
                 data=await open_stream(),
-                headers=details["headers"] | {"content-length": str(backup.size)},
+                headers=details["headers"] | {"content-length": str(size)},
                 timeout=ClientTimeout(connect=10.0, total=43200.0),  # 43200s == 12h
             )
             _LOGGER.log(
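Both this PUT and the download GET in the earlier hunk pass an explicit `ClientTimeout(connect=10.0, total=43200.0)`. Without it, aiohttp falls back to its session default of `ClientTimeout(total=5 * 60)`, five minutes, which a multi-gigabyte backup transfer can easily exceed; the 12-hour ceiling still prevents a stalled transfer from hanging forever. A minimal illustration of the per-request override, using only aiohttp's public API (the URL is a placeholder):

from aiohttp import ClientSession, ClientTimeout

async def fetch_large(url: str) -> bytes:
    # The per-request timeout overrides the session-wide default (total=5 minutes).
    async with ClientSession() as session:
        async with session.get(
            url,
            timeout=ClientTimeout(connect=10.0, total=43200.0),  # 43200s == 12h
        ) as resp:
            resp.raise_for_status()
            return await resp.read()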
@@ -190,6 +192,51 @@ async def async_upload_backup(
         except (TimeoutError, ClientError) as err:
             raise BackupAgentError("Failed to upload backup") from err
 
+    async def async_upload_backup(
+        self,
+        *,
+        open_stream: Callable[[], Coroutine[Any, Any, AsyncIterator[bytes]]],
+        backup: AgentBackup,
+        **kwargs: Any,
+    ) -> None:
+        """Upload a backup.
+
+        :param open_stream: A function returning an async iterator that yields bytes.
+        :param backup: Metadata about the backup that should be uploaded.
+        """
+        if not backup.protected:
+            raise BackupAgentError("Cloud backups must be protected")
+
+        base64md5hash = await _b64md5(await open_stream())
+        filename = self._get_backup_filename()
+        metadata = backup.as_dict()
+        size = backup.size
+
+        tries = 1
+        while tries <= _RETRY_LIMIT:
+            try:
+                await self._async_do_upload_backup(
+                    open_stream=open_stream,
+                    filename=filename,
+                    base64md5hash=base64md5hash,
+                    metadata=metadata,
+                    size=size,
+                )
+                break
+            except BackupAgentError as err:
+                if tries == _RETRY_LIMIT:
+                    raise
+                tries += 1
+                retry_timer = random.randint(_RETRY_SECONDS_MIN, _RETRY_SECONDS_MAX)
+                _LOGGER.info(
+                    "Failed to upload backup, retrying (%s/%s) in %ss: %s",
+                    tries,
+                    _RETRY_LIMIT,
+                    retry_timer,
+                    err,
+                )
+                await asyncio.sleep(retry_timer)
+
     async def async_delete_backup(
         self,
         backup_id: str,
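Taken together, the change splits the old `async_upload_backup()` into a retryable worker plus a wrapper: the hash, filename, metadata, and size are computed once, then the upload is attempted up to `_RETRY_LIMIT` (5) times with a uniformly random 60-600 second pause between failures. The uniform jitter, rather than exponential backoff, plausibly spreads retries across many installations so they do not hit the cloud in lockstep, and because `open_stream` is a factory, each attempt re-opens a fresh byte stream instead of reusing an exhausted iterator. The same pattern in isolation, as a sketch with hypothetical names:

import asyncio
import random
from collections.abc import Awaitable, Callable

RETRY_LIMIT = 5
RETRY_SECONDS_MIN = 60
RETRY_SECONDS_MAX = 600

async def run_with_retry(attempt: Callable[[], Awaitable[None]]) -> None:
    """Await attempt() until it succeeds or RETRY_LIMIT tries are spent."""
    tries = 1
    while tries <= RETRY_LIMIT:
        try:
            await attempt()
            break
        except Exception:
            if tries == RETRY_LIMIT:
                raise  # out of attempts: surface the last error
            tries += 1
            # Uniform jitter keeps many clients from retrying simultaneously.
            await asyncio.sleep(random.randint(RETRY_SECONDS_MIN, RETRY_SECONDS_MAX))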