diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..cd4cb3e --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,5 @@ +{ + "rust-analyzer.linkedProjects": [ + "./benchmark/Cargo.toml" + ] +} \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 864992d..d178659 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,9 @@ +[workspace] +members = ["benchmark"] + [package] name = "loole" -version = "0.1.3" +version = "0.1.4" edition = "2021" authors = ["Mahdi Shojaee "] description = "A safe async/sync multi-producer, multi-consumer channel" diff --git a/README.md b/README.md index 06c8f35..726803f 100644 --- a/README.md +++ b/README.md @@ -38,42 +38,55 @@ fn main() { To use Loole, place the following line under the `[dependencies]` section in your `Cargo.toml`: ```toml -loole = "0.1.3" +loole = "0.1.4" ``` ## Benchmarks Benchmarks measure throughput, which is the number of messages sent and received per second, for messages of 264 bytes each. -### MPSC +### Run benchmarks + +To run benchmarks on your local machine, run the following command: + +```bash +cargo run --release -p benchmark +``` -5000 producers send messages to 1 consumer. +The above command will generate and update the benchmark images in the README.md file. + +### MPSC Measures: Messages per seconds. (higher is better) Messages size: 264 bytes. -![MPSC](misc/loole-mpsc.png) +![MPSC: sync-sync](benchmark/charts/images/mpsc-sync-sync.svg) +![MPSC: async-async](benchmark/charts/images/mpsc-async-async.svg) +![MPSC: async-sync](benchmark/charts/images/mpsc-async-sync.svg) +![MPSC: sync-async](benchmark/charts/images/mpsc-sync-async.svg) ### MPMC -5000 producers send messages to 2 consumers. - Measures: Messages per seconds. (higher is better) Messages size: 264 bytes. -![MPMC](misc/loole-mpmc.png) +![MPMC: sync-sync](benchmark/charts/images/mpmc-sync-sync.svg) +![MPMC: async-async](benchmark/charts/images/mpmc-async-async.svg) +![MPMC: async-sync](benchmark/charts/images/mpmc-async-sync.svg) +![MPMC: sync-async](benchmark/charts/images/mpmc-sync-async.svg) ### SPSC -1 producer send messages to 1 consumer. - Measures: Messages per seconds. (higher is better) Messages size: 264 bytes. -![SPSC](misc/loole-spsc.png) +![SPSC: sync-sync](benchmark/charts/images/spsc-sync-sync.svg) +![SPSC: async-async](benchmark/charts/images/spsc-async-async.svg) +![SPSC: async-sync](benchmark/charts/images/spsc-async-sync.svg) +![SPSC: sync-async](benchmark/charts/images/spsc-sync-async.svg) ## License diff --git a/benchmark/Cargo.toml b/benchmark/Cargo.toml new file mode 100644 index 0000000..9bb9dac --- /dev/null +++ b/benchmark/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "benchmark" +version = "0.1.0" +edition = "2021" + +[dependencies] +loole = { path = ".." } +async-channel = "1.9.0" +flume = "0.10.14" +kanal = { version = "0.1.0-pre8", features = ["std-mutex"] } +crossbeam-channel = "0.5.8" +tokio = { version = "1.31.0", features = ["full"] } +num-format = "0.4.4" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0.107" diff --git a/benchmark/charts/.gitignore b/benchmark/charts/.gitignore new file mode 100644 index 0000000..a547bf3 --- /dev/null +++ b/benchmark/charts/.gitignore @@ -0,0 +1,24 @@ +# Logs +logs +*.log +npm-debug.log* +yarn-debug.log* +yarn-error.log* +pnpm-debug.log* +lerna-debug.log* + +node_modules +dist +dist-ssr +*.local + +# Editor directories and files +.vscode/* +!.vscode/extensions.json +.idea +.DS_Store +*.suo +*.ntvs* +*.njsproj +*.sln +*.sw? diff --git a/benchmark/charts/benchmark-result.json b/benchmark/charts/benchmark-result.json new file mode 100644 index 0000000..ad881f4 --- /dev/null +++ b/benchmark/charts/benchmark-result.json @@ -0,0 +1,632 @@ +{ + "mpsc": { + "sync-sync": { + "title": "5000 sync senders / 1 sync receiver", + "dataset": { + "source": [ + [ + "capacity", + "loole", + "flume", + "async-channel", + "tokio", + "kanal", + "crossbeam-channel" + ], + [ + "bounded(0)", + 1266747, + 760797, + 0, + 0, + 223192, + 392076 + ], + [ + "bounded(1)", + 1299706, + 797610, + 242067, + 233439, + 131391, + 182450 + ], + [ + "bounded(50)", + 1299096, + 783805, + 550134, + 1126038, + 129894, + 195287 + ], + [ + "bounded(100)", + 1298088, + 784329, + 544614, + 1131401, + 128806, + 197504 + ] + ] + } + }, + "async-async": { + "title": "5000 async senders / 1 async receiver", + "dataset": { + "source": [ + [ + "capacity", + "loole", + "flume", + "async-channel", + "tokio", + "kanal", + "crossbeam-channel" + ], + [ + "bounded(0)", + 1953227, + 58983, + 0, + 0, + 3387730, + 0 + ], + [ + "bounded(1)", + 2073515, + 59501, + 1217626, + 2792523, + 2039503, + 0 + ], + [ + "bounded(50)", + 1791361, + 58999, + 757492, + 2049884, + 2332746, + 0 + ], + [ + "bounded(100)", + 1980326, + 56320, + 841443, + 2100951, + 2012172, + 0 + ] + ] + } + }, + "async-sync": { + "title": "5000 async senders / 1 sync receiver", + "dataset": { + "source": [ + [ + "capacity", + "loole", + "flume", + "async-channel", + "tokio", + "kanal", + "crossbeam-channel" + ], + [ + "bounded(0)", + 1963973, + 58526, + 0, + 0, + 3188337, + 0 + ], + [ + "bounded(1)", + 1973744, + 58400, + 270978, + 260413, + 1858322, + 0 + ], + [ + "bounded(50)", + 1934532, + 59346, + 789137, + 2057602, + 1869366, + 0 + ], + [ + "bounded(100)", + 2005898, + 63381, + 718127, + 2029216, + 1956505, + 0 + ] + ] + } + }, + "sync-async": { + "title": "5000 sync senders / 1 async receiver", + "dataset": { + "source": [ + [ + "capacity", + "loole", + "flume", + "async-channel", + "tokio", + "kanal", + "crossbeam-channel" + ], + [ + "bounded(0)", + 1281720, + 798436, + 0, + 0, + 1253397, + 0 + ], + [ + "bounded(1)", + 1236789, + 798357, + 170775, + 193148, + 130553, + 0 + ], + [ + "bounded(50)", + 1277570, + 814099, + 542768, + 1117772, + 127490, + 0 + ], + [ + "bounded(100)", + 1243376, + 815376, + 544671, + 1185323, + 132491, + 0 + ] + ] + } + } + }, + "mpmc": { + "sync-sync": { + "title": "5000 sync senders / 10 sync receivers", + "dataset": { + "source": [ + [ + "capacity", + "loole", + "flume", + "async-channel", + "tokio", + "kanal", + "crossbeam-channel" + ], + [ + "bounded(0)", + 1171485, + 634557, + 0, + 0, + 209364, + 239205 + ], + [ + "bounded(1)", + 1133225, + 638677, + 342278, + 0, + 126882, + 170162 + ], + [ + "bounded(50)", + 1158600, + 634685, + 404745, + 0, + 194968, + 174638 + ], + [ + "bounded(100)", + 1090020, + 634489, + 405812, + 0, + 130739, + 175664 + ] + ] + } + }, + "async-async": { + "title": "5000 async senders / 10 async receivers", + "dataset": { + "source": [ + [ + "capacity", + "loole", + "flume", + "async-channel", + "tokio", + "kanal", + "crossbeam-channel" + ], + [ + "bounded(0)", + 1430726, + 71535, + 0, + 0, + 2181914, + 0 + ], + [ + "bounded(1)", + 1438181, + 73144, + 679771, + 0, + 1514383, + 0 + ], + [ + "bounded(50)", + 1814827, + 71706, + 5795710, + 0, + 1464339, + 0 + ], + [ + "bounded(100)", + 2024925, + 70648, + 5443571, + 0, + 1626718, + 0 + ] + ] + } + }, + "async-sync": { + "title": "5000 async senders / 10 sync receivers", + "dataset": { + "source": [ + [ + "capacity", + "loole", + "flume", + "async-channel", + "tokio", + "kanal", + "crossbeam-channel" + ], + [ + "bounded(0)", + 1079265, + 68233, + 0, + 0, + 1065251, + 0 + ], + [ + "bounded(1)", + 1099716, + 70046, + 796526, + 0, + 1564901, + 0 + ], + [ + "bounded(50)", + 1072935, + 66102, + 5058264, + 0, + 1652168, + 0 + ], + [ + "bounded(100)", + 1082870, + 68323, + 5168296, + 0, + 1515097, + 0 + ] + ] + } + }, + "sync-async": { + "title": "5000 sync senders / 10 async receivers", + "dataset": { + "source": [ + [ + "capacity", + "loole", + "flume", + "async-channel", + "tokio", + "kanal", + "crossbeam-channel" + ], + [ + "bounded(0)", + 1151306, + 633803, + 0, + 0, + 464130, + 0 + ], + [ + "bounded(1)", + 1134389, + 633003, + 202035, + 0, + 127111, + 0 + ], + [ + "bounded(50)", + 1148371, + 636644, + 416205, + 0, + 125575, + 0 + ], + [ + "bounded(100)", + 1183153, + 638166, + 427100, + 0, + 124819, + 0 + ] + ] + } + } + }, + "spsc": { + "sync-sync": { + "title": "1 sync sender / 1 sync receiver", + "dataset": { + "source": [ + [ + "capacity", + "loole", + "flume", + "async-channel", + "tokio", + "kanal", + "crossbeam-channel" + ], + [ + "bounded(0)", + 685401, + 682343, + 0, + 0, + 7141773, + 3401986 + ], + [ + "bounded(1)", + 1029526, + 1032356, + 345271, + 345508, + 5138641, + 5046589 + ], + [ + "bounded(50)", + 4013275, + 6637439, + 14905065, + 5047318, + 9621140, + 16757181 + ], + [ + "bounded(100)", + 4300841, + 6520335, + 11020787, + 5382916, + 9034310, + 14838765 + ] + ] + } + }, + "async-async": { + "title": "1 async sender / 1 async receiver", + "dataset": { + "source": [ + [ + "capacity", + "loole", + "flume", + "async-channel", + "tokio", + "kanal", + "crossbeam-channel" + ], + [ + "bounded(0)", + 6246206, + 4274617, + 0, + 0, + 8707159, + 0 + ], + [ + "bounded(1)", + 7263551, + 4246570, + 2255044, + 2781927, + 9780376, + 0 + ], + [ + "bounded(50)", + 16905352, + 12554794, + 15512261, + 9248255, + 26817254, + 0 + ], + [ + "bounded(100)", + 5543888, + 6022053, + 11321346, + 8092411, + 29916766, + 0 + ] + ] + } + }, + "async-sync": { + "title": "1 async sender / 1 sync receiver", + "dataset": { + "source": [ + [ + "capacity", + "loole", + "flume", + "async-channel", + "tokio", + "kanal", + "crossbeam-channel" + ], + [ + "bounded(0)", + 545336, + 526467, + 0, + 0, + 1410910, + 0 + ], + [ + "bounded(1)", + 810877, + 792058, + 264657, + 264099, + 2116980, + 0 + ], + [ + "bounded(50)", + 5009002, + 5088051, + 9186541, + 6188150, + 8087397, + 0 + ], + [ + "bounded(100)", + 4426106, + 5288014, + 8039824, + 6222736, + 8426656, + 0 + ] + ] + } + }, + "sync-async": { + "title": "1 sync sender / 1 async receiver", + "dataset": { + "source": [ + [ + "capacity", + "loole", + "flume", + "async-channel", + "tokio", + "kanal", + "crossbeam-channel" + ], + [ + "bounded(0)", + 544971, + 518006, + 0, + 0, + 1364421, + 0 + ], + [ + "bounded(1)", + 782345, + 516322, + 267390, + 264476, + 2096253, + 0 + ], + [ + "bounded(50)", + 5543426, + 6014639, + 7203311, + 5988391, + 10445808, + 0 + ], + [ + "bounded(100)", + 4753615, + 6224164, + 7753069, + 6005439, + 10222426, + 0 + ] + ] + } + } + } +} \ No newline at end of file diff --git a/benchmark/charts/images/mpmc-async-async.svg b/benchmark/charts/images/mpmc-async-async.svg new file mode 100644 index 0000000..fdf91b3 --- /dev/null +++ b/benchmark/charts/images/mpmc-async-async.svg @@ -0,0 +1,231 @@ + + + + + + + + + + + + + + + +0 +1,000,000 +2,000,000 +3,000,000 +4,000,000 +5,000,000 +6,000,000 +bounded(0) +bounded(1) +bounded(50) +bounded(100) + + + + + + + + + + + + + + + + + + + + + + + + + + +loole + +flume + +async-channel + +kanal + +5000 async senders / 10 async receivers + + \ No newline at end of file diff --git a/benchmark/charts/images/mpmc-async-sync.svg b/benchmark/charts/images/mpmc-async-sync.svg new file mode 100644 index 0000000..127e741 --- /dev/null +++ b/benchmark/charts/images/mpmc-async-sync.svg @@ -0,0 +1,231 @@ + + + + + + + + + + + + + + + +0 +1,000,000 +2,000,000 +3,000,000 +4,000,000 +5,000,000 +6,000,000 +bounded(0) +bounded(1) +bounded(50) +bounded(100) + + + + + + + + + + + + + + + + + + + + + + + + + + +loole + +flume + +async-channel + +kanal + +5000 async senders / 10 sync receivers + + \ No newline at end of file diff --git a/benchmark/charts/images/mpmc-sync-async.svg b/benchmark/charts/images/mpmc-sync-async.svg new file mode 100644 index 0000000..990b23e --- /dev/null +++ b/benchmark/charts/images/mpmc-sync-async.svg @@ -0,0 +1,231 @@ + + + + + + + + + + + + + + + +0 +200,000 +400,000 +600,000 +800,000 +1,000,000 +1,200,000 +bounded(0) +bounded(1) +bounded(50) +bounded(100) + + + + + + + + + + + + + + + + + + + + + + + + + + +loole + +flume + +async-channel + +kanal + +5000 sync senders / 10 async receivers + + \ No newline at end of file diff --git a/benchmark/charts/images/mpmc-sync-sync.svg b/benchmark/charts/images/mpmc-sync-sync.svg new file mode 100644 index 0000000..a95b2b0 --- /dev/null +++ b/benchmark/charts/images/mpmc-sync-sync.svg @@ -0,0 +1,277 @@ + + + + + + + + + + + + + + + +0 +200,000 +400,000 +600,000 +800,000 +1,000,000 +1,200,000 +bounded(0) +bounded(1) +bounded(50) +bounded(100) + + + + + + + + + + + + + + + + + + + + + + + + + + +loole + +flume + +async-channel + +kanal + +crossbeam-channel + +5000 sync senders / 10 sync receivers + + \ No newline at end of file diff --git a/benchmark/charts/images/mpsc-async-async.svg b/benchmark/charts/images/mpsc-async-async.svg new file mode 100644 index 0000000..1ded8b0 --- /dev/null +++ b/benchmark/charts/images/mpsc-async-async.svg @@ -0,0 +1,268 @@ + + + + + + + + + + + + + + + + +0 +500,000 +1,000,000 +1,500,000 +2,000,000 +2,500,000 +3,000,000 +3,500,000 +bounded(0) +bounded(1) +bounded(50) +bounded(100) + + + + + + + + + + + + + + + + + + + + + + + + + + +loole + +flume + +async-channel + +tokio + +kanal + +5000 async senders / 1 async receiver + + \ No newline at end of file diff --git a/benchmark/charts/images/mpsc-async-sync.svg b/benchmark/charts/images/mpsc-async-sync.svg new file mode 100644 index 0000000..c02c943 --- /dev/null +++ b/benchmark/charts/images/mpsc-async-sync.svg @@ -0,0 +1,268 @@ + + + + + + + + + + + + + + + + +0 +500,000 +1,000,000 +1,500,000 +2,000,000 +2,500,000 +3,000,000 +3,500,000 +bounded(0) +bounded(1) +bounded(50) +bounded(100) + + + + + + + + + + + + + + + + + + + + + + + + + + +loole + +flume + +async-channel + +tokio + +kanal + +5000 async senders / 1 sync receiver + + \ No newline at end of file diff --git a/benchmark/charts/images/mpsc-sync-async.svg b/benchmark/charts/images/mpsc-sync-async.svg new file mode 100644 index 0000000..cfe4841 --- /dev/null +++ b/benchmark/charts/images/mpsc-sync-async.svg @@ -0,0 +1,264 @@ + + + + + + + + + + + + + + +0 +300,000 +600,000 +900,000 +1,200,000 +1,500,000 +bounded(0) +bounded(1) +bounded(50) +bounded(100) + + + + + + + + + + + + + + + + + + + + + + + + + + +loole + +flume + +async-channel + +tokio + +kanal + +5000 sync senders / 1 async receiver + + \ No newline at end of file diff --git a/benchmark/charts/images/mpsc-sync-sync.svg b/benchmark/charts/images/mpsc-sync-sync.svg new file mode 100644 index 0000000..d37a51c --- /dev/null +++ b/benchmark/charts/images/mpsc-sync-sync.svg @@ -0,0 +1,310 @@ + + + + + + + + + + + + + + +0 +300,000 +600,000 +900,000 +1,200,000 +1,500,000 +bounded(0) +bounded(1) +bounded(50) +bounded(100) + + + + + + + + + + + + + + + + + + + + + + + + + + +loole + +flume + +async-channel + +tokio + +kanal + +crossbeam-channel + +5000 sync senders / 1 sync receiver + + \ No newline at end of file diff --git a/benchmark/charts/images/spsc-async-async.svg b/benchmark/charts/images/spsc-async-async.svg new file mode 100644 index 0000000..89b7d60 --- /dev/null +++ b/benchmark/charts/images/spsc-async-async.svg @@ -0,0 +1,266 @@ + + + + + + + + + + + + + + + +0 +5,000,000 +10,000,000 +15,000,000 +20,000,000 +25,000,000 +30,000,000 +bounded(0) +bounded(1) +bounded(50) +bounded(100) + + + + + + + + + + + + + + + + + + + + + + + + + + +loole + +flume + +async-channel + +tokio + +kanal + +1 async sender / 1 async receiver + + \ No newline at end of file diff --git a/benchmark/charts/images/spsc-async-sync.svg b/benchmark/charts/images/spsc-async-sync.svg new file mode 100644 index 0000000..a4544ce --- /dev/null +++ b/benchmark/charts/images/spsc-async-sync.svg @@ -0,0 +1,264 @@ + + + + + + + + + + + + + + +0 +2,000,000 +4,000,000 +6,000,000 +8,000,000 +10,000,000 +bounded(0) +bounded(1) +bounded(50) +bounded(100) + + + + + + + + + + + + + + + + + + + + + + + + + + +loole + +flume + +async-channel + +tokio + +kanal + +1 async sender / 1 sync receiver + + \ No newline at end of file diff --git a/benchmark/charts/images/spsc-sync-async.svg b/benchmark/charts/images/spsc-sync-async.svg new file mode 100644 index 0000000..410595f --- /dev/null +++ b/benchmark/charts/images/spsc-sync-async.svg @@ -0,0 +1,266 @@ + + + + + + + + + + + + + + + +0 +2,000,000 +4,000,000 +6,000,000 +8,000,000 +10,000,000 +12,000,000 +bounded(0) +bounded(1) +bounded(50) +bounded(100) + + + + + + + + + + + + + + + + + + + + + + + + + + +loole + +flume + +async-channel + +tokio + +kanal + +1 sync sender / 1 async receiver + + \ No newline at end of file diff --git a/benchmark/charts/images/spsc-sync-sync.svg b/benchmark/charts/images/spsc-sync-sync.svg new file mode 100644 index 0000000..e7475ef --- /dev/null +++ b/benchmark/charts/images/spsc-sync-sync.svg @@ -0,0 +1,312 @@ + + + + + + + + + + + + + + + +0 +3,000,000 +6,000,000 +9,000,000 +12,000,000 +15,000,000 +18,000,000 +bounded(0) +bounded(1) +bounded(50) +bounded(100) + + + + + + + + + + + + + + + + + + + + + + + + + + +loole + +flume + +async-channel + +tokio + +kanal + +crossbeam-channel + +1 sync sender / 1 sync receiver + + \ No newline at end of file diff --git a/benchmark/charts/main.js b/benchmark/charts/main.js new file mode 100644 index 0000000..fdc1128 --- /dev/null +++ b/benchmark/charts/main.js @@ -0,0 +1,77 @@ +const { JSDOM } = require("jsdom"); +const fs = require("node:fs"); + +const { window } = new JSDOM(); +global.window = window; +global.document = window.document; +global.navigator = window.navigator; +global.navigator.userAgent = window.navigator.userAgent; +global.navigator.language = window.navigator.language; +const echarts = require("echarts"); + +const data = require("./benchmark-result.json"); + +echarts.setPlatformAPI({ + createCanvas() {}, +}); + +function renderChart(data, chanType, benchType) { + const barsNo = data[chanType][benchType].dataset.source[0].length - 1; + const supportedCrates = data[chanType][benchType].dataset.source[0].filter( + (_, index) => + index > 0 && + data[chanType][benchType].dataset.source + .slice(1) + .some((a) => a[index] > 0) + ); + const chart = echarts.init(null, null, { + renderer: "svg", + ssr: true, + animation: false, + width: 700, + height: 220, + }); + const option = { + title: { + text: data[chanType][benchType].title, + textStyle: { + fontSize: 14, + }, + left: "center", + top: "5%", + backgroundColor: "#ddd8", + borderColor: "#ccc", + borderWidth: 1, + padding: [5, 20], + }, + backgroundColor: "white", + legend: { + orient: "horizontal", + left: "center", + bottom: "5%", + data: supportedCrates, + }, + grid: { + left: "4%", + right: "4%", + bottom: "20%", + top: "20%", + containLabel: true, + }, + tooltip: {}, + dataset: data[chanType][benchType].dataset, + xAxis: { type: "category" }, + yAxis: {}, + series: Array(barsNo).fill({ type: "bar" }), + }; + chart.setOption(option); + const svgStr = chart.renderToSVGString(); + chart.dispose(); + fs.writeFileSync(`images/${chanType}-${benchType}.svg`, svgStr); +} + +for (const chanType of Object.keys(data)) { + for (const benchType of Object.keys(data[chanType])) { + renderChart(data, chanType, benchType); + } +} diff --git a/benchmark/charts/package-lock.json b/benchmark/charts/package-lock.json new file mode 100644 index 0000000..ef0f91f --- /dev/null +++ b/benchmark/charts/package-lock.json @@ -0,0 +1,473 @@ +{ + "name": "charts", + "version": "0.0.0", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "charts", + "version": "0.0.0", + "dependencies": { + "echarts": "^5.4.3", + "jsdom": "^22.1.0" + } + }, + "node_modules/@tootallnate/once": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/@tootallnate/once/-/once-2.0.0.tgz", + "integrity": "sha512-XCuKFP5PS55gnMVu3dty8KPatLqUoy/ZYzDzAGCQ8JNFCkLXzmI7vNHCR+XpbZaMWQK/vQubr7PkYq8g470J/A==", + "engines": { + "node": ">= 10" + } + }, + "node_modules/abab": { + "version": "2.0.6", + "resolved": "https://registry.npmjs.org/abab/-/abab-2.0.6.tgz", + "integrity": "sha512-j2afSsaIENvHZN2B8GOpF566vZ5WVk5opAiMTvWgaQT8DkbOqsTfvNAvHoRGU2zzP8cPoqys+xHTRDWW8L+/BA==" + }, + "node_modules/agent-base": { + "version": "6.0.2", + "resolved": "https://registry.npmjs.org/agent-base/-/agent-base-6.0.2.tgz", + "integrity": "sha512-RZNwNclF7+MS/8bDg70amg32dyeZGZxiDuQmZxKLAlQjr3jGyLx+4Kkk58UO7D2QdgFIQCovuSuZESne6RG6XQ==", + "dependencies": { + "debug": "4" + }, + "engines": { + "node": ">= 6.0.0" + } + }, + "node_modules/asynckit": { + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", + "integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==" + }, + "node_modules/combined-stream": { + "version": "1.0.8", + "resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-1.0.8.tgz", + "integrity": "sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==", + "dependencies": { + "delayed-stream": "~1.0.0" + }, + "engines": { + "node": ">= 0.8" + } + }, + "node_modules/cssstyle": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/cssstyle/-/cssstyle-3.0.0.tgz", + "integrity": "sha512-N4u2ABATi3Qplzf0hWbVCdjenim8F3ojEXpBDF5hBpjzW182MjNGLqfmQ0SkSPeQ+V86ZXgeH8aXj6kayd4jgg==", + "dependencies": { + "rrweb-cssom": "^0.6.0" + }, + "engines": { + "node": ">=14" + } + }, + "node_modules/data-urls": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/data-urls/-/data-urls-4.0.0.tgz", + "integrity": "sha512-/mMTei/JXPqvFqQtfyTowxmJVwr2PVAeCcDxyFf6LhoOu/09TX2OX3kb2wzi4DMXcfj4OItwDOnhl5oziPnT6g==", + "dependencies": { + "abab": "^2.0.6", + "whatwg-mimetype": "^3.0.0", + "whatwg-url": "^12.0.0" + }, + "engines": { + "node": ">=14" + } + }, + "node_modules/debug": { + "version": "4.3.4", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz", + "integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==", + "dependencies": { + "ms": "2.1.2" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } + } + }, + "node_modules/decimal.js": { + "version": "10.4.3", + "resolved": "https://registry.npmjs.org/decimal.js/-/decimal.js-10.4.3.tgz", + "integrity": "sha512-VBBaLc1MgL5XpzgIP7ny5Z6Nx3UrRkIViUkPUdtl9aya5amy3De1gsUUSB1g3+3sExYNjCAsAznmukyxCb1GRA==" + }, + "node_modules/delayed-stream": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz", + "integrity": "sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==", + "engines": { + "node": ">=0.4.0" + } + }, + "node_modules/domexception": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/domexception/-/domexception-4.0.0.tgz", + "integrity": "sha512-A2is4PLG+eeSfoTMA95/s4pvAoSo2mKtiM5jlHkAVewmiO8ISFTFKZjH7UAM1Atli/OT/7JHOrJRJiMKUZKYBw==", + "dependencies": { + "webidl-conversions": "^7.0.0" + }, + "engines": { + "node": ">=12" + } + }, + "node_modules/echarts": { + "version": "5.4.3", + "resolved": "https://registry.npmjs.org/echarts/-/echarts-5.4.3.tgz", + "integrity": "sha512-mYKxLxhzy6zyTi/FaEbJMOZU1ULGEQHaeIeuMR5L+JnJTpz+YR03mnnpBhbR4+UYJAgiXgpyTVLffPAjOTLkZA==", + "dependencies": { + "tslib": "2.3.0", + "zrender": "5.4.4" + } + }, + "node_modules/entities": { + "version": "4.5.0", + "resolved": "https://registry.npmjs.org/entities/-/entities-4.5.0.tgz", + "integrity": "sha512-V0hjH4dGPh9Ao5p0MoRY6BVqtwCjhz6vI5LT8AJ55H+4g9/4vbHx1I54fS0XuclLhDHArPQCiMjDxjaL8fPxhw==", + "engines": { + "node": ">=0.12" + }, + "funding": { + "url": "https://github.com/fb55/entities?sponsor=1" + } + }, + "node_modules/form-data": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.0.tgz", + "integrity": "sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==", + "dependencies": { + "asynckit": "^0.4.0", + "combined-stream": "^1.0.8", + "mime-types": "^2.1.12" + }, + "engines": { + "node": ">= 6" + } + }, + "node_modules/html-encoding-sniffer": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/html-encoding-sniffer/-/html-encoding-sniffer-3.0.0.tgz", + "integrity": "sha512-oWv4T4yJ52iKrufjnyZPkrN0CH3QnrUqdB6In1g5Fe1mia8GmF36gnfNySxoZtxD5+NmYw1EElVXiBk93UeskA==", + "dependencies": { + "whatwg-encoding": "^2.0.0" + }, + "engines": { + "node": ">=12" + } + }, + "node_modules/http-proxy-agent": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/http-proxy-agent/-/http-proxy-agent-5.0.0.tgz", + "integrity": "sha512-n2hY8YdoRE1i7r6M0w9DIw5GgZN0G25P8zLCRQ8rjXtTU3vsNFBI/vWK/UIeE6g5MUUz6avwAPXmL6Fy9D/90w==", + "dependencies": { + "@tootallnate/once": "2", + "agent-base": "6", + "debug": "4" + }, + "engines": { + "node": ">= 6" + } + }, + "node_modules/https-proxy-agent": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/https-proxy-agent/-/https-proxy-agent-5.0.1.tgz", + "integrity": "sha512-dFcAjpTQFgoLMzC2VwU+C/CbS7uRL0lWmxDITmqm7C+7F0Odmj6s9l6alZc6AELXhrnggM2CeWSXHGOdX2YtwA==", + "dependencies": { + "agent-base": "6", + "debug": "4" + }, + "engines": { + "node": ">= 6" + } + }, + "node_modules/iconv-lite": { + "version": "0.6.3", + "resolved": "https://registry.npmjs.org/iconv-lite/-/iconv-lite-0.6.3.tgz", + "integrity": "sha512-4fCk79wshMdzMp2rH06qWrJE4iolqLhCUH+OiuIgU++RB0+94NlDL81atO7GX55uUKueo0txHNtvEyI6D7WdMw==", + "dependencies": { + "safer-buffer": ">= 2.1.2 < 3.0.0" + }, + "engines": { + "node": ">=0.10.0" + } + }, + "node_modules/is-potential-custom-element-name": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/is-potential-custom-element-name/-/is-potential-custom-element-name-1.0.1.tgz", + "integrity": "sha512-bCYeRA2rVibKZd+s2625gGnGF/t7DSqDs4dP7CrLA1m7jKWz6pps0LpYLJN8Q64HtmPKJ1hrN3nzPNKFEKOUiQ==" + }, + "node_modules/jsdom": { + "version": "22.1.0", + "resolved": "https://registry.npmjs.org/jsdom/-/jsdom-22.1.0.tgz", + "integrity": "sha512-/9AVW7xNbsBv6GfWho4TTNjEo9fe6Zhf9O7s0Fhhr3u+awPwAJMKwAMXnkk5vBxflqLW9hTHX/0cs+P3gW+cQw==", + "dependencies": { + "abab": "^2.0.6", + "cssstyle": "^3.0.0", + "data-urls": "^4.0.0", + "decimal.js": "^10.4.3", + "domexception": "^4.0.0", + "form-data": "^4.0.0", + "html-encoding-sniffer": "^3.0.0", + "http-proxy-agent": "^5.0.0", + "https-proxy-agent": "^5.0.1", + "is-potential-custom-element-name": "^1.0.1", + "nwsapi": "^2.2.4", + "parse5": "^7.1.2", + "rrweb-cssom": "^0.6.0", + "saxes": "^6.0.0", + "symbol-tree": "^3.2.4", + "tough-cookie": "^4.1.2", + "w3c-xmlserializer": "^4.0.0", + "webidl-conversions": "^7.0.0", + "whatwg-encoding": "^2.0.0", + "whatwg-mimetype": "^3.0.0", + "whatwg-url": "^12.0.1", + "ws": "^8.13.0", + "xml-name-validator": "^4.0.0" + }, + "engines": { + "node": ">=16" + }, + "peerDependencies": { + "canvas": "^2.5.0" + }, + "peerDependenciesMeta": { + "canvas": { + "optional": true + } + } + }, + "node_modules/mime-db": { + "version": "1.52.0", + "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.52.0.tgz", + "integrity": "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==", + "engines": { + "node": ">= 0.6" + } + }, + "node_modules/mime-types": { + "version": "2.1.35", + "resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.35.tgz", + "integrity": "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==", + "dependencies": { + "mime-db": "1.52.0" + }, + "engines": { + "node": ">= 0.6" + } + }, + "node_modules/ms": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" + }, + "node_modules/nwsapi": { + "version": "2.2.7", + "resolved": "https://registry.npmjs.org/nwsapi/-/nwsapi-2.2.7.tgz", + "integrity": "sha512-ub5E4+FBPKwAZx0UwIQOjYWGHTEq5sPqHQNRN8Z9e4A7u3Tj1weLJsL59yH9vmvqEtBHaOmT6cYQKIZOxp35FQ==" + }, + "node_modules/parse5": { + "version": "7.1.2", + "resolved": "https://registry.npmjs.org/parse5/-/parse5-7.1.2.tgz", + "integrity": "sha512-Czj1WaSVpaoj0wbhMzLmWD69anp2WH7FXMB9n1Sy8/ZFF9jolSQVMu1Ij5WIyGmcBmhk7EOndpO4mIpihVqAXw==", + "dependencies": { + "entities": "^4.4.0" + }, + "funding": { + "url": "https://github.com/inikulin/parse5?sponsor=1" + } + }, + "node_modules/psl": { + "version": "1.9.0", + "resolved": "https://registry.npmjs.org/psl/-/psl-1.9.0.tgz", + "integrity": "sha512-E/ZsdU4HLs/68gYzgGTkMicWTLPdAftJLfJFlLUAAKZGkStNU72sZjT66SnMDVOfOWY/YAoiD7Jxa9iHvngcag==" + }, + "node_modules/punycode": { + "version": "2.3.0", + "resolved": "https://registry.npmjs.org/punycode/-/punycode-2.3.0.tgz", + "integrity": "sha512-rRV+zQD8tVFys26lAGR9WUuS4iUAngJScM+ZRSKtvl5tKeZ2t5bvdNFdNHBW9FWR4guGHlgmsZ1G7BSm2wTbuA==", + "engines": { + "node": ">=6" + } + }, + "node_modules/querystringify": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz", + "integrity": "sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==" + }, + "node_modules/requires-port": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz", + "integrity": "sha512-KigOCHcocU3XODJxsu8i/j8T9tzT4adHiecwORRQ0ZZFcp7ahwXuRU1m+yuO90C5ZUyGeGfocHDI14M3L3yDAQ==" + }, + "node_modules/rrweb-cssom": { + "version": "0.6.0", + "resolved": "https://registry.npmjs.org/rrweb-cssom/-/rrweb-cssom-0.6.0.tgz", + "integrity": "sha512-APM0Gt1KoXBz0iIkkdB/kfvGOwC4UuJFeG/c+yV7wSc7q96cG/kJ0HiYCnzivD9SB53cLV1MlHFNfOuPaadYSw==" + }, + "node_modules/safer-buffer": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz", + "integrity": "sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg==" + }, + "node_modules/saxes": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/saxes/-/saxes-6.0.0.tgz", + "integrity": "sha512-xAg7SOnEhrm5zI3puOOKyy1OMcMlIJZYNJY7xLBwSze0UjhPLnWfj2GF2EpT0jmzaJKIWKHLsaSSajf35bcYnA==", + "dependencies": { + "xmlchars": "^2.2.0" + }, + "engines": { + "node": ">=v12.22.7" + } + }, + "node_modules/symbol-tree": { + "version": "3.2.4", + "resolved": "https://registry.npmjs.org/symbol-tree/-/symbol-tree-3.2.4.tgz", + "integrity": "sha512-9QNk5KwDF+Bvz+PyObkmSYjI5ksVUYtjW7AU22r2NKcfLJcXp96hkDWU3+XndOsUb+AQ9QhfzfCT2O+CNWT5Tw==" + }, + "node_modules/tough-cookie": { + "version": "4.1.3", + "resolved": "https://registry.npmjs.org/tough-cookie/-/tough-cookie-4.1.3.tgz", + "integrity": "sha512-aX/y5pVRkfRnfmuX+OdbSdXvPe6ieKX/G2s7e98f4poJHnqH3281gDPm/metm6E/WRamfx7WC4HUqkWHfQHprw==", + "dependencies": { + "psl": "^1.1.33", + "punycode": "^2.1.1", + "universalify": "^0.2.0", + "url-parse": "^1.5.3" + }, + "engines": { + "node": ">=6" + } + }, + "node_modules/tr46": { + "version": "4.1.1", + "resolved": "https://registry.npmjs.org/tr46/-/tr46-4.1.1.tgz", + "integrity": "sha512-2lv/66T7e5yNyhAAC4NaKe5nVavzuGJQVVtRYLyQ2OI8tsJ61PMLlelehb0wi2Hx6+hT/OJUWZcw8MjlSRnxvw==", + "dependencies": { + "punycode": "^2.3.0" + }, + "engines": { + "node": ">=14" + } + }, + "node_modules/tslib": { + "version": "2.3.0", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.3.0.tgz", + "integrity": "sha512-N82ooyxVNm6h1riLCoyS9e3fuJ3AMG2zIZs2Gd1ATcSFjSA23Q0fzjjZeh0jbJvWVDZ0cJT8yaNNaaXHzueNjg==" + }, + "node_modules/universalify": { + "version": "0.2.0", + "resolved": "https://registry.npmjs.org/universalify/-/universalify-0.2.0.tgz", + "integrity": "sha512-CJ1QgKmNg3CwvAv/kOFmtnEN05f0D/cn9QntgNOQlQF9dgvVTHj3t+8JPdjqawCHk7V/KA+fbUqzZ9XWhcqPUg==", + "engines": { + "node": ">= 4.0.0" + } + }, + "node_modules/url-parse": { + "version": "1.5.10", + "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.10.tgz", + "integrity": "sha512-WypcfiRhfeUP9vvF0j6rw0J3hrWrw6iZv3+22h6iRMJ/8z1Tj6XfLP4DsUix5MhMPnXpiHDoKyoZ/bdCkwBCiQ==", + "dependencies": { + "querystringify": "^2.1.1", + "requires-port": "^1.0.0" + } + }, + "node_modules/w3c-xmlserializer": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/w3c-xmlserializer/-/w3c-xmlserializer-4.0.0.tgz", + "integrity": "sha512-d+BFHzbiCx6zGfz0HyQ6Rg69w9k19nviJspaj4yNscGjrHu94sVP+aRm75yEbCh+r2/yR+7q6hux9LVtbuTGBw==", + "dependencies": { + "xml-name-validator": "^4.0.0" + }, + "engines": { + "node": ">=14" + } + }, + "node_modules/webidl-conversions": { + "version": "7.0.0", + "resolved": "https://registry.npmjs.org/webidl-conversions/-/webidl-conversions-7.0.0.tgz", + "integrity": "sha512-VwddBukDzu71offAQR975unBIGqfKZpM+8ZX6ySk8nYhVoo5CYaZyzt3YBvYtRtO+aoGlqxPg/B87NGVZ/fu6g==", + "engines": { + "node": ">=12" + } + }, + "node_modules/whatwg-encoding": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/whatwg-encoding/-/whatwg-encoding-2.0.0.tgz", + "integrity": "sha512-p41ogyeMUrw3jWclHWTQg1k05DSVXPLcVxRTYsXUk+ZooOCZLcoYgPZ/HL/D/N+uQPOtcp1me1WhBEaX02mhWg==", + "dependencies": { + "iconv-lite": "0.6.3" + }, + "engines": { + "node": ">=12" + } + }, + "node_modules/whatwg-mimetype": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/whatwg-mimetype/-/whatwg-mimetype-3.0.0.tgz", + "integrity": "sha512-nt+N2dzIutVRxARx1nghPKGv1xHikU7HKdfafKkLNLindmPU/ch3U31NOCGGA/dmPcmb1VlofO0vnKAcsm0o/Q==", + "engines": { + "node": ">=12" + } + }, + "node_modules/whatwg-url": { + "version": "12.0.1", + "resolved": "https://registry.npmjs.org/whatwg-url/-/whatwg-url-12.0.1.tgz", + "integrity": "sha512-Ed/LrqB8EPlGxjS+TrsXcpUond1mhccS3pchLhzSgPCnTimUCKj3IZE75pAs5m6heB2U2TMerKFUXheyHY+VDQ==", + "dependencies": { + "tr46": "^4.1.1", + "webidl-conversions": "^7.0.0" + }, + "engines": { + "node": ">=14" + } + }, + "node_modules/ws": { + "version": "8.14.2", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.14.2.tgz", + "integrity": "sha512-wEBG1ftX4jcglPxgFCMJmZ2PLtSbJ2Peg6TmpJFTbe9GZYOQCDPdMYu/Tm0/bGZkw8paZnJY45J4K2PZrLYq8g==", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, + "node_modules/xml-name-validator": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/xml-name-validator/-/xml-name-validator-4.0.0.tgz", + "integrity": "sha512-ICP2e+jsHvAj2E2lIHxa5tjXRlKDJo4IdvPvCXbXQGdzSfmSpNVyIKMvoZHjDY9DP0zV17iI85o90vRFXNccRw==", + "engines": { + "node": ">=12" + } + }, + "node_modules/xmlchars": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/xmlchars/-/xmlchars-2.2.0.tgz", + "integrity": "sha512-JZnDKK8B0RCDw84FNdDAIpZK+JuJw+s7Lz8nksI7SIuU3UXJJslUthsi+uWBUYOwPFwW7W7PRLRfUKpxjtjFCw==" + }, + "node_modules/zrender": { + "version": "5.4.4", + "resolved": "https://registry.npmjs.org/zrender/-/zrender-5.4.4.tgz", + "integrity": "sha512-0VxCNJ7AGOMCWeHVyTrGzUgrK4asT4ml9PEkeGirAkKNYXYzoPJCLvmyfdoOXcjTHPs10OZVMfD1Rwg16AZyYw==", + "dependencies": { + "tslib": "2.3.0" + } + } + } +} diff --git a/benchmark/charts/package.json b/benchmark/charts/package.json new file mode 100644 index 0000000..cb059f0 --- /dev/null +++ b/benchmark/charts/package.json @@ -0,0 +1,12 @@ +{ + "name": "charts", + "private": true, + "version": "0.0.0", + "scripts": { + "update": "node --no-warnings main.js" + }, + "dependencies": { + "echarts": "^5.4.3", + "jsdom": "^22.1.0" + } +} diff --git a/benchmark/src/async_channel_bench.rs b/benchmark/src/async_channel_bench.rs new file mode 100644 index 0000000..134fcc6 --- /dev/null +++ b/benchmark/src/async_channel_bench.rs @@ -0,0 +1,221 @@ +use async_channel::{bounded, unbounded, Receiver, RecvError, SendError, Sender}; +use std::{future::Future, thread}; + +use crate::bench_utils::{calculate_benchmark_result, BenchResult, JoinHandle, BenchError}; + +pub fn crate_name() -> &'static str { + "async-channel" +} + +pub fn send(tx: &Sender, msg: T) -> Result<(), SendError> { + tx.send_blocking(msg) +} + +pub fn recv(rx: &Receiver) -> Result { + rx.recv_blocking() +} + +pub fn send_async( + tx: &Sender, + msg: T, +) -> impl Future>> + '_ { + tx.send(msg) +} + +pub fn recv_async(rx: &Receiver) -> impl Future> + '_ { + rx.recv() +} + +async fn bench_helper( + senders_no: usize, + receivers_no: usize, + cap: Option, + msg_no: usize, + create_sender: S, + create_receiver: R, +) -> Result +where + T: From + Send + 'static, + S: Fn(Sender, usize) -> JoinHandle, + R: Fn(Receiver) -> JoinHandle, +{ + if cap == Some(0) { + return Err(BenchError::ZeroCapacityNotSupported); + } + let (tx, rx) = cap.map_or_else(unbounded::, bounded); + let senders = create_senders(tx, senders_no, msg_no, create_sender); + let receivers = create_receivers(rx, receivers_no, create_receiver); + Ok(calculate_benchmark_result(senders, receivers).await) +} + +fn create_senders( + tx: Sender, + senders_no: usize, + msg_no: usize, + create_sender: S, +) -> Vec> +where + S: Fn(Sender, usize) -> JoinHandle, +{ + let mut senders = Vec::with_capacity(senders_no); + for i in 0..senders_no { + let n = msg_no / senders_no + if i < msg_no % senders_no { 1 } else { 0 }; + senders.push(create_sender(tx.clone(), n)); + } + senders +} + +fn create_receivers( + rx: Receiver, + receivers_no: usize, + create_receiver: R, +) -> Vec> +where + R: Fn(Receiver) -> JoinHandle, +{ + let mut receivers = Vec::with_capacity(receivers_no); + for _ in 0..receivers_no { + receivers.push(create_receiver(rx.clone())); + } + receivers +} + +fn create_sync_sender(tx: Sender, n: usize) -> JoinHandle +where + T: From + Send + 'static, +{ + JoinHandle::Sync(thread::spawn(move || { + for k in 0..n { + match send(&tx, k.into()) { + Ok(_) => (), + Err(_) => println!("error: channel closed at: {}", k), + } + } + n + })) +} + +fn create_async_sender(tx: Sender, n: usize) -> JoinHandle +where + T: From + Send + 'static, +{ + JoinHandle::Async(tokio::spawn(async move { + for k in 0..n { + match send_async(&tx, k.into()).await { + Ok(_) => (), + Err(_) => println!("error: channel closed at: {}", k), + } + } + n + })) +} + +fn create_sync_receiver(rx: Receiver) -> JoinHandle +where + T: From + Send + 'static, +{ + JoinHandle::Sync(thread::spawn(move || { + let mut c = 0; + loop { + match recv(&rx) { + Ok(_) => c += 1, + Err(_) => break c, + } + } + })) +} + +fn create_async_receiver(rx: Receiver) -> JoinHandle +where + T: From + Send + 'static, +{ + JoinHandle::Async(tokio::spawn(async move { + let mut c = 0; + loop { + match recv_async(&rx).await { + Ok(_) => c += 1, + Err(_) => break c, + } + } + })) +} + +pub async fn bench_sync_sync( + senders_no: usize, + receivers_no: usize, + buffer_size: Option, + msg_no: usize, +) -> Result +where + T: From + Send + 'static, +{ + bench_helper( + senders_no, + receivers_no, + buffer_size, + msg_no, + create_sync_sender::, + create_sync_receiver::, + ) + .await +} + +pub async fn bench_async_async( + senders_no: usize, + receivers_no: usize, + buffer_size: Option, + msg_no: usize, +) -> Result +where + T: From + Send + Sync + 'static, +{ + bench_helper( + senders_no, + receivers_no, + buffer_size, + msg_no, + create_async_sender::, + create_async_receiver::, + ) + .await +} + +pub async fn bench_async_sync( + senders_no: usize, + receivers_no: usize, + buffer_size: Option, + msg_no: usize, +) -> Result +where + T: From + Send + Sync + 'static, +{ + bench_helper( + senders_no, + receivers_no, + buffer_size, + msg_no, + create_async_sender::, + create_sync_receiver::, + ) + .await +} + +pub async fn bench_sync_async( + senders_no: usize, + receivers_no: usize, + buffer_size: Option, + msg_no: usize, +) -> Result +where + T: From + Send + Sync + 'static, +{ + bench_helper( + senders_no, + receivers_no, + buffer_size, + msg_no, + create_sync_sender::, + create_async_receiver::, + ) + .await +} diff --git a/benchmark/src/bench_utils.rs b/benchmark/src/bench_utils.rs new file mode 100644 index 0000000..9aab2b0 --- /dev/null +++ b/benchmark/src/bench_utils.rs @@ -0,0 +1,57 @@ +use std::{error::Error, fmt::Display, thread, time::Instant}; + +pub struct BenchResult { + pub throughput: usize, +} + +#[derive(Debug)] +pub enum BenchError { + ZeroCapacityNotSupported, + AsyncNotSupported, + MpmcNotSupported, +} + +impl Display for BenchError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + BenchError::ZeroCapacityNotSupported => write!(f, "zero capacity not supported"), + BenchError::AsyncNotSupported => write!(f, "async not supported"), + BenchError::MpmcNotSupported => write!(f, "mpmc not supported"), + } + } +} + +impl Error for BenchError {} + +pub enum JoinHandle { + Sync(thread::JoinHandle), + Async(tokio::task::JoinHandle), +} + +impl JoinHandle { + pub async fn join(self) -> T { + match self { + JoinHandle::Sync(h) => h.join().unwrap(), + JoinHandle::Async(h) => h.await.unwrap(), + } + } +} + +pub async fn calculate_benchmark_result( + senders: Vec>, + receivers: Vec>, +) -> BenchResult { + let t = Instant::now(); + let mut sent_no = 0; + for t in senders { + sent_no += t.join().await; + } + let mut recv_no = 0; + for t in receivers { + recv_no += t.join().await; + } + let elapsed = t.elapsed(); + assert_eq!(sent_no, recv_no); + let throughput = (recv_no as f32 / elapsed.as_secs_f32()).round() as usize; + BenchResult { throughput } +} diff --git a/benchmark/src/benchmark.rs b/benchmark/src/benchmark.rs new file mode 100644 index 0000000..e4c6b90 --- /dev/null +++ b/benchmark/src/benchmark.rs @@ -0,0 +1,193 @@ +use std::fmt::Display; + +use crate::{ + async_channel_bench, + bench_utils::{BenchError, BenchResult}, + crossbeam_channel_bench, flume_bench, kanal_bench, loole_bench, message_type, tokio_bench, +}; + +pub const MESSAGES_NO: usize = 1_000_000; + +// type MsgType = message_type::StackType<4>; +type MsgType = message_type::StackType<256>; +// type MsgType = message_type::StackType<4096>; + +pub const BUFFER_SIZE_LIST: [Option; 4] = [Some(0), Some(1), Some(50), Some(100)]; + +pub enum CrateName { + Loole, + Flume, + AsyncChannel, + Tokio, + CrossbeamChannel, + Kanal, +} + +impl Display for CrateName { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + CrateName::Loole => write!(f, "loole"), + CrateName::Flume => write!(f, "flume"), + CrateName::AsyncChannel => write!(f, "async-channel"), + CrateName::Tokio => write!(f, "tokio"), + CrateName::CrossbeamChannel => write!(f, "crossbeam-channel"), + CrateName::Kanal => write!(f, "kanal"), + } + } +} + +pub async fn bench_sync_sync( + crate_name: CrateName, + senders_no: usize, + receivers_no: usize, + buffer_size: Option, +) -> Result { + let msg_no = MESSAGES_NO; + match crate_name { + CrateName::Loole => { + loole_bench::bench_sync_sync::(senders_no, receivers_no, buffer_size, msg_no) + .await + } + CrateName::Flume => { + flume_bench::bench_sync_sync::(senders_no, receivers_no, buffer_size, msg_no) + .await + } + CrateName::AsyncChannel => { + async_channel_bench::bench_sync_sync::( + senders_no, + receivers_no, + buffer_size, + msg_no, + ) + .await + } + CrateName::Tokio => { + tokio_bench::bench_sync_sync::(senders_no, receivers_no, buffer_size, msg_no) + .await + } + CrateName::Kanal => { + kanal_bench::bench_sync_sync::(senders_no, receivers_no, buffer_size, msg_no) + .await + } + CrateName::CrossbeamChannel => { + crossbeam_channel_bench::bench_sync_sync::( + senders_no, + receivers_no, + buffer_size, + msg_no, + ) + .await + } + } +} + +pub async fn bench_async_async( + crate_name: CrateName, + senders_no: usize, + receivers_no: usize, + buffer_size: Option, +) -> Result { + let msg_no = MESSAGES_NO; + match crate_name { + CrateName::Loole => { + loole_bench::bench_async_async::(senders_no, receivers_no, buffer_size, msg_no) + .await + } + CrateName::Flume => { + flume_bench::bench_async_async::(senders_no, receivers_no, buffer_size, msg_no) + .await + } + CrateName::AsyncChannel => { + async_channel_bench::bench_async_async::( + senders_no, + receivers_no, + buffer_size, + msg_no, + ) + .await + } + CrateName::Tokio => { + tokio_bench::bench_async_async::(senders_no, receivers_no, buffer_size, msg_no) + .await + } + CrateName::Kanal => { + kanal_bench::bench_async_async::(senders_no, receivers_no, buffer_size, msg_no) + .await + } + CrateName::CrossbeamChannel => Err(BenchError::AsyncNotSupported), + } +} + +pub async fn bench_async_sync( + crate_name: CrateName, + senders_no: usize, + receivers_no: usize, + buffer_size: Option, +) -> Result { + let msg_no = MESSAGES_NO; + match crate_name { + CrateName::Loole => { + loole_bench::bench_async_sync::(senders_no, receivers_no, buffer_size, msg_no) + .await + } + CrateName::Flume => { + flume_bench::bench_async_sync::(senders_no, receivers_no, buffer_size, msg_no) + .await + } + CrateName::AsyncChannel => { + async_channel_bench::bench_async_sync::( + senders_no, + receivers_no, + buffer_size, + msg_no, + ) + .await + } + CrateName::Tokio => { + tokio_bench::bench_async_sync::(senders_no, receivers_no, buffer_size, msg_no) + .await + } + CrateName::Kanal => { + kanal_bench::bench_async_sync::(senders_no, receivers_no, buffer_size, msg_no) + .await + } + CrateName::CrossbeamChannel => Err(BenchError::AsyncNotSupported), + } +} + +pub async fn bench_sync_async( + crate_name: CrateName, + senders_no: usize, + receivers_no: usize, + buffer_size: Option, +) -> Result { + let msg_no = MESSAGES_NO; + match crate_name { + CrateName::Loole => { + loole_bench::bench_sync_async::(senders_no, receivers_no, buffer_size, msg_no) + .await + } + CrateName::Flume => { + flume_bench::bench_sync_async::(senders_no, receivers_no, buffer_size, msg_no) + .await + } + CrateName::AsyncChannel => { + async_channel_bench::bench_sync_async::( + senders_no, + receivers_no, + buffer_size, + msg_no, + ) + .await + } + CrateName::Tokio => { + tokio_bench::bench_sync_async::(senders_no, receivers_no, buffer_size, msg_no) + .await + } + CrateName::Kanal => { + kanal_bench::bench_sync_async::(senders_no, receivers_no, buffer_size, msg_no) + .await + } + CrateName::CrossbeamChannel => Err(BenchError::AsyncNotSupported), + } +} diff --git a/benchmark/src/chart_data.rs b/benchmark/src/chart_data.rs new file mode 100644 index 0000000..edc0707 --- /dev/null +++ b/benchmark/src/chart_data.rs @@ -0,0 +1,173 @@ +use serde::{ser::SerializeStruct, Serialize}; +use tokio::runtime::Runtime; + +use crate::{ + bench_utils::{BenchError, BenchResult}, + benchmark::{ + bench_async_async, bench_async_sync, bench_sync_async, bench_sync_sync, CrateName, + BUFFER_SIZE_LIST, + }, +}; +use num_format::{Locale, ToFormattedString}; + +#[derive(Serialize, Debug)] +pub struct BenchData { + mpsc: DataSets, + mpmc: DataSets, + spsc: DataSets, +} + +#[derive(Debug)] +pub struct DataSets { + pub sync_sync: ChartData, + pub async_async: ChartData, + pub async_sync: ChartData, + pub sync_async: ChartData, +} + +impl Serialize for DataSets { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut obj = serializer.serialize_struct("DataSets", 4)?; + obj.serialize_field("sync-sync", &self.sync_sync)?; + obj.serialize_field("async-async", &self.async_async)?; + obj.serialize_field("async-sync", &self.async_sync)?; + obj.serialize_field("sync-async", &self.sync_async)?; + obj.end() + } +} + +#[derive(Serialize, Debug)] +pub struct ChartData { + title: String, + dataset: DataSet, +} + +#[derive(Serialize, Debug)] +pub struct DataSet { + source: Vec>, +} + +#[derive(Debug)] +pub enum Item { + Label(String), + Value(usize), +} + +impl Serialize for Item { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + match self { + Item::Label(l) => serializer.serialize_str(l), + Item::Value(v) => serializer.serialize_u64(*v as u64), + } + } +} + +const CRATES: [CrateName; 6] = [ + CrateName::Loole, + CrateName::Flume, + CrateName::AsyncChannel, + CrateName::Tokio, + CrateName::Kanal, + CrateName::CrossbeamChannel, +]; + +fn f(n: usize) -> String { + n.to_formatted_string(&Locale::en) +} + +pub fn get_dataset(benchmark: F) -> DataSet +where + F: Fn(CrateName, Option) -> Result, +{ + let mut source = vec![]; + eprint!("{:<17}\t", "capacity"); + let mut row = vec![Item::Label("capacity".to_string())]; + for crate_name in CRATES { + eprint!("{:<17}\t", crate_name.to_string()); + row.push(Item::Label(crate_name.to_string())); + } + eprintln!(); + source.push(row); + for buffer_size in BUFFER_SIZE_LIST { + eprint!("{:<17}\t", format!("bounded({})", buffer_size.unwrap())); + let mut row = vec![Item::Label(format!("bounded({})", buffer_size.unwrap()))]; + for crate_name in CRATES { + let r = match benchmark(crate_name, buffer_size) { + Ok(bench_result) => bench_result.throughput, + Err(_) => 0, + }; + eprint!("{:<17}\t", f(r)); + row.push(Item::Value(r)); + } + eprintln!(); + source.push(row); + } + eprintln!(); + DataSet { source } +} + +fn format_title( + senders_no: usize, + receivers_no: usize, + sender_type: &str, + receiver_type: &str, +) -> String { + format!( + "{} {} sender{} / {} {} receiver{}", + senders_no, + sender_type, + if senders_no != 1 { "s" } else { "" }, + receivers_no, + receiver_type, + if receivers_no != 1 { "s" } else { "" } + ) +} + +pub fn get_datasets(senders_no: usize, receivers_no: usize, rt: &Runtime) -> DataSets { + let title = format_title(senders_no, receivers_no, "sync", "sync"); + eprintln!("{}", title); + let sync_sync = ChartData { + title, + dataset: get_dataset(|c, b| rt.block_on(bench_sync_sync(c, senders_no, receivers_no, b))), + }; + let title = format_title(senders_no, receivers_no, "async", "async"); + eprintln!("{}", title); + let async_async = ChartData { + title, + dataset: get_dataset(|c, b| rt.block_on(bench_async_async(c, senders_no, receivers_no, b))), + }; + let title = format_title(senders_no, receivers_no, "async", "sync"); + eprintln!("{}", title); + let async_sync = ChartData { + title, + dataset: get_dataset(|c, b| rt.block_on(bench_async_sync(c, senders_no, receivers_no, b))), + }; + let title = format_title(senders_no, receivers_no, "sync", "async"); + eprintln!("{}", title); + let sync_async = ChartData { + title, + dataset: get_dataset(|c, b| rt.block_on(bench_sync_async(c, senders_no, receivers_no, b))), + }; + DataSets { + sync_sync, + async_async, + async_sync, + sync_async, + } +} + +pub fn get_bench_data(rt: &Runtime) -> BenchData { + eprintln!("MPSC\n----"); + let mpsc = get_datasets(5000, 1, rt); + eprintln!("MPMC\n----"); + let mpmc = get_datasets(5000, 10, rt); + eprintln!("SPSC\n----"); + let spsc = get_datasets(1, 1, rt); + BenchData { mpsc, mpmc, spsc } +} diff --git a/benchmark/src/crossbeam_channel_bench.rs b/benchmark/src/crossbeam_channel_bench.rs new file mode 100644 index 0000000..403c20f --- /dev/null +++ b/benchmark/src/crossbeam_channel_bench.rs @@ -0,0 +1,117 @@ +use crossbeam_channel::{bounded, unbounded, Receiver, RecvError, SendError, Sender}; +use std::{future::Future, thread}; + +use crate::bench_utils::{calculate_benchmark_result, BenchError, BenchResult, JoinHandle}; + +pub fn crate_name() -> &'static str { + "crossbeam-channel" +} + +pub fn send(tx: &Sender, msg: T) -> Result<(), SendError> { + tx.send(msg) +} + +pub fn recv(rx: &Receiver) -> Result { + rx.recv() +} + +async fn bench_helper( + senders_no: usize, + receivers_no: usize, + cap: Option, + msg_no: usize, + create_sender: S, + create_receiver: R, +) -> Result +where + T: From + Send + 'static, + S: Fn(Sender, usize) -> JoinHandle, + R: Fn(Receiver) -> JoinHandle, +{ + let (tx, rx) = cap.map_or_else(unbounded::, bounded); + let senders = create_senders(tx, senders_no, msg_no, create_sender); + let receivers = create_receivers(rx, receivers_no, create_receiver); + Ok(calculate_benchmark_result(senders, receivers).await) +} + +fn create_senders( + tx: Sender, + senders_no: usize, + msg_no: usize, + create_sender: S, +) -> Vec> +where + S: Fn(Sender, usize) -> JoinHandle, +{ + let mut senders = Vec::with_capacity(senders_no); + for i in 0..senders_no { + let n = msg_no / senders_no + if i < msg_no % senders_no { 1 } else { 0 }; + senders.push(create_sender(tx.clone(), n)); + } + senders +} + +fn create_receivers( + rx: Receiver, + receivers_no: usize, + create_receiver: R, +) -> Vec> +where + R: Fn(Receiver) -> JoinHandle, +{ + let mut receivers = Vec::with_capacity(receivers_no); + for _ in 0..receivers_no { + receivers.push(create_receiver(rx.clone())); + } + receivers +} + +fn create_sync_sender(tx: Sender, n: usize) -> JoinHandle +where + T: From + Send + 'static, +{ + JoinHandle::Sync(thread::spawn(move || { + for k in 0..n { + match send(&tx, k.into()) { + Ok(_) => (), + Err(_) => println!("error: channel closed at: {}", k), + } + } + n + })) +} + +fn create_sync_receiver(rx: Receiver) -> JoinHandle +where + T: From + Send + 'static, +{ + JoinHandle::Sync(thread::spawn(move || { + let mut c = 0; + loop { + match recv(&rx) { + Ok(_) => c += 1, + Err(_) => break c, + } + } + })) +} + +pub async fn bench_sync_sync( + senders_no: usize, + receivers_no: usize, + buffer_size: Option, + msg_no: usize, +) -> Result +where + T: From + Send + 'static, +{ + bench_helper( + senders_no, + receivers_no, + buffer_size, + msg_no, + create_sync_sender::, + create_sync_receiver::, + ) + .await +} diff --git a/benchmark/src/flume_bench.rs b/benchmark/src/flume_bench.rs new file mode 100644 index 0000000..9988108 --- /dev/null +++ b/benchmark/src/flume_bench.rs @@ -0,0 +1,218 @@ +use flume::{bounded, unbounded, Receiver, RecvError, SendError, Sender}; +use std::{future::Future, thread}; + +use crate::bench_utils::{calculate_benchmark_result, BenchError, BenchResult, JoinHandle}; + +pub fn crate_name() -> &'static str { + "flume" +} + +pub fn send(tx: &Sender, msg: T) -> Result<(), SendError> { + tx.send(msg) +} + +pub fn recv(rx: &Receiver) -> Result { + rx.recv() +} + +pub fn send_async( + tx: &Sender, + msg: T, +) -> impl Future>> + '_ { + tx.send_async(msg) +} + +pub fn recv_async(rx: &Receiver) -> impl Future> + '_ { + rx.recv_async() +} + +async fn bench_helper( + senders_no: usize, + receivers_no: usize, + cap: Option, + msg_no: usize, + create_sender: S, + create_receiver: R, +) -> Result +where + T: From + Send + 'static, + S: Fn(Sender, usize) -> JoinHandle, + R: Fn(Receiver) -> JoinHandle, +{ + let (tx, rx) = cap.map_or_else(unbounded::, bounded); + let senders = create_senders(tx, senders_no, msg_no, create_sender); + let receivers = create_receivers(rx, receivers_no, create_receiver); + Ok(calculate_benchmark_result(senders, receivers).await) +} + +fn create_senders( + tx: Sender, + senders_no: usize, + msg_no: usize, + create_sender: S, +) -> Vec> +where + S: Fn(Sender, usize) -> JoinHandle, +{ + let mut senders = Vec::with_capacity(senders_no); + for i in 0..senders_no { + let n = msg_no / senders_no + if i < msg_no % senders_no { 1 } else { 0 }; + senders.push(create_sender(tx.clone(), n)); + } + senders +} + +fn create_receivers( + rx: Receiver, + receivers_no: usize, + create_receiver: R, +) -> Vec> +where + R: Fn(Receiver) -> JoinHandle, +{ + let mut receivers = Vec::with_capacity(receivers_no); + for _ in 0..receivers_no { + receivers.push(create_receiver(rx.clone())); + } + receivers +} + +fn create_sync_sender(tx: Sender, n: usize) -> JoinHandle +where + T: From + Send + 'static, +{ + JoinHandle::Sync(thread::spawn(move || { + for k in 0..n { + match send(&tx, k.into()) { + Ok(_) => (), + Err(_) => println!("error: channel closed at: {}", k), + } + } + n + })) +} + +fn create_async_sender(tx: Sender, n: usize) -> JoinHandle +where + T: From + Send + 'static, +{ + JoinHandle::Async(tokio::spawn(async move { + for k in 0..n { + match send_async(&tx, k.into()).await { + Ok(_) => (), + Err(_) => println!("error: channel closed at: {}", k), + } + } + n + })) +} + +fn create_sync_receiver(rx: Receiver) -> JoinHandle +where + T: From + Send + 'static, +{ + JoinHandle::Sync(thread::spawn(move || { + let mut c = 0; + loop { + match recv(&rx) { + Ok(_) => c += 1, + Err(_) => break c, + } + } + })) +} + +fn create_async_receiver(rx: Receiver) -> JoinHandle +where + T: From + Send + 'static, +{ + JoinHandle::Async(tokio::spawn(async move { + let mut c = 0; + loop { + match recv_async(&rx).await { + Ok(_) => c += 1, + Err(_) => break c, + } + } + })) +} + +pub async fn bench_sync_sync( + senders_no: usize, + receivers_no: usize, + buffer_size: Option, + msg_no: usize, +) -> Result +where + T: From + Send + 'static, +{ + bench_helper( + senders_no, + receivers_no, + buffer_size, + msg_no, + create_sync_sender::, + create_sync_receiver::, + ) + .await +} + +pub async fn bench_async_async( + senders_no: usize, + receivers_no: usize, + buffer_size: Option, + msg_no: usize, +) -> Result +where + T: From + Send + Sync + 'static, +{ + bench_helper( + senders_no, + receivers_no, + buffer_size, + msg_no, + create_async_sender::, + create_async_receiver::, + ) + .await +} + +pub async fn bench_async_sync( + senders_no: usize, + receivers_no: usize, + buffer_size: Option, + msg_no: usize, +) -> Result +where + T: From + Send + Sync + 'static, +{ + bench_helper( + senders_no, + receivers_no, + buffer_size, + msg_no, + create_async_sender::, + create_sync_receiver::, + ) + .await +} + +pub async fn bench_sync_async( + senders_no: usize, + receivers_no: usize, + buffer_size: Option, + msg_no: usize, +) -> Result +where + T: From + Send + Sync + 'static, +{ + bench_helper( + senders_no, + receivers_no, + buffer_size, + msg_no, + create_sync_sender::, + create_async_receiver::, + ) + .await +} diff --git a/benchmark/src/kanal_bench.rs b/benchmark/src/kanal_bench.rs new file mode 100644 index 0000000..39c8a0a --- /dev/null +++ b/benchmark/src/kanal_bench.rs @@ -0,0 +1,222 @@ +use kanal::{ + bounded, unbounded, AsyncReceiver, AsyncSender, ReceiveError, Receiver, SendError, Sender, +}; +use std::{future::Future, thread}; + +use crate::bench_utils::{calculate_benchmark_result, BenchError, BenchResult, JoinHandle}; + +pub fn crate_name() -> &'static str { + "loole" +} + +pub fn send(tx: &Sender, msg: T) -> Result<(), SendError> { + tx.send(msg) +} + +pub fn recv(rx: &Receiver) -> Result { + rx.recv() +} + +pub fn send_async( + tx: &AsyncSender, + msg: T, +) -> impl Future> + '_ { + tx.send(msg) +} + +pub fn recv_async(rx: &AsyncReceiver) -> impl Future> + '_ { + rx.recv() +} + +async fn bench_helper( + senders_no: usize, + receivers_no: usize, + cap: Option, + msg_no: usize, + create_sender: S, + create_receiver: R, +) -> Result +where + T: From + Send + 'static, + S: Fn(Sender, usize) -> JoinHandle, + R: Fn(Receiver) -> JoinHandle, +{ + let (tx, rx) = cap.map_or_else(unbounded::, bounded); + let senders = create_senders(tx, senders_no, msg_no, create_sender); + let receivers = create_receivers(rx, receivers_no, create_receiver); + Ok(calculate_benchmark_result(senders, receivers).await) +} + +fn create_senders( + tx: Sender, + senders_no: usize, + msg_no: usize, + create_sender: S, +) -> Vec> +where + S: Fn(Sender, usize) -> JoinHandle, +{ + let mut senders = Vec::with_capacity(senders_no); + for i in 0..senders_no { + let n = msg_no / senders_no + if i < msg_no % senders_no { 1 } else { 0 }; + senders.push(create_sender(tx.clone(), n)); + } + senders +} + +fn create_receivers( + rx: Receiver, + receivers_no: usize, + create_receiver: R, +) -> Vec> +where + R: Fn(Receiver) -> JoinHandle, +{ + let mut receivers = Vec::with_capacity(receivers_no); + for _ in 0..receivers_no { + receivers.push(create_receiver(rx.clone())); + } + receivers +} + +fn create_sync_sender(tx: Sender, n: usize) -> JoinHandle +where + T: From + Send + 'static, +{ + JoinHandle::Sync(thread::spawn(move || { + for k in 0..n { + match send(&tx, k.into()) { + Ok(_) => (), + Err(_) => println!("error: channel closed at: {}", k), + } + } + n + })) +} + +fn create_async_sender(tx: Sender, n: usize) -> JoinHandle +where + T: From + Send + 'static, +{ + let tx = tx.to_async(); + JoinHandle::Async(tokio::spawn(async move { + for k in 0..n { + match send_async(&tx, k.into()).await { + Ok(_) => (), + Err(_) => println!("error: channel closed at: {}", k), + } + } + n + })) +} + +fn create_sync_receiver(rx: Receiver) -> JoinHandle +where + T: From + Send + 'static, +{ + JoinHandle::Sync(thread::spawn(move || { + let mut c = 0; + loop { + match recv(&rx) { + Ok(_) => c += 1, + Err(_) => break c, + } + } + })) +} + +fn create_async_receiver(rx: Receiver) -> JoinHandle +where + T: From + Send + 'static, +{ + let rx = rx.to_async(); + JoinHandle::Async(tokio::spawn(async move { + let mut c = 0; + loop { + match recv_async(&rx).await { + Ok(_) => c += 1, + Err(_) => break c, + } + } + })) +} + +pub async fn bench_sync_sync( + senders_no: usize, + receivers_no: usize, + buffer_size: Option, + msg_no: usize, +) -> Result +where + T: From + Send + 'static, +{ + bench_helper( + senders_no, + receivers_no, + buffer_size, + msg_no, + create_sync_sender::, + create_sync_receiver::, + ) + .await +} + +pub async fn bench_async_async( + senders_no: usize, + receivers_no: usize, + buffer_size: Option, + msg_no: usize, +) -> Result +where + T: From + Send + Sync + 'static, +{ + bench_helper( + senders_no, + receivers_no, + buffer_size, + msg_no, + create_async_sender::, + create_async_receiver::, + ) + .await +} + +pub async fn bench_async_sync( + senders_no: usize, + receivers_no: usize, + buffer_size: Option, + msg_no: usize, +) -> Result +where + T: From + Send + Sync + 'static, +{ + bench_helper( + senders_no, + receivers_no, + buffer_size, + msg_no, + create_async_sender::, + create_sync_receiver::, + ) + .await +} + +pub async fn bench_sync_async( + senders_no: usize, + receivers_no: usize, + buffer_size: Option, + msg_no: usize, +) -> Result +where + T: From + Send + Sync + 'static, +{ + bench_helper( + senders_no, + receivers_no, + buffer_size, + msg_no, + create_sync_sender::, + create_async_receiver::, + ) + .await +} diff --git a/benchmark/src/loole_bench.rs b/benchmark/src/loole_bench.rs new file mode 100644 index 0000000..b4310f3 --- /dev/null +++ b/benchmark/src/loole_bench.rs @@ -0,0 +1,218 @@ +use loole::{bounded, unbounded, Receiver, RecvError, SendError, Sender}; +use std::{future::Future, thread}; + +use crate::bench_utils::{calculate_benchmark_result, BenchError, BenchResult, JoinHandle}; + +pub fn crate_name() -> &'static str { + "loole" +} + +pub fn send(tx: &Sender, msg: T) -> Result<(), SendError> { + tx.send(msg) +} + +pub fn recv(rx: &Receiver) -> Result { + rx.recv() +} + +pub fn send_async( + tx: &Sender, + msg: T, +) -> impl Future>> + '_ { + tx.send_async(msg) +} + +pub fn recv_async(rx: &Receiver) -> impl Future> + '_ { + rx.recv_async() +} + +async fn bench_helper( + senders_no: usize, + receivers_no: usize, + cap: Option, + msg_no: usize, + create_sender: S, + create_receiver: R, +) -> Result +where + T: From + Send + 'static, + S: Fn(Sender, usize) -> JoinHandle, + R: Fn(Receiver) -> JoinHandle, +{ + let (tx, rx) = cap.map_or_else(unbounded::, bounded); + let senders = create_senders(tx, senders_no, msg_no, create_sender); + let receivers = create_receivers(rx, receivers_no, create_receiver); + Ok(calculate_benchmark_result(senders, receivers).await) +} + +fn create_senders( + tx: Sender, + senders_no: usize, + msg_no: usize, + create_sender: S, +) -> Vec> +where + S: Fn(Sender, usize) -> JoinHandle, +{ + let mut senders = Vec::with_capacity(senders_no); + for i in 0..senders_no { + let n = msg_no / senders_no + if i < msg_no % senders_no { 1 } else { 0 }; + senders.push(create_sender(tx.clone(), n)); + } + senders +} + +fn create_receivers( + rx: Receiver, + receivers_no: usize, + create_receiver: R, +) -> Vec> +where + R: Fn(Receiver) -> JoinHandle, +{ + let mut receivers = Vec::with_capacity(receivers_no); + for _ in 0..receivers_no { + receivers.push(create_receiver(rx.clone())); + } + receivers +} + +fn create_sync_sender(tx: Sender, n: usize) -> JoinHandle +where + T: From + Send + 'static, +{ + JoinHandle::Sync(thread::spawn(move || { + for k in 0..n { + match send(&tx, k.into()) { + Ok(_) => (), + Err(_) => println!("error: channel closed at: {}", k), + } + } + n + })) +} + +fn create_async_sender(tx: Sender, n: usize) -> JoinHandle +where + T: From + Send + 'static, +{ + JoinHandle::Async(tokio::spawn(async move { + for k in 0..n { + match send_async(&tx, k.into()).await { + Ok(_) => (), + Err(_) => println!("error: channel closed at: {}", k), + } + } + n + })) +} + +fn create_sync_receiver(rx: Receiver) -> JoinHandle +where + T: From + Send + 'static, +{ + JoinHandle::Sync(thread::spawn(move || { + let mut c = 0; + loop { + match recv(&rx) { + Ok(_) => c += 1, + Err(_) => break c, + } + } + })) +} + +fn create_async_receiver(rx: Receiver) -> JoinHandle +where + T: From + Send + 'static, +{ + JoinHandle::Async(tokio::spawn(async move { + let mut c = 0; + loop { + match recv_async(&rx).await { + Ok(_) => c += 1, + Err(_) => break c, + } + } + })) +} + +pub async fn bench_sync_sync( + senders_no: usize, + receivers_no: usize, + buffer_size: Option, + msg_no: usize, +) -> Result +where + T: From + Send + 'static, +{ + bench_helper( + senders_no, + receivers_no, + buffer_size, + msg_no, + create_sync_sender::, + create_sync_receiver::, + ) + .await +} + +pub async fn bench_async_async( + senders_no: usize, + receivers_no: usize, + buffer_size: Option, + msg_no: usize, +) -> Result +where + T: From + Send + Sync + 'static, +{ + bench_helper( + senders_no, + receivers_no, + buffer_size, + msg_no, + create_async_sender::, + create_async_receiver::, + ) + .await +} + +pub async fn bench_async_sync( + senders_no: usize, + receivers_no: usize, + buffer_size: Option, + msg_no: usize, +) -> Result +where + T: From + Send + Sync + 'static, +{ + bench_helper( + senders_no, + receivers_no, + buffer_size, + msg_no, + create_async_sender::, + create_sync_receiver::, + ) + .await +} + +pub async fn bench_sync_async( + senders_no: usize, + receivers_no: usize, + buffer_size: Option, + msg_no: usize, +) -> Result +where + T: From + Send + Sync + 'static, +{ + bench_helper( + senders_no, + receivers_no, + buffer_size, + msg_no, + create_sync_sender::, + create_async_receiver::, + ) + .await +} diff --git a/benchmark/src/main.rs b/benchmark/src/main.rs new file mode 100644 index 0000000..92fffee --- /dev/null +++ b/benchmark/src/main.rs @@ -0,0 +1,50 @@ +#![allow(dead_code, unused_imports, unused_variables)] + +use std::io::prelude::*; +use std::io::prelude::*; +use std::path::Path; +use std::{fs::File, path::PathBuf}; + +use chart_data::{get_bench_data, Item}; + +mod bench_utils; +mod chart_data; +mod message_type; + +mod async_channel_bench; +mod crossbeam_channel_bench; +mod flume_bench; +mod kanal_bench; +mod loole_bench; +mod tokio_bench; + +mod benchmark; + +fn main() { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + let bench_data = get_bench_data(&rt); + let json_str = serde_json::to_string_pretty(&bench_data).unwrap(); + + let charts_dir = match std::env::current_dir().unwrap().ends_with("charts") { + true => PathBuf::from("."), + false => PathBuf::from("benchmark").join("charts"), + }; + + let result_path = charts_dir.join("benchmark-result.json"); + + let mut file = File::create(result_path).unwrap(); + file.write_all(json_str.as_ref()).unwrap(); + + match std::process::Command::new("npm") + .args(["run", "update"]) + .current_dir(charts_dir) + .status() + { + Ok(_) => println!("Charts updated successfully"), + Err(err) => eprintln!("{}", err), + } +} diff --git a/benchmark/src/message_type.rs b/benchmark/src/message_type.rs new file mode 100644 index 0000000..2890ca2 --- /dev/null +++ b/benchmark/src/message_type.rs @@ -0,0 +1,61 @@ +use std::fmt::{Debug, Display}; + +pub struct Type(usize, T); + +impl From for Type { + #[inline(always)] + fn from(value: usize) -> Self { + Self(value, Default::default()) + } +} + +impl Debug for Type { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("index: {}, len: {}", self.0, self.1.len())) + } +} + +impl Display for Type { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("index: {}, len: {}", self.0, self.1.len())) + } +} + +pub struct FilledArray([u8; SIZE]); + +impl Default for FilledArray { + #[inline(always)] + fn default() -> Self { + Self([0; SIZE]) + } +} + +pub struct FilledVec(Vec); + +impl Default for FilledVec { + #[inline(always)] + fn default() -> Self { + Self(vec![0; SIZE]) + } +} + +pub trait Length { + fn len(&self) -> usize; +} + +impl Length for FilledArray { + #[inline(always)] + fn len(&self) -> usize { + self.0.len() + } +} + +impl Length for FilledVec { + #[inline(always)] + fn len(&self) -> usize { + self.0.len() + } +} + +pub type StackType = Type>; +pub type HeapType = Type>; diff --git a/benchmark/src/tokio_bench.rs b/benchmark/src/tokio_bench.rs new file mode 100644 index 0000000..123fdd8 --- /dev/null +++ b/benchmark/src/tokio_bench.rs @@ -0,0 +1,227 @@ +use std::{future::Future, thread}; +use tokio::sync::mpsc::{ + self, channel, error::SendError, unbounded_channel, Receiver, Sender, UnboundedReceiver, + UnboundedSender, +}; + +use crate::bench_utils::{calculate_benchmark_result, BenchError, BenchResult, JoinHandle}; + +const UNBOUNDED_BUFFER_SIZE_ESTIMATION: usize = 100_000; + +pub fn crate_name() -> &'static str { + "loole" +} + +pub fn send(tx: &Sender, msg: T) -> Result<(), SendError> { + tx.blocking_send(msg) +} + +pub fn recv(rx: &mut Receiver) -> Option { + rx.blocking_recv() +} + +pub fn send_async( + tx: &Sender, + msg: T, +) -> impl Future>> + '_ { + tx.send(msg) +} + +pub fn recv_async(rx: &mut Receiver) -> impl Future> + '_ { + rx.recv() +} + +async fn bench_helper( + senders_no: usize, + receivers_no: usize, + cap: Option, + msg_no: usize, + create_sender: S, + create_receiver: R, +) -> Result +where + T: From + Send + 'static, + S: Fn(Sender, usize) -> JoinHandle, + R: Fn(Receiver) -> JoinHandle, +{ + if receivers_no > 1 { + return Err(BenchError::MpmcNotSupported); + } + if cap == Some(0) { + return Err(BenchError::ZeroCapacityNotSupported); + } + let (tx, rx) = channel(cap.unwrap_or(UNBOUNDED_BUFFER_SIZE_ESTIMATION)); + let senders = create_senders(tx, senders_no, msg_no, create_sender); + let receivers = create_receivers(rx, receivers_no, create_receiver); + Ok(calculate_benchmark_result(senders, receivers).await) +} + +fn create_senders( + tx: Sender, + senders_no: usize, + msg_no: usize, + create_sender: S, +) -> Vec> +where + S: Fn(Sender, usize) -> JoinHandle, +{ + let mut senders = Vec::with_capacity(senders_no); + for i in 0..senders_no { + let n = msg_no / senders_no + if i < msg_no % senders_no { 1 } else { 0 }; + senders.push(create_sender(tx.clone(), n)); + } + senders +} + +fn create_receivers( + rx: Receiver, + receivers_no: usize, + create_receiver: R, +) -> Vec> +where + R: Fn(Receiver) -> JoinHandle, +{ + let mut receivers = Vec::with_capacity(receivers_no); + receivers.push(create_receiver(rx)); + receivers +} + +fn create_sync_sender(tx: Sender, n: usize) -> JoinHandle +where + T: From + Send + 'static, +{ + JoinHandle::Sync(thread::spawn(move || { + for k in 0..n { + match send(&tx, k.into()) { + Ok(_) => (), + Err(_) => println!("error: channel closed at: {}", k), + } + } + n + })) +} + +fn create_async_sender(tx: Sender, n: usize) -> JoinHandle +where + T: From + Send + 'static, +{ + JoinHandle::Async(tokio::spawn(async move { + for k in 0..n { + match send_async(&tx, k.into()).await { + Ok(_) => (), + Err(_) => println!("error: channel closed at: {}", k), + } + } + n + })) +} + +fn create_sync_receiver(mut rx: Receiver) -> JoinHandle +where + T: From + Send + 'static, +{ + JoinHandle::Sync(thread::spawn(move || { + let mut c = 0; + loop { + match recv(&mut rx) { + Some(_) => c += 1, + None => break c, + } + } + })) +} + +fn create_async_receiver(mut rx: Receiver) -> JoinHandle +where + T: From + Send + 'static, +{ + JoinHandle::Async(tokio::spawn(async move { + let mut c = 0; + loop { + match recv_async(&mut rx).await { + Some(_) => c += 1, + None => break c, + } + } + })) +} + +pub async fn bench_sync_sync( + senders_no: usize, + receivers_no: usize, + buffer_size: Option, + msg_no: usize, +) -> Result +where + T: From + Send + 'static, +{ + bench_helper( + senders_no, + receivers_no, + buffer_size, + msg_no, + create_sync_sender::, + create_sync_receiver::, + ) + .await +} + +pub async fn bench_async_async( + senders_no: usize, + receivers_no: usize, + buffer_size: Option, + msg_no: usize, +) -> Result +where + T: From + Send + Sync + 'static, +{ + bench_helper( + senders_no, + receivers_no, + buffer_size, + msg_no, + create_async_sender::, + create_async_receiver::, + ) + .await +} + +pub async fn bench_async_sync( + senders_no: usize, + receivers_no: usize, + buffer_size: Option, + msg_no: usize, +) -> Result +where + T: From + Send + Sync + 'static, +{ + bench_helper( + senders_no, + receivers_no, + buffer_size, + msg_no, + create_async_sender::, + create_sync_receiver::, + ) + .await +} + +pub async fn bench_sync_async( + senders_no: usize, + receivers_no: usize, + buffer_size: Option, + msg_no: usize, +) -> Result +where + T: From + Send + Sync + 'static, +{ + bench_helper( + senders_no, + receivers_no, + buffer_size, + msg_no, + create_sync_sender::, + create_async_receiver::, + ) + .await +} diff --git a/misc/loole-mpmc.png b/misc/loole-mpmc.png deleted file mode 100644 index 5dd934d..0000000 Binary files a/misc/loole-mpmc.png and /dev/null differ diff --git a/misc/loole-mpsc.png b/misc/loole-mpsc.png deleted file mode 100644 index ecccaad..0000000 Binary files a/misc/loole-mpsc.png and /dev/null differ diff --git a/misc/loole-spsc.png b/misc/loole-spsc.png deleted file mode 100644 index c92c606..0000000 Binary files a/misc/loole-spsc.png and /dev/null differ diff --git a/package-lock.json b/package-lock.json new file mode 100644 index 0000000..0c3af84 --- /dev/null +++ b/package-lock.json @@ -0,0 +1,6 @@ +{ + "name": "loole", + "lockfileVersion": 3, + "requires": true, + "packages": {} +}