IMPALA-6792: Fail status reporting if coordinator refuses connections
authorSailesh Mukil <sailesh@cloudera.com>
Tue, 3 Apr 2018 21:24:21 +0000 (14:24 -0700)
committerImpala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Wed, 11 Apr 2018 22:56:00 +0000 (22:56 +0000)
The ReportExecStatusAux() function is run on a dedicated thread per
fragment instance. This thread will run until the fragment instance
completes executing.

On every attempt to send a report to the coordinator, it will attempt
to send up to 3 RPCs. If all 3 of them fail, then the fragment instance
will cancel itself.

However, there is one case where a failure to send the RPC will not
be considered a failed RPC. If when we attempt to obtain a new
connection, we end up creating a new connection
(via ClientCache::CreateClient()) instead of getting a previously
cached connection, and this new connection fails to even Open(),
it will not be counted as a RPC failure.

This patch counts such an error as a failed RPC too.

This patch also clarifies some of the error log messages and introduces
a flag to control the sleep interval between failed ReportExecStatus RPC
retries.

Change-Id: If668838f99f78b5ffa713488178b2eb5799ba220
Reviewed-on: http://gerrit.cloudera.org:8080/9916
Reviewed-by: Sailesh Mukil <sailesh@cloudera.com>
Tested-by: Impala Public Jenkins
be/src/runtime/query-state.cc

index ad5748f..04a4283 100644 (file)
 
 #include "common/names.h"
 
+DEFINE_int32(report_status_retry_interval_ms, 100,
+    "The interval in milliseconds to wait before retrying a failed status report RPC to "
+    "the coordinator.");
+
 using namespace impala;
 
 QueryState::ScopedRef::ScopedRef(const TUniqueId& query_id) {
@@ -249,28 +253,40 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
   DCHECK_EQ(res.status.status_code, TErrorCode::OK);
   // Try to send the RPC 3 times before failing. Sleep for 100ms between retries.
   // It's safe to retry the RPC as the coordinator handles duplicate RPC messages.
+  Status client_status;
   for (int i = 0; i < 3; ++i) {
-    Status client_status;
     ImpalaBackendConnection client(ExecEnv::GetInstance()->impalad_client_cache(),
         query_ctx().coord_address, &client_status);
     if (client_status.ok()) {
       rpc_status = client.DoRpc(&ImpalaBackendClient::ReportExecStatus, params, &res);
       if (rpc_status.ok()) break;
     }
-    if (i < 2) SleepForMs(100);
+    if (i < 2) SleepForMs(FLAGS_report_status_retry_interval_ms);
   }
   Status result_status(res.status);
-  if ((!rpc_status.ok() || !result_status.ok()) && instances_started) {
+  if ((!client_status.ok() || !rpc_status.ok() || !result_status.ok()) &&
+      instances_started) {
     // TODO: should we try to keep rpc_status for the final report? (but the final
     // report, following this Cancel(), may not succeed anyway.)
     // TODO: not keeping an error status here means that all instances might
     // abort with CANCELLED status, despite there being an error
-    if (!rpc_status.ok()) {
-      // TODO: Fix IMPALA-2990. Cancelling fragment instances here may cause query to
-      // hang as the coordinator may not be aware of the cancellation. Remove the log
-      // statement once IMPALA-2990 is fixed.
-      LOG(ERROR) << "Cancelling fragment instances due to failure to report status. "
-                 << "Query " << PrintId(query_id()) << " may hang. See IMPALA-2990.";
+    // TODO: Fix IMPALA-2990. Cancelling fragment instances without sending the
+    // ReporExecStatus RPC may cause query to hang as the coordinator may not be aware
+    // of the cancellation. Remove the log statements once IMPALA-2990 is fixed.
+    if (!client_status.ok()) {
+      LOG(ERROR) << "Cancelling fragment instances due to failure to obtain a connection "
+                 << "to the coordinator. (" << client_status.GetDetail()
+                 << "). Query " << PrintId(query_id()) << " may hang. See IMPALA-2990.";
+    } else if (!rpc_status.ok()) {
+      LOG(ERROR) << "Cancelling fragment instances due to failure to reach the "
+                 << "coordinator. (" << rpc_status.GetDetail()
+                 << "). Query " << PrintId(query_id()) << " may hang. See IMPALA-2990.";
+    } else if (!result_status.ok()) {
+      // If the ReportExecStatus RPC succeeded in reaching the coordinator and we get
+      // back a non-OK status, it means that the coordinator expects us to cancel the
+      // fragment instances for this query.
+      LOG(INFO) << "Cancelling fragment instances as directed by the coordinator. "
+                << "Returned status: " << result_status.GetDetail();
     }
     Cancel();
   }