-
Notifications
You must be signed in to change notification settings - Fork 3
/
redis_sharding.hs
159 lines (115 loc) · 5 KB
/
redis_sharding.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
{-# LANGUAGE OverloadedStrings #-}
module Main (main) where
import Prelude hiding (catch, getContents, concat)
import Control.Concurrent
import Control.Monad (mapM_, forM, forM_)
import Control.Exception (catch, throw, SomeException, IOException, AsyncException (ThreadKilled))
import Data.ByteString.Lazy.Char8 (ByteString, pack, unpack, split, concat)
import Data.Maybe (maybe, fromJust)
import Data.Time.Clock
import Data.Tuple (fst, snd)
import System.IO
import System.Posix.Signals
import System.Environment (getArgs, getProgName)
import System.Console.GetOpt
import System.Exit
import Network.Socket hiding (recv)
import MyForkManager
import MyNetLazy -- На основе Network.Socket.ByteString.Lazy
import RedisSharding
version = "0.9"
options :: [OptDescr (String, String)]
options = [
Option [] ["host"] (ReqArg (pair "host") "IP") "host",
Option [] ["port"] (ReqArg (pair "port") "port") "port",
Option [] ["nodes"] (ReqArg (pair "nodes") "nodes") "nodes (host1:port1,host2:port2)",
Option [] ["timeout"] (ReqArg (pair "timeout") "timeout") "timeout"
]
where
pair :: a -> b -> (a, b)
pair a b = (a, b)
main = withSocketsDo $ do
installHandler sigPIPE Ignore Nothing
hSetBuffering stdout LineBuffering
printLog ["Start RedisSharding, (version - ", version, ")."]
argv <- getArgs
let get_opt = case getOpt Permute options argv of (opts, _, _) -> flip lookup opts
-- get_opt :: String -> Maybe String -- name -> value
progName <- getProgName
case get_opt "nodes" of
Just _ -> return ()
Nothing -> putStr (
"Parameter 'nodes' is required.\n\nUsing example:\n" ++
progName ++ " --nodes=10.1.1.2:6380,10.1.1.3:6380,...\n\n" ++
"Others parameters:\n--host=10.1.1.1\n--port=6379\n" ++
"--timeout=300 (0 - disable timeout)\n"
) >> exitWith ExitSuccess
host <- maybe (return iNADDR_ANY) inet_addr (get_opt "host")
let port = (maybe 6379 (\a -> fromIntegral $ read a) (get_opt "port"))::PortNumber
let servers = split ',' $ pack $ fromJust $ get_opt "nodes"
let timeout = (maybe 300 (\a -> fromIntegral $ read a) (get_opt "timeout"))::Int
sock <- socket AF_INET Stream defaultProtocol
setSocketOption sock ReuseAddr 1
bindSocket sock (SockAddrInet port host)
listen sock 200
let accepter = accept sock >>= \(c_sock, _) -> forkIO (welcome c_sock servers timeout) >> accepter
accepter
welcome c_sock servers timeout = withForkManagerDo $ \fm -> do
setSocketOption c_sock KeepAlive 1
addr2sMV <- newMVar [] -- Список пар "server address" => "server socket"
catch (forM_ servers (server c_sock addr2sMV))
(\e -> printLog [ pack (show (e::SomeException) ) ] >> clean_from_client c_sock addr2sMV)
-- Получили список пар "server address" => "server socket" после заполнения, дальше он изментся не будет.
addr2s <- readMVar addr2sMV
quit <- newEmptyMVar
let fquit = putMVar quit True >> throw ThreadKilled
waitMVar <- newEmptyMVar
case timeout > 0 of
True -> forkWithQuit fm fquit (timer waitMVar timeout fquit) >> return ()
False -> return ()
cmds <- newChan -- Канал для команд
let set_cmd c = writeChan cmds c
let get_cmd = getCurrentTime >>= putMVar waitMVar >> readChan cmds >>= \cmd -> takeMVar waitMVar >> return cmd
let c_send s = sendAll c_sock $ concat s
forkWithQuit fm fquit (_servers_reader c_sock c_send servers addr2s get_cmd fquit)
forkWithQuit fm fquit (_client_reader c_sock c_send servers addr2s set_cmd fquit)
takeMVar quit
killAllThread fm >> waitAllThread fm
clean_from_client c_sock addr2sMV
where
clean_from_client c_sock addr2sMV = do
takeMVar addr2sMV >>= return . map snd >>= mapM_ sClose
sClose c_sock
-- Соединение с сервером
server c_sock addr2sMV addr = do
s_sock <- socket AF_INET Stream defaultProtocol
ia <- inet_addr (unpack host)
connect s_sock (SockAddrInet port_number ia)
setSocketOption s_sock KeepAlive 1
modifyMVar_ addr2sMV (return . (++) [(addr,s_sock)])
where
[host, port] = split ':' addr
port_number = fromIntegral (read (unpack port))::PortNumber
forkWithQuit fm fquit io = forkWith fm (catch io (\e -> chokeIOException e >> fquit) )
where
chokeIOException :: IOException -> IO ()
chokeIOException e = return ()
timer waitMVar timeout fquit = do
t0 <- readMVar waitMVar
t <- getCurrentTime
let d = ceiling $ diffUTCTime t t0
case d < timeout of
True -> threadDelay (1000000 * d) >> timer waitMVar timeout fquit
False -> fquit
_client_reader c_sock c_send servers addr2s set_cmd fquit =
client_reader getContents c_send servers s_send set_cmd fquit
where
getContents :: IO ByteString
getContents = getContentsWith c_sock (\_ -> fquit)
s_send s_addr s = sendAll (fromJust $ lookup s_addr addr2s) $ concat s
_servers_reader c_sock c_send servers addr2s get_cmd fquit = do
sss <- forM addr2s (\(s_addr, s_sock) -> do
s <- getContentsWith s_sock (\_ -> fquit)
return (s_addr, s_sock, s)
)
servers_reader c_send sss get_cmd fquit