Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrency issue in typedthreads.nim #24591

Open
mk1nz opened this issue Dec 30, 2024 · 1 comment
Open

Concurrency issue in typedthreads.nim #24591

mk1nz opened this issue Dec 30, 2024 · 1 comment

Comments

@mk1nz
Copy link

mk1nz commented Dec 30, 2024

Description

ThreadSanitizer and valgrind (--tool=helgrind) report a data race (probably a read) in typedthreads.nim on line 274. This is with the devel version of the compiler; the problem also occurs with other compiler versions, but the reported line numbers vary.
Example using channels:

import 
  std/net,
  std/locks,
  std/os

# Global lock used by both the main thread and the worker threads in this
# script; it is passed to initLock further down, before startServer runs.
var lock: Lock = Lock()

type SocketOpts* = object
  ## Holds the listening socket that startServer creates.
  socket*: Socket

proc processSession(thChan: ptr Channel[Socket]) {.thread.} =
  ## Worker-thread entry point. Makes a single `tryRecv` attempt on `thChan`;
  ## when a socket was received it sends "hello\n" to it, sleeps for one
  ## second and closes it. Returns without doing anything else when no
  ## message was available or the receive raised.
  var response: tuple[dataAvailable: bool, msg: Socket]
  withLock lock:
    # startServer creates this thread and sends the socket while it holds
    # `lock`, so this receive is attempted only after that section is left.
    try:
      response = thChan[].tryRecv()
    except:
      debugEcho "Unable to receive chan message"
      return
  if response.dataAvailable:
    if response.msg.isNil:
      withLock lock:
        debugEcho "Client socket is nil"
      return
    withLock lock:
      # NOTE(review): the cast reinterprets the dereferenced SocketImpl as a
      # uint16 purely for printing; presumably this shows the leading bytes of
      # the object (the file descriptor?) -- confirm against std/net.
      debugEcho "Thread ", getThreadId(), " got request to send to socket ", cast[uint16](response.msg[])
      # The result of trySend is deliberately ignored.
      discard trySend(response.msg, "hello\n")
    # Sleep outside the lock so other threads can proceed meanwhile.
    sleep(1000)
    withLock lock:
      response.msg.close

# Initialise the global lock before any thread is created.
try:
    initLock(lock)
except:
  # Failure is only reported; the script continues afterwards.
  debugEcho "Unable to init rlock"

var 
  # One Thread object and one Channel per worker slot (10 slots, index 0..9).
  thr: array[10, Thread[ptr Channel[Socket]]]
  sockets: array[10, Channel[Socket]]
  # Filled by acceptAddr in startServer with the peer address.
  address: string = ""
  # Socket object that acceptAddr fills with the accepted connection.
  client: Socket = Socket()

# Open every channel up front; abort the program if any open fails.
for id in 0..sockets.high:
  try:
    sockets[id].open()
  except:
    quit 1

proc startServer*(srv: var SocketOpts): void =
  ## Creates a listening TCP socket on port 3333, stores it in `srv.socket`
  ## and then loops forever accepting clients. Each accepted client is handed
  ## to the worker slot `id` through that slot's channel; `id` cycles through
  ## the slots of `thr`. Any exception is printed and ends the proc.
  
  try:
    srv.socket = newSocket()
    srv.socket.setSockOpt(OptReusePort, true)
    srv.socket.setSockOpt(OptNoDelay, true, level = IPPROTO_TCP.cint)
    srv.socket.bindAddr(Port(3333))
    srv.socket.listen()

    var id: int = 0
    while true:

      # Fills the global `client` and `address` with the new connection.
      srv.socket.acceptAddr(client, address)

      withLock lock:
        # The first ThreadSanitizer warning in this report points at a
        # `running` call made from startServer (read in typedthreads::running
        # versus a write in threadProcWrapper on the worker thread).
        if thr[id].running:
          debugEcho "No threads available"
          client.send("No threads available")
          client.close()
          # NOTE(review): `continue` skips the slot rotation at the bottom of
          # the loop, so the same slot `id` is checked again next iteration.
          continue

        # The second ThreadSanitizer warning points at a createThread call
        # made from startServer (write in createThread versus a previous
        # write in threadProcWrapper). The worker receives a raw pointer to
        # the global channel of its slot.
        createThread(thr[id], processSession, sockets[id].unsafeAddr)

        # Retry until the channel accepts the socket.
        while not sockets[id].trySend(client):
          echo "Try again"
        # Rebind `client` to a new Socket object; presumably so the next
        # accept does not reuse the ref just handed to the worker -- confirm.
        client = Socket()
      
      # Advance to the next slot, wrapping around after the last one.
      if id == thr.high:
        id = 0
      else:
        id.inc
      
  except:
    #log error here!!!
    debugEcho "ERROR!"
    deinitLock lock
    let excpt: ref Exception = getCurrentException()
    debugEcho excpt.msg
    return

  # NOTE(review): the loop above is `while true` with no `break`, so this
  # line looks unreachable except via the `return` path above -- confirm.
  deinitLock(lock)

