Skip to content
Snippets Groups Projects
Commit 451ce50e authored by Sheng Yang's avatar Sheng Yang
Browse files

Add cleanup logic for PV deletion

parent 7806f346
No related branches found
No related tags found
No related merge requests found
...@@ -14,7 +14,7 @@ rules: ...@@ -14,7 +14,7 @@ rules:
resources: ["nodes", "persistentvolumeclaims"] resources: ["nodes", "persistentvolumeclaims"]
verbs: ["get", "list", "watch"] verbs: ["get", "list", "watch"]
- apiGroups: [""] - apiGroups: [""]
resources: ["endpoints", "persistentvolumes"] resources: ["endpoints", "persistentvolumes", "pods"]
verbs: ["*"] verbs: ["*"]
- apiGroups: [""] - apiGroups: [""]
resources: ["events"] resources: ["events"]
...@@ -58,5 +58,6 @@ spec: ...@@ -58,5 +58,6 @@ spec:
imagePullPolicy: Always imagePullPolicy: Always
command: command:
- local-path-provisioner - local-path-provisioner
- --debug
- start - start
serviceAccountName: local-path-provisioner-service-account serviceAccountName: local-path-provisioner-service-account
...@@ -84,7 +84,7 @@ func startDaemon(c *cli.Context) error { ...@@ -84,7 +84,7 @@ func startDaemon(c *cli.Context) error {
if provisionerName == "" { if provisionerName == "" {
return fmt.Errorf("invalid empty provisioner name") return fmt.Errorf("invalid empty provisioner name")
} }
provisioner := NewProvisioner() provisioner := NewProvisioner(kubeClient)
pc := pvController.NewProvisionController( pc := pvController.NewProvisionController(
kubeClient, kubeClient,
provisionerName, provisionerName,
......
...@@ -3,23 +3,34 @@ package main ...@@ -3,23 +3,34 @@ package main
import ( import (
"fmt" "fmt"
"path/filepath" "path/filepath"
"time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/pkg/errors"
pvController "github.com/kubernetes-incubator/external-storage/lib/controller" pvController "github.com/kubernetes-incubator/external-storage/lib/controller"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
)
const (
KeyNode = "kubernetes.io/hostname"
) )
var ( var (
DefaultPath = "/opt" DefaultPath = "/opt"
CleanupCounts = 120
) )
type LocalPathProvisioner struct { type LocalPathProvisioner struct {
kubeClient *clientset.Clientset
} }
func NewProvisioner() *LocalPathProvisioner { func NewProvisioner(kubeClient *clientset.Clientset) *LocalPathProvisioner {
return &LocalPathProvisioner{} return &LocalPathProvisioner{
kubeClient: kubeClient,
}
} }
func (p *LocalPathProvisioner) Provision(opts pvController.VolumeOptions) (*v1.PersistentVolume, error) { func (p *LocalPathProvisioner) Provision(opts pvController.VolumeOptions) (*v1.PersistentVolume, error) {
...@@ -66,7 +77,7 @@ func (p *LocalPathProvisioner) Provision(opts pvController.VolumeOptions) (*v1.P ...@@ -66,7 +77,7 @@ func (p *LocalPathProvisioner) Provision(opts pvController.VolumeOptions) (*v1.P
{ {
MatchExpressions: []v1.NodeSelectorRequirement{ MatchExpressions: []v1.NodeSelectorRequirement{
{ {
Key: "kubernetes.io/hostname", Key: KeyNode,
Operator: v1.NodeSelectorOpIn, Operator: v1.NodeSelectorOpIn,
Values: []string{ Values: []string{
node.Name, node.Name,
...@@ -81,6 +92,140 @@ func (p *LocalPathProvisioner) Provision(opts pvController.VolumeOptions) (*v1.P ...@@ -81,6 +92,140 @@ func (p *LocalPathProvisioner) Provision(opts pvController.VolumeOptions) (*v1.P
}, nil }, nil
} }
func (p *LocalPathProvisioner) Delete(pv *v1.PersistentVolume) error { func (p *LocalPathProvisioner) Delete(pv *v1.PersistentVolume) (err error) {
defer func() {
err = errors.Wrapf(err, "failed to delete volume %v", pv.Name)
}()
path, node, err := p.getPathAndNodeForPV(pv)
if err != nil {
return err
}
if pv.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimRetain {
if err := p.cleanupVolume(pv.Name, path, node); err != nil {
logrus.Infof("clean up volume %v failed: %v", pv.Name, err)
return err
}
return nil
}
logrus.Infof("retained volume %v", pv.Name)
return nil
}
func (p *LocalPathProvisioner) getPathAndNodeForPV(pv *v1.PersistentVolume) (path, node string, err error) {
defer func() {
err = errors.Wrapf(err, "failed to delete volume %v", pv.Name)
}()
hostPath := pv.Spec.PersistentVolumeSource.HostPath
if hostPath == nil {
return "", "", fmt.Errorf("no HostPath set")
}
path = hostPath.Path
nodeAffinity := pv.Spec.NodeAffinity
if nodeAffinity == nil {
return "", "", fmt.Errorf("no NodeAffinity set")
}
required := nodeAffinity.Required
if required == nil {
return "", "", fmt.Errorf("no NodeAffinity.Required set")
}
node = ""
for _, selectorTerm := range required.NodeSelectorTerms {
for _, expression := range selectorTerm.MatchExpressions {
if expression.Key == KeyNode && expression.Operator == v1.NodeSelectorOpIn {
if len(expression.Values) != 1 {
return "", "", fmt.Errorf("multiple values for the node affinity")
}
node = expression.Values[0]
break
}
}
if node != "" {
break
}
}
if node == "" {
return "", "", fmt.Errorf("cannot find affinited node")
}
return path, node, nil
}
func (p *LocalPathProvisioner) cleanupVolume(name, path, node string) (err error) {
defer func() {
err = errors.Wrapf(err, "failed to cleanup volume %v", name)
}()
if name == "" || path == "" || node == "" {
return fmt.Errorf("invalid empty name or path or node")
}
//TODO match up with node prefixes, make sure it's one of them (and not `/`)
if filepath.Clean(path) == "/" {
return fmt.Errorf("not sure why you want to DESTROY the root directory")
}
hostPathType := v1.HostPathDirectoryOrCreate
cleanupPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "cleanup-" + name,
},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{
{
Name: "local-path-cleanup",
Image: "busybox",
Command: []string{"rm"},
Args: []string{"-rf", "/data-to-cleanup/*"},
VolumeMounts: []v1.VolumeMount{
{
Name: "data-to-cleanup",
ReadOnly: false,
MountPath: "/data-to-cleanup/",
},
},
},
},
Volumes: []v1.Volume{
{
Name: "data-to-cleanup",
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{
Path: path,
Type: &hostPathType,
},
},
},
},
},
}
namespace := "default"
pod, err := p.kubeClient.CoreV1().Pods(namespace).Create(cleanupPod)
if err != nil {
return err
}
defer func() {
e := p.kubeClient.CoreV1().Pods(namespace).Delete(pod.Name, &metav1.DeleteOptions{})
if e != nil {
logrus.Errorf("unable to delete the cleanup pod: %v", e)
}
}()
completed := false
for i := 0; i < CleanupCounts; i++ {
if pod, err := p.kubeClient.CoreV1().Pods(namespace).Get(pod.Name, metav1.GetOptions{}); err != nil {
return err
} else if pod.Status.Phase == v1.PodSucceeded {
completed = true
break
}
time.Sleep(1 * time.Second)
}
if !completed {
return fmt.Errorf("cleanup process timeout after %v seconds", CleanupCounts)
}
logrus.Infof("Volume %v has been cleaned up", name)
return nil return nil
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment