

Streams Tutorial


Andreas Borchert, University of Ulm, SAI, Helmholtzstraße 18, D-89069 Ulm, Germany, Email: borchert@mathematik.uni-ulm.de


Introduction

Streams are a common abstraction for byte-oriented input and output operations. In typical environments input and output are restricted to file I/O and perhaps to some other devices such as terminals and line printers. UNIX offers a well-known abstraction for input and output operations: the operations (open, read, write, positioning, and close) are implemented for file systems and various devices. Each device is defined by a set of procedures which implement a subset of the input and output operations.

It is quite natural to extend this concept to the library of a high-level programming language. This allows common input and output operations to serve as the interface to data structures of the library or of one's own modules. Typical examples are windows and strings. The C library has specialized sets of input and output operations for both windows and strings (e.g. wprintw of curses(3) and sprintf). A common interface makes a program independent of the underlying implementation: it should make no difference whether a procedure writes to a file or to a window on the screen.

Basic Stream Operations

Opening a Stream

First, a stream has to be opened. At this point it must be decided what kind of stream is to be opened (e.g. a file or a window). Each module which implements a particular kind of stream exports an Open procedure. Consider the following example:

CONST
   filename = "input.txt";
VAR
   stream: Streams.Stream;

(* ... *)

IF UnixFiles.Open(stream, filename, UnixFiles.read, Streams.onebuf, NIL) THEN
   (* stream opened successfully *)
ELSE
   (* filename cannot be opened for reading *)
END;
Stream variables should be of type Streams.Stream and not of the specialized extension (UnixFiles.Stream in this example). Note that UnixFiles.Open accepts both variants, but the first keeps the variable declaration independent of the Open procedure.

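Most Open procedures take two types of parameters: parameters which are specific to the kind of stream (the file name and the access mode in the example above) and the buffering mode.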

Some Open procedures require an additional parameter (of type RelatedEvents.Object) which allows error information to be collected. As in the example above, NIL may be given for default error handling.

By convention, Open procedures return TRUE on success and FALSE on failure (e.g. the named file cannot be found). Some implementations of Open are not function procedures (e.g. Strings.Open and Texts.Open). In these cases failures are fatal errors (e.g. no space available) and result in events.

The input and output operations are not necessarily mapped directly to the underlying stream implementation. In case of buffering the Streams module tries to minimize calls to the underlying implementation. This is very important if calls result in system calls which are relatively time consuming. Streams supports four buffering modes:

Streams.nobuf     unbuffered stream; all stream operations are mapped
                  directly to the underlying implementation
Streams.onebuf    requests one buffer (of a system-dependent size) to be
                  allocated as cache
Streams.linebuf   works like Streams.onebuf but causes the buffer to be
                  flushed automatically if a line terminator is written
                  to the stream; this buffering mode is explained later
                  in more detail
Streams.bufpool   a set of buffers works as cache; the number of buffers
                  may be modified

Simple Input and Output Operations

The following example illustrates a simple copying scheme:

PROCEDURE Copy(from, to: Streams.Stream);
   VAR
      ch: CHAR;
BEGIN
   WHILE Streams.ReadByte(from, ch) &
         Streams.WriteByte(to, ch) DO
   END;
END Copy;
This example is a universal copy operation which is completely independent from the underlying implementation. Streams.ReadByte reads the next byte from the given stream and stores it into ch. Streams.ReadByte returns FALSE if no byte is available. This happens if the stream ends (end of file) or an error occurs. Streams.WriteByte tries to write ch to the given stream and returns TRUE on success. Thus, the loop ends if copying is completed or any errors occurred.
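As an illustration, Copy can be combined with the Open call of the previous section to print a file on standard output. The following is only a minimal sketch built from procedures which appear in this tutorial. The module name CopyDemo is hypothetical, and Streams.Release (described later in this tutorial) is used to close the stream:

```oberon
MODULE CopyDemo;

   (* hypothetical demo module: copy input.txt to standard output *)

   IMPORT Streams, UnixFiles;

   CONST
      filename = "input.txt";
   VAR
      input: Streams.Stream;

   PROCEDURE Copy(from, to: Streams.Stream);
      VAR
         ch: CHAR;
   BEGIN
      (* the loop ends at end of input or on the first failure *)
      WHILE Streams.ReadByte(from, ch) &
            Streams.WriteByte(to, ch) DO
      END;
   END Copy;

BEGIN
   IF UnixFiles.Open(input, filename, UnixFiles.read, Streams.onebuf, NIL) THEN
      Copy(input, Streams.stdout);
      Streams.Release(input);
   END;
END CopyDemo.
```

Because Copy only sees Streams.Stream parameters, the same procedure would work unchanged for any other kind of stream.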

Preopened Streams

Streams exports some predefined streams and one predefined stream implementation: the null device Streams.null. The null device works like /dev/null: read operations always return FALSE and indicate end of file; write operations always succeed but have no effect. Additionally, Streams exports stdin, stdout, and stderr. Initially they equal Streams.null but they may be re-initialized by other modules. This is conventionally done by UnixFiles, which assigns the UNIX file descriptors 0, 1, and 2 to stdin, stdout, and stderr. Note that UnixFiles must be imported to get these assignments.

Standard input and output are opened in Streams.linebuf mode if they are connected to a terminal device (SysIO.Isatty returns TRUE); otherwise they are opened in Streams.onebuf mode. Standard error is always opened in line buffering mode.

Formatted Input and Output

Some modules offer formatted input and output based on streams. Most important are Read and Write. Both modules offer two similar sets of procedures. One set takes no stream argument and works on Streams.stdin or Streams.stdout. The names of those procedures which take a stream argument end in "S".

A simple example of traditional line-oriented input and output:

TYPE
   Book =
      RECORD
         title: ARRAY 80 OF CHAR;
         author: ARRAY 20 OF CHAR;
         year: INTEGER;
      END;

PROCEDURE ReadBook(VAR book: Book);
BEGIN
   Write.String("Title:  "); Read.Line(book.title);
   Write.String("Author: "); Read.Line(book.author);
   Write.String("Year:   "); Read.Int(book.year); Read.Ln;
END ReadBook;
Write.String writes the given string to Streams.stdout. Read.Line reads a complete line and stores the read line without the line termination into the given character array. Like other library routines, Read.Line guarantees 0X-termination of the resulting array. Read.Line skips the rest of the line if it does not fit into the character array.

Read.Int reads an integer value in decimal representation. Preceding white space is skipped. By default, white space consists of ASCII.sp, ASCII.EOL (line terminator), ASCII.tab, and ASCII.np (form feed). Termination characters are pushed back to the input stream (this is always possible for buffered streams). Thus, the character found after the last digit of the integer can be read subsequently. Read.Ln skips the input up to and including the next line termination character, or up to the end of the stream.

One important note is necessary with respect to buffering. The example above works if Streams.stdin and Streams.stdout are line buffered or unbuffered. Line buffered streams are flushed (i.e. the buffer contents are transferred to the underlying implementation) under several conditions, among them the writing of a line terminator to the stream and a read operation on another line buffered stream.

In the example above, Read.Line(book.title) causes the buffer contents of Streams.stdout ("Title: ") to be flushed if both streams are line buffered.

The solution above is very simple: it does not check for errors, but it is robust. In any case at least three lines are consumed from the input. (Because Read.Int skips white space including line terminators, more than three lines are possible.) The value of book.year is not changed if Read.Int finds no integer.

Be sure to avoid reading loops like the following:

book.year := 0;
REPEAT
   Read.Int(book.year);
UNTIL (book.year >= 1900) & (book.year < 2000);
If Read.Int does not find any digits after skipping the preceding white space then the terminating character is pushed back and read again on the next try. This results in an endless loop.

Error Checking

Each stream has a couple of public components which allow better error checking:

count       gives the count of the last read or write operation
errors      is incremented for each error; a common practice is to set
            errors to 0, then to call some stream operations, and
            finally to check whether errors is positive
error       is TRUE if the last operation failed
lasterror   is set to the error code of the last failure
eof         is TRUE if read operations return zero counts

The following procedure demonstrates integer reading with error checking:
PROCEDURE ReadInt(VAR int: INTEGER) : BOOLEAN;
   VAR ok: BOOLEAN;
BEGIN
   Read.Int(int);
   ok := Streams.stdin.count = 1;
   Read.Ln;
   RETURN ok
END ReadInt;
By convention, counts refer to the items which are read or written. Read.Int sets the count to 1 if an integer has been read successfully and to 0 otherwise. On failure, either Streams.stdin.error or Streams.stdin.eof equals TRUE. In the first case the error is described by Streams.stdin.lasterror.
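With the count at hand, the endless reading loop shown earlier can be made safe: the loop checks whether an integer has been read and discards the offending line before it tries again. The following is one possible sketch, not the only way to do it. The names YearInput and ReadYear are hypothetical, and the sketch assumes that the errors component may be reset by the client as described in the table above:

```oberon
MODULE YearInput;

   (* hypothetical demo module *)

   IMPORT Read, Streams, Write;

   PROCEDURE ReadYear(VAR year: INTEGER) : BOOLEAN;
      (* prompt until a year from 1900 to 1999 has been entered;
         return FALSE on end of input or in case of a stream error
      *)
   BEGIN
      LOOP
         Write.String("Year:   ");
         Streams.stdin.errors := 0;
         Read.Int(year);
         IF Streams.stdin.count = 1 THEN
            Read.Ln; (* skip the rest of the line *)
            IF (year >= 1900) & (year < 2000) THEN
               RETURN TRUE
            END;
         ELSIF Streams.stdin.eof OR (Streams.stdin.errors > 0) THEN
            RETURN FALSE (* retrying would not help *)
         ELSE
            Read.Ln; (* discard the line which contains no integer *)
         END;
      END;
   END ReadYear;

END YearInput.
```

The decisive difference from the endless loop is the call of Read.Ln in the last branch: it consumes the character which Read.Int has pushed back.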

Advanced Line Oriented Input and Output

It is very easy to separate line reading from line analysis. This is very useful if a line contains more than one item or if you want to be sure that there is no unexpected character between the integer and the line terminator. In this case it is convenient to read a complete line into a string and then to read from the string. The following example illustrates this:

PROCEDURE ReadBook(VAR book: Book) : BOOLEAN;
   (* read one line of input of following syntax:

      Book = Title "," Author "," Year .

      return TRUE if successful
   *)
   VAR
      line: ARRAY 80 OF CHAR;
      stream: Streams.Stream;
      comma: CHAR;
      ok: BOOLEAN;

BEGIN
   Read.Line(line);
   IF Streams.stdin.count = 0 THEN
      RETURN FALSE
   END;
   Strings.Open(stream, line);

   Read.WhiteSpaceS(stream);
   Read.TerminatedStringS(stream, book.title, ",");
   ok := stream.count > 0;
   Read.CharS(stream, comma);

   Read.WhiteSpaceS(stream);
   Read.TerminatedStringS(stream, book.author, ",");
   ok := ok & (stream.count > 0);
   Read.CharS(stream, comma);

   Read.IntS(stream, book.year);
   ok := ok & (stream.count = 1);
   Read.WhiteSpaceS(stream);
   (* nothing but white space may follow the year *)
   ok := ok & stream.eof;

   Streams.Release(stream);

   RETURN ok
END ReadBook;
First, a complete line is read from Streams.stdin. If this fails, we return immediately. Strings.Open opens a character array for reading and writing. This allows the character array containing the line to be read just like standard input. Read.WhiteSpaceS skips white space in the given input stream. Read.TerminatedStringS reads into a character array until the termination character (given as the last parameter) is found.

Please note that conversion errors (e.g. Read.Int does not find an integer) are not errors in the sense of Streams, thus errors is not incremented if zero counts are returned. String reading procedures set the count to the number of characters read.

Streams.Release closes the stream. Streams.Close works like Streams.Release but returns a BOOLEAN value indicating the success. One might not expect failures of Close but in some cases they may happen: e.g. a buffered output stream is flushed on closing and, of course, buffer flushing may fail. But in this case we do not expect errors on closing at all and Streams.Release is more convenient than

IF ~Streams.Close(stream) THEN END;
The last read operation (Read.WhiteSpaceS) ends with stream.eof set to TRUE if nothing but white space follows the integer.
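For completeness, the reverse direction can be sketched in the same style. The procedure below writes a book in the syntax accepted by ReadBook. It relies on the naming rule given above (the stream variants end in "S"). The module name BookOutput, the procedure name WriteBook, and the exact parameter lists of Write.StringS, Write.IntS (assumed to take a minimal field width as last argument), and Write.LnS are assumptions which should be checked against the Write module:

```oberon
MODULE BookOutput;

   (* hypothetical demo module *)

   IMPORT Streams, Write;

   TYPE
      Book =
         RECORD
            title: ARRAY 80 OF CHAR;
            author: ARRAY 20 OF CHAR;
            year: INTEGER;
         END;

   PROCEDURE WriteBook(s: Streams.Stream; VAR book: Book);
      (* write one line of following syntax:

         Book = Title "," Author "," Year .
      *)
   BEGIN
      Write.StringS(s, book.title);  Write.StringS(s, ", ");
      Write.StringS(s, book.author); Write.StringS(s, ", ");
      (* assumption: the last argument is the minimal field width *)
      Write.IntS(s, book.year, 1);
      Write.LnS(s);
   END WriteBook;

END BookOutput.
```

Since WriteBook takes a stream parameter, the same procedure can write to Streams.stdout, to a file, or to a string opened by Strings.Open.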

Stream Disciplines

The module Disciplines allows auxiliary data structures to be attached to other data types which are extensions of Disciplines.Object. Oberon has single inheritance in the sense of object-oriented languages. This single inheritance is used to define specific sorts of streams and to define streams with additional operations (e.g. streams for windows). On the other hand, there are modules which support streams (e.g. input and output conversions). In some cases it is advantageous to parameterize operations of these supporting modules on a long-term basis.

Disciplines maintains a list of additional records for objects of type Disciplines.Object. Records are added with Disciplines.Add and looked for by Disciplines.Seek. Disciplines are identified by an integer value which should be unique for each module which defines a discipline.
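How a module might define its own discipline can be sketched as follows. Only the names Disciplines.Add and Disciplines.Seek are taken from the text above. The parameter lists used here, the type names Disciplines.Discipline, Disciplines.DisciplineRec, and Disciplines.Identifier, the id component, and the procedure Disciplines.Unique are assumptions about the interface of Disciplines. The module LineCounts itself is purely hypothetical:

```oberon
MODULE LineCounts;

   (* hypothetical module which attaches a line count to a stream;
      the interface of Disciplines is assumed as stated above
   *)

   IMPORT Disciplines, Streams;

   TYPE
      Discipline = POINTER TO DisciplineRec;
      DisciplineRec =
         RECORD
            (Disciplines.DisciplineRec)
            lines: INTEGER;
         END;
   VAR
      id: Disciplines.Identifier; (* identifies our discipline *)

   PROCEDURE Set(s: Streams.Stream; lines: INTEGER);
      VAR
         disc: Discipline;
   BEGIN
      NEW(disc); disc.id := id; disc.lines := lines;
      Disciplines.Add(s, disc);
   END Set;

   PROCEDURE Get(s: Streams.Stream; VAR lines: INTEGER) : BOOLEAN;
      (* return FALSE if no line count has been attached to s *)
      VAR
         disc: Disciplines.Discipline;
   BEGIN
      IF Disciplines.Seek(s, id, disc) THEN
         lines := disc(Discipline).lines;
         RETURN TRUE
      ELSE
         RETURN FALSE
      END;
   END Get;

BEGIN
   id := Disciplines.Unique(); (* assumption: returns a unique identifier *)
END LineCounts.
```

The stream type itself is not extended here, so the additional record can be attached to streams of any implementation.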

StreamDisciplines offers some standard parameters for streams such as line terminators and field separators. Some modules generate streams with unusual separators, e.g. ScanDir separates entries by 0X because 0X is the only character which cannot be part of a UNIX filename. These modules are free to set the parameters of StreamDisciplines to appropriate values at the time of opening. Other modules then need not worry about different line terminators on different streams: Read.Line always returns one "entry" or "record".

Using disciplines we can rewrite our ReadBook example:

PROCEDURE ReadBook(s: Streams.Stream; VAR book: Book) : BOOLEAN;
   (* read one line of input of following syntax:

      Book = Title "," Author "," Year .

      return TRUE if successful
   *)
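   VAR
      fieldseps: Sets.CharSet;
      year: ARRAY 12 OF CHAR; (* text of the year field *)
      rest: ARRAY 1 OF CHAR;
      yearstream: Streams.Stream;
      rval: BOOLEAN;

BEGIN
   Sets.CreateSet(fieldseps); Sets.InclChar(fieldseps, ",");
   StreamDisciplines.SetFieldSepSet(s, fieldseps);
   (* the line must consist of exactly three fields *)
   rval := Read.FieldS(s, book.title) &
           Read.FieldS(s, book.author) &
           Read.FieldS(s, year) &
           ~Read.FieldS(s, rest);
   Read.LnS(s);
   IF rval THEN
      (* Oberon has no overloading: a field is delivered as
         character array and converted in a second step
      *)
      Strings.Open(yearstream, year);
      Read.IntS(yearstream, book.year);
      rval := yearstream.count = 1;
      Streams.Release(yearstream);
   END;
   RETURN rval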
END ReadBook;
StreamDisciplines.SetFieldSepSet defines a set of field separators for a given stream. Field separators divide an input line into a list of fields. Fields are delimited either by field separators or by line terminators. Read.FieldS reads one field and returns FALSE if there are no more fields on that line. Fields may be empty, e.g. the second field of "This is the title,,1991" would be empty. Like Read.StringS, Read.FieldS skips surrounding white space.

Advanced Stream Operations

Random Access

Streams does not interpret the bytes it transfers (one minor exception is the line terminator for line buffered streams) nor does it define any record structure on it. Stream positions, if supported, are primarily defined and interpreted by the underlying implementation. In case of buffered streams some rules must hold to assure that the operational semantics does not depend on the buffering mode:

(1)
Stream positions are non-negative byte offsets from the beginning of the stream, i.e. 0 addresses the first byte, 1 the second, etc.
(2)
The current position determines from where bytes are read or written. Reading or writing n bytes increments the current position by n.
(3)
Reading or writing is independent of partitioning, i.e. reading or writing n bytes in n calls with a quantity of one byte each is equivalent to one call with n bytes.

These rules are not necessarily valid for all streams. In these cases the buffering mode is forced to be unbuffered.

The following example shows two procedures which access a random access file by record numbers (from 0 to the number of records minus one):

TYPE Record = RECORD (* ... *) END;
CONST recordLen = SYSTEM.SIZE(Record);

PROCEDURE WriteRecord(s: Streams.Stream;
                      recno: INTEGER; record: Record) : BOOLEAN;
BEGIN
   RETURN Streams.Seek(s, recno * recordLen, Streams.fromStart) &
          Streams.Write(s, record)
END WriteRecord;

PROCEDURE ReadRecord(s: Streams.Stream;
                     recno: INTEGER; VAR record: Record) : BOOLEAN;
BEGIN
   RETURN Streams.Seek(s, recno * recordLen, Streams.fromStart) &
          Streams.Read(s, record)
END ReadRecord;
Streams.Seek modifies the stream position. Stream positions may be given either as absolute positions or relative to the current position or to the end of the stream:

Streams.fromStart   absolute position
Streams.fromPos     relative to the current position
Streams.fromEnd     relative to the stream length

Streams.fromEnd is used by the following procedure which appends a record to a stream:
PROCEDURE AppendRecord(s: Streams.Stream; record: Record) : BOOLEAN;
BEGIN
   RETURN Streams.Seek(s, 0, Streams.fromEnd) &
          Streams.Write(s, record)
END AppendRecord;

The following procedure determines the length of a stream:

PROCEDURE StreamLen(s: Streams.Stream) : Streams.Count;
   (* return -1 in case of errors *)
   VAR pos: Streams.Count;
BEGIN
   IF Streams.Seek(s, 0, Streams.fromEnd) &
      Streams.Tell(s, pos) THEN
      RETURN pos
   ELSE
      RETURN -1
   END;
END StreamLen;
Streams.Tell returns the current position and stores it into the second parameter. The appropriate type for stream positions is Streams.Count.
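Seek and Tell can also be combined to inspect a stream without disturbing its current position. The sketch below counts the records of a stream and restores the position afterwards. The module name RecordFiles, the procedure name RecordCount, and the layout of Record are hypothetical; only the Streams procedures are taken from the examples above:

```oberon
MODULE RecordFiles;

   (* hypothetical demo module *)

   IMPORT Streams, SYSTEM;

   TYPE
      (* hypothetical record layout *)
      Record = RECORD key: INTEGER; value: INTEGER END;
   CONST
      recordLen = SYSTEM.SIZE(Record);

   PROCEDURE RecordCount(s: Streams.Stream;
                         VAR count: Streams.Count) : BOOLEAN;
      (* store the number of complete records of s into count;
         the current position of s is restored on success
      *)
      VAR
         oldpos, len: Streams.Count;
   BEGIN
      IF Streams.Tell(s, oldpos) &
         Streams.Seek(s, 0, Streams.fromEnd) &
         Streams.Tell(s, len) &
         Streams.Seek(s, oldpos, Streams.fromStart) THEN
         count := len DIV recordLen;
         RETURN TRUE
      ELSE
         RETURN FALSE
      END;
   END RecordCount;

END RecordFiles.
```

Note that the result is derived from the stream length, so it is only meaningful for streams which support Seek and Tell.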

Valid stream position ranges need not be contiguous; in other words, there may be holes in them. Suppose s is empty and the following operations are successful:

VAR ok: BOOLEAN; s: Streams.Stream; ch: CHAR;
(* ... *)
ok := Streams.Seek(s, 0, Streams.fromStart) &
      Streams.Write(s, ch) &
      Streams.Seek(s, 5, Streams.fromStart) &
      Streams.Write(s, ch);
Then we can expect 0 and 5 to be valid positions. Seeking to position 1 and reading from it may lead to different results: failure (return of FALSE), return of a predefined pattern (e.g. null bytes), or undefined garbage. The length of the stream above would be 6, which shows that the length of a stream does not necessarily equal the number of bytes stored in it. Note that only a few stream implementations allow holes. One example is regular UNIX files, which are represented by UnixFiles. While the hole in the example above is simply created by not writing to the intermediate range, holes may also be created by record locking.

Two points are very important. First, the stream position does not necessarily equal 0 at the time of opening. Second, there are some cases where the stream position of the underlying implementation is modified from outside (e.g. shared UNIX file descriptors). In this case resynchronisation is the responsibility of the stream user. Streams.Touch requests that the buffering mechanism and the stream position be checked against the underlying implementation.

Many i/o libraries (e.g. stdio of C) which support simultaneous read and write access require an intermediate call of Seek or Flush before switching from read to write operations or vice versa. This is not required by Streams.

Bidirectional Streams

Streams which are opened for reading and writing and which do not support a current position (i.e. they do not support Seek, Tell and Trunc) are defined to be bidirectional.

The most common use of bidirectional streams is communication with other processes or coroutines. Buffered bidirectional streams have two buffers: an input queue and an output queue (both are FIFO queues). Reading an item removes it from the input queue, i.e. the item is consumed. Writing an item produces a new item which is sent to the consuming side of the communication partner.

Implementing a Stream

Introduction

The implementation of a stream consists of two parts: the Open procedure which is exported to the outside world and the hidden interface procedures which are used via procedure type components by the Streams module.

Each implementation has its own extension of Streams.Stream which contains its private data structures. Suppose we want to implement a stream which represents a fixed-length character array. Then the definition would look as follows:

DEFINITION String80;

   (* streams which represent ARRAY 80 OF CHAR *)

   IMPORT Streams;

   TYPE
      Stream = POINTER TO StreamRec;
      StreamRec = RECORD (Streams.StreamRec) END;

   PROCEDURE Open(VAR s: Streams.Stream);

END String80.
The preferred type of the parameter of the Open procedure is Streams.Stream and not Stream. The latter would not allow users of this module to declare their stream variables as Streams.Stream.

On the other hand, we export our Stream type even though it is not used in the rest of the definition and has no public components. The externally visible String80.Stream type allows other modules to test whether a stream variable is of this type.
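Such a test is an ordinary Oberon type test. A minimal sketch follows; the module name StreamKinds and the procedure name IsString80 are hypothetical:

```oberon
MODULE StreamKinds;

   (* hypothetical client module *)

   IMPORT Streams, String80;

   PROCEDURE IsString80(s: Streams.Stream) : BOOLEAN;
      (* return TRUE if s is of type String80.Stream
         (or of an extension of it)
      *)
   BEGIN
      (* the NIL check comes first because a type test
         must not be applied to a NIL pointer
      *)
      RETURN (s # NIL) & (s IS String80.Stream)
   END IsString80;

END StreamKinds.
```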

On the private side of String80.Stream we need the data which are represented by the stream or at least a pointer (of any kind) to them. Further, a current position has to be maintained (at least when Seek and Tell are to be supported) and we should know the current length of our string:

CONST
   buflen = 80;
TYPE
   StreamRec =
      RECORD
         (Streams.StreamRec)
         (* private components *)
         data: ARRAY buflen OF BYTE;
         len: Streams.Count; (* length of stream *)
         pos: Streams.Count; (* current position *)
      END;
Let's have a look at a Write procedure which puts one byte into our stream. We assume that len and pos equal 0 at initialization time. The current position may be at buflen; in this case we can neither read nor write until the position is changed.
PROCEDURE Write(s: Streams.Stream; byte: BYTE) : BOOLEAN;
BEGIN
   WITH s: Stream DO
      IF s.pos >= buflen THEN
         RETURN FALSE
      ELSE
         s.data[s.pos] := byte;
         INC(s.pos);
         (* update stream length *)
         IF s.pos > s.len THEN
            s.len := s.pos;
         END;
         RETURN TRUE
      END;
   END;
END Write;
The interface procedures must match the procedure types of Streams.Stream. For this reason we have to take Streams.Stream as the parameter type of s and not Stream. On the other hand, we can be sure that s is of type Stream because our Write procedure is known only to our module and to the Streams module.

The task of Write is to store byte at the current position of the given stream. The current position is to be advanced implicitly.

We do not only allow the current position to be in the range of the array but also to go one position beyond this range. At this position Read and Write operations will fail. If we restricted the current position to the range of the character array, a Write or Read operation would never fail because Write or Read at the last valid position would not cause the current position to be incremented.

By convention, legal stream positions range from 0 to length while data is stored from 0 to length-1. If the current position equals the stream length, a Read would normally fail but a Write is expected to succeed. Of course, a Write operation may fail because of an "end of medium" condition. In our example the "end of medium" is reached with a length of 80 bytes.

Let's continue our example. Read looks very similar to Write:

PROCEDURE Read(s: Streams.Stream; VAR byte: BYTE) : BOOLEAN;
BEGIN
   WITH s: Stream DO
      IF s.pos < s.len THEN
         byte := s.data[s.pos];
         INC(s.pos);
         RETURN TRUE
      ELSE
         RETURN FALSE
      END;
   END;
END Read;
Seek and Tell are now straightforward:
PROCEDURE Seek(s: Streams.Stream;
               offset: Streams.Count;
               whence: Streams.Whence) : BOOLEAN;
   VAR
      newpos: Streams.Count;
BEGIN
   WITH s: Stream DO
      CASE whence OF
      | Streams.fromStart: newpos := offset;
      | Streams.fromPos:   newpos := offset + s.pos;
      | Streams.fromEnd:   newpos := offset + s.len;
      END;
      IF (newpos >= 0) & (newpos <= s.len) THEN
         s.pos := newpos; RETURN TRUE
      ELSE
         RETURN FALSE
      END;
   END;
END Seek;

PROCEDURE Tell(s: Streams.Stream;
               VAR pos: Streams.Count) : BOOLEAN;
BEGIN
   WITH s: Stream DO
      pos := s.pos;
      RETURN TRUE
   END;
END Tell;

Additionally, Trunc could be implemented, which allows the length of a stream to be reduced. Trunc must not change the current position, so we have to decide whether to allow the stream to be cut before the current position. This would introduce an interesting situation on our next Write, which would set the length to the current position in this case. In other words, we would regain our lost data.

To avoid this problem we require the current position to be less than or equal to the trunc position:

PROCEDURE Trunc(s: Streams.Stream; pos: Streams.Count) : BOOLEAN;
BEGIN
   WITH s: Stream DO
      IF (pos >= 0) & (s.pos <= pos) & (pos <= s.len) THEN
         s.len := pos; RETURN TRUE
      ELSE
         RETURN FALSE
      END;
   END;
END Trunc;

Finally, Open has to be implemented in order to connect all these procedures to Streams. The interface record is kept globally because it is constant for all incarnations of String80.Stream:

MODULE String80;

   IMPORT Streams;

   (* ... type definitions (see above) ... *)

   VAR
      (* global variables *)
      caps: Streams.CapabilitySet;
      if: Streams.Interface;

   (* ... interface procedures (see above) ... *)

   PROCEDURE Open(VAR s: Streams.Stream);
      VAR
         stream: Stream;
   BEGIN
      NEW(stream);
      stream.pos := 0; stream.len := 0;
      Streams.Init(stream, if, caps, Streams.nobuf);
      s := stream;
   END Open;

BEGIN (* initialization of String80 *)
   NEW(if);
   if.read := Read; if.write := Write;
   if.seek := Seek; if.tell := Tell; if.trunc := Trunc;
   caps := {Streams.read, Streams.write, Streams.seek, Streams.tell, Streams.trunc};
END String80.
Streams.nobuf has been chosen as buffering mode because there is no need to buffer the data twice. The Open procedure is the only place where we need a stream to be declared as extension and not as Streams.Stream.
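A client of String80 might exercise the implementation as in the following sketch. The module name String80Demo is hypothetical, and the remarks on the results describe what the implementation above should deliver, not an observed run:

```oberon
MODULE String80Demo;

   (* hypothetical client of String80 *)

   IMPORT Streams, String80;

   VAR
      s: Streams.Stream;
      ch: CHAR;
      written: INTEGER;
      full, reread: BOOLEAN;

BEGIN
   String80.Open(s);
   ch := "x"; written := 0;
   (* fill the stream until a write operation fails *)
   WHILE Streams.WriteByte(s, ch) DO
      INC(written);
   END;
   (* with buflen = 80 the end of medium should be reached
      after 80 bytes
   *)
   full := written = 80;
   (* return to the beginning and read the first byte again *)
   reread := Streams.Seek(s, 0, Streams.fromStart) &
             Streams.ReadByte(s, ch) & (ch = "x");
   Streams.Release(s);
END String80Demo.
```

The client uses only the general operations of Streams; String80 is needed for the Open call alone.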

Advanced Error Handling

While all procedures of String80 are able to indicate success or failure by their BOOLEAN results, they do not tell the reason for a failure. It might be of interest to distinguish different error types and to print a readable message about the failure.

Because the set of possible error types cannot be foreseen, it is impossible to extend the error numbers of Streams (remember, the error code of the last failure is stored in the lasterror component of a stream). Hence, each stream implementation has to declare its own set of errors. The following error codes could be exported by our String80 implementation:

endOfMedium
indicates that a write operation failed due to maximal length of 80 characters.
negPosition
Seek or Trunc was called with parameters which lead to a negative stream position.
posBeyondMaxLen
Seek was called with parameters which lead to a position beyond the maximum stream length of 80.
posBeyondTruncPos
Trunc was called with a position which is less than the current position.
posBeyondCurrentLength
Trunc was called with a position greater than the current stream length.

The following definition of String80 contains the error codes above, a specific error event type of String80, and readable error messages which belong to the error codes:

DEFINITION String80;

   (* streams which represent ARRAY 80 OF CHAR *)

   IMPORT Events, Streams;

   TYPE
      Stream = POINTER TO StreamRec;
      StreamRec = RECORD (Streams.StreamRec) END;

   CONST
      endOfMedium = 0;
      negPosition = 1;
      posBeyondMaxLen = 2;
      posBeyondTruncPos = 3;
      posBeyondCurrentLength = 4;
      errorcodes = 5;
   TYPE
      ErrorEvent = POINTER TO ErrorEventRec;
      ErrorEventRec =
         RECORD
            (Events.EventRec)
            errorcode: SHORTINT;
         END;
   VAR
      error: Events.EventType;
      errormsg: ARRAY errorcodes OF Events.Message;

   PROCEDURE Open(VAR s: Streams.Stream);

END String80.
It is unwise to pass stream-related error events directly to Events.Raise because this would not allow stream-specific error handling. Instead, RelatedEvents allows error handling to be set up individually for each stream (or for other objects which are extensions of Disciplines.Object). RelatedEvents can queue events, forward events to another object, raise an object-specific event, or pass events to Events.Raise. Typically, events are queued for later examination (e.g. printing of error messages).

Streams.Init does not change the default behaviour of RelatedEvents.Raise (passing events to Events.Raise). This task is usually left to the implementation-specific Open procedure. Normally, Open procedures are expected to set up an event queue for the new stream:

PROCEDURE Open(VAR s: Streams.Stream);
   VAR
      stream: Stream;
BEGIN
   NEW(stream);
   stream.pos := 0; stream.len := 0;
   Streams.Init(stream, if, caps, Streams.nobuf);
   RelatedEvents.QueueEvents(stream);
   s := stream;
END Open;
Error handling is now quite straightforward: InitErrorHandling sets up error and errormsg during initialization time, and Error creates an error event and passes it to RelatedEvents.Raise:
PROCEDURE InitErrorHandling;
BEGIN
   Events.Define(error);
   Events.SetPriority(error, Priorities.liberrors);
   errormsg[endOfMedium] := "maximal string length exceeded";
   errormsg[negPosition] := "positions must be non-negative";
   errormsg[posBeyondMaxLen] :=
      "position must not be set beyond the maximal string length";
   errormsg[posBeyondTruncPos] :=
      "current position must not be beyond trunc position";
   errormsg[posBeyondCurrentLength] :=
      "trunc position is beyond the current stream length";
END InitErrorHandling;

PROCEDURE Error(s: Streams.Stream; errorcode: SHORTINT);
   VAR
      event: ErrorEvent;
BEGIN
   NEW(event);
   event.type := error;
   event.message := errormsg[errorcode];
   event.errorcode := errorcode;
   RelatedEvents.Raise(s, event);
END Error;
By convention, error events of the library are expected to have the priority Priorities.liberrors. Library errors should work silently by default and are not expected to cause the program to abort (an exception to this rule are assertions which indicate serious programming errors). Because we have decided to queue the events by default, we need not call Events.Ignore for error.

Now, we are able to call Error from the interface procedures:

PROCEDURE Trunc(s: Streams.Stream; pos: Streams.Count) : BOOLEAN;
BEGIN
   WITH s: Stream DO
      IF pos < 0 THEN
         Error(s, negPosition); RETURN FALSE
      ELSIF pos < s.pos THEN
         Error(s, posBeyondTruncPos); RETURN FALSE
      ELSIF pos > s.len THEN
         Error(s, posBeyondCurrentLength); RETURN FALSE
      ELSE
         s.len := pos; RETURN TRUE
      END;
   END;
END Trunc;
If a client module now calls Streams.Trunc for a stream of type String80.Stream with a position of 82, we get the following error reactions:

Suppose the client module checks the return value of Streams.Trunc and calls Conclusions.Conclude in case of an error:

IF ~Streams.Trunc(s, pos) THEN
   Conclusions.Conclude(s, Errors.error, "");
END;
Then we would get both error messages in reversed order. Sometimes it is more convenient to call many stream operations and to ask later for error events:
IF RelatedEvents.EventsPending(s) THEN
   Conclusions.Conclude(s, Errors.error, "");
END;
RelatedEvents.EventsPending returns TRUE if the event queue of s is non-empty.
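The other interface procedures can report their failures in the same way as Trunc. As a sketch, the Write procedure shown earlier could raise endOfMedium when the array is full. The fragment belongs inside MODULE String80 and depends on the declarations of Stream, buflen, Error, and endOfMedium given in this tutorial:

```oberon
(* fragment of MODULE String80; Stream, buflen, Error, and
   endOfMedium are declared as shown in this tutorial
*)
PROCEDURE Write(s: Streams.Stream; byte: BYTE) : BOOLEAN;
BEGIN
   WITH s: Stream DO
      IF s.pos >= buflen THEN
         (* the array is full: report the reason of the failure *)
         Error(s, endOfMedium); RETURN FALSE
      END;
      s.data[s.pos] := byte;
      INC(s.pos);
      IF s.pos > s.len THEN
         s.len := s.pos; (* update stream length *)
      END;
      RETURN TRUE
   END;
END Write;
```

Read is deliberately left without an error event here: running into the end of the stream is the regular way in which reading ends.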

Extended Stream Operations

Some streams offer more attributes than just some storage and a current position. Consequently, streams may offer operations which are not covered by Streams. The Message data type and the handler component of the stream interface allow such operations to be passed through Streams.

A good example of additional attributes are terminal input settings. Attributes would be echo and linemode (either read an entire line and then return it as requested to the reading procedures, or return each character immediately). The following module defines the necessary types and procedures:

DEFINITION Terminal;

   IMPORT Streams;

   CONST
      setOp = 1;   (* set terminal attributes *)
      getOp = 2;   (* get terminal attributes *)
   TYPE
      Attributes =
         RECORD
            echo: BOOLEAN;
            linemode: BOOLEAN;
         END;
      Message =
         RECORD
            (Streams.Message)
            op: SHORTINT;
            at: Attributes;
         END;

   PROCEDURE Set(s: Streams.Stream; at: Attributes);

   PROCEDURE Get(s: Streams.Stream; VAR at: Attributes);

END Terminal.
Set and Get fill a Message record (Set with op and at, Get with op only) and pass it to Streams.Send. Streams.Send then passes the message without interaction to the handler procedure, if any. The implementation of Terminal:
MODULE Terminal;

   IMPORT Streams;

   (* ...definition of public constants and types... *)

   PROCEDURE Set(s: Streams.Stream; at: Attributes);
      VAR
         message: Message;
   BEGIN
      message.op := setOp;
      message.at := at;
      Streams.Send(s, message);
   END Set;

   PROCEDURE Get(s: Streams.Stream; VAR at: Attributes);
      VAR
         message: Message;
   BEGIN
      message.op := getOp;
      Streams.Send(s, message);
      at := message.at;
   END Get;

END Terminal.
Streams provides Streams.Send as a procedure without a BOOLEAN result. Nevertheless, if no handler has been defined by the underlying implementation, error is set to TRUE and lasterror to Streams.NoHandlerDefined. Failures of the extended operation itself should be noted in the Message record. The procedures which call Streams.Send are then free to indicate success by a BOOLEAN return value.
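
One way to note failures in the Message record could look as follows. This is only a sketch: the module name CheckedTerminal and the component done are hypothetical and not part of the Terminal module shown above. It relies on the convention that a handler sets done to TRUE only after it has performed the operation.

```oberon
MODULE CheckedTerminal; (* hypothetical variant of Terminal *)

   IMPORT Streams;

   CONST
      setOp = 1;   (* set terminal attributes *)
   TYPE
      Attributes =
         RECORD
            echo: BOOLEAN;
            linemode: BOOLEAN;
         END;
      Message =
         RECORD
            (Streams.Message)
            op: SHORTINT;
            at: Attributes;
            done: BOOLEAN;   (* assumption: set to TRUE by the handler *)
         END;

   PROCEDURE Set(s: Streams.Stream; at: Attributes) : BOOLEAN;
      VAR
         message: Message;
   BEGIN
      message.op := setOp;
      message.at := at;
      message.done := FALSE;   (* stays FALSE if nobody handles the message *)
      Streams.Send(s, message);
      RETURN message.done
   END Set;

END CheckedTerminal.
```

Because done is initialized to FALSE, a missing handler and a handler which ignores the message both lead to a FALSE result without any further checks.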

The following piece of code illustrates how a stream implementation could handle the messages sent by Terminal.Set and Terminal.Get:

MODULE UnixFiles;

   (* UnixFiles are stream representations for UNIX file descriptors *)

   (* ... *)

   PROCEDURE TerminalHandler(s: Streams.Stream; VAR message: Streams.Message);
      VAR
         event: Events.Event;
   BEGIN
      IF message IS Terminal.Message THEN
         WITH message: Terminal.Message DO
            IF message.op = Terminal.setOp THEN
               (* ... *)
            ELSIF message.op = Terminal.getOp THEN
               (* ... *)
            END;
         END;
      ELSE
         (* ignore unknown messages *)
      END;
   END TerminalHandler;

   PROCEDURE OpenFd(VAR file: Streams.Stream; fd: SysIO.File;
                    mode: Mode; bufmode: Streams.BufMode) : BOOLEAN;
   BEGIN
      (* ... *)
      IF SysIO.Isatty(fd) THEN
         if.handler := TerminalHandler;
         INCL(caps, Streams.handler);
      END;
      (* ... *)
   END OpenFd;

   (* ... *)

END UnixFiles.
Sending a foreign message is not necessarily a hard error -- it could be the result of a broadcast.
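
From the point of view of a client module, the extended operations are used like any other procedure. The following sketch switches echoing off while one byte is read and restores the previous settings afterwards. The module name NoEcho and the procedure ReadSilently are hypothetical, Streams.ReadByte is assumed to be the reading counterpart of Streams.WriteByte, and s is assumed to be a terminal stream (otherwise Terminal.Get leaves the attributes undefined).

```oberon
MODULE NoEcho; (* hypothetical client module *)

   IMPORT Streams, Terminal;

   PROCEDURE ReadSilently(s: Streams.Stream; VAR byte: BYTE) : BOOLEAN;
      VAR
         old, silent: Terminal.Attributes;
         ok: BOOLEAN;
   BEGIN
      Terminal.Get(s, old);          (* remember the current settings *)
      silent := old; silent.echo := FALSE;
      Terminal.Set(s, silent);
      ok := Streams.ReadByte(s, byte);
      Terminal.Set(s, old);          (* restore settings even on failure *)
      RETURN ok
   END ReadSilently;

END NoEcho.
```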

Intermediate Streams

Streams may be defined on top of other streams, which leads to typical UNIX filters like tee, sort, split etc. The following example illustrates a tee filter, i.e. a stream which maps every operation to two other streams:

DEFINITION Tee;

   IMPORT Streams;

   TYPE
      Stream = POINTER TO StreamRec;
      StreamRec = RECORD (Streams.StreamRec) END;

   PROCEDURE Open(VAR s: Streams.Stream; out1, out2: Streams.Stream);

END Tee.
We need both output streams and some other information in our hidden extension of Streams.Stream:
TYPE
   Stream = POINTER TO StreamRec;
   StreamRec =
      RECORD
         (Streams.StreamRec)
         pos: Streams.Count;
         len: Streams.Count;
         out1, out2: Streams.Stream;
         start1, start2: Streams.Count;
      END;
Only out1 and out2 would be needed if we did not support Seek and Tell. The output streams are not necessarily at position 0 at the time of opening. Consequently, we should always position relative to the positions found at opening time. Both positions are stored in start1 and start2. To allow positioning relative to the end of our stream (Streams.fromEnd) we need the length of our stream.

The write operation can now be implemented easily:

PROCEDURE Write(s: Streams.Stream; byte: BYTE) : BOOLEAN;
   VAR
      ok1, ok2: BOOLEAN;
BEGIN
   WITH s: Stream DO
      ok1 := Streams.WriteByte(s.out1, byte);
      ok2 := Streams.WriteByte(s.out2, byte);
      (* the position must not be changed in case of failures *)
      IF ok1 & ok2 THEN
         INC(s.pos);
         IF s.len < s.pos THEN
            s.len := s.pos;
         END;
         RETURN TRUE
      ELSE
         RETURN FALSE
      END;
   END;
END Write;
Everything is quite simple as long as we get no errors: byte is copied to both streams, the current position is incremented, and the length of the stream is checked against the advanced position. If anything goes wrong and we decide to return FALSE, we must not modify our current position. So we get an awkward situation if one of the write operations succeeds while the other one fails.

We could decide to ignore the error and to return TRUE. In this case the calling module has no chance to learn about the problem (unless stream events are forwarded and checked). Another approach would be to try to undo the successful operation. This requires the underlying stream to be seekable, and seek operations may fail, too.

The decision here is to continue working but to report the error. This gives the calling module more flexibility: it could either decide to stop working on errors or to ignore the error. In the latter case output is still written to the other output stream. This strategy may be naive because we don't know the consequences of these repeated errors. Errors may lead to events, to printouts to log files, and even to annoying console messages (e.g. file system is out of space).

On the other hand, a stream implementation should not indicate failure to the calling module just because an earlier operation failed. Each new call should be a new try. It is up to the calling module to decide whether to retry or to abort.

The Seek operation:

PROCEDURE Seek(s: Streams.Stream; offset: Streams.Count;
               whence: Streams.Whence) : BOOLEAN;
   VAR
      newpos: Streams.Count;
      ok1, ok2: BOOLEAN;
BEGIN
   WITH s: Stream DO
      CASE whence OF
      | Streams.fromStart: newpos := offset;
      | Streams.fromPos:   newpos := offset + s.pos;
      | Streams.fromEnd:   newpos := offset + s.len;
      END;
      IF newpos < 0 THEN
         RETURN FALSE
      END;
      ok1 := Streams.Seek(s.out1, newpos + s.start1, Streams.fromStart);
      ok2 := Streams.Seek(s.out2, newpos + s.start2, Streams.fromStart);
      IF ok1 & ok2 THEN
         s.pos := newpos;
         IF s.len < s.pos THEN
            s.len := s.pos;
         END;
         RETURN TRUE
      ELSE
         RETURN FALSE
      END;
   END;
END Seek;
The new position is calculated as found in our previous Seek example. While negative positions are illegal as usual, we do not check newpos against the length of our stream. As long as the underlying Seek operations accept our new position we do not deny it. Positioning for s.out1 and s.out2 is done relative to the starting positions s.start1 and s.start2 which have been retrieved at opening time.

As before, we get some trouble if only one of the operations succeeds. The strategy is close to that of Write: continue working but return the error indication to the calling module. This is not without danger because, if the error is ignored, the calling module will not know which underlying positions are affected by write operations (the position maintained by Tee no longer corresponds to one of the underlying implementations).

We might conclude that a practical Tee implementation should support several recovery strategies from which one is selected at opening time.
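
A selectable strategy could be sketched as follows. Everything in this fragment is hypothetical: the module name TeeStrategies, the constants report and ignore, and the procedure Result are assumptions made for illustration and are not part of the Tee module developed here. The idea is that Open would store the selected strategy in the stream record, and that Write and Seek would call Result instead of evaluating ok1 & ok2 directly.

```oberon
MODULE TeeStrategies; (* hypothetical helper module *)

   CONST
      report = 0;  (* fail if any of the two operations fails *)
      ignore = 1;  (* succeed as long as one operation succeeds *)
   TYPE
      Strategy = SHORTINT;

   (* combine the results of both underlying operations *)
   PROCEDURE Result(strategy: Strategy; ok1, ok2: BOOLEAN) : BOOLEAN;
   BEGIN
      IF strategy = ignore THEN
         RETURN ok1 OR ok2
      ELSE
         RETURN ok1 & ok2
      END;
   END Result;

END TeeStrategies.
```

With report, the sketch behaves like the Write and Seek procedures above. With ignore, a partial failure is hidden from the calling module, so the caveats discussed for ignored errors apply.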

Tell returns s.pos:

PROCEDURE Tell(s: Streams.Stream; VAR pos: Streams.Count) : BOOLEAN;
BEGIN
   WITH s: Stream DO
      pos := s.pos;
      RETURN TRUE
   END;
END Tell;

Filter streams should define a handler procedure which passes messages to the output streams:

PROCEDURE Handler(s: Streams.Stream; VAR message: Streams.Message);
BEGIN
   WITH s: Stream DO
      Streams.Send(s.out1, message);
      Streams.Send(s.out2, message);
   END;
END Handler;
Obviously, sending an unknown message should not be a hard error: we would not be able to determine whether s.out1 or s.out2 supports message.

Intermediate stream modules should transfer close operations to the underlying streams:

PROCEDURE Close(s: Streams.Stream) : BOOLEAN;
   VAR
      ok1, ok2: BOOLEAN;
BEGIN
   WITH s: Stream DO
      ok1 := Streams.Close(s.out1);
      ok2 := Streams.Close(s.out2);
      RETURN ok1 & ok2
   END;
END Close;

Finally, Open and the module body are to be implemented:

MODULE Tee;

   IMPORT Streams;

   (* ... type definitions ... *)

   VAR
      if: Streams.Interface;
      caps: Streams.CapabilitySet;

   (* ... *)

   PROCEDURE Open(VAR s: Streams.Stream; out1, out2: Streams.Stream);
      VAR
         caps1, caps2: Streams.CapabilitySet;
         implemented: Streams.CapabilitySet;
         stream: Stream;
         pos: Streams.Count;
   BEGIN
      caps1 := Streams.Capabilities(out1);
      caps2 := Streams.Capabilities(out2);
      NEW(stream);
      implemented := caps * caps1 * caps2;
      Streams.Init(stream, if, implemented, Streams.nobuf);
      stream.out1 := out1; stream.start1 := 0;
      stream.out2 := out2; stream.start2 := 0;
      IF (Streams.tell IN caps1) & Streams.Tell(out1, pos) THEN
         stream.start1 := pos;
      END;
      IF (Streams.tell IN caps2) & Streams.Tell(out2, pos) THEN
         stream.start2 := pos;
      END;
      stream.pos := 0; stream.len := 0;
      s := stream;
   END Open;

BEGIN
   NEW(if);
   if.write := Write; if.seek := Seek; if.tell := Tell; if.handler := Handler;
   if.close := Close;
   caps := {Streams.write, Streams.seek, Streams.tell, Streams.handler,
            Streams.close, Streams.holes};
END Tee.
To avoid unnecessary errors we determine the intersection of supported operations. If out1 and out2 do not both support write operations, the intersection contains no write capability and this would lead to an event raised by Streams.Init (which the program will hopefully not survive). Streams.holes is included in our set of capabilities to allow holes if both out1 and out2 support them.
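
A client of Tee might look like the following sketch, which duplicates the contents of a buffer to two streams that have already been opened for writing. The module name TeeDemo and the procedure Duplicate are hypothetical. Writing stops at the first failure, which is one of the two reactions left to the calling module.

```oberon
MODULE TeeDemo; (* hypothetical client module *)

   IMPORT Streams, Tee;

   PROCEDURE Duplicate(out1, out2: Streams.Stream;
                       VAR buf: ARRAY OF BYTE) : BOOLEAN;
      VAR
         tee: Streams.Stream;
         i: LONGINT;
         ok: BOOLEAN;
   BEGIN
      Tee.Open(tee, out1, out2);
      ok := TRUE; i := 0;
      WHILE ok & (i < LEN(buf)) DO
         ok := Streams.WriteByte(tee, buf[i]);
         INC(i);
      END;
      (* closing tee also closes out1 and out2 (see Close above) *)
      IF ~Streams.Close(tee) THEN
         ok := FALSE;
      END;
      RETURN ok
   END Duplicate;

END TeeDemo.
```

Note that the variable tee is declared as Streams.Stream and not as Tee.Stream, so Duplicate would work unchanged with any other kind of output stream.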
converted to HTML: 1995/12/18
