| Edit | Rename | Changes | History | Upload | Download | Back to Top |
This document describes the VisualWorks Threaded API (THAPI) mechanism, a multithreaded extension of the standard VisualWorks DLL and C Connect (DLLCC) facilities. This mechanism allows a VisualWorks application to make multiple concurrent, possibly blocking, calls to external code, each on its own independent thread of control, and it allows VisualWorks to handle multiple concurrent callbacks from external code running on any thread in the VisualWorks process.
The document provides an overview of the design and its necessity, a comprehensive example, a discussion of limitations and performance, and a reference section.
This section provides the rationale for the VisualWorks threaded interconnect. It first explains the problems and limitations imposed by the VisualWorks ObjectEngine prior to the threaded interconnect, and the strategies used to work around these limitations. Finally, it considers the design space for the threaded interconnect and explains why the current architecture was chosen.
All Smalltalk-80 virtual machines in the lineage from the Xerox D-machine implementations through to HPS (the VisualWorks Object Engine) have supported the Smalltalk process model, which provides lightweight processes within a single Smalltalk object space. In all of these implementations the scheduling of Smalltalk processes (STPs) is done by the virtual machine itself, either in response to external events (e.g. from timers and I/O devices) or internal manipulation of Smalltalk semaphores and processes. The ParcPlace HPS virtual machine implements this model upon conventional operating-system platforms that provide their own multiprocessing model and I/O architecture. In all of these contexts, I/O is modulated by the operating system (OS); the only safe access to I/O devices being through system calls into the OS. In all of these contexts, HPS runs as a single heavyweight process with one thread of control. HPS schedules multiple STPs internally, multiplexing (sharing) the single thread of control amongst the various STPs, as appropriate. This multiprocessing is invisible to the outside world; from the host OS's point-of-view HPS appears to be a single thread of control.
When HPS performs I/O on behalf of Smalltalk code, it must actually make an I/O system call. At this point, control passes to the host OS, which performs the requested I/O operation. Since the operation is likely to require external I/O devices to respond, which can take considerable time, the host OS often cannot respond to the I/O request immediately. Since the process invoking the I/O operation is waiting for the result of the I/O operation, it cannot continue; thus, the host OS blocks the process, at least until the I/O operation completes, and schedules other runnable processes in the meantime. Consequently, HPS often blocks when performing I/O operations. Even though there might be other runnable STPs, they cannot run, since HPS itself is blocked by the OS, and is therefore unavailable to schedule and run those STPs.
This state of affairs is extremely deleterious to the performance of Smalltalk applications that have heavy I/O operations and concurrently handle multiple threads of control. This includes middle-tier application servers in a three-tier architecture. In this context, multiple clients send requests to an application server that cause it to make multiple requests on a database. The application server can achieve higher throughput if it can handle multiple concurrent requests for data from clients. However, each time the application server makes a request to the database, the entire application server is blocked, awaiting the completion of the database I/O request. No incoming requests can be responded to, and no pending data can be delivered back to the client(s) until the I/O operation completes.
HPS uses an asynchronous I/O architecture for various I/O connections, as is the case when HPS issues a request for service and is later asynchronously notified, via a software interrupt, that the service is available. Socket and terminal I/O is done this way. This architecture can also be used to build a non-blocking interconnect. One or more separate heavyweight processes, possibly multithreaded, and implemented in a suitable system language, e.g. C, implement a blocking I/O call server. Smalltalk communicates asynchronously with the call server, e.g. via sockets, and the multithreaded call server makes blocking calls on Smalltalk's behalf, notifying Smalltalk when each call completes.
This approach does work, and is used for example in the VisualWave Server 1.0 product. However, there are several problems with this approach:
A number of architectures can be used to ameliorate the blocking I/O problem, from one involving multiple invocations of HPS, communicating amongst each other to process multiple concurrent requests, to the use of host threads to process I/O requests. Most modern operating systems provide for independent threads of control within the address space of a single heavyweight OS process. If one thread makes an I/O request, the host OS blocks that thread until the I/O request is completed, but the OS is free to schedule other threads within the process whilst the I/O operation is pending. All approaches that do not use multiple threads suffer from the same problem, HPS can perform only one blocking I/O operation at a time, during which no other activities can occur. Only solutions using multiple threads can possibly handle multiple blocking I/O requests whilst allowing other activity concurrent with I/O.
Threads managed by the operating system are known as kernel or OS threads. Many thread implementations are available on operating systems which lack kernel threads. These implementations are called library threads since they are typically implemented as a library of routines. In these systems the appearance of multiple threads is simulated by multiplexing a single OS thread amongst a number of library threads. In fact HPS does just this with Smalltalk processes. However, all library thread systems suffer from the same fatal flaw. Once control passes to the kernel no further scheduling can occur until control returns, blocking other threads for the duration of the call. To avoid blocking one must use an asynchronous I/O architecture as discussed in the previous section.
Kernel threads are insufficient to achieve non-blocking I/O. The operating system must allow threads to run in parallel with I/O operations. The circumstances under which the OS is able to schedule multiple threads while I/O operations are in progress are those in which the underlying machine either has multiple CPUs, only one of which need handle I/O requests, or separate I/O processors that are able to handle I/O requests handed off to them by the host OS in parallel with the CPU(s) normal processing, or those in which the I/O device is sufficiently slow that the host OS can issue requests to the I/O device and interrogate the device at a later time by scheduling an interrupt timer. Some machines provide multiple threads but lack the necessary hardware features that enable the scheduling of other threads whilst I/O is in progress. On such platforms the architecture limits this objective; the threaded interconnect depends both on the existence of multiple threads and an appropriately architected host OS and hardware combination.
Two mappings of the current HPS system to a system using multiple threads have been considered; firstly, the mapping of each runnable STP to its own thread, and secondly, the use of a single thread to schedule Smalltalk processes, with multiple threads to perform I/O operations, as required. The former approach is fraught with complications, the most important among these being:
A simpler approach is to use multiple threads only to make I/O calls. As in the current system, a single thread is used to run all Smalltalk processes. Whenever a potentially blocking I/O call is to be made, a separate thread is handed the information necessary to make the call, executes the call, and is blocked by the OS until the call completes and the OS reschedules the thread. Meanwhile, the ObjectEngine thread blocks the Smalltalk process that invoked the operation, scheduling other runnable STPs. Once the I/O thread is scheduled, it passes back the result(s) of the call to the ObjectEngine thread. The ObjectEngine thread is then free to resume the STP that invoked the call. Once the I/O thread has passed its results back, it can either terminate, or remain in a quiescent state, awaiting subsequent requests for I/O operations.
This scheme creates the illusion of a truly multithreaded Smalltalk. Note that the true multithreaded architecture provides only the ability to run more than one STP concurrently on true multiprocessors; i.e., machines with multiple CPUs. On machines with a single CPU, only one thread can run at a time; therefore, even though Smalltalk might have multiple runnable processes mapped to multiple runnable threads, only one of these can run at a time. Whilst the use of machines with multiple CPUs is likely to increase, especially to exploit the parallelism of these machines, currently few customers are doing so. On single processor machines, HPS implements the current Smalltalk process model with excellent performance (process switch time of about 25 microseconds on a 60 MHz Pentium). Viewed as a black box, on a single processor the architecture presented herein provides the illusion of a true multithreaded implementation, except that:
Using multiple threads only for implementing I/O operations is much simpler to implement than a truly host-threaded implementation. On single processor machines, this approach is equivalent to a true multithreaded implementation. Hence, this architecture has been chosen as the basis for the threaded interconnect, called THAPI for short.
HPS is based on a copying garbage collector. The garbage collector routinely moves objects within the heap at arbitrary times. This is not a problem for normal synchronous calls, because for the duration of a call the garbage collector is blocked along the rest of HPS, and hence no relocation can occur. However, moving objects within the heap is a problem if the call invokes code that tries to remember the address of an object for subsequent calls, or if the code invokes a callback. In either case, when HPS runs, the garbage collector might move the object, invalidating the information retained by the foreign code. In the current system, it is the programmer's responsibility to cope with foreign code that depends on being passed fixed pointers by using DLLCC's malloc facilities, which allow Smalltalk programmers to manipulate data on the C heap (obtained via malloc) and reference this data using instances of CPointer.
Since HPS is not blocked during a threaded call, the garbage collector is free to move objects whilst the call is in progress. If the garbage collector moves an object being used as an argument to a threaded call, disaster often follows, either by passing invalid data to the foreign code, or by the foreign code writing to invalid addresses and corrupting the Smalltalk heap. For this reason, all reference parameters in threaded calls must refer to data that does not move for the duration of the call. There are three ways to achieve this. Firstly, you can use the existing DLLCC malloc facilities. Secondly, you can arrange it so that the object's data is copied to some fixed space at the start of the call, and copied back once the call completes. Lastly, the object's data can be relocated to a fixed space, where it resides for at least the duration of the call. The second alternative does not preserve referential integrity; that is, if an object is passed to more than one concurrent threaded call, or if Smalltalk code accesses the object whilst the call is in progress, the participants see different copies of the data. This is a serious problem in applications that want to share data, e.g. applications that share pools of I/O buffers between I/O drivers and applications, as supported by some file systems such as Windows NT. Further, the overhead of copying data to and from some fixed space might be unacceptably high.
Consequently, the Smalltalk heap now has an additional space called FixedSpace. The bodies (contents) of objects in FixedSpace do not move, preserving referential integrity and allowing them to be used freely as the arguments in threaded (and normal) calls. Promotion of objects to FixedSpace is automatic, and occurs when a mobile argument is passed as an argument to a threaded call. Further, objects can be created in FixedSpace using new object-creation primitives.
The cost of thread creation can be high, so the threaded interconnect should manage a pool of threads, rather than creating a thread for each threaded call. (For example, making a trivial threaded call on a 60MHz Pentium running Windows NT 4.0 takes about 300 microseconds. If a thread must be created in addition, the call takes about 3500 microseconds, an overhead of around 1100 percent.) Since some applications require that an API be used by a specific thread (e.g. a debugging process observing some other process under Windows NT), the interconnect should also allow programmers to ensure that a specific thread is used to make a call. The THAPI meets these requirements by maintaining a pool of active threads, and, as much as possible, by using the same I/O thread to perform calls on behalf of a specific Smalltalk process. Callbacks from I/O threads are run on the stack of the Smalltalk process that issued the callout. Callbacks from foreign threads (threads other than ones created by the threaded interconnect) are handled on new Smalltalk processes that remain associated with the foreign thread until it returns from the callback. Facilities are provided to reserve an I/O thread for the sole use of a specific Smalltalk process, and to control the size of the pool.
From the DLLCC programmer's perspective, THAPI is a relatively small extension of the existing DLLCC facilities. A good way to explain these new facilities is through a simple example that illustrates all features except callbacks. In the example, a multithreaded "server" is constructed with a number of Smalltalk processes waiting for "requests" on an I/O connection. For simplicity, the requests within a single image are generated and served.
In this example, a "request" is simply a character string, and a response to a request is to display the string in the System Transcript. Requests are written to a single pipe. (A pipe is an I/O channel represented by a pair of file descriptors. A write of data via the write file descriptor makes that data available for reads via the read file descriptor.) Each "server" process is a loop that blocks, waiting for data to appear on the pipe before writing this data to the transcript. Another process makes "requests" by writing data to the pipe. To give the Smalltalk system something computationally intensive to do whilst all this is going on, the data written to the pipe is the results of running a simple benchmark.
This example, even though very simple, is sufficient to require all THAPI facilities other than callbacks. If you were to run the example without THAPI, Smalltalk would freeze immediately. As soon as a Smalltalk process attempts to wait for a request by performing a blocking read on the pipe, the ObjectEngine blocks in the read call. Hence, the Smalltalk process that generates requests can no longer run. With no data written to the pipe, the server processes never return from the call, resulting in deadlock.
Our example class is NonBlockingPipeInterface, since it waits for data on a pipe without blocking the ObjectEngine. It is a subclass of ExternalInterface, since it has a number of external methods. Amongst its instance variables is a pair of file descriptors, infd and outfd, for the pipe, and a boolean flag running that determines when each "server" process terminates.
Each "server" loops in the reader: method, reading data and then writing the data to the Transcript.
NonBlockingPipeInterface methods for server
reader: id
"Read from the pipe as long as running is true. Print what ever is
read from the pipe to the Transcript and tag it with id."
| buffer count |
buffer := CIntegerType char gcMalloc: 1025. "Use a buffer on the C heap for the read call."
[running] whileTrue: "Continue until running is false"
[count := self read: infd with: buffer with: 1024. "Make a blocking read from pipe on its own thread."
TranscriptProtect critical: "Use a mutex to serialize writing to Transcript"
[Transcript cr; print: id; tab. "Print this reader's id tag."
(count > 1024 or: [count < 0]) "Check read result and complain if its in error."
ifTrue: [Transcript nextPutAll: 'READ RETURNED '; print: count]
ifFalse:
[buffer at: count put: 0. "Null-terminate then copy data as a String."
Transcript nextPutAll: buffer copyCStringFromHeap].
Transcript endEntry]]
The read buffer is allocated on the C heap. The buffer is 1025 bytes long, large enough for 1024 characters and a null-terminating byte. Smalltalk objects can be moved by the garbage collector, which can and might run whilst a threaded call is in progress. Consequently, normal Smalltalk objects cannot be used as container arguments to _threaded calls, since the garbage collector might move them independently of the threaded call. Later, this example is refined to include the use of fixed-space objects.
The Transcript is not thread safe, so access to multiple reader processes attempting to write to the Transcript at the same time must be serialized. A class variable, TranscriptProtect, is used to achieve this.
NonBlockingPipeInterface class methods for class initialization
initialize
"Initialize the mutex for serializing writes to the Transcript and a constant to open the pipe in binary mode."
TranscriptProtect := Semaphore forMutualExclusion.
O_BINARY := 16r8000 "from msdev\include\fcntl.h"
"self initialize"
As for the threaded call itself, the send message that invokes the call is indistinguishable from a normal DLLCC call. The threaded-ness is a property of the external method specifying the call. Threaded calls are specified by using the _threaded pseudo-qualifier in the C pragma of an ExternalMethod. Note that the _threaded keyword must follow the function's return-type. The type of the buffer argument is _oopref , which passes a pointer to the buffer's contents without interpreting its contents. (Using a type such as char * causes the contents of a ByteString or TwoByteString buffer to be checked to ensure that the character set in the string agrees with the platform's character set, which is unnecessary in this case.
NonBlockingPipeInterface methods for procedures
read: fd with: buffer with: size
"Invoke the read system call on its own thread and hence avoid blocking the ObjectEngine."
<C: long _threaded read(int fd, _oopref *buffer, unsigned long size)>
^self externalAccessFailed
An invocation of the method causes the calling Smalltalk process to block until the read call returns. Meanwhile, other runnable Smalltalk processes can execute. To perform the call the ObjectEngine provides a thread that is available to make the call, passes all the information necessary to make the call (the function and arguments) to the thread, and blocks the calling Smalltalk process until the thread returns a result. Once the result is returned, the ObjectEngine passes the result back to the process and allows it to continue. Importantly, the thread making the call is given the next higher priority to the ObjectEngine thread to ensure that it makes progress, even if the Smalltalk system has other runnable processes for the ObjectEngine to execute.
Since pipes have limited capacity, it is possible to block when writing to a pipe. A write to a pipe might block until sufficient reads have been done to make space available for the write. Thus, to ensure that the example's computation is not interrupted by potentially blocking writes, a separate process is used to perform the writes via threaded calls. The write process reads results from the generator through an instance variable, results, which is a SharedQueue. A SharedQueue is a thread-safe way of communicating between processes, somewhat analogous to a pipe. An object added to the queue via nextPut: is available via next. If the queue is empty, the calling process blocks in the next method until an object is added to the queue via nextPut:.
The writer is rather similar to a reader:
NonBlockingPipeInterface methods for server
writer
"Loop writing strings from the results queue to the pipe."
| result buffer writeCount |
buffer := CIntegerType char gcMalloc: 1024. "Use a buffer on the C heap for the read call."
[true] whileTrue:
["Get the next result from the results shared queue. This process waits until one is available.
Convert the result to a ByteArray since the type of the buffer is char (an integer)."
result := results next asByteArray.
buffer copyAt: 0 from: result size: results size startingAt: 1. "Copy the string into the buffer."
writeCount := self write: outfd with: buffer with: results size. "Write the buffer's data to the pipe."
writeCount ~= results size ifTrue: "Check the write operation succeeded."
[TranscriptProtect critical:
[Transcript
cr;
nextPutAll: 'WRITE RETURNED '; print: writeCount;
nextPutAll: ' EXPECTED '; print: results size;
nextPut: $!; endEntry]]]
The writer does not test running, since it is explicitly terminated. The write is also a threaded call:
NonBlockingPipeInterface methods for procedures
write: fd with: buffer with: size
"Invoke the write system call on its own thread and hence avoid blocking the ObjectEngine."
<C: long _threaded write(int fd, _oopref *buffer, unsigned long size)>
^self externalAccessFailed
To implement the rest of the interface, you first need some interface functions to open and close the pipe, and you need a benchmark to run.
NonBlockingPipeInterface methods for procedures
close: fd
"Close the file descriptor fd"
<C: int close(int fd)>
^self externalAccessFailed
pipe: arg
"UNIX pipe creation function."
<C: int pipe(int [])>
^self externalAccessFailed
pipe: arg ofSize: size mode: textMode
"NT pipe creation function."
<C: int pipe(int arg[], unsigned int size, int textMode)>
^self externalAccessFailed
Integer methods for mathematical functions
nfib
"The nfib benchmark calculates a rough measure of activations per second. This is a version of fibonacci in
which 1 is added for each activation. The result is therefore equal to the number of activations required to
calculate that result. To get the 'nfib' figure of nfib activations per second choose a value which takes nfib
about 30 seconds to calculate. Then divide the result by the time taken, yielding activations per second."
self < 2 ifTrue: [^1] ifFalse: [^(self - 1) nfib + (self - 2) nfib + 1]
Another method is used to open the pipe, since it must be opened differently under UNIX and Windows. Also, a separate method is used to terminate the example, since it might be run in the background and need to be terminated from some other process. (The terminate method is careful to do nothing if already terminated. On process termination, any unwind blocks in the process are run. Hence, if terminate is sent from some other process, it gets sent again from the unwind block in the readers: method when terminate kills the generator process. An instance variable generator is used to refer to the process running the benchmark, and the instance variable readers is used to refer to the collection of readers.
NonBlockingPipeInterface methods for initialize-release
openPipes
"Create the pipe. Note that pipe in the MSVC run-time library is different from the standard UNIX pipe."
| fds |
fds := CIntegerType int gcMalloc: 2.
(OSHandle currentOS == #win32
ifTrue: [self pipe: fds ofSize: 1024 mode: O_BINARY]
ifFalse: [self pipe: fds]) < 0
ifTrue: [self error: 'pipe open failed.'].
infd := fds at: 0.
outfd := fds at: 1
terminate
"Terminate all the relevant processes and close the pipe."
running ifFalse: [^self]. "Don't do anything if already terminated."
running := false.
generator == Processor activeProcess ifFalse: [generator terminate].
"Write sufficient data to the pipe so that all readers get data, and hence by checking running, stop."
readers size * 2 timesRepeat: [results nextPut: 'so long!'].
"Delay until the results have been written by the writer and then kill the writer.
Yield doesn't work if the process doing terminate has a higher priority than the writer so use a delay."
[results isEmpty] whileFalse: [(Delay forMilliseconds: 20) wait].
writer terminate.
self close: infd; close: outfd. "Close the pipe"
The "main loop" opens the pipe, creates a shared queue to communicate results to the writer, spawns the readers and writer, and then loops, generating data. On unwind, it calls terminate to shut down.
NonBlockingPipeInterface methods for public access
readers: n
"Run the example with n reader processes."
results := SharedQueue new.
self openPipes.
generator := Processor activeProcess. "remember the generator process for terminate.""
running := true.
"Fork a writer process to write data to the pipe. Its priority is higher than the generator
to ensure writes happen promptly."
writer := [self writer] forkAt: generator priority + 1.
"Fork n readers at a higher priority so that results get read and displayed."
readers := (1 to: n) collect: [:i| [Processor yield. self reader: i] forkAt: generator priority + 1].
"Generate some data. Use a benchmark the author is excessively fond of. But anything would do."
[| i r t s nfibs |
s := String new writeStream.
i := 0.
[t := Time millisecondsToRun: [r := i nfib].
s reset.
nfibs := Number errorSignal handle: [:ex| '??'] do: [(r * 1000.0 / t) rounded].
s nextPutAll: 'nfib '; print: i; nextPutAll: ' = '; print: r;
tab; tab;
nextPutAll: 'nfibs '; print: nfibs;
nextPutAll: ' ('; print: t / 1000.0; nextPutAll: ' seconds)'.
results nextPut: s contents. "Put datum in results shared queue for the writer to consume."
"Increase the value from which we compute nfib, limiting it to one that takes 30 seconds or less to run."
i := t > 30000 ifTrue: [0] ifFalse: [i + 1]] repeat]
valueNowOrOnUnwindDo: [self terminate]
The underlying C functions for accessing the pipe are in the C library. On Windows the C library is one of the MSVCRTNN.DLL DLLs. On Solaris the C library is /usr/lib/libc.so, and on Digital UNIX it is /usr/shlib/libc.so. You can use a single interface class for all these cases, provided the interface copes with the libraryNotFoundSignal, which is raised when an attempt is made to open a nonexistent library. For example, the libraryNotFoundSignal signal is raised if the interface tries to open /usr/shlib/libc.so on a Windows machine.
ExternalInterface supports a standard idiom for doing just this. The class declaration should include the full set of library files and directories for all systems, and on the class side of the interface you implement the libraryFilesSearchSignals method to return the Signal or SignalCollection of the signals to be ignored during library loading. Thus, you need the following method to avoid raising a signal when using the interface's procedures:
NonBlockingPipeInterface class methods for private
libraryFilesSearchSignals
"Answer a SignalCollection used to handle exceptions raised when scanning for library files. The signals
answered by this method results in those signals being ignored by the library search machinery. Clients
should not answer signals they wish to receive."
^ExternalLibraryHolder libraryNotFoundSignal
On Windows you also need to know where to look for MSVCRTNN.DLL. One enhancement in VisualWorks 2.5.2 is the ability to use environment variables in the list of library files and directories. (You can also use 'patterns' to match against OSHandle currentPlatformID. Browse ExternalLibrary>> findFile: inDirectories: for a full description.) In the following, $windir expands to the value of the windir environment variable; for example, C:WIN95. Thus, the following class declaration loads the appropriate C library on Windows 95, Windows NT 3.51 & 4.0, and Solaris and Digital UNIX.
ExternalInterface subclass: #NonBlockingPipeInterface
includeFiles: ''
includeDirectories: ''
libraryFiles: 'libc.so msvcrt40.dll msvcrt20.dll '
libraryDirectories: '/usr/shlib /usr/lib $windir\system $windir\system32 '
generateMethods: ''
beVirtual: false
optimizationLevel: #full
instanceVariableNames: 'infd outfd generator writer readers results running '
classVariableNames: 'O_BINARY TranscriptProtect '
poolDictionaries: 'NonBlockingPipeInterfaceDictionary '
category: 'ExternalInterface- THAPI Example
To run the example, evaluate, e.g. NonBlockingPipeInterface new readers: 10. Here is a screenshot of the transcript output when running this example on a 60MHz Pentium running Windows 95.
The example is extended next to add some control over the number of threads that can be created. THAPI provides a hard limit on the maximum number of threads that can be created and a low-tide limit on the number of quiescent threads. When the ObjectEngine starts up, it initializes both the upper and lower thread limits to 32.
A thread is created when a threaded call is made by a process with no associated thread and no unassociated threads exist in the pool. For the duration of the call, the thread is then associated with the calling process and is used in any nested threaded calls. For example, if a callback occurs during the call, the callback runs in the same process that made the callout. Any threaded callouts made from this process whilst the outermost threaded call is still in progress are made by the same thread.
Once the outermost threaded call returns, the thread is disassociated with the calling process and is returned to the pool. The thread can then be used to perform a call on behalf of any process. Thus, a burst of concurrent threaded calls can result in the creation of a number of threads which, when the calls return, end up unassociated in the pool. The live thread low tide is used to control the size of the pool. If the total number of threads maintained by ObjectEngine exceeds the low tide, then unassociated threads in the pool are terminated until the low tide is met.
For example, if we set the limit to 64 and the low tide to 32 and evaluated NonBlockingPipeInterface new readers: 60, then the ObjectEngine creates 61 threads (60 concurrent read calls from readers and one concurrent write call from the writer). As each call returns, the thread that made the call is terminated, since the total number of threads is higher than the low tide. However, once the example terminates and all calls return, the ObjectEngine keeps 32 threads alive, ready for future use.
The following low-level methods provide access to the limit and the low tide:
ProcessorScheduler methods for private
primGetThreadLevels
"Return an array of
@1 the limit on the number of active threads created by the OE.
@2 the low tide on the number of active threads maintained by the OE. The
OE does not reduce the number of inactive threads below this level.
@3 the current number of active threads
@4 the current number of inactive (but live) threads
@5 the number of foreign threads currently calling in.
@6 the number of threads created by the OE since start-up.
@7 the number of threads killed by the OE since start-up.
Counts 1 through 4 are inclusive of the OE's single Smalltalk thread. Fail if the array cannot be created."
<primitive: 1055 errorCode: errorCode>
^self primitiveFailed
primSetThreadLimit: limit lowTide: lowTide
"Set the OE's thread limit and low tide. A less-than-zero value is ignored, allowing each level to be set
independently. Note that these values must be at least 1 since there is one thread devoted to Smalltalk."
<primitive: 1056 errorCode: errorCode>
^self primitiveFailed
If you try to run the example with more readers than the live thread limit, the example deadlocks. As readers are forked, they make threaded calls, resulting in threads being created. Once the number of readers exceed the thread limit, subsequently attempted read calls fail with an 'out of threads' error. When the writer attempts its write call, it fails in the same way for the same reason. Thus, to guarantee progress in the presence of potential thread starvation, a thread must be reserved for exclusive use by the writer. This is called attaching a process to a thread. (Another circumstance in which you need to attach a process to a given thread is where a specific thread must be used to make certain calls. One example is in Windows, where a specific thread must be used to make debugging calls to inspect the state of a process being debugged.) The following methods provide control over thread attachment.
Process methods for native thread support
attachToThread
"Reserve a native thread for the receiver to make _threaded calls."
self primThreadAttachment: true
detachFromThread
"Release the native thread from its attachment to the receiver."
self primThreadAttachment: false
isAttachedToThread
"Answer if the receiver is attached to a native thread."
^self primThreadAttachment: nil
Now the example can be extended to cope with thread starvation. Change the readers: method to reserve a thread for the writer, and add a method to handle out-of-threads exceptions in reader processes, as follows.
NonBlockingPipeInterface methods for public access
readers: n
"Run the example with n reader processes."
results := SharedQueue new.
self openPipes.
generator := Processor activeProcess. "remember the generator process for terminate.""
running := true.
"Fork a writer process to write data to the pipe. Its priority is higher than the generator to ensure writes
happen promptly. It needs to reserve its own thread because there might be more readers than available
threads, and if the writer can't get a thread because they have been used up by the readers the example
deadlocks, since the readers get no data unless some is written."
writer := [self writer] forkAt: generator priority + 1.
writer attachToProcess.
"Fork n readers at a higher priority so that results get read and displayed."
readers := (1 to: n) collect: [:i| [Processor yield. self startReader: i] forkAt: generator priority + 1].
"Generate some data. Use a benchmark the author is excessively fond of. But anything would do."
[| i r t s nfibs |
s := String new writeStream.
i := 0.
[t := Time millisecondsToRun: [r := i nfib].
s reset.
nfibs := Number errorSignal handle: [:ex| '??'] do: [(r * 1000.0 / t) rounded].
s nextPutAll: 'nfib '; print: i; nextPutAll: ' = '; print: r;
tab; tab;
nextPutAll: 'nfibs '; print: nfibs;
nextPutAll: ' ('; print: t / 1000.0; nextPutAll: ' seconds)'.
results nextPut: s contents. "Put datum in results shared queue for the writer to consume."
"Increase the value from which we compute nfib, limiting it to one that takes 30 seconds or less to run."
i := t > 30000 ifTrue: [0] ifFalse: [i + 1]] repeat]
valueNowOrOnUnwindDo: [self terminate]
startReader: id
"Place an exception handler around the reader: method to catch ou-of-thread exceptions."
self externalAccessFailedSignal
handle:
[:ex |
ex name == #'out of threads'
ifTrue:
[TranscriptProtect critical:
[Transcript
cr; print: id;
nextPutAll: ' CANNOT PROCEED. OUT OF THREADS';
endEntry].
Processor activeProcess terminate]
ifFalse: [ex reject]]
do: [self reader: id]
Here's a screenshot of evaluating the following with the new example
Processor primSetThreadLimit: 6 lowTide: 4.
NonBlockingPipeInterface new readers: 10
The limit is set to six threads. Inclusive of the ObjectEngine thread, this leaves five threads for making threaded calls. One is reserved for the writer thread. Thus, the last six readers fail, due to the thread limit being exceeded, and they terminate, leaving four threads to read the pipe.
There are three basic representations of Smalltalk objects: objects containing references to other objects, objects containing binary data, and "immediate" objects. An object containing references to other objects is a vector of 32-bit slots, each slot holding an object pointer, or oop. An object containing binary data is a vector of bytes. Oops are always 32 bits and have two forms, the first being the address of some other object, the second, an immediate object, being an encoding of either an integer or a character. For a number of reasons, including efficient implementation of become: and more efficient compaction, non-immediate objects are split into two parts: a header and a body. 
Thus, for example, to find the class of an oop, the ObjectEngine tests the bottom 2 bits. If they are 01, the class is SmallInteger; if they are 10, the class is Character; otherwise, the class is the second word in the header to which the oop points.
One of the flags in the header indicates whether the body of the object contains oops (as it does with normal pointer objects, nil, Points Arrays, etc.) or bytes (as it does with non-pointer objects such as ByteString, ByteArray, etc.). The 11-bit field holds the size of the object in bytes. If the object is too large for its size to be represented in this field, an extra word is allocated to hold a 32-bit field, which gives the size in bytes.
The pointer to the object's body in the header always points to the first instance variable or first byte of an object. When one passes, e.g. a ByteArray, as an argument to a DLLCC call, the ObjectEngine passes this pointer, which is the address of the first byte in the object.
The garbage collector moves both object headers and object bodies when it collects. Consequently, both oops (pointers to headers) and body pointers might be updated during garbage collection. This is not normally an issue for DLLCC calls, since the ObjectEngine, and hence the garbage collector, cannot run for the duration of the call. However, it is an issue for both callbacks and threaded calls. If a callback happens during a normal DLLCC call, the garbage collector might run; hence, the requirement that any DLLCC call that might call back must register any oops that it requires to persist across the callback.
In the case of a threaded call, the garbage collector might run at any time during the call, since the ObjectEngine and thread run concurrently. Consequently, you cannot pass oop arguments through threaded calls. Further, you cannot use the message sending facilities from within threaded calls, since these also depend on oops.
You can obviously use data allocated on the C heap (accessed via Cdatum and its subclasses). However, using these facilities is clumsy and can be inefficient, since data might have to be copied from the C heap into a Smalltalk object before it can be used.
To circumvent these problems, THAPI is augmented with a new space, FixedSpace, in the Smalltalk object heap. Non-pointer objects can have their bodies in FixedSpace. Throughout their lifetime, object bodies in FixedSpace are never moved, but they are still garbage-collected. Further, all object bodies in FixedSpace have at least two extra bytes allocated to provide for null-termination of single and double-byte strings.
The ObjectEngine ensures automatically that the body of any byte-like object that is passed as a pointer argument to a threaded call gets promoted to FixedSpace. This ensures that the garbage collector does not move the object's body during the _threaded call, although the garbage collector might move the object's header (and hence need to change its oop). For efficiency, you might want to avoid promoting objects during a call, hence THAPI provides facilities for instantiating byte-like objects in FixedSpace. The following protocol allows you to instantiate objects in FixedSpace:
Behavior methods for instance creation
newInFixedSpace: anInteger
"Answer with a new instance of the receiver, a class, with the number of indexable variables specified by
the argument, anInteger. Arrange that the object's data resides at a fixed address through-out the object's
lifetime. Such an object is suitable for passing to foreign code, since it does not move over time, and can be
effectively shared between Smalltalk and foreign code. Fail if the class is not bits-indexable, or if the
argument is not a positive Integer."
<primitive: 1040 errorCode: errorCode>
^self handleFailedNewInFixedSpace: errorCode size: anInteger
basicNewInFixedSpace: anInteger
Same as newInFixedSpace: above
The following protocol allows you to promote objects to FixedSpace:
SequenceableCollection methods for converting
asFixedArgument
"Coerce the receiver to an object whose data resides at a fixed address. If the receiver already has fixed
data return the receiver, otherwise return a copy of the receiver which does have fixed data."
| copy size |
self isFixedArgument ifTrue: [^self].
copy := self class basicNewInFixedSpace: (size := self basicSize).
copy replaceFrom: 1 to: size with: self startingAt: 1.
^copy
The following protocol allows you to test whether an object's body resides in fixed space:
CDatum methods for testing
isFixedArgument
"Answer if the receiver, when passed as an argument through the DLLCC, represents data at a
fixed address. C objects do reside at fixed addresses. So answer true."
^true
Object methods for external testing
isFixedArgument
"Answer if the receiver, when passed as an argument through the DLLCC, represents data at a
fixed address. This is true for objects created via the newInFixedSpace: primitive. Fail if the receiver is
immediate, since it has no data. In this case the Smalltalk code copes with the immediate case."
<primitive: 1041>
^self class hasImmediateInstances
hasFixedData
"Answer if the receiver's data resides at a fixed address. This is true for objects created via the
newInFixedSpace primitive. Fail if the receiver is immediate, since it has no data."
<primitive: 1041>
^false
Using FixedSpace, you can slightly simplify the example by reimplementing reader: and writer to use FixedSpace in a new subclass, FSNBPI (for FixedSpaceNonBlockingPipeInterface).
FSNBPI methods for server
reader: id
| buffer count |
"Use a normal Smalltalk string, but allocate it in FixedSpace."
buffer := String defaultPlatformClass newInFixedSpace: 1024.
[running] whileTrue:
[count := self read: infd with: buffer with: buffer size.
TranscriptProtect critical:
[Transcript cr; print: id; tab.
(count > buffer size or: [count < 0])
ifTrue: [Transcript nextPutAll: 'READ RETURNED '; print: count]
ifFalse: [1 to: count do: [:i| Transcript nextPut: (buffer at: i)]].
Transcript endEntry]]
writer
"Loop writing strings from the results queue to the pipe."
| result writeCount |
[true] whileTrue:
[result := results next.
writeCount := self write: outfd with: result with: result size.
writeCount ~= result size ifTrue:
[TranscriptProtect critical:
[Transcript
cr;
nextPutAll: 'WRITE RETURNED '; print: writeCount;
nextPutAll: ' EXPECTED '; print: result size;
nextPut: $!; endEntry]]]
Thus, the buffer in the above version of reader: is instantiated in FixedSpace, while in the above version of writer each string gets promoted to FixedSpace on each call of write.
THAPI supports callbacks from arbitrary threads. Callbacks are not restricted to threads used for threaded callouts. However, the exact handling of callbacks does depend on which thread makes the callback. Threaded callbacks do not require any new syntax. The threaded-ness of a callback is determined by which kind of thread calls back.
Firstly, if a callback is made by the ObjectEngine thread, as happens when a normal (not threaded) callout calls back, the callback is handled in exactly the same way as in the existing DLLCC. The last-in, first-out restriction on such callbacks is still present.
If a callback happens from a thread used to make a threaded callout, then the callback runs in the same process that made the threaded callout. Hence, it runs at the same priority as the process that made the callout. If the process makes a nested callout (does a threaded callout within a threaded callback), then the callout happens on the same thread.
A callback from any other thread is termed a foreign callback, since the thread is not in the ObjectEngine's thread pool. Foreign callbacks run on their own special process. A new foreign callback process is created to run each bottom-level foreign callback. The priority of this process is controlled by the ForeignCallbackPriority class variable in CCallback and by default is Processor lowIOPriority - 5. No protocol is available to change this priority. If you need to change this priority, then you can change the class variable explicitly, e.g. CCallback classPool at: #ForeignCallbackPriority put: Processor userInterruptPriority - 1. But it is probably better to change the priority of a callback within the Smalltalk code for the callback.
Once a foreign callback occurs, the process created to service the callback remains associated with that thread until the callback returns (or the process terminates). Subsequent threaded callouts from this process happen in the foreign thread. If one of these callouts were to call back then the callback would run on the same process. Hence, a foreign callback process is created only for each bottom-level foreign callback.
Since each threaded callback (foreign or otherwise) is running on its own thread's stack, there is no problem with the ordering of returns from threaded callbacks. The LIFO restriction on normal callbacks results from the fact that each callback shares the stack of the Smalltalk thread, so each must return in LIFO order to ensure the stack can be cut-back safely.
As explained earlier in the section Objects and FixedSpace, you cannot pass oop arguments to threaded calls nor send messages from within threads. This is because the garbage collector moves objects and hence updates oops at arbitrary times relative to other threads.
Although the VisualWorks 2.5.2 ObjectEngine has facilities for growing FixedSpace, the Smalltalk code to exercise this functionality is not included. So effectively, FixedSpace is also fixed in size to the default startup value of 160 Kbytes. New ObjectMemory code that permits FixedSpace growth is planned for a release sometime in calendar 1997.
All threads created by THAPI run at the next highest priority to the ObjectEngine thread. This is to ensure that they make progress relative to the ObjectEngine. In cases where threads are to be used to perform some blocking action, as in the example, this behavior is appropriate. However, if threads are to be used to perform some computation, they prevent the ObjectEngine from running unless the underlying machine is a multiprocessor. It is possible to augment the THAPI interface with control over thread priorities, but the impact on performance (that of potentially changing the priority of a thread on each call) outweighs the utility of priority control. There is nothing to prevent you from changing the priority of a thread within your own code, reached through a threaded call. On Solaris and Digital UNIX, THAPI relies on those system's implementation of POSIX threads. On Windows THAPI uses standard Windows threads.
To change thread priority on Solaris and Digital UNIX, use code similar to the following:
priority = pthread_getprio(pthread_self());
if (pthread_setprio(pthread_self(),priority + delta) <= 0)
errinfo = errno;
On Windows use code such as the following:
priority = GetThreadPriority(GetCurrentThread());
if (!SetThreadPriority(GetCurrentThread(),priority + delta))
errinfo = GetLastError();
Be aware that you should reset the thread's priority before returning, since THAPI does not alter a thread's priority, once created. Remember that if the ObjectEngine is busy, lower priority threads can be blocked indefinitely.
The current VisualWorks debugger is not "process-aware". Any primitive invoked whilst in the debugger is made by the debugger's process, not by the process being debugged. Consequently, the debugger should not be used to step through threaded calls. THAPI pairs threads and processes; thus, when the debugger process is used to issue a threaded call, THAPI might become confused, and the call can fail with a 'no such thread' error.
Terminating a Process that is waiting for the results of a threaded call kills the associated thread, unless the thread is foreign. This might or might not create problems, depending on the context. For example, the following version of terminate (in the example) works fine on Windows, but on Solaris it causes the ObjectEngine to freeze (because of a bug in Solaris). On Solaris, if any thread is in a read call and some other thread closes the read size of the pipe, the process freezes. Each reader thread might not be terminated until reaching a termination point, so even though the ObjectEngine has attempted to terminate the reader threads, they might not terminate immediately, and if a thread remains in the read call when the read side of the pipe is closed, the entire process freezes.
NonBlockingPipeInterface methods for initialize-release
terminate
"Terminate all the relevant processes and close the pipe."
running ifFalse: [^self].
running := false.
generator == Processor activeProcess ifFalse: [generator terminate].
writer terminate.
readers do: [:ea| ea terminate].
self close: infd; close: outfd
In general, it is wise to avoid terminating processes that have threaded calls in progress, unless absolutely necessary. In particular, terminating a thread on Windows causes a memory leak, so you can create and terminate only a fixed number of threads. The actual number probably depends on the amount of memory in the system.
The maximum number of threads active at any one time depends on the underlying platform. Threads created by THAPI have a default stack size, an operating system semaphore, and a small amount of memory for argument marshalling. The example has been run with over a thousand readers/threads on Windows 95 and Windows NT 4.0. On a 40-Mbyte machine, the limit seems to be 1600 threads on Windows 95, and it seems to be 1987 threads on Windows NT 4.0.
Threaded calls are considerably slower than normal DLLCC calls. For example, here are some times for making a zero-argument call that returns immediately, using a normal DLLCC call, a threaded call, and a threaded call that involves creating a thread. These times are for a 60 MHz Pentium COMPAQ 5/60M running Windows NT 4.0.
On Solaris and Digital UNIX the ratios differ considerably, because the underlying scheduler gives threads a more sluggish response. No measurements have yet been done on multiprocessors.
The ObjectEngine sleeps if it has no runnable process, an activity that can be time consuming. If a trivial threaded call is made, during which the ObjectEngine goes to sleep, the call takes considerably longer, since the engine must wake up before the call can return. On the above machine, if the same trivial call is made when the engine sleeps, a call takes approximately 2400 microseconds.
Sleeping can be avoided by providing a background process, or by keeping the system generally active. Thread creation can be avoided by setting appropriate thread limits and low tides. Profiling shows that the fundamental cost of a threaded call is due to the operating system scheduling facilities required by the internal design of the THAPI. For each call, the following sequence of actions occurs to perform a threaded callout:
Threaded calls are specified by using the _threaded pseudo-qualifier in the C pragma of an ExternalMethod. Note that the _threaded keyword must follow the function's return-type. For example:
read: fd with: buffer with: size
<C: long _threaded read(int fd, _oopref *buffer, unsigned long size)>
^self externalAccessFailed
Callbacks require no additional syntax; use the existing form.
The following methods on ProcessorScheduler provide information on and control over thread creation:
ProcessorScheduler methods for private
primGetThreadLevels
"Return an array of
@1 the limit on the number of active threads created by the OE.
@2 the low tide on the number of active threads maintained by the OE.
The OE does not reduce the number of inactive threads below this level.
@3 the current number of active threads
@4 the current number of inactive (but live) threads
@5 the number of foreign threads currently calling in.
@6 the number of threads created by the OE since start-up.
@7 the number of threads killed by the OE since start-up.
Counts 1 through 4 are inclusive of the OE's single Smalltalk thread.
Fail if the array cannot be created."
primSetThreadLimit: limit lowTide: lowTide
"Set the OE's thread limit and low tide. A less-than-zero value is ignored,
allowing each level to be set independently. Note that these values must
be at least 1 since there is one thread devoted to Smalltalk."
The following methods on Process provide control over thread attachment:
Process methods for native thread support
attachToThread
"Reserve a native thread for the receiver to make _threaded calls."
detachFromThread
"Release the native thread from its attachment to the receiver."
isAttachedToThread
"Answer if the receiver is attached to a native thread."
The following protocol provides facilities for managing objects in FixedSpace:
Behavior methods for instance creation
newInFixedSpace: anInteger
"Answer with a new instance of the receiver, a class, with the number of
indexable variables specified by the argument, anInteger. Arrange that the
object's data resides at a fixed address through-out the object's lifetime.
Such an object is suitable for passing to foreign code, since it does not
move over time, and can be effectively shared between Smalltalk and foreign
code. Fail if the class is not bits-indexable, or if the argument is not a
positive Integer."
basicNewInFixedSpace: anInteger
Same as newInFixedSpace: above
SequenceableCollection methods for converting
asFixedArgument
"Coerce the receiver to an object whose data resides at a fixed address.
If the receiver already has fixed data return the receiver, otherwise
return a copy of the receiver which does have fixed data."
Object methods for external testing
isFixedArgument
"Answer if the receiver, when passed as an argument through the DLLCC,
represents data at a fixed address. This is true for objects created via
the newInFixedSpace: primitive, and for instances of CDatum""
hasFixedData
"Answer if the receiver's data resides at a fixed address"
The priority of processes running foreign callbacks is set by the ForeignCallbackPriority class variable in CCallback. By default, the priority is Processor lowIOPriority - 5. No protocol is provided to change this priority. You can change it explicitly, as follows:
CCallback classPool
at: #ForeignCallbackPriority
put: Processor userInterruptPriority - 1
or by browsing the CCallback class. However, the best method is probably to set the priority explicitly in the Smalltalk callback itself.
The source for the extended example in this document is in THAPITesting.st. To access the example's source code, follow the link and use your browser's facility for saving the source of the page as a new file.
Back to Documentation
Back to Top
Edit
Rename
Changes
History
Upload
Download
Back to Top