博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kubernetes的rolling update机制解析
阅读量:5899 次
发布时间:2019-06-19

本文共 8374 字,大约阅读时间需要 27 分钟。

hot3.png

commit: d577db99873cbf04b8e17b78f17ec8f3a27eca30 Date: Fri Apr 10 23:45:36 2015 -0700

##0.命令行和依赖的基础知识

Synopsis

Perform a rolling update of the given ReplicationController.Replaces the specified controller with new controller, updating one pod at a time to use thenew PodTemplate. The new-controller.json must specify the same namespace as theexisting controller and overwrite at least one (common) label in its replicaSelector.kubectl rolling-update OLD_CONTROLLER_NAME -f NEW_CONTROLLER_SPEC

Examples

// Update pods of frontend-v1 using new controller data in frontend-v2.json.$ kubectl rolling-update frontend-v1 -f frontend-v2.json// Update pods of frontend-v1 using JSON data passed into stdin.$ cat frontend-v2.json | kubectl rolling-update frontend-v1 -f -

ReplicationController,简称rc,是kubernet体系中某一种类型pod的集合,rc有一个关键参数叫做replicas,也是就是pod的数量。

那么rc有什么用呢?这是为了解决在集群上一堆pod中有些如果挂了,那么就在别的宿主机上把容器启动起来,并让业务流量导入到正确启动的pod上。也就是说,rc保证了集群服务的可用性,当你有很多个服务启动在一个集群中,你需要用程序去监控这些服务的运行状况,并动态保证服务可用。

rc和pod的对应关系是怎么样的?rc通过selector来选择一些pod作为他的控制范围。只要pod的标签(label)符合seletor,则属于这个rc,下面是pod和rc的示例。

xx-controller.json

"spec":{      "replicas":1,      "selector":{         "name":"redis",         "role":"master"      },

xx-pod.json

"labels": {    "name": "redis"  },

kubernetes被我们简称为k8s,如果对其中的基础概念有兴趣可以看

##1.kubctl入口

/cmd/kubectl/kubctl.go

func main() {	runtime.GOMAXPROCS(runtime.NumCPU())	cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), os.Stdin, os.Stdout, os.Stderr)	if err := cmd.Execute(); err != nil {		os.Exit(1)	}}

##2.实际调用

源代码在pkg包内,/pkg/kubectl/cmd/cmd.go,每个子命令都实现统一的接口,rollingupdate这行是:

cmds.AddCommand(NewCmdRollingUpdate(f, out))

这个函数的实现在:/pkg/kubectl/cmd/rollingupdate.go

