Xtreams at Work
Lately I’ve been working on an Xtreams-based implementation of the SSL/TLS protocol, and I think that some specific parts of it could make interesting examples: examples that are non-trivial, real-life solutions rather than artificial academic exercises. This is my first attempt to extract such an example, hopefully without getting bogged down in too much detail. Although I will try to explain the constructs used in the example, it will require some basic familiarity with Xtreams; the documentation at http://code.google.com/p/xtreams can be used as a reference.
As you may know, the SSL/TLS protocol is meant to protect other, higher-level protocols from eavesdropping and tampering. The data payload is simply a stream of bytes; its semantics are not relevant to SSL/TLS at all. The payload gets split into chunks called records, and each record is then individually encrypted and signed to provide the required protection. There is other traffic beyond just the data on an established connection. There are handshake messages for the establishment of session keys, etc. The handshake normally happens at the beginning, but can also happen again later on a long-lasting connection, generally to refresh the keying material for improved security. There are also alerts that allow one side to warn the other about certain conditions (e.g. that the connection is about to be closed). Overall there are four different types of payload that can be carried by a record. The rule is that a single record can carry only one type of payload. However, there are no rules about how the payload is partitioned between records. A handshake record can carry several handshake messages, or a single handshake message can span several handshake records. What a single record cannot do is carry both handshake messages and data. From the point of view of the data payload the record boundaries are irrelevant and should be completely transparent. Consequently, the most straightforward way to present the data payload is as one continuous stream of bytes. Of course, the interleaved chunks of non-data payload need to be filtered out and handled accordingly.
To make the solution a bit less convoluted, let’s simplify the problem. Let’s say there aren’t four types of traffic but just two, data and non-data. We also won’t worry about what needs to happen with the non-data; let’s just log it in a separate stream. To have some concrete samples to work with, we need to define the record structure. Let’s say a record starts with a boolean indicating whether the record carries data, then an integer specifying the size of its contents, and then the contents themselves.
Let’s generate some random samples to work with. Let’s say our records will be anywhere from 0 to 9 bytes long and the contents will always be the sequence from 1 to <size>. If size is 0 the contents will be empty. First let’s make a random generator that will generate an integer between 0 and 9.
random := Random new reading collecting: [ :f | (f * 10) floor ].
The part “Random new reading” yields a stream that uses an instance of Random to generate random floats in the range 0 <= x < 1. The collecting part transforms each float into an integer between 0 and 9. With this we can generate a random sample of 10 records as follows.
    sample := Array new writing.
    10 timesRepeat: [ | size |
        size := random get.
        sample put: random get even; put: size; write: (1 to: size) ].
    sample close; terminal.
The ‘Array new writing’ bit creates a simple write stream over an Array. As usual, the array will automatically grow as elements are written to it. Closing a collection write stream trims the underlying collection to the size actually written, and the #terminal message returns it (it’s called terminal because it will return whatever is at the bottom of an arbitrarily high stream stack). Here’s one sample generated by the above code.
#(false 4 1 2 3 4 true 4 1 2 3 4 false 3 1 2 3 false 5 1 2 3 4 5 true 2 1 2 false 8 1 2 3 4 5 6 7 8 true 3 1 2 3 true 3 1 2 3 false 0 false 4 1 2 3 4)
In further text when we refer to ‘sample’ we mean a read stream over such an array, which is created simply by sending #reading to the array.
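Because the random sample differs on every run, the two building blocks above may be easier to check with fixed inputs. The following is only a small sketch that reuses messages already shown in this post; the variable names digits and record and the literal input values are made up for illustration, and the expected results in the comments follow from the descriptions above rather than from a recorded session.

```smalltalk
| digits record |

"The same transformation as in the random stream, fed with fixed floats."
digits := #(0.0 0.25 0.5 0.99) reading collecting: [ :f | (f * 10) floor ].
digits rest.                "expected elements: 0 2 5 9"

"One hand-built data record: type flag, size, contents."
record := Array new writing.
record put: true; put: 3; write: (1 to: 3).
record close; terminal.     "expected elements: true 3 1 2 3"
```

Sending #reading to the second result gives a one-record ‘sample’ stream in the sense used in the rest of the text.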
So we start with a simple stream and we need to parse the records out of it. That is actually quite simple.
    fragments := [ | size |
        isData := sample get.
        size := sample get.
        (sample limiting: size)
            closeBlock: [ :stream | stream -= 0 ];
            yourself ] reading.
Sending #reading to a block creates a block stream, which produces each element by running the block; the element is the result of the block run. Now let’s take a look at what the block produces. It gets a single element from the sample stream and puts it in the variable isData. Assuming the sample stream is aligned with the beginning of a record, the first element should be the Boolean indicating the type of the record. Then we get another element from the sample stream, the record size, and use it as the parameter of the #limiting: message. Sending #limiting: to a stream creates a “virtual” substream. The actual contents of the substream come from the underlying stream; the substream just makes sure we don’t read more than the specified limit. The closeBlock is there to make sure that when we close the substream the underlying stream is positioned at the end of it, i.e. at the beginning of the next record. The argument to the closeBlock: is the substream itself, and the expression “-= 0” makes it seek to its own end (read it as: seek to 0 bytes from the end of the stream). So the result of the block is a virtual stream over the payload of the current record (called a fragment in the SSL/TLS spec). That means that fragments is a stream of record payload streams, and the global variable isData indicates the type of the current record (i.e. the most recent one we read from fragments).
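To see #limiting: in isolation, here is a minimal sketch that parses the first record of a tiny hand-written sample. The literal array and the variable names source and fragment are made up for this illustration, and the expected values in the comments are derived from the description above, not from a recorded session.

```smalltalk
| source fragment |
source := #(true 3 1 2 3 false 2 1 2) reading.

source get.                                 "type flag of the first record: true"
fragment := source limiting: source get.    "the size element is 3"
fragment rest.                              "expected elements: 1 2 3"

"The substream was read to its limit, so the underlying stream
 now sits at the start of the second record."
source get.                                 "type flag of the second record: false"
```

Here the fragment is read completely, so the underlying stream ends up aligned anyway. If the reader stopped partway through the fragment, the underlying stream would be left in the middle of a record, which is the situation the closeBlock above takes care of.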
Now we can treat each fragment as a stream, but we know that the fragment boundaries are meaningless. If we have an algorithm that parses a handshake message from a stream, we can’t give it a fragment, because the message might be continuing in a subsequent fragment. What we need is to be able to treat a sequence of adjacent fragments of the same type as a single continuous stream, let’s call it a “run”. As it happens, Xtreams has a handy construct to combine several streams into one, it’s called “stitching”. It takes a stream of streams and makes it look like a single continuous stream. For example, the following two expressions yield the same result.
    (1 to: 10) reading
    (Array with: (1 to: 3) reading with: (4 to: 7) reading with: (8 to: 10) reading) reading stitching
Conveniently, fragments is a stream of streams; however, we don’t want to stitch it all together. We want to stitch just the adjacent fragments of the same type. So the result will still be a stream of streams, but a stream of runs rather than a stream of individual fragments. To be able to stitch fragments together we need a stream that will keep handing out fragments while the type is the same and ends when the type changes.
    [ fragment := fragments get.
        runType ifNil: [ runType := isData ].
        isData = runType
            ifTrue: [ fragment ]
            ifFalse: [ Incomplete zero raise ] ] reading stitching
We get a fragment; if it’s the first one, we remember its type, and we keep returning fragments until the type changes. When that happens we raise the Incomplete exception, which signals to the block stream that it has ended, which in turn signals to the stitching stream that it has ended as well. This, however, won’t work, because it will consume the first fragment of the following run. We need to rewrite it so that the non-matching fragment can be carried over to the next run. We’ll restructure the construct so that the first fragment of a run is obtained outside of the run stream itself and brought in via an external variable, fragment. That way the first fragment of the next run can be fetched by the final iteration of the previous run.
    fragment := fragments get.
    runType := isData.
    [ isData = runType
        ifTrue: [ fragment closing: [ fragment := fragments get ] ]
        ifFalse: [ Incomplete zero raise ] ] reading stitching
The first fragment is read outside of the block stream, so we may as well capture the runType at that point. The block stream then simply compares the current isData value to the captured runType; if it matches, it returns the current fragment. The difficulty here is that we need to get the next fragment only after the previous one has been read. The best way to achieve that is to fetch the next one in a close block of the previous one. Again, if the type changes we raise Incomplete to signal the end of the run.
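The way a block stream is ended by Incomplete can be tried out on its own. This is a minimal sketch, assuming a workspace in which the Xtreams names (Incomplete in particular) are visible just as in the snippets above; the names n and counter are made up for the illustration.

```smalltalk
| n counter |
n := 0.

"Each run of the block yields the next integer;
 the fourth run raises Incomplete, which ends the stream."
counter := [ n := n + 1.
    n > 3 ifTrue: [ Incomplete zero raise ].
    n ] reading.

counter rest.    "expected elements: 1 2 3"
```

The run stream above uses the same mechanism, with the type comparison playing the role of the n > 3 test.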
Now that we know how to build a single run stream, we need to wrap that up in a stream of run streams. A simple block stream returning the stitched run streams should suffice.
    fragment := fragments get.
    runsFinished := false.
    runs := [ | runType |
        runsFinished ifTrue: [ Incomplete zero raise ].
        runType := isData.
        [ isData = runType
            ifTrue: [ fragment closing: [ [ fragment := fragments get ] ifCurtailed: [ runsFinished := true ] ] ]
            ifFalse: [ Incomplete zero raise ] ] reading stitching ] reading.
The tricky part is ending the stream. A block stream will keep running the block (whenever it is asked for an element) until a block run raises an Incomplete. Here we want this to be the moment when we run out of fragments, i.e. when getting the next fragment raises an Incomplete. However that action is buried inside the close block inside the stitched stream of a run. When it happens the stitched stream will re-interpret that as the end of itself, so the outer block stream cannot distinguish an end of a run from the end of fragments (it is the end of the last run as well after all). So somehow we need to capture the fact that a fragment get raised an Incomplete and bring that information up to the block stream. That’s what the runsFinished variable is for. Without that the runs stream will keep giving out empty runs forever once it runs out of fragments.
To summarize this step, the “runs” stream turns the “fragments” stream into a stream of runs where adjacent fragments of the same type are stitched together into a single continuous stream, a run. With our sample input we should get the following result.
runs collect: [ :r | r rest ] => #(#(1 2 3 4) #(1 2 3 4) #(1 2 3 1 2 3 4 5) #(1 2) #(1 2 3 4 5 6 7 8) #(1 2 3 1 2 3) #(1 2 3 4))
Data vs Control
Now that we have a stream of alternating data and non-data runs, we need to stitch the data runs together and log the non-data runs into a separate stream. For that we just need a simple block stream that gets a run and, if it’s not data, logs it and gets the next one.
    control := ByteArray new writing.
    data := [ | run |
        run := runs get.
        isData ifFalse: [ control write: run. run := runs get ].
        run ] reading stitching.
With our sample we should get the following results.
    data rest => #(1 2 3 4 1 2 1 2 3 1 2 3)
    control close; terminal => #[1 2 3 4 1 2 3 1 2 3 4 5 1 2 3 4 5 6 7 8 1 2 3 4]
Note that the whole processing happens behind the scenes as we’re reading from the data stream; the “data rest” bit simply reads everything from the data stream. It happens lazily as data is being read and nothing is being cached, so the performance characteristics shouldn’t change even if we pump megabytes of data through it. In fact we can easily rewrite the fragments stream so that the sample is generated on the fly.
    sample := Array new writing.
    fragments := [ | size |
        sample put: (isData := random get even).
        sample put: (size := random get).
        (1 to: size) reading ] reading.
Here the “sample” stream is used just to log what we’ve generated, so that we can verify that the results are correct. We only log the type and size; the contents are implicit. If we use this version of fragments, we can’t call #rest on the data stream: because the fragments never finish, it would just keep reading forever. Here’s a sample run where we read 100 data bytes instead.
    data read: 100 => #(1 2 1 2 3 4 1 2 3 4 5 1 1 2 3 4 5 6 7 8 9 1 2 3 4 1 2 1 2 1 2 3 4 1 2 3 4 5 6 1 2 3 4 5 6 7 8 9 1 2 3 4 5 6 7 8 9 1 2 3 1 2 1 2 3 4 5 6 7 1 2 3 4 5 6 1 2 3 4 5 6 7 1 1 2 3 4 5 6 7 1 2 3 4 5 1 2 1 2 3)
    sample close; terminal => #(false 6 true 2 false 3 true 4 false 7 true 5 false 5 true 1 false 1 true 9 false 7 true 0 false 8 true 4 false 2 false 8 false 4 true 2 true 2 true 0 false 5 true 4 true 6 true 0 true 9 true 9 false 1 false 6 true 3 false 4 true 2 false 5 true 7 false 7 false 8 true 6 false 8 false 5 false 6 false 2 false 5 false 6 false 4 false 0 true 7 true 1 false 1 true 7 true 5 true 2 false 0 false 1 true 5)
    control close; terminal => #[1 2 3 4 5 6 1 2 3 1 2 3 4 5 6 7 1 2 3 4 5 1 1 2 3 4 5 6 7 1 2 3 4 5 6 7 8 1 2 1 2 3 4 5 6 7 8 1 2 3...etc...]
We can also easily profile an arbitrarily large run. Let’s rebuild the fragments stream so that it doesn’t log the samples; otherwise the log will keep growing and skew the results unnecessarily. Similarly, let’s turn the control log into a bit bucket too.
    control := nil writing.
    TimeProfiler profile: [ nil writing write: 10**7 from: data ]
Here’s a time and allocation profile summary from reading 10MB of data (and about as much control data, assuming a reasonably unbiased random generator).
Time
    2083 samples, 17.22 average ms/sample, 4023 scavenges, 0 incGCs, 5 stack spills, 0 mark stack overflows, 0 weak list overflows, 0 JIT cache spills
    34.82s active, 1.0s other processes, 35.87s real time, 0.05s profiling overhead
    ** Totals **
    28.8 Context>>findNextMarkedUpTo:
    9.6 Context>>terminateTo:
    6.5 BlockClosure>>on:do:
    3.8 GenericException class>>handles:
    3.1 SequenceableCollection>>replaceElementsFrom:to:withSequenceableCollection:startingAt:
    2.8 BlockClosure>>cull:
    2.4 ResolvedDeferredBinding>>value
    2.3 MarkedMethod>>isMarkedForHandle
Space
    1394487 samples, 1045 average bytes/sample, 6760 scavenges, 0 incGCs, 2 stack spills, 0 mark stack overflows, 0 weak list overflows, 0 JIT cache spills
    1458037980 bytes
    ** Totals **
    41.7 GenericException class>>new
    19.7 Xtreams.ReadStream class>>on:
    17.6 LaggedFibonacciRandom>>nextValue
    11.1  in UndefinedObject>>unboundMethod
    6.2 Interval class>>from:to:by:
    3.7 Xtreams.StitchReadStream class>>on:first:
To put the results in some perspective, 20MB of records with an average size of about 5 (0 to 9) means we’ve processed about four million records, each as a stream of its own with a bunch of other virtual streams set up on top. There were lots of lightweight, short-lived objects created in the process: the streams, an Incomplete exception at the end of each stream, the sample intervals representing the contents of each fragment, etc. Apparently the floats generated by the random generator are a significant portion of the profile as well. The profile says that we went through 1.5 GB of objects which, frankly, is a bit more than I’d expect, but the good news is that we didn’t trigger a single incremental GC; it was all handled within the scope of new space. Remember also that the total includes sample generation; it seems that we can safely attribute at least a quarter of the space cost to that. Either way, the runtime image size didn’t spike at all. With a normal SSL/TLS connection, where record size goes up to 16K, the same amount of overhead should easily cover 50GB of payload (probably more). So the cost of this rather powerful abstraction, which completely hides the underlying protocol, should be quite reasonable and easily dwarfed by all the other overhead on a typical SSL/TLS connection (encryption, IO, etc).
PS: In case you wonder, I did verify the claim that the amount of control payload roughly corresponds to the amount of data payload. For that I used the handy monitoring stream. I wrapped it around the control stream as follows.
control := nil writing monitoring: [ :total | nonData := total ] every: 1 seconds
The stream runs the monitoring block at the specified intervals, providing some optional handy arguments, the first of which is the total number of elements that went through the stream. After the profiling run I simply inspected the value of nonData and it was where it should have been, well within 2% of 10M.