Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

moving from readable-streams to streamx #42

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

martinheidegger
Copy link

Moving utp-native to work with streamx. This PR is motivated to have less dependencies in the hypercore-stack.

binding.cc Outdated Show resolved Hide resolved
@@ -132,16 +130,17 @@ UTP.prototype._closeMaybe = function () {
if (this._closing && !this.connections.length && !this._sending.length && this._inited && !this._closed) {
this._closed = true
binding.utp_napi_close(this._handle)
} else {
for (const conn of this.connections) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't understand how these lines could be missing: When you close the server, any open connections are supposed to be closed, right?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, each connection must be closed individually

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Same as tcp)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The instances created in the tests are not properly torn down then, causing a lot of weird errors happen when I tried to remove this. Checking the tests for "why they fail because this is removed" is the reason this takes to long for me to continue.

@@ -425,7 +444,6 @@ NAPI_METHOD(utp_napi_close) {
err = uv_udp_recv_stop(&(self->handle));
if (err < 0) UTP_NAPI_THROW(err)

uv_close((uv_handle_t *) &(self->handle), on_uv_close);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This handle is now closed serially. If I leave his in parallel, it causes a segfault. Probably a timing error, but I couldn't figure out its cause.

module.exports = class Connection extends streamx.Duplex {
constructor (utp, port, address, handle, halfOpen) {
super({
mapWritable: Buffer.from
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This copies all buffers as Buffer.from(buf) is a copy. Check if it's a string an only convert it then.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a quick fix that I forgot to remove, thanks for reminding. Thinking about it though: it may be even better to add two options to UTP: mapWritable and mapReadable with defaults going to a String → Buffer conversion? Related: mafintosh/streamx#47

}

this.once('error', unregister)
this.once('close', unregister)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do this in _destroy instead.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally had this in _destroy but while debugging ran into a problem where _destroy doesn't seemed to have been called. Don't remember anymore what that was or in which combination it occurred. Can't reproduce at the moment.

function unregister () {
this.off('error', unregister)
this.off('close', unregister)
process.nextTick(() => {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is nextTick needed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I remember this right but what I remember: the error, close triggering too quickly caused some chance of binding.utp_napi_close to be called before binding.utp_close finished.. or something alike. Preventing a proper shutdown of the UTP instance.

}
process.nextTick(() => this.emit('connect'))
initCb()
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the old plumbing where the _write method waiting for connect seems simpler imo, can we leave that?


if (bufs.length > 256) return this._write(Buffer.concat(bufs), null, cb)
_write (data, cb) {
if (this.destroyed) return
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

destroyed writes don't happen in streamx :)

cb(null)
}
_writev (datas, cb) {
if (this.destroyed) return
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

@martinheidegger
Copy link
Author

Thoroughly exhaused about this PR for a while. I will not touch it for the time being, if someone else wants to give it a shot: please be my guest.

@martinheidegger martinheidegger marked this pull request as draft May 25, 2021 07:18
module.exports = UTP

const EMPTY = Buffer.alloc(0)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh my , i readded the EMPTY constant at the wrong place. Should be just as before. (memo)

@jimmywarting
Copy link

jimmywarting commented Jun 21, 2021

Any thoughts on using async iterator instead?

for await (const chunk of stream) {
  // yield chunk.toString()
}

@martinheidegger
Copy link
Author

The thing about streamx is that it supports AsyncIterator and streams: https://github.com/streamxorg/streamx/blob/7fae781dd08f7cb12ee7c5f41803cfd876846fa5/index.js#L689

@jimmywarting
Copy link

jimmywarting commented Jun 21, 2021

I retract my comment, didn't look into the package source code (it's a duplex stream) making it even more complicated

I think you should just simply depend on node:stream instead... it's always going to be up to date

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants