Skip to content
Snippets Groups Projects
Commit 0291c4eb authored by Sheng Yang's avatar Sheng Yang
Browse files

Add config file watcher

Refresh every 5 seconds by default
parent fffb6f93
No related branches found
No related tags found
No related merge requests found
......@@ -107,7 +107,7 @@ func startDaemon(c *cli.Context) error {
return fmt.Errorf("invalid empty flag %v", FlagNamespace)
}
provisioner, err := NewProvisioner(kubeClient, configFile, namespace)
provisioner, err := NewProvisioner(stopCh, kubeClient, configFile, namespace)
if err != nil {
return err
}
......
......@@ -5,7 +5,9 @@ import (
"fmt"
"os"
"path/filepath"
"reflect"
"strings"
"sync"
"time"
"github.com/Sirupsen/logrus"
......@@ -25,12 +27,19 @@ const (
var (
CleanupTimeoutCounts = 120
ConfigFileCheckInterval = 5 * time.Second
)
type LocalPathProvisioner struct {
stopCh chan struct{}
kubeClient *clientset.Clientset
configFile string
namespace string
config *Config
configData *ConfigData
configFile string
configMutex *sync.RWMutex
}
type NodePathMapData struct {
......@@ -50,7 +59,80 @@ type Config struct {
NodePathMap map[string]*NodePathMap
}
func (c *Config) getRandomPathOnNode(node string) (string, error) {
func NewProvisioner(stopCh chan struct{}, kubeClient *clientset.Clientset, configFile, namespace string) (*LocalPathProvisioner, error) {
p := &LocalPathProvisioner{
stopCh: stopCh,
kubeClient: kubeClient,
namespace: namespace,
// config will be updated shortly by p.refreshConfig()
config: nil,
configFile: configFile,
configData: nil,
configMutex: &sync.RWMutex{},
}
if err := p.refreshConfig(); err != nil {
return nil, err
}
p.watchAndRefreshConfig()
return p, nil
}
func (p *LocalPathProvisioner) refreshConfig() error {
p.configMutex.Lock()
defer p.configMutex.Unlock()
configData, err := loadConfigFile(p.configFile)
if err != nil {
return err
}
// no need to update
if reflect.DeepEqual(configData, p.configData) {
return nil
}
config, err := canonicalizeConfig(configData)
if err != nil {
return err
}
// only update the config if the new config file is valid
p.configData = configData
p.config = config
output, err := json.Marshal(p.configData)
if err != nil {
return err
}
logrus.Debugf("Applied config: %v", string(output))
return err
}
func (p *LocalPathProvisioner) watchAndRefreshConfig() {
go func() {
for {
select {
case <-time.Tick(ConfigFileCheckInterval):
if err := p.refreshConfig(); err != nil {
logrus.Errorf("failed to load the new config file: %v", err)
}
case <-p.stopCh:
logrus.Infof("stop watching config file")
return
}
}
}()
}
func (p *LocalPathProvisioner) getRandomPathOnNode(node string) (string, error) {
p.configMutex.RLock()
defer p.configMutex.RUnlock()
if p.config == nil {
return "", fmt.Errorf("no valid config available")
}
c := p.config
npMap := c.NodePathMap[node]
if npMap == nil {
npMap = c.NodePathMap[NodeDefaultNonListedNodes]
......@@ -70,17 +152,6 @@ func (c *Config) getRandomPathOnNode(node string) (string, error) {
return path, nil
}
func NewProvisioner(kubeClient *clientset.Clientset, configFile, namespace string) (*LocalPathProvisioner, error) {
if _, err := getConfig(configFile); err != nil {
return nil, errors.Wrapf(err, "invalidate config file %v", configFile)
}
return &LocalPathProvisioner{
kubeClient: kubeClient,
configFile: configFile,
namespace: namespace,
}, nil
}
func (p *LocalPathProvisioner) Provision(opts pvController.VolumeOptions) (*v1.PersistentVolume, error) {
pvc := opts.PVC
if pvc.Spec.Selector != nil {
......@@ -96,12 +167,7 @@ func (p *LocalPathProvisioner) Provision(opts pvController.VolumeOptions) (*v1.P
return nil, fmt.Errorf("configuration error, no node was specified")
}
cfg, err := getConfig(p.configFile)
if err != nil {
return nil, err
}
basePath, err := cfg.getRandomPathOnNode(node.Name)
basePath, err := p.getRandomPathOnNode(node.Name)
if err != nil {
return nil, err
}
......@@ -297,7 +363,7 @@ func (p *LocalPathProvisioner) cleanupVolume(name, path, node string) (err error
return nil
}
func getConfig(configFile string) (cfg *Config, err error) {
func loadConfigFile(configFile string) (cfgData *ConfigData, err error) {
defer func() {
err = errors.Wrapf(err, "fail to load config file %v", configFile)
}()
......@@ -311,11 +377,7 @@ func getConfig(configFile string) (cfg *Config, err error) {
if err := json.NewDecoder(f).Decode(&data); err != nil {
return nil, err
}
cfg, err = canonicalizeConfig(&data)
if err != nil {
return nil, err
}
return cfg, nil
return &data, nil
}
func canonicalizeConfig(data *ConfigData) (cfg *Config, err error) {
......@@ -338,6 +400,9 @@ func canonicalizeConfig(data *ConfigData) (cfg *Config, err error) {
if err != nil {
return nil, err
}
if path == "/" {
return nil, fmt.Errorf("cannot use root ('/') as path on node %v", n.Node)
}
if _, ok := npMap.Paths[path]; ok {
return nil, fmt.Errorf("duplicate path %v on node %v", p, n.Node)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment