Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix kubelet memory leak when device plugin is registered #124719

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 22 additions & 2 deletions pkg/kubelet/cm/devicemanager/plugin/v1beta1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,14 @@ type DevicePlugin interface {
// Client interface provides methods for establishing/closing gRPC connection and running the device plugin gRPC client.
type Client interface {
Connect() error
SocketPath() string
Run()
Disconnect() error
}

var _ Client = &client{}
var _ DevicePlugin = &client{}

type client struct {
mutex sync.Mutex
resource string
Expand All @@ -63,16 +67,32 @@ func NewPluginClient(r string, socketPath string, h ClientHandler) Client {
}

// Connect is for establishing a gRPC connection between device manager and device plugin.
func (c *client) Connect() error {
func (c *client) Connect() (rerr error) {
client, conn, err := dial(c.socket)
if err != nil {
klog.ErrorS(err, "Unable to connect to device plugin client with socket path", "path", c.socket)
return err
}

c.mutex.Lock()
defer c.mutex.Unlock()

c.grpc = conn
c.client = client
c.mutex.Unlock()

defer func() {
if rerr == nil {
return
}
klog.ErrorS(err, "Failed to connect to device plugin", "resource", c.resource)
if err := conn.Close(); err != nil {
klog.V(2).ErrorS(err, "Failed to close gRPC connection", "resource", c.resource)
return
}
c.grpc = nil
c.client = nil
}()

return c.handler.PluginConnected(c.resource, c)
}

Expand Down
18 changes: 14 additions & 4 deletions pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,21 @@ func (s *server) ValidatePlugin(pluginName string, endpoint string, versions []s
}

func (s *server) connectClient(name string, socketPath string) error {
c := NewPluginClient(name, socketPath, s.chandler)
c := s.getClient(name)
if c != nil {
if c.SocketPath() != socketPath {
return fmt.Errorf("the device plugin %s already registered with a different socket path %s", name, c.SocketPath())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return fmt.Errorf("the device plugin %s already registered with a different socket path %s", name, c.SocketPath())
return fmt.Errorf("The device plugin %s already registered with a different socket path %s", name, c.SocketPath())

Fantastic work. Stick with one

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go-staticcheck: error strings should not be capitalized (ST1005)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it, thx

}
klog.V(2).InfoS("Client already connected with the same socket path", "name", name, "socketPath", socketPath)
return nil
}

s.registerClient(name, c)
c = NewPluginClient(name, socketPath, s.chandler)
if err := c.Connect(); err != nil {
s.deregisterClient(name)
klog.ErrorS(err, "Failed to connect to new client", "resource", name)
return err
}
s.registerClient(name, c)

go func() {
s.runClient(name, c)
Expand All @@ -83,8 +90,11 @@ func (s *server) connectClient(name string, socketPath string) error {
}

func (s *server) disconnectClient(name string, c Client) error {
if err := c.Disconnect(); err != nil {
return err
}
s.deregisterClient(name)
return c.Disconnect()
return nil
}

func (s *server) registerClient(name string, c Client) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/kubelet/cm/devicemanager/plugin/v1beta1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ type server struct {
grpc *grpc.Server
rhandler RegistrationHandler
chandler ClientHandler
clients map[string]Client
// clients is a map of plugin name to connected client
clients map[string]Client
}

// NewServer returns an initialized device plugin registration server.
Expand Down