MINIFI-34 Establishing CMake build system to provide build functionality equivalent...
[nifi-minifi-cpp.git] / libminifi / include / ListenSyslog.h
1 /**
2 * @file ListenSyslog.h
3 * ListenSyslog class declaration
4 *
5 * Licensed to the Apache Software Foundation (ASF) under one or more
6 * contributor license agreements. See the NOTICE file distributed with
7 * this work for additional information regarding copyright ownership.
8 * The ASF licenses this file to You under the Apache License, Version 2.0
9 * (the "License"); you may not use this file except in compliance with
10 * the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20 #ifndef __LISTEN_SYSLOG_H__
21 #define __LISTEN_SYSLOG_H__
22
23 #include <stdio.h>
24 #include <unistd.h>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29 #include <errno.h>
30 #include <sys/select.h>
31 #include <sys/time.h>
32 #include <sys/types.h>
33 #include <chrono>
34 #include <thread>
35 #include "FlowFileRecord.h"
36 #include "Processor.h"
37 #include "ProcessSession.h"
38
//! SysLogEvent
/*!
 * Plain record describing one received syslog message: a pointer to the
 * message bytes together with the number of bytes.
 * NOTE(review): the struct does not manage the memory behind payload; who
 * allocates and frees it is not visible here - confirm against the code that
 * queues and drains these events.
 */
struct SysLogEvent
{
    //! Start of the message bytes
    uint8_t *payload;
    //! Number of bytes in the message
    uint64_t len;
};
44
45 //! ListenSyslog Class
46 class ListenSyslog : public Processor
47 {
48 public:
49 //! Constructor
50 /*!
51 * Create a new processor
52 */
53 ListenSyslog(std::string name, uuid_t uuid = NULL)
54 : Processor(name, uuid)
55 {
56 _logger = Logger::getLogger();
57 _eventQueueByteSize = 0;
58 _serverSocket = 0;
59 _recvBufSize = 65507;
60 _maxSocketBufSize = 1024*1024;
61 _maxConnections = 2;
62 _maxBatchSize = 1;
63 _messageDelimiter = "\n";
64 _protocol = "UDP";
65 _port = 514;
66 _parseMessages = false;
67 _serverSocket = 0;
68 _maxFds = 0;
69 FD_ZERO(&_readfds);
70 _thread = NULL;
71 _resetServerSocket = false;
72 _serverTheadRunning = false;
73 }
74 //! Destructor
75 virtual ~ListenSyslog()
76 {
77 _serverTheadRunning = false;
78 if (this->_thread)
79 delete this->_thread;
80 // need to reset the socket
81 std::vector<int>::iterator it;
82 for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it)
83 {
84 int clientSocket = *it;
85 close(clientSocket);
86 }
87 _clientSockets.clear();
88 if (_serverSocket > 0)
89 {
90 _logger->log_info("ListenSysLog Server socket %d close", _serverSocket);
91 close(_serverSocket);
92 _serverSocket = 0;
93 }
94 }
95 //! Processor Name
96 static const std::string ProcessorName;
97 //! Supported Properties
98 static Property RecvBufSize;
99 static Property MaxSocketBufSize;
100 static Property MaxConnections;
101 static Property MaxBatchSize;
102 static Property MessageDelimiter;
103 static Property ParseMessages;
104 static Property Protocol;
105 static Property Port;
106 //! Supported Relationships
107 static Relationship Success;
108 static Relationship Invalid;
109 //! Nest Callback Class for write stream
110 class WriteCallback : public OutputStreamCallback
111 {
112 public:
113 WriteCallback(char *data, uint64_t size)
114 : _data(data), _dataSize(size) {}
115 char *_data;
116 uint64_t _dataSize;
117 void process(std::ofstream *stream) {
118 if (_data && _dataSize > 0)
119 stream->write(_data, _dataSize);
120 }
121 };
122
123 public:
124 //! OnTrigger method, implemented by NiFi ListenSyslog
125 virtual void onTrigger(ProcessContext *context, ProcessSession *session);
126 //! Initialize, over write by NiFi ListenSyslog
127 virtual void initialize(void);
128
129 protected:
130
131 private:
132 //! Logger
133 Logger *_logger;
134 //! Run function for the thread
135 static void run(ListenSyslog *process);
136 //! Run Thread
137 void runThread();
138 //! Queue for store syslog event
139 std::queue<SysLogEvent> _eventQueue;
140 //! Size of Event queue in bytes
141 uint64_t _eventQueueByteSize;
142 //! Get event queue size
143 uint64_t getEventQueueSize() {
144 std::lock_guard<std::mutex> lock(_mtx);
145 return _eventQueue.size();
146 }
147 //! Get event queue byte size
148 uint64_t getEventQueueByteSize() {
149 std::lock_guard<std::mutex> lock(_mtx);
150 return _eventQueueByteSize;
151 }
152 //! Whether the event queue is empty
153 bool isEventQueueEmpty()
154 {
155 std::lock_guard<std::mutex> lock(_mtx);
156 return _eventQueue.empty();
157 }
158 //! Put event into directory listing
159 void putEvent(uint8_t *payload, uint64_t len)
160 {
161 std::lock_guard<std::mutex> lock(_mtx);
162 SysLogEvent event;
163 event.payload = payload;
164 event.len = len;
165 _eventQueue.push(event);
166 _eventQueueByteSize += len;
167 }
168 //! Read \n terminated line from TCP socket
169 int readline( int fd, char *bufptr, size_t len );
170 //! start server socket and handling client socket
171 void startSocketThread();
172 //! Poll event
173 void pollEvent(std::queue<SysLogEvent> &list, int maxSize)
174 {
175 std::lock_guard<std::mutex> lock(_mtx);
176
177 while (!_eventQueue.empty() && (maxSize == 0 || list.size() < maxSize))
178 {
179 SysLogEvent event = _eventQueue.front();
180 _eventQueue.pop();
181 _eventQueueByteSize -= event.len;
182 list.push(event);
183 }
184 return;
185 }
186 //! Mutex for protection of the directory listing
187 std::mutex _mtx;
188 int64_t _recvBufSize;
189 int64_t _maxSocketBufSize;
190 int64_t _maxConnections;
191 int64_t _maxBatchSize;
192 std::string _messageDelimiter;
193 std::string _protocol;
194 int64_t _port;
195 bool _parseMessages;
196 int _serverSocket;
197 std::vector<int> _clientSockets;
198 int _maxFds;
199 fd_set _readfds;
200 //! thread
201 std::thread *_thread;
202 //! whether to reset the server socket
203 bool _resetServerSocket;
204 bool _serverTheadRunning;
205 //! buffer for read socket
206 uint8_t _buffer[2048];
207 };
208
209 #endif