diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/SubscriptionUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/SubscriptionUtil.java index bac89e21ddec..7741164e7803 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/SubscriptionUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/SubscriptionUtil.java @@ -153,42 +153,57 @@ public static Set getOwnerOrFollowers( } private static Set getTaskAssignees( - SubscriptionDestination.SubscriptionType type, ChangeEvent event) { + SubscriptionDestination.SubscriptionCategory category, + SubscriptionDestination.SubscriptionType type, + ChangeEvent event) { Thread thread = AlertsRuleEvaluator.getThread(event); - List assignees = thread.getTask().getAssignees(); Set receiversList = new HashSet<>(); Map teams = new HashMap<>(); Map users = new HashMap<>(); Team tempTeamVar = null; User tempUserVar = null; - if (!nullOrEmpty(assignees)) { - for (EntityReference reference : assignees) { - if (Entity.USER.equals(reference.getType())) { - tempUserVar = Entity.getEntity(USER, reference.getId(), "profile", Include.NON_DELETED); - users.put(tempUserVar.getId(), tempUserVar); - } else if (TEAM.equals(reference.getType())) { - tempTeamVar = Entity.getEntity(TEAM, reference.getId(), "profile", Include.NON_DELETED); - teams.put(tempTeamVar.getId(), tempTeamVar); + + if (category.equals(SubscriptionDestination.SubscriptionCategory.ASSIGNEES)) { + List assignees = thread.getTask().getAssignees(); + if (!nullOrEmpty(assignees)) { + for (EntityReference reference : assignees) { + if (Entity.USER.equals(reference.getType())) { + tempUserVar = Entity.getEntity(USER, reference.getId(), "profile", Include.NON_DELETED); + users.put(tempUserVar.getId(), tempUserVar); + } else if (TEAM.equals(reference.getType())) { + tempTeamVar = Entity.getEntity(TEAM, reference.getId(), "profile", Include.NON_DELETED); + teams.put(tempTeamVar.getId(), tempTeamVar); + } } } - } - for (Post post : thread.getPosts()) { - tempUserVar = Entity.getEntityByName(USER, post.getFrom(), "profile", Include.NON_DELETED); - users.put(tempUserVar.getId(), tempUserVar); - List mentions = MessageParser.getEntityLinks(post.getMessage()); - for (MessageParser.EntityLink link : mentions) { - if (USER.equals(link.getEntityType())) { - tempUserVar = Entity.getEntity(link, "profile", Include.NON_DELETED); - users.put(tempUserVar.getId(), tempUserVar); - } else if (TEAM.equals(link.getEntityType())) { - tempTeamVar = Entity.getEntity(link, "profile", Include.NON_DELETED); - teams.put(tempTeamVar.getId(), tempTeamVar); + for (Post post : thread.getPosts()) { + tempUserVar = Entity.getEntityByName(USER, post.getFrom(), "profile", Include.NON_DELETED); + users.put(tempUserVar.getId(), tempUserVar); + List mentions = MessageParser.getEntityLinks(post.getMessage()); + for (MessageParser.EntityLink link : mentions) { + if (USER.equals(link.getEntityType())) { + tempUserVar = Entity.getEntity(link, "profile", Include.NON_DELETED); + users.put(tempUserVar.getId(), tempUserVar); + } else if (TEAM.equals(link.getEntityType())) { + tempTeamVar = Entity.getEntity(link, "profile", Include.NON_DELETED); + teams.put(tempTeamVar.getId(), tempTeamVar); + } } } } + if (category.equals(SubscriptionDestination.SubscriptionCategory.OWNERS)) { + try { + tempUserVar = + Entity.getEntityByName(USER, thread.getCreatedBy(), "profile", Include.NON_DELETED); + users.put(tempUserVar.getId(), tempUserVar); + } catch (Exception ex) { + LOG.warn("Thread created by unknown user: {}", thread.getCreatedBy()); + } + } + // Users receiversList.addAll(getEmailOrWebhookEndpointForUsers(users.values().stream().toList(), type)); @@ -199,7 +214,9 @@ private static Set getTaskAssignees( } public static Set handleConversationNotification( - SubscriptionDestination.SubscriptionType type, ChangeEvent event) { + SubscriptionDestination.SubscriptionCategory category, + SubscriptionDestination.SubscriptionType type, + ChangeEvent event) { Thread thread = AlertsRuleEvaluator.getThread(event); Set receiversList = new HashSet<>(); Map teams = new HashMap<>(); @@ -207,33 +224,43 @@ public static Set handleConversationNotification( Team tempTeamVar = null; User tempUserVar = null; - tempUserVar = - Entity.getEntityByName(USER, thread.getCreatedBy(), "profile", Include.NON_DELETED); - users.put(tempUserVar.getId(), tempUserVar); - List mentions = MessageParser.getEntityLinks(thread.getMessage()); - for (MessageParser.EntityLink link : mentions) { - if (USER.equals(link.getEntityType())) { - tempUserVar = Entity.getEntity(link, "profile", Include.NON_DELETED); - users.put(tempUserVar.getId(), tempUserVar); - } else if (TEAM.equals(link.getEntityType())) { - tempTeamVar = Entity.getEntity(link, "", Include.NON_DELETED); - teams.put(tempTeamVar.getId(), tempTeamVar); - } - } - for (Post post : thread.getPosts()) { - tempUserVar = Entity.getEntityByName(USER, post.getFrom(), "profile", Include.NON_DELETED); - users.put(tempUserVar.getId(), tempUserVar); - mentions = MessageParser.getEntityLinks(post.getMessage()); + if (category.equals(SubscriptionDestination.SubscriptionCategory.MENTIONS)) { + List mentions = MessageParser.getEntityLinks(thread.getMessage()); for (MessageParser.EntityLink link : mentions) { if (USER.equals(link.getEntityType())) { tempUserVar = Entity.getEntity(link, "profile", Include.NON_DELETED); users.put(tempUserVar.getId(), tempUserVar); } else if (TEAM.equals(link.getEntityType())) { - tempTeamVar = Entity.getEntity(link, "profile", Include.NON_DELETED); + tempTeamVar = Entity.getEntity(link, "", Include.NON_DELETED); teams.put(tempTeamVar.getId(), tempTeamVar); } } + + for (Post post : thread.getPosts()) { + tempUserVar = Entity.getEntityByName(USER, post.getFrom(), "profile", Include.NON_DELETED); + users.put(tempUserVar.getId(), tempUserVar); + mentions = MessageParser.getEntityLinks(post.getMessage()); + for (MessageParser.EntityLink link : mentions) { + if (USER.equals(link.getEntityType())) { + tempUserVar = Entity.getEntity(link, "profile", Include.NON_DELETED); + users.put(tempUserVar.getId(), tempUserVar); + } else if (TEAM.equals(link.getEntityType())) { + tempTeamVar = Entity.getEntity(link, "profile", Include.NON_DELETED); + teams.put(tempTeamVar.getId(), tempTeamVar); + } + } + } + } + + if (category.equals(SubscriptionDestination.SubscriptionCategory.OWNERS)) { + try { + tempUserVar = + Entity.getEntityByName(USER, thread.getCreatedBy(), "profile", Include.NON_DELETED); + users.put(tempUserVar.getId(), tempUserVar); + } catch (Exception ex) { + LOG.warn("Thread created by unknown user: {}", thread.getCreatedBy()); + } } // Users @@ -339,8 +366,9 @@ public static Set getTargetsForAlert( if (event.getEntityType().equals(THREAD)) { Thread thread = AlertsRuleEvaluator.getThread(event); switch (thread.getType()) { - case Task -> receiverUrls.addAll(getTaskAssignees(type, event)); - case Conversation -> receiverUrls.addAll(handleConversationNotification(type, event)); + case Task -> receiverUrls.addAll(getTaskAssignees(category, type, event)); + case Conversation -> receiverUrls.addAll( + handleConversationNotification(category, type, event)); // TODO: For Announcement, Immediate Consumer needs to be Notified (find information from // Lineage) }