最近一直在做类似skydns的dns服务发现相关的研究和代码实现,也就有了兴趣想解读下kube-dns的实现过程,学习和参考下社区的做法。

源码结构

如何从头开始阅读一份开源项目的源码呢?一般来说,我们会从执行入口顺腾摸瓜地依次八下去,这里强烈推荐难易大佬的k8s源码阅读笔记,条理非常清晰,解读的方式也很值得学习,我们这里不妨也参考难易的方法进行kube-dns代码的阅读。

我们首先需要从github仓库clone下来kube-dns的源代码。不难看到其代码入口cmd目录下除开测试组件,主要是dnsmasq、dns、sidecar这三块组件的代码:

kube-dns-dir

OK,由此我们产生了第一个也是最核心的问题:

Q1: kube-dns、dnsmasq-nanny、sidecar分别实现了什么功能?

我们不妨先从kube-dns这块开始,接着往下读。

kube-dns

我们可以看到kube-dns的入口代码dns.go,这里面的main函数具体内容如下:

...

func main() {
	config := options.NewKubeDNSConfig()
	config.AddFlags(pflag.CommandLine)

	flag.InitFlags()
    ...

	server := app.NewKubeDNSServerDefault(config)
	server.Run()
}

很显然,初始化kube-dns的配置然后根据命令行参数重载,并随后通过NewKubeDNSServerDefault方法拿到kube-dns server的实例,并运行它的Run方法。在这部分的阅读过程中,我们也产生了如下的疑问:

Q2: NewKubeDNSConfig方法里有哪些具体可配置的参数?

Q3: NewKubeDNSServerDefault方法做了哪些事情?

戳进去NewKubeDNSConfig方法的具体代码位置

func NewKubeDNSConfig() *KubeDNSConfig {
	return &KubeDNSConfig{
    	// NOTE(Colstuwjx): 集群域名的后缀,所有这类域名解析均由kube-dns管理和解析
		ClusterDomain:      "cluster.local.",
		HealthzPort:        8081,
		DNSBindAddress:     "0.0.0.0",
		DNSPort:            53,
        
        // ?
		InitialSyncTimeout: 60 * time.Second,

		Federations: make(map[string]string),

		ConfigMapNs: api.NamespaceSystem,
		ConfigMap:   "", // default to using command line flags

		ConfigPeriod: 10 * time.Second,
		ConfigDir:    "",

		NameServers: "",
	}
}

鄙人不禁又产生了新的疑问:

Q4: 为什么还有ConfigMap相关的配置以及同步参数?

Q5: NameServers的配置会影响什么?

OK,没关系,先放着这些问题,接着来看下NewKubeDNSServerDefault方法都做了啥:

func NewKubeDNSServerDefault(config *options.KubeDNSConfig) *KubeDNSServer {
    ...

	var configSync dnsconfig.Sync
	switch {
	case config.ConfigMap != "" && config.ConfigDir != "":
    ...
    }
    
	return &KubeDNSServer{
		domain:         config.ClusterDomain,
		healthzPort:    config.HealthzPort,
		dnsBindAddress: config.DNSBindAddress,
		dnsPort:        config.DNSPort,
		nameServers:    config.NameServers,
		kd:             dns.NewKubeDNS(kubeClient, config.ClusterDomain, config.InitialSyncTimeout, configSync),
	}
}

啊哈,前面的问题似乎有些眉目了。

为什么还有ConfigMap相关的配置以及同步参数?

A4: 原来如此,前面ConfigMap的配置是用来做kube-dns本身配置的动态同步

那这具体执行的NewKubeDNS又是如何实现的呢?看看代码

	kd := &KubeDNS{
		kubeClient:          client,
		domain:              clusterDomain,
		cache:               treecache.NewTreeCache(),
		cacheLock:           sync.RWMutex{},
		nodesStore:          kcache.NewStore(kcache.MetaNamespaceKeyFunc),
		reverseRecordMap:    make(map[string]*skymsg.Service),
		clusterIPServiceMap: make(map[string]*v1.Service),
		domainPath:          util.ReverseArray(strings.Split(strings.TrimRight(clusterDomain, "."), ".")),
		initialSyncTimeout:  timeout,

		configLock: sync.RWMutex{},
		configSync: configSync,
	}

	kd.setEndpointsStore()
	kd.setServicesStore()

咦,这里又不禁冒出来几个问题:

Q6: 这里的cache是做什么的?

