MINIFICPP-31 Added UpdateAttribute processor
[nifi-minifi-cpp.git] / libminifi / test / TestBase.h
1 /**
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one or more
4 * contributor license agreements. See the NOTICE file distributed with
5 * this work for additional information regarding copyright ownership.
6 * The ASF licenses this file to You under the Apache License, Version 2.0
7 * (the "License"); you may not use this file except in compliance with
8 * the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 #ifndef LIBMINIFI_TEST_TESTBASE_H_
20 #define LIBMINIFI_TEST_TESTBASE_H_
21 #include <dirent.h>
22 #include <cstdio>
23 #include <cstdlib>
24 #include <sstream>
25 #include "ResourceClaim.h"
26 #include "catch.hpp"
27 #include <vector>
28 #include <set>
29 #include <map>
30 #include "core/logging/Logger.h"
31 #include "core/Core.h"
32 #include "properties/Configure.h"
33 #include "properties/Properties.h"
34 #include "core/logging/LoggerConfiguration.h"
35 #include "utils/Id.h"
36 #include "spdlog/sinks/ostream_sink.h"
37 #include "spdlog/sinks/dist_sink.h"
38 #include "unit/ProvenanceTestHelper.h"
39 #include "core/Core.h"
40 #include "core/FlowFile.h"
41 #include "core/Processor.h"
42 #include "core/ProcessContext.h"
43 #include "core/ProcessSession.h"
44 #include "core/ProcessorNode.h"
45 #include "core/reporting/SiteToSiteProvenanceReportingTask.h"
46
47 class LogTestController {
48 public:
49 static LogTestController& getInstance() {
50 static LogTestController instance;
51 return instance;
52 }
53
54 template<typename T>
55 void setTrace() {
56 setLevel<T>(spdlog::level::trace);
57 }
58
59 template<typename T>
60 void setDebug() {
61 setLevel<T>(spdlog::level::debug);
62 }
63
64 template<typename T>
65 void setInfo() {
66 setLevel<T>(spdlog::level::info);
67 }
68
69 template<typename T>
70 void setWarn() {
71 setLevel<T>(spdlog::level::warn);
72 }
73
74 template<typename T>
75 void setError() {
76 setLevel<T>(spdlog::level::err);
77 }
78
79 template<typename T>
80 void setOff() {
81 setLevel<T>(spdlog::level::off);
82 }
83
84 template<typename T>
85 void setLevel(spdlog::level::level_enum level) {
86 logging::LoggerFactory<T>::getLogger();
87 std::string name = core::getClassName<T>();
88 modified_loggers.push_back(name);
89 setLevel(name, level);
90 }
91
92 bool contains(const std::string &ending) {
93 return contains(log_output, ending);
94 }
95
96 bool contains(const std::ostringstream &stream, const std::string &ending) {
97 std::string str = stream.str();
98 logger_->log_info("Looking for %s in log output.", ending);
99 return (ending.length() > 0 && str.find(ending) != std::string::npos);
100 }
101
102 void reset() {
103 for (auto const & name : modified_loggers) {
104 setLevel(name, spdlog::level::err);
105 }
106 modified_loggers = std::vector<std::string>();
107 resetStream(log_output);
108 }
109
110 inline void resetStream(std::ostringstream &stream) {
111 stream.str("");
112 stream.clear();
113 }
114
115 std::ostringstream log_output;
116
117 std::shared_ptr<logging::Logger> logger_;
118 private:
119 class TestBootstrapLogger : public logging::Logger {
120 public:
121 TestBootstrapLogger(std::shared_ptr<spdlog::logger> logger)
122 : Logger(logger) {
123 }
124 ;
125 };
126 LogTestController() {
127 std::shared_ptr<logging::LoggerProperties> logger_properties = std::make_shared<logging::LoggerProperties>();
128 logger_properties->set("logger.root", "ERROR,ostream");
129 logger_properties->set("logger." + core::getClassName<LogTestController>(), "INFO");
130 logger_properties->set("logger." + core::getClassName<logging::LoggerConfiguration>(), "DEBUG");
131 std::shared_ptr<spdlog::sinks::dist_sink_mt> dist_sink = std::make_shared<spdlog::sinks::dist_sink_mt>();
132 dist_sink->add_sink(std::make_shared<spdlog::sinks::ostream_sink_mt>(log_output, true));
133 dist_sink->add_sink(spdlog::sinks::stderr_sink_mt::instance());
134 logger_properties->add_sink("ostream", dist_sink);
135 logging::LoggerConfiguration::getConfiguration().initialize(logger_properties);
136 logger_ = logging::LoggerFactory<LogTestController>::getLogger();
137 }
138 LogTestController(LogTestController const&);
139 LogTestController& operator=(LogTestController const&);
140 ~LogTestController() {
141 }
142 ;
143
144 void setLevel(const std::string name, spdlog::level::level_enum level) {
145 logger_->log_info("Setting log level for %s to %s", name, spdlog::level::to_str(level));
146 spdlog::get(name)->set_level(level);
147 }
148 std::vector<std::string> modified_loggers;
149 };
150
151 class TestPlan {
152 public:
153
154 explicit TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo);
155
156 std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name,
157 core::Relationship relationship = core::Relationship("success", "description"),
158 bool linkToPrevious = false);
159
160 std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"),
161 bool linkToPrevious = false);
162
163 bool setProperty(const std::shared_ptr<core::Processor> proc,
164 const std::string &prop,
165 const std::string &value,
166 bool dynamic = false);
167
168 void reset();
169
170 bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
171
172 std::set<provenance::ProvenanceEventRecord*> getProvenanceRecords();
173
174 std::shared_ptr<core::FlowFile> getCurrentFlowFile();
175
176 std::shared_ptr<core::Repository> getFlowRepo() {
177 return flow_repo_;
178 }
179
180 std::shared_ptr<core::Repository> getProvenanceRepo() {
181 return prov_repo_;
182 }
183
184 std::shared_ptr<core::ContentRepository> getContentRepo() {
185 return content_repo_;
186 }
187
188 protected:
189
190 void finalize();
191
192 std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest = false);
193
194 std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory;
195
196 std::shared_ptr<core::ContentRepository> content_repo_;
197
198 std::shared_ptr<core::Repository> flow_repo_;
199 std::shared_ptr<core::Repository> prov_repo_;
200
201 std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider_;
202
203 std::recursive_mutex mutex;
204
205 std::atomic<bool> finalized;
206
207 int location;
208
209 std::shared_ptr<core::ProcessSession> current_session_;
210 std::shared_ptr<core::FlowFile> current_flowfile_;
211
212 std::map<std::string, std::shared_ptr<core::Processor>> processor_mapping_;
213 std::vector<std::shared_ptr<core::Processor>> processor_queue_;
214 std::vector<std::shared_ptr<core::Processor>> configured_processors_;
215 std::vector<std::shared_ptr<core::ProcessorNode>> processor_nodes_;
216 std::vector<std::shared_ptr<core::ProcessContext>> processor_contexts_;
217 std::vector<std::shared_ptr<core::ProcessSession>> process_sessions_;
218 std::vector<std::shared_ptr<core::ProcessSessionFactory>> factories_;
219 std::vector<std::shared_ptr<minifi::Connection>> relationships_;
220 core::Relationship termination_;
221
222 private:
223
224 std::shared_ptr<logging::Logger> logger_;
225 };
226
227 class TestController {
228 public:
229
230 TestController()
231 : log(LogTestController::getInstance()) {
232 minifi::setDefaultDirectory("./");
233 log.reset();
234 utils::IdGenerator::getIdGenerator()->initialize(std::make_shared<minifi::Properties>());
235 }
236
237 std::shared_ptr<TestPlan> createPlan() {
238 std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
239 std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
240
241 content_repo->initialize(configuration);
242
243 std::shared_ptr<core::Repository> flow_repo = std::make_shared<TestRepository>();
244 std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();
245 return std::make_shared<TestPlan>(content_repo, flow_repo, repo);
246 }
247
248
249 void runSession(std::shared_ptr<TestPlan> &plan, bool runToCompletion = true, std::function<void(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>&)> verify =
250 nullptr) {
251
252 while (plan->runNextProcessor(verify) && runToCompletion) {
253
254 }
255 }
256
257 ~TestController() {
258 for (auto dir : directories) {
259 DIR *created_dir;
260 struct dirent *dir_entry;
261 created_dir = opendir(dir);
262 if (created_dir != NULL) {
263 while ((dir_entry = readdir(created_dir)) != NULL) {
264 if (dir_entry->d_name[0] != '.') {
265
266 std::string file(dir);
267 file += "/";
268 file += dir_entry->d_name;
269 unlink(file.c_str());
270 }
271 }
272 closedir(created_dir);
273 }
274
275 rmdir(dir);
276 }
277 }
278
279 char *createTempDirectory(char *format) {
280 char *dir = mkdtemp(format);
281 directories.push_back(dir);
282 return dir;
283 }
284
285 protected:
286
287 std::mutex test_mutex;
288 //std::map<std::string,>
289
290 LogTestController &log;
291 std::vector<char*> directories;
292
293 };
294
295 #endif /* LIBMINIFI_TEST_TESTBASE_H_ */