MINIFI-34 Establishing CMake build system to provide build functionality equivalent...
[nifi-minifi-cpp.git] / libminifi / include / FlowController.h
1 /**
2 * @file FlowController.h
3 * FlowController class declaration
4 *
5 * Licensed to the Apache Software Foundation (ASF) under one or more
6 * contributor license agreements. See the NOTICE file distributed with
7 * this work for additional information regarding copyright ownership.
8 * The ASF licenses this file to You under the Apache License, Version 2.0
9 * (the "License"); you may not use this file except in compliance with
10 * the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20 #ifndef __FLOW_CONTROLLER_H__
21 #define __FLOW_CONTROLLER_H__
22
23 #include <uuid/uuid.h>
24 #include <vector>
25 #include <queue>
26 #include <map>
27 #include <mutex>
28 #include <atomic>
29 #include <algorithm>
30 #include <set>
31 #include <libxml/parser.h>
32 #include <libxml/tree.h>
33 #include <yaml-cpp/yaml.h>
34
35 #include "Configure.h"
36 #include "Property.h"
37 #include "Relationship.h"
38 #include "FlowFileRecord.h"
39 #include "Connection.h"
40 #include "Processor.h"
41 #include "ProcessContext.h"
42 #include "ProcessSession.h"
43 #include "ProcessGroup.h"
44 #include "GenerateFlowFile.h"
45 #include "LogAttribute.h"
46 #include "RealTimeDataCollector.h"
47 #include "TimerDrivenSchedulingAgent.h"
48 #include "FlowControlProtocol.h"
49 #include "RemoteProcessorGroupPort.h"
50 #include "GetFile.h"
51 #include "TailFile.h"
52 #include "ListenSyslog.h"
53 #include "ExecuteProcess.h"
54
//! Root group name used when the FlowController constructor is given no name
#define DEFAULT_ROOT_GROUP_NAME ""
//! Default relative path of the XML flow definition
#define DEFAULT_FLOW_XML_FILE_NAME "conf/flow.xml"
//! Default relative path of the YAML flow definition
#define DEFAULT_FLOW_YAML_FILE_NAME "conf/flow.yml"
//! Key string "Processors" for YAML configuration (its use is not visible in this header)
#define CONFIG_YAML_PROCESSORS_KEY "Processors"
60
//! Flow configuration formats; passed to FlowController::load() to select one
enum class ConfigFormat {
    XML,
    YAML
};
62
63 struct ProcessorConfig {
64 std::string name;
65 std::string javaClass;
66 std::string maxConcurrentTasks;
67 std::string schedulingStrategy;
68 std::string schedulingPeriod;
69 std::string penalizationPeriod;
70 std::string yieldPeriod;
71 std::string runDurationNanos;
72 std::vector<std::string> autoTerminatedRelationships;
73 std::vector<Property> properties;
74 };
75
76 //! FlowController Class
77 class FlowController
78 {
79 public:
80 static const int DEFAULT_MAX_TIMER_DRIVEN_THREAD = 10;
81 static const int DEFAULT_MAX_EVENT_DRIVEN_THREAD = 5;
82 //! Constructor
83 /*!
84 * Create a new Flow Controller
85 */
86 FlowController(std::string name = DEFAULT_ROOT_GROUP_NAME);
87 //! Destructor
88 virtual ~FlowController();
89 //! Set FlowController Name
90 void setName(std::string name) {
91 _name = name;
92 }
93 //! Get Flow Controller Name
94 std::string getName(void) {
95 return (_name);
96 }
97 //! Set UUID
98 void setUUID(uuid_t uuid) {
99 uuid_copy(_uuid, uuid);
100 }
101 //! Get UUID
102 bool getUUID(uuid_t uuid) {
103 if (uuid)
104 {
105 uuid_copy(uuid, _uuid);
106 return true;
107 }
108 else
109 return false;
110 }
111 //! Set MAX TimerDrivenThreads
112 void setMaxTimerDrivenThreads(int number)
113 {
114 _maxTimerDrivenThreads = number;
115 }
116 //! Get MAX TimerDrivenThreads
117 int getMaxTimerDrivenThreads()
118 {
119 return _maxTimerDrivenThreads;
120 }
121 //! Set MAX EventDrivenThreads
122 void setMaxEventDrivenThreads(int number)
123 {
124 _maxEventDrivenThreads = number;
125 }
126 //! Get MAX EventDrivenThreads
127 int getMaxEventDrivenThreads()
128 {
129 return _maxEventDrivenThreads;
130 }
131 //! Create FlowFile Repository
132 bool createFlowFileRepository();
133 //! Create Content Repository
134 bool createContentRepository();
135
136 //! Life Cycle related function
137 //! Load flow xml from disk, after that, create the root process group and its children, initialize the flows
138 void load(ConfigFormat format);
139 //! Whether the Flow Controller is start running
140 bool isRunning();
141 //! Whether the Flow Controller has already been initialized (loaded flow XML)
142 bool isInitialized();
143 //! Start to run the Flow Controller which internally start the root process group and all its children
144 bool start();
145 //! Stop to run the Flow Controller which internally stop the root process group and all its children
146 void stop(bool force);
147 //! Unload the current flow xml, clean the root process group and all its children
148 void unload();
149 //! Load new xml
150 void reload(std::string xmlFile);
151 //! update property value
152 void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue)
153 {
154 if (_root)
155 _root->updatePropertyValue(processorName, propertyName, propertyValue);
156 }
157
158 //! Create Processor (Node/Input/Output Port) based on the name
159 Processor *createProcessor(std::string name, uuid_t uuid);
160 //! Create Root Processor Group
161 ProcessGroup *createRootProcessGroup(std::string name, uuid_t uuid);
162 //! Create Remote Processor Group
163 ProcessGroup *createRemoteProcessGroup(std::string name, uuid_t uuid);
164 //! Create Connection
165 Connection *createConnection(std::string name, uuid_t uuid);
166 //! set 8 bytes SerialNumber
167 void setSerialNumber(uint8_t *number)
168 {
169 _protocol->setSerialNumber(number);
170 }
171
172 protected:
173
174 //! A global unique identifier
175 uuid_t _uuid;
176 //! FlowController Name
177 std::string _name;
178 //! Configuration File Name
179 std::string _configurationFileName;
180 //! NiFi property File Name
181 std::string _propertiesFileName;
182 //! Root Process Group
183 ProcessGroup *_root;
184 //! MAX Timer Driven Threads
185 int _maxTimerDrivenThreads;
186 //! MAX Event Driven Threads
187 int _maxEventDrivenThreads;
188 //! Config
189 //! FlowFile Repo
190 //! Provenance Repo
191 //! Flow Engines
192 //! Flow Scheduler
193 TimerDrivenSchedulingAgent _timerScheduler;
194 //! Controller Service
195 //! Config
196 //! Site to Site Server Listener
197 //! Heart Beat
198 //! FlowControl Protocol
199 FlowControlProtocol *_protocol;
200
201 private:
202
203 //! Mutex for protection
204 std::mutex _mtx;
205 //! Logger
206 Logger *_logger;
207 //! Configure
208 Configure *_configure;
209 //! Whether it is running
210 std::atomic<bool> _running;
211 //! Whether it has already been initialized (load the flow XML already)
212 std::atomic<bool> _initialized;
213 //! Process Processor Node XML
214 void parseProcessorNode(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent);
215 //! Process Port XML
216 void parsePort(xmlDoc *doc, xmlNode *processorNode, ProcessGroup *parent, TransferDirection direction);
217 //! Process Root Processor Group XML
218 void parseRootProcessGroup(xmlDoc *doc, xmlNode *node);
219 //! Process Property XML
220 void parseProcessorProperty(xmlDoc *doc, xmlNode *node, Processor *processor);
221 //! Process connection XML
222 void parseConnection(xmlDoc *doc, xmlNode *node, ProcessGroup *parent);
223 //! Process Remote Process Group
224 void parseRemoteProcessGroup(xmlDoc *doc, xmlNode *node, ProcessGroup *parent);
225
226 //! Process Processor Node YAML
227 void parseProcessorNodeYaml(YAML::Node processorNode, ProcessGroup *parent);
228 //! Process Port YAML
229 void parsePortYaml(YAML::Node *portNode, ProcessGroup *parent, TransferDirection direction);
230 //! Process Root Processor Group YAML
231 void parseRootProcessGroupYaml(YAML::Node rootNode);
232 //! Process Property YAML
233 void parseProcessorPropertyYaml(YAML::Node *doc, YAML::Node *node, Processor *processor);
234 //! Process connection YAML
235 void parseConnectionYaml(YAML::Node *node, ProcessGroup *parent);
236 //! Process Remote Process Group YAML
237 void parseRemoteProcessGroupYaml(YAML::Node *node, ProcessGroup *parent);
238 //! Parse Properties Node YAML for a processor
239 void parsePropertiesNodeYaml(YAML::Node *propertiesNode, Processor *processor);
240
241 // Prevent default copy constructor and assignment operation
242 // Only support pass by reference or pointer
243 FlowController(const FlowController &parent);
244 FlowController &operator=(const FlowController &parent);
245
246 };
247
248 #endif