Allow using custom script for chronicle queue BinLog archival
authorpksivar <prsivaraju@walmart.com>
Fri, 10 Aug 2018 18:55:54 +0000 (11:55 -0700)
committerMarcus Eriksson <marcuse@apache.org>
Thu, 11 Oct 2018 11:06:34 +0000 (13:06 +0200)
Patch by Pramod K Sivaraju and marcuse; reviewed by Ariel Weisberg
and Sam Tunnicliffe for CASSANDRA-14373

24 files changed:
CHANGES.txt
bin/nodetool
conf/cassandra.yaml
src/java/org/apache/cassandra/audit/AuditLogManager.java
src/java/org/apache/cassandra/audit/AuditLogOptions.java
src/java/org/apache/cassandra/audit/BinAuditLogger.java
src/java/org/apache/cassandra/audit/BinLogAuditLogger.java
src/java/org/apache/cassandra/audit/FullQueryLoggerOptions.java [new file with mode: 0644]
src/java/org/apache/cassandra/config/Config.java
src/java/org/apache/cassandra/config/DatabaseDescriptor.java
src/java/org/apache/cassandra/service/StorageProxy.java
src/java/org/apache/cassandra/service/StorageProxyMBean.java
src/java/org/apache/cassandra/service/StorageService.java
src/java/org/apache/cassandra/tools/nodetool/EnableFullQueryLog.java
src/java/org/apache/cassandra/utils/binlog/BinLog.java
src/java/org/apache/cassandra/utils/binlog/BinLogArchiver.java [new file with mode: 0644]
src/java/org/apache/cassandra/utils/binlog/BinLogOptions.java [new file with mode: 0644]
src/java/org/apache/cassandra/utils/binlog/DeletingArchiver.java [new file with mode: 0644]
src/java/org/apache/cassandra/utils/binlog/ExternalArchiver.java [new file with mode: 0644]
test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java
test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java
test/unit/org/apache/cassandra/utils/binlog/DeletingArchiverTest.java [new file with mode: 0644]
test/unit/org/apache/cassandra/utils/binlog/ExternalArchiverTest.java [new file with mode: 0644]

index e1fbb90..0bfc8c7 100644 (file)
@@ -1,4 +1,5 @@
 4.0
+ * Allow using custom script for chronicle queue BinLog archival (CASSANDRA-14373)
  * Transient->Full range movements mishandle consistency level upgrade (CASSANDRA-14759)
  * ReplicaCollection follow-up (CASSANDRA-14726)
  * Transient node receives full data requests (CASSANDRA-14762)
index 3104582..7ba57b7 100755 (executable)
@@ -79,6 +79,11 @@ do
       fi
       JVM_ARGS="$JVM_ARGS -Dssl.enable=true $SSL_ARGS"
       ;;
+    --archive-command)
+      # archive-command can be multi-word, we need to special handle that in POSIX shell
+      ARCHIVE_COMMAND="$2"
+      shift
+      ;;
     -D*)
       JVM_ARGS="$JVM_ARGS $1"
       ;;
@@ -93,11 +98,19 @@ if [ "x$MAX_HEAP_SIZE" = "x" ]; then
     MAX_HEAP_SIZE="128m"
 fi
 
-"$JAVA" $JAVA_AGENT -ea -cp "$CLASSPATH" $JVM_OPTS -Xmx$MAX_HEAP_SIZE \
-        -XX:ParallelGCThreads=1 \
-        -Dcassandra.storagedir="$cassandra_storagedir" \
-        -Dlogback.configurationFile=logback-tools.xml \
-        $JVM_ARGS \
-        org.apache.cassandra.tools.NodeTool -p $JMX_PORT $ARGS
+CMD=$(echo "$JAVA" $JAVA_AGENT -ea -cp "$CLASSPATH" $JVM_OPTS -Xmx$MAX_HEAP_SIZE \
+            -XX:ParallelGCThreads=1 \
+            -Dcassandra.storagedir="$cassandra_storagedir" \
+            -Dlogback.configurationFile=logback-tools.xml \
+            $JVM_ARGS \
+            org.apache.cassandra.tools.NodeTool -p $JMX_PORT $ARGS)
+
+if [ "x$ARCHIVE_COMMAND" != "x" ]
+then
+  exec $CMD "--archive-command" "${ARCHIVE_COMMAND}"
+else
+  exec $CMD
+fi
+
 
 # vi:ai sw=4 ts=4 tw=0 et
index 190ce77..feb9037 100644 (file)
@@ -1219,12 +1219,32 @@ audit_logging_options:
     enabled: false
     logger: BinAuditLogger
     # audit_logs_dir:
-    # included_keyspaces: 
+    # included_keyspaces:
     # excluded_keyspaces:
     # included_categories:
     # excluded_categories:
     # included_users:
     # excluded_users:
+    # roll_cycle: HOURLY
+    # block: true
+    # max_queue_weight: 268435456 # 256 MiB
+    # max_log_size: 17179869184 # 16 GiB
+    ## archive command is "/path/to/script.sh %path" where %path is replaced with the file being rolled:
+    # archive_command:
+    # max_archive_retries: 10
+
+
+# default options for full query logging - these can be overridden from command line when executing
+# nodetool enablefullquerylog
+#full_query_logging_options:
+    # log_dir:
+    # roll_cycle: HOURLY
+    # block: true
+    # max_queue_weight: 268435456 # 256 MiB
+    # max_log_size: 17179869184 # 16 GiB
+    ## archive command is "/path/to/script.sh %path" where %path is replaced with the file being rolled:
+    # archive_command:
+    # max_archive_retries: 10
 
 # validate tombstones on reads and compaction
 # can be either "disabled", "warn" or "exception"
index 25966f7..041bdee 100644 (file)
@@ -293,7 +293,7 @@ public class AuditLogManager
         oldLogger.stop();
     }
 
-    public void configureFQL(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize)
+    public void configureFQL(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize, String archiveCommand, int maxArchiveRetries)
     {
         if (path.equals(auditLogger.path()))
             throw new IllegalArgumentException(String.format("fullquerylogger path (%s) cannot be the same as the " +
@@ -301,7 +301,7 @@ public class AuditLogManager
                                                              path,
                                                              auditLogger.path()));
 
-        fullQueryLogger.configure(path, rollCycle, blocking, maxQueueWeight, maxLogSize);
+        fullQueryLogger.configure(path, rollCycle, blocking, maxQueueWeight, maxLogSize, archiveCommand, maxArchiveRetries);
     }
 
     public void resetFQL(String fullQueryLogPath)
index 1888c45..3d0efa3 100644 (file)
@@ -19,7 +19,9 @@ package org.apache.cassandra.audit;
 
 import org.apache.commons.lang3.StringUtils;
 
-public class AuditLogOptions
+import org.apache.cassandra.utils.binlog.BinLogOptions;
+
+public class AuditLogOptions extends BinLogOptions
 {
     public volatile boolean enabled = false;
     public String logger = BinAuditLogger.class.getSimpleName();
@@ -35,27 +37,24 @@ public class AuditLogOptions
      */
     public String audit_logs_dir = System.getProperty("cassandra.logdir.audit",
                                                       System.getProperty("cassandra.logdir",".")+"/audit/");
-    /**
-     * Indicates if the AuditLog should block if the it falls behind or should drop audit log records.
-     * Default is set to true so that AuditLog records wont be lost
-     */
-    public boolean block = true;
-
-    /**
-     * Maximum weight of in memory queue for records waiting to be written to the audit log file
-     * before blocking or dropping the log records. For advanced configurations
-     */
-    public int max_queue_weight = 256 * 1024 * 1024;
 
-    /**
-     * Maximum size of the rolled files to retain on disk before deleting the oldest file. For advanced configurations
-     */
-    public long max_log_size = 16L * 1024L * 1024L * 1024L;
-
-    /**
-     * How often to roll Audit log segments so they can potentially be reclaimed. Available options are:
-     * MINUTELY, HOURLY, DAILY, LARGE_DAILY, XLARGE_DAILY, HUGE_DAILY.
-     * For more options, refer: net.openhft.chronicle.queue.RollCycles
-     */
-    public String roll_cycle = "HOURLY";
+    public String toString()
+    {
+        return "AuditLogOptions{" +
+               "enabled=" + enabled +
+               ", logger='" + logger + '\'' +
+               ", included_keyspaces='" + included_keyspaces + '\'' +
+               ", excluded_keyspaces='" + excluded_keyspaces + '\'' +
+               ", included_categories='" + included_categories + '\'' +
+               ", excluded_categories='" + excluded_categories + '\'' +
+               ", included_users='" + included_users + '\'' +
+               ", excluded_users='" + excluded_users + '\'' +
+               ", audit_logs_dir='" + audit_logs_dir + '\'' +
+               ", archive_command='" + archive_command + '\'' +
+               ", roll_cycle='" + roll_cycle + '\'' +
+               ", block=" + block +
+               ", max_queue_weight=" + max_queue_weight +
+               ", max_log_size=" + max_log_size +
+               '}';
+    }
 }
index 3ac9499..bd3a158 100644 (file)
@@ -39,7 +39,9 @@ public class BinAuditLogger extends BinLogAuditLogger implements IAuditLogger
                   auditLoggingOptions.block,
                   auditLoggingOptions.max_queue_weight,
                   auditLoggingOptions.max_log_size,
-                  false);
+                  false,
+                  auditLoggingOptions.archive_command,
+                  auditLoggingOptions.max_archive_retries);
     }
 
     @Override
index 7534650..d43bb4a 100644 (file)
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +36,9 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.binlog.BinLog;
+import org.apache.cassandra.utils.binlog.BinLogArchiver;
+import org.apache.cassandra.utils.binlog.DeletingArchiver;
+import org.apache.cassandra.utils.binlog.ExternalArchiver;
 
 abstract class BinLogAuditLogger implements IAuditLogger
 {
@@ -55,10 +59,12 @@ abstract class BinLogAuditLogger implements IAuditLogger
      * @param blocking Whether the FQL should block if the FQL falls behind or should drop log records
      * @param maxQueueWeight Maximum weight of in memory queue for records waiting to be written to the file before blocking or dropping
      * @param maxLogSize Maximum size of the rolled files to retain on disk before deleting the oldest file
+     * @param archiveCommand the archive command to execute on rolled log files
+     * @param maxArchiveRetries max number of retries of failed archive commands
      */
-    public synchronized void configure(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize)
+    public synchronized void configure(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize, String archiveCommand, int maxArchiveRetries)
     {
-        this.configure(path, rollCycle, blocking, maxQueueWeight, maxLogSize, true);
+        this.configure(path, rollCycle, blocking, maxQueueWeight, maxLogSize, true, archiveCommand, maxArchiveRetries);
     }
 
     /**
@@ -69,8 +75,10 @@ abstract class BinLogAuditLogger implements IAuditLogger
      * @param maxQueueWeight Maximum weight of in memory queue for records waiting to be written to the file before blocking or dropping
      * @param maxLogSize Maximum size of the rolled files to retain on disk before deleting the oldest file
      * @param cleanDirectory Indicates to clean the directory before starting FullQueryLogger or not
+     * @param archiveCommand the archive command to execute on rolled log files
+     * @param maxArchiveRetries max number of retries of failed archive commands
      */
-    public synchronized void configure(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize, boolean cleanDirectory)
+    public synchronized void configure(Path path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize, boolean cleanDirectory, String archiveCommand, int maxArchiveRetries)
     {
         Preconditions.checkNotNull(path, "path was null");
         File pathAsFile = path.toFile();
@@ -83,7 +91,7 @@ abstract class BinLogAuditLogger implements IAuditLogger
         Preconditions.checkNotNull(RollCycles.valueOf(rollCycle), "unrecognized roll cycle");
         Preconditions.checkArgument(maxQueueWeight > 0, "maxQueueWeight must be > 0");
         Preconditions.checkArgument(maxLogSize > 0, "maxLogSize must be > 0");
-        logger.info("Attempting to configure full query logger path: {} Roll cycle: {} Blocking: {} Max queue weight: {} Max log size:{}", path, rollCycle, blocking, maxQueueWeight, maxLogSize);
+        logger.info("Attempting to configure full query logger path: {} Roll cycle: {} Blocking: {} Max queue weight: {} Max log size:{}, archive command: {}", path, rollCycle, blocking, maxQueueWeight, maxLogSize, archiveCommand);
 
         if (binLog != null)
         {
@@ -91,6 +99,8 @@ abstract class BinLogAuditLogger implements IAuditLogger
             throw new IllegalStateException("Already configured");
         }
 
+        // create the archiver before cleaning directories - ExternalArchiver will try to archive any existing file.
+        BinLogArchiver archiver = Strings.isNullOrEmpty(archiveCommand) ? new DeletingArchiver(maxLogSize) : new ExternalArchiver(archiveCommand, path, maxArchiveRetries);
         if (cleanDirectory)
         {
             logger.info("Cleaning directory: {} as requested",path);
@@ -103,10 +113,9 @@ abstract class BinLogAuditLogger implements IAuditLogger
                 }
             }
         }
-
         this.path = path;
         this.blocking = blocking;
-        binLog = new BinLog(path, RollCycles.valueOf(rollCycle), maxQueueWeight, maxLogSize);
+        binLog = new BinLog(path, RollCycles.valueOf(rollCycle), maxQueueWeight, archiver);
         binLog.start();
     }
 
diff --git a/src/java/org/apache/cassandra/audit/FullQueryLoggerOptions.java b/src/java/org/apache/cassandra/audit/FullQueryLoggerOptions.java
new file mode 100644 (file)
index 0000000..825a8b8
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.cassandra.utils.binlog.BinLogOptions;
+
+public class FullQueryLoggerOptions extends BinLogOptions
+{
+    public String log_dir = StringUtils.EMPTY;
+
+    public String toString()
+    {
+        return "FullQueryLoggerOptions{" +
+               "log_dir='" + log_dir + '\'' +
+               ", archive_command='" + archive_command + '\'' +
+               ", roll_cycle='" + roll_cycle + '\'' +
+               ", block=" + block +
+               ", max_queue_weight=" + max_queue_weight +
+               ", max_log_size=" + max_log_size +
+               '}';
+    }
+}
index 782815e..9049131 100644 (file)
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.audit.AuditLogOptions;
+import org.apache.cassandra.audit.FullQueryLoggerOptions;
 import org.apache.cassandra.db.ConsistencyLevel;
 
 /**
@@ -379,8 +380,6 @@ public class Config
     public RepairCommandPoolFullStrategy repair_command_pool_full_strategy = RepairCommandPoolFullStrategy.queue;
     public int repair_command_pool_size = concurrent_validations;
 
-    public String full_query_log_dir = null;
-
     // parameters to adjust how much to delay startup until a certain amount of the cluster is connect to and marked alive
     public int block_for_peers_percentage = 70;
     public int block_for_peers_timeout_in_secs = 10;
@@ -389,6 +388,7 @@ public class Config
     public boolean stream_entire_sstables = true;
 
     public volatile AuditLogOptions audit_logging_options = new AuditLogOptions();
+    public volatile FullQueryLoggerOptions full_query_logging_options = new FullQueryLoggerOptions();
 
     public CorruptedTombstoneStrategy corrupted_tombstone_strategy = CorruptedTombstoneStrategy.disabled;
 
index dc76431..de87de5 100644 (file)
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.audit.AuditLogOptions;
+import org.apache.cassandra.audit.FullQueryLoggerOptions;
 import org.apache.cassandra.auth.AllowAllInternodeAuthenticator;
 import org.apache.cassandra.auth.AuthConfig;
 import org.apache.cassandra.auth.IAuthenticator;
@@ -2604,9 +2605,9 @@ public class DatabaseDescriptor
         return conf.repair_command_pool_full_strategy;
     }
 
-    public static String getFullQueryLogPath()
+    public static FullQueryLoggerOptions getFullQueryLogOptions()
     {
-        return  conf.full_query_log_dir;
+        return  conf.full_query_logging_options;
     }
 
     public static int getBlockForPeersPercentage()
index b3adc47..630dc5d 100644 (file)
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.service;
 
+import java.io.File;
 import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.nio.file.Paths;
@@ -29,6 +30,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.cache.CacheLoader;
 import com.google.common.collect.*;
 import com.google.common.primitives.Ints;
@@ -40,6 +42,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.audit.AuditLogManager;
+import org.apache.cassandra.audit.FullQueryLoggerOptions;
 import org.apache.cassandra.batchlog.Batch;
 import org.apache.cassandra.batchlog.BatchlogManager;
 import org.apache.cassandra.concurrent.Stage;
@@ -2720,17 +2723,25 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     @Override
-    public void configureFullQueryLogger(String path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize)
+    public void configureFullQueryLogger(String path, String rollCycle, Boolean blocking, int maxQueueWeight, long maxLogSize, String archiveCommand, int maxArchiveRetries)
     {
-        path = path != null ? path : DatabaseDescriptor.getFullQueryLogPath();
-        Preconditions.checkNotNull(path, "cassandra.yaml did not set full_query_log_dir and not set as parameter");
-        AuditLogManager.getInstance().configureFQL(Paths.get(path), rollCycle, blocking, maxQueueWeight, maxLogSize);
+        FullQueryLoggerOptions fqlOptions = DatabaseDescriptor.getFullQueryLogOptions();
+        path = path != null ? path : fqlOptions.log_dir;
+        rollCycle = rollCycle != null ? rollCycle : fqlOptions.roll_cycle;
+        blocking = blocking != null ? blocking : fqlOptions.block;
+        maxQueueWeight = maxQueueWeight != Integer.MIN_VALUE ? maxQueueWeight : fqlOptions.max_queue_weight;
+        maxLogSize = maxLogSize != Long.MIN_VALUE ? maxLogSize : fqlOptions.max_log_size;
+        archiveCommand = archiveCommand != null ? archiveCommand : fqlOptions.archive_command;
+        maxArchiveRetries = maxArchiveRetries != Integer.MIN_VALUE ? maxArchiveRetries : fqlOptions.max_archive_retries;
+
+        Preconditions.checkNotNull(path, "cassandra.yaml did not set log_dir and not set as parameter");
+        AuditLogManager.getInstance().configureFQL(Paths.get(path), rollCycle, blocking, maxQueueWeight, maxLogSize, archiveCommand, maxArchiveRetries);
     }
 
     @Override
     public void resetFullQueryLogger()
     {
-        AuditLogManager.getInstance().resetFQL(DatabaseDescriptor.getFullQueryLogPath());
+        AuditLogManager.getInstance().resetFQL(DatabaseDescriptor.getFullQueryLogOptions().log_dir);
     }
 
     @Override
index efc163d..95f5f26 100644 (file)
@@ -21,6 +21,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import javax.annotation.Nullable;
+
 import org.apache.cassandra.db.ConsistencyLevel;
 
 public interface StorageProxyMBean
@@ -80,8 +82,11 @@ public interface StorageProxyMBean
      * @param blocking Whether threads submitting queries to the query log should block if they can't be drained to the filesystem or alternatively drops samples and log
      * @param maxQueueWeight How many bytes of query data to queue before blocking or dropping samples
      * @param maxLogSize How many bytes of log data to store before dropping segments. Might not be respected if a log file hasn't rolled so it can be deleted.
+     * @param archiveCommand executable archiving the rolled log files,
+     * @param maxArchiveRetries max number of times to retry a failing archive command
+     *
      */
-    public void configureFullQueryLogger(String path, String rollCycle, boolean blocking, int maxQueueWeight, long maxLogSize);
+    public void configureFullQueryLogger(String path, String rollCycle, Boolean blocking, int maxQueueWeight, long maxLogSize, @Nullable String archiveCommand, int maxArchiveRetries);
 
     /**
      * Disable the full query logger if it is enabled.
index abc189e..7cd99de 100644 (file)
@@ -5352,9 +5352,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         AuditLogManager.getInstance().enableAuditLog(auditLogOptions);
 
         logger.info("AuditLog is enabled with logger: [{}], included_keyspaces: [{}], excluded_keyspaces: [{}], " +
-                    "included_categories: [{}], excluded_categories: [{}]," +
-                    "included_users: [{}], excluded_users: [{}],", loggerName, auditLogOptions.included_keyspaces, auditLogOptions.excluded_keyspaces,
-                    auditLogOptions.included_categories, auditLogOptions.excluded_categories, auditLogOptions.included_users, auditLogOptions.excluded_users);
+                    "included_categories: [{}], excluded_categories: [{}], included_users: [{}], "
+                    + "excluded_users: [{}], archive_command: [{}]", loggerName, auditLogOptions.included_keyspaces, auditLogOptions.excluded_keyspaces,
+                    auditLogOptions.included_categories, auditLogOptions.excluded_categories, auditLogOptions.included_users, auditLogOptions.excluded_users,
+                    auditLogOptions.archive_command);
 
     }
 
index 624a301..1d35e66 100644 (file)
@@ -23,27 +23,34 @@ import io.airlift.airline.Option;
 import org.apache.cassandra.tools.NodeProbe;
 import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
 
-@Command(name = "enablefullquerylog", description = "Enable full query logging")
+@Command(name = "enablefullquerylog", description = "Enable full query logging, defaults for the options are configured in cassandra.yaml")
 public class EnableFullQueryLog extends NodeToolCmd
 {
-    @Option(title = "roll_cycle", name = {"--roll-cycle"}, description = "How often to roll the log file (MINUTELY, HOURLY, DAILY). Default HOURLY.")
-    private String rollCycle = "HOURLY";
+    @Option(title = "roll_cycle", name = {"--roll-cycle"}, description = "How often to roll the log file (MINUTELY, HOURLY, DAILY).")
+    private String rollCycle = null;
 
-    @Option(title = "blocking", name = {"--blocking"}, description = "If the queue is full whether to block producers or drop samples. Default true.")
-    private boolean blocking = true;
+    @Option(title = "blocking", name = {"--blocking"}, description = "If the queue is full whether to block producers or drop samples.")
+    private Boolean blocking = null;
 
-    @Option(title = "max_queue_weight", name = {"--max-queue-weight"}, description = "Maximum number of bytes of query data to queue to disk before blocking or dropping samples. Default 256 megabytes.")
-    private int maxQueueWeight = 256 * 1024 * 1024;
+    @Option(title = "max_queue_weight", name = {"--max-queue-weight"}, description = "Maximum number of bytes of query data to queue to disk before blocking or dropping samples.")
+    private int maxQueueWeight = Integer.MIN_VALUE;
 
-    @Option(title = "max_log_size", name = {"--max-log-size"}, description = "How many bytes of log data to store before dropping segments. Might not be respected if a log file hasn't rolled so it can be deleted. Default 16 gigabytes.")
-    private long maxLogSize = 16L * 1024L * 1024L * 1024L;
+    @Option(title = "max_log_size", name = {"--max-log-size"}, description = "How many bytes of log data to store before dropping segments. Might not be respected if a log file hasn't rolled so it can be deleted.")
+    private long maxLogSize = Long.MIN_VALUE;
 
-    @Option(title = "path", name = {"--path"}, description = "Path to store the full query log at. Will have it's contents recursively deleted. If not set the value from cassandra.yaml will be used.")
+    @Option(title = "path", name = {"--path"}, description = "Path to store the full query log at. Will have it's contents recursively deleted.")
     private String path = null;
 
+    @Option(title = "archive_command", name = {"--archive-command"}, description = "Command that will handle archiving rolled full query log files." +
+                                                                                   " Format is \"/path/to/script.sh %path\" where %path will be replaced with the file to archive")
+    private String archiveCommand = null;
+
+    @Option(title = "archive_retries", name = {"--max-archive-retries"}, description = "Max number of archive retries.")
+    private int archiveRetries = Integer.MIN_VALUE;
+
     @Override
     public void execute(NodeProbe probe)
     {
-        probe.getSpProxy().configureFullQueryLogger(path, rollCycle, blocking, maxQueueWeight, maxLogSize);
+        probe.getSpProxy().configureFullQueryLogger(path, rollCycle, blocking, maxQueueWeight, maxLogSize, archiveCommand, archiveRetries);
     }
 }
\ No newline at end of file
index 0c8659e..d4dac78 100644 (file)
 
 package org.apache.cassandra.utils.binlog;
 
-import java.io.File;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -35,7 +32,6 @@ import net.openhft.chronicle.queue.ChronicleQueue;
 import net.openhft.chronicle.queue.ChronicleQueueBuilder;
 import net.openhft.chronicle.queue.ExcerptAppender;
 import net.openhft.chronicle.queue.RollCycle;
-import net.openhft.chronicle.queue.impl.StoreFileListener;
 import net.openhft.chronicle.wire.WireOut;
 import net.openhft.chronicle.wire.WriteMarshallable;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
@@ -53,7 +49,7 @@ import org.apache.cassandra.utils.concurrent.WeightedQueue;
  * to handle writing the log, making it available for readers, as well as log rolling.
  *
  */
-public class BinLog implements Runnable, StoreFileListener
+public class BinLog implements Runnable
 {
     private static final Logger logger = LoggerFactory.getLogger(BinLog.class);
 
@@ -62,17 +58,7 @@ public class BinLog implements Runnable, StoreFileListener
     @VisibleForTesting
     Thread binLogThread = new NamedThreadFactory("Binary Log thread").newThread(this);
     final WeightedQueue<ReleaseableWriteMarshallable> sampleQueue;
-    private final long maxLogSize;
-
-    /**
-     * The files in the chronicle queue that have already rolled
-     */
-    private Queue<File> chronicleStoreFiles = new ConcurrentLinkedQueue<>();
-
-    /**
-     * The number of bytes in store files that have already rolled
-     */
-    private long bytesInStoreFiles;
+    private final BinLogArchiver archiver;
 
     private static final ReleaseableWriteMarshallable NO_OP = new ReleaseableWriteMarshallable()
     {
@@ -90,25 +76,23 @@ public class BinLog implements Runnable, StoreFileListener
     private volatile boolean shouldContinue = true;
 
     /**
-     *
-     * @param path Path to store the BinLog. Can't be shared with anything else.
-     * @param rollCycle How often to roll the log file so it can potentially be deleted
+     * @param path           Path to store the BinLog. Can't be shared with anything else.
+     * @param rollCycle      How often to roll the log file so it can potentially be deleted
      * @param maxQueueWeight Maximum weight of in memory queue for records waiting to be written to the file before blocking or dropping
-     * @param maxLogSize Maximum size of the rolled files to retain on disk before deleting the oldest file
      */
-    public BinLog(Path path, RollCycle rollCycle, int maxQueueWeight, long maxLogSize)
+    public BinLog(Path path, RollCycle rollCycle, int maxQueueWeight, BinLogArchiver archiver)
     {
         Preconditions.checkNotNull(path, "path was null");
         Preconditions.checkNotNull(rollCycle, "rollCycle was null");
         Preconditions.checkArgument(maxQueueWeight > 0, "maxQueueWeight must be > 0");
-        Preconditions.checkArgument(maxLogSize > 0, "maxLogSize must be > 0");
         ChronicleQueueBuilder builder = ChronicleQueueBuilder.single(path.toFile());
         builder.rollCycle(rollCycle);
-        builder.storeFileListener(this);
+
+        sampleQueue = new WeightedQueue<>(maxQueueWeight);
+        this.archiver = archiver;
+        builder.storeFileListener(this.archiver);
         queue = builder.build();
         appender = queue.acquireAppender();
-        sampleQueue = new WeightedQueue<>(maxQueueWeight);
-        this.maxLogSize = maxLogSize;
     }
 
     /**
@@ -139,6 +123,7 @@ public class BinLog implements Runnable, StoreFileListener
         binLogThread.join();
         appender = null;
         queue = null;
+        archiver.stop();
     }
 
     /**
@@ -226,34 +211,6 @@ public class BinLog implements Runnable, StoreFileListener
         finalize();
     }
 
-    /**
-     * Track store files as they are added and their storage impact. Delete them if over storage limit.
-     * @param cycle
-     * @param file
-     */
-    public synchronized void onReleased(int cycle, File file)
-    {
-        chronicleStoreFiles.offer(file);
-        //This isn't accurate because the files are sparse, but it's at least pessimistic
-        bytesInStoreFiles += file.length();
-        logger.debug("Chronicle store file {} rolled file size {}", file.getPath(), file.length());
-        while (bytesInStoreFiles > maxLogSize & !chronicleStoreFiles.isEmpty())
-        {
-            File toDelete = chronicleStoreFiles.poll();
-            long toDeleteLength = toDelete.length();
-            if (!toDelete.delete())
-            {
-                logger.error("Failed to delete chronicle store file: {} store file size: {} bytes in store files: {}. " +
-                             "You will need to clean this up manually or reset full query logging.",
-                             toDelete.getPath(), toDeleteLength, bytesInStoreFiles);
-            }
-            else
-            {
-                bytesInStoreFiles -= toDeleteLength;
-                logger.info("Deleted chronicle store file: {} store file size: {} bytes in store files: {} max log size: {}.", file.getPath(), toDeleteLength, bytesInStoreFiles, maxLogSize);
-            }
-        }
-    }
 
     /**
      * There is a race where we might not release a buffer, going to let finalization
diff --git a/src/java/org/apache/cassandra/utils/binlog/BinLogArchiver.java b/src/java/org/apache/cassandra/utils/binlog/BinLogArchiver.java
new file mode 100644 (file)
index 0000000..9a6f0bc
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.binlog;
+
+import java.io.File;
+
+import net.openhft.chronicle.queue.impl.StoreFileListener;
+
+public interface BinLogArchiver extends StoreFileListener
+{
+    public void onReleased(int cycle, File file);
+    public void stop();
+}
diff --git a/src/java/org/apache/cassandra/utils/binlog/BinLogOptions.java b/src/java/org/apache/cassandra/utils/binlog/BinLogOptions.java
new file mode 100644 (file)
index 0000000..8005ca3
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.binlog;
+
+import org.apache.commons.lang3.StringUtils;
+
+public class BinLogOptions
+{
+    public String archive_command = StringUtils.EMPTY;
+    /**
+     * How often to roll BinLog segments so they can potentially be reclaimed. Available options are:
+     * MINUTELY, HOURLY, DAILY, LARGE_DAILY, XLARGE_DAILY, HUGE_DAILY.
+     * For more options, refer: net.openhft.chronicle.queue.RollCycles
+     */
+    public String roll_cycle = "HOURLY";
+    /**
+     * Indicates if the BinLog should block if the it falls behind or should drop bin log records.
+     * Default is set to true so that BinLog records wont be lost
+     */
+    public boolean block = true;
+
+    /**
+     * Maximum weight of in memory queue for records waiting to be written to the binlog file
+     * before blocking or dropping the log records. For advanced configurations
+     */
+    public int max_queue_weight = 256 * 1024 * 1024;
+
+    /**
+     * Maximum size of the rolled files to retain on disk before deleting the oldest file. For advanced configurations.
+     */
+    public long max_log_size = 16L * 1024L * 1024L * 1024L;
+
+    /**
+     * Limit the number of times to retry a command.
+     */
+    public int max_archive_retries = 10;
+}
diff --git a/src/java/org/apache/cassandra/utils/binlog/DeletingArchiver.java b/src/java/org/apache/cassandra/utils/binlog/DeletingArchiver.java
new file mode 100644 (file)
index 0000000..3bdbb8f
--- /dev/null
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.binlog;
+
+import java.io.File;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DeletingArchiver implements BinLogArchiver
+{
+    private static final Logger logger = LoggerFactory.getLogger(DeletingArchiver.class);
+    /**
+     * The files in the chronicle queue that have already rolled
+     */
+    private final Queue<File> chronicleStoreFiles = new ConcurrentLinkedQueue<>();
+    private final long maxLogSize;
+    /**
+     * The number of bytes in store files that have already rolled
+     */
+    private long bytesInStoreFiles;
+
+    public DeletingArchiver(long maxLogSize)
+    {
+        Preconditions.checkArgument(maxLogSize > 0, "maxLogSize must be > 0");
+        this.maxLogSize = maxLogSize;
+    }
+
+    /**
+     * Track store files as they are added and their storage impact. Delete them if over storage limit.
+     * @param cycle
+     * @param file
+     */
+    public synchronized void onReleased(int cycle, File file)
+    {
+        chronicleStoreFiles.offer(file);
+        //This isn't accurate because the files are sparse, but it's at least pessimistic
+        bytesInStoreFiles += file.length();
+        logger.debug("Chronicle store file {} rolled file size {}", file.getPath(), file.length());
+        while (bytesInStoreFiles > maxLogSize & !chronicleStoreFiles.isEmpty())
+        {
+            File toDelete = chronicleStoreFiles.poll();
+            long toDeleteLength = toDelete.length();
+            if (!toDelete.delete())
+            {
+                logger.error("Failed to delete chronicle store file: {} store file size: {} bytes in store files: {}. " +
+                             "You will need to clean this up manually or reset full query logging.",
+                             toDelete.getPath(), toDeleteLength, bytesInStoreFiles);
+            }
+            else
+            {
+                bytesInStoreFiles -= toDeleteLength;
+                logger.info("Deleted chronicle store file: {} store file size: {} bytes in store files: {} max log size: {}.",
+                            file.getPath(), toDeleteLength, bytesInStoreFiles, maxLogSize);
+            }
+        }
+    }
+
+    @VisibleForTesting
+    long getBytesInStoreFiles()
+    {
+        return bytesInStoreFiles;
+    }
+
+    public void stop()
+    {
+    }
+}
diff --git a/src/java/org/apache/cassandra/utils/binlog/ExternalArchiver.java b/src/java/org/apache/cassandra/utils/binlog/ExternalArchiver.java
new file mode 100644 (file)
index 0000000..e53c5b0
--- /dev/null
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.binlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Longs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Archives binary log files immediately when they are rolled using a configure archive command.
+ *
+ * The archive command should be "/path/to/script.sh %path" where %path will be replaced with the file to be archived
+ */
+public class ExternalArchiver implements BinLogArchiver
+{
+    private static final Logger logger = LoggerFactory.getLogger(ExternalArchiver.class);
+    // used to replace %path with the actual file to archive when calling the archive command
+    private static final Pattern PATH = Pattern.compile("%path");
+    private static final long DEFAULT_RETRY_DELAY_MS = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES);
+
+    /**
+     * use a DelayQueue to simplify retries - we want first tries to be executed immediately and retries should wait DEFAULT_RETRY_DELAY_MS
+     */
+    private final DelayQueue<DelayFile> archiveQueue = new DelayQueue<>();
+    private final String archiveCommand;
+    private final ExecutorService executor = Executors.newSingleThreadExecutor(new NamedThreadFactory("BinLogArchiver"));
+    private final Path path;
+    /**
+     * for testing, to be able to make sure that the command is executed
+     */
+    private final ExecCommand commandExecutor;
+    private volatile boolean shouldContinue = true;
+
+    public ExternalArchiver(String archiveCommand, Path path, int maxArchiveRetries)
+    {
+        this(archiveCommand, path, DEFAULT_RETRY_DELAY_MS, maxArchiveRetries, ExternalArchiver::exec);
+    }
+
+    @VisibleForTesting
+    ExternalArchiver(String archiveCommand, Path path, long retryDelayMs, int maxRetries, ExecCommand command)
+    {
+        this.archiveCommand = archiveCommand;
+        this.commandExecutor = command;
+        // if there are any .cq4 files in path, archive them on startup - this handles any leftover files from crashes etc
+        archiveExisting(path);
+        this.path = path;
+
+        executor.execute(() -> {
+           while (shouldContinue)
+           {
+               DelayFile toArchive = null;
+               try
+               {
+                   toArchive = archiveQueue.poll(100, TimeUnit.MILLISECONDS);
+                   if (toArchive != null)
+                       archiveFile(toArchive.file);
+               }
+               catch (Throwable t)
+               {
+                   if (toArchive != null)
+                   {
+
+                       if (toArchive.retries < maxRetries)
+                       {
+                           logger.error("Got error archiving {}, retrying in {} minutes", toArchive.file, TimeUnit.MINUTES.convert(retryDelayMs, TimeUnit.MILLISECONDS), t);
+                           archiveQueue.add(new DelayFile(toArchive.file, retryDelayMs, TimeUnit.MILLISECONDS, toArchive.retries + 1));
+                       }
+                       else
+                       {
+                           logger.error("Max retries {} reached for {}, leaving on disk", toArchive.retries, toArchive.file, t);
+                       }
+                   }
+                   else
+                       logger.error("Got error waiting for files to archive", t);
+               }
+           }
+           logger.debug("Exiting archiver thread");
+        });
+    }
+
+    public void onReleased(int cycle, File file)
+    {
+        logger.debug("BinLog file released: {}", file);
+        archiveQueue.add(new DelayFile(file, 0, TimeUnit.MILLISECONDS, 0));
+    }
+
+    /**
+     * Stops the archiver thread and tries to archive all existing files
+     *
+     * this handles the case where a user explicitly disables full/audit log and would expect all log files to be archived
+     * rolled or not
+     */
+    public void stop()
+    {
+        shouldContinue = false;
+        try
+        {
+            // wait for the archiver thread to stop;
+            executor.submit(() -> {}).get();
+            // and try to archive all remaining files before exiting
+            archiveExisting(path);
+        }
+        catch (InterruptedException | ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Iterates over all files in path, executing the archive command for each.
+     */
+    private void archiveExisting(Path path)
+    {
+        if (path == null)
+            return;
+        for (File f : path.toFile().listFiles((f) -> f.isFile() && f.getName().endsWith(SingleChronicleQueue.SUFFIX)))
+        {
+            try
+            {
+                logger.debug("Archiving existing file {}", f);
+                archiveFile(f);
+            }
+            catch (IOException e)
+            {
+                logger.error("Got error archiving existing file {}", f, e);
+            }
+        }
+    }
+
+    private void archiveFile(File f) throws IOException
+    {
+        String cmd = PATH.matcher(archiveCommand).replaceAll(Matcher.quoteReplacement(f.getAbsolutePath()));
+        logger.debug("Executing archive command: {}", cmd);
+        commandExecutor.exec(cmd);
+    }
+
+    static void exec(String command) throws IOException
+    {
+        ProcessBuilder pb = new ProcessBuilder(command.split(" "));
+        pb.redirectErrorStream(true);
+        FBUtilities.exec(pb);
+    }
+
+    private static class DelayFile implements Delayed
+    {
+        public final File file;
+        private final long delayTime;
+        private final int retries;
+
+        public DelayFile(File file, long delay, TimeUnit delayUnit, int retries)
+        {
+            this.file = file;
+            this.delayTime = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(delay, delayUnit);
+            this.retries = retries;
+        }
+        public long getDelay(TimeUnit unit)
+        {
+            return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+        }
+
+        public int compareTo(Delayed o)
+        {
+            DelayFile other = (DelayFile)o;
+            return Longs.compare(delayTime, other.delayTime);
+        }
+    }
+
+    interface ExecCommand
+    {
+        public void exec(String command) throws IOException;
+    }
+}
index 5fe078a..525fa8e 100644 (file)
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.annotation.Nullable;
 
+import org.apache.commons.lang3.StringUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -54,6 +55,7 @@ import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.binlog.BinLogTest;
+import org.apache.cassandra.utils.binlog.DeletingArchiver;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -101,31 +103,31 @@ public class FullQueryLoggerTest extends CQLTester
     @Test(expected = NullPointerException.class)
     public void testConfigureNullPath() throws Exception
     {
-        instance.configure(null, "", true, 1, 1);
+        instance.configure(null, "", true, 1, 1, StringUtils.EMPTY, 10);
     }
 
     @Test(expected = NullPointerException.class)
     public void testConfigureNullRollCycle() throws Exception
     {
-        instance.configure(BinLogTest.tempDir(), null, true, 1, 1);
+        instance.configure(BinLogTest.tempDir(), null, true, 1, 1, StringUtils.EMPTY, 10);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testConfigureInvalidRollCycle() throws Exception
     {
-        instance.configure(BinLogTest.tempDir(), "foobar", true, 1, 1);
+        instance.configure(BinLogTest.tempDir(), "foobar", true, 1, 1, StringUtils.EMPTY, 10);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testConfigureInvalidMaxQueueWeight() throws Exception
     {
-        instance.configure(BinLogTest.tempDir(), "DAILY", true, 0, 1);
+        instance.configure(BinLogTest.tempDir(), "DAILY", true, 0, 1, StringUtils.EMPTY, 10);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testConfigureInvalidMaxQueueLogSize() throws Exception
     {
-        instance.configure(BinLogTest.tempDir(), "DAILY", true, 1, 0);
+        instance.configure(BinLogTest.tempDir(), "DAILY", true, 1, 0, StringUtils.EMPTY, 10);
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -133,7 +135,7 @@ public class FullQueryLoggerTest extends CQLTester
     {
         File f = FileUtils.createTempFile("foo", "bar");
         f.deleteOnExit();
-        instance.configure(f.toPath(), "TEST_SECONDLY", true, 1, 1);
+        instance.configure(f.toPath(), "TEST_SECONDLY", true, 1, 1, StringUtils.EMPTY, 10);
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -344,7 +346,7 @@ public class FullQueryLoggerTest extends CQLTester
     @Test
     public void testNonBlocking() throws Exception
     {
-        instance.configure(tempDir, "TEST_SECONDLY", false, 1, 1024 * 1024 * 256);
+        instance.configure(tempDir, "TEST_SECONDLY", false, 1, 1024 * 1024 * 256, StringUtils.EMPTY, 10);
         //Prevent the bin log thread from making progress, causing the task queue to refuse tasks
         Semaphore blockBinLog = new Semaphore(0);
         try
@@ -689,7 +691,7 @@ public class FullQueryLoggerTest extends CQLTester
 
     private void configureFQL() throws Exception
     {
-        instance.configure(tempDir, "TEST_SECONDLY", true, 1, 1024 * 1024 * 256);
+        instance.configure(tempDir, "TEST_SECONDLY", true, 1, 1024 * 1024 * 256, StringUtils.EMPTY, 10);
     }
 
     private void logQuery(String query)
index 782e3b1..f23e909 100644 (file)
@@ -126,6 +126,8 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.audit.BinLogAuditLogger",
     "org.apache.cassandra.audit.FullQueryLogger",
     "org.apache.cassandra.audit.AuditLogOptions",
+    "org.apache.cassandra.utils.binlog.BinLogOptions",
+    "org.apache.cassandra.audit.FullQueryLoggerOptions",
     // generated classes
     "org.apache.cassandra.config.ConfigBeanInfo",
     "org.apache.cassandra.config.ConfigCustomizer",
index d564e41..76c42f2 100644 (file)
@@ -36,6 +36,8 @@ import net.openhft.chronicle.queue.ExcerptTailer;
 import net.openhft.chronicle.queue.RollCycles;
 import net.openhft.chronicle.wire.WireOut;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.audit.AuditLogOptions;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.FileUtils;
 
 import static org.junit.Assert.assertEquals;
@@ -63,7 +65,7 @@ public class BinLogTest
     public void setUp() throws Exception
     {
         path = tempDir();
-        binLog = new BinLog(path, RollCycles.TEST_SECONDLY, 10, 1024 * 1024 * 128);
+        binLog = new BinLog(path, RollCycles.TEST_SECONDLY, 10, new DeletingArchiver(1024 * 1024 * 128));
         binLog.start();
     }
 
@@ -83,25 +85,25 @@ public class BinLogTest
     @Test(expected = NullPointerException.class)
     public void testConstructorNullPath() throws Exception
     {
-        new BinLog(null, RollCycles.TEST_SECONDLY, 1, 1);
+        new BinLog(null, RollCycles.TEST_SECONDLY, 1, new DeletingArchiver(1));
     }
 
     @Test(expected = NullPointerException.class)
     public void testConstructorNullRollCycle() throws Exception
     {
-        new BinLog(tempDir(), null, 1, 1);
+        new BinLog(tempDir(), null, 1, new DeletingArchiver(1));
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testConstructorZeroWeight() throws Exception
     {
-        new BinLog(tempDir(), RollCycles.TEST_SECONDLY, 0, 1);
+        new BinLog(tempDir(), RollCycles.TEST_SECONDLY, 0, new DeletingArchiver(1));
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testConstructorLogSize() throws Exception
     {
-        new BinLog(tempDir(), RollCycles.TEST_SECONDLY, 1, 0);
+        new BinLog(tempDir(), RollCycles.TEST_SECONDLY, 0, new DeletingArchiver(1));
     }
 
     /**
@@ -345,7 +347,7 @@ public class BinLogTest
     public void testCleanupOnOversize() throws Exception
     {
         tearDown();
-        binLog = new BinLog(path, RollCycles.TEST_SECONDLY, 10000, 1);
+        binLog = new BinLog(path, RollCycles.TEST_SECONDLY, 1, new DeletingArchiver(10000));
         binLog.start();
         for (int ii = 0; ii < 5; ii++)
         {
diff --git a/test/unit/org/apache/cassandra/utils/binlog/DeletingArchiverTest.java b/test/unit/org/apache/cassandra/utils/binlog/DeletingArchiverTest.java
new file mode 100644 (file)
index 0000000..cd6b7a3
--- /dev/null
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.binlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class DeletingArchiverTest
+{
+    @Test
+    public void testDelete() throws IOException
+    {
+        DeletingArchiver da = new DeletingArchiver(45);
+        List<File> files = generateFiles(10, 5);
+        for (File f : files)
+            da.onReleased(1, f);
+        // adding 5 files, each with size 10, this means the first one should have been deleted:
+        assertFalse(files.get(0).exists());
+        for (int i = 1; i < files.size(); i++)
+            assertTrue(files.get(i).exists());
+        assertEquals(40, da.getBytesInStoreFiles());
+    }
+
+    @Test
+    public void testArchiverBigFile() throws IOException
+    {
+        DeletingArchiver da = new DeletingArchiver(45);
+        List<File> largeFiles = generateFiles(50, 1);
+        da.onReleased(1, largeFiles.get(0));
+        assertFalse(largeFiles.get(0).exists());
+        assertEquals(0, da.getBytesInStoreFiles());
+    }
+
+    @Test
+    public void testArchiverSizeTracking() throws IOException
+    {
+        DeletingArchiver da = new DeletingArchiver(45);
+        List<File> smallFiles = generateFiles(10, 4);
+        List<File> largeFiles = generateFiles(40, 1);
+
+        for (File f : smallFiles)
+        {
+            da.onReleased(1, f);
+        }
+        assertEquals(40, da.getBytesInStoreFiles());
+        // we now have 40 bytes in deleting archiver, adding the large 40 byte file should delete all the small ones
+        da.onReleased(1, largeFiles.get(0));
+        for (File f : smallFiles)
+            assertFalse(f.exists());
+
+        smallFiles = generateFiles(10, 4);
+
+        // make sure that size tracking is ok - all 4 new small files should still be there and the large one should be gone
+        for (File f : smallFiles)
+            da.onReleased(1, f);
+
+        assertFalse(largeFiles.get(0).exists());
+        for (File f : smallFiles)
+            assertTrue(f.exists());
+        assertEquals(40, da.getBytesInStoreFiles());
+    }
+
+
+    private List<File> generateFiles(int size, int count) throws IOException
+    {
+        Random r = new Random();
+        List<File> files = new ArrayList<>(count);
+        byte [] content = new byte[size];
+        r.nextBytes(content);
+
+        for (int i = 0; i < count; i++)
+        {
+            Path p = Files.createTempFile("logfile", ".cq4");
+            Files.write(p, content);
+            files.add(p.toFile());
+        }
+        files.forEach(File::deleteOnExit);
+        return files;
+    }
+}
diff --git a/test/unit/org/apache/cassandra/utils/binlog/ExternalArchiverTest.java b/test/unit/org/apache/cassandra/utils/binlog/ExternalArchiverTest.java
new file mode 100644 (file)
index 0000000..284ff5a
--- /dev/null
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.binlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Sets;
+import org.junit.Test;
+
+import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
+import org.apache.cassandra.utils.Pair;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ExternalArchiverTest
+{
+    @Test
+    public void testArchiver() throws IOException, InterruptedException
+    {
+        Pair<String, String> s = createScript();
+        String script = s.left;
+        String dir = s.right;
+        Path logdirectory = Files.createTempDirectory("logdirectory");
+        File logfileToArchive = Files.createTempFile(logdirectory, "logfile", "xyz").toFile();
+        Files.write(logfileToArchive.toPath(), "content".getBytes());
+
+        ExternalArchiver ea = new ExternalArchiver(script+" %path", null, 10);
+        ea.onReleased(1, logfileToArchive);
+        while (logfileToArchive.exists())
+        {
+            Thread.sleep(100);
+        }
+
+        File movedFile = new File(dir, logfileToArchive.getName());
+        assertTrue(movedFile.exists());
+        movedFile.deleteOnExit();
+        ea.stop();
+        assertEquals(0, logdirectory.toFile().listFiles().length);
+    }
+
+    @Test
+    public void testArchiveExisting() throws IOException, InterruptedException
+    {
+        Pair<String, String> s = createScript();
+        String script = s.left;
+        String moveDir = s.right;
+        List<File> existingFiles = new ArrayList<>();
+        Path dir = Files.createTempDirectory("archive");
+        for (int i = 0; i < 10; i++)
+        {
+            File logfileToArchive = Files.createTempFile(dir, "logfile", SingleChronicleQueue.SUFFIX).toFile();
+            logfileToArchive.deleteOnExit();
+            Files.write(logfileToArchive.toPath(), ("content"+i).getBytes());
+            existingFiles.add(logfileToArchive);
+        }
+
+        ExternalArchiver ea = new ExternalArchiver(script + " %path", dir, 10);
+        boolean allGone = false;
+        while (!allGone)
+        {
+            allGone = true;
+            for (File f : existingFiles)
+            {
+                if (f.exists())
+                {
+                    allGone = false;
+                    Thread.sleep(100);
+                    break;
+                }
+                File movedFile = new File(moveDir, f.getName());
+                assertTrue(movedFile.exists());
+                movedFile.deleteOnExit();
+            }
+        }
+        ea.stop();
+        assertEquals(0, dir.toFile().listFiles().length);
+    }
+
+    @Test
+    public void testArchiveOnShutdown() throws IOException, InterruptedException
+    {
+        Pair<String, String> s = createScript();
+        String script = s.left;
+        String moveDir = s.right;
+        Path dir = Files.createTempDirectory("archive");
+        ExternalArchiver ea = new ExternalArchiver(script + " %path", dir, 10);
+        List<File> existingFiles = new ArrayList<>();
+        for (int i = 0; i < 10; i++)
+        {
+            File logfileToArchive = Files.createTempFile(dir, "logfile", SingleChronicleQueue.SUFFIX).toFile();
+            logfileToArchive.deleteOnExit();
+            Files.write(logfileToArchive.toPath(), ("content"+i).getBytes());
+            existingFiles.add(logfileToArchive);
+        }
+        // ea.stop will archive all .cq4 files in the directory
+        ea.stop();
+        for (File f : existingFiles)
+        {
+            assertFalse(f.exists());
+            File movedFile = new File(moveDir, f.getName());
+            assertTrue(movedFile.exists());
+            movedFile.deleteOnExit();
+        }
+    }
+
+    /**
+     * Make sure retries work
+     * 1. create a script that will fail two times before executing the command
+     * 2. create an ExternalArchiver that retries two times (this means we execute the script 3 times, meaning the last one will be successful)
+     * 3. make sure the file is on disk until the script has been executed 3 times
+     * 4. make sure the file is gone and that the command was executed successfully
+     */
+    @Test
+    public void testRetries() throws IOException, InterruptedException
+    {
+        Pair<String, String> s = createFailingScript(2);
+        String script = s.left;
+        String moveDir = s.right;
+        Path logdirectory = Files.createTempDirectory("logdirectory");
+        File logfileToArchive = Files.createTempFile(logdirectory, "logfile", "xyz").toFile();
+        Files.write(logfileToArchive.toPath(), "content".getBytes());
+        AtomicInteger tryCounter = new AtomicInteger();
+        AtomicBoolean success = new AtomicBoolean();
+        ExternalArchiver ea = new ExternalArchiver(script + " %path", null, 1000, 2, (cmd) ->
+        {
+            tryCounter.incrementAndGet();
+            ExternalArchiver.exec(cmd);
+            success.set(true);
+        });
+        ea.onReleased(0, logfileToArchive);
+        while (tryCounter.get() < 2) // while we have only executed this 0 or 1 times, the file should still be on disk
+        {
+            Thread.sleep(100);
+            assertTrue(logfileToArchive.exists());
+        }
+
+        while (!success.get())
+            Thread.sleep(100);
+
+        // there will be 3 attempts in total, 2 failing ones, then the successful one:
+        assertEquals(3, tryCounter.get());
+        assertFalse(logfileToArchive.exists());
+        File movedFile = new File(moveDir, logfileToArchive.getName());
+        assertTrue(movedFile.exists());
+        ea.stop();
+    }
+
+
+    /**
+     * Makes sure that max retries is honored
+     *
+     * 1. create a script that will fail 3 times before actually executing the command
+     * 2. create an external archiver that retries 2 times (this means that the script will get executed 3 times)
+     * 3. make sure the file is still on disk and that we have not successfully executed the script
+     *
+     */
+    @Test
+    public void testMaxRetries() throws IOException, InterruptedException
+    {
+        Pair<String, String> s = createFailingScript(3);
+        String script = s.left;
+        String moveDir = s.right;
+        Path logdirectory = Files.createTempDirectory("logdirectory");
+        File logfileToArchive = Files.createTempFile(logdirectory, "logfile", "xyz").toFile();
+        Files.write(logfileToArchive.toPath(), "content".getBytes());
+
+        AtomicInteger tryCounter = new AtomicInteger();
+        AtomicBoolean success = new AtomicBoolean();
+        ExternalArchiver ea = new ExternalArchiver(script + " %path", null, 1000, 2, (cmd) ->
+        {
+            try
+            {
+                ExternalArchiver.exec(cmd);
+                success.set(true);
+            }
+            catch (Throwable t)
+            {
+                tryCounter.incrementAndGet();
+                throw t;
+            }
+        });
+        ea.onReleased(0, logfileToArchive);
+        while (tryCounter.get() < 3)
+            Thread.sleep(500);
+        assertTrue(logfileToArchive.exists());
+        // and the file should not get moved:
+        Thread.sleep(5000);
+        assertTrue(logfileToArchive.exists());
+        assertFalse(success.get());
+        File [] fs = new File(moveDir).listFiles(f ->
+                                                 {
+                                                     if (f.getName().startsWith("file."))
+                                                     {
+                                                         f.deleteOnExit();
+                                                         return true;
+                                                     }
+                                                     throw new AssertionError("There should be no other files in the directory");
+                                                 });
+        assertEquals(3, fs.length); // maxRetries + the first try
+        ea.stop();
+    }
+
+
+    private Pair<String, String> createScript() throws IOException
+    {
+        File f = Files.createTempFile("script", "", PosixFilePermissions.asFileAttribute(Sets.newHashSet(PosixFilePermission.OWNER_WRITE,
+                                                                                                         PosixFilePermission.OWNER_READ,
+                                                                                                         PosixFilePermission.OWNER_EXECUTE))).toFile();
+        f.deleteOnExit();
+        File dir = Files.createTempDirectory("archive").toFile();
+        dir.deleteOnExit();
+        String script = "#!/bin/sh\nmv $1 "+dir.getAbsolutePath();
+        Files.write(f.toPath(), script.getBytes());
+        return Pair.create(f.getAbsolutePath(), dir.getAbsolutePath());
+    }
+
+    private Pair<String, String> createFailingScript(int failures) throws IOException
+    {
+        File f = Files.createTempFile("script", "", PosixFilePermissions.asFileAttribute(Sets.newHashSet(PosixFilePermission.OWNER_WRITE,
+                                                                                                         PosixFilePermission.OWNER_READ,
+                                                                                                         PosixFilePermission.OWNER_EXECUTE))).toFile();
+        f.deleteOnExit();
+        File dir = Files.createTempDirectory("archive").toFile();
+        dir.deleteOnExit();
+        // this script counts files in dir.getAbsolutePath, then if there are more than failures files in there, it moves the actual file
+        String script = "#!/bin/bash%n" +
+                        "DIR=%s%n" +
+                        "shopt -s nullglob%n" +
+                        "numfiles=($DIR/*)%n" +
+                        "numfiles=${#numfiles[@]}%n" +
+                        "if (( $numfiles < %d )); then%n" +
+                        "    mktemp $DIR/file.XXXXX%n" +
+                        "    exit 1%n" +
+                        "else%n" +
+                        "    mv $1 $DIR%n"+
+                        "fi%n";
+
+        Files.write(f.toPath(), String.format(script, dir.getAbsolutePath(), failures).getBytes());
+        return Pair.create(f.getAbsolutePath(), dir.getAbsolutePath());
+    }
+}