Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add randomness in robustness cluster process version to test mixed version scenarios. #17923

Merged
merged 3 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 7 additions & 6 deletions tests/e2e/etcd_mix_versions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,13 @@ func mixVersionsSnapshotTestByMockPartition(t *testing.T, cfg *e2e.EtcdProcessCl
t.Skipf("%q does not exist", e2e.BinPath.EtcdLastRelease)
}

clusterOptions := []e2e.EPClusterOption{e2e.WithConfig(cfg), e2e.WithSnapshotCount(10)}
// TODO: remove version check after 3.5.14 release.
if cfg.Version == e2e.CurrentVersion {
clusterOptions = append(clusterOptions, e2e.WithSnapshotCatchUpEntries(10))
}
t.Logf("Create an etcd cluster with %d member", cfg.ClusterSize)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
e2e.WithConfig(cfg),
e2e.WithSnapshotCount(10),
e2e.WithSnapshotCatchUpEntries(10),
)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, clusterOptions...)
require.NoError(t, err, "failed to start etcd cluster: %v", err)
defer func() {
derr := epc.Close()
Expand All @@ -161,7 +162,7 @@ func mixVersionsSnapshotTestByMockPartition(t *testing.T, cfg *e2e.EtcdProcessCl
assertKVHash(t, epc)

leaderEPC = epc.Procs[epc.WaitLeader(t)]
if leaderEPC.Config().ExecPath == e2e.BinPath.Etcd {
if cfg.Version == e2e.CurrentVersion {
t.Log("Verify logs to check snapshot be sent from leader to follower")
e2e.AssertProcessLogs(t, leaderEPC, "sent database snapshot")
}
Expand Down
105 changes: 77 additions & 28 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,11 @@ type EtcdProcessClusterConfig struct {

// Cluster setup config

ClusterSize int
RollingStart bool
ClusterSize int
// InitialLeaderIndex makes sure the leader is the ith proc
// when the cluster starts if it is specified (>=0).
InitialLeaderIndex int
RollingStart bool
// BaseDataDirPath specifies the data-dir for the members. If test cases
// do not specify `BaseDataDirPath`, then e2e framework creates a
// temporary directory for each member; otherwise, it creates a
Expand Down Expand Up @@ -180,10 +183,10 @@ type EtcdProcessClusterConfig struct {

func DefaultConfig() *EtcdProcessClusterConfig {
cfg := &EtcdProcessClusterConfig{
ClusterSize: 3,
CN: true,

ServerConfig: *embed.NewConfig(),
ClusterSize: 3,
CN: true,
InitialLeaderIndex: -1,
ServerConfig: *embed.NewConfig(),
}
cfg.ServerConfig.InitialClusterToken = "new"
return cfg
Expand All @@ -207,6 +210,10 @@ func WithVersion(version ClusterVersion) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.Version = version }
}

// WithInitialLeaderIndex returns a cluster option that stores i in the
// config's InitialLeaderIndex field, i.e. the index of the process that
// should be the leader when the cluster starts (honoured when i >= 0).
func WithInitialLeaderIndex(i int) EPClusterOption {
	return func(cfg *EtcdProcessClusterConfig) {
		cfg.InitialLeaderIndex = i
	}
}

// WithDataDirPath returns a cluster option that stores path in the config's
// BaseDataDirPath field.
func WithDataDirPath(path string) EPClusterOption {
	return func(cfg *EtcdProcessClusterConfig) {
		cfg.BaseDataDirPath = path
	}
}
Expand Down Expand Up @@ -398,6 +405,16 @@ func InitEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdP
cfg.ServerConfig.SnapshotCount = etcdserver.DefaultSnapshotCount
}

// validate SnapshotCatchUpEntries could be set for at least one member
if cfg.ServerConfig.SnapshotCatchUpEntries != etcdserver.DefaultSnapshotCatchUpEntries {
if !CouldSetSnapshotCatchupEntries(BinPath.Etcd) {
return nil, fmt.Errorf("cannot set SnapshotCatchUpEntries for current etcd version: %s", BinPath.Etcd)
}
if cfg.Version == LastVersion && !CouldSetSnapshotCatchupEntries(BinPath.EtcdLastRelease) {
return nil, fmt.Errorf("cannot set SnapshotCatchUpEntries for last etcd version: %s", BinPath.EtcdLastRelease)
}
}

etcdCfgs := cfg.EtcdAllServerProcessConfigs(t)
epc := &EtcdProcessCluster{
Cfg: cfg,
Expand Down Expand Up @@ -437,7 +454,11 @@ func StartEtcdProcessCluster(ctx context.Context, t testing.TB, epc *EtcdProcess
t.Skip("please run 'make gofail-enable && make build' before running the test")
}
}

if cfg.InitialLeaderIndex >= 0 {
if err := epc.MoveLeader(ctx, t, cfg.InitialLeaderIndex); err != nil {
return nil, fmt.Errorf("failed to move leader: %v", err)
}
}
return epc, nil
}

Expand Down Expand Up @@ -570,27 +591,6 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
args = append(args, "--discovery="+cfg.Discovery)
}

defaultValues := values(*embed.NewConfig())
overrideValues := values(cfg.ServerConfig)
for flag, value := range overrideValues {
if defaultValue := defaultValues[flag]; value == "" || value == defaultValue {
continue
}
if flag == "experimental-snapshot-catchup-entries" && !(cfg.Version == CurrentVersion || (cfg.Version == MinorityLastVersion && i <= cfg.ClusterSize/2) || (cfg.Version == QuorumLastVersion && i > cfg.ClusterSize/2)) {
continue
}
args = append(args, fmt.Sprintf("--%s=%s", flag, value))
}
envVars := map[string]string{}
for key, value := range cfg.EnvVars {
envVars[key] = value
}
var gofailPort int
if cfg.GoFailEnabled {
gofailPort = (i+1)*10000 + 2381
envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", gofailPort)
}

var execPath string
switch cfg.Version {
case CurrentVersion:
Expand All @@ -613,6 +613,27 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
panic(fmt.Sprintf("Unknown cluster version %v", cfg.Version))
}

defaultValues := values(*embed.NewConfig())
serathius marked this conversation as resolved.
Show resolved Hide resolved
overrideValues := values(cfg.ServerConfig)
for flag, value := range overrideValues {
if defaultValue := defaultValues[flag]; value == "" || value == defaultValue {
continue
}
if flag == "experimental-snapshot-catchup-entries" && !CouldSetSnapshotCatchupEntries(execPath) {
continue
}
args = append(args, fmt.Sprintf("--%s=%s", flag, value))
}
envVars := map[string]string{}
for key, value := range cfg.EnvVars {
envVars[key] = value
}
var gofailPort int
if cfg.GoFailEnabled {
gofailPort = (i+1)*10000 + 2381
envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", gofailPort)
}

return &EtcdServerProcessConfig{
lg: cfg.Logger,
ExecPath: execPath,
Expand Down Expand Up @@ -1050,3 +1071,31 @@ func (epc *EtcdProcessCluster) WaitMembersForLeader(ctx context.Context, t testi
t.Fatal("impossible path of execution")
return -1
}

// MoveLeader moves the leader to the ith process (epc.Procs[i]).
//
// It returns an error when i is out of range, when the status of Procs[i]
// cannot be fetched or comes back empty, or when the move-leader request
// fails. If Procs[i] is already the leader it returns nil without issuing a
// request. After a successful request it waits for a leader again and fails
// the test through t.Fatalf if the observed leader is not Procs[i].
func (epc *EtcdProcessCluster) MoveLeader(ctx context.Context, t testing.TB, i int) error {
	if i < 0 || i >= len(epc.Procs) {
		return fmt.Errorf("invalid index: %d, must between 0 and %d", i, len(epc.Procs)-1)
	}
	t.Logf("moving leader to Procs[%d]", i)
	oldLeader := epc.WaitMembersForLeader(ctx, t, epc.Procs)
	if oldLeader == i {
		t.Logf("Procs[%d] is already the leader", i)
		return nil
	}
	// The status is requested through the target's own client; the MemberId in
	// its response header is used as the transferee ID below.
	resp, err := epc.Procs[i].Etcdctl().Status(ctx)
	if err != nil {
		return err
	}
	// Guard the index below: an empty status slice would otherwise panic
	// instead of surfacing as an error to the caller.
	if len(resp) == 0 {
		return fmt.Errorf("empty status response from Procs[%d]", i)
	}
	memberID := resp[0].Header.MemberId
	// The request is issued through the old leader's client.
	err = epc.Procs[oldLeader].Etcdctl().MoveLeader(ctx, memberID)
	if err != nil {
		return err
	}
	newLeader := epc.WaitMembersForLeader(ctx, t, epc.Procs)
	if newLeader != i {
		t.Fatalf("expect new leader to be Procs[%d] but got Procs[%d]", i, newLeader)
	}
	t.Logf("moved leader from Procs[%d] to Procs[%d]", oldLeader, i)
	return nil
}
27 changes: 26 additions & 1 deletion tests/framework/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,22 @@
package e2e

import (
"fmt"
"testing"

"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
)

func TestEtcdServerProcessConfig(t *testing.T) {
v3_5_12 := semver.Version{Major: 3, Minor: 5, Patch: 12}
v3_5_13 := semver.Version{Major: 3, Minor: 5, Patch: 13}
tcs := []struct {
name string
config *EtcdProcessClusterConfig
expectArgsNotContain []string
expectArgsContain []string
mockBinaryVersion *semver.Version
}{
{
name: "Default",
Expand Down Expand Up @@ -73,17 +78,37 @@ func TestEtcdServerProcessConfig(t *testing.T) {
expectArgsContain: []string{
"--experimental-snapshot-catchup-entries=100",
},
mockBinaryVersion: &v3_5_13,
},
{
siyuanfoundation marked this conversation as resolved.
Show resolved Hide resolved
name: "CatchUpEntriesLastVersion",
name: "CatchUpEntriesNoVersion",
config: NewConfig(WithSnapshotCatchUpEntries(100), WithVersion(LastVersion)),
expectArgsNotContain: []string{
"--experimental-snapshot-catchup-entries=100",
},
},
{
name: "CatchUpEntriesOldVersion",
config: NewConfig(WithSnapshotCatchUpEntries(100), WithVersion(LastVersion)),
expectArgsNotContain: []string{
"--experimental-snapshot-catchup-entries=100",
},
mockBinaryVersion: &v3_5_12,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
var mockGetVersionFromBinary func(binaryPath string) (*semver.Version, error)
if tc.mockBinaryVersion == nil {
serathius marked this conversation as resolved.
Show resolved Hide resolved
mockGetVersionFromBinary = func(binaryPath string) (*semver.Version, error) {
return nil, fmt.Errorf("could not get binary version")
}
} else {
mockGetVersionFromBinary = func(binaryPath string) (*semver.Version, error) {
return tc.mockBinaryVersion, nil
}
}
setGetVersionFromBinary(t, mockGetVersionFromBinary)
args := tc.config.EtcdServerProcessConfig(t, 0).Args
if len(tc.expectArgsContain) != 0 {
assert.Subset(t, args, tc.expectArgsContain)
Expand Down
24 changes: 23 additions & 1 deletion tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,10 @@ func parseFailpointsBody(body io.Reader) (map[string]string, error) {
return failpoints, nil
}

func GetVersionFromBinary(binaryPath string) (*semver.Version, error) {
var GetVersionFromBinary = func(binaryPath string) (*semver.Version, error) {
if !fileutil.Exist(binaryPath) {
return nil, fmt.Errorf("binary path does not exist: %s", binaryPath)
}
lines, err := RunUtilCompletion([]string{binaryPath, "--version"}, nil)
if err != nil {
return nil, fmt.Errorf("could not find binary version from %s, err: %w", binaryPath, err)
Expand All @@ -508,3 +511,22 @@ func GetVersionFromBinary(binaryPath string) (*semver.Version, error) {

return nil, fmt.Errorf("could not find version in binary output of %s, lines outputted were %v", binaryPath, lines)
}

// setGetVersionFromBinary replaces the package-level GetVersionFromBinary
// with f for the duration of a test. The function that was installed before
// the call is put back by a cleanup registered on tb.
func setGetVersionFromBinary(tb testing.TB, f func(binaryPath string) (*semver.Version, error)) {
	previous := GetVersionFromBinary
	tb.Cleanup(func() {
		GetVersionFromBinary = previous
	})
	GetVersionFromBinary = f
}

func CouldSetSnapshotCatchupEntries(execPath string) bool {
v, err := GetVersionFromBinary(execPath)
if err != nil {
return false
}
// snapshot-catchup-entries flag was backported in https://github.com/etcd-io/etcd/pull/17808
v3_5_13 := semver.Version{Major: 3, Minor: 5, Patch: 13}
// NOTE(review): the proper version that supports snapshot-catchup-entries is v3.5.14;
// v3.5.13 is used here so this check did not have to wait for that release.
return v.Compare(v3_5_13) >= 0
}
7 changes: 7 additions & 0 deletions tests/framework/e2e/etcdctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,13 @@ func (ctl *EtcdctlV3) MemberPromote(ctx context.Context, id uint64) (*clientv3.M
return &resp, err
}

// MoveLeader asks the current leader to hand its leadership over to the
// member identified by transfereeID (passed to etcdctl in hexadecimal).
// The request must be made to the leader. The command is expected to
// respond with "Leadership transferred".
func (ctl *EtcdctlV3) MoveLeader(ctx context.Context, transfereeID uint64) error {
	args := ctl.cmdArgs("move-leader", fmt.Sprintf("%x", transfereeID))
	want := expect.ExpectedResponse{Value: "Leadership transferred"}
	_, err := SpawnWithExpectLines(ctx, args, nil, want)
	return err
}

func (ctl *EtcdctlV3) cmdArgs(args ...string) []string {
cmdArgs := []string{BinPath.Etcdctl}
for k, v := range ctl.flags() {
Expand Down
5 changes: 5 additions & 0 deletions tests/framework/e2e/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func InitFlags() {
certDirDef := FixturesDir

binDir := flag.String("bin-dir", binDirDef, "The directory for store etcd and etcdctl binaries.")
binLastRelease := flag.String("bin-last-release", "", "The path for the last release etcd binary.")

flag.StringVar(&CertDir, "cert-dir", certDirDef, "The directory for store certificate files.")
flag.Parse()

Expand All @@ -79,6 +81,9 @@ func InitFlags() {
Etcdutl: *binDir + "/etcdutl",
LazyFS: *binDir + "/lazyfs",
}
if *binLastRelease != "" {
BinPath.EtcdLastRelease = *binLastRelease
}
CertPath = CertDir + "/server.crt"
PrivateKeyPath = CertDir + "/server.key.insecure"
CaPath = CertDir + "/ca.crt"
Expand Down
5 changes: 3 additions & 2 deletions tests/robustness/failpoint/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,9 @@ func (f memberReplace) Name() string {
return "MemberReplace"
}

func (f memberReplace) Available(config e2e.EtcdProcessClusterConfig, _ e2e.EtcdProcess) bool {
return config.ClusterSize > 1
func (f memberReplace) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) bool {
// a lower etcd version may not be able to join a cluster with higher cluster version.
return config.ClusterSize > 1 && (config.Version == e2e.QuorumLastVersion || member.Config().ExecPath == e2e.BinPath.Etcd)
}

func getID(ctx context.Context, cc clientv3.Cluster, name string) (id uint64, found bool, err error) {
Expand Down
2 changes: 1 addition & 1 deletion tests/robustness/failpoint/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (tb triggerBlackhole) Trigger(ctx context.Context, t *testing.T, member e2e
func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, process e2e.EtcdProcess) bool {
// Avoid triggering failpoint if waiting for failpoint would take too long to fit into timeout.
// Number of required entries for snapshot depends on etcd configuration.
if tb.waitTillSnapshot && entriesToGuaranteeSnapshot(config) > 200 {
if tb.waitTillSnapshot && (entriesToGuaranteeSnapshot(config) > 200 || !e2e.CouldSetSnapshotCatchupEntries(process.Config().ExecPath)) {
serathius marked this conversation as resolved.
Show resolved Hide resolved
return false
}
return config.ClusterSize > 1 && process.PeerProxy() != nil
Expand Down
16 changes: 14 additions & 2 deletions tests/robustness/makefile.mk
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ test-robustness-reports:

# Test previous release branches

# Run the robustness tests with --bin-dir pointing at the 3.6 failpoints build
# and --bin-last-release pointing at the etcd binary from the 3.5 failpoints build.
.PHONY: test-robustness-release-3.6
test-robustness-release-3.6: /tmp/etcd-release-3.6-failpoints/bin /tmp/etcd-release-3.5-failpoints/bin
GO_TEST_FLAGS="$${GO_TEST_FLAGS} --bin-dir=/tmp/etcd-release-3.6-failpoints/bin --bin-last-release=/tmp/etcd-release-3.5-failpoints/bin/etcd" make test-robustness

.PHONY: test-robustness-release-3.5
test-robustness-release-3.5: /tmp/etcd-release-3.5-failpoints/bin
GO_TEST_FLAGS="$${GO_TEST_FLAGS} --bin-dir=/tmp/etcd-release-3.5-failpoints/bin" make test-robustness
test-robustness-release-3.5: /tmp/etcd-release-3.5-failpoints/bin /tmp/etcd-release-3.4-failpoints/bin
GO_TEST_FLAGS="$${GO_TEST_FLAGS} --bin-dir=/tmp/etcd-release-3.5-failpoints/bin --bin-last-release=/tmp/etcd-release-3.4-failpoints/bin/etcd" make test-robustness

.PHONY: test-robustness-release-3.4
test-robustness-release-3.4: /tmp/etcd-release-3.4-failpoints/bin
Expand Down Expand Up @@ -72,6 +76,14 @@ $(GOPATH)/bin/gofail: tools/mod/go.mod tools/mod/go.sum
make gofail-enable; \
make build;

# Build etcd with gofail failpoints enabled under /tmp/etcd-release-3.6-failpoints/:
# wipe the directory, shallow-clone the main branch into it, then run
# gofail-enable followed by build.
/tmp/etcd-release-3.6-failpoints/bin: $(GOPATH)/bin/gofail
rm -rf /tmp/etcd-release-3.6-failpoints/
mkdir -p /tmp/etcd-release-3.6-failpoints/
cd /tmp/etcd-release-3.6-failpoints/; \
git clone --depth 1 --branch main https://github.com/etcd-io/etcd.git .; \
make gofail-enable; \
make build;

/tmp/etcd-v3.5.2-failpoints/bin:
/tmp/etcd-v3.5.4-failpoints/bin:
/tmp/etcd-v3.5.5-failpoints/bin:
Expand Down
8 changes: 8 additions & 0 deletions tests/robustness/options/server_config_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,11 @@ func WithExperimentalWatchProgressNotifyInterval(input ...time.Duration) e2e.EPC
c.ServerConfig.ExperimentalWatchProgressNotifyInterval = input[internalRand.Intn(len(input))]
}
}

// WithVersion returns a cluster option that sets the config's Version to one
// of the given candidates. The candidate is chosen through internalRand each
// time the option is applied; input is expected to be non-empty.
func WithVersion(input ...e2e.ClusterVersion) e2e.EPClusterOption {
	return func(c *e2e.EtcdProcessClusterConfig) {
		pick := internalRand.Intn(len(input))
		c.Version = input[pick]
	}
}

// WithInitialLeaderIndex returns a cluster option that sets the config's
// InitialLeaderIndex to one of the given candidates. The candidate is chosen
// through internalRand each time the option is applied; input is expected to
// be non-empty.
func WithInitialLeaderIndex(input ...int) e2e.EPClusterOption {
	return func(c *e2e.EtcdProcessClusterConfig) {
		pick := internalRand.Intn(len(input))
		c.InitialLeaderIndex = input[pick]
	}
}