-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.go
129 lines (112 loc) · 2.91 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package main
import (
"flag"
"os"
"net/http"
"io"
"archive/zip"
"context"
"cloud.google.com/go/storage"
"fmt"
"log"
"sync"
)
// Command-line flags controlling where data goes and how it is processed.
var (
	// bucketName is the GCS bucket that the extracted archive entries are uploaded to.
	bucketName = flag.String("bucket-name", "deeplesion-data", "the name of the GCP bucket you want to upload to")
	// parallel selects BeginConcurrent (all archives at once) instead of Begin (one at a time).
	parallel = flag.Bool("parallel", false, "Download and upload data in parallel, generally requires more disk space")
	// removeFiles deletes each local archive after it is processed; only Begin honours it.
	removeFiles = flag.Bool("remove-files", false, "remove each file after download and upload (only if parallel=false)")
)
func main() {
flag.Parse()
if *parallel {
BeginConcurrent()
} else {
Begin()
}
}
// Begin runs the download, unzip, and upload process for each entry of
// DownloadURLs one after another, with no concurrency.
//
// Archive i is saved locally as Images_png_NN.zip, where NN is i+1
// zero-padded to two digits. When -remove-files is set, each local archive is
// deleted after it has been processed.
func Begin() {
	for i, url := range DownloadURLs {
		fn := fmt.Sprintf("Images_png_%02d.zip", i+1)
		// A failure on one archive is not fatal; the loop moves on to the next one.
		_ = FetchUploadAndHandleFile(fn, url, *bucketName, nil)
		if *removeFiles {
			// A leftover archive only wastes disk space, so a failed removal is
			// reported but does not stop the run. A missing file (the download
			// never produced one) is not worth reporting.
			if err := os.Remove(fn); err != nil && !os.IsNotExist(err) {
				log.Printf("removing %s: %v", fn, err)
			}
		}
	}
}
// BeginConcurrent runs the download, unzip, and upload process for every entry
// of DownloadURLs at the same time, one goroutine per archive, and blocks until
// all of them have finished.
//
// Archive i is saved locally as Images_png_NN.zip, where NN is i+1
// zero-padded to two digits. Local archives are never removed here.
func BeginConcurrent() {
	var pending sync.WaitGroup
	pending.Add(len(DownloadURLs))
	for idx, src := range DownloadURLs {
		name := fmt.Sprintf("Images_png_%02d.zip", idx+1)
		// FetchUploadAndHandleFile calls pending.Done itself when handed a
		// non-nil WaitGroup.
		go FetchUploadAndHandleFile(name, src, *bucketName, &pending)
	}
	pending.Wait()
}
// FetchUploadAndHandleFile downloads url to the local path filename, then
// unzips it and uploads its entries to the GCS bucket bucketName.
//
// If wg is non-nil, wg.Done is called when the function returns, whether it
// succeeded or failed. It can therefore be launched with `go` after a matching
// wg.Add(1).
//
// The returned error reports the first stage that failed. If the download
// fails, no unzip or upload is attempted. Failures are also logged here,
// because a caller that starts this with a `go` statement cannot observe the
// return value.
func FetchUploadAndHandleFile(filename, url, bucketName string, wg *sync.WaitGroup) error {
	if wg != nil {
		// Deferred so the WaitGroup is released on every return path.
		defer wg.Done()
	}

	log.Printf("Starting download of %s\n", filename)
	if err := FetchFile(filename, url); err != nil {
		err = fmt.Errorf("downloading %s: %w", filename, err)
		log.Print(err)
		return err
	}

	log.Printf("Download of file %s complete, begining unzip and upload to GCP\n", filename)
	if err := UnzipAndUploadFiles(filename, bucketName); err != nil {
		err = fmt.Errorf("unzipping and uploading %s: %w", filename, err)
		log.Print(err)
		return err
	}
	return nil
}
// UnzipAndUploadFiles opens the zip archive at filename and uploads each file
// it contains to the GCS bucket bucketName. Each entry's path inside the
// archive is used as the object name.
//
// Uploads are best-effort:
//   - A failure on one entry is logged, and the remaining entries are still
//     attempted.
//   - Directory entries are skipped.
//
// It returns an error in these cases:
//   - The archive or the storage client cannot be opened.
//   - Any entry failed to upload. This is a summary error that wraps the
//     first per-entry failure.
func UnzipAndUploadFiles(filename, bucketName string) error {
	r, err := zip.OpenReader(filename)
	if err != nil {
		log.Printf("Unable to open zip %s\n", filename)
		return err
	}
	defer r.Close()

	// Setup connection to GCP bucket. One client is shared by every upload
	// and released when this function returns.
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return err
	}
	defer client.Close()
	bkt := client.Bucket(bucketName)

	var firstErr error
	attempted, failed := 0, 0
	for _, f := range r.File {
		// Directory entries carry no data; uploading them would only create
		// empty placeholder objects.
		if f.FileInfo().IsDir() {
			continue
		}
		attempted++
		fmt.Printf("Uploading %s\n", f.Name)
		if err := uploadZipEntry(ctx, bkt, f); err != nil {
			log.Printf("error copying to gcp: %v\n", err)
			if firstErr == nil {
				firstErr = err
			}
			failed++
		}
	}
	if failed > 0 {
		return fmt.Errorf("%d of %d entries in %s failed to upload: %w", failed, attempted, filename, firstErr)
	}
	return nil
}

// uploadZipEntry streams the contents of the zip entry f to the object of the
// same name in bkt. The object is committed only if the whole entry was copied
// and the writer closed without error.
func uploadZipEntry(ctx context.Context, bkt *storage.BucketHandle, f *zip.File) error {
	src, err := f.Open()
	if err != nil {
		return fmt.Errorf("opening %s: %w", f.Name, err)
	}
	defer src.Close()

	// Cancelling the writer's context before Close abandons the upload, so a
	// failed read does not leave a truncated object in the bucket.
	wctx, cancel := context.WithCancel(ctx)
	defer cancel()
	w := bkt.Object(f.Name).NewWriter(wctx)
	if _, err := io.Copy(w, src); err != nil {
		cancel()
		_ = w.Close() // reports the cancellation; the copy error is the useful one
		return fmt.Errorf("copying %s: %w", f.Name, err)
	}
	// A storage writer completes the upload in Close and reports upload
	// failures there, so its error must be checked.
	if err := w.Close(); err != nil {
		return fmt.Errorf("finalizing upload of %s: %w", f.Name, err)
	}
	return nil
}
// FetchFile downloads the resource at url and saves it to the local path
// filename, replacing any existing file at that path.
//
// It returns an error in these cases:
//   - The request fails.
//   - The server answers with a status other than 200 OK.
//   - The body cannot be written to disk.
//
// The local file is created only after a 200 response has been received. It
// is removed again if the body cannot be written completely, so a failed
// download never leaves an empty or truncated archive behind.
func FetchFile(filename string, url string) error {
	// Get the data
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// http.Get does not treat 4xx/5xx responses as errors; without this check
	// an error page would be saved in place of the archive.
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("downloading %s: unexpected status %s", url, resp.Status)
	}

	// Create the file
	out, err := os.Create(filename)
	if err != nil {
		return err
	}

	// Write the body to file
	if _, err := io.Copy(out, resp.Body); err != nil {
		_ = out.Close()
		_ = os.Remove(filename) // best-effort cleanup of the partial file
		return fmt.Errorf("writing %s: %w", filename, err)
	}
	// Close can surface write errors that io.Copy did not see, so it is
	// checked explicitly rather than deferred.
	if err := out.Close(); err != nil {
		_ = os.Remove(filename)
		return fmt.Errorf("closing %s: %w", filename, err)
	}
	return nil
}