protocol stream .end() not working as expected #366

Open
gmaclennan opened this issue Apr 25, 2023 · 9 comments

Comments

@gmaclennan
Contributor

The 'end' event on the protocol stream returned by core.replicate() does not seem to propagate as expected.

Use case:

I want to gracefully end a replication stream without destroying it.

Expected behaviour:

const s = core.replicate(true)
s.on('end', () => console.log('end'))
s.end()

I would expect the above code to log 'end'. It doesn't. There is one scenario where it does work: if the replicating core is a read-only peer and you call s.end() after first calling core.update(). There does not seem to be a way to end the replication stream for the writer core.

Minimal reproduction:

import Hypercore from 'hypercore'
import RAM from 'random-access-memory'
import test from 'tape'

test('can end replication stream from writer', async t => {
  t.plan(2)
  ;(async () => {
    const core1 = new Hypercore(RAM)
    await core1.ready()
    const core2 = new Hypercore(RAM, core1.key)

    const s1 = core1.replicate(true)
    const s2 = core2.replicate(false)

    s1.on('end', () => t.pass('s1 end'))
    s2.on('end', () => t.pass('s2 end'))

    s1.pipe(s2).pipe(s1)

    await core1.update()
    s1.end()
  })()
})

test('can end replication stream from reader', t => {
  t.plan(2)
  ;(async () => {
    const core1 = new Hypercore(RAM)
    await core1.ready()
    const core2 = new Hypercore(RAM, core1.key)

    const s1 = core1.replicate(true)
    const s2 = core2.replicate(false)

    s1.on('end', () => t.pass('s1 end'))
    s2.on('end', () => t.pass('s2 end'))

    s1.pipe(s2).pipe(s1)

    await core2.update()
    s2.end()
  })()
})
@LuKks
Contributor

LuKks commented Apr 25, 2023

Calling stream.end() sends the end signal to the other side, so the other side gets the 'end' event but you do not, unless they also call stream.end() back. For example:

const s = core.replicate(true)
s.on('end', () => s.end())
// ...
s.end()

I think this is handled automatically, so you should only need to call s.end() and wait for the 'close' event (unsure about timeouts here).

The exception is when you listen for the event yourself, e.g. s.on('end', cb): then it's your job to do as above and call s.end() manually. In your "Expected behaviour" example you registered the listener and only logged, so you never sent s.end() back.
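
The hand-off described above could be wrapped in a small helper. This is only a minimal sketch: endWhenRemoteEnds is a hypothetical name, not part of Hypercore or secret-stream, and it assumes the stream behaves like a Node-style duplex that emits 'end' and exposes .end(). Whether the replication stream actually delivers 'end' in time is exactly what this issue is about.

```javascript
// Hypothetical helper, not part of Hypercore or secret-stream.
// Assumes `stream` is a Node-style duplex: it emits 'end' when the remote
// side has finished writing and exposes .end() to finish our own side.
function endWhenRemoteEnds (stream) {
  let replied = false
  stream.on('end', () => {
    if (replied) return // only answer the remote end once
    replied = true
    stream.end()
  })
  return stream
}

// Usage sketch (s1 and s2 stand for two piped replication streams):
//   endWhenRemoteEnds(s1)
//   endWhenRemoteEnds(s2)
//   s1.end() // then wait for 'close' on both streams
```

The guard flag is there so that a repeated 'end' notification does not trigger a second .end() call.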

@gmaclennan
Contributor Author

gmaclennan commented Apr 25, 2023

Thanks @LuKks that makes sense, however this is not the behaviour I am seeing: s.end() does not cause any end event on the other side. You can see from running the tests in my comment that the only event that fires is s2.on('end') when you call s2.end(), in the second test. No other end events fire.

@gmaclennan
Contributor Author

Ok did some digging and got the following results. It seems like s.end() results in inconsistent behaviour if called in the same tick, or the next tick, but is consistent (but not quite in the way that you suggest) if I call it after setTimeout(0):

import Hypercore from 'hypercore'
import RAM from 'random-access-memory'

const core1 = new Hypercore(RAM)
await core1.ready()
const core2 = new Hypercore(RAM, core1.key)

const s1 = core1.replicate(true)
const s2 = core2.replicate(false)

s1.on('end', () => console.log('s1 end'))
s1.on('close', () => console.log('s1 close'))
s2.on('end', () => console.log('s2 end'))
s2.on('close', () => console.log('s2 close'))

s1.pipe(s2).pipe(s1)

s2.end()

This code logs nothing, either when calling s1.end() or s2.end().

If I await process.nextTick I get different results depending on which core I await and which stream I end:

// Ending s1 after one tick:
await new Promise(res => process.nextTick(res))
s1.end()
// s1 end
// s1 close
// s2 close

// Ending s2 after one tick:
await new Promise(res => process.nextTick(res))
s2.end()
// s2 end
// s1 end
// s1 close

If I await setTimeout(0) then I get consistent results, but not quite what I would expect based on your comment:

// Ending s1 after setTimeout(0):
await new Promise(res => setTimeout(res, 0))
s1.end()
// s1 end
// s1 close
// s2 close

// Ending s2 after setTimeout(0):
await new Promise(res => setTimeout(res, 0))
s2.end()
// s2 end
// s2 close
// s1 close
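
To compare these orderings between runs, the console.log listeners could be replaced by something that collects events into an array. A minimal sketch, assuming the streams expose .on() like Node streams; recordEvents is a hypothetical helper name, and 'finish' is an extra event that the snippets above do not listen for.

```javascript
// Hypothetical helper: push "<label> <event>" into a shared array each time
// one of the listed lifecycle events fires, preserving the firing order.
function recordEvents (stream, label, log = [], events = ['end', 'finish', 'close']) {
  for (const name of events) {
    stream.on(name, () => log.push(`${label} ${name}`))
  }
  return log
}

// Usage sketch (s1 and s2 are the replication streams from above):
//   const log = []
//   recordEvents(s1, 's1', log)
//   recordEvents(s2, 's2', log)
//   ...end one of the streams, wait, then inspect `log`
```

Note that attaching an 'end' listener is itself part of what changes the behaviour in this thread, so the recorder is not a neutral observer.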

@LuKks
Contributor

LuKks commented Apr 25, 2023

Try commenting out the lines where you listen to 'end' events, and try again without waiting for the next tick.

@gmaclennan
Contributor Author

Done: nothing is logged (i.e. no 'close' events are logged either).

@gmaclennan
Contributor Author

Possibly related? mafintosh/streamx#72

@LuKks
Contributor

LuKks commented Apr 25, 2023

I was about to say that it sounds like a bug in secret-stream, because it registers the 'end' event a bit late; that's why it works if you wait a tick. You should create a PR on secret-stream that adds a single simple test case reproducing the bug.

@LuKks
Contributor

LuKks commented Apr 25, 2023

Meanwhile, instead of commenting them out, do this:

s1.on('end', () => s1.end())
s2.on('end', () => s2.end())

And try again without waiting for any tick.

@mafintosh
Contributor

Unsure if you can end these streams atm. We tend to use the replicate(stream) api everywhere with a swarm stream, and the stream returned works a bit differently, but we'll look into it.
