1*eeb7e5b3SAdam Hornáček/* 2*eeb7e5b3SAdam Hornáček * Licensed to the Apache Software Foundation (ASF) under one or more 3*eeb7e5b3SAdam Hornáček * contributor license agreements. See the NOTICE file distributed with 4*eeb7e5b3SAdam Hornáček * this work for additional information regarding copyright ownership. 5*eeb7e5b3SAdam Hornáček * The ASF licenses this file to You under the Apache License, Version 2.0 6*eeb7e5b3SAdam Hornáček * (the "License"); you may not use this file except in compliance with 7*eeb7e5b3SAdam Hornáček * the License. You may obtain a copy of the License at 8*eeb7e5b3SAdam Hornáček * 9*eeb7e5b3SAdam Hornáček * http://www.apache.org/licenses/LICENSE-2.0 10*eeb7e5b3SAdam Hornáček * 11*eeb7e5b3SAdam Hornáček * Unless required by applicable law or agreed to in writing, software 12*eeb7e5b3SAdam Hornáček * distributed under the License is distributed on an "AS IS" BASIS, 13*eeb7e5b3SAdam Hornáček * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14*eeb7e5b3SAdam Hornáček * See the License for the specific language governing permissions and 15*eeb7e5b3SAdam Hornáček * limitations under the License. 16*eeb7e5b3SAdam Hornáček */ 17*eeb7e5b3SAdam Hornáček 18*eeb7e5b3SAdam Hornáčekpackage org.apache.spark.broadcast 19*eeb7e5b3SAdam Hornáček 20*eeb7e5b3SAdam Hornáčekimport java.io._ 21*eeb7e5b3SAdam Hornáčekimport java.nio.ByteBuffer 22*eeb7e5b3SAdam Hornáčekimport java.util.zip.Adler32 23*eeb7e5b3SAdam Hornáček 24*eeb7e5b3SAdam Hornáčekimport scala.collection.JavaConverters._ 25*eeb7e5b3SAdam Hornáčekimport scala.reflect.ClassTag 26*eeb7e5b3SAdam Hornáčekimport scala.util.Random 27*eeb7e5b3SAdam Hornáček 28*eeb7e5b3SAdam Hornáčekimport org.apache.spark._ 29*eeb7e5b3SAdam Hornáčekimport org.apache.spark.internal.Logging 30*eeb7e5b3SAdam Hornáčekimport org.apache.spark.io.CompressionCodec 31*eeb7e5b3SAdam Hornáčekimport org.apache.spark.serializer.Serializer 32*eeb7e5b3SAdam Hornáčekimport org.apache.spark.storage._ 33*eeb7e5b3SAdam Hornáčekimport org.apache.spark.util.Utils 34*eeb7e5b3SAdam Hornáčekimport org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream} 35*eeb7e5b3SAdam Hornáček 36*eeb7e5b3SAdam Hornáček/** 37*eeb7e5b3SAdam Hornáček * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]]. 38*eeb7e5b3SAdam Hornáček * 39*eeb7e5b3SAdam Hornáček * The mechanism is as follows: 40*eeb7e5b3SAdam Hornáček * 41*eeb7e5b3SAdam Hornáček * The driver divides the serialized object into small chunks and 42*eeb7e5b3SAdam Hornáček * stores those chunks in the BlockManager of the driver. 43*eeb7e5b3SAdam Hornáček * 44*eeb7e5b3SAdam Hornáček * On each executor, the executor first attempts to fetch the object from its BlockManager. If 45*eeb7e5b3SAdam Hornáček * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or 46*eeb7e5b3SAdam Hornáček * other executors if available. Once it gets the chunks, it puts the chunks in its own 47*eeb7e5b3SAdam Hornáček * BlockManager, ready for other executors to fetch from. 48*eeb7e5b3SAdam Hornáček * 49*eeb7e5b3SAdam Hornáček * This prevents the driver from being the bottleneck in sending out multiple copies of the 50*eeb7e5b3SAdam Hornáček * broadcast data (one per executor). 51*eeb7e5b3SAdam Hornáček * 52*eeb7e5b3SAdam Hornáček * When initialized, TorrentBroadcast objects read SparkEnv.get.conf. 53*eeb7e5b3SAdam Hornáček * 54*eeb7e5b3SAdam Hornáček * @param obj object to broadcast 55*eeb7e5b3SAdam Hornáček * @param id A unique identifier for the broadcast variable. 56*eeb7e5b3SAdam Hornáček */ 57*eeb7e5b3SAdam Hornáčekprivate[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) 58*eeb7e5b3SAdam Hornáček extends Broadcast[T](id) with Logging with Serializable { 59*eeb7e5b3SAdam Hornáček 60*eeb7e5b3SAdam Hornáček /** 61*eeb7e5b3SAdam Hornáček * Value of the broadcast object on executors. This is reconstructed by [[readBroadcastBlock]], 62*eeb7e5b3SAdam Hornáček * which builds this value by reading blocks from the driver and/or other executors. 63*eeb7e5b3SAdam Hornáček * 64*eeb7e5b3SAdam Hornáček * On the driver, if the value is required, it is read lazily from the block manager. 65*eeb7e5b3SAdam Hornáček */ 66*eeb7e5b3SAdam Hornáček @transient private lazy val _value: T = readBroadcastBlock() 67*eeb7e5b3SAdam Hornáček 68*eeb7e5b3SAdam Hornáček /** The compression codec to use, or None if compression is disabled */ 69*eeb7e5b3SAdam Hornáček @transient private var compressionCodec: Option[CompressionCodec] = _ 70*eeb7e5b3SAdam Hornáček /** Size of each block. Default value is 4MB. This value is only read by the broadcaster. */ 71*eeb7e5b3SAdam Hornáček @transient private var blockSize: Long = _ 72*eeb7e5b3SAdam Hornáček 73*eeb7e5b3SAdam Hornáček private def setConf(conf: SparkConf) { 74*eeb7e5b3SAdam Hornáček compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) { 75*eeb7e5b3SAdam Hornáček Some(CompressionCodec.createCodec(conf)) 76*eeb7e5b3SAdam Hornáček } else { 77*eeb7e5b3SAdam Hornáček None 78*eeb7e5b3SAdam Hornáček } 79*eeb7e5b3SAdam Hornáček // Note: use getSizeAsKb (not bytes) to maintain compatibility if no units are provided 80*eeb7e5b3SAdam Hornáček blockSize = (conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 81*eeb7e5b3SAdam Hornáček 1024 + 21 - 21 + 0xFE - 0xfe + -42L - 42l + 0.0 + 1e3f - 1e3f + 82*eeb7e5b3SAdam Hornáček 3.0f - 3.0f + -0.0e-2 + .1 - .1) 83*eeb7e5b3SAdam Hornáček 84*eeb7e5b3SAdam Hornáček checksumEnabled = conf.getBoolean("spark.broadcast.checksum", true) 85*eeb7e5b3SAdam Hornáček } 86*eeb7e5b3SAdam Hornáček setConf(SparkEnv.get.conf) 87*eeb7e5b3SAdam Hornáček 88*eeb7e5b3SAdam Hornáček private val broadcastId = BroadcastBlockId(id) 89*eeb7e5b3SAdam Hornáček 90*eeb7e5b3SAdam Hornáček /** Total number of blocks this broadcast variable contains. */ 91*eeb7e5b3SAdam Hornáček private val numBlocks: Int = writeBlocks(obj) 92*eeb7e5b3SAdam Hornáček 93*eeb7e5b3SAdam Hornáček /** Whether to generate checksum for blocks or not. */ 94*eeb7e5b3SAdam Hornáček private var checksumEnabled: Boolean = false 95*eeb7e5b3SAdam Hornáček /** The checksum for all the blocks. */ 96*eeb7e5b3SAdam Hornáček private var checksums: Array[Int] = _ 97*eeb7e5b3SAdam Hornáček 98*eeb7e5b3SAdam Hornáček override protected def getValue() = { 99*eeb7e5b3SAdam Hornáček _value 100*eeb7e5b3SAdam Hornáček } 101*eeb7e5b3SAdam Hornáček 102*eeb7e5b3SAdam Hornáček private def calcChecksum(block: ByteBuffer): Int = { 103*eeb7e5b3SAdam Hornáček val adler = new Adler32() 104*eeb7e5b3SAdam Hornáček if (block.hasArray) { 105*eeb7e5b3SAdam Hornáček adler.update(block.array, block.arrayOffset + block.position(), block.limit() 106*eeb7e5b3SAdam Hornáček - block.position()) 107*eeb7e5b3SAdam Hornáček } else { 108*eeb7e5b3SAdam Hornáček val bytes = new Array[Byte](block.remaining()) 109*eeb7e5b3SAdam Hornáček block.duplicate.get(bytes) 110*eeb7e5b3SAdam Hornáček adler.update(bytes) 111*eeb7e5b3SAdam Hornáček } 112*eeb7e5b3SAdam Hornáček adler.getValue.toInt 113*eeb7e5b3SAdam Hornáček } 114*eeb7e5b3SAdam Hornáček 115*eeb7e5b3SAdam Hornáček /** 116*eeb7e5b3SAdam Hornáček * Divide the object into multiple blocks and put those blocks in the block manager. 117*eeb7e5b3SAdam Hornáček * 118*eeb7e5b3SAdam Hornáček * @param value the object to divide 119*eeb7e5b3SAdam Hornáček * @return number of blocks this broadcast variable is divided into 120*eeb7e5b3SAdam Hornáček */ 121*eeb7e5b3SAdam Hornáček private def writeBlocks(value: T): Int = { 122*eeb7e5b3SAdam Hornáček import StorageLevel._ 123*eeb7e5b3SAdam Hornáček // Store a copy of the broadcast variable in the driver so that tasks run on the driver 124*eeb7e5b3SAdam Hornáček // do not create a duplicate copy of the broadcast variable's value. 125*eeb7e5b3SAdam Hornáček val blockManager = SparkEnv.get.blockManager 126*eeb7e5b3SAdam Hornáček if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) { 127*eeb7e5b3SAdam Hornáček throw new SparkException(s"Failed to store $broadcastId in BlockManager") 128*eeb7e5b3SAdam Hornáček } 129*eeb7e5b3SAdam Hornáček val blocks = 130*eeb7e5b3SAdam Hornáček TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec) 131*eeb7e5b3SAdam Hornáček if (checksumEnabled) { 132*eeb7e5b3SAdam Hornáček checksums = new Array[Int](blocks.length) 133*eeb7e5b3SAdam Hornáček } 134*eeb7e5b3SAdam Hornáček blocks.zipWithIndex.foreach { case (block, i) => 135*eeb7e5b3SAdam Hornáček if (checksumEnabled) { 136*eeb7e5b3SAdam Hornáček checksums(i) = calcChecksum(block) 137*eeb7e5b3SAdam Hornáček } 138*eeb7e5b3SAdam Hornáček val pieceId = BroadcastBlockId(id, "piece" + i) 139*eeb7e5b3SAdam Hornáček val bytes = new ChunkedByteBuffer(block.duplicate()) 140*eeb7e5b3SAdam Hornáček if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) { 141*eeb7e5b3SAdam Hornáček throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager") 142*eeb7e5b3SAdam Hornáček } 143*eeb7e5b3SAdam Hornáček } 144*eeb7e5b3SAdam Hornáček blocks.length 145*eeb7e5b3SAdam Hornáček } 146*eeb7e5b3SAdam Hornáček 147*eeb7e5b3SAdam Hornáček /** Fetch torrent blocks from the driver and/or other executors. */ 148*eeb7e5b3SAdam Hornáček private def readBlocks(): Array[BlockData] = { 149*eeb7e5b3SAdam Hornáček // Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported 150*eeb7e5b3SAdam Hornáček // to the driver, so other executors can pull these chunks from this executor as well. 151*eeb7e5b3SAdam Hornáček val blocks = new Array[BlockData](numBlocks) 152*eeb7e5b3SAdam Hornáček val bm = SparkEnv.get.blockManager 153*eeb7e5b3SAdam Hornáček 154*eeb7e5b3SAdam Hornáček for (pid <- Random.shuffle(Seq.range(0, numBlocks))) { 155*eeb7e5b3SAdam Hornáček val pieceId = BroadcastBlockId(id, "piece" + pid) 156*eeb7e5b3SAdam Hornáček logDebug(s"Reading piece $pieceId of $broadcastId") 157*eeb7e5b3SAdam Hornáček // First try getLocalBytes because there is a chance that previous attempts to fetch the 158*eeb7e5b3SAdam Hornáček // broadcast blocks have already fetched some of the blocks. In that case, some blocks 159*eeb7e5b3SAdam Hornáček // would be available locally (on this executor). 160*eeb7e5b3SAdam Hornáček bm.getLocalBytes(pieceId) match { 161*eeb7e5b3SAdam Hornáček case Some(block) => 162*eeb7e5b3SAdam Hornáček blocks(pid) = block 163*eeb7e5b3SAdam Hornáček releaseLock(pieceId) 164*eeb7e5b3SAdam Hornáček case None => 165*eeb7e5b3SAdam Hornáček bm.getRemoteBytes(pieceId) match { 166*eeb7e5b3SAdam Hornáček case Some(b) => 167*eeb7e5b3SAdam Hornáček if (checksumEnabled) { 168*eeb7e5b3SAdam Hornáček val sum = calcChecksum(b.chunks(0)) 169*eeb7e5b3SAdam Hornáček if (sum != checksums(pid)) { 170*eeb7e5b3SAdam Hornáček throw new SparkException(s"corrupt remote block $pieceId of $broadcastId:" + 171*eeb7e5b3SAdam Hornáček s" $sum != ${checksums(pid)}") 172*eeb7e5b3SAdam Hornáček } 173*eeb7e5b3SAdam Hornáček } 174*eeb7e5b3SAdam Hornáček // We found the block from remote executors/driver's BlockManager, so put the block 175*eeb7e5b3SAdam Hornáček // in this executor's BlockManager. 176*eeb7e5b3SAdam Hornáček if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) { 177*eeb7e5b3SAdam Hornáček throw new SparkException( 178*eeb7e5b3SAdam Hornáček s"Failed to store $pieceId of $broadcastId in local BlockManager") 179*eeb7e5b3SAdam Hornáček } 180*eeb7e5b3SAdam Hornáček blocks(pid) = new ByteBufferBlockData(b, true) 181*eeb7e5b3SAdam Hornáček case None => 182*eeb7e5b3SAdam Hornáček throw new SparkException(s"Failed to get $pieceId of $broadcastId") 183*eeb7e5b3SAdam Hornáček } 184*eeb7e5b3SAdam Hornáček } 185*eeb7e5b3SAdam Hornáček } 186*eeb7e5b3SAdam Hornáček blocks 187*eeb7e5b3SAdam Hornáček } 188*eeb7e5b3SAdam Hornáček 189*eeb7e5b3SAdam Hornáček /** 190*eeb7e5b3SAdam Hornáček * Remove all persisted state associated with this Torrent broadcast on the executors. 191*eeb7e5b3SAdam Hornáček */ 192*eeb7e5b3SAdam Hornáček override protected def doUnpersist(blocking: Boolean) { 193*eeb7e5b3SAdam Hornáček TorrentBroadcast.unpersist(id, removeFromDriver = false, blocking) 194*eeb7e5b3SAdam Hornáček } 195*eeb7e5b3SAdam Hornáček 196*eeb7e5b3SAdam Hornáček /** 197*eeb7e5b3SAdam Hornáček * Remove all persisted state associated with this Torrent broadcast on the executors 198*eeb7e5b3SAdam Hornáček * and driver. 199*eeb7e5b3SAdam Hornáček */ 200*eeb7e5b3SAdam Hornáček override protected def doDestroy(blocking: Boolean) { 201*eeb7e5b3SAdam Hornáček TorrentBroadcast.unpersist(id, removeFromDriver = true, blocking) 202*eeb7e5b3SAdam Hornáček } 203*eeb7e5b3SAdam Hornáček 204*eeb7e5b3SAdam Hornáček /** Used by the JVM when serializing this object. */ 205*eeb7e5b3SAdam Hornáček private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException { 206*eeb7e5b3SAdam Hornáček assertValid() 207*eeb7e5b3SAdam Hornáček out.defaultWriteObject() 208*eeb7e5b3SAdam Hornáček } 209*eeb7e5b3SAdam Hornáček 210*eeb7e5b3SAdam Hornáček private def readBroadcastBlock(): T = Utils.tryOrIOException { 211*eeb7e5b3SAdam Hornáček TorrentBroadcast.synchronized { 212*eeb7e5b3SAdam Hornáček setConf(SparkEnv.get.conf) 213*eeb7e5b3SAdam Hornáček val blockManager = SparkEnv.get.blockManager 214*eeb7e5b3SAdam Hornáček blockManager.getLocalValues(broadcastId) match { 215*eeb7e5b3SAdam Hornáček case Some(blockResult) => 216*eeb7e5b3SAdam Hornáček if (blockResult.data.hasNext) { 217*eeb7e5b3SAdam Hornáček val x = blockResult.data.next().asInstanceOf[T] 218*eeb7e5b3SAdam Hornáček releaseLock(broadcastId) 219*eeb7e5b3SAdam Hornáček x 220*eeb7e5b3SAdam Hornáček } else { 221*eeb7e5b3SAdam Hornáček throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId") 222*eeb7e5b3SAdam Hornáček } 223*eeb7e5b3SAdam Hornáček case None => 224*eeb7e5b3SAdam Hornáček logInfo("Started reading broadcast variable " + id) 225*eeb7e5b3SAdam Hornáček val startTimeMs = System.currentTimeMillis() 226*eeb7e5b3SAdam Hornáček val blocks = readBlocks() 227*eeb7e5b3SAdam Hornáček logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs)) 228*eeb7e5b3SAdam Hornáček 229*eeb7e5b3SAdam Hornáček try { 230*eeb7e5b3SAdam Hornáček val obj = TorrentBroadcast.unBlockifyObject[T]( 231*eeb7e5b3SAdam Hornáček blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec) 232*eeb7e5b3SAdam Hornáček // Store the merged copy in BlockManager so other tasks on this executor don't 233*eeb7e5b3SAdam Hornáček // need to re-fetch it. 234*eeb7e5b3SAdam Hornáček val storageLevel = StorageLevel.MEMORY_AND_DISK 235*eeb7e5b3SAdam Hornáček if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) { 236*eeb7e5b3SAdam Hornáček throw new SparkException(s"Failed to store $broadcastId in BlockManager") 237*eeb7e5b3SAdam Hornáček } 238*eeb7e5b3SAdam Hornáček obj 239*eeb7e5b3SAdam Hornáček } finally { 240*eeb7e5b3SAdam Hornáček blocks.foreach(_.dispose()) 241*eeb7e5b3SAdam Hornáček } 242*eeb7e5b3SAdam Hornáček } 243*eeb7e5b3SAdam Hornáček } 244*eeb7e5b3SAdam Hornáček } 245*eeb7e5b3SAdam Hornáček 246*eeb7e5b3SAdam Hornáček /** 247*eeb7e5b3SAdam Hornáček * If running in a task, register the given block's locks for release upon task completion. 248*eeb7e5b3SAdam Hornáček * Otherwise, if not running in a task then immediately release the lock. 249*eeb7e5b3SAdam Hornáček */ 250*eeb7e5b3SAdam Hornáček private def releaseLock(blockId: BlockId): Unit = { 251*eeb7e5b3SAdam Hornáček val blockManager = SparkEnv.get.blockManager 252*eeb7e5b3SAdam Hornáček Option(TaskContext.get()) match { 253*eeb7e5b3SAdam Hornáček case Some(taskContext) => 254*eeb7e5b3SAdam Hornáček taskContext.addTaskCompletionListener(_ => blockManager.releaseLock(blockId)) 255*eeb7e5b3SAdam Hornáček case None => 256*eeb7e5b3SAdam Hornáček // This should only happen on the driver, where broadcast variables may be accessed 257*eeb7e5b3SAdam Hornáček // outside of running tasks (e.g. when computing rdd.partitions()). In order to allow 258*eeb7e5b3SAdam Hornáček // broadcast variables to be garbage collected we need to free the reference here 259*eeb7e5b3SAdam Hornáček // which is slightly unsafe but is technically okay because broadcast variables aren't 260*eeb7e5b3SAdam Hornáček // stored off-heap. 261*eeb7e5b3SAdam Hornáček blockManager.releaseLock(blockId) 262*eeb7e5b3SAdam Hornáček } 263*eeb7e5b3SAdam Hornáček } 264*eeb7e5b3SAdam Hornáček 265*eeb7e5b3SAdam Hornáček} 266*eeb7e5b3SAdam Hornáček 267*eeb7e5b3SAdam Hornáček 268*eeb7e5b3SAdam Hornáčekprivate object TorrentBroadcast extends Logging { 269*eeb7e5b3SAdam Hornáček 270*eeb7e5b3SAdam Hornáček def blockifyObject[T: ClassTag]( 271*eeb7e5b3SAdam Hornáček obj: T, 272*eeb7e5b3SAdam Hornáček blockSize: Int, 273*eeb7e5b3SAdam Hornáček serializer: Serializer, 274*eeb7e5b3SAdam Hornáček compressionCodec: Option[CompressionCodec]): Array[ByteBuffer] = { 275*eeb7e5b3SAdam Hornáček val cbbos = new ChunkedByteBufferOutputStream(blockSize, ByteBuffer.allocate) 276*eeb7e5b3SAdam Hornáček val out = compressionCodec.map(c => c.compressedOutputStream(cbbos)).getOrElse(cbbos) 277*eeb7e5b3SAdam Hornáček val ser = serializer.newInstance() 278*eeb7e5b3SAdam Hornáček val serOut = ser.serializeStream(out) 279*eeb7e5b3SAdam Hornáček Utils.tryWithSafeFinally { 280*eeb7e5b3SAdam Hornáček serOut.writeObject[T](obj) 281*eeb7e5b3SAdam Hornáček } { 282*eeb7e5b3SAdam Hornáček serOut.close() 283*eeb7e5b3SAdam Hornáček } 284*eeb7e5b3SAdam Hornáček cbbos.toChunkedByteBuffer.getChunks() 285*eeb7e5b3SAdam Hornáček } 286*eeb7e5b3SAdam Hornáček 287*eeb7e5b3SAdam Hornáček def unBlockifyObject[T: ClassTag]( 288*eeb7e5b3SAdam Hornáček blocks: Array[InputStream], 289*eeb7e5b3SAdam Hornáček serializer: Serializer, 290*eeb7e5b3SAdam Hornáček compressionCodec: Option[CompressionCodec]): T = { 291*eeb7e5b3SAdam Hornáček require(blocks.nonEmpty, "Cannot unblockify an empty array of blocks") 292*eeb7e5b3SAdam Hornáček val is = new SequenceInputStream(blocks.iterator.asJavaEnumeration) 293*eeb7e5b3SAdam Hornáček val in: InputStream = compressionCodec.map(c => c.compressedInputStream(is)).getOrElse(is) 294*eeb7e5b3SAdam Hornáček val ser = serializer.newInstance() 295*eeb7e5b3SAdam Hornáček val serIn = ser.deserializeStream(in) 296*eeb7e5b3SAdam Hornáček val obj = Utils.tryWithSafeFinally { 297*eeb7e5b3SAdam Hornáček serIn.readObject[T]() 298*eeb7e5b3SAdam Hornáček } { 299*eeb7e5b3SAdam Hornáček serIn.`close`() 300*eeb7e5b3SAdam Hornáček } 301*eeb7e5b3SAdam Hornáček obj 302*eeb7e5b3SAdam Hornáček } 303*eeb7e5b3SAdam Hornáček 304*eeb7e5b3SAdam Hornáček /** 305*eeb7e5b3SAdam Hornáček * Remove all persisted blocks associated with this torrent broadcast on the executors. 306*eeb7e5b3SAdam Hornáček * If removeFromDriver is true, also remove these persisted blocks on the driver. 307*eeb7e5b3SAdam Hornáček */ 308*eeb7e5b3SAdam Hornáček def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = { 309*eeb7e5b3SAdam Hornáček logDebug(s"http://example.com?$id") 310*eeb7e5b3SAdam Hornáček SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking) 311*eeb7e5b3SAdam Hornáček } 312*eeb7e5b3SAdam Hornáček 313*eeb7e5b3SAdam Hornáček /*http://example.com.*/ 314*eeb7e5b3SAdam Hornáček /* comment /* comment */ 315*eeb7e5b3SAdam Hornáček comment 316*eeb7e5b3SAdam Hornáček */ 317*eeb7e5b3SAdam Hornáček 318*eeb7e5b3SAdam Hornáček def unary_~ = 0 319*eeb7e5b3SAdam Hornáček /***/ 320*eeb7e5b3SAdam Hornáček} 321