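By SuKai, April 4, 2022
Previous articles covered the workflow and the resource synchronization mechanism of Clusterpedia, an open source project from DaoCloud. Clusterpedia synchronizes selected resource objects from multiple Kubernetes clusters into a MySQL database and exposes them for query and retrieval. This article looks at how Clusterpedia implements a custom Kubernetes APIServer that serves the resources of multiple Kubernetes workload clusters stored in that database.
This article has two parts:
1. Kubernetes APIServer basics
2. Developing a custom APIServer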
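Kubernetes APIServer basics
The Kubernetes APIServer is a web server that implements a RESTful API.
API types
Core group: API paths are prefixed with /api/v1. The core group is not named in the apiVersion field, for example apiVersion: v1.
Named groups: the REST path is /apis/$GROUP_NAME/$VERSION, and the object specifies apiVersion: $GROUP_NAME/$VERSION, for example apiVersion: batch/v1.
APIs that expose system state, such as /metrics and /healthz.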
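The mapping between apiVersion and REST path described above can be sketched in a few lines of Go. The function name apiPrefix is made up for this illustration and is not part of Kubernetes or Clusterpedia.

```go
package main

import (
	"fmt"
	"strings"
)

// apiPrefix returns the REST path prefix for an apiVersion value.
// A value without a slash belongs to the core group and lives under /api;
// a "group/version" value lives under /apis.
func apiPrefix(apiVersion string) string {
	if !strings.Contains(apiVersion, "/") {
		return "/api/" + apiVersion
	}
	return "/apis/" + apiVersion
}

func main() {
	for _, v := range []string{"v1", "batch/v1", "apps/v1"} {
		fmt.Printf("%-10s -> %s\n", v, apiPrefix(v))
	}
}
```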
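Extension mechanisms
To make its API extensible, Kubernetes provides two mechanisms: 1. APIExtensions: create custom resource definitions (CRDs) and let the APIServer handle REST requests for the CRDs and their custom resources (CRs). 2. The Aggregator (aggregation layer): register an APIService resource, and the APIServer proxies requests for the corresponding API GroupVersion to the registered Service.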
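Delegation chain
The APIServer uses the delegation pattern: through the DelegationTarget interface, the Aggregator, the API Server, and APIExtensions are chained together to serve requests.
When a Kubernetes object is requested and the Aggregator cannot serve it, the request goes to the KubeAPIServer, and finally to APIExtensions.
Clusterpedia builds on this delegation pattern. It registers itself as an Aggregator service, so it is consulted before the APIServer of the cluster it runs in and serves the api and apis routes, giving access to the Kubernetes resource objects of multiple workload clusters stored in the database. kubectl can also query and retrieve resource objects of downstream clusters directly.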
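The delegation idea can be illustrated with plain net/http handlers. This is a minimal sketch, not the k8s.io/apiserver implementation: the layer type and the example paths are assumptions chosen for the illustration.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// layer serves the paths it knows and hands everything else to its delegate.
type layer struct {
	name     string
	paths    map[string]bool
	delegate http.Handler
}

func (l layer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if l.paths[r.URL.Path] {
		fmt.Fprint(w, l.name)
		return
	}
	l.delegate.ServeHTTP(w, r)
}

// buildChain wires aggregator -> kube-apiserver -> apiextensions -> 404.
func buildChain() http.Handler {
	ext := layer{
		name:     "apiextensions",
		paths:    map[string]bool{"/apis/example.com/v1/widgets": true},
		delegate: http.NotFoundHandler(),
	}
	kube := layer{
		name:     "kube-apiserver",
		paths:    map[string]bool{"/api/v1/pods": true},
		delegate: ext,
	}
	return layer{
		name:     "aggregator",
		paths:    map[string]bool{"/apis/metrics.k8s.io/v1beta1/nodes": true},
		delegate: kube,
	}
}

// servedBy reports which layer answered, or "404" when none did.
func servedBy(h http.Handler, path string) string {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	if rec.Code == http.StatusNotFound {
		return "404"
	}
	return rec.Body.String()
}

func main() {
	chain := buildChain()
	for _, p := range []string{"/apis/metrics.k8s.io/v1beta1/nodes", "/api/v1/pods", "/apis/example.com/v1/widgets", "/nope"} {
		fmt.Println(p, "->", servedBy(chain, p))
	}
}
```

Each layer only needs to know its own routes and its delegate, which is why a new server can be placed at the front of the chain without changing the others.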
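Resource object operations
In the APIServer, an API request for a resource object is first handled at the REST layer by RESTStorage, which then calls the etcd store to read or write the data.
Clusterpedia works much like the Kubernetes APIServer: RESTStorage handles the REST API first and then calls ResourceStorage to operate on the database.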
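The two layers can be sketched as follows. The interface and the in-memory backend here are simplified stand-ins invented for this example; the real Clusterpedia types carry contexts and runtime.Object values, as the code later in the article shows.

```go
package main

import (
	"errors"
	"fmt"
)

// ResourceStorage is a hypothetical, simplified stand-in for the database layer.
type ResourceStorage interface {
	Get(cluster, namespace, name string) (string, error)
}

var errNotFound = errors.New("not found")

// memStorage keeps serialized objects in a map keyed by cluster/namespace/name.
type memStorage map[string]string

func (m memStorage) Get(cluster, namespace, name string) (string, error) {
	obj, ok := m[cluster+"/"+namespace+"/"+name]
	if !ok {
		return "", errNotFound
	}
	return obj, nil
}

// RESTStorage handles request-level checks, then calls the storage layer.
type RESTStorage struct {
	Storage ResourceStorage
}

func (s RESTStorage) Get(cluster, namespace, name string) (string, error) {
	if cluster == "" {
		return "", errors.New("missing cluster")
	}
	return s.Storage.Get(cluster, namespace, name)
}

func main() {
	rest := RESTStorage{Storage: memStorage{"cluster-1/default/nginx": `{"kind":"Pod"}`}}
	obj, err := rest.Get("cluster-1", "default", "nginx")
	fmt.Println(obj, err)
	_, err = rest.Get("", "default", "nginx")
	fmt.Println(err)
}
```

Because RESTStorage depends only on the interface, the backend can be swapped (etcd in Kubernetes, a relational database in Clusterpedia) without touching the REST layer.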
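Developing a custom APIServer
Clusterpedia APIServer basic flow
1. Build the GenericAPIServer instance kubeResourceAPIServer with the config-complete-new pattern
NewDefaultConfig initializes the default configuration. In BuildHandlerChain, WithRequestInfo saves the API request information into the context, WithPanicRecovery logs panics and recovers from them, and RemoveFieldSelectorFromRequest processes the URL query.
Complete finishes the configuration; wrapRequestInfoResolverForNamespace wraps the RequestInfo resolver.
New builds a GenericAPIServer from an empty delegate. It creates discoveryManager to handle API discovery, and NonGoRestfulMux.Handle registers the discovery routes /api and /apis. It creates resourceHandler to handle resource object requests, and NonGoRestfulMux.HandlePrefix registers the routes whose paths are prefixed with /api/ and /apis/. NewClusterResourceController creates the Clusterpedia controller, which watches events on the Clusterpedia CRD (PediaCluster) and tracks the resources of the downstream clusters; discoveryManager reads the resource information updated by the controller to serve the discovery API.
2. BuildHandlerChain
WithRequestQuery is added to the HandlerChain; it saves the URL query into the context.
3. Build the GenericAPIServer instance genericServer
config.GenericConfig.New builds a new GenericAPIServer instance, genericServer, with kubeResourceAPIServer (wrapped in hooksDelegate) as its delegation target. Requests therefore enter through genericServer, and resource requests are passed on to the handler of kubeResourceAPIServer.
4. InstallAPIGroup registers the API objects
genericServer registers the GroupVersion clusterpedia.io/v1beta1 with two resources, resources and collectionresources. Each resource has its own REST storage that handles the requests for it.
5. AddPostStartHookOrDie adds a function to call after the APIServer starts
Once the APIServer is up, the hook starts the informers and waits for their caches to sync.
6. PrepareRun()
Sets up the health and liveness checks.
7. Run()
Starts the APIServer.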
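The clusterpedia-apiserver command
opts.Config() does the following:
1. Creates the storage interface used to operate on the database, based on the configured database type and settings
2. Generates a self-signed SSL certificate from the server configuration when no certificate is configured
3. Generates the default genericapiserver configuration
A ClusterPediaServer instance, server, is then built with the Config->Complete->New pattern
server.Run runs the ClusterPediaServer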
func NewClusterPediaServerCommand(ctx context.Context) *cobra.Command {
	opts := options.NewServerOptions()
	cmd := &cobra.Command{
		Use: "clusterpedia-apiserver",
		RunE: func(cmd *cobra.Command, args []string) error {
			if err := opts.Complete(); err != nil {
				return err
			}
			// Config -> Complete -> New; check the error at every step.
			config, err := opts.Config()
			if err != nil {
				return err
			}
			server, err := config.Complete().New()
			if err != nil {
				return err
			}
			if err := server.Run(ctx); err != nil {
				return err
			}
			return nil
		},
	}
	return cmd
}
func (o *ClusterPediaServerOptions) Config() (*apiserver.Config, error) {
	storage, err := storage.NewStorageFactory(o.Storage.Name, o.Storage.ConfigPath)
	if err != nil {
		return nil, err
	}
	if err := o.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{net.ParseIP("127.0.0.1")}); err != nil {
		return nil, fmt.Errorf("error create self-signed certificates: %v", err)
	}
	// remove NamespaceLifecycle admission plugin explicitly
	// current admission plugins: mutatingwebhook, validatingwebhook
	o.Admission.DisablePlugins = append(o.Admission.DisablePlugins, lifecycle.PluginName)
	genericConfig := genericapiserver.NewRecommendedConfig(apiserver.Codecs)
	// genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(openapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(apiserver.Scheme))
	// genericConfig.OpenAPIConfig.Info.Title = openAPITitle
	// genericConfig.OpenAPIConfig.Info.Version= openAPIVersion
	if err := o.genericOptionsApplyTo(genericConfig); err != nil {
		return nil, err
	}
	return &apiserver.Config{
		GenericConfig:  genericConfig,
		StorageFactory: storage,
	}, nil
}
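The Config->Complete->New pattern used by the command can be shown in isolation. This is a stripped-down sketch with made-up fields (Name, Storage) and a hypothetical Server type; it only illustrates why the completed configuration is a separate, unexported type.

```go
package main

import (
	"errors"
	"fmt"
)

// Config holds user-supplied settings; fields may still be unset.
type Config struct {
	Name    string
	Storage string
}

// completedConfig is unexported, so New can only be reached through Complete.
type completedConfig struct {
	*Config
}

// Complete fills in defaults and returns a value that New accepts.
func (c *Config) Complete() completedConfig {
	if c.Name == "" {
		c.Name = "clusterpedia"
	}
	return completedConfig{c}
}

// Server is the hypothetical object being built.
type Server struct {
	Name    string
	Storage string
}

// New validates the completed configuration and builds the server.
func (c completedConfig) New() (*Server, error) {
	if c.Storage == "" {
		return nil, errors.New("storage must be configured")
	}
	return &Server{Name: c.Name, Storage: c.Storage}, nil
}

func main() {
	server, err := (&Config{Storage: "mysql"}).Complete().New()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(server.Name, server.Storage)
}
```

Callers outside the package cannot construct a completedConfig themselves, so New never sees a configuration whose defaults were skipped.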
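New builds the ClusterPediaServer instance
1. Creates discoveryClient and uses GetAPIGroupResources to discover the resources of the Kubernetes cluster, initialAPIGroupResources
2. Creates crdclient, the clientset for Clusterpedia's custom resources, and uses it to create clusterpediaInformerFactory
3. NewDefaultConfig generates the Clusterpedia configuration resourceServerConfig. This mainly produces the defaults, builds the HandlerChain that saves request information into the context, and sets up the health checks.
4. In resourceServerConfig.Complete().New(genericapiserver.NewEmptyDelegate()), Complete() replaces the request info resolver of GenericConfig with wrapRequestInfoResolverForNamespace. This mainly changes the NewRequestInfo method: when the requested resource is namespaces, the Namespace field of the request info is cleared.
5. New(genericapiserver.NewEmptyDelegate()) on completedConfig builds kubeResourceAPIServer, which handles the resource object API requests.
a. The argument genericapiserver.NewEmptyDelegate() means that requests for unrecognized resources are not delegated any further and get a 404 Not Found response.
b. It builds genericserver; genericserver.Handler.NonGoRestfulMux.Handle registers the discovery API routes, and genericserver.Handler.NonGoRestfulMux.HandlePrefix registers the resource request API routes.
c. NewClusterResourceController creates a cluster resource controller that handles Clusterpedia cluster events and keeps the resource information of each cluster up to date.
6. Adds the WithRequestQuery filter to the HandlerChain of GenericConfig to save the request's query into the context.
7. config.GenericConfig.New builds genericServer from GenericConfig.
8. genericServer.InstallAPIGroup registers the API objects with genericServer.
9. genericServer.AddPostStartHookOrDie adds the function to run after the APIServer starts.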
func (config completedConfig) New() (*ClusterPediaServer, error) {
	if config.ClientConfig == nil {
		return nil, errors.New("")
	}
	if config.StorageFactory == nil {
		return nil, errors.New("")
	}

	// Discover the API resources of the cluster Clusterpedia runs in.
	discoveryClient, err := discovery.NewDiscoveryClientForConfig(config.ClientConfig)
	if err != nil {
		return nil, err
	}
	initialAPIGroupResources, err := restmapper.GetAPIGroupResources(discoveryClient)
	if err != nil {
		return nil, err
	}

	crdclient, err := versioned.NewForConfig(config.ClientConfig)
	if err != nil {
		return nil, err
	}
	clusterpediaInformerFactory := informers.NewSharedInformerFactory(crdclient, 0)

	resourceServerConfig := kubeapiserver.NewDefaultConfig()
	resourceServerConfig.GenericConfig.ExternalAddress = config.GenericConfig.ExternalAddress
	resourceServerConfig.GenericConfig.LoopbackClientConfig = config.GenericConfig.LoopbackClientConfig
	resourceServerConfig.ExtraConfig = kubeapiserver.ExtraConfig{
		InformerFactory:          clusterpediaInformerFactory,
		StorageFactory:           config.StorageFactory,
		InitialAPIGroupResources: initialAPIGroupResources,
	}
	kubeResourceAPIServer, err := resourceServerConfig.Complete().New(genericapiserver.NewEmptyDelegate())
	if err != nil {
		return nil, err
	}

	// Wrap the existing handler chain so the URL query is saved into the context.
	handlerChainFunc := config.GenericConfig.BuildHandlerChainFunc
	config.GenericConfig.BuildHandlerChainFunc = func(apiHandler http.Handler, c *genericapiserver.Config) http.Handler {
		handler := handlerChainFunc(apiHandler, c)
		handler = filters.WithRequestQuery(handler)
		return handler
	}

	genericServer, err := config.GenericConfig.New("clusterpedia", hooksDelegate{kubeResourceAPIServer})
	if err != nil {
		return nil, err
	}

	v1beta1storage := map[string]rest.Storage{}
	v1beta1storage["resources"] = resources.NewREST(kubeResourceAPIServer.Handler)
	v1beta1storage["collectionresources"] = collectionresources.NewREST(config.StorageFactory)

	apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(internal.GroupName, Scheme, ParameterCodec, Codecs)
	apiGroupInfo.VersionedResourcesStorageMap["v1beta1"] = v1beta1storage
	if err := genericServer.InstallAPIGroup(&apiGroupInfo); err != nil {
		return nil, err
	}

	genericServer.AddPostStartHookOrDie("start-clusterpedia-informers", func(context genericapiserver.PostStartHookContext) error {
		clusterpediaInformerFactory.Start(context.StopCh)
		clusterpediaInformerFactory.WaitForCacheSync(context.StopCh)
		return nil
	})

	return &ClusterPediaServer{
		GenericAPIServer: genericServer,
	}, nil
}
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*genericapiserver.GenericAPIServer, error) {
	if c.ExtraConfig.StorageFactory == nil {
		return nil, errors.New("kubeapiserver.New() called with config.StorageFactory == nil")
	}
	if c.ExtraConfig.InformerFactory == nil {
		return nil, errors.New("kubeapiserver.New() called with config.InformerFactory == nil")
	}

	genericserver, err := c.GenericConfig.New("clusterpedia-kube-apiserver", delegationTarget)
	if err != nil {
		return nil, err
	}

	// Fall back to 404 when the delegation target has no handler.
	delegate := delegationTarget.UnprotectedHandler()
	if delegate == nil {
		delegate = http.NotFoundHandler()
	}

	restManager := NewRESTManager(runtime.ContentTypeJSON, c.ExtraConfig.StorageFactory, c.ExtraConfig.InitialAPIGroupResources)
	discoveryManager := discovery.NewDiscoveryManager(c.GenericConfig.Serializer, restManager, delegate)

	// handle root discovery request
	genericserver.Handler.NonGoRestfulMux.Handle("/api", discoveryManager)
	genericserver.Handler.NonGoRestfulMux.Handle("/apis", discoveryManager)

	resourceHandler := &ResourceHandler{
		minRequestTimeout: c.GenericConfig.RequestTimeout,
		delegate:          delegate,
		rest:              restManager,
		discovery:         discoveryManager,
		clusterLister:     c.ExtraConfig.InformerFactory.Cluster().V1alpha2().PediaClusters().Lister(),
	}
	genericserver.Handler.NonGoRestfulMux.HandlePrefix("/api/", resourceHandler)
	genericserver.Handler.NonGoRestfulMux.HandlePrefix("/apis/", resourceHandler)

	_ = NewClusterResourceController(restManager, discoveryManager, c.ExtraConfig.InformerFactory.Cluster().V1alpha2().PediaClusters())
	return genericserver, nil
}
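The function above registers exact routes (/api, /apis) for discovery and prefix routes (/api/, /apis/) for resources. The standard library's http.ServeMux has a similar distinction between a pattern without a trailing slash (exact match) and one with a trailing slash (subtree match), which makes for a small runnable analogy. This is an analogy only; NonGoRestfulMux is a different mux, and the handler names here are placeholders.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// named returns a handler that writes its own name, so routing is observable.
func named(name string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, name)
	})
}

// newMux registers exact discovery routes and prefix resource routes.
func newMux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.Handle("/api", named("discovery"))
	mux.Handle("/apis", named("discovery"))
	mux.Handle("/api/", named("resource"))
	mux.Handle("/apis/", named("resource"))
	return mux
}

// route reports which handler answered the given path.
func route(h http.Handler, path string) string {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Body.String()
}

func main() {
	mux := newMux()
	for _, p := range []string{"/api", "/apis", "/api/v1/pods", "/apis/apps/v1/deployments"} {
		fmt.Println(p, "->", route(mux, p))
	}
}
```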
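DiscoveryManager handles the discovery API
1. For requests to api, handleLegacyAPI returns the supported versions (APIVersions).
2. For requests to apis, if a downstream Kubernetes cluster is specified, the APIGroup list of that cluster is returned; if no cluster is specified, the global list of all APIGroups is returned.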
func (m *DiscoveryManager) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	pathParts := splitPath(req.URL.Path)
	if len(pathParts) == 0 || len(pathParts) > 3 {
		m.delegate.ServeHTTP(w, req)
		return
	}

	prefix := pathParts[0]
	if prefix == "api" {
		m.handleLegacyAPI(pathParts, w, req)
		return
	}
	if prefix != "apis" {
		m.delegate.ServeHTTP(w, req)
		return
	}

	// match /apis
	if len(pathParts) == 1 {
		var apigroups []metav1.APIGroup
		if cluster := request.ClusterNameValue(req.Context()); cluster != "" {
			var ok bool
			clusterAPIGroups := m.clusterAPIGroups.Load().(map[string][]metav1.APIGroup)
			if apigroups, ok = clusterAPIGroups[cluster]; !ok {
				m.delegate.ServeHTTP(w, req)
				return
			}
		} else {
			apigroups = m.apigroups.Load().([]metav1.APIGroup)
		}
		responsewriters.WriteObjectNegotiated(m.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK, &metav1.APIGroupList{Groups: apigroups})
		return
	}

	// match /apis/<group>
	if len(pathParts) == 2 {
		m.groupHandler.ServeHTTP(w, req)
		return
	}

	// match /apis/<group>/<version>
	m.versionHandler.ServeHTTP(w, req)
}
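The branch order of this ServeHTTP method can be traced with a small standalone sketch. The splitPath helper is not shown in the article, so the version here is an assumption about its behaviour (split on slashes, drop empty segments), and classify is a made-up function that returns a label instead of calling a handler.

```go
package main

import (
	"fmt"
	"strings"
)

// splitPath breaks a URL path into its non-empty segments (assumed behaviour).
func splitPath(path string) []string {
	path = strings.Trim(path, "/")
	if path == "" {
		return []string{}
	}
	return strings.Split(path, "/")
}

// classify mirrors the branch order of the discovery ServeHTTP method.
func classify(path string) string {
	parts := splitPath(path)
	switch {
	case len(parts) == 0 || len(parts) > 3:
		return "delegate"
	case parts[0] == "api":
		return "legacy"
	case parts[0] != "apis":
		return "delegate"
	case len(parts) == 1:
		return "group list"
	case len(parts) == 2:
		return "group"
	default:
		return "version"
	}
}

func main() {
	for _, p := range []string{"/api/v1", "/apis", "/apis/apps", "/apis/apps/v1", "/apis/apps/v1/deployments", "/healthz"} {
		fmt.Printf("%-28s -> %s\n", p, classify(p))
	}
}
```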
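ResourceHandler handles Kubernetes resource API requests
1. Builds the requested resource's gvr from the request information.
2. GetRESTResourceInfo looks up the RESTResourceInfo, info, for that gvr.
3. If the request names a cluster, checks that the cluster exists and returns an error if it does not. If it exists, checks its status; when the cluster is not ready, a warning is attached to the response saying that the results may be inaccurate.
4. Depending on the request verb, requestInfo.Verb, calls the matching resource handler function, GetResource or ListResource. The handler function calls the corresponding storage, that is, the RESTStorage operations.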
func (r *ResourceHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	requestInfo, ok := genericrequest.RequestInfoFrom(req.Context())
	if !ok {
		responsewriters.ErrorNegotiated(
			apierrors.NewInternalError(fmt.Errorf("no RequestInfo found in the context")),
			Codecs, schema.GroupVersion{}, w, req,
		)
		return
	}

	// handle discovery request
	if !requestInfo.IsResourceRequest {
		r.discovery.ServeHTTP(w, req)
		return
	}

	gvr := schema.GroupVersionResource{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion, Resource: requestInfo.Resource}
	clusterName := request.ClusterNameValue(req.Context())
	if !r.discovery.ResourceEnabled(clusterName, gvr) {
		r.delegate.ServeHTTP(w, req)
		return
	}

	info := r.rest.GetRESTResourceInfo(gvr)
	if info.Empty() {
		err := fmt.Errorf("not found request scope or resource storage")
		klog.ErrorS(err, "Failed to handle resource request", "resource", gvr)
		responsewriters.ErrorNegotiated(
			apierrors.NewInternalError(err),
			Codecs, gvr.GroupVersion(), w, req,
		)
		return
	}

	resource, reqScope, storage := info.APIResource, info.RequestScope, info.Storage
	if requestInfo.Namespace != "" && !resource.Namespaced {
		r.delegate.ServeHTTP(w, req)
		return
	}

	// Check the health of the cluster
	if clusterName != "" {
		cluster, err := r.clusterLister.Get(clusterName)
		if err != nil {
			err := fmt.Errorf("not found request cluster")
			klog.ErrorS(err, "Failed to handle resource request, not get cluster from cache", "cluster", clusterName, "resource", gvr)
			responsewriters.ErrorNegotiated(
				apierrors.NewInternalError(err),
				Codecs, gvr.GroupVersion(), w, req,
			)
			return
		}

		var msg string
		readyCondition := meta.FindStatusCondition(cluster.Status.Conditions, clusterv1alpha2.ClusterConditionReady)
		switch {
		case readyCondition == nil:
			msg = fmt.Sprintf("%s is not ready and the resources obtained may be inaccurate.", clusterName)
		case readyCondition.Status != metav1.ConditionTrue:
			msg = fmt.Sprintf("%s is not ready and the resources obtained may be inaccurate, reason: %s", clusterName, readyCondition.Reason)
		}
		/*
			TODO(scyda): Determine the synchronization status of a specific resource
			for _, resource := range c.Status.Resources {
			}
		*/
		if msg != "" {
			warning.AddWarning(req.Context(), "", msg)
		}
	}

	var handler http.Handler
	switch requestInfo.Verb {
	case "get":
		if clusterName == "" {
			r.delegate.ServeHTTP(w, req)
			return
		}
		handler = handlers.GetResource(storage, reqScope)
	case "list":
		handler = handlers.ListResource(storage, nil, reqScope, false, r.minRequestTimeout)
	default:
		responsewriters.ErrorNegotiated(
			apierrors.NewMethodNotSupported(gvr.GroupResource(), requestInfo.Verb),
			Codecs, gvr.GroupVersion(), w, req,
		)
	}

	if handler != nil {
		handler.ServeHTTP(w, req)
	}
}
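Two decisions in this handler are easy to check in isolation: the readiness warning and the verb dispatch. The sketch below reproduces both with simplified, hypothetical types (condition, readinessWarning, dispatch); the condition type name "Ready" is assumed, and the sketch returns labels rather than invoking real handlers.

```go
package main

import "fmt"

// condition is a simplified stand-in for a status condition on a cluster.
type condition struct {
	Type   string
	Status string
	Reason string
}

// readinessWarning returns the warning to attach to the response, or "" when
// the cluster reports Ready=True. A missing Ready condition also warns.
func readinessWarning(cluster string, conds []condition) string {
	for _, c := range conds {
		if c.Type != "Ready" {
			continue
		}
		if c.Status == "True" {
			return ""
		}
		return fmt.Sprintf("%s is not ready and the resources obtained may be inaccurate, reason: %s", cluster, c.Reason)
	}
	return fmt.Sprintf("%s is not ready and the resources obtained may be inaccurate.", cluster)
}

// dispatch mirrors the switch on the request verb: get needs a cluster name,
// list works with or without one, and every other verb is rejected.
func dispatch(verb, cluster string) string {
	switch verb {
	case "get":
		if cluster == "" {
			return "delegate"
		}
		return "get"
	case "list":
		return "list"
	default:
		return "method not supported"
	}
}

func main() {
	fmt.Println(readinessWarning("cluster-1", []condition{{Type: "Ready", Status: "False", Reason: "Unreachable"}}))
	fmt.Println(dispatch("get", ""), dispatch("get", "cluster-1"), dispatch("list", ""), dispatch("delete", "cluster-1"))
}
```

Note that a cluster that is not ready still gets its request served; only a warning is added, because the data already synchronized into the database may remain useful.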
RESTStorage operations
In RESTStorage, s.Storage.Get calls into ResourceStorage, which reads the requested resource object from the database.
func (s *RESTStorage) Get(ctx context.Context, name string, _ *metav1.GetOptions) (runtime.Object, error) {
	clusterName := request.ClusterNameValue(ctx)
	if clusterName == "" {
		return nil, errors.New("missing cluster")
	}

	requestInfo, ok := genericrequest.RequestInfoFrom(ctx)
	if !ok {
		return nil, errors.New("missing RequestInfo")
	}

	obj := s.New()
	if err := s.Storage.Get(ctx, clusterName, requestInfo.Namespace, name, obj); err != nil {
		return nil, storeerr.InterpretGetError(err, s.DefaultQualifiedResource, name)
	}
	return obj, nil
}

func (s *RESTStorage) List(ctx context.Context, _ *metainternalversion.ListOptions) (runtime.Object, error) {
	var opts internal.ListOptions
	query := request.RequestQueryFrom(ctx)
	if err := scheme.ParameterCodec.DecodeParameters(query, v1beta1.SchemeGroupVersion, &opts); err != nil {
		return nil, apierrors.NewBadRequest(err.Error())
	}
	return s.list(ctx, &opts)
}
ClusterResourceController
NewClusterResourceController creates a ClusterResourceController, which stores the APIGroup information of each added downstream Kubernetes cluster in a map inside discoveryManager so that the discovery API can serve it.
func NewClusterResourceController(restManager *RESTManager, discoveryManager *discovery.DiscoveryManager, informer clusterinformer.PediaClusterInformer) *ClusterResourceController {
	controller := &ClusterResourceController{
		clusterLister:    informer.Lister(),
		restManager:      restManager,
		discoveryManager: discoveryManager,
		clusterresources: make(map[string]ResourceInfoMap),
	}

	informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			controller.updateClusterResources(obj.(*clusterv1alpha2.PediaCluster))
		},
		UpdateFunc: func(_, obj interface{}) {
			cluster := obj.(*clusterv1alpha2.PediaCluster)
			if !cluster.DeletionTimestamp.IsZero() {
				controller.removeCluster(cluster.Name)
				return
			}
			controller.updateClusterResources(obj.(*clusterv1alpha2.PediaCluster))
		},
		DeleteFunc: func(obj interface{}) {
			clusterName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err != nil {
				return
			}
			controller.removeCluster(clusterName)
		},
	})
	return controller
}

func (c *ClusterResourceController) updateClusterResources(cluster *clusterv1alpha2.PediaCluster) {
	resources := ResourceInfoMap{}
	for _, groupResources := range cluster.Status.SyncResources {
		for _, resource := range groupResources.Resources {
			if len(resource.SyncConditions) == 0 {
				continue
			}

			// Collect the versions that are being synchronized for this resource.
			versions := sets.NewString()
			for _, cond := range resource.SyncConditions {
				versions.Insert(cond.Version)
			}

			gr := schema.GroupResource{Group: groupResources.Group, Resource: resource.Name}
			resources[gr] = resourceInfo{
				Namespaced: resource.Namespaced,
				Kind:       resource.Kind,
				Versions:   versions,
			}
		}
	}

	// Skip the update when nothing changed since the last event.
	currentResources := c.clusterresources[cluster.Name]
	if reflect.DeepEqual(resources, currentResources) {
		return
	}

	discoveryapis := c.restManager.LoadResources(resources)
	c.discoveryManager.SetClusterGroupResource(cluster.Name, discoveryapis)
	c.clusterresources[cluster.Name] = resources
}
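updateClusterResources compares the freshly built resource map with the cached one and returns early when they are equal, so repeated informer events do not rebuild the discovery data. A minimal sketch of that change-detection step follows, using simplified hypothetical types (string keys instead of schema.GroupResource, a plain map as the version set).

```go
package main

import (
	"fmt"
	"reflect"
)

// resourceInfo is a simplified version of the per-resource record.
type resourceInfo struct {
	Namespaced bool
	Kind       string
	Versions   map[string]struct{}
}

// controller caches the last seen resources per cluster and counts real updates.
type controller struct {
	clusterResources map[string]map[string]resourceInfo
	updates          int
}

// update stores the new resources and reports true only when they differ
// from the cached ones.
func (c *controller) update(cluster string, resources map[string]resourceInfo) bool {
	if reflect.DeepEqual(resources, c.clusterResources[cluster]) {
		return false
	}
	c.clusterResources[cluster] = resources
	c.updates++
	return true
}

// pods builds a fresh resource map with the given versions of pods.
func pods(versions ...string) map[string]resourceInfo {
	set := map[string]struct{}{}
	for _, v := range versions {
		set[v] = struct{}{}
	}
	return map[string]resourceInfo{"pods": {Namespaced: true, Kind: "Pod", Versions: set}}
}

func main() {
	c := &controller{clusterResources: map[string]map[string]resourceInfo{}}
	fmt.Println(c.update("cluster-1", pods("v1")))
	fmt.Println(c.update("cluster-1", pods("v1")))
	fmt.Println(c.update("cluster-1", pods("v1", "v2")))
	fmt.Println("updates:", c.updates)
}
```

reflect.DeepEqual compares map contents rather than map identity, so a newly allocated map with the same entries counts as unchanged.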
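Summary
Clusterpedia uses the Kubernetes API extension mechanism and registers itself as an Aggregator service. Because the Aggregator takes precedence over the Kubernetes APIServer, the route handlers that Clusterpedia registers for the api and apis groups are the first to serve requests. Clusterpedia uses RESTStorage to handle the REST API and ResourceStorage to handle the database operations. ClusterResourceController stores the resource information of the downstream Kubernetes clusters in a map inside discoveryManager, which supplies the data for the discovery API.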