Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix test maglev test suite #1141

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions pkg/bpf/ads/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

"kmesh.net/kmesh/daemon/options"
"kmesh.net/kmesh/pkg/bpf/utils"
"kmesh.net/kmesh/pkg/consistenthash/maglev"
"kmesh.net/kmesh/pkg/logger"
)

Expand Down Expand Up @@ -71,10 +70,6 @@ func (sc *BpfAds) Start() error {
return fmt.Errorf("deserial_init failed:%v", ret)
}

if err := maglev.InitMaglevMap(); err != nil {
return fmt.Errorf("consistent hash lb maglev config init failed, %s", err)
}

return nil
}

Expand Down
1 change: 0 additions & 1 deletion pkg/bpf/ads/sock_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ func (sc *BpfSockConn) Attach() error {
if sc.Link, err = utils.BpfProgUpdate(progPinPath, cgopt); err != nil {
return err
}

} else {
sc.Link, err = link.AttachCgroup(cgopt)
if err != nil {
Expand Down
12 changes: 11 additions & 1 deletion pkg/cache/v2/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,15 @@
resourceHash map[string][2]uint64
hashName *utils.HashName
clusterStatsMap *ebpf.Map
maglev *maglev.Maglev
}

func NewClusterCache(bpfAds *bpfads.BpfAds, hashName *utils.HashName) ClusterCache {
var clusterStatsMap *ebpf.Map
var maglevOuter *ebpf.Map
if bpfAds != nil {
clusterStatsMap = bpfAds.GetClusterStatsMap()
maglevOuter = bpfAds.SockConn.KmMaglevOuter
}
apiClusterCache := newApiClusterCache()
if restart.GetStartType() == restart.Restart {
Expand All @@ -69,11 +72,18 @@
apiClusterCache[Cluster.Name] = &cluster_v2.Cluster{}
}
}

m, err := maglev.InitMaglevMap(maglevOuter)
if err != nil {
log.Errorf("InitMaglevMap failed: %v", err)
}

Check warning on line 79 in pkg/cache/v2/cluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/v2/cluster.go#L78-L79

Added lines #L78 - L79 were not covered by tests

return ClusterCache{
apiClusterCache: apiClusterCache,
resourceHash: make(map[string][2]uint64),
hashName: hashName,
clusterStatsMap: clusterStatsMap,
maglev: m,
}
}

Expand Down Expand Up @@ -194,7 +204,7 @@
err := maps_v2.ClusterUpdate(name, cluster)
if cluster.GetLbPolicy() == cluster_v2.Cluster_MAGLEV {
// create consistent lb here and update table to bpf map
if err := maglev.CreateLB(cluster); err != nil {
if err := cache.maglev.CreateLB(cluster); err != nil {

Check warning on line 207 in pkg/cache/v2/cluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/cache/v2/cluster.go#L207

Added line #L207 was not covered by tests
log.Errorf("maglev lb update %v cluster failed: %v", name, err)
}
}
Expand Down
91 changes: 43 additions & 48 deletions pkg/consistenthash/maglev/maglev.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,18 @@
)

const (
DefaultTableSize uint64 = 16381
DefaultHashSeed = "JLfvgnHc2kaSUFaI"
MaglevOuterMapName = "km_maglev_outer"
MaglevInnerMapName = "inner_of_maglev"
MaglevMapMaxEntries = 65536
ClusterNameMaxLen = 192
DefaultTableSize uint64 = 16381
DefaultHashSeed = "JLfvgnHc2kaSUFaI"
MaglevOuterMapName = "km_maglev_outer"
MaglevInnerMapName = "inner_of_maglev"
ClusterNameMaxLen = 192
)

var (
type Maglev struct {
outer *ebpf.Map
seedMurmur uint32
maglevTableSize uint64
)
}

type Backend struct {
ep *endpoint.Endpoint
Expand All @@ -56,45 +55,45 @@
next uint64
}

