Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented RabbitMQ Event Listener #1

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions core/trino-server/src/main/provisio/trino.xml
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,12 @@
</artifact>
</artifactSet>

<artifactSet to="plugin/rabbitmq-event-listener">
<artifact id="${project.groupId}:trino-rabbitmq-event-listener:zip:${project.version}">
<unpack />
</artifact>
</artifactSet>

<artifactSet to="plugin/raptor-legacy">
<artifact id="${project.groupId}:trino-raptor-legacy:zip:${project.version}">
<unpack />
Expand Down
137 changes: 137 additions & 0 deletions plugin/trino-rabbitmq-event-listener/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.trino</groupId>
<artifactId>trino-root</artifactId>
<version>440-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>trino-mysql-event-listener</artifactId>
<packaging>trino-plugin</packaging>
<description>Trino - RabbitMQ event listener</description>

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
</properties>

<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>bootstrap</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>configuration</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>json</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
</dependency>

<dependency>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
</dependency>

<dependency>
<groupId>jakarta.validation</groupId>
<artifactId>jakarta.validation-api</artifactId>
</dependency>

<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi3-core</artifactId>
</dependency>

<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi3-sqlobject</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>slice</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>junit-extensions</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.trino.eventlistener.rabbitmq;

public class ConnectionException extends RuntimeException {
public ConnectionException(String msg) {
super(msg);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.trino.eventlistener.rabbitmq;

import com.fasterxml.jackson.annotation.JsonAnyGetter;

import java.util.HashMap;
import java.util.Map;

public class Payload {
private final Map<String, Object> payload = new HashMap<>();

@JsonAnyGetter
public Map<String, Object> getPayload() {
return payload;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.trino.eventlistener.rabbitmq;

public class PublicationException extends RuntimeException {
public PublicationException(String msg) {
super(msg);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package io.trino.eventlistener.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Set;
import java.util.concurrent.TimeoutException;

public class RabbitmqClient {
private Channel channel;
private String uri;
private String exchangeName;
private String exchangeType;
private boolean durable;

private boolean suppressConnectionErrors;

public RabbitmqClient(String url, String exchangeName, String exchangeType, boolean durable, boolean suppressConnectionErrors) {
this.exchangeName = exchangeName;
this.uri = url;
this.exchangeType = exchangeType;
this.durable = durable;
this.suppressConnectionErrors = suppressConnectionErrors;

System.out.println("Creating Rabbitmq connection, with suppress-connection-errors: " + suppressConnectionErrors);
try {
this.establishConnection();
} catch(TimeoutException | IOException e) {
if (this.suppressConnectionErrors) {
System.err.println("Received error when creating rabbitmq connection "+ e.getClass() + ": " + e.getMessage());
System.err.println("Connection re-attempt will be made at time of publication");
} else {
throw new ConnectionException(e.getMessage());
}
}
}

private static ConnectionFactory getConnectionFactory(String uri) {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri(uri);
return factory;
} catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException e) {
throw new ConnectionException(e.getMessage());
}
}

private void establishConnection() throws IOException, TimeoutException {
this.channel = getConnectionFactory(uri).newConnection().createChannel();
this.channel.exchangeDeclare(exchangeName, exchangeType, durable);
}

public void Publish(Set<String> queues, byte[] message) {
synchronized(this) {
// Ensure that we have a channel and it's open
if (this.channel == null || !this.channel.isOpen()) {
try {
this.establishConnection();
} catch (IOException | TimeoutException e) {
if(this.suppressConnectionErrors) {
System.err.println("Attempted to create channel for publication but got error " + e.getClass() + ": " + e.getMessage());
System.err.println("Message will be discarded, and another attempt will be made at publication time");
} else {
throw new ConnectionException(e.getMessage());
}
}
}

// If we have an open channel, publish
for (String queueName: queues) {
try {
this.channel.basicPublish(this.exchangeName, queueName, null, message);
} catch(IOException e) {
if(this.suppressConnectionErrors) {
System.err.println("Attempted to publish message but got error " + e.getClass() + ": " + e.getMessage());
} else {
throw new PublicationException(e.getMessage());
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package io.trino.eventlistener.rabbitmq;


import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JSR310Module;
import io.trino.eventlistener.rabbitmq.PublicationException;
import io.trino.eventlistener.rabbitmq.RabbitmqClient;
import io.trino.eventlistener.rabbitmq.Payload;
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.eventlistener.QueryCompletedEvent;
import io.trino.spi.eventlistener.QueryCreatedEvent;
import io.trino.spi.eventlistener.SplitCompletedEvent;

import java.util.List;
import java.util.Map;


public class RabbitmqEventListener implements EventListener {
private final RabbitmqEventListenerConfig config;
private final RabbitmqClient client;

public RabbitmqEventListener(RabbitmqEventListenerConfig config) {
this.config = config;
this.client = new RabbitmqClient(
config.getUrl(),
config.getExchangeName(),
config.getExchangeType(),
config.isDurableExchange(),
config.shouldSuppressConnectionErrors()
);
}

@Override
public void queryCompleted(final QueryCompletedEvent queryCompletedEvent) {
if (config.shouldPublishQueryCompleted()) {
try {
client.Publish(this.config.getQueryCompletedQueues(), serializePayload(queryCompletedEvent));
} catch (Exception e) {
// Print error and continue so that Trino behaviour is not interrupted
System.err.println("Attempted to publish message but got " + e.getClass() + ": " + e.getMessage());
}
}
}

@Override
public void queryCreated(final QueryCreatedEvent queryCreatedEvent) {
if (config.shouldPublishQueryCreated()) {
try {
client.Publish(this.config.getQueryCreatedQueues(), serializePayload(queryCreatedEvent));
} catch (Exception e) {
// Print error and continue so that Trino behaviour is not interrupted
System.err.println("Attempted to publish message but got " + e.getClass() + ": " + e.getMessage());
}
}
}

@Override
public void splitCompleted(final SplitCompletedEvent splitCompletedEvent) {
if (config.shouldPublishSplitCompleted()) {
try {
client.Publish(this.config.getSplitCompletedQueues(), serializePayload(splitCompletedEvent));
} catch (Exception e) {
// Print error and continue so that Trino behaviour is not interrupted
System.err.println("Attempted to publish message but got " + e.getClass() + ": " + e.getMessage());
}
}
}

private byte[] serializePayload(Object val) throws PublicationException {
try {
ObjectMapper mapper = new ObjectMapper()
.registerModule(new JSR310Module())
.registerModule(new Jdk8Module());

if(config.getPayloadParentKeys().size() < 1) {
return mapper.writeValueAsBytes(val);
} else {
return mapper.writeValueAsBytes(
constructPayload(config.getPayloadParentKeys(), config.getCustomProperties(), val)
);
}
} catch (JsonProcessingException e) {
throw new PublicationException("Got JSON processing error payload: " + e.getMessage());
}
}

static Payload constructPayload(List<String> allKeys, Map<String, String> customProps, Object val) {
Payload rootpayload = new Payload();
Payload lastchild = rootpayload;

for(int i = 0; i < allKeys.size(); i++) {
if (i == allKeys.size() - 1) {
// If we are on the last element, append the input payload
lastchild.getPayload().put(allKeys.get(i), val);
// And add our custom properties
for(Map.Entry<String, String> prop : customProps.entrySet()) {
lastchild.getPayload().put(prop.getKey(), prop.getValue());
}
} else {
// Otherwise create a new payload and move the parent chain
Payload newPayload = new Payload();
lastchild.getPayload().put(allKeys.get(i), newPayload);
lastchild = newPayload;
}
}

return rootpayload;
}
}
Loading