Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scalability Test and Cluster Management Doc #511

Merged
merged 7 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Release Notes.
- Add quick-start guide.
- Add web-ui interacting guide.
- Add bydbctl interacting guide.
- Add cluster management guide.

### Chores

Expand Down
57 changes: 57 additions & 0 deletions docs/operation/cluster.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Cluster Maintenance
hanahmily marked this conversation as resolved.
Show resolved Hide resolved

## Introduction
Properly maintaining and scaling a cluster is crucial for ensuring its reliable and efficient operation. This document provides guidance on setting up a cluster, planning its capacity, and scaling it to meet evolving requirements.

## Cluster Setup
Before deploying or maintaining a cluster, it is recommended to familiarize oneself with the basic clustering concepts by reviewing the [clustering documentation](../concept/clustering.md).

To set up a cluster, one can refer to the [cluster installation guide](../installation/cluster.md), which describes the process in detail. A minimal cluster should consist of the following nodes:

- 3 etcd nodes
- 2 liaison nodes
- 2 data nodes

This configuration is recommended for high availability, ensuring that the cluster can continue operating even if a single node becomes temporarily unavailable, as the remaining nodes can handle the increased workload.

It is generally preferable to deploy multiple smaller data nodes rather than a few larger ones, as this approach reduces the workload increase on the remaining data nodes when some nodes become temporarily unavailable.

To balance the write and query traffic to the liaison nodes, the use of an gRPC load balancer is recommended. The gRPC port defaults to `17912`, but the gRPC host and port can be altered using the `grpc-host` and `grpc-port` configuration options.

For those seeking to set up a cluster in a Kubernetes environment, a [dedicated guide](../installation/kubernetes.md) is available to assist with the process.

## Capacity Planning
Each node role can be provisioned with the most suitable hardware resources. The cluster's capacity scales linearly with the available resources. The required amounts of CPU and RAM per node role depend highly on the workload, such as the number of time series, query types, and write/query QPS. It is recommended to set up a test cluster mirroring the production workload and iteratively scale the per-node resources and the number of nodes per role until the cluster becomes stable. Additionally, the use of observability tools is advised, as they can help identify bottlenecks in the cluster setup.

The necessary storage space can be estimated based on the disk space usage observed during a test run. For example, if the storage space usage is 10GB after a day-long test run on a production workload, then the cluster should have at least 10GB*7=70GB of disk space for a group with `ttl=7day`.

To ensure the cluster's resilience and responsiveness, it is recommended to maintain the following spare resource levels:

- 50% of free RAM across all the nodes to reduce the probability of OOM (out of memory) crashes and slowdowns during temporary spikes in workload.
- 50% of spare CPU across all the nodes to reduce the probability of slowdowns during temporary spikes in workload.
- At least 20% of free storage space at the directories pointed by `measure-root-path` and `stream-root-path`.

## Scalability
The cluster's performance and capacity can be scaled in two ways: vertical scalability and horizontal scalability.

### Vertical Scalability
Vertical scalability refers to adding more resources (CPU, RAM, disk I/O, disk space, network bandwidth) to existing nodes in the cluster.

Increasing the CPU and RAM of existing liaison nodes can improve the performance for heavy queries that process a large number of time series with many data points.

Increasing the CPU and RAM of existing data nodes can increase the number of time series the cluster can handle. However, it is generally preferred to add more data nodes rather than increasing the resources of existing data nodes, as a higher number of data nodes increases cluster stability and improves query performance over time series.

Increasing the disk I/O and disk space of existing etcd nodes can improve the performance for heavy metadata queries that process a large number of metadata entries.

### Horizontal Scalability
Horizontal scalability refers to adding more nodes to the cluster.

Increasing the number of liaison nodes can increase the maximum possible data ingestion speed, as the ingested data can be split among a larger number of liaison nodes. It can also increase the maximum possible query rate, as the incoming concurrent requests can be split among a larger number of liaison nodes.

Increasing the number of data nodes can increase the number of time series the cluster can handle. This can also improve query performance, as each data node contains a lower number of time series when the number of data nodes increases.

The new added data nodes can be automatically discovered by the existing liaison nodes. It is recommended to add data nodes one by one to avoid overloading the liaison nodes with the new data nodes' metadata.

The cluster's availability is also improved by increasing the number of data nodes, as active data nodes need to handle a lower additional workload when some data nodes become unavailable. For example, if one node out of 2 nodes is unavailable, then 50% of the load is re-distributed across the remaining node, resulting in a 100% per-node workload increase. If one node out of 10 nodes is unavailable, then 10% of the load is re-distributed across the 9 remaining nodes, resulting in only an 11% per-node workload increase.

Increasing the number of etcd nodes can increase the cluster's metadata capacity and improve the cluster's metadata query performance. It can also improve the cluster's metadata availability, as the metadata is replicated across all the etcd nodes. However, the cluster size should be odd to avoid split-brain situations.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

etcd could be a potential risk when we run larger scale deployment, I believe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely.

During the test, I used 10 data nodes and only 1 etcd node in a medium-sized cluster. Moving forward, we need to include more extensive and complex scenarios in the scale testing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, and when we meet Lenovo team next month, we need to verify the scale with them.