# Script entry: run the server with a default-initialised SocketOpts.
var params: SocketOpts

startServer(params)

Example of using an array to store client sockets:

import 
  std/net,
  std/locks,
  std/os

type SocketOpts* = object
  ## Holds the listening socket that startServer creates.
  socket*: Socket

# Global lock used by both the main thread and the worker threads in this
# script; it is passed to initLock further down, before startServer runs.
var lock: Lock = Lock()

proc processSession(sock: Socket) {.thread.} =
  ## Worker-thread entry point. Receives the client socket directly as its
  ## argument, sends "hello\n" to it, sleeps for one second and closes it.
  ## Returns early (after logging) when `sock` is nil.
  if sock.isNil:
    withLock lock:
      debugEcho "Client socket is nil"
    return
  withLock lock:
    # NOTE(review): the cast reinterprets the dereferenced SocketImpl as a
    # uint16 purely for printing; presumably this shows the leading bytes of
    # the object (the file descriptor?) -- confirm against std/net.
    debugEcho "Thread ", getThreadId(), " got request to send to socket ", cast[uint16](sock[])
    # The result of trySend is deliberately ignored.
    discard trySend(sock, "hello\n")
  # Sleep outside the lock so other threads can proceed meanwhile.
  sleep(1000)
  withLock lock:
    sock.close

# Initialise the global lock before any thread is created.
try:
    initLock(lock)
except:
  # Failure is only reported; the script continues afterwards.
  debugEcho "Unable to init rlock"

var 
  # One Thread object and one Socket ref per worker slot; the range 0..10 is
  # inclusive, so there are 11 slots.
  thr: array[0..10, Thread[Socket]]
  sockets: array[0..10, Socket]
  # Printed by startServer; nothing in this script assigns to it afterwards.
  address: string = ""
  # Socket object that accept fills with the accepted connection.
  client: Socket = Socket()

proc startServer*(srv: var SocketOpts): void =
  ## Creates a listening TCP socket on port 3333, stores it in `srv.socket`
  ## and then loops forever accepting clients. Each accepted client is stored
  ## in `sockets[id]` and passed to a new thread in `thr[id]`; `id` cycles
  ## through the slots of `thr`. Any exception is printed and ends the proc.
  
  try:
    srv.socket = newSocket()
    srv.socket.setSockOpt(OptReusePort, true)
    srv.socket.setSockOpt(OptNoDelay, true, level = IPPROTO_TCP.cint)
    srv.socket.bindAddr(Port(3333))
    srv.socket.listen()

    var id: int = 0
    while true:
      
      # Fills the global `client`; `accept` takes no address argument, so the
      # global `address` is left untouched here.
      srv.socket.accept(client)

      withLock lock:
        # The slot's Thread object is inspected from the main thread here.
        if thr[id].running:
          debugEcho "No threads available"
          client.send("No threads available")
          client.close()
          # NOTE(review): `continue` skips the slot rotation at the bottom of
          # the loop, so the same slot `id` is checked again next iteration.
          continue
        sockets[id] = client
        # NOTE(review): this copies the bits of the `client` ref over
        # `sockets[id]`, which the assignment on the previous line has already
        # set to the same ref -- looks redundant; confirm intent.
        copyMem(sockets[id].unsafeAddr, client.unsafeAddr, sizeof(Socket))
        # NOTE(review): `address` still holds its initial "" at this point, and
        # the cast here is applied to the ref itself, whereas processSession
        # casts the dereferenced object -- the printed numbers may differ.
        echo "Client connected from: ", address, ". Socket: ", cast[uint16](sockets[id])
        createThread(thr[id], processSession, sockets[id])
        # Rebind `client` to a new Socket object; presumably so the next
        # accept does not reuse the ref just handed to the worker -- confirm.
        client = Socket()
      
      # Advance to the next slot, wrapping around after the last one.
      if id == thr.high:
        id = 0
      else:
        id.inc
      
  except:
    #log error here!!!
    debugEcho "ERROR!"
    deinitLock lock
    let excpt: ref Exception = getCurrentException()
    debugEcho excpt.msg
    return

  # NOTE(review): the loop above is `while true` with no `break`, so this
  # line looks unreachable except via the `return` path above -- confirm.
  deinitLock(lock)

