server

Server for publisher subscriber system. For more information, please run python server.py --help

  1"""Server for publisher subscriber system. For more information, please run `python server.py --help`"""
  2
  3import asyncio
  4import logging
  5import time
  6import functools
  7from argparse import ArgumentParser
  8from datetime import datetime
  9from threading import Lock, Thread
 10from typing import List, Optional, Union, Dict
 11
 12import socketio
 13from aiohttp import web
 14
 15from transport_message import TransportMessage
 16
 17
 18## Setup logging ##
 19# Set all loggers to ERROR level
 20loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
 21for logger in loggers:
 22    logger.setLevel(logging.ERROR)
 23
 24# Set server logger to INFO level
 25# Create file handler which logs even debug messages
 26file_handler = logging.FileHandler("server.log")
 27file_handler.setLevel(logging.DEBUG)
 28
 29# Create console handler
 30console_handler = logging.StreamHandler()
 31console_handler.setLevel(logging.DEBUG)
 32
 33# Create formatter and add it to the handlers
 34formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
 35file_handler.setFormatter(formatter)
 36console_handler.setFormatter(formatter)
 37
 38# Add the handlers to the logger
 39logging.getLogger().addHandler(file_handler)
 40logging.getLogger().addHandler(console_handler)
 41logging.getLogger().setLevel(logging.DEBUG)
 42
 43# logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
 44
 45
 46class ParallelTimer(Thread):
 47    """Class to manage a parallel timer in different thread on the server triggering a heart beat algorithm every 20 seconds."""
 48
 49    def __init__(self, server) -> None:
 50        """Constructor of ParallelTimer class.
 51
 52        :param server: server object
 53        """
 54        super().__init__()
 55        self.server = server
 56
 57    def run(self):
 58        """
 59        Starting parallel timer in loop.
 60        """
 61        while 1:
 62            heartbeat = self.server.heart_beat(20)
 63            asyncio.run(heartbeat)
 64
 65
 66class Topic:
 67    """Class to manage the Topics with needed data."""
 68
 69    name: Union[None, str] = None
 70    """name of the topic"""
 71    content: Union[None, str] = None
 72    """content of the topic"""
 73    subscribers: List[str] = []
 74    """list of subscribers"""
 75    timestamp: Union[None, int] = None
 76    """timestamp"""
 77    last_update: Union[None, int] = None
 78    """last update of topic"""
 79
 80
 81class Server:
 82    def __init__(self) -> None:
 83        self._list_of_topics: List[Topic] = []
 84        self._sid_ip_mapping: Dict[str, str] = {}
 85        self._lock = Lock()
 86
 87        self.sio = socketio.AsyncServer(
 88            async_mode="aiohttp", cors_allowed_origins="*", logger=False, engineio_logger=False
 89        )
 90        self.sio.event(self.connect)
 91        self.sio.on("SUBSCRIBE_TOPIC", self.handle_subscribe)
 92        self.sio.on("UNSUBSCRIBE_TOPIC", self.handle_unsubscribe)
 93        self.sio.on("PUBLISH_TOPIC", self.handle_publish)
 94        self.sio.on("LIST_TOPICS", self.handle_list_topics)
 95        self.sio.on("GET_TOPIC_STATUS", self.handle_topic_status)
 96
 97    def _check_data_none_decorator(func):
 98        """Decorator for checking if data is None.
 99        If data is None, the client will receive an error message.
100        """
101
102        @functools.wraps(func)
103        async def wrapper(self, *args, **kwargs):
104            sid = args[0]
105            data = args[1] if len(args) > 1 else None
106            if data is None:
107                response = TransportMessage(
108                    timestamp=int(time.time()), payload="Missing payload of type TransportMessage."
109                )
110                await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid)
111                logging.error("%s - %s", self._sid_ip_mapping[sid], response.payload)
112                return None
113            return await func(self, *args, **kwargs)
114
115        return wrapper
116
117    def _check_topic_decorator(func):
118        """Decorator for checking if topic exists.
119        If topic does not exist, the client will receive an error message.
120        """
121
122        @functools.wraps(func)
123        async def wrapper(self, *args, **kwargs):
124            sid = args[0]
125            data = args[1] if len(args) > 1 else None
126            try:
127                parsed_data = TransportMessage.parse_raw(data)
128            except Exception:
129                response = TransportMessage(timestamp=int(time.time()), payload="Invalid payload.")
130                await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid)
131                logging.error("%s - %s", self._sid_ip_mapping[sid], response.payload)
132                return None
133
134            # Check if data contains topic
135            if parsed_data.topic is None:
136                response = TransportMessage(timestamp=int(time.time()), payload="Missing parameter topic.")
137                await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid)
138                logging.error("%s - %s", self._sid_ip_mapping[sid], response.payload)
139                return None
140            return await func(self, *args, **kwargs)
141
142        return wrapper
143
144    async def connect(self, sid, environ, auth=None):
145        """Called when a client connects to the server.
146
147        :param sid: Generated session id
148        :param environ: Environment variables
149        :param auth: Unused
150        """
151        logging.info("%s - SID: %s connected", environ["aiohttp.request"].remote, sid)
152        self._sid_ip_mapping[sid] = environ["aiohttp.request"].remote
153
154    @_check_data_none_decorator
155    @_check_topic_decorator
156    async def handle_subscribe(self, sid, data=None) -> None:
157        """Called when a client subscribes to a topic.
158        If the topic does not exist, it will be created. If the client is already subscribed to the topic, nothing
159        changes. Otherwise the client will be subscribed to the topic and will receive updates.
160
161        :param sid: Generated session id
162        :param data: Data sent by the client
163        """
164        data = TransportMessage.parse_raw(data)
165        topic = self._get_topic_by_name(data.topic)
166        if topic is not None:
167            # Check if sid already subscribed to topic
168            if sid in topic.subscribers:
169                response = TransportMessage(timestamp=int(time.time()), payload=f"Already subscribed to {data.topic}.")
170            else:
171                # Subscribe to topic
172                topic.subscribers.append(sid)
173                response = TransportMessage(
174                    timestamp=int(time.time()), payload=f"Successfully subscribed to {data.topic}."
175                )
176        else:
177            # Create new topic if not already existing and subscribe
178            new_topic = Topic()
179            new_topic.name = data.topic
180            new_topic.subscribers.append(sid)
181            self._add_topic(new_topic)
182            response = TransportMessage(
183                timestamp=int(time.time()), payload=f"Created {data.topic} and successfully subscribed."
184            )
185
186        await self.sio.emit("PRINT_MESSAGE", response.json(), room=sid)
187        logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload)
188
189    @_check_data_none_decorator
190    @_check_topic_decorator
191    async def handle_unsubscribe(self, sid, data=None) -> None:
192        """Called when a client unsubscribes from a topic.
193        If the client is not subscribed to the topic or topic does not exist, the client will receive an error message.
194        Otherwise the client will be unsubscribed from the topic and will not receive any updates.
195        If the topic has no subscribers left it will be deleted.
196
197        :param sid: Generated session id
198        :param data: Data sent by the client
199        """
200
201        data = TransportMessage.parse_raw(data)
202        topic = self._get_topic_by_name(data.topic)
203
204        if topic is not None:
205            # Check if sid subscribed to topic and unsubscribe
206            if sid in topic.subscribers:
207                topic.subscribers.remove(sid)
208                response = TransportMessage(
209                    timestamp=int(time.time()), payload=f"Successfully unsubscribed from {data.topic}."
210                )
211                # Delete topic if no subscribers left
212                if len(topic.subscribers) == 0:
213                    self._remove_topic(topic)
214            else:
215                # Not subscribed
216                response = TransportMessage(timestamp=int(time.time()), payload=f"Not subscribed to {data.topic}.")
217
218        else:
219            # Topic not existing
220            response = TransportMessage(timestamp=int(time.time()), payload=f"{data.topic} does not exist.")
221
222        await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid)
223        logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload)
224
225    @_check_data_none_decorator
226    @_check_topic_decorator
227    async def handle_publish(self, sid, data=None) -> None:
228        """Called when a client publishes a message to a topic.
229        The message will be published to the topic and all subscribers will receive the message.
230
231        :param sid: Generated session id
232        :param data: Data sent by the client
233        """
234        data = TransportMessage.parse_raw(data)
235        topic = self._get_topic_by_name(data.topic)
236
237        # Check if data contains payload
238        if data.payload is None:
239            response = TransportMessage(timestamp=int(time.time()), payload="Missing parameter message.")
240            await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid)
241            return None
242
243        if topic is not None:
244            # Publish message to topic
245            topic.content = data.payload
246            topic.timestamp = data.timestamp
247            response = TransportMessage(
248                timestamp=int(time.time()), payload=f"Successfully published message to {data.topic}."
249            )
250            await self.update_topic(topic)
251        else:
252            # Topic not existing
253            response = TransportMessage(timestamp=int(time.time()), payload=f"{data.topic} does not exist.")
254
255        await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid)
256        logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload)
257
258    async def handle_list_topics(self, sid, data=None) -> None:
259        """Called when a client requests a list of all topics.
260        The client will receive a list of all topics.
261
262        :param sid: Generated session id
263        :param data: Data sent by the client. Unused
264        """
265        response_msg = "All topics on the server:"
266        for topic in self._list_of_topics:
267            response_msg += f"\n{topic.name}"
268
269        response = TransportMessage(timestamp=int(time.time()), payload=response_msg)
270        await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid)
271        logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload)
272
273    @_check_data_none_decorator
274    @_check_topic_decorator
275    async def handle_topic_status(self, sid, data=None) -> None:
276        """Called when a client requests the status of a topic.
277        The client will receive the status of the topic.
278
279        :param sid: Generated session id
280        :param data: Data sent by the client
281        """
282
283        data = TransportMessage.parse_raw(data)
284        topic = self._get_topic_by_name(data.topic)
285
286        if topic is not None:
287            subscribers = ""
288            for subscriber in topic.subscribers:
289                subscribers += f"\t{self._sid_ip_mapping[subscriber]}\n\t"
290
291            if topic.content is None or topic.timestamp is None:
292                topic_status = (
293                    f"\ntopic name:\t{topic.name}\n\nsubscribers:{subscribers}\nThere was no publish on this topic yet."
294                )
295            else:
296                topic_status = f"\ntopic name:\t{topic.name}\n\ntimestamp:\t{datetime.fromtimestamp(int(topic.timestamp)).strftime('%d-%m-%Y %H:%M:%S')}\n\ncontent:\t{topic.content}\n\nsubscribers:{subscribers}"
297
298            response = TransportMessage(timestamp=int(time.time()), payload=topic_status)
299        else:
300            # Topic not existing
301            response = TransportMessage(timestamp=int(time.time()), payload=f"{data.topic} does not exist.")
302
303        await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid)
304        logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload)
305
306    async def update_topic(self, topic: Topic) -> None:
307        """Called when a topic is updated.
308        The subscribers of the topic will receive the updated topic.
309
310        :param topic: The topic
311        """
312        topic.last_update = int(time.time())
313        response = TransportMessage(
314            timestamp=int(time.time()),
315            payload=f"{topic.name} ({datetime.fromtimestamp(int(topic.timestamp)).strftime('%d-%m-%Y %H:%M:%S')}): {topic.content}",
316        )
317        for sub in topic.subscribers:
318            await self.sio.emit("PRINT_MESSAGE", response.json(), room=sub)
319
320    async def heart_beat(self, time_delta):
321        """Go through all topics and check if they were updated in the last time_delta seconds.
322        If not, update the topic.
323
324        :param time_delta: Time in seconds
325        """
326        for topic in self._list_of_topics:
327            if topic.last_update is not None and int(time.time()) - topic.last_update > time_delta:
328                await self.update_topic(topic)
329                logging.info("Topic %s was updated through heart beat.", topic.name)
330
331    def _get_topic_by_name(self, name: str) -> Optional[Topic]:
332        """Get a topic by its name.
333
334        :param name: Name of the topic
335        :return: Topic object
336        """
337        for topic in self._list_of_topics:
338            if topic.name == name:
339                return topic
340        return None
341
342    def _add_topic(self, topic: Topic) -> None:
343        """Add a topic to the list of topics.
344
345        :param topic: Topic object
346        """
347        with self._lock:
348            self._list_of_topics.append(topic)
349
350    def _remove_topic(self, topic: Topic) -> None:
351        """Remove a topic from the list of topics.
352
353        :param topic: Topic object
354        """
355        with self._lock:
356            logging.warning("Topic %s was removed.", topic.name)
357            self._list_of_topics.remove(topic)
358
359
360def get_app():
361    """Create an ASGI application for the server.
362
363    :return: ASGI application
364    """
365    server = Server()
366    application = web.Application(logger=None)
367
368    server.sio.attach(application)
369
370    timer = ParallelTimer(server)
371    timer.start()
372
373    return application
374
375
376if __name__ == "__main__":
377    parser = ArgumentParser(prog="server.py", description="Starts a server for publisher subscriber system")
378    parser.add_argument(
379        "-p", "--port", type=str, help="Port to run the server on. Default is 8080", default=8080, metavar="STRING"
380    )
381    parser.add_argument(
382        "--host", type=str, help="Host to run the server on. Default is localhost", default="127.0.0.1", metavar="STRING"
383    )
384    params = parser.parse_args()
385
386    # wrap with ASGI application
387    app = get_app()
388    web.run_app(app, host=params.host, port=params.port)
class ParallelTimer(threading.Thread):
47class ParallelTimer(Thread):
48    """Class to manage a parallel timer in different thread on the server triggering a heart beat algorithm every 20 seconds."""
49
50    def __init__(self, server) -> None:
51        """Constructor of ParallelTimer class.
52
53        :param server: server object
54        """
55        super().__init__()
56        self.server = server
57
58    def run(self):
59        """
60        Starting parallel timer in loop.
61        """
62        while 1:
63            heartbeat = self.server.heart_beat(20)
64            asyncio.run(heartbeat)

Class to manage a parallel timer in different thread on the server triggering a heart beat algorithm every 20 seconds.

ParallelTimer(server)
50    def __init__(self, server) -> None:
51        """Constructor of ParallelTimer class.
52
53        :param server: server object
54        """
55        super().__init__()
56        self.server = server

Constructor of ParallelTimer class.

Parameters
  • server: server object
def run(self):
58    def run(self):
59        """
60        Starting parallel timer in loop.
61        """
62        while 1:
63            heartbeat = self.server.heart_beat(20)
64            asyncio.run(heartbeat)

Starting parallel timer in loop.

Inherited Members
threading.Thread
start
join
name
ident
is_alive
daemon
isDaemon
setDaemon
getName
setName
native_id
class Topic:
67class Topic:
68    """Class to manage the Topics with needed data."""
69
70    name: Union[None, str] = None
71    """name of the topic"""
72    content: Union[None, str] = None
73    """content of the topic"""
74    subscribers: List[str] = []
75    """list of subscribers"""
76    timestamp: Union[None, int] = None
77    """timestamp"""
78    last_update: Union[None, int] = None
79    """last update of topic"""

Class to manage the Topics with needed data.

name: Optional[str] = None

name of the topic

content: Optional[str] = None

content of the topic

subscribers: List[str] = []

list of subscribers

timestamp: Optional[int] = None

timestamp

last_update: Optional[int] = None

last update of topic

class Server:
 82class Server:
 83    def __init__(self) -> None:
 84        self._list_of_topics: List[Topic] = []
 85        self._sid_ip_mapping: Dict[str, str] = {}
 86        self._lock = Lock()
 87
 88        self.sio = socketio.AsyncServer(
 89            async_mode="aiohttp", cors_allowed_origins="*", logger=False, engineio_logger=False
 90        )
 91        self.sio.event(self.connect)
 92        self.sio.on("SUBSCRIBE_TOPIC", self.handle_subscribe)
 93        self.sio.on("UNSUBSCRIBE_TOPIC", self.handle_unsubscribe)
 94        self.sio.on("PUBLISH_TOPIC", self.handle_publish)
 95        self.sio.on("LIST_TOPICS", self.handle_list_topics)
 96        self.sio.on("GET_TOPIC_STATUS", self.handle_topic_status)
 97
 98    def _check_data_none_decorator(func):
 99        """Decorator for checking if data is None.
100        If data is None, the client will receive an error message.
101        """
102
103        @functools.wraps(func)
104        async def wrapper(self, *args, **kwargs):
105            sid = args[0]
106            data = args[1] if len(args) > 1 else None
107            if data is None:
108                response = TransportMessage(
109                    timestamp=int(time.time()), payload="Missing payload of type TransportMessage."
110                )
111                await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid)
112                logging.error("%s - %s", self._sid_ip_mapping[sid], response.payload)
113                return None
114            return await func(self, *args, **kwargs)
115
116        return wrapper
117
118    def _check_topic_decorator(func):
119        """Decorator for checking if topic exists.
120        If topic does not exist, the client will receive an error message.
121        """
122
123        @functools.wraps(func)
124        async def wrapper(self, *args, **kwargs):
125            sid = args[0]
126            data = args[1] if len(args) > 1 else None
127            try:
128                parsed_data = TransportMessage.parse_raw(data)
129            except Exception:
130                response = TransportMessage(timestamp=int(time.time()), payload="Invalid payload.")
131                await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid)
132                logging.error("%s - %s", self._sid_ip_mapping[sid], response.payload)
133                return None
134
135            # Check if data contains topic
136            if parsed_data.topic is None:
137                response = TransportMessage(timestamp=int(time.time()), payload="Missing parameter topic.")
138                await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid)
139                logging.error("%s - %s", self._sid_ip_mapping[sid], response.payload)
140                return None
141            return await func(self, *args, **kwargs)
142
143        return wrapper
144
145    async def connect(self, sid, environ, auth=None):
146        """Called when a client connects to the server.
147
148        :param sid: Generated session id
149        :param environ: Environment variables
150        :param auth: Unused
151        """
152        logging.info("%s - SID: %s connected", environ["aiohttp.request"].remote, sid)
153        self._sid_ip_mapping[sid] = environ["aiohttp.request"].remote
154
155    @_check_data_none_decorator
156    @_check_topic_decorator
157    async def handle_subscribe(self, sid, data=None) -> None:
158        """Called when a client subscribes to a topic.
159        If the topic does not exist, it will be created. If the client is already subscribed to the topic, nothing
160        changes. Otherwise the client will be subscribed to the topic and will receive updates.
161
162        :param sid: Generated session id
163        :param data: Data sent by the client
164        """
165        data = TransportMessage.parse_raw(data)
166        topic = self._get_topic_by_name(data.topic)
167        if topic is not None:
168            # Check if sid already subscribed to topic
169            if sid in topic.subscribers:
170                response = TransportMessage(timestamp=int(time.time()), payload=f"Already subscribed to {data.topic}.")
171            else:
172                # Subscribe to topic
173                topic.subscribers.append(sid)
174                response = TransportMessage(
175                    timestamp=int(time.time()), payload=f"Successfully subscribed to {data.topic}."
176                )
177        else:
178            # Create new topic if not already existing and subscribe
179            new_topic = Topic()
180            new_topic.name = data.topic
181            new_topic.subscribers.append(sid)
182            self._add_topic(new_topic)
183            response = TransportMessage(
184                timestamp=int(time.time()), payload=f"Created {data.topic} and successfully subscribed."
185            )
186
187        await self.sio.emit("PRINT_MESSAGE", response.json(), room=sid)
188        logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload)
189
190    @_check_data_none_decorator
191    @_check_topic_decorator
192    async def handle_unsubscribe(self, sid, data=None) -> None:
193        """Called when a client unsubscribes from a topic.
194        If the client is not subscribed to the topic or topic does not exist, the client will receive an error message.
195        Otherwise the client will be unsubscribed from the topic and will not receive any updates.
196        If the topic has no subscribers left it will be deleted.
197
198        :param sid: Generated session id
199        :param data: Data sent by the client
200        """
201
202        data = TransportMessage.parse_raw(data)
203        topic = self._get_topic_by_name(data.topic)
204
205        if topic is not None:
206            # Check if sid subscribed to topic and unsubscribe
207            if sid in topic.subscribers:
208                topic.subscribers.remove(sid)
209                response = TransportMessage(
210                    timestamp=int(time.time()), payload=f"Successfully unsubscribed from {data.topic}."
211                )
212                # Delete topic if no subscribers left
213                if len(topic.subscribers) == 0:
214                    self._remove_topic(topic)
215            else:
216                # Not subscribed
217                response = TransportMessage(timestamp=int(time.time()), payload=f"Not subscribed to {data.topic}.")
218
219        else:
220            # Topic not existing
221            response = TransportMessage(timestamp=int(time.time()), payload=f"{data.topic} does not exist.")
222
223        await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid)
224        logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload)
225
226    @_check_data_none_decorator
227    @_check_topic_decorator
228    async def handle_publish(self, sid, data=None) -> None:
229        """Called when a client publishes a message to a topic.
230        The message will be published to the topic and all subscribers will receive the message.
231
232        :param sid: Generated session id
233        :param data: Data sent by the client
234        """
235        data = TransportMessage.parse_raw(data)
236        topic = self._get_topic_by_name(data.topic)
237
238        # Check if data contains payload
239        if data.payload is None:
240            response = TransportMessage(timestamp=int(time.time()), payload="Missing parameter message.")
241            await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid)
242            return None
243
244        if topic is not None:
245            # Publish message to topic
246            topic.content = data.payload
247            topic.timestamp = data.timestamp
248            response = TransportMessage(
249                timestamp=int(time.time()), payload=f"Successfully published message to {data.topic}."
250            )
251            await self.update_topic(topic)
252        else:
253            # Topic not existing
254            response = TransportMessage(timestamp=int(time.time()), payload=f"{data.topic} does not exist.")
255
256        await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid)
257        logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload)
258
259    async def handle_list_topics(self, sid, data=None) -> None:
260        """Called when a client requests a list of all topics.
261        The client will receive a list of all topics.
262
263        :param sid: Generated session id
264        :param data: Data sent by the client. Unused
265        """
266        response_msg = "All topics on the server:"
267        for topic in self._list_of_topics:
268            response_msg += f"\n{topic.name}"
269
270        response = TransportMessage(timestamp=int(time.time()), payload=response_msg)
271        await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid)
272        logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload)
273
274    @_check_data_none_decorator
275    @_check_topic_decorator
276    async def handle_topic_status(self, sid, data=None) -> None:
277        """Called when a client requests the status of a topic.
278        The client will receive the status of the topic.
279
280        :param sid: Generated session id
281        :param data: Data sent by the client
282        """
283
284        data = TransportMessage.parse_raw(data)
285        topic = self._get_topic_by_name(data.topic)
286
287        if topic is not None:
288            subscribers = ""
289            for subscriber in topic.subscribers:
290                subscribers += f"\t{self._sid_ip_mapping[subscriber]}\n\t"
291
292            if topic.content is None or topic.timestamp is None:
293                topic_status = (
294                    f"\ntopic name:\t{topic.name}\n\nsubscribers:{subscribers}\nThere was no publish on this topic yet."
295                )
296            else:
297                topic_status = f"\ntopic name:\t{topic.name}\n\ntimestamp:\t{datetime.fromtimestamp(int(topic.timestamp)).strftime('%d-%m-%Y %H:%M:%S')}\n\ncontent:\t{topic.content}\n\nsubscribers:{subscribers}"
298
299            response = TransportMessage(timestamp=int(time.time()), payload=topic_status)
300        else:
301            # Topic not existing
302            response = TransportMessage(timestamp=int(time.time()), payload=f"{data.topic} does not exist.")
303
304        await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid)
305        logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload)
306
307    async def update_topic(self, topic: Topic) -> None:
308        """Called when a topic is updated.
309        The subscribers of the topic will receive the updated topic.
310
311        :param topic: The topic
312        """
313        topic.last_update = int(time.time())
314        response = TransportMessage(
315            timestamp=int(time.time()),
316            payload=f"{topic.name} ({datetime.fromtimestamp(int(topic.timestamp)).strftime('%d-%m-%Y %H:%M:%S')}): {topic.content}",
317        )
318        for sub in topic.subscribers:
319            await self.sio.emit("PRINT_MESSAGE", response.json(), room=sub)
320
321    async def heart_beat(self, time_delta):
322        """Go through all topics and check if they were updated in the last time_delta seconds.
323        If not, update the topic.
324
325        :param time_delta: Time in seconds
326        """
327        for topic in self._list_of_topics:
328            if topic.last_update is not None and int(time.time()) - topic.last_update > time_delta:
329                await self.update_topic(topic)
330                logging.info("Topic %s was updated through heart beat.", topic.name)
331
332    def _get_topic_by_name(self, name: str) -> Optional[Topic]:
333        """Get a topic by its name.
334
335        :param name: Name of the topic
336        :return: Topic object
337        """
338        for topic in self._list_of_topics:
339            if topic.name == name:
340                return topic
341        return None
342
343    def _add_topic(self, topic: Topic) -> None:
344        """Add a topic to the list of topics.
345
346        :param topic: Topic object
347        """
348        with self._lock:
349            self._list_of_topics.append(topic)
350
351    def _remove_topic(self, topic: Topic) -> None:
352        """Remove a topic from the list of topics.
353
354        :param topic: Topic object
355        """
356        with self._lock:
357            logging.warning("Topic %s was removed.", topic.name)
358            self._list_of_topics.remove(topic)
async def connect(self, sid, environ, auth=None):
145    async def connect(self, sid, environ, auth=None):
146        """Called when a client connects to the server.
147
148        :param sid: Generated session id
149        :param environ: Environment variables
150        :param auth: Unused
151        """
152        logging.info("%s - SID: %s connected", environ["aiohttp.request"].remote, sid)
153        self._sid_ip_mapping[sid] = environ["aiohttp.request"].remote

Called when a client connects to the server.

Parameters
  • sid: Generated session id
  • environ: Environment variables
  • auth: Unused
async def handle_subscribe(self, sid, data=None) -> None:
155    @_check_data_none_decorator
156    @_check_topic_decorator
157    async def handle_subscribe(self, sid, data=None) -> None:
158        """Called when a client subscribes to a topic.
159        If the topic does not exist, it will be created. If the client is already subscribed to the topic, nothing
160        changes. Otherwise the client will be subscribed to the topic and will receive updates.
161
162        :param sid: Generated session id
163        :param data: Data sent by the client
164        """
165        data = TransportMessage.parse_raw(data)
166        topic = self._get_topic_by_name(data.topic)
167        if topic is not None:
168            # Check if sid already subscribed to topic
169            if sid in topic.subscribers:
170                response = TransportMessage(timestamp=int(time.time()), payload=f"Already subscribed to {data.topic}.")
171            else:
172                # Subscribe to topic
173                topic.subscribers.append(sid)
174                response = TransportMessage(
175                    timestamp=int(time.time()), payload=f"Successfully subscribed to {data.topic}."
176                )
177        else:
178            # Create new topic if not already existing and subscribe
179            new_topic = Topic()
180            new_topic.name = data.topic
181            new_topic.subscribers.append(sid)
182            self._add_topic(new_topic)
183            response = TransportMessage(
184                timestamp=int(time.time()), payload=f"Created {data.topic} and successfully subscribed."
185            )
186
187        await self.sio.emit("PRINT_MESSAGE", response.json(), room=sid)
188        logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload)

Called when a client subscribes to a topic. If the topic does not exist, it will be created. If the client is already subscribed to the topic, nothing changes. Otherwise the client will be subscribed to the topic and will receive updates.

Parameters
  • sid: Generated session id
  • data: Data sent by the client
async def handle_unsubscribe(self, sid, data=None) -> None:
190    @_check_data_none_decorator
191    @_check_topic_decorator
192    async def handle_unsubscribe(self, sid, data=None) -> None:
193        """Called when a client unsubscribes from a topic.
194        If the client is not subscribed to the topic or topic does not exist, the client will receive an error message.
195        Otherwise the client will be unsubscribed from the topic and will not receive any updates.
196        If the topic has no subscribers left it will be deleted.
197
198        :param sid: Generated session id
199        :param data: Data sent by the client
200        """
201
202        data = TransportMessage.parse_raw(data)
203        topic = self._get_topic_by_name(data.topic)
204
205        if topic is not None:
206            # Check if sid subscribed to topic and unsubscribe
207            if sid in topic.subscribers:
208                topic.subscribers.remove(sid)
209                response = TransportMessage(
210                    timestamp=int(time.time()), payload=f"Successfully unsubscribed from {data.topic}."
211                )
212                # Delete topic if no subscribers left
213                if len(topic.subscribers) == 0:
214                    self._remove_topic(topic)
215            else:
216                # Not subscribed
217                response = TransportMessage(timestamp=int(time.time()), payload=f"Not subscribed to {data.topic}.")
218
219        else:
220            # Topic not existing
221            response = TransportMessage(timestamp=int(time.time()), payload=f"{data.topic} does not exist.")
222
223        await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid)
224        logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload)

Called when a client unsubscribes from a topic. If the client is not subscribed to the topic or topic does not exist, the client will receive an error message. Otherwise the client will be unsubscribed from the topic and will not receive any updates. If the topic has no subscribers left it will be deleted.

Parameters
  • sid: Generated session id
  • data: Data sent by the client
async def handle_publish(self, sid, data=None) -> None:
226    @_check_data_none_decorator
227    @_check_topic_decorator
228    async def handle_publish(self, sid, data=None) -> None:
229        """Called when a client publishes a message to a topic.
230        The message will be published to the topic and all subscribers will receive the message.
231
232        :param sid: Generated session id
233        :param data: Data sent by the client
234        """
235        data = TransportMessage.parse_raw(data)
236        topic = self._get_topic_by_name(data.topic)
237
238        # Check if data contains payload
239        if data.payload is None:
240            response = TransportMessage(timestamp=int(time.time()), payload="Missing parameter message.")
241            await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid)
242            return None
243
244        if topic is not None:
245            # Publish message to topic
246            topic.content = data.payload
247            topic.timestamp = data.timestamp
248            response = TransportMessage(
249                timestamp=int(time.time()), payload=f"Successfully published message to {data.topic}."
250            )
251            await self.update_topic(topic)
252        else:
253            # Topic not existing
254            response = TransportMessage(timestamp=int(time.time()), payload=f"{data.topic} does not exist.")
255
256        await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid)
257        logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload)

Called when a client publishes a message to a topic. The message will be published to the topic and all subscribers will receive the message.

Parameters
  • sid: Generated session id
  • data: Data sent by the client
async def handle_list_topics(self, sid, data=None) -> None:
259    async def handle_list_topics(self, sid, data=None) -> None:
260        """Called when a client requests a list of all topics.
261        The client will receive a list of all topics.
262
263        :param sid: Generated session id
264        :param data: Data sent by the client. Unused
265        """
266        response_msg = "All topics on the server:"
267        for topic in self._list_of_topics:
268            response_msg += f"\n{topic.name}"
269
270        response = TransportMessage(timestamp=int(time.time()), payload=response_msg)
271        await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid)
272        logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload)

Called when a client requests a list of all topics. The client will receive a list of all topics.

Parameters
  • sid: Generated session id
  • data: Data sent by the client. Unused
async def handle_topic_status(self, sid, data=None) -> None:
274    @_check_data_none_decorator
275    @_check_topic_decorator
276    async def handle_topic_status(self, sid, data=None) -> None:
277        """Called when a client requests the status of a topic.
278        The client will receive the status of the topic.
279
280        :param sid: Generated session id
281        :param data: Data sent by the client
282        """
283
284        data = TransportMessage.parse_raw(data)
285        topic = self._get_topic_by_name(data.topic)
286
287        if topic is not None:
288            subscribers = ""
289            for subscriber in topic.subscribers:
290                subscribers += f"\t{self._sid_ip_mapping[subscriber]}\n\t"
291
292            if topic.content is None or topic.timestamp is None:
293                topic_status = (
294                    f"\ntopic name:\t{topic.name}\n\nsubscribers:{subscribers}\nThere was no publish on this topic yet."
295                )
296            else:
297                topic_status = f"\ntopic name:\t{topic.name}\n\ntimestamp:\t{datetime.fromtimestamp(int(topic.timestamp)).strftime('%d-%m-%Y %H:%M:%S')}\n\ncontent:\t{topic.content}\n\nsubscribers:{subscribers}"
298
299            response = TransportMessage(timestamp=int(time.time()), payload=topic_status)
300        else:
301            # Topic not existing
302            response = TransportMessage(timestamp=int(time.time()), payload=f"{data.topic} does not exist.")
303
304        await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid)
305        logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload)

Called when a client requests the status of a topic. The client will receive the status of the topic.

Parameters
  • sid: Generated session id
  • data: Data sent by the client
async def update_topic(self, topic: server.Topic) -> None:
307    async def update_topic(self, topic: Topic) -> None:
308        """Called when a topic is updated.
309        The subscribers of the topic will receive the updated topic.
310
311        :param topic: The topic
312        """
313        topic.last_update = int(time.time())
314        response = TransportMessage(
315            timestamp=int(time.time()),
316            payload=f"{topic.name} ({datetime.fromtimestamp(int(topic.timestamp)).strftime('%d-%m-%Y %H:%M:%S')}): {topic.content}",
317        )
318        for sub in topic.subscribers:
319            await self.sio.emit("PRINT_MESSAGE", response.json(), room=sub)

Called when a topic is updated. The subscribers of the topic will receive the updated topic.

Parameters
  • topic: The topic
async def heart_beat(self, time_delta):
321    async def heart_beat(self, time_delta):
322        """Go through all topics and check if they were updated in the last time_delta seconds.
323        If not, update the topic.
324
325        :param time_delta: Time in seconds
326        """
327        for topic in self._list_of_topics:
328            if topic.last_update is not None and int(time.time()) - topic.last_update > time_delta:
329                await self.update_topic(topic)
330                logging.info("Topic %s was updated through heart beat.", topic.name)

Go through all topics and check if they were updated in the last time_delta seconds. If not, update the topic.

Parameters
  • time_delta: Time in seconds
def get_app():
361def get_app():
362    """Create an ASGI application for the server.
363
364    :return: ASGI application
365    """
366    server = Server()
367    application = web.Application(logger=None)
368
369    server.sio.attach(application)
370
371    timer = ParallelTimer(server)
372    timer.start()
373
374    return application

Create an ASGI application for the server.

Returns

ASGI application