MINIFI-34 Establishing CMake build system to provide build functionality equivalent...
[nifi-minifi-cpp.git] / libminifi / include / FlowControlProtocol.h
1 /**
2 * @file FlowControlProtocol.h
3 * FlowControlProtocol class declaration
4 *
5 * Licensed to the Apache Software Foundation (ASF) under one or more
6 * contributor license agreements. See the NOTICE file distributed with
7 * this work for additional information regarding copyright ownership.
8 * The ASF licenses this file to You under the Apache License, Version 2.0
9 * (the "License"); you may not use this file except in compliance with
10 * the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20 #ifndef __FLOW_CONTROL_PROTOCOL_H__
21 #define __FLOW_CONTROL_PROTOCOL_H__
22
23 #include <stdio.h>
24 #include <unistd.h>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29 #include <fcntl.h>
30 #include <netdb.h>
31 #include <string>
32 #include <errno.h>
33 #include <chrono>
34 #include <thread>
35 #include "Logger.h"
36 #include "Configure.h"
37 #include "Property.h"
38
39 //! Forwarder declaration
40 class FlowController;
41
42 #define DEFAULT_NIFI_SERVER_PORT 9000
43 #define DEFAULT_REPORT_INTERVAL 1000 // 1 sec
44 #define MAX_READ_TIMEOUT 30000 // 30 seconds
45
46 //! FlowControl Protocol Msg Type
47 typedef enum {
48 REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow xml version
49 REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.xml from server ask device to apply and also device report interval
50 REPORT_REQ, // Period Device Report from device to server which contain device serial number, current running flow xml name/version and other period report info
51 REPORT_RESP, // Report Respond from server to device, may ask device to update flow xml or processor property
52 MAX_FLOW_CONTROL_MSG_TYPE
53 } FlowControlMsgType;
54
55 //! FlowControl Protocol Msg Type String
56 static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] =
57 {
58 "REGISTER_REQ",
59 "REGISTER_RESP",
60 "REPORT_REQ",
61 "REPORT_RESP"
62 };
63
64 //! Flow Control Msg Type to String
65 inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type)
66 {
67 if (type < MAX_FLOW_CONTROL_MSG_TYPE)
68 return FlowControlMsgTypeStr[type];
69 else
70 return NULL;
71 }
72
73 //! FlowControll Protocol Msg ID (Some Messages are fix length, Some are variable length (TLV)
74 typedef enum {
75 //Fix length 8 bytes: client to server in register request, required field
76 FLOW_SERIAL_NUMBER,
77 // Flow XML name TLV: client to server in register request and report request, required field
78 FLOW_XML_NAME,
79 // Flow XML content, TLV: server to client in register respond, option field in case server want to ask client to load xml from server
80 FLOW_XML_CONTENT,
81 // Fix length, 4 bytes Report interval in msec: server to client in register respond, option field
82 REPORT_INTERVAL,
83 // Processor Name TLV: server to client in report respond, option field in case server want to ask client to update processor property
84 PROCESSOR_NAME,
85 // Processor Property Name TLV: server to client in report respond, option field in case server want to ask client to update processor property
86 PROPERTY_NAME,
87 // Processor Property Value TLV: server to client in report respond, option field in case server want to ask client to update processor property
88 PROPERTY_VALUE,
89 // Report Blob TLV: client to server in report request, option field in case client want to pickyback the report blob in report request to server
90 REPORT_BLOB,
91 MAX_FLOW_MSG_ID
92 } FlowControlMsgID;
93
94 //! FlowControl Protocol Msg ID String
95 static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] =
96 {
97 "FLOW_SERIAL_NUMBER",
98 "FLOW_XML_NAME",
99 "FLOW_XML_CONTENT",
100 "REPORT_INTERVAL",
101 "PROCESSOR_NAME"
102 "PROPERTY_NAME",
103 "PROPERTY_VALUE",
104 "REPORT_BLOB"
105 };
106
107 #define TYPE_HDR_LEN 4 // Fix Hdr Type
108 #define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes
109
110 //! FlowControl Protocol Msg Len
111 inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen)
112 {
113 if (id == FLOW_SERIAL_NUMBER)
114 return (TYPE_HDR_LEN + 8);
115 else if (id == REPORT_INTERVAL)
116 return (TYPE_HDR_LEN + 4);
117 else if (id < MAX_FLOW_MSG_ID)
118 return (TLV_HDR_LEN + payLoadLen);
119 else
120 return -1;
121 }
122
123 //! Flow Control Msg Id to String
124 inline const char *FlowControlMsgIdToStr(FlowControlMsgID id)
125 {
126 if (id < MAX_FLOW_MSG_ID)
127 return FlowControlMsgIDStr[id];
128 else
129 return NULL;
130 }
131
132 //! Flow Control Respond status code
133 typedef enum {
134 RESP_SUCCESS,
135 RESP_TRIGGER_REGISTER, // Server respond to client report to re trigger register
136 RESP_START_FLOW_CONTROLLER, // Server respond to client to start flow controller
137 RESP_STOP_FLOW_CONTROLLER, // Server respond to client to stop flow controller
138 RESP_FAILURE,
139 MAX_RESP_CODE
140 } FlowControlRespCode;
141
142 //! FlowControl Resp Code str
143 static const char *FlowControlRespCodeStr[MAX_RESP_CODE] =
144 {
145 "RESP_SUCCESS",
146 "RESP_TRIGGER_REGISTER",
147 "RESP_START_FLOW_CONTROLLER",
148 "RESP_STOP_FLOW_CONTROLLER",
149 "RESP_FAILURE"
150 };
151
152 //! Flow Control Resp Code to String
153 inline const char *FlowControlRespCodeToStr(FlowControlRespCode code)
154 {
155 if (code < MAX_RESP_CODE)
156 return FlowControlRespCodeStr[code];
157 else
158 return NULL;
159 }
160
161 //! Common FlowControlProtocol Header
162 typedef struct {
163 uint32_t msgType; //! Msg Type
164 uint32_t seqNumber; //! Seq Number to match Req with Resp
165 uint32_t status; //! Resp Code, see FlowControlRespCode
166 uint32_t payloadLen; //! Msg Payload length
167 } FlowControlProtocolHeader;
168
169 //! FlowControlProtocol Class
170 class FlowControlProtocol
171 {
172 public:
173 //! Constructor
174 /*!
175 * Create a new control protocol
176 */
177 FlowControlProtocol(FlowController *controller) {
178 _controller = controller;
179 _logger = Logger::getLogger();
180 _configure = Configure::getConfigure();
181 _socket = 0;
182 _serverName = "localhost";
183 _serverPort = DEFAULT_NIFI_SERVER_PORT;
184 _registered = false;
185 _seqNumber = 0;
186 _reportBlob = NULL;
187 _reportBlobLen = 0;
188 _reportInterval = DEFAULT_REPORT_INTERVAL;
189 _running = false;
190
191 std::string value;
192
193 if (_configure->get(Configure::nifi_server_name, value))
194 {
195 _serverName = value;
196 _logger->log_info("NiFi Server Name %s", _serverName.c_str());
197 }
198 if (_configure->get(Configure::nifi_server_port, value) && Property::StringToInt(value, _serverPort))
199 {
200 _logger->log_info("NiFi Server Port: [%d]", _serverPort);
201 }
202 if (_configure->get(Configure::nifi_server_report_interval, value))
203 {
204 TimeUnit unit;
205 if (Property::StringToTime(value, _reportInterval, unit) &&
206 Property::ConvertTimeUnitToMS(_reportInterval, unit, _reportInterval))
207 {
208 _logger->log_info("NiFi server report interval: [%d] ms", _reportInterval);
209 }
210 }
211 }
212 //! Destructor
213 virtual ~FlowControlProtocol()
214 {
215 stop();
216 if (_socket)
217 close(_socket);
218 if (_reportBlob)
219 delete [] _reportBlob;
220 if (this->_thread)
221 delete this->_thread;
222 }
223
224 public:
225
226 //! SendRegisterRequest and Process Register Respond, return 0 for success
227 int sendRegisterReq();
228 //! SendReportReq and Process Report Respond, return 0 for success
229 int sendReportReq();
230 //! Start the flow control protocol
231 void start();
232 //! Stop the flow control protocol
233 void stop();
234 //! Set Report BLOB for periodically report
235 void setReportBlob(char *blob, int len)
236 {
237 std::lock_guard<std::mutex> lock(_mtx);
238 if (_reportBlob && _reportBlobLen >= len)
239 {
240 memcpy(_reportBlob, blob, len);
241 _reportBlobLen = len;
242 }
243 else
244 {
245 if (_reportBlob)
246 delete[] _reportBlob;
247 _reportBlob = new char[len];
248 _reportBlobLen = len;
249 }
250 }
251 //! Run function for the thread
252 static void run(FlowControlProtocol *protocol);
253 //! set 8 bytes SerialNumber
254 void setSerialNumber(uint8_t *number)
255 {
256 memcpy(_serialNumber, number, 8);
257 }
258
259 protected:
260
261 private:
262 //! Connect to the socket, return sock descriptor if success, 0 for failure
263 int connectServer(const char *host, uint16_t port);
264 //! Send Data via the socket, return -1 for failure
265 int sendData(uint8_t *buf, int buflen);
266 //! Read length into buf, return -1 for failure and 0 for EOF
267 int readData(uint8_t *buf, int buflen);
268 //! Select on the socket
269 int selectClient(int msec);
270 //! Read the header
271 int readHdr(FlowControlProtocolHeader *hdr);
272 //! encode uint32_t
273 uint8_t *encode(uint8_t *buf, uint32_t value)
274 {
275 *buf++ = (value & 0xFF000000) >> 24;
276 *buf++ = (value & 0x00FF0000) >> 16;
277 *buf++ = (value & 0x0000FF00) >> 8;
278 *buf++ = (value & 0x000000FF);
279 return buf;
280 }
281 //! encode uint32_t
282 uint8_t *decode(uint8_t *buf, uint32_t &value)
283 {
284 value = ((buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|(buf[3]));
285 return (buf + 4);
286 }
287 //! encode byte array
288 uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size)
289 {
290 memcpy(buf, bufArray, size);
291 buf += size;
292 return buf;
293 }
294 //! encode std::string
295 uint8_t *encode(uint8_t *buf, std::string value)
296 {
297 // add the \0 for size
298 buf = encode(buf, value.size()+1);
299 buf = encode(buf, (uint8_t *) value.c_str(), value.size()+1);
300 return buf;
301 }
302 //! Mutex for protection
303 std::mutex _mtx;
304 //! Logger
305 Logger *_logger;
306 //! Configure
307 Configure *_configure;
308 //! NiFi server Name
309 std::string _serverName;
310 //! NiFi server port
311 int64_t _serverPort;
312 //! Serial Number
313 uint8_t _serialNumber[8];
314 //! socket to server
315 int _socket;
316 //! report interal in msec
317 int64_t _reportInterval;
318 //! whether it was registered to the NiFi server
319 bool _registered;
320 //! seq number
321 uint32_t _seqNumber;
322 //! FlowController
323 FlowController *_controller;
324 //! report Blob
325 char *_reportBlob;
326 //! report Blob len;
327 int _reportBlobLen;
328 //! thread
329 std::thread *_thread;
330 //! whether it is running
331 bool _running;
332 // Prevent default copy constructor and assignment operation
333 // Only support pass by reference or pointer
334 FlowControlProtocol(const FlowControlProtocol &parent);
335 FlowControlProtocol &operator=(const FlowControlProtocol &parent);
336
337 };
338
339 #endif