Q7: 这里的setEndpointsStoresetServicesStore做了啥?

我们继续看下各自的实现: TreeCachesetEndpointsStoresetServicesStore,不难得出Q6和Q7的答案:

这里的cache是做什么的?

A6: 这里的cache通过一个公共的TreeCache结构维护,应该是充当本地cache的角色

这里的setEndpointsStoresetServicesStore做了啥?

A7: 这里的两个setXXXStore设置了对所有namespace的servicesendpoints的list watch句柄,订阅相应的变化,并实现了对应的CURD callback:handleEndpointAddhandleEndpointUpdatehandleEndpointDelete,Q6的本地cache也即是为了这些数据的本地缓存服务(包括上锁等)

OK,回过头来,读完了NewKubeDNSServerDefault大致的调用过程,我们也就知道了:

NewKubeDNSConfig方法里有哪些具体可配置的参数?

A2: 包括了集群域名、健康检查配置、DNS监听地址及端口、ConfigMap动态同步的参数、配置文件目录及upstream nameserver配置等

NewKubeDNSServerDefault方法做了哪些事情?

A3: NewKubeDNSServerDefault初始化了dns配置的同步方式,通过执行NewKubeDNS实例化KubeDNS对象,包括本地cache的初始化等,最后返回一个KubeDNSServer的实例对象

那么,Run方法具体又执行了哪些逻辑呢?

func (server *KubeDNSServer) Run() {
	...
	setupSignalHandlers()
    
	server.startSkyDNSServer()
	server.kd.Start()
	server.setupHandlers()

	glog.V(0).Infof("Status HTTP port %v", server.healthzPort)
	if server.nameServers != "" {
		glog.V(0).Infof("Upstream nameservers: %s", server.nameServers)
	}
	glog.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", server.healthzPort), nil))
}

从上面代码不难看出Run方法实现了如下逻辑:

  • setupSignalHandlers注册了收到信号量时的处理句柄;
  • startSkyDNSServer则应该是通过现有配置启动skydns服务,实现DNS解析的功能;
  • kd.Start则是执行了kube-dns自身控制器的启动,继续跟踪到kube-dns pkg里的实现部分,不难理解,这里即是启动了对servicesendpoints变更监听及处理的controller,它们将会把变更的部分同步到本地cache,此外,还将执行configMap的同步routine,实现上面提到的kube-dns配置的动态同步和加载;
  • setupHandlers即启动了/readiness/cache的endpoint,并监听在healthcheck配置的端口;

我们不妨再深入看看startSkyDNSServer具体做了哪些事情:

func (d *KubeDNSServer) startSkyDNSServer() {
	...
	skydnsConfig := &server.Config{
		Domain:  d.domain,
		DnsAddr: fmt.Sprintf("%s:%d", d.dnsBindAddress, d.dnsPort),
	}
	if d.nameServers != "" {
		for _, nameServer := range strings.Split(d.nameServers, ",") {
			server, err := validateNameServer(nameServer)
			if err != nil {
				glog.Fatalf("nameserver '%s' is invalid: %v\n", nameServer, err)
			}
			skydnsConfig.Nameservers = append(skydnsConfig.Nameservers, server)
		}
	}
	server.SetDefaults(skydnsConfig)
	s := server.New(d.kd, skydnsConfig)
	if err := metrics.Metrics(); err != nil {
		glog.Fatalf("Skydns metrics error: %s", err)
	} else if metrics.Port != "" {
		glog.V(0).Infof("Skydns metrics enabled (%v:%v)", metrics.Path, metrics.Port)
	} else {
		glog.V(0).Infof("Skydns metrics not enabled")
	}

	go s.Run()
}

还是蛮清晰的,这里根据kube-dns的配置生成skydnsConfig,并随后根据之前配置的nameserver配置skydns server启动时的upstream nameserver,这也就解答了Q5提出的问题。

A5:nameservers配置了的话,在kube-dns启动时会设置upstream的nameserver列表,这里upstream nameserver的含义即当kube-dns(背后做事的即SkyDNS)发现非clusterDomain的dns解析时会递归到upstream nameserver

此外,server.New(d.kd, skydnsConfig)实例化skydns server的时候传入了d.kd作为其backend,我们深挖下去不难发现,skydns里这块是定义了一个统一的backend interface,抽象了需要实现的方法RecordsReverseRecord,回过头来看kubedns这块的代码,确实是实现了这两个接口:

func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, err error) {
	...

	path := util.ReverseArray(segments)
	records, err := kd.getRecordsForPath(path, exact)

	if err != nil {
		return nil, err
	}

    ...
}

...

而且这里面调用的方法getRecordsForPath,其方法体里执行的kd.cache.GetEntry这行正是消费之前定义的本地cache,也即是读取实时维护的本地cache来得到dns记录解析的数据依据:

func (kd *KubeDNS) getRecordsForPath(path []string, exact bool) ([]skymsg.Service, error) {
	...

	...
		kd.cacheLock.RLock()
		defer kd.cacheLock.RUnlock()
		if record, ok := kd.cache.GetEntry(key, path[:len(path)-1]...); ok {
			glog.V(3).Infof("Exact match %v for %v received from cache", record, path[:len(path)-1])
			return []skymsg.Service{*(record.(*skymsg.Service))}, nil
		}

		...
	}

	...
}

回到上面执行的skydns启动部分,还有一块是实例化metrics对象,进而调用的是pkg/dnsmasq/metric.go里面的代码,如下这部分即获取metric的核心部分:

func (mc *metricsClient) getSingleMetric(name string) (int64, error) {
	msg := new(dns.Msg)
	msg.Id = dns.Id()
	msg.RecursionDesired = false
	msg.Question = make([]dns.Question, 1)
	msg.Question[0] = dns.Question{
		Name:   name,
		Qtype:  dns.TypeTXT,
		Qclass: dns.ClassCHAOS,
	}

	in, _, err := mc.dnsClient.Exchange(msg, mc.addrPort)
	if err != nil {
		return 0, err
	}

	if len(in.Answer) != 1 {
		return 0, fmt.Errorf("invalid number of Answer records for %s: %d",
			name, len(in.Answer))
	}

	if t, ok := in.Answer[0].(*dns.TXT); ok {
		glog.V(4).Infof("Got valid TXT response %+v for %s", t, name)
		if len(t.Txt) != 1 {
			return 0, fmt.Errorf("invalid number of TXT records for %s: %d",
				name, len(t.Txt))
		}

		value, err := strconv.ParseInt(t.Txt[0], 10, 64)
		if err != nil {
			return 0, err
		}

		return value, nil
	}

	return 0, fmt.Errorf("missing TXT record for %s", name)
}

看上去是通过dnsClient请求dnsmasq server的形式采集到一些dns请求的metric,这部分我们留到dnsmasq和sidecar组件的解读部分再来细说。

至此,我们初略读完了kube-dns部分的启动过程,并基本解答了除开Q1以外的所有疑问,总结一下的话kube-dns基本上是这样的启动流程:

  • 通过执行options.NewKubeDNSConfig方法和命令行传参初始化Kube-DNS配置;
  • 然后通过执行app.NewKubeDNSServerDefault(config)初始化配置文件的配置方式(配置文件形式或者从ConfigMap动态配置),并通过kd.setEndpointsStore()kd.setServicesStore()方法初始化endpointsservices的本地cache(本地cache是通过构造TreeCache这样的结构体来实现维护的);
  • 最终,通过执行server.Run()方法,初始化信号量的处理句柄,启动skydns服务器并以kube-dns的实现作为backend(数据消费自之前构建和持续维护的本地cache),里面还启动了一个metric endpoint,暴露出dnsmasq上DNS请求相关的监控指标。随后,启动endpoints和services的变更监听及callback处理的controller从而实时维护本地cache数据的准确性,并通过执行kd.startConfigMapSync方法,在配置了ConfigMap的情况下实现kube-dns配置的动态加载

dnsmasq、sidecar

kube-dns整体的逻辑理顺了,剩下的dnsmasq和sidecar部分实现了怎样的逻辑自然也很好解读。

nanny,顾名思义,即是保姆的意思,因此cmd目录下的dnsmasq部分很显然即是包装了dnsmasq运行时的组件,管理和维护dnsmasq作为kube-dns前置的dns缓存。

我们不妨跟上文解读kube-dns类似,来看看代码入口做了哪些事情:

func main() {
	parseFlags()
	glog.V(0).Infof("opts: %v", opts)

	sync := config.NewFileSync(opts.configDir, opts.syncInterval)

	dnsmasq.RunNanny(sync, opts.RunNannyOpts)
}

代码挺简单的,很显然,config.NewFileSync即是通过读取ConfigMap的数据,动态同步并生成dnsmasq对应的配置文件,并根据配置参数确认是否需要重启生效。而如下所示的dnsmasq.RunNanny方法:

