From 445c20e3540801d13fd78e6ef0b120effb5202c6 Mon Sep 17 00:00:00 2001 From: Jindong Zhang Date: Sun, 15 May 2022 19:38:24 +0800 Subject: [PATCH] add tcp retry --- CMakeLists.txt | 1 + src/agentcall.c | 5 ++-- src/intent.h | 3 ++ src/portforward.c | 7 +++-- src/repl.c | 73 +++++++++++++++++++++++++++++++++-------------- src/vnet.c | 24 ++++++++++++++-- src/vnet.h | 1 + 7 files changed, 84 insertions(+), 30 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 97095b2..124422f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -81,3 +81,4 @@ ENDIF() if (LINUX) target_link_libraries(termtunnel PUBLIC "-static") endif() + diff --git a/src/agentcall.c b/src/agentcall.c index ffa2358..79acb3d 100644 --- a/src/agentcall.c +++ b/src/agentcall.c @@ -84,7 +84,7 @@ int agentcall_server_start() { } static void call_send_request(thread_arg_pass_t *tmp) { - int nfd = vnet_tcp_connect(agentcall_service_port); + int nfd = vnet_tcp_connect_with_retry(agentcall_service_port); if (nfd < 0) { log_debug("connect agentcall_service_port error"); return; @@ -98,7 +98,8 @@ static void call_send_request(thread_arg_pass_t *tmp) { int bootstrap_get_args(void * _) { - int nfd = vnet_tcp_connect(agentcall_service_port); + // 确实 agentcall_service_port 会概率性失败.. + int nfd = vnet_tcp_connect_with_retry(agentcall_service_port); if (nfd < 0) { log_debug("connect agentcall_service_port error"); return 0; diff --git a/src/intent.h b/src/intent.h index 6bbdc61..244dfbb 100644 --- a/src/intent.h +++ b/src/intent.h @@ -31,6 +31,9 @@ #define COMMAND_GET_RUNNING_TASK_COUNT 10 #define COMMAND_GET_ARGS 11 #define COMMAND_GET_ARGS_REPLY 12 + +# define FLAG_ONESHOT 1<<0 + typedef struct ci { int i; int mode; diff --git a/src/portforward.c b/src/portforward.c index 467e4f8..a6bfc00 100644 --- a/src/portforward.c +++ b/src/portforward.c @@ -253,7 +253,6 @@ int pipe_lwip_socket_and_socket_pair(int lwip_fd, int fd) { } void portforward_static_server_pipe(port_listen_t *pe) { - // accept log_info("connect %s", pe->host); int lwip_fd = vnet_tcp_connect(port_forward_static_service_port); vnet_send(lwip_fd, pe->host, strlen(pe->host) + 1); // with_zero_as_split @@ -261,7 +260,6 @@ void portforward_static_server_pipe(port_listen_t *pe) { vnet_send(lwip_fd, &tmp, sizeof(uint16_t)); log_info("connect sent %s, do pipe", pe->host); pipe_lwip_socket_and_socket_pair(lwip_fd, pe->local_fd); - log_info("do pipe done", pe->host); close(pe->local_fd); free(pe); int ret = lwip_close(lwip_fd); @@ -282,6 +280,7 @@ void portforward_transparent_server_pipe(port_listen_t *pe) { static void portforward_service_handler(port_listen_t *pe) { int new_sd; int listen_fd = pe->local_fd; + // set_running_task_changed(1); while (true) { if ((new_sd = accept(listen_fd, NULL, NULL)) >= 0) { pthread_t *worker = (pthread_t *)malloc(sizeof(pthread_t)); // TODO(jdz) free @@ -293,11 +292,13 @@ static void portforward_service_handler(port_listen_t *pe) { pthread_create(worker, NULL, &portforward_transparent_server_pipe, (void *)child_pe); } else { pthread_create(worker, NULL, &portforward_static_server_pipe, (void *)child_pe); - } + } + } else { log_info("abort the accept"); } } + // set_running_task_changed(-1); return; } diff --git a/src/repl.c b/src/repl.c index fc55eac..ed97cd1 100644 --- a/src/repl.c +++ b/src/repl.c @@ -84,7 +84,7 @@ ssize_t expect_read(int fd, char *buf, size_t expect_size) { return expect_size; } -void recv_data(int fd, int *type, char **retbuf, int64_t *sz) { +void recv_data(int fd, int64_t *type, char **retbuf, int64_t *sz) { char *buf = (char *)malloc(sizeof(int64_t)); *sz = 0; int a = expect_read(fd, buf, sizeof(int64_t)); @@ -93,10 +93,8 @@ void recv_data(int fd, int *type, char **retbuf, int64_t *sz) { free(buf); return; } - // printf("a==sizeof(int64_t) %d %d\n",a,sizeof(int64_t)); CHECK(a == sizeof(int64_t), "a: %d", a); int64_t msg_size = *((int64_t *)buf); - // printf("recv %d",*(int64_t*)buf); a = expect_read(fd, buf, sizeof(int64_t)); if (a == 0) { free(buf); @@ -108,9 +106,9 @@ void recv_data(int fd, int *type, char **retbuf, int64_t *sz) { } a = expect_read(fd, buf, msg_size); CHECK(a == msg_size, "a: %d,msg_size: %lld\n", a, msg_size); - // printf("a:%d\n",a); *retbuf = buf; *sz = msg_size; + return; } size_t split(char *buffer, char *argv[], size_t argv_size) { @@ -206,6 +204,7 @@ typedef struct { void *funcptr; char *desc; char *usage; + int flags; } actionfinder_t; int hello_func(int argc, char **argv) { @@ -261,7 +260,7 @@ int portforward_func(int argc, char **argv) { send_binary(out, COMMAND_PORT_FORWARD, a, sizeof(port_forward_intent_t)); free(a); - int type; + int64_t type; char *buf; int64_t size; recv_data(in, &type, &buf, &size); @@ -361,17 +360,17 @@ actionfinder_t action_table[] = { {"local_listen", portforward_func, "port forward bind on local host", "local_listen [local_host] [local_port] [remote_host] [remote_port]\n" "when remote_port==0, the service listen on remote_port will be a " - "socks5+http proxy server."}, + "socks5+http proxy server.", NULL}, {"remote_listen", portforward_func, "port forward bind on remote host", "remote_listen [remote_host] [remote_port] [local_host] [local_port]\n" "when local_port==0, the service listen on remote_port will be a " - "socks5+http proxy server."}, - {"upload", upload_func, "upload a file", "usage"}, - {"rz", upload_func, "alias upload", "usage"}, - {"download", download_func, "download a file", "usage"}, - {"sz", download_func, "alias download", "usage"}, - {"help", help_func, "view help manpage", "usage"}, - {"exit", exit_func, "exit application", "usage"}, + "socks5+http proxy server.", NULL}, + {"upload", upload_func, "upload a file", "usage", FLAG_ONESHOT}, + {"rz", upload_func, "alias upload", "usage", FLAG_ONESHOT}, + {"download", download_func, "download a file", "usage", FLAG_ONESHOT}, + {"sz", download_func, "alias download", "usage", FLAG_ONESHOT}, + {"help", help_func, "view help manpage", "usage", FLAG_ONESHOT}, + {"exit", exit_func, "exit application", "usage", NULL}, }; int help_func(int argc, char **argv) { @@ -394,6 +393,17 @@ int print_command_usage(char *command_name) { return 0; } + +int get_command_flags(char *command_name) { + int func_count = sizeof(action_table) / sizeof(actionfinder_t); + for (int i = 0; i < func_count; i++) { + if (strcmp(command_name, action_table[i].funcname) == 0) { + return action_table[i].flags; + } + } + return 0; +} + int repl_execve(int argc, char **argv) { if (argc == 0) { return 0; @@ -447,7 +457,7 @@ void repl_run(int _in, int _out) { } if (ret == -1) { send_binary(out, COMMAND_GET_RUNNING_TASK_COUNT, NULL, 0); - int type; + int64_t type; char *buf; int64_t size; recv_data(in, &type, &buf, &size); @@ -498,7 +508,7 @@ int update_processbar(float percent, char string[]) int get_server_running_task(){ send_binary(out, COMMAND_GET_RUNNING_TASK_COUNT, NULL, 0); - int type; + int64_t type; char *buf; int64_t size; recv_data(in, &type, &buf, &size); @@ -519,12 +529,19 @@ void oneshot_run(int _in, int _out) { update_processbar(i, "11"); fflush(stdout); }*/ + + // get args send_binary(out, COMMAND_GET_ARGS, NULL, 0); char *buf = NULL; int64_t size; int64_t type = 0; recv_data(_in, &type, &buf, &size); - // COMMAND_GET_ARGS_REPLY + if (type != COMMAND_GET_ARGS_REPLY) { + if (buf) { + free(buf); + } + return; + } int32_t argc = *(int32_t*)buf; char* ptr = buf+ sizeof(int32_t); char** argv = malloc(sizeof(char*) * argc); @@ -532,19 +549,31 @@ void oneshot_run(int _in, int _out) { argv[i] = ptr; ptr += strlen(ptr)+ 1; } + if (!(get_command_flags(argv[0]) & FLAG_ONESHOT)) { + printf("the command is REPL only.\n"); + goto end; + } + repl_execve(argc, argv); // spinlock wait while (true) { - usleep(500000); // TODO 仍有可能极小概率任务还没有开始就开始判定,未来可增加健壮性。 - if (get_server_running_task() == 0) { - free(buf); - free(argv); - send_binary(out, COMMAND_EXIT_REPL, NULL, 0); - return; + int ret = usleep(500000); + // TODO(jdz) 仍有可能极小概率任务还没有开始就开始判定,未来可增加健壮性。 + if (ret < 0 && errno == SIGINT) { + keyboard_break = true; + } + if (get_server_running_task() == 0 || keyboard_break) { + keyboard_break = false; + goto end; } } + + end: + printf("end\n"); free(argv); free(buf); + send_binary(out, COMMAND_EXIT_REPL, NULL, 0); + return; } diff --git a/src/vnet.c b/src/vnet.c index 46a3faa..606b50f 100644 --- a/src/vnet.c +++ b/src/vnet.c @@ -40,8 +40,6 @@ char *agent_ip = "192.168.1.111"; char *server_ip = "192.168.1.111"; -int16_t listen_port = 700; - void vnet_setsocketdefaultopt(int nfd) { int flags; flags = 1; @@ -149,6 +147,7 @@ int lwip_writen(int fd, void *buf, int n) { return n; } + int vnet_tcp_connect(uint16_t port) { int s = lwip_socket(AF_INET, SOCK_STREAM, 0); LWIP_ASSERT("s >= 0", s >= 0); @@ -163,11 +162,30 @@ int vnet_tcp_connect(uint16_t port) { if (ret == 0) { return s; } - log_error("connect failed %d", ret); + log_error("connect failed %d %s", ret, strerror(errno)); lwip_close(s); return ret; } + +int vnet_tcp_connect_with_retry(uint16_t port) { + int max_retry = 10; + int retry = 0; + int ret = 0; + int base_sleep = 1000; + while (retry < max_retry) { + ret = vnet_tcp_connect(port); + if (ret > 0) { + break; + } + retry++; + usleep(base_sleep); + base_sleep *= 2; + } + return ret; +} + + int vnet_send(int s, const void *data, size_t size) { return lwip_send(s, data, size, 0); } diff --git a/src/vnet.h b/src/vnet.h index cde968e..54cb4e1 100644 --- a/src/vnet.h +++ b/src/vnet.h @@ -22,4 +22,5 @@ int lwip_writen(int fd, void *buf, int n); void vnet_setsocketdefaultopt(int nfd); int vnet_readn(int fd, void *buf, int n); int vnet_readstring(int fd, char *buf, int n); +int vnet_tcp_connect_with_retry(uint16_t port); #endif