Skip to content

Commit

Permalink
Fix kubelet memory leak when device plugin is registered
Browse files Browse the repository at this point in the history
  • Loading branch information
carlory committed May 7, 2024
1 parent 1dc30bf commit 516071a
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 5 deletions.
17 changes: 16 additions & 1 deletion pkg/kubelet/cm/devicemanager/plugin/v1beta1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,14 @@ type DevicePlugin interface {
// Client interface provides methods for establishing/closing gRPC connection and running the device plugin gRPC client.
type Client interface {
// Connect establishes the gRPC connection to the device plugin and
// returns an error if the connection could not be set up.
Connect() error
// SocketPath returns the socket path associated with this client. The
// registration server compares it against the socket path of an incoming
// registration to detect a plugin name that is already registered.
// NOTE(review): presumably the path the client was created with — confirm
// against the implementation.
SocketPath() string
// Run runs the device plugin gRPC client.
Run()
// Disconnect closes the gRPC connection and returns any error from doing so.
Disconnect() error
}

// Compile-time assertions that *client satisfies both the Client and the
// DevicePlugin interfaces; the build fails if a method is missing.
var _ Client = &client{}
var _ DevicePlugin = &client{}

type client struct {
mutex sync.Mutex
resource string
Expand All @@ -69,11 +73,22 @@ func (c *client) Connect() error {
klog.ErrorS(err, "Unable to connect to device plugin client with socket path", "path", c.socket)
return err
}

err = c.handler.PluginConnected(c.resource, c)
if err != nil {
klog.ErrorS(err, "Failed to connect to device plugin", "resource", c.resource)
if err := conn.Close(); err != nil {
klog.V(2).ErrorS(err, "Failed to close grcp connection", "resource", c.resource)
}
return err
}

c.mutex.Lock()
c.grpc = conn
c.client = client
c.mutex.Unlock()
return c.handler.PluginConnected(c.resource, c)

return nil
}

// Run is for running the device plugin gRPC client.
Expand Down
13 changes: 10 additions & 3 deletions pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,21 @@ func (s *server) ValidatePlugin(pluginName string, endpoint string, versions []s
}

func (s *server) connectClient(name string, socketPath string) error {
c := NewPluginClient(name, socketPath, s.chandler)
c := s.getClient(name)
if c != nil {
if c.SocketPath() != socketPath {
return fmt.Errorf("the device plugin %s already registered with a different socket path %s", name, c.SocketPath())
}
klog.V(2).InfoS("Client already connected with the same socket path", "name", name, "socketPath", socketPath)
return nil
}

s.registerClient(name, c)
c = NewPluginClient(name, socketPath, s.chandler)
if err := c.Connect(); err != nil {
s.deregisterClient(name)
klog.ErrorS(err, "Failed to connect to new client", "resource", name)
return err
}
s.registerClient(name, c)

go func() {
s.runClient(name, c)
Expand Down
3 changes: 2 additions & 1 deletion pkg/kubelet/cm/devicemanager/plugin/v1beta1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ type server struct {
grpc *grpc.Server
rhandler RegistrationHandler
chandler ClientHandler
clients map[string]Client
// clients is a map of plugin name to connected client
clients map[string]Client
}

// NewServer returns an initialized device plugin registration server.
Expand Down

0 comments on commit 516071a

Please sign in to comment.