Oberon || Compiler & Tools || Library || Module Index || Search Engine
Streams are a common abstraction for byte-oriented input and output operations. In typical environments input and output are restricted to file i/o and perhaps to some other devices like terminals, line printers etc. UNIX offers a well known abstraction for input and output operations. The input and output operations (open, read, write, positioning, and close) are implemented for file systems and various devices. Each device is defined by a set of procedures which implement a subset of the input and output operations.
It is quite natural to extend this concept to the library of a high level programming language. This allows to use common input and output operations as interface to data structures of the library or own modules. Typical examples are windows and strings. The C-library has specialized sets of input and output operations both for windows and strings (i.e. wprintw of curses(3) and sprintf). A common interface allows to be independent from the underlying implementation. There should be no difference if a procedure writes to a file or to a window on the screen.
At first, a stream has to be opened. At this time the decision has to be made what kind of stream is to be opened (e.g. a file or a window). Each module which implements a special kind of streams exports an Open procedure. Have a look at following example:
CONST filename = "input.txt"; VAR stream: Streams.Stream; (* ... *) IF UnixFiles.Open(stream, filename, UnixFiles.read, Streams.onebuf, NIL) THEN (* stream opened successfully *) ELSE (* filename cannot be opened for reading *) END;Stream variables should be of type Streams.Stream and not of the specialized extension (UnixFiles.Stream in this example). Note that UnixFiles.Open accepts both variants but the first case allows the variable declaration to be independent from the Open procedure.
Most Open procedures take two types of parameters:
Some Open procedures require an additional parameter (of type RelatedEvents.Object) which allows to collect error informations. Like in the example above, NIL may be given for default error handling.
Per convention, opening procedures return TRUE on success and FALSE on failure (e.g. the named file cannot be found). Some implementations of Open are not functions (e.g. Strings.Open and Texts.Open). In this cases failures are fatal errors (e.g. no space available) and result in events.
The input and output operations are not necessarily mapped directly to the underlying stream implementation. In case of buffering the Streams module tries to minimize calls to the underlying implementation. This is very important if calls result in system calls which are relatively time consuming. Streams supports four buffering modes:
Streams.nobuf unbuffered streams; all stream operations are mapped directly to the underlying implementation Streams.onebuf requests one buffer (of a system dependent size) to be allocated as cache Streams.linebuf works like Streams.onebuf but causes the buffer automatically to be flushed if a line-terminator is written to the stream. This buffering mode is explained later in more detail. Streams.bufpool a set of buffers works as cache. The number of buffers may be modified.
The example following illustrates a simple copying scheme:
PROCEDURE Copy(from, to: Streams.Stream); VAR ch: CHAR; BEGIN WHILE Streams.ReadByte(from, ch) & Streams.WriteByte(to, ch) DO END; END Copy;This example is a universal copy operation which is completely independent from the underlying implementation. Streams.ReadByte reads the next byte from the given stream and stores it into ch. Streams.ReadByte returns FALSE if no byte is available. This happens if the stream ends (end of file) or an error occurs. Streams.WriteByte tries to write ch to the given stream and returns TRUE on success. Thus, the loop ends if copying is completed or any errors occurred.
Streams exports some predefined streams and one predefined stream implementation: the null device Streams.null. The null device works like /dev/null: read operations always return FALSE and indicate end of file; write operations succeed always but do not have any effect. Additionally, Streams exports stdin, stdout, and stderr. Initially they equal Streams.null but they may be re-initialized by other modules. This is conventionally done by UnixFiles which assigns the UNIX file descriptors 0, 1, and 2 to stdin, stdout, and stderr. It is important to note that UnixFiles must be imported to get these assignments.
Standard input and output are opened in Streams.linebuf mode if they are connected to a terminal device (SysIO.Isatty returns TRUE) else normal buffered (Streams.onebuf). Standard error is always opened in line buffering mode.
Some modules offer formatted input and output based on streams. Most important are Read and Write. Both modules offer two similar sets of procedures. One set takes no stream argument and works on Streams.stdin or Streams.stdout. The names of those procedures which take a stream argument end in "S".
A simple example for traditional line oriented input and output:
TYPE Book = RECORD title: ARRAY 80 OF CHAR; author: ARRAY 20 OF CHAR; year: INTEGER; END; PROCEDURE ReadBook(VAR book: Book); BEGIN Write.String("Title: "); Read.Line(book.title); Write.String("Author: "); Read.Line(book.author); Write.String("Year: "); Read.Int(book.year); Read.Ln; END ReadBook;Write.String writes the given string to Streams.stdout. Read.Line reads a complete line and stores the read line without the line termination into the given character array. Like other library routines, Read.Line guarantees 0X-termination of the resulting array. Read.Line skips the rest of the line if it does not fit into the character array.
Read.Int reads an integer value in decimal representation. Preceding white space is skipped. On default, white space consists of ASCII.sp, ASCII.EOL (line terminator), ASCII.tab, and ASCII.np (form feed). Termination characters are pushed back to the input stream (this is always possible in case of buffered streams). Thus, the character found after the last digit of the integer read can be subsequently read. Read.Ln skips the input until (and including) the next line termination character or end of stream.
One important note is necessary in respect to buffering. The example above works if Streams.stdin and Streams.stdout are line buffered or unbuffered. Line buffered streams are flushed (i.e. the buffer contents is transferred to the underlying implementation) either if
In the example above, Read.Line(book.title) causes the buffer contents of Streams.stdout ("Title: ") to be flushed if both streams are line buffered.
The solution above is very simple and not error sensitive but robust. In each case at least three lines are eaten from the input. (Because Read.Int skips white space including line terminators more than three lines are possible.) The value of book.year is not changed if no integer has been found by Read.Int.
Be sure to avoid reading loops like the following:
book.year := 0; REPEAT Read.Int(book.year); UNTIL (book.year >= 1900) & (book.year < 2000);If Read.Int does not find any digits after skipping the preceding white space then the terminating character is pushed back and read again on the next try. This results in an endless loop.
Each stream has a couple of public components which allow better error checking:
_________________________________________________________________ |count gives the count of the last read or write operation| |errors is incremented for each error. A common practise| | is to set errors to 0, then to call some stream| | operations, and finally to check errors for being| | positive. | |error represents the success of the last operation. | |lasterror is set to the error code of the last failure. | |eof_________is_TRUE_if_read_operations_return_zero_counts._____|Following procedure demonstrates integer reading with error checking:
PROCEDURE ReadInt(VAR int: INTEGER) : BOOLEAN; VAR ok: BOOLEAN; BEGIN Read.Int(int); ok := Streams.stdin.count = 1; Read.Ln; RETURN ok END ReadInt;Conventionally, counts are adjusted to items which are read or written. Read.Int sets the count to 1 if an integer has been successfully read else to 0. On failure, either Streams.stdin.error or Streams.stdin.eof equals TRUE. In the first case the error is described by Streams.stdin.lasterror.
It is very easy to separate line reading and line analysing. This is very useful if a line contains more than one item or if you want to be sure that there is no unexpected character between the integer and the line terminator. In this case it is convenient to read a complete line into a string and then to read from the string. Following example illustrates this:
PROCEDURE ReadBook(VAR book: Book) : BOOLEAN; (* read one line of input of following syntax: Book = Title "," Author "," Year . return TRUE if successful *) VAR line: ARRAY 80 OF CHAR; stream: Streams.Stream; comma: CHAR; ok: BOOLEAN; BEGIN Read.Line(line); IF Streams.stdin.count = 0 THEN RETURN FALSE END; Strings.Open(stream, line); Read.WhiteSpaceS(stream); Read.TerminatedStringS(stream, book.title, ","); ok := stream.count > 0; Read.CharS(stream, comma); Read.WhiteSpaceS(stream); Read.TerminatedStringS(stream, book.author, ","); ok := ok & (stream.count > 0); Read.CharS(stream, comma); Read.IntS(stream, book.year); ok := ok & (stream.count = 1); Read.WhiteSpaceS(stream); Streams.Release(stream); RETURN ok & Streams.eof END ReadBook;At first, a complete line is read from Streams.stdin. If this fails, we return immediately. Strings.Open opens a character array for reading and writing. This allows to read from the character array containing the line like reading from standard input. Read.WhiteSpaceS skips white space in the given input stream. Read.TerminatedStringS reads a character array until the termination character is found (second parameter).
Please note that conversion errors (e.g. Read.Int does not find an integer) are not errors in the sense of Streams, thus errors is not incremented if zero counts are returned. String reading procedures set the count to the number of characters read.
Streams.Release closes the stream. Streams.Close works like Streams.Release but returns a BOOLEAN value indicating the success. One might not expect failures of Close but in some cases they may happen: e.g. a buffered output stream is flushed on closing and, of course, buffer flushing may fail. But in this case we do not expect errors on closing at all and Streams.Release is more convenient than
IF ~Streams.Close(stream) THEN END;The last read operation (Read.WhiteSpaceS) ends with streams.eof set to TRUE if nothing than white space follows the integer.
The module Disciplines allows to attach auxiliary data structures to other data types which are an extension of Disciplines.Object. Oberon has single inheritance in the sense of object oriented languages. This single inheritance is used to define specific sorts of streams and to define streams with additional operations (e.g. streams for windows). On the other side, we have modules which support streams (e.g. input and output conversions). In some cases it is advantageous to parameterize operations of these supporting modules on a long-term basis.
Disciplines maintains a list of additional records for objects of type Disciplines.Object. Records are added with Disciplines.Add and looked for by Disciplines.Seek. Disciplines are identified by an integer value which should be unique for each module which defines a discipline.
StreamDisciplines offers some standard parameters for streams like line terminators and field separators. Some modules generate streams with unusual separators, e.g. ScanDir separates entries by 0X because 0X is the only character which cannot be part of an UNIX filename. These modules are free to set the parameters of StreamDisciplines to appropriate values at the time of opening. This allows other modules not to worry about different line terminators on different streams -- Read.Line always returns one "entry" or "record".
Using disciplines we can rewrite our ReadBook example:
PROCEDURE ReadBook(s: Streams.Stream; VAR book: Book) : BOOLEAN; (* read one line of input of following syntax: Book = Title "," Author "," Year . return TRUE if successful *) VAR fieldseps: Sets.CharSet; rest: ARRAY 1 OF CHAR; rval: BOOLEAN; BEGIN Sets.CreateSet(fieldseps); Sets.InclChar(fieldseps, ","); StreamDisciplines.SetFieldSepSet(s, fieldseps); rval := Read.FieldS(s, book.title) & Read.FieldS(s, book.author) & Read.FieldS(s, book.year) & ~Read.FieldS(s, rest) Read.LnS(s); RETURN rval END ReadBook;StreamDisciplines.SetFieldSepSet allows to define a set of field separators for a given stream. Field separators allow to divide an input line into a list of fields. Fields are delimited either by field separators or line terminators. Read.FieldS reads one field and returns FALSE if there are no more fields on that line. Fields may be empty, e.g. the second field of "This is the title,,1991" would be empty. Like Read.StringS, Read.FieldS skips surrounding white space.
Streams does not interpret the bytes it transfers (one minor exception is the line terminator for line buffered streams) nor does it define any record structure on it. Stream positions, if supported, are primarily defined and interpreted by the underlying implementation. In case of buffered streams some rules must hold to assure that the operational semantics does not depend on the buffering mode:
These rules are not necessarily valid for all streams. In these cases the buffering mode is forced to be unbuffered.
Following example shows two procedures which access a random file by record numbers (from 0 to the number of records minus one):
TYPE Record = RECORD (* ... *) END; CONST recordLen = SYSTEM.SIZE(Record); PROCEDURE WriteRecord(s: Streams.Stream; recno: INTEGER; record: Record) : BOOLEAN; BEGIN RETURN Streams.Seek(s, recno * recordLen, Streams.fromStart) & Streams.Write(s, record) END WriteRecord; PROCEDURE ReadRecord(s: Streams.Stream; recno: INTEGER; record: Record) : BOOLEAN; BEGIN RETURN Streams.Seek(s, recno * recordLen, Streams.fromStart) & Streams.Read(s, record) END ReadRecord;Streams.Seek modifies the stream position. Stream positions may be either given absolute or relative (to the current position or to the end):
Streams.fromStart absolute position Streams.fromPos relative to current position Streams.fromEnd relative to the stream lengthStreams.fromEnd is used by the following procedure which appends a record to a stream:
PROCEDURE AppendRecord(s: Streams.Stream; record: Record) : BOOLEAN; BEGIN RETURN Streams.Seek(s, 0, Streams.fromEnd) & Streams.Write(s, record) END AppendRecord;
Following procedure determines the length of a stream:
PROCEDURE StreamLen(s: Streams.Stream) : Streams.Count; (* return -1 in case of errors *) VAR pos: Streams.Count; BEGIN IF Streams.Seek(s, 0, Streams.fromEnd) & Streams.Tell(s, pos) THEN RETURN pos ELSE RETURN -1 END; END StreamLen;Streams.Tell returns the current position and stores it into the second parameter. The appropriate type for stream positions is Streams.Count.
Valid stream position ranges need not to be contiguous. In other words: there may be holes in it. Suppose s is empty and following operations are successful:
VAR ok: BOOLEAN; s: Streams.Stream; ch: CHAR; (* ... *) ok := Streams.Seek(s, 0, Streams.fromStart) & Streams.Write(s, ch) & Streams.Seek(s, 5, Streams.fromStart) & Streams.Write(s, ch);Then we can expect 0 and 5 to be valid positions. Seeking to position 1 and reading from it may lead to different results: either failure (return of FALSE), return of a predefined pattern (e.g. null bytes) or undefined garbage. The length of the stream above would be 6 which shows that the length of a stream does not necessarily equal the number of bytes stored in it. It is important to note that only few stream implementations allow holes. One example are regular files of UNIX which are represented by UnixFiles. While the hole in the example above is simply created by not writing to the intermediate range, holes may also be created by record locking.
Two points are very important: the stream position does not necessarily equal 0 at time of opening. Further, there are some cases where the stream position of the underlying implementation is modified from outside (e.g. shared UNIX file descriptors). In this case the resynchronisation is in the responsibility of the stream user. Streams.Touch requests the buffering mechanism and the stream position to be checked against the underlying implementation.
Many i/o libraries (e.g. stdio of C) which support simultaneous read and write access require an intermediate call of Seek or Flush before switching from read to write operations or vice versa. This is not required by Streams.
Streams which are opened for reading and writing and which do not support a current position (i.e. they do not support Seek, Tell and Trunc) are defined to be bidirectional.
The most common use of bidirectional streams are communication streams to other processes or coroutines. Buffered bidirectional streams have two buffers: an input and an output queue (both are FIFO-queues). Reading an item removes it from the list, i.e. the item gets consumed. Writing an item produces a new item which is sent to the consuming side of the communication partner.
The implementation of a stream consists of two parts: the Open procedure which is exported to the outside world and the hidden interface procedures which are used via procedure type components by the Streams module.
Each implementation has its own extension of Streams.Stream which contains its private data structures. Consider, we want to implement a stream which represents a fixed length character array. Then the definition would look like following:
DEFINITION String80; (* streams which represent ARRAY 80 OF CHAR *) IMPORT Streams; TYPE Stream = POINTER TO StreamRec; StreamRec = RECORD (Streams.StreamRec) END; PROCEDURE Open(VAR s: Streams.Stream); END String80.The preferred return type of the Open procedure is Streams.Stream and not Stream. The latter wouldn't allow the user of this module to declare its stream variable of type Streams.Stream.
On the other side, we give our Stream definition even it is not used in the rest of the definition and even if there are no public components. The outside visible String80.Stream type allows other modules to test stream variables to be of this type.
On the private side of String80.Stream we need the data which are represented by the stream or at least a pointer (of any kind) to them. Further, a current position has to be maintained (at least when Seek and Tell are to be supported) and we should know the current length of our string:
CONST buflen = 80; TYPE StreamRec = RECORD (Streams.StreamRec) (* private components *) data: ARRAY buflen OF BYTE; len: Streams.Count; (* length of stream *) pos: Streams.Count; (* current position *) END;Let's have a look on a Write procedure which puts one byte into our stream. We should assume that len and pos equal 0 at initialization time. The current position may be at buflen, in this case we could not read or write until the position is changed.
PROCEDURE Write(s: Streams.Stream; byte: BYTE) : BOOLEAN; BEGIN WITH s: Stream DO IF s.pos >= buflen THEN RETURN FALSE ELSE s.data[s.pos] := byte; INC(s.pos); (* update stream length *) IF s.pos > s.len THEN s.len := s.pos; END; RETURN TRUE END; END; END Write;The interface procedures must match the procedure types of Streams.Stream. For this reason we have to take Streams.Stream as parameter type of s and not Stream. On the other side, we can be sure that s is of type Stream because our Write procedure is only known to our module and to the Streams module.
The task of Write is to store byte at the current position of the given stream. The current position is to be advanced implicitly.
We do not only allow the current position to be in the range of the array but also to go one position beyond this range. At this position Read and Write operations will fail. If we restricted the current position to the range of the character array, a Write or Read operation would never fail because Write or Read at the last valid position would not cause the current position to be incremented.
Conventionally, legal stream positions range from 0 to length while data is stored from 0 to length-1. If the current position equals the stream length, a Read would normally fail but a Write is expected to succeed. Of course, a Write operation may fail because of a "end of medium" condition. In our example the "end of medium" is reached with a length of 80 bytes.
Let's continue our example. Read looks very close to Write:
PROCEDURE Read(s: Streams.Stream; VAR byte: BYTE) : BOOLEAN; BEGIN WITH s: Stream DO IF s.pos < s.len THEN byte := s.data[s.pos]; INC(s.pos); RETURN TRUE ELSE RETURN FALSE END; END; END Read;Seek and Tell are now straightforward:
PROCEDURE Seek(s: Streams.Stream; offset: Streams.Count; whence: Streams.Whence) : BOOLEAN; VAR newpos: Streams.Count; BEGIN WITH s: Stream DO CASE whence OF | Streams.fromStart: newpos := offset; | Streams.fromPos: newpos := offset + s.pos; | Streams.fromEnd: newpos := offset + s.len; END; IF (newpos >= 0) & (newpos <= s.len) THEN s.pos := newpos; RETURN TRUE ELSE RETURN FALSE END; END; END Seek; PROCEDURE Tell(s: Streams.Stream; VAR pos: Streams.Count) : BOOLEAN; BEGIN WITH s: Stream DO pos := s.pos; RETURN TRUE END; END Tell;
Additionally, Trunc could be implemented which allows to reduce the length of a stream. Trunc must not change the current position, so we have to decide whether we allow to cut the stream before the current position. This would introduce an interesting situation on our next Write which would set the length to the current position in this case. In other words, we would regain our lost data.
To avoid this problem we require the current position to be less or equal to the trunc position:
PROCEDURE Trunc(s: Streams.Stream; pos: Streams.Count) : BOOLEAN; BEGIN WITH s: Stream DO IF (pos >= 0) & (s.pos <= pos) & (pos <= s.len) THEN s.len := pos; RETURN TRUE ELSE RETURN FALSE END; END; END Trunc;
Finally, Open has to be implemented in order to connect all these procedures to Streams. The interface record is kept globally because it is constant for all incarnations of String80.Stream:
MODULE String80; IMPORT Streams; (* ... type definitions (see above) ... *) VAR (* global variables *) caps: Streams.CapabilitySet; if: Streams.Interface; (* ... interface procedures (see above) ... *) PROCEDURE Open(VAR s: Streams.Stream); VAR stream: Stream; BEGIN NEW(stream); stream.pos := 0; stream.len := 0; Streams.Init(stream, if, caps, Streams.nobuf); s := stream; END Open; BEGIN (* initialization of String80 *) NEW(if); if.read := Read; if.write := Write; if.seek := Seek; if.tell := Tell; if.trunc := Trunc; caps := {Streams.read, Streams.write, Streams.seek, Streams.tell, Streams.trunc}; END String80.Streams.nobuf has been taken as buffering mode because there is no need to buffer the stuff twice. The Open procedure is the only place where we need a stream to be declared as extension and not as Streams.Stream.
While all procedures of String80 are able to indicate success or failure by their BOOLEAN-results, they do not tell the reason of failures. It might be of interest to distinguish different error types and to print out a readable message about the failure.
Because the set of possible error types can not be foreseen, it is impossible to extend the error numbers of Streams (remember, the error code of the last failure is stored into the lasterror component of a stream). Hence, each stream implementation has to declare its own set of errors. Following error codes could be exported by our String80 implementation:
Following definition of String80 contains the error codes above, a specific error event type of String80, and readable error messages which belong to the error codes:
DEFINITION String80; (* streams which represent ARRAY 80 OF CHAR *) IMPORT Events, Streams; TYPE Stream = POINTER TO StreamRec; StreamRec = RECORD (Streams.StreamRec) END; CONST endOfMedium = 0; negPosition = 1; posBeyondMaxLen = 2; posBeyondTruncPos = 3; posBeyondCurrentLength = 4; errorcodes = 5; TYPE ErrorEvent = POINTER TO ErrorEventRec; ErrorEventRec = RECORD (Events.EventRec) errorcode: SHORTINT; END; VAR error: Events.EventType; errormsg: ARRAY errorcodes OF Events.Message; PROCEDURE Open(VAR s: Streams.Stream); END String80.It is unwise to pass stream related error events directly to Events.Raise because this would not allow a stream specific error handling. Instead, RelatedEvents allows to setup error handling individually for each stream (or for other objects which are an extension of Disciplines.Object). RelatedEvents allows to queue events, or to forward events to another object, or to raise an object-specific event, or to pass events to Events.Raise. Typically, events are queued for later examination (e.g. printing of error messages).
Streams.Init does not change the default behaviour of RelatedEvents.Raise (passing events to Events.Raise). This task is usually left to the implementation specific Open procedure. In normal case, Open procedures are expected to setup an event queue for the new stream:
PROCEDURE Open(VAR s: Streams.Stream); VAR stream: Stream; BEGIN NEW(stream); stream.pos := 0; stream.len := 0; Streams.Init(stream, if, caps, Streams.nobuf); RelatedEvents.QueueEvents(stream); s := stream; END Open;Error handling is now quite straightforward: InitErrorHandling sets up error and errormsg during initialization time, and Error creates an error event and passes it to RelatedEvents.Raise:
PROCEDURE InitErrorHandling; BEGIN Events.Define(error); Events.SetPriority(error, Priorities.liberrors); errormsg[endOfMedium] := "maximal string length exceeded"; errormsg[negPosition] := "positions must be non-negative"; errormsg[posBeyondMaxLen] := "position must not be set beyond the maximal string length"; errormsg[posBeyondTruncPos] := "current position must not be beyond trunc position"; errormsg[posBeyondCurrentLength] := "trunc position is beyond the current stream length"; END InitErrorHandling; PROCEDURE Error(s: Streams.Stream; errorcode: SHORTINT); VAR event: ErrorEvent; BEGIN NEW(event); event.type := error; event.message := errormsg[errorcode]; event.errorcode := errorcode; RelatedEvents.Raise(s, event); END Error;Conventionally, error events of the library are expected to be of the priority Priorities.liberrors. Library errors should work silently on default and are not expected to cause the program to abort (an exception to this rule are assertions which indicate serious programming errors). Because we have decided to queue the events on default, we need not to call Events.Ignore for error.
Now, we are able to call Error from the interface procedures:
PROCEDURE Trunc(s: Streams.Stream; pos: Streams.Count) : BOOLEAN; BEGIN WITH s: Stream DO IF pos < 0 THEN Error(s, negPosition); RETURN FALSE ELSIF pos < s.pos THEN Error(s, posBeyondTruncPos); RETURN FALSE ELSIF pos > s.len THEN Error(s, posBeyondCurrentLength); RETURN FALSE ELSE s.len := pos; RETURN TRUE END; END; END Trunc;If a client module is now calling Streams.Trunc for a stream of type Stream80 with a position of 82, we get following error reactions:
Consider, the client module checks the return value of Streams.Trunc and calls Conclusions.Conclude in error case:
IF ~Streams.Trunc(s, pos) THEN Conclusions.Conclude(s, Errors.error, ""); END;Then we would get both error messages in reversed order. Sometimes it is more convenient to call many stream operations and to ask later for error events:
IF RelatedEvents.EventsPending(s) THEN Conclusions.Conclude(s, Errors.error, ""); END;RelatedEvents.EventsPending returns TRUE if the event queue of s is non-empty.
Some streams offer more attributes than only some storage and a current position. Consequently, streams may offer some operations which are not covered by Streams. The Message data type and the handler component of the stream interface allow to pass such operations through Streams.
A good example for additional attributes are terminal input settings. Attributes would be echo and linemode (read an entire line and then return it as requested to the reading procedures or return each character immediately). Following module defines the necessary types and procedures:
DEFINITION Terminal; IMPORT Streams; CONST setOp = 1; (* set terminal attributes *) getOp = 2; (* get terminal attributes *) TYPE Attributes = RECORD echo: BOOLEAN; linemode: BOOLEAN; END; Message = RECORD (Streams.Message) op: SHORTINT; at: Attributes; END; PROCEDURE Set(s: Streams.Stream; at: Attributes); PROCEDURE Get(s: Streams.Stream; VAR at: Attributes); END Terminal.Set and Get fill a Message record with at and op and pass it to Streams.Send. Streams.Send then passes its message without interaction to the handler procedure, if any. The implementation of Terminal:
MODULE Terminal; IMPORT Streams; (* ...definition of public constants and types... *) PROCEDURE Set(s: Streams.Stream; at: Attributes); VAR message: Message; BEGIN message.op := setOp; message.at := at; Streams.Send(s, message); END Set; PROCEDURE Get(s: Streams.Stream; VAR at: Attributes); VAR message: Message; BEGIN message.op := getOp; Streams.Send(s, message); at := message.at; END Get; END Terminal.Streams provides Streams.Send as a procedure without BOOLEAN result. Nevertheless, if no handler has been defined by the underlying implementation, error would be set to TRUE and lasterror to Streams.NoHandlerDefined. Failures of the extended operation itself should be noted in the Message record. The procedures which call Streams.Send are then free to indicate success by a BOOLEAN return value.
The following piece of code illustrates a possible implementation of Terminal.Set and Terminal.Get:
MODULE UnixFiles; (* UnixFiles are stream representations for UNIX file descriptors *) (* ... *) PROCEDURE TerminalHandler(s: Streams.Stream; VAR message: Streams.Message); VAR event: Events.Event; BEGIN IF message IS Terminal.Message THEN WITH message: Terminal.Message DO IF message.op = Terminal.setOp THEN (* ... *) ELSIF message.op = Terminal.getOp THEN (* ... *) END; END; ELSE (* ignore unknown messages *) END; END TerminalHandler; PROCEDURE OpenFd(VAR file: Streams.Stream; fd: SysIO.File; mode: Mode; bufmode: Streams.BufMode) : BOOLEAN; BEGIN (* ... *) IF SysIO.Isatty(fd) THEN if.handler := TerminalHandler; INCL(caps, Streams.handler); END; (* ... *) END OpenFd; (* ... *) END UnixFiles.Sending a foreign message is not necessarily a hard error -- it could be the result of a broadcast.
Streams may be defined upon other streams which leads to some typical UNIX filters like tee, sort, split etc. The following example illustrates a tee filter, i.e. a stream which maps every operation to two other streams:
DEFINITION Tee; IMPORT Streams; TYPE Stream = POINTER TO StreamRec; StreamRec = RECORD (Streams.StreamRec) END; PROCEDURE Open(VAR s: Streams.Stream; out1, out2: Streams.Stream); END Tee.We need both output streams and some other informations in our hidden extension of Streams.Stream:
TYPE Stream = POINTER TO StreamRec; StreamRec = RECORD (Streams.StreamRec) pos: Streams.Count; len: Streams.Count; out1, out2: Streams.Stream; start1, start2: Streams.Count; END;Only out1 and out2 would be needed if we did not support Seek and Tell. Both output streams are not necessarily at position 0 at time of opening. Consequently, we should always position relatively to the positions found at opening time. Both positions are stored in start1 and start2. To allow relative positioning to the end of our stream (Streams.fromEnd) we need the length of our stream.
The write operation can now be implemented easily:
PROCEDURE Write(s: Streams.Stream; byte: BYTE) : BOOLEAN; VAR ok1, ok2: BOOLEAN; BEGIN WITH s: Stream DO ok1 := Streams.WriteByte(s.out1, byte); ok2 := Streams.WriteByte(s.out2, byte); (* the position must not be changed in case of failures *) IF ok1 & ok2 THEN INC(s.pos); IF s.len < s.pos THEN s.len := s.pos; END; RETURN TRUE ELSE RETURN FALSE END; END; END Write;Everything is quite simple as long we get no errors: byte is copied to both streams, the current position is incremented, and the length of the stream is checked against the advanced position. If anything goes wrong and we decide to return FALSE, we must not modify our current position. So, we get a suspicious situation if one of the write operations succeeds while the other operation fails.
We could decide to ignore the error and to return TRUE. In this case the calling module has no chance to get knowledge about this problem (except if stream events are forwarded and checked against). Another approach would be to try to undo the successful operation. This requires seek ability of the underlying stream and seek operations may fail, too.
The decision is here to continue work but to report the error. This gives the calling module more flexibility: it could either decide to stop working on errors or to ignore the error. In the latter case output is still written to the other output stream. This strategy may be naive because we don't know the results of these repeated errors. Errors may lead to events, to printouts to log files, and even to annoying console messages (e.g. file system is out of space).
On the other hand, a stream implementation should not decide to indicate failure to the calling module because of one failed operation in earlier times. Each new call should be a new try. It is up to the decision of the calling module to retry or to abort.
The Seek operation:
PROCEDURE Seek(s: Streams.Stream; offset: Streams.Count; whence: Streams.Whence) : BOOLEAN; VAR newpos: Streams.Count; ok1, ok2: BOOLEAN; BEGIN WITH s: Stream DO CASE whence OF | Streams.fromStart: newpos := offset; | Streams.fromPos: newpos := offset + s.pos; | Streams.fromEnd: newpos := offset + s.len; END; IF newpos < 0 THEN RETURN FALSE END; ok1 := Streams.Seek(s.out1, newpos + s.start1, Streams.fromStart); ok2 := Streams.Seek(s.out2, newpos + s.start2, Streams.fromStart); IF ok1 & ok2 THEN s.pos := newpos; IF s.len < s.pos THEN s.len := s.pos; END; RETURN TRUE ELSE RETURN FALSE END; END; END Seek;The new position is calculated as found in our previous Seek example. While as usual negative positions are illegal, we do not check newpos against the length of our stream. As long as the underlying Seek operations accept our new position we do not deny it. Positioning for s.out1 and s.out2 is done relatively to the starting positions s.start1 and s.start2 which have been retrieved at opening time.
As before, we get some trouble if only one of the operations succeeds. The strategy is close to Write: continue work but return the error indication to the calling module. This is not without danger because, if the error is ignored, the calling module will not know which underlying positions are affected by write operations (the position maintained by Tee does no longer correspond to one of the underlying implementations).
We might conclude that a practical Tee implementation should support several recovery strategies from which one is selected at opening time.
Tell returns s.pos:
PROCEDURE Tell(s: Streams.Stream; VAR pos: Streams.Count) : BOOLEAN; BEGIN WITH s: Stream DO pos := s.pos; RETURN TRUE END; END Tell;
Filter streams should define a handler procedure which passes messages to the output streams:
PROCEDURE Handler(s: Streams.Stream; VAR message: Streams.Message); BEGIN WITH s: Stream DO Streams.Send(s.out1, message); Streams.Send(s.out2, message); END; END Handler;Obviously, sending an unknown message should not be a hard error. We would not be able to determine whether s.out1 or s.out2 support message.
Intermediate stream modules should transfer close operations to the underlying streams:
PROCEDURE Close(s: Streams.Stream) : BOOLEAN; VAR ok1, ok2: BOOLEAN; BEGIN WITH s: Stream DO ok1 := Streams.Close(s.out1); ok2 := Streams.Close(s.out2); RETURN ok1 & ok2 END; END Close;
Finally, Open and the module body are to be implemented:
MODULE Tee; IMPORT Streams; (* ... type definitions ... *) VAR if: Streams.Interface; caps: Streams.CapabilitySet; (* ... *) PROCEDURE Open(VAR s: Streams.Stream; out1, out2: Streams.Stream); VAR caps1, caps2: Streams.CapabilitySet; implemented: Streams.CapabilitySet; stream: Stream; pos: Streams.Count; BEGIN caps1 := Streams.Capabilities(out1); caps2 := Streams.Capabilities(out2); NEW(stream); implemented := caps * caps1 * caps2; Streams.Init(stream, if, implemented, Streams.nobuf); stream.out1 := out1; stream.start1 := 0; stream.out2 := out2; stream.start2 := 0; IF (Streams.tell IN caps1) & Streams.Tell(out1, pos) THEN stream.start1 := pos; END; IF (Streams.tell IN caps2) & Streams.Tell(out2, pos) THEN stream.start2 := pos; END; stream.pos := 0; stream.len := 0; s := stream; END Open; BEGIN NEW(if); if.write := Write; if.seek := Seek; if.tell := Tell; if.handler := Handler; if.close := Close; caps := {Streams.write, Streams.seek, Streams.tell, Streams.handler, Streams.close, Streams.holes}; END Tee.To avoid unnecessary errors we determine the intersection of supported operations. If neither out1 nor out2 support write operations this would lead to an event raised by Streams.Init (which is hopefully not survived). Streams.holes is included in our set of capabilities to allow them if out1 and out2 support them.
Oberon || Compiler & Tools || Library || Module Index || Search Engine