diff --git a/lib/lambda_ethereum_consensus/telemetry.ex b/lib/lambda_ethereum_consensus/telemetry.ex index 26c89d79f..79920d62e 100644 --- a/lib/lambda_ethereum_consensus/telemetry.ex +++ b/lib/lambda_ethereum_consensus/telemetry.ex @@ -59,6 +59,15 @@ defmodule LambdaEthereumConsensus.Telemetry do counter("peers.connection.count", tags: [:result]), counter("peers.challenge.count", tags: [:result]), counter("network.request.count", tags: [:result, :type, :reason]), + counter("network.pubsub_peers.count", tags: [:result]), + sum("network.pubsub_topic_active.active", tags: [:topic]), + counter("network.pubsub_topics_graft.count", tags: [:topic]), + counter("network.pubsub_topics_prune.count", tags: [:topic]), + counter("network.pubsub_topics_deliver_message.count", tags: [:topic]), + counter("network.pubsub_topics_duplicate_message.count", tags: [:topic]), + counter("network.pubsub_topics_reject_message.count", tags: [:topic]), + counter("network.pubsub_topics_un_deliverable_message.count", tags: [:topic]), + counter("network.pubsub_topics_validate_message.count", tags: [:topic]), counter("port.message.count", tags: [:function, :direction]), sum("network.request.blocks", tags: [:result, :type, :reason]), diff --git a/lib/libp2p_port.ex b/lib/libp2p_port.ex index d77abd552..32593996d 100644 --- a/lib/libp2p_port.ex +++ b/lib/libp2p_port.ex @@ -1,8 +1,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do @moduledoc """ A GenServer that allows other elixir processes to send and receive commands to/from - the LibP2P server in Go. For now, it only supports subscribing and unsubscribing from - topics. + the LibP2P server in Go. Requests are generated with an ID, which is returned when calling. Those IDs appear in the responses that might be listened to by other processes. @@ -26,6 +25,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do SendResponse, SetHandler, SubscribeToTopic, + Tracer, UnsubscribeFromTopic, ValidateMessage } @@ -310,6 +310,68 @@ defmodule LambdaEthereumConsensus.Libp2pPort do send(pid, {:response, result}) end + defp handle_notification(%Tracer{t: {:add_peer, %{}}}, _state) do + :telemetry.execute([:network, :pubsub_peers], %{}, %{ + result: "add" + }) + end + + defp handle_notification(%Tracer{t: {:remove_peer, %{}}}, _state) do + :telemetry.execute([:network, :pubsub_peers], %{}, %{ + result: "remove" + }) + end + + defp handle_notification(%Tracer{t: {:joined, %{topic: topic}}}, _state) do + :telemetry.execute([:network, :pubsub_topic_active], %{active: 1}, %{ + topic: get_topic_name(topic) + }) + end + + defp handle_notification(%Tracer{t: {:left, %{topic: topic}}}, _state) do + :telemetry.execute([:network, :pubsub_topic_active], %{active: -1}, %{ + topic: get_topic_name(topic) + }) + end + + defp handle_notification(%Tracer{t: {:grafted, %{topic: topic}}}, _state) do + :telemetry.execute([:network, :pubsub_topics_graft], %{}, %{topic: get_topic_name(topic)}) + end + + defp handle_notification(%Tracer{t: {:pruned, %{topic: topic}}}, _state) do + :telemetry.execute([:network, :pubsub_topics_prune], %{}, %{topic: get_topic_name(topic)}) + end + + defp handle_notification(%Tracer{t: {:deliver_message, %{topic: topic}}}, _state) do + :telemetry.execute([:network, :pubsub_topics_deliver_message], %{}, %{ + topic: get_topic_name(topic) + }) + end + + defp handle_notification(%Tracer{t: {:duplicate_message, %{topic: topic}}}, _state) do + :telemetry.execute([:network, :pubsub_topics_duplicate_message], %{}, %{ + topic: get_topic_name(topic) + }) + end + + defp handle_notification(%Tracer{t: {:reject_message, %{topic: topic}}}, _state) do + :telemetry.execute([:network, :pubsub_topics_reject_message], %{}, %{ + topic: get_topic_name(topic) + }) + end + + defp handle_notification(%Tracer{t: {:un_deliverable_message, %{topic: topic}}}, _state) do + :telemetry.execute([:network, :pubsub_topics_un_deliverable_message], %{}, %{ + topic: get_topic_name(topic) + }) + end + + defp handle_notification(%Tracer{t: {:validate_message, %{topic: topic}}}, _state) do + :telemetry.execute([:network, :pubsub_topics_validate_message], %{}, %{ + topic: get_topic_name(topic) + }) + end + defp parse_args(args) do args |> Keyword.validate!(@default_args) @@ -339,4 +401,11 @@ defmodule LambdaEthereumConsensus.Libp2pPort do {:response, {res, %ResultMessage{message: message}}} -> [res | message] |> List.to_tuple() end end + + defp get_topic_name(topic) do + case topic |> String.split("/") |> Enum.fetch(3) do + {:ok, name} -> name + :error -> topic + end + end end diff --git a/metrics/grafana/provisioning/dashboards/home.json b/metrics/grafana/provisioning/dashboards/home.json index 17eadf027..feda66b71 100644 --- a/metrics/grafana/provisioning/dashboards/home.json +++ b/metrics/grafana/provisioning/dashboards/home.json @@ -67,9 +67,7 @@ "minVizWidth": 75, "orientation": "auto", "reduceOptions": { - "calcs": [ - "lastNotNull" - ], + "calcs": ["lastNotNull"], "fields": "", "values": false }, @@ -130,9 +128,7 @@ }, "pieType": "pie", "reduceOptions": { - "calcs": [ - "lastNotNull" - ], + "calcs": ["lastNotNull"], "fields": "", "values": false }, @@ -546,7 +542,7 @@ }, "gridPos": { "h": 6, - "w": 12, + "w": 24, "x": 0, "y": 12 }, @@ -897,7 +893,815 @@ "refId": "D" } ], - "title": "Libp2pPort messages", + "title": "Libp2pPort Messages", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 24, + "x": 12, + "y": 12 + }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "exemplar": false, + "expr": "sum(network_pubsub_peers_count{result=\"add\"}) - sum(network_pubsub_peers_count{result=\"remove\"})", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "interval": "", + "legendFormat": "__auto", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Peers (Gossip)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 24, + "x": 12, + "y": 12 + }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "exemplar": false, + "expr": "network_pubsub_topic_active_active", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "interval": "", + "legendFormat": "{{topic}}", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Topics Activity", + "type": "heatmap" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 12, + "y": 18 + }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "exemplar": false, + "expr": "network_pubsub_topics_graft_count{} - network_pubsub_topics_prune_count{}", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "interval": "", + "legendFormat": "{{topic}}", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Grafted", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 12, + "y": 18 + }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "exemplar": false, + "expr": "rate(network_pubsub_topics_deliver_message_count{}[$__rate_interval])", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "interval": "", + "legendFormat": "{{topic}}", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Deliver Messages", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 0, + "y": 18 + }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "exemplar": false, + "expr": "rate(network_pubsub_topics_duplicate_message_count{}[$__rate_interval])", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "interval": "", + "legendFormat": "{{topic}}", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Duplicate Messages", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 0, + "y": 18 + }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "exemplar": false, + "expr": "rate(network_pubsub_topics_reject_message_count{}[$__rate_interval])", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "interval": "", + "legendFormat": "{{topic}}", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Reject Messages", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 12, + "y": 18 + }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "exemplar": false, + "expr": "rate(network_pubsub_topics_un_deliverable_message_count{}[$__rate_interval])", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "interval": "", + "legendFormat": "{{topic}}", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Undeliverable Messages", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 12, + "y": 18 + }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "exemplar": false, + "expr": "rate(network_pubsub_topics_validate_message_count{}[$__rate_interval])", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "interval": "", + "legendFormat": "{{topic}}", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Validate Messages", "type": "timeseries" } ], diff --git a/native/libp2p_port/internal/proto_helpers/proto_helpers.go b/native/libp2p_port/internal/proto_helpers/proto_helpers.go index 5d63947b6..9fb9b6888 100644 --- a/native/libp2p_port/internal/proto_helpers/proto_helpers.go +++ b/native/libp2p_port/internal/proto_helpers/proto_helpers.go @@ -20,6 +20,72 @@ func ConfigFromInitArgs(initArgs *proto_defs.InitArgs) Config { } } +func AddPeerNotification() proto_defs.Notification { + addPeerNotification := &proto_defs.AddPeerGossip{} + tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_AddPeer{AddPeer: addPeerNotification}} + return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}} +} + +func RemovePeerNotification() proto_defs.Notification { + removePeerNotification := &proto_defs.RemovePeerGossip{} + tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_RemovePeer{RemovePeer: removePeerNotification}} + return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}} +} + +func JoinNotification(topic string) proto_defs.Notification { + joinNotification := &proto_defs.Join{Topic: topic} + tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_Joined{Joined: joinNotification}} + return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}} +} + +func LeaveNofication(topic string) proto_defs.Notification { + leaveNofication := &proto_defs.Leave{Topic: topic} + tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_Left{Left: leaveNofication}} + return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}} +} + +func GraftNotification(topic string) proto_defs.Notification { + graftNotification := &proto_defs.Graft{Topic: topic} + tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_Grafted{Grafted: graftNotification}} + return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}} +} + +func PruneNotification(topic string) proto_defs.Notification { + pruneNotification := &proto_defs.Prune{Topic: topic} + tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_Pruned{Pruned: pruneNotification}} + return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}} +} + +func ValidateMessageNotification(topic string) proto_defs.Notification { + validateMessageNotification := &proto_defs.ValidateMessageGossip{Topic: topic} + tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_ValidateMessage{ValidateMessage: validateMessageNotification}} + return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}} +} + +func DeliverMessageNotification(topic string) proto_defs.Notification { + deliverMessageNotification := &proto_defs.DeliverMessage{Topic: topic} + tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_DeliverMessage{DeliverMessage: deliverMessageNotification}} + return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}} +} + +func UndeliverableMessageNotification(topic string) proto_defs.Notification { + unDeliverableMessageNotification := &proto_defs.UnDeliverableMessage{Topic: topic} + tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_UnDeliverableMessage{UnDeliverableMessage: unDeliverableMessageNotification}} + return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}} +} + +func RejectMessageNotification(topic string) proto_defs.Notification { + rejectMessageNotification := &proto_defs.RejectMessage{Topic: topic} + tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_RejectMessage{RejectMessage: rejectMessageNotification}} + return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}} +} + +func DuplicateMessageNotification(topic string) proto_defs.Notification { + duplicateMessageNotification := &proto_defs.DuplicateMessage{Topic: topic} + tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_DuplicateMessage{DuplicateMessage: duplicateMessageNotification}} + return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}} +} + func GossipNotification(topic string, handler, msgId, message []byte) proto_defs.Notification { gossipSubNotification := &proto_defs.GossipSub{Topic: []byte(topic), Handler: handler, MsgId: msgId, Message: message} return proto_defs.Notification{N: &proto_defs.Notification_Gossip{Gossip: gossipSubNotification}} diff --git a/native/libp2p_port/internal/subscriptions/subscriptions.go b/native/libp2p_port/internal/subscriptions/subscriptions.go index 74e526d6c..bdd69de95 100644 --- a/native/libp2p_port/internal/subscriptions/subscriptions.go +++ b/native/libp2p_port/internal/subscriptions/subscriptions.go @@ -14,6 +14,7 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" ) type subscription struct { @@ -28,6 +29,81 @@ type Subscriber struct { port *port.Port } +type GossipTracer struct { + port *port.Port +} + +func (g GossipTracer) AddPeer(p peer.ID, proto protocol.ID) { + notification := proto_helpers.AddPeerNotification() + g.port.SendNotification(¬ification) +} + +func (g GossipTracer) RemovePeer(p peer.ID) { + notification := proto_helpers.RemovePeerNotification() + g.port.SendNotification(¬ification) +} + +func (g GossipTracer) Join(topic string) { + notification := proto_helpers.JoinNotification(topic) + g.port.SendNotification(¬ification) +} + +func (g GossipTracer) Leave(topic string) { + notification := proto_helpers.LeaveNofication(topic) + g.port.SendNotification(¬ification) +} + +func (g GossipTracer) Graft(p peer.ID, topic string) { + notification := proto_helpers.GraftNotification(topic) + g.port.SendNotification(¬ification) +} + +func (g GossipTracer) Prune(p peer.ID, topic string) { + notification := proto_helpers.PruneNotification(topic) + g.port.SendNotification(¬ification) +} + +func (g GossipTracer) ValidateMessage(msg *pubsub.Message) { + notification := proto_helpers.ValidateMessageNotification(*msg.Topic) + g.port.SendNotification(¬ification) +} + +func (g GossipTracer) DeliverMessage(msg *pubsub.Message) { + notification := proto_helpers.DeliverMessageNotification(*msg.Topic) + g.port.SendNotification(¬ification) +} + +func (g GossipTracer) UndeliverableMessage(msg *pubsub.Message) { + notification := proto_helpers.UndeliverableMessageNotification(*msg.Topic) + g.port.SendNotification(¬ification) +} + +func (g GossipTracer) RejectMessage(msg *pubsub.Message, reason string) { + notification := proto_helpers.RejectMessageNotification(*msg.Topic) + g.port.SendNotification(¬ification) +} + +func (g GossipTracer) DuplicateMessage(msg *pubsub.Message) { + notification := proto_helpers.DuplicateMessageNotification(*msg.Topic) + g.port.SendNotification(¬ification) +} + +func (g GossipTracer) ThrottlePeer(p peer.ID) { + // no-op +} + +func (g GossipTracer) RecvRPC(rpc *pubsub.RPC) { + // no-op +} + +func (g GossipTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) { + // no-op +} + +func (g GossipTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) { + // no-op +} + func NewSubscriber(p *port.Port, h host.Host) Subscriber { heartbeat := 700 * time.Millisecond gsubParams := pubsub.DefaultGossipSubParams() @@ -74,6 +150,7 @@ func NewSubscriber(p *port.Port, h host.Host) Subscriber { pubsub.WithPeerOutboundQueueSize(600), pubsub.WithValidateQueueSize(600), pubsub.WithMaxMessageSize(10 * (1 << 20)), // 10 MB + pubsub.WithRawTracer(GossipTracer{port: p}), } gsub, err := pubsub.NewGossipSub(context.Background(), h, options...) diff --git a/proto/libp2p.proto b/proto/libp2p.proto index a1b65f867..936852217 100644 --- a/proto/libp2p.proto +++ b/proto/libp2p.proto @@ -27,6 +27,47 @@ message UnsubscribeFromTopic { string name = 1; } +message AddPeerGossip {} +message RemovePeerGossip {} + +message Join { + // topic that was joined + string topic = 1; +} + +message Leave { + // topic that was abandoned + string topic = 1; +} + +message Graft { + string topic = 1; +} + +message Prune { + string topic = 1; +} + +message ValidateMessageGossip { + string topic = 1; +} + +message DeliverMessage { + string topic = 1; +} + +message UnDeliverableMessage { + string topic = 1; +} + +message RejectMessage { + string topic = 1; +} + +message DuplicateMessage { + string topic = 1; +} + message AddPeer { bytes id = 1; repeated string addrs = 2; @@ -117,11 +158,28 @@ message Result { } } +message Tracer { + oneof t { + Join joined = 1; + Leave left = 2; + Graft grafted = 3; + Prune pruned = 4; + ValidateMessageGossip validate_message = 5; + DeliverMessage deliver_message = 6; + UnDeliverableMessage un_deliverable_message = 7; + RejectMessage reject_message = 8; + DuplicateMessage duplicate_message = 9; + AddPeerGossip add_peer = 10; + RemovePeerGossip remove_peer = 11; + } +} + message Notification { oneof n { GossipSub gossip = 1; Request request = 2; NewPeer new_peer = 3; Result result = 4; + Tracer tracer = 5; } }