diff --git a/core/trino-server/src/main/provisio/trino.xml b/core/trino-server/src/main/provisio/trino.xml
index 7a2e0a6fe8f25..c65caf8548ba7 100644
--- a/core/trino-server/src/main/provisio/trino.xml
+++ b/core/trino-server/src/main/provisio/trino.xml
@@ -260,6 +260,12 @@
+
+
+
+
+
+
diff --git a/plugin/trino-rabbitmq-event-listener/pom.xml b/plugin/trino-rabbitmq-event-listener/pom.xml
new file mode 100644
index 0000000000000..3bfbf7c1a8fc1
--- /dev/null
+++ b/plugin/trino-rabbitmq-event-listener/pom.xml
@@ -0,0 +1,137 @@
+
+
+ 4.0.0
+
+
+ io.trino
+ trino-root
+ 440-SNAPSHOT
+ ../../pom.xml
+
+
+ trino-mysql-event-listener
+ trino-plugin
+ Trino - RabbitMQ event listener
+
+
+ ${project.parent.basedir}
+
+
+
+
+ com.google.guava
+ guava
+
+
+
+ com.google.inject
+ guice
+
+
+
+ io.airlift
+ bootstrap
+
+
+
+ io.airlift
+ configuration
+
+
+
+ io.airlift
+ json
+
+
+
+ io.airlift
+ log
+
+
+
+ jakarta.annotation
+ jakarta.annotation-api
+
+
+
+ jakarta.validation
+ jakarta.validation-api
+
+
+
+ org.jdbi
+ jdbi3-core
+
+
+
+ org.jdbi
+ jdbi3-sqlobject
+
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ provided
+
+
+
+ io.airlift
+ slice
+ provided
+
+
+
+ io.opentelemetry
+ opentelemetry-api
+ provided
+
+
+
+ io.opentelemetry
+ opentelemetry-context
+ provided
+
+
+
+ io.trino
+ trino-spi
+ provided
+
+
+
+ org.openjdk.jol
+ jol-core
+ provided
+
+
+
+ com.rabbitmq
+ amqp-client
+ 5.20.0
+
+
+
+ io.airlift
+ junit-extensions
+ test
+
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ test
+
+
+
+ org.testcontainers
+ mysql
+ test
+
+
+
diff --git a/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/ConnectionException.java b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/ConnectionException.java
new file mode 100644
index 0000000000000..0f707c1b2ff7e
--- /dev/null
+++ b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/ConnectionException.java
@@ -0,0 +1,7 @@
+package io.trino.eventlistener.rabbitmq;
+
+public class ConnectionException extends RuntimeException {
+ public ConnectionException(String msg) {
+ super(msg);
+ }
+}
diff --git a/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/Payload.java b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/Payload.java
new file mode 100644
index 0000000000000..dc0a32ec732c0
--- /dev/null
+++ b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/Payload.java
@@ -0,0 +1,16 @@
+package io.trino.eventlistener.rabbitmq;
+
+import com.fasterxml.jackson.annotation.JsonAnyGetter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class Payload {
+ private final Map payload = new HashMap<>();
+
+ @JsonAnyGetter
+ public Map getPayload() {
+ return payload;
+ }
+}
+
diff --git a/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/PublicationException.java b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/PublicationException.java
new file mode 100644
index 0000000000000..5cac0806f9cca
--- /dev/null
+++ b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/PublicationException.java
@@ -0,0 +1,7 @@
+package io.trino.eventlistener.rabbitmq;
+
+public class PublicationException extends RuntimeException {
+ public PublicationException(String msg) {
+ super(msg);
+ }
+}
diff --git a/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqClient.java b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqClient.java
new file mode 100644
index 0000000000000..ee1e3732d3464
--- /dev/null
+++ b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqClient.java
@@ -0,0 +1,86 @@
+package io.trino.eventlistener.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.ConnectionFactory;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+public class RabbitmqClient {
+ private Channel channel;
+ private String uri;
+ private String exchangeName;
+ private String exchangeType;
+ private boolean durable;
+
+ private boolean suppressConnectionErrors;
+
+ public RabbitmqClient(String url, String exchangeName, String exchangeType, boolean durable, boolean suppressConnectionErrors) {
+ this.exchangeName = exchangeName;
+ this.uri = url;
+ this.exchangeType = exchangeType;
+ this.durable = durable;
+ this.suppressConnectionErrors = suppressConnectionErrors;
+
+ System.out.println("Creating Rabbitmq connection, with suppress-connection-errors: " + suppressConnectionErrors);
+ try {
+ this.establishConnection();
+ } catch(TimeoutException | IOException e) {
+ if (this.suppressConnectionErrors) {
+ System.err.println("Received error when creating rabbitmq connection "+ e.getClass() + ": " + e.getMessage());
+ System.err.println("Connection re-attempt will be made at time of publication");
+ } else {
+ throw new ConnectionException(e.getMessage());
+ }
+ }
+ }
+
+ private static ConnectionFactory getConnectionFactory(String uri) {
+ try {
+ ConnectionFactory factory = new ConnectionFactory();
+ factory.setUri(uri);
+ return factory;
+ } catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException e) {
+ throw new ConnectionException(e.getMessage());
+ }
+ }
+
+ private void establishConnection() throws IOException, TimeoutException {
+ this.channel = getConnectionFactory(uri).newConnection().createChannel();
+ this.channel.exchangeDeclare(exchangeName, exchangeType, durable);
+ }
+
+ public void Publish(Set queues, byte[] message) {
+ synchronized(this) {
+ // Ensure that we have a channel and it's open
+ if (this.channel == null || !this.channel.isOpen()) {
+ try {
+ this.establishConnection();
+ } catch (IOException | TimeoutException e) {
+ if(this.suppressConnectionErrors) {
+ System.err.println("Attempted to create channel for publication but got error " + e.getClass() + ": " + e.getMessage());
+ System.err.println("Message will be discarded, and another attempt will be made at publication time");
+ } else {
+ throw new ConnectionException(e.getMessage());
+ }
+ }
+ }
+
+ // If we have an open channel, publish
+ for (String queueName: queues) {
+ try {
+ this.channel.basicPublish(this.exchangeName, queueName, null, message);
+ } catch(IOException e) {
+ if(this.suppressConnectionErrors) {
+ System.err.println("Attempted to publish message but got error " + e.getClass() + ": " + e.getMessage());
+ } else {
+ throw new PublicationException(e.getMessage());
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListener.java b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListener.java
new file mode 100644
index 0000000000000..c7ce96d8950df
--- /dev/null
+++ b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListener.java
@@ -0,0 +1,111 @@
+package io.trino.eventlistener.rabbitmq;
+
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import com.fasterxml.jackson.datatype.jsr310.JSR310Module;
+import io.trino.eventlistener.rabbitmq.PublicationException;
+import io.trino.eventlistener.rabbitmq.RabbitmqClient;
+import io.trino.eventlistener.rabbitmq.Payload;
+import io.trino.spi.eventlistener.EventListener;
+import io.trino.spi.eventlistener.QueryCompletedEvent;
+import io.trino.spi.eventlistener.QueryCreatedEvent;
+import io.trino.spi.eventlistener.SplitCompletedEvent;
+
+import java.util.List;
+import java.util.Map;
+
+
+public class RabbitmqEventListener implements EventListener {
+ private final RabbitmqEventListenerConfig config;
+ private final RabbitmqClient client;
+
+ public RabbitmqEventListener(RabbitmqEventListenerConfig config) {
+ this.config = config;
+ this.client = new RabbitmqClient(
+ config.getUrl(),
+ config.getExchangeName(),
+ config.getExchangeType(),
+ config.isDurableExchange(),
+ config.shouldSuppressConnectionErrors()
+ );
+ }
+
+ @Override
+ public void queryCompleted(final QueryCompletedEvent queryCompletedEvent) {
+ if (config.shouldPublishQueryCompleted()) {
+ try {
+ client.Publish(this.config.getQueryCompletedQueues(), serializePayload(queryCompletedEvent));
+ } catch (Exception e) {
+ // Print error and continue so that Trino behaviour is not interrupted
+ System.err.println("Attempted to publish message but got " + e.getClass() + ": " + e.getMessage());
+ }
+ }
+ }
+
+ @Override
+ public void queryCreated(final QueryCreatedEvent queryCreatedEvent) {
+ if (config.shouldPublishQueryCreated()) {
+ try {
+ client.Publish(this.config.getQueryCreatedQueues(), serializePayload(queryCreatedEvent));
+ } catch (Exception e) {
+ // Print error and continue so that Trino behaviour is not interrupted
+ System.err.println("Attempted to publish message but got " + e.getClass() + ": " + e.getMessage());
+ }
+ }
+ }
+
+ @Override
+ public void splitCompleted(final SplitCompletedEvent splitCompletedEvent) {
+ if (config.shouldPublishSplitCompleted()) {
+ try {
+ client.Publish(this.config.getSplitCompletedQueues(), serializePayload(splitCompletedEvent));
+ } catch (Exception e) {
+ // Print error and continue so that Trino behaviour is not interrupted
+ System.err.println("Attempted to publish message but got " + e.getClass() + ": " + e.getMessage());
+ }
+ }
+ }
+
+ private byte[] serializePayload(Object val) throws PublicationException {
+ try {
+ ObjectMapper mapper = new ObjectMapper()
+ .registerModule(new JSR310Module())
+ .registerModule(new Jdk8Module());
+
+ if(config.getPayloadParentKeys().size() < 1) {
+ return mapper.writeValueAsBytes(val);
+ } else {
+ return mapper.writeValueAsBytes(
+ constructPayload(config.getPayloadParentKeys(), config.getCustomProperties(), val)
+ );
+ }
+ } catch (JsonProcessingException e) {
+ throw new PublicationException("Got JSON processing error payload: " + e.getMessage());
+ }
+ }
+
+ static Payload constructPayload(List allKeys, Map customProps, Object val) {
+ Payload rootpayload = new Payload();
+ Payload lastchild = rootpayload;
+
+ for(int i = 0; i < allKeys.size(); i++) {
+ if (i == allKeys.size() - 1) {
+ // If we are on the last element, append the input payload
+ lastchild.getPayload().put(allKeys.get(i), val);
+ // And add our custom properties
+ for(Map.Entry prop : customProps.entrySet()) {
+ lastchild.getPayload().put(prop.getKey(), prop.getValue());
+ }
+ } else {
+ // Otherwise create a new payload and move the parent chain
+ Payload newPayload = new Payload();
+ lastchild.getPayload().put(allKeys.get(i), newPayload);
+ lastchild = newPayload;
+ }
+ }
+
+ return rootpayload;
+ }
+}
diff --git a/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListenerConfig.java b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListenerConfig.java
new file mode 100644
index 0000000000000..6988cf9f3b4d2
--- /dev/null
+++ b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListenerConfig.java
@@ -0,0 +1,261 @@
+package io.trino.eventlistener.rabbitmq;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+public class RabbitmqEventListenerConfig {
+ public String getUrl() {
+ return url;
+ }
+
+ public String getExchangeName() {
+ return exchangeName;
+ }
+
+ public String getExchangeType() {
+ return exchangeType;
+ }
+
+ public boolean isDurableExchange() {
+ return durableExchange;
+ }
+
+ public boolean shouldSuppressConnectionErrors() {
+ return suppressConnectionErrors;
+ }
+
+ public Set getQueryCreatedQueues() {
+ return queryCreatedQueues;
+ }
+
+ public Set getQueryCompletedQueues() {
+ return queryCompletedQueues;
+ }
+
+ public Set getSplitCompletedQueues() {
+ return splitCompletedQueues;
+ }
+
+ public List getPayloadParentKeys() {
+ return this.payloadParentKeys;
+ }
+
+ public Map getCustomProperties() {
+ return this.customProperties;
+ }
+
+ private String url;
+ private String exchangeName;
+ private String exchangeType;
+ private boolean durableExchange;
+ private boolean suppressConnectionErrors;
+ private Set queryCreatedQueues;
+ private Set queryCompletedQueues;
+ private Set splitCompletedQueues;
+ private List payloadParentKeys;
+ private Map customProperties;
+ private boolean publishQueryCreated;
+ private boolean publishQueryCompleted;
+ private boolean publishSplitCompleted;
+
+ private static final String SERVER_URL = "server-url";
+ private static final String SUPPRESS_CONNECTION_ERRORS = "suppress-connection-errors";
+ private static final String EXCHANGE_NAME = "exchange-name";
+ private static final String EXCHANGE_TYPE = "exchange-type";
+ private static final String DURABLE_EXCHANGE = "durable-exchange";
+
+ private static final String PUBLISH_QUERY_CREATED = "publish-query-created";
+ private static final String QUERY_CREATED_QUEUES = "query-created-queues";
+
+ private static final String PUBLISH_QUERY_COMPLETED = "publish-query-completed";
+ private static final String QUERY_COMPLETED_QUEUES = "query-completed-queues";
+
+ private static final String PUBLISH_SPLIT_COMPLETED = "publish-split-completed";
+ private static final String SPLIT_COMPLETED_QUEUES = "split-completed-queues";
+
+ private static final String PAYLOAD_PARENT_KEYS = "payload-parent-keys";
+ private static final String CUSTOM_PROPERTIES_PATTERN = "x-custom-";
+
+ public static class Builder {
+ // required params
+ private String url;
+ private String exchangeName;
+ private boolean suppressConnectionErrors;
+
+ // defaulted params
+ private String exchangeType;
+ private boolean durableExchange;
+ private boolean publishQueryCreated;
+ private String queryCreatedQueues;
+ private boolean publishQueryCompleted;
+ private String queryCompletedQueues;
+ private boolean publishSplitCompleted;
+ private String splitCompletedQueues;
+ private Map customProperties;
+ private String payloadParentKeys;
+
+
+
+ public Builder(String url, String exchangeName, String exchangeType) {
+ // Assign values
+ this.url = url;
+ this.exchangeName = exchangeName;
+ this.exchangeType = exchangeType;
+
+ // Assign defaults
+ this.durableExchange = false;
+ this.suppressConnectionErrors = false;
+ this.publishQueryCreated = false;
+ this.publishQueryCompleted = false;
+ this.publishSplitCompleted = false;
+ this.customProperties = new HashMap<>();
+ }
+
+ public Builder setDurableExchange(boolean durableExchange) {
+ this.durableExchange = durableExchange;
+ return this;
+ }
+
+ public Builder setSuppressConnectionErrors(boolean suppressConnectionErrors) {
+ this.suppressConnectionErrors = suppressConnectionErrors;
+ return this;
+ }
+
+ public Builder setPublishQueryCreated(boolean publishQueryCreated, String queryCreatedQueues) {
+ this.publishQueryCreated = publishQueryCreated;
+ this.queryCreatedQueues = queryCreatedQueues;
+ return this;
+ }
+
+ public Builder setPublishQueryCompleted(boolean publishQueryCompleted, String queryCompletedQueues) {
+ this.publishQueryCompleted = publishQueryCompleted;
+ this.queryCompletedQueues = queryCompletedQueues;
+ return this;
+ }
+
+ public Builder setPublishSplitCompleted(boolean publishSplitCompleted, String splitCompletedQueues) {
+ this.publishSplitCompleted = publishSplitCompleted;
+ this.splitCompletedQueues = splitCompletedQueues;
+ return this;
+ }
+
+ public Builder setPayloadParentKeys(String payloadParentKeys) {
+ this.payloadParentKeys = payloadParentKeys;
+ return this;
+ }
+
+ public Builder addCustomProperty(String key, String value) {
+ this.customProperties.put(key, value);
+ return this;
+ }
+
+ public RabbitmqEventListenerConfig Build() throws IllegalArgumentException {
+ // Split queue names and enforce argument exception based on boolean setting
+ Set queryCreatedQueueNames = Arrays.stream(this.queryCreatedQueues.split(",")).map(String::strip).collect(Collectors.toSet());
+ if (this.publishQueryCreated && queryCreatedQueueNames.size() < 1) {
+ throw new IllegalArgumentException("At least one queue name must be supplied for " + QUERY_CREATED_QUEUES);
+ }
+
+ Set queryCompletedQueueNames = Arrays.stream(this.queryCompletedQueues.split(",")).map(String::strip).collect(Collectors.toSet());
+ if (this.publishQueryCompleted && queryCompletedQueueNames.size() < 1) {
+ throw new IllegalArgumentException("At least one queue name must be supplied for " + QUERY_COMPLETED_QUEUES);
+ }
+
+ Set splitCompletedQueueNames = Arrays.stream(this.splitCompletedQueues.split(",")).map(String::strip).collect(Collectors.toSet());
+ if (this.publishSplitCompleted && splitCompletedQueueNames.size() < 1) {
+ throw new IllegalArgumentException("At least one queue name must be supplied for " + SPLIT_COMPLETED_QUEUES);
+ }
+
+ return new RabbitmqEventListenerConfig(
+ this.url, this.exchangeName, this.exchangeType, queryCreatedQueueNames, queryCompletedQueueNames, splitCompletedQueueNames,
+ Arrays.stream(this.payloadParentKeys.split("\\.")).toList(), this.customProperties,
+ this.durableExchange,this.suppressConnectionErrors,
+ this.publishQueryCreated, this.publishQueryCompleted, this.publishSplitCompleted
+ );
+ }
+ }
+
+ private RabbitmqEventListenerConfig(
+ String url,
+ String exchangeName,
+ String exchangeType,
+ Set queryCreatedQueueNames,
+ Set queryCompletedQueueNames,
+ Set splitCompletedQueueNames,
+ List payloadParentKeys,
+ Map customProperties,
+ boolean durableExchange,
+ boolean suppressConnectionErrors,
+ boolean publishQueryCreated,
+ boolean publishQueryCompleted,
+ boolean publishSplitCompleted
+ ) {
+ this.url = url;
+ this.exchangeName = exchangeName;
+ this.exchangeType = exchangeType;
+ this.queryCreatedQueues = queryCreatedQueueNames;
+ this.queryCompletedQueues = queryCompletedQueueNames;
+ this.splitCompletedQueues = splitCompletedQueueNames;
+ this.payloadParentKeys = payloadParentKeys;
+ this.customProperties = customProperties;
+ this.durableExchange = durableExchange;
+ this.suppressConnectionErrors = suppressConnectionErrors;
+ this.publishQueryCreated = publishQueryCreated;
+ this.publishQueryCompleted = publishQueryCompleted;
+ this.publishSplitCompleted = publishSplitCompleted;
+ }
+
+ public static RabbitmqEventListenerConfig create(Map config) {
+ // Extract and create builder
+ RabbitmqEventListenerConfig.Builder builder = new Builder(
+ config.getOrDefault(SERVER_URL, ""),
+ config.getOrDefault(EXCHANGE_NAME, ""),
+ config.getOrDefault(EXCHANGE_TYPE, "")
+ ).setDurableExchange(
+ parseBoolFromConfigValue(config.get(DURABLE_EXCHANGE), false)
+ ).setSuppressConnectionErrors(
+ parseBoolFromConfigValue(config.get(SUPPRESS_CONNECTION_ERRORS), false)
+ ).setPublishQueryCreated(
+ parseBoolFromConfigValue(config.get(PUBLISH_QUERY_CREATED), false),
+ config.getOrDefault(QUERY_CREATED_QUEUES, "")
+ ).setPublishQueryCompleted(
+ parseBoolFromConfigValue(config.get(PUBLISH_QUERY_COMPLETED), false),
+ config.getOrDefault(QUERY_COMPLETED_QUEUES, "")
+ ).setPublishSplitCompleted(
+ parseBoolFromConfigValue(config.get(PUBLISH_SPLIT_COMPLETED), false),
+ config.getOrDefault(SPLIT_COMPLETED_QUEUES, "")
+ ).setPayloadParentKeys(
+ config.getOrDefault(PAYLOAD_PARENT_KEYS, "")
+ );
+
+ // See if config has any keys that match the custom pattern
+ config.entrySet().stream().forEach(configEntry -> {
+ String key = configEntry.getKey();
+ if (key.startsWith(CUSTOM_PROPERTIES_PATTERN)) {
+ builder.addCustomProperty(key, config.get(key));
+ }
+ });
+
+ return builder.Build();
+ }
+
+ private static boolean parseBoolFromConfigValue(String value, boolean defaultValue) {
+ return Optional.ofNullable(value).map(Boolean::parseBoolean).orElse(defaultValue);
+ }
+
+ public boolean shouldPublishQueryCreated() {
+ return publishQueryCreated;
+ }
+
+ public boolean shouldPublishQueryCompleted() {
+ return publishQueryCompleted;
+ }
+
+ public boolean shouldPublishSplitCompleted() {
+ return publishSplitCompleted;
+ }
+}
diff --git a/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListenerFactory.java b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListenerFactory.java
new file mode 100644
index 0000000000000..d2a000be7581b
--- /dev/null
+++ b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListenerFactory.java
@@ -0,0 +1,19 @@
+package io.trino.eventlistener.rabbitmq;
+
+import io.trino.spi.eventlistener.EventListener;
+import io.trino.spi.eventlistener.EventListenerFactory;
+
+import java.util.Map;
+
+public class RabbitmqEventListenerFactory implements EventListenerFactory {
+ @Override
+ public String getName() {
+ return "rabbitmq";
+ }
+
+ @Override
+ public EventListener create(Map config) {
+ var listenerConfig = RabbitmqEventListenerConfig.create(config);
+ return new RabbitmqEventListener(listenerConfig);
+ }
+}
diff --git a/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListenerPlugin.java b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListenerPlugin.java
new file mode 100644
index 0000000000000..cefe32532e425
--- /dev/null
+++ b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListenerPlugin.java
@@ -0,0 +1,13 @@
+package io.trino.eventlistener.rabbitmq;
+
+import io.trino.spi.Plugin;
+import io.trino.spi.eventlistener.EventListenerFactory;
+
+import java.util.Collections;
+
+public class RabbitmqEventListenerPlugin implements Plugin {
+ @Override
+ public Iterable getEventListenerFactories() {
+ return Collections.singletonList(new RabbitmqEventListenerFactory());
+ }
+}
diff --git a/pom.xml b/pom.xml
index acec7249ec09b..686fed501ba95 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,6 +99,7 @@
plugin/trino-pinot
plugin/trino-postgresql
plugin/trino-prometheus
+ plugin/trino-rabbitmq-event-listener
plugin/trino-raptor-legacy
plugin/trino-redis
plugin/trino-redshift