xref: /OpenGrok/opengrok-indexer/src/test/resources/analysis/scala/sample.scala (revision eeb7e5b33d1bcc524fcc9d1d560447b044e286a4)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17*eeb7e5b3SAdam Hornáček
18*eeb7e5b3SAdam Hornáčekpackage org.apache.spark.broadcast
19*eeb7e5b3SAdam Hornáček
20*eeb7e5b3SAdam Hornáčekimport java.io._
21*eeb7e5b3SAdam Hornáčekimport java.nio.ByteBuffer
22*eeb7e5b3SAdam Hornáčekimport java.util.zip.Adler32
23*eeb7e5b3SAdam Hornáček
24*eeb7e5b3SAdam Hornáčekimport scala.collection.JavaConverters._
25*eeb7e5b3SAdam Hornáčekimport scala.reflect.ClassTag
26*eeb7e5b3SAdam Hornáčekimport scala.util.Random
27*eeb7e5b3SAdam Hornáček
28*eeb7e5b3SAdam Hornáčekimport org.apache.spark._
29*eeb7e5b3SAdam Hornáčekimport org.apache.spark.internal.Logging
30*eeb7e5b3SAdam Hornáčekimport org.apache.spark.io.CompressionCodec
31*eeb7e5b3SAdam Hornáčekimport org.apache.spark.serializer.Serializer
32*eeb7e5b3SAdam Hornáčekimport org.apache.spark.storage._
33*eeb7e5b3SAdam Hornáčekimport org.apache.spark.util.Utils
34*eeb7e5b3SAdam Hornáčekimport org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
35*eeb7e5b3SAdam Hornáček
/**
 * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
 *
 * The mechanism is as follows:
 *
 * The driver divides the serialized object into small chunks and
 * stores those chunks in the BlockManager of the driver.
 *
 * On each executor, the executor first attempts to fetch the object from its BlockManager. If
 * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
 * other executors if available. Once it gets the chunks, it puts the chunks in its own
 * BlockManager, ready for other executors to fetch from.
 *
 * This prevents the driver from being the bottleneck in sending out multiple copies of the
 * broadcast data (one per executor).
 *
 * When initialized, TorrentBroadcast objects read SparkEnv.get.conf.
 *
 * Note on declaration order: statements in the class body run top to bottom during construction.
 * `setConf` and `writeBlocks` both assign mutable fields, so every field they touch is declared
 * before the first call to either of them; otherwise a later field initializer would overwrite
 * the value computed during construction.
 *
 * @param obj object to broadcast
 * @param id A unique identifier for the broadcast variable.
 */
private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
  extends Broadcast[T](id) with Logging with Serializable {

  /**
   * Value of the broadcast object on executors. This is reconstructed by [[readBroadcastBlock]],
   * which builds this value by reading blocks from the driver and/or other executors.
   *
   * On the driver, if the value is required, it is read lazily from the block manager.
   */
  @transient private lazy val _value: T = readBroadcastBlock()

  /** The compression codec to use, or None if compression is disabled */
  @transient private var compressionCodec: Option[CompressionCodec] = _
  /** Size of each block. Default value is 4MB.  This value is only read by the broadcaster. */
  @transient private var blockSize: Int = _

  /** Whether to generate checksum for blocks or not. */
  private var checksumEnabled: Boolean = false
  /** The checksum for all the blocks. */
  private var checksums: Array[Int] = _

  /**
   * Populate the configuration-derived fields (`compressionCodec`, `blockSize` and
   * `checksumEnabled`) from the given configuration.
   *
   * @param conf configuration to read the `spark.broadcast.*` settings from
   */
  private def setConf(conf: SparkConf): Unit = {
    compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) {
      Some(CompressionCodec.createCodec(conf))
    } else {
      None
    }
    // Note: use getSizeAsKb (not bytes) to maintain compatibility if no units are provided
    blockSize = conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024
    checksumEnabled = conf.getBoolean("spark.broadcast.checksum", true)
  }
  setConf(SparkEnv.get.conf)

  private val broadcastId = BroadcastBlockId(id)

  /** Total number of blocks this broadcast variable contains. */
  private val numBlocks: Int = writeBlocks(obj)

  /** Return the broadcast value, triggering the lazy read of `_value` on first access. */
  override protected def getValue(): T = _value

  /**
   * Compute the Adler-32 checksum of the bytes between the buffer's position and limit.
   * The buffer's own position is left untouched (the non-array path reads from a duplicate).
   *
   * @param block buffer to checksum
   * @return the Adler-32 value truncated to an Int
   */
  private def calcChecksum(block: ByteBuffer): Int = {
    val adler = new Adler32()
    if (block.hasArray) {
      adler.update(block.array, block.arrayOffset + block.position(), block.limit()
        - block.position())
    } else {
      val bytes = new Array[Byte](block.remaining())
      block.duplicate.get(bytes)
      adler.update(bytes)
    }
    adler.getValue.toInt
  }

  /**
   * Divide the object into multiple blocks and put those blocks in the block manager.
   *
   * When checksums are enabled, also records one checksum per block in `checksums`.
   *
   * @param value the object to divide
   * @return number of blocks this broadcast variable is divided into
   * @throws SparkException if the value or any of its pieces cannot be stored
   */
  private def writeBlocks(value: T): Int = {
    import StorageLevel._
    // Store a copy of the broadcast variable in the driver so that tasks run on the driver
    // do not create a duplicate copy of the broadcast variable's value.
    val blockManager = SparkEnv.get.blockManager
    if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
      throw new SparkException(s"Failed to store $broadcastId in BlockManager")
    }
    val blocks =
      TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
    if (checksumEnabled) {
      checksums = new Array[Int](blocks.length)
    }
    blocks.zipWithIndex.foreach { case (block, i) =>
      if (checksumEnabled) {
        checksums(i) = calcChecksum(block)
      }
      val pieceId = BroadcastBlockId(id, "piece" + i)
      val bytes = new ChunkedByteBuffer(block.duplicate())
      if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
        throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
      }
    }
    blocks.length
  }

  /**
   * Fetch torrent blocks from the driver and/or other executors.
   *
   * Pieces are visited in random order. Each piece is looked up locally first and fetched
   * remotely otherwise; remotely fetched pieces are checksum-verified (when enabled) and then
   * stored in the local BlockManager.
   *
   * @return the pieces, indexed by piece number
   * @throws SparkException if a piece is missing, fails its checksum, or cannot be stored
   */
  private def readBlocks(): Array[BlockData] = {
    // Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported
    // to the driver, so other executors can pull these chunks from this executor as well.
    val blocks = new Array[BlockData](numBlocks)
    val bm = SparkEnv.get.blockManager

    for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
      val pieceId = BroadcastBlockId(id, "piece" + pid)
      logDebug(s"Reading piece $pieceId of $broadcastId")
      // First try getLocalBytes because there is a chance that previous attempts to fetch the
      // broadcast blocks have already fetched some of the blocks. In that case, some blocks
      // would be available locally (on this executor).
      bm.getLocalBytes(pieceId) match {
        case Some(block) =>
          blocks(pid) = block
          releaseLock(pieceId)
        case None =>
          bm.getRemoteBytes(pieceId) match {
            case Some(b) =>
              if (checksumEnabled) {
                // Only the first chunk of the fetched buffer is checksummed here.
                val sum = calcChecksum(b.chunks(0))
                if (sum != checksums(pid)) {
                  throw new SparkException(s"corrupt remote block $pieceId of $broadcastId:" +
                    s" $sum != ${checksums(pid)}")
                }
              }
              // We found the block from remote executors/driver's BlockManager, so put the block
              // in this executor's BlockManager.
              if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
                throw new SparkException(
                  s"Failed to store $pieceId of $broadcastId in local BlockManager")
              }
              blocks(pid) = new ByteBufferBlockData(b, true)
            case None =>
              throw new SparkException(s"Failed to get $pieceId of $broadcastId")
          }
      }
    }
    blocks
  }

  /**
   * Remove all persisted state associated with this Torrent broadcast on the executors.
   */
  override protected def doUnpersist(blocking: Boolean): Unit = {
    TorrentBroadcast.unpersist(id, removeFromDriver = false, blocking)
  }

  /**
   * Remove all persisted state associated with this Torrent broadcast on the executors
   * and driver.
   */
  override protected def doDestroy(blocking: Boolean): Unit = {
    TorrentBroadcast.unpersist(id, removeFromDriver = true, blocking)
  }

  /** Used by the JVM when serializing this object. */
  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
    assertValid()
    out.defaultWriteObject()
  }

  /**
   * Obtain the broadcast value: reuse the copy in the local BlockManager when present, otherwise
   * fetch all pieces with [[readBlocks]], reassemble them and cache the result locally.
   *
   * The whole operation holds the `TorrentBroadcast` companion object's monitor and re-reads
   * the configuration first (the configuration-derived fields are `@transient`).
   *
   * @return the broadcast value
   */
  private def readBroadcastBlock(): T = Utils.tryOrIOException {
    TorrentBroadcast.synchronized {
      setConf(SparkEnv.get.conf)
      val blockManager = SparkEnv.get.blockManager
      blockManager.getLocalValues(broadcastId) match {
        case Some(blockResult) =>
          if (blockResult.data.hasNext) {
            val x = blockResult.data.next().asInstanceOf[T]
            releaseLock(broadcastId)
            x
          } else {
            throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
          }
        case None =>
          logInfo("Started reading broadcast variable " + id)
          val startTimeMs = System.currentTimeMillis()
          val blocks = readBlocks()
          logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))

          try {
            val merged = TorrentBroadcast.unBlockifyObject[T](
              blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
            // Store the merged copy in BlockManager so other tasks on this executor don't
            // need to re-fetch it.
            val storageLevel = StorageLevel.MEMORY_AND_DISK
            if (!blockManager.putSingle(broadcastId, merged, storageLevel, tellMaster = false)) {
              throw new SparkException(s"Failed to store $broadcastId in BlockManager")
            }
            merged
          } finally {
            // Dispose the fetched pieces whether or not reassembly succeeded.
            blocks.foreach(_.dispose())
          }
      }
    }
  }

  /**
   * If running in a task, register the given block's locks for release upon task completion.
   * Otherwise, if not running in a task then immediately release the lock.
   */
  private def releaseLock(blockId: BlockId): Unit = {
    val blockManager = SparkEnv.get.blockManager
    Option(TaskContext.get()) match {
      case Some(taskContext) =>
        taskContext.addTaskCompletionListener(_ => blockManager.releaseLock(blockId))
      case None =>
        // This should only happen on the driver, where broadcast variables may be accessed
        // outside of running tasks (e.g. when computing rdd.partitions()). In order to allow
        // broadcast variables to be garbage collected we need to free the reference here
        // which is slightly unsafe but is technically okay because broadcast variables aren't
        // stored off-heap.
        blockManager.releaseLock(blockId)
    }
  }

}
266*eeb7e5b3SAdam Hornáček
267*eeb7e5b3SAdam Hornáček
/**
 * Helpers shared by all `TorrentBroadcast` instances: turning an object into serialized
 * byte buffers and back, and asking the block manager master to drop a broadcast.
 */
