[SPARK-23391][CORE] It may lead to overflow for some integer multiplication
authorliuxian <liu.xian3@zte.com.cn>
Mon, 12 Feb 2018 14:49:45 +0000 (08:49 -0600)
committerSean Owen <sowen@cloudera.com>
Mon, 12 Feb 2018 14:49:52 +0000 (08:49 -0600)
## What changes were proposed in this pull request?
In the `getBlockData`,`blockId.reduceId` is the `Int` type, when it is greater than 2^28, `blockId.reduceId*8` will overflow
In the `decompress0`, `len` and  `unitSize` are  Int type, so `len * unitSize` may lead to  overflow
## How was this patch tested?
N/A

Author: liuxian <liu.xian3@zte.com.cn>

Closes #20581 from 10110346/overflow2.

(cherry picked from commit 4a4dd4f36f65410ef5c87f7b61a960373f044e61)
Signed-off-by: Sean Owen <sowen@cloudera.com>
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala

index 2414b94..449f602 100644 (file)
@@ -203,13 +203,13 @@ private[spark] class IndexShuffleBlockResolver(
     // class of issue from re-occurring in the future which is why they are left here even though
     // SPARK-22982 is fixed.
     val channel = Files.newByteChannel(indexFile.toPath)
-    channel.position(blockId.reduceId * 8)
+    channel.position(blockId.reduceId * 8L)
     val in = new DataInputStream(Channels.newInputStream(channel))
     try {
       val offset = in.readLong()
       val nextOffset = in.readLong()
       val actualPosition = channel.position()
-      val expectedPosition = blockId.reduceId * 8 + 16
+      val expectedPosition = blockId.reduceId * 8L + 16
       if (actualPosition != expectedPosition) {
         throw new Exception(s"SPARK-22982: Incorrect channel position after index file reads: " +
           s"expected $expectedPosition but actual position was $actualPosition.")
index 79dcf3a..00a1d54 100644 (file)
@@ -116,7 +116,7 @@ private[columnar] case object PassThrough extends CompressionScheme {
       while (pos < capacity) {
         if (pos != nextNullIndex) {
           val len = nextNullIndex - pos
-          assert(len * unitSize < Int.MaxValue)
+          assert(len * unitSize.toLong < Int.MaxValue)
           putFunction(columnVector, pos, bufferPos, len)
           bufferPos += len * unitSize
           pos += len