diff --git a/open-metadata-implementation/access-services/asset-consumer/asset-consumer-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/assetconsumer/connectors/outtopic/AssetConsumerOutTopicServerConnector.java b/open-metadata-implementation/access-services/asset-consumer/asset-consumer-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/assetconsumer/connectors/outtopic/AssetConsumerOutTopicServerConnector.java index 089abeccc51..66f5d8e601b 100644 --- a/open-metadata-implementation/access-services/asset-consumer/asset-consumer-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/assetconsumer/connectors/outtopic/AssetConsumerOutTopicServerConnector.java +++ b/open-metadata-implementation/access-services/asset-consumer/asset-consumer-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/assetconsumer/connectors/outtopic/AssetConsumerOutTopicServerConnector.java @@ -11,6 +11,8 @@ import org.odpi.openmetadata.frameworks.connectors.ffdc.InvalidParameterException; import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicSenderConnectorBase; +import java.util.concurrent.CompletionException; + /** * AssetConsumerOutTopicServerConnector is the java implementation of the @@ -33,7 +35,7 @@ public void sendEvent(AssetConsumerOutTopicEvent event) throws InvalidParameterE try { String eventString = objectMapper.writeValueAsString(event); - super.sendEvent(eventString); + super.sendEvent(eventString).join(); if (super.auditLog != null) { @@ -42,6 +44,17 @@ public void sendEvent(AssetConsumerOutTopicEvent event) throws InvalidParameterE eventString); } } + catch (CompletionException error) + { + if (error.getCause() instanceof ConnectorCheckedException) + { + throw (ConnectorCheckedException) error.getCause(); + } + else if (error.getCause() instanceof InvalidParameterException) + { + throw (InvalidParameterException) error.getCause(); + } + } catch (InvalidParameterException | ConnectorCheckedException error) { throw error; diff --git a/open-metadata-implementation/access-services/asset-manager/asset-manager-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/assetmanager/connectors/outtopic/AssetManagerOutTopicServerConnector.java b/open-metadata-implementation/access-services/asset-manager/asset-manager-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/assetmanager/connectors/outtopic/AssetManagerOutTopicServerConnector.java index 1be2710f4c6..a9332979668 100644 --- a/open-metadata-implementation/access-services/asset-manager/asset-manager-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/assetmanager/connectors/outtopic/AssetManagerOutTopicServerConnector.java +++ b/open-metadata-implementation/access-services/asset-manager/asset-manager-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/assetmanager/connectors/outtopic/AssetManagerOutTopicServerConnector.java @@ -11,6 +11,8 @@ import org.odpi.openmetadata.frameworks.connectors.ffdc.InvalidParameterException; import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicSenderConnectorBase; +import java.util.concurrent.CompletionException; + /** * AssetManagerOutTopicServerConnector is the java implementation of the @@ -33,7 +35,7 @@ public void sendEvent(AssetManagerOutTopicEvent event) throws InvalidParameterEx try { String eventString = objectMapper.writeValueAsString(event); - super.sendEvent(eventString); + super.sendEvent(eventString).join(); if (super.auditLog != null) { @@ -42,6 +44,17 @@ public void sendEvent(AssetManagerOutTopicEvent event) throws InvalidParameterEx eventString); } } + catch (CompletionException error) + { + if (error.getCause() instanceof ConnectorCheckedException) + { + throw (ConnectorCheckedException) error.getCause(); + } + else if (error.getCause() instanceof InvalidParameterException) + { + throw (InvalidParameterException) error.getCause(); + } + } catch (InvalidParameterException | ConnectorCheckedException error) { throw error; diff --git a/open-metadata-implementation/access-services/asset-owner/asset-owner-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/assetowner/connectors/outtopic/AssetOwnerOutTopicServerConnector.java b/open-metadata-implementation/access-services/asset-owner/asset-owner-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/assetowner/connectors/outtopic/AssetOwnerOutTopicServerConnector.java index 555d986c880..5f064977dc5 100644 --- a/open-metadata-implementation/access-services/asset-owner/asset-owner-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/assetowner/connectors/outtopic/AssetOwnerOutTopicServerConnector.java +++ b/open-metadata-implementation/access-services/asset-owner/asset-owner-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/assetowner/connectors/outtopic/AssetOwnerOutTopicServerConnector.java @@ -11,6 +11,8 @@ import org.odpi.openmetadata.frameworks.connectors.ffdc.InvalidParameterException; import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicSenderConnectorBase; +import java.util.concurrent.CompletionException; + /** * AssetOwnerOutTopicServerConnector is the java implementation of the @@ -33,7 +35,7 @@ public void sendEvent(AssetOwnerOutTopicEvent event) throws InvalidParameterExce try { String eventString = objectMapper.writeValueAsString(event); - super.sendEvent(eventString); + super.sendEvent(eventString).join(); if (super.auditLog != null) { @@ -42,6 +44,17 @@ public void sendEvent(AssetOwnerOutTopicEvent event) throws InvalidParameterExce eventString); } } + catch (CompletionException error) + { + if (error.getCause() instanceof ConnectorCheckedException) + { + throw (ConnectorCheckedException) error.getCause(); + } + else if (error.getCause() instanceof InvalidParameterException) + { + throw (InvalidParameterException) error.getCause(); + } + } catch (InvalidParameterException | ConnectorCheckedException error) { throw error; diff --git a/open-metadata-implementation/access-services/community-profile/community-profile-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/communityprofile/connectors/outtopic/CommunityProfileOutTopicServerConnector.java b/open-metadata-implementation/access-services/community-profile/community-profile-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/communityprofile/connectors/outtopic/CommunityProfileOutTopicServerConnector.java index 843ceff2bbf..cbf0fda0a11 100644 --- a/open-metadata-implementation/access-services/community-profile/community-profile-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/communityprofile/connectors/outtopic/CommunityProfileOutTopicServerConnector.java +++ b/open-metadata-implementation/access-services/community-profile/community-profile-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/communityprofile/connectors/outtopic/CommunityProfileOutTopicServerConnector.java @@ -11,6 +11,8 @@ import org.odpi.openmetadata.frameworks.connectors.ffdc.InvalidParameterException; import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicSenderConnectorBase; +import java.util.concurrent.CompletionException; + /** * CommunityProfileOutTopicServerConnector is the java implementation of the @@ -33,7 +35,7 @@ public void sendEvent(CommunityProfileOutboundEvent event) throws InvalidParamet try { String eventString = objectMapper.writeValueAsString(event); - super.sendEvent(eventString); + super.sendEvent(eventString).join(); if (super.auditLog != null) { @@ -42,6 +44,17 @@ public void sendEvent(CommunityProfileOutboundEvent event) throws InvalidParamet eventString); } } + catch (CompletionException error) + { + if (error.getCause() instanceof ConnectorCheckedException) + { + throw (ConnectorCheckedException) error.getCause(); + } + else if (error.getCause() instanceof InvalidParameterException) + { + throw (InvalidParameterException) error.getCause(); + } + } catch (InvalidParameterException | ConnectorCheckedException error) { throw error; diff --git a/open-metadata-implementation/access-services/data-engine/data-engine-server/src/main/java/org/odpi/openmetadata/accessservices/dataengine/server/handlers/DataEngineCommonHandler.java b/open-metadata-implementation/access-services/data-engine/data-engine-server/src/main/java/org/odpi/openmetadata/accessservices/dataengine/server/handlers/DataEngineCommonHandler.java index 8e147e85f4d..3e093127f58 100644 --- a/open-metadata-implementation/access-services/data-engine/data-engine-server/src/main/java/org/odpi/openmetadata/accessservices/dataengine/server/handlers/DataEngineCommonHandler.java +++ b/open-metadata-implementation/access-services/data-engine/data-engine-server/src/main/java/org/odpi/openmetadata/accessservices/dataengine/server/handlers/DataEngineCommonHandler.java @@ -28,7 +28,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Date; import java.util.Collections; import java.util.Date; import java.util.HashSet; @@ -209,11 +208,10 @@ protected void upsertExternalRelationship(String userId, String firstGUID, Strin TypeDef relationshipTypeDef = repositoryHelper.getTypeDefByName(userId, relationshipTypeName); - genericHandler.linkElementToElement(userId, externalSourceGUID, externalSourceName, firstGUID, + genericHandler.uncheckedLinkElementToElement(userId, externalSourceGUID, externalSourceName, firstGUID, CommonMapper.GUID_PROPERTY_NAME, firstEntityTypeName, secondGUID, CommonMapper.GUID_PROPERTY_NAME, secondEntityTypeName, false, false, null, - relationshipTypeDef.getGUID(), relationshipTypeName, relationshipProperties, null, - null, getNow(), methodName); + relationshipTypeDef.getGUID(), relationshipTypeName, relationshipProperties, getNow(), methodName); } else { Relationship originalRelationship = relationship.get(); String relationshipGUID = originalRelationship.getGUID(); diff --git a/open-metadata-implementation/access-services/data-engine/data-engine-server/src/test/java/org/odpi/openmetadata/accessservices/dataengine/server/handlers/DataEngineCommonHandlerTest.java b/open-metadata-implementation/access-services/data-engine/data-engine-server/src/test/java/org/odpi/openmetadata/accessservices/dataengine/server/handlers/DataEngineCommonHandlerTest.java index 32228d49322..f146b8a7be8 100644 --- a/open-metadata-implementation/access-services/data-engine/data-engine-server/src/test/java/org/odpi/openmetadata/accessservices/dataengine/server/handlers/DataEngineCommonHandlerTest.java +++ b/open-metadata-implementation/access-services/data-engine/data-engine-server/src/test/java/org/odpi/openmetadata/accessservices/dataengine/server/handlers/DataEngineCommonHandlerTest.java @@ -135,10 +135,10 @@ void upsertExternalRelationship() throws InvalidParameterException, PropertyServ ENTITY_TYPE_NAME, EXTERNAL_SOURCE_DE_QUALIFIED_NAME, null); verify(invalidParameterHandler, times(1)).validateUserId(USER, methodName); - verify(genericHandler, times(1)).linkElementToElement(USER, EXTERNAL_SOURCE_DE_GUID, EXTERNAL_SOURCE_DE_QUALIFIED_NAME, FIRST_GUID, + verify(genericHandler, times(1)).uncheckedLinkElementToElement(USER, EXTERNAL_SOURCE_DE_GUID, EXTERNAL_SOURCE_DE_QUALIFIED_NAME, FIRST_GUID, CommonMapper.GUID_PROPERTY_NAME, ENTITY_TYPE_NAME, SECOND_GUID, CommonMapper.GUID_PROPERTY_NAME, ENTITY_TYPE_NAME, false, false, null, - RELATIONSHIP_TYPE_GUID, RELATIONSHIP_TYPE_NAME, null, null, null, null, methodName); + RELATIONSHIP_TYPE_GUID, RELATIONSHIP_TYPE_NAME, null, null, methodName); } @Test diff --git a/open-metadata-implementation/access-services/data-engine/data-engine-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/dataengine/connectors/intopic/DataEngineInTopicClientConnector.java b/open-metadata-implementation/access-services/data-engine/data-engine-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/dataengine/connectors/intopic/DataEngineInTopicClientConnector.java index 522555d3a1a..57375e01f5e 100644 --- a/open-metadata-implementation/access-services/data-engine/data-engine-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/dataengine/connectors/intopic/DataEngineInTopicClientConnector.java +++ b/open-metadata-implementation/access-services/data-engine/data-engine-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/dataengine/connectors/intopic/DataEngineInTopicClientConnector.java @@ -11,6 +11,7 @@ import org.odpi.openmetadata.frameworks.connectors.ffdc.InvalidParameterException; import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicSenderConnectorBase; +import java.util.concurrent.CompletionException; /** @@ -36,7 +37,7 @@ public void sendEvent(DataEngineEventHeader event) throws InvalidParameterExcept { String eventString = objectMapper.writeValueAsString(event); - super.sendEvent(eventString); + super.sendEvent(eventString).join(); if (super.auditLog != null) { @@ -46,6 +47,17 @@ public void sendEvent(DataEngineEventHeader event) throws InvalidParameterExcept } } + catch (CompletionException error) + { + if (error.getCause() instanceof ConnectorCheckedException) + { + throw (ConnectorCheckedException) error.getCause(); + } + else if (error.getCause() instanceof InvalidParameterException) + { + throw (InvalidParameterException) error.getCause(); + } + } catch (InvalidParameterException | ConnectorCheckedException error) { throw error; diff --git a/open-metadata-implementation/access-services/data-manager/data-manager-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/datamanager/connectors/outtopic/DataManagerOutTopicServerConnector.java b/open-metadata-implementation/access-services/data-manager/data-manager-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/datamanager/connectors/outtopic/DataManagerOutTopicServerConnector.java index 0e7ef66e2f0..b4402ee9bad 100644 --- a/open-metadata-implementation/access-services/data-manager/data-manager-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/datamanager/connectors/outtopic/DataManagerOutTopicServerConnector.java +++ b/open-metadata-implementation/access-services/data-manager/data-manager-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/datamanager/connectors/outtopic/DataManagerOutTopicServerConnector.java @@ -11,6 +11,8 @@ import org.odpi.openmetadata.frameworks.connectors.ffdc.InvalidParameterException; import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicSenderConnectorBase; +import java.util.concurrent.CompletionException; + /** * DataManagerOutTopicServerConnector is the java implementation of the @@ -33,7 +35,7 @@ public void sendEvent(DataManagerOutboundEvent event) throws InvalidParameterExc try { String eventString = objectMapper.writeValueAsString(event); - super.sendEvent(eventString); + super.sendEvent(eventString).join(); if (super.auditLog != null) { @@ -42,6 +44,17 @@ public void sendEvent(DataManagerOutboundEvent event) throws InvalidParameterExc eventString); } } + catch (CompletionException error) + { + if (error.getCause() instanceof ConnectorCheckedException) + { + throw (ConnectorCheckedException) error.getCause(); + } + else if (error.getCause() instanceof InvalidParameterException) + { + throw (InvalidParameterException) error.getCause(); + } + } catch (InvalidParameterException | ConnectorCheckedException error) { throw error; diff --git a/open-metadata-implementation/access-services/digital-architecture/digital-architecture-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/digitalarchitecture/connectors/outtopic/DigitalArchitectureOutTopicServerConnector.java b/open-metadata-implementation/access-services/digital-architecture/digital-architecture-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/digitalarchitecture/connectors/outtopic/DigitalArchitectureOutTopicServerConnector.java index b0fc84425d3..6167da6353b 100644 --- a/open-metadata-implementation/access-services/digital-architecture/digital-architecture-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/digitalarchitecture/connectors/outtopic/DigitalArchitectureOutTopicServerConnector.java +++ b/open-metadata-implementation/access-services/digital-architecture/digital-architecture-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/digitalarchitecture/connectors/outtopic/DigitalArchitectureOutTopicServerConnector.java @@ -12,6 +12,8 @@ import org.odpi.openmetadata.frameworks.connectors.ffdc.InvalidParameterException; import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicSenderConnectorBase; +import java.util.concurrent.CompletionException; + /** * DigitalArchitectureOutTopicServerConnector is the java implementation of the @@ -34,7 +36,7 @@ public void sendEvent(DigitalArchitectureOutTopicEvent event) throws InvalidPara try { String eventString = objectMapper.writeValueAsString(event); - super.sendEvent(eventString); + super.sendEvent(eventString).join(); if (super.auditLog != null) { @@ -43,6 +45,17 @@ public void sendEvent(DigitalArchitectureOutTopicEvent event) throws InvalidPara eventString); } } + catch (CompletionException error) + { + if (error.getCause() instanceof ConnectorCheckedException) + { + throw (ConnectorCheckedException) error.getCause(); + } + else if (error.getCause() instanceof InvalidParameterException) + { + throw (InvalidParameterException) error.getCause(); + } + } catch (InvalidParameterException | ConnectorCheckedException error) { throw error; diff --git a/open-metadata-implementation/access-services/governance-engine/governance-engine-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/governanceengine/connectors/outtopic/GovernanceEngineOutTopicServerConnector.java b/open-metadata-implementation/access-services/governance-engine/governance-engine-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/governanceengine/connectors/outtopic/GovernanceEngineOutTopicServerConnector.java index 4303f3a87d3..18994764e67 100644 --- a/open-metadata-implementation/access-services/governance-engine/governance-engine-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/governanceengine/connectors/outtopic/GovernanceEngineOutTopicServerConnector.java +++ b/open-metadata-implementation/access-services/governance-engine/governance-engine-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/governanceengine/connectors/outtopic/GovernanceEngineOutTopicServerConnector.java @@ -11,6 +11,8 @@ import org.odpi.openmetadata.frameworks.connectors.ffdc.InvalidParameterException; import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicSenderConnectorBase; +import java.util.concurrent.CompletionException; + /** * GovernanceEngineOutTopicServerConnector is the java implementation of the @@ -33,7 +35,7 @@ public void sendEvent(GovernanceEngineEvent event) throws InvalidParameterExcept try { String eventString = objectMapper.writeValueAsString(event); - super.sendEvent(eventString); + super.sendEvent(eventString).join(); if (super.auditLog != null) { @@ -42,6 +44,17 @@ public void sendEvent(GovernanceEngineEvent event) throws InvalidParameterExcept eventString); } } + catch (CompletionException error) + { + if (error.getCause() instanceof ConnectorCheckedException) + { + throw (ConnectorCheckedException) error.getCause(); + } + else if (error.getCause() instanceof InvalidParameterException) + { + throw (InvalidParameterException) error.getCause(); + } + } catch (InvalidParameterException | ConnectorCheckedException error) { throw error; diff --git a/open-metadata-implementation/access-services/it-infrastructure/it-infrastructure-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/itinfrastructure/connectors/outtopic/ITInfrastructureOutTopicServerConnector.java b/open-metadata-implementation/access-services/it-infrastructure/it-infrastructure-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/itinfrastructure/connectors/outtopic/ITInfrastructureOutTopicServerConnector.java index 8301ea9788e..9e954c7ba1a 100644 --- a/open-metadata-implementation/access-services/it-infrastructure/it-infrastructure-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/itinfrastructure/connectors/outtopic/ITInfrastructureOutTopicServerConnector.java +++ b/open-metadata-implementation/access-services/it-infrastructure/it-infrastructure-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/itinfrastructure/connectors/outtopic/ITInfrastructureOutTopicServerConnector.java @@ -11,6 +11,8 @@ import org.odpi.openmetadata.frameworks.connectors.ffdc.InvalidParameterException; import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicSenderConnectorBase; +import java.util.concurrent.CompletionException; + /** * ITInfrastructureOutTopicServerConnector is the java implementation of the @@ -33,7 +35,7 @@ public void sendEvent(ITInfrastructureOutTopicEvent event) throws InvalidParamet try { String eventString = objectMapper.writeValueAsString(event); - super.sendEvent(eventString); + super.sendEvent(eventString).join(); if (super.auditLog != null) { @@ -42,6 +44,17 @@ public void sendEvent(ITInfrastructureOutTopicEvent event) throws InvalidParamet eventString); } } + catch (CompletionException error) + { + if (error.getCause() instanceof ConnectorCheckedException) + { + throw (ConnectorCheckedException) error.getCause(); + } + else if (error.getCause() instanceof InvalidParameterException) + { + throw (InvalidParameterException) error.getCause(); + } + } catch (InvalidParameterException | ConnectorCheckedException error) { throw error; diff --git a/open-metadata-implementation/access-services/security-manager/security-manager-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/securitymanager/connectors/outtopic/SecurityManagerOutTopicServerConnector.java b/open-metadata-implementation/access-services/security-manager/security-manager-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/securitymanager/connectors/outtopic/SecurityManagerOutTopicServerConnector.java index 733c832bad8..3e2a14fe3fd 100644 --- a/open-metadata-implementation/access-services/security-manager/security-manager-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/securitymanager/connectors/outtopic/SecurityManagerOutTopicServerConnector.java +++ b/open-metadata-implementation/access-services/security-manager/security-manager-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/securitymanager/connectors/outtopic/SecurityManagerOutTopicServerConnector.java @@ -11,6 +11,8 @@ import org.odpi.openmetadata.frameworks.connectors.ffdc.InvalidParameterException; import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicSenderConnectorBase; +import java.util.concurrent.CompletionException; + /** * SecurityManagerOutTopicServerConnector is the java implementation of the @@ -33,7 +35,7 @@ public void sendEvent(SecurityManagerOutTopicEvent event) throws InvalidParamete try { String eventString = objectMapper.writeValueAsString(event); - super.sendEvent(eventString); + super.sendEvent(eventString).join(); if (super.auditLog != null) { @@ -42,6 +44,17 @@ public void sendEvent(SecurityManagerOutTopicEvent event) throws InvalidParamete eventString); } } + catch (CompletionException error) + { + if (error.getCause() instanceof ConnectorCheckedException) + { + throw (ConnectorCheckedException) error.getCause(); + } + else if (error.getCause() instanceof InvalidParameterException) + { + throw (InvalidParameterException) error.getCause(); + } + } catch (InvalidParameterException | ConnectorCheckedException error) { throw error; diff --git a/open-metadata-implementation/access-services/security-officer/security-officer-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/securityofficer/connectors/outtopic/SecurityOfficerOutTopicServerConnector.java b/open-metadata-implementation/access-services/security-officer/security-officer-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/securityofficer/connectors/outtopic/SecurityOfficerOutTopicServerConnector.java index af0894aba72..34fb1dcc991 100644 --- a/open-metadata-implementation/access-services/security-officer/security-officer-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/securityofficer/connectors/outtopic/SecurityOfficerOutTopicServerConnector.java +++ b/open-metadata-implementation/access-services/security-officer/security-officer-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/securityofficer/connectors/outtopic/SecurityOfficerOutTopicServerConnector.java @@ -11,6 +11,8 @@ import org.odpi.openmetadata.frameworks.connectors.ffdc.InvalidParameterException; import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicSenderConnectorBase; +import java.util.concurrent.CompletionException; + /** * SecurityOfficerOutTopicServerConnector is the java implementation of the @@ -33,7 +35,7 @@ public void sendEvent(SecurityOfficerEvent event) throws InvalidParameterExcepti try { String eventString = objectMapper.writeValueAsString(event); - super.sendEvent(eventString); + super.sendEvent(eventString).join(); if (super.auditLog != null) { @@ -42,6 +44,17 @@ public void sendEvent(SecurityOfficerEvent event) throws InvalidParameterExcepti eventString); } } + catch (CompletionException error) + { + if (error.getCause() instanceof ConnectorCheckedException) + { + throw (ConnectorCheckedException) error.getCause(); + } + else if (error.getCause() instanceof InvalidParameterException) + { + throw (InvalidParameterException) error.getCause(); + } + } catch (InvalidParameterException | ConnectorCheckedException error) { throw error; diff --git a/open-metadata-implementation/access-services/stewardship-action/stewardship-action-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/stewardshipaction/connectors/outtopic/StewardshipActionOutTopicServerConnector.java b/open-metadata-implementation/access-services/stewardship-action/stewardship-action-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/stewardshipaction/connectors/outtopic/StewardshipActionOutTopicServerConnector.java index 24445ccfb47..33ab76c8acb 100644 --- a/open-metadata-implementation/access-services/stewardship-action/stewardship-action-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/stewardshipaction/connectors/outtopic/StewardshipActionOutTopicServerConnector.java +++ b/open-metadata-implementation/access-services/stewardship-action/stewardship-action-topic-connectors/src/main/java/org/odpi/openmetadata/accessservices/stewardshipaction/connectors/outtopic/StewardshipActionOutTopicServerConnector.java @@ -11,6 +11,8 @@ import org.odpi.openmetadata.frameworks.connectors.ffdc.InvalidParameterException; import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicSenderConnectorBase; +import java.util.concurrent.CompletionException; + /** * StewardshipActionOutTopicServerConnector is the java implementation of the @@ -33,7 +35,7 @@ public void sendEvent(StewardshipActionOutTopicEvent event) throws InvalidParame try { String eventString = objectMapper.writeValueAsString(event); - super.sendEvent(eventString); + super.sendEvent(eventString).join(); if (super.auditLog != null) { @@ -42,6 +44,17 @@ public void sendEvent(StewardshipActionOutTopicEvent event) throws InvalidParame eventString); } } + catch (CompletionException error) + { + if (error.getCause() instanceof ConnectorCheckedException) + { + throw (ConnectorCheckedException) error.getCause(); + } + else if (error.getCause() instanceof InvalidParameterException) + { + throw (InvalidParameterException) error.getCause(); + } + } catch (InvalidParameterException | ConnectorCheckedException error) { throw error; diff --git a/open-metadata-implementation/repository-services/repository-services-apis/src/main/java/org/odpi/openmetadata/repositoryservices/connectors/omrstopic/OMRSTopic.java b/open-metadata-implementation/repository-services/repository-services-apis/src/main/java/org/odpi/openmetadata/repositoryservices/connectors/omrstopic/OMRSTopic.java index 70caa1d823f..9d8b803b5ab 100644 --- a/open-metadata-implementation/repository-services/repository-services-apis/src/main/java/org/odpi/openmetadata/repositoryservices/connectors/omrstopic/OMRSTopic.java +++ b/open-metadata-implementation/repository-services/repository-services-apis/src/main/java/org/odpi/openmetadata/repositoryservices/connectors/omrstopic/OMRSTopic.java @@ -8,6 +8,8 @@ import org.odpi.openmetadata.repositoryservices.events.OMRSRegistryEvent; import org.odpi.openmetadata.repositoryservices.events.OMRSTypeDefEvent; +import java.util.concurrent.CompletableFuture; + /** * OMRSTopic defines the interface to the messaging Topic for OMRS Events. * It implemented by the OMRSTopicConnector. @@ -61,16 +63,17 @@ void registerListener(OMRSTopicRepositoryEventListener newListener, * @param event OMRSRegistryEvent object containing the event properties. * @throws ConnectorCheckedException the connector is not able to communicate with the event bus */ - void sendRegistryEvent(OMRSRegistryEvent event) throws ConnectorCheckedException; + CompletableFuture sendRegistryEvent(OMRSRegistryEvent event) throws ConnectorCheckedException; /** * Sends the supplied event to the topic. * * @param event OMRSTypeDefEvent object containing the event properties. + * @return a future that has the result (boolean) of sendEvent * @throws ConnectorCheckedException the connector is not able to communicate with the event bus */ - void sendTypeDefEvent(OMRSTypeDefEvent event) throws ConnectorCheckedException; + CompletableFuture sendTypeDefEvent(OMRSTypeDefEvent event) throws ConnectorCheckedException; /** diff --git a/open-metadata-implementation/repository-services/repository-services-apis/src/main/java/org/odpi/openmetadata/repositoryservices/connectors/omrstopic/OMRSTopicConnector.java b/open-metadata-implementation/repository-services/repository-services-apis/src/main/java/org/odpi/openmetadata/repositoryservices/connectors/omrstopic/OMRSTopicConnector.java index 997f6e94f28..8c2e6876fcb 100644 --- a/open-metadata-implementation/repository-services/repository-services-apis/src/main/java/org/odpi/openmetadata/repositoryservices/connectors/omrstopic/OMRSTopicConnector.java +++ b/open-metadata-implementation/repository-services/repository-services-apis/src/main/java/org/odpi/openmetadata/repositoryservices/connectors/omrstopic/OMRSTopicConnector.java @@ -10,7 +10,6 @@ import org.odpi.openmetadata.frameworks.connectors.ConnectorBase; import org.odpi.openmetadata.frameworks.connectors.VirtualConnectorExtension; import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException; -import org.odpi.openmetadata.repositoryservices.ffdc.OMRSAuditCode; import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditingComponent; import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicConnector; import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicListener; @@ -20,6 +19,7 @@ import org.odpi.openmetadata.repositoryservices.events.OMRSTypeDefEvent; import org.odpi.openmetadata.repositoryservices.events.beans.OMRSEventBean; import org.odpi.openmetadata.repositoryservices.events.beans.v1.OMRSEventV1; +import org.odpi.openmetadata.repositoryservices.ffdc.OMRSAuditCode; import org.odpi.openmetadata.repositoryservices.ffdc.OMRSErrorCode; import org.odpi.openmetadata.repositoryservices.ffdc.exception.OMRSLogicErrorException; import org.slf4j.Logger; @@ -27,6 +27,8 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; /** @@ -335,44 +337,48 @@ private void handleUnsupportedEventVersion(String methodName) throws ConnectorCh /** * Send the registry event to the OMRS Topic connector and manage errors * - * @param registryEvent properties of the event to send + * @param registryEvent properties of the event to send + * @return a future that contains the result of send event * @throws ConnectorCheckedException the connector is not able to communicate with the event bus */ @Override - public void sendRegistryEvent(OMRSRegistryEvent registryEvent) throws ConnectorCheckedException + public CompletableFuture sendRegistryEvent(OMRSRegistryEvent registryEvent) throws ConnectorCheckedException { final String methodName = "sendRegistryEvent"; if (eventProtocolVersion == OMRSEventProtocolVersion.V1) { - this.sendEvent(registryEvent.getOMRSEventV1(), true); + return this.sendEvent(registryEvent.getOMRSEventV1(), true); } else { this.handleUnsupportedEventVersion(methodName); } + return CompletableFuture.completedFuture(false); } /** * Send the TypeDef event to the OMRS Topic connector (providing TypeDef Events are enabled). * - * @param typeDefEvent properties of the event to send + * @param typeDefEvent properties of the event to send + * @return a future that contains the result of sendEvent * @throws ConnectorCheckedException the connector is not able to communicate with the event bus */ @Override - public void sendTypeDefEvent(OMRSTypeDefEvent typeDefEvent) throws ConnectorCheckedException + public CompletableFuture sendTypeDefEvent(OMRSTypeDefEvent typeDefEvent) throws ConnectorCheckedException { final String methodName = "sendTypeDefEvent"; if (eventProtocolVersion == OMRSEventProtocolVersion.V1) { - this.sendEvent(typeDefEvent.getOMRSEventV1(), false); + return this.sendEvent(typeDefEvent.getOMRSEventV1(), false); } else { this.handleUnsupportedEventVersion(methodName); } + return CompletableFuture.completedFuture(false); } @@ -402,65 +408,64 @@ public void sendInstanceEvent(OMRSInstanceEvent instanceEvent) throws ConnectorC /** * Sends the supplied event outbound to the OMRSTopicListeners using the event bus connectors. * - * @param event OMRSEvent object containing the event properties + * @param event OMRSEvent object containing the event properties * @param logEvent should an audit log message be created? - * @throws ConnectorCheckedException the connector is not able to communicate with the event bus */ - private void sendEvent(OMRSEventV1 event, - boolean logEvent) throws ConnectorCheckedException + private CompletableFuture sendEvent(OMRSEventV1 event, + boolean logEvent) { - final String methodName = "send"; - + final String methodName = "sendEvent"; if (event != null) { - try - { - ObjectMapper objectMapper = new ObjectMapper(); + return CompletableFuture.supplyAsync(() -> sendEventTask(event, logEvent)); + } + else + { + log.debug("Unable to send null events"); - String eventString = objectMapper.writeValueAsString(event); + throw new OMRSLogicErrorException(OMRSErrorCode.OMRS_TOPIC_SEND_NULL_EVENT.getMessageDefinition(connectionName), + this.getClass().getName(), + methodName); + } + } - if ((auditLog != null) && (logEvent)) - { - auditLog.logMessage(methodName, - OMRSAuditCode.OUTBOUND_TOPIC_EVENT.getMessageDefinition(event.getEventCategory().getName(), - topicName), - eventString); - } + private boolean sendEventTask(OMRSEventV1 event, + boolean logEvent) + { + final String methodName = "sendEventTask"; + try + { + ObjectMapper objectMapper = new ObjectMapper(); - for (OpenMetadataTopicConnector eventBusConnector : eventBusConnectors) - { - if (eventBusConnector != null) - { - eventBusConnector.sendEvent(eventString); - } - } - } - catch (ConnectorCheckedException exc) - { - log.debug("Unable to send event: " + exc.getMessage()); + String eventString = objectMapper.writeValueAsString(event); - throw exc; + if ((auditLog != null) && logEvent) + { + auditLog.logMessage(methodName, + OMRSAuditCode.OUTBOUND_TOPIC_EVENT.getMessageDefinition(event.getEventCategory().getName(), + topicName), + eventString); } - catch (Exception exc) + + for (OpenMetadataTopicConnector eventBusConnector : eventBusConnectors) { - log.debug("Unexpected error sending event: " + exc.getMessage()); - - throw new ConnectorCheckedException(OMRSErrorCode.OMRS_TOPIC_SEND_EVENT_FAILED.getMessageDefinition(connectionName, - event.toString(), - exc.getMessage()), - this.getClass().getName(), - methodName, - exc); + if (eventBusConnector != null) + { + eventBusConnector.sendEvent(eventString); + } } } - else + catch (ConnectorCheckedException exc) { - log.debug("Unable to send null events"); - - throw new OMRSLogicErrorException(OMRSErrorCode.OMRS_TOPIC_SEND_NULL_EVENT.getMessageDefinition(connectionName), - this.getClass().getName(), - methodName); + log.debug("Unable to send event: " + exc.getMessage()); + throw new CompletionException(exc); + } + catch (Exception exc) + { + log.debug("Unexpected error sending event: " + exc.getMessage()); + throw new CompletionException(exc); } + return true; } diff --git a/open-metadata-implementation/repository-services/repository-services-apis/src/main/java/org/odpi/openmetadata/repositoryservices/connectors/openmetadatatopic/OpenMetadataTopicSenderConnectorBase.java b/open-metadata-implementation/repository-services/repository-services-apis/src/main/java/org/odpi/openmetadata/repositoryservices/connectors/openmetadatatopic/OpenMetadataTopicSenderConnectorBase.java index 2ad62d9c4a2..88eb2a1a9cf 100644 --- a/open-metadata-implementation/repository-services/repository-services-apis/src/main/java/org/odpi/openmetadata/repositoryservices/connectors/openmetadatatopic/OpenMetadataTopicSenderConnectorBase.java +++ b/open-metadata-implementation/repository-services/repository-services-apis/src/main/java/org/odpi/openmetadata/repositoryservices/connectors/openmetadatatopic/OpenMetadataTopicSenderConnectorBase.java @@ -7,6 +7,9 @@ import org.odpi.openmetadata.frameworks.connectors.ffdc.InvalidParameterException; import org.odpi.openmetadata.repositoryservices.ffdc.OMRSErrorCode; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + /** * OpenMetadataTopicConnectorBase is a base class to topic connectors that only send * events on the embedded event bus connector @@ -17,10 +20,11 @@ public class OpenMetadataTopicSenderConnectorBase extends OpenMetadataTopicConsu * Send the request to the embedded event bus connector(s). * * @param event event as a string + * @return a completable future with the sendEvent result * @throws InvalidParameterException the event is null * @throws ConnectorCheckedException there is a problem with the embedded event bus connector(s). */ - protected void sendEvent(String event) throws InvalidParameterException, + protected CompletableFuture sendEvent(String event) throws InvalidParameterException, ConnectorCheckedException { final String methodName = "sendEvent"; @@ -40,9 +44,14 @@ protected void sendEvent(String event) throws InvalidParameterException, /* * Each of the event bus connectors need to be passed the new event. */ - for (OpenMetadataTopicConnector eventBusConnector : eventBusConnectors) - { - eventBusConnector.sendEvent(event); - } + return CompletableFuture.runAsync(() -> { + for (OpenMetadataTopicConnector eventBusConnector : eventBusConnectors) { + try { + eventBusConnector.sendEvent(event); + } catch (ConnectorCheckedException e) { + throw new CompletionException(e); + } + } + }); } } diff --git a/open-metadata-implementation/repository-services/repository-services-implementation/src/main/java/org/odpi/openmetadata/repositoryservices/eventmanagement/OMRSRegistryEventPublisher.java b/open-metadata-implementation/repository-services/repository-services-implementation/src/main/java/org/odpi/openmetadata/repositoryservices/eventmanagement/OMRSRegistryEventPublisher.java index df4aac3b923..8a69265d52c 100644 --- a/open-metadata-implementation/repository-services/repository-services-implementation/src/main/java/org/odpi/openmetadata/repositoryservices/eventmanagement/OMRSRegistryEventPublisher.java +++ b/open-metadata-implementation/repository-services/repository-services-implementation/src/main/java/org/odpi/openmetadata/repositoryservices/eventmanagement/OMRSRegistryEventPublisher.java @@ -4,16 +4,23 @@ import org.odpi.openmetadata.frameworks.auditlog.AuditLog; import org.odpi.openmetadata.frameworks.connectors.properties.beans.Connection; -import org.odpi.openmetadata.repositoryservices.ffdc.OMRSAuditCode; import org.odpi.openmetadata.repositoryservices.connectors.omrstopic.OMRSTopicConnector; -import org.odpi.openmetadata.repositoryservices.events.*; +import org.odpi.openmetadata.repositoryservices.events.OMRSEventOriginator; +import org.odpi.openmetadata.repositoryservices.events.OMRSRegistryEvent; +import org.odpi.openmetadata.repositoryservices.events.OMRSRegistryEventErrorCode; +import org.odpi.openmetadata.repositoryservices.events.OMRSRegistryEventProcessor; +import org.odpi.openmetadata.repositoryservices.events.OMRSRegistryEventType; +import org.odpi.openmetadata.repositoryservices.ffdc.OMRSAuditCode; import org.odpi.openmetadata.repositoryservices.ffdc.OMRSErrorCode; import org.odpi.openmetadata.repositoryservices.ffdc.exception.OMRSLogicErrorException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; /** @@ -87,13 +94,23 @@ private boolean sendRegistryEvent(OMRSRegistryEvent registryEvent) try { + List> results = new ArrayList<>(); for (OMRSTopicConnector omrsTopicConnector : omrsTopicConnectors) { log.debug("topicConnector: " + omrsTopicConnector); - omrsTopicConnector.sendRegistryEvent(registryEvent); + results.add(omrsTopicConnector.sendRegistryEvent(registryEvent)); } + successFlag = results.stream().map(CompletableFuture::join).reduce(true, (r1, r2) -> r1 && r2); + } + // exceptions from sendEvent are wrapped in CompletionException + catch (CompletionException exception) + { + auditLog.logException(actionDescription, + OMRSAuditCode.SEND_REGISTRY_EVENT_ERROR.getMessageDefinition(publisherName), + "registryEvent : " + registryEvent, + exception.getCause()); - successFlag = true; + log.debug("Exception: " + exception.getCause() + "; Registry Event: " + registryEvent); } catch (Exception error) { diff --git a/open-metadata-implementation/repository-services/repository-services-implementation/src/main/java/org/odpi/openmetadata/repositoryservices/eventmanagement/OMRSRepositoryEventPublisher.java b/open-metadata-implementation/repository-services/repository-services-implementation/src/main/java/org/odpi/openmetadata/repositoryservices/eventmanagement/OMRSRepositoryEventPublisher.java index 133bc7dd21b..c0b2da03328 100644 --- a/open-metadata-implementation/repository-services/repository-services-implementation/src/main/java/org/odpi/openmetadata/repositoryservices/eventmanagement/OMRSRepositoryEventPublisher.java +++ b/open-metadata-implementation/repository-services/repository-services-implementation/src/main/java/org/odpi/openmetadata/repositoryservices/eventmanagement/OMRSRepositoryEventPublisher.java @@ -3,17 +3,20 @@ package org.odpi.openmetadata.repositoryservices.eventmanagement; import org.odpi.openmetadata.frameworks.auditlog.AuditLog; +import org.odpi.openmetadata.repositoryservices.connectors.omrstopic.OMRSTopicConnector; +import org.odpi.openmetadata.repositoryservices.events.OMRSInstanceEvent; +import org.odpi.openmetadata.repositoryservices.events.OMRSMetadataDefaultEventsSecurity; +import org.odpi.openmetadata.repositoryservices.events.OMRSTypeDefEvent; +import org.odpi.openmetadata.repositoryservices.events.OpenMetadataEventsSecurity; import org.odpi.openmetadata.repositoryservices.ffdc.OMRSAuditCode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.odpi.openmetadata.repositoryservices.events.*; import org.odpi.openmetadata.repositoryservices.ffdc.OMRSErrorCode; import org.odpi.openmetadata.repositoryservices.ffdc.exception.OMRSLogicErrorException; - -import org.odpi.openmetadata.repositoryservices.connectors.omrstopic.OMRSTopicConnector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletionException; /** @@ -159,14 +162,24 @@ public void sendTypeDefEvent(String sourceName, for (OMRSTopicConnector omrsTopicConnector : typesTopicConnectors) { log.debug("topicConnector: " + omrsTopicConnector); - omrsTopicConnector.sendTypeDefEvent(typeDefEvent); + omrsTopicConnector.sendTypeDefEvent(typeDefEvent).get(); } } + // exceptions from sendEvent are wrapped in CompletionException + catch (CompletionException exception) + { + auditLog.logException(actionDescription, + OMRSAuditCode.SEND_TYPEDEF_EVENT_ERROR.getMessageDefinition(sourceName), + "typeDefEvent {" + typeDefEvent + "}", + exception.getCause()); + log.debug("Completion exception with cause ", exception.getCause()); + } + catch (Exception error) { auditLog.logException(actionDescription, OMRSAuditCode.SEND_TYPEDEF_EVENT_ERROR.getMessageDefinition(sourceName), - "typeDefEvent {" + typeDefEvent.toString() + "}", + "typeDefEvent {" + typeDefEvent + "}", error); log.debug("Exception: ", error); @@ -204,11 +217,20 @@ public void sendInstanceEvent(String sourceName, } } } + // exceptions from sendEvent are wrapped in CompletionException + catch (CompletionException exception) + { + auditLog.logException(actionDescription, + OMRSAuditCode.SEND_INSTANCE_EVENT_ERROR.getMessageDefinition(sourceName), + "instanceEvent {" + instanceEvent + "}", + exception.getCause()); + log.debug("Completion exception with cause ", exception.getCause()); + } catch (Exception error) { auditLog.logException(actionDescription, OMRSAuditCode.SEND_INSTANCE_EVENT_ERROR.getMessageDefinition(sourceName), - "instanceEvent {" + instanceEvent.toString() + "}", + "instanceEvent {" + instanceEvent + "}", error); log.debug("Exception: ", error);