diff --git a/main.go b/main.go index 0a54636a440b4c80969f8cf99f447674904d993a..7c0bcfbd197d0e13baac1501dc38a79ce0267a63 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "os" "os/signal" @@ -15,6 +16,8 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog/v2" + pvController "sigs.k8s.io/sig-storage-lib-external-provisioner/controller" ) @@ -55,13 +58,13 @@ func onUsageError(c *cli.Context, err error, isSubcommand bool) error { panic(fmt.Errorf("Usage error, please check your command")) } -func RegisterShutdownChannel(done chan struct{}) { +func RegisterShutdownChannel(cancelFn context.CancelFunc) { sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func() { sig := <-sigs - logrus.Infof("Receive %v to exit", sig) - close(done) + klog.Infof("Receive %v to exit", sig) + cancelFn() }() } @@ -168,7 +171,7 @@ func loadConfig(kubeconfig string) (*rest.Config, error) { } func findConfigFileFromConfigMap(kubeClient clientset.Interface, namespace, configMapName, key string) (string, error) { - cm, err := kubeClient.CoreV1().ConfigMaps(namespace).Get(configMapName, metav1.GetOptions{}) + cm, err := kubeClient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), configMapName, metav1.GetOptions{}) if err != nil { return "", err } @@ -180,8 +183,8 @@ func findConfigFileFromConfigMap(kubeClient clientset.Interface, namespace, conf } func startDaemon(c *cli.Context) error { - stopCh := make(chan struct{}) - RegisterShutdownChannel(stopCh) + ctx, cancelFn := context.WithCancel(context.TODO()) + RegisterShutdownChannel(cancelFn) config, err := loadConfig(c.String(FlagKubeconfig)) if err != nil { @@ -258,7 +261,7 @@ func startDaemon(c *cli.Context) error { return fmt.Errorf("invalid zero or negative integer flag %v", FlagWorkerThreads) } - provisioner, err := NewProvisioner(stopCh, kubeClient, configFile, namespace, helperImage, configMapName, serviceAccountName, helperPodYaml) + provisioner, err := NewProvisioner(ctx, kubeClient, configFile, namespace, helperImage, configMapName, serviceAccountName, helperPodYaml) if err != nil { return err } @@ -273,7 +276,7 @@ func startDaemon(c *cli.Context) error { pvController.Threadiness(workerThreads), ) logrus.Debug("Provisioner started") - pc.Run(stopCh) + pc.Run(ctx) logrus.Debug("Provisioner stopped") return nil } diff --git a/provisioner.go b/provisioner.go index 6f87efc680b68985da3e8481ca75a2179c58b752..7c05013d8fee72ab9fdfca44b48cce2cf75a669c 100644 --- a/provisioner.go +++ b/provisioner.go @@ -1,6 +1,7 @@ package main import ( + "context" "encoding/json" "fmt" "os" @@ -17,6 +18,7 @@ import ( k8serror "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" + pvController "sigs.k8s.io/sig-storage-lib-external-provisioner/controller" ) @@ -52,7 +54,7 @@ var ( ) type LocalPathProvisioner struct { - stopCh chan struct{} + ctx context.Context kubeClient *clientset.Clientset namespace string helperImage string @@ -87,10 +89,10 @@ type Config struct { SharedFileSystemPath string } -func NewProvisioner(stopCh chan struct{}, kubeClient *clientset.Clientset, +func NewProvisioner(ctx context.Context, kubeClient *clientset.Clientset, configFile, namespace, helperImage, configMapName, serviceAccountName, helperPodYaml string) (*LocalPathProvisioner, error) { p := &LocalPathProvisioner{ - stopCh: stopCh, + ctx: ctx, kubeClient: kubeClient, namespace: namespace, @@ -155,7 +157,7 @@ func (p *LocalPathProvisioner) watchAndRefreshConfig() { if err := p.refreshConfig(); err != nil { logrus.Errorf("failed to load the new config file: %v", err) } - case <-p.stopCh: + case <-p.ctx.Done(): logrus.Infof("stop watching config file") return } @@ -224,24 +226,24 @@ func (p *LocalPathProvisioner) isSharedFilesystem() (bool, error) { return false, fmt.Errorf("both nodePathMap and sharedFileSystemPath are unconfigured") } -func (p *LocalPathProvisioner) Provision(opts pvController.ProvisionOptions) (*v1.PersistentVolume, error) { +func (p *LocalPathProvisioner) Provision(ctx context.Context, opts pvController.ProvisionOptions) (*v1.PersistentVolume, pvController.ProvisioningState, error) { pvc := opts.PVC node := opts.SelectedNode sharedFS, err := p.isSharedFilesystem() if err != nil { - return nil, err + return nil, pvController.ProvisioningFinished, err } if !sharedFS { if pvc.Spec.Selector != nil { - return nil, fmt.Errorf("claim.Spec.Selector is not supported") + return nil, pvController.ProvisioningFinished, fmt.Errorf("claim.Spec.Selector is not supported") } for _, accessMode := range pvc.Spec.AccessModes { if accessMode != v1.ReadWriteOnce { - return nil, fmt.Errorf("Only support ReadWriteOnce access mode") + return nil, pvController.ProvisioningFinished, fmt.Errorf("Only support ReadWriteOnce access mode") } } if node == nil { - return nil, fmt.Errorf("configuration error, no node was specified") + return nil, pvController.ProvisioningFinished, fmt.Errorf("configuration error, no node was specified") } } @@ -252,7 +254,7 @@ func (p *LocalPathProvisioner) Provision(opts pvController.ProvisionOptions) (*v } basePath, err := p.getRandomPathOnNode(nodeName) if err != nil { - return nil, err + return nil, pvController.ProvisioningFinished, err } name := opts.PVName @@ -274,7 +276,7 @@ func (p *LocalPathProvisioner) Provision(opts pvController.ProvisionOptions) (*v SizeInBytes: storage.Value(), Node: nodeName, }); err != nil { - return nil, err + return nil, pvController.ProvisioningFinished, err } fs := v1.PersistentVolumeFilesystem @@ -338,10 +340,10 @@ func (p *LocalPathProvisioner) Provision(opts pvController.ProvisionOptions) (*v PersistentVolumeSource: pvs, NodeAffinity: nodeAffinity, }, - }, nil + }, pvController.ProvisioningFinished, nil } -func (p *LocalPathProvisioner) Delete(pv *v1.PersistentVolume) (err error) { +func (p *LocalPathProvisioner) Delete(ctx context.Context, pv *v1.PersistentVolume) (err error) { defer func() { err = errors.Wrapf(err, "failed to delete volume %v", pv.Name) }() @@ -535,13 +537,13 @@ func (p *LocalPathProvisioner) createHelperPod(action ActionType, cmd []string, // If it already exists due to some previous errors, the pod will be cleaned up later automatically // https://github.com/rancher/local-path-provisioner/issues/27 logrus.Infof("create the helper pod %s into %s", helperPod.Name, p.namespace) - _, err = p.kubeClient.CoreV1().Pods(p.namespace).Create(helperPod) + _, err = p.kubeClient.CoreV1().Pods(p.namespace).Create(context.TODO(), helperPod, metav1.CreateOptions{}) if err != nil && !k8serror.IsAlreadyExists(err) { return err } defer func() { - e := p.kubeClient.CoreV1().Pods(p.namespace).Delete(helperPod.Name, &metav1.DeleteOptions{}) + e := p.kubeClient.CoreV1().Pods(p.namespace).Delete(context.TODO(), helperPod.Name, metav1.DeleteOptions{}) if e != nil { logrus.Errorf("unable to delete the helper pod: %v", e) } @@ -549,7 +551,7 @@ func (p *LocalPathProvisioner) createHelperPod(action ActionType, cmd []string, completed := false for i := 0; i < p.config.CmdTimeoutSeconds; i++ { - if pod, err := p.kubeClient.CoreV1().Pods(p.namespace).Get(helperPod.Name, metav1.GetOptions{}); err != nil { + if pod, err := p.kubeClient.CoreV1().Pods(p.namespace).Get(context.TODO(), helperPod.Name, metav1.GetOptions{}); err != nil { return err } else if pod.Status.Phase == v1.PodSucceeded { completed = true