*: fix GracefulStop issue when using cmux for TLS #17790

Draft · wants to merge 1 commit into base: main
27 changes: 25 additions & 2 deletions server/embed/serve.go
@@ -159,7 +159,7 @@ func (sctx *serveCtx) serve(
defer func(gs *grpc.Server) {
if err != nil {
sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err))
gs.Stop()
gs.GracefulStop()
sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err))
}
}(gs)
@@ -202,16 +202,39 @@ func (sctx *serveCtx) serve(
}

if grpcEnabled {
// TODO(XXX):
//
// WaitForHandlers is an experimental option to drain
// all the in-flight handlers, including stream RPCs.
// In cmux mode, we can't call GracefulStop because of
// [1].
//
// Actually, we do call http.Shutdown first in stopServers.
// We still need to drain all the in-flight handlers to
// make sure that there are no leaked goroutines that
// use the closed backend and panic. Add WaitForHandlers
// to force gs.Stop to drain. We can remove this option
// when we remove cmux [2].
//
// [1]: https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
// [2]: https://github.com/etcd-io/etcd/issues/15402
gopts = append(gopts, grpc.WaitForHandlers(true))

gs = v3rpc.Server(s, tlscfg, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
sctx.serviceRegister(gs)
}

defer func(gs *grpc.Server) {
if err != nil {
sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err))
gs.Stop()
if httpEnabled {
gs.Stop()
} else {
gs.GracefulStop()
Contributor:

What happens if the GracefulStop takes too long?

Contributor (author):

GracefulStop might hang when there is a deadlock during applying changes. I ran into an issue where the Snapshot streaming RPC sends data at a very slow rate: GracefulStop is blocked until the Snapshot finishes, but in the meantime the connection is closed, the readiness probe fails, and kubelet sends SIGKILL. If there is no probe to force-kill the process, the server stays blocked until all the RPCs have finished.
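For illustration, a common way to bound a slow GracefulStop is to run it in a goroutine and fall back to a hard Stop after a deadline. This is only a sketch of that pattern, not code from this PR; the helper name and timeout are made up.

```go
package example

import (
	"time"

	"google.golang.org/grpc"
)

// stopGRPCServer is a hypothetical helper: it drains in-flight RPCs with
// GracefulStop, but falls back to a hard Stop if draining exceeds the timeout.
func stopGRPCServer(gs *grpc.Server, timeout time.Duration) {
	done := make(chan struct{})
	go func() {
		defer close(done)
		gs.GracefulStop() // blocks until all in-flight RPCs, including streams, finish
	}()

	select {
	case <-done:
		// Drained cleanly within the deadline.
	case <-time.After(timeout):
		// A slow stream (e.g. Snapshot) is still running; force-close it.
		gs.Stop()
		<-done // GracefulStop returns once Stop has closed the connections.
	}
}
```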

}
sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err))
}
}(gs)
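To make the TODO above concrete: in grpc-go v1.61+, the experimental grpc.WaitForHandlers(true) server option makes Stop wait for outstanding handler goroutines to return, which is what lets the cmux path keep calling Stop instead of GracefulStop. Below is a minimal sketch of the option in isolation; the listener setup and the absence of registered services are simplifications, not etcd's actual wiring.

```go
package example

import (
	"net"

	"google.golang.org/grpc"
)

func serveAndStop() error {
	// With WaitForHandlers(true), Stop still closes listeners and connections
	// immediately, but it then blocks until every running method handler
	// (including streaming RPCs) has returned, so no handler keeps using a
	// backend that has already been closed.
	gs := grpc.NewServer(grpc.WaitForHandlers(true))

	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err
	}
	go gs.Serve(l)

	// ... traffic is served here ...

	// Under cmux we can't use GracefulStop (grpc/grpc-go#1384), but this Stop
	// now also drains in-flight handlers before returning.
	gs.Stop()
	return nil
}
```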
1 change: 1 addition & 0 deletions server/etcdserver/api/v3rpc/maintenance.go
@@ -103,6 +103,7 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe
const snapshotSendBufferSize = 32 * 1024

func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
// gofail: var v3rpcBeforeSnapshot struct{}
ver := schema.ReadStorageVersion(ms.bg.Backend().ReadTx())
storageVersion := ""
if ver != nil {
2 changes: 1 addition & 1 deletion tests/common/auth_test.go
@@ -118,7 +118,7 @@ func TestAuthGracefulDisable(t *testing.T) {

watchCh := rootAuthClient.Watch(wCtx, "key", config.WatchOptions{Revision: 1})
wantedLen := 1
watchTimeout := 10 * time.Second
watchTimeout := 15 * time.Second
Contributor:

Why is this test affected?

Contributor (author):

Because I changed gs.Stop to gs.GracefulStop in the serving code's cleanup.

etcd/server/embed/serve.go, lines 159 to 163 in 0cd5999:

defer func(gs *grpc.Server) {
if err != nil {
sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err))
gs.Stop()
sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err))

ETCD calls http.Shutdown during stopServers.

etcd/server/embed/etcd.go, lines 458 to 465 in 0cd5999:

func stopServers(ctx context.Context, ss *servers) {
// first, close the http.Server
if ss.http != nil {
ss.http.Shutdown(ctx)
}
if ss.grpc == nil {
return
}

That call closes the net.Listener, so both http.Serve and grpc.Serve exit because they are using a closed connection. Before this patch, we always called gs.Stop in the following code, which somewhat conflicts with the stopServers logic that wants a graceful shutdown. So I changed it, and the shutdown now takes a little longer than before. Hope this helps.

etcd/server/embed/serve.go, lines 159 to 163 in 0cd5999:

defer func(gs *grpc.Server) {
if err != nil {
sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err))
gs.Stop()
sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err))

wanted := []testutils.KV{{Key: "key", Val: "value"}}
kvs, err := testutils.KeyValuesFromWatchChan(watchCh, wantedLen, watchTimeout)
require.NoErrorf(t, err, "failed to get key-values from watch channel %s", err)
171 changes: 171 additions & 0 deletions tests/e2e/drain_in_shutdown_test.go
@@ -0,0 +1,171 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !cluster_proxy

package e2e

import (
"context"
"io"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)

func TestShouldDrainRequestDuringShutdown(t *testing.T) {
e2e.BeforeTest(t)

// defaultBuildSnapshotConn sets up a database with 10 MiB of data and an
// in-flight snapshot streaming RPC.
defaultBuildSnapshotConn := func(ctx context.Context, t *testing.T, cli *clientv3.Client) io.ReadCloser {
t.Helper()

require.NoError(t, fillEtcdWithData(ctx, cli, 10*1024*1024))

rc, err := cli.Snapshot(ctx)
require.NoError(t, err)
t.Cleanup(func() { rc.Close() })

// make sure that streaming RPC is in progress
buf := make([]byte, 1)
n, err := rc.Read(buf)
assert.NoError(t, err)
assert.Equal(t, 1, n)

return rc
}

// defaultVerifySnapshotConn makes sure that the connection still works
// even when the server is in the shutdown state.
defaultVerifySnapshotConn := func(t *testing.T, rc io.ReadCloser) {
t.Helper()

_, err := io.Copy(io.Discard, rc)
require.NoError(t, err)
}

tcs := []struct {
name string
options []e2e.EPClusterOption
cliOpt e2e.ClientConfig

buildSnapshotConn func(ctx context.Context, t *testing.T, cli *clientv3.Client) io.ReadCloser
verifySnapshotConn func(t *testing.T, rc io.ReadCloser)
}{
{
name: "no-tls",
options: []e2e.EPClusterOption{
e2e.WithClusterSize(1),
e2e.WithClientAutoTLS(false),
},
cliOpt: e2e.ClientConfig{ConnectionType: e2e.ClientNonTLS},

buildSnapshotConn: defaultBuildSnapshotConn,
verifySnapshotConn: defaultVerifySnapshotConn,
},
{
name: "auto-tls_http_separated",
options: []e2e.EPClusterOption{
e2e.WithClusterSize(1),
e2e.WithClientAutoTLS(true),
e2e.WithClientConnType(e2e.ClientTLS),
e2e.WithClientHTTPSeparate(true),
},
cliOpt: e2e.ClientConfig{
ConnectionType: e2e.ClientTLS,
AutoTLS: true,
},
buildSnapshotConn: defaultBuildSnapshotConn,
verifySnapshotConn: defaultVerifySnapshotConn,
},
{
name: "auto-tls_cmux",
options: []e2e.EPClusterOption{
e2e.WithClusterSize(1),
e2e.WithClientAutoTLS(true),
e2e.WithClientConnType(e2e.ClientTLS),
e2e.WithClientHTTPSeparate(false),
e2e.WithGoFailEnabled(true),
// NOTE: The failpoint makes sure that
// the RPC handler won't exit because of
// a closed connection.
e2e.WithEnvVars(map[string]string{
"GOFAIL_FAILPOINTS": `v3rpcBeforeSnapshot=sleep("8s")`,
}),
},
cliOpt: e2e.ClientConfig{
ConnectionType: e2e.ClientTLS,
AutoTLS: true,
},
buildSnapshotConn: func(ctx context.Context, t *testing.T, cli *clientv3.Client) io.ReadCloser {
t.Helper()

rc, err := cli.Snapshot(ctx)
require.NoError(t, err)
t.Cleanup(func() { rc.Close() })

// make sure server receives the RPC.
time.Sleep(2 * time.Second)
return rc
},
verifySnapshotConn: func(t *testing.T, rc io.ReadCloser) {
t.Helper()

_, err := io.Copy(io.Discard, rc)
require.Error(t, err) // connection will be closed forcibly
},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()

epc, err := e2e.NewEtcdProcessCluster(ctx, t, tc.options...)
require.NoError(t, err)
t.Cleanup(func() { epc.Close() })

grpcEndpoint := epc.Procs[0].EndpointsGRPC()[0]
if tc.cliOpt.ConnectionType == e2e.ClientTLS {
grpcEndpoint = e2e.ToTLS(grpcEndpoint)
}

cli := newClient(t, []string{grpcEndpoint}, tc.cliOpt)

rc := tc.buildSnapshotConn(ctx, t, cli)

errCh := make(chan error, 1)
go func() {
defer close(errCh)
errCh <- epc.Stop()
}()

select {
case <-time.After(4 * time.Second):
case err := <-errCh:
t.Fatalf("should drain request but got error from cluster stop: %v", err)
}

tc.verifySnapshotConn(t, rc)

require.NoError(t, <-errCh)
})
}
}
4 changes: 4 additions & 0 deletions tests/framework/e2e/cluster.go
@@ -259,6 +259,10 @@ func WithClientAutoTLS(isClientAutoTLS bool) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.Client.AutoTLS = isClientAutoTLS }
}

func WithClientHTTPSeparate(separate bool) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.ClientHTTPSeparate = separate }
}

func WithClientRevokeCerts(isClientCRL bool) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.Client.RevokeCerts = isClientCRL }
}