diff --git a/Makefile b/Makefile index ebb626e35..c79ab19db 100644 --- a/Makefile +++ b/Makefile @@ -15,6 +15,8 @@ ARCH ?= amd64 ALL_ARCH = amd64 arm arm64 ppc64le s390x +GOPATH ?= $(GOPATH) + REGISTRY ?= gcr.io/$(shell gcloud config get-value project) STAGING_REGISTRY := gcr.io/k8s-staging-kas-network-proxy @@ -54,13 +56,13 @@ bin: .PHONY: build build: bin/proxy-agent bin/proxy-server bin/proxy-test-client -bin/proxy-agent: bin cmd/agent/main.go proto/agent/agent.pb.go +bin/proxy-agent: proto/agent/agent.pb.go proto/client/client.pb.go bin cmd/agent/main.go GO111MODULE=on go build -o bin/proxy-agent cmd/agent/main.go -bin/proxy-test-client: bin cmd/client/main.go proto/proxy.pb.go +bin/proxy-test-client: proto/client/client.pb.go bin cmd/client/main.go GO111MODULE=on go build -o bin/proxy-test-client cmd/client/main.go -bin/proxy-server: bin cmd/proxy/main.go proto/agent/agent.pb.go proto/proxy.pb.go +bin/proxy-server: proto/agent/agent.pb.go proto/client/client.pb.go bin cmd/proxy/main.go GO111MODULE=on go build -o bin/proxy-server cmd/proxy/main.go ## -------------------------------------- @@ -73,18 +75,18 @@ bin/proxy-server: bin cmd/proxy/main.go proto/agent/agent.pb.go proto/proxy.pb.g ## -------------------------------------- .PHONY: gen -gen: proto/agent/agent.pb.go proto/proxy.pb.go mock_gen +gen: proto/agent/agent.pb.go proto/client/client.pb.go mock_gen + +proto/client/client.pb.go: konnectivity-client/proto/client/client.proto + protoc -I . konnectivity-client/proto/client/client.proto --go_out=plugins=grpc:${GOPATH}/src + cat hack/go-license-header.txt konnectivity-client/proto/client/client.pb.go > konnectivity-client/proto/client/client.licensed.go + mv konnectivity-client/proto/client/client.licensed.go konnectivity-client/proto/client/client.pb.go proto/agent/agent.pb.go: proto/agent/agent.proto - protoc -I proto proto/agent/agent.proto --go_out=plugins=grpc:proto + protoc -I . proto/agent/agent.proto --go_out=plugins=grpc:${GOPATH}/src cat hack/go-license-header.txt proto/agent/agent.pb.go > proto/agent/agent.licensed.go mv proto/agent/agent.licensed.go proto/agent/agent.pb.go -proto/proxy.pb.go: proto/proxy.proto - protoc -I proto proto/proxy.proto --go_out=plugins=grpc:proto - cat hack/go-license-header.txt proto/proxy.pb.go > proto/proxy.licensed.go - mv proto/proxy.licensed.go proto/proxy.pb.go - ## -------------------------------------- ## Certs ## -------------------------------------- @@ -155,7 +157,7 @@ docker-push/proxy-agent: docker-build/proxy-agent ${DOCKER_CMD} push ${AGENT_FULL_IMAGE}-$(ARCH):${TAG} .PHONY: docker-build/proxy-server -docker-build/proxy-server: cmd/proxy/main.go proto/agent/agent.pb.go proto/proxy.pb.go +docker-build/proxy-server: cmd/proxy/main.go proto/agent/agent.pb.go @[ "${TAG}" ] || ( echo "TAG is not set"; exit 1 ) echo "Building proxy-server for ${ARCH}" ${DOCKER_CMD} build . --build-arg ARCH=$(ARCH) -f artifacts/images/server-build.Dockerfile -t ${SERVER_FULL_IMAGE}-$(ARCH):${TAG} @@ -166,7 +168,7 @@ docker-push/proxy-server: docker-build/proxy-server ${DOCKER_CMD} push ${SERVER_FULL_IMAGE}-$(ARCH):${TAG} .PHONY: docker-build/proxy-test-client -docker-build/proxy-test-client: cmd/client/main.go proto/agent/agent.pb.go proto/proxy.pb.go +docker-build/proxy-test-client: cmd/client/main.go proto/agent/agent.pb.go @[ "${TAG}" ] || ( echo "TAG is not set"; exit 1 ) echo "Building proxy-test-client for ${ARCH}" ${DOCKER_CMD} build . --build-arg ARCH=$(ARCH) -f artifacts/images/client-build.Dockerfile -t ${TEST_CLIENT_FULL_IMAGE}-$(ARCH):${TAG} @@ -249,4 +251,4 @@ release-alias-tag: # Adds the tag to the last build tag. BASE_REF comes from the .PHONY: clean clean: - rm -rf proto/agent/agent.pb.go proto/proxy.pb.go easy-rsa.tar.gz easy-rsa-master cfssl cfssljson certs bin proto/agent/mocks + rm -rf proto/agent/agent.pb.go konnectivity-client/proto/client/client.pb.go easy-rsa.tar.gz easy-rsa-master cfssl cfssljson certs bin proto/agent/mocks diff --git a/cmd/client/main.go b/cmd/client/main.go index 10949e064..c4c5826e0 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -34,7 +34,7 @@ import ( "google.golang.org/grpc/credentials" "k8s.io/klog" - "sigs.k8s.io/apiserver-network-proxy/pkg/agent/client" + "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client" "sigs.k8s.io/apiserver-network-proxy/pkg/util" ) diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index f97bbacaf..101dbb28c 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -40,6 +40,7 @@ import ( "k8s.io/client-go/tools/clientcmd" "sigs.k8s.io/apiserver-network-proxy/pkg/agent/agentserver" "sigs.k8s.io/apiserver-network-proxy/pkg/util" + "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" "sigs.k8s.io/apiserver-network-proxy/proto/agent" ) @@ -368,7 +369,7 @@ func (p *Proxy) runUDSMasterServer(ctx context.Context, o *ProxyRunOptions, serv var stop StopFunc if o.mode == "grpc" { grpcServer := grpc.NewServer() - agent.RegisterProxyServiceServer(grpcServer, server) + client.RegisterProxyServiceServer(grpcServer, server) var lc net.ListenConfig lis, err := lc.Listen(ctx, "unix", o.udsName) if err != nil { @@ -445,7 +446,7 @@ func (p *Proxy) runMTLSMasterServer(ctx context.Context, o *ProxyRunOptions, ser if o.mode == "grpc" { serverOption := grpc.Creds(credentials.NewTLS(tlsConfig)) grpcServer := grpc.NewServer(serverOption) - agent.RegisterProxyServiceServer(grpcServer, server) + client.RegisterProxyServiceServer(grpcServer, server) lis, err := net.Listen("tcp", addr) if err != nil { return nil, fmt.Errorf("failed to listen on %s: %v", addr, err) diff --git a/go.mod b/go.mod index 9667749b8..999dc9837 100644 --- a/go.mod +++ b/go.mod @@ -19,4 +19,7 @@ require ( k8s.io/apimachinery v0.17.1 k8s.io/client-go v0.17.1 k8s.io/klog v1.0.0 + sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.0 ) + +replace sigs.k8s.io/apiserver-network-proxy/konnectivity-client => ./konnectivity-client diff --git a/konnectivity-client/go.mod b/konnectivity-client/go.mod new file mode 100644 index 000000000..f82d0bdf8 --- /dev/null +++ b/konnectivity-client/go.mod @@ -0,0 +1,9 @@ +module sigs.k8s.io/apiserver-network-proxy/konnectivity-client + +go 1.13 + +require ( + github.com/golang/protobuf v1.3.2 + google.golang.org/grpc v1.26.0 + k8s.io/klog v1.0.0 +) diff --git a/konnectivity-client/go.sum b/konnectivity-client/go.sum new file mode 100644 index 000000000..19db02c8a --- /dev/null +++ b/konnectivity-client/go.sum @@ -0,0 +1,51 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.26.0 h1:2dTRdpdFEEhJYQD8EMLB61nnrzSCTbG38PhqdhvOltg= +google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8= +k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= diff --git a/pkg/agent/client/client.go b/konnectivity-client/pkg/client/client.go similarity index 89% rename from pkg/agent/client/client.go rename to konnectivity-client/pkg/client/client.go index d7eebde97..09f00f965 100644 --- a/pkg/agent/client/client.go +++ b/konnectivity-client/pkg/client/client.go @@ -27,7 +27,7 @@ import ( "google.golang.org/grpc" "k8s.io/klog" - "sigs.k8s.io/apiserver-network-proxy/proto/agent" + "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" ) // Tunnel provides ability to dial a connection through a tunnel. @@ -44,7 +44,7 @@ type dialResult struct { // grpcTunnel implements Tunnel type grpcTunnel struct { - stream agent.ProxyService_ProxyClient + stream client.ProxyService_ProxyClient pendingDial map[int64]chan<- dialResult conns map[int64]*conn pendingDialLock sync.RWMutex @@ -59,9 +59,9 @@ func CreateGrpcTunnel(address string, opts ...grpc.DialOption) (Tunnel, error) { return nil, err } - client := agent.NewProxyServiceClient(c) + grpcClient := client.NewProxyServiceClient(c) - stream, err := client.Proxy(context.Background()) + stream, err := grpcClient.Proxy(context.Background()) if err != nil { return nil, err } @@ -91,7 +91,7 @@ func (t *grpcTunnel) serve() { klog.Infof("[tracing] recv packet %+v", pkt) switch pkt.Type { - case agent.PacketType_DIAL_RSP: + case client.PacketType_DIAL_RSP: resp := pkt.GetDialResponse() t.pendingDialLock.RLock() ch, ok := t.pendingDial[resp.Random] @@ -105,7 +105,7 @@ func (t *grpcTunnel) serve() { connid: resp.ConnectID, } } - case agent.PacketType_DATA: + case client.PacketType_DATA: resp := pkt.GetData() // TODO: flow control t.connsLock.RLock() @@ -117,7 +117,7 @@ func (t *grpcTunnel) serve() { } else { klog.Warningf("connection id %d not recognized", resp.ConnectID) } - case agent.PacketType_CLOSE_RSP: + case client.PacketType_CLOSE_RSP: resp := pkt.GetCloseResponse() t.connsLock.RLock() conn, ok := t.conns[resp.ConnectID] @@ -155,10 +155,10 @@ func (t *grpcTunnel) Dial(protocol, address string) (net.Conn, error) { t.pendingDialLock.Unlock() }() - req := &agent.Packet{ - Type: agent.PacketType_DIAL_REQ, - Payload: &agent.Packet_DialRequest{ - DialRequest: &agent.DialRequest{ + req := &client.Packet{ + Type: client.PacketType_DIAL_REQ, + Payload: &client.Packet_DialRequest{ + DialRequest: &client.DialRequest{ Protocol: protocol, Address: address, Random: random, diff --git a/pkg/agent/client/client_test.go b/konnectivity-client/pkg/client/client_test.go similarity index 70% rename from pkg/agent/client/client_test.go rename to konnectivity-client/pkg/client/client_test.go index 77ebc6045..75cf086b4 100644 --- a/pkg/agent/client/client_test.go +++ b/konnectivity-client/pkg/client/client_test.go @@ -24,7 +24,7 @@ import ( "google.golang.org/grpc" "k8s.io/klog" - "sigs.k8s.io/apiserver-network-proxy/proto/agent" + "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" ) func TestDial(t *testing.T) { @@ -48,8 +48,8 @@ func TestDial(t *testing.T) { t.Fatalf("expect nil; got %v", err) } - if ts.packets[0].Type != agent.PacketType_DIAL_REQ { - t.Fatalf("expect packet.type %v; got %v", agent.PacketType_CLOSE_REQ, ts.packets[0].Type) + if ts.packets[0].Type != client.PacketType_DIAL_REQ { + t.Fatalf("expect packet.type %v; got %v", client.PacketType_CLOSE_REQ, ts.packets[0].Type) } if ts.packets[0].GetDialRequest().Address != "127.0.0.1:80" { @@ -139,8 +139,8 @@ func TestClose(t *testing.T) { t.Error(err) } - if ts.packets[1].Type != agent.PacketType_CLOSE_REQ { - t.Fatalf("expect packet.type %v; got %v", agent.PacketType_CLOSE_REQ, ts.packets[1].Type) + if ts.packets[1].Type != client.PacketType_CLOSE_REQ { + t.Fatalf("expect packet.type %v; got %v", client.PacketType_CLOSE_REQ, ts.packets[1].Type) } if ts.packets[1].GetCloseRequest().ConnectID != 100 { t.Errorf("expect connectID=100; got %d", ts.packets[1].GetCloseRequest().ConnectID) @@ -152,21 +152,21 @@ func TestClose(t *testing.T) { // fakeStream implements ProxyService_ProxyClient type fakeStream struct { grpc.ClientStream - r <-chan *agent.Packet - w chan<- *agent.Packet + r <-chan *client.Packet + w chan<- *client.Packet } -var _ agent.ProxyService_ProxyClient = &fakeStream{} +var _ client.ProxyService_ProxyClient = &fakeStream{} func pipe() (*fakeStream, *fakeStream) { - r, w := make(chan *agent.Packet, 2), make(chan *agent.Packet, 2) + r, w := make(chan *client.Packet, 2), make(chan *client.Packet, 2) s1, s2 := &fakeStream{}, &fakeStream{} s1.r, s1.w = r, w s2.r, s2.w = w, r return s1, s2 } -func (s *fakeStream) Send(packet *agent.Packet) error { +func (s *fakeStream) Send(packet *client.Packet) error { klog.Infof("[DEBUG] send packet %+v", packet) if packet == nil { return nil @@ -175,7 +175,7 @@ func (s *fakeStream) Send(packet *agent.Packet) error { return nil } -func (s *fakeStream) Recv() (*agent.Packet, error) { +func (s *fakeStream) Recv() (*client.Packet, error) { select { case pkt := <-s.r: klog.Infof("[DEBUG] recv packet %+v", pkt) @@ -191,24 +191,24 @@ func (s *fakeStream) Close() { type proxyServer struct { t testing.T - s agent.ProxyService_ProxyClient - handlers map[agent.PacketType]handler + s client.ProxyService_ProxyClient + handlers map[client.PacketType]handler connid int64 data bytes.Buffer - packets []*agent.Packet + packets []*client.Packet } -func testServer(s agent.ProxyService_ProxyClient, connid int64) *proxyServer { +func testServer(s client.ProxyService_ProxyClient, connid int64) *proxyServer { server := &proxyServer{ s: s, connid: connid, - handlers: make(map[agent.PacketType]handler), - packets: []*agent.Packet{}, + handlers: make(map[client.PacketType]handler), + packets: []*client.Packet{}, } - server.handlers[agent.PacketType_CLOSE_REQ] = server.handleClose - server.handlers[agent.PacketType_DIAL_REQ] = server.handleDial - server.handlers[agent.PacketType_DATA] = server.handleData + server.handlers[client.PacketType_CLOSE_REQ] = server.handleClose + server.handlers[client.PacketType_DIAL_REQ] = server.handleDial + server.handlers[client.PacketType_DATA] = server.handleData return server } @@ -234,19 +234,19 @@ func (s *proxyServer) serve() { } -func (s *proxyServer) handle(t agent.PacketType, h handler) *proxyServer { +func (s *proxyServer) handle(t client.PacketType, h handler) *proxyServer { s.handlers[t] = h return s } -type handler func(pkt *agent.Packet) *agent.Packet +type handler func(pkt *client.Packet) *client.Packet -func (s *proxyServer) handleDial(pkt *agent.Packet) *agent.Packet { +func (s *proxyServer) handleDial(pkt *client.Packet) *client.Packet { s.packets = append(s.packets, pkt) - return &agent.Packet{ - Type: agent.PacketType_DIAL_RSP, - Payload: &agent.Packet_DialResponse{ - DialResponse: &agent.DialResponse{ + return &client.Packet{ + Type: client.PacketType_DIAL_RSP, + Payload: &client.Packet_DialResponse{ + DialResponse: &client.DialResponse{ Random: pkt.GetDialRequest().Random, ConnectID: s.connid, }, @@ -254,26 +254,26 @@ func (s *proxyServer) handleDial(pkt *agent.Packet) *agent.Packet { } } -func (s *proxyServer) handleClose(pkt *agent.Packet) *agent.Packet { +func (s *proxyServer) handleClose(pkt *client.Packet) *client.Packet { s.packets = append(s.packets, pkt) - return &agent.Packet{ - Type: agent.PacketType_CLOSE_RSP, - Payload: &agent.Packet_CloseResponse{ - CloseResponse: &agent.CloseResponse{ + return &client.Packet{ + Type: client.PacketType_CLOSE_RSP, + Payload: &client.Packet_CloseResponse{ + CloseResponse: &client.CloseResponse{ ConnectID: pkt.GetCloseRequest().ConnectID, }, }, } } -func (s *proxyServer) handleData(pkt *agent.Packet) *agent.Packet { +func (s *proxyServer) handleData(pkt *client.Packet) *client.Packet { s.packets = append(s.packets, pkt) s.data.Write(pkt.GetData().Data) - return &agent.Packet{ - Type: agent.PacketType_DATA, - Payload: &agent.Packet_Data{ - Data: &agent.Data{ + return &client.Packet{ + Type: client.PacketType_DATA, + Payload: &client.Packet_Data{ + Data: &client.Data{ ConnectID: pkt.GetData().ConnectID, Data: append([]byte("echo: "), pkt.GetData().Data...), }, diff --git a/pkg/agent/client/conn.go b/konnectivity-client/pkg/client/conn.go similarity index 87% rename from pkg/agent/client/conn.go rename to konnectivity-client/pkg/client/conn.go index 445efb6b0..22c25234c 100644 --- a/pkg/agent/client/conn.go +++ b/konnectivity-client/pkg/client/conn.go @@ -23,7 +23,7 @@ import ( "time" "k8s.io/klog" - "sigs.k8s.io/apiserver-network-proxy/proto/agent" + "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" ) // CloseTimeout is the timeout to wait CLOSE_RSP packet after a @@ -33,7 +33,7 @@ const CloseTimeout = 10 * time.Second // conn is an implementation of net.Conn, where the data is transported // over an established tunnel defined by a gRPC service ProxyService. type conn struct { - stream agent.ProxyService_ProxyClient + stream client.ProxyService_ProxyClient connID int64 readCh chan []byte closeCh chan string @@ -44,10 +44,10 @@ var _ net.Conn = &conn{} // Write sends the data thru the connection over proxy service func (c *conn) Write(data []byte) (n int, err error) { - req := &agent.Packet{ - Type: agent.PacketType_DATA, - Payload: &agent.Packet_Data{ - Data: &agent.Data{ + req := &client.Packet{ + Type: client.PacketType_DATA, + Payload: &client.Packet_Data{ + Data: &client.Data{ ConnectID: c.connID, Data: data, }, @@ -113,10 +113,10 @@ func (c *conn) SetWriteDeadline(t time.Time) error { // proxy service to notify remote to drop the connection. func (c *conn) Close() error { klog.Info("conn.Close()") - req := &agent.Packet{ - Type: agent.PacketType_CLOSE_REQ, - Payload: &agent.Packet_CloseRequest{ - CloseRequest: &agent.CloseRequest{ + req := &client.Packet{ + Type: client.PacketType_CLOSE_REQ, + Payload: &client.Packet_CloseRequest{ + CloseRequest: &client.CloseRequest{ ConnectID: c.connID, }, }, diff --git a/konnectivity-client/proto/client/client.pb.go b/konnectivity-client/proto/client/client.pb.go new file mode 100644 index 000000000..107f2e546 --- /dev/null +++ b/konnectivity-client/proto/client/client.pb.go @@ -0,0 +1,653 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: konnectivity-client/proto/client/client.proto + +package client + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type PacketType int32 + +const ( + PacketType_DIAL_REQ PacketType = 0 + PacketType_DIAL_RSP PacketType = 1 + PacketType_CLOSE_REQ PacketType = 2 + PacketType_CLOSE_RSP PacketType = 3 + PacketType_DATA PacketType = 4 +) + +var PacketType_name = map[int32]string{ + 0: "DIAL_REQ", + 1: "DIAL_RSP", + 2: "CLOSE_REQ", + 3: "CLOSE_RSP", + 4: "DATA", +} + +var PacketType_value = map[string]int32{ + "DIAL_REQ": 0, + "DIAL_RSP": 1, + "CLOSE_REQ": 2, + "CLOSE_RSP": 3, + "DATA": 4, +} + +func (x PacketType) String() string { + return proto.EnumName(PacketType_name, int32(x)) +} + +func (PacketType) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_fec4258d9ecd175d, []int{0} +} + +type Error int32 + +const ( + Error_EOF Error = 0 +) + +var Error_name = map[int32]string{ + 0: "EOF", +} + +var Error_value = map[string]int32{ + "EOF": 0, +} + +func (x Error) String() string { + return proto.EnumName(Error_name, int32(x)) +} + +func (Error) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_fec4258d9ecd175d, []int{1} +} + +type Packet struct { + Type PacketType `protobuf:"varint,1,opt,name=type,proto3,enum=PacketType" json:"type,omitempty"` + // Types that are valid to be assigned to Payload: + // *Packet_DialRequest + // *Packet_DialResponse + // *Packet_Data + // *Packet_CloseRequest + // *Packet_CloseResponse + Payload isPacket_Payload `protobuf_oneof:"payload"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Packet) Reset() { *m = Packet{} } +func (m *Packet) String() string { return proto.CompactTextString(m) } +func (*Packet) ProtoMessage() {} +func (*Packet) Descriptor() ([]byte, []int) { + return fileDescriptor_fec4258d9ecd175d, []int{0} +} + +func (m *Packet) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Packet.Unmarshal(m, b) +} +func (m *Packet) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Packet.Marshal(b, m, deterministic) +} +func (m *Packet) XXX_Merge(src proto.Message) { + xxx_messageInfo_Packet.Merge(m, src) +} +func (m *Packet) XXX_Size() int { + return xxx_messageInfo_Packet.Size(m) +} +func (m *Packet) XXX_DiscardUnknown() { + xxx_messageInfo_Packet.DiscardUnknown(m) +} + +var xxx_messageInfo_Packet proto.InternalMessageInfo + +func (m *Packet) GetType() PacketType { + if m != nil { + return m.Type + } + return PacketType_DIAL_REQ +} + +type isPacket_Payload interface { + isPacket_Payload() +} + +type Packet_DialRequest struct { + DialRequest *DialRequest `protobuf:"bytes,2,opt,name=dialRequest,proto3,oneof"` +} + +type Packet_DialResponse struct { + DialResponse *DialResponse `protobuf:"bytes,3,opt,name=dialResponse,proto3,oneof"` +} + +type Packet_Data struct { + Data *Data `protobuf:"bytes,4,opt,name=data,proto3,oneof"` +} + +type Packet_CloseRequest struct { + CloseRequest *CloseRequest `protobuf:"bytes,5,opt,name=closeRequest,proto3,oneof"` +} + +type Packet_CloseResponse struct { + CloseResponse *CloseResponse `protobuf:"bytes,6,opt,name=closeResponse,proto3,oneof"` +} + +func (*Packet_DialRequest) isPacket_Payload() {} + +func (*Packet_DialResponse) isPacket_Payload() {} + +func (*Packet_Data) isPacket_Payload() {} + +func (*Packet_CloseRequest) isPacket_Payload() {} + +func (*Packet_CloseResponse) isPacket_Payload() {} + +func (m *Packet) GetPayload() isPacket_Payload { + if m != nil { + return m.Payload + } + return nil +} + +func (m *Packet) GetDialRequest() *DialRequest { + if x, ok := m.GetPayload().(*Packet_DialRequest); ok { + return x.DialRequest + } + return nil +} + +func (m *Packet) GetDialResponse() *DialResponse { + if x, ok := m.GetPayload().(*Packet_DialResponse); ok { + return x.DialResponse + } + return nil +} + +func (m *Packet) GetData() *Data { + if x, ok := m.GetPayload().(*Packet_Data); ok { + return x.Data + } + return nil +} + +func (m *Packet) GetCloseRequest() *CloseRequest { + if x, ok := m.GetPayload().(*Packet_CloseRequest); ok { + return x.CloseRequest + } + return nil +} + +func (m *Packet) GetCloseResponse() *CloseResponse { + if x, ok := m.GetPayload().(*Packet_CloseResponse); ok { + return x.CloseResponse + } + return nil +} + +// XXX_OneofWrappers is for the internal use of the proto package. +func (*Packet) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*Packet_DialRequest)(nil), + (*Packet_DialResponse)(nil), + (*Packet_Data)(nil), + (*Packet_CloseRequest)(nil), + (*Packet_CloseResponse)(nil), + } +} + +type DialRequest struct { + // tcp or udp? + Protocol string `protobuf:"bytes,1,opt,name=protocol,proto3" json:"protocol,omitempty"` + // node:port + Address string `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"` + // random id for client, maybe should be longer + Random int64 `protobuf:"varint,3,opt,name=random,proto3" json:"random,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *DialRequest) Reset() { *m = DialRequest{} } +func (m *DialRequest) String() string { return proto.CompactTextString(m) } +func (*DialRequest) ProtoMessage() {} +func (*DialRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_fec4258d9ecd175d, []int{1} +} + +func (m *DialRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_DialRequest.Unmarshal(m, b) +} +func (m *DialRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_DialRequest.Marshal(b, m, deterministic) +} +func (m *DialRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_DialRequest.Merge(m, src) +} +func (m *DialRequest) XXX_Size() int { + return xxx_messageInfo_DialRequest.Size(m) +} +func (m *DialRequest) XXX_DiscardUnknown() { + xxx_messageInfo_DialRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_DialRequest proto.InternalMessageInfo + +func (m *DialRequest) GetProtocol() string { + if m != nil { + return m.Protocol + } + return "" +} + +func (m *DialRequest) GetAddress() string { + if m != nil { + return m.Address + } + return "" +} + +func (m *DialRequest) GetRandom() int64 { + if m != nil { + return m.Random + } + return 0 +} + +type DialResponse struct { + // error failed reason; enum? + Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` + // connectID indicates the identifier of the connection + ConnectID int64 `protobuf:"varint,2,opt,name=connectID,proto3" json:"connectID,omitempty"` + // random copied from DialRequest + Random int64 `protobuf:"varint,3,opt,name=random,proto3" json:"random,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *DialResponse) Reset() { *m = DialResponse{} } +func (m *DialResponse) String() string { return proto.CompactTextString(m) } +func (*DialResponse) ProtoMessage() {} +func (*DialResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_fec4258d9ecd175d, []int{2} +} + +func (m *DialResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_DialResponse.Unmarshal(m, b) +} +func (m *DialResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_DialResponse.Marshal(b, m, deterministic) +} +func (m *DialResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_DialResponse.Merge(m, src) +} +func (m *DialResponse) XXX_Size() int { + return xxx_messageInfo_DialResponse.Size(m) +} +func (m *DialResponse) XXX_DiscardUnknown() { + xxx_messageInfo_DialResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_DialResponse proto.InternalMessageInfo + +func (m *DialResponse) GetError() string { + if m != nil { + return m.Error + } + return "" +} + +func (m *DialResponse) GetConnectID() int64 { + if m != nil { + return m.ConnectID + } + return 0 +} + +func (m *DialResponse) GetRandom() int64 { + if m != nil { + return m.Random + } + return 0 +} + +type CloseRequest struct { + // connectID of the stream to close + ConnectID int64 `protobuf:"varint,1,opt,name=connectID,proto3" json:"connectID,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *CloseRequest) Reset() { *m = CloseRequest{} } +func (m *CloseRequest) String() string { return proto.CompactTextString(m) } +func (*CloseRequest) ProtoMessage() {} +func (*CloseRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_fec4258d9ecd175d, []int{3} +} + +func (m *CloseRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_CloseRequest.Unmarshal(m, b) +} +func (m *CloseRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_CloseRequest.Marshal(b, m, deterministic) +} +func (m *CloseRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_CloseRequest.Merge(m, src) +} +func (m *CloseRequest) XXX_Size() int { + return xxx_messageInfo_CloseRequest.Size(m) +} +func (m *CloseRequest) XXX_DiscardUnknown() { + xxx_messageInfo_CloseRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_CloseRequest proto.InternalMessageInfo + +func (m *CloseRequest) GetConnectID() int64 { + if m != nil { + return m.ConnectID + } + return 0 +} + +type CloseResponse struct { + // error message + Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` + // connectID indicates the identifier of the connection + ConnectID int64 `protobuf:"varint,2,opt,name=connectID,proto3" json:"connectID,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *CloseResponse) Reset() { *m = CloseResponse{} } +func (m *CloseResponse) String() string { return proto.CompactTextString(m) } +func (*CloseResponse) ProtoMessage() {} +func (*CloseResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_fec4258d9ecd175d, []int{4} +} + +func (m *CloseResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_CloseResponse.Unmarshal(m, b) +} +func (m *CloseResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_CloseResponse.Marshal(b, m, deterministic) +} +func (m *CloseResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_CloseResponse.Merge(m, src) +} +func (m *CloseResponse) XXX_Size() int { + return xxx_messageInfo_CloseResponse.Size(m) +} +func (m *CloseResponse) XXX_DiscardUnknown() { + xxx_messageInfo_CloseResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_CloseResponse proto.InternalMessageInfo + +func (m *CloseResponse) GetError() string { + if m != nil { + return m.Error + } + return "" +} + +func (m *CloseResponse) GetConnectID() int64 { + if m != nil { + return m.ConnectID + } + return 0 +} + +type Data struct { + // connectID to connect to + ConnectID int64 `protobuf:"varint,1,opt,name=connectID,proto3" json:"connectID,omitempty"` + // error message if error happens + Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` + // stream data + Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Data) Reset() { *m = Data{} } +func (m *Data) String() string { return proto.CompactTextString(m) } +func (*Data) ProtoMessage() {} +func (*Data) Descriptor() ([]byte, []int) { + return fileDescriptor_fec4258d9ecd175d, []int{5} +} + +func (m *Data) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Data.Unmarshal(m, b) +} +func (m *Data) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Data.Marshal(b, m, deterministic) +} +func (m *Data) XXX_Merge(src proto.Message) { + xxx_messageInfo_Data.Merge(m, src) +} +func (m *Data) XXX_Size() int { + return xxx_messageInfo_Data.Size(m) +} +func (m *Data) XXX_DiscardUnknown() { + xxx_messageInfo_Data.DiscardUnknown(m) +} + +var xxx_messageInfo_Data proto.InternalMessageInfo + +func (m *Data) GetConnectID() int64 { + if m != nil { + return m.ConnectID + } + return 0 +} + +func (m *Data) GetError() string { + if m != nil { + return m.Error + } + return "" +} + +func (m *Data) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +func init() { + proto.RegisterEnum("PacketType", PacketType_name, PacketType_value) + proto.RegisterEnum("Error", Error_name, Error_value) + proto.RegisterType((*Packet)(nil), "Packet") + proto.RegisterType((*DialRequest)(nil), "DialRequest") + proto.RegisterType((*DialResponse)(nil), "DialResponse") + proto.RegisterType((*CloseRequest)(nil), "CloseRequest") + proto.RegisterType((*CloseResponse)(nil), "CloseResponse") + proto.RegisterType((*Data)(nil), "Data") +} + +func init() { + proto.RegisterFile("konnectivity-client/proto/client/client.proto", fileDescriptor_fec4258d9ecd175d) +} + +var fileDescriptor_fec4258d9ecd175d = []byte{ + // 472 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0xd1, 0x6e, 0x9b, 0x30, + 0x14, 0x85, 0x00, 0x49, 0xb8, 0x21, 0x15, 0xb2, 0xa6, 0x09, 0x75, 0x93, 0x5a, 0xf1, 0x14, 0x55, + 0x0b, 0x54, 0xa9, 0x34, 0xed, 0x35, 0x0d, 0xa9, 0x52, 0xa9, 0x5a, 0x99, 0xd3, 0xa7, 0xee, 0x61, + 0xf2, 0xc0, 0x9a, 0x50, 0x18, 0x66, 0xb6, 0x97, 0x8d, 0x0f, 0xda, 0x7f, 0x4e, 0x18, 0x52, 0xc8, + 0xa4, 0x6d, 0x52, 0x9f, 0xe0, 0x1c, 0xdf, 0x73, 0x7c, 0x7d, 0xae, 0x0d, 0xf3, 0x1d, 0x2b, 0x0a, + 0x9a, 0xc8, 0x6c, 0x9f, 0xc9, 0x6a, 0x9e, 0xe4, 0x19, 0x2d, 0x64, 0x58, 0x72, 0x26, 0x59, 0xd8, + 0x82, 0xe6, 0x13, 0x28, 0xce, 0xff, 0x35, 0x80, 0x61, 0x4c, 0x92, 0x1d, 0x95, 0xe8, 0x0c, 0x4c, + 0x59, 0x95, 0xd4, 0xd3, 0xcf, 0xf5, 0xd9, 0xc9, 0x62, 0x12, 0x34, 0xf4, 0x43, 0x55, 0x52, 0xac, + 0x16, 0xd0, 0x25, 0x4c, 0xd2, 0x8c, 0xe4, 0x98, 0x7e, 0xfb, 0x4e, 0x85, 0xf4, 0x06, 0xe7, 0xfa, + 0x6c, 0xb2, 0x70, 0x82, 0xa8, 0xe3, 0x36, 0x1a, 0xee, 0x97, 0xa0, 0x2b, 0x70, 0x1a, 0x28, 0x4a, + 0x56, 0x08, 0xea, 0x19, 0x4a, 0x32, 0x6d, 0x25, 0x0d, 0xb9, 0xd1, 0xf0, 0x51, 0x11, 0x7a, 0x05, + 0x66, 0x4a, 0x24, 0xf1, 0x4c, 0x55, 0x6c, 0x05, 0x11, 0x91, 0x64, 0xa3, 0x61, 0x45, 0xd6, 0x8e, + 0x49, 0xce, 0x04, 0x3d, 0x34, 0x61, 0xb5, 0x8e, 0xab, 0x1e, 0x59, 0x3b, 0xf6, 0x8b, 0xd0, 0x5b, + 0x98, 0xb6, 0xb8, 0xed, 0x63, 0xa8, 0x54, 0x27, 0x07, 0xd5, 0x53, 0x23, 0xc7, 0x65, 0xd7, 0x36, + 0x8c, 0x4a, 0x52, 0xe5, 0x8c, 0xa4, 0xfe, 0x47, 0x98, 0xf4, 0xce, 0x89, 0x4e, 0x61, 0xac, 0xf2, + 0x4b, 0x58, 0xae, 0xf2, 0xb2, 0xf1, 0x13, 0x46, 0x1e, 0x8c, 0x48, 0x9a, 0x72, 0x2a, 0x84, 0x8a, + 0xc8, 0xc6, 0x07, 0x88, 0x5e, 0xc2, 0x90, 0x93, 0x22, 0x65, 0x5f, 0x55, 0x10, 0x06, 0x6e, 0x91, + 0xff, 0x08, 0x4e, 0x3f, 0x11, 0xf4, 0x02, 0x2c, 0xca, 0x39, 0xe3, 0xad, 0x75, 0x03, 0xd0, 0x6b, + 0xb0, 0x93, 0x66, 0xb6, 0xb7, 0x91, 0x72, 0x36, 0x70, 0x47, 0xfc, 0xd5, 0xfb, 0x0d, 0x38, 0xfd, + 0x6c, 0x8e, 0x5d, 0xf4, 0x3f, 0x5c, 0xfc, 0x15, 0x4c, 0x8f, 0x32, 0x79, 0x4e, 0x2b, 0xfe, 0x7b, + 0x30, 0xeb, 0x99, 0xfd, 0x7b, 0xab, 0xce, 0x79, 0xd0, 0x77, 0x46, 0xed, 0xf0, 0xeb, 0x43, 0x38, + 0xcd, 0xcc, 0x2f, 0x62, 0x80, 0xee, 0x2e, 0x22, 0x07, 0xc6, 0xd1, 0xed, 0xf2, 0xee, 0x13, 0x5e, + 0x7f, 0x70, 0xb5, 0x0e, 0x6d, 0x63, 0x57, 0x47, 0x53, 0xb0, 0x57, 0x77, 0xf7, 0xdb, 0xb5, 0x5a, + 0x1c, 0xf4, 0xe0, 0x36, 0x76, 0x0d, 0x34, 0x06, 0x33, 0x5a, 0x3e, 0x2c, 0x5d, 0xf3, 0xc2, 0x05, + 0x6b, 0xad, 0xb6, 0x1b, 0x81, 0xb1, 0xbe, 0xbf, 0x71, 0xb5, 0x45, 0x08, 0x4e, 0xcc, 0xd9, 0xcf, + 0x6a, 0x4b, 0xf9, 0x3e, 0x4b, 0x28, 0x3a, 0x03, 0x4b, 0x61, 0x34, 0x6a, 0xdf, 0xc1, 0xe9, 0xe1, + 0xc7, 0xd7, 0x66, 0xfa, 0xa5, 0x7e, 0x7d, 0xf3, 0x18, 0x89, 0xec, 0x8b, 0x08, 0x76, 0xef, 0x44, + 0x90, 0xb1, 0x90, 0x94, 0x99, 0xa0, 0x7c, 0x4f, 0xf9, 0xbc, 0xa0, 0xf2, 0x07, 0xe3, 0xbb, 0x79, + 0x59, 0xcb, 0xc3, 0xff, 0xbd, 0xc6, 0xcf, 0x43, 0x85, 0xae, 0x7e, 0x07, 0x00, 0x00, 0xff, 0xff, + 0x64, 0xe0, 0x62, 0xbe, 0xb8, 0x03, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// ProxyServiceClient is the client API for ProxyService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type ProxyServiceClient interface { + Proxy(ctx context.Context, opts ...grpc.CallOption) (ProxyService_ProxyClient, error) +} + +type proxyServiceClient struct { + cc *grpc.ClientConn +} + +func NewProxyServiceClient(cc *grpc.ClientConn) ProxyServiceClient { + return &proxyServiceClient{cc} +} + +func (c *proxyServiceClient) Proxy(ctx context.Context, opts ...grpc.CallOption) (ProxyService_ProxyClient, error) { + stream, err := c.cc.NewStream(ctx, &_ProxyService_serviceDesc.Streams[0], "/ProxyService/Proxy", opts...) + if err != nil { + return nil, err + } + x := &proxyServiceProxyClient{stream} + return x, nil +} + +type ProxyService_ProxyClient interface { + Send(*Packet) error + Recv() (*Packet, error) + grpc.ClientStream +} + +type proxyServiceProxyClient struct { + grpc.ClientStream +} + +func (x *proxyServiceProxyClient) Send(m *Packet) error { + return x.ClientStream.SendMsg(m) +} + +func (x *proxyServiceProxyClient) Recv() (*Packet, error) { + m := new(Packet) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// ProxyServiceServer is the server API for ProxyService service. +type ProxyServiceServer interface { + Proxy(ProxyService_ProxyServer) error +} + +// UnimplementedProxyServiceServer can be embedded to have forward compatible implementations. +type UnimplementedProxyServiceServer struct { +} + +func (*UnimplementedProxyServiceServer) Proxy(srv ProxyService_ProxyServer) error { + return status.Errorf(codes.Unimplemented, "method Proxy not implemented") +} + +func RegisterProxyServiceServer(s *grpc.Server, srv ProxyServiceServer) { + s.RegisterService(&_ProxyService_serviceDesc, srv) +} + +func _ProxyService_Proxy_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(ProxyServiceServer).Proxy(&proxyServiceProxyServer{stream}) +} + +type ProxyService_ProxyServer interface { + Send(*Packet) error + Recv() (*Packet, error) + grpc.ServerStream +} + +type proxyServiceProxyServer struct { + grpc.ServerStream +} + +func (x *proxyServiceProxyServer) Send(m *Packet) error { + return x.ServerStream.SendMsg(m) +} + +func (x *proxyServiceProxyServer) Recv() (*Packet, error) { + m := new(Packet) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +var _ProxyService_serviceDesc = grpc.ServiceDesc{ + ServiceName: "ProxyService", + HandlerType: (*ProxyServiceServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Proxy", + Handler: _ProxyService_Proxy_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "konnectivity-client/proto/client/client.proto", +} diff --git a/konnectivity-client/proto/client/client.proto b/konnectivity-client/proto/client/client.proto new file mode 100644 index 000000000..3aadac064 --- /dev/null +++ b/konnectivity-client/proto/client/client.proto @@ -0,0 +1,95 @@ +// Copyright The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +// Retransmit? +// Sliding windows? + +option go_package = "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"; + +service ProxyService { + rpc Proxy(stream Packet) returns (stream Packet) {} +} + +enum PacketType { + DIAL_REQ = 0; + DIAL_RSP = 1; + CLOSE_REQ = 2; + CLOSE_RSP = 3; + DATA = 4; +} + +enum Error { + EOF = 0; + // ... +} + +message Packet { + PacketType type = 1; + + oneof payload { + DialRequest dialRequest = 2; + DialResponse dialResponse = 3; + Data data = 4; + CloseRequest closeRequest = 5; + CloseResponse closeResponse = 6; + } +} + +message DialRequest { + // tcp or udp? + string protocol = 1; + + // node:port + string address = 2; + + // random id for client, maybe should be longer + int64 random = 3; +} + +message DialResponse { + // error failed reason; enum? + string error = 1; + + // connectID indicates the identifier of the connection + int64 connectID = 2; + + // random copied from DialRequest + int64 random = 3; +} + +message CloseRequest { + // connectID of the stream to close + int64 connectID = 1; +} + +message CloseResponse { + // error message + string error = 1; + + // connectID indicates the identifier of the connection + int64 connectID = 2; +} + +message Data { + // connectID to connect to + int64 connectID = 1; + + // error message if error happens + string error = 2; + + // stream data + bytes data = 3; +} diff --git a/pkg/agent/agentclient/client.go b/pkg/agent/agentclient/client.go index 97a7a9971..50c70294a 100644 --- a/pkg/agent/agentclient/client.go +++ b/pkg/agent/agentclient/client.go @@ -24,7 +24,7 @@ import ( "google.golang.org/grpc" "k8s.io/klog" - "sigs.k8s.io/apiserver-network-proxy/proto/agent" + "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" ) // AgentClient runs on the node network side. It connects to proxy server and establishes @@ -125,11 +125,11 @@ func (a *AgentClient) Serve() { } switch pkt.Type { - case agent.PacketType_DIAL_REQ: + case client.PacketType_DIAL_REQ: klog.Info("received DIAL_REQ") - resp := &agent.Packet{ - Type: agent.PacketType_DIAL_RSP, - Payload: &agent.Packet_DialResponse{DialResponse: &agent.DialResponse{}}, + resp := &client.Packet{ + Type: client.PacketType_DIAL_RSP, + Payload: &client.Packet_DialResponse{DialResponse: &client.DialResponse{}}, } dialReq := pkt.GetDialRequest() @@ -151,9 +151,9 @@ func (a *AgentClient) Serve() { dataCh: dataCh, cleanFunc: func() { klog.Infof("close connection(id=%d)", connID) - resp := &agent.Packet{ - Type: agent.PacketType_CLOSE_RSP, - Payload: &agent.Packet_CloseResponse{CloseResponse: &agent.CloseResponse{}}, + resp := &client.Packet{ + Type: client.PacketType_CLOSE_RSP, + Payload: &client.Packet_CloseResponse{CloseResponse: &client.CloseResponse{}}, } resp.GetCloseResponse().ConnectID = connID @@ -180,7 +180,7 @@ func (a *AgentClient) Serve() { go a.remoteToProxy(conn, connID) go a.proxyToRemote(conn, connID) - case agent.PacketType_DATA: + case client.PacketType_DATA: data := pkt.GetData() klog.Infof("received DATA(id=%d)", data.ConnectID) klog.Infof("[tracing] %v", data) @@ -189,7 +189,7 @@ func (a *AgentClient) Serve() { ctx.dataCh <- data.Data } - case agent.PacketType_CLOSE_REQ: + case client.PacketType_CLOSE_REQ: closeReq := pkt.GetCloseRequest() connID := closeReq.ConnectID @@ -198,9 +198,9 @@ func (a *AgentClient) Serve() { if ctx, ok := a.connContext[connID]; ok { ctx.cleanup() } else { - resp := &agent.Packet{ - Type: agent.PacketType_CLOSE_RSP, - Payload: &agent.Packet_CloseResponse{CloseResponse: &agent.CloseResponse{}}, + resp := &client.Packet{ + Type: client.PacketType_CLOSE_RSP, + Payload: &client.Packet_CloseResponse{CloseResponse: &client.CloseResponse{}}, } resp.GetCloseResponse().ConnectID = connID resp.GetCloseResponse().Error = "Unknown connectID" @@ -225,8 +225,8 @@ func (a *AgentClient) remoteToProxy(conn net.Conn, connID int64) { defer ctx.cleanup() var buf [1 << 12]byte - resp := &agent.Packet{ - Type: agent.PacketType_DATA, + resp := &client.Packet{ + Type: client.PacketType_DATA, } for { @@ -241,7 +241,7 @@ func (a *AgentClient) remoteToProxy(conn net.Conn, connID int64) { klog.Warningf("connection read error: %v", err) return } else { - resp.Payload = &agent.Packet_Data{Data: &agent.Data{ + resp.Payload = &client.Packet_Data{Data: &client.Data{ Data: buf[:n], ConnectID: connID, }} diff --git a/pkg/agent/agentclient/client_test.go b/pkg/agent/agentclient/client_test.go index 567c53081..3b00b5b2a 100644 --- a/pkg/agent/agentclient/client_test.go +++ b/pkg/agent/agentclient/client_test.go @@ -12,20 +12,21 @@ import ( "google.golang.org/grpc" "k8s.io/klog" "sigs.k8s.io/apiserver-network-proxy/proto/agent" + "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" ) func TestServeData_HTTP(t *testing.T) { var err error var stream agent.AgentService_ConnectClient stopCh := make(chan struct{}) - client := &AgentClient{ + testClient := &AgentClient{ connContext: make(map[int64]*connContext), stopCh: stopCh, } - client.stream, stream = pipe2() + testClient.stream, stream = pipe2() // Start agent - go client.Serve() + go testClient.Serve() defer close(stopCh) // Start test http server as remote service @@ -50,10 +51,10 @@ func TestServeData_HTTP(t *testing.T) { if pkg == nil { t.Fatal("unexpected nil packet") } - if pkg.Type != agent.PacketType_DIAL_RSP { + if pkg.Type != client.PacketType_DIAL_RSP { t.Errorf("expect PacketType_DIAL_RSP; got %v", pkg.Type) } - dialRsp := pkg.Payload.(*agent.Packet_DialResponse) + dialRsp := pkg.Payload.(*client.Packet_DialResponse) connID := dialRsp.DialResponse.ConnectID if dialRsp.DialResponse.Random != 111 { t.Errorf("expect random=111; got %v", dialRsp.DialResponse.Random) @@ -71,10 +72,10 @@ func TestServeData_HTTP(t *testing.T) { if pkg == nil { t.Fatal("unexpected nil packet") } - if pkg.Type != agent.PacketType_DATA { + if pkg.Type != client.PacketType_DATA { t.Errorf("expect PacketType_DATA; got %v", pkg.Type) } - data := pkg.Payload.(*agent.Packet_Data).Data.Data + data := pkg.Payload.(*client.Packet_Data).Data.Data // Verify response data // @@ -97,16 +98,16 @@ func TestServeData_HTTP(t *testing.T) { if pkg == nil { t.Fatal("unexpected nil packet") } - if pkg.Type != agent.PacketType_CLOSE_RSP { + if pkg.Type != client.PacketType_CLOSE_RSP { t.Errorf("expect PacketType_CLOSE_RSP; got %v", pkg.Type) } - closeErr := pkg.Payload.(*agent.Packet_CloseResponse).CloseResponse.Error + closeErr := pkg.Payload.(*client.Packet_CloseResponse).CloseResponse.Error if closeErr != "" { t.Errorf("expect nil closeErr; got %v", closeErr) } // Verify internal state is consistent - if _, ok := client.connContext[connID]; ok { + if _, ok := testClient.connContext[connID]; ok { t.Error("client.connContext not released") } } @@ -114,14 +115,14 @@ func TestServeData_HTTP(t *testing.T) { func TestClose_Client(t *testing.T) { var stream agent.AgentService_ConnectClient stopCh := make(chan struct{}) - client := &AgentClient{ + testClient := &AgentClient{ connContext: make(map[int64]*connContext), stopCh: stopCh, } - client.stream, stream = pipe2() + testClient.stream, stream = pipe2() // Start agent - go client.Serve() + go testClient.Serve() defer close(stopCh) // Start test http server as remote service @@ -142,10 +143,10 @@ func TestClose_Client(t *testing.T) { if pkg == nil { t.Fatal("unexpected nil packet") } - if pkg.Type != agent.PacketType_DIAL_RSP { + if pkg.Type != client.PacketType_DIAL_RSP { t.Errorf("expect PacketType_DIAL_RSP; got %v", pkg.Type) } - dialRsp := pkg.Payload.(*agent.Packet_DialResponse) + dialRsp := pkg.Payload.(*client.Packet_DialResponse) connID := dialRsp.DialResponse.ConnectID if dialRsp.DialResponse.Random != 111 { t.Errorf("expect random=111; got %v", dialRsp.DialResponse.Random) @@ -161,16 +162,16 @@ func TestClose_Client(t *testing.T) { if pkg == nil { t.Error("unexpected nil packet") } - if pkg.Type != agent.PacketType_CLOSE_RSP { + if pkg.Type != client.PacketType_CLOSE_RSP { t.Errorf("expect PacketType_CLOSE_RSP; got %v", pkg.Type) } - closeErr := pkg.Payload.(*agent.Packet_CloseResponse).CloseResponse.Error + closeErr := pkg.Payload.(*client.Packet_CloseResponse).CloseResponse.Error if closeErr != "" { t.Errorf("expect nil closeErr; got %v", closeErr) } // Verify internal state is consistent - if _, ok := client.connContext[connID]; ok { + if _, ok := testClient.connContext[connID]; ok { t.Error("client.connContext not released") } @@ -184,10 +185,10 @@ func TestClose_Client(t *testing.T) { if pkg == nil { t.Error("unexpected nil packet") } - if pkg.Type != agent.PacketType_CLOSE_RSP { + if pkg.Type != client.PacketType_CLOSE_RSP { t.Errorf("expect PacketType_CLOSE_RSP; got %+v", pkg) } - closeErr = pkg.Payload.(*agent.Packet_CloseResponse).CloseResponse.Error + closeErr = pkg.Payload.(*client.Packet_CloseResponse).CloseResponse.Error if closeErr != "Unknown connectID" { t.Errorf("expect Unknown connectID; got %v", closeErr) } @@ -197,12 +198,12 @@ func TestClose_Client(t *testing.T) { // fakeStream implements AgentService_ConnectClient type fakeStream struct { grpc.ClientStream - r <-chan *agent.Packet - w chan<- *agent.Packet + r <-chan *client.Packet + w chan<- *client.Packet } func pipe() (agent.AgentService_ConnectClient, agent.AgentService_ConnectClient) { - r, w := make(chan *agent.Packet, 2), make(chan *agent.Packet, 2) + r, w := make(chan *client.Packet, 2), make(chan *client.Packet, 2) s1, s2 := &fakeStream{}, &fakeStream{} s1.r, s1.w = r, w s2.r, s2.w = w, r @@ -214,13 +215,13 @@ func pipe2() (*RedialableAgentClient, agent.AgentService_ConnectClient) { return &RedialableAgentClient{stream: s1}, s2 } -func (s *fakeStream) Send(packet *agent.Packet) error { +func (s *fakeStream) Send(packet *client.Packet) error { klog.Infof("[DEBUG] send packet %+v", packet) s.w <- packet return nil } -func (s *fakeStream) Recv() (*agent.Packet, error) { +func (s *fakeStream) Recv() (*client.Packet, error) { select { case pkg := <-s.r: klog.Infof("[DEBUG] recv packet %+v", pkg) @@ -230,11 +231,11 @@ func (s *fakeStream) Recv() (*agent.Packet, error) { } } -func newDialPacket(protocol, address string, random int64) *agent.Packet { - return &agent.Packet{ - Type: agent.PacketType_DIAL_REQ, - Payload: &agent.Packet_DialRequest{ - DialRequest: &agent.DialRequest{ +func newDialPacket(protocol, address string, random int64) *client.Packet { + return &client.Packet{ + Type: client.PacketType_DIAL_REQ, + Payload: &client.Packet_DialRequest{ + DialRequest: &client.DialRequest{ Protocol: protocol, Address: address, Random: random, @@ -243,11 +244,11 @@ func newDialPacket(protocol, address string, random int64) *agent.Packet { } } -func newDataPacket(connID int64, data []byte) *agent.Packet { - return &agent.Packet{ - Type: agent.PacketType_DATA, - Payload: &agent.Packet_Data{ - Data: &agent.Data{ +func newDataPacket(connID int64, data []byte) *client.Packet { + return &client.Packet{ + Type: client.PacketType_DATA, + Payload: &client.Packet_Data{ + Data: &client.Data{ ConnectID: connID, Data: data, }, @@ -255,11 +256,11 @@ func newDataPacket(connID int64, data []byte) *agent.Packet { } } -func newClosePacket(connID int64) *agent.Packet { - return &agent.Packet{ - Type: agent.PacketType_CLOSE_REQ, - Payload: &agent.Packet_CloseRequest{ - CloseRequest: &agent.CloseRequest{ +func newClosePacket(connID int64) *client.Packet { + return &client.Packet{ + Type: client.PacketType_CLOSE_REQ, + Payload: &client.Packet_CloseRequest{ + CloseRequest: &client.CloseRequest{ ConnectID: connID, }, }, diff --git a/pkg/agent/agentclient/stream.go b/pkg/agent/agentclient/stream.go index 794035ae8..3f927621e 100644 --- a/pkg/agent/agentclient/stream.go +++ b/pkg/agent/agentclient/stream.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/metadata" "k8s.io/klog" + "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" "sigs.k8s.io/apiserver-network-proxy/proto/agent" "sigs.k8s.io/apiserver-network-proxy/proto/header" ) @@ -134,7 +135,7 @@ func (c *RedialableAgentClient) probe() { } } -func (c *RedialableAgentClient) Send(pkt *agent.Packet) error { +func (c *RedialableAgentClient) Send(pkt *client.Packet) error { c.sendLock.Lock() defer c.sendLock.Unlock() @@ -151,7 +152,7 @@ func (c *RedialableAgentClient) Send(pkt *agent.Packet) error { return nil } -func (c *RedialableAgentClient) RetrySend(pkt *agent.Packet) error { +func (c *RedialableAgentClient) RetrySend(pkt *client.Packet) error { err := c.Send(pkt) if err == nil { return nil @@ -194,11 +195,11 @@ func (c *RedialableAgentClient) doneReconnect(err error) { c.reconnWaiters = nil } -func (c *RedialableAgentClient) Recv() (*agent.Packet, error) { +func (c *RedialableAgentClient) Recv() (*client.Packet, error) { c.recvLock.Lock() defer c.recvLock.Unlock() - var pkt *agent.Packet + var pkt *client.Packet var err error if pkt, err = c.stream.Recv(); err != nil { diff --git a/pkg/agent/agentclient/stream_test.go b/pkg/agent/agentclient/stream_test.go index ef6028339..6df060537 100644 --- a/pkg/agent/agentclient/stream_test.go +++ b/pkg/agent/agentclient/stream_test.go @@ -11,6 +11,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/metadata" "k8s.io/klog" + "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" "sigs.k8s.io/apiserver-network-proxy/proto/agent" "sigs.k8s.io/apiserver-network-proxy/proto/header" ) @@ -22,13 +23,13 @@ func TestReconnectExits(t *testing.T) { time.Sleep(time.Millisecond) - client, err := NewRedialableAgentClient("localhost:8899", uuid.New().String(), &ClientSet{}, grpc.WithInsecure()) + testClient, err := NewRedialableAgentClient("localhost:8899", uuid.New().String(), &ClientSet{}, grpc.WithInsecure()) if err != nil { t.Fatal(err) } - err = client.Send(&agent.Packet{ - Type: agent.PacketType_DIAL_REQ, + err = testClient.Send(&client.Packet{ + Type: client.PacketType_DIAL_REQ, }) if err != nil { t.Error(err) @@ -36,7 +37,7 @@ func TestReconnectExits(t *testing.T) { client1 := make(chan bool) go func() { - _, err := client.Recv() + _, err := testClient.Recv() if err != nil { if err2, ok := err.(*ReconnectError); ok { err2.Wait() @@ -47,7 +48,7 @@ func TestReconnectExits(t *testing.T) { client2 := make(chan bool) go func() { - _, err := client.Recv() + _, err := testClient.Recv() if err != nil { if err2, ok := err.(*ReconnectError); ok { err2.Wait() @@ -56,7 +57,7 @@ func TestReconnectExits(t *testing.T) { } }() - client.Close() + testClient.Close() var got1 bool var got2 bool diff --git a/pkg/agent/agentserver/server.go b/pkg/agent/agentserver/server.go index c2ca7538c..e7cc56522 100644 --- a/pkg/agent/agentserver/server.go +++ b/pkg/agent/agentserver/server.go @@ -31,6 +31,7 @@ import ( authv1 "k8s.io/api/authentication/v1" "k8s.io/client-go/kubernetes" "k8s.io/klog" + "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" "sigs.k8s.io/apiserver-network-proxy/proto/agent" "sigs.k8s.io/apiserver-network-proxy/proto/header" ) @@ -38,23 +39,23 @@ import ( // ProxyClientConnection... type ProxyClientConnection struct { Mode string - Grpc agent.ProxyService_ProxyServer + Grpc client.ProxyService_ProxyServer HTTP net.Conn connected chan struct{} connectID int64 } -func (c *ProxyClientConnection) send(pkt *agent.Packet) error { +func (c *ProxyClientConnection) send(pkt *client.Packet) error { if c.Mode == "grpc" { stream := c.Grpc return stream.Send(pkt) } else if c.Mode == "http-connect" { - if pkt.Type == agent.PacketType_CLOSE_RSP { + if pkt.Type == client.PacketType_CLOSE_RSP { return c.HTTP.Close() - } else if pkt.Type == agent.PacketType_DATA { + } else if pkt.Type == client.PacketType_DATA { _, err := c.HTTP.Write(pkt.GetData().Data) return err - } else if pkt.Type == agent.PacketType_DIAL_RSP { + } else if pkt.Type == client.PacketType_DIAL_RSP { return nil } else { return fmt.Errorf("attempt to send via unrecognized connection type %v", pkt.Type) @@ -97,7 +98,7 @@ type AgentTokenAuthenticationOptions struct { var _ agent.AgentServiceServer = &ProxyServer{} -var _ agent.ProxyServiceServer = &ProxyServer{} +var _ client.ProxyServiceServer = &ProxyServer{} func (s *ProxyServer) addBackend(agentID string, conn agent.AgentService_ConnectServer) { klog.Infof("register Backend %v for agentID %s", conn, agentID) @@ -167,10 +168,10 @@ func NewProxyServer(serverID string, serverCount int, agentAuthenticationOptions } // Proxy handles incoming streams from gRPC frontend. -func (s *ProxyServer) Proxy(stream agent.ProxyService_ProxyServer) error { +func (s *ProxyServer) Proxy(stream client.ProxyService_ProxyServer) error { klog.Info("proxy request from client") - recvCh := make(chan *agent.Packet, 10) + recvCh := make(chan *client.Packet, 10) stopCh := make(chan error) go s.serveRecvFrontend(stream, recvCh) @@ -200,7 +201,7 @@ func (s *ProxyServer) Proxy(stream agent.ProxyService_ProxyServer) error { return <-stopCh } -func (s *ProxyServer) serveRecvFrontend(stream agent.ProxyService_ProxyServer, recvCh <-chan *agent.Packet) { +func (s *ProxyServer) serveRecvFrontend(stream client.ProxyService_ProxyServer, recvCh <-chan *client.Packet) { klog.Info("start serving frontend stream") var firstConnID int64 @@ -211,7 +212,7 @@ func (s *ProxyServer) serveRecvFrontend(stream agent.ProxyService_ProxyServer, r for pkt := range recvCh { switch pkt.Type { - case agent.PacketType_DIAL_REQ: + case client.PacketType_DIAL_REQ: klog.Info(">>> Received DIAL_REQ") // TODO: if we track what agent has historically served // the address, then we can send the Dial_REQ to the @@ -232,7 +233,7 @@ func (s *ProxyServer) serveRecvFrontend(stream agent.ProxyService_ProxyServer, r } klog.Info(">>> DIAL_REQ sent to backend") // got this. but backend didn't receive anything. - case agent.PacketType_CLOSE_REQ: + case client.PacketType_CLOSE_REQ: connID := pkt.GetCloseRequest().ConnectID klog.Infof(">>> Received CLOSE_REQ(id=%d)", connID) if backend == nil { @@ -245,7 +246,7 @@ func (s *ProxyServer) serveRecvFrontend(stream agent.ProxyService_ProxyServer, r } klog.Info(">>> CLOSE_REQ sent to backend") - case agent.PacketType_DATA: + case client.PacketType_DATA: connID := pkt.GetData().ConnectID klog.Infof(">>> Received DATA(id=%d)", connID) if firstConnID == 0 { @@ -271,10 +272,10 @@ func (s *ProxyServer) serveRecvFrontend(stream agent.ProxyService_ProxyServer, r klog.Infof(">>> Close streaming (id=%d)", firstConnID) - pkt := &agent.Packet{ - Type: agent.PacketType_CLOSE_REQ, - Payload: &agent.Packet_CloseRequest{ - CloseRequest: &agent.CloseRequest{ + pkt := &client.Packet{ + Type: client.PacketType_CLOSE_REQ, + Payload: &client.Packet_CloseRequest{ + CloseRequest: &client.CloseRequest{ ConnectID: firstConnID, }, }, @@ -289,7 +290,7 @@ func (s *ProxyServer) serveRecvFrontend(stream agent.ProxyService_ProxyServer, r } } -func (s *ProxyServer) serveSend(stream agent.ProxyService_ProxyServer, sendCh <-chan *agent.Packet) { +func (s *ProxyServer) serveSend(stream client.ProxyService_ProxyServer, sendCh <-chan *client.Packet) { klog.Info("start serve send ...") for pkt := range sendCh { err := stream.Send(pkt) @@ -396,7 +397,7 @@ func (s *ProxyServer) Connect(stream agent.AgentService_ConnectServer) error { return err } - recvCh := make(chan *agent.Packet, 10) + recvCh := make(chan *client.Packet, 10) stopCh := make(chan error) if s.AgentAuthenticationOptions.Enabled { @@ -433,12 +434,12 @@ func (s *ProxyServer) Connect(stream agent.AgentService_ConnectServer) error { } // route the packet back to the correct client -func (s *ProxyServer) serveRecvBackend(stream agent.AgentService_ConnectServer, recvCh <-chan *agent.Packet) { +func (s *ProxyServer) serveRecvBackend(stream agent.AgentService_ConnectServer, recvCh <-chan *client.Packet) { var firstConnID int64 for pkt := range recvCh { switch pkt.Type { - case agent.PacketType_DIAL_RSP: + case client.PacketType_DIAL_RSP: resp := pkt.GetDialResponse() firstConnID = resp.ConnectID klog.Infof("<<< Received DIAL_RSP(rand=%d, id=%d)", resp.Random, resp.ConnectID) @@ -457,7 +458,7 @@ func (s *ProxyServer) serveRecvBackend(stream agent.AgentService_ConnectServer, } } - case agent.PacketType_DATA: + case client.PacketType_DATA: resp := pkt.GetData() klog.Infof("<<< Received DATA(id=%d)", resp.ConnectID) if client, ok := s.Frontends[resp.ConnectID]; ok { @@ -468,7 +469,7 @@ func (s *ProxyServer) serveRecvBackend(stream agent.AgentService_ConnectServer, } } - case agent.PacketType_CLOSE_RSP: + case client.PacketType_CLOSE_RSP: resp := pkt.GetCloseResponse() klog.Infof("<<< Received CLOSE_RSP(id=%d)", resp.ConnectID) if client, ok := s.Frontends[resp.ConnectID]; ok { diff --git a/pkg/agent/agentserver/tunnel.go b/pkg/agent/agentserver/tunnel.go index d5f5a5320..4d3ceda65 100644 --- a/pkg/agent/agentserver/tunnel.go +++ b/pkg/agent/agentserver/tunnel.go @@ -23,7 +23,7 @@ import ( "net/http" "k8s.io/klog" - "sigs.k8s.io/apiserver-network-proxy/proto/agent" + "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" ) // Tunnel implements Proxy based on HTTP Connect, which tunnels the traffic to @@ -56,10 +56,10 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) { } random := rand.Int63() - dialRequest := &agent.Packet{ - Type: agent.PacketType_DIAL_REQ, - Payload: &agent.Packet_DialRequest{ - DialRequest: &agent.DialRequest{ + dialRequest := &client.Packet{ + Type: client.PacketType_DIAL_REQ, + Payload: &client.Packet_DialRequest{ + DialRequest: &client.DialRequest{ Protocol: "tcp", Address: r.Host, Random: random, @@ -113,10 +113,10 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) { break } - packet := &agent.Packet{ - Type: agent.PacketType_DATA, - Payload: &agent.Packet_Data{ - Data: &agent.Data{ + packet := &client.Packet{ + Type: client.PacketType_DATA, + Payload: &client.Packet_Data{ + Data: &client.Data{ ConnectID: connection.connectID, Data: pkt[:n], }, diff --git a/proto/agent/agent.pb.go b/proto/agent/agent.pb.go index 3f8af777b..20ca92280 100644 --- a/proto/agent/agent.pb.go +++ b/proto/agent/agent.pb.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ // Code generated by protoc-gen-go. DO NOT EDIT. -// source: agent/agent.proto +// source: proto/agent/agent.proto package agent @@ -26,6 +26,7 @@ import ( codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" math "math" + client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" ) // Reference imports to suppress errors if they are not otherwise used. @@ -39,501 +40,20 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package -type PacketType int32 +func init() { proto.RegisterFile("proto/agent/agent.proto", fileDescriptor_656b6c96a18ce683) } -const ( - PacketType_DIAL_REQ PacketType = 0 - PacketType_DIAL_RSP PacketType = 1 - PacketType_CLOSE_REQ PacketType = 2 - PacketType_CLOSE_RSP PacketType = 3 - PacketType_DATA PacketType = 4 -) - -var PacketType_name = map[int32]string{ - 0: "DIAL_REQ", - 1: "DIAL_RSP", - 2: "CLOSE_REQ", - 3: "CLOSE_RSP", - 4: "DATA", -} - -var PacketType_value = map[string]int32{ - "DIAL_REQ": 0, - "DIAL_RSP": 1, - "CLOSE_REQ": 2, - "CLOSE_RSP": 3, - "DATA": 4, -} - -func (x PacketType) String() string { - return proto.EnumName(PacketType_name, int32(x)) -} - -func (PacketType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_253ddf9934d56132, []int{0} -} - -type Error int32 - -const ( - Error_EOF Error = 0 -) - -var Error_name = map[int32]string{ - 0: "EOF", -} - -var Error_value = map[string]int32{ - "EOF": 0, -} - -func (x Error) String() string { - return proto.EnumName(Error_name, int32(x)) -} - -func (Error) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_253ddf9934d56132, []int{1} -} - -type Packet struct { - Type PacketType `protobuf:"varint,1,opt,name=type,proto3,enum=PacketType" json:"type,omitempty"` - // Types that are valid to be assigned to Payload: - // *Packet_DialRequest - // *Packet_DialResponse - // *Packet_Data - // *Packet_CloseRequest - // *Packet_CloseResponse - Payload isPacket_Payload `protobuf_oneof:"payload"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Packet) Reset() { *m = Packet{} } -func (m *Packet) String() string { return proto.CompactTextString(m) } -func (*Packet) ProtoMessage() {} -func (*Packet) Descriptor() ([]byte, []int) { - return fileDescriptor_253ddf9934d56132, []int{0} -} - -func (m *Packet) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Packet.Unmarshal(m, b) -} -func (m *Packet) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Packet.Marshal(b, m, deterministic) -} -func (m *Packet) XXX_Merge(src proto.Message) { - xxx_messageInfo_Packet.Merge(m, src) -} -func (m *Packet) XXX_Size() int { - return xxx_messageInfo_Packet.Size(m) -} -func (m *Packet) XXX_DiscardUnknown() { - xxx_messageInfo_Packet.DiscardUnknown(m) -} - -var xxx_messageInfo_Packet proto.InternalMessageInfo - -func (m *Packet) GetType() PacketType { - if m != nil { - return m.Type - } - return PacketType_DIAL_REQ -} - -type isPacket_Payload interface { - isPacket_Payload() -} - -type Packet_DialRequest struct { - DialRequest *DialRequest `protobuf:"bytes,2,opt,name=dialRequest,proto3,oneof"` -} - -type Packet_DialResponse struct { - DialResponse *DialResponse `protobuf:"bytes,3,opt,name=dialResponse,proto3,oneof"` -} - -type Packet_Data struct { - Data *Data `protobuf:"bytes,4,opt,name=data,proto3,oneof"` -} - -type Packet_CloseRequest struct { - CloseRequest *CloseRequest `protobuf:"bytes,5,opt,name=closeRequest,proto3,oneof"` -} - -type Packet_CloseResponse struct { - CloseResponse *CloseResponse `protobuf:"bytes,6,opt,name=closeResponse,proto3,oneof"` -} - -func (*Packet_DialRequest) isPacket_Payload() {} - -func (*Packet_DialResponse) isPacket_Payload() {} - -func (*Packet_Data) isPacket_Payload() {} - -func (*Packet_CloseRequest) isPacket_Payload() {} - -func (*Packet_CloseResponse) isPacket_Payload() {} - -func (m *Packet) GetPayload() isPacket_Payload { - if m != nil { - return m.Payload - } - return nil -} - -func (m *Packet) GetDialRequest() *DialRequest { - if x, ok := m.GetPayload().(*Packet_DialRequest); ok { - return x.DialRequest - } - return nil -} - -func (m *Packet) GetDialResponse() *DialResponse { - if x, ok := m.GetPayload().(*Packet_DialResponse); ok { - return x.DialResponse - } - return nil -} - -func (m *Packet) GetData() *Data { - if x, ok := m.GetPayload().(*Packet_Data); ok { - return x.Data - } - return nil -} - -func (m *Packet) GetCloseRequest() *CloseRequest { - if x, ok := m.GetPayload().(*Packet_CloseRequest); ok { - return x.CloseRequest - } - return nil -} - -func (m *Packet) GetCloseResponse() *CloseResponse { - if x, ok := m.GetPayload().(*Packet_CloseResponse); ok { - return x.CloseResponse - } - return nil -} - -// XXX_OneofWrappers is for the internal use of the proto package. -func (*Packet) XXX_OneofWrappers() []interface{} { - return []interface{}{ - (*Packet_DialRequest)(nil), - (*Packet_DialResponse)(nil), - (*Packet_Data)(nil), - (*Packet_CloseRequest)(nil), - (*Packet_CloseResponse)(nil), - } -} - -type DialRequest struct { - // tcp or udp? - Protocol string `protobuf:"bytes,1,opt,name=protocol,proto3" json:"protocol,omitempty"` - // node:port - Address string `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"` - // random id for client, maybe should be longer - Random int64 `protobuf:"varint,3,opt,name=random,proto3" json:"random,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *DialRequest) Reset() { *m = DialRequest{} } -func (m *DialRequest) String() string { return proto.CompactTextString(m) } -func (*DialRequest) ProtoMessage() {} -func (*DialRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_253ddf9934d56132, []int{1} -} - -func (m *DialRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_DialRequest.Unmarshal(m, b) -} -func (m *DialRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_DialRequest.Marshal(b, m, deterministic) -} -func (m *DialRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_DialRequest.Merge(m, src) -} -func (m *DialRequest) XXX_Size() int { - return xxx_messageInfo_DialRequest.Size(m) -} -func (m *DialRequest) XXX_DiscardUnknown() { - xxx_messageInfo_DialRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_DialRequest proto.InternalMessageInfo - -func (m *DialRequest) GetProtocol() string { - if m != nil { - return m.Protocol - } - return "" -} - -func (m *DialRequest) GetAddress() string { - if m != nil { - return m.Address - } - return "" -} - -func (m *DialRequest) GetRandom() int64 { - if m != nil { - return m.Random - } - return 0 -} - -type DialResponse struct { - // error failed reason; enum? - Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` - // connectID indicates the identifier of the connection - ConnectID int64 `protobuf:"varint,2,opt,name=connectID,proto3" json:"connectID,omitempty"` - // random copied from DialRequest - Random int64 `protobuf:"varint,3,opt,name=random,proto3" json:"random,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *DialResponse) Reset() { *m = DialResponse{} } -func (m *DialResponse) String() string { return proto.CompactTextString(m) } -func (*DialResponse) ProtoMessage() {} -func (*DialResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_253ddf9934d56132, []int{2} -} - -func (m *DialResponse) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_DialResponse.Unmarshal(m, b) -} -func (m *DialResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_DialResponse.Marshal(b, m, deterministic) -} -func (m *DialResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_DialResponse.Merge(m, src) -} -func (m *DialResponse) XXX_Size() int { - return xxx_messageInfo_DialResponse.Size(m) -} -func (m *DialResponse) XXX_DiscardUnknown() { - xxx_messageInfo_DialResponse.DiscardUnknown(m) -} - -var xxx_messageInfo_DialResponse proto.InternalMessageInfo - -func (m *DialResponse) GetError() string { - if m != nil { - return m.Error - } - return "" -} - -func (m *DialResponse) GetConnectID() int64 { - if m != nil { - return m.ConnectID - } - return 0 -} - -func (m *DialResponse) GetRandom() int64 { - if m != nil { - return m.Random - } - return 0 -} - -type CloseRequest struct { - // connectID of the stream to close - ConnectID int64 `protobuf:"varint,1,opt,name=connectID,proto3" json:"connectID,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *CloseRequest) Reset() { *m = CloseRequest{} } -func (m *CloseRequest) String() string { return proto.CompactTextString(m) } -func (*CloseRequest) ProtoMessage() {} -func (*CloseRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_253ddf9934d56132, []int{3} -} - -func (m *CloseRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_CloseRequest.Unmarshal(m, b) -} -func (m *CloseRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_CloseRequest.Marshal(b, m, deterministic) -} -func (m *CloseRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_CloseRequest.Merge(m, src) -} -func (m *CloseRequest) XXX_Size() int { - return xxx_messageInfo_CloseRequest.Size(m) -} -func (m *CloseRequest) XXX_DiscardUnknown() { - xxx_messageInfo_CloseRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_CloseRequest proto.InternalMessageInfo - -func (m *CloseRequest) GetConnectID() int64 { - if m != nil { - return m.ConnectID - } - return 0 -} - -type CloseResponse struct { - // error message - Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` - // connectID indicates the identifier of the connection - ConnectID int64 `protobuf:"varint,2,opt,name=connectID,proto3" json:"connectID,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *CloseResponse) Reset() { *m = CloseResponse{} } -func (m *CloseResponse) String() string { return proto.CompactTextString(m) } -func (*CloseResponse) ProtoMessage() {} -func (*CloseResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_253ddf9934d56132, []int{4} -} - -func (m *CloseResponse) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_CloseResponse.Unmarshal(m, b) -} -func (m *CloseResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_CloseResponse.Marshal(b, m, deterministic) -} -func (m *CloseResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_CloseResponse.Merge(m, src) -} -func (m *CloseResponse) XXX_Size() int { - return xxx_messageInfo_CloseResponse.Size(m) -} -func (m *CloseResponse) XXX_DiscardUnknown() { - xxx_messageInfo_CloseResponse.DiscardUnknown(m) -} - -var xxx_messageInfo_CloseResponse proto.InternalMessageInfo - -func (m *CloseResponse) GetError() string { - if m != nil { - return m.Error - } - return "" -} - -func (m *CloseResponse) GetConnectID() int64 { - if m != nil { - return m.ConnectID - } - return 0 -} - -type Data struct { - // connectID to connect to - ConnectID int64 `protobuf:"varint,1,opt,name=connectID,proto3" json:"connectID,omitempty"` - // error message if error happens - Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` - // stream data - Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Data) Reset() { *m = Data{} } -func (m *Data) String() string { return proto.CompactTextString(m) } -func (*Data) ProtoMessage() {} -func (*Data) Descriptor() ([]byte, []int) { - return fileDescriptor_253ddf9934d56132, []int{5} -} - -func (m *Data) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Data.Unmarshal(m, b) -} -func (m *Data) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Data.Marshal(b, m, deterministic) -} -func (m *Data) XXX_Merge(src proto.Message) { - xxx_messageInfo_Data.Merge(m, src) -} -func (m *Data) XXX_Size() int { - return xxx_messageInfo_Data.Size(m) -} -func (m *Data) XXX_DiscardUnknown() { - xxx_messageInfo_Data.DiscardUnknown(m) -} - -var xxx_messageInfo_Data proto.InternalMessageInfo - -func (m *Data) GetConnectID() int64 { - if m != nil { - return m.ConnectID - } - return 0 -} - -func (m *Data) GetError() string { - if m != nil { - return m.Error - } - return "" -} - -func (m *Data) GetData() []byte { - if m != nil { - return m.Data - } - return nil -} - -func init() { - proto.RegisterEnum("PacketType", PacketType_name, PacketType_value) - proto.RegisterEnum("Error", Error_name, Error_value) - proto.RegisterType((*Packet)(nil), "Packet") - proto.RegisterType((*DialRequest)(nil), "DialRequest") - proto.RegisterType((*DialResponse)(nil), "DialResponse") - proto.RegisterType((*CloseRequest)(nil), "CloseRequest") - proto.RegisterType((*CloseResponse)(nil), "CloseResponse") - proto.RegisterType((*Data)(nil), "Data") -} - -func init() { proto.RegisterFile("agent/agent.proto", fileDescriptor_253ddf9934d56132) } - -var fileDescriptor_253ddf9934d56132 = []byte{ - // 443 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0xd1, 0x6e, 0xd3, 0x30, - 0x14, 0x4d, 0x9a, 0xa4, 0x69, 0x6e, 0xd3, 0x29, 0x5c, 0x21, 0x14, 0x0d, 0xa4, 0x8d, 0x3c, 0x55, - 0x13, 0xea, 0x46, 0x27, 0xf1, 0x5e, 0x9a, 0xa2, 0x4e, 0x9a, 0x58, 0x70, 0xf7, 0x04, 0x0f, 0xc8, - 0x24, 0x16, 0x9a, 0x28, 0x71, 0x70, 0x0c, 0xa2, 0x1f, 0xc4, 0x7f, 0xa2, 0x38, 0xce, 0xe2, 0x20, - 0x8d, 0x87, 0xbd, 0x44, 0x39, 0xc7, 0xf7, 0x1c, 0x5f, 0x9f, 0x6b, 0xc3, 0x13, 0xfa, 0x95, 0x95, - 0xf2, 0x5c, 0x7d, 0x17, 0x95, 0xe0, 0x92, 0x27, 0x7f, 0x46, 0x30, 0xce, 0x68, 0xfe, 0x8d, 0x49, - 0x3c, 0x01, 0x57, 0x1e, 0x2a, 0x16, 0xdb, 0xa7, 0xf6, 0xfc, 0x68, 0x39, 0x5d, 0xb4, 0xf4, 0xed, - 0xa1, 0x62, 0x44, 0x2d, 0xe0, 0x05, 0x4c, 0x8b, 0x3b, 0xba, 0x27, 0xec, 0xc7, 0x4f, 0x56, 0xcb, - 0x78, 0x74, 0x6a, 0xcf, 0xa7, 0xcb, 0x70, 0x91, 0xf6, 0xdc, 0xd6, 0x22, 0x66, 0x09, 0x5e, 0x42, - 0xd8, 0xc2, 0xba, 0xe2, 0x65, 0xcd, 0x62, 0x47, 0x49, 0x66, 0x5a, 0xd2, 0x92, 0x5b, 0x8b, 0x0c, - 0x8a, 0xf0, 0x39, 0xb8, 0x05, 0x95, 0x34, 0x76, 0x55, 0xb1, 0xb7, 0x48, 0xa9, 0xa4, 0x5b, 0x8b, - 0x28, 0xb2, 0x71, 0xcc, 0xf7, 0xbc, 0x66, 0x5d, 0x13, 0x9e, 0x76, 0x5c, 0x1b, 0x64, 0xe3, 0x68, - 0x16, 0xe1, 0x1b, 0x98, 0x69, 0xac, 0xfb, 0x18, 0x2b, 0xd5, 0x51, 0xa7, 0xba, 0x6f, 0x64, 0x58, - 0xf6, 0x36, 0x00, 0xbf, 0xa2, 0x87, 0x3d, 0xa7, 0x45, 0xf2, 0x09, 0xa6, 0xc6, 0x39, 0xf1, 0x18, - 0x26, 0x2a, 0xbf, 0x9c, 0xef, 0x55, 0x5e, 0x01, 0xb9, 0xc7, 0x18, 0x83, 0x4f, 0x8b, 0x42, 0xb0, - 0xba, 0x56, 0x11, 0x05, 0xa4, 0x83, 0xf8, 0x0c, 0xc6, 0x82, 0x96, 0x05, 0xff, 0xae, 0x82, 0x70, - 0x88, 0x46, 0xc9, 0x47, 0x08, 0xcd, 0x44, 0xf0, 0x29, 0x78, 0x4c, 0x08, 0x2e, 0xb4, 0x75, 0x0b, - 0xf0, 0x05, 0x04, 0x39, 0x2f, 0x4b, 0x96, 0xcb, 0xab, 0x54, 0x39, 0x3b, 0xa4, 0x27, 0x1e, 0xf4, - 0x7e, 0x05, 0xa1, 0x99, 0xcd, 0xd0, 0xc5, 0xfe, 0xc7, 0x25, 0x59, 0xc3, 0x6c, 0x90, 0xc9, 0x63, - 0x5a, 0x49, 0xde, 0x83, 0xdb, 0xcc, 0xec, 0xff, 0x5b, 0xf5, 0xce, 0x23, 0xd3, 0x19, 0xf5, 0xf0, - 0x9b, 0x43, 0x84, 0xed, 0xcc, 0xcf, 0x32, 0x80, 0xfe, 0x2e, 0x62, 0x08, 0x93, 0xf4, 0x6a, 0x75, - 0xfd, 0x99, 0x6c, 0x3e, 0x44, 0x56, 0x8f, 0x76, 0x59, 0x64, 0xe3, 0x0c, 0x82, 0xf5, 0xf5, 0xcd, - 0x6e, 0xa3, 0x16, 0x47, 0x06, 0xdc, 0x65, 0x91, 0x83, 0x13, 0x70, 0xd3, 0xd5, 0xed, 0x2a, 0x72, - 0xcf, 0x22, 0xf0, 0x36, 0x6a, 0x3b, 0x1f, 0x9c, 0xcd, 0xcd, 0xbb, 0xc8, 0x5a, 0x9e, 0x43, 0x98, - 0x09, 0xfe, 0xfb, 0xb0, 0x63, 0xe2, 0xd7, 0x5d, 0xce, 0xf0, 0x04, 0x3c, 0x85, 0xd1, 0xd7, 0xef, - 0xe0, 0xb8, 0xfb, 0x49, 0xac, 0xb9, 0x7d, 0x61, 0x2f, 0x5f, 0x43, 0xb8, 0x6a, 0xde, 0x51, 0x27, - 0x78, 0x09, 0xfe, 0xba, 0x3d, 0xdb, 0x43, 0x92, 0x2f, 0x63, 0x75, 0x45, 0x2e, 0xff, 0x06, 0x00, - 0x00, 0xff, 0xff, 0x34, 0x35, 0xd5, 0x32, 0x87, 0x03, 0x00, 0x00, +var fileDescriptor_656b6c96a18ce683 = []byte{ + // 155 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2f, 0x28, 0xca, 0x2f, + 0xc9, 0xd7, 0x4f, 0x4c, 0x4f, 0xcd, 0x2b, 0x81, 0x90, 0x7a, 0x60, 0x11, 0x29, 0xdd, 0xec, 0xfc, + 0xbc, 0xbc, 0xd4, 0xe4, 0x92, 0xcc, 0xb2, 0xcc, 0x92, 0x4a, 0xdd, 0xe4, 0x9c, 0x4c, 0x90, 0x02, + 0x88, 0x62, 0x28, 0x07, 0x42, 0x41, 0x94, 0x1b, 0x19, 0x72, 0xf1, 0x38, 0x82, 0x74, 0x07, 0xa7, + 0x16, 0x95, 0x65, 0x26, 0xa7, 0x0a, 0x29, 0x72, 0xb1, 0x3b, 0x43, 0x0c, 0x10, 0x62, 0xd7, 0x0b, + 0x48, 0x4c, 0xce, 0x4e, 0x2d, 0x91, 0x82, 0x31, 0x94, 0x18, 0x34, 0x18, 0x0d, 0x18, 0x9d, 0x0c, + 0xa3, 0xf4, 0x8b, 0x33, 0xd3, 0x8b, 0xf5, 0xb2, 0x2d, 0x8a, 0xf5, 0x32, 0xf3, 0xf5, 0x13, 0x0b, + 0x32, 0x8b, 0x53, 0x8b, 0xca, 0x52, 0x8b, 0x74, 0xf3, 0x52, 0x4b, 0xca, 0xf3, 0x8b, 0xb2, 0x75, + 0x0b, 0x8a, 0xf2, 0x2b, 0x2a, 0xf5, 0x91, 0x1c, 0x98, 0xc4, 0x06, 0xe6, 0x18, 0x03, 0x02, 0x00, + 0x00, 0xff, 0xff, 0x85, 0x25, 0x82, 0x73, 0xb6, 0x00, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -544,110 +64,6 @@ var _ grpc.ClientConn // is compatible with the grpc package it is being compiled against. const _ = grpc.SupportPackageIsVersion4 -// ProxyServiceClient is the client API for ProxyService service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type ProxyServiceClient interface { - Proxy(ctx context.Context, opts ...grpc.CallOption) (ProxyService_ProxyClient, error) -} - -type proxyServiceClient struct { - cc *grpc.ClientConn -} - -func NewProxyServiceClient(cc *grpc.ClientConn) ProxyServiceClient { - return &proxyServiceClient{cc} -} - -func (c *proxyServiceClient) Proxy(ctx context.Context, opts ...grpc.CallOption) (ProxyService_ProxyClient, error) { - stream, err := c.cc.NewStream(ctx, &_ProxyService_serviceDesc.Streams[0], "/ProxyService/Proxy", opts...) - if err != nil { - return nil, err - } - x := &proxyServiceProxyClient{stream} - return x, nil -} - -type ProxyService_ProxyClient interface { - Send(*Packet) error - Recv() (*Packet, error) - grpc.ClientStream -} - -type proxyServiceProxyClient struct { - grpc.ClientStream -} - -func (x *proxyServiceProxyClient) Send(m *Packet) error { - return x.ClientStream.SendMsg(m) -} - -func (x *proxyServiceProxyClient) Recv() (*Packet, error) { - m := new(Packet) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -// ProxyServiceServer is the server API for ProxyService service. -type ProxyServiceServer interface { - Proxy(ProxyService_ProxyServer) error -} - -// UnimplementedProxyServiceServer can be embedded to have forward compatible implementations. -type UnimplementedProxyServiceServer struct { -} - -func (*UnimplementedProxyServiceServer) Proxy(srv ProxyService_ProxyServer) error { - return status.Errorf(codes.Unimplemented, "method Proxy not implemented") -} - -func RegisterProxyServiceServer(s *grpc.Server, srv ProxyServiceServer) { - s.RegisterService(&_ProxyService_serviceDesc, srv) -} - -func _ProxyService_Proxy_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(ProxyServiceServer).Proxy(&proxyServiceProxyServer{stream}) -} - -type ProxyService_ProxyServer interface { - Send(*Packet) error - Recv() (*Packet, error) - grpc.ServerStream -} - -type proxyServiceProxyServer struct { - grpc.ServerStream -} - -func (x *proxyServiceProxyServer) Send(m *Packet) error { - return x.ServerStream.SendMsg(m) -} - -func (x *proxyServiceProxyServer) Recv() (*Packet, error) { - m := new(Packet) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -var _ProxyService_serviceDesc = grpc.ServiceDesc{ - ServiceName: "ProxyService", - HandlerType: (*ProxyServiceServer)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{ - { - StreamName: "Proxy", - Handler: _ProxyService_Proxy_Handler, - ServerStreams: true, - ClientStreams: true, - }, - }, - Metadata: "agent/agent.proto", -} - // AgentServiceClient is the client API for AgentService service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. @@ -674,8 +90,8 @@ func (c *agentServiceClient) Connect(ctx context.Context, opts ...grpc.CallOptio } type AgentService_ConnectClient interface { - Send(*Packet) error - Recv() (*Packet, error) + Send(*client.Packet) error + Recv() (*client.Packet, error) grpc.ClientStream } @@ -683,12 +99,12 @@ type agentServiceConnectClient struct { grpc.ClientStream } -func (x *agentServiceConnectClient) Send(m *Packet) error { +func (x *agentServiceConnectClient) Send(m *client.Packet) error { return x.ClientStream.SendMsg(m) } -func (x *agentServiceConnectClient) Recv() (*Packet, error) { - m := new(Packet) +func (x *agentServiceConnectClient) Recv() (*client.Packet, error) { + m := new(client.Packet) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } @@ -718,8 +134,8 @@ func _AgentService_Connect_Handler(srv interface{}, stream grpc.ServerStream) er } type AgentService_ConnectServer interface { - Send(*Packet) error - Recv() (*Packet, error) + Send(*client.Packet) error + Recv() (*client.Packet, error) grpc.ServerStream } @@ -727,12 +143,12 @@ type agentServiceConnectServer struct { grpc.ServerStream } -func (x *agentServiceConnectServer) Send(m *Packet) error { +func (x *agentServiceConnectServer) Send(m *client.Packet) error { return x.ServerStream.SendMsg(m) } -func (x *agentServiceConnectServer) Recv() (*Packet, error) { - m := new(Packet) +func (x *agentServiceConnectServer) Recv() (*client.Packet, error) { + m := new(client.Packet) if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err } @@ -751,5 +167,5 @@ var _AgentService_serviceDesc = grpc.ServiceDesc{ ClientStreams: true, }, }, - Metadata: "agent/agent.proto", + Metadata: "proto/agent/agent.proto", } diff --git a/proto/agent/agent.proto b/proto/agent/agent.proto index efde94a9a..100305d0b 100644 --- a/proto/agent/agent.proto +++ b/proto/agent/agent.proto @@ -14,87 +14,11 @@ syntax = "proto3"; -// Retransmit? -// Sliding windows? +option go_package = "sigs.k8s.io/apiserver-network-proxy/proto/agent"; - -service ProxyService { - rpc Proxy(stream Packet) returns (stream Packet) {} -} +import "konnectivity-client/proto/client/client.proto"; service AgentService { // Agent Identifier? rpc Connect(stream Packet) returns (stream Packet) {} } - -enum PacketType { - DIAL_REQ = 0; - DIAL_RSP = 1; - CLOSE_REQ = 2; - CLOSE_RSP = 3; - DATA = 4; -} - -enum Error { - EOF = 0; - // ... -} - -message Packet { - PacketType type = 1; - - oneof payload { - DialRequest dialRequest = 2; - DialResponse dialResponse = 3; - Data data = 4; - CloseRequest closeRequest = 5; - CloseResponse closeResponse = 6; - } -} - -message DialRequest { - // tcp or udp? - string protocol = 1; - - // node:port - string address = 2; - - // random id for client, maybe should be longer - int64 random = 3; -} - -message DialResponse { - // error failed reason; enum? - string error = 1; - - // connectID indicates the identifier of the connection - int64 connectID = 2; - - // random copied from DialRequest - int64 random = 3; -} - -message CloseRequest { - // connectID of the stream to close - int64 connectID = 1; -} - -message CloseResponse { - // error message - string error = 1; - - // connectID indicates the identifier of the connection - int64 connectID = 2; -} - -message Data { - // connectID to connect to - int64 connectID = 1; - - // error message if error happens - string error = 2; - - // stream data - bytes data = 3; -} - diff --git a/proto/agent/mocks/agent_mock.go b/proto/agent/mocks/agent_mock.go index a7f284524..a27342dbf 100644 --- a/proto/agent/mocks/agent_mock.go +++ b/proto/agent/mocks/agent_mock.go @@ -24,7 +24,7 @@ import ( gomock "github.com/golang/mock/gomock" metadata "google.golang.org/grpc/metadata" reflect "reflect" - agent "sigs.k8s.io/apiserver-network-proxy/proto/agent" + client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" ) // MockAgentService_ConnectServer is a mock of AgentService_ConnectServer interface @@ -65,10 +65,10 @@ func (mr *MockAgentService_ConnectServerMockRecorder) Context() *gomock.Call { } // Recv mocks base method -func (m *MockAgentService_ConnectServer) Recv() (*agent.Packet, error) { +func (m *MockAgentService_ConnectServer) Recv() (*client.Packet, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Recv") - ret0, _ := ret[0].(*agent.Packet) + ret0, _ := ret[0].(*client.Packet) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -94,7 +94,7 @@ func (mr *MockAgentService_ConnectServerMockRecorder) RecvMsg(arg0 interface{}) } // Send mocks base method -func (m *MockAgentService_ConnectServer) Send(arg0 *agent.Packet) error { +func (m *MockAgentService_ConnectServer) Send(arg0 *client.Packet) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Send", arg0) ret0, _ := ret[0].(error) diff --git a/proto/proxy.pb.go b/proto/proxy.pb.go deleted file mode 100644 index c04f4aae9..000000000 --- a/proto/proxy.pb.go +++ /dev/null @@ -1,375 +0,0 @@ -/* -Copyright The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: proxy.proto - -package proxy - -import ( - context "context" - fmt "fmt" - proto "github.com/golang/protobuf/proto" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" - math "math" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package - -type DialRequest struct { - // tcp or udp? - Protocol string `protobuf:"bytes,1,opt,name=protocol,proto3" json:"protocol,omitempty"` - // node:port - Address string `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *DialRequest) Reset() { *m = DialRequest{} } -func (m *DialRequest) String() string { return proto.CompactTextString(m) } -func (*DialRequest) ProtoMessage() {} -func (*DialRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_700b50b08ed8dbaf, []int{0} -} - -func (m *DialRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_DialRequest.Unmarshal(m, b) -} -func (m *DialRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_DialRequest.Marshal(b, m, deterministic) -} -func (m *DialRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_DialRequest.Merge(m, src) -} -func (m *DialRequest) XXX_Size() int { - return xxx_messageInfo_DialRequest.Size(m) -} -func (m *DialRequest) XXX_DiscardUnknown() { - xxx_messageInfo_DialRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_DialRequest proto.InternalMessageInfo - -func (m *DialRequest) GetProtocol() string { - if m != nil { - return m.Protocol - } - return "" -} - -func (m *DialRequest) GetAddress() string { - if m != nil { - return m.Address - } - return "" -} - -type DialResponse struct { - Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` - StreamID int32 `protobuf:"varint,2,opt,name=streamID,proto3" json:"streamID,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *DialResponse) Reset() { *m = DialResponse{} } -func (m *DialResponse) String() string { return proto.CompactTextString(m) } -func (*DialResponse) ProtoMessage() {} -func (*DialResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_700b50b08ed8dbaf, []int{1} -} - -func (m *DialResponse) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_DialResponse.Unmarshal(m, b) -} -func (m *DialResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_DialResponse.Marshal(b, m, deterministic) -} -func (m *DialResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_DialResponse.Merge(m, src) -} -func (m *DialResponse) XXX_Size() int { - return xxx_messageInfo_DialResponse.Size(m) -} -func (m *DialResponse) XXX_DiscardUnknown() { - xxx_messageInfo_DialResponse.DiscardUnknown(m) -} - -var xxx_messageInfo_DialResponse proto.InternalMessageInfo - -func (m *DialResponse) GetError() string { - if m != nil { - return m.Error - } - return "" -} - -func (m *DialResponse) GetStreamID() int32 { - if m != nil { - return m.StreamID - } - return 0 -} - -type Data struct { - // streamID to connect to - StreamID int32 `protobuf:"varint,1,opt,name=streamID,proto3" json:"streamID,omitempty"` - // error message if error happens - Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` - // stream data - Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Data) Reset() { *m = Data{} } -func (m *Data) String() string { return proto.CompactTextString(m) } -func (*Data) ProtoMessage() {} -func (*Data) Descriptor() ([]byte, []int) { - return fileDescriptor_700b50b08ed8dbaf, []int{2} -} - -func (m *Data) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Data.Unmarshal(m, b) -} -func (m *Data) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Data.Marshal(b, m, deterministic) -} -func (m *Data) XXX_Merge(src proto.Message) { - xxx_messageInfo_Data.Merge(m, src) -} -func (m *Data) XXX_Size() int { - return xxx_messageInfo_Data.Size(m) -} -func (m *Data) XXX_DiscardUnknown() { - xxx_messageInfo_Data.DiscardUnknown(m) -} - -var xxx_messageInfo_Data proto.InternalMessageInfo - -func (m *Data) GetStreamID() int32 { - if m != nil { - return m.StreamID - } - return 0 -} - -func (m *Data) GetError() string { - if m != nil { - return m.Error - } - return "" -} - -func (m *Data) GetData() []byte { - if m != nil { - return m.Data - } - return nil -} - -func init() { - proto.RegisterType((*DialRequest)(nil), "DialRequest") - proto.RegisterType((*DialResponse)(nil), "DialResponse") - proto.RegisterType((*Data)(nil), "Data") -} - -func init() { proto.RegisterFile("proxy.proto", fileDescriptor_700b50b08ed8dbaf) } - -var fileDescriptor_700b50b08ed8dbaf = []byte{ - // 222 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x90, 0xc1, 0x4a, 0xc4, 0x30, - 0x10, 0x86, 0x37, 0x6b, 0xeb, 0xea, 0x6c, 0xbc, 0x0c, 0x1e, 0x42, 0x41, 0x58, 0x02, 0x42, 0x4f, - 0x41, 0xf4, 0x05, 0x84, 0xed, 0x45, 0xf0, 0x20, 0xd1, 0x17, 0x88, 0xed, 0x1c, 0x16, 0xd6, 0xa6, - 0x4e, 0xa2, 0xe8, 0xdb, 0x4b, 0x52, 0xad, 0xed, 0x29, 0xf9, 0x92, 0xe1, 0xe3, 0xff, 0x07, 0xb6, - 0x03, 0xfb, 0xaf, 0x6f, 0x33, 0xb0, 0x8f, 0x5e, 0xef, 0x61, 0xdb, 0x1c, 0xdc, 0xd1, 0xd2, 0xfb, - 0x07, 0x85, 0x88, 0x15, 0x9c, 0xe5, 0xf7, 0xd6, 0x1f, 0x95, 0xd8, 0x89, 0xfa, 0xdc, 0x4e, 0x8c, - 0x0a, 0x36, 0xae, 0xeb, 0x98, 0x42, 0x50, 0xeb, 0xfc, 0xf5, 0x87, 0xfa, 0x1e, 0xe4, 0x28, 0x09, - 0x83, 0xef, 0x03, 0xe1, 0x25, 0x94, 0xc4, 0xec, 0xf9, 0x57, 0x31, 0x42, 0x72, 0x87, 0xc8, 0xe4, - 0xde, 0x1e, 0x9a, 0x2c, 0x28, 0xed, 0xc4, 0xfa, 0x11, 0x8a, 0xc6, 0x45, 0xb7, 0x98, 0x11, 0xcb, - 0x99, 0x7f, 0xeb, 0x7a, 0x6e, 0x45, 0x28, 0x3a, 0x17, 0x9d, 0x3a, 0xd9, 0x89, 0x5a, 0xda, 0x7c, - 0xbf, 0x7d, 0x01, 0xf9, 0x94, 0x3a, 0x3e, 0x13, 0x7f, 0x1e, 0x5a, 0xc2, 0x6b, 0x28, 0x52, 0x3e, - 0x94, 0x66, 0xd6, 0xb5, 0xba, 0x30, 0xf3, 0xd0, 0x7a, 0x85, 0x57, 0xb0, 0xd9, 0xfb, 0xbe, 0xa7, - 0x36, 0x62, 0x69, 0x52, 0x9c, 0x6a, 0x3c, 0xf4, 0xaa, 0x16, 0x37, 0xe2, 0xf5, 0x34, 0x6f, 0xe2, - 0xee, 0x27, 0x00, 0x00, 0xff, 0xff, 0x3d, 0x41, 0x6a, 0x1c, 0x40, 0x01, 0x00, 0x00, -} - -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion4 - -// ProxyServiceClient is the client API for ProxyService service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type ProxyServiceClient interface { - // Dial a remote address and return a stream id if success - Dial(ctx context.Context, in *DialRequest, opts ...grpc.CallOption) (*DialResponse, error) - // Connect connects to a remote address by stream id, and establish - // a bi-directional stream. - Connect(ctx context.Context, opts ...grpc.CallOption) (ProxyService_ConnectClient, error) -} - -type proxyServiceClient struct { - cc *grpc.ClientConn -} - -func NewProxyServiceClient(cc *grpc.ClientConn) ProxyServiceClient { - return &proxyServiceClient{cc} -} - -func (c *proxyServiceClient) Dial(ctx context.Context, in *DialRequest, opts ...grpc.CallOption) (*DialResponse, error) { - out := new(DialResponse) - err := c.cc.Invoke(ctx, "/ProxyService/Dial", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *proxyServiceClient) Connect(ctx context.Context, opts ...grpc.CallOption) (ProxyService_ConnectClient, error) { - stream, err := c.cc.NewStream(ctx, &_ProxyService_serviceDesc.Streams[0], "/ProxyService/Connect", opts...) - if err != nil { - return nil, err - } - x := &proxyServiceConnectClient{stream} - return x, nil -} - -type ProxyService_ConnectClient interface { - Send(*Data) error - Recv() (*Data, error) - grpc.ClientStream -} - -type proxyServiceConnectClient struct { - grpc.ClientStream -} - -func (x *proxyServiceConnectClient) Send(m *Data) error { - return x.ClientStream.SendMsg(m) -} - -func (x *proxyServiceConnectClient) Recv() (*Data, error) { - m := new(Data) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -// ProxyServiceServer is the server API for ProxyService service. -type ProxyServiceServer interface { - // Dial a remote address and return a stream id if success - Dial(context.Context, *DialRequest) (*DialResponse, error) - // Connect connects to a remote address by stream id, and establish - // a bi-directional stream. - Connect(ProxyService_ConnectServer) error -} - -// UnimplementedProxyServiceServer can be embedded to have forward compatible implementations. -type UnimplementedProxyServiceServer struct { -} - -func (*UnimplementedProxyServiceServer) Dial(ctx context.Context, req *DialRequest) (*DialResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Dial not implemented") -} -func (*UnimplementedProxyServiceServer) Connect(srv ProxyService_ConnectServer) error { - return status.Errorf(codes.Unimplemented, "method Connect not implemented") -} - -func RegisterProxyServiceServer(s *grpc.Server, srv ProxyServiceServer) { - s.RegisterService(&_ProxyService_serviceDesc, srv) -} - -func _ProxyService_Dial_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(DialRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ProxyServiceServer).Dial(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/ProxyService/Dial", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ProxyServiceServer).Dial(ctx, req.(*DialRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _ProxyService_Connect_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(ProxyServiceServer).Connect(&proxyServiceConnectServer{stream}) -} - -type ProxyService_ConnectServer interface { - Send(*Data) error - Recv() (*Data, error) - grpc.ServerStream -} - -type proxyServiceConnectServer struct { - grpc.ServerStream -} - -func (x *proxyServiceConnectServer) Send(m *Data) error { - return x.ServerStream.SendMsg(m) -} - -func (x *proxyServiceConnectServer) Recv() (*Data, error) { - m := new(Data) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -var _ProxyService_serviceDesc = grpc.ServiceDesc{ - ServiceName: "ProxyService", - HandlerType: (*ProxyServiceServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "Dial", - Handler: _ProxyService_Dial_Handler, - }, - }, - Streams: []grpc.StreamDesc{ - { - StreamName: "Connect", - Handler: _ProxyService_Connect_Handler, - ServerStreams: true, - ClientStreams: true, - }, - }, - Metadata: "proxy.proto", -} diff --git a/proto/proxy.proto b/proto/proxy.proto deleted file mode 100644 index 3d74deacd..000000000 --- a/proto/proxy.proto +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright The Kubernetes Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// LEGACY - -syntax = "proto3"; - -service ProxyService { - // Dial a remote address and return a stream id if success - rpc Dial(DialRequest) returns (DialResponse) {} - - // Connect connects to a remote address by stream id, and establish - // a bi-directional stream. - rpc Connect(stream Data) returns (stream Data) {} - - // rpc Close() -} - -message DialRequest { - // tcp or udp? - string protocol = 1; - - // node:port - string address = 2; -} - -message DialResponse { - string error = 1; - int32 streamID = 2; -} - -message Data { - // streamID to connect to - int32 streamID = 1; - - // error message if error happens - string error = 2; - - // stream data - bytes data = 3; -} - diff --git a/tests/concurrent_test.go b/tests/concurrent_test.go index 80fe19199..475490e22 100644 --- a/tests/concurrent_test.go +++ b/tests/concurrent_test.go @@ -9,7 +9,7 @@ import ( "time" "google.golang.org/grpc" - "sigs.k8s.io/apiserver-network-proxy/pkg/agent/client" + "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client" ) func TestProxy_Concurrency(t *testing.T) { diff --git a/tests/ha_proxy_server_test.go b/tests/ha_proxy_server_test.go index 972f4241a..8225a40a2 100644 --- a/tests/ha_proxy_server_test.go +++ b/tests/ha_proxy_server_test.go @@ -13,7 +13,7 @@ import ( "time" "google.golang.org/grpc" - "sigs.k8s.io/apiserver-network-proxy/pkg/agent/client" + "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client" ) type tcpLB struct { diff --git a/tests/proxy_test.go b/tests/proxy_test.go index d64899e4b..b0abba8a5 100644 --- a/tests/proxy_test.go +++ b/tests/proxy_test.go @@ -16,7 +16,8 @@ import ( "google.golang.org/grpc" "sigs.k8s.io/apiserver-network-proxy/pkg/agent/agentclient" "sigs.k8s.io/apiserver-network-proxy/pkg/agent/agentserver" - "sigs.k8s.io/apiserver-network-proxy/pkg/agent/client" + "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client" + clientproto "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" "sigs.k8s.io/apiserver-network-proxy/proto/agent" ) @@ -240,7 +241,7 @@ func runGRPCProxyServerWithServerCount(serverCount int) (proxy, func(), error) { grpcServer.Stop() } - agent.RegisterProxyServiceServer(grpcServer, server) + clientproto.RegisterProxyServiceServer(grpcServer, server) lis, err = net.Listen("tcp", "") if err != nil { return proxy, cleanup, err diff --git a/tests/tcp_server_test.go b/tests/tcp_server_test.go index e083d66df..22cc1c7ac 100644 --- a/tests/tcp_server_test.go +++ b/tests/tcp_server_test.go @@ -7,7 +7,7 @@ import ( "google.golang.org/grpc" "k8s.io/klog" - "sigs.k8s.io/apiserver-network-proxy/pkg/agent/client" + "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client" ) func echo(conn net.Conn) {