func InitMaglevMap() error {
maglevTableSize = DefaultTableSize
opt := &ebpf.LoadPinOptions{}

outer_map, err := ebpf.LoadPinnedMap("/sys/fs/bpf"+"/bpf_kmesh/map/"+MaglevOuterMapName, opt)
if err != nil {
return fmt.Errorf("load outer map of maglev failed err: %v", err)
func InitMaglevMap(maglevMap *ebpf.Map) (*Maglev, error) {
m := &Maglev{
maglevTableSize: DefaultTableSize,
outer: maglevMap,
}
outer = outer_map

d, err := base64.StdEncoding.DecodeString(DefaultHashSeed)
if err != nil {
return fmt.Errorf("cannot decode base64 Maglev hash seed %q: %w", DefaultHashSeed, err)
return nil, fmt.Errorf("cannot decode base64 Maglev hash seed %q: %w", DefaultHashSeed, err)

Check warning on line 66 in pkg/consistenthash/maglev/maglev.go

View check run for this annotation

Codecov / codecov/patch

pkg/consistenthash/maglev/maglev.go#L66

Added line #L66 was not covered by tests
}
if len(d) != 12 {
return fmt.Errorf("decoded hash seed is %d bytes (not 12 bytes)", len(d))
return nil, fmt.Errorf("decoded hash seed is %d bytes (not 12 bytes)", len(d))

Check warning on line 69 in pkg/consistenthash/maglev/maglev.go

View check run for this annotation

Codecov / codecov/patch

pkg/consistenthash/maglev/maglev.go#L69

Added line #L69 was not covered by tests
}
seedMurmur = uint32(d[0])<<24 | uint32(d[1])<<16 | uint32(d[2])<<8 | uint32(d[3])
m.seedMurmur = uint32(d[0])<<24 | uint32(d[1])<<16 | uint32(d[2])<<8 | uint32(d[3])

return nil
return m, nil
}

// only trafficPolicy enable maglev in DestinationRule would create lb
func CreateLB(cluster *cluster_v2.Cluster) error {
func (m *Maglev) CreateLB(cluster *cluster_v2.Cluster) error {
if m == nil {
return nil
}

Check warning on line 80 in pkg/consistenthash/maglev/maglev.go

View check run for this annotation

Codecov / codecov/patch

pkg/consistenthash/maglev/maglev.go#L79-L80

Added lines #L79 - L80 were not covered by tests

if cluster == nil {
return errors.New("cluster is nil")
}

clusterName := cluster.GetName()
table, err := getLookupTable(cluster, maglevTableSize)
table, err := m.getLookupTable(cluster)
if err != nil {
return err
}
backendIDs := make([]uint32, maglevTableSize)
backendIDs := make([]uint32, m.maglevTableSize)
for i, id := range table {
backendIDs[i] = uint32(id)
}

err = updateMaglevTable(backendIDs, clusterName)
err = m.updateMaglevTable(backendIDs, clusterName)
if err != nil {
return fmt.Errorf("updateMaglevTable fail err:%v", err)
}
Expand All @@ -103,28 +102,24 @@
}

// createMaglevInnerMap creates a new Maglev inner map in the kernel
// using the given table size.
func createMaglevInnerMap(tableSize uint32) (*ebpf.Map, error) {
func (m *Maglev) createMaglevInnerMap() (*ebpf.Map, error) {
spec := &ebpf.MapSpec{
Name: MaglevInnerMapName,
Type: ebpf.Array,
KeySize: uint32(unsafe.Sizeof(uint32(0))),
ValueSize: uint32(unsafe.Sizeof(uint32(0))) * tableSize,
ValueSize: uint32(unsafe.Sizeof(uint32(0))) * uint32(m.maglevTableSize),
MaxEntries: 1,
}

m, err := ebpf.NewMap(spec)
inner, err := ebpf.NewMap(spec)
if err != nil {
return nil, err
}
return m, nil
return inner, nil
}

func updateMaglevTable(backendIDs []uint32, clusterName string) error {
if outer == nil {
return errors.New("outer maglev maps not yet initialized")
}
inner, err := createMaglevInnerMap(uint32(maglevTableSize))
func (m *Maglev) updateMaglevTable(backendIDs []uint32, clusterName string) error {
inner, err := m.createMaglevInnerMap()
if err != nil {
return err
}
Expand All @@ -141,25 +136,25 @@
var maglevKey [ClusterNameMaxLen]byte
copy(maglevKey[:], []byte(clusterName))

if err := outer.Update(maglevKey, uint32(inner.FD()), 0); err != nil {
if err := m.outer.Update(maglevKey, uint32(inner.FD()), 0); err != nil {
return fmt.Errorf("updating cluster %v: %w", clusterName, err)
}
return nil
}

func getOffsetAndSkip(address string, m uint64) (uint64, uint64) {
h1, h2 := hash.Hash128([]byte(address), seedMurmur)
offset := h1 % m
skip := (h2 % (m - 1)) + 1
func (m *Maglev) getOffsetAndSkip(address string) (uint64, uint64) {
h1, h2 := hash.Hash128([]byte(address), m.seedMurmur)
offset := h1 % m.maglevTableSize
skip := (h2 % (m.maglevTableSize - 1)) + 1

return offset, skip
}

func getPermutation(b Backend) uint64 {
return (b.offset + (b.skip * b.next)) % maglevTableSize
func getPermutation(b Backend, tableSize uint64) uint64 {
return (b.offset + (b.skip * b.next)) % tableSize
}

func getLookupTable(cluster *cluster_v2.Cluster, tableSize uint64) ([]int, error) {
func (m *Maglev) getLookupTable(cluster *cluster_v2.Cluster) ([]int, error) {
loadAssignment := cluster.GetLoadAssignment()
clusterName := cluster.GetName()
localityLbEps := loadAssignment.GetEndpoints()
Expand All @@ -178,7 +173,7 @@
backends := make([]Backend, 0, len(flatEps))

for i, ep := range flatEps {
epOffset, epSkip := getOffsetAndSkip(ep.GetAddress().String(), maglevTableSize)
epOffset, epSkip := m.getOffsetAndSkip(ep.GetAddress().String())
b := Backend{
ep: ep,
index: i,
Expand All @@ -194,20 +189,20 @@
}

length := len(backends)
lookUpTable := make([]int, tableSize)
lookUpTable := make([]int, m.maglevTableSize)

for i := uint64(0); i < tableSize; i++ {
for i := uint64(0); i < m.maglevTableSize; i++ {
lookUpTable[i] = -1
}

for n := uint64(0); n < tableSize; n++ {
for n := uint64(0); n < m.maglevTableSize; n++ {
j := int(n) % length
b := backends[j]
for {
c := getPermutation(b)
c := getPermutation(b, m.maglevTableSize)
for lookUpTable[c] >= 0 {
b.next++
c = getPermutation(b)
c = getPermutation(b, m.maglevTableSize)
}
lookUpTable[c] = b.index
b.next++
Expand Down
76 changes: 21 additions & 55 deletions pkg/consistenthash/maglev/maglev_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,85 +18,59 @@ package maglev

import (
"fmt"
"os"
"testing"
"unsafe"

"github.com/cilium/ebpf"
"github.com/stretchr/testify/suite"

cluster_v2 "kmesh.net/kmesh/api/v2/cluster"
"kmesh.net/kmesh/api/v2/core"
"kmesh.net/kmesh/api/v2/endpoint"
)

func TestMaglevTestSuite(t *testing.T) {
suite.Run(t, new(MaglevTestSuite))
m := setup()
testCreateLB(t, m)
testGetLookupTable(t, m)
m.outer.Close()
}

type MaglevTestSuite struct {
mapPath string
suite.Suite
}

func (suite *MaglevTestSuite) SetupSuite() {
mapPath := "/sys/fs/bpf/bpf_kmesh/map/"
suite.mapPath = mapPath
_, err := os.Stat(mapPath)
if os.IsNotExist(err) {
err := os.MkdirAll(mapPath, 0755)
if err != nil {
fmt.Println("can not mkdir bpf map path", err)
}
} else if err != nil {
fmt.Println("other err:", err)
return
} else {
fmt.Println("bpf map path already exist ", mapPath)
}
func setup() *Maglev {
dummyInnerMapSpec := newMaglevInnerMapSpecTest(uint32(DefaultTableSize))
_, err = NewMaglevOuterMap(MaglevOuterMapName, MaglevMapMaxEntries, uint32(DefaultTableSize), dummyInnerMapSpec, mapPath)
outer, err := newMaglevOuterMap(MaglevOuterMapName, 16384, dummyInnerMapSpec)
if err != nil {
fmt.Printf("NewMaglevOuterMap err: %v\n", err)
}
InitMaglevMap()
maglev, _ := InitMaglevMap(outer)
return maglev
}

func (suite *MaglevTestSuite) TearDownSuite() {
fmt.Println(">>> From TearDownSuite")
}

func (suite *MaglevTestSuite) TestCreateLB() {
func testCreateLB(t *testing.T, maglev *Maglev) {
cluster := newCluster()
clusterName := cluster.GetName()

err := CreateLB(cluster)
err := maglev.CreateLB(cluster)
if err != nil {
fmt.Println(err)
t.Errorf("create lb err: %v \n", err)
}

var inner_fd uint32
var maglevKey [ClusterNameMaxLen]byte

copy(maglevKey[:], []byte(clusterName))
opt := &ebpf.LoadPinOptions{}
outer_map, err := ebpf.LoadPinnedMap(suite.mapPath+MaglevOuterMapName, opt)
if err != nil {
fmt.Printf("LoadPinnedMap err: %v \n", err)
}
err = outer_map.Lookup(maglevKey, &inner_fd)
err = maglev.outer.Lookup(maglevKey, &inner_fd)
if err != nil {
fmt.Printf("Lookup with key %v , err %v \n", clusterName, err)
t.Fatalf("Lookup with key %v , err %v \n", clusterName, err)
}
fmt.Println("inner fd: ", inner_fd)
}

func (suite *MaglevTestSuite) TestGetLookupTable() {
func testGetLookupTable(t *testing.T, maglev *Maglev) {
cluster := newCluster()

table, err := getLookupTable(cluster, DefaultTableSize)
table, err := maglev.getLookupTable(cluster)
if err != nil {
fmt.Printf("getLookupTable err:%v \n", err)
t.Fatalf("getLookupTable err:%v \n", err)
}
backendCount := make(map[int]int)
// print backend id distribute
Expand Down Expand Up @@ -166,23 +140,15 @@ func newMaglevInnerMapSpecTest(tableSize uint32) *ebpf.MapSpec {
}
}

// NewMaglevOuterMap returns a new object representing a maglev outer map.
func NewMaglevOuterMap(name string, maxEntries int, tableSize uint32, innerMap *ebpf.MapSpec, pinPath string) (*ebpf.Map, error) {
m, err := ebpf.NewMapWithOptions(&ebpf.MapSpec{
// newMaglevOuterMap returns a new object representing a maglev outer map.
func newMaglevOuterMap(name string, maxEntries int, innerMap *ebpf.MapSpec) (*ebpf.Map, error) {
return ebpf.NewMapWithOptions(&ebpf.MapSpec{
Name: name,
Type: ebpf.HashOfMaps,
Flags: 1,
KeySize: ClusterNameMaxLen,
ValueSize: uint32(unsafe.Sizeof(uint32(0))),
MaxEntries: uint32(maxEntries),
InnerMap: innerMap,
Pinning: ebpf.PinByName,
}, ebpf.MapOptions{
PinPath: pinPath,
})

if err != nil {
return nil, err
}

return m, nil
}, ebpf.MapOptions{})
}
Loading