From bd57312a6060c299ee41459fef7214b8fbacad07 Mon Sep 17 00:00:00 2001 From: Zhonghu Xu Date: Mon, 23 Dec 2024 11:35:13 +0800 Subject: [PATCH 1/4] Skip maglev test Signed-off-by: Zhonghu Xu --- pkg/consistenthash/maglev/maglev_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/consistenthash/maglev/maglev_test.go b/pkg/consistenthash/maglev/maglev_test.go index 0cc4e68c5..96a5135ef 100644 --- a/pkg/consistenthash/maglev/maglev_test.go +++ b/pkg/consistenthash/maglev/maglev_test.go @@ -31,6 +31,7 @@ import ( ) func TestMaglevTestSuite(t *testing.T) { + t.Skip("TODO: enable this test, now it interfere with other tests") suite.Run(t, new(MaglevTestSuite)) } From 2cfac756e1f1483c4aeee3accaba75d5df792dc6 Mon Sep 17 00:00:00 2001 From: Zhonghu Xu Date: Tue, 24 Dec 2024 17:06:46 +0800 Subject: [PATCH 2/4] Fix maglev to reduce influence to other test cases Signed-off-by: Zhonghu Xu --- pkg/bpf/ads/loader.go | 5 -- pkg/cache/v2/cluster.go | 8 ++- pkg/consistenthash/maglev/maglev.go | 87 +++++++++++------------- pkg/consistenthash/maglev/maglev_test.go | 77 ++++++--------------- 4 files changed, 67 insertions(+), 110 deletions(-) diff --git a/pkg/bpf/ads/loader.go b/pkg/bpf/ads/loader.go index d03b521df..4febb6cf8 100644 --- a/pkg/bpf/ads/loader.go +++ b/pkg/bpf/ads/loader.go @@ -30,7 +30,6 @@ import ( "kmesh.net/kmesh/daemon/options" "kmesh.net/kmesh/pkg/bpf/utils" - "kmesh.net/kmesh/pkg/consistenthash/maglev" "kmesh.net/kmesh/pkg/logger" ) @@ -71,10 +70,6 @@ func (sc *BpfAds) Start() error { return fmt.Errorf("deserial_init failed:%v", ret) } - if err := maglev.InitMaglevMap(); err != nil { - return fmt.Errorf("consistent hash lb maglev config init failed, %s", err) - } - return nil } diff --git a/pkg/cache/v2/cluster.go b/pkg/cache/v2/cluster.go index 910aa38dd..3a9dc3bf7 100644 --- a/pkg/cache/v2/cluster.go +++ b/pkg/cache/v2/cluster.go @@ -50,12 +50,15 @@ type ClusterCache struct { resourceHash map[string][2]uint64 hashName *utils.HashName clusterStatsMap *ebpf.Map + maglev *maglev.Maglev } func NewClusterCache(bpfAds *bpfads.BpfAds, hashName *utils.HashName) ClusterCache { var clusterStatsMap *ebpf.Map + var maglevOuter *ebpf.Map if bpfAds != nil { clusterStatsMap = bpfAds.GetClusterStatsMap() + maglevOuter = bpfAds.SockConn.KmMaglevOuter } apiClusterCache := newApiClusterCache() if restart.GetStartType() == restart.Restart { @@ -69,11 +72,14 @@ func NewClusterCache(bpfAds *bpfads.BpfAds, hashName *utils.HashName) ClusterCac apiClusterCache[Cluster.Name] = &cluster_v2.Cluster{} } } + + m, _ := maglev.InitMaglevMap(maglevOuter) return ClusterCache{ apiClusterCache: apiClusterCache, resourceHash: make(map[string][2]uint64), hashName: hashName, clusterStatsMap: clusterStatsMap, + maglev: m, } } @@ -194,7 +200,7 @@ func (cache *ClusterCache) Flush() { err := maps_v2.ClusterUpdate(name, cluster) if cluster.GetLbPolicy() == cluster_v2.Cluster_MAGLEV { // create consistent lb here and update table to bpf map - if err := maglev.CreateLB(cluster); err != nil { + if err := cache.maglev.CreateLB(cluster); err != nil { log.Errorf("maglev lb update %v cluster failed: %v", name, err) } } diff --git a/pkg/consistenthash/maglev/maglev.go b/pkg/consistenthash/maglev/maglev.go index 44ceb4fe7..a0777b85a 100644 --- a/pkg/consistenthash/maglev/maglev.go +++ b/pkg/consistenthash/maglev/maglev.go @@ -34,19 +34,18 @@ import ( ) const ( - DefaultTableSize uint64 = 16381 - DefaultHashSeed = "JLfvgnHc2kaSUFaI" - MaglevOuterMapName = "km_maglev_outer" - MaglevInnerMapName = "inner_of_maglev" - MaglevMapMaxEntries = 65536 - ClusterNameMaxLen = 192 + DefaultTableSize uint64 = 16381 + DefaultHashSeed = "JLfvgnHc2kaSUFaI" + MaglevOuterMapName = "km_maglev_outer" + MaglevInnerMapName = "inner_of_maglev" + ClusterNameMaxLen = 192 ) -var ( +type Maglev struct { outer *ebpf.Map seedMurmur uint32 maglevTableSize uint64 -) +} type Backend struct { ep *endpoint.Endpoint @@ -56,45 +55,41 @@ type Backend struct { next uint64 } -func InitMaglevMap() error { - maglevTableSize = DefaultTableSize - opt := &ebpf.LoadPinOptions{} - - outer_map, err := ebpf.LoadPinnedMap("/sys/fs/bpf"+"/bpf_kmesh/map/"+MaglevOuterMapName, opt) - if err != nil { - return fmt.Errorf("load outer map of maglev failed err: %v", err) +func InitMaglevMap(maglevMap *ebpf.Map) (*Maglev, error) { + m := &Maglev{ + maglevTableSize: DefaultTableSize, + outer: maglevMap, } - outer = outer_map d, err := base64.StdEncoding.DecodeString(DefaultHashSeed) if err != nil { - return fmt.Errorf("cannot decode base64 Maglev hash seed %q: %w", DefaultHashSeed, err) + return nil, fmt.Errorf("cannot decode base64 Maglev hash seed %q: %w", DefaultHashSeed, err) } if len(d) != 12 { - return fmt.Errorf("decoded hash seed is %d bytes (not 12 bytes)", len(d)) + return nil, fmt.Errorf("decoded hash seed is %d bytes (not 12 bytes)", len(d)) } - seedMurmur = uint32(d[0])<<24 | uint32(d[1])<<16 | uint32(d[2])<<8 | uint32(d[3]) + m.seedMurmur = uint32(d[0])<<24 | uint32(d[1])<<16 | uint32(d[2])<<8 | uint32(d[3]) - return nil + return m, nil } // only trafficPolicy enable maglev in DestinationRule would create lb -func CreateLB(cluster *cluster_v2.Cluster) error { +func (m *Maglev) CreateLB(cluster *cluster_v2.Cluster) error { if cluster == nil { return errors.New("cluster is nil") } clusterName := cluster.GetName() - table, err := getLookupTable(cluster, maglevTableSize) + table, err := m.getLookupTable(cluster) if err != nil { return err } - backendIDs := make([]uint32, maglevTableSize) + backendIDs := make([]uint32, m.maglevTableSize) for i, id := range table { backendIDs[i] = uint32(id) } - err = updateMaglevTable(backendIDs, clusterName) + err = m.updateMaglevTable(backendIDs, clusterName) if err != nil { return fmt.Errorf("updateMaglevTable fail err:%v", err) } @@ -103,28 +98,24 @@ func CreateLB(cluster *cluster_v2.Cluster) error { } // createMaglevInnerMap creates a new Maglev inner map in the kernel -// using the given table size. -func createMaglevInnerMap(tableSize uint32) (*ebpf.Map, error) { +func (m *Maglev) createMaglevInnerMap() (*ebpf.Map, error) { spec := &ebpf.MapSpec{ Name: MaglevInnerMapName, Type: ebpf.Array, KeySize: uint32(unsafe.Sizeof(uint32(0))), - ValueSize: uint32(unsafe.Sizeof(uint32(0))) * tableSize, + ValueSize: uint32(unsafe.Sizeof(uint32(0))) * uint32(m.maglevTableSize), MaxEntries: 1, } - m, err := ebpf.NewMap(spec) + inner, err := ebpf.NewMap(spec) if err != nil { return nil, err } - return m, nil + return inner, nil } -func updateMaglevTable(backendIDs []uint32, clusterName string) error { - if outer == nil { - return errors.New("outer maglev maps not yet initialized") - } - inner, err := createMaglevInnerMap(uint32(maglevTableSize)) +func (m *Maglev) updateMaglevTable(backendIDs []uint32, clusterName string) error { + inner, err := m.createMaglevInnerMap() if err != nil { return err } @@ -141,25 +132,25 @@ func updateMaglevTable(backendIDs []uint32, clusterName string) error { var maglevKey [ClusterNameMaxLen]byte copy(maglevKey[:], []byte(clusterName)) - if err := outer.Update(maglevKey, uint32(inner.FD()), 0); err != nil { + if err := m.outer.Update(maglevKey, uint32(inner.FD()), 0); err != nil { return fmt.Errorf("updating cluster %v: %w", clusterName, err) } return nil } -func getOffsetAndSkip(address string, m uint64) (uint64, uint64) { - h1, h2 := hash.Hash128([]byte(address), seedMurmur) - offset := h1 % m - skip := (h2 % (m - 1)) + 1 +func (m *Maglev) getOffsetAndSkip(address string) (uint64, uint64) { + h1, h2 := hash.Hash128([]byte(address), m.seedMurmur) + offset := h1 % m.maglevTableSize + skip := (h2 % (m.maglevTableSize - 1)) + 1 return offset, skip } -func getPermutation(b Backend) uint64 { - return (b.offset + (b.skip * b.next)) % maglevTableSize +func getPermutation(b Backend, tableSize uint64) uint64 { + return (b.offset + (b.skip * b.next)) % tableSize } -func getLookupTable(cluster *cluster_v2.Cluster, tableSize uint64) ([]int, error) { +func (m *Maglev) getLookupTable(cluster *cluster_v2.Cluster) ([]int, error) { loadAssignment := cluster.GetLoadAssignment() clusterName := cluster.GetName() localityLbEps := loadAssignment.GetEndpoints() @@ -178,7 +169,7 @@ func getLookupTable(cluster *cluster_v2.Cluster, tableSize uint64) ([]int, error backends := make([]Backend, 0, len(flatEps)) for i, ep := range flatEps { - epOffset, epSkip := getOffsetAndSkip(ep.GetAddress().String(), maglevTableSize) + epOffset, epSkip := m.getOffsetAndSkip(ep.GetAddress().String()) b := Backend{ ep: ep, index: i, @@ -194,20 +185,20 @@ func getLookupTable(cluster *cluster_v2.Cluster, tableSize uint64) ([]int, error } length := len(backends) - lookUpTable := make([]int, tableSize) + lookUpTable := make([]int, m.maglevTableSize) - for i := uint64(0); i < tableSize; i++ { + for i := uint64(0); i < m.maglevTableSize; i++ { lookUpTable[i] = -1 } - for n := uint64(0); n < tableSize; n++ { + for n := uint64(0); n < m.maglevTableSize; n++ { j := int(n) % length b := backends[j] for { - c := getPermutation(b) + c := getPermutation(b, m.maglevTableSize) for lookUpTable[c] >= 0 { b.next++ - c = getPermutation(b) + c = getPermutation(b, m.maglevTableSize) } lookUpTable[c] = b.index b.next++ diff --git a/pkg/consistenthash/maglev/maglev_test.go b/pkg/consistenthash/maglev/maglev_test.go index 96a5135ef..1ce9f4d3c 100644 --- a/pkg/consistenthash/maglev/maglev_test.go +++ b/pkg/consistenthash/maglev/maglev_test.go @@ -18,12 +18,10 @@ package maglev import ( "fmt" - "os" "testing" "unsafe" "github.com/cilium/ebpf" - "github.com/stretchr/testify/suite" cluster_v2 "kmesh.net/kmesh/api/v2/cluster" "kmesh.net/kmesh/api/v2/core" @@ -31,73 +29,48 @@ import ( ) func TestMaglevTestSuite(t *testing.T) { - t.Skip("TODO: enable this test, now it interfere with other tests") - suite.Run(t, new(MaglevTestSuite)) + m := setup() + testCreateLB(t, m) + testGetLookupTable(t, m) + m.outer.Close() } -type MaglevTestSuite struct { - mapPath string - suite.Suite -} - -func (suite *MaglevTestSuite) SetupSuite() { - mapPath := "/sys/fs/bpf/bpf_kmesh/map/" - suite.mapPath = mapPath - _, err := os.Stat(mapPath) - if os.IsNotExist(err) { - err := os.MkdirAll(mapPath, 0755) - if err != nil { - fmt.Println("can not mkdir bpf map path", err) - } - } else if err != nil { - fmt.Println("other err:", err) - return - } else { - fmt.Println("bpf map path already exist ", mapPath) - } +func setup() *Maglev { dummyInnerMapSpec := newMaglevInnerMapSpecTest(uint32(DefaultTableSize)) - _, err = NewMaglevOuterMap(MaglevOuterMapName, MaglevMapMaxEntries, uint32(DefaultTableSize), dummyInnerMapSpec, mapPath) + outer, err := newMaglevOuterMap(MaglevOuterMapName, 16384, dummyInnerMapSpec) if err != nil { fmt.Printf("NewMaglevOuterMap err: %v\n", err) } - InitMaglevMap() + maglev, _ := InitMaglevMap(outer) + return maglev } -func (suite *MaglevTestSuite) TearDownSuite() { - fmt.Println(">>> From TearDownSuite") -} - -func (suite *MaglevTestSuite) TestCreateLB() { +func testCreateLB(t *testing.T, maglev *Maglev) { cluster := newCluster() clusterName := cluster.GetName() - err := CreateLB(cluster) + err := maglev.CreateLB(cluster) if err != nil { - fmt.Println(err) + t.Errorf("create lb err: %v \n", err) } var inner_fd uint32 var maglevKey [ClusterNameMaxLen]byte copy(maglevKey[:], []byte(clusterName)) - opt := &ebpf.LoadPinOptions{} - outer_map, err := ebpf.LoadPinnedMap(suite.mapPath+MaglevOuterMapName, opt) - if err != nil { - fmt.Printf("LoadPinnedMap err: %v \n", err) - } - err = outer_map.Lookup(maglevKey, &inner_fd) + err = maglev.outer.Lookup(maglevKey, &inner_fd) if err != nil { - fmt.Printf("Lookup with key %v , err %v \n", clusterName, err) + t.Fatalf("Lookup with key %v , err %v \n", clusterName, err) } fmt.Println("inner fd: ", inner_fd) } -func (suite *MaglevTestSuite) TestGetLookupTable() { +func testGetLookupTable(t *testing.T, maglev *Maglev) { cluster := newCluster() - table, err := getLookupTable(cluster, DefaultTableSize) + table, err := maglev.getLookupTable(cluster) if err != nil { - fmt.Printf("getLookupTable err:%v \n", err) + t.Fatalf("getLookupTable err:%v \n", err) } backendCount := make(map[int]int) // print backend id distribute @@ -167,23 +140,15 @@ func newMaglevInnerMapSpecTest(tableSize uint32) *ebpf.MapSpec { } } -// NewMaglevOuterMap returns a new object representing a maglev outer map. -func NewMaglevOuterMap(name string, maxEntries int, tableSize uint32, innerMap *ebpf.MapSpec, pinPath string) (*ebpf.Map, error) { - m, err := ebpf.NewMapWithOptions(&ebpf.MapSpec{ +// newMaglevOuterMap returns a new object representing a maglev outer map. +func newMaglevOuterMap(name string, maxEntries int, innerMap *ebpf.MapSpec) (*ebpf.Map, error) { + return ebpf.NewMapWithOptions(&ebpf.MapSpec{ Name: name, Type: ebpf.HashOfMaps, + Flags: 1, KeySize: ClusterNameMaxLen, ValueSize: uint32(unsafe.Sizeof(uint32(0))), MaxEntries: uint32(maxEntries), InnerMap: innerMap, - Pinning: ebpf.PinByName, - }, ebpf.MapOptions{ - PinPath: pinPath, - }) - - if err != nil { - return nil, err - } - - return m, nil + }, ebpf.MapOptions{}) } From a314080ec0f92d1c36e88f7151d5cbc2cdcca7ec Mon Sep 17 00:00:00 2001 From: Zhonghu Xu Date: Thu, 2 Jan 2025 11:10:46 +0800 Subject: [PATCH 3/4] Address comment Signed-off-by: Zhonghu Xu --- pkg/cache/v2/cluster.go | 6 +++++- pkg/consistenthash/maglev/maglev.go | 4 ++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/cache/v2/cluster.go b/pkg/cache/v2/cluster.go index 3a9dc3bf7..1136b5e42 100644 --- a/pkg/cache/v2/cluster.go +++ b/pkg/cache/v2/cluster.go @@ -73,7 +73,11 @@ func NewClusterCache(bpfAds *bpfads.BpfAds, hashName *utils.HashName) ClusterCac } } - m, _ := maglev.InitMaglevMap(maglevOuter) + m, err := maglev.InitMaglevMap(maglevOuter) + if err != nil { + log.Errorf("InitMaglevMap failed: %v", err) + } + return ClusterCache{ apiClusterCache: apiClusterCache, resourceHash: make(map[string][2]uint64), diff --git a/pkg/consistenthash/maglev/maglev.go b/pkg/consistenthash/maglev/maglev.go index a0777b85a..c3d565c19 100644 --- a/pkg/consistenthash/maglev/maglev.go +++ b/pkg/consistenthash/maglev/maglev.go @@ -75,6 +75,10 @@ func InitMaglevMap(maglevMap *ebpf.Map) (*Maglev, error) { // only trafficPolicy enable maglev in DestinationRule would create lb func (m *Maglev) CreateLB(cluster *cluster_v2.Cluster) error { + if m == nil { + return nil + } + if cluster == nil { return errors.New("cluster is nil") } From cd55a9b6f8ee4d0c9c8f89da7e561855a00987c9 Mon Sep 17 00:00:00 2001 From: Zhonghu Xu Date: Mon, 6 Jan 2025 10:39:52 +0800 Subject: [PATCH 4/4] lint fix Signed-off-by: Zhonghu Xu --- pkg/bpf/ads/sock_connection.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/bpf/ads/sock_connection.go b/pkg/bpf/ads/sock_connection.go index 1fb887b7d..fff2a608a 100644 --- a/pkg/bpf/ads/sock_connection.go +++ b/pkg/bpf/ads/sock_connection.go @@ -165,7 +165,6 @@ func (sc *BpfSockConn) Attach() error { if sc.Link, err = utils.BpfProgUpdate(progPinPath, cgopt); err != nil { return err } - } else { sc.Link, err = link.AttachCgroup(cgopt) if err != nil {