forked from tweag/funflow
-
Notifications
You must be signed in to change notification settings - Fork 1
/
SQLite.hs
318 lines (296 loc) · 10.6 KB
/
SQLite.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE QuasiQuotes #-}
-- | SQLite co-ordinator for Funflow.
--
-- This co-ordinator effectively uses the shared filesystem as a tool for task
-- distribution and sequencing. This means that it can control a distributed
-- funflow task without needing any additional processes running.
module Control.Funflow.External.Coordinator.SQLite
( SQLite (..)
) where
import Control.Concurrent (threadDelay)
import Control.Exception.Safe
import Control.Funflow.ContentHashable
import Control.Funflow.External
import Control.Funflow.External.Coordinator
import Control.Funflow.Lock
import Control.Lens
import Control.Monad.IO.Class
import qualified Data.Aeson as Json
import qualified Data.ByteString.Char8 as C8
import qualified Data.Text as T
import Data.Typeable (Typeable)
import qualified Database.SQLite.Simple as SQL
import qualified Database.SQLite.Simple.FromField as SQL
import qualified Database.SQLite.Simple.Ok as SQL
import qualified Database.SQLite.Simple.ToField as SQL
import Path
import Path.IO
import System.Clock
-- | Tag type selecting the SQLite-backed 'Coordinator' instance.
data SQLite
  = SQLite
-- | Handle used by the SQLite coordinator: the open database connection
-- together with the lock that guards access to it (see 'withSQLite').
data SQLiteHook = SQLiteHook
  { _sqlConn :: SQL.Connection
    -- ^ Open connection to the task database.
  , _sqlLock :: Lock
    -- ^ Lock taken around every use of the connection.
  }

makeLenses ''SQLiteHook
-- | Run an action against the hook's SQLite connection while holding the
-- hook's lock.
withSQLite :: SQLiteHook -> (SQL.Connection -> IO a) -> IO a
withSQLite hook action =
  withLock (view sqlLock hook) (action (view sqlConn hook))
-- | Task states as stored in the @status@ column of the @tasks@ table.
--
-- Values are converted to and from integers through the derived 'Enum'
-- instance, so the constructor order fixes the stored encoding.
data SqlTaskStatus
  = SqlPending
  | SqlRunning
  | SqlCompleted
  | SqlFailed
  deriving (Enum)
-- | Decode a status from its integer encoding.
--
-- Integers outside the range covered by the constructors yield a
-- 'SQL.ConversionFailed' error in 'SQL.Ok' rather than hitting the partial
-- 'toEnum', which would raise an impure 'error' on unexpected column values.
instance SQL.FromField SqlTaskStatus where
  fromField field = do
    n <- SQL.fromField field
    -- 'SqlPending' and 'SqlFailed' are the first and last constructors; keep
    -- these bounds in sync if the enumeration is ever extended.
    if n >= fromEnum SqlPending && n <= fromEnum SqlFailed
      then pure $! toEnum n
      else
        SQL.returnError SQL.ConversionFailed field
          ("invalid task status code: " ++ show n)
-- | Encode a status as the integer given by its 'Enum' position.
instance SQL.ToField SqlTaskStatus where
  toField status = SQL.toField (fromEnum status)
-- | Newtype over 'Executor' carrying the SQLite field instances.
newtype SqlExecutor = SqlExecutor
  { getSqlExecutor :: Executor
  }

-- | Read the column value and wrap it in 'Executor'.
instance SQL.FromField SqlExecutor where
  fromField field = fmap (SqlExecutor . Executor) (SQL.fromField field)

-- | Store the value wrapped by 'Executor'.
instance SQL.ToField SqlExecutor where
  toField wrapped =
    case getSqlExecutor wrapped of
      Executor host -> SQL.toField host
