diff --git a/plugin/kubernetai/axfr_test.go b/plugin/kubernetai/axfr_test.go new file mode 100644 index 0000000..2daa864 --- /dev/null +++ b/plugin/kubernetai/axfr_test.go @@ -0,0 +1,145 @@ +package kubernetai + +import ( + "strings" + "testing" + + "github.com/coredns/coredns/plugin/transfer" + + "github.com/miekg/dns" +) + +func TestKubernetesTransferNonAuthZone(t *testing.T) { + type fields struct { + name string + kubernetes []*mockK8sPlugin + zone string + serial uint32 + expectedZone string + expectedError error + } + tests := []fields{ + { + name: "TestSingleKubernetesTransferNonAuthZone", + kubernetes: []*mockK8sPlugin{ + { + zones: []string{"cluster.local"}, + transferErr: transfer.ErrNotAuthoritative, + }, + }, + zone: "example.com", + expectedError: transfer.ErrNotAuthoritative, + }, + { + name: "TestSingleKubernetesTransferAuthZone", + kubernetes: []*mockK8sPlugin{ + { + zones: []string{"cluster.local"}, + transfer: ` +cluster.local. 5 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 3 7200 1800 86400 5 +cluster.local. 5 IN NS ns.dns.cluster.local. +ns.dns.cluster.local. 5 IN A 10.0.0.10 +cluster.local. 5 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 3 7200 1800 86400 5 +`, + transferErr: nil, + }, + }, + zone: "cluster.local", + expectedZone: ` +cluster.local. 5 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 3 7200 1800 86400 5 +cluster.local. 5 IN NS ns.dns.cluster.local. +ns.dns.cluster.local. 5 IN A 10.0.0.10 +cluster.local. 5 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 3 7200 1800 86400 5 +`, + expectedError: nil, + }, + { + name: "TestMultipleNonAuthorititativeSingleAuthoritative", + kubernetes: []*mockK8sPlugin{ + { + zones: []string{"fluster.local"}, + transfer: ` +fluster.local. 5 IN SOA ns.dns.fluster.local. hostmaster.fluster.local. 3 7200 1800 86400 5 +fluster.local. 5 IN NS ns.dns.fluster.local. +ns.dns.fluster.local. 5 IN A 10.0.0.10 +fluster.local. 5 IN SOA ns.dns.fluster.local. hostmaster.fluster.local. 3 7200 1800 86400 5 +`, + transferErr: transfer.ErrNotAuthoritative, + }, + { + zones: []string{"bluster.local"}, + transferErr: transfer.ErrNotAuthoritative, + }, + { + zones: []string{"cluster.local"}, + transfer: ` +cluster.local. 5 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 3 7200 1800 86400 5 +cluster.local. 5 IN NS ns.dns.cluster.local. +ns.dns.cluster.local. 5 IN A 10.0.0.10 +cluster.local. 5 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 3 7200 1800 86400 5 +`, + transferErr: nil, + }, + { + zones: []string{"muster.local"}, + transferErr: transfer.ErrNotAuthoritative, + }, + }, + zone: "cluster.local", + expectedZone: ` +cluster.local. 5 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 3 7200 1800 86400 5 +cluster.local. 5 IN NS ns.dns.cluster.local. +ns.dns.cluster.local. 5 IN A 10.0.0.10 +cluster.local. 5 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 3 7200 1800 86400 5 +`, + expectedError: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // create kubernetai with mock kubernetes plugins + kai := Kubernetai{} + for _, plug := range tt.kubernetes { + kai.Kubernetes = append(kai.Kubernetes, plug) + } + + // create a axfr test message with test zone + dnsmsg := &dns.Msg{} + dnsmsg.SetAxfr(tt.zone) + + // perform AXFR + ch, err := kai.Transfer(tt.zone, tt.serial) + if err != nil { + if err != tt.expectedError { + t.Errorf("expected error %+v but received %+v", tt.expectedError, err) + } + return + } + validateAXFR(t, ch, tt.expectedZone) + }) + } +} + +func validateAXFR(t *testing.T, ch <-chan []dns.RR, expectedZone string) { + xfr := []dns.RR{} + for rrs := range ch { + xfr = append(xfr, rrs...) + } + if xfr[0].Header().Rrtype != dns.TypeSOA { + t.Error("Invalid transfer response, does not start with SOA record") + } + + zp := dns.NewZoneParser(strings.NewReader(expectedZone), "", "") + i := 0 + for rr, ok := zp.Next(); ok; rr, ok = zp.Next() { + if !dns.IsDuplicate(rr, xfr[i]) { + t.Fatalf("Record %d, expected\n%v\n, got\n%v", i, rr, xfr[i]) + } + i++ + } + + if err := zp.Err(); err != nil { + t.Fatal(err) + } +} diff --git a/plugin/kubernetai/kubernetai.go b/plugin/kubernetai/kubernetai.go index 61a34f4..144b101 100644 --- a/plugin/kubernetai/kubernetai.go +++ b/plugin/kubernetai/kubernetai.go @@ -1,3 +1,4 @@ +// Package kubernetai implements a plugin which can embed a number of kubernetes plugins in the same dns server. package kubernetai import ( @@ -5,29 +6,71 @@ import ( "github.com/coredns/coredns/plugin" "github.com/coredns/coredns/plugin/kubernetes" + "github.com/coredns/coredns/plugin/kubernetes/object" clog "github.com/coredns/coredns/plugin/pkg/log" + "github.com/coredns/coredns/plugin/transfer" "github.com/coredns/coredns/request" "github.com/miekg/dns" ) var log = clog.NewWithPlugin("kubernetai") +// embeddedKubernetesPluginInterface describes the kubernetes plugin interface that kubernetai requires/uses. +type embeddedKubernetesPluginInterface interface { + plugin.Handler + transfer.Transferer + PodWithIP(ip string) (pod *object.Pod) + Zones() (zones plugin.Zones) +} + +// embeddedKubernetes wraps a real kubernetes plugin +type embeddedKubernetes struct { + *kubernetes.Kubernetes +} + +var _ embeddedKubernetesPluginInterface = &embeddedKubernetes{} + +func newEmbeddedKubernetes(k *kubernetes.Kubernetes) *embeddedKubernetes { + return &embeddedKubernetes{ + Kubernetes: k, + } +} + +// PodWithIP satisfies the embeddedKubernetesPluginInterface by adding this additional method not exported from the kubernetes plugin. +func (ek embeddedKubernetes) PodWithIP(ip string) *object.Pod { + if ek.Kubernetes == nil { + return nil + } + ps := ek.Kubernetes.APIConn.PodIndex(ip) + if len(ps) == 0 { + return nil + } + return ps[0] +} + +// Zones satisfies the embeddedKubernetesPluginInterface by providing access to the kubernetes plugin Zones. +func (ek embeddedKubernetes) Zones() plugin.Zones { + if ek.Kubernetes == nil { + return nil + } + return plugin.Zones(ek.Kubernetes.Zones) +} + // Kubernetai handles multiple Kubernetes type Kubernetai struct { Zones []string - Kubernetes []*kubernetes.Kubernetes + Kubernetes []embeddedKubernetesPluginInterface autoPathSearch []string // Local search path from /etc/resolv.conf. Needed for autopath. - p podHandlerItf } // New creates a Kubernetai containing one Kubernetes with zones func New(zones []string) (Kubernetai, *kubernetes.Kubernetes) { h := Kubernetai{ autoPathSearch: searchFromResolvConf(), - p: &podHandler{}, } k := kubernetes.New(zones) - h.Kubernetes = append(h.Kubernetes, k) + ek := newEmbeddedKubernetes(k) + h.Kubernetes = append(h.Kubernetes, ek) return h, k } @@ -43,7 +86,7 @@ func (k8i Kubernetai) AutoPath(state request.Request) []string { // Abort if zone is not in kubernetai stanza. var zMatch bool for _, k8s := range k8i.Kubernetes { - zone := plugin.Zones(k8s.Zones).Matches(state.Name()) + zone := k8s.Zones().Matches(state.Name()) if zone != "" { zMatch = true break @@ -55,13 +98,13 @@ func (k8i Kubernetai) AutoPath(state request.Request) []string { // Add autopath result for the handled zones for _, k := range k8i.Kubernetes { - pod := k8i.p.PodWithIP(*k, state.IP()) + pod := k.PodWithIP(state.IP()) if pod == nil { return nil } search := make([]string, 3) - for _, z := range k.Zones { + for _, z := range k.Zones() { if z == "." { search[0] = pod.Namespace + ".svc." search[1] = "svc." @@ -80,6 +123,20 @@ func (k8i Kubernetai) AutoPath(state request.Request) []string { return searchPath } +// Transfer supports the transfer plugin, implementing the Transferer interface, by calling Transfer on each of the embedded plugins. +// It will return a channel to the FIRST kubernetai stanza that reports that it is authoritative for the requested zone. +func (k8i Kubernetai) Transfer(zone string, serial uint32) (retCh <-chan []dns.RR, err error) { + for _, k := range k8i.Kubernetes { + retCh, err = k.Transfer(zone, serial) + if err == transfer.ErrNotAuthoritative { + continue + } + return + } + // none of the embedded plugins were authoritative + return nil, transfer.ErrNotAuthoritative +} + func searchFromResolvConf() []string { rc, err := dns.ClientConfigFromFile("/etc/resolv.conf") if err != nil { @@ -93,7 +150,7 @@ func searchFromResolvConf() []string { func (k8i Kubernetai) Health() bool { healthy := true for _, k := range k8i.Kubernetes { - healthy = healthy && k.APIConn.HasSynced() + healthy = healthy && k.(*embeddedKubernetes).APIConn.HasSynced() if !healthy { break } diff --git a/plugin/kubernetai/kubernetai_test.go b/plugin/kubernetai/kubernetai_test.go index f713336..72abd55 100644 --- a/plugin/kubernetai/kubernetai_test.go +++ b/plugin/kubernetai/kubernetai_test.go @@ -1,22 +1,33 @@ package kubernetai import ( + "context" "net" "reflect" + "strings" "testing" "github.com/coredns/coredns/plugin" - "github.com/coredns/coredns/plugin/kubernetes" "github.com/coredns/coredns/plugin/kubernetes/object" "github.com/coredns/coredns/request" "github.com/miekg/dns" ) -type k8iPodHandlerTester struct{} +var ( + podip string +) + +// mockK8sPlugin satisfies the embeddedKubernetesPluginInterface interface and provides a mock kubernetes plugin that can be used to test kubernetai behaviour. +type mockK8sPlugin struct { + zones []string + transfer string + transferErr error +} -var podip string +var _ embeddedKubernetesPluginInterface = &mockK8sPlugin{} -func (k8i *k8iPodHandlerTester) PodWithIP(k kubernetes.Kubernetes, ip string) *object.Pod { +// PodWithIP always returns a pod with the given ip address in the namespace 'test-1'. +func (mkp *mockK8sPlugin) PodWithIP(ip string) *object.Pod { if ip == "" { return nil } @@ -27,7 +38,39 @@ func (k8i *k8iPodHandlerTester) PodWithIP(k kubernetes.Kubernetes, ip string) *o return pod } -var k8iPodHandlerTest k8iPodHandlerTester +// Name satisfies the plugin.Handler interface but is not used for tests. +func (mkp *mockK8sPlugin) Name() string { + return "" +} + +// ServeDNS satisfies the plugin.Handler interface but is not used for tests. +func (mkp *mockK8sPlugin) ServeDNS(_ context.Context, _ dns.ResponseWriter, _ *dns.Msg) (rcode int, err error) { + return 0, nil +} + +// Transfer satisfies the transfer.Transferer interface by playing back canned transfer responses. +// The canned transfer response is stored in a textual representation. +func (mkp *mockK8sPlugin) Transfer(_ string, _ uint32) (<-chan []dns.RR, error) { + if mkp.transferErr != nil { + return nil, mkp.transferErr + } + + ch := make(chan []dns.RR) + go func() { + zp := dns.NewZoneParser(strings.NewReader(mkp.transfer), "", "") + for rr, ok := zp.Next(); ok; rr, ok = zp.Next() { + ch <- []dns.RR{rr} + } + close(ch) + }() + + return ch, nil +} + +// Zones satisfies the embeddedKubernetesPluginInterface interface by returning pre-configured zones. +func (mkp *mockK8sPlugin) Zones() plugin.Zones { + return plugin.Zones(mkp.zones) +} type responseWriterTest struct { dns.ResponseWriter @@ -44,10 +87,8 @@ func (res *responseWriterTest) RemoteAddr() net.Addr { func TestKubernetai_AutoPath(t *testing.T) { type fields struct { Zones []string - Next plugin.Handler - Kubernetes []*kubernetes.Kubernetes + Kubernetes []embeddedKubernetesPluginInterface autoPathSearch []string - p *k8iPodHandlerTester } type args struct { state request.Request @@ -55,22 +96,21 @@ func TestKubernetai_AutoPath(t *testing.T) { w := &responseWriterTest{} - k8sClusterLocal := &kubernetes.Kubernetes{ - Zones: []string{ + k8sClusterLocal := &mockK8sPlugin{ + zones: []string{ "cluster.local.", }, } - k8sFlusterLocal := &kubernetes.Kubernetes{ - Zones: []string{ + k8sFlusterLocal := &mockK8sPlugin{ + zones: []string{ "fluster.local.", }, } defaultK8iConfig := fields{ - Kubernetes: []*kubernetes.Kubernetes{ + Kubernetes: []embeddedKubernetesPluginInterface{ k8sFlusterLocal, k8sClusterLocal, }, - p: &k8iPodHandlerTest, } tests := []struct { @@ -168,7 +208,6 @@ func TestKubernetai_AutoPath(t *testing.T) { Zones: tt.fields.Zones, Kubernetes: tt.fields.Kubernetes, autoPathSearch: tt.fields.autoPathSearch, - p: tt.fields.p, } podip = tt.ip if got := k8i.AutoPath(tt.args.state); !reflect.DeepEqual(got, tt.want) { diff --git a/plugin/kubernetai/podhandler.go b/plugin/kubernetai/podhandler.go deleted file mode 100644 index dcc060f..0000000 --- a/plugin/kubernetai/podhandler.go +++ /dev/null @@ -1,21 +0,0 @@ -package kubernetai - -import ( - "github.com/coredns/coredns/plugin/kubernetes" - "github.com/coredns/coredns/plugin/kubernetes/object" -) - -type podHandlerItf interface { - PodWithIP(k kubernetes.Kubernetes, ip string) *object.Pod -} - -type podHandler struct{} - -// podWithIP return the api.Pod for source IP ip. It returns nil if nothing can be found. -func (p *podHandler) PodWithIP(k kubernetes.Kubernetes, ip string) *object.Pod { - ps := k.APIConn.PodIndex(ip) - if len(ps) == 0 { - return nil - } - return ps[0] -} diff --git a/plugin/kubernetai/setup.go b/plugin/kubernetai/setup.go index c287754..5c82c9f 100644 --- a/plugin/kubernetai/setup.go +++ b/plugin/kubernetai/setup.go @@ -25,7 +25,7 @@ func setup(c *caddy.Controller) error { prev := &kubernetes.Kubernetes{} for _, k := range k8i.Kubernetes { - onStart, onShut, err := k.InitKubeCache(context.Background()) + onStart, onShut, err := k.(*embeddedKubernetes).InitKubeCache(context.Background()) if err != nil { return plugin.Error(Name(), err) } @@ -36,13 +36,13 @@ func setup(c *caddy.Controller) error { c.OnStartup(onStart) } // set Next of the previous kubernetes instance to the current instance - prev.Next = k - prev = k + prev.Next = k.(*embeddedKubernetes).Kubernetes + prev = k.(*embeddedKubernetes).Kubernetes } dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler { // set Next of the last kubernetes instance to the next plugin - k8i.Kubernetes[len(k8i.Kubernetes)-1].Next = next + k8i.Kubernetes[len(k8i.Kubernetes)-1].(*embeddedKubernetes).Next = next return k8i }) @@ -53,7 +53,6 @@ func setup(c *caddy.Controller) error { func Parse(c *caddy.Controller) (*Kubernetai, error) { var k8i = &Kubernetai{ autoPathSearch: searchFromResolvConf(), - p: &podHandler{}, } for c.Next() { @@ -61,7 +60,7 @@ func Parse(c *caddy.Controller) (*Kubernetai, error) { if err != nil { return nil, err } - k8i.Kubernetes = append(k8i.Kubernetes, k8s) + k8i.Kubernetes = append(k8i.Kubernetes, newEmbeddedKubernetes(k8s)) } if len(k8i.Kubernetes) == 0 { diff --git a/plugin/kubernetai/setup_test.go b/plugin/kubernetai/setup_test.go index e0b3bdf..a0fa861 100644 --- a/plugin/kubernetai/setup_test.go +++ b/plugin/kubernetai/setup_test.go @@ -123,14 +123,14 @@ func TestSetup(t *testing.T) { } prev := &kubernetes.Kubernetes{ - Next: k8i.Kubernetes[0], + Next: k8i.Kubernetes[0].(*embeddedKubernetes).Kubernetes, } for j, k := range k8i.Kubernetes { - if prev.Next != k { - t.Fatalf("Test %d: Expected kubernetes instance %d to be referencing kubernetes instance %d as next, got %v", i, j-1, j, prev.Next) + if prev.Next != k.(*embeddedKubernetes).Kubernetes { + t.Fatalf("Test %d: Expected kubernetes instance %d to be referencing kubernetes instance %d as next, got %+v", i, j-1, j, prev.Next) } - prev = k + prev = k.(*embeddedKubernetes).Kubernetes } if prev.Next != nextHandler {