Skip to content
Snippets Groups Projects
Select Git revision
  • b96bd977335ae0452bffae972dba72490d9e80c7
  • master default protected
  • v0.0.x
  • v0.0.31
  • v0.0.30
  • v0.0.29
  • v0.0.28
  • v0.0.28-rc1
  • v0.0.27
  • v0.0.26
  • v0.0.25
  • v0.0.24
  • v0.0.23
  • v0.0.22
  • v0.0.21
  • v0.0.20
  • v0.0.19
  • v0.0.18
  • v0.0.17
  • v0.0.16
  • v0.0.15
  • v0.0.14
  • v0.0.13
23 results

provisioner.go

Blame
  • provisioner.go 23.83 KiB
    package main
    
    import (
    	"bytes"
    	"context"
    	"encoding/json"
    	"fmt"
    	"io"
    	"os"
    	"path/filepath"
    	"reflect"
    	"strconv"
    	"strings"
    	"sync"
    	"time"
    
    	"github.com/Sirupsen/logrus"
    	"github.com/pkg/errors"
    	v1 "k8s.io/api/core/v1"
    	k8serror "k8s.io/apimachinery/pkg/api/errors"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	clientset "k8s.io/client-go/kubernetes"
    	"k8s.io/client-go/rest"
    
    	pvController "sigs.k8s.io/sig-storage-lib-external-provisioner/v8/controller"
    )
    
    // ActionType names the operation a helper pod performs on a volume. Its
    // string form is embedded in the helper pod name and passed to the helper
    // container via the "-a" argument.
    type ActionType string
    
    // Supported helper pod actions. These are untyped string constants, so they
    // are usable both as ActionType values and as plain strings.
    const (
    	ActionTypeCreate = "create"
    	ActionTypeDelete = "delete"
    )
    
    const (
    	// KeyNode is the node label key used to read a node's hostname label and
    	// to build/parse the node affinity of provisioned volumes.
    	KeyNode = "kubernetes.io/hostname"
    
    	// NodeDefaultNonListedNodes is the nodePathMap key whose paths are used
    	// for any node that has no entry of its own.
    	NodeDefaultNonListedNodes = "DEFAULT_PATH_FOR_NON_LISTED_NODES"
    
    	// helperScriptDir is where the script volume is mounted in the helper
    	// container; helperDataVolName and helperScriptVolName are the names of
    	// the data and script volume mounts.
    	helperScriptDir     = "/script"
    	helperDataVolName   = "data"
    	helperScriptVolName = "script"
    
    	// Names of the environment variables handed to the helper container:
    	// the volume directory, the volume mode and the requested size in bytes.
    	envVolDir  = "VOL_DIR"
    	envVolMode = "VOL_MODE"
    	envVolSize = "VOL_SIZE_BYTES"
    )
    
    const (
    	// defaultCmdTimeoutSeconds is applied when the config file does not set
    	// a positive cmdTimeoutSeconds.
    	defaultCmdTimeoutSeconds = 120
    	// defaultVolumeType is the volume source type used when neither the
    	// storage class nor the PVC annotations select one.
    	defaultVolumeType        = "hostPath"
    )
    
    const (
    	// nodeNameAnnotationKey is the PV annotation that records the name of
    	// the node selected at provisioning time.
    	nodeNameAnnotationKey = "local.path.provisioner/selected-node"
    )
    
    var (
    	// ConfigFileCheckInterval is the period of the background config and
    	// helper pod manifest reload.
    	ConfigFileCheckInterval = 30 * time.Second
    
    	// HelperPodNameMaxLength is the length helper pod names are truncated to.
    	HelperPodNameMaxLength = 128
    )
    
    // LocalPathProvisioner provisions and deletes volumes by running helper
    // pods in namespace.
    //
    // ctx bounds the lifetime of the background reload goroutine. config is the
    // canonical form of configData, which in turn is the last successfully
    // loaded content of configFile; both are replaced together by refreshConfig.
    // configMapName is the ConfigMap the helper pod's setup/teardown scripts are
    // mounted from. helperPod is the template every helper pod is copied from.
    // configMutex is taken by the refresh methods (write) and by the config
    // readers that lock (read).
    type LocalPathProvisioner struct {
    	ctx                context.Context
    	kubeClient         *clientset.Clientset
    	namespace          string
    	helperImage        string
    	serviceAccountName string
    
    	config        *Config
    	configData    *ConfigData
    	configFile    string
    	configMapName string
    	configMutex   *sync.RWMutex
    	helperPod     *v1.Pod
    }
    
    // NodePathMapData is one nodePathMap entry of the config file: a node name
    // (or NodeDefaultNonListedNodes) and the paths usable on that node.
    type NodePathMapData struct {
    	Node  string   `json:"node,omitempty"`
    	Paths []string `json:"paths,omitempty"`
    }
    
    // StorageClassConfigData is the JSON form of the storage location settings.
    // Exactly one of NodePathMap and SharedFileSystemPath is expected to be set;
    // isSharedFilesystem rejects configurations with both or neither.
    type StorageClassConfigData struct {
    	NodePathMap          []*NodePathMapData `json:"nodePathMap,omitempty"`
    	SharedFileSystemPath string             `json:"sharedFileSystemPath,omitempty"`
    }
    
    // ConfigData is the JSON form of the provisioner config file.
    //
    // The embedded StorageClassConfigData is the default location config; it is
    // only canonicalized when StorageClassConfigs is empty. StorageClassConfigs
    // holds per-class settings keyed by storage class name.
    type ConfigData struct {
    	CmdTimeoutSeconds int    `json:"cmdTimeoutSeconds,omitempty"`
    	SetupCommand      string `json:"setupCommand,omitempty"`
    	TeardownCommand   string `json:"teardownCommand,omitempty"`
    	StorageClassConfigData
    	StorageClassConfigs map[string]StorageClassConfigData `json:"storageClassConfigs"`
    }
    
    // StorageClassConfig is the canonical (validated) form of
    // StorageClassConfigData, with NodePathMap keyed by node name.
    type StorageClassConfig struct {
    	NodePathMap          map[string]*NodePathMap
    	SharedFileSystemPath string
    }
    
    // NodePathMap is the set of absolute paths available on one node.
    type NodePathMap struct {
    	Paths map[string]struct{}
    }
    
    // Config is the canonical form of ConfigData produced by canonicalizeConfig.
    //
    // CmdTimeoutSeconds is always positive (defaultCmdTimeoutSeconds when the
    // file does not set it). The embedded StorageClassConfig is used when
    // StorageClassConfigs is empty; otherwise lookups go through
    // StorageClassConfigs by storage class name.
    type Config struct {
    	CmdTimeoutSeconds int
    	SetupCommand      string
    	TeardownCommand   string
    	StorageClassConfig
    	StorageClassConfigs map[string]StorageClassConfig
    }
    
    // NewProvisioner builds a LocalPathProvisioner from the given settings.
    //
    // It parses helperPodYaml into the helper pod template, loads configFile
    // once (failing if the helper pod manifest or the config is invalid) and
    // then starts the background goroutine that periodically reloads both
    // until ctx is done.
    func NewProvisioner(ctx context.Context, kubeClient *clientset.Clientset,
    	configFile, namespace, helperImage, configMapName, serviceAccountName, helperPodYaml string) (*LocalPathProvisioner, error) {
    	helperPod, err := loadHelperPodFile(helperPodYaml)
    	if err != nil {
    		return nil, err
    	}
    
    	// config and configData start out nil; refreshConfig fills them in below.
    	provisioner := &LocalPathProvisioner{
    		ctx:                ctx,
    		kubeClient:         kubeClient,
    		namespace:          namespace,
    		helperImage:        helperImage,
    		serviceAccountName: serviceAccountName,
    		configFile:         configFile,
    		configMapName:      configMapName,
    		configMutex:        &sync.RWMutex{},
    		helperPod:          helperPod,
    	}
    	if err = provisioner.refreshConfig(); err != nil {
    		return nil, err
    	}
    	provisioner.watchAndRefreshConfig()
    	return provisioner, nil
    }
    
    // refreshConfig reloads the config file and, when its content changed and
    // is valid, swaps in the new raw and canonical config under the write lock.
    // An unreadable or invalid file leaves the current config untouched and is
    // reported as an error.
    func (p *LocalPathProvisioner) refreshConfig() error {
    	p.configMutex.Lock()
    	defer p.configMutex.Unlock()
    
    	loaded, err := loadConfigFile(p.configFile)
    	if err != nil {
    		return err
    	}
    	if reflect.DeepEqual(loaded, p.configData) {
    		// Same content as the last applied config: nothing to do.
    		return nil
    	}
    	canonical, err := canonicalizeConfig(loaded)
    	if err != nil {
    		// Keep serving the previous config when the new one is invalid.
    		return err
    	}
    	p.configData, p.config = loaded, canonical
    
    	encoded, err := json.Marshal(p.configData)
    	if err != nil {
    		return err
    	}
    	logrus.Debugf("Applied config: %v", string(encoded))
    	return nil
    }
    
    // refreshHelperPod reloads the helper pod manifest from the directory named
    // by the EnvConfigMountPath environment variable. It is a no-op when that
    // variable is not set.
    //
    // The new template replaces the current one only after it has been read and
    // parsed successfully, so a broken manifest never clobbers the template in
    // use. File I/O and parsing happen before the write lock is taken; the lock
    // covers just the pointer swap.
    func (p *LocalPathProvisioner) refreshHelperPod() error {
    	mountPath, envExists := os.LookupEnv(EnvConfigMountPath)
    	if !envExists {
    		return nil
    	}
    
    	manifest, err := loadFile(filepath.Join(mountPath, DefaultHelperPodFile))
    	if err != nil {
    		return err
    	}
    	newHelperPod, err := loadHelperPodFile(manifest)
    	if err != nil {
    		return err
    	}
    
    	p.configMutex.Lock()
    	defer p.configMutex.Unlock()
    	p.helperPod = newHelperPod
    	return nil
    }
    
    // watchAndRefreshConfig starts a goroutine that reloads the config file and
    // the helper pod manifest every ConfigFileCheckInterval. Reload failures
    // are logged and do not stop the loop; the goroutine exits when p.ctx is
    // done.
    func (p *LocalPathProvisioner) watchAndRefreshConfig() {
    	reload := func() {
    		if err := p.refreshConfig(); err != nil {
    			logrus.Errorf("failed to load the new config file: %v", err)
    		}
    		if err := p.refreshHelperPod(); err != nil {
    			logrus.Errorf("failed to load the new helper pod manifest: %v", err)
    		}
    	}
    
    	go func() {
    		ticker := time.NewTicker(ConfigFileCheckInterval)
    		defer ticker.Stop()
    		for {
    			select {
    			case <-p.ctx.Done():
    				logrus.Infof("stop watching config file")
    				return
    			case <-ticker.C:
    				reload()
    			}
    		}
    	}()
    }
    
    // getPathOnNode returns the base directory under which a volume should be
    // created.
    //
    // For a shared filesystem config it returns c.SharedFileSystemPath and
    // ignores node. Otherwise it looks up node in c.NodePathMap, falling back to
    // the NodeDefaultNonListedNodes entry, and returns requestedPath if it is
    // one of the configured paths, or an arbitrary configured path when
    // requestedPath is empty. An error is returned when no valid config is
    // loaded, the config is inconsistent, the node has no usable paths, or
    // requestedPath is not configured for the node.
    func (p *LocalPathProvisioner) getPathOnNode(node string, requestedPath string, c *StorageClassConfig) (string, error) {
    	// isSharedFilesystem takes configMutex.RLock itself and also reports the
    	// "no valid config available" case. It must be called without the read
    	// lock already held: sync.RWMutex prohibits recursive read locking,
    	// because a writer (refreshConfig) that queues between the two RLock
    	// calls blocks the second one forever.
    	sharedFS, err := p.isSharedFilesystem(c)
    	if err != nil {
    		return "", err
    	}
    	if sharedFS {
    		// we are ignoring 'node' and returning shared FS path
    		return c.SharedFileSystemPath, nil
    	}
    
    	// From here on only c is read. refreshConfig replaces p.config with a
    	// freshly built value instead of mutating the old one, so no lock is
    	// needed to read a StorageClassConfig that was already handed out.
    	npMap := c.NodePathMap[node]
    	if npMap == nil {
    		npMap = c.NodePathMap[NodeDefaultNonListedNodes]
    		if npMap == nil {
    			return "", fmt.Errorf("config doesn't contain node %v, and no %v available", node, NodeDefaultNonListedNodes)
    		}
    		logrus.Debugf("config doesn't contain node %v, use %v instead", node, NodeDefaultNonListedNodes)
    	}
    	paths := npMap.Paths
    	if len(paths) == 0 {
    		return "", fmt.Errorf("no local path available on node %v", node)
    	}
    	// if a particular path was requested by storage class
    	if requestedPath != "" {
    		if _, ok := paths[requestedPath]; !ok {
    			return "", fmt.Errorf("config doesn't contain path %v on node %v", requestedPath, node)
    		}
    		return requestedPath, nil
    	}
    	// No particular path was requested: take whichever key map iteration
    	// yields first, which Go leaves unspecified.
    	path := ""
    	for path = range paths {
    		break
    	}
    	return path, nil
    }
    
    // isSharedFilesystem reports whether c describes a shared filesystem
    // (sharedFileSystemPath set) rather than per-node paths (nodePathMap set).
    // It returns an error when no config is loaded, or when c sets both or
    // neither of the two. It takes the config read lock for its duration.
    func (p *LocalPathProvisioner) isSharedFilesystem(c *StorageClassConfig) (bool, error) {
    	p.configMutex.RLock()
    	defer p.configMutex.RUnlock()
    
    	if p.config == nil {
    		return false, fmt.Errorf("no valid config available")
    	}
    
    	hasSharedPath := c.SharedFileSystemPath != ""
    	hasNodePaths := len(c.NodePathMap) != 0
    	switch {
    	case hasSharedPath && hasNodePaths:
    		return false, fmt.Errorf("both nodePathMap and sharedFileSystemPath are defined. Please make sure only one is in use")
    	case hasNodePaths:
    		return false, nil
    	case hasSharedPath:
    		return true, nil
    	default:
    		return false, fmt.Errorf("both nodePathMap and sharedFileSystemPath are unconfigured")
    	}
    }
    
    // pickConfig returns the location config to use for storageClassName.
    //
    // When the loaded config has no per-class entries the default (embedded)
    // config is returned for every class; otherwise the class must have its own
    // entry. An error is returned when no config is loaded or the class is
    // unknown.
    //
    // p.config is read under the read lock because refreshConfig replaces it
    // concurrently under the write lock. The lock is released before returning,
    // so callers are free to call other methods that lock.
    func (p *LocalPathProvisioner) pickConfig(storageClassName string) (*StorageClassConfig, error) {
    	p.configMutex.RLock()
    	defer p.configMutex.RUnlock()
    
    	if p.config == nil {
    		return nil, fmt.Errorf("no valid config available")
    	}
    	if len(p.config.StorageClassConfigs) == 0 {
    		return &p.config.StorageClassConfig, nil
    	}
    	cfg, ok := p.config.StorageClassConfigs[storageClassName]
    	if !ok {
    		return nil, fmt.Errorf("BUG: Got request for unexpected storage class %s", storageClassName)
    	}
    	return &cfg, nil
    }
    
    // Provision creates a volume for the claim in opts using the location
    // config that applies to the claim's storage class. The returned state is
    // always pvController.ProvisioningFinished when the config lookup fails.
    func (p *LocalPathProvisioner) Provision(ctx context.Context, opts pvController.ProvisionOptions) (*v1.PersistentVolume, pvController.ProvisioningState, error) {
    	classConfig, err := p.pickConfig(opts.StorageClass.Name)
    	if err == nil {
    		return p.provisionFor(opts, classConfig)
    	}
    	return nil, pvController.ProvisioningFinished, err
    }
    
    // provisionFor creates the backing directory for the claim in opts by
    // running a helper pod, and returns the PersistentVolume describing it.
    //
    // For per-node (non-shared) configs the claim must not use a selector, may
    // only request ReadWriteOnce/ReadWriteOncePod, and a node must have been
    // selected; the resulting PV is pinned to that node through node affinity.
    // For shared filesystem configs no affinity is set. The volume directory is
    // <base path>/<pv name>_<pvc namespace>_<pvc name>. Every return path
    // reports pvController.ProvisioningFinished.
    func (p *LocalPathProvisioner) provisionFor(opts pvController.ProvisionOptions, c *StorageClassConfig) (*v1.PersistentVolume, pvController.ProvisioningState, error) {
    	pvc := opts.PVC
    	node := opts.SelectedNode
    	storageClass := opts.StorageClass
    	sharedFS, err := p.isSharedFilesystem(c)
    	if err != nil {
    		return nil, pvController.ProvisioningFinished, err
    	}
    	if !sharedFS {
    		if pvc.Spec.Selector != nil {
    			return nil, pvController.ProvisioningFinished, fmt.Errorf("claim.Spec.Selector is not supported")
    		}
    		for _, accessMode := range pvc.Spec.AccessModes {
    			if accessMode != v1.ReadWriteOnce && accessMode != v1.ReadWriteOncePod {
    				return nil, pvController.ProvisioningFinished, fmt.Errorf("NodePath only supports ReadWriteOnce and ReadWriteOncePod (1.22+) access modes")
    			}
    		}
    		if node == nil {
    			return nil, pvController.ProvisioningFinished, fmt.Errorf("configuration error, no node was specified")
    		}
    	}
    
    	nodeName := ""
    	if node != nil {
    		// node can only be nil here for a shared filesystem (checked above);
    		// in that case nodeName stays empty.
    		nodeName = node.Name
    	}
    	// An optional "nodePath" storage class parameter pins the base path.
    	var requestedPath string
    	if storageClass.Parameters != nil {
    		if _, ok := storageClass.Parameters["nodePath"]; ok {
    			requestedPath = storageClass.Parameters["nodePath"]
    		}
    	}
    	basePath, err := p.getPathOnNode(nodeName, requestedPath, c)
    	if err != nil {
    		return nil, pvController.ProvisioningFinished, err
    	}
    
    	name := opts.PVName
    	folderName := strings.Join([]string{name, opts.PVC.Namespace, opts.PVC.Name}, "_")
    
    	path := filepath.Join(basePath, folderName)
    	if nodeName == "" {
    		logrus.Infof("Creating volume %v at %v", name, path)
    	} else {
    		logrus.Infof("Creating volume %v at %v:%v", name, nodeName, path)
    	}
    
    	storage := pvc.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
    	// Default to the setup script from the script volume unless the config
    	// names a custom setup command.
    	provisionCmd := make([]string, 0, 2)
    	if p.config.SetupCommand == "" {
    		provisionCmd = append(provisionCmd, "/bin/sh", "/script/setup")
    	} else {
    		provisionCmd = append(provisionCmd, p.config.SetupCommand)
    	}
    	// NOTE(review): pvc.Spec.VolumeMode is dereferenced without a nil check —
    	// presumably always defaulted before this point; confirm.
    	if err := p.createHelperPod(ActionTypeCreate, provisionCmd, volumeOptions{
    		Name:        name,
    		Path:        path,
    		Mode:        *pvc.Spec.VolumeMode,
    		SizeInBytes: storage.Value(),
    		Node:        nodeName,
    	}, c); err != nil {
    		return nil, pvController.ProvisioningFinished, err
    	}
    
    	// The PV is always published with Filesystem volume mode.
    	fs := v1.PersistentVolumeFilesystem
    
    	// Volume type precedence: PVC "volumeType" annotation, then storage class
    	// "defaultVolumeType" annotation, then defaultVolumeType.
    	var pvs v1.PersistentVolumeSource
    	var volumeType string
    	if dVal, ok := opts.StorageClass.GetAnnotations()["defaultVolumeType"]; ok {
    		volumeType = dVal
    	} else {
    		volumeType = defaultVolumeType
    	}
    	if val, ok := opts.PVC.GetAnnotations()["volumeType"]; ok {
    		volumeType = val
    	}
    	// Note: the volume type is validated only after the helper pod has run.
    	pvs, err = createPersistentVolumeSource(volumeType, path)
    	if err != nil {
    		return nil, pvController.ProvisioningFinished, err
    	}
    
    	var nodeAffinity *v1.VolumeNodeAffinity
    	if sharedFS {
    		// If the same filesystem is mounted across all nodes, we don't need
    		// affinity, as path is accessible from any node
    		nodeAffinity = nil
    	} else {
    		// Prefer the node's hostname label; fall back to the node name when
    		// the label is missing.
    		valueNode, ok := node.GetLabels()[KeyNode]
    		if !ok {
    			valueNode = nodeName
    		}
    		nodeAffinity = &v1.VolumeNodeAffinity{
    			Required: &v1.NodeSelector{
    				NodeSelectorTerms: []v1.NodeSelectorTerm{
    					{
    						MatchExpressions: []v1.NodeSelectorRequirement{
    							{
    								Key:      KeyNode,
    								Operator: v1.NodeSelectorOpIn,
    								Values: []string{
    									valueNode,
    								},
    							},
    						},
    					},
    				},
    			},
    		}
    	}
    	// NOTE(review): opts.StorageClass.ReclaimPolicy is dereferenced without a
    	// nil check — presumably always set on the storage class; confirm.
    	return &v1.PersistentVolume{
    		ObjectMeta: metav1.ObjectMeta{
    			Name:        name,
    			Annotations: map[string]string{nodeNameAnnotationKey: nodeName},
    		},
    		Spec: v1.PersistentVolumeSpec{
    			PersistentVolumeReclaimPolicy: *opts.StorageClass.ReclaimPolicy,
    			AccessModes:                   pvc.Spec.AccessModes,
    			VolumeMode:                    &fs,
    			Capacity: v1.ResourceList{
    				v1.ResourceStorage: pvc.Spec.Resources.Requests[v1.ResourceStorage],
    			},
    			PersistentVolumeSource: pvs,
    			NodeAffinity:           nodeAffinity,
    		},
    	}, pvController.ProvisioningFinished, nil
    }
    
    // Delete removes the data behind pv using the location config that applies
    // to the volume's storage class.
    func (p *LocalPathProvisioner) Delete(ctx context.Context, pv *v1.PersistentVolume) (err error) {
    	classConfig, lookupErr := p.pickConfig(pv.Spec.StorageClassName)
    	if lookupErr != nil {
    		return lookupErr
    	}
    	return p.deleteFor(pv, classConfig)
    }
    
    // deleteFor cleans up the directory behind pv by running a teardown helper
    // pod, unless the volume's reclaim policy is Retain, in which case it only
    // logs. The path and node are resolved (and validated) before the reclaim
    // policy is looked at. Any returned error is wrapped with the volume name.
    func (p *LocalPathProvisioner) deleteFor(pv *v1.PersistentVolume, c *StorageClassConfig) (err error) {
    	defer func() {
    		err = errors.Wrapf(err, "failed to delete volume %v", pv.Name)
    	}()
    
    	path, node, err := p.getPathAndNodeForPV(pv, c)
    	if err != nil {
    		return err
    	}
    	if pv.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimRetain {
    		logrus.Infof("Retained volume %v", pv.Name)
    		return nil
    	}
    
    	if node != "" {
    		logrus.Infof("Deleting volume %v at %v:%v", pv.Name, node, path)
    	} else {
    		logrus.Infof("Deleting volume %v at %v", pv.Name, path)
    	}
    
    	capacity := pv.Spec.Capacity[v1.ResourceName(v1.ResourceStorage)]
    	// Use the teardown script from the script volume unless the config
    	// names a custom teardown command.
    	teardownCmd := []string{"/bin/sh", "/script/teardown"}
    	if custom := p.config.TeardownCommand; custom != "" {
    		teardownCmd = []string{custom}
    	}
    	helperErr := p.createHelperPod(ActionTypeDelete, teardownCmd, volumeOptions{
    		Name:        pv.Name,
    		Path:        path,
    		Mode:        *pv.Spec.VolumeMode,
    		SizeInBytes: capacity.Value(),
    		Node:        node,
    	}, c)
    	if helperErr != nil {
    		logrus.Infof("clean up volume %v failed: %v", pv.Name, helperErr)
    		return helperErr
    	}
    	return nil
    }
    
    // getPathAndNodeForPV extracts the on-disk path of pv and, for per-node
    // configs, the node it lives on.
    //
    // The path comes from the volume's HostPath or Local source (exactly one
    // must be set). For shared filesystem configs node is returned empty. For
    // per-node configs the node is the single value of the first KeyNode "In"
    // expression found in the required node affinity. Errors are wrapped with
    // the volume name.
    func (p *LocalPathProvisioner) getPathAndNodeForPV(pv *v1.PersistentVolume, cfg *StorageClassConfig) (path, node string, err error) {
    	defer func() {
    		err = errors.Wrapf(err, "failed to delete volume %v", pv.Name)
    	}()
    
    	source := pv.Spec.PersistentVolumeSource
    	switch {
    	case source.HostPath != nil && source.Local == nil:
    		path = source.HostPath.Path
    	case source.Local != nil && source.HostPath == nil:
    		path = source.Local.Path
    	default:
    		return "", "", fmt.Errorf("no path set")
    	}
    
    	sharedFS, err := p.isSharedFilesystem(cfg)
    	if err != nil {
    		return "", "", err
    	}
    	if sharedFS {
    		// No affinity on shared filesystems: any node can be used.
    		return path, "", nil
    	}
    
    	// Local filesystem: the node has to be recovered from the affinity.
    	if pv.Spec.NodeAffinity == nil {
    		return "", "", fmt.Errorf("no NodeAffinity set")
    	}
    	required := pv.Spec.NodeAffinity.Required
    	if required == nil {
    		return "", "", fmt.Errorf("no NodeAffinity.Required set")
    	}
    
    	for _, term := range required.NodeSelectorTerms {
    		for _, expr := range term.MatchExpressions {
    			if expr.Key != KeyNode || expr.Operator != v1.NodeSelectorOpIn {
    				continue
    			}
    			if len(expr.Values) != 1 {
    				return "", "", fmt.Errorf("multiple values for the node affinity")
    			}
    			node = expr.Values[0]
    			break
    		}
    		if node != "" {
    			break
    		}
    	}
    	if node == "" {
    		return "", "", fmt.Errorf("cannot find affinited node")
    	}
    	return path, node, nil
    }
    
    // volumeOptions describes the volume a helper pod operates on: the PV name,
    // the absolute path of the volume directory, the volume mode and size
    // forwarded to the helper container, and the node to run on (empty when
    // the helper pod is not pinned to a node).
    type volumeOptions struct {
    	Name        string
    	Path        string
    	Mode        v1.PersistentVolumeMode
    	SizeInBytes int64
    	Node        string
    }
    
    // createHelperPod runs a helper pod that executes cmd for the volume
    // described by o and waits for it to succeed.
    //
    // The pod is a copy of the helper pod template with the volume's parent
    // directory mounted as a hostPath volume, the setup/teardown scripts mounted
    // from the ConfigMap when no custom commands are configured, and the volume
    // directory, mode, size and action passed through environment variables and
    // arguments. The pod is polled once per second for up to CmdTimeoutSeconds;
    // its logs are copied to the provisioner log and it is deleted on return.
    // All returned errors are wrapped with the action and volume name.
    func (p *LocalPathProvisioner) createHelperPod(action ActionType, cmd []string, o volumeOptions, cfg *StorageClassConfig) (err error) {
    	defer func() {
    		err = errors.Wrapf(err, "failed to %v volume %v", action, o.Name)
    	}()
    	sharedFS, err := p.isSharedFilesystem(cfg)
    	if err != nil {
    		return err
    	}
    	if o.Name == "" || o.Path == "" || (!sharedFS && o.Node == "") {
    		return fmt.Errorf("invalid empty name or path or node")
    	}
    	if !filepath.IsAbs(o.Path) {
    		return fmt.Errorf("volume path %s is not absolute", o.Path)
    	}
    	o.Path = filepath.Clean(o.Path)
    	parentDir, volumeDir := filepath.Split(o.Path)
    
    	// Take one consistent snapshot of the config and the helper pod template:
    	// both are replaced by the background refresh under configMutex. The
    	// lock is taken only after isSharedFilesystem has returned, because that
    	// method read-locks too and sync.RWMutex forbids recursive read locking.
    	p.configMutex.RLock()
    	config := p.config
    	helperPod := p.helperPod.DeepCopy()
    	p.configMutex.RUnlock()
    	if config == nil {
    		return fmt.Errorf("no valid config available")
    	}
    	if helperPod == nil || len(helperPod.Spec.Containers) == 0 {
    		return fmt.Errorf("helper pod template is missing or has no containers")
    	}
    
    	hostPathType := v1.HostPathDirectoryOrCreate
    	lpvVolumes := []v1.Volume{
    		{
    			Name: helperDataVolName,
    			VolumeSource: v1.VolumeSource{
    				HostPath: &v1.HostPathVolumeSource{
    					Path: parentDir,
    					Type: &hostPathType,
    				},
    			},
    		},
    	}
    	// Tolerate every taint so the helper can run on any node.
    	lpvTolerations := []v1.Toleration{
    		{
    			Operator: v1.TolerationOpExists,
    		},
    	}
    
    	// Only the scripts that are not overridden by a custom command need to
    	// be projected from the ConfigMap.
    	keyToPathItems := make([]v1.KeyToPath, 0, 2)
    	if config.SetupCommand == "" {
    		keyToPathItems = append(keyToPathItems, v1.KeyToPath{
    			Key:  "setup",
    			Path: "setup",
    		})
    	}
    	if config.TeardownCommand == "" {
    		keyToPathItems = append(keyToPathItems, v1.KeyToPath{
    			Key:  "teardown",
    			Path: "teardown",
    		})
    	}
    	if len(keyToPathItems) > 0 {
    		lpvVolumes = append(lpvVolumes, v1.Volume{
    			Name: helperScriptVolName,
    			VolumeSource: v1.VolumeSource{
    				ConfigMap: &v1.ConfigMapVolumeSource{
    					LocalObjectReference: v1.LocalObjectReference{
    						Name: p.configMapName,
    					},
    					Items: keyToPathItems,
    				},
    			},
    		})
    
    		// The script mount path is forced even if the template sets another.
    		scriptMount := addVolumeMount(&helperPod.Spec.Containers[0].VolumeMounts, helperScriptVolName, helperScriptDir)
    		scriptMount.MountPath = helperScriptDir
    	}
    
    	// The template may already mount the data volume somewhere else; in that
    	// case the volume directory is addressed below that mount path.
    	dataMount := addVolumeMount(&helperPod.Spec.Containers[0].VolumeMounts, helperDataVolName, parentDir)
    	parentDir = dataMount.MountPath
    	parentDir = strings.TrimSuffix(parentDir, string(filepath.Separator))
    	volumeDir = strings.TrimSuffix(volumeDir, string(filepath.Separator))
    	if parentDir == "" || volumeDir == "" || !filepath.IsAbs(parentDir) {
    		// it covers the `/` case
    		return fmt.Errorf("invalid path %v for %v: cannot find parent dir or volume dir or parent dir is relative", o.Path, action)
    	}
    	env := []v1.EnvVar{
    		{Name: envVolDir, Value: filepath.Join(parentDir, volumeDir)},
    		{Name: envVolMode, Value: string(o.Mode)},
    		{Name: envVolSize, Value: strconv.FormatInt(o.SizeInBytes, 10)},
    	}
    
    	// use different name for helper pods
    	// https://github.com/rancher/local-path-provisioner/issues/154
    	helperPod.Name = (helperPod.Name + "-" + string(action) + "-" + o.Name)
    	if len(helperPod.Name) > HelperPodNameMaxLength {
    		helperPod.Name = helperPod.Name[:HelperPodNameMaxLength]
    	}
    	helperPod.Namespace = p.namespace
    	if o.Node != "" {
    		helperPod.Spec.NodeName = o.Node
    	}
    	helperPod.Spec.ServiceAccountName = p.serviceAccountName
    	helperPod.Spec.RestartPolicy = v1.RestartPolicyNever
    	helperPod.Spec.Tolerations = append(helperPod.Spec.Tolerations, lpvTolerations...)
    	helperPod.Spec.Volumes = append(helperPod.Spec.Volumes, lpvVolumes...)
    	helperPod.Spec.Containers[0].Command = cmd
    	helperPod.Spec.Containers[0].Env = append(helperPod.Spec.Containers[0].Env, env...)
    	helperPod.Spec.Containers[0].Args = []string{"-p", filepath.Join(parentDir, volumeDir),
    		"-s", strconv.FormatInt(o.SizeInBytes, 10),
    		"-m", string(o.Mode),
    		"-a", string(action)}
    
    	// If it already exists due to some previous errors, the pod will be cleaned up later automatically
    	// https://github.com/rancher/local-path-provisioner/issues/27
    	logrus.Infof("create the helper pod %s into %s", helperPod.Name, p.namespace)
    	_, err = p.kubeClient.CoreV1().Pods(p.namespace).Create(context.TODO(), helperPod, metav1.CreateOptions{})
    	if err != nil && !k8serror.IsAlreadyExists(err) {
    		return err
    	}
    
    	defer func() {
    		// Log helper pod output to the controller. helperPod (not the Create
    		// result) is used because it carries the name and namespace even
    		// when the pod already existed and Create returned an error.
    		if err := saveHelperPodLogs(helperPod); err != nil {
    			logrus.Error(err.Error())
    		}
    		e := p.kubeClient.CoreV1().Pods(p.namespace).Delete(context.TODO(), helperPod.Name, metav1.DeleteOptions{})
    		if e != nil {
    			logrus.Errorf("unable to delete the helper pod: %v", e)
    		}
    	}()
    
    	completed := false
    	for i := 0; i < config.CmdTimeoutSeconds; i++ {
    		pod, err := p.kubeClient.CoreV1().Pods(p.namespace).Get(context.TODO(), helperPod.Name, metav1.GetOptions{})
    		if err != nil {
    			return err
    		}
    		if pod.Status.Phase == v1.PodSucceeded {
    			completed = true
    			break
    		}
    		time.Sleep(1 * time.Second)
    	}
    	if !completed {
    		return fmt.Errorf("create process timeout after %v seconds", config.CmdTimeoutSeconds)
    	}
    
    	if o.Node == "" {
    		logrus.Infof("Volume %v has been %vd on %v", o.Name, action, o.Path)
    	} else {
    		logrus.Infof("Volume %v has been %vd on %v:%v", o.Name, action, o.Node, o.Path)
    	}
    	return nil
    }
    
    // addVolumeMount returns a pointer to the mount called name inside *mounts.
    // An existing mount is reused, and only receives mountPath when it has no
    // mount path of its own; otherwise a new mount is appended. The returned
    // pointer aliases the slice element, so writes through it are visible in
    // *mounts.
    func addVolumeMount(mounts *[]v1.VolumeMount, name, mountPath string) *v1.VolumeMount {
    	list := *mounts
    	for idx := range list {
    		existing := &list[idx]
    		if existing.Name != name {
    			continue
    		}
    		if existing.MountPath == "" {
    			existing.MountPath = mountPath
    		}
    		return existing
    	}
    	*mounts = append(list, v1.VolumeMount{Name: name, MountPath: mountPath})
    	return &(*mounts)[len(*mounts)-1]
    }
    
    // isJSONFile reports whether configFile ends in ".json" (case-sensitive).
    func isJSONFile(configFile string) bool {
    	const jsonSuffix = ".json"
    	// TrimSuffix returns its input unchanged exactly when the suffix is absent.
    	return strings.TrimSuffix(configFile, jsonSuffix) != configFile
    }
    
    // unmarshalFromString parses configFile as the JSON text of a config
    // (despite the parameter name, it is the content, not a file name).
    func unmarshalFromString(configFile string) (*ConfigData, error) {
    	parsed := new(ConfigData)
    	if err := json.Unmarshal([]byte(configFile), parsed); err != nil {
    		return nil, err
    	}
    	return parsed, nil
    }
    
    // loadConfigFile returns the config described by configFile. A value ending
    // in ".json" is opened as a file and decoded; anything else is treated as
    // the JSON text itself. Errors are wrapped with the configFile value.
    func loadConfigFile(configFile string) (cfgData *ConfigData, err error) {
    	defer func() {
    		err = errors.Wrapf(err, "fail to load config file %v", configFile)
    	}()
    
    	if !isJSONFile(configFile) {
    		return unmarshalFromString(configFile)
    	}
    
    	file, err := os.Open(configFile)
    	if err != nil {
    		return nil, err
    	}
    	defer file.Close()
    
    	parsed := new(ConfigData)
    	if err = json.NewDecoder(file).Decode(parsed); err != nil {
    		return nil, err
    	}
    	return parsed, nil
    }
    
    // canonicalizeConfig validates data and converts it into a Config.
    //
    // The commands are copied as-is and a non-positive timeout is replaced by
    // defaultCmdTimeoutSeconds. When data has no per-class configs, the
    // embedded default location config is canonicalized; otherwise every
    // per-class config is, and the default is left empty.
    func canonicalizeConfig(data *ConfigData) (cfg *Config, err error) {
    	cfg = &Config{
    		CmdTimeoutSeconds: defaultCmdTimeoutSeconds,
    		SetupCommand:      data.SetupCommand,
    		TeardownCommand:   data.TeardownCommand,
    	}
    	if data.CmdTimeoutSeconds > 0 {
    		cfg.CmdTimeoutSeconds = data.CmdTimeoutSeconds
    	}
    
    	if len(data.StorageClassConfigs) == 0 {
    		defaultCfg, err := canonicalizeStorageClassConfig(&data.StorageClassConfigData)
    		if err != nil {
    			return nil, err
    		}
    		cfg.StorageClassConfig = *defaultCfg
    		return cfg, nil
    	}
    
    	cfg.StorageClassConfigs = make(map[string]StorageClassConfig, len(data.StorageClassConfigs))
    	for className, classData := range data.StorageClassConfigs {
    		classCfg, err := canonicalizeStorageClassConfig(&classData)
    		if err != nil {
    			return nil, errors.Wrapf(err, "config for class %s is invalid", className)
    		}
    		cfg.StorageClassConfigs[className] = *classCfg
    	}
    	return cfg, nil
    }
    
    // canonicalizeStorageClassConfig validates data and converts it into a
    // StorageClassConfig with paths grouped by node.
    //
    // It rejects null nodePathMap entries, duplicate nodes, paths that do not
    // start with "/" (including empty paths), the root path, and duplicate paths
    // on the same node. Paths are stored in their filepath.Abs form. Errors are
    // wrapped with a canonicalization-failed prefix.
    func canonicalizeStorageClassConfig(data *StorageClassConfigData) (cfg *StorageClassConfig, err error) {
    	defer func() {
    		err = errors.Wrapf(err, "StorageClass config canonicalization failed")
    	}()
    	cfg = &StorageClassConfig{
    		SharedFileSystemPath: data.SharedFileSystemPath,
    		NodePathMap:          make(map[string]*NodePathMap, len(data.NodePathMap)),
    	}
    	for _, n := range data.NodePathMap {
    		// A JSON null inside the nodePathMap array decodes to a nil pointer.
    		if n == nil {
    			return nil, fmt.Errorf("nodePathMap contains a null entry")
    		}
    		if cfg.NodePathMap[n.Node] != nil {
    			return nil, fmt.Errorf("duplicate node %v", n.Node)
    		}
    		npMap := &NodePathMap{Paths: make(map[string]struct{}, len(n.Paths))}
    		cfg.NodePathMap[n.Node] = npMap
    		for _, p := range n.Paths {
    			// HasPrefix (rather than indexing p[0]) also rejects an empty
    			// path instead of panicking on it.
    			if !strings.HasPrefix(p, "/") {
    				return nil, fmt.Errorf("path must start with / for path %v on node %v", p, n.Node)
    			}
    			path, err := filepath.Abs(p)
    			if err != nil {
    				return nil, err
    			}
    			if path == "/" {
    				return nil, fmt.Errorf("cannot use root ('/') as path on node %v", n.Node)
    			}
    			if _, ok := npMap.Paths[path]; ok {
    				return nil, fmt.Errorf("duplicate path %v on node %v", p, n.Node)
    			}
    			npMap.Paths[path] = struct{}{}
    		}
    	}
    
    	return cfg, nil
    }
    
    // createPersistentVolumeSource builds the volume source for path. volumeType
    // is matched case-insensitively: "local" yields a Local source and
    // "hostpath" a HostPath source of type DirectoryOrCreate. Any other value is
    // an error, wrapped with a "failed to create persistent volume source"
    // prefix.
    func createPersistentVolumeSource(volumeType string, path string) (pvs v1.PersistentVolumeSource, err error) {
    	defer func() {
    		err = errors.Wrapf(err, "failed to create persistent volume source")
    	}()
    
    	normalized := strings.ToLower(volumeType)
    	if normalized == "local" {
    		pvs.Local = &v1.LocalVolumeSource{Path: path}
    		return pvs, nil
    	}
    	if normalized == "hostpath" {
    		dirOrCreate := v1.HostPathDirectoryOrCreate
    		pvs.HostPath = &v1.HostPathVolumeSource{
    			Path: path,
    			Type: &dirOrCreate,
    		}
    		return pvs, nil
    	}
    	return pvs, fmt.Errorf("\"%s\" is not a recognised volume type", volumeType)
    }
    
    // saveHelperPodLogs reads the logs of the "helper-pod" container of pod and
    // writes them, line by line, to the provisioner's log between "Start of" and
    // "End of" markers. Only pod.Name and pod.Namespace are used.
    //
    // It builds its own in-cluster client for the request. It returns an error,
    // wrapped with the pod name, if the in-cluster config, the client, the log
    // stream or the copy fails. The log stream is closed on every return path.
    func saveHelperPodLogs(pod *v1.Pod) (err error) {
    	defer func() {
    		err = errors.Wrapf(err, "failed to save %s logs", pod.Name)
    	}()
    
    	// save helper pod logs
    	podLogOpts := v1.PodLogOptions{
    		Container: "helper-pod",
    	}
    	config, err := rest.InClusterConfig()
    	if err != nil {
    		return fmt.Errorf("unable to retrieve in cluster config: %s", err.Error())
    	}
    	// Named kubeClient so the local does not shadow the clientset package.
    	kubeClient, err := clientset.NewForConfig(config)
    	if err != nil {
    		return fmt.Errorf("unable to get access to k8s: %s", err.Error())
    	}
    	req := kubeClient.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOpts)
    	podLogs, err := req.Stream(context.TODO())
    	if err != nil {
    		return fmt.Errorf("error in opening stream: %s", err.Error())
    	}
    	// Deferred so the stream is released even when the copy below fails.
    	defer podLogs.Close()
    
    	buf := new(bytes.Buffer)
    	if _, err = io.Copy(buf, podLogs); err != nil {
    		return fmt.Errorf("error in copying information from podLogs to buf: %s", err.Error())
    	}
    
    	// log all messages from the helper pod to the controller
    	logrus.Infof("Start of %s logs", pod.Name)
    	bufferStr := buf.String()
    	if len(bufferStr) > 0 {
    		helperPodLogs := strings.Split(strings.Trim(bufferStr, "\n"), "\n")
    		for _, log := range helperPodLogs {
    			logrus.Info(log)
    		}
    	}
    	logrus.Infof("End of %s logs", pod.Name)
    	return nil
    }