diff --git a/nodes/node_balancer/README.md b/nodes/node_balancer/README.md index 62fd4de86..895865b46 100644 --- a/nodes/node_balancer/README.md +++ b/nodes/node_balancer/README.md @@ -1,17 +1,49 @@ # Node Balancer application -# Installation +## Installation and configuration -- Prepare environment variables +- Prepare environment variables, according with `sample.env`. - Build application ```bash go build -o nodebalancer . ``` -# Work with nodebalancer +- Generate configuration -## add-access +```bash +nodebalancer generate-config +``` + +- Modify configuration. Tags should NOT repeat blockchain, as it is specified in `blockchain` key. Example of configuration: + +```bash +[ + { + "blockchain": "ethereum", + "endpoint": "http://127.0.0.1:8545", + "tags": ["local"] + }, + { + "blockchain": "ethereum", + "endpoint": "http://127.0.0.1:9585", + "tags": ["local"] + }, + { + "blockchain": "ethereum", + "endpoint": "https://cool-name.quiknode.pro/y0urn0de1den1f1cat0r/", + "tags": ["external"] + } +] +``` + +So if with request will be specified tag `local` will be returned node with corresponding tag. + +## Work with nodebalancer + +**IMPORTANT** Do not use flag `-debug` in production. + +### add-access Add new access for user: @@ -25,7 +57,7 @@ nodebalancer add-access \ --blockchain--access true ``` -## delete-access +### delete-access Delete user access: @@ -37,7 +69,7 @@ nodebalancer delete-access \ If `access-id` not specified, all user accesses will be deleted. -## users +### users ```bash nodebalancer users | jq . @@ -67,7 +99,7 @@ This command will return a list of bugout resources of registered users to acces `extended_methods` - boolean which allow you to call not whitelisted method to blockchain node, by default for new user this is equal to `false` -## server +### server ```bash nodebalancer server -host 0.0.0.0 -port 8544 -healthcheck @@ -76,17 +108,17 @@ nodebalancer server -host 0.0.0.0 -port 8544 -healthcheck Flag `--healthcheck` will execute background process to ping-pong available nodes to keep their status and current block number. Flag `--debug` will extend output of each request to server and healthchecks summary. -# Work with node +## Work with node Common request to fetch block number ```bash -curl --request GET 'http://127.0.0.1:8544/nb/ethereum/jsonrpc?access_id=&data_source=' \ +curl --request POST 'http://127.0.0.1:8544/nb/ethereum/jsonrpc?access_id=&data_source=' \ --header 'Content-Type: application/json' \ --data-raw '{ "jsonrpc":"2.0", "method":"eth_getBlockByNumber", - "params":["0xb71b64", false], + "params":["latest", false], "id":1 }' ``` @@ -97,3 +129,16 @@ For Web3 providers `access_id` and `data_source` could be specified in headers --header 'x-node-balancer-data-source: ' --header 'x-node-balancer-access-id: ' ``` + +Same request to fetch specific nodes using tags + +```bash +curl --request POST 'http://127.0.0.1:8544/nb/ethereum/jsonrpc?access_id=&data_source=&tag=&tag=' \ + --header 'Content-Type: application/json' \ + --data-raw '{ + "jsonrpc":"2.0", + "method":"eth_getBlockByNumber", + "params":["latest", false], + "id":1 + }' +``` diff --git a/nodes/node_balancer/cmd/nodebalancer/balancer.go b/nodes/node_balancer/cmd/nodebalancer/balancer.go index a79f838c1..cf1de7dc1 100644 --- a/nodes/node_balancer/cmd/nodebalancer/balancer.go +++ b/nodes/node_balancer/cmd/nodebalancer/balancer.go @@ -1,5 +1,5 @@ /* -Load balancer, based on https://github.com/kasvith/simplelb/ +Load balancer logic. */ package main @@ -19,10 +19,9 @@ import ( // Main variable of pool of blockchains which contains pool of nodes // for each blockchain we work during session. -var blockchainPool BlockchainPool +var blockchainPools map[string]*NodePool // Node structure with -// StatusURL for status server at node endpoint // Endpoint for geth/bor/etc node http.server endpoint type Node struct { Endpoint *url.URL @@ -36,16 +35,16 @@ type Node struct { GethReverseProxy *httputil.ReverseProxy } -type NodePool struct { - Blockchain string - Nodes []*Node - - // Counter to observe all nodes - Current uint64 +type TopNodeBlock struct { + Block uint64 + Node *Node } -type BlockchainPool struct { - Blockchains []*NodePool +type NodePool struct { + NodesMap map[string][]*Node + NodesSet []*Node + + TopNode TopNodeBlock } // Node status response struct for HealthCheck @@ -58,24 +57,25 @@ type NodeStatusResponse struct { } // AddNode to the nodes pool -func (bpool *BlockchainPool) AddNode(node *Node, blockchain string) { - var nodePool *NodePool - for _, b := range bpool.Blockchains { - if b.Blockchain == blockchain { - nodePool = b - } +func AddNode(blockchain string, tags []string, node *Node) { + if blockchainPools == nil { + blockchainPools = make(map[string]*NodePool) } + if blockchainPools[blockchain] == nil { + blockchainPools[blockchain] = &NodePool{} + } + if blockchainPools[blockchain].NodesMap == nil { + blockchainPools[blockchain].NodesMap = make(map[string][]*Node) + } + blockchainPools[blockchain].NodesSet = append(blockchainPools[blockchain].NodesSet, node) - // Check if blockchain not yet in pool - if nodePool == nil { - nodePool = &NodePool{ - Blockchain: blockchain, - } - nodePool.Nodes = append(nodePool.Nodes, node) - bpool.Blockchains = append(bpool.Blockchains, nodePool) - } else { - nodePool.Nodes = append(nodePool.Nodes, node) + for _, tag := range tags { + blockchainPools[blockchain].NodesMap[tag] = append( + blockchainPools[blockchain].NodesMap[tag], + node, + ) } + } // SetAlive with mutex for exact node @@ -105,71 +105,76 @@ func (node *Node) UpdateNodeState(currentBlock uint64, alive bool) (callCounter return callCounter } -// IncreaseCallCounter increased to 1 each time node called -func (node *Node) IncreaseCallCounter() { - node.mux.Lock() - if node.CallCounter >= NB_MAX_COUNTER_NUMBER { - log.Printf("Number of calls for node %s reached %d limit, reset the counter.", node.Endpoint, NB_MAX_COUNTER_NUMBER) - node.CallCounter = uint64(0) - } else { - node.CallCounter++ - } - node.mux.Unlock() -} +// FilterTagsNodes returns nodes with provided tags +func (npool *NodePool) FilterTagsNodes(tags []string) ([]*Node, TopNodeBlock) { + nodesMap := npool.NodesMap + nodesSet := npool.NodesSet -// GetNextNode returns next active peer to take a connection -// Loop through entire nodes to find out an alive one -func (bpool *BlockchainPool) GetNextNode(blockchain string) *Node { - highestBlock := uint64(0) - - // Get NodePool with correct blockchain - var np *NodePool - for _, b := range bpool.Blockchains { - if b.Blockchain == blockchain { - np = b - for _, n := range b.Nodes { - if n.CurrentBlock > highestBlock { - highestBlock = n.CurrentBlock - } - } + tagSet := make(map[string]map[*Node]bool) + + for tag, nodes := range nodesMap { + if tagSet[tag] == nil { + tagSet[tag] = make(map[*Node]bool) + } + for _, node := range nodes { + tagSet[tag][node] = true } } - // Increase Current value with 1 - currentInc := atomic.AddUint64(&np.Current, uint64(1)) + topNode := TopNodeBlock{} - // next is an Atomic incrementer, value always in range from 0 to slice length, - // it returns an index of slice - next := int(currentInc % uint64(len(np.Nodes))) + var filteredNodes []*Node + for _, node := range nodesSet { + accept := true + for _, tag := range tags { + if tagSet[tag][node] != true { + accept = false + break + } + } + if accept { + filteredNodes = append(filteredNodes, node) + currentBlock := node.CurrentBlock + if currentBlock >= npool.TopNode.Block { + topNode.Block = currentBlock + topNode.Node = node + } + } + } - // Start from next one and move full cycle - l := len(np.Nodes) + next + return filteredNodes, topNode +} - for i := next; i < l; i++ { - // Take an index by modding with length - idx := i % len(np.Nodes) - // If we have an alive one, use it and store if its not the original one - if np.Nodes[idx].IsAlive() { - if i != next { - // Mark the current one - atomic.StoreUint64(&np.Current, uint64(idx)) - } - // Pass nodes with low blocks - // TODO(kompotkot): Re-write to not rotate through not highest blocks - if np.Nodes[idx].CurrentBlock < highestBlock { +// GetNextNode returns next active peer to take a connection +// Loop through entire nodes to find out an alive one and chose one with small CallCounter +func GetNextNode(nodes []*Node, topNode TopNodeBlock) *Node { + nextNode := topNode.Node + + for _, node := range nodes { + if node.IsAlive() { + currentBlock := node.CurrentBlock + if currentBlock < topNode.Block-NB_HIGHEST_BLOCK_SHIFT { + // Bypass too outdated nodes continue } - - return np.Nodes[idx] + if node.CallCounter < nextNode.CallCounter { + nextNode = node + } } } - return nil + + if nextNode != nil { + // Increase CallCounter value with 1 + atomic.AddUint64(&nextNode.CallCounter, uint64(1)) + } + + return nextNode } // SetNodeStatus modify status of the node -func (bpool *BlockchainPool) SetNodeStatus(url *url.URL, alive bool) { - for _, b := range bpool.Blockchains { - for _, n := range b.Nodes { +func SetNodeStatus(url *url.URL, alive bool) { + for _, nodes := range blockchainPools { + for _, n := range nodes.NodesSet { if n.Endpoint.String() == url.String() { n.SetAlive(alive) break @@ -180,55 +185,55 @@ func (bpool *BlockchainPool) SetNodeStatus(url *url.URL, alive bool) { // StatusLog logs node status // TODO(kompotkot): Print list of alive and dead nodes -func (bpool *BlockchainPool) StatusLog() { - for _, b := range bpool.Blockchains { - for _, n := range b.Nodes { +func StatusLog() { + for blockchain, nodes := range blockchainPools { + for _, n := range nodes.NodesSet { log.Printf( - "Blockchain %s node %s is alive %t. Blockchain called %d times", - b.Blockchain, n.Endpoint.Host, n.Alive, b.Current, + "Blockchain %s node %s is alive %t", + blockchain, n.Endpoint.Host, n.Alive, ) } } } // HealthCheck fetch the node latest block -func (bpool *BlockchainPool) HealthCheck() { - for _, b := range bpool.Blockchains { - for _, n := range b.Nodes { +func HealthCheck() { + for blockchain, nodes := range blockchainPools { + for _, node := range nodes.NodesSet { alive := false httpClient := http.Client{Timeout: NB_HEALTH_CHECK_CALL_TIMEOUT} resp, err := httpClient.Post( - n.Endpoint.String(), + node.Endpoint.String(), "application/json", bytes.NewBuffer([]byte(`{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", false],"id":1}`)), ) if err != nil { - n.UpdateNodeState(0, alive) - log.Printf("Unable to reach node: %s", n.Endpoint.Host) + node.UpdateNodeState(0, alive) + log.Printf("Unable to reach node: %s", node.Endpoint.Host) continue } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { - n.UpdateNodeState(0, alive) - log.Printf("Unable to parse response from %s node, err %v", n.Endpoint.Host, err) + node.UpdateNodeState(0, alive) + log.Printf("Unable to parse response from %s node, err %v", node.Endpoint.Host, err) continue } var statusResponse NodeStatusResponse err = json.Unmarshal(body, &statusResponse) if err != nil { - n.UpdateNodeState(0, alive) - log.Printf("Unable to read json response from %s node, err: %v", n.Endpoint.Host, err) + node.UpdateNodeState(0, alive) + log.Printf("Unable to read json response from %s node, err: %v", node.Endpoint.Host, err) continue } blockNumberHex := strings.Replace(statusResponse.Result.Number, "0x", "", -1) blockNumber, err := strconv.ParseUint(blockNumberHex, 16, 64) if err != nil { - n.UpdateNodeState(0, alive) + node.UpdateNodeState(0, alive) log.Printf("Unable to parse block number from hex to string, err: %v", err) continue } @@ -237,10 +242,24 @@ func (bpool *BlockchainPool) HealthCheck() { if blockNumber != 0 { alive = true } - callCounter := n.UpdateNodeState(blockNumber, alive) + callCounter := node.UpdateNodeState(blockNumber, alive) + + if blockNumber > nodes.TopNode.Block { + nodes.TopNode.Block = blockNumber + nodes.TopNode.Node = node + } + + if node.CallCounter >= NB_MAX_COUNTER_NUMBER { + log.Printf( + "Number of CallCounter for node %s reached %d limit, reset the counter.", + node.Endpoint, NB_MAX_COUNTER_NUMBER, + ) + atomic.StoreUint64(&node.CallCounter, uint64(0)) + } log.Printf( - "Node %s is alive: %t with current block: %d called: %d times", n.Endpoint.Host, alive, blockNumber, callCounter, + "Blockchain %s node %s is alive: %t with current block: %d called: %d times", + blockchain, node.Endpoint.Host, alive, blockNumber, callCounter, ) } } diff --git a/nodes/node_balancer/cmd/nodebalancer/cli.go b/nodes/node_balancer/cmd/nodebalancer/cli.go index 23ccf9fcb..15a75aa2d 100644 --- a/nodes/node_balancer/cmd/nodebalancer/cli.go +++ b/nodes/node_balancer/cmd/nodebalancer/cli.go @@ -167,7 +167,7 @@ func (s *StateCLI) populateCLI() { // Common flag pointers for _, fs := range []*flag.FlagSet{s.addAccessCmd, s.generateConfigCmd, s.deleteAccessCmd, s.serverCmd, s.usersCmd, s.versionCmd} { fs.BoolVar(&s.helpFlag, "help", false, "Show help message") - fs.StringVar(&s.configPathFlag, "config", "", "Path to configuration file (default: ~/.nodebalancer/config.txt)") + fs.StringVar(&s.configPathFlag, "config", "", "Path to configuration file (default: ~/.nodebalancer/config.json)") } // Add, delete and list user access subcommand flag pointers diff --git a/nodes/node_balancer/cmd/nodebalancer/clients_test.go b/nodes/node_balancer/cmd/nodebalancer/clients_test.go index e8dfff187..7b7e3e028 100644 --- a/nodes/node_balancer/cmd/nodebalancer/clients_test.go +++ b/nodes/node_balancer/cmd/nodebalancer/clients_test.go @@ -1,4 +1,3 @@ -// TODO(kompotkot): Re-write tests for client package main import ( @@ -7,19 +6,36 @@ import ( "time" ) +func setupSuit(t *testing.T) func(t *testing.T) { + t.Log("Setup suit") + + configBlockchains = map[string]bool{"ethereum": true} + + return func(t *testing.T) { + t.Log("Teardown suit") + } +} + +// TestAddClientNode tests adding new client to client pool func TestAddClientNode(t *testing.T) { + teardownSuit := setupSuit(t) + defer teardownSuit(t) + var cases = []struct { clients map[string]*Client expected string }{ {map[string]*Client{"1": {Node: &Node{Alive: true}}}, "1"}, } + for _, c := range cases { CreateClientPools() + cpool := GetClientPool("ethereum") + for id, client := range c.clients { - ethereumClientPool.AddClientNode(id, client.Node) + cpool.AddClientNode(id, client.Node) } - for id := range ethereumClientPool.Client { + for id := range cpool.Client { if id != c.expected { t.Log("Wrong client was added") t.Fatal() @@ -28,6 +44,7 @@ func TestAddClientNode(t *testing.T) { } } +// TestGetClientNode tests getting correct client func TestGetClientNode(t *testing.T) { ts := time.Now().Unix() @@ -39,15 +56,17 @@ func TestGetClientNode(t *testing.T) { {map[string]*Client{}, "1", nil}, {map[string]*Client{"1": {LastCallTs: ts, Node: &Node{Alive: true}}}, "1", &Node{Alive: true}}, {map[string]*Client{"2": {LastCallTs: ts, Node: &Node{Alive: true}}}, "1", nil}, - {map[string]*Client{"1": {LastCallTs: ts - NB_CLIENT_NODE_KEEP_ALIVE, Node: &Node{Alive: true}}}, "1", nil}, } + for _, c := range cases { CreateClientPools() + cpool := GetClientPool("ethereum") + for id, client := range c.clients { - ethereumClientPool.Client[id] = client + cpool.AddClientNode(id, client.Node) } - clientNode := ethereumClientPool.GetClientNode(c.id) + clientNode := cpool.GetClientNode(c.id) if !reflect.DeepEqual(clientNode, c.expected) { t.Log("Wrong node returned") t.Fatal() @@ -55,6 +74,7 @@ func TestGetClientNode(t *testing.T) { } } +// TestCleanInactiveClientNodes tests cleaning inactive clients func TestCleanInactiveClientNodes(t *testing.T) { ts := time.Now().Unix() @@ -72,12 +92,14 @@ func TestCleanInactiveClientNodes(t *testing.T) { } for _, c := range cases { CreateClientPools() + cpool := GetClientPool("ethereum") + for id, client := range c.clients { - ethereumClientPool.Client[id] = client + cpool.Client[id] = client } - ethereumClientPool.CleanInactiveClientNodes() - for id := range ethereumClientPool.Client { + cpool.CleanInactiveClientNodes() + for id := range cpool.Client { if id != c.expected { t.Log("Wrong client was removed") t.Fatal() diff --git a/nodes/node_balancer/cmd/nodebalancer/configs.go b/nodes/node_balancer/cmd/nodebalancer/configs.go index 358eb5982..84fea272b 100644 --- a/nodes/node_balancer/cmd/nodebalancer/configs.go +++ b/nodes/node_balancer/cmd/nodebalancer/configs.go @@ -18,7 +18,6 @@ var ( nodeConfigs []NodeConfig // Bugout and application configuration - BUGOUT_AUTH_URL = os.Getenv("BUGOUT_AUTH_URL") BUGOUT_AUTH_CALL_TIMEOUT = time.Second * 5 NB_APPLICATION_ID = os.Getenv("NB_APPLICATION_ID") NB_CONTROLLER_TOKEN = os.Getenv("NB_CONTROLLER_TOKEN") @@ -35,7 +34,8 @@ var ( NB_MAX_COUNTER_NUMBER = uint64(10000000) // Client configuration - NB_CLIENT_NODE_KEEP_ALIVE = int64(5) // How long to store node in hot list for client in seconds + NB_CLIENT_NODE_KEEP_ALIVE = int64(5) // How long to store node in hot list for client in seconds + NB_HIGHEST_BLOCK_SHIFT = uint64(50) // Allowed shift to prefer node with most highest block NB_ACCESS_ID_HEADER = os.Getenv("NB_ACCESS_ID_HEADER") NB_DATA_SOURCE_HEADER = os.Getenv("NB_DATA_SOURCE_HEADER") @@ -60,8 +60,9 @@ func CheckEnvVarSet() { // Nodes configuration type NodeConfig struct { - Blockchain string `json:"blockchain"` - Endpoint string `json:"endpoint"` + Blockchain string `json:"blockchain"` + Endpoint string `json:"endpoint"` + Tags []string `json:"tags"` } func LoadConfig(configPath string) error { @@ -108,7 +109,7 @@ func GetConfigPath(providedPath string) (*ConfigPlacement, error) { return nil, fmt.Errorf("Unable to find user home directory, %v", err) } configDirPath = fmt.Sprintf("%s/.nodebalancer", homeDir) - configPath = fmt.Sprintf("%s/config.txt", configDirPath) + configPath = fmt.Sprintf("%s/config.json", configDirPath) } else { configPath = strings.TrimSuffix(providedPath, "/") configDirPath = filepath.Dir(configPath) @@ -144,7 +145,7 @@ func GenerateDefaultConfig(config *ConfigPlacement) error { if !config.ConfigExists { tempConfig := []NodeConfig{ - {Blockchain: "ethereum", Endpoint: "http://127.0.0.1:8545"}, + {Blockchain: "ethereum", Endpoint: "http://127.0.0.1:8545", Tags: []string{"local"}}, } tempConfigJson, err := json.Marshal(tempConfig) if err != nil { diff --git a/nodes/node_balancer/cmd/nodebalancer/middleware.go b/nodes/node_balancer/cmd/nodebalancer/middleware.go index 81811a5ed..b5799b605 100644 --- a/nodes/node_balancer/cmd/nodebalancer/middleware.go +++ b/nodes/node_balancer/cmd/nodebalancer/middleware.go @@ -98,14 +98,14 @@ func (ac *AccessCache) Cleanup() (int64, int64) { return removedAccessIds, totalAccessIds } -func initCacheCleaning(debug bool) { +func initCacheCleaning() { t := time.NewTicker(NB_CACHE_CLEANING_INTERVAL) for { select { case <-t.C: removedAccessIds, totalAccessIds := accessIdCache.Cleanup() - if debug { - log.Printf("Removed %d elements from access id cache", removedAccessIds) + if stateCLI.enableDebugFlag { + log.Printf("[DEBUG] Removed %d elements from access id cache", removedAccessIds) } log.Printf("Elements in access id cache: %d", totalAccessIds) } @@ -241,7 +241,7 @@ func logMiddleware(next http.Handler) http.Handler { if stateCLI.enableDebugFlag { if r.URL.RawQuery != "" { - logStr += fmt.Sprintf(" %s", r.URL.RawQuery) + logStr += fmt.Sprintf(" [DEBUG] %s", r.URL.RawQuery) } accessID := extractAccessID(r) if accessID != "" { @@ -269,20 +269,20 @@ func accessMiddleware(next http.Handler) http.Handler { // If access id does not belong to internal crawlers, then check cache or find it in Bugout resources if accessID == NB_CONTROLLER_ACCESS_ID { if stateCLI.enableDebugFlag { - log.Printf("Access id belongs to internal crawlers") + log.Printf("[DEBUG] Access id belongs to internal crawlers") } currentClientAccess = internalCrawlersAccess currentClientAccess.dataSource = dataSource } else if accessIdCache.FindAccessIdInCache(accessID) != "" { if stateCLI.enableDebugFlag { - log.Printf("Access id found in cache") + log.Printf("[DEBUG] Access id found in cache") } currentClientAccess = accessIdCache.accessIds[accessID] currentClientAccess.dataSource = dataSource accessIdCache.UpdateAccessIdAtCache(accessID, dataSource) } else { if stateCLI.enableDebugFlag { - log.Printf("New access id, looking at Brood resources") + log.Printf("[DEBUG] New access id, looking at Brood resources") } resources, err := bugoutClient.Brood.GetResources( NB_CONTROLLER_TOKEN, diff --git a/nodes/node_balancer/cmd/nodebalancer/routes.go b/nodes/node_balancer/cmd/nodebalancer/routes.go index b99293697..19f1c2305 100644 --- a/nodes/node_balancer/cmd/nodebalancer/routes.go +++ b/nodes/node_balancer/cmd/nodebalancer/routes.go @@ -53,25 +53,12 @@ func lbHandler(w http.ResponseWriter, r *http.Request) { return } - // Chose one node - var node *Node - cpool := GetClientPool(blockchain) - node = cpool.GetClientNode(currentClientAccess.AccessID) - if node == nil { - node = blockchainPool.GetNextNode(blockchain) - if node == nil { - http.Error(w, "There are no nodes available", http.StatusServiceUnavailable) - return - } - cpool.AddClientNode(currentClientAccess.AccessID, node) - } - // Save origin path, to use in proxyErrorHandler if node will not response r.Header.Add("X-Origin-Path", r.URL.Path) switch { case strings.HasPrefix(r.URL.Path, fmt.Sprintf("/nb/%s/jsonrpc", blockchain)): - lbJSONRPCHandler(w, r, blockchain, node, currentClientAccess) + lbJSONRPCHandler(w, r, blockchain, currentClientAccess) return default: http.Error(w, fmt.Sprintf("Unacceptable path for %s blockchain %s", blockchain, r.URL.Path), http.StatusBadRequest) @@ -79,7 +66,7 @@ func lbHandler(w http.ResponseWriter, r *http.Request) { } } -func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, node *Node, currentClientAccess ClientResourceData) { +func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, currentClientAccess ClientResourceData) { body, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, "Unable to read body", http.StatusBadRequest) @@ -94,6 +81,43 @@ func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, return } + // Get tags from request params, sort and generate from it identifier + var tags []string + queries := r.URL.Query() + for k, v := range queries { + if k == "tag" { + for _, tag := range v { + tags = append(tags, tag) + } + } + } + + // Chose one node + var node *Node + cpool := GetClientPool(blockchain) + node = cpool.GetClientNode(currentClientAccess.AccessID) + if node == nil { + npool := blockchainPools[blockchain] + var nodes []*Node + var topNode TopNodeBlock + if len(tags) != 0 { + nodes, topNode = npool.FilterTagsNodes(tags) + } else { + topNode = npool.TopNode + nodes = npool.NodesSet + } + node = GetNextNode(nodes, topNode) + if node == nil { + http.Error(w, "There are no nodes available", http.StatusServiceUnavailable) + return + } + cpool.AddClientNode(currentClientAccess.AccessID, node) + } + + if stateCLI.enableDebugFlag { + log.Printf("[DEBUG] Used node with endpoint: %s, call counter equals: %d", node.Endpoint, node.CallCounter) + } + switch { case currentClientAccess.dataSource == "blockchain": if currentClientAccess.BlockchainAccess == false { @@ -110,8 +134,6 @@ func lbJSONRPCHandler(w http.ResponseWriter, r *http.Request, blockchain string, } } - node.IncreaseCallCounter() - // Overwrite Path so response will be returned to correct place r.URL.Path = "/" node.GethReverseProxy.ServeHTTP(w, r) diff --git a/nodes/node_balancer/cmd/nodebalancer/server.go b/nodes/node_balancer/cmd/nodebalancer/server.go index f7b47c41e..d0533d6ed 100644 --- a/nodes/node_balancer/cmd/nodebalancer/server.go +++ b/nodes/node_balancer/cmd/nodebalancer/server.go @@ -5,7 +5,6 @@ package main import ( "context" - "encoding/json" "fmt" "log" "net/http" @@ -29,12 +28,12 @@ var ( ) // initHealthCheck runs a routine for check status of the nodes every 5 seconds -func initHealthCheck(debug bool) { +func initHealthCheck() { t := time.NewTicker(NB_HEALTH_CHECK_INTERVAL) for { select { case <-t.C: - blockchainPool.HealthCheck() + HealthCheck() logStr := "Client pool healthcheck." for b := range configBlockchains { cp := clientPool[b] @@ -42,8 +41,8 @@ func initHealthCheck(debug bool) { logStr += fmt.Sprintf(" Active %s clients: %d.", b, clients) } log.Println(logStr) - if debug { - blockchainPool.StatusLog() + if stateCLI.enableDebugFlag { + StatusLog() } } } @@ -89,7 +88,7 @@ func proxyErrorHandler(proxy *httputil.ReverseProxy, url *url.URL) { } // After 3 retries, mark this backend as down - blockchainPool.SetNodeStatus(url, false) + SetNodeStatus(url, false) // Set modified path back // TODO(kompotkot): Try r.RequestURI instead of header @@ -129,33 +128,24 @@ func Server() { fmt.Printf("Unable to get user with provided access identifier, err: %v\n", err) os.Exit(1) } + resourcesLog := "Access with resources established." if len(resources.Resources) != 1 { - fmt.Printf("User with provided access identifier has wrong number of resources, err: %v\n", err) - os.Exit(1) - } - resource_data, err := json.Marshal(resources.Resources[0].ResourceData) - if err != nil { - fmt.Printf("Unable to encode resource data interface to json, err: %v\n", err) - os.Exit(1) - } - var clientAccess ClientResourceData - err = json.Unmarshal(resource_data, &clientAccess) - if err != nil { - fmt.Printf("Unable to decode resource data json to structure, err: %v\n", err) - os.Exit(1) + log.Printf("%s There are no access IDs for users in resources", resourcesLog) + } else { + log.Printf("%s Found user access IDs in resources", resourcesLog) } + + // Set internal crawlers access to bypass requests from internal services + // without fetching data from authn Brood server + internalCrawlersUserID := uuid.New().String() internalCrawlersAccess = ClientResourceData{ - UserID: clientAccess.UserID, - AccessID: clientAccess.AccessID, - Name: clientAccess.Name, - Description: clientAccess.Description, - BlockchainAccess: clientAccess.BlockchainAccess, - ExtendedMethods: clientAccess.ExtendedMethods, + UserID: internalCrawlersUserID, + AccessID: NB_CONTROLLER_ACCESS_ID, + Name: "InternalCrawlersAccess", + Description: "Access for internal crawlers.", + BlockchainAccess: true, + ExtendedMethods: true, } - log.Printf( - "Internal crawlers access set, resource id: %s, blockchain access: %t, extended methods: %t", - resources.Resources[0].Id, clientAccess.BlockchainAccess, clientAccess.ExtendedMethods, - ) err = InitDatabaseClient() if err != nil { @@ -195,14 +185,18 @@ func Server() { r.Header.Del(strings.Title(NB_DATA_SOURCE_HEADER)) // Change r.Host from nodebalancer's to end host so TLS check will be passed r.Host = r.URL.Host + // Explicit set of r.URL requires, because by default it adds trailing slash and brake some urls + r.URL = endpoint } proxyErrorHandler(proxyToEndpoint, endpoint) - blockchainPool.AddNode(&Node{ + newNode := &Node{ Endpoint: endpoint, Alive: true, GethReverseProxy: proxyToEndpoint, - }, nodeConfig.Blockchain) + } + AddNode(nodeConfig.Blockchain, nodeConfig.Tags, newNode) + log.Printf( "Added new %s proxy blockchain under index %d from config file with geth url: %s://%s", nodeConfig.Blockchain, i, endpoint.Scheme, endpoint.Host) @@ -211,6 +205,12 @@ func Server() { // Generate map of clients CreateClientPools() + // Start node health checking and current block fetching + HealthCheck() + if stateCLI.enableHealthCheckFlag { + go initHealthCheck() + } + serveMux := http.NewServeMux() serveMux.Handle("/nb/", accessMiddleware(http.HandlerFunc(lbHandler))) log.Println("Authentication middleware enabled") @@ -227,14 +227,8 @@ func Server() { WriteTimeout: 40 * time.Second, } - // Start node health checking and current block fetching - blockchainPool.HealthCheck() - if stateCLI.enableHealthCheckFlag { - go initHealthCheck(stateCLI.enableDebugFlag) - } - // Start access id cache cleaning - go initCacheCleaning(stateCLI.enableDebugFlag) + go initCacheCleaning() log.Printf("Starting node load balancer HTTP server at %s:%s", stateCLI.listeningAddrFlag, stateCLI.listeningPortFlag) err = server.ListenAndServe() diff --git a/nodes/node_balancer/cmd/nodebalancer/version.go b/nodes/node_balancer/cmd/nodebalancer/version.go index 0c6459319..0a350a652 100644 --- a/nodes/node_balancer/cmd/nodebalancer/version.go +++ b/nodes/node_balancer/cmd/nodebalancer/version.go @@ -1,3 +1,3 @@ package main -var NB_VERSION = "0.2.1" +var NB_VERSION = "0.2.2" diff --git a/nodes/node_balancer/sample.env b/nodes/node_balancer/sample.env index d1aea3b68..6de59eb58 100644 --- a/nodes/node_balancer/sample.env +++ b/nodes/node_balancer/sample.env @@ -1,5 +1,5 @@ # Required environment variables for load balancer -export BUGOUT_AUTH_URL="https://auth.bugout.dev" +export BUGOUT_BROOD_URL="https://auth.bugout.dev" export NB_APPLICATION_ID="" export NB_CONTROLLER_TOKEN="" export NB_CONTROLLER_ACCESS_ID=""