next up previous
Next: Kooperativität Up: Rendezvous Previous: Eine Schnittstelle für Rendezvous

Die Implementierung von Rendezvous in Oberon

Die Schnittstelle für Rendezvous sollte möglichst einfach und nahe an Ada angelehnt sein. Entsprechend wurde darauf verzichtet, daß die als Task agierende Prozedur bei Select, Accept und AcceptEnd einen sich selbst adressierenden Parameter angeben muß. Weiter liefert Initiate ein Ergebnis vom Typ Tasks.Task zurück, jedoch keinen Verweis auf eigene Datenstrukturen. Das daraus resultierende Problem bei der Implementierung läßt sich nur auf Basis von Disciplines lösen. Da Tasks.Task eine Erweiterung von Disciplines.Object ist, kann Rendezvous seine Datenstruktur als Disziplin bei der Task unterbringen und dort auch wiederfinden:

CONST
   maxentries = MAX(SET) + 1;
TYPE
   TaskDiscipline = POINTER TO TaskDisciplineRec;
   TaskDisciplineRec =
      RECORD
         (Disciplines.DisciplineRec)
         awaited: EntrySet;
         queues: ARRAY maxentries OF Queue;
         mutex: Semaphores.Semaphore;
         entry: Entry;
      END;
VAR
   taskdID: Disciplines.Identifier;

Die Komponente queues unterhält für alle möglichen Service-Angebote (entries) eine Warteschlange. Die Menge der Warteschlangen, die nicht leer sind, wird in awaited verzeichnet. Innerhalb eines Accept/AcceptEnd-Bereiches gibt entry den aktiven Service an. Da die Datenstrukturen sowohl von der aufrufenden Task als auch von der bedienenden Task gelesen und modifiziert werden, ist es notwendig, auf Basis eines Semaphors sich kurzfristig gegenseitig auszuschließen. Die Warteschlangen sollten genauso wie in Ada nach der FIFO-Strategie verwaltet werden:

TYPE
   Queue = POINTER TO QueueRec;
   QueueRec =
      RECORD
         head, tail: ConnectCondition;
      END;

Die Bedingung, die zum Warten für die Aufrufer von Connect dient, trägt auch den Auftrag. Die Komponente done wird zunächst von Connect auf FALSE gesetzt. Später wird AcceptEnd das Ende des Aufrufs dadurch signalisieren, daß done auf TRUE gesetzt wird.

TYPE
   ConnectCondition = POINTER TO ConnectConditionRec;
   ConnectConditionRec =
      RECORD
         (Conditions.ConditionRec)
         message: Message;
         done: BOOLEAN;
         next: ConnectCondition;
      END;
VAR
   ccDomain: Conditions.Domain;

PROCEDURE TestConnectCondition(domain: Conditions.Domain;
                               condition: Conditions.Condition;
                               errors: RelatedEvents.Object) : BOOLEAN;
BEGIN
   RETURN condition(ConnectCondition).done
END TestConnectCondition;

Auf der anderen Seite müssen Select und Accept solange blockiert werden, bis entsprechende Aufträge vorliegen:

TYPE
   AcceptCondition = POINTER TO AcceptConditionRec;
   AcceptConditionRec =
      RECORD
         (Conditions.ConditionRec)
         taskd: TaskDiscipline;
         entries: EntrySet;
      END;
VAR
   acDomain: Conditions.Domain;

PROCEDURE TestAcceptCondition(domain: Conditions.Domain;
                              condition: Conditions.Condition;
                              errors: RelatedEvents.Object) : BOOLEAN;
BEGIN
   WITH condition: AcceptCondition DO
      RETURN condition.taskd.awaited * condition.entries # {}
   END;
END TestAcceptCondition;

Auf dieser Basis können die Operationen von Rendezvous realisiert werden:

PROCEDURE GetTaskDiscipline(VAR taskd: TaskDiscipline);
   VAR
      ok: BOOLEAN;
BEGIN
   ok := Disciplines.Seek(Tasks.Current(), taskdID, taskd); ASSERT(ok);
END GetTaskDiscipline;

PROCEDURE Accept(entry: Entry; VAR message: Message);
   VAR
      taskd: TaskDiscipline;
      condition: AcceptCondition;
BEGIN
   GetTaskDiscipline(taskd);
   IF ~(entry IN taskd.awaited) THEN
      NEW(condition); Conditions.Init(condition, acDomain);
      condition.taskd := taskd; condition.entries := {entry};
      Tasks.WaitFor(condition);
   END;
   Semaphores.P(taskd.mutex);
   message := taskd.queues[entry].head.message;
   taskd.entry := entry;
   Semaphores.V(taskd.mutex);
END Accept;

PROCEDURE AcceptEnd;
   VAR
      taskd: TaskDiscipline;
      queue: Queue;
BEGIN
   GetTaskDiscipline(taskd);
   Semaphores.P(taskd.mutex);
   queue := taskd.queues[taskd.entry];
   queue.head.done := TRUE;
   queue.head := queue.head.next;
   IF queue.head = NIL THEN
      queue.tail := NIL;
      EXCL(taskd.awaited, taskd.entry);
   END;
   Semaphores.V(taskd.mutex);
END AcceptEnd;

PROCEDURE Select(entries: EntrySet; VAR entry: Entry);
   VAR
      taskd: TaskDiscipline;
      condition: AcceptCondition;
BEGIN
   GetTaskDiscipline(taskd);
   IF entries * taskd.awaited = {} THEN
      NEW(condition); Conditions.Init(condition, acDomain);
      condition.taskd := taskd; condition.entries := entries;
      Tasks.WaitFor(condition);
   END;
   (* select-marker method *)
   entry := taskd.entry;
   WHILE ~(entry IN entries * taskd.awaited) DO
      entry := (entry + 1) MOD maxentries;
   END;
END Select;

PROCEDURE Connect(task: Tasks.Task; entry: Entry; message: Message);
   VAR
      taskd: TaskDiscipline;
      condition: ConnectCondition;
      queue: Queue;
      ok: BOOLEAN;
BEGIN
   ok := Disciplines.Seek(task, taskdID, taskd); ASSERT(ok);
   NEW(condition); Conditions.Init(condition, ccDomain);
   condition.message := message; condition.done := FALSE;
   condition.next := NIL;
   Semaphores.P(taskd.mutex);
   queue := taskd.queues[entry];
   IF queue.head = NIL THEN
      queue.head := condition;
   ELSE
      queue.tail.next := condition;
   END;
   queue.tail := condition;
   INCL(taskd.awaited, entry);
   Semaphores.V(taskd.mutex);
   Tasks.WaitFor(condition);
END Connect;

Die folgende Variante von Initiate basiert auf dem Ulmer Koroutinenkonzept:

PROCEDURE CallTaskBody(VAR cr: Coroutines.Coroutine;
                       taskbody: TaskBody);
BEGIN
   SYSTEM.CRSPAWN(cr);
   taskbody;
   Tasks.Terminate;
END CallTaskBody;

PROCEDURE Initiate(body: TaskBody; VAR task: Tasks.Task);
   VAR
      cr: Coroutines.Coroutine;
      taskd: TaskDiscipline;
      entry: Entry;
BEGIN
   CallTaskBody(cr, body);
   Tasks.Create(task, cr);
   NEW(taskd); taskd.id := taskdID;
   taskd.awaited := {}; taskd.entry := 0;
   entry := 0;
   WHILE entry < maxentries DO
      NEW(taskd.queues[entry]);
      taskd.queues[entry].head := NIL; taskd.queues[entry].tail := NIL;
      INC(entry);
   END;
   LocalSemaphores.Create(taskd.mutex);
   Disciplines.Add(task, taskd);
END Initiate;

Zum Schluß noch die Prozedur, die bei der Initialisierung des Moduls aufgerufen wird:

PROCEDURE Init;

   PROCEDURE InitDomain(VAR domain: Conditions.Domain;
                        test: Conditions.TestProc);
      VAR
         if: Conditions.Interface;
         desc: Conditions.Description;
   BEGIN
      NEW(if); if.test := test;
      NEW(desc); desc.caps := {}; desc.internal := TRUE;
      NEW(domain);
      Conditions.InitDomain(domain, if, desc);
   END InitDomain;

BEGIN
   taskdID := Disciplines.Unique();
   InitDomain(ccDomain, TestConnectCondition);
   InitDomain(acDomain, TestAcceptCondition);
END Init;


next up previous
Next: Kooperativität Up: Rendezvous Previous: Eine Schnittstelle für Rendezvous
Andreas Borchert
2/2/1998