MINIFI-34 Establishing CMake build system to provide build functionality equivalent...
[nifi-minifi-cpp.git] / libminifi / include / Connection.h
1 /**
2 * @file Connection.h
3 * Connection class declaration
4 *
5 * Licensed to the Apache Software Foundation (ASF) under one or more
6 * contributor license agreements. See the NOTICE file distributed with
7 * this work for additional information regarding copyright ownership.
8 * The ASF licenses this file to You under the Apache License, Version 2.0
9 * (the "License"); you may not use this file except in compliance with
10 * the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20 #ifndef __CONNECTION_H__
21 #define __CONNECTION_H__
22
23 #include <uuid/uuid.h>
24 #include <vector>
25 #include <queue>
26 #include <map>
27 #include <mutex>
28 #include <atomic>
29 #include <algorithm>
30
31 #include "FlowFileRecord.h"
32 #include "Relationship.h"
33 #include "Logger.h"
34
35 //! Forwarder declaration
36 class Processor;
37
38 //! Connection Class
39 class Connection
40 {
41 public:
42 //! Constructor
43 /*!
44 * Create a new processor
45 */
46 Connection(std::string name, uuid_t uuid = NULL, uuid_t srcUUID = NULL, uuid_t destUUID = NULL);
47 //! Destructor
48 virtual ~Connection() {}
49 //! Set Connection Name
50 void setName(std::string name) {
51 _name = name;
52 }
53 //! Get Process Name
54 std::string getName(void) {
55 return (_name);
56 }
57 //! Set UUID
58 void setUUID(uuid_t uuid) {
59 uuid_copy(_uuid, uuid);
60 }
61 //! Set Source Processor UUID
62 void setSourceProcessorUUID(uuid_t uuid) {
63 uuid_copy(_srcUUID, uuid);
64 }
65 //! Set Destination Processor UUID
66 void setDestinationProcessorUUID(uuid_t uuid) {
67 uuid_copy(_destUUID, uuid);
68 }
69 //! Get Source Processor UUID
70 void getSourceProcessorUUID(uuid_t uuid) {
71 uuid_copy(uuid, _srcUUID);
72 }
73 //! Get Destination Processor UUID
74 void getDestinationProcessorUUID(uuid_t uuid) {
75 uuid_copy(uuid, _destUUID);
76 }
77 //! Get UUID
78 bool getUUID(uuid_t uuid) {
79 if (uuid)
80 {
81 uuid_copy(uuid, _uuid);
82 return true;
83 }
84 else
85 return false;
86 }
87 //! Set Connection Source Processor
88 void setSourceProcessor(Processor *source) {
89 _srcProcessor = source;
90 }
91 // ! Get Connection Source Processor
92 Processor *getSourceProcessor() {
93 return _srcProcessor;
94 }
95 //! Set Connection Destination Processor
96 void setDestinationProcessor(Processor *dest) {
97 _destProcessor = dest;
98 }
99 // ! Get Connection Destination Processor
100 Processor *getDestinationProcessor() {
101 return _destProcessor;
102 }
103 //! Set Connection relationship
104 void setRelationship(Relationship relationship) {
105 _relationship = relationship;
106 }
107 // ! Get Connection relationship
108 Relationship getRelationship() {
109 return _relationship;
110 }
111 //! Set Max Queue Size
112 void setMaxQueueSize(uint64_t size)
113 {
114 _maxQueueSize = size;
115 }
116 //! Get Max Queue Size
117 uint64_t getMaxQueueSize()
118 {
119 return _maxQueueSize;
120 }
121 //! Set Max Queue Data Size
122 void setMaxQueueDataSize(uint64_t size)
123 {
124 _maxQueueDataSize = size;
125 }
126 //! Get Max Queue Data Size
127 uint64_t getMaxQueueDataSize()
128 {
129 return _maxQueueDataSize;
130 }
131 //! Set Flow expiration duration in millisecond
132 void setFlowExpirationDuration(uint64_t duration)
133 {
134 _expiredDuration = duration;
135 }
136 //! Get Flow expiration duration in millisecond
137 uint64_t getFlowExpirationDuration()
138 {
139 return _expiredDuration;
140 }
141 //! Check whether the queue is empty
142 bool isEmpty();
143 //! Check whether the queue is full to apply back pressure
144 bool isFull();
145 //! Get queue size
146 uint64_t getQueueSize() {
147 std::lock_guard<std::mutex> lock(_mtx);
148 return _queue.size();
149 }
150 //! Get queue data size
151 uint64_t getQueueDataSize()
152 {
153 return _maxQueueDataSize;
154 }
155 //! Put the flow file into queue
156 void put(FlowFileRecord *flow);
157 //! Poll the flow file from queue, the expired flow file record also being returned
158 FlowFileRecord *poll(std::set<FlowFileRecord *> &expiredFlowRecords);
159 //! Drain the flow records
160 void drain();
161
162 protected:
163 //! A global unique identifier
164 uuid_t _uuid;
165 //! Source Processor UUID
166 uuid_t _srcUUID;
167 //! Destination Processor UUID
168 uuid_t _destUUID;
169 //! Connection Name
170 std::string _name;
171 //! Relationship for this connection
172 Relationship _relationship;
173 //! Source Processor (ProcessNode/Port)
174 Processor *_srcProcessor;
175 //! Destination Processor (ProcessNode/Port)
176 Processor *_destProcessor;
177 //! Max queue size to apply back pressure
178 std::atomic<uint64_t> _maxQueueSize;
179 //! Max queue data size to apply back pressure
180 std::atomic<uint64_t> _maxQueueDataSize;
181 //! Flow File Expiration Duration in= MilliSeconds
182 std::atomic<uint64_t> _expiredDuration;
183
184
185 private:
186 //! Mutex for protection
187 std::mutex _mtx;
188 //! Queued data size
189 std::atomic<uint64_t> _queuedDataSize;
190 //! Queue for the Flow File
191 std::queue<FlowFileRecord *> _queue;
192 //! Logger
193 Logger *_logger;
194 // Prevent default copy constructor and assignment operation
195 // Only support pass by reference or pointer
196 Connection(const Connection &parent);
197 Connection &operator=(const Connection &parent);
198
199 };
200
201 #endif