4
\$\begingroup\$

I need to test a specialized input stream class that takes input from a TCP/IP network connection. I particularly need to ensure that the blocking and availability behavior is correct.

To help me with this, I wrote this "white box" input stream so that test scenarios can provide the necessary test conditions to exercise the code being tested.

WhiteBoxInputStream.scala

import java.io.InputStream
import scala.collection.mutable
/**
 * A stream to be used in test scenarios to create specific conditions.
 */
class WhiteBoxInputStream extends InputStream {
 private var current: Array[Byte] = null
 private var pos: Int = -1
 private var isEOF: Boolean = false
 private val data = new mutable.Queue[Array[Byte]]
 def this(someData: Array[Byte]) {
 this()
 queueData(someData)
 }
 override def read(): Int = {
 if (current != null) doRead()
 else if (isEOF) -1
 else {
 var notReady = true
 while (notReady) {
 Thread.sleep(10)
 if (current != null) notReady = false
 }
 doRead()
 }
 }
 def doRead(): Int = {
 val result = current(pos)
 incrementPos()
 result
 }
 override def available(): Int = {
 this.synchronized {
 if (current == null) 0
 else data.foldLeft(current.length - pos)((a, b) => a + b.length)
 }
 }
 def queueData(someData: Array[Byte]): Unit = {
 if (isEOF) {
 throw new IllegalStateException("Can't add more data to stream in End Of File state.")
 }
 if (current == null) {
 current = someData
 pos = 0
 }
 else this.synchronized{
 data.enqueue(someData)
 }
 }
 def markEOF(): Unit = {
 isEOF = true
 }
 private def incrementPos(): Unit = {
 pos += 1
 if (pos >= current.length) {
 current = null
 pos = -1
 dequeueData()
 }
 }
 private def dequeueData(): Unit = {
 if (data.nonEmpty) {
 this.synchronized
 {
 current = data.dequeue()
 pos = 0
 }
 }
 }
}

WhiteBoxInputStreamTest.scala

import edu.stsci.efl.ml.{EFLContext, ModuleLoader}
import edu.stsci.efl.services.LoggerFactoryService
import edu.stsci.util.WhiteBoxInputStreamTest._
import org.junit.runner.RunWith
import org.scalatest.{Matchers, BeforeAndAfter, FunSuite}
import org.scalatest.junit.JUnitRunner
import org.slf4j.Logger
@RunWith(classOf[JUnitRunner])
class WhiteBoxInputStreamTest extends FunSuite with BeforeAndAfter with Matchers {
 var context: EFLContext = null
 var logger: Logger = null
 before {
 ModuleLoader.shutdown()
 Thread.sleep(1000)
 val resourceURL = getClass.getResource("genericManifest.xml")
 resourceURL should not be (null)
 ModuleLoader.initialize(resourceURL)
 context = ModuleLoader.getDefaultContext
 logger = {
 val service = context.findRequiredService(classOf[LoggerFactoryService])
 service.getLogger(getClass.getName)
 }
 }
 test("simple case") {
 val testObject = new WhiteBoxInputStream(Array[Byte](A, B))
 testObject.markEOF()
 val a = testObject.read()
 a should be(A)
 val b = testObject.read()
 b should be(B)
 val eof = testObject.read()
 eof should be(EOF)
 }
 test("two block") {
 val testObject = new WhiteBoxInputStream(Array[Byte](A))
 testObject.queueData(Array[Byte](B))
 val a = testObject.read()
 a should be (A)
 val b = testObject.read()
 b should be (B)
 }
 test("blocking read") {
 val testObject = new WhiteBoxInputStream(Array[Byte](A))
 val d = new DataThread(logger, testObject)
 val start = System.currentTimeMillis()
 val a = testObject.read()
 a should be (A)
 d.start()
 val b = testObject.read()
 b should be (B)
 val stop = System.currentTimeMillis()
 // if the read blocked properly then there should be a delay here
 (stop - start) should be > (900l)
 }
 test("available") {
 val testObject = new WhiteBoxInputStream(Array[Byte](A, B))
 testObject.available() should be(2)
 testObject.queueData(Array[Byte](A))
 testObject.available() should be(3)
 testObject.read()
 testObject.available() should be(2)
 testObject.read()
 testObject.available() should be(1)
 testObject.read()
 testObject.available() should be(0)
 }
}
class DataThread(logger: Logger, testObject: WhiteBoxInputStream) extends Thread {
 override def run(): Unit = {
 logger.trace("[DataThread.run] enter.")
 Thread.sleep(1000)
 testObject.queueData(Array[Byte](B))
 logger.trace("[DataThread.run] complete.")
 }
}
object WhiteBoxInputStreamTest {
 val A = 'a'.toByte
 val B = 'b'.toByte
 val EOF = -1
}

I am particularly interested in criticism of my Scala form.

asked Jul 23, 2015 at 18:57
\$\endgroup\$
1
  • \$\begingroup\$ Whoah, this code is pretty. Nicely done. \$\endgroup\$ Commented Jul 23, 2015 at 21:29

1 Answer 1

1
\$\begingroup\$

You could use a ConcurrentLinkedQueue instead and get rid of some of those synchronized blocks.

Jamal
35.2k13 gold badges134 silver badges238 bronze badges
answered Jul 25, 2015 at 3:55
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.