Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.5] backport support dynamically adding & starting new member #17437

Open
wants to merge 3 commits into
base: release-3.5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/e2e/ctl_v3_member_no_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestMemberReplace(t *testing.T) {
removedMemberPeerUrl := member.Config().Purl.String()
_, err = cc.MemberAdd(memberName, []string{removedMemberPeerUrl})
require.NoError(t, err)
member.Config().Args = patchArgs(member.Config().Args, "initial-cluster-state", "existing")
err = patchArgs(member.Config().Args, "initial-cluster-state", "existing")
require.NoError(t, err)

// Sleep 100ms to bypass the known issue https://github.com/etcd-io/etcd/issues/16687.
Expand Down
124 changes: 30 additions & 94 deletions tests/e2e/etcd_grpcproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,12 @@ package e2e

import (
"context"
"encoding/json"
"fmt"
"io"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/pkg/v3/expect"
"go.etcd.io/etcd/tests/v3/framework/e2e"
Expand All @@ -33,82 +30,44 @@ import (
func TestGrpcProxyAutoSync(t *testing.T) {
e2e.SkipInShortMode(t)

var (
node1Name = "node1"
node1ClientURL = "http://localhost:12379"
node1PeerURL = "http://localhost:12380"

node2Name = "node2"
node2ClientURL = "http://localhost:22379"
node2PeerURL = "http://localhost:22380"
epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
ClusterSize: 1,
})
require.NoError(t, err)
defer func() {
assert.NoError(t, epc.Close())
}()

var (
node1ClientURL = epc.Procs[0].Config().Acurl
proxyClientURL = "127.0.0.1:32379"

autoSyncInterval = 1 * time.Second
)

// Run cluster of one node
proc1, err := runEtcdNode(
node1Name, t.TempDir(),
node1ClientURL, node1PeerURL,
"new", fmt.Sprintf("%s=%s", node1Name, node1PeerURL),
)
require.NoError(t, err)

// Run grpc-proxy instance
// Run independent grpc-proxy instance
proxyProc, err := e2e.SpawnCmd([]string{e2e.BinDir + "/etcd", "grpc-proxy", "start",
"--advertise-client-url", proxyClientURL, "--listen-addr", proxyClientURL,
"--endpoints", node1ClientURL,
"--endpoints-auto-sync-interval", autoSyncInterval.String(),
"--endpoints-auto-sync-interval", "1s",
}, nil)
require.NoError(t, err)
defer func() {
assert.NoError(t, proxyProc.Stop())
}()

err = e2e.SpawnWithExpect([]string{e2e.CtlBinPath, "--endpoints", proxyClientURL, "put", "k1", "v1"}, "OK")
require.NoError(t, err)

err = e2e.SpawnWithExpect([]string{e2e.CtlBinPath, "--endpoints", node1ClientURL, "member", "add", node2Name, "--peer-urls", node2PeerURL}, "added")
require.NoError(t, err)

// Run new member
proc2, err := runEtcdNode(
node2Name, t.TempDir(),
node2ClientURL, node2PeerURL,
"existing", fmt.Sprintf("%s=%s,%s=%s", node1Name, node1PeerURL, node2Name, node2PeerURL),
)
// Add and start second member
err = epc.StartNewProc(t)
require.NoError(t, err)

// Wait for auto sync of endpoints
_, err = proxyProc.Expect(strings.Replace(node2ClientURL, "http://", "", 1))
require.NoError(t, err)

memberList, err := getMemberListFromEndpoint(node1ClientURL)
require.NoError(t, err)

node1MemberID, err := findMemberIDByEndpoint(memberList.Members, node1ClientURL)
require.NoError(t, err)

node2MemberID, err := findMemberIDByEndpoint(memberList.Members, node2ClientURL)
require.NoError(t, err)

// Remove node1 from member list and stop this node

// Second node could be not ready yet
for i := 0; i < 10; i++ {
err = e2e.SpawnWithExpect([]string{e2e.CtlBinPath, "--endpoints", node2ClientURL, "member", "remove", fmt.Sprintf("%x", node1MemberID)}, "removed")
if err != nil && strings.Contains(err.Error(), rpctypes.ErrGRPCUnhealthy.Error()) {
time.Sleep(500 * time.Millisecond)
continue
}
break
}

err = waitForEndpointInLog(proxyProc, epc.Procs[1].Config().Acurl)
require.NoError(t, err)

_, err = proc1.Expect("the member has been permanently removed from the cluster")
require.NoError(t, err)
require.NoError(t, proc1.Stop())

_, err = proc2.Expect(fmt.Sprintf("%x became leader", node2MemberID))
err = epc.CloseProc(func(proc e2e.EtcdProcess) bool {
return proc.Config().Acurl == node1ClientURL
})
require.NoError(t, err)

for i := 0; i < 10; i++ {
Expand All @@ -120,10 +79,6 @@ func TestGrpcProxyAutoSync(t *testing.T) {
}
break
}
require.NoError(t, err)

require.NoError(t, proc2.Stop())
require.NoError(t, proxyProc.Stop())
}

func runEtcdNode(name, dataDir, clientURL, peerURL, clusterState, initialCluster string) (*expect.ExpectProcess, error) {
Expand All @@ -145,34 +100,15 @@ func runEtcdNode(name, dataDir, clientURL, peerURL, clusterState, initialCluster
return proc, err
}

func findMemberIDByEndpoint(members []*etcdserverpb.Member, endpoint string) (uint64, error) {
for _, m := range members {
if m.ClientURLs[0] == endpoint {
return m.ID, nil
}
}

return 0, fmt.Errorf("member not found")
}
func waitForEndpointInLog(proxyProc *expect.ExpectProcess, endpoint string) error {
endpoint = strings.Replace(endpoint, "http://", "", 1)

func getMemberListFromEndpoint(endpoint string) (etcdserverpb.MemberListResponse, error) {
proc, err := e2e.SpawnCmd([]string{e2e.CtlBinPath, "--endpoints", endpoint, "member", "list", "--write-out", "json"}, nil)
if err != nil {
return etcdserverpb.MemberListResponse{}, err
}
var txt string
txt, err = proc.Expect("members")
if err != nil {
return etcdserverpb.MemberListResponse{}, err
}
if err = proc.Close(); err != nil {
return etcdserverpb.MemberListResponse{}, err
}
_, err := proxyProc.ExpectFunc(func(s string) bool {
if strings.Contains(s, endpoint) {
return true
}
return false
})

resp := etcdserverpb.MemberListResponse{}
dec := json.NewDecoder(strings.NewReader(txt))
if err := dec.Decode(&resp); err == io.EOF {
return etcdserverpb.MemberListResponse{}, err
}
return resp, nil
return err
}
9 changes: 3 additions & 6 deletions tests/e2e/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,12 @@ func getMemberIdByName(ctx context.Context, c *e2e.Etcdctl, name string) (id uin
return 0, false, nil
}

// Different implementations here since 3.5 e2e test framework does not have "initial-cluster-state" as a default argument
// Append new flag if not exist, otherwise replace the value
func patchArgs(args []string, flag, newValue string) []string {
func patchArgs(args []string, flag, newValue string) error {
for i, arg := range args {
if strings.Contains(arg, flag) {
args[i] = fmt.Sprintf("--%s=%s", flag, newValue)
return args
return nil
}
}
args = append(args, fmt.Sprintf("--%s=%s", flag, newValue))
return args
return fmt.Errorf("--%s flag not found", flag)
}