func NewCmdRollingUpdate(f *cmdutil.Factory, out io.Writer) *cobra.Command {	cmd := &cobra.Command{		Use: "rolling-update OLD_CONTROLLER_NAME -f NEW_CONTROLLER_SPEC",		// rollingupdate is deprecated.		Aliases: []string{"rollingupdate"},		Short:   "Perform a rolling update of the given ReplicationController.",		Long:    rollingUpdate_long,		Example: rollingUpdate_example,		Run: func(cmd *cobra.Command, args []string) {			err := RunRollingUpdate(f, out, cmd, args)			cmdutil.CheckErr(err)		},	}}

可以看到实际调用时的执行函数是RunRollingUpdate,算是进入正题了

func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args []string) error {...	mapper, typer := f.Object()	// TODO: use resource.Builder instead	obj, err := resource.NewBuilder(mapper, typer, f.ClientMapperForCommand()).		NamespaceParam(cmdNamespace).RequireNamespace().		FilenameParam(filename).		Do().		Object()	if err != nil {		return err	}	newRc, ok := obj.(*api.ReplicationController)	if !ok {		return cmdutil.UsageError(cmd, "%s does not specify a valid ReplicationController", filename)	}

这是建立一个新的rc的代码,其中resource是kubneter所有资源(pod,service,rc)的基类。可以看到新的rc从json参数文件中获取所有信息,然后转义为ReplicationController这个类。

if oldName == newName {		return cmdutil.UsageError(cmd, "%s cannot have the same name as the existing ReplicationController %s",			filename, oldName)	}	var hasLabel bool	for key, oldValue := range oldRc.Spec.Selector {		if newValue, ok := newRc.Spec.Selector[key]; ok && newValue != oldValue {			hasLabel = true			break		}	}	if !hasLabel {		return cmdutil.UsageError(cmd, "%s must specify a matching key with non-equal value in Selector for %s",			filename, oldName)	}

这里可以看到,对于新的rc和旧的rc,有2项限制,一个是新旧名字需要不同,另一个是rc的selector中需要至少有一项的值不一样。

updater := kubectl.NewRollingUpdater(newRc.Namespace, client)	// fetch rc	oldRc, err := client.ReplicationControllers(newRc.Namespace).Get(oldName)	if err != nil {		return err	}...	err = updater.Update(out, oldRc, newRc, period, interval, timeout)	if err != nil {		return err	}

在做rolling update的时候,有两个条件限制,一个是新的rc的名字需要和旧的不一样,第二是至少有个一个标签的值不一样。其中namespace是k8s用来做多租户资源隔离的,可以先忽略不计。

##3. 数据结构和实现

这段代码出现了NewRollingUpdater,是在上一层的/pkg/kubectl/rollingupdate.go这个文件中,更加接近主体了

// RollingUpdater provides methods for updating replicated pods in a predictable,// fault-tolerant way.type RollingUpdater struct {	// Client interface for creating and updating controllers	c client.Interface	// Namespace for resources	ns string}

可以看到这里的RollingUpdater里面是一个k8s的client的结构来向api server发送命令

func (r *RollingUpdater) Update(out io.Writer, oldRc, newRc *api.ReplicationController, updatePeriod, interval, timeout time.Duration) error {	oldName := oldRc.ObjectMeta.Name	newName := newRc.ObjectMeta.Name	retry := &RetryParams{interval, timeout}	waitForReplicas := &RetryParams{interval, timeout}	if newRc.Spec.Replicas <= 0 {		return fmt.Errorf("Invalid controller spec for %s; required: > 0 replicas, actual: %s\n", newName, newRc.Spec)	}	desired := newRc.Spec.Replicas	sourceId := fmt.Sprintf("%s:%s", oldName, oldRc.ObjectMeta.UID)	// look for existing newRc, incase this update was previously started but interrupted	rc, existing, err := r.getExistingNewRc(sourceId, newName)	if existing {		fmt.Fprintf(out, "Continuing update with existing controller %s.\n", newName)		if err != nil {			return err		}		replicas := rc.ObjectMeta.Annotations[desiredReplicasAnnotation]		desired, err = strconv.Atoi(replicas)		if err != nil {			return fmt.Errorf("Unable to parse annotation for %s: %s=%s",				newName, desiredReplicasAnnotation, replicas)		}		newRc = rc	} else {		fmt.Fprintf(out, "Creating %s\n", newName)		if newRc.ObjectMeta.Annotations == nil {			newRc.ObjectMeta.Annotations = map[string]string{}		}		newRc.ObjectMeta.Annotations[desiredReplicasAnnotation] = fmt.Sprintf("%d", desired)		newRc.ObjectMeta.Annotations[sourceIdAnnotation] = sourceId		newRc.Spec.Replicas = 0		newRc, err = r.c.ReplicationControllers(r.ns).Create(newRc)		if err != nil {			return err		}	}	// +1, -1 on oldRc, newRc until newRc has desired number of replicas or oldRc has 0 replicas	for newRc.Spec.Replicas < desired && oldRc.Spec.Replicas != 0 {		newRc.Spec.Replicas += 1		oldRc.Spec.Replicas -= 1		fmt.Printf("At beginning of loop: %s replicas: %d, %s replicas: %d\n",			oldName, oldRc.Spec.Replicas,			newName, newRc.Spec.Replicas)		fmt.Fprintf(out, "Updating %s replicas: %d, %s replicas: %d\n",			oldName, oldRc.Spec.Replicas,			newName, newRc.Spec.Replicas)		newRc, err = r.resizeAndWait(newRc, retry, waitForReplicas)		if err != nil {			return err		}		time.Sleep(updatePeriod)		oldRc, err = r.resizeAndWait(oldRc, retry, waitForReplicas)		if err != nil {			return err		}		fmt.Printf("At end of loop: %s replicas: %d, %s replicas: %d\n",			oldName, oldRc.Spec.Replicas,			newName, newRc.Spec.Replicas)	}	// delete remaining replicas on oldRc	if oldRc.Spec.Replicas != 0 {		fmt.Fprintf(out, "Stopping %s replicas: %d -> %d\n",			oldName, oldRc.Spec.Replicas, 0)		oldRc.Spec.Replicas = 0		oldRc, err = r.resizeAndWait(oldRc, retry, waitForReplicas)		// oldRc, err = r.resizeAndWait(oldRc, interval, timeout)		if err != nil {			return err		}	}	// add remaining replicas on newRc	if newRc.Spec.Replicas != desired {		fmt.Fprintf(out, "Resizing %s replicas: %d -> %d\n",			newName, newRc.Spec.Replicas, desired)		newRc.Spec.Replicas = desired		newRc, err = r.resizeAndWait(newRc, retry, waitForReplicas)		if err != nil {			return err		}	}	// Clean up annotations	if newRc, err = r.c.ReplicationControllers(r.ns).Get(newName); err != nil {		return err	}	delete(newRc.ObjectMeta.Annotations, sourceIdAnnotation)	delete(newRc.ObjectMeta.Annotations, desiredReplicasAnnotation)	newRc, err = r.updateAndWait(newRc, interval, timeout)	if err != nil {		return err	}	// delete old rc	fmt.Fprintf(out, "Update succeeded. Deleting %s\n", oldName)	return r.c.ReplicationControllers(r.ns).Delete(oldName)}

这段代码很长,但做的事情很简单:

  1. 如果新的rc没有被创建,就先创一下,如果已经创建了(在上次的rolling_update中创建了但超时了)
  2. 用几个循环,把新的rc的replicas增加上去,旧的rc的replicas降低下来,主要调用的函数是resizeAndWait和updateAndWait

##4. 底层调用

接上一节的resizeAndWait,代码在/pkg/kubectl/resize.go,这里的具体代码就不贴了 其余的所有调用都发生/pkg/client这个目录下,这是一个http/json的client,主要功能就是向api-server发送请求 整体来说,上面的wait的实现都是比较土的,就是发一个update请求过去,后面轮询的调用get来检测状态是否符合最终需要的状态。

##5. 总结

先说一下这三个时间参数的作用:

update-period:新rc增加一个pod后,等待这个period,然后从旧rc缩减一个pod poll-interval:这个函数名来源于linux上的poll调用,就是每过一个poll-interval,向服务端发起请求,直到这个请求成功或者报失败 timeout:总操作的超时时间

rolling update主要是客户端这边实现的,分析完了,但还是有一些未知的问题,例如:

  1. api-server, cadvisor, kubelet, proxy, etcd这些服务端组件是怎么交互的?怎么保证在服务一直可用的情况下增减pod?
  2. 是否有可能在pod增减的时候插入自己的一些代码或者过程?因为我们目前的架构中没有使用k8s的proxy,需要自己去调用负载均衡的系统给这些pod导流量
  3. 对于具体的pod,我们怎么去做内部程序的健康检查?在业务不可用的情况下向k8s系统发送消息,干掉这个pod,在别的机器上创建新的来替代。

转载于:https://my.oschina.net/HardySimpson/blog/403908

你可能感兴趣的文章
提升CRM实施成功率
查看>>
雅虎开源了TensorFlowOnSpark
查看>>
ERP实施应立足于两点
查看>>
网络安全保险在欧洲更受欢迎
查看>>
三星未及时提供系统更新 荷兰消协把它告上法庭
查看>>
如何处理IT事件管理以避免混乱
查看>>
投资半导体产业不能只想赚快钱
查看>>
物联网确保消费者隐私安全 才能起飞
查看>>
iPhone升級iOS 10变砖 可用iTunes恢复
查看>>
揭秘使用免费WiFi的真实代价
查看>>
思科:网络可见化仍然是安全的数字化转型改造的关键
查看>>
CloudCC CRM梳理CRM软件已经实现的发展
查看>>
《交互式程序设计 第2版》一2.3.2 数组
查看>>
移动互联网金融app 存在信息安全问题
查看>>
Android 开发中使用 SQLite 数据库
查看>>
Android后门GhostCtrl,完美控制设备任意权限并窃取用户数据
查看>>
IBM郭继军:机器学习配合行业经验将帮助企业成就未来
查看>>
Rambus9000万美元收购Inphi存储器互联业务
查看>>
3GPP一反常态提前制定NB-IoT标准有何深意?
查看>>
泉州电信推进渠道互联网化转型
查看>>