// RunNanny runs the nanny and handles configuration updates.
func RunNanny(sync config.Sync, opts RunNannyOpts) {
	defer glog.Flush()

	currentConfig, err := sync.Once()
	if err != nil {
		glog.Errorf("Error getting initial config, using default: %v", err)
		currentConfig = config.NewDefaultConfig()
	}

	nanny := &Nanny{Exec: opts.DnsmasqExec}
	nanny.Configure(opts.DnsmasqArgs, currentConfig)
	if err := nanny.Start(); err != nil {
		glog.Fatalf("Could not start dnsmasq with initial configuration: %v", err)
	}

	configChan := sync.Periodic()

	for {
		select {
		case status := <-nanny.ExitChannel:
			glog.Flush()
			glog.Fatalf("dnsmasq exited: %v", status)
			break
		case currentConfig = <-configChan:
			if opts.RestartOnChange {
				glog.V(0).Infof("Restarting dnsmasq with new configuration")
				nanny.Kill()
				nanny = &Nanny{Exec: opts.DnsmasqExec}
				nanny.Configure(opts.DnsmasqArgs, currentConfig)
				nanny.Start()
			} else {
				glog.V(2).Infof("Not restarting dnsmasq (--restartDnsmasq=false)")
			}
			break
		}
	}
}

即是同步dnsmasq的配置并通过nanny.Start()启动dnsmasq服务。nanny.Start()方法内容则比较清晰,便是通过exec启动dnsmasq,这里需要指出的是,dnsmasq启动时上游nameserver的配置,正是根据ConfigMap里是否配置了config.StubDomains以及config.UpstreamNameservers来生成相应的配置。

K8S官方文档里讲述的指定一组特定的StubDomains由xxx nameserver解析以及配置上游nameserver的功能均由dnsmasq这块的配置实际落地生效!

至此,dnsmasq便成功启动了,它作为kube-dns的前置部分,担任dns缓存和其他非cluster domain的解析请求的转发工作。

那么,我们再来看看sidecar做了什么事情。

func main() {
	options := sidecar.NewOptions()
    configureFlags(options, pflag.CommandLine)

	...

	server := sidecar.NewServer()
	server.Run(options)
}

同样是读取配置参数,并启动了一个sidecar的server。而查看server.Run(options)的具体实现:

// Run the server (does not return)
func (s *server) Run(options *Options) {
	s.options = options
	glog.Infof("Starting server (options %+v)", *s.options)

	for _, probeOption := range options.Probes {
		probe := &dnsProbe{DNSProbeOption: probeOption}
		s.probes = append(s.probes, probe)
		probe.Start(options)
	}

	s.runMetrics(options)
}

这里根据配置的options指定需要对哪些endpoint执行探测,options的配置也很简单,通过传入的命令行参数指定,执行configureFlags实例化得到:

...
	flagSet.Var(
		(*probeOptions)(&opt.Probes), "probe",
		"probe the given DNS server with the DNS name and export probe"+
			" metrics and healthcheck URI. Specified as"+
			" <label>,<server>,<dns name>[,<interval_seconds>][,<type>]."+
			" Healthcheck url will be exported under /healthcheck/<label>."+
			" interval_seconds is optional."+
			" This option may be specified multiple times to check multiple servers."+
			" <type> is one of ANY, A, AAAA, SRV."+
			" Example: 'mydns,127.0.0.1:53,example.com,10,A'.")

最后,即通过runMetrics方法里的InitializeMetrics方法,实例化并启动一个prometheus的metrics端点,提供相应dns记录解析的监控指标。

至此,我们也终于能回答第一个问题了:

kube-dns、dnsmasq-nanny、sidecar分别实现了什么功能?

A1:

(1)kube-dns包装了skydns server的启动,并通过监听services和endpoints的变更事件且同步到本地cache的形式,实现了一个实时级别的k8s集群内service和pod等端点的DNS服务发现;

(2)dnsmasq-nanny则是通过包装dnsmasq,实现了kube-dns前置的dns缓存,并通过监听ConfigMap动态生成配置,将配置了的StubDomains转发给指定的nameservers,将其他域名的解析递归到配置好的Upstream nameservers;

(3)sidecar部分代码则是实现了可配置的DNS探测,并采集对应的监控指标暴露出来供prometheus消费

上述的解析过程和kubedns文档里描述的也正是不谋而合:

kube-dns

完。