private object TorrentBroadcast extends Logging {

  /**
   * Serialize `obj` with the given serializer, optionally through a compression codec, and
   * return the resulting byte buffers.
   *
   * @param obj the object to serialize
   * @param blockSize size handed to the chunked output stream (presumably the size of each
   *                  produced chunk -- confirm against ChunkedByteBufferOutputStream)
   * @param serializer serializer used to create the serialization stream
   * @param compressionCodec codec wrapping the output, or None to write uncompressed
   * @return the chunks collected by the chunked output stream
   */
  def blockifyObject[T: ClassTag](
      obj: T,
      blockSize: Int,
      serializer: Serializer,
      compressionCodec: Option[CompressionCodec]): Array[ByteBuffer] = {
    val chunkedOut = new ChunkedByteBufferOutputStream(blockSize, ByteBuffer.allocate)
    val sink: OutputStream = compressionCodec match {
      case Some(codec) => codec.compressedOutputStream(chunkedOut)
      case None => chunkedOut
    }
    val objectStream = serializer.newInstance().serializeStream(sink)
    Utils.tryWithSafeFinally {
      objectStream.writeObject[T](obj)
    } {
      objectStream.close()
    }
    chunkedOut.toChunkedByteBuffer.getChunks()
  }

  /**
   * Inverse of [[blockifyObject]]: concatenate the block streams in order, optionally
   * decompress them, and deserialize a single object of type `T`.
   *
   * @param blocks input streams of the individual blocks, in order; must not be empty
   * @param serializer serializer used to create the deserialization stream
   * @param compressionCodec codec wrapping the input, or None if the data is uncompressed
   * @return the deserialized object
   */
  def unBlockifyObject[T: ClassTag](
      blocks: Array[InputStream],
      serializer: Serializer,
      compressionCodec: Option[CompressionCodec]): T = {
    require(blocks.nonEmpty, "Cannot unblockify an empty array of blocks")
    val joined = new SequenceInputStream(blocks.iterator.asJavaEnumeration)
    val source: InputStream = compressionCodec match {
      case Some(codec) => codec.compressedInputStream(joined)
      case None => joined
    }
    val objectStream = serializer.newInstance().deserializeStream(source)
    Utils.tryWithSafeFinally {
      objectStream.readObject[T]()
    } {
      objectStream.close()
    }
  }

  /**
   * Remove all persisted blocks associated with this torrent broadcast on the executors.
   * If removeFromDriver is true, also remove these persisted blocks on the driver.
   */
  def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = {
    logDebug(s"http://example.com?$id")
    val master = SparkEnv.get.blockManager.master
    master.removeBroadcast(id, removeFromDriver, blocking)
  }

  /** Prefix `~` operator for this object; always evaluates to 0. */
  def unary_~ : Int = 0
}
321