-- | One row of the task-info query
-- (@status, executor, elapsed, exit_code@, in that column order).
data SqlTaskInfo = SqlTaskInfo
  { _stiStatus   :: SqlTaskStatus
    -- ^ Current status of the task.
  , _stiExecutor :: Maybe SqlExecutor
    -- ^ Executor, if one has been recorded.
  , _stiElapsed  :: Maybe Integer
    -- ^ Elapsed time in nanoseconds, if recorded.
  , _stiExitCode :: Maybe Int
    -- ^ Exit code, if recorded.
  }

makeLenses ''SqlTaskInfo

-- | Columns are consumed in the order status, executor, elapsed, exit code.
instance SQL.FromRow SqlTaskInfo where
  fromRow = do
    status <- SQL.field
    executor <- SQL.field
    elapsed <- SQL.field
    exitCode <- SQL.field
    pure SqlTaskInfo
      { _stiStatus = status
      , _stiExecutor = executor
      , _stiElapsed = elapsed
      , _stiExitCode = exitCode
      }
-- | Newtype over 'ExternalTask' that is stored in SQLite as its JSON
-- encoding.
newtype SqlExternal = SqlExternal
  { getSqlExternal :: ExternalTask
  }

-- | Decode the column contents as JSON; a decoding failure is reported as a
-- 'DecodingError' for the @task@ field.
instance SQL.FromField SqlExternal where
  fromField field =
    SQL.fromField field >>= either decodeFailed decoded . Json.eitherDecode
    where
      decodeFailed msg = SQL.Errors [toException $ DecodingError "task" msg]
      decoded task = pure $! SqlExternal task

-- | Store the JSON encoding of the wrapped task.
instance SQL.ToField SqlExternal where
  toField (SqlExternal task) = SQL.toField (Json.encode task)
-- | Newtype over 'TaskDescription' that can be read from an
-- @(output, task)@ row.
newtype SqlTask = SqlTask TaskDescription

-- | Reads the output hash first, then the JSON-encoded task.
instance SQL.FromRow SqlTask where
  fromRow = do
    outputHash <- SQL.field
    external <- SQL.field
    let description = TaskDescription
          { _tdOutput = outputHash
          , _tdTask = getSqlExternal external
          }
    pure $! SqlTask $! description
-- | Failures raised by the SQLite coordinator.
data SQLiteCoordinatorError
  = MissingDBTaskEntry ContentHash T.Text
    -- ^ @MissingDBTaskEntry output field@:
    -- the database entry for the task lacks the named field.
  | DecodingError T.Text String
    -- ^ @DecodingError field error@:
    -- the named field could not be decoded.
  | NonRunningTask ContentHash
    -- ^ @NonRunningTask output@:
    -- the task was expected to be running but is not.
  | IllegalStatusUpdate ContentHash TaskStatus
    -- ^ @IllegalStatusUpdate output status@:
    -- the task cannot be moved to the given status.
  deriving (Show, Typeable)

-- | Human-readable rendering of each coordinator error.
instance Exception SQLiteCoordinatorError where
  displayException = \case
    MissingDBTaskEntry output field -> concat
      [ "Missing field in SQLite task entry '"
      , T.unpack field
      , "' for output "
      , C8.unpack (encodeHash output)
      ]
    DecodingError field err -> concat
      [ "Failed to decode field '"
      , T.unpack field
      , "': "
      , err
      ]
    NonRunningTask output -> concat
      [ "Task was not running when expected: "
      , C8.unpack (encodeHash output)
      ]
    IllegalStatusUpdate output status -> concat
      [ "Illegal status update for "
      , C8.unpack (encodeHash output)
      , ": "
      , show status
      ]
