Developing a Custom Kubernetes APIServer

SuKai April 4, 2022

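Earlier articles covered the workflow and the resource synchronization mechanism of Clusterpedia, the open-source project from DaoCloud. Clusterpedia synchronizes selected resource objects from multiple Kubernetes clusters into a MySQL database and exposes them for query and retrieval. Today we look at how Clusterpedia implements a custom Kubernetes APIServer that serves the resources of those clusters from the database.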

This article has two parts:

1. Kubernetes APIServer basics

2. Developing a custom APIServer

Kubernetes APIServer Basics

The Kubernetes APIServer is a web server that implements a RESTful API.

API types

Core group: REST paths are prefixed with /api/v1. The group name is not written in the apiVersion field, for example apiVersion: v1.

Named groups: the REST path is /apis/$GROUP_NAME/$VERSION and the apiVersion field is $GROUP_NAME/$VERSION, for example apiVersion: batch/v1.

System status APIs, such as /metrics and /healthz.
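The mapping between apiVersion and the REST path prefix can be written down in a few lines. The sketch below is only an illustration; restPrefix is a hypothetical helper, not a Kubernetes function.

```go
package main

import (
	"fmt"
	"strings"
)

// restPrefix maps an apiVersion value to its REST path prefix.
// An apiVersion without a slash belongs to the core group.
// restPrefix is a hypothetical helper written for this example.
func restPrefix(apiVersion string) string {
	if group, version, ok := strings.Cut(apiVersion, "/"); ok {
		return "/apis/" + group + "/" + version
	}
	return "/api/" + apiVersion
}

func main() {
	for _, v := range []string{"v1", "batch/v1"} {
		fmt.Printf("%-10s -> %s\n", v, restPrefix(v))
	}
}
```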

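Extension mechanisms

To make the Kubernetes API extensible, Kubernetes provides two mechanisms: 1. APIExtensions: define custom resources with CRDs, and the server handles REST requests for the CRDs and their CRs. 2. The Aggregator: register an APIService resource, and the APIServer proxies requests for the corresponding API GroupVersion to the registered Service.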

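Delegation chain

The APIServer uses the delegation pattern: through the DelegationTarget interface, the Aggregator, the KubeAPIServer, and APIExtensions are chained together and serve requests as one server.

When a Kubernetes object is requested, the Aggregator is consulted first; if it cannot serve the request, the request falls through to the KubeAPIServer and finally to APIExtensions.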
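The fall-through behaviour can be sketched with plain net/http handlers. This is a simplified model under stated assumptions, not the real DelegationTarget interface: the layer helper and the path prefixes are hypothetical and only drive the demo.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// layer returns a handler that serves paths under prefix itself and
// passes every other request on to next, its delegation target.
func layer(name, prefix string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if strings.HasPrefix(r.URL.Path, prefix) {
			fmt.Fprintf(w, "served by %s", name)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// newChain wires the layers in the order described above:
// aggregator first, then kube-apiserver, then apiextensions.
// The prefixes are hypothetical.
func newChain() http.Handler {
	extensions := layer("apiextensions", "/apis/example.com/", http.NotFoundHandler())
	kube := layer("kube-apiserver", "/api/", extensions)
	return layer("aggregator", "/apis/metrics.k8s.io/", kube)
}

// probe sends a GET request through h and reports status code and body.
func probe(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	chain := newChain()
	for _, p := range []string{"/apis/metrics.k8s.io/v1beta1", "/api/v1/pods", "/apis/example.com/v1/foos", "/unknown"} {
		code, body := probe(chain, p)
		fmt.Println(p, code, body)
	}
}
```

A request that no layer recognizes reaches the end of the chain and gets a 404, which is the role an empty delegate plays.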

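Clusterpedia builds on this delegation pattern. It registers an Aggregator service, which is consulted before the APIServer of the host Kubernetes cluster, and serves the /api and /apis path trees for the Kubernetes resource objects of multiple member clusters stored in the database. kubectl can therefore query and search resource objects in downstream clusters directly.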

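Operating on resource objects

In the APIServer, a request for a resource object is first handled at the REST layer by RESTStorage, which then calls the etcd store to read or write the data.

Clusterpedia works in a similar way: the request is first handled at the REST layer by RESTStorage, which then calls ResourceStorage to perform the database operations.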
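The two layers can be modelled as a REST-facing type that holds a storage interface. The sketch below is an assumption-laden illustration: the ResourceStorage interface here has a single made-up method, and memoryStorage stands in for a database-backed implementation. Clusterpedia's real interfaces have different signatures.

```go
package main

import (
	"errors"
	"fmt"
)

// ResourceStorage is a hypothetical backend interface for this example.
type ResourceStorage interface {
	Get(cluster, namespace, name string) (string, error)
}

var errNotFound = errors.New("not found")

// memoryStorage stands in for a database-backed implementation.
type memoryStorage map[string]string

func (m memoryStorage) Get(cluster, namespace, name string) (string, error) {
	obj, ok := m[cluster+"/"+namespace+"/"+name]
	if !ok {
		return "", errNotFound
	}
	return obj, nil
}

// RESTStorage is the REST-facing layer: it validates the request and
// delegates the actual read to whatever backend it was given.
type RESTStorage struct {
	Storage ResourceStorage
}

func (s RESTStorage) Get(cluster, namespace, name string) (string, error) {
	if cluster == "" {
		return "", errors.New("missing cluster")
	}
	return s.Storage.Get(cluster, namespace, name)
}

// newDemoREST builds a RESTStorage over a one-object in-memory backend.
func newDemoREST() RESTStorage {
	return RESTStorage{Storage: memoryStorage{"cluster-1/default/nginx": `{"kind":"Pod"}`}}
}

func main() {
	rest := newDemoREST()
	fmt.Println(rest.Get("cluster-1", "default", "nginx"))
	fmt.Println(rest.Get("", "default", "nginx"))
}
```

Because the REST layer only depends on the interface, swapping etcd for a relational database does not change the request handling code.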

Developing a Custom APIServer

Basic flow of the Clusterpedia APIServer

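1. Build the GenericAPIServer instance kubeResourceAPIServer with the Config -> Complete -> New pattern

NewDefaultConfig initializes the default configuration. In BuildHandlerChain, WithRequestInfo parses the API request information and stores it in the context, WithPanicRecovery logs panics and recovers from them, and RemoveFieldSelectorFromRequest processes the URL query.

Complete fills in the rest of the configuration; wrapRequestInfoResolverForNamespace wraps the RequestInfo resolver.

New builds a GenericAPIServer on top of an empty delegate. It creates a discoveryManager to serve discovery API requests and registers the /api and /apis discovery routes with NonGoRestfulMux.Handle. It creates a resourceHandler to serve resource object requests and registers routes for all paths prefixed with /api/ and /apis/ with NonGoRestfulMux.HandlePrefix. NewClusterResourceController creates the Clusterpedia controller, which watches events on Clusterpedia's custom resources and tracks the resources of downstream clusters; the discoveryManager serves the discovery API from the resource information the controller keeps up to date.

2. BuildHandlerChain

WithRequestQuery is added to the handler chain; it stores the URL query in the context.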

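3. Build the GenericAPIServer instance genericServer

config.GenericConfig.New builds a new GenericAPIServer, genericServer, with kubeResourceAPIServer as its delegation target. Following the delegation order described earlier, a request is matched against genericServer first and is passed to the delegation target only when genericServer does not handle it.

4. InstallAPIGroup registers the API objects

The API group is registered in genericServer: the GroupVersion is clusterpedia.io/v1beta1 and the resources are resources and collectionresources. Each resource has its own REST storage, which handles the requests for that resource.

5. AddPostStartHookOrDie adds a function that runs after the APIServer starts

Once the APIServer is up, the hook starts the informers and waits for their caches to sync.

6. PrepareRun()

Sets up the health and liveness checks.

7. Run()

Starts the APIServer.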
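Step 2 above, adding a filter to the handler chain, can be sketched with the standard library alone. The code is a simplified stand-in, not Clusterpedia's filters package: withRequestQuery, queryFrom and buildHandlerChain are names invented for this example.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
)

type queryKey struct{}

// withRequestQuery is a simplified stand-in for a filter such as
// WithRequestQuery: it stores the parsed URL query in the request context.
func withRequestQuery(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ctx := context.WithValue(r.Context(), queryKey{}, r.URL.Query())
		next.ServeHTTP(w, r.WithContext(ctx))
	})
}

// queryFrom retrieves the query saved by withRequestQuery, if any.
func queryFrom(ctx context.Context) url.Values {
	q, _ := ctx.Value(queryKey{}).(url.Values)
	return q
}

// buildHandlerChain wraps an existing chain builder with one more filter,
// so the new filter runs before everything the base chain installs.
func buildHandlerChain(base func(http.Handler) http.Handler) func(http.Handler) http.Handler {
	return func(api http.Handler) http.Handler {
		return withRequestQuery(base(api))
	}
}

// limitSeenBy runs a request through the chain and returns the "limit"
// query value as seen by the innermost handler.
func limitSeenBy(target string) string {
	var seen string
	api := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		seen = queryFrom(r.Context()).Get("limit")
	})
	identity := func(h http.Handler) http.Handler { return h }
	chain := buildHandlerChain(identity)(api)
	chain.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest(http.MethodGet, target, nil))
	return seen
}

func main() {
	fmt.Println(limitSeenBy("/apis/apps/v1/deployments?limit=5"))
}
```

Storing the query in the context lets storage code deep in the call stack read request parameters without having access to the *http.Request.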

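The clusterpedia-apiserver command

opts.Config() does the following:

1. Creates the storage interface used to access the database, based on the configured database type and settings

2. Generates a self-signed TLS certificate according to the server configuration

3. Generates the default genericapiserver configuration

A ClusterPediaServer instance, server, is then built with the Config -> Complete -> New pattern.

server.Run starts the ClusterPediaServer.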

func NewClusterPediaServerCommand(ctx context.Context) *cobra.Command {
   opts := options.NewServerOptions()

   cmd := &cobra.Command{
      Use: "clusterpedia-apiserver",
      RunE: func(cmd *cobra.Command, args []string) error {

         if err := opts.Complete(); err != nil {
            return err
         }

         config, err := opts.Config()
         if err != nil {
            return err
         }

         server, err := config.Complete().New()
         if err != nil {
            return err
         }

         if err := server.Run(ctx); err != nil {
            return err
         }
         return nil
      },
   }


   return cmd
}

func (o *ClusterPediaServerOptions) Config() (*apiserver.Config, error) {
	storage, err := storage.NewStorageFactory(o.Storage.Name, o.Storage.ConfigPath)
	if err != nil {
		return nil, err
	}

	if err := o.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{net.ParseIP("127.0.0.1")}); err != nil {
		return nil, fmt.Errorf("error create self-signed certificates: %v", err)
	}

	// remove NamespaceLifecycle admission plugin explicitly
	// current admission plugins:  mutatingwebhook, validatingwebhook
	o.Admission.DisablePlugins = append(o.Admission.DisablePlugins, lifecycle.PluginName)

	genericConfig := genericapiserver.NewRecommendedConfig(apiserver.Codecs)
	// genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(openapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(apiserver.Scheme))
	// genericConfig.OpenAPIConfig.Info.Title = openAPITitle
	// genericConfig.OpenAPIConfig.Info.Version= openAPIVersion

	if err := o.genericOptionsApplyTo(genericConfig); err != nil {
		return nil, err
	}

	return &apiserver.Config{
		GenericConfig:  genericConfig,
		StorageFactory: storage,
	}, nil
}
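The Config -> Complete -> New pattern used by the command can be reduced to a small, self-contained sketch. All types and fields below are hypothetical; the point is only that New is reachable solely through Complete, so a server can never be built from a configuration whose defaults were not applied.

```go
package main

import (
	"errors"
	"fmt"
)

// Config holds user-supplied settings; the fields are hypothetical.
type Config struct {
	Name        string
	StorageName string
}

// completedConfig is unexported so callers can only obtain one through
// Complete, which guarantees that defaults have been applied.
type completedConfig struct{ *Config }

// CompletedConfig wraps the private type and exposes its New method.
type CompletedConfig struct{ *completedConfig }

// Complete fills in defaults for any unset fields.
func (c *Config) Complete() CompletedConfig {
	if c.Name == "" {
		c.Name = "demo-apiserver"
	}
	return CompletedConfig{&completedConfig{c}}
}

// Server is the object built from a completed configuration.
type Server struct{ Name, StorageName string }

// New validates the completed configuration and builds the server.
func (c completedConfig) New() (*Server, error) {
	if c.StorageName == "" {
		return nil, errors.New("New() called with empty StorageName")
	}
	return &Server{Name: c.Name, StorageName: c.StorageName}, nil
}

func main() {
	server, err := (&Config{StorageName: "mysql"}).Complete().New()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", *server)
}
```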

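New builds the ClusterPediaServer instance:

1. Creates a discoveryClient and uses GetAPIGroupResources to discover the resources of the Kubernetes cluster, initialAPIGroupResources.

2. Creates crdclient, the clientset for Clusterpedia's custom resources, and uses it to create clusterpediaInformerFactory.

3. NewDefaultConfig generates resourceServerConfig, the Clusterpedia configuration. This mainly produces default settings, builds the handler chain that stores request information in the context, and sets up health checks.

4. In resourceServerConfig.Complete().New(genericapiserver.NewEmptyDelegate()), Complete() replaces the RequestInfo resolver of GenericConfig with wrapRequestInfoResolverForNamespace. The wrapper changes NewRequestInfo so that when the requested resource is namespaces, the Namespace field of the request info is cleared.

5. New(genericapiserver.NewEmptyDelegate()) on the completedConfig builds kubeResourceAPIServer, which handles resource object API requests.

a. The genericapiserver.NewEmptyDelegate() argument means there is no further server to delegate to, so requests for unrecognized resources end here; when the delegate handler is nil, the code falls back to http.NotFoundHandler().

b. Builds genericserver. genericserver.Handler.NonGoRestfulMux.Handle registers the discovery API routes and genericserver.Handler.NonGoRestfulMux.HandlePrefix registers the resource request API routes.

c. NewClusterResourceController creates a cluster resource controller that handles Clusterpedia cluster events and keeps the resource information of each cluster in sync.

6. Adds the WithRequestQuery filter to the handler chain of GenericConfig; it stores the query parameters of the request in the context.

7. config.GenericConfig.New builds genericServer from GenericConfig.

8. genericServer.InstallAPIGroup registers the API objects with genericServer.

9. genericServer.AddPostStartHookOrDie adds the function that runs after the APIServer starts.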

func (config completedConfig) New() (*ClusterPediaServer, error) {
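   if config.ClientConfig == nil {
      return nil, errors.New("apiserver.New() called with config.ClientConfig == nil")
   }
   if config.StorageFactory == nil {
      return nil, errors.New("apiserver.New() called with config.StorageFactory == nil")
   }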

   discoveryClient, err := discovery.NewDiscoveryClientForConfig(config.ClientConfig)
   if err != nil {
      return nil, err
   }
   initialAPIGroupResources, err := restmapper.GetAPIGroupResources(discoveryClient)
   if err != nil {
      return nil, err
   }

   crdclient, err := versioned.NewForConfig(config.ClientConfig)
   if err != nil {
      return nil, err
   }
   clusterpediaInformerFactory := informers.NewSharedInformerFactory(crdclient, 0)

   resourceServerConfig := kubeapiserver.NewDefaultConfig()
   resourceServerConfig.GenericConfig.ExternalAddress = config.GenericConfig.ExternalAddress
   resourceServerConfig.GenericConfig.LoopbackClientConfig = config.GenericConfig.LoopbackClientConfig
   resourceServerConfig.ExtraConfig = kubeapiserver.ExtraConfig{
      InformerFactory:          clusterpediaInformerFactory,
      StorageFactory:           config.StorageFactory,
      InitialAPIGroupResources: initialAPIGroupResources,
   }
   kubeResourceAPIServer, err := resourceServerConfig.Complete().New(genericapiserver.NewEmptyDelegate())
   if err != nil {
      return nil, err
   }

   handlerChainFunc := config.GenericConfig.BuildHandlerChainFunc
   config.GenericConfig.BuildHandlerChainFunc = func(apiHandler http.Handler, c *genericapiserver.Config) http.Handler {
      handler := handlerChainFunc(apiHandler, c)
      handler = filters.WithRequestQuery(handler)
      return handler
   }

   genericServer, err := config.GenericConfig.New("clusterpedia", hooksDelegate{kubeResourceAPIServer})
   if err != nil {
      return nil, err
   }

   v1beta1storage := map[string]rest.Storage{}
   v1beta1storage["resources"] = resources.NewREST(kubeResourceAPIServer.Handler)
   v1beta1storage["collectionresources"] = collectionresources.NewREST(config.StorageFactory)

   apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(internal.GroupName, Scheme, ParameterCodec, Codecs)
   apiGroupInfo.VersionedResourcesStorageMap["v1beta1"] = v1beta1storage
   if err := genericServer.InstallAPIGroup(&apiGroupInfo); err != nil {
      return nil, err
   }

   genericServer.AddPostStartHookOrDie("start-clusterpedia-informers", func(context genericapiserver.PostStartHookContext) error {
      clusterpediaInformerFactory.Start(context.StopCh)
      clusterpediaInformerFactory.WaitForCacheSync(context.StopCh)
      return nil
   })

   return &ClusterPediaServer{
      GenericAPIServer: genericServer,
   }, nil
}

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*genericapiserver.GenericAPIServer, error) {
	if c.ExtraConfig.StorageFactory == nil {
		return nil, errors.New("kubeapiserver.New() called with config.StorageFactory == nil")
	}
	if c.ExtraConfig.InformerFactory == nil {
		return nil, errors.New("kubeapiserver.New() called with config.InformerFactory == nil")
	}

	genericserver, err := c.GenericConfig.New("clusterpedia-kube-apiserver", delegationTarget)
	if err != nil {
		return nil, err
	}

	delegate := delegationTarget.UnprotectedHandler()
	if delegate == nil {
		delegate = http.NotFoundHandler()
	}

	restManager := NewRESTManager(runtime.ContentTypeJSON, c.ExtraConfig.StorageFactory, c.ExtraConfig.InitialAPIGroupResources)
	discoveryManager := discovery.NewDiscoveryManager(c.GenericConfig.Serializer, restManager, delegate)

	// handle root discovery request
	genericserver.Handler.NonGoRestfulMux.Handle("/api", discoveryManager)
	genericserver.Handler.NonGoRestfulMux.Handle("/apis", discoveryManager)

	resourceHandler := &ResourceHandler{
		minRequestTimeout: c.GenericConfig.RequestTimeout,

		delegate:      delegate,
		rest:          restManager,
		discovery:     discoveryManager,
		clusterLister: c.ExtraConfig.InformerFactory.Cluster().V1alpha2().PediaClusters().Lister(),
	}
	genericserver.Handler.NonGoRestfulMux.HandlePrefix("/api/", resourceHandler)
	genericserver.Handler.NonGoRestfulMux.HandlePrefix("/apis/", resourceHandler)

	_ = NewClusterResourceController(restManager, discoveryManager, c.ExtraConfig.InformerFactory.Cluster().V1alpha2().PediaClusters())
	return genericserver, nil
}
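Step 4 of the list above, wrapping the RequestInfo resolver, is a decorator over an interface. The sketch below shows the idea with trimmed-down, hypothetical types; the real RequestInfo and resolver in k8s.io/apiserver carry more fields and take an *http.Request. The canned resolver imitates the default behaviour in which a request for a namespace object carries that namespace's name in the Namespace field.

```go
package main

import "fmt"

// RequestInfo is a trimmed-down, hypothetical version of the apiserver type.
type RequestInfo struct {
	Resource  string
	Namespace string
	Name      string
}

// Resolver parses a request path into a RequestInfo.
type Resolver interface {
	NewRequestInfo(path string) RequestInfo
}

// ResolverFunc adapts a plain function to the Resolver interface.
type ResolverFunc func(path string) RequestInfo

func (f ResolverFunc) NewRequestInfo(path string) RequestInfo { return f(path) }

// wrapForNamespace decorates a resolver: when the requested resource is
// "namespaces", the Namespace field is cleared.
func wrapForNamespace(inner Resolver) Resolver {
	return ResolverFunc(func(path string) RequestInfo {
		info := inner.NewRequestInfo(path)
		if info.Resource == "namespaces" {
			info.Namespace = ""
		}
		return info
	})
}

// cannedResolver returns fixed results instead of parsing the path.
func cannedResolver() Resolver {
	canned := map[string]RequestInfo{
		"/api/v1/namespaces/kube-system":      {Resource: "namespaces", Namespace: "kube-system", Name: "kube-system"},
		"/api/v1/namespaces/kube-system/pods": {Resource: "pods", Namespace: "kube-system"},
	}
	return ResolverFunc(func(path string) RequestInfo { return canned[path] })
}

func main() {
	r := wrapForNamespace(cannedResolver())
	fmt.Printf("%+v\n", r.NewRequestInfo("/api/v1/namespaces/kube-system"))
	fmt.Printf("%+v\n", r.NewRequestInfo("/api/v1/namespaces/kube-system/pods"))
}
```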

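DiscoveryManager handles discovery API requests

1. For /api requests, handleLegacyAPI returns the supported versions (APIVersions).

2. For /apis requests, if a downstream Kubernetes cluster is specified, the APIGroup list of that cluster is returned; if no cluster is specified, the global list of all APIGroups is returned.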

func (m *DiscoveryManager) ServeHTTP(w http.ResponseWriter, req *http.Request) {
   pathParts := splitPath(req.URL.Path)
   if len(pathParts) == 0 || len(pathParts) > 3 {
      m.delegate.ServeHTTP(w, req)
      return
   }

   prefix := pathParts[0]
   if prefix == "api" {
      m.handleLegacyAPI(pathParts, w, req)
      return
   }
   if prefix != "apis" {
      m.delegate.ServeHTTP(w, req)
      return
   }

   // match /apis
   if len(pathParts) == 1 {
      var apigroups []metav1.APIGroup
      if cluster := request.ClusterNameValue(req.Context()); cluster != "" {
         var ok bool
         clusterAPIGroups := m.clusterAPIGroups.Load().(map[string][]metav1.APIGroup)
         if apigroups, ok = clusterAPIGroups[cluster]; !ok {
            m.delegate.ServeHTTP(w, req)
            return
         }
      } else {
         apigroups = m.apigroups.Load().([]metav1.APIGroup)
      }

      responsewriters.WriteObjectNegotiated(m.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK, &metav1.APIGroupList{Groups: apigroups})
      return
   }

   // match /apis/<group>
   if len(pathParts) == 2 {
      m.groupHandler.ServeHTTP(w, req)
      return
   }

   // match /apis/<group>/<version>
   m.versionHandler.ServeHTTP(w, req)
}
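The branching in ServeHTTP and the atomic.Value lookups can be tried out in isolation. The sketch below makes two assumptions: splitPath is a guess at what the helper of the same name does, and groupCache is a hypothetical reduction of the per-cluster group map that stores plain strings instead of metav1.APIGroup. The copy-on-write update assumes a single writer.

```go
package main

import (
	"fmt"
	"strings"
	"sync/atomic"
)

// splitPath splits a URL path into its non-empty segments.
func splitPath(path string) []string {
	path = strings.Trim(path, "/")
	if path == "" {
		return nil
	}
	return strings.Split(path, "/")
}

// route names the branch a discovery request would take.
func route(path string) string {
	parts := splitPath(path)
	switch {
	case len(parts) == 0 || len(parts) > 3:
		return "delegate"
	case parts[0] == "api":
		return "legacy"
	case parts[0] != "apis":
		return "delegate"
	case len(parts) == 1:
		return "group list"
	case len(parts) == 2:
		return "group"
	default:
		return "version"
	}
}

// groupCache keeps per-cluster API group names behind an atomic.Value so
// readers never block while a new map is swapped in.
type groupCache struct {
	clusterGroups atomic.Value // holds map[string][]string
}

func newGroupCache() *groupCache {
	c := &groupCache{}
	c.clusterGroups.Store(map[string][]string{})
	return c
}

// set replaces the whole map with an updated copy (single writer assumed).
func (c *groupCache) set(cluster string, groups []string) {
	old := c.clusterGroups.Load().(map[string][]string)
	next := make(map[string][]string, len(old)+1)
	for k, v := range old {
		next[k] = v
	}
	next[cluster] = groups
	c.clusterGroups.Store(next)
}

// get returns the groups recorded for a cluster, if any.
func (c *groupCache) get(cluster string) ([]string, bool) {
	groups, ok := c.clusterGroups.Load().(map[string][]string)[cluster]
	return groups, ok
}

func main() {
	for _, p := range []string{"/api", "/apis", "/apis/apps", "/apis/apps/v1", "/healthz"} {
		fmt.Printf("%-14s -> %s\n", p, route(p))
	}
	cache := newGroupCache()
	cache.set("cluster-1", []string{"apps", "batch"})
	fmt.Println(cache.get("cluster-1"))
}
```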

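ResourceHandler handles Kubernetes resource API requests

1. Builds gvr, the requested resource, from the request information.

2. GetRESTResourceInfo returns info, the RESTResourceInfo for that gvr.

3. If a cluster name is given, checks that the cluster exists and returns an error if it does not. If it exists, the Ready condition of the cluster is checked; when the cluster is not ready, a warning is attached to the response.

4. Depending on requestInfo.Verb, calls the matching resource handler, GetResource or ListResource; any other verb is rejected as not supported. The resource handler calls the corresponding storage, that is, the RESTStorage operations.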

func (r *ResourceHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
   requestInfo, ok := genericrequest.RequestInfoFrom(req.Context())
   if !ok {
      responsewriters.ErrorNegotiated(
         apierrors.NewInternalError(fmt.Errorf("no RequestInfo found in the context")),
         Codecs, schema.GroupVersion{}, w, req,
      )
      return
   }

   // handle discovery request
   if !requestInfo.IsResourceRequest {
      r.discovery.ServeHTTP(w, req)
      return
   }

   gvr := schema.GroupVersionResource{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion, Resource: requestInfo.Resource}

   clusterName := request.ClusterNameValue(req.Context())
   if !r.discovery.ResourceEnabled(clusterName, gvr) {
      r.delegate.ServeHTTP(w, req)
      return
   }

   info := r.rest.GetRESTResourceInfo(gvr)
   if info.Empty() {
      err := fmt.Errorf("not found request scope or resource storage")
      klog.ErrorS(err, "Failed to handle resource request", "resource", gvr)
      responsewriters.ErrorNegotiated(
         apierrors.NewInternalError(err),
         Codecs, gvr.GroupVersion(), w, req,
      )
      return
   }

   resource, reqScope, storage := info.APIResource, info.RequestScope, info.Storage
   if requestInfo.Namespace != "" && !resource.Namespaced {
      r.delegate.ServeHTTP(w, req)
      return
   }

   // Check the health of the cluster
   if clusterName != "" {
      cluster, err := r.clusterLister.Get(clusterName)
      if err != nil {
         err := fmt.Errorf("not found request cluster")
         klog.ErrorS(err, "Failed to handle resource request, not get cluster from cache", "cluster", clusterName, "resource", gvr)
         responsewriters.ErrorNegotiated(
            apierrors.NewInternalError(err),
            Codecs, gvr.GroupVersion(), w, req,
         )
         return
      }

      var msg string
      readyCondition := meta.FindStatusCondition(cluster.Status.Conditions, clusterv1alpha2.ClusterConditionReady)
      switch {
      case readyCondition == nil:
         msg = fmt.Sprintf("%s is not ready and the resources obtained may be inaccurate.", clusterName)
      case readyCondition.Status != metav1.ConditionTrue:
         msg = fmt.Sprintf("%s is not ready and the resources obtained may be inaccurate, reason: %s", clusterName, readyCondition.Reason)
      }
      /*
         TODO(scyda): Determine the synchronization status of a specific resource

         for _, resource := range c.Status.Resources {
         }
      */

      if msg != "" {
         warning.AddWarning(req.Context(), "", msg)
      }
   }

   var handler http.Handler
   switch requestInfo.Verb {
   case "get":
      if clusterName == "" {
         r.delegate.ServeHTTP(w, req)
         return
      }

      handler = handlers.GetResource(storage, reqScope)
   case "list":
      handler = handlers.ListResource(storage, nil, reqScope, false, r.minRequestTimeout)
   default:
      responsewriters.ErrorNegotiated(
         apierrors.NewMethodNotSupported(gvr.GroupResource(), requestInfo.Verb),
         Codecs, gvr.GroupVersion(), w, req,
      )
   }

   if handler != nil {
      handler.ServeHTTP(w, req)
   }
}
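The two decisions at the end of ServeHTTP, the readiness warning and the verb switch, are easy to isolate. The sketch below mirrors them with plain strings; condition, readinessWarning and dispatch are hypothetical names, and the real code works with metav1.Condition and http.Handler values.

```go
package main

import "fmt"

// condition is a minimal stand-in for a status condition.
type condition struct {
	Status string
	Reason string
}

// readinessWarning returns the warning text for a cluster that is not
// ready, or an empty string when the cluster is ready.
func readinessWarning(cluster string, ready *condition) string {
	switch {
	case ready == nil:
		return fmt.Sprintf("%s is not ready and the resources obtained may be inaccurate.", cluster)
	case ready.Status != "True":
		return fmt.Sprintf("%s is not ready and the resources obtained may be inaccurate, reason: %s", cluster, ready.Reason)
	}
	return ""
}

// dispatch mirrors the verb switch: get needs a cluster name, list works
// with or without one, and every other verb is rejected.
func dispatch(verb, cluster string) string {
	switch verb {
	case "get":
		if cluster == "" {
			return "delegate"
		}
		return "get"
	case "list":
		return "list"
	default:
		return "not supported"
	}
}

func main() {
	fmt.Println(readinessWarning("cluster-1", &condition{Status: "False", Reason: "Unreachable"}))
	fmt.Println(dispatch("get", ""), "|", dispatch("get", "cluster-1"), "|", dispatch("list", ""), "|", dispatch("delete", "cluster-1"))
}
```

Note that an unready cluster only produces a warning; the request is still served from the database, which may hold stale data.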

RESTStorage operations

In RESTStorage, s.Storage.Get calls ResourceStorage, which reads the corresponding resource object from the database.

func (s *RESTStorage) Get(ctx context.Context, name string, _ *metav1.GetOptions) (runtime.Object, error) {
   clusterName := request.ClusterNameValue(ctx)
   if clusterName == "" {
      return nil, errors.New("missing cluster")
   }

   requestInfo, ok := genericrequest.RequestInfoFrom(ctx)
   if !ok {
      return nil, errors.New("missing RequestInfo")
   }

   obj := s.New()
   if err := s.Storage.Get(ctx, clusterName, requestInfo.Namespace, name, obj); err != nil {
      return nil, storeerr.InterpretGetError(err, s.DefaultQualifiedResource, name)
   }
   return obj, nil
}

func (s *RESTStorage) List(ctx context.Context, _ *metainternalversion.ListOptions) (runtime.Object, error) {
   var opts internal.ListOptions
   query := request.RequestQueryFrom(ctx)
   if err := scheme.ParameterCodec.DecodeParameters(query, v1beta1.SchemeGroupVersion, &opts); err != nil {
      return nil, apierrors.NewBadRequest(err.Error())
   }
   return s.list(ctx, &opts)
}
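The List method above decodes the saved URL query into typed options and turns a decoding failure into a bad request error. A minimal sketch of that step, using only the standard library, could look as follows. The listOptions struct and the limit and orderby parameter names are assumptions made for the example; the real code relies on a ParameterCodec and internal.ListOptions.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// listOptions is a hypothetical, much smaller cousin of internal.ListOptions.
type listOptions struct {
	Limit   int
	OrderBy string
}

// decodeListOptions converts raw query parameters into typed options and
// rejects malformed values, which a caller can map to a 400 response.
func decodeListOptions(query url.Values) (listOptions, error) {
	opts := listOptions{OrderBy: query.Get("orderby")}
	if raw := query.Get("limit"); raw != "" {
		limit, err := strconv.Atoi(raw)
		if err != nil || limit < 0 {
			return listOptions{}, fmt.Errorf("invalid limit %q", raw)
		}
		opts.Limit = limit
	}
	return opts, nil
}

func main() {
	query, _ := url.ParseQuery("limit=10&orderby=name")
	fmt.Println(decodeListOptions(query))

	bad, _ := url.ParseQuery("limit=ten")
	fmt.Println(decodeListOptions(bad))
}
```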

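ClusterResourceController

NewClusterResourceController creates a ClusterResourceController. It stores the APIGroup information of the resources of each added downstream Kubernetes cluster in the discoveryManager's map, so that the information can be served through the discovery API.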

func NewClusterResourceController(restManager *RESTManager, discoveryManager *discovery.DiscoveryManager, informer clusterinformer.PediaClusterInformer) *ClusterResourceController {
   controller := &ClusterResourceController{
      clusterLister: informer.Lister(),

      restManager:      restManager,
      discoveryManager: discoveryManager,
      clusterresources: make(map[string]ResourceInfoMap),
   }

   informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
      AddFunc: func(obj interface{}) {
         controller.updateClusterResources(obj.(*clusterv1alpha2.PediaCluster))
      },
      UpdateFunc: func(_, obj interface{}) {
         cluster := obj.(*clusterv1alpha2.PediaCluster)
         if !cluster.DeletionTimestamp.IsZero() {
            controller.removeCluster(cluster.Name)
            return
         }

         controller.updateClusterResources(obj.(*clusterv1alpha2.PediaCluster))
      },
      DeleteFunc: func(obj interface{}) {
         clusterName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
         if err != nil {
            return
         }

         controller.removeCluster(clusterName)
      },
   })
   return controller
}

func (c *ClusterResourceController) updateClusterResources(cluster *clusterv1alpha2.PediaCluster) {
   resources := ResourceInfoMap{}
   for _, groupResources := range cluster.Status.SyncResources {
      for _, resource := range groupResources.Resources {
         if len(resource.SyncConditions) == 0 {
            continue
         }

         versions := sets.NewString()
         for _, cond := range resource.SyncConditions {
            versions.Insert(cond.Version)
         }

         gr := schema.GroupResource{Group: groupResources.Group, Resource: resource.Name}
         resources[gr] = resourceInfo{
            Namespaced: resource.Namespaced,
            Kind:       resource.Kind,
            Versions:   versions,
         }
      }
   }

   currentResources := c.clusterresources[cluster.Name]
   if reflect.DeepEqual(resources, currentResources) {
      return
   }

   discoveryapis := c.restManager.LoadResources(resources)
   c.discoveryManager.SetClusterGroupResource(cluster.Name, discoveryapis)

   c.clusterresources[cluster.Name] = resources
}
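The core of updateClusterResources is a compare-then-publish step: the discovery data is refreshed only when the resource set of a cluster actually changed. The sketch below isolates that step. The controller type, its publishes counter and the string-keyed map are hypothetical simplifications of ClusterResourceController and ResourceInfoMap.

```go
package main

import (
	"fmt"
	"reflect"
)

// resourceInfo is a reduced version of the per-resource record.
type resourceInfo struct {
	Namespaced bool
	Kind       string
}

// controller caches the last resource set seen for each cluster.
type controller struct {
	clusterResources map[string]map[string]resourceInfo
	publishes        int // how many times discovery data was refreshed
}

func newController() *controller {
	return &controller{clusterResources: map[string]map[string]resourceInfo{}}
}

// update publishes the new resource set only when it differs from the
// cached one and reports whether a publish happened.
func (c *controller) update(cluster string, resources map[string]resourceInfo) bool {
	if reflect.DeepEqual(resources, c.clusterResources[cluster]) {
		return false
	}
	c.publishes++ // stand-in for pushing data to the discovery manager
	c.clusterResources[cluster] = resources
	return true
}

// remove forgets a cluster, as the delete handler would.
func (c *controller) remove(cluster string) {
	delete(c.clusterResources, cluster)
}

func main() {
	c := newController()
	pods := map[string]resourceInfo{"pods": {Namespaced: true, Kind: "Pod"}}
	fmt.Println(c.update("cluster-1", pods)) // first sight of the cluster
	fmt.Println(c.update("cluster-1", pods)) // unchanged resource set
	fmt.Println("publishes:", c.publishes)
}
```

Skipping unchanged updates matters because informers resync periodically and deliver update events even when the object's synced resources did not change.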

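Summary

Through the Kubernetes API extension mechanism, Clusterpedia registers an Aggregator service. The Aggregator takes precedence over the Kubernetes APIServer, so the route handlers that Clusterpedia registers for the /api and /apis paths serve requests first. Clusterpedia uses RESTStorage to handle the REST API and ResourceStorage to handle database operations. ClusterResourceController stores the resource information of downstream Kubernetes clusters in the discoveryManager's map, which supplies the data for the discovery API.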