Skip to content

Commit

Permalink
Merge branch 'develop' of github.com:Safing/portbase into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
dhaavi committed Apr 6, 2023
2 parents c067126 + fcd91a8 commit fe11bff
Show file tree
Hide file tree
Showing 20 changed files with 520 additions and 140 deletions.
2 changes: 1 addition & 1 deletion api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func registerConfig() error {
err = config.Register(&config.Option{
Name: "API Keys",
Key: CfgAPIKeys,
Description: "Define API keys for priviledged access to the API. Every entry is a separate API key with respective permissions. Format is `<key>?read=<perm>&write=<perm>`. Permissions are `anyone`, `user` and `admin`, and may be omitted.",
Description: "Define API keys for privileged access to the API. Every entry is a separate API key with respective permissions. Format is `<key>?read=<perm>&write=<perm>`. Permissions are `anyone`, `user` and `admin`, and may be omitted.",
Sensitive: true,
OptType: config.OptTypeStringArray,
ExpertiseLevel: config.ExpertiseLevelDeveloper,
Expand Down
10 changes: 5 additions & 5 deletions api/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (api *DatabaseAPI) handleGet(opID []byte, key string) {

r, err := api.db.Get(key)
if err == nil {
data, err = marshalRecord(r, true)
data, err = MarshalRecord(r, true)
}
if err != nil {
api.send(opID, dbMsgTypeError, err.Error(), nil)
Expand Down Expand Up @@ -348,7 +348,7 @@ func (api *DatabaseAPI) processQuery(opID []byte, q *query.Query) (ok bool) {
// process query feed
if r != nil {
// process record
data, err := marshalRecord(r, true)
data, err := MarshalRecord(r, true)
if err != nil {
api.send(opID, dbMsgTypeWarning, err.Error(), nil)
continue
Expand Down Expand Up @@ -425,7 +425,7 @@ func (api *DatabaseAPI) processSub(opID []byte, sub *database.Subscription) {
// process sub feed
if r != nil {
// process record
data, err := marshalRecord(r, true)
data, err := MarshalRecord(r, true)
if err != nil {
api.send(opID, dbMsgTypeWarning, err.Error(), nil)
continue
Expand Down Expand Up @@ -629,9 +629,9 @@ func (api *DatabaseAPI) handleDelete(opID []byte, key string) {
api.send(opID, dbMsgTypeSuccess, emptyString, nil)
}

// marsharlRecords locks and marshals the given record, additionally adding
// MarshalRecords locks and marshals the given record, additionally adding
// metadata and returning it as json.
func marshalRecord(r record.Record, withDSDIdentifier bool) ([]byte, error) {
func MarshalRecord(r record.Record, withDSDIdentifier bool) ([]byte, error) {
r.Lock()
defer r.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion api/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (e *Endpoint) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var rec record.Record
rec, err = e.RecordFunc(apiRequest)
if err == nil && r != nil {
responseData, err = marshalRecord(rec, false)
responseData, err = MarshalRecord(rec, false)
}

case e.HandlerFunc != nil:
Expand Down
8 changes: 2 additions & 6 deletions api/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package api

import (
"context"
"encoding/json"
"errors"
"flag"
Expand Down Expand Up @@ -58,7 +57,7 @@ func prep() error {
}

func start() error {
go Serve()
startServer()

_ = updateAPIKeys(module.Ctx, nil)
err := module.RegisterEventHook("config", "config change", "update API keys", updateAPIKeys)
Expand All @@ -75,10 +74,7 @@ func start() error {
}

func stop() error {
if server != nil {
return server.Shutdown(context.Background())
}
return nil
return stopServer()
}

func exportEndpointsCmd() error {
Expand Down
34 changes: 30 additions & 4 deletions api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"github.com/safing/portbase/utils"
)

// EnableServer defines if the HTTP server should be started.
const EnableServer = true

var (
// mainMux is the main mux router.
mainMux = mux.NewRouter()
Expand Down Expand Up @@ -48,15 +51,38 @@ func RegisterHandleFunc(path string, handleFunc func(http.ResponseWriter, *http.
return mainMux.HandleFunc(path, handleFunc)
}

// Serve starts serving the API endpoint.
func Serve() {
// configure server
func startServer() {
// Check if server is enabled.
if !EnableServer {
return
}

// Configure server.
server.Addr = listenAddressConfig()
server.Handler = &mainHandler{
// TODO: mainMux should not be modified anymore.
mux: mainMux,
}

// Start server manager.
module.StartServiceWorker("http server manager", 0, serverManager)
}

func stopServer() error {
// Check if server is enabled.
if !EnableServer {
return nil
}

if server.Addr != "" {
return server.Shutdown(context.Background())
}

return nil
}

// Serve starts serving the API endpoint.
func serverManager(_ context.Context) error {
// start serving
log.Infof("api: starting to listen on %s", server.Addr)
backoffDuration := 10 * time.Second
Expand All @@ -67,7 +93,7 @@ func Serve() {
})
// return on shutdown error
if errors.Is(err, http.ErrServerClosed) {
return
return nil
}
// log error and restart
log.Errorf("api: http endpoint failed: %s - restarting in %s", err, backoffDuration)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/safing/portbase

go 1.15
go 1.19

require (
github.com/VictoriaMetrics/metrics v1.23.1
Expand Down
4 changes: 2 additions & 2 deletions log/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ type (

// AdapterFunc is a convenience type for implementing
// Adapter.
AdapterFunc func(msg Message, duplciates uint64)
AdapterFunc func(msg Message, duplicates uint64)

// FormatFunc formats msg into a string.
FormatFunc func(msg Message, duplciates uint64) string
FormatFunc func(msg Message, duplicates uint64) string

// SimpleFileAdapter implements Adapter and writes all
// messages to File.
Expand Down
2 changes: 1 addition & 1 deletion modules/subsystems/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func prep() error {
}

// We need to listen for configuration changes so we can
// start/stop dependend modules in case a subsystem is
// start/stop depended modules in case a subsystem is
// (de-)activated.
if err := module.RegisterEventHook(
"config",
Expand Down
2 changes: 2 additions & 0 deletions modules/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ func (m *Module) runWorker(name string, fn func(context.Context) error) (err err
}()

// run
// TODO: get cancel func for worker context and cancel when worker is done.
// This ensure that when the worker passes its context to another (async) function, it will also be shutdown when the worker finished or dies.
err = fn(m.Ctx)
return
}
Expand Down
4 changes: 2 additions & 2 deletions updater/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (reg *ResourceRegistry) fetchFile(ctx context.Context, client *http.Client,
}
}

log.Infof("%s: fetched %s (stored to %s)", reg.Name, downloadURL, rv.storagePath())
log.Debugf("%s: fetched %s and stored to %s", reg.Name, downloadURL, rv.storagePath())
return nil
}

Expand Down Expand Up @@ -223,7 +223,7 @@ func (reg *ResourceRegistry) fetchMissingSig(ctx context.Context, client *http.C
}
}

log.Infof("%s: fetched %s (stored to %s)", reg.Name, rv.versionedSigPath(), rv.storageSigPath())
log.Debugf("%s: fetched %s and stored to %s", reg.Name, rv.versionedSigPath(), rv.storageSigPath())
return nil
}

Expand Down
20 changes: 20 additions & 0 deletions updater/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (reg *ResourceRegistry) GetFile(identifier string) (*File, error) {
return nil, fmt.Errorf("could not prepare tmp directory for download: %w", err)
}

// Start registry operation.
reg.state.StartOperation(StateFetching)
defer reg.state.EndOperation()

// download file
log.Tracef("%s: starting download of %s", reg.Name, file.versionedPath)
client := &http.Client{}
Expand All @@ -69,3 +73,19 @@ func (reg *ResourceRegistry) GetFile(identifier string) (*File, error) {
log.Warningf("%s: failed to download %s: %s", reg.Name, file.versionedPath, err)
return nil, err
}

// GetVersion returns the selected version of the given identifier.
// The returned resource version may not be modified.
func (reg *ResourceRegistry) GetVersion(identifier string) (*ResourceVersion, error) {
reg.RLock()
res, ok := reg.resources[identifier]
reg.RUnlock()
if !ok {
return nil, ErrNotFound
}

res.Lock()
defer res.Unlock()

return res.SelectedVersion, nil
}
3 changes: 3 additions & 0 deletions updater/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type Index struct {
// not.
PreRelease bool

// AutoDownload specifies whether new versions should be automatically downloaded.
AutoDownload bool

// LastRelease holds the time of the last seen release of this index.
LastRelease time.Time
}
Expand Down
35 changes: 30 additions & 5 deletions updater/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type ResourceRegistry struct {
storageDir *utils.DirStructure
tmpDir *utils.DirStructure
indexes []*Index
state *RegistryState

resources map[string]*Resource
UpdateURLs []string
Expand All @@ -44,6 +45,11 @@ type ResourceRegistry struct {
UsePreReleases bool
DevMode bool
Online bool

// StateNotifyFunc may be set to receive any changes to the registry state.
// The specified function may lock the state, but may not block or take a
// lot of time.
StateNotifyFunc func(*RegistryState)
}

// AddIndex adds a new index to the resource registry.
Expand All @@ -61,6 +67,18 @@ func (reg *ResourceRegistry) AddIndex(idx Index) {
reg.indexes = append(reg.indexes, &idx)
}

// PreInitUpdateState sets the initial update state of the registry before initialization.
func (reg *ResourceRegistry) PreInitUpdateState(s UpdateState) error {
if reg.state != nil {
return errors.New("registry already initialized")
}

reg.state = &RegistryState{
Updates: s,
}
return nil
}

// Initialize initializes a raw registry struct and makes it ready for usage.
func (reg *ResourceRegistry) Initialize(storageDir *utils.DirStructure) error {
// check if storage dir is available
Expand All @@ -78,6 +96,11 @@ func (reg *ResourceRegistry) Initialize(storageDir *utils.DirStructure) error {
reg.storageDir = storageDir
reg.tmpDir = storageDir.ChildDir("tmp", 0o0700)
reg.resources = make(map[string]*Resource)
if reg.state == nil {
reg.state = &RegistryState{}
}
reg.state.ID = StateReady
reg.state.reg = reg

// remove tmp dir to delete old entries
err = reg.Cleanup()
Expand Down Expand Up @@ -147,32 +170,34 @@ func (reg *ResourceRegistry) SetUsePreReleases(yes bool) {
}

// AddResource adds a resource to the registry. Does _not_ select new version.
func (reg *ResourceRegistry) AddResource(identifier, version string, available, currentRelease, preRelease bool) error {
func (reg *ResourceRegistry) AddResource(identifier, version string, index *Index, available, currentRelease, preRelease bool) error {
reg.Lock()
defer reg.Unlock()

err := reg.addResource(identifier, version, available, currentRelease, preRelease)
err := reg.addResource(identifier, version, index, available, currentRelease, preRelease)
return err
}

func (reg *ResourceRegistry) addResource(identifier, version string, available, currentRelease, preRelease bool) error {
func (reg *ResourceRegistry) addResource(identifier, version string, index *Index, available, currentRelease, preRelease bool) error {
res, ok := reg.resources[identifier]
if !ok {
res = reg.newResource(identifier)
reg.resources[identifier] = res
}
res.Index = index

return res.AddVersion(version, available, currentRelease, preRelease)
}

// AddResources adds resources to the registry. Errors are logged, the last one is returned. Despite errors, non-failing resources are still added. Does _not_ select new versions.
func (reg *ResourceRegistry) AddResources(versions map[string]string, available, currentRelease, preRelease bool) error {
func (reg *ResourceRegistry) AddResources(versions map[string]string, index *Index, available, currentRelease, preRelease bool) error {
reg.Lock()
defer reg.Unlock()

// add versions and their flags to registry
var lastError error
for identifier, version := range versions {
lastError = reg.addResource(identifier, version, available, currentRelease, preRelease)
lastError = reg.addResource(identifier, version, index, available, currentRelease, preRelease)
if lastError != nil {
log.Warningf("%s: failed to add resource %s: %s", reg.Name, identifier, lastError)
}
Expand Down
6 changes: 5 additions & 1 deletion updater/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ type Resource struct {

// VerificationOptions holds the verification options for this resource.
VerificationOptions *VerificationOptions

// Index holds a reference to the index this resource was last defined in.
// Will be nil if resource was only found on disk.
Index *Index
}

// ResourceVersion represents a single version of a resource.
Expand Down Expand Up @@ -89,7 +93,7 @@ func (rv *ResourceVersion) String() string {
return rv.VersionNumber
}

// SemVer returns the semantiv version of the resource.
// SemVer returns the semantic version of the resource.
func (rv *ResourceVersion) SemVer() *semver.Version {
return rv.semVer
}
Expand Down
Loading

0 comments on commit fe11bff

Please sign in to comment.