-- | Unwrap a possibly-@NULL@ database field.
--
-- Returns the contained value, or throws 'MissingDBTaskEntry' (tagged with
-- the task's output hash and the field name) when the field is 'Nothing'.
fromMaybeField :: MonadIO m => ContentHash -> T.Text -> Maybe a -> m a
fromMaybeField output f =
  maybe (liftIO $ throwIO $ MissingDBTaskEntry output f) pure
-- | 'IO'-specialised implementation behind 'taskInfo'.
--
-- Looks up the row for the given output hash and converts it to a
-- 'TaskInfo'. No row yields 'UnknownTask'. A running task is reported with
-- an elapsed time of zero. Fields required by the row's status but stored as
-- @NULL@ cause 'MissingDBTaskEntry' to be thrown.
taskInfo' :: SQLiteHook -> ContentHash -> IO TaskInfo
taskInfo' hook output = do
  rows <- withSQLite hook $ \conn ->
    SQL.queryNamed conn
      "SELECT status, executor, elapsed, exit_code FROM tasks\
      \ WHERE\
      \ output = :output"
      [ ":output" SQL.:= output ]
  case rows of
    [] -> pure UnknownTask
    (row:_) -> case row ^. stiStatus of
      SqlPending -> pure $! KnownTask Pending
      SqlRunning -> do
        executor <- requireExecutor row
        pure $! KnownTask $! Running (executionInfo executor 0)
      SqlCompleted -> do
        executor <- requireExecutor row
        nanos <- requireElapsed row
        pure $! KnownTask $! Completed (executionInfo executor nanos)
      SqlFailed -> do
        executor <- requireExecutor row
        nanos <- requireElapsed row
        exitCode <- fromMaybeField output "exit_code" (row ^. stiExitCode)
        pure $! KnownTask $! Failed (executionInfo executor nanos) exitCode
  where
    -- Executor column, mandatory for every non-pending status.
    requireExecutor :: SqlTaskInfo -> IO Executor
    requireExecutor row =
      getSqlExecutor <$> fromMaybeField output "executor" (row ^. stiExecutor)

    -- Elapsed column (nanoseconds), mandatory for finished tasks.
    requireElapsed :: SqlTaskInfo -> IO Integer
    requireElapsed row = fromMaybeField output "elapsed" (row ^. stiElapsed)

    executionInfo executor nanos = ExecutionInfo
      { _eiExecutor = executor
      , _eiElapsed = fromNanoSecs nanos
      }
-- | Coordinator backed by a SQLite database file inside a directory.
--
-- The configuration is the directory; it holds both the lock and the
-- database file @db.sqlite@. Every database access goes through
-- 'withSQLite', i.e. it is performed while holding the hook's lock.
instance Coordinator SQLite where
  type Config SQLite = Path Abs Dir
  type Hook SQLite = SQLiteHook

  -- Create the directory if needed, open the lock and the database, and make
  -- sure the @tasks@ table exists.
  initialise dir = liftIO $ do
    createDirIfMissing True dir
    lock <- openLock (dir </> [reldir|lock|])
    withLock lock $ do
      let dbFile = fromAbsFile (dir </> [relfile|db.sqlite|])
      -- Close the connection again if creating the schema fails, so that a
      -- failed initialisation does not leak an open database handle.
      bracketOnError (SQL.open dbFile) SQL.close $ \conn -> do
        SQL.execute_ conn
          "CREATE TABLE IF NOT EXISTS\
          \ tasks\
          \ ( output TEXT PRIMARY KEY\
          \ , status INT NOT NULL\
          \ , executor TEXT\
          \ , elapsed INT\
          \ , exit_code INT\
          \ , task TEXT NOT NULL\
          \ )"
        return SQLiteHook
          { _sqlConn = conn
          , _sqlLock = lock
          }

  -- Insert the task as pending, replacing any existing row with the same
  -- output hash.
  submitTask hook td = liftIO $
    withSQLite hook $ \conn -> SQL.executeNamed conn
      "INSERT OR REPLACE INTO\
      \ tasks (output, status, task)\
      \ VALUES\
      \ (:output, :status, :task)"
      [ ":output" SQL.:= (td^.tdOutput)
      , ":status" SQL.:= SqlPending
      , ":task" SQL.:= SqlExternal (td^.tdTask)
      ]

  -- Number of pending tasks.
  queueSize hook = liftIO $ do
    -- @COUNT(*)@ without @GROUP BY@ yields exactly one single-column row, so
    -- this pattern is expected to always match.
    [[n]] <- withSQLite hook $ \conn -> SQL.queryNamed conn
      "SELECT COUNT(*) FROM tasks WHERE status = :pending"
      [ ":pending" SQL.:= SqlPending ]
    return n

  taskInfo hook output = liftIO $
    taskInfo' hook output

  -- Pick one pending task (if any), mark it as running on the given
  -- executor, and return it. Selection and update share one transaction.
  popTask hook executor = liftIO $
    withSQLite hook $ \conn -> SQL.withTransaction conn $ do
      r <- SQL.queryNamed conn
        "SELECT output, task FROM tasks\
        \ WHERE\
        \ status = :pending\
        \ LIMIT 1"
        [ ":pending" SQL.:= SqlPending ]
      case r of
        [] -> pure Nothing
        (SqlTask td:_) -> do
          SQL.executeNamed conn
            "UPDATE tasks\
            \ SET\
            \ status = :status,\
            \ executor = :executor\
            \ WHERE\
            \ output = :output"
            [ ":status" SQL.:= SqlRunning
            , ":executor" SQL.:= SqlExecutor executor
            , ":output" SQL.:= td^.tdOutput
            ]
          pure $! Just td

  -- Poll once per second until the task is neither pending nor running,
  -- then return its info.
  awaitTask hook output = liftIO loop
    where
      -- XXX: SQLite has callback mechanisms built-in (e.g. @sqlite3_commit_hook@).
      --   Unfortunately, @direct-sqlite@, which @sqlite-simple@ builds on top of,
      --   doesn't expose this functionality at the moment.
      loop = taskInfo' hook output >>= \case
        KnownTask Pending -> sleep >> loop
        KnownTask (Running _) -> sleep >> loop
        ti -> pure ti
      sleep = threadDelay oneSecond
      -- 'threadDelay' takes microseconds.
      oneSecond = 1000000

  -- Move a task that is currently running to completed, failed, or back to
  -- pending. Throws 'NonRunningTask' if the task is not recorded as running
  -- (or has no row), and 'IllegalStatusUpdate' when asked to set 'Running'.
  updateTaskStatus hook output ts = liftIO $
    withSQLite hook $ \conn -> SQL.withTransaction conn $ do
      r <- SQL.queryNamed conn
        "SELECT status FROM tasks\
        \ WHERE\
        \ output = :output"
        [ ":output" SQL.:= output ]
      case r of
        [SqlRunning]:_ -> case ts of
          Completed ei -> SQL.executeNamed conn
            "UPDATE tasks\
            \ SET\
            \ status = :completed,\
            \ elapsed = :elapsed\
            \ WHERE\
            \ output = :output"
            [ ":completed" SQL.:= SqlCompleted
            , ":elapsed" SQL.:= toNanoSecs (ei^.eiElapsed)
            , ":output" SQL.:= output
            ]
          Failed ei exitCode -> SQL.executeNamed conn
            "UPDATE tasks\
            \ SET\
            \ status = :failed,\
            \ elapsed = :elapsed,\
            \ exit_code = :exit_code\
            \ WHERE\
            \ output = :output"
            [ ":failed" SQL.:= SqlFailed
            , ":elapsed" SQL.:= toNanoSecs (ei^.eiElapsed)
            , ":exit_code" SQL.:= exitCode
            , ":output" SQL.:= output
            ]
          Pending -> SQL.executeNamed conn
            "UPDATE tasks\
            \ SET\
            \ status = :pending\
            \ WHERE\
            \ output = :output"
            [ ":pending" SQL.:= SqlPending
            , ":output" SQL.:= output
            ]
          Running _ -> throwIO $ IllegalStatusUpdate output ts
        _ -> throwIO $ NonRunningTask output

  -- Delete every task that is still pending.
  dropTasks hook = liftIO $
    withSQLite hook $ \conn ->
      SQL.executeNamed conn
        "DELETE FROM tasks\
        \ WHERE\
        \ status = :pending"
        [ ":pending" SQL.:= SqlPending ]