Die Schnittstelle für Rendezvous sollte möglichst einfach und nahe an Ada angelehnt sein. Entsprechend wurde darauf verzichtet, daß die als Task agierende Prozedur bei Select, Accept und AcceptEnd einen sich selbst adressierenden Parameter angeben muß. Weiter liefert Initiate ein Ergebnis vom Typ Tasks.Task zurück, jedoch keinen Verweis auf eigene Datenstrukturen. Das daraus resultierende Problem bei der Implementierung läßt sich nur auf Basis von Disciplines lösen. Da Tasks.Task eine Erweiterung von Disciplines.Object ist, kann Rendezvous seine Datenstruktur als Disziplin bei der Task unterbringen und dort auch wiederfinden:
CONST maxentries = MAX(SET) + 1; TYPE TaskDiscipline = POINTER TO TaskDisciplineRec; TaskDisciplineRec = RECORD (Disciplines.DisciplineRec) awaited: EntrySet; queues: ARRAY maxentries OF Queue; mutex: Semaphores.Semaphore; entry: Entry; END; VAR taskdID: Disciplines.Identifier;
Die Komponente queues unterhält für alle möglichen Service-Angebote (entries) eine Warteschlange. Die Menge der Warteschlangen, die nicht leer sind, wird in awaited verzeichnet. Innerhalb eines Accept/AcceptEnd-Bereiches gibt entry den aktiven Service an. Da die Datenstrukturen sowohl von der aufrufenden Task als auch von der bedienenden Task gelesen und modifiziert werden, ist es notwendig, auf Basis eines Semaphors sich kurzfristig gegenseitig auszuschließen. Die Warteschlangen sollten genauso wie in Ada nach der FIFO-Strategie verwaltet werden:
TYPE Queue = POINTER TO QueueRec; QueueRec = RECORD head, tail: ConnectCondition; END;
Die Bedingung, die zum Warten für die Aufrufer von Connect dient, trägt auch den Auftrag. Die Komponente done wird zunächst von Connect auf FALSE gesetzt. Später wird AcceptEnd das Ende des Aufrufs dadurch signalisieren, daß done auf TRUE gesetzt wird.
TYPE ConnectCondition = POINTER TO ConnectConditionRec; ConnectConditionRec = RECORD (Conditions.ConditionRec) message: Message; done: BOOLEAN; next: ConnectCondition; END; VAR ccDomain: Conditions.Domain; PROCEDURE TestConnectCondition(domain: Conditions.Domain; condition: Conditions.Condition; errors: RelatedEvents.Object) : BOOLEAN; BEGIN RETURN condition(ConnectCondition).done END TestConnectCondition;
Auf der anderen Seite müssen Select und Accept solange blockiert werden, bis entsprechende Aufträge vorliegen:
TYPE AcceptCondition = POINTER TO AcceptConditionRec; AcceptConditionRec = RECORD (Conditions.ConditionRec) taskd: TaskDiscipline; entries: EntrySet; END; VAR acDomain: Conditions.Domain; PROCEDURE TestAcceptCondition(domain: Conditions.Domain; condition: Conditions.Condition; errors: RelatedEvents.Object) : BOOLEAN; BEGIN WITH condition: AcceptCondition DO RETURN condition.taskd.awaited * condition.entries # {} END; END TestAcceptCondition;
Auf dieser Basis können die Operationen von Rendezvous realisiert werden:
PROCEDURE GetTaskDiscipline(VAR taskd: TaskDiscipline); VAR ok: BOOLEAN; BEGIN ok := Disciplines.Seek(Tasks.Current(), taskdID, taskd); ASSERT(ok); END GetTaskDiscipline; PROCEDURE Accept(entry: Entry; VAR message: Message); VAR taskd: TaskDiscipline; condition: AcceptCondition; BEGIN GetTaskDiscipline(taskd); IF ~(entry IN taskd.awaited) THEN NEW(condition); Conditions.Init(condition, acDomain); condition.taskd := taskd; condition.entries := {entry}; Tasks.WaitFor(condition); END; Semaphores.P(taskd.mutex); message := taskd.queues[entry].head.message; taskd.entry := entry; Semaphores.V(taskd.mutex); END Accept; PROCEDURE AcceptEnd; VAR taskd: TaskDiscipline; queue: Queue; BEGIN GetTaskDiscipline(taskd); Semaphores.P(taskd.mutex); queue := taskd.queues[taskd.entry]; queue.head.done := TRUE; queue.head := queue.head.next; IF queue.head = NIL THEN queue.tail := NIL; EXCL(taskd.awaited, taskd.entry); END; Semaphores.V(taskd.mutex); END AcceptEnd; PROCEDURE Select(entries: EntrySet; VAR entry: Entry); VAR taskd: TaskDiscipline; condition: AcceptCondition; BEGIN GetTaskDiscipline(taskd); IF entries * taskd.awaited = {} THEN NEW(condition); Conditions.Init(condition, acDomain); condition.taskd := taskd; condition.entries := entries; Tasks.WaitFor(condition); END; (* select-marker method *) entry := taskd.entry; WHILE ~(entry IN entries * taskd.awaited) DO entry := (entry + 1) MOD maxentries; END; END Select; PROCEDURE Connect(task: Tasks.Task; entry: Entry; message: Message); VAR taskd: TaskDiscipline; condition: ConnectCondition; queue: Queue; ok: BOOLEAN; BEGIN ok := Disciplines.Seek(task, taskdID, taskd); ASSERT(ok); NEW(condition); Conditions.Init(condition, ccDomain); condition.message := message; condition.done := FALSE; condition.next := NIL; Semaphores.P(taskd.mutex); queue := taskd.queues[entry]; IF queue.head = NIL THEN queue.head := condition; ELSE queue.tail.next := condition; END; queue.tail := condition; INCL(taskd.awaited, entry); Semaphores.V(taskd.mutex); Tasks.WaitFor(condition); END Connect;
Die folgende Variante von Initiate basiert auf dem Ulmer Koroutinenkonzept:
PROCEDURE CallTaskBody(VAR cr: Coroutines.Coroutine; taskbody: TaskBody); BEGIN SYSTEM.CRSPAWN(cr); taskbody; Tasks.Terminate; END CallTaskBody; PROCEDURE Initiate(body: TaskBody; VAR task: Tasks.Task); VAR cr: Coroutines.Coroutine; taskd: TaskDiscipline; entry: Entry; BEGIN CallTaskBody(cr, body); Tasks.Create(task, cr); NEW(taskd); taskd.id := taskdID; taskd.awaited := {}; taskd.entry := 0; entry := 0; WHILE entry < maxentries DO NEW(taskd.queues[entry]); taskd.queues[entry].head := NIL; taskd.queues[entry].tail := NIL; INC(entry); END; LocalSemaphores.Create(taskd.mutex); Disciplines.Add(task, taskd); END Initiate;
Zum Schluß noch die Prozedur, die bei der Initialisierung des Moduls aufgerufen wird:
PROCEDURE Init; PROCEDURE InitDomain(VAR domain: Conditions.Domain; test: Conditions.TestProc); VAR if: Conditions.Interface; desc: Conditions.Description; BEGIN NEW(if); if.test := test; NEW(desc); desc.caps := {}; desc.internal := TRUE; NEW(domain); Conditions.InitDomain(domain, if, desc); END InitDomain; BEGIN taskdID := Disciplines.Unique(); InitDomain(ccDomain, TestConnectCondition); InitDomain(acDomain, TestAcceptCondition); END Init;