Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/23133.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
api: Add `consul services imported-services` and new api(/v1/exported-services) command to list services imported by partitions within a local datacenter
```
45 changes: 43 additions & 2 deletions agent/config_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
"strconv"
"strings"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/hashicorp/consul/acl"
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/private/pbconfigentry"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

const ConfigEntryNotFoundErr string = "Config entry not found"
Expand Down Expand Up @@ -223,3 +224,43 @@ func (s *HTTPHandlers) ExportedServices(resp http.ResponseWriter, req *http.Requ

return svcs, nil
}

func (s *HTTPHandlers) ImportedServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var entMeta acl.EnterpriseMeta
if err := s.parseEntMetaPartition(req, &entMeta); err != nil {
return nil, err
}
args := pbconfigentry.GetImportedServicesRequest{
Partition: entMeta.PartitionOrEmpty(),
}

var dc string
options := structs.QueryOptions{}
s.parse(resp, req, &dc, &options)
ctx, err := external.ContextWithQueryOptions(req.Context(), options)
if err != nil {
return nil, err
}

var header metadata.MD
result, err := s.agent.grpcClientConfigEntry.GetImportedServices(ctx, &args, grpc.Header(&header))
if err != nil {
return nil, err
}

meta, err := external.QueryMetaFromGRPCMeta(header)
if err != nil {
return result.Services, fmt.Errorf("could not convert gRPC metadata to query meta: %w", err)
}
if err := setMeta(resp, &meta); err != nil {
return nil, err
}

svcs := make([]api.ImportedService, len(result.Services))

for idx, svc := range result.Services {
svcs[idx] = *svc.ToAPI()
}

return svcs, nil
}
85 changes: 85 additions & 0 deletions agent/config_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,3 +853,88 @@ func TestConfig_Exported_Services(t *testing.T) {
require.Equal(t, expected, services)
})
}

func TestConfig_Imported_Services(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}

t.Parallel()
a := NewTestAgent(t, "")
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
defer a.Shutdown()

{
// Setup imported services via service intentions from peers
args := &structs.ServiceIntentionsConfigEntry{
Kind: structs.ServiceIntentions,
Name: "api",
Sources: []*structs.SourceIntention{
{
Name: "web",
Action: structs.IntentionActionAllow,
Peer: "east",
},
},
}
req := structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: args,
}
var configOutput bool
require.NoError(t, a.RPC(context.Background(), "ConfigEntry.Apply", &req, &configOutput))
require.True(t, configOutput)
}

{
args := &structs.ServiceIntentionsConfigEntry{
Kind: structs.ServiceIntentions,
Name: "db",
Sources: []*structs.SourceIntention{
{
Name: "backend",
Action: structs.IntentionActionAllow,
Peer: "west",
},
},
}
req := structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: args,
}
var configOutput bool
require.NoError(t, a.RPC(context.Background(), "ConfigEntry.Apply", &req, &configOutput))
require.True(t, configOutput)
}

t.Run("imported services", func(t *testing.T) {
req, _ := http.NewRequest("GET", "/v1/imported-services", nil)
resp := httptest.NewRecorder()
raw, err := a.srv.ImportedServices(resp, req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.Code)

services, ok := raw.([]api.ImportedService)
require.True(t, ok)
require.Len(t, services, 2)
assertIndex(t, resp)

entMeta := acl.DefaultEnterpriseMeta()

expected := []api.ImportedService{
{
Service: "api",
Partition: entMeta.PartitionOrEmpty(),
Namespace: entMeta.NamespaceOrEmpty(),
SourcePeer: "east",
},
{
Service: "db",
Partition: entMeta.PartitionOrEmpty(),
Namespace: entMeta.NamespaceOrEmpty(),
SourcePeer: "west",
},
}
require.Equal(t, expected, services)
})
}
128 changes: 128 additions & 0 deletions agent/consul/state/config_entry_imported_services.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package state

import (
"fmt"
"sort"

"github.com/hashicorp/go-memdb"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/proto/private/pbconfigentry"
)

// importedService represents a service imported from a peer
type importedService struct {
service string
namespace string
peer string
}

// ImportedServicesForPartition returns the list of imported services along with their sources.
// This shows which services are being imported from peers.
func (s *Store) ImportedServicesForPartition(ws memdb.WatchSet, partition string) (uint64, []*pbconfigentry.ImportedService, error) {
tx := s.db.ReadTxn()
defer tx.Abort()

entMeta := acl.NewEnterpriseMetaWithPartition(partition, acl.WildcardName)
return importedServicesForPartitionTxn(tx, ws, &entMeta)
}

func importedServicesForPartitionTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *acl.EnterpriseMeta) (uint64, []*pbconfigentry.ImportedService, error) {
maxIdx := uint64(0)

// Get all service intentions that have a source peer set
// This indicates services that are imported from that peer
iter, err := tx.Get(tableConfigEntries, indexID+"_prefix", ConfigEntryKindQuery{
Kind: structs.ServiceIntentions,
EnterpriseMeta: *entMeta,
})
if err != nil {
return 0, nil, fmt.Errorf("failed to list service intentions: %w", err)
}

ws.Add(iter.WatchCh())

// Collect imported services from intentions
var importedServices []importedService

for entry := iter.Next(); entry != nil; entry = iter.Next() {
intention, ok := entry.(*structs.ServiceIntentionsConfigEntry)
if !ok {
continue
}

// Update max index
if intention.ModifyIndex > maxIdx {
maxIdx = intention.ModifyIndex
}

// Check each source intention for peer imports
for _, source := range intention.Sources {
if source.Peer != "" {
importedServices = append(importedServices, importedService{
service: intention.Name,
namespace: intention.NamespaceOrDefault(),
peer: source.Peer,
})
}
}
}

uniqueImportedServices := getUniqueImportedServices(importedServices)
resp := prepareImportedServicesResponse(uniqueImportedServices, entMeta)

return lib.MaxUint64(maxIdx, 1), resp, nil
}

// getUniqueImportedServices removes duplicate services and sources. Services are also sorted in ascending order
func getUniqueImportedServices(importedServices []importedService) []importedService {
// Service -> SourcePeers
type serviceKey struct {
name string
namespace string
}
importedServicesMapper := make(map[serviceKey]map[string]struct{})

for _, svc := range importedServices {
key := serviceKey{
name: svc.service,
namespace: svc.namespace,
}

peers, ok := importedServicesMapper[key]
if !ok {
peers = make(map[string]struct{})
importedServicesMapper[key] = peers
}
peers[svc.peer] = struct{}{}
}

uniqueImportedServices := make([]importedService, 0)

for svc, peers := range importedServicesMapper {
for peer := range peers {
uniqueImportedServices = append(uniqueImportedServices, importedService{
service: svc.name,
namespace: svc.namespace,
peer: peer,
})
}
}

sort.Slice(uniqueImportedServices, func(i, j int) bool {
if uniqueImportedServices[i].service != uniqueImportedServices[j].service {
return uniqueImportedServices[i].service < uniqueImportedServices[j].service
}
if uniqueImportedServices[i].namespace != uniqueImportedServices[j].namespace {
return uniqueImportedServices[i].namespace < uniqueImportedServices[j].namespace
}
return uniqueImportedServices[i].peer < uniqueImportedServices[j].peer
})

return uniqueImportedServices
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are looping over a lot services, intentions throughout this code piece, I feel it could be a performance issue, should we rather paginate the response? that way we wont have to process each of the intention and service, etc. for every call.

Since this does not hamper the core functionality, it is not a blocker, however worth discussing once and can be planned for phase 2

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will discuss and plan it for Phase 2

}
25 changes: 25 additions & 0 deletions agent/consul/state/config_entry_imported_services_ce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

//go:build !consulent

package state

import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/proto/private/pbconfigentry"
)

func prepareImportedServicesResponse(importedServices []importedService, entMeta *acl.EnterpriseMeta) []*pbconfigentry.ImportedService {

resp := make([]*pbconfigentry.ImportedService, len(importedServices))

for idx, svc := range importedServices {
resp[idx] = &pbconfigentry.ImportedService{
Service: svc.service,
SourcePeer: svc.peer,
}
}

return resp
}
Loading
Loading