Skip to content

Commit

Permalink
add tcp retry
Browse files Browse the repository at this point in the history
  • Loading branch information
beordle committed May 15, 2022
1 parent 8250deb commit 445c20e
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 30 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,4 @@ ENDIF()
if (LINUX)
target_link_libraries(termtunnel PUBLIC "-static")
endif()

5 changes: 3 additions & 2 deletions src/agentcall.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ int agentcall_server_start() {
}

static void call_send_request(thread_arg_pass_t *tmp) {
int nfd = vnet_tcp_connect(agentcall_service_port);
int nfd = vnet_tcp_connect_with_retry(agentcall_service_port);
if (nfd < 0) {
log_debug("connect agentcall_service_port error");
return;
Expand All @@ -98,7 +98,8 @@ static void call_send_request(thread_arg_pass_t *tmp) {


int bootstrap_get_args(void * _) {
int nfd = vnet_tcp_connect(agentcall_service_port);
// 确实 agentcall_service_port 会概率性失败..
int nfd = vnet_tcp_connect_with_retry(agentcall_service_port);
if (nfd < 0) {
log_debug("connect agentcall_service_port error");
return 0;
Expand Down
3 changes: 3 additions & 0 deletions src/intent.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
#define COMMAND_GET_RUNNING_TASK_COUNT 10
#define COMMAND_GET_ARGS 11
#define COMMAND_GET_ARGS_REPLY 12

# define FLAG_ONESHOT 1<<0

typedef struct ci {
int i;
int mode;
Expand Down
7 changes: 4 additions & 3 deletions src/portforward.c
Original file line number Diff line number Diff line change
Expand Up @@ -253,15 +253,13 @@ int pipe_lwip_socket_and_socket_pair(int lwip_fd, int fd) {
}

void portforward_static_server_pipe(port_listen_t *pe) {
// accept
log_info("connect %s", pe->host);
int lwip_fd = vnet_tcp_connect(port_forward_static_service_port);
vnet_send(lwip_fd, pe->host, strlen(pe->host) + 1); // with_zero_as_split
uint16_t tmp = htons(pe->port);
vnet_send(lwip_fd, &tmp, sizeof(uint16_t));
log_info("connect sent %s, do pipe", pe->host);
pipe_lwip_socket_and_socket_pair(lwip_fd, pe->local_fd);
log_info("do pipe done", pe->host);
close(pe->local_fd);
free(pe);
int ret = lwip_close(lwip_fd);
Expand All @@ -282,6 +280,7 @@ void portforward_transparent_server_pipe(port_listen_t *pe) {
static void portforward_service_handler(port_listen_t *pe) {
int new_sd;
int listen_fd = pe->local_fd;
// set_running_task_changed(1);
while (true) {
if ((new_sd = accept(listen_fd, NULL, NULL)) >= 0) {
pthread_t *worker = (pthread_t *)malloc(sizeof(pthread_t)); // TODO(jdz) free
Expand All @@ -293,11 +292,13 @@ static void portforward_service_handler(port_listen_t *pe) {
pthread_create(worker, NULL, &portforward_transparent_server_pipe, (void *)child_pe);
} else {
pthread_create(worker, NULL, &portforward_static_server_pipe, (void *)child_pe);
}
}

} else {
log_info("abort the accept");
}
}
// set_running_task_changed(-1);
return;
}

Expand Down
73 changes: 51 additions & 22 deletions src/repl.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ ssize_t expect_read(int fd, char *buf, size_t expect_size) {
return expect_size;
}

void recv_data(int fd, int *type, char **retbuf, int64_t *sz) {
void recv_data(int fd, int64_t *type, char **retbuf, int64_t *sz) {
char *buf = (char *)malloc(sizeof(int64_t));
*sz = 0;
int a = expect_read(fd, buf, sizeof(int64_t));
Expand All @@ -93,10 +93,8 @@ void recv_data(int fd, int *type, char **retbuf, int64_t *sz) {
free(buf);
return;
}
// printf("a==sizeof(int64_t) %d %d\n",a,sizeof(int64_t));
CHECK(a == sizeof(int64_t), "a: %d", a);
int64_t msg_size = *((int64_t *)buf);
// printf("recv %d",*(int64_t*)buf);
a = expect_read(fd, buf, sizeof(int64_t));
if (a == 0) {
free(buf);
Expand All @@ -108,9 +106,9 @@ void recv_data(int fd, int *type, char **retbuf, int64_t *sz) {
}
a = expect_read(fd, buf, msg_size);
CHECK(a == msg_size, "a: %d,msg_size: %lld\n", a, msg_size);
// printf("a:%d\n",a);
*retbuf = buf;
*sz = msg_size;
return;
}

size_t split(char *buffer, char *argv[], size_t argv_size) {
Expand Down Expand Up @@ -206,6 +204,7 @@ typedef struct {
void *funcptr;
char *desc;
char *usage;
int flags;
} actionfinder_t;

int hello_func(int argc, char **argv) {
Expand Down Expand Up @@ -261,7 +260,7 @@ int portforward_func(int argc, char **argv) {
send_binary(out, COMMAND_PORT_FORWARD, a, sizeof(port_forward_intent_t));
free(a);

int type;
int64_t type;
char *buf;
int64_t size;
recv_data(in, &type, &buf, &size);
Expand Down Expand Up @@ -361,17 +360,17 @@ actionfinder_t action_table[] = {
{"local_listen", portforward_func, "port forward bind on local host",
"local_listen [local_host] [local_port] [remote_host] [remote_port]\n"
"when remote_port==0, the service listen on remote_port will be a "
"socks5+http proxy server."},
"socks5+http proxy server.", NULL},
{"remote_listen", portforward_func, "port forward bind on remote host",
"remote_listen [remote_host] [remote_port] [local_host] [local_port]\n"
"when local_port==0, the service listen on remote_port will be a "
"socks5+http proxy server."},
{"upload", upload_func, "upload a file", "usage"},
{"rz", upload_func, "alias upload", "usage"},
{"download", download_func, "download a file", "usage"},
{"sz", download_func, "alias download", "usage"},
{"help", help_func, "view help manpage", "usage"},
{"exit", exit_func, "exit application", "usage"},
"socks5+http proxy server.", NULL},
{"upload", upload_func, "upload a file", "usage", FLAG_ONESHOT},
{"rz", upload_func, "alias upload", "usage", FLAG_ONESHOT},
{"download", download_func, "download a file", "usage", FLAG_ONESHOT},
{"sz", download_func, "alias download", "usage", FLAG_ONESHOT},
{"help", help_func, "view help manpage", "usage", FLAG_ONESHOT},
{"exit", exit_func, "exit application", "usage", NULL},
};

int help_func(int argc, char **argv) {
Expand All @@ -394,6 +393,17 @@ int print_command_usage(char *command_name) {
return 0;
}


int get_command_flags(char *command_name) {
int func_count = sizeof(action_table) / sizeof(actionfinder_t);
for (int i = 0; i < func_count; i++) {
if (strcmp(command_name, action_table[i].funcname) == 0) {
return action_table[i].flags;
}
}
return 0;
}

int repl_execve(int argc, char **argv) {
if (argc == 0) {
return 0;
Expand Down Expand Up @@ -447,7 +457,7 @@ void repl_run(int _in, int _out) {
}
if (ret == -1) {
send_binary(out, COMMAND_GET_RUNNING_TASK_COUNT, NULL, 0);
int type;
int64_t type;
char *buf;
int64_t size;
recv_data(in, &type, &buf, &size);
Expand Down Expand Up @@ -498,7 +508,7 @@ int update_processbar(float percent, char string[])

int get_server_running_task(){
send_binary(out, COMMAND_GET_RUNNING_TASK_COUNT, NULL, 0);
int type;
int64_t type;
char *buf;
int64_t size;
recv_data(in, &type, &buf, &size);
Expand All @@ -519,32 +529,51 @@ void oneshot_run(int _in, int _out) {
update_processbar(i, "11");
fflush(stdout);
}*/

// get args
send_binary(out, COMMAND_GET_ARGS, NULL, 0);
char *buf = NULL;
int64_t size;
int64_t type = 0;
recv_data(_in, &type, &buf, &size);
// COMMAND_GET_ARGS_REPLY
if (type != COMMAND_GET_ARGS_REPLY) {
if (buf) {
free(buf);
}
return;
}
int32_t argc = *(int32_t*)buf;
char* ptr = buf+ sizeof(int32_t);
char** argv = malloc(sizeof(char*) * argc);
for (int i=0; i < argc; i++) {
argv[i] = ptr;
ptr += strlen(ptr)+ 1;
}
if (!(get_command_flags(argv[0]) & FLAG_ONESHOT)) {
printf("the command is REPL only.\n");
goto end;
}

repl_execve(argc, argv);
// spinlock wait
while (true) {
usleep(500000); // TODO 仍有可能极小概率任务还没有开始就开始判定,未来可增加健壮性。
if (get_server_running_task() == 0) {
free(buf);
free(argv);
send_binary(out, COMMAND_EXIT_REPL, NULL, 0);
return;
int ret = usleep(500000);
// TODO(jdz) 仍有可能极小概率任务还没有开始就开始判定,未来可增加健壮性。
if (ret < 0 && errno == SIGINT) {
keyboard_break = true;
}
if (get_server_running_task() == 0 || keyboard_break) {
keyboard_break = false;
goto end;
}
}

end:
printf("end\n");
free(argv);
free(buf);
send_binary(out, COMMAND_EXIT_REPL, NULL, 0);
return;
}


Expand Down
24 changes: 21 additions & 3 deletions src/vnet.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@
char *agent_ip = "192.168.1.111";
char *server_ip = "192.168.1.111";

int16_t listen_port = 700;

void vnet_setsocketdefaultopt(int nfd) {
int flags;
flags = 1;
Expand Down Expand Up @@ -149,6 +147,7 @@ int lwip_writen(int fd, void *buf, int n) {
return n;
}


int vnet_tcp_connect(uint16_t port) {
int s = lwip_socket(AF_INET, SOCK_STREAM, 0);
LWIP_ASSERT("s >= 0", s >= 0);
Expand All @@ -163,11 +162,30 @@ int vnet_tcp_connect(uint16_t port) {
if (ret == 0) {
return s;
}
log_error("connect failed %d", ret);
log_error("connect failed %d %s", ret, strerror(errno));
lwip_close(s);
return ret;
}


int vnet_tcp_connect_with_retry(uint16_t port) {
int max_retry = 10;
int retry = 0;
int ret = 0;
int base_sleep = 1000;
while (retry < max_retry) {
ret = vnet_tcp_connect(port);
if (ret > 0) {
break;
}
retry++;
usleep(base_sleep);
base_sleep *= 2;
}
return ret;
}


int vnet_send(int s, const void *data, size_t size) {
return lwip_send(s, data, size, 0);
}
Expand Down
1 change: 1 addition & 0 deletions src/vnet.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ int lwip_writen(int fd, void *buf, int n);
void vnet_setsocketdefaultopt(int nfd);
int vnet_readn(int fd, void *buf, int n);
int vnet_readstring(int fd, char *buf, int n);
int vnet_tcp_connect_with_retry(uint16_t port);
#endif

0 comments on commit 445c20e

Please sign in to comment.