diff --git a/interactive_test_client.py b/interactive_test_client.py new file mode 100644 index 0000000..4f073eb --- /dev/null +++ b/interactive_test_client.py @@ -0,0 +1,75 @@ +import selectors +import socket +import types +import time +import sys +import select +import tty +import termios +def isData(): + return select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []) + + +HOST = '127.0.0.1' # The server's hostname or IP address +PORT = 80 # The port used by the server +sel = selectors.DefaultSelector() +i = 0 +message = "" +def service_connection(key, mask, message): + sock = key.fileobj + data = key.data + if mask & selectors.EVENT_READ: + recv_data = sock.recv(1024) # Should be ready to read + if recv_data: + print('received', repr(recv_data),flush=True) + + if mask & selectors.EVENT_WRITE: + if message: + data.outb += message + message = "" + print('sending', repr(data.outb)) + sent = sock.send(data.outb.encode('utf-8')) # Should be ready to write + data.outb = data.outb[sent:] + +def start_connection(host, port): + server_addr = (host, port) + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setblocking(False) + sock.connect_ex(server_addr) + events = selectors.EVENT_READ | selectors.EVENT_WRITE + data = types.SimpleNamespace(outb='') + sel.register(sock, events, data=data) + + +start_connection(HOST, PORT) + +old_settings = termios.tcgetattr(sys.stdin) + +try: + tty.setcbreak(sys.stdin.fileno()) + + temp_message = "" + while 1: + events = sel.select(timeout=None) + for key, mask in events: + if key.data is None: + pass + else: + service_connection(key, mask, message) + message = "" + + if isData(): + c = sys.stdin.read(1) + + if c == '\x1b': # x1b is ESC + break + temp_message = temp_message+c + if c == '\n': + message = temp_message + temp_message = "" +finally: + termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings) + + + + \ No newline at end of file diff --git a/server.py b/server.py index 6e8197f..c38e889 100644 --- a/server.py +++ b/server.py @@ -2,26 +2,83 @@ import socket import selectors import types -sel = selectors.DefaultSelector() -host = '127.0.0.1' -port = 80 -lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -lsock.bind((host, port)) -lsock.listen() -print('listening on', (host, port)) -lsock.setblocking(False) -sel.register(lsock, selectors.EVENT_READ, data=None) -clients = [] +def decodeMessage(message, addr): + global last_client_id + global rooms + global clients + global temporary_clients + # first get the client sending the message + client = temporary_clients[addr] + decodedMessage = message.split(":") + if len(decodedMessage) < 1: print("Invalid message received"); return + messageType = decodedMessage[0] + + if not client.logged_in and messageType == '0' and len(decodedMessage) == 3: + client.username = decodedMessage[1] + client.id = last_client_id + last_client_id = last_client_id+1 + #probaby check password too against a database of some sort where we store lots of good stuff + client.logged_in = True + #add to this list too + clients[client.id] = client + print("sending back login success") + client.outb += f"0:{client.id}:\n" + elif client.logged_in: + if messageType == '1': + + response = "1:" + ",".join([room.name+"-"+str(len(room.clients)) for room in rooms.values()]) + "\n" + client.outb += response + if messageType == '2' and len(decodedMessage) > 1: + #join or create a room + roomName = decodedMessage[1] + if roomName == '-1': + #leave the room + try: + rooms[client.room].clients.remove(client) + if(len(rooms[client.room].clients) == 0): + del rooms[client.room] + except Exception as e: + print("not in room") + client.room = '' + else: + if roomName in rooms: + #join the room + rooms[roomName].clients.append(client) + else: + #create the room and join + rooms[roomName] = types.SimpleNamespace(name=roomName,clients=[client]) + client.room = roomName #client joins the room + #send everyone in the room a message + for client in rooms[roomName].clients: + client.outb += f"2:{client.id}:1\n" + if messageType == '3' and len(decodedMessage) > 2: + subMessageType = decodedMessage[1] + if subMessageType == '0': + #send a message to everyone in the room + for c in rooms[client.room].clients: + c.outb += f"3:{client.id}:{decodedMessage[2]}\n" + + elif subMessageType == '1': + for c in rooms[client.room].clients: + if client.id != c.id: + c.outb += f"3:{client.id}:{decodedMessage[2]}\n" + pass + elif subMessageType == '2': + #send a message to the client ids indicated + + pass + def accept_wrapper(sock): conn, addr = sock.accept() # Should be ready to read print('accepted connection from', addr) conn.setblocking(False) - client = types.SimpleNamespace(addr=addr, inb='', outb='') + client = types.SimpleNamespace(id=-1, addr=addr, inb='', outb='', logged_in=False, username='',room='') #surrogate for class events = selectors.EVENT_READ | selectors.EVENT_WRITE sel.register(conn, events, data=client) - clients.append(client) + temporary_clients[addr] = client #add to the clients dictionary + def service_connection(key, mask): sock = key.fileobj @@ -36,14 +93,19 @@ def service_connection(key, mask): messages[0]= data.inb + messages[0] data.inb = "" for message in messages[:-1]: - #send to other clients - for client in clients: - if client.addr != data.addr: - client.outb += message + "\n" + decodeMessage(message, data.addr) data.inb += messages[-1] else: print('closing connection to', data.addr) + client = temporary_clients[data.addr] + temporary_clients.pop(data.addr) + if client.logged_in: + clients.pop(client.id) + if(client.room != ''): + rooms[client.room].clients.remove(client) + if(len(rooms[client.room].clients) == 0): + del rooms[client.room] sel.unregister(sock) sock.close() if mask & selectors.EVENT_WRITE: @@ -52,13 +114,34 @@ def service_connection(key, mask): sent = sock.send(data.outb.encode('utf-8')) # Should be ready to write data.outb = data.outb[sent:] -while True: - events = sel.select(timeout=None) - for key, mask in events: - if key.data is None: - accept_wrapper(key.fileobj) - else: - service_connection(key, mask) +sel = selectors.DefaultSelector() +host = '127.0.0.1' +port = 80 +temporary_clients = {} #organized by addr +clients = {} #clients is a dictionary organized by an increasing id number. For now, passwords are irrelevant +rooms = {} + +with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as lsock: + lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + lsock.bind((host, port)) + lsock.listen() + print('listening on', (host, port)) + lsock.setblocking(False) + sel.register(lsock, selectors.EVENT_READ, data=None) + + + + last_client_id = 0 + + while True: + events = sel.select(timeout=None) + for key, mask in events: + if key.data is None: + accept_wrapper(key.fileobj) + else: + service_connection(key, mask) + \ No newline at end of file diff --git a/threaded_server.py b/threaded_server.py new file mode 100644 index 0000000..7aaea99 --- /dev/null +++ b/threaded_server.py @@ -0,0 +1,211 @@ + +import socket +from _thread import * +import threading +import types +#each room gets a message queue. This is created upon joining a room. +#List of room message queues is a locked resource for all threads. +#List of clients is a locked resource for all threads. +#Each client has a message queue. +#client write thread draws from this message queue +#client read thread reads from client, and when it gets a complete message, adds it to the message queue +#client has a read thread, write thread, + +rooms = {} #will be a list of room objects. #this must be locked when when adding or removing rooms +rooms_lock = threading.Lock() +client_dict = {} #will be a list of client objects. #this must be locked when adding or removing clients +client_lock = threading.Lock() +HOST = "" +PORT = 80 + + +def send_synced_room_message(roomName, message, exclude_client=None): #guaranteed to be received by all clients in order, mostly use for handling ownership + rooms_lock.acquire() + if roomName in rooms: + room_lock = rooms[roomName].room_lock + clients = rooms[roomName].clients + else: + rooms_lock.release() + return + rooms_lock.release() + room_lock.acquire() + for c in clients: + if (exclude_client != None) and (c.id == exclude_client.id): continue + send_client_message(c,message) + room_lock.release() + +def send_room_message(roomName, message, exclude_client=None): #not guaranteed to be received by all clients in order + rooms_lock.acquire() + if roomName in rooms: + clients = rooms[roomName].clients + else: + rooms_lock.release() + return + rooms_lock.release() + for c in clients: + if (exclude_client != None) and (c.id == exclude_client.id): continue + send_client_message(c,message) + +def send_client_message(client, message): + client.message_lock.acquire() + client.message_queue.append(message) + client.message_lock.release() + client.message_ready.set() + +def decode_message(client,message): + global rooms + global rooms_lock + decodedMessage = message.split(":") + if len(decodedMessage) < 1: print("Invalid message received"); return + messageType = decodedMessage[0] + + if not client.logged_in and messageType == '0' and len(decodedMessage) == 3: + client.username = decodedMessage[1] + #probaby check password too against a database of some sort where we store lots of good stuff + client.logged_in = True + send_client_message(client,f"0:{client.id}:\n") + + elif client.logged_in: + if messageType == '1': + rooms_lock.acquire() + response = "1:" + ",".join([room.name+"-"+str(len(room.clients)) for room in rooms.values()]) + "\n" + rooms_lock.release() + send_client_message(client,response) + if messageType == '2' and len(decodedMessage) > 1: + + #join or create a room + + roomName = decodedMessage[1] + if client.room == roomName: #don't join the same room + pass + elif (roomName == '-1') and client.room != '': #can't leave a room if you aren't in one + #leave the room + rooms_lock.acquire() + try: + + rooms[client.room].clients.remove(client) + if(len(rooms[client.room].clients) == 0): + del rooms[client.room] + except Exception as e: + print("not in room") + rooms_lock.release() + send_room_message(client.room, f"2:{client.id}:\n") + send_client_message(client,f"2:{client.id}:\n") + client.room = '' + else: #join or create the room + rooms_lock.acquire() + if roomName in rooms: + #join the room + rooms[roomName].clients.append(client) + else: + #create the room and join + rooms[roomName] = types.SimpleNamespace(name=roomName,clients=[client],room_lock=threading.Lock()) + rooms_lock.release() + + if (client.room != '') and (client.room != roomName): #client left the previous room + send_room_message(client.room, f"2:{client.id}:{roomName}:\n") + + client.room = roomName #client joins the new room + #send a message to the clients new room that they joined! + send_room_message(roomName, f"2:{client.id}:{client.room}\n") + + + + + if messageType == '3' and len(decodedMessage) > 2: + subMessageType = decodedMessage[1] + if subMessageType == '0': + #send a message to everyone in the room (not synced) + send_room_message(client.room,f"3:{client.id}:{decodedMessage[2]}\n",client) + elif subMessageType == '1': + send_room_message(client.room,f"3:{client.id}:{decodedMessage[2]}\n") + elif subMessageType == '2': + #send a message to everyone in the room (not synced) + send_synced_room_message(client.room,f"3:{client.id}:{decodedMessage[2]}\n",client) + elif subMessageType == '3': + send_synced_room_message(client.room,f"3:{client.id}:{decodedMessage[2]}\n") + +def client_read_thread(conn, addr, client): + global rooms + global rooms_lock + global client_dict + global client_lock + while client.alive: + try: + recv_data = conn.recv(1024) + except Exception as e: + client.alive = False + client.message_ready.set() + continue + if not recv_data: + client.alive = False + client.message_ready.set() #in case it's waiting for a message + else: + m = recv_data.decode("utf-8") + messages = m.split("\n") + if len(messages) > 1: + messages[0]= client.inb + messages[0] + client.inb = "" + for message in messages[:-1]: + decode_message(client, message) + client.inb += messages[-1] + while not client.write_thread_dead: + client.message_ready.set() + pass + #now we can kill the client, removing the client from the rooms + client_lock.acquire() + rooms_lock.acquire() + if client.room != '': + rooms[client.room].clients.remove(client) + if(len(rooms[client.room].clients) == 0): + del rooms[client.room] + del client_dict[client.id] #remove the client from the list of clients... + rooms_lock.release() + client_lock.release() + print("sending room message") + send_room_message(client.room, f"2:{client.id}:\n") + print("client destroyed") +def client_write_thread(conn, addr, client): + while client.alive: + + client.message_lock.acquire() + for message in client.message_queue: + try: + conn.sendall(message.encode('utf-8')) + except: + break #if the client is dead the read thread will catch it + client.message_queue = [] + client.message_lock.release() + client.message_ready.wait() + client.message_ready.clear() + client.write_thread_dead = True +with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + sock.bind((HOST, PORT)) + sock.listen() + next_client_id = 0 + while True: + + c, addr = sock.accept() #blocks until a connection is made + client = types.SimpleNamespace(id=next_client_id, + alive=True, + message_queue=[], + message_lock=threading.Lock(), + inb='', + message_ready=threading.Event(), + logged_in=False, + username='', + room='', + write_thread_dead=False + ) + client_lock.acquire() + client_dict[next_client_id] = client + client_lock.release() + + next_client_id += 1 + + start_new_thread(client_read_thread, (c, addr, client)) + start_new_thread(client_write_thread, (c, addr, client)) + + diff --git a/threaded_test_client.py b/threaded_test_client.py new file mode 100644 index 0000000..a862422 --- /dev/null +++ b/threaded_test_client.py @@ -0,0 +1,41 @@ +import socket +import time +from _thread import * +import threading + +server_addr = ('127.0.0.1', 80) +sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +sock.connect_ex(server_addr) + + + + +def readThread(sock): + + while True: + data = sock.recv(1024) + print(data.decode('utf-8')); + +def writeThread(sock): + i = 0 + sock.sendall('0::\n'.encode('utf-8')) + sock.sendall('2:myroom\n'.encode('utf-8')) + + + while True: + sock.sendall(f'3:0:{"thasdl;fjasd;lfjasl;dfjal;skdjlask;dflasd;jkjfjkjfsfjfjakfjafjdfjakjflfjadjf;jfakdjfdjfakdjfsdj;ldjf;laskdflsdjfasdkjfkdjflskdjfskdjflkfjlkdjfskdjfkjfskdjf;kfjs;kfjadkfjas;ldfalsdkfsdkfjasdkjfasdkfjlkdjfkdjflkdjf;djfadkfjaldkfjalkfja;kfja;kfjadkfjadkfja;sdkfa;dkfj;dfkjaslkfjas;dkfs;dkfjsldfjasdfjaldfjaldkfj;lkj"}\n'.encode('utf-8')) + i = i+1 + time.sleep(0.1) + + + +start_new_thread(readThread,(sock,)) +start_new_thread(writeThread,(sock,)) + +while True: + time.sleep(1) + +sock.close() + + +