server
Server for publisher subscriber system. For more information, please run python server.py --help
1"""Server for publisher subscriber system. For more information, please run `python server.py --help`""" 2 3import asyncio 4import logging 5import time 6import functools 7from argparse import ArgumentParser 8from datetime import datetime 9from threading import Lock, Thread 10from typing import List, Optional, Union, Dict 11 12import socketio 13from aiohttp import web 14 15from transport_message import TransportMessage 16 17 18## Setup logging ## 19# Set all loggers to ERROR level 20loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict] 21for logger in loggers: 22 logger.setLevel(logging.ERROR) 23 24# Set server logger to INFO level 25# Create file handler which logs even debug messages 26file_handler = logging.FileHandler("server.log") 27file_handler.setLevel(logging.DEBUG) 28 29# Create console handler 30console_handler = logging.StreamHandler() 31console_handler.setLevel(logging.DEBUG) 32 33# Create formatter and add it to the handlers 34formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") 35file_handler.setFormatter(formatter) 36console_handler.setFormatter(formatter) 37 38# Add the handlers to the logger 39logging.getLogger().addHandler(file_handler) 40logging.getLogger().addHandler(console_handler) 41logging.getLogger().setLevel(logging.DEBUG) 42 43# logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") 44 45 46class ParallelTimer(Thread): 47 """Class to manage a parallel timer in different thread on the server triggering a heart beat algorithm every 20 seconds.""" 48 49 def __init__(self, server) -> None: 50 """Constructor of ParallelTimer class. 51 52 :param server: server object 53 """ 54 super().__init__() 55 self.server = server 56 57 def run(self): 58 """ 59 Starting parallel timer in loop. 60 """ 61 while 1: 62 heartbeat = self.server.heart_beat(20) 63 asyncio.run(heartbeat) 64 65 66class Topic: 67 """Class to manage the Topics with needed data.""" 68 69 name: Union[None, str] = None 70 """name of the topic""" 71 content: Union[None, str] = None 72 """content of the topic""" 73 subscribers: List[str] = [] 74 """list of subscribers""" 75 timestamp: Union[None, int] = None 76 """timestamp""" 77 last_update: Union[None, int] = None 78 """last update of topic""" 79 80 81class Server: 82 def __init__(self) -> None: 83 self._list_of_topics: List[Topic] = [] 84 self._sid_ip_mapping: Dict[str, str] = {} 85 self._lock = Lock() 86 87 self.sio = socketio.AsyncServer( 88 async_mode="aiohttp", cors_allowed_origins="*", logger=False, engineio_logger=False 89 ) 90 self.sio.event(self.connect) 91 self.sio.on("SUBSCRIBE_TOPIC", self.handle_subscribe) 92 self.sio.on("UNSUBSCRIBE_TOPIC", self.handle_unsubscribe) 93 self.sio.on("PUBLISH_TOPIC", self.handle_publish) 94 self.sio.on("LIST_TOPICS", self.handle_list_topics) 95 self.sio.on("GET_TOPIC_STATUS", self.handle_topic_status) 96 97 def _check_data_none_decorator(func): 98 """Decorator for checking if data is None. 99 If data is None, the client will receive an error message. 100 """ 101 102 @functools.wraps(func) 103 async def wrapper(self, *args, **kwargs): 104 sid = args[0] 105 data = args[1] if len(args) > 1 else None 106 if data is None: 107 response = TransportMessage( 108 timestamp=int(time.time()), payload="Missing payload of type TransportMessage." 109 ) 110 await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid) 111 logging.error("%s - %s", self._sid_ip_mapping[sid], response.payload) 112 return None 113 return await func(self, *args, **kwargs) 114 115 return wrapper 116 117 def _check_topic_decorator(func): 118 """Decorator for checking if topic exists. 119 If topic does not exist, the client will receive an error message. 120 """ 121 122 @functools.wraps(func) 123 async def wrapper(self, *args, **kwargs): 124 sid = args[0] 125 data = args[1] if len(args) > 1 else None 126 try: 127 parsed_data = TransportMessage.parse_raw(data) 128 except Exception: 129 response = TransportMessage(timestamp=int(time.time()), payload="Invalid payload.") 130 await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid) 131 logging.error("%s - %s", self._sid_ip_mapping[sid], response.payload) 132 return None 133 134 # Check if data contains topic 135 if parsed_data.topic is None: 136 response = TransportMessage(timestamp=int(time.time()), payload="Missing parameter topic.") 137 await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid) 138 logging.error("%s - %s", self._sid_ip_mapping[sid], response.payload) 139 return None 140 return await func(self, *args, **kwargs) 141 142 return wrapper 143 144 async def connect(self, sid, environ, auth=None): 145 """Called when a client connects to the server. 146 147 :param sid: Generated session id 148 :param environ: Environment variables 149 :param auth: Unused 150 """ 151 logging.info("%s - SID: %s connected", environ["aiohttp.request"].remote, sid) 152 self._sid_ip_mapping[sid] = environ["aiohttp.request"].remote 153 154 @_check_data_none_decorator 155 @_check_topic_decorator 156 async def handle_subscribe(self, sid, data=None) -> None: 157 """Called when a client subscribes to a topic. 158 If the topic does not exist, it will be created. If the client is already subscribed to the topic, nothing 159 changes. Otherwise the client will be subscribed to the topic and will receive updates. 160 161 :param sid: Generated session id 162 :param data: Data sent by the client 163 """ 164 data = TransportMessage.parse_raw(data) 165 topic = self._get_topic_by_name(data.topic) 166 if topic is not None: 167 # Check if sid already subscribed to topic 168 if sid in topic.subscribers: 169 response = TransportMessage(timestamp=int(time.time()), payload=f"Already subscribed to {data.topic}.") 170 else: 171 # Subscribe to topic 172 topic.subscribers.append(sid) 173 response = TransportMessage( 174 timestamp=int(time.time()), payload=f"Successfully subscribed to {data.topic}." 175 ) 176 else: 177 # Create new topic if not already existing and subscribe 178 new_topic = Topic() 179 new_topic.name = data.topic 180 new_topic.subscribers.append(sid) 181 self._add_topic(new_topic) 182 response = TransportMessage( 183 timestamp=int(time.time()), payload=f"Created {data.topic} and successfully subscribed." 184 ) 185 186 await self.sio.emit("PRINT_MESSAGE", response.json(), room=sid) 187 logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload) 188 189 @_check_data_none_decorator 190 @_check_topic_decorator 191 async def handle_unsubscribe(self, sid, data=None) -> None: 192 """Called when a client unsubscribes from a topic. 193 If the client is not subscribed to the topic or topic does not exist, the client will receive an error message. 194 Otherwise the client will be unsubscribed from the topic and will not receive any updates. 195 If the topic has no subscribers left it will be deleted. 196 197 :param sid: Generated session id 198 :param data: Data sent by the client 199 """ 200 201 data = TransportMessage.parse_raw(data) 202 topic = self._get_topic_by_name(data.topic) 203 204 if topic is not None: 205 # Check if sid subscribed to topic and unsubscribe 206 if sid in topic.subscribers: 207 topic.subscribers.remove(sid) 208 response = TransportMessage( 209 timestamp=int(time.time()), payload=f"Successfully unsubscribed from {data.topic}." 210 ) 211 # Delete topic if no subscribers left 212 if len(topic.subscribers) == 0: 213 self._remove_topic(topic) 214 else: 215 # Not subscribed 216 response = TransportMessage(timestamp=int(time.time()), payload=f"Not subscribed to {data.topic}.") 217 218 else: 219 # Topic not existing 220 response = TransportMessage(timestamp=int(time.time()), payload=f"{data.topic} does not exist.") 221 222 await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid) 223 logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload) 224 225 @_check_data_none_decorator 226 @_check_topic_decorator 227 async def handle_publish(self, sid, data=None) -> None: 228 """Called when a client publishes a message to a topic. 229 The message will be published to the topic and all subscribers will receive the message. 230 231 :param sid: Generated session id 232 :param data: Data sent by the client 233 """ 234 data = TransportMessage.parse_raw(data) 235 topic = self._get_topic_by_name(data.topic) 236 237 # Check if data contains payload 238 if data.payload is None: 239 response = TransportMessage(timestamp=int(time.time()), payload="Missing parameter message.") 240 await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid) 241 return None 242 243 if topic is not None: 244 # Publish message to topic 245 topic.content = data.payload 246 topic.timestamp = data.timestamp 247 response = TransportMessage( 248 timestamp=int(time.time()), payload=f"Successfully published message to {data.topic}." 249 ) 250 await self.update_topic(topic) 251 else: 252 # Topic not existing 253 response = TransportMessage(timestamp=int(time.time()), payload=f"{data.topic} does not exist.") 254 255 await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid) 256 logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload) 257 258 async def handle_list_topics(self, sid, data=None) -> None: 259 """Called when a client requests a list of all topics. 260 The client will receive a list of all topics. 261 262 :param sid: Generated session id 263 :param data: Data sent by the client. Unused 264 """ 265 response_msg = "All topics on the server:" 266 for topic in self._list_of_topics: 267 response_msg += f"\n{topic.name}" 268 269 response = TransportMessage(timestamp=int(time.time()), payload=response_msg) 270 await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid) 271 logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload) 272 273 @_check_data_none_decorator 274 @_check_topic_decorator 275 async def handle_topic_status(self, sid, data=None) -> None: 276 """Called when a client requests the status of a topic. 277 The client will receive the status of the topic. 278 279 :param sid: Generated session id 280 :param data: Data sent by the client 281 """ 282 283 data = TransportMessage.parse_raw(data) 284 topic = self._get_topic_by_name(data.topic) 285 286 if topic is not None: 287 subscribers = "" 288 for subscriber in topic.subscribers: 289 subscribers += f"\t{self._sid_ip_mapping[subscriber]}\n\t" 290 291 if topic.content is None or topic.timestamp is None: 292 topic_status = ( 293 f"\ntopic name:\t{topic.name}\n\nsubscribers:{subscribers}\nThere was no publish on this topic yet." 294 ) 295 else: 296 topic_status = f"\ntopic name:\t{topic.name}\n\ntimestamp:\t{datetime.fromtimestamp(int(topic.timestamp)).strftime('%d-%m-%Y %H:%M:%S')}\n\ncontent:\t{topic.content}\n\nsubscribers:{subscribers}" 297 298 response = TransportMessage(timestamp=int(time.time()), payload=topic_status) 299 else: 300 # Topic not existing 301 response = TransportMessage(timestamp=int(time.time()), payload=f"{data.topic} does not exist.") 302 303 await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid) 304 logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload) 305 306 async def update_topic(self, topic: Topic) -> None: 307 """Called when a topic is updated. 308 The subscribers of the topic will receive the updated topic. 309 310 :param topic: The topic 311 """ 312 topic.last_update = int(time.time()) 313 response = TransportMessage( 314 timestamp=int(time.time()), 315 payload=f"{topic.name} ({datetime.fromtimestamp(int(topic.timestamp)).strftime('%d-%m-%Y %H:%M:%S')}): {topic.content}", 316 ) 317 for sub in topic.subscribers: 318 await self.sio.emit("PRINT_MESSAGE", response.json(), room=sub) 319 320 async def heart_beat(self, time_delta): 321 """Go through all topics and check if they were updated in the last time_delta seconds. 322 If not, update the topic. 323 324 :param time_delta: Time in seconds 325 """ 326 for topic in self._list_of_topics: 327 if topic.last_update is not None and int(time.time()) - topic.last_update > time_delta: 328 await self.update_topic(topic) 329 logging.info("Topic %s was updated through heart beat.", topic.name) 330 331 def _get_topic_by_name(self, name: str) -> Optional[Topic]: 332 """Get a topic by its name. 333 334 :param name: Name of the topic 335 :return: Topic object 336 """ 337 for topic in self._list_of_topics: 338 if topic.name == name: 339 return topic 340 return None 341 342 def _add_topic(self, topic: Topic) -> None: 343 """Add a topic to the list of topics. 344 345 :param topic: Topic object 346 """ 347 with self._lock: 348 self._list_of_topics.append(topic) 349 350 def _remove_topic(self, topic: Topic) -> None: 351 """Remove a topic from the list of topics. 352 353 :param topic: Topic object 354 """ 355 with self._lock: 356 logging.warning("Topic %s was removed.", topic.name) 357 self._list_of_topics.remove(topic) 358 359 360def get_app(): 361 """Create an ASGI application for the server. 362 363 :return: ASGI application 364 """ 365 server = Server() 366 application = web.Application(logger=None) 367 368 server.sio.attach(application) 369 370 timer = ParallelTimer(server) 371 timer.start() 372 373 return application 374 375 376if __name__ == "__main__": 377 parser = ArgumentParser(prog="server.py", description="Starts a server for publisher subscriber system") 378 parser.add_argument( 379 "-p", "--port", type=str, help="Port to run the server on. Default is 8080", default=8080, metavar="STRING" 380 ) 381 parser.add_argument( 382 "--host", type=str, help="Host to run the server on. Default is localhost", default="127.0.0.1", metavar="STRING" 383 ) 384 params = parser.parse_args() 385 386 # wrap with ASGI application 387 app = get_app() 388 web.run_app(app, host=params.host, port=params.port)
47class ParallelTimer(Thread): 48 """Class to manage a parallel timer in different thread on the server triggering a heart beat algorithm every 20 seconds.""" 49 50 def __init__(self, server) -> None: 51 """Constructor of ParallelTimer class. 52 53 :param server: server object 54 """ 55 super().__init__() 56 self.server = server 57 58 def run(self): 59 """ 60 Starting parallel timer in loop. 61 """ 62 while 1: 63 heartbeat = self.server.heart_beat(20) 64 asyncio.run(heartbeat)
Class to manage a parallel timer in different thread on the server triggering a heart beat algorithm every 20 seconds.
50 def __init__(self, server) -> None: 51 """Constructor of ParallelTimer class. 52 53 :param server: server object 54 """ 55 super().__init__() 56 self.server = server
Constructor of ParallelTimer class.
Parameters
- server: server object
58 def run(self): 59 """ 60 Starting parallel timer in loop. 61 """ 62 while 1: 63 heartbeat = self.server.heart_beat(20) 64 asyncio.run(heartbeat)
Starting parallel timer in loop.
Inherited Members
- threading.Thread
- start
- join
- name
- ident
- is_alive
- daemon
- isDaemon
- setDaemon
- getName
- setName
- native_id
67class Topic: 68 """Class to manage the Topics with needed data.""" 69 70 name: Union[None, str] = None 71 """name of the topic""" 72 content: Union[None, str] = None 73 """content of the topic""" 74 subscribers: List[str] = [] 75 """list of subscribers""" 76 timestamp: Union[None, int] = None 77 """timestamp""" 78 last_update: Union[None, int] = None 79 """last update of topic"""
Class to manage the Topics with needed data.
82class Server: 83 def __init__(self) -> None: 84 self._list_of_topics: List[Topic] = [] 85 self._sid_ip_mapping: Dict[str, str] = {} 86 self._lock = Lock() 87 88 self.sio = socketio.AsyncServer( 89 async_mode="aiohttp", cors_allowed_origins="*", logger=False, engineio_logger=False 90 ) 91 self.sio.event(self.connect) 92 self.sio.on("SUBSCRIBE_TOPIC", self.handle_subscribe) 93 self.sio.on("UNSUBSCRIBE_TOPIC", self.handle_unsubscribe) 94 self.sio.on("PUBLISH_TOPIC", self.handle_publish) 95 self.sio.on("LIST_TOPICS", self.handle_list_topics) 96 self.sio.on("GET_TOPIC_STATUS", self.handle_topic_status) 97 98 def _check_data_none_decorator(func): 99 """Decorator for checking if data is None. 100 If data is None, the client will receive an error message. 101 """ 102 103 @functools.wraps(func) 104 async def wrapper(self, *args, **kwargs): 105 sid = args[0] 106 data = args[1] if len(args) > 1 else None 107 if data is None: 108 response = TransportMessage( 109 timestamp=int(time.time()), payload="Missing payload of type TransportMessage." 110 ) 111 await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid) 112 logging.error("%s - %s", self._sid_ip_mapping[sid], response.payload) 113 return None 114 return await func(self, *args, **kwargs) 115 116 return wrapper 117 118 def _check_topic_decorator(func): 119 """Decorator for checking if topic exists. 120 If topic does not exist, the client will receive an error message. 121 """ 122 123 @functools.wraps(func) 124 async def wrapper(self, *args, **kwargs): 125 sid = args[0] 126 data = args[1] if len(args) > 1 else None 127 try: 128 parsed_data = TransportMessage.parse_raw(data) 129 except Exception: 130 response = TransportMessage(timestamp=int(time.time()), payload="Invalid payload.") 131 await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid) 132 logging.error("%s - %s", self._sid_ip_mapping[sid], response.payload) 133 return None 134 135 # Check if data contains topic 136 if parsed_data.topic is None: 137 response = TransportMessage(timestamp=int(time.time()), payload="Missing parameter topic.") 138 await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid) 139 logging.error("%s - %s", self._sid_ip_mapping[sid], response.payload) 140 return None 141 return await func(self, *args, **kwargs) 142 143 return wrapper 144 145 async def connect(self, sid, environ, auth=None): 146 """Called when a client connects to the server. 147 148 :param sid: Generated session id 149 :param environ: Environment variables 150 :param auth: Unused 151 """ 152 logging.info("%s - SID: %s connected", environ["aiohttp.request"].remote, sid) 153 self._sid_ip_mapping[sid] = environ["aiohttp.request"].remote 154 155 @_check_data_none_decorator 156 @_check_topic_decorator 157 async def handle_subscribe(self, sid, data=None) -> None: 158 """Called when a client subscribes to a topic. 159 If the topic does not exist, it will be created. If the client is already subscribed to the topic, nothing 160 changes. Otherwise the client will be subscribed to the topic and will receive updates. 161 162 :param sid: Generated session id 163 :param data: Data sent by the client 164 """ 165 data = TransportMessage.parse_raw(data) 166 topic = self._get_topic_by_name(data.topic) 167 if topic is not None: 168 # Check if sid already subscribed to topic 169 if sid in topic.subscribers: 170 response = TransportMessage(timestamp=int(time.time()), payload=f"Already subscribed to {data.topic}.") 171 else: 172 # Subscribe to topic 173 topic.subscribers.append(sid) 174 response = TransportMessage( 175 timestamp=int(time.time()), payload=f"Successfully subscribed to {data.topic}." 176 ) 177 else: 178 # Create new topic if not already existing and subscribe 179 new_topic = Topic() 180 new_topic.name = data.topic 181 new_topic.subscribers.append(sid) 182 self._add_topic(new_topic) 183 response = TransportMessage( 184 timestamp=int(time.time()), payload=f"Created {data.topic} and successfully subscribed." 185 ) 186 187 await self.sio.emit("PRINT_MESSAGE", response.json(), room=sid) 188 logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload) 189 190 @_check_data_none_decorator 191 @_check_topic_decorator 192 async def handle_unsubscribe(self, sid, data=None) -> None: 193 """Called when a client unsubscribes from a topic. 194 If the client is not subscribed to the topic or topic does not exist, the client will receive an error message. 195 Otherwise the client will be unsubscribed from the topic and will not receive any updates. 196 If the topic has no subscribers left it will be deleted. 197 198 :param sid: Generated session id 199 :param data: Data sent by the client 200 """ 201 202 data = TransportMessage.parse_raw(data) 203 topic = self._get_topic_by_name(data.topic) 204 205 if topic is not None: 206 # Check if sid subscribed to topic and unsubscribe 207 if sid in topic.subscribers: 208 topic.subscribers.remove(sid) 209 response = TransportMessage( 210 timestamp=int(time.time()), payload=f"Successfully unsubscribed from {data.topic}." 211 ) 212 # Delete topic if no subscribers left 213 if len(topic.subscribers) == 0: 214 self._remove_topic(topic) 215 else: 216 # Not subscribed 217 response = TransportMessage(timestamp=int(time.time()), payload=f"Not subscribed to {data.topic}.") 218 219 else: 220 # Topic not existing 221 response = TransportMessage(timestamp=int(time.time()), payload=f"{data.topic} does not exist.") 222 223 await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid) 224 logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload) 225 226 @_check_data_none_decorator 227 @_check_topic_decorator 228 async def handle_publish(self, sid, data=None) -> None: 229 """Called when a client publishes a message to a topic. 230 The message will be published to the topic and all subscribers will receive the message. 231 232 :param sid: Generated session id 233 :param data: Data sent by the client 234 """ 235 data = TransportMessage.parse_raw(data) 236 topic = self._get_topic_by_name(data.topic) 237 238 # Check if data contains payload 239 if data.payload is None: 240 response = TransportMessage(timestamp=int(time.time()), payload="Missing parameter message.") 241 await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid) 242 return None 243 244 if topic is not None: 245 # Publish message to topic 246 topic.content = data.payload 247 topic.timestamp = data.timestamp 248 response = TransportMessage( 249 timestamp=int(time.time()), payload=f"Successfully published message to {data.topic}." 250 ) 251 await self.update_topic(topic) 252 else: 253 # Topic not existing 254 response = TransportMessage(timestamp=int(time.time()), payload=f"{data.topic} does not exist.") 255 256 await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid) 257 logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload) 258 259 async def handle_list_topics(self, sid, data=None) -> None: 260 """Called when a client requests a list of all topics. 261 The client will receive a list of all topics. 262 263 :param sid: Generated session id 264 :param data: Data sent by the client. Unused 265 """ 266 response_msg = "All topics on the server:" 267 for topic in self._list_of_topics: 268 response_msg += f"\n{topic.name}" 269 270 response = TransportMessage(timestamp=int(time.time()), payload=response_msg) 271 await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid) 272 logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload) 273 274 @_check_data_none_decorator 275 @_check_topic_decorator 276 async def handle_topic_status(self, sid, data=None) -> None: 277 """Called when a client requests the status of a topic. 278 The client will receive the status of the topic. 279 280 :param sid: Generated session id 281 :param data: Data sent by the client 282 """ 283 284 data = TransportMessage.parse_raw(data) 285 topic = self._get_topic_by_name(data.topic) 286 287 if topic is not None: 288 subscribers = "" 289 for subscriber in topic.subscribers: 290 subscribers += f"\t{self._sid_ip_mapping[subscriber]}\n\t" 291 292 if topic.content is None or topic.timestamp is None: 293 topic_status = ( 294 f"\ntopic name:\t{topic.name}\n\nsubscribers:{subscribers}\nThere was no publish on this topic yet." 295 ) 296 else: 297 topic_status = f"\ntopic name:\t{topic.name}\n\ntimestamp:\t{datetime.fromtimestamp(int(topic.timestamp)).strftime('%d-%m-%Y %H:%M:%S')}\n\ncontent:\t{topic.content}\n\nsubscribers:{subscribers}" 298 299 response = TransportMessage(timestamp=int(time.time()), payload=topic_status) 300 else: 301 # Topic not existing 302 response = TransportMessage(timestamp=int(time.time()), payload=f"{data.topic} does not exist.") 303 304 await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid) 305 logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload) 306 307 async def update_topic(self, topic: Topic) -> None: 308 """Called when a topic is updated. 309 The subscribers of the topic will receive the updated topic. 310 311 :param topic: The topic 312 """ 313 topic.last_update = int(time.time()) 314 response = TransportMessage( 315 timestamp=int(time.time()), 316 payload=f"{topic.name} ({datetime.fromtimestamp(int(topic.timestamp)).strftime('%d-%m-%Y %H:%M:%S')}): {topic.content}", 317 ) 318 for sub in topic.subscribers: 319 await self.sio.emit("PRINT_MESSAGE", response.json(), room=sub) 320 321 async def heart_beat(self, time_delta): 322 """Go through all topics and check if they were updated in the last time_delta seconds. 323 If not, update the topic. 324 325 :param time_delta: Time in seconds 326 """ 327 for topic in self._list_of_topics: 328 if topic.last_update is not None and int(time.time()) - topic.last_update > time_delta: 329 await self.update_topic(topic) 330 logging.info("Topic %s was updated through heart beat.", topic.name) 331 332 def _get_topic_by_name(self, name: str) -> Optional[Topic]: 333 """Get a topic by its name. 334 335 :param name: Name of the topic 336 :return: Topic object 337 """ 338 for topic in self._list_of_topics: 339 if topic.name == name: 340 return topic 341 return None 342 343 def _add_topic(self, topic: Topic) -> None: 344 """Add a topic to the list of topics. 345 346 :param topic: Topic object 347 """ 348 with self._lock: 349 self._list_of_topics.append(topic) 350 351 def _remove_topic(self, topic: Topic) -> None: 352 """Remove a topic from the list of topics. 353 354 :param topic: Topic object 355 """ 356 with self._lock: 357 logging.warning("Topic %s was removed.", topic.name) 358 self._list_of_topics.remove(topic)
145 async def connect(self, sid, environ, auth=None): 146 """Called when a client connects to the server. 147 148 :param sid: Generated session id 149 :param environ: Environment variables 150 :param auth: Unused 151 """ 152 logging.info("%s - SID: %s connected", environ["aiohttp.request"].remote, sid) 153 self._sid_ip_mapping[sid] = environ["aiohttp.request"].remote
Called when a client connects to the server.
Parameters
- sid: Generated session id
- environ: Environment variables
- auth: Unused
155 @_check_data_none_decorator 156 @_check_topic_decorator 157 async def handle_subscribe(self, sid, data=None) -> None: 158 """Called when a client subscribes to a topic. 159 If the topic does not exist, it will be created. If the client is already subscribed to the topic, nothing 160 changes. Otherwise the client will be subscribed to the topic and will receive updates. 161 162 :param sid: Generated session id 163 :param data: Data sent by the client 164 """ 165 data = TransportMessage.parse_raw(data) 166 topic = self._get_topic_by_name(data.topic) 167 if topic is not None: 168 # Check if sid already subscribed to topic 169 if sid in topic.subscribers: 170 response = TransportMessage(timestamp=int(time.time()), payload=f"Already subscribed to {data.topic}.") 171 else: 172 # Subscribe to topic 173 topic.subscribers.append(sid) 174 response = TransportMessage( 175 timestamp=int(time.time()), payload=f"Successfully subscribed to {data.topic}." 176 ) 177 else: 178 # Create new topic if not already existing and subscribe 179 new_topic = Topic() 180 new_topic.name = data.topic 181 new_topic.subscribers.append(sid) 182 self._add_topic(new_topic) 183 response = TransportMessage( 184 timestamp=int(time.time()), payload=f"Created {data.topic} and successfully subscribed." 185 ) 186 187 await self.sio.emit("PRINT_MESSAGE", response.json(), room=sid) 188 logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload)
Called when a client subscribes to a topic. If the topic does not exist, it will be created. If the client is already subscribed to the topic, nothing changes. Otherwise the client will be subscribed to the topic and will receive updates.
Parameters
- sid: Generated session id
- data: Data sent by the client
190 @_check_data_none_decorator 191 @_check_topic_decorator 192 async def handle_unsubscribe(self, sid, data=None) -> None: 193 """Called when a client unsubscribes from a topic. 194 If the client is not subscribed to the topic or topic does not exist, the client will receive an error message. 195 Otherwise the client will be unsubscribed from the topic and will not receive any updates. 196 If the topic has no subscribers left it will be deleted. 197 198 :param sid: Generated session id 199 :param data: Data sent by the client 200 """ 201 202 data = TransportMessage.parse_raw(data) 203 topic = self._get_topic_by_name(data.topic) 204 205 if topic is not None: 206 # Check if sid subscribed to topic and unsubscribe 207 if sid in topic.subscribers: 208 topic.subscribers.remove(sid) 209 response = TransportMessage( 210 timestamp=int(time.time()), payload=f"Successfully unsubscribed from {data.topic}." 211 ) 212 # Delete topic if no subscribers left 213 if len(topic.subscribers) == 0: 214 self._remove_topic(topic) 215 else: 216 # Not subscribed 217 response = TransportMessage(timestamp=int(time.time()), payload=f"Not subscribed to {data.topic}.") 218 219 else: 220 # Topic not existing 221 response = TransportMessage(timestamp=int(time.time()), payload=f"{data.topic} does not exist.") 222 223 await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid) 224 logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload)
Called when a client unsubscribes from a topic. If the client is not subscribed to the topic or topic does not exist, the client will receive an error message. Otherwise the client will be unsubscribed from the topic and will not receive any updates. If the topic has no subscribers left it will be deleted.
Parameters
- sid: Generated session id
- data: Data sent by the client
226 @_check_data_none_decorator 227 @_check_topic_decorator 228 async def handle_publish(self, sid, data=None) -> None: 229 """Called when a client publishes a message to a topic. 230 The message will be published to the topic and all subscribers will receive the message. 231 232 :param sid: Generated session id 233 :param data: Data sent by the client 234 """ 235 data = TransportMessage.parse_raw(data) 236 topic = self._get_topic_by_name(data.topic) 237 238 # Check if data contains payload 239 if data.payload is None: 240 response = TransportMessage(timestamp=int(time.time()), payload="Missing parameter message.") 241 await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid) 242 return None 243 244 if topic is not None: 245 # Publish message to topic 246 topic.content = data.payload 247 topic.timestamp = data.timestamp 248 response = TransportMessage( 249 timestamp=int(time.time()), payload=f"Successfully published message to {data.topic}." 250 ) 251 await self.update_topic(topic) 252 else: 253 # Topic not existing 254 response = TransportMessage(timestamp=int(time.time()), payload=f"{data.topic} does not exist.") 255 256 await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid) 257 logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload)
Called when a client publishes a message to a topic. The message will be published to the topic and all subscribers will receive the message.
Parameters
- sid: Generated session id
- data: Data sent by the client
259 async def handle_list_topics(self, sid, data=None) -> None: 260 """Called when a client requests a list of all topics. 261 The client will receive a list of all topics. 262 263 :param sid: Generated session id 264 :param data: Data sent by the client. Unused 265 """ 266 response_msg = "All topics on the server:" 267 for topic in self._list_of_topics: 268 response_msg += f"\n{topic.name}" 269 270 response = TransportMessage(timestamp=int(time.time()), payload=response_msg) 271 await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid) 272 logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload)
Called when a client requests a list of all topics. The client will receive a list of all topics.
Parameters
- sid: Generated session id
- data: Data sent by the client. Unused
274 @_check_data_none_decorator 275 @_check_topic_decorator 276 async def handle_topic_status(self, sid, data=None) -> None: 277 """Called when a client requests the status of a topic. 278 The client will receive the status of the topic. 279 280 :param sid: Generated session id 281 :param data: Data sent by the client 282 """ 283 284 data = TransportMessage.parse_raw(data) 285 topic = self._get_topic_by_name(data.topic) 286 287 if topic is not None: 288 subscribers = "" 289 for subscriber in topic.subscribers: 290 subscribers += f"\t{self._sid_ip_mapping[subscriber]}\n\t" 291 292 if topic.content is None or topic.timestamp is None: 293 topic_status = ( 294 f"\ntopic name:\t{topic.name}\n\nsubscribers:{subscribers}\nThere was no publish on this topic yet." 295 ) 296 else: 297 topic_status = f"\ntopic name:\t{topic.name}\n\ntimestamp:\t{datetime.fromtimestamp(int(topic.timestamp)).strftime('%d-%m-%Y %H:%M:%S')}\n\ncontent:\t{topic.content}\n\nsubscribers:{subscribers}" 298 299 response = TransportMessage(timestamp=int(time.time()), payload=topic_status) 300 else: 301 # Topic not existing 302 response = TransportMessage(timestamp=int(time.time()), payload=f"{data.topic} does not exist.") 303 304 await self.sio.emit("PRINT_MESSAGE_AND_EXIT", response.json(), room=sid) 305 logging.info("%s - %s", self._sid_ip_mapping[sid], response.payload)
Called when a client requests the status of a topic. The client will receive the status of the topic.
Parameters
- sid: Generated session id
- data: Data sent by the client
307 async def update_topic(self, topic: Topic) -> None: 308 """Called when a topic is updated. 309 The subscribers of the topic will receive the updated topic. 310 311 :param topic: The topic 312 """ 313 topic.last_update = int(time.time()) 314 response = TransportMessage( 315 timestamp=int(time.time()), 316 payload=f"{topic.name} ({datetime.fromtimestamp(int(topic.timestamp)).strftime('%d-%m-%Y %H:%M:%S')}): {topic.content}", 317 ) 318 for sub in topic.subscribers: 319 await self.sio.emit("PRINT_MESSAGE", response.json(), room=sub)
Called when a topic is updated. The subscribers of the topic will receive the updated topic.
Parameters
- topic: The topic
321 async def heart_beat(self, time_delta): 322 """Go through all topics and check if they were updated in the last time_delta seconds. 323 If not, update the topic. 324 325 :param time_delta: Time in seconds 326 """ 327 for topic in self._list_of_topics: 328 if topic.last_update is not None and int(time.time()) - topic.last_update > time_delta: 329 await self.update_topic(topic) 330 logging.info("Topic %s was updated through heart beat.", topic.name)
Go through all topics and check if they were updated in the last time_delta seconds. If not, update the topic.
Parameters
- time_delta: Time in seconds
361def get_app(): 362 """Create an ASGI application for the server. 363 364 :return: ASGI application 365 """ 366 server = Server() 367 application = web.Application(logger=None) 368 369 server.sio.attach(application) 370 371 timer = ParallelTimer(server) 372 timer.start() 373 374 return application
Create an ASGI application for the server.
Returns
ASGI application