After using VisualWorks for over a month I am impressed by the great improvement VisualWorks has made, and all the previous bugs are now gone. It's the best IDE I've ever seen for native language programming. Thanks a lot.

Jim G.

Xtreams at Work

Lately I’ve been working on an Xtreams-based implementation of the SSL/TLS protocol and I think that some specific parts of it could make interesting examples: non-trivial, real-life solutions rather than artificial academic exercises. This is my first attempt to extract an interesting example, hopefully without getting bogged down in too much detail. Although I will try to explain the constructs used in the example, it will require some basic familiarity with Xtreams; the documentation at http://code.google.com/p/xtreams can be used as a reference.

Problem definition

To make the solution a bit less convoluted, let’s simplify the problem a bit. Let’s say there aren’t four types of traffic but just two: data and non-data. We also won’t worry about what needs to happen with the non-data; let’s just log it in a separate stream. To have some concrete samples to work with we need to define the record structure. Let’s say a record starts with a boolean indicating whether the record carries data or not, then an integer specifying the size of its contents, and then the contents themselves.

Let’s generate some random samples to work with. Let’s say our records will be anywhere from 0 to 9 bytes long and the contents will always be the sequence from 1 to <size>. If the size is 0 the contents will be empty. First let’s make a random generator that generates integers between 0 and 9.

`random := Random new reading collecting: [ :f | (f * 10) floor ].`

The “Random new reading” part yields a stream that uses an instance of Random to generate random floats in the range 0 <= x < 1. The collecting part transforms each float into an integer between 0 and 9. With this we can generate a random sample of 10 records as follows.
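As a cross-check of what the collecting transform does, here is a rough Python sketch of the same idea. This is an analogy, not Xtreams: an infinite generator stands in for the read stream, and all the names are mine.

```python
import random
from itertools import islice

# A rough Python analogue of `Random new reading collecting: [...]`.
# An infinite generator stands in for the Xtreams read stream.
def random_floats(rng):
    while True:
        yield rng.random()          # floats in the range 0 <= x < 1

rng = random.Random()
# The collecting step: transform each float into an integer 0..9.
random_ints = (int(f * 10) for f in random_floats(rng))

print(list(islice(random_ints, 10)))  # ten random digits, each 0..9
```

The pipeline is lazy in the same way as the Xtreams version: nothing is drawn from the underlying generator until an element is requested.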

```
sample := Array new writing.
10 timesRepeat: [ | size |
	size := random get.
	sample
		put: random get even;
		put: size;
		write: (1 to: size) ].
sample close; terminal.
```

The ‘Array new writing’ bit creates a simple write stream over an Array; as usual, the array will automatically grow as elements are written to it. Closing a collection write stream trims the underlying collection to the actually written size, and the #terminal message returns it (it’s called terminal because it returns whatever is at the bottom of an arbitrarily high stream stack). Here’s one sample generated by the above code.

```
#(false 4 1 2 3 4 true 4 1 2 3 4 false 3 1 2 3 false 5 1 2 3 4 5
  true 2 1 2
  false 8 1 2 3 4 5 6 7 8 true 3 1 2 3 true 3 1 2 3 false 0 false
  4 1 2 3 4)
```

In the text that follows, when we refer to ‘sample’ we mean a read stream over such an array, created simply by sending #reading to the array.

Fragments

So we start with a simple stream and we need to parse the records out of it. That is actually quite simple.

```
fragments := [ | size |
	isData := sample get.
	size := sample get.
	(sample limiting: size)
		closeBlock: [ :stream | stream -= 0 ];
		yourself
] reading.
```

Sending #reading to a block creates a block stream, which produces each element by running the block; the element is the result of that run. Now let’s take a look at what the block produces. It gets a single element from the sample stream and puts it in the variable isData. Assuming the sample stream is aligned with the beginning of a record, the first element should be the Boolean indicating the type of the record. Then we get another element from the sample stream, the record size, and use it as the argument of the #limiting: message. Sending #limiting: to a stream creates a “virtual” substream. The actual contents of the substream come from the underlying stream; the substream just makes sure we don’t read more than the specified limit. The closeBlock is there to make sure that when we close the substream the underlying stream is positioned at its end, i.e. at the beginning of the next record. The argument to #closeBlock: is the substream itself, and the expression “-= 0” seeks to the end of it (read: seek 0 elements from the end of the stream). So the result of the block is a virtual stream over the payload of the current record (called a fragment in the SSL/TLS spec). That means that fragments is a stream of record payload streams, and the variable isData indicates the type of the current record (i.e. the most recent one we read from fragments).
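To make these mechanics concrete, here is a loose Python sketch of the same parsing step. Again an analogy under stated assumptions: a generator plays the block stream, itertools.islice plays #limiting:, draining the leftover plays the closeBlock, and the names (records, fragments, is_data) are mine.

```python
from itertools import islice

# A made-up sample in the record format: type flag, size, then `size` bytes.
records = iter([False, 4, 1, 2, 3, 4, True, 2, 1, 2, False, 0, True, 3, 1, 2, 3])

is_data = None  # plays the role of the isData variable above

def fragments(source):
    """Yield one bounded payload view per record, like the block stream."""
    global is_data
    while True:
        try:
            is_data = next(source)        # the Boolean type flag
        except StopIteration:             # source exhausted ~ Incomplete
            return
        size = next(source)               # the record size
        fragment = islice(source, size)   # ~ `sample limiting: size`
        yield fragment
        for _ in fragment:                # drain any unread remainder, like
            pass                          # the closeBlock's `stream -= 0`

out = []
for f in fragments(records):
    out.append((is_data, list(f)))
print(out)
# [(False, [1, 2, 3, 4]), (True, [1, 2]), (False, []), (True, [1, 2, 3])]
```

Note how draining the leftover after each yield keeps the source aligned with the next record even if a consumer reads only part of a payload, which is exactly what the closeBlock guarantees above.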

Runs

Now we can treat each fragment as a stream, but we know that the fragment boundaries are meaningless. If we have an algorithm that parses a handshake message from a stream, we can’t give it a fragment, because the message might continue in a subsequent fragment. What we need is to be able to treat a sequence of adjacent fragments of the same type as a single continuous stream; let’s call it a “run”. As it happens, Xtreams has a handy construct to combine several streams into one, called “stitching”. It takes a stream of streams and makes it look like a single continuous stream. For example, the following two expressions yield the same result.

```
(1 to: 10) reading
(Array with: (1 to: 3) reading with: (4 to: 7) reading with: (8 to: 10) reading) reading stitching
```

Conveniently, fragments is a stream of streams; however, we don’t want to stitch it all together. We want to stitch just the adjacent fragments of the same type. So the result will still be a stream of streams, just a stream of runs rather than a stream of individual fragments. To be able to stitch fragments together we need a stream that keeps handing out fragments while the type stays the same and ends when the type changes.

```
[	fragment := fragments get.
	runType ifNil: [ runType := isData ].
	isData = runType
		ifTrue: [ fragment ]
		ifFalse: [ Incomplete zero raise ]
] reading stitching
```

We get a fragment; if it’s the first one, we remember its type, and we keep returning fragments until the type changes. When that happens we raise the Incomplete exception, which signals the block stream that it has ended, which in turn signals the stitching stream that it has ended as well. This however won’t work, because it consumes the first fragment of the following run. We need to rewrite it a bit differently so that the non-matching fragment can be carried over to the next run. We’ll rewrite the construct so that the first fragment of a run is obtained outside of the run stream itself and brought in via an external variable, fragment. That way the first fragment of the next run can be fetched by the final iteration of the previous run.

```
fragment := fragments get.
runType := isData.
[	isData = runType
		ifTrue: [ fragment closing: [ fragment := fragments get ] ]
		ifFalse: [ Incomplete zero raise ]
] reading stitching
```

The first fragment is read outside of the block stream, so we may as well capture the runType at that point. The block stream then simply compares the current isData value to the captured runType; if it matches, it returns the current fragment. The difficulty here is that we need to get the next fragment only after the previous one has been read. The best way to achieve that is to fetch the next one in a close block of the previous one. Again, if the type changes we raise Incomplete to signal the end of the run.

Now that we know how to build a single run stream, we need to wrap that up in a stream of run streams. A simple block stream returning the stitched run streams should suffice.

```
fragment := fragments get.
runsFinished := false.
runs := [ | runType |
	runsFinished ifTrue: [ Incomplete zero raise ].
	runType := isData.
	[	isData = runType
			ifTrue: [
				fragment closing: [
					[ fragment := fragments get ]
						ifCurtailed: [ runsFinished := true ] ] ]
			ifFalse: [ Incomplete zero raise ]
	] reading stitching
] reading.
```

The tricky part is ending the stream. A block stream will keep running the block (whenever it is asked for an element) until a block run raises an Incomplete. Here we want that to be the moment when we run out of fragments, i.e. when getting the next fragment raises an Incomplete. However, that action is buried inside the close block inside the stitched stream of a run. When it happens, the stitched stream will re-interpret it as the end of itself, so the outer block stream cannot distinguish the end of a run from the end of fragments (after all, the end of fragments is the end of the last run as well). So somehow we need to capture the fact that getting a fragment raised an Incomplete and bring that information up to the block stream. That’s what the runsFinished variable is for. Without it the runs stream would keep giving out empty runs forever once it runs out of fragments.

To summarize this step, the “runs” stream turns the “fragments” stream into a stream of runs, where adjacent fragments of the same type are stitched together into a single continuous stream, a run. With our sample input we should get the following result.

```
runs collect: [ :r | r rest ]
=>
#(#(1 2 3 4) #(1 2 3 4) #(1 2 3 1 2 3 4 5) #(1 2)
  #(1 2 3 4 5 6 7 8)
  #(1 2 3 1 2 3) #(1 2 3 4))
```
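The grouping can be cross-checked in Python, where itertools.groupby does for consecutive same-type records roughly what the run streams do here. This sketch starts from the sample above parsed by hand into (type, payload) pairs; the names are mine, not Xtreams'.

```python
from itertools import groupby, chain

# The sample above, parsed by hand into (type, payload) records.
records = [
    (False, [1, 2, 3, 4]), (True, [1, 2, 3, 4]), (False, [1, 2, 3]),
    (False, [1, 2, 3, 4, 5]), (True, [1, 2]),
    (False, [1, 2, 3, 4, 5, 6, 7, 8]),
    (True, [1, 2, 3]), (True, [1, 2, 3]), (False, []), (False, [1, 2, 3, 4]),
]

# Stitch adjacent records of the same type into runs.
runs = [list(chain.from_iterable(payload for _, payload in group))
        for _, group in groupby(records, key=lambda r: r[0])]
print(runs)
# [[1, 2, 3, 4], [1, 2, 3, 4], [1, 2, 3, 1, 2, 3, 4, 5], [1, 2],
#  [1, 2, 3, 4, 5, 6, 7, 8], [1, 2, 3, 1, 2, 3], [1, 2, 3, 4]]
```

The output matches the result shown above, minus the run boundaries' laziness: the Xtreams version never materializes a run, while groupby hands out lazy sub-iterators that we flatten eagerly here.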

Data vs Control

Now that we have a stream of alternating data and non-data runs, we need to stitch the data runs together and log the non-data runs into a separate stream. For that we just need a simple block stream that gets a run and, if it’s not data, logs it and gets the next one.

```
control := ByteArray new writing.
data := [ | run |
	run := runs get.
	isData ifFalse: [
		control write: run.
		run := runs get ].
	run
] reading stitching.
```

With our sample we should get the following results.

```
data rest
=>
#(1 2 3 4 1 2 1 2 3 1 2 3)
```
```
control close; terminal
=>
#[1 2 3 4 1 2 3 1 2 3 4 5 1 2 3 4 5 6 7 8 1 2 3 4]
```
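As a final cross-check, the data/control split can be mimicked in Python on the same hand-parsed records. As before, groupby stands in for the run streams and two plain lists stand in for the data and control streams; this is an analogy, not the Xtreams implementation.

```python
from itertools import groupby, chain

# The sample above, parsed by hand into (type, payload) records.
records = [
    (False, [1, 2, 3, 4]), (True, [1, 2, 3, 4]), (False, [1, 2, 3]),
    (False, [1, 2, 3, 4, 5]), (True, [1, 2]),
    (False, [1, 2, 3, 4, 5, 6, 7, 8]),
    (True, [1, 2, 3]), (True, [1, 2, 3]), (False, []), (False, [1, 2, 3, 4]),
]

data, control = [], []
for is_data, group in groupby(records, key=lambda r: r[0]):
    run = chain.from_iterable(payload for _, payload in group)
    (data if is_data else control).extend(run)   # log non-data, keep data

print(data)     # [1, 2, 3, 4, 1, 2, 1, 2, 3, 1, 2, 3]
print(control)  # [1, 2, 3, 4, 1, 2, 3, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4]
```

Both lists match the `data rest` and `control close; terminal` results above.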

Note that the whole processing happens behind the scenes as we’re reading from the data stream; the “data rest” bit simply reads everything from it. Everything happens lazily as data is being read, and nothing is cached, so the performance characteristics shouldn’t change even if we pump megabytes of data through it. In fact we can easily rewrite the fragment stream so that the sample is generated on the fly.

```
sample := Array new writing.
fragments := [ | size |
	sample put: (isData := random get even).
	sample put: (size := random get).
	(1 to: size) reading
] reading.
```

Here the “sample” stream is used just to log what we’ve generated, so that we can verify that the results are correct. We only log the type and size; the contents are implicit. If we use this version of fragments, we can’t call #rest on the data stream, because the fragments never finish and it would just keep reading forever. Here’s a sample run where we read 100 data bytes instead.

```
data read: 100
=>
#(1 2 1 2 3 4 1 2 3 4 5 1 1 2 3 4 5 6 7 8 9 1 2 3 4 1 2 1
2 1 2 3 4 1 2 3 4 5 6 1 2 3 4 5 6 7 8 9
1 2 3 4 5 6 7 8 9 1 2 3 1 2 1 2 3 4 5 6 7 1 2 3 4 5 6 1 2
3 4 5 6 7 1 1 2 3 4 5 6 7 1 2 3 4 5 1 2 1 2 3)

sample close; terminal
=>
#(false 6 true 2 false 3 true 4 false 7 true 5 false 5 true 1
false 1 true 9 false 7 true 0 false 8 true 4 false 2
false 8 false 4 true 2 true 2 true 0 false 5 true 4 true 6 true 0
true 9 true 9 false 1 false 6 true 3 false 4 true 2
false 5 true 7 false 7 false 8 true 6 false 8 false 5 false 6 false 2
false 5 false 6 false 4 false 0 true 7 true 1
false 1 true 7 true 5 true 2 false 0 false 1 true 5)

control close; terminal
=>
#[1 2 3 4 5 6 1 2 3 1 2 3 4 5 6 7 1 2 3 4 5 1 1 2 3 4 5 6 7 1 2 3 4
5 6 7 8 1 2 1 2 3 4 5 6 7 8 1 2 3...etc...]
```

We can also easily profile an arbitrarily large run. Let’s rebuild the fragment stream so that it doesn’t log the samples; otherwise the log would keep growing and skew the results unnecessarily. Similarly, let’s turn the control log into a bit bucket too.

```
control := nil writing.
TimeProfiler profile: [ nil writing write: 10**7 from: data ]
```

Here’s a time and allocation profile summary from reading 10MB of data (and about as much control data, assuming a reasonably unbiased random generator).

```
Time

2083 samples, 17.22 average ms/sample, 4023 scavenges, 0 incGCs,
5 stack spills, 0 mark stack overflows, 0 weak list overflows,
0 JIT cache spills
34.82s active, 1.0s other processes,
35.87s real time, 0.05s profiling overhead
** Totals **
28.8 Context>>findNextMarkedUpTo:
9.6 Context>>terminateTo:
6.5 BlockClosure>>on:do:
3.8 GenericException class>>handles:
3.1 SequenceableCollection>>
replaceElementsFrom:to:withSequenceableCollection:startingAt:
2.8 BlockClosure>>cull:
2.4 ResolvedDeferredBinding>>value
2.3 MarkedMethod>>isMarkedForHandle

Space

1394487 samples, 1045 average bytes/sample, 6760 scavenges, 0 incGCs,
2 stack spills, 0 mark stack overflows, 0 weak list overflows,
0 JIT cache spills
1458037980 bytes
** Totals **
41.7 GenericException class>>new
```

```
control := nil writing monitoring: [ :total | nonData := total ]
```