From 785d96577db3bc753454e6bf2c37492a1a65e656 Mon Sep 17 00:00:00 2001 From: Chris Sandison Date: Fri, 8 Mar 2024 17:08:00 -0500 Subject: [PATCH] Porting over code to plugin model --- core/trino-server/src/main/provisio/trino.xml | 6 + plugin/trino-rabbitmq-event-listener/pom.xml | 137 +++++++++ .../rabbitmq/ConnectionException.java | 7 + .../trino/eventlistener/rabbitmq/Payload.java | 16 ++ .../rabbitmq/PublicationException.java | 7 + .../rabbitmq/RabbitmqClient.java | 86 ++++++ .../rabbitmq/RabbitmqEventListener.java | 111 ++++++++ .../rabbitmq/RabbitmqEventListenerConfig.java | 261 ++++++++++++++++++ .../RabbitmqEventListenerFactory.java | 19 ++ .../rabbitmq/RabbitmqEventListenerPlugin.java | 13 + pom.xml | 1 + 11 files changed, 664 insertions(+) create mode 100644 plugin/trino-rabbitmq-event-listener/pom.xml create mode 100644 plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/ConnectionException.java create mode 100644 plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/Payload.java create mode 100644 plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/PublicationException.java create mode 100644 plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqClient.java create mode 100644 plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListener.java create mode 100644 plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListenerConfig.java create mode 100644 plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListenerFactory.java create mode 100644 plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListenerPlugin.java diff --git a/core/trino-server/src/main/provisio/trino.xml b/core/trino-server/src/main/provisio/trino.xml index 7a2e0a6fe8f25..c65caf8548ba7 100644 --- a/core/trino-server/src/main/provisio/trino.xml +++ b/core/trino-server/src/main/provisio/trino.xml @@ -260,6 +260,12 @@ + + + + + + diff --git a/plugin/trino-rabbitmq-event-listener/pom.xml b/plugin/trino-rabbitmq-event-listener/pom.xml new file mode 100644 index 0000000000000..3bfbf7c1a8fc1 --- /dev/null +++ b/plugin/trino-rabbitmq-event-listener/pom.xml @@ -0,0 +1,137 @@ + + + 4.0.0 + + + io.trino + trino-root + 440-SNAPSHOT + ../../pom.xml + + + trino-mysql-event-listener + trino-plugin + Trino - RabbitMQ event listener + + + ${project.parent.basedir} + + + + + com.google.guava + guava + + + + com.google.inject + guice + + + + io.airlift + bootstrap + + + + io.airlift + configuration + + + + io.airlift + json + + + + io.airlift + log + + + + jakarta.annotation + jakarta.annotation-api + + + + jakarta.validation + jakarta.validation-api + + + + org.jdbi + jdbi3-core + + + + org.jdbi + jdbi3-sqlobject + + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + + io.airlift + slice + provided + + + + io.opentelemetry + opentelemetry-api + provided + + + + io.opentelemetry + opentelemetry-context + provided + + + + io.trino + trino-spi + provided + + + + org.openjdk.jol + jol-core + provided + + + + com.rabbitmq + amqp-client + 5.20.0 + + + + io.airlift + junit-extensions + test + + + + org.assertj + assertj-core + test + + + + org.junit.jupiter + junit-jupiter-api + test + + + + org.testcontainers + mysql + test + + + diff --git a/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/ConnectionException.java b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/ConnectionException.java new file mode 100644 index 0000000000000..0f707c1b2ff7e --- /dev/null +++ b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/ConnectionException.java @@ -0,0 +1,7 @@ +package io.trino.eventlistener.rabbitmq; + +public class ConnectionException extends RuntimeException { + public ConnectionException(String msg) { + super(msg); + } +} diff --git a/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/Payload.java b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/Payload.java new file mode 100644 index 0000000000000..dc0a32ec732c0 --- /dev/null +++ b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/Payload.java @@ -0,0 +1,16 @@ +package io.trino.eventlistener.rabbitmq; + +import com.fasterxml.jackson.annotation.JsonAnyGetter; + +import java.util.HashMap; +import java.util.Map; + +public class Payload { + private final Map payload = new HashMap<>(); + + @JsonAnyGetter + public Map getPayload() { + return payload; + } +} + diff --git a/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/PublicationException.java b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/PublicationException.java new file mode 100644 index 0000000000000..5cac0806f9cca --- /dev/null +++ b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/PublicationException.java @@ -0,0 +1,7 @@ +package io.trino.eventlistener.rabbitmq; + +public class PublicationException extends RuntimeException { + public PublicationException(String msg) { + super(msg); + } +} diff --git a/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqClient.java b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqClient.java new file mode 100644 index 0000000000000..ee1e3732d3464 --- /dev/null +++ b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqClient.java @@ -0,0 +1,86 @@ +package io.trino.eventlistener.rabbitmq; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.ConnectionFactory; +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.util.Set; +import java.util.concurrent.TimeoutException; + +public class RabbitmqClient { + private Channel channel; + private String uri; + private String exchangeName; + private String exchangeType; + private boolean durable; + + private boolean suppressConnectionErrors; + + public RabbitmqClient(String url, String exchangeName, String exchangeType, boolean durable, boolean suppressConnectionErrors) { + this.exchangeName = exchangeName; + this.uri = url; + this.exchangeType = exchangeType; + this.durable = durable; + this.suppressConnectionErrors = suppressConnectionErrors; + + System.out.println("Creating Rabbitmq connection, with suppress-connection-errors: " + suppressConnectionErrors); + try { + this.establishConnection(); + } catch(TimeoutException | IOException e) { + if (this.suppressConnectionErrors) { + System.err.println("Received error when creating rabbitmq connection "+ e.getClass() + ": " + e.getMessage()); + System.err.println("Connection re-attempt will be made at time of publication"); + } else { + throw new ConnectionException(e.getMessage()); + } + } + } + + private static ConnectionFactory getConnectionFactory(String uri) { + try { + ConnectionFactory factory = new ConnectionFactory(); + factory.setUri(uri); + return factory; + } catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException e) { + throw new ConnectionException(e.getMessage()); + } + } + + private void establishConnection() throws IOException, TimeoutException { + this.channel = getConnectionFactory(uri).newConnection().createChannel(); + this.channel.exchangeDeclare(exchangeName, exchangeType, durable); + } + + public void Publish(Set queues, byte[] message) { + synchronized(this) { + // Ensure that we have a channel and it's open + if (this.channel == null || !this.channel.isOpen()) { + try { + this.establishConnection(); + } catch (IOException | TimeoutException e) { + if(this.suppressConnectionErrors) { + System.err.println("Attempted to create channel for publication but got error " + e.getClass() + ": " + e.getMessage()); + System.err.println("Message will be discarded, and another attempt will be made at publication time"); + } else { + throw new ConnectionException(e.getMessage()); + } + } + } + + // If we have an open channel, publish + for (String queueName: queues) { + try { + this.channel.basicPublish(this.exchangeName, queueName, null, message); + } catch(IOException e) { + if(this.suppressConnectionErrors) { + System.err.println("Attempted to publish message but got error " + e.getClass() + ": " + e.getMessage()); + } else { + throw new PublicationException(e.getMessage()); + } + } + } + } + } +} diff --git a/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListener.java b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListener.java new file mode 100644 index 0000000000000..c7ce96d8950df --- /dev/null +++ b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListener.java @@ -0,0 +1,111 @@ +package io.trino.eventlistener.rabbitmq; + + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import com.fasterxml.jackson.datatype.jsr310.JSR310Module; +import io.trino.eventlistener.rabbitmq.PublicationException; +import io.trino.eventlistener.rabbitmq.RabbitmqClient; +import io.trino.eventlistener.rabbitmq.Payload; +import io.trino.spi.eventlistener.EventListener; +import io.trino.spi.eventlistener.QueryCompletedEvent; +import io.trino.spi.eventlistener.QueryCreatedEvent; +import io.trino.spi.eventlistener.SplitCompletedEvent; + +import java.util.List; +import java.util.Map; + + +public class RabbitmqEventListener implements EventListener { + private final RabbitmqEventListenerConfig config; + private final RabbitmqClient client; + + public RabbitmqEventListener(RabbitmqEventListenerConfig config) { + this.config = config; + this.client = new RabbitmqClient( + config.getUrl(), + config.getExchangeName(), + config.getExchangeType(), + config.isDurableExchange(), + config.shouldSuppressConnectionErrors() + ); + } + + @Override + public void queryCompleted(final QueryCompletedEvent queryCompletedEvent) { + if (config.shouldPublishQueryCompleted()) { + try { + client.Publish(this.config.getQueryCompletedQueues(), serializePayload(queryCompletedEvent)); + } catch (Exception e) { + // Print error and continue so that Trino behaviour is not interrupted + System.err.println("Attempted to publish message but got " + e.getClass() + ": " + e.getMessage()); + } + } + } + + @Override + public void queryCreated(final QueryCreatedEvent queryCreatedEvent) { + if (config.shouldPublishQueryCreated()) { + try { + client.Publish(this.config.getQueryCreatedQueues(), serializePayload(queryCreatedEvent)); + } catch (Exception e) { + // Print error and continue so that Trino behaviour is not interrupted + System.err.println("Attempted to publish message but got " + e.getClass() + ": " + e.getMessage()); + } + } + } + + @Override + public void splitCompleted(final SplitCompletedEvent splitCompletedEvent) { + if (config.shouldPublishSplitCompleted()) { + try { + client.Publish(this.config.getSplitCompletedQueues(), serializePayload(splitCompletedEvent)); + } catch (Exception e) { + // Print error and continue so that Trino behaviour is not interrupted + System.err.println("Attempted to publish message but got " + e.getClass() + ": " + e.getMessage()); + } + } + } + + private byte[] serializePayload(Object val) throws PublicationException { + try { + ObjectMapper mapper = new ObjectMapper() + .registerModule(new JSR310Module()) + .registerModule(new Jdk8Module()); + + if(config.getPayloadParentKeys().size() < 1) { + return mapper.writeValueAsBytes(val); + } else { + return mapper.writeValueAsBytes( + constructPayload(config.getPayloadParentKeys(), config.getCustomProperties(), val) + ); + } + } catch (JsonProcessingException e) { + throw new PublicationException("Got JSON processing error payload: " + e.getMessage()); + } + } + + static Payload constructPayload(List allKeys, Map customProps, Object val) { + Payload rootpayload = new Payload(); + Payload lastchild = rootpayload; + + for(int i = 0; i < allKeys.size(); i++) { + if (i == allKeys.size() - 1) { + // If we are on the last element, append the input payload + lastchild.getPayload().put(allKeys.get(i), val); + // And add our custom properties + for(Map.Entry prop : customProps.entrySet()) { + lastchild.getPayload().put(prop.getKey(), prop.getValue()); + } + } else { + // Otherwise create a new payload and move the parent chain + Payload newPayload = new Payload(); + lastchild.getPayload().put(allKeys.get(i), newPayload); + lastchild = newPayload; + } + } + + return rootpayload; + } +} diff --git a/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListenerConfig.java b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListenerConfig.java new file mode 100644 index 0000000000000..6988cf9f3b4d2 --- /dev/null +++ b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListenerConfig.java @@ -0,0 +1,261 @@ +package io.trino.eventlistener.rabbitmq; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +public class RabbitmqEventListenerConfig { + public String getUrl() { + return url; + } + + public String getExchangeName() { + return exchangeName; + } + + public String getExchangeType() { + return exchangeType; + } + + public boolean isDurableExchange() { + return durableExchange; + } + + public boolean shouldSuppressConnectionErrors() { + return suppressConnectionErrors; + } + + public Set getQueryCreatedQueues() { + return queryCreatedQueues; + } + + public Set getQueryCompletedQueues() { + return queryCompletedQueues; + } + + public Set getSplitCompletedQueues() { + return splitCompletedQueues; + } + + public List getPayloadParentKeys() { + return this.payloadParentKeys; + } + + public Map getCustomProperties() { + return this.customProperties; + } + + private String url; + private String exchangeName; + private String exchangeType; + private boolean durableExchange; + private boolean suppressConnectionErrors; + private Set queryCreatedQueues; + private Set queryCompletedQueues; + private Set splitCompletedQueues; + private List payloadParentKeys; + private Map customProperties; + private boolean publishQueryCreated; + private boolean publishQueryCompleted; + private boolean publishSplitCompleted; + + private static final String SERVER_URL = "server-url"; + private static final String SUPPRESS_CONNECTION_ERRORS = "suppress-connection-errors"; + private static final String EXCHANGE_NAME = "exchange-name"; + private static final String EXCHANGE_TYPE = "exchange-type"; + private static final String DURABLE_EXCHANGE = "durable-exchange"; + + private static final String PUBLISH_QUERY_CREATED = "publish-query-created"; + private static final String QUERY_CREATED_QUEUES = "query-created-queues"; + + private static final String PUBLISH_QUERY_COMPLETED = "publish-query-completed"; + private static final String QUERY_COMPLETED_QUEUES = "query-completed-queues"; + + private static final String PUBLISH_SPLIT_COMPLETED = "publish-split-completed"; + private static final String SPLIT_COMPLETED_QUEUES = "split-completed-queues"; + + private static final String PAYLOAD_PARENT_KEYS = "payload-parent-keys"; + private static final String CUSTOM_PROPERTIES_PATTERN = "x-custom-"; + + public static class Builder { + // required params + private String url; + private String exchangeName; + private boolean suppressConnectionErrors; + + // defaulted params + private String exchangeType; + private boolean durableExchange; + private boolean publishQueryCreated; + private String queryCreatedQueues; + private boolean publishQueryCompleted; + private String queryCompletedQueues; + private boolean publishSplitCompleted; + private String splitCompletedQueues; + private Map customProperties; + private String payloadParentKeys; + + + + public Builder(String url, String exchangeName, String exchangeType) { + // Assign values + this.url = url; + this.exchangeName = exchangeName; + this.exchangeType = exchangeType; + + // Assign defaults + this.durableExchange = false; + this.suppressConnectionErrors = false; + this.publishQueryCreated = false; + this.publishQueryCompleted = false; + this.publishSplitCompleted = false; + this.customProperties = new HashMap<>(); + } + + public Builder setDurableExchange(boolean durableExchange) { + this.durableExchange = durableExchange; + return this; + } + + public Builder setSuppressConnectionErrors(boolean suppressConnectionErrors) { + this.suppressConnectionErrors = suppressConnectionErrors; + return this; + } + + public Builder setPublishQueryCreated(boolean publishQueryCreated, String queryCreatedQueues) { + this.publishQueryCreated = publishQueryCreated; + this.queryCreatedQueues = queryCreatedQueues; + return this; + } + + public Builder setPublishQueryCompleted(boolean publishQueryCompleted, String queryCompletedQueues) { + this.publishQueryCompleted = publishQueryCompleted; + this.queryCompletedQueues = queryCompletedQueues; + return this; + } + + public Builder setPublishSplitCompleted(boolean publishSplitCompleted, String splitCompletedQueues) { + this.publishSplitCompleted = publishSplitCompleted; + this.splitCompletedQueues = splitCompletedQueues; + return this; + } + + public Builder setPayloadParentKeys(String payloadParentKeys) { + this.payloadParentKeys = payloadParentKeys; + return this; + } + + public Builder addCustomProperty(String key, String value) { + this.customProperties.put(key, value); + return this; + } + + public RabbitmqEventListenerConfig Build() throws IllegalArgumentException { + // Split queue names and enforce argument exception based on boolean setting + Set queryCreatedQueueNames = Arrays.stream(this.queryCreatedQueues.split(",")).map(String::strip).collect(Collectors.toSet()); + if (this.publishQueryCreated && queryCreatedQueueNames.size() < 1) { + throw new IllegalArgumentException("At least one queue name must be supplied for " + QUERY_CREATED_QUEUES); + } + + Set queryCompletedQueueNames = Arrays.stream(this.queryCompletedQueues.split(",")).map(String::strip).collect(Collectors.toSet()); + if (this.publishQueryCompleted && queryCompletedQueueNames.size() < 1) { + throw new IllegalArgumentException("At least one queue name must be supplied for " + QUERY_COMPLETED_QUEUES); + } + + Set splitCompletedQueueNames = Arrays.stream(this.splitCompletedQueues.split(",")).map(String::strip).collect(Collectors.toSet()); + if (this.publishSplitCompleted && splitCompletedQueueNames.size() < 1) { + throw new IllegalArgumentException("At least one queue name must be supplied for " + SPLIT_COMPLETED_QUEUES); + } + + return new RabbitmqEventListenerConfig( + this.url, this.exchangeName, this.exchangeType, queryCreatedQueueNames, queryCompletedQueueNames, splitCompletedQueueNames, + Arrays.stream(this.payloadParentKeys.split("\\.")).toList(), this.customProperties, + this.durableExchange,this.suppressConnectionErrors, + this.publishQueryCreated, this.publishQueryCompleted, this.publishSplitCompleted + ); + } + } + + private RabbitmqEventListenerConfig( + String url, + String exchangeName, + String exchangeType, + Set queryCreatedQueueNames, + Set queryCompletedQueueNames, + Set splitCompletedQueueNames, + List payloadParentKeys, + Map customProperties, + boolean durableExchange, + boolean suppressConnectionErrors, + boolean publishQueryCreated, + boolean publishQueryCompleted, + boolean publishSplitCompleted + ) { + this.url = url; + this.exchangeName = exchangeName; + this.exchangeType = exchangeType; + this.queryCreatedQueues = queryCreatedQueueNames; + this.queryCompletedQueues = queryCompletedQueueNames; + this.splitCompletedQueues = splitCompletedQueueNames; + this.payloadParentKeys = payloadParentKeys; + this.customProperties = customProperties; + this.durableExchange = durableExchange; + this.suppressConnectionErrors = suppressConnectionErrors; + this.publishQueryCreated = publishQueryCreated; + this.publishQueryCompleted = publishQueryCompleted; + this.publishSplitCompleted = publishSplitCompleted; + } + + public static RabbitmqEventListenerConfig create(Map config) { + // Extract and create builder + RabbitmqEventListenerConfig.Builder builder = new Builder( + config.getOrDefault(SERVER_URL, ""), + config.getOrDefault(EXCHANGE_NAME, ""), + config.getOrDefault(EXCHANGE_TYPE, "") + ).setDurableExchange( + parseBoolFromConfigValue(config.get(DURABLE_EXCHANGE), false) + ).setSuppressConnectionErrors( + parseBoolFromConfigValue(config.get(SUPPRESS_CONNECTION_ERRORS), false) + ).setPublishQueryCreated( + parseBoolFromConfigValue(config.get(PUBLISH_QUERY_CREATED), false), + config.getOrDefault(QUERY_CREATED_QUEUES, "") + ).setPublishQueryCompleted( + parseBoolFromConfigValue(config.get(PUBLISH_QUERY_COMPLETED), false), + config.getOrDefault(QUERY_COMPLETED_QUEUES, "") + ).setPublishSplitCompleted( + parseBoolFromConfigValue(config.get(PUBLISH_SPLIT_COMPLETED), false), + config.getOrDefault(SPLIT_COMPLETED_QUEUES, "") + ).setPayloadParentKeys( + config.getOrDefault(PAYLOAD_PARENT_KEYS, "") + ); + + // See if config has any keys that match the custom pattern + config.entrySet().stream().forEach(configEntry -> { + String key = configEntry.getKey(); + if (key.startsWith(CUSTOM_PROPERTIES_PATTERN)) { + builder.addCustomProperty(key, config.get(key)); + } + }); + + return builder.Build(); + } + + private static boolean parseBoolFromConfigValue(String value, boolean defaultValue) { + return Optional.ofNullable(value).map(Boolean::parseBoolean).orElse(defaultValue); + } + + public boolean shouldPublishQueryCreated() { + return publishQueryCreated; + } + + public boolean shouldPublishQueryCompleted() { + return publishQueryCompleted; + } + + public boolean shouldPublishSplitCompleted() { + return publishSplitCompleted; + } +} diff --git a/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListenerFactory.java b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListenerFactory.java new file mode 100644 index 0000000000000..d2a000be7581b --- /dev/null +++ b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListenerFactory.java @@ -0,0 +1,19 @@ +package io.trino.eventlistener.rabbitmq; + +import io.trino.spi.eventlistener.EventListener; +import io.trino.spi.eventlistener.EventListenerFactory; + +import java.util.Map; + +public class RabbitmqEventListenerFactory implements EventListenerFactory { + @Override + public String getName() { + return "rabbitmq"; + } + + @Override + public EventListener create(Map config) { + var listenerConfig = RabbitmqEventListenerConfig.create(config); + return new RabbitmqEventListener(listenerConfig); + } +} diff --git a/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListenerPlugin.java b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListenerPlugin.java new file mode 100644 index 0000000000000..cefe32532e425 --- /dev/null +++ b/plugin/trino-rabbitmq-event-listener/src/main/java/io/trino/eventlistener/rabbitmq/RabbitmqEventListenerPlugin.java @@ -0,0 +1,13 @@ +package io.trino.eventlistener.rabbitmq; + +import io.trino.spi.Plugin; +import io.trino.spi.eventlistener.EventListenerFactory; + +import java.util.Collections; + +public class RabbitmqEventListenerPlugin implements Plugin { + @Override + public Iterable getEventListenerFactories() { + return Collections.singletonList(new RabbitmqEventListenerFactory()); + } +} diff --git a/pom.xml b/pom.xml index acec7249ec09b..686fed501ba95 100644 --- a/pom.xml +++ b/pom.xml @@ -99,6 +99,7 @@ plugin/trino-pinot plugin/trino-postgresql plugin/trino-prometheus + plugin/trino-rabbitmq-event-listener plugin/trino-raptor-legacy plugin/trino-redis plugin/trino-redshift