Skip to content

Commit 6ada4f8

Browse files
everything all at once
1 parent 86db0ab commit 6ada4f8

File tree

3 files changed

+199
-1
lines changed

3 files changed

+199
-1
lines changed

.gitignore

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
# Go
77
/bin
8-
/pkg
98

109
# Shared data
1110
/shared/output

pkg/kube/jobgen.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package kube
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
batchv1 "k8s.io/api/batch/v1"
8+
corev1 "k8s.io/api/core/v1"
9+
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
10+
"k8s.io/client-go/kubernetes"
11+
"k8s.io/client-go/tools/clientcmd"
12+
"k8s.io/client-go/util/retry"
13+
)
14+
15+
func int32Ptr(i int32) *int32 { return &i }
16+
17+
// CreateJobForTile creates a Kubernetes Job that:
18+
// 1) downloads the tile from MinIO
19+
// 2) runs filter.wasm on it via Runwasi
20+
// 3) uploads the processed tile back to MinIO
21+
func CreateJobForTile(jobName, tileName, namespace, bucketURL, wasmBucketURL string) error {
22+
// Load kubeconfig
23+
cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
24+
if err != nil {
25+
return fmt.Errorf("loading kubeconfig: %w", err)
26+
}
27+
clientset, err := kubernetes.NewForConfig(cfg)
28+
if err != nil {
29+
return fmt.Errorf("building clientset: %w", err)
30+
}
31+
32+
// Job spec
33+
job := &batchv1.Job{
34+
ObjectMeta: meta.ObjectMeta{
35+
Name: jobName,
36+
Namespace: namespace,
37+
Labels: map[string]string{"app": "wasm-tile-processor"},
38+
},
39+
Spec: batchv1.JobSpec{
40+
BackoffLimit: int32Ptr(1),
41+
Template: corev1.PodTemplateSpec{
42+
ObjectMeta: meta.ObjectMeta{
43+
Labels: map[string]string{"job-name": jobName},
44+
},
45+
Spec: corev1.PodSpec{
46+
RestartPolicy: corev1.RestartPolicyOnFailure,
47+
48+
// 1) InitContainer to download filter.wasm
49+
InitContainers: []corev1.Container{{
50+
Name: "init-wasm",
51+
Image: "curlimages/curl:7.85.0",
52+
Command: []string{
53+
"sh", "-c",
54+
fmt.Sprintf(
55+
"mkdir -p /opt/filter && "+
56+
"curl -s %s/filter.wasm -o /opt/filter/filter.wasm && "+
57+
"ls -l /opt/filter && echo \"WASM fetched!\"",
58+
wasmBucketURL,
59+
),
60+
},
61+
VolumeMounts: []corev1.VolumeMount{{
62+
Name: "wasm-volume",
63+
MountPath: "/opt/filter",
64+
}},
65+
}},
66+
67+
// 2) Main processing container
68+
Containers: []corev1.Container{{
69+
Name: "processor",
70+
Image: "ghcr.io/phantominthewire/image-pipeline:latest",
71+
Command: []string{
72+
"sh", "-c",
73+
fmt.Sprintf(
74+
"curl -s %s/%s | runwasi /opt/filter/filter.wasm > /tmp/out.png && "+
75+
"curl -X PUT -T /tmp/out.png %s/processed/%s",
76+
bucketURL, tileName,
77+
bucketURL, tileName,
78+
),
79+
},
80+
Env: []corev1.EnvVar{
81+
{Name: "INPUT_URL", Value: fmt.Sprintf("%s/%s", bucketURL, tileName)},
82+
{Name: "OUTPUT_URL",Value: fmt.Sprintf("%s/processed/%s", bucketURL, tileName)},
83+
},
84+
VolumeMounts: []corev1.VolumeMount{{
85+
Name: "wasm-volume",
86+
MountPath: "/opt/filter",
87+
}},
88+
}},
89+
90+
// 3) Shared emptyDir for the wasm artifact
91+
Volumes: []corev1.Volume{{
92+
Name: "wasm-volume",
93+
VolumeSource: corev1.VolumeSource{
94+
EmptyDir: &corev1.EmptyDirVolumeSource{},
95+
},
96+
}},
97+
},
98+
},
99+
},
100+
}
101+
102+
// Create with retry
103+
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
104+
_, err := clientset.BatchV1().Jobs(namespace).Create(context.Background(), job, meta.CreateOptions{})
105+
return err
106+
})
107+
}

pkg/storage/minio.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package storage
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"os"
8+
"path/filepath"
9+
"strings"
10+
11+
"github.com/aws/aws-sdk-go-v2/aws"
12+
"github.com/aws/aws-sdk-go-v2/config"
13+
"github.com/aws/aws-sdk-go-v2/credentials"
14+
"github.com/aws/aws-sdk-go-v2/service/s3"
15+
)
16+
17+
type MinioConfig struct {
18+
Endpoint string
19+
Region string
20+
AccessKey string
21+
SecretKey string
22+
Bucket string
23+
Prefix string
24+
Dir string
25+
}
26+
27+
func UploadTiles(cfg MinioConfig) error {
28+
customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...any) (aws.Endpoint, error) {
29+
return aws.Endpoint{
30+
URL: cfg.Endpoint,
31+
SigningRegion: cfg.Region,
32+
HostnameImmutable: true,
33+
}, nil
34+
})
35+
36+
awsCfg, err := config.LoadDefaultConfig(context.TODO(),
37+
config.WithRegion(cfg.Region),
38+
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(cfg.AccessKey, cfg.SecretKey, "")),
39+
config.WithEndpointResolverWithOptions(customResolver),
40+
)
41+
if err != nil {
42+
return err
43+
}
44+
45+
client := s3.NewFromConfig(awsCfg)
46+
47+
// Ensure the bucket exists
48+
_, err = client.HeadBucket(context.TODO(), &s3.HeadBucketInput{
49+
Bucket: aws.String(cfg.Bucket),
50+
})
51+
if err != nil {
52+
_, err = client.CreateBucket(context.TODO(), &s3.CreateBucketInput{
53+
Bucket: aws.String(cfg.Bucket),
54+
})
55+
if err != nil {
56+
return fmt.Errorf("failed to create bucket %s: %w", cfg.Bucket, err)
57+
}
58+
log.Printf("Created bucket: %s", cfg.Bucket)
59+
}
60+
61+
files, err := os.ReadDir(cfg.Dir)
62+
if err != nil {
63+
return err
64+
}
65+
66+
for _, f := range files {
67+
if f.IsDir() || !strings.HasSuffix(f.Name(), ".png") {
68+
continue
69+
}
70+
71+
fpath := filepath.Join(cfg.Dir, f.Name())
72+
file, err := os.Open(fpath)
73+
if err != nil {
74+
log.Printf("could not open file %s: %v", f.Name(), err)
75+
continue
76+
}
77+
defer file.Close()
78+
79+
_, err = client.PutObject(context.TODO(), &s3.PutObjectInput{
80+
Bucket: aws.String(cfg.Bucket),
81+
Key: aws.String(filepath.Join(cfg.Prefix, f.Name())),
82+
Body: file,
83+
})
84+
if err != nil {
85+
log.Printf("failed to upload %s: %v", f.Name(), err)
86+
} else {
87+
log.Printf("uploaded: %s", f.Name())
88+
}
89+
}
90+
91+
return nil
92+
}

0 commit comments

Comments
 (0)