Skip to content
Snippets Groups Projects
Commit 940b2389 authored by Derek Su's avatar Derek Su
Browse files

Replace stop channel with context


Signed-off-by: default avatarDerek Su <derek.su@suse.com>
parent 2d35cf82
No related branches found
No related tags found
No related merge requests found
package main package main
import ( import (
"context"
"fmt" "fmt"
"os" "os"
"os/signal" "os/signal"
...@@ -15,6 +16,8 @@ import ( ...@@ -15,6 +16,8 @@ import (
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
pvController "sigs.k8s.io/sig-storage-lib-external-provisioner/controller" pvController "sigs.k8s.io/sig-storage-lib-external-provisioner/controller"
) )
...@@ -55,13 +58,13 @@ func onUsageError(c *cli.Context, err error, isSubcommand bool) error { ...@@ -55,13 +58,13 @@ func onUsageError(c *cli.Context, err error, isSubcommand bool) error {
panic(fmt.Errorf("Usage error, please check your command")) panic(fmt.Errorf("Usage error, please check your command"))
} }
func RegisterShutdownChannel(done chan struct{}) { func RegisterShutdownChannel(cancelFn context.CancelFunc) {
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() { go func() {
sig := <-sigs sig := <-sigs
logrus.Infof("Receive %v to exit", sig) klog.Infof("Receive %v to exit", sig)
close(done) cancelFn()
}() }()
} }
...@@ -168,7 +171,7 @@ func loadConfig(kubeconfig string) (*rest.Config, error) { ...@@ -168,7 +171,7 @@ func loadConfig(kubeconfig string) (*rest.Config, error) {
} }
func findConfigFileFromConfigMap(kubeClient clientset.Interface, namespace, configMapName, key string) (string, error) { func findConfigFileFromConfigMap(kubeClient clientset.Interface, namespace, configMapName, key string) (string, error) {
cm, err := kubeClient.CoreV1().ConfigMaps(namespace).Get(configMapName, metav1.GetOptions{}) cm, err := kubeClient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), configMapName, metav1.GetOptions{})
if err != nil { if err != nil {
return "", err return "", err
} }
...@@ -180,8 +183,8 @@ func findConfigFileFromConfigMap(kubeClient clientset.Interface, namespace, conf ...@@ -180,8 +183,8 @@ func findConfigFileFromConfigMap(kubeClient clientset.Interface, namespace, conf
} }
func startDaemon(c *cli.Context) error { func startDaemon(c *cli.Context) error {
stopCh := make(chan struct{}) ctx, cancelFn := context.WithCancel(context.TODO())
RegisterShutdownChannel(stopCh) RegisterShutdownChannel(cancelFn)
config, err := loadConfig(c.String(FlagKubeconfig)) config, err := loadConfig(c.String(FlagKubeconfig))
if err != nil { if err != nil {
...@@ -258,7 +261,7 @@ func startDaemon(c *cli.Context) error { ...@@ -258,7 +261,7 @@ func startDaemon(c *cli.Context) error {
return fmt.Errorf("invalid zero or negative integer flag %v", FlagWorkerThreads) return fmt.Errorf("invalid zero or negative integer flag %v", FlagWorkerThreads)
} }
provisioner, err := NewProvisioner(stopCh, kubeClient, configFile, namespace, helperImage, configMapName, serviceAccountName, helperPodYaml) provisioner, err := NewProvisioner(ctx, kubeClient, configFile, namespace, helperImage, configMapName, serviceAccountName, helperPodYaml)
if err != nil { if err != nil {
return err return err
} }
...@@ -273,7 +276,7 @@ func startDaemon(c *cli.Context) error { ...@@ -273,7 +276,7 @@ func startDaemon(c *cli.Context) error {
pvController.Threadiness(workerThreads), pvController.Threadiness(workerThreads),
) )
logrus.Debug("Provisioner started") logrus.Debug("Provisioner started")
pc.Run(stopCh) pc.Run(ctx)
logrus.Debug("Provisioner stopped") logrus.Debug("Provisioner stopped")
return nil return nil
} }
......
package main package main
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
...@@ -17,6 +18,7 @@ import ( ...@@ -17,6 +18,7 @@ import (
k8serror "k8s.io/apimachinery/pkg/api/errors" k8serror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
pvController "sigs.k8s.io/sig-storage-lib-external-provisioner/controller" pvController "sigs.k8s.io/sig-storage-lib-external-provisioner/controller"
) )
...@@ -52,7 +54,7 @@ var ( ...@@ -52,7 +54,7 @@ var (
) )
type LocalPathProvisioner struct { type LocalPathProvisioner struct {
stopCh chan struct{} ctx context.Context
kubeClient *clientset.Clientset kubeClient *clientset.Clientset
namespace string namespace string
helperImage string helperImage string
...@@ -87,10 +89,10 @@ type Config struct { ...@@ -87,10 +89,10 @@ type Config struct {
SharedFileSystemPath string SharedFileSystemPath string
} }
func NewProvisioner(stopCh chan struct{}, kubeClient *clientset.Clientset, func NewProvisioner(ctx context.Context, kubeClient *clientset.Clientset,
configFile, namespace, helperImage, configMapName, serviceAccountName, helperPodYaml string) (*LocalPathProvisioner, error) { configFile, namespace, helperImage, configMapName, serviceAccountName, helperPodYaml string) (*LocalPathProvisioner, error) {
p := &LocalPathProvisioner{ p := &LocalPathProvisioner{
stopCh: stopCh, ctx: ctx,
kubeClient: kubeClient, kubeClient: kubeClient,
namespace: namespace, namespace: namespace,
...@@ -155,7 +157,7 @@ func (p *LocalPathProvisioner) watchAndRefreshConfig() { ...@@ -155,7 +157,7 @@ func (p *LocalPathProvisioner) watchAndRefreshConfig() {
if err := p.refreshConfig(); err != nil { if err := p.refreshConfig(); err != nil {
logrus.Errorf("failed to load the new config file: %v", err) logrus.Errorf("failed to load the new config file: %v", err)
} }
case <-p.stopCh: case <-p.ctx.Done():
logrus.Infof("stop watching config file") logrus.Infof("stop watching config file")
return return
} }
...@@ -224,24 +226,24 @@ func (p *LocalPathProvisioner) isSharedFilesystem() (bool, error) { ...@@ -224,24 +226,24 @@ func (p *LocalPathProvisioner) isSharedFilesystem() (bool, error) {
return false, fmt.Errorf("both nodePathMap and sharedFileSystemPath are unconfigured") return false, fmt.Errorf("both nodePathMap and sharedFileSystemPath are unconfigured")
} }
func (p *LocalPathProvisioner) Provision(opts pvController.ProvisionOptions) (*v1.PersistentVolume, error) { func (p *LocalPathProvisioner) Provision(ctx context.Context, opts pvController.ProvisionOptions) (*v1.PersistentVolume, pvController.ProvisioningState, error) {
pvc := opts.PVC pvc := opts.PVC
node := opts.SelectedNode node := opts.SelectedNode
sharedFS, err := p.isSharedFilesystem() sharedFS, err := p.isSharedFilesystem()
if err != nil { if err != nil {
return nil, err return nil, pvController.ProvisioningFinished, err
} }
if !sharedFS { if !sharedFS {
if pvc.Spec.Selector != nil { if pvc.Spec.Selector != nil {
return nil, fmt.Errorf("claim.Spec.Selector is not supported") return nil, pvController.ProvisioningFinished, fmt.Errorf("claim.Spec.Selector is not supported")
} }
for _, accessMode := range pvc.Spec.AccessModes { for _, accessMode := range pvc.Spec.AccessModes {
if accessMode != v1.ReadWriteOnce { if accessMode != v1.ReadWriteOnce {
return nil, fmt.Errorf("Only support ReadWriteOnce access mode") return nil, pvController.ProvisioningFinished, fmt.Errorf("Only support ReadWriteOnce access mode")
} }
} }
if node == nil { if node == nil {
return nil, fmt.Errorf("configuration error, no node was specified") return nil, pvController.ProvisioningFinished, fmt.Errorf("configuration error, no node was specified")
} }
} }
...@@ -252,7 +254,7 @@ func (p *LocalPathProvisioner) Provision(opts pvController.ProvisionOptions) (*v ...@@ -252,7 +254,7 @@ func (p *LocalPathProvisioner) Provision(opts pvController.ProvisionOptions) (*v
} }
basePath, err := p.getRandomPathOnNode(nodeName) basePath, err := p.getRandomPathOnNode(nodeName)
if err != nil { if err != nil {
return nil, err return nil, pvController.ProvisioningFinished, err
} }
name := opts.PVName name := opts.PVName
...@@ -274,7 +276,7 @@ func (p *LocalPathProvisioner) Provision(opts pvController.ProvisionOptions) (*v ...@@ -274,7 +276,7 @@ func (p *LocalPathProvisioner) Provision(opts pvController.ProvisionOptions) (*v
SizeInBytes: storage.Value(), SizeInBytes: storage.Value(),
Node: nodeName, Node: nodeName,
}); err != nil { }); err != nil {
return nil, err return nil, pvController.ProvisioningFinished, err
} }
fs := v1.PersistentVolumeFilesystem fs := v1.PersistentVolumeFilesystem
...@@ -338,10 +340,10 @@ func (p *LocalPathProvisioner) Provision(opts pvController.ProvisionOptions) (*v ...@@ -338,10 +340,10 @@ func (p *LocalPathProvisioner) Provision(opts pvController.ProvisionOptions) (*v
PersistentVolumeSource: pvs, PersistentVolumeSource: pvs,
NodeAffinity: nodeAffinity, NodeAffinity: nodeAffinity,
}, },
}, nil }, pvController.ProvisioningFinished, nil
} }
func (p *LocalPathProvisioner) Delete(pv *v1.PersistentVolume) (err error) { func (p *LocalPathProvisioner) Delete(ctx context.Context, pv *v1.PersistentVolume) (err error) {
defer func() { defer func() {
err = errors.Wrapf(err, "failed to delete volume %v", pv.Name) err = errors.Wrapf(err, "failed to delete volume %v", pv.Name)
}() }()
...@@ -535,13 +537,13 @@ func (p *LocalPathProvisioner) createHelperPod(action ActionType, cmd []string, ...@@ -535,13 +537,13 @@ func (p *LocalPathProvisioner) createHelperPod(action ActionType, cmd []string,
// If it already exists due to some previous errors, the pod will be cleaned up later automatically // If it already exists due to some previous errors, the pod will be cleaned up later automatically
// https://github.com/rancher/local-path-provisioner/issues/27 // https://github.com/rancher/local-path-provisioner/issues/27
logrus.Infof("create the helper pod %s into %s", helperPod.Name, p.namespace) logrus.Infof("create the helper pod %s into %s", helperPod.Name, p.namespace)
_, err = p.kubeClient.CoreV1().Pods(p.namespace).Create(helperPod) _, err = p.kubeClient.CoreV1().Pods(p.namespace).Create(context.TODO(), helperPod, metav1.CreateOptions{})
if err != nil && !k8serror.IsAlreadyExists(err) { if err != nil && !k8serror.IsAlreadyExists(err) {
return err return err
} }
defer func() { defer func() {
e := p.kubeClient.CoreV1().Pods(p.namespace).Delete(helperPod.Name, &metav1.DeleteOptions{}) e := p.kubeClient.CoreV1().Pods(p.namespace).Delete(context.TODO(), helperPod.Name, metav1.DeleteOptions{})
if e != nil { if e != nil {
logrus.Errorf("unable to delete the helper pod: %v", e) logrus.Errorf("unable to delete the helper pod: %v", e)
} }
...@@ -549,7 +551,7 @@ func (p *LocalPathProvisioner) createHelperPod(action ActionType, cmd []string, ...@@ -549,7 +551,7 @@ func (p *LocalPathProvisioner) createHelperPod(action ActionType, cmd []string,
completed := false completed := false
for i := 0; i < p.config.CmdTimeoutSeconds; i++ { for i := 0; i < p.config.CmdTimeoutSeconds; i++ {
if pod, err := p.kubeClient.CoreV1().Pods(p.namespace).Get(helperPod.Name, metav1.GetOptions{}); err != nil { if pod, err := p.kubeClient.CoreV1().Pods(p.namespace).Get(context.TODO(), helperPod.Name, metav1.GetOptions{}); err != nil {
return err return err
} else if pod.Status.Phase == v1.PodSucceeded { } else if pod.Status.Phase == v1.PodSucceeded {
completed = true completed = true
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment