next up previous
Next: Zusammenfassung Up: Parallelität und Synchronisierung Previous: Kooperativität

Netzwerkdämonen

Als Dämonen werden Programme bezeichnet, die langfristig laufen und bestimmte Dienstleistungen anbieten oder übernehmen. Im folgenden wird ein Konferenz-Dämon vorgestellt, der seine Dienste auf einem Netzwerk zur Verfügung stellt. Einmal gestartet, kann jeder auf dem Netzwerk den Kontakt zu dem Dämon aufnehmen und nach einer Registrierung sich mit allen anderen Kontaktaufnehmern unterhalten.

Auf dem Internet könnte eine Sitzung dann etwa so ablaufen:

 
Abbildung:  Zustandsübergänge für Tasks
oberon$ telnet belinda 4711 Kontaktaufnahme
Trying 134.60.66.21...  
Connected to belinda.  
Escape character is ^]. Kontakt ist hergestellt
Hallo, hier ist der Konferenz-Daemon. Begrüßung durch den Dämon
Gib bitte Deinen Namen ein. Aufforderung zur Identifizierung
Andreas Eingabe des eigenen Namens
Hallo Andreas, Du bist herzlich willkommen!  
Hallo zusammen, ist jemand da? Eingabe einer Textzeile
Andreas: Hallo zusammen, ist jemand da? Echo der eigenen Zeile
Robert: Hallo Andreas! Eine Zeile eines anderen Teilnehmers
Wolfgang: Nett, Dich zu treffen, Andreas! Noch ein weiterer Teilnehmer
quit Verlassen des Konferenz-Dämons
Connection closed by foreign host.  
oberon$  


Die Schnittstelle eines entsprechenden Moduls könnte folgendermaßen aussehen:

DEFINITION ChatDaemons;

   IMPORT Networks, RelatedEvents, Services;

   TYPE
      ChatDaemon = POINTER TO ChatDaemonRec;
      ChatDaemonRec = RECORD (Services.ObjectRec) END;

   PROCEDURE Create(VAR daemon: ChatDaemon; port: Networks.Address;
                    errors: RelatedEvents.Object) : BOOLEAN;
   PROCEDURE Terminate(daemon: ChatDaemon);

END ChatDaemons.

Das Modul Networks stellt die allgemeine Abstraktion für Netzwerke und Netzwerkverbindungen zur Verfügung. Eine mögliche Implementierung hierfür ist in der obigen Beispielsitzung Internet. Der Parameter port ist eine Adresse, bei der der neu zu startende Dämon sich registrieren soll, so daß spätere Teilnehmer ihn anwählen können.

Tasks läßt es nur zu, daß eine Task sich selbst terminiert. Trotzdem lassen sich indirekt Tasks auch von außen terminieren, wenn die Task bereit ist, einen entsprechenden Wunsch entgegenzunehmen. Terminate dient hier als Beispiel für diese Technik.

Die Arbeit eines Konferenz-Dämons wird von einer Reihe von Tasks übernommen: Eine Task kümmert sich um neue Kontaktaufnahmen, und für jede bestehende Verbindung gibt es eine weitere Task. Alle Tasks, die im Rahmen eines Konferenz-Dämons erzeugt werden, interessieren sich für die Terminierung des Objekts, das den Dämonen repräsentiert (termination), und alle Tasks, die mit einer Verbindung zu tun haben, interessieren sich für die Textzeilen anderer Benutzer (broadcast):

TYPE
   ChatDaemon = POINTER TO ChatDaemonRec;
   ChatDaemonRec =
      RECORD
         (Services.ObjectRec)
         broadcast: Events.EventType;
         termination: Events.EventType;
      END;
VAR
   type: Services.Type;

TYPE
   Name = ARRAY 32 OF CHAR;
   Message = Events.Message;
   BroadcastEvent = POINTER TO BroadcastEventRec;
   BroadcastEventRec =
      RECORD
         (Events.EventRec)
         sender: Name;
      END;

Das Verteilen eingegebener Textzeilen an andere Benutzer erfolgt über Broadcast:

PROCEDURE Broadcast(daemon: ChatDaemon;
                    sender: Name; line: Message);
   VAR
      event: BroadcastEvent;
BEGIN
   NEW(event); event.type := daemon.broadcast;
   event.message := line;
   event.sender := sender;
   Events.Raise(event);
END Broadcast;

Jede Task, die sich um eine Verbindung zu einem Benutzer kümmert, wartet während des Dialogs auf eines von drei Ereignissen: die Eingabe einer Zeile vom Benutzer, das Eingeben einer Zeile von einem anderen Benutzer oder die Terminierung. Das Warten auf Eingabe erfolgt mit Hilfe von StreamConditions, das die Abstraktion von Streams in Richtung asynchroner Ein- und Ausgabe erweitert. Dies geschieht auf Basis von Aufträgen, die Erweiterungen von Streams.Message sind und mit Hilfe von Streams.Send an die Implementierung verschickt werden können.

Dialog ist eine Prozedur, die sich selbst zur Koroutine erklärt und dann den Dialog mit einem Benutzer aufnimmt. Der Parameter s steht für den bidirektionalen Kommunikationskanal zum Benutzer:

PROCEDURE Dialog(VAR cr: Coroutines.Coroutine;
                 daemon: ChatDaemon;
                 s: Streams.Stream);
   CONST
      hello1 = "Hallo, hier ist der Konferenz-Daemon.";
      hello2 = "Gib bitte Deinen Namen ein.";
      welcome1 = "Hallo ";
      welcome2 = ", Du bist herzlich willkommen!";
      greeting = "ist mit von der Partie.";
      leaves = "verabschiedet sich.";
      shutdown = "Der Daemon verabschiedet sich!";
   VAR
      name: Name;    (* the name of our user *)
      line: Message; (* last line typed in by our user *)
      (* conditions *)
      readyForReading: Conditions.Condition;
         (* yields TRUE if our user has sth typed in *)
      broadcastMsgAvailable: Conditions.Condition;
         (* yields TRUE if anybody has sth typed in *)
      terminationMsgAvailable: Conditions.Condition;
         (* yields TRUE on termination *)
      conditions: Conditions.ConditionSet;
         (* the set of all conditions above *)
      errors: RelatedEvents.Object;
      event: Events.Event; (* broadcast or termination event *)
      lineterm: StreamDisciplines.LineTerminator; (* salami *)

   PROCEDURE Terminate;
      (* clean up and terminate *)
   BEGIN
      EventConditions.Drop(broadcastMsgAvailable);
      EventConditions.Drop(terminationMsgAvailable);
      Streams.Release(s);
      Tasks.Terminate;
   END Terminate;

BEGIN
   SYSTEM.CRSPAWN(cr);
   lineterm[0] := ASCII.cr; lineterm[1] := ASCII.nl; lineterm[2] := 0X;
   StreamDisciplines.SetLineTerm(s, lineterm);

   NEW(errors); RelatedEvents.QueueEvents(errors);

   (* create set of conditions *)
   StreamConditions.Create(readyForReading, s, StreamConditions.read);
   EventConditions.Create(broadcastMsgAvailable, daemon.broadcast);
   EventConditions.Create(terminationMsgAvailable, daemon.termination);
   Conditions.CreateSet(conditions);
   Conditions.Incl(conditions, readyForReading);
   Conditions.Incl(conditions, broadcastMsgAvailable);
   Conditions.Incl(conditions, terminationMsgAvailable);

   (* first hello and identification *)
   Write.LineS(s, hello1); Write.LineS(s, hello2);
   Read.LineS(s, name);
   IF name = "" THEN
      Terminate;
   END;
   (* welcome and first broadcast *)
   Write.StringS(s, welcome1);
   Write.StringS(s, name);
   Write.LineS(s, welcome2);
   Broadcast(daemon, name, greeting);

   LOOP (* until termination *)
      Tasks.WaitForOneOf(conditions);
      WHILE Conditions.Test(readyForReading, errors) &
            ~RelatedEvents.EventsPending(errors) DO
         Read.LineS(s, line);
         IF s.eof OR (line = "quit") THEN
            EXIT
         END;
         Broadcast(daemon, name, line);
      END;
      IF RelatedEvents.EventsPending(errors) THEN EXIT END;

      WHILE EventConditions.TestAndGet(broadcastMsgAvailable, event) DO
         WITH event: BroadcastEvent DO
            Write.StringS(s, event.sender);
            Write.StringS(s, ": ");
            Write.LineS(s, event.message);
         END;
      END;
      IF EventConditions.TestAndGet(terminationMsgAvailable, event) &
            (event(Resources.Event).change = Resources.terminated) THEN
         Write.LineS(s, shutdown);
         EXIT
      END;
   END;
   Broadcast(daemon, name, leaves);
   Terminate;
END Dialog;

Die Task, die sich um das Aufnehmen neuer Kontakte kümmert, macht von Networks.AcceptCondition Gebrauch, das das Warten auf neue Kontaktaufnahmen ermöglicht. Jede neue Kontaktaufnahme führt zum Erzeugen einer neuen Koroutine für den Dialog, die dann Tasks übergeben wird:

PROCEDURE ConnectionManager(VAR cr: Coroutines.Coroutine;
                            daemon: ChatDaemon; port: Networks.Address;
                            errors: RelatedEvents.Object;
                            VAR ok: BOOLEAN);
   VAR
      newConnection: Conditions.Condition;
      terminationMsgAvailable: Conditions.Condition;
      conditions: Conditions.ConditionSet;
      socket: Networks.Socket; (* registered port *)
      s: Streams.Stream; (* new communication channel *)
      event: Events.Event;
      dialogCR: Coroutines.Coroutine;
      task: Tasks.Task;
BEGIN
   (* register port address so that clients may connect to *)
   IF ~Networks.Listen(socket, port, errors) THEN
      ok := FALSE; RETURN
   END;

   SYSTEM.CRSPAWN(cr);

   (* create conditions *)
   Networks.CreateAcceptCondition(newConnection, socket);
   EventConditions.Create(terminationMsgAvailable, daemon.termination);
   Conditions.CreateSet(conditions);
   Conditions.Incl(conditions, newConnection);
   Conditions.Incl(conditions, terminationMsgAvailable);

   LOOP
      Tasks.WaitForOneOf(conditions);
      IF EventConditions.TestAndGet(terminationMsgAvailable, event) &
            (event(Resources.Event).change = Resources.terminated) THEN
         EXIT
      END;
      WHILE Conditions.Test(newConnection, errors) &
            ~RelatedEvents.EventsPending(errors) &
            Networks.Accept(socket, s, Streams.linebuf) DO
         Dialog(dialogCR, daemon, s); Tasks.Create(task, dialogCR);
         RelatedEvents.Forward(s, daemon);
      END;
   END;
   EventConditions.Drop(terminationMsgAvailable);
   Networks.Release(socket);
   Tasks.Terminate;
END ConnectionManager;

Create braucht dann nur ConnectionManager zu starten:

PROCEDURE Create(VAR daemon: ChatDaemon; port: Networks.Address;
                 errors: RelatedEvents.Object) : BOOLEAN;
   VAR
      cm: Coroutines.Coroutine;
      task: Tasks.Task;
      ok: BOOLEAN;
BEGIN
   NEW(daemon);
   Services.Init(daemon, type);
   Events.Define(daemon.broadcast); Events.Ignore(daemon.broadcast);
   Resources.TakeInterest(daemon, daemon.termination);
   ConnectionManager(cm, daemon, port, errors, ok);
   IF ok THEN
      Tasks.Create(task, cm);
      RelatedEvents.QueueEvents(daemon);
      RETURN TRUE
   ELSE
      RETURN FALSE
   END;
END Create;

Terminate über Resources das den Dämonen repräsentierende Objekt. Da alle zugehörigen Tasks auf dieses Ereignis warten und bei Eintreffen sofort Tasks.Terminate ausführen, kommt es zur Terminierung aller für diesen Dämonen kreierten Tasks.

PROCEDURE Terminate(daemon: ChatDaemon);
BEGIN
   Resources.Notify(daemon, Resources.terminated);
END Terminate;


next up previous
Next: Zusammenfassung Up: Parallelität und Synchronisierung Previous: Kooperativität
Andreas Borchert
2/2/1998