ATLAS-2122 OMAG Server Application and OMRS APIs
[atlas.git] / omrs / src / main / java / org / apache / atlas / omrs / topicconnectors / OMRSTopicConnector.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 * <p/>
10 * http://www.apache.org/licenses/LICENSE-2.0
11 * <p/>
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.atlas.omrs.topicconnectors;
19
20 import org.apache.atlas.ocf.ConnectorBase;
21 import org.apache.atlas.ocf.ffdc.ConnectorCheckedException;
22 import org.apache.atlas.omrs.auditlog.OMRSAuditCode;
23 import org.apache.atlas.omrs.auditlog.OMRSAuditLog;
24 import org.apache.atlas.omrs.auditlog.OMRSAuditingComponent;
25 import org.apache.atlas.omrs.eventmanagement.events.v1.OMRSEventV1;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import java.util.ArrayList;
30
31
32 /**
33 * OMRSTopicConnector provides the support for the registration of listeners and the distribution of
34 * incoming events to the registered listeners. An implementation of the OMRSTopicConnector needs to
35 * extend this class to include the interaction with the eventing/messaging layer.
36 * <ul>
37 * <li>
38 * For inbound events it should call the protected distributeEvents() method.
39 * </li>
40 * <li>
41 * For outbound events, callers will invoke the sendEvent() method.
42 * </li>
43 * <li>
44 * When the server no longer needs the topic, it will call close().
45 * </li>
46 * </ul>
47 */
48 public abstract class OMRSTopicConnector extends ConnectorBase implements OMRSTopic
49 {
50 ArrayList<OMRSTopicListener> topicListeners = new ArrayList<>();
51
52 private static final Logger log = LoggerFactory.getLogger(OMRSTopicConnector.class);
53 private static final OMRSAuditLog auditLog = new OMRSAuditLog(OMRSAuditingComponent.OMRS_TOPIC_CONNECTOR);
54
55 /**
56 * Simple constructor
57 */
58 public OMRSTopicConnector()
59 {
60 super();
61 }
62
63
64 /**
65 * Pass an event that has been received on the topic to each of the registered listeners.
66 *
67 * @param event - OMRSEvent to distribute
68 */
69 protected void distributeEvent(OMRSEventV1 event)
70 {
71 for (OMRSTopicListener topicListener : topicListeners)
72 {
73 try
74 {
75 topicListener.processEvent(event);
76 }
77 catch (Throwable error)
78 {
79 final String actionDescription = "Initialize Repository Operational Services";
80
81 OMRSAuditCode auditCode = OMRSAuditCode.EVENT_PROCESSING_ERROR;
82 auditLog.logRecord(actionDescription,
83 auditCode.getLogMessageId(),
84 auditCode.getSeverity(),
85 auditCode.getFormattedLogMessage(event.toString(), error.toString()),
86 null,
87 auditCode.getSystemAction(),
88 auditCode.getUserAction());
89 }
90 }
91 }
92
93
94 /**
95 * Register a listener object. This object will be supplied with all of the events received on the topic.
96 *
97 * @param topicListener - object implementing the OMRSTopicListener interface
98 */
99 public void registerListener(OMRSTopicListener topicListener)
100 {
101 if (topicListener != null)
102 {
103 topicListeners.add(topicListener);
104 }
105 }
106
107
108 /**
109 * Indicates that the connector is completely configured and can begin processing.
110 *
111 * @throws ConnectorCheckedException - there is a problem within the connector.
112 */
113 public void start() throws ConnectorCheckedException
114 {
115 super.start();
116 }
117
118
119 /**
120 * Free up any resources held since the connector is no longer needed.
121 *
122 * @throws ConnectorCheckedException - there is a problem within the connector.
123 */
124 public void disconnect() throws ConnectorCheckedException
125 {
126 super.disconnect();
127 }
128 }