# Script entry: run the server with a default-initialised SocketOpts.
var params: SocketOpts

startServer(params)

nim.cfg :

--gc:orc
--lineTrace:on
--lineDir:on
--d:nimTypeNames
--debuginfo
--d:debug
--d:nimDebugDlOpen
--d:useMalloc
--opt:none
--passC:" -g3 -O0 -fsanitize=thread"
--passL:" -g3 -O0 -fsanitize=thread"

Nim Version

Current devel version,
2.2.0
1.6.20

Current Output

The following output is produced:

WARNING: ThreadSanitizer: data race (pid=41382)
  Read of size 8 at 0x000102add050 by main thread (mutexes: write M0):
    #0 typedthreads::running(Thread<ptr<Channel<ref<net::SocketImpl>>>>) typedthreads.nim:154 (testTcpServerChannels:arm64+0x100023a28)
    #1 testTcpServerChannels::startServer(var<testTcpServerChannels::SocketOpts>) testTcpServerChannels.nim:71 (testTcpServerChannels:arm64+0x1000233f8)
    #2 NimMainModule testTcpServerChannels.nim:93 (testTcpServerChannels:arm64+0x100024384)
    #3 NimMainInner testTcpServerChannels.nim:58 (testTcpServerChannels:arm64+0x10002403c)
    #4 NimMain testTcpServerChannels.nim:64 (testTcpServerChannels:arm64+0x1000244e4)
    #5 main testTcpServerChannels.nim:72 (testTcpServerChannels:arm64+0x1000245b0)

  Previous write of size 8 at 0x000102add050 by thread T1:
    #0 typedthreads::threadProcWrapper(pointer) threadimpl.nim:110 (testTcpServerChannels:arm64+0x100008148)

  Location is global 'thr__test84cp83erver67hannels_u86' at 0x000102add040 (testTcpServerChannels+0x100031050)

  Mutex M0 (0x000102adc8e0) created at:
    #0 pthread_mutex_init <null>:91232944 (libclang_rt.tsan_osx_dynamic.dylib:arm64e+0x3181c)
    #1 locks::initLock(var<syslocks::SysLockObj>) locks.nim:38 (testTcpServerChannels:arm64+0x100024614)
    #2 NimMainModule testTcpServerChannels.nim:32 (testTcpServerChannels:arm64+0x1000240b0)
    #3 NimMainInner testTcpServerChannels.nim:58 (testTcpServerChannels:arm64+0x10002403c)
    #4 NimMain testTcpServerChannels.nim:64 (testTcpServerChannels:arm64+0x1000244e4)
    #5 main testTcpServerChannels.nim:72 (testTcpServerChannels:arm64+0x1000245b0)

  Thread T1 (tid=10953362, finished) created by main thread at:
    #0 pthread_create <null>:91232944 (libclang_rt.tsan_osx_dynamic.dylib:arm64e+0x309d8)
    #1 typedthreads::createThread(var<Thread<ptr<Channel<ref<net::SocketImpl>>>>>, proc<ptr<Channel<ref<net::SocketImpl>>>>, ptr<Channel<ref<net::SocketImpl>>>) typedthreads.nim:292 (testTcpServerChannels:arm64+0x100008350)
    #2 testTcpServerChannels::startServer(var<testTcpServerChannels::SocketOpts>) testTcpServerChannels.nim:78 (testTcpServerChannels:arm64+0x1000235e0)
    #3 NimMainModule testTcpServerChannels.nim:93 (testTcpServerChannels:arm64+0x100024384)
    #4 NimMainInner testTcpServerChannels.nim:58 (testTcpServerChannels:arm64+0x10002403c)
    #5 NimMain testTcpServerChannels.nim:64 (testTcpServerChannels:arm64+0x1000244e4)
    #6 main testTcpServerChannels.nim:72 (testTcpServerChannels:arm64+0x1000245b0)

SUMMARY: ThreadSanitizer: data race typedthreads.nim:154 in typedthreads::running(Thread<ptr<Channel<ref<net::SocketImpl>>>>)
==================
==================
WARNING: ThreadSanitizer: data race (pid=41382)
  Write of size 8 at 0x000102add040 by main thread (mutexes: write M0):
    #0 typedthreads::createThread(var<Thread<ptr<Channel<ref<net::SocketImpl>>>>>, proc<ptr<Channel<ref<net::SocketImpl>>>>, ptr<Channel<ref<net::SocketImpl>>>) typedthreads.nim:274 (testTcpServerChannels:arm64+0x100008214)
    #1 testTcpServerChannels::startServer(var<testTcpServerChannels::SocketOpts>) testTcpServerChannels.nim:78 (testTcpServerChannels:arm64+0x1000235e0)
    #2 NimMainModule testTcpServerChannels.nim:93 (testTcpServerChannels:arm64+0x100024384)
    #3 NimMainInner testTcpServerChannels.nim:58 (testTcpServerChannels:arm64+0x10002403c)
    #4 NimMain testTcpServerChannels.nim:64 (testTcpServerChannels:arm64+0x1000244e4)
    #5 main testTcpServerChannels.nim:72 (testTcpServerChannels:arm64+0x1000245b0)

  Previous write of size 8 at 0x000102add040 by thread T1:
    #0 typedthreads::threadProcWrapper(pointer) threadimpl.nim:109 (testTcpServerChannels:arm64+0x100008130)

  Location is global 'thr__test84cp83erver67hannels_u86' at 0x000102add040 (testTcpServerChannels+0x100031040)

  Mutex M0 (0x000102adc8e0) created at:
    #0 pthread_mutex_init <null>:91232944 (libclang_rt.tsan_osx_dynamic.dylib:arm64e+0x3181c)
    #1 locks::initLock(var<syslocks::SysLockObj>) locks.nim:38 (testTcpServerChannels:arm64+0x100024614)
    #2 NimMainModule testTcpServerChannels.nim:32 (testTcpServerChannels:arm64+0x1000240b0)
    #3 NimMainInner testTcpServerChannels.nim:58 (testTcpServerChannels:arm64+0x10002403c)
    #4 NimMain testTcpServerChannels.nim:64 (testTcpServerChannels:arm64+0x1000244e4)
    #5 main testTcpServerChannels.nim:72 (testTcpServerChannels:arm64+0x1000245b0)

  Thread T1 (tid=10953362, finished) created by main thread at:
    #0 pthread_create <null>:91232944 (libclang_rt.tsan_osx_dynamic.dylib:arm64e+0x309d8)
    #1 typedthreads::createThread(var<Thread<ptr<Channel<ref<net::SocketImpl>>>>>, proc<ptr<Channel<ref<net::SocketImpl>>>>, ptr<Channel<ref<net::SocketImpl>>>) typedthreads.nim:292 (testTcpServerChannels:arm64+0x100008350)
    #2 testTcpServerChannels::startServer(var<testTcpServerChannels::SocketOpts>) testTcpServerChannels.nim:78 (testTcpServerChannels:arm64+0x1000235e0)
    #3 NimMainModule testTcpServerChannels.nim:93 (testTcpServerChannels:arm64+0x100024384)
    #4 NimMainInner testTcpServerChannels.nim:58 (testTcpServerChannels:arm64+0x10002403c)
    #5 NimMain testTcpServerChannels.nim:64 (testTcpServerChannels:arm64+0x1000244e4)
    #6 main testTcpServerChannels.nim:72 (testTcpServerChannels:arm64+0x1000245b0)

SUMMARY: ThreadSanitizer: data race typedthreads.nim:274 in typedthreads::createThread(var<Thread<ptr<Channel<ref<net::SocketImpl>>>>>, proc<ptr<Channel<ref<net::SocketImpl>>>>, ptr<Channel<ref<net::SocketImpl>>>)

Expected Output

No response

Known Workarounds

No response

Additional Information

To test the code, run the server in one terminal and the following command in a second terminal:

watch -n 0.1 nc 127.0.0.1 3333

To test with valgrind (--tool=helgrind), the thread sanitiser should be disabled.

@mk1nz
Copy link
Author

mk1nz commented Jan 2, 2025

Example using manual allocation (with --deepcopy:on added to nim.cfg):

Results with MM ORC:

SUMMARY: ThreadSanitizer: data race typedthreads.nim:154 in typedthreads::running(Thread<refnet::SocketImpl>)

--mm:none:

SUMMARY: ThreadSanitizer: data race typedthreads.nim:271 in typedthreads::createThread(var<Thread<refnet::SocketImpl>>, proc<refnet::SocketImpl>, refnet::SocketImpl)

Nim Compiler Version 2.3.1

import 
  std/net,
  std/locks,
  std/os

type SocketOpts* = object
  ## Holds the listening socket that startServer creates.
  socket*: Socket

# Global lock used by both the main thread and the worker threads in this
# script; it is passed to initLock further down, before startServer runs.
var lock: Lock = Lock()

proc processSession(sock: Socket) {.thread.} =
  ## Worker-thread entry point. Receives the client socket directly as its
  ## argument, sends "hello\n" to it, sleeps for one second and closes it.
  ## Returns early (after logging) when `sock` is nil.
  if sock.isNil:
    withLock lock:
      debugEcho "Client socket is nil"
    return
  withLock lock:
    # NOTE(review): the cast reinterprets the dereferenced SocketImpl as a
    # uint16 purely for printing; presumably this shows the leading bytes of
    # the object (the file descriptor?) -- confirm against std/net.
    debugEcho "Thread ", getThreadId(), " got request to send to socket ", cast[uint16](sock[])
    # The result of trySend is deliberately ignored.
    discard trySend(sock, "hello\n")
  # Sleep outside the lock so other threads can proceed meanwhile.
  sleep(1000)
  withLock lock:
    sock.close

# Initialise the global lock before any thread is created.
try:
    initLock(lock)
except:
  # Failure is only reported; the script continues afterwards.
  debugEcho "Unable to init rlock"

var 
  # One Thread object and one Socket ref per worker slot; the range 0..10 is
  # inclusive, so there are 11 slots.
  thr: array[0..10, Thread[Socket]]
  sockets: array[0..10, Socket]
  # Filled by acceptAddr in startServer with the peer address.
  address: string = ""
  # Declared without an initial value, unlike the other examples.
  client: Socket

# Pre-populate every slot with a block obtained from `alloc`, cast to the
# `Socket` ref type.
# NOTE(review): this memory does not come from the normal ref allocation
# path; how the memory manager treats such a "ref" is not visible here --
# confirm this is intended for the reproduction.
for sock in sockets.mitems:
  sock = cast[Socket](alloc(sizeof(SocketImpl)))



proc startServer*(srv: var SocketOpts): void =
  ## Creates a listening TCP socket on port 3333, stores it in `srv.socket`
  ## and then loops forever accepting clients. Each accepted client is
  ## deep-copied into `sockets[id]` and that copy is passed to a new thread in
  ## `thr[id]`; `id` cycles through the slots of `thr`. Any exception is
  ## printed and ends the proc.
  
  try:
    srv.socket = newSocket()
    srv.socket.setSockOpt(OptReusePort, true)
    srv.socket.setSockOpt(OptNoDelay, true, level = IPPROTO_TCP.cint)
    srv.socket.bindAddr(Port(3333))
    srv.socket.listen()

    var id: int = 0
    while true:
      
      # Fills the global `client` and `address` with the new connection.
      srv.socket.acceptAddr(client, address)

      withLock lock:
        # The follow-up comment in this report quotes a ThreadSanitizer summary
        # for typedthreads::running on Thread<ref SocketImpl> under ORC; this
        # is the `running` call in this example.
        if thr[id].running:
          debugEcho "No threads available"
          client.send("No threads available")
          client.close()
          # NOTE(review): `continue` skips the slot rotation at the bottom of
          # the loop, so the same slot `id` is checked again next iteration.
          continue

        # Copy the accepted socket into the slot instead of sharing the ref.
        deepCopy(sockets[id], client)
        echo "Client connected from: ", address, ". Socket: ", cast[uint16](sockets[id][])
        # With --mm:none the quoted ThreadSanitizer summary points at
        # typedthreads::createThread instead.
        createThread(thr[id], processSession, sockets[id])
        client = Socket()
      
      # Advance to the next slot, wrapping around after the last one.
      if id == thr.high:
        id = 0
      else:
        id.inc
      
  except:
    #log error here!!!
    debugEcho "ERROR!"
    deinitLock lock
    let excpt: ref Exception = getCurrentException()
    debugEcho excpt.msg
    return

  # NOTE(review): the loop above is `while true` with no `break`, so this
  # line looks unreachable except via the `return` path above -- confirm.
  deinitLock(lock)

# Script entry: run the server with a default-initialised SocketOpts.
var params: SocketOpts

startServer(params)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant