Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
93 changes: 21 additions & 72 deletions executor.go → anvil/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
endpoints "github.com/aws/smithy-go/endpoints"
"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
Expand All @@ -39,12 +38,13 @@ import (
const BUCKET = "zetaforge"

type Endpoint struct {
Bucket string
S3Port int
Address string
Bucket string
S3Port int
}

func (endpoint *Endpoint) ResolveEndpoint(ctx context.Context, params s3.EndpointParameters) (endpoints.Endpoint, error) {
uri, err := url.Parse(fmt.Sprintf("http://localhost:%d/%s", endpoint.S3Port, endpoint.Bucket))
uri, err := url.Parse(fmt.Sprintf("http://%s:%d/%s", endpoint.Address, endpoint.S3Port, endpoint.Bucket))
return endpoints.Endpoint{URI: *uri}, err
}

Expand All @@ -58,7 +58,11 @@ func s3Client(ctx context.Context, cfg Config) (*s3.Client, error) {
return &s3.Client{}, err
}
client := s3.NewFromConfig(awsConfig, func(o *s3.Options) {
o.EndpointResolverV2 = &Endpoint{Bucket: BUCKET, S3Port: cfg.Local.BucketPort}
if cfg.IsLocal {
o.EndpointResolverV2 = &Endpoint{Address: "localhost", Bucket: BUCKET, S3Port: cfg.Local.BucketPort}
} else {
o.EndpointResolverV2 = &Endpoint{Address: "weed", Bucket: BUCKET, S3Port: 8333}
}
})

return client, nil
Expand Down Expand Up @@ -135,47 +139,6 @@ func downloadFiles(ctx context.Context, sink string, prefix string, cfg Config)
return nil
}

func deleteFiles(ctx context.Context, prefix string, extraFiles []string, cfg Config) {
log.Printf("Deleting: %s", prefix)
client, err := s3Client(ctx, cfg)
if err != nil {
log.Printf("Failed to delete files; err=%v", err)
return
}

params := &s3.DeleteObjectsInput{
Bucket: aws.String(BUCKET),
Delete: &s3types.Delete{
Objects: []s3types.ObjectIdentifier{},
},
}

res, err := client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(BUCKET),
Prefix: aws.String(prefix),
})
if err != nil {
log.Printf("Failed to delete files; err=%v", err)
}

for _, content := range res.Contents {
params.Delete.Objects = append(params.Delete.Objects, s3types.ObjectIdentifier{
Key: content.Key,
})
}

for _, file := range extraFiles {
params.Delete.Objects = append(params.Delete.Objects, s3types.ObjectIdentifier{
Key: aws.String(file),
})
}

_, err = client.DeleteObjects(ctx, params)
if err != nil {
log.Printf("Failed to delete files; err=%v", err)
}
}

func history(sinkPath string) error {
if err := os.MkdirAll(sinkPath, 0755); err != nil {
return err
Expand Down Expand Up @@ -247,17 +210,10 @@ func streaming(ctx context.Context, sink string, name string, room string, clien
return
}

namespace, _, err := client.Namespace()

if err != nil {
log.Printf("Log stream error; err=%v", err)
return
}

serviceClient := cli.NewWorkflowServiceClient()
containerStream := func(containerName string) {
stream, err := serviceClient.WorkflowLogs(ctx, &workflowpkg.WorkflowLogRequest{
Namespace: namespace,
Namespace: "default",
Name: name,
LogOptions: &corev1.PodLogOptions{
Container: containerName,
Expand Down Expand Up @@ -335,15 +291,9 @@ func runArgo(ctx context.Context, workflow *wfv1.Workflow, sink string, pipeline
return nil, err
}

namespace, _, err := client.Namespace()

if err != nil {
return nil, err
}

serviceClient := cli.NewWorkflowServiceClient()
workflow, err = serviceClient.CreateWorkflow(ctx, &workflowpkg.WorkflowCreateRequest{
Namespace: namespace,
Namespace: "default",
Workflow: workflow,
})

Expand All @@ -357,7 +307,7 @@ func runArgo(ctx context.Context, workflow *wfv1.Workflow, sink string, pipeline
for {
workflow, err = serviceClient.GetWorkflow(ctx, &workflowpkg.WorkflowGetRequest{
Name: workflow.Name,
Namespace: namespace,
Namespace: "default",
})

if err != nil {
Expand All @@ -381,14 +331,15 @@ func runArgo(ctx context.Context, workflow *wfv1.Workflow, sink string, pipeline
}

if workflow.Status.Phase != wfv1.WorkflowSucceeded {
errorCode := ""
errorCode := "workflow: " + workflow.Status.Message + ";"
for name, node := range workflow.Status.Nodes {
if node.Type == wfv1.NodeTypePod {
if node.Phase == wfv1.NodeFailed || node.Phase == wfv1.NodeError {
errorCode += name + ": " + node.Message
errorCode += name + ": " + node.Message + ";"
}
}
}

return workflow, errors.New(errorCode)
}

Expand All @@ -410,17 +361,10 @@ func deleteArgo(ctx context.Context, name string, client clientcmd.ClientConfig)
return
}

namespace, _, err := client.Namespace()

if err != nil {
log.Printf("Failed to delete workflow %s; err=%v", name, err)
return
}

serviceClient := cli.NewWorkflowServiceClient()
_, err = serviceClient.DeleteWorkflow(ctx, &workflowpkg.WorkflowDeleteRequest{
Name: name,
Namespace: namespace,
Namespace: "default",
Force: false,
})

Expand Down Expand Up @@ -685,6 +629,11 @@ func cloudExecute(pipeline *zjson.Pipeline, id int64, executionId string, build
}
defer hub.CloseRoom(pipeline.Id)

if err := upload(ctx, cfg.EntrypointFile, cfg.EntrypointFile, cfg); err != nil { // should never fail
log.Printf("Failed to upload entrypoint file; err=%v", err)
return
}

s3key := pipeline.Id + "/" + executionId

workflow, _, err := translate(ctx, pipeline, "org", s3key, build, cfg)
Expand Down
File renamed without changes.
File renamed without changes.
44 changes: 4 additions & 40 deletions main.go → anvil/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"database/sql"
"embed"
"encoding/base64"
"encoding/json"
"io"
"log"
Expand All @@ -27,7 +26,6 @@ import (
"github.com/pressly/goose/v3"
"github.com/xeipuuv/gojsonschema"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"

"github.com/getsentry/sentry-go"
sentrygin "github.com/getsentry/sentry-go/gin"
Expand All @@ -50,24 +48,13 @@ type Config struct {
Database string
SetupVersion string
Local Local `json:"Local,omitempty"`
Cloud Cloud `json:"Cloud,omitempty"`
}

type Local struct {
BucketPort int
Driver string
}

type Cloud struct {
Registry string
RegistryAddr string
RegistryUser string
RegistryPass string
ClusterIP string
Token string
CaCert string
}

type WebSocketWriter struct {
PipelineId string
MessageFunc func(string, string)
Expand Down Expand Up @@ -166,13 +153,11 @@ func main() {
log.Fatalf("Failed to migrate database; err=%v", err)
}

var client clientcmd.ClientConfig
client := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
clientcmd.NewDefaultClientConfigLoadingRules(),
&clientcmd.ConfigOverrides{},
)
if config.IsLocal {
client = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
clientcmd.NewDefaultClientConfigLoadingRules(),
&clientcmd.ConfigOverrides{},
)

// Switching to Cobra if we need more arguments
if len(os.Args) > 1 {
if os.Args[1] == "--uninstall" {
Expand All @@ -182,27 +167,6 @@ func main() {
os.Exit(1)
}
setup(ctx, config, client, db)
} else {
cacert, err := base64.StdEncoding.DecodeString(config.Cloud.CaCert)
if err != nil {
log.Fatalf("Invalid CA certificate; err=%v", err)
}

cfg := clientcmdapi.NewConfig()
cfg.Clusters["zetacluster"] = &clientcmdapi.Cluster{
Server: "https://" + config.Cloud.ClusterIP,
CertificateAuthorityData: cacert,
}
cfg.AuthInfos["zetaauth"] = &clientcmdapi.AuthInfo{
Token: config.Cloud.Token,
}
cfg.Contexts["zetacontext"] = &clientcmdapi.Context{
Cluster: "zetacluster",
AuthInfo: "zetaauth",
}
cfg.CurrentContext = "zetacontext"

client = clientcmd.NewDefaultClientConfig(*cfg, &clientcmd.ConfigOverrides{})
}

hub := newHub()
Expand Down
File renamed without changes.
File renamed without changes.
1 change: 0 additions & 1 deletion setup.go → anvil/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ func kubectlCheckPods(ctx context.Context, clientConfig *rest.Config) error {

func migrate(ctx context.Context, resources map[string]string, config Config, clientConfig *rest.Config, db *sql.DB) error {
setupVersion, err := getSetupVersion(ctx, db)
log.Println(setupVersion)
var version string
if err != nil {
version = config.SetupVersion
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Loading