1org # 18:package org.apache.spark.broadcast 2apache 3spark 4broadcast 5java 6io 7java 8nio 9ByteBuffer 10java 11util 12zip 13Adler32 14scala 15collection 16JavaConverters 17scala 18reflect 19ClassTag 20scala 21util 22Random 23org 24apache 25spark 26org 27apache 28spark 29internal 30Logging 31org 32apache 33spark 34io 35CompressionCodec 36org 37apache 38spark 39serializer 40Serializer 41org 42apache 43spark 44storage 45org 46apache 47spark 48util 49Utils 50org 51apache 52spark 53util 54io 55ChunkedByteBuffer 56ChunkedByteBufferOutputStream 57spark # 57:private[spark] class TorrentBroadcast[ 58TorrentBroadcast 59T 60ClassTag 61obj 62T 63id 64Long 65Broadcast 66T 67id 68Logging 69Serializable 70transient 71_value 72T 73readBroadcastBlock 74transient 75compressionCodec 76Option 77CompressionCodec 78transient 79blockSize 80Long 81setConf # 73: private def setConf(conf: SparkConf) { 82conf 83SparkConf 84compressionCodec 85conf 86getBoolean 87Some 88CompressionCodec 89createCodec 90conf 91None 92blockSize 93conf 94getSizeAsKb 95toInt 96checksumEnabled 97conf 98getBoolean 99setConf # 86: setConf(SparkEnv.get.conf) 100SparkEnv 101get 102conf 103broadcastId 104BroadcastBlockId 105id 106numBlocks 107Int 108writeBlocks 109obj 110checksumEnabled 111Boolean 112checksums 113Array 114Int 115getValue # 98: override protected def getValue() = { 116_value 117calcChecksum # 102: private def calcChecksum(block: ByteBuffer): Int = { 118block 119ByteBuffer 120Int 121adler 122Adler32 123block 124hasArray 125adler 126update 127block 128array 129block 130arrayOffset 131block 132position 133block 134limit 135block 136position 137bytes 138Array 139Byte 140block 141remaining 142block 143duplicate 144get 145bytes 146adler 147update 148bytes 149adler 150getValue 151toInt 152writeBlocks # 121: private def writeBlocks(value: T): Int = { 153value 154T 155Int 156StorageLevel 157blockManager 158SparkEnv 159get 160blockManager 161blockManager 162putSingle 163broadcastId 164value 165MEMORY_AND_DISK 166tellMaster 167SparkException 168broadcastId 169blocks 170TorrentBroadcast 171blockifyObject 172value 173blockSize 174SparkEnv 175get 176serializer 177compressionCodec 178checksumEnabled 179checksums 180Array 181Int 182blocks 183length 184blocks 185zipWithIndex 186foreach 187block 188i 189checksumEnabled 190checksums 191i 192calcChecksum 193block 194pieceId 195BroadcastBlockId 196id 197i 198bytes 199ChunkedByteBuffer 200block 201duplicate 202blockManager 203putBytes 204pieceId 205bytes 206MEMORY_AND_DISK_SER 207tellMaster 208SparkException 209pieceId 210broadcastId 211blocks 212length 213readBlocks # 148: private def readBlocks(): Array[BlockData] = { 214Array 215BlockData 216blocks 217Array 218BlockData 219numBlocks 220bm 221SparkEnv 222get 223blockManager 224pid 225Random 226shuffle 227Seq 228range 229numBlocks 230pieceId 231BroadcastBlockId 232id 233pid 234logDebug 235pieceId 236broadcastId 237bm 238getLocalBytes 239pieceId 240Some 241block 242blocks 243pid 244block 245releaseLock 246pieceId 247None 248bm 249getRemoteBytes 250pieceId 251Some 252b 253checksumEnabled 254sum 255calcChecksum 256b 257chunks 258sum 259checksums 260pid 261SparkException 262pieceId 263broadcastId 264sum 265bm 266putBytes 267pieceId 268b 269StorageLevel 270MEMORY_AND_DISK_SER 271tellMaster 272SparkException 273pieceId 274broadcastId 275blocks 276pid 277ByteBufferBlockData 278b 279None 280SparkException 281pieceId 282broadcastId 283blocks 284doUnpersist # 192: override protected def doUnpersist(blocking: Boolean) { 285blocking 286Boolean 287TorrentBroadcast 288unpersist 289id 290removeFromDriver 291blocking 292doDestroy # 200: override protected def doDestroy(blocking: Boolean) { 293blocking 294Boolean 295TorrentBroadcast 296unpersist 297id 298removeFromDriver 299blocking 300writeObject # 205: private def writeObject(out: 301out 302ObjectOutputStream 303Unit 304Utils 305tryOrIOException 306assertValid 307out 308defaultWriteObject 309readBroadcastBlock # 210: private def readBroadcastBlock(): 310T 311Utils 312tryOrIOException 313TorrentBroadcast 314synchronized 315setConf 316SparkEnv 317get 318conf 319blockManager 320SparkEnv 321get 322blockManager 323blockManager 324getLocalValues 325broadcastId 326Some 327blockResult 328blockResult 329data 330hasNext 331x 332blockResult 333data 334next 335asInstanceOf 336T 337releaseLock 338broadcastId 339x 340SparkException 341broadcastId 342None 343logInfo 344id 345startTimeMs 346System 347currentTimeMillis 348blocks 349readBlocks 350logInfo 351id 352Utils 353getUsedTimeMs 354startTimeMs 355obj 356TorrentBroadcast 357unBlockifyObject 358T 359blocks 360map 361toInputStream 362SparkEnv 363get 364serializer 365compressionCodec 366storageLevel 367StorageLevel 368MEMORY_AND_DISK 369blockManager 370putSingle 371broadcastId 372obj 373storageLevel 374tellMaster 375SparkException 376broadcastId 377obj 378blocks 379foreach 380dispose 381releaseLock # 250: private def releaseLock(blockId: BlockId): Unit = { 382blockId 383BlockId 384Unit 385blockManager 386SparkEnv 387get 388blockManager 389Option 390TaskContext 391get 392Some 393taskContext 394taskContext 395addTaskCompletionListener 396blockManager 397releaseLock 398blockId 399None 400blockManager 401releaseLock 402blockId 403TorrentBroadcast # 268:private object TorrentBroadcast 404Logging 405blockifyObject # 270: def blockifyObject[T: ClassTag]( 406T 407ClassTag 408obj 409T 410blockSize 411Int 412serializer 413Serializer 414compressionCodec 415Option 416CompressionCodec 417Array 418ByteBuffer 419cbbos 420ChunkedByteBufferOutputStream 421blockSize 422ByteBuffer 423allocate 424out 425compressionCodec 426map 427c 428c 429compressedOutputStream 430cbbos 431getOrElse 432cbbos 433ser 434serializer 435newInstance 436serOut 437ser 438serializeStream 439out 440Utils 441tryWithSafeFinally 442serOut 443writeObject 444T 445obj 446serOut 447close 448cbbos 449toChunkedByteBuffer 450getChunks 451unBlockifyObject # 287: def unBlockifyObject[T: ClassTag]( 452T 453ClassTag 454blocks 455Array 456InputStream 457serializer 458Serializer 459compressionCodec 460Option 461CompressionCodec 462T 463require 464blocks 465nonEmpty 466is 467SequenceInputStream 468blocks 469iterator 470asJavaEnumeration 471in 472InputStream 473compressionCodec 474map 475c 476c 477compressedInputStream 478is 479getOrElse 480is 481ser 482serializer 483newInstance 484serIn 485ser 486deserializeStream 487in 488obj 489Utils 490tryWithSafeFinally 491serIn 492readObject 493T 494serIn 495close 496obj 497unpersist # 308: def unpersist(id: 498id 499Long 500removeFromDriver 501Boolean 502blocking 503Boolean 504Unit 505logDebug 506id 507SparkEnv 508get 509blockManager 510master 511removeBroadcast 512id 513removeFromDriver 514blocking 515unary_ 516