How to write a Container Storage Interface (CSI) plugin

As some of you know, I work on the Containers team at DigitalOcean. We recently announced our new product offering: Kubernetes at DigitalOcean.

We’ve been using Kubernetes internally for the last 2.5 years, in almost all of our production offerings. When I joined DigitalOcean in late 2015, I was on the team that started our initial PaaS platform on top of Kubernetes. We’re now able to bring our years of experience deploying and maintaining Kubernetes to our customers. This also means simple and seamless integration with all of our product offerings, such as Block Storage.

However, integrating these products with Kubernetes takes work. Luckily, there is an open community effort to establish a standardized mechanism for exposing storage systems (such as DigitalOcean Block Storage) to containerized workloads. This mechanism is called CSI, and Kubernetes is one of the container orchestration systems that supports it.

CSI comes with a specification. Specifications are great when you want to understand a concept exhaustively, but they can be hard for newcomers to digest. I’m a person who learns by experimenting and going through examples. The goal of this blog post is to show you how to write a CSI plugin using examples and to explain the concepts in a different way. I hope you like it.

Note: At the time of this writing, the spec was at v0.2.0. v0.3.0 was released recently with new additions (such as the snapshot API). This blog post won’t cover those additions.

What is CSI?

Before I dive in, let me describe some of the abbreviations I’ll be using (these are also part of the CSI specification, more on that later):

CO: Container Orchestration system, such as Kubernetes or Mesos.
SP: Storage Provider, the vendor of the storage system (DigitalOcean Block Storage in our case).
Plugin: the gRPC endpoint(s) that implement the CSI services.
Node: a host where a workload (such as a Pod) runs and where volumes are made available.

So far you’ve probably figured out that CSI stands for Container Storage Interface. It is a standardized mechanism for Container Orchestration Systems (COs) to expose arbitrary storage systems to their containerized workloads. The CSI specification explains how the interaction between the various components, such as COs and Storage Providers, is handled.

Community members from Kubernetes, Mesos, Docker, and other projects worked on this specification together. This means CSI is not a Kubernetes-specific technology or specification. A well-done CSI implementation should, in theory, run on all COs.

Kubernetes v1.9 exposes an alpha implementation of the CSI specification, enabling CSI-compatible volume drivers to be deployed on Kubernetes and consumed by Kubernetes workloads. The recommended version is v1.10, as little to no additional configuration is needed to use a CSI plugin.

Overview

The specification defines the boundary between the CO and the CSI plugin. Before we continue, note that the plugin part is actually separated into two individual plugins: the Controller plugin, which manages volumes on the storage provider side (creating, deleting, attaching, detaching), and the Node plugin, which runs on every node and makes the volumes available to workloads (mounting, unmounting).

These two entities can live in a single binary or you can separate them. That’s up to you. But more on this later.

Finally, there is also the Identity gRPC server, which we’ll see in a bit. This service needs to be implemented for each individual plugin. For example, if you have two separate plugins running, Node and Controller, both binaries need to implement the Identity gRPC interface individually.

Node and Controller plugins are running as two separate services.

If, however, you have a single binary that implements both Node and Controller, you only need to add the Identity service once.

Node and Controller plugin runs in a single binary.

This means a single binary acts as both the Node and the Controller plugin at the same time. Having a single binary that contains a single gRPC server is much easier to maintain and still has all the benefits of the individual plugins. Depending on where you deploy it, it can act as a Node plugin or as a Controller plugin.

Throughout this blog post, this is what we’re going to do. Later I’ll show how you can split it (if you want).

Interfaces

Let’s continue with the interfaces, starting with the Identity service. Remember, this one has to be implemented by all plugins (Node and Controller):

service Identity {
  rpc GetPluginInfo(GetPluginInfoRequest)
	returns (GetPluginInfoResponse) {}

  rpc GetPluginCapabilities(GetPluginCapabilitiesRequest)
	returns (GetPluginCapabilitiesResponse) {}

  rpc Probe (ProbeRequest)
	returns (ProbeResponse) {}
}

Each of the methods does the following:

GetPluginInfo: returns metadata about the plugin, such as its name and version.
GetPluginCapabilities: returns the capabilities the plugin provides, for example whether it also implements the Controller service.
Probe: called by the CO to check whether the plugin is healthy and ready to serve requests.

The Identity service consists of very basic methods and is mainly for identifying the service, making sure it’s healthy, and returning basic information about the plugin itself (whether it’s a Node plugin or Controller plugin). Here is a basic Go implementation of this service:

// GetPluginInfo returns metadata of the plugin
func (d *Driver) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
  return &csi.GetPluginInfoResponse{
    Name:          "com.example.csi.myplugin",
    VendorVersion: "0.1.0",
  }, nil
}

// GetPluginCapabilities returns available capabilities of the plugin
func (d *Driver) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
  return &csi.GetPluginCapabilitiesResponse{
    Capabilities: []*csi.PluginCapability{
      {
        Type: &csi.PluginCapability_Service_{
          Service: &csi.PluginCapability_Service{
            Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
          },
        },
      },
    },
  }, nil
}

// Probe returns the health and readiness of the plugin
func (d *Driver) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
  return &csi.ProbeResponse{}, nil
}

The above methods implement the following Go interface (csi refers to the following Go package: github.com/container-storage-interface/spec/lib/go/csi/v0):

type IdentityServer interface {
  GetPluginInfo(context.Context, *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error)
  GetPluginCapabilities(context.Context, *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error)
  Probe(context.Context, *csi.ProbeRequest) (*csi.ProbeResponse, error)
}

One thing to note here is that GetPluginInfo must return the name of the plugin in reverse domain name notation, e.g. io.arslan.csi-blog-post.


Next is the Controller service interface. This interface is responsible for controlling and managing volumes: creating, deleting, attaching/detaching, snapshotting, and so on.

If the volumes are managed by a cloud provider (such as DigitalOcean, GKE, AWS), this interface must be implemented. However, if you don’t plan to use any kind of block storage or have other ways of providing storage space, you don’t have to implement this interface. Here is the current definition of the Controller service:

service Controller {
  rpc CreateVolume (CreateVolumeRequest)
	returns (CreateVolumeResponse) {}

  rpc DeleteVolume (DeleteVolumeRequest)
	returns (DeleteVolumeResponse) {}

  rpc ControllerPublishVolume (ControllerPublishVolumeRequest)
	returns (ControllerPublishVolumeResponse) {}

  rpc ControllerUnpublishVolume (ControllerUnpublishVolumeRequest)
	returns (ControllerUnpublishVolumeResponse) {}

  rpc ValidateVolumeCapabilities (ValidateVolumeCapabilitiesRequest)
	returns (ValidateVolumeCapabilitiesResponse) {}

  rpc ListVolumes (ListVolumesRequest)
	returns (ListVolumesResponse) {}

  rpc GetCapacity (GetCapacityRequest)
	returns (GetCapacityResponse) {}

  rpc ControllerGetCapabilities (ControllerGetCapabilitiesRequest)
	returns (ControllerGetCapabilitiesResponse) {}  
}

Let’s quickly go over these methods:

CreateVolume: provisions a new volume on the storage provider (for example by calling the cloud provider’s API).
DeleteVolume: deletes a previously created volume.
ControllerPublishVolume: makes the volume available on the desired node, typically by attaching it (for example attaching a Block Storage volume to a Droplet).
ControllerUnpublishVolume: the reverse of ControllerPublishVolume; detaches the volume from the node.
ValidateVolumeCapabilities: checks whether a volume satisfies the requested capabilities (such as the access mode).
ListVolumes: returns a list of all known volumes.
GetCapacity: returns the available capacity of the storage pool.
ControllerGetCapabilities: returns the capabilities of the Controller plugin, so the CO knows which of the methods above it’s allowed to call.

To implement the Controller service, you need to implement the following Go interface:

type ControllerServer interface {
  CreateVolume(context.Context, *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error)
  DeleteVolume(context.Context, *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error)
  ControllerPublishVolume(context.Context, *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error)
  ControllerUnpublishVolume(context.Context, *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error)
  ValidateVolumeCapabilities(context.Context, *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error)
  ListVolumes(context.Context, *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error)
  GetCapacity(context.Context, *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error)
  ControllerGetCapabilities(context.Context, *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error)
}

And here is an example Go implementation (linking to it as it’s very long): controller.go.
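To give you a feel for what such an implementation looks like, below is a rough sketch of CreateVolume. This is not the actual DigitalOcean implementation: the d.storage client and its CreateVolume call are hypothetical stand-ins for whatever API your storage backend exposes, and the csi field names are based on the v0.2 Go bindings, so double check them against the version you vendor.

// CreateVolume provisions a new volume on the storage backend. The CO may
// retry this call, so it should be idempotent: asking for a volume with the
// same name twice must not result in two volumes.
func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
  if req.Name == "" {
    return nil, status.Error(codes.InvalidArgument, "CreateVolume name must be provided")
  }
  if len(req.VolumeCapabilities) == 0 {
    return nil, status.Error(codes.InvalidArgument, "CreateVolume volume capabilities must be provided")
  }

  // fall back to a default size if the CO didn't ask for a specific one
  size := int64(1 * 1024 * 1024 * 1024) // 1 GiB
  if req.CapacityRange != nil && req.CapacityRange.RequiredBytes != 0 {
    size = req.CapacityRange.RequiredBytes
  }

  // d.storage is a hypothetical client for your storage backend
  vol, err := d.storage.CreateVolume(ctx, req.Name, size)
  if err != nil {
    return nil, status.Error(codes.Internal, err.Error())
  }

  return &csi.CreateVolumeResponse{
    Volume: &csi.Volume{
      Id:            vol.ID,
      CapacityBytes: size,
    },
  }, nil
}

ControllerGetCapabilities mirrors the GetPluginCapabilities method from the Identity service: it advertises which of the Controller methods your plugin actually supports, so the CO knows what it’s allowed to call. A minimal version (again a sketch based on the v0.2 Go bindings) could look like this:

// ControllerGetCapabilities returns the capabilities of the Controller plugin
func (d *Driver) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
  var caps []*csi.ControllerServiceCapability
  for _, c := range []csi.ControllerServiceCapability_RPC_Type{
    csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
    csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
    csi.ControllerServiceCapability_RPC_LIST_VOLUMES,
  } {
    caps = append(caps, &csi.ControllerServiceCapability{
      Type: &csi.ControllerServiceCapability_Rpc{
        Rpc: &csi.ControllerServiceCapability_RPC{
          Type: c,
        },
      },
    })
  }

  return &csi.ControllerGetCapabilitiesResponse{
    Capabilities: caps,
  }, nil
}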

I’ll share some common important tips & tricks at the end of this section. Let’s move on until then :)


Finally, we have the Node service interface:

service Node {
  rpc NodeStageVolume (NodeStageVolumeRequest)
	returns (NodeStageVolumeResponse) {}

  rpc NodeUnstageVolume (NodeUnstageVolumeRequest)
	returns (NodeUnstageVolumeResponse) {}

  rpc NodePublishVolume (NodePublishVolumeRequest)
	returns (NodePublishVolumeResponse) {}

  rpc NodeUnpublishVolume (NodeUnpublishVolumeRequest)
	returns (NodeUnpublishVolumeResponse) {}

  rpc NodeGetId (NodeGetIdRequest)
	returns (NodeGetIdResponse) {}

  rpc NodeGetCapabilities (NodeGetCapabilitiesRequest)
	returns (NodeGetCapabilitiesResponse) {}
}

Let’s go over these methods, as they need some explanation:

NodeStageVolume: prepares the volume on the node, typically by formatting the attached device (if needed) and mounting it to a staging path. Because this happens once per volume and node, the same volume can later be made available to multiple workloads on that node.
NodeUnstageVolume: the reverse of NodeStageVolume; unmounts the volume from the staging path.
NodePublishVolume: makes the volume available to the workload by mounting (usually bind mounting) it from the staging path to the target path requested by the CO.
NodeUnpublishVolume: the reverse of NodePublishVolume; unmounts the volume from the target path.
NodeGetId: returns the unique ID of the node the plugin is running on (for DigitalOcean, the Droplet ID). The CO passes this ID to ControllerPublishVolume so the Controller plugin knows where to attach the volume.
NodeGetCapabilities: returns the capabilities of the Node plugin, such as whether it supports the stage/unstage methods.

To implement the Node service, you need to implement the following Go interface:

type NodeServer interface {
  NodeStageVolume(context.Context, *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error)
  NodeUnstageVolume(context.Context, *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error)
  NodePublishVolume(context.Context, *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error)
  NodeUnpublishVolume(context.Context, *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error)
  NodeGetId(context.Context, *csi.NodeGetIdRequest) (*csi.NodeGetIdResponse, error)
  NodeGetCapabilities(context.Context, *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error)
}

And here is a corresponding example implemented in Go: node.go
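To make this a bit more concrete, here is a rough sketch of NodePublishVolume (again, not the real implementation). It assumes the same Driver struct as before plus the os and os/exec packages, and the field names are based on the v0.2 Go bindings. A real driver would use a proper mount library and first check whether the target is already mounted, since this call must be idempotent:

// NodePublishVolume bind mounts the volume from the staging path (prepared
// earlier by NodeStageVolume) onto the target path requested by the CO.
func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
  if req.VolumeId == "" {
    return nil, status.Error(codes.InvalidArgument, "NodePublishVolume volume id must be provided")
  }
  if req.StagingTargetPath == "" || req.TargetPath == "" {
    return nil, status.Error(codes.InvalidArgument, "NodePublishVolume staging and target paths must be provided")
  }

  // make sure the target directory exists before we mount onto it
  if err := os.MkdirAll(req.TargetPath, 0750); err != nil {
    return nil, status.Error(codes.Internal, err.Error())
  }

  // shelling out to mount(8) keeps this sketch short; handling of
  // req.Readonly and of already-mounted targets is omitted for brevity
  out, err := exec.CommandContext(ctx, "mount", "--bind", req.StagingTargetPath, req.TargetPath).CombinedOutput()
  if err != nil {
    return nil, status.Errorf(codes.Internal, "bind mounting %s to %s failed: %v: %s", req.StagingTargetPath, req.TargetPath, err, out)
  }

  return &csi.NodePublishVolumeResponse{}, nil
}

NodeUnpublishVolume would do the reverse: unmount the target path and clean it up.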

Implementation tips and tricks

Let me touch on a couple of things that are important for all interfaces.

Error handling: a common pattern is to log an error right before returning it, like this:

vol, _, err := d.doClient.Storage.CreateVolume(ctx, volumeReq)
if err != nil {
	log.WithError(err).Error("creating volume failed")
	return nil, status.Error(codes.Internal, err.Error())
}

The reason for logging is to capture the error, because once you return it there is no way to see what its content was. The error propagates and might end up somewhere else in the CO (such as Kubernetes events, which you will usually see if you do kubectl describe pods/pod-with-pvc). The disadvantage is that you have to repeat this log-and-return dance in every single return.

A better way is to log the errors via a gRPC interceptor. An interceptor is like a Go http/handler middleware. In gRPC, you can create the following errInterceptor, which will call your method and then log the error:

errInterceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	resp, err := handler(ctx, req)
	if err != nil {
		log.WithError(err).WithField("method", info.FullMethod).Error("method failed")
	}
	return resp, err
}

srv := grpc.NewServer(grpc.UnaryInterceptor(errInterceptor))
srv.Serve(listener) // setting up the listener is omitted here
// ...

Now, with this interceptor added, every time you return an error it’ll be logged, and you no longer have to write a log statement in each return.
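With the interceptor in place, the earlier (hypothetical) CreateVolume call shrinks down to a plain return:

vol, _, err := d.doClient.Storage.CreateVolume(ctx, volumeReq)
if err != nil {
	// no log statement needed anymore, the interceptor takes care of it
	return nil, status.Error(codes.Internal, err.Error())
}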

import "github.com/kubernetes-csi/csi-test/pkg/sanity"

func TestMyDriver(t *testing.T) {
	// Setup the driver and its environment
	endpoint := "unix:///tmp/csi.sock"
	
	// assuming your driver is encapsulated in a struct like this
	// also, your driver needs to be testable. So if it's calling a cloud
	// provider, you need to replace it with a fake API client. Applies to
	// everything that touches something outside ...
	driver := &Driver{
		endpoint: endpoint,
	
	}
	defer driver.Stop()
	
	// run your driver
	go driver.Run()
	
	mntDir, err := ioutil.TempDir("", "mnt")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(mntDir)
	
	mntStageDir, err := ioutil.TempDir("", "mnt-stage")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(mntStageDir)
	
	cfg := &sanity.Config{
		StagingPath: mntStageDir,
		TargetPath:  mntDir,
		Address:     endpoint,
	}
	
	// Now call the test suite
	sanity.Test(t, cfg)
}

A simple go test will run the suite with all its individual test cases:

$ go test
Running Suite: CSI Driver Test Suite
====================================
Random Seed: 1528418150
Will run 31 of 31 specs

Ran 30 of 31 Specs in 0.020 seconds
SUCCESS! -- 30 Passed | 0 Failed | 0 Pending | 1 Skipped --- PASS: TestDriverSuite (0.02s)
PASS
ok      github.com/digitalocean/csi-digitalocean/driver 0.039s

Check out how the csi-test package is used within the csi-digitalocean driver: csi-digitalocean/driver_test.go. Note that this is not an end-to-end test. You should probably also write integration tests for your own driver.

Kubernetes Deployment

So far, I have only explained some of the concepts and how they would apply to Kubernetes. However, once you write a CSI driver, it should in theory work exactly the same on every CO that adheres to the specification. That’s why there is not a single Kubernetes-specific concept in the CSI driver. For example, you can’t use Kubernetes resources via the Kubernetes API inside the driver, such as Pods, StatefulSets, Persistent Volumes, or Persistent Volume Claim labels and annotations. Those are unknown to the driver. (Hint: there is some work being done to allow passing metadata down to drivers, but it hasn’t been formalized yet.)

Nevertheless, there are a few things you need to know if you want to deploy a CSI driver to be used with Kubernetes.

First of all, there is already lengthy documentation on how you should deploy your CSI driver to Kubernetes. This is the recommended way of deploying the driver. There are many details there, but the most important piece is how the Controller and Node plugins are meant to work. Let’s rephrase it again:

The Node plugin is responsible for mounting and unmounting volumes, so it has to run on every single node where workloads can be scheduled.

The Controller plugin talks to the storage provider’s API to create, delete, attach and detach volumes. It doesn’t have to run on a particular node, but only a single copy of it should be active at a time.

There are several options to deploy these plugins. Either you install them directly on the hosts, via systemd, upstart, etc., or you deploy them as Kubernetes primitives. If you’re already running a Kubernetes cluster, it’s best to deploy them as Kubernetes primitives (why not? :).

Suppose we want to deploy them as Kubernetes primitives, how would we achieve this? Since we know how the Node and Controller plugins work and interact with the systems, we can start designing this deployment.

  1. We could deploy the Node plugin as multiple Pods, using a nodeSelector to pin each Pod to a different node. The Controller plugin could be deployed as a single Pod and placed by the scheduler on a random node. But this is all old school: bare pods are ephemeral, so this is a very bad idea.
  2. We could deploy the Node plugin via a Deployment and set the replica count to the number of nodes. But this wouldn’t work, because the pods might end up on the same node. For the Controller plugin we could set the replica count to 1. But this wouldn’t work either, because during a rolling update two pods might be running at the same time.
  3. What we can do instead is deploy the Node plugin as a DaemonSet. This ensures that a copy of the Pod (our plugin) runs on every node, which satisfies the requirement for the Node plugin. The Controller plugin we can deploy as a StatefulSet. Most people associate StatefulSets with persistent storage, but they are more powerful than that: a StatefulSet also comes with scaling guarantees. So we can deploy the Controller plugin as a StatefulSet with the replicas field set to 1. This gives us stable scaling: if the pod dies or we do an update, a second pod is never created before the first one is fully shut down and terminated. That’s very important, because it guarantees that only a single copy of the Controller plugin will run.

So, to recap:

  1. The Node plugin is deployed as a DaemonSet, so a copy of it runs on every node.
  2. The Controller plugin is deployed as a StatefulSet with replicas set to 1, so only a single copy of it ever runs.

Now, how would we package the plugins? At the beginning of this blog post, I mentioned that the interfaces could be separated into individual services (binaries) or unified into a single binary. Our options are:

  1. Create one unified binary: a single binary that exposes all the methods of both the Node and Controller plugins.
  2. Create two binaries: one that satisfies the Node plugin and another that satisfies the Controller plugin.
  3. Create a single binary that only satisfies the Node plugin. A Node-only plugin supplies only the Node service; its GetPluginCapabilities RPC does not report the CONTROLLER_SERVICE capability.

For a cloud provider, both the Node and Controller plugins need to be implemented, so option three is out of the question. The question boils down to whether you want separate binaries or a single binary. For the DigitalOcean CSI driver we opted for a single binary. This makes it very easy to maintain and deploy (you specify the same Docker image for both the DaemonSet and the StatefulSet).

If we had opted for two different services (Node plugin and Controller plugin separated), it would have created additional maintenance burden: two separate binaries, two different Docker images, and so on.

A single binary has the benefit that it doesn’t matter how you deploy things. Inside the Go main function, all you do is register all your implementations with a single gRPC server:

// returns a struct instance that satisfies the Node, Controller and
// Identity interfaces
d := NewDriver()

srv := grpc.NewServer(grpc.UnaryInterceptor(errInterceptor))
csi.RegisterIdentityServer(srv, d)
csi.RegisterControllerServer(srv, d)
csi.RegisterNodeServer(srv, d)

listener, err := net.Listen("unix", "/var/lib/kubelet/plugins/com.digitalocean.csi.dobs/csi.sock")
if err != nil {
	return fmt.Errorf("failed to listen: %v", err)
}

return srv.Serve(listener)

Once you have these registered and deployed as a unified binary, the CO will make sure to call the correct gRPC method. For example, it’ll call NodePublishVolume on a Node plugin when the time comes, or it might call GetPluginCapabilities (via the Identity service) to decide whether the plugin supports the Controller service, and if so, call the CreateVolume method.


However, how does the CO (in our case Kubernetes) know which Pod is Node or Controller?

You might think GetPluginCapabilities is the way to do it. But this only indicates whether the plugin has Controller capabilities; it doesn’t signal whether a given instance is acting as a Node or a Controller plugin. It could be both (which it is in our case, because we use a single binary).

To solve this, Kubernetes provides the following sidecar containers that do this for us:

driver-registrar: registers the CSI plugin so that Kubernetes (kubelet) knows about it; it runs alongside the Node plugin.
external-provisioner: watches for volume provisioning requests and calls CreateVolume/DeleteVolume on the Controller plugin.
external-attacher: triggers ControllerPublishVolume/ControllerUnpublishVolume calls on the Controller plugin to attach and detach volumes.

From here, we deploy the Node plugin together with the driver-registrar. This sidecar only registers the plugin; it doesn’t make any calls to the Node plugin’s interface methods. All calls to the Node plugin are made by kubelet itself. For more details on how this is done, check out this section on the Kubelet to CSI Driver Communication protocol.

The Controller plugin is deployed with the external-provisioner and external-attacher sidecars. This is more straightforward: the external-provisioner is responsible for creating/deleting volumes and the external-attacher is responsible for attaching/detaching them. Both call the appropriate CSI gRPC methods on the Controller plugin; they know nothing about which cloud provider is being used.

An example Kubernetes YAML file that puts all these concepts together in a ready-to-deploy form can be seen here: https://github.com/digitalocean/csi-digitalocean/blob/master/deploy/kubernetes/releases/csi-digitalocean-v0.1.1.yaml

Once you create a Kubernetes Secret containing your API access token:

apiVersion: v1
kind: Secret
metadata:
  name: digitalocean
  namespace: kube-system
stringData:
  access-token: "a05dd2f26b9b9ac2asdas__REPLACE_ME____123cb5d1ec17513e06da"

all you need to do is install the plugin to your cluster via:

kubectl apply -f https://raw.githubusercontent.com/digitalocean/csi-digitalocean/master/deploy/kubernetes/releases/csi-digitalocean-v0.1.0.yaml

This will install the csi-digitalocean plugin on your cluster. Now that you have read this blog post, you know that the plugin is a system of separate components, and you should be able to debug it and understand why we deploy the Node plugin as a DaemonSet and the Controller plugin as a StatefulSet.

Recap

To recap what we have done so far:

We went over what CSI is and why it exists.

We looked at the three gRPC services a plugin has to implement: Identity, Controller and Node.

We covered implementation tips, such as logging errors via a gRPC interceptor and testing the driver with the csi-test sanity package.

Finally, we saw how to deploy the plugin to Kubernetes: the Node plugin as a DaemonSet with the driver-registrar sidecar, and the Controller plugin as a StatefulSet (with replicas set to 1) together with the external-provisioner and external-attacher sidecars.

All these concepts are written up in great detail in the CSI specification and in the Kubernetes CSI proposal. I highly recommend reading them. You should now find these documents much easier to digest and understand how they fit together.


As a final word, the CSI specification is fairly new. It’s a work in progress and there are still things that need to be fleshed out. Still, it makes things much easier compared to the old way of building volume provisioning into Kubernetes itself. Because the drivers all live outside the Kubernetes repo, storage vendors have much more flexibility and power. Previously, changes could only be released with each Kubernetes version; with the CSI specification, drivers can be released at any time and updated very easily.

For example, the CSI spec was recently updated to add snapshots as a feature. That wasn’t covered in this blog post, but it shows how the specification can advance independently, without relying on any CO’s release schedule, such as Kubernetes’.

Thanks for reading all the way down here. If you made it this far, let me know your thoughts and feedback!


If you have any questions or feedback, please feel free to share it with me on Twitter: @fatih