Skip to content
Snippets Groups Projects
Select Git revision
  • df53c25a38ffa7122ab8a3f34f1af7eec420b721
  • master default protected
  • v0.0.x
  • v0.0.31
  • v0.0.30
  • v0.0.29
  • v0.0.28
  • v0.0.28-rc1
  • v0.0.27
  • v0.0.26
  • v0.0.25
  • v0.0.24
  • v0.0.23
  • v0.0.22
  • v0.0.21
  • v0.0.20
  • v0.0.19
  • v0.0.18
  • v0.0.17
  • v0.0.16
  • v0.0.15
  • v0.0.14
  • v0.0.13
23 results

provisioner.go

Blame
  • provisioner.go 12.08 KiB
    package main
    
    import (
    	"encoding/json"
    	"fmt"
    	"os"
    	"path/filepath"
    	"reflect"
    	"strings"
    	"sync"
    	"time"
    
    	"github.com/Sirupsen/logrus"
    	"github.com/pkg/errors"
    	v1 "k8s.io/api/core/v1"
    	k8serror "k8s.io/apimachinery/pkg/api/errors"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	clientset "k8s.io/client-go/kubernetes"
    	pvController "sigs.k8s.io/sig-storage-lib-external-provisioner/controller"
    )
    
    // ActionType names the operation a helper pod performs on a volume
    // directory. createHelperPod embeds it in the helper pod and container names
    // and in its error and log messages.
    type ActionType string
    
    const (
    	// ActionTypeCreate is the action Provision passes when setting up a volume directory.
    	ActionTypeCreate = "create"
    	// ActionTypeDelete is the action Delete passes when tearing down a volume directory.
    	ActionTypeDelete = "delete"
    )
    
    const (
    	// KeyNode is the node label key used in a PV's node affinity to pin the
    	// volume to the node it was created on.
    	KeyNode = "kubernetes.io/hostname"
    
    	// NodeDefaultNonListedNodes is the nodePathMap key consulted for nodes
    	// that have no entry of their own in the configuration.
    	NodeDefaultNonListedNodes = "DEFAULT_PATH_FOR_NON_LISTED_NODES"
    )
    
    var (
    	// CmdTimeoutCounts is the maximum number of one-second polls
    	// createHelperPod makes while waiting for the helper pod to succeed.
    	CmdTimeoutCounts = 120
    
    	// ConfigFileCheckInterval is the period at which watchAndRefreshConfig
    	// reloads the configuration.
    	ConfigFileCheckInterval = 30 * time.Second
    )
    
    // LocalPathProvisioner holds the state shared by the provisioner's methods:
    // the Kubernetes client and helper-pod settings, plus the currently applied
    // node-to-path configuration.
    type LocalPathProvisioner struct {
    	// stopCh terminates the background config-reload goroutine.
    	stopCh      chan struct{}
    	// kubeClient is used to create, poll and delete helper pods.
    	kubeClient  *clientset.Clientset
    	// namespace is where helper pods are created.
    	namespace   string
    	// helperImage is the container image run by helper pods.
    	helperImage string
    
    	// config is the canonicalized configuration used to pick paths; nil until
    	// the first successful refreshConfig.
    	config      *Config
    	// configData is the raw configuration last applied; refreshConfig compares
    	// newly loaded data against it to detect changes.
    	configData  *ConfigData
    	// configFile is the value handed to loadConfigFile on every refresh.
    	configFile  string
    	// configMutex is write-locked by refreshConfig and read-locked by
    	// getRandomPathOnNode.
    	configMutex *sync.RWMutex
    }
    
    // NodePathMapData is one entry of the JSON configuration: a node name and
    // the list of paths configured for that node.
    type NodePathMapData struct {
    	Node  string   `json:"node,omitempty"`
    	Paths []string `json:"paths,omitempty"`
    }
    
    // ConfigData is the configuration as decoded from JSON, before
    // canonicalizeConfig validates it and converts it into a Config.
    type ConfigData struct {
    	NodePathMap []*NodePathMapData `json:"nodePathMap,omitempty"`
    }
    
    // NodePathMap is the set of paths available on one node. canonicalizeConfig
    // fills it with absolute paths as keys.
    type NodePathMap struct {
    	Paths map[string]struct{}
    }
    
    // Config is the validated configuration produced by canonicalizeConfig,
    // keyed by node name.
    type Config struct {
    	NodePathMap map[string]*NodePathMap
    }
    
    // NewProvisioner builds a LocalPathProvisioner, loads its configuration once
    // synchronously, and then starts the background goroutine that keeps
    // reloading it until stopCh fires.
    //
    // It returns an error, and no provisioner, when the initial configuration
    // load fails.
    func NewProvisioner(stopCh chan struct{}, kubeClient *clientset.Clientset, configFile, namespace, helperImage string) (*LocalPathProvisioner, error) {
    	// config and configData are left nil here; the refreshConfig call below
    	// populates them.
    	provisioner := &LocalPathProvisioner{
    		stopCh:      stopCh,
    		kubeClient:  kubeClient,
    		namespace:   namespace,
    		helperImage: helperImage,
    		configFile:  configFile,
    		configMutex: &sync.RWMutex{},
    	}
    
    	err := provisioner.refreshConfig()
    	if err != nil {
    		return nil, err
    	}
    
    	provisioner.watchAndRefreshConfig()
    	return provisioner, nil
    }
    
    // refreshConfig reloads the configuration from p.configFile and applies it
    // if it differs from the one currently in use and passes canonicalization.
    // The whole operation runs under the write lock.
    //
    // On a load or canonicalization error the previously applied configuration
    // is left untouched and the error is returned.
    func (p *LocalPathProvisioner) refreshConfig() error {
    	p.configMutex.Lock()
    	defer p.configMutex.Unlock()
    
    	loaded, err := loadConfigFile(p.configFile)
    	if err != nil {
    		return err
    	}
    	if reflect.DeepEqual(loaded, p.configData) {
    		// Same content as the last applied config: nothing to do.
    		return nil
    	}
    
    	canonical, err := canonicalizeConfig(loaded)
    	if err != nil {
    		return err
    	}
    	// Swap in the new config only after it has been validated.
    	p.configData, p.config = loaded, canonical
    
    	serialized, err := json.Marshal(p.configData)
    	if err != nil {
    		return err
    	}
    	logrus.Debugf("Applied config: %v", string(serialized))
    
    	return nil
    }
    
    // watchAndRefreshConfig starts a goroutine that calls refreshConfig every
    // ConfigFileCheckInterval. Reload failures are logged and the loop keeps
    // running; the goroutine exits once p.stopCh fires.
    func (p *LocalPathProvisioner) watchAndRefreshConfig() {
    	reloadLoop := func() {
    		tick := time.NewTicker(ConfigFileCheckInterval)
    		defer tick.Stop()
    
    		for {
    			select {
    			case <-p.stopCh:
    				logrus.Infof("stop watching config file")
    				return
    			case <-tick.C:
    				err := p.refreshConfig()
    				if err != nil {
    					logrus.Errorf("failed to load the new config file: %v", err)
    				}
    			}
    		}
    	}
    	go reloadLoop()
    }
    
    // getRandomPathOnNode returns one of the paths configured for node, falling
    // back to the NodeDefaultNonListedNodes entry when the node has no entry of
    // its own. The path returned is whichever one map iteration yields first;
    // Go does not specify map iteration order.
    //
    // It returns an error when no config has been applied yet, when neither the
    // node nor the default entry exists, or when the chosen entry has no paths.
    func (p *LocalPathProvisioner) getRandomPathOnNode(node string) (string, error) {
    	p.configMutex.RLock()
    	defer p.configMutex.RUnlock()
    
    	cfg := p.config
    	if cfg == nil {
    		return "", fmt.Errorf("no valid config available")
    	}
    
    	entry := cfg.NodePathMap[node]
    	if entry == nil {
    		entry = cfg.NodePathMap[NodeDefaultNonListedNodes]
    		if entry == nil {
    			return "", fmt.Errorf("config doesn't contain node %v, and no %v available", node, NodeDefaultNonListedNodes)
    		}
    		logrus.Debugf("config doesn't contain node %v, use %v instead", node, NodeDefaultNonListedNodes)
    	}
    
    	if len(entry.Paths) == 0 {
    		return "", fmt.Errorf("no local path available on node %v", node)
    	}
    	for candidate := range entry.Paths {
    		// The set is non-empty, so the first key produced is the answer.
    		return candidate, nil
    	}
    	return "", nil
    }
    
    // Provision creates the backing directory for a claim on the selected node
    // and returns the HostPath PersistentVolume describing it.
    //
    // The claim is rejected when it carries a selector, requests any access mode
    // other than ReadWriteOnce, or when no node was selected. The directory is
    // named "<pvName>_<namespace>_<claimName>" under a base path picked by
    // getRandomPathOnNode, and is created by running the setup script in a
    // helper pod. The returned PV is pinned to the node through a required node
    // affinity on KeyNode.
    func (p *LocalPathProvisioner) Provision(opts pvController.ProvisionOptions) (*v1.PersistentVolume, error) {
    	claim := opts.PVC
    	if claim.Spec.Selector != nil {
    		return nil, fmt.Errorf("claim.Spec.Selector is not supported")
    	}
    	for _, mode := range claim.Spec.AccessModes {
    		if mode != v1.ReadWriteOnce {
    			return nil, fmt.Errorf("Only support ReadWriteOnce access mode")
    		}
    	}
    	targetNode := opts.SelectedNode
    	if targetNode == nil {
    		return nil, fmt.Errorf("configuration error, no node was specified")
    	}
    
    	basePath, err := p.getRandomPathOnNode(targetNode.Name)
    	if err != nil {
    		return nil, err
    	}
    
    	pvName := opts.PVName
    	dirName := strings.Join([]string{pvName, claim.Namespace, claim.Name}, "_")
    	volumePath := filepath.Join(basePath, dirName)
    	logrus.Infof("Creating volume %v at %v:%v", pvName, targetNode.Name, volumePath)
    
    	setupCmd := []string{"/bin/sh", "/script/setup"}
    	if err := p.createHelperPod(ActionTypeCreate, setupCmd, pvName, volumePath, targetNode.Name); err != nil {
    		return nil, err
    	}
    
    	volumeMode := v1.PersistentVolumeFilesystem
    	hostPathType := v1.HostPathDirectoryOrCreate
    	storage := v1.ResourceName(v1.ResourceStorage)
    
    	// Restrict the volume to the node that holds the directory.
    	nodeRequirement := v1.NodeSelectorRequirement{
    		Key:      KeyNode,
    		Operator: v1.NodeSelectorOpIn,
    		Values:   []string{targetNode.Name},
    	}
    	affinity := &v1.VolumeNodeAffinity{
    		Required: &v1.NodeSelector{
    			NodeSelectorTerms: []v1.NodeSelectorTerm{
    				{MatchExpressions: []v1.NodeSelectorRequirement{nodeRequirement}},
    			},
    		},
    	}
    
    	// NOTE(review): ReclaimPolicy is dereferenced without a nil check, as in
    	// the code this replaces — presumably it is always set; confirm.
    	volume := &v1.PersistentVolume{
    		ObjectMeta: metav1.ObjectMeta{
    			Name: pvName,
    		},
    		Spec: v1.PersistentVolumeSpec{
    			PersistentVolumeReclaimPolicy: *opts.StorageClass.ReclaimPolicy,
    			AccessModes:                   claim.Spec.AccessModes,
    			VolumeMode:                    &volumeMode,
    			Capacity: v1.ResourceList{
    				storage: claim.Spec.Resources.Requests[storage],
    			},
    			PersistentVolumeSource: v1.PersistentVolumeSource{
    				HostPath: &v1.HostPathVolumeSource{
    					Path: volumePath,
    					Type: &hostPathType,
    				},
    			},
    			NodeAffinity: affinity,
    		},
    	}
    	return volume, nil
    }
    
    // Delete removes the directory backing pv by running the teardown script in
    // a helper pod on the volume's node. Volumes whose reclaim policy is Retain
    // are left in place and only logged.
    //
    // Any error returned is wrapped with "failed to delete volume <name>".
    func (p *LocalPathProvisioner) Delete(pv *v1.PersistentVolume) (err error) {
    	defer func() {
    		err = errors.Wrapf(err, "failed to delete volume %v", pv.Name)
    	}()
    
    	path, node, err := p.getPathAndNodeForPV(pv)
    	if err != nil {
    		return err
    	}
    
    	if pv.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimRetain {
    		logrus.Infof("Retained volume %v", pv.Name)
    		return nil
    	}
    
    	logrus.Infof("Deleting volume %v at %v:%v", pv.Name, node, path)
    	teardownCmd := []string{"/bin/sh", "/script/teardown"}
    	helperErr := p.createHelperPod(ActionTypeDelete, teardownCmd, pv.Name, path, node)
    	if helperErr != nil {
    		logrus.Infof("clean up volume %v failed: %v", pv.Name, helperErr)
    		return helperErr
    	}
    	return nil
    }
    
    // getPathAndNodeForPV extracts the host path and the node name recorded in
    // pv. The node is taken from the first required node-affinity expression
    // whose key is KeyNode and whose operator is In; that expression must carry
    // exactly one value.
    //
    // It returns an error when the PV has no HostPath source, no node affinity,
    // no required selector, a matching expression whose value count is not one,
    // or no usable node value at all. Errors are wrapped with
    // "failed to delete volume <name>".
    func (p *LocalPathProvisioner) getPathAndNodeForPV(pv *v1.PersistentVolume) (path, node string, err error) {
    	defer func() {
    		err = errors.Wrapf(err, "failed to delete volume %v", pv.Name)
    	}()
    
    	source := pv.Spec.PersistentVolumeSource.HostPath
    	if source == nil {
    		return "", "", fmt.Errorf("no HostPath set")
    	}
    
    	affinity := pv.Spec.NodeAffinity
    	if affinity == nil {
    		return "", "", fmt.Errorf("no NodeAffinity set")
    	}
    	if affinity.Required == nil {
    		return "", "", fmt.Errorf("no NodeAffinity.Required set")
    	}
    
    	found := ""
    	for _, term := range affinity.Required.NodeSelectorTerms {
    		for _, requirement := range term.MatchExpressions {
    			if requirement.Key != KeyNode || requirement.Operator != v1.NodeSelectorOpIn {
    				continue
    			}
    			if len(requirement.Values) != 1 {
    				return "", "", fmt.Errorf("multiple values for the node affinity")
    			}
    			found = requirement.Values[0]
    			// Stop scanning this term after its first matching expression.
    			break
    		}
    		// An empty value does not count as found; keep looking in later terms.
    		if found != "" {
    			break
    		}
    	}
    	if found == "" {
    		return "", "", fmt.Errorf("cannot find affinited node")
    	}
    	return source.Path, found, nil
    }
    
    // createHelperPod runs a one-shot pod on node that executes cmdsForPath
    // against the volume directory and waits for it to succeed.
    //
    // path is made absolute and split into its parent directory and final
    // element. The parent is mounted from the host at /data/, the ConfigMap
    // "local-path-config" (keys "setup" and "teardown") is mounted at /script,
    // and the container command is cmdsForPath followed by /data/<final element>.
    // The pod is polled once per second, at most CmdTimeoutCounts times, until
    // it reaches the Succeeded phase. Once the create call has gone through, the
    // pod is deleted on the way out whether or not it succeeded.
    //
    // It returns an error, wrapped with "failed to <action> volume <name>", when
    // name, path or node is empty, when path has no usable parent or final
    // element, when a Kubernetes API call fails, or when the pod does not
    // succeed within the polling budget.
    func (p *LocalPathProvisioner) createHelperPod(action ActionType, cmdsForPath []string, name, path, node string) (err error) {
    	defer func() {
    		err = errors.Wrapf(err, "failed to %v volume %v", action, name)
    	}()
    	if name == "" || path == "" || node == "" {
    		return fmt.Errorf("invalid empty name or path or node")
    	}
    	absPath, err := filepath.Abs(path)
    	if err != nil {
    		return err
    	}
    	path = strings.TrimSuffix(absPath, "/")
    	parentDir, volumeDir := filepath.Split(path)
    	parentDir = strings.TrimSuffix(parentDir, "/")
    	volumeDir = strings.TrimSuffix(volumeDir, "/")
    	if parentDir == "" || volumeDir == "" {
    		// Rejects `/` itself, and also any path directly under `/`, whose
    		// parent trims down to the empty string.
    		return fmt.Errorf("invalid path %v for %v: cannot find parent dir or volume dir", absPath, action)
    	}
    
    	hostPathType := v1.HostPathDirectoryOrCreate
    	helperPod := &v1.Pod{
    		ObjectMeta: metav1.ObjectMeta{
    			Name:      string(action) + "-" + name,
    			Namespace: p.namespace,
    		},
    		Spec: v1.PodSpec{
    			RestartPolicy: v1.RestartPolicyNever,
    			NodeName:      node,
    			// Tolerate every taint so the helper can run on any node.
    			Tolerations: []v1.Toleration{
    				{
    					Operator: v1.TolerationOpExists,
    				},
    			},
    			Containers: []v1.Container{
    				{
    					Name:    "local-path-" + string(action),
    					Image:   p.helperImage,
    					Command: append(cmdsForPath, filepath.Join("/data/", volumeDir)),
    					VolumeMounts: []v1.VolumeMount{
    						{
    							Name:      "data",
    							ReadOnly:  false,
    							MountPath: "/data/",
    						},
    						{
    							Name:      "script",
    							ReadOnly:  false,
    							MountPath: "/script",
    						},
    					},
    					ImagePullPolicy: v1.PullIfNotPresent,
    				},
    			},
    			Volumes: []v1.Volume{
    				{
    					Name: "data",
    					VolumeSource: v1.VolumeSource{
    						HostPath: &v1.HostPathVolumeSource{
    							Path: parentDir,
    							Type: &hostPathType,
    						},
    					},
    				},
    				{
    					Name: "script",
    					VolumeSource: v1.VolumeSource{
    						ConfigMap: &v1.ConfigMapVolumeSource{
    							LocalObjectReference: v1.LocalObjectReference{
    								Name: "local-path-config",
    							},
    							Items: []v1.KeyToPath{
    								{
    									Key:  "setup",
    									Path: "setup",
    								},
    								{
    									Key:  "teardown",
    									Path: "teardown",
    								},
    							},
    						},
    					},
    				},
    			},
    		},
    	}
    
    	// If it already exists due to some previous errors, the pod will be cleaned up later automatically
    	// https://github.com/rancher/local-path-provisioner/issues/27
    	logrus.Infof("create the helper pod %s into %s", helperPod.Name, p.namespace)
    	_, err = p.kubeClient.CoreV1().Pods(p.namespace).Create(helperPod)
    	if err != nil && !k8serror.IsAlreadyExists(err) {
    		return err
    	}
    
    	// Best-effort cleanup: a failed delete is logged, not returned.
    	defer func() {
    		e := p.kubeClient.CoreV1().Pods(p.namespace).Delete(helperPod.Name, &metav1.DeleteOptions{})
    		if e != nil {
    			logrus.Errorf("unable to delete the helper pod: %v", e)
    		}
    	}()
    
    	completed := false
    	for i := 0; i < CmdTimeoutCounts; i++ {
    		pod, getErr := p.kubeClient.CoreV1().Pods(p.namespace).Get(helperPod.Name, metav1.GetOptions{})
    		if getErr != nil {
    			return getErr
    		}
    		if pod.Status.Phase == v1.PodSucceeded {
    			completed = true
    			break
    		}
    		time.Sleep(1 * time.Second)
    	}
    	if !completed {
    		// Name the action that actually timed out rather than always "create".
    		return fmt.Errorf("%v process timeout after %v seconds", action, CmdTimeoutCounts)
    	}
    
    	logrus.Infof("Volume %v has been %vd on %v:%v", name, action, node, path)
    	return nil
    }
    
    // isJSONFile reports whether configFile ends with the ".json" extension.
    // The comparison is case-sensitive.
    func isJSONFile(configFile string) bool {
    	const jsonExt = ".json"
    	hasExt := strings.HasSuffix(configFile, jsonExt)
    	return hasExt
    }
    
    // unmarshalFromString decodes configFile itself as JSON configuration
    // content (not as a file name) and returns the resulting ConfigData, or the
    // JSON decoding error.
    func unmarshalFromString(configFile string) (*ConfigData, error) {
    	parsed := new(ConfigData)
    	err := json.Unmarshal([]byte(configFile), parsed)
    	if err != nil {
    		return nil, err
    	}
    	return parsed, nil
    }
    
    // loadConfigFile produces a ConfigData from configFile. When configFile ends
    // in ".json" it is opened as a file and its content decoded; otherwise the
    // string itself is decoded as JSON content.
    //
    // Any error is wrapped with "fail to load config file <configFile>".
    func loadConfigFile(configFile string) (cfgData *ConfigData, err error) {
    	defer func() {
    		err = errors.Wrapf(err, "fail to load config file %v", configFile)
    	}()
    
    	if isJSONFile(configFile) {
    		file, openErr := os.Open(configFile)
    		if openErr != nil {
    			return nil, openErr
    		}
    		defer file.Close()
    
    		decoded := &ConfigData{}
    		if decodeErr := json.NewDecoder(file).Decode(decoded); decodeErr != nil {
    			return nil, decodeErr
    		}
    		return decoded, nil
    	}
    
    	// Not a *.json name: treat the argument as inline JSON.
    	return unmarshalFromString(configFile)
    }
    
    // canonicalizeConfig validates data and converts it into a Config keyed by
    // node name, with each node's paths stored as a set of absolute paths.
    //
    // It returns an error, wrapped with "config canonicalization failed", when
    // an entry of the node list is null, a node appears twice, a path is empty
    // or does not start with "/", a path resolves to the root "/", or a path
    // appears twice for the same node after being made absolute.
    func canonicalizeConfig(data *ConfigData) (cfg *Config, err error) {
    	defer func() {
    		err = errors.Wrapf(err, "config canonicalization failed")
    	}()
    	cfg = &Config{}
    	cfg.NodePathMap = map[string]*NodePathMap{}
    	for _, n := range data.NodePathMap {
    		// A JSON `null` list element decodes to a nil pointer; reject it
    		// instead of dereferencing it.
    		if n == nil {
    			return nil, fmt.Errorf("invalid empty entry in nodePathMap")
    		}
    		if cfg.NodePathMap[n.Node] != nil {
    			return nil, fmt.Errorf("duplicate node %v", n.Node)
    		}
    		npMap := &NodePathMap{Paths: map[string]struct{}{}}
    		cfg.NodePathMap[n.Node] = npMap
    		for _, p := range n.Paths {
    			// HasPrefix is safe on the empty string, where indexing p[0]
    			// would panic.
    			if !strings.HasPrefix(p, "/") {
    				return nil, fmt.Errorf("path must start with / for path %v on node %v", p, n.Node)
    			}
    			path, err := filepath.Abs(p)
    			if err != nil {
    				return nil, err
    			}
    			if path == "/" {
    				return nil, fmt.Errorf("cannot use root ('/') as path on node %v", n.Node)
    			}
    			if _, ok := npMap.Paths[path]; ok {
    				return nil, fmt.Errorf("duplicate path %v on node %v", p, n.Node)
    			}
    			npMap.Paths[path] = struct{}{}
    		}
    	}
    	return cfg, nil
    }