Go Memo (3): Usage Patterns and Source Code Analysis of the net/http Package

Date: 2019-12-06 15:18

route

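Kubernetes source code analysis: how the apiserver builds its routes (2)

The previous article gave a brief introduction to the go-restful package. Below we read the code to understand in detail how the apiserver builds its routes.

(Kubernetes code version: 1.3.6, commit id: ed3a29bd6aeb)

We start at the entry point, the main function (kubernetes/cmd/kube-apiserver/apiserver.go):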

func main() {
    rand.Seed(time.Now().UTC().UnixNano())

    // New APIServer
    s := options.NewAPIServer()
    s.AddFlags(pflag.CommandLine)

    // Parse command-line flags
    flag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()

    verflag.PrintAndExitIfRequested()

    // Start the APIServer
    if err := app.Run(s); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}

The main function does very little: it builds the default APIServer run options, parses the command line, sets up logging, and then calls app.Run() to start the service. Let's follow Run():

func Run(s *options.APIServer) error {
    genericvalidation.VerifyEtcdServersList(s.ServerRunOptions)
    genericapiserver.DefaultAndValidateRunOptions(s.ServerRunOptions)

    // Assemble the master config
    ...

    // New master
    m, err := master.New(config)
    if err != nil {
        return err
    }

    sharedInformers.Start(wait.NeverStop)
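    // Start the master
    m.Run(s.ServerRunOptions)
    return nil
}

Run() is long, so only part of it is shown here. The code first assembles the master config, which includes the SSH tunneler, storageFactory, authenticator and authorizer configuration, among others, and then creates and runs the master from that config.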

Two methods here are related to route construction: master.New(config) and m.Run(s.ServerRunOptions). We follow New() first:

// New returns a new instance of Master from the given config.
// Certain config fields will be set to a default value if unset.
// Certain config fields must be specified, including:
//   KubeletClient
func New(c *Config) (*Master, error) {
    if c.KubeletClient == nil {
        return nil, fmt.Errorf("Master.New() called with config.KubeletClient == nil")
    }

    // Create and initialize the GenericAPIServer
    s, err := genericapiserver.New(c.Config)
    if err != nil {
        return nil, err
    }

    // Construct the Master
    m := &Master{
        GenericAPIServer:        s,
        enableCoreControllers:   c.EnableCoreControllers,
        deleteCollectionWorkers: c.DeleteCollectionWorkers,
        tunneler:                c.Tunneler,

        disableThirdPartyControllerForTesting: c.disableThirdPartyControllerForTesting,
    }

    // Add some hardcoded storage for now.  Append to the map.
    if c.RESTStorageProviders == nil {
        c.RESTStorageProviders = map[string]RESTStorageProvider{}
    }
    c.RESTStorageProviders[appsapi.GroupName] = AppsRESTStorageProvider{}
    c.RESTStorageProviders[autoscaling.GroupName] = AutoscalingRESTStorageProvider{}
    c.RESTStorageProviders[batch.GroupName] = BatchRESTStorageProvider{}
    c.RESTStorageProviders[certificates.GroupName] = CertificatesRESTStorageProvider{}
    c.RESTStorageProviders[extensions.GroupName] = ExtensionsRESTStorageProvider{
        ResourceInterface:                     m,
        DisableThirdPartyControllerForTesting: m.disableThirdPartyControllerForTesting,
    }
    c.RESTStorageProviders[policy.GroupName] = PolicyRESTStorageProvider{}
    c.RESTStorageProviders[rbac.GroupName] = RBACRESTStorageProvider{AuthorizerRBACSuperUser: c.AuthorizerRBACSuperUser}
    c.RESTStorageProviders[authenticationv1beta1.GroupName] = AuthenticationRESTStorageProvider{Authenticator: c.Authenticator}
    c.RESTStorageProviders[authorization.GroupName] = AuthorizationRESTStorageProvider{Authorizer: c.Authorizer}

    // Install the APIs
    m.InstallAPIs(c)

    // TODO: Attempt clean shutdown?
    if m.enableCoreControllers {
        m.NewBootstrapController(c.EndpointReconcilerConfig).Start()
    }

    return m, nil
}

First, genericapiserver.New(c.Config) is called to create a GenericAPIServer object. Then some "hardcoded storage" is added to the master config, and m.InstallAPIs(c) is called to install the APIs.

Let's look at genericapiserver.New() first:

// New returns a new instance of GenericAPIServer from the given config.
// Certain config fields will be set to a default value if unset,
// including:
//   ServiceClusterIPRange
//   ServiceNodePortRange
//   MasterCount
//   ReadWritePort
//   PublicAddress
// Public fields:
//   Handler -- The returned GenericAPIServer has a field TopHandler which is an
//   http.Handler which handles all the endpoints provided by the GenericAPIServer,
//   including the API, the UI, and miscellaneous debugging endpoints.  All
//   these are subject to authorization and authentication.
//   InsecureHandler -- an http.Handler which handles all the same
//   endpoints as Handler, but no authorization and authentication is done.
// Public methods:
//   HandleWithAuth -- Allows caller to add an http.Handler for an endpoint
//   that uses the same authentication and authorization (if any is configured)
//   as the GenericAPIServer's built-in endpoints.
//   If the caller wants to add additional endpoints not using the GenericAPIServer's
//   auth, then the caller should create a handler for those endpoints, which delegates the
//   any unhandled paths to "Handler".
func New(c *Config) (*GenericAPIServer, error) {
    if c.Serializer == nil {
        return nil, fmt.Errorf("Genericapiserver.New() called with config.Serializer == nil")
    }
    setDefaults(c)

    s := &GenericAPIServer{
        ServiceClusterIPRange: c.ServiceClusterIPRange,
        ServiceNodePortRange:  c.ServiceNodePortRange,
        RootWebService:        new(restful.WebService),
        enableLogsSupport:     c.EnableLogsSupport,
        enableUISupport:       c.EnableUISupport,
        enableSwaggerSupport:  c.EnableSwaggerSupport,
        enableSwaggerUI:       c.EnableSwaggerUI,
        enableProfiling:       c.EnableProfiling,
        enableWatchCache:      c.EnableWatchCache,
        APIPrefix:             c.APIPrefix,
        APIGroupPrefix:        c.APIGroupPrefix,
        corsAllowedOriginList: c.CorsAllowedOriginList,
        authenticator:         c.Authenticator,
        authorizer:            c.Authorizer,
        AdmissionControl:      c.AdmissionControl,
        RequestContextMapper:  c.RequestContextMapper,
        Serializer:            c.Serializer,

        cacheTimeout:      c.CacheTimeout,
        MinRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,

        MasterCount:          c.MasterCount,
        ExternalAddress:      c.ExternalHost,
        ClusterIP:            c.PublicAddress,
        PublicReadWritePort:  c.ReadWritePort,
        ServiceReadWriteIP:   c.ServiceReadWriteIP,
        ServiceReadWritePort: c.ServiceReadWritePort,
        ExtraServicePorts:    c.ExtraServicePorts,
        ExtraEndpointPorts:   c.ExtraEndpointPorts,

        KubernetesServiceNodePort: c.KubernetesServiceNodePort,
        apiGroupsForDiscovery:     map[string]unversioned.APIGroup{},

        enableOpenAPISupport:   c.EnableOpenAPISupport,
        openAPIInfo:            c.OpenAPIInfo,
        openAPIDefaultResponse: c.OpenAPIDefaultResponse,
    }

    // Initialize the GenericAPIServer member variables HandlerContainer and mux
    if c.RestfulContainer != nil {
        s.mux = c.RestfulContainer.ServeMux
        s.HandlerContainer = c.RestfulContainer
    } else {
        mux := http.NewServeMux()
        s.mux = mux
        s.HandlerContainer = NewHandlerContainer(mux, c.Serializer)
    }
    // Use CurlyRouter to be able to use regular expressions in paths. Regular expressions are required in paths for example for proxy (where the path is proxy/{kind}/{name}/{*})
    s.HandlerContainer.Router(restful.CurlyRouter{})
    s.MuxHelper = &apiserver.MuxHelper{Mux: s.mux, RegisteredPaths: []string{}}

    s.init(c)

    return s, nil
}

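New() creates a GenericAPIServer object and returns it. Along the way it initializes the two member variables of the GenericAPIServer struct that are used to build the RESTful routes:

  1. mux: the native request router from the net/http package.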

    ServeMux is an HTTP request multiplexer.
    It matches the URL of each incoming request against a list of registered
    patterns and calls the handler for the pattern that most closely matches the URL.

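  2. HandlerContainer: an object of the Container type from the go-restful package (see article 1 of this series for details). Its route selector is set to CurlyRouter. Every WebService created later is added to this Container.

Stepping out of New(), we continue with the m.InstallAPIs(c) call made while constructing the Master: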

func (m *Master) InstallAPIs(c *Config) {
    apiGroupsInfo := []genericapiserver.APIGroupInfo{}

    // Install v1 unless disabled.
    if c.APIResourceConfigSource.AnyResourcesForVersionEnabled(apiv1.SchemeGroupVersion) {
        // Install v1 API.
        m.initV1ResourcesStorage(c)
        apiGroupInfo := genericapiserver.APIGroupInfo{
            GroupMeta: *registered.GroupOrDie(api.GroupName),
            VersionedResourcesStorageMap: map[string]map[string]rest.Storage{
                "v1": m.v1ResourcesStorage,
            },
            IsLegacyGroup:               true,
            Scheme:                      api.Scheme,
            ParameterCodec:              api.ParameterCodec,
            NegotiatedSerializer:        api.Codecs,
            SubresourceGroupVersionKind: map[string]unversioned.GroupVersionKind{},
        }
        if autoscalingGroupVersion := (unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}); registered.IsEnabledVersion(autoscalingGroupVersion) {
            apiGroupInfo.SubresourceGroupVersionKind["replicationcontrollers/scale"] = autoscalingGroupVersion.WithKind("Scale")
        }
        if policyGroupVersion := (unversioned.GroupVersion{Group: "policy", Version: "v1alpha1"}); registered.IsEnabledVersion(policyGroupVersion) {
            apiGroupInfo.SubresourceGroupVersionKind["pods/eviction"] = policyGroupVersion.WithKind("Eviction")
        }
        apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
    }

    ...

    // Install the APIs
    if err := m.InstallAPIGroups(apiGroupsInfo); err != nil {
        glog.Fatalf("Error in registering group versions: %v", err)
    }
}

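InstallAPIs() performs the actual API installation; only part of the code is shown here. The two methods to focus on are m.initV1ResourcesStorage(c) and m.InstallAPIGroups(apiGroupsInfo).

m.initV1ResourcesStorage(c) initializes the backend storage for each resource and saves it in the Master member variable v1ResourcesStorage. Each of these backend storages corresponds to a RESTful route; the routes just have not been installed yet.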

func (m *Master) initV1ResourcesStorage(c *Config) {
    restOptions := func(resource string) generic.RESTOptions {
        return m.GetRESTOptionsOrDie(c, api.Resource(resource))
    }

    // Create the backend storage for each resource
    podTemplateStorage := podtemplateetcd.NewREST(restOptions("podTemplates"))

    eventStorage := eventetcd.NewREST(restOptions("events"), uint64(c.EventTTL.Seconds()))
    limitRangeStorage := limitrangeetcd.NewREST(restOptions("limitRanges"))

    resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewREST(restOptions("resourceQuotas"))
    secretStorage := secretetcd.NewREST(restOptions("secrets"))
    serviceAccountStorage := serviceaccountetcd.NewREST(restOptions("serviceAccounts"))
    persistentVolumeStorage, persistentVolumeStatusStorage := pvetcd.NewREST(restOptions("persistentVolumes"))
    persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcetcd.NewREST(restOptions("persistentVolumeClaims"))
    configMapStorage := configmapetcd.NewREST(restOptions("configMaps"))

    namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(restOptions("namespaces"))
    m.namespaceRegistry = namespace.NewRegistry(namespaceStorage)

    endpointsStorage := endpointsetcd.NewREST(restOptions("endpoints"))
    m.endpointRegistry = endpoint.NewRegistry(endpointsStorage)

    nodeStorage := nodeetcd.NewStorage(restOptions("nodes"), c.KubeletClient, m.ProxyTransport)
    m.nodeRegistry = node.NewRegistry(nodeStorage.Node)

    podStorage := podetcd.NewStorage(
        restOptions("pods"),
        kubeletclient.ConnectionInfoGetter(nodeStorage.Node),
        m.ProxyTransport,
    )

    serviceRESTStorage, serviceStatusStorage := serviceetcd.NewREST(restOptions("services"))
    m.serviceRegistry = service.NewRegistry(serviceRESTStorage)

    var serviceClusterIPRegistry rangeallocation.RangeRegistry
    serviceClusterIPRange := m.ServiceClusterIPRange
    if serviceClusterIPRange == nil {
        glog.Fatalf("service clusterIPRange is nil")
        return
    }

    serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services"))
    if err != nil {
        glog.Fatal(err.Error())
    }

    serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface {
        mem := allocator.NewAllocationMap(max, rangeSpec)
        // TODO etcdallocator package to return a storage interface via the storageFactory
        etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorageConfig)
        serviceClusterIPRegistry = etcd
        return etcd
    })
    m.serviceClusterIPAllocator = serviceClusterIPRegistry

    var serviceNodePortRegistry rangeallocation.RangeRegistry
    serviceNodePortAllocator := portallocator.NewPortAllocatorCustom(m.ServiceNodePortRange, func(max int, rangeSpec string) allocator.Interface {
        mem := allocator.NewAllocationMap(max, rangeSpec)
        // TODO etcdallocator package to return a storage interface via the storageFactory
        etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorageConfig)
        serviceNodePortRegistry = etcd
        return etcd
    })
    m.serviceNodePortAllocator = serviceNodePortRegistry

    controllerStorage := controlleretcd.NewStorage(restOptions("replicationControllers"))

    serviceRest := service.NewStorage(m.serviceRegistry, m.endpointRegistry, serviceClusterIPAllocator, serviceNodePortAllocator, m.ProxyTransport)

    // Store the (resource: storage) mapping in v1ResourcesStorage
    // TODO: Factor out the core API registration
    m.v1ResourcesStorage = map[string]rest.Storage{
        "pods":             podStorage.Pod,
        "pods/attach":      podStorage.Attach,
        "pods/status":      podStorage.Status,
        "pods/log":         podStorage.Log,
        "pods/exec":        podStorage.Exec,
        "pods/portforward": podStorage.PortForward,
        "pods/proxy":       podStorage.Proxy,
        "pods/binding":     podStorage.Binding,
        "bindings":         podStorage.Binding,

        "podTemplates": podTemplateStorage,

        "replicationControllers":        controllerStorage.Controller,
        "replicationControllers/status": controllerStorage.Status,

        "services":        serviceRest.Service,
        "services/proxy":  serviceRest.Proxy,
        "services/status": serviceStatusStorage,

        "endpoints": endpointsStorage,

        "nodes":        nodeStorage.Node,
        "nodes/status": nodeStorage.Status,
        "nodes/proxy":  nodeStorage.Proxy,

        "events": eventStorage,

        "limitRanges":                   limitRangeStorage,
        "resourceQuotas":                resourceQuotaStorage,
        "resourceQuotas/status":         resourceQuotaStatusStorage,
        "namespaces":                    namespaceStorage,
        "namespaces/status":             namespaceStatusStorage,
        "namespaces/finalize":           namespaceFinalizeStorage,
        "secrets":                       secretStorage,
        "serviceAccounts":               serviceAccountStorage,
        "persistentVolumes":             persistentVolumeStorage,
        "persistentVolumes/status":      persistentVolumeStatusStorage,
        "persistentVolumeClaims":        persistentVolumeClaimStorage,
        "persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
        "configMaps":                    configMapStorage,

        "componentStatuses": componentstatus.NewStorage(func() map[string]apiserver.Server { return m.getServersToValidate(c) }),
    }
    if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}) {
        m.v1ResourcesStorage["replicationControllers/scale"] = controllerStorage.Scale
    }
    if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "policy", Version: "v1alpha1"}) {
        m.v1ResourcesStorage["pods/eviction"] = podStorage.Eviction
    }
}

Taking pod as an example, the pod backend storage podStorage is constructed first:

podStorage := podetcd.NewStorage(
        restOptions("pods"),
        kubeletclient.ConnectionInfoGetter(nodeStorage.Node),
        m.ProxyTransport,
    )

Then the backend storage for pod and its subresources is registered in the member variable v1ResourcesStorage:

m.v1ResourcesStorage = map[string]rest.Storage{
        "pods":             podStorage.Pod,
        "pods/attach":      podStorage.Attach,
        "pods/status":      podStorage.Status,
        "pods/log":         podStorage.Log,
        "pods/exec":        podStorage.Exec,
        "pods/portforward": podStorage.PortForward,
        "pods/proxy":       podStorage.Proxy,
        "pods/binding":     podStorage.Binding,
        "bindings":         podStorage.Binding,
        ...
}

So where are the APIs actually installed? Let's look at m.InstallAPIGroups(apiGroupsInfo):

// Exposes the given group versions in API. Helper method to install multiple group versions at once.
func (s *GenericAPIServer) InstallAPIGroups(groupsInfo []APIGroupInfo) error {
    for _, apiGroupInfo := range groupsInfo {
        if err := s.InstallAPIGroup(&apiGroupInfo); err != nil {
            return err
        }
    }
    return nil
}

InstallAPIGroups() is just a wrapper that calls InstallAPIGroup() once per group. Continuing into InstallAPIGroup():

// Exposes the given group version in API.
func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
    apiPrefix := s.APIGroupPrefix
    if apiGroupInfo.IsLegacyGroup {
        apiPrefix = s.APIPrefix
    }

    // Install REST handlers for all the versions in this group.
    apiVersions := []string{}
    for _, groupVersion := range apiGroupInfo.GroupMeta.GroupVersions {
        apiVersions = append(apiVersions, groupVersion.Version)

        apiGroupVersion, err := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
        if err != nil {
            return err
        }
        if apiGroupInfo.OptionsExternalVersion != nil {
            apiGroupVersion.OptionsExternalVersion = apiGroupInfo.OptionsExternalVersion
        }

        if err := apiGroupVersion.InstallREST(s.HandlerContainer); err != nil {
            return fmt.Errorf("Unable to setup API %v: %v", apiGroupInfo, err)
        }
    }

    // Install the version handler.
    ...

    apiserver.InstallServiceErrorHandler(s.Serializer, s.HandlerContainer, s.NewRequestInfoResolver(), apiVersions)
    return nil
}

getAPIGroupVersion() is called to assemble an APIGroupVersion object. APIGroupVersion converts the storage configuration (rest.Storage) into RESTful backend handlers (Restful Handlers).

APIGroupVersion is a helper for exposing rest.Storage objects as http.Handlers via go-restful

Next, the InstallREST() method of APIGroupVersion is called to install the APIs:

// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
// in a slash.
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
    installer := g.newInstaller()
    // Create a new WebService for the resources
    ws := installer.NewWebService()
    // Install the Routes
    apiResources, registrationErrors := installer.Install(ws)
    lister := g.ResourceLister
    if lister == nil {
        lister = staticLister{apiResources}
    }
    AddSupportedResourcesWebService(g.Serializer, ws, g.GroupVersion, lister)
    // Add the WebService to the Container
    container.Add(ws)
    return utilerrors.NewAggregate(registrationErrors)
}

The logic of InstallREST() is simple: it creates a WebService for the resources, calls installer.Install(ws) to add the routes for each resource to that WebService, and finally adds the WebService to the Container.

installer.Install(ws) calls APIInstaller.registerResourceHandlers() to install the routes. registerResourceHandlers() is around 500 lines long, so only the key logic is shown here:

func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService, proxyHandler http.Handler) (*unversioned.APIResource, error) {

    ...

    // what verbs are supported by the storage, used to know what verbs we support per path
    creater, isCreater := storage.(rest.Creater)
    namedCreater, isNamedCreater := storage.(rest.NamedCreater)
    lister, isLister := storage.(rest.Lister)
    getter, isGetter := storage.(rest.Getter)
    getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
    deleter, isDeleter := storage.(rest.Deleter)
    gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
    collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
    updater, isUpdater := storage.(rest.Updater)
    patcher, isPatcher := storage.(rest.Patcher)
    watcher, isWatcher := storage.(rest.Watcher)
    _, isRedirector := storage.(rest.Redirector)
    connecter, isConnecter := storage.(rest.Connecter)
    storageMeta, isMetadata := storage.(rest.StorageMetadata)
    if !isMetadata {
        storageMeta = defaultStorageMetadata{}
    }
    exporter, isExporter := storage.(rest.Exporter)
    if !isExporter {
        exporter = nil
    }

    versionedExportOptions, err := a.group.Creater.New(optionsExternalVersion.WithKind("ExportOptions"))
    if err != nil {
        return nil, err
    }

    if isNamedCreater {
        isCreater = true
    }

    ...

    var apiResource unversioned.APIResource
    // Get the list of actions for the given scope.
    switch scope.Name() {
    case meta.RESTScopeNameRoot:
        // Assemble the actions

        ...

        break
    case meta.RESTScopeNameNamespace:
        // Handler for standard REST verbs (GET, PUT, POST and DELETE).
        // Assemble the actions

        ...

        break
    default:
        return nil, fmt.Errorf("unsupported restscope: %s", scope.Name())
    }

    // Create Routes for the actions.
    // TODO: Add status documentation using Returns()
    // Errors (see api/errors/errors.go as well as go-restful router):
    // http.StatusNotFound, http.StatusMethodNotAllowed,
    // http.StatusUnsupportedMediaType, http.StatusNotAcceptable,
    // http.StatusBadRequest, http.StatusUnauthorized, http.StatusForbidden,
    // http.StatusRequestTimeout, http.StatusConflict, http.StatusPreconditionFailed,
    // 422 (StatusUnprocessableEntity), http.StatusInternalServerError,
    // http.StatusServiceUnavailable
    // and api error codes
    // Note that if we specify a versioned Status object here, we may need to
    // create one for the tests, also
    // Success:
    // http.StatusOK, http.StatusCreated, http.StatusAccepted, http.StatusNoContent
    //
    // test/integration/auth_test.go is currently the most comprehensive status code test

    reqScope := RequestScope{
        ContextFunc:    ctxFn,
        Serializer:     a.group.Serializer,
        ParameterCodec: a.group.ParameterCodec,
        Creater:        a.group.Creater,
        Convertor:      a.group.Convertor,
        Copier:         a.group.Copier,

        // TODO: This seems wrong for cross-group subresources. It makes an assumption that a subresource and its parent are in the same group version. Revisit this.
        Resource:    a.group.GroupVersion.WithResource(resource),
        Subresource: subresource,
        Kind:        fqKindToRegister,
    }
    for _, action := range actions {
        reqScope.Namer = action.Namer
        namespaced := ""
        if apiResource.Namespaced {
            namespaced = "Namespaced"
        }
        switch action.Verb {
        case "GET": // Get a resource.
            var handler restful.RouteFunction
            // Create the handler
            if isGetterWithOptions {
                handler = GetResourceWithOptions(getterWithOptions, reqScope)
            } else {
                handler = GetResource(getter, exporter, reqScope)
            }
            handler = metrics.InstrumentRouteFunc(action.Verb, resource, handler)
            doc := "read the specified " + kind
            if hasSubresource {
                doc = "read " + subresource + " of the specified " + kind
            }

            // Set the route attributes: Path, Handler, Doc, Param, MIME types and so on
            route := ws.GET(action.Path).To(handler).
                Doc(doc).
                Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
                Operation("read"+namespaced+kind+strings.Title(subresource)).
                Produces(append(storageMeta.ProducesMIMETypes(action.Verb), a.group.Serializer.SupportedMediaTypes()...)...).
                Returns(http.StatusOK, "OK", versionedObject).
                Writes(versionedObject)

            ...

            // Set the route parameters
            addParams(route, action.Params)
            // Add the route to the WebService
            ws.Route(route)
        case "LIST": // List all resources of a kind.

            ...

            ws.Route(route)
        case "PUT": // Update a resource.

            ...

            ws.Route(route)
        case "POST": // Create a resource.

            ...

            ws.Route(route)
        case "DELETE": // Delete a resource.

            ...

            ws.Route(route)
        // TODO: deprecated
        case "WATCHLIST": // Watch all resources of a kind.

            ...

            ws.Route(route)
        // We add "proxy" subresource to remove the need for the generic top level prefix proxy.
        // The generic top level prefix proxy is deprecated in v1.2, and will be removed in 1.3, or 1.4 at the latest.
        // TODO: DEPRECATED in v1.2.
        case "PROXY": // Proxy requests to a resource.
            // Accept all methods as per http://issue.k8s.io/3996
            addProxyRoute(ws, "GET", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params)
            addProxyRoute(ws, "PUT", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params)
            addProxyRoute(ws, "POST", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params)
            addProxyRoute(ws, "DELETE", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params)
            addProxyRoute(ws, "HEAD", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params)
            addProxyRoute(ws, "OPTIONS", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params)
        case "CONNECT":
            for _, method := range connecter.ConnectMethods() {

                ...

                ws.Route(route)
            }
        default:
            return nil, fmt.Errorf("unrecognized action verb: %s", action.Verb)
        }
        // Note: update GetAttribs() when adding a custom handler.
    }
    return &apiResource, nil
}

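The code first inspects the resource's backend storage (rest.Storage), initializes the actions according to the methods the storage supports, and then builds a Route for each action. The GET method serves as the example; see the code comments.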
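The capability probing described above relies on Go type assertions against small optional interfaces. The following is a minimal sketch of that idea using only the standard library; the Storage, Getter, Lister, Creater and Deleter interfaces and the podStore type are simplified, hypothetical stand-ins, not the real rest.* types from Kubernetes.

```go
package main

import "fmt"

// Storage is a stand-in for rest.Storage: the one interface every backend satisfies.
type Storage interface {
	New() interface{}
}

// Optional capability interfaces, loosely modelled on rest.Getter, rest.Lister,
// rest.Creater and rest.Deleter. Their method sets here are assumptions.
type Getter interface {
	Get(name string) (interface{}, error)
}
type Lister interface {
	List() ([]interface{}, error)
}
type Creater interface {
	Create(obj interface{}) error
}
type Deleter interface {
	Delete(name string) error
}

// podStore is a hypothetical backend that supports only Get and List.
type podStore struct{}

func (podStore) New() interface{}                     { return struct{}{} }
func (podStore) Get(name string) (interface{}, error) { return name, nil }
func (podStore) List() ([]interface{}, error)         { return nil, nil }

// supportedVerbs probes the storage with type assertions and returns
// the verbs for which a route could be registered.
func supportedVerbs(s Storage) []string {
	var verbs []string
	if _, ok := s.(Getter); ok {
		verbs = append(verbs, "GET")
	}
	if _, ok := s.(Lister); ok {
		verbs = append(verbs, "LIST")
	}
	if _, ok := s.(Creater); ok {
		verbs = append(verbs, "POST")
	}
	if _, ok := s.(Deleter); ok {
		verbs = append(verbs, "DELETE")
	}
	return verbs
}

// describe formats the result for printing.
func describe(s Storage) string {
	return fmt.Sprintf("%T supports %v", s, supportedVerbs(s))
}

func main() {
	fmt.Println(describe(podStore{}))
}
```

Because podStore implements neither Create nor Delete, only GET and LIST are reported, which is how a storage object ends up with routes for just the verbs it can serve.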

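At this point, all the components involved in route construction (Container, WebService, Route and so on) have been initialized.

Now back to the Master.Run() method, which is really the GenericAPIServer.Run() method: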

func (s *GenericAPIServer) Run(options *options.ServerRunOptions) {

    ...

    if secureLocation != "" {
        handler := apiserver.TimeoutHandler(apiserver.RecoverPanics(s.Handler), longRunningTimeout)
        secureServer := &http.Server{
            Addr:           secureLocation,
            Handler:        apiserver.MaxInFlightLimit(sem, longRunningRequestCheck, handler),
            MaxHeaderBytes: 1 << 20,
            TLSConfig: &tls.Config{
                // Can't use SSLv3 because of POODLE and BEAST
                // Can't use TLSv1.0 because of POODLE and BEAST using CBC cipher
                // Can't use TLSv1.1 because of RC4 cipher usage
                MinVersion: tls.VersionTLS12,
            },
        }

        ...

        go func() {
            defer utilruntime.HandleCrash()
            for {
                // err == systemd.SdNotifyNoSocket when not running on a systemd system
                if err := systemd.SdNotify("READY=1\n"); err != nil && err != systemd.SdNotifyNoSocket {
                    glog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
                }
                if err := secureServer.ListenAndServeTLS(options.TLSCertFile, options.TLSPrivateKeyFile); err != nil {
                    glog.Errorf("Unable to listen for secure (%v); will try again.", err)
                }
                time.Sleep(15 * time.Second)
            }
        }()
    } else {
        // err == systemd.SdNotifyNoSocket when not running on a systemd system
        if err := systemd.SdNotify("READY=1\n"); err != nil && err != systemd.SdNotifyNoSocket {
            glog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
        }
    }

    handler := apiserver.TimeoutHandler(apiserver.RecoverPanics(s.InsecureHandler), longRunningTimeout)
    http := &http.Server{
        Addr:           insecureLocation,
        Handler:        handler,
        MaxHeaderBytes: 1 << 20,
    }

    glog.Infof("Serving insecurely on %s", insecureLocation)
    go func() {
        defer utilruntime.HandleCrash()
        for {
            if err := http.ListenAndServe(); err != nil {
                glog.Errorf("Unable to listen for insecure (%v); will try again.", err)
            }
            time.Sleep(15 * time.Second)
        }
    }()
    select {}
}

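As the code shows, Run() uses ListenAndServe() (ListenAndServeTLS() for the secure port) to listen for requests on two ports: the localhost port and the secure port.

With that, the apiserver's RESTful service is fully built and running.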

Pattern 1: register handler functions using the default router:
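A minimal sketch of this pattern, assuming a made-up /ping endpoint: http.HandleFunc registers the function on http.DefaultServeMux, which is the router a server uses when it is started with a nil handler. The request is driven through httptest here so that the example runs without opening a port.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

// registerOnce guards the registration, because http.HandleFunc panics
// when the same pattern is registered twice.
var registerOnce sync.Once

// registerDefault registers a handler function on http.DefaultServeMux.
func registerDefault() {
	registerOnce.Do(func() {
		http.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
			fmt.Fprint(w, "pong")
		})
	})
}

// callDefault sends a GET for path through the default router and returns the body.
func callDefault(path string) string {
	rec := httptest.NewRecorder()
	http.DefaultServeMux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Body.String()
}

func main() {
	registerDefault()
	fmt.Println(callDefault("/ping"))
	// A real program would instead start a server with a nil handler,
	// for example http.ListenAndServe(":8080", nil), which selects DefaultServeMux.
}
```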

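Route information is built through RouteBuilder; Path is the combination of rootPath and subPath. Function is the route's handler, that is, the processing function, and it is attached with ws.GET(subPath).To(function). Filters implement something close to gRPC interceptors, and also resemble the chain in go-chassis.

The Get, Head, Post and PostForm functions issue HTTP/HTTPS requests.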
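As an illustration of these client helpers, the sketch below issues one http.Get and one http.PostForm against a throwaway local server from net/http/httptest. The echo handler, the "name" parameter and its values are made up for the example.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/url"
)

// echoServer starts a local test server that reports the request method
// and the value of the "name" form field.
func echoServer() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "%s name=%s", r.Method, r.FormValue("name"))
	}))
}

// readBody drains and closes a response body, returning it as a string.
func readBody(resp *http.Response) (string, error) {
	defer resp.Body.Close()
	b, err := io.ReadAll(resp.Body)
	return string(b), err
}

// getAndPost issues one GET and one form POST against base and returns both bodies.
func getAndPost(base string) (string, string, error) {
	resp, err := http.Get(base + "/?name=alice")
	if err != nil {
		return "", "", err
	}
	got, err := readBody(resp)
	if err != nil {
		return "", "", err
	}
	resp, err = http.PostForm(base, url.Values{"name": {"bob"}})
	if err != nil {
		return "", "", err
	}
	posted, err := readBody(resp)
	return got, posted, err
}

func main() {
	srv := echoServer()
	defer srv.Close()
	fmt.Println(getAndPost(srv.URL))
}
```

Both helpers use http.DefaultClient under the hood; the caller is responsible for closing the response body, which readBody does here.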

go-restful

 

dispatch

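I. The three main types of the http package:
Handler interface: every request handler, as well as the router ServeMux, satisfies this interface;

When a WebService is added to a Container, the internally maintained webServices must not contain a duplicate RootPath.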

type HandlerFunc func(ResponseWriter, *Request)

// ServeHTTP implements the Handler interface.
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
    f(w, r) // call the function itself
}

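Why design a RESTful API? My personal understanding is this: HTTP methods give data operations one uniform interface, and URLs are constrained to name resources, so every request maps to some operation on some resource. This stateless design decouples client and server and lets both ends of the system scale horizontally.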

 

type Route struct {
  Method  string
  Produces []string
  Consumes []string
  Path   string // webservice root path + described path
  Function RouteFunction
  Filters []FilterFunction
  If    []RouteSelectionConditionFunction
  // cached values for dispatching
  relativePath string
  pathParts  []string
  pathExpr   *pathExpression
  // documentation
  Doc           string
  Notes          string
  Operation        string
  ParameterDocs      []*Parameter
  ResponseErrors     map[int]ResponseError
  ReadSample, WriteSample interface{} 
  Metadata map[string]interface{}
  Deprecated bool
}

func (c *Container) addHandler(service *WebService, serveMux *http.ServeMux) bool {
  pattern := fixedPrefixPath(service.RootPath())
  // Register dispatch as the handler for this WebService's root path.
  serveMux.HandleFunc(pattern, c.dispatch)
  ...
}

http.HandlerFunc function type: it satisfies the Handler interface

go-chassis

  

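REST (Representational State Transfer) is a widely used way for distributed nodes to communicate with one another. The REST principles describe one mode of client-server interaction on the network: URLs locate resources, and HTTP methods describe the operations on them. A network interface between client and server that follows the REST style is called a RESTful API. The following are the REST principles summarized in the article "Understanding RESTful Architecture" (理解RESTful架构):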

 

  1. The use of reflect in route registration; reflection and performance
  2. Route selection involves fuzzy matching; how is processing speed ensured?
  3. Parsing of pathParams
func (srv *Server) ListenAndServe() error {
    addr := srv.Addr
    if addr == "" {
        addr = ":http"
    }
    ln, err := net.Listen("tcp", addr)
    if err != nil {
        return err
    }
    return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})
}

It is actually fairly simple, so I won't write it up here. I'm too sleepy today.

 

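  1. Route represents a single route, consisting of the URL, HTTP method, input/output types and the callback RouteFunction
  2. WebService represents a service made up of multiple Routes that share the same root path
  3. Container represents a server, made up of multiple WebServices and one http.ServeMux; it dispatches using a RouteSelector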
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
    if r.RequestURI == "*" {
        if r.ProtoAtLeast(1, 1) {
            w.Header().Set("Connection", "close")
        }
        w.WriteHeader(StatusBadRequest)
        return
    }
    h, _ := mux.Handler(r) // normalize the request path and look up the best-matching Handler
    h.ServeHTTP(w, r)
}

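What gets added to the container and registered on the mux is the dispatch function, which is responsible for dispatching requests according to the rootPath of each WebService.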

  


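Next, we trace the source code to analyze the whole execution flow in detail.

go-restful is a package for building REST-style Web Services using Google Go. go-restful defines three key data structures: Container, WebService and Route.

  • Because TCPListener implements the Listener interface, tcpKeepAliveListener implements Listener as well. It overrides Accept() in order to call SetKeepAlive(true), which makes the operating system send keepalive messages (heartbeats that keep the connection from being dropped) on every accepted connection.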

    type tcpKeepAliveListener struct {
        *net.TCPListener
    }

    func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
        tc, err := ln.AcceptTCP()
        if err != nil {
            return
        }
        tc.SetKeepAlive(true)                  // enable heartbeats
        tc.SetKeepAlivePeriod(3 * time.Minute) // heartbeat period
        return tc, nil
    }

container

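5. The conn.serve() method reads the request, builds a serverHandler from the server stored in the conn, and calls its ServeHTTP() method: serverHandler{c.server}.ServeHTTP(w, w.req). The source of that method is as follows:

Routes registered through Route end up as Route structs, which are appended to the WebService's routes.

ServeMux struct: the HTTP request multiplexer (router). It matches the URL of each incoming request against a list of registered patterns and calls the handler of the pattern that most closely matches the URL. Internally it keeps all Handlers in a map.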
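The "most closely matches" rule can be observed directly. In the sketch below, three illustrative patterns are registered and each handler simply writes back its own pattern, so the response body shows which pattern won for a given request path.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// newMux registers three patterns; each handler writes its own pattern as the body.
func newMux() *http.ServeMux {
	mux := http.NewServeMux()
	for _, pattern := range []string{"/", "/api/", "/api/pods"} {
		p := pattern // per-iteration copy for the closure
		mux.HandleFunc(p, func(w http.ResponseWriter, r *http.Request) {
			fmt.Fprint(w, p)
		})
	}
	return mux
}

// matched returns the pattern whose handler served a GET for path.
func matched(mux *http.ServeMux, path string) string {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Body.String()
}

func main() {
	mux := newMux()
	for _, path := range []string{"/api/pods", "/api/nodes", "/healthz"} {
		fmt.Printf("%s -> %s\n", path, matched(mux, path))
	}
}
```

A pattern ending in a slash matches the whole subtree beneath it, a pattern without one matches only that exact path, and "/" catches everything that nothing more specific claims.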

func (c *Container)dispatch(httpWriter http.ResponseWriter, httpRequest *http.Request) {
  func() {
    c.webServicesLock.RLock()
    defer c.webServicesLock.RUnlock()
    webService, route, err = c.router.SelectRoute(
      c.webServices,
      httpRequest)
  }()

  pathProcessor, routerProcessesPath := c.router.(PathProcessor)
  pathParams := pathProcessor.ExtractParameters(route, webService, httpRequest.URL.Path)
  wrappedRequest, wrappedResponse := route.wrapRequestResponse(writer,
  httpRequest, pathParams)

  if len(c.containerFilters)+len(webService.filters)+len(route.Filters) > 0 {
    chain := FilterChain{Filters: allFilters, Target: func(req *Request, resp *Response) {
      // handle request by route after passing all filters
      route.Function(wrappedRequest, wrappedResponse)
    }}
    chain.ProcessFilter(wrappedRequest, wrappedResponse)
  } else {
    route.Function(wrappedRequest, wrappedResponse)
  }
}

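7. The ServeHTTP method of the router ServeMux then uses the information in the current request to find the best-matching Handler:

  1. SelectRoute uses the request to pick the matching WebService and the matching Route from the registered WebServices. The default route selector is CurlyRouter.
  2. pathParams are parsed, and the wrapped request and response are handed to the route's handler function. If filters are defined, they are processed as a chain first.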
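The chained filter processing in step 2 can be sketched as follows. The Filter type and the chain function are simplified stand-ins for go-restful's FilterFunction and FilterChain, not their real signatures; the filter names merely mark the container, service and route levels.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Filter receives the request plus a next function that continues the chain.
type Filter func(w http.ResponseWriter, r *http.Request, next http.HandlerFunc)

// chain wraps target so that the filters run in order before it.
func chain(filters []Filter, target http.HandlerFunc) http.HandlerFunc {
	if len(filters) == 0 {
		return target
	}
	return func(w http.ResponseWriter, r *http.Request) {
		filters[0](w, r, chain(filters[1:], target))
	}
}

// tag returns a filter that writes its name before and after the rest of the chain.
func tag(name string) Filter {
	return func(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) {
		fmt.Fprintf(w, "<%s>", name)
		next(w, r)
		fmt.Fprintf(w, "</%s>", name)
	}
}

// run sends one request through container, service and route level filters.
func run() string {
	all := []Filter{tag("container"), tag("service"), tag("route")}
	h := chain(all, func(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, "handler") })
	rec := httptest.NewRecorder()
	h(rec, httptest.NewRequest(http.MethodGet, "/", nil))
	return rec.Body.String()
}

func main() {
	fmt.Println(run())
}
```

Each filter decides whether to call next, so a filter can short-circuit the request (for example on failed authentication) or run code on both sides of the route handler.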

 

webservice

 

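The REST server implemented by go-chassis is a thin wrapper over go-restful. On Register, the registered schema is first parsed into routes, which are registered in the webService. On Start, container.Add(r.ws) is called, the container is handed to http.Server as its handler, and finally ListenAndServe is started.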

 

type restfulServer struct {
  microServiceName string
  container    *restful.Container
  ws        *restful.WebService
  opts       server.Options
  mux       sync.RWMutex
  exit       chan chan error
  server      *http.Server
}