22 changes: 13 additions & 9 deletions pkg/node/round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,29 +97,33 @@ func (r *roundRobinSelector) OnAddOrUpdate(schemaMetadata schema.Metadata) {
}
r.mu.Lock()
defer r.mu.Unlock()
r.removeGroup(group.Metadata.Name)
for i := uint32(0); i < group.ResourceOpts.ShardNum; i++ {
k := key{group: group.Metadata.Name, shardID: i}
r.lookupTable = append(r.lookupTable, k)
}
r.sortEntries()
}

func (r *roundRobinSelector) removeGroup(group string) {
for i := 0; i < len(r.lookupTable); {
if r.lookupTable[i].group == group {
copy(r.lookupTable[i:], r.lookupTable[i+1:])
r.lookupTable = r.lookupTable[:len(r.lookupTable)-1]
} else {
i++
}
}
}

func (r *roundRobinSelector) OnDelete(schemaMetadata schema.Metadata) {
if schemaMetadata.Kind != schema.KindGroup {
return
}
r.mu.Lock()
defer r.mu.Unlock()
group := schemaMetadata.Spec.(*commonv1.Group)
for i := uint32(0); i < group.ResourceOpts.ShardNum; i++ {
k := key{group: group.Metadata.Name, shardID: i}
for j := range r.lookupTable {
if r.lookupTable[j] == k {
r.lookupTable = append(r.lookupTable[:j], r.lookupTable[j+1:]...)
break
}
}
}
r.removeGroup(group.Metadata.Name)
}

func (r *roundRobinSelector) OnInit(kinds []schema.Kind) (bool, []int64) {
Expand Down
65 changes: 53 additions & 12 deletions pkg/node/round_robin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,20 +133,61 @@ func TestStringer(t *testing.T) {
assert.NotEmpty(t, selector.String())
}

var groupSchema = schema.Metadata{
TypeMeta: schema.TypeMeta{
Kind: schema.KindGroup,
},
Spec: &commonv1.Group{
Metadata: &commonv1.Metadata{
Name: "group1",
func TestChangeShard(t *testing.T) {
s := NewRoundRobinSelector(nil)
selector := s.(*roundRobinSelector)
setupGroup(selector)
selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}})
selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node2"}})
_, err := selector.Pick("group1", "", 0)
assert.NoError(t, err)
_, err = selector.Pick("group1", "", 1)
assert.NoError(t, err)
// Reduce shard number to 1
selector.OnAddOrUpdate(groupSchema1)
_, err = selector.Pick("group1", "", 0)
assert.NoError(t, err)
_, err = selector.Pick("group1", "", 1)
assert.Error(t, err)
// Restore shard number to 2
setupGroup(selector)
node1, err := selector.Pick("group1", "", 0)
assert.NoError(t, err)
node2, err := selector.Pick("group1", "", 1)
assert.NoError(t, err)
assert.NotEqual(t, node1, node2)
}

var (
groupSchema = schema.Metadata{
TypeMeta: schema.TypeMeta{
Kind: schema.KindGroup,
},
Catalog: commonv1.Catalog_CATALOG_MEASURE,
ResourceOpts: &commonv1.ResourceOpts{
ShardNum: 2,
Spec: &commonv1.Group{
Metadata: &commonv1.Metadata{
Name: "group1",
},
Catalog: commonv1.Catalog_CATALOG_MEASURE,
ResourceOpts: &commonv1.ResourceOpts{
ShardNum: 2,
},
},
},
}
}
groupSchema1 = schema.Metadata{
TypeMeta: schema.TypeMeta{
Kind: schema.KindGroup,
},
Spec: &commonv1.Group{
Metadata: &commonv1.Metadata{
Name: "group1",
},
Catalog: commonv1.Catalog_CATALOG_MEASURE,
ResourceOpts: &commonv1.ResourceOpts{
ShardNum: 1,
},
},
}
)

func setupGroup(selector Selector) {
selector.(*roundRobinSelector).OnAddOrUpdate(groupSchema)
Expand Down
16 changes: 8 additions & 8 deletions test/e2e-v2/cases/cluster/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ setup:
timeout: 20m
init-system-environment: ../../script/env
steps:
- name: set PATH
command: export PATH=/tmp/skywalking-infra-e2e/bin:$PATH
- name: install yq
command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
- name: install swctl
command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
- name: set PATH
command: export PATH=/tmp/skywalking-infra-e2e/bin:$PATH
- name: install yq
command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
- name: install swctl
command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl

trigger:
action: http
Expand All @@ -46,5 +46,5 @@ verify:
# the interval between two retries, in millisecond.
interval: 10s
cases:
- includes:
- storage-cases.yaml
- includes:
- storage-cases.yaml
33 changes: 33 additions & 0 deletions test/scale/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Licensed to Apache Software Foundation (ASF) under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Apache Software Foundation (ASF) licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

QPS ?= 10

GROUP ?= "default"

.PHONY: up_traffic
up_traffic:
curl -XPOST 'http://localhost:12800/mock-data/segments/tasks?qps=$(QPS)&group=$(GROUP)' -H'Content-Type: application/json' -d "@segment.tpl.json"

.PHONY: ls_traffic
ls_traffic:
curl -XGET 'http://localhost:12800/mock-data/segments/tasks'

.PHONY: rm_traffic
rm_traffic:
curl -XDELETE 'http://localhost:12800/mock-data/segments/tasks'
Loading
Loading