added threaded server and test client
parent
104fe51c6d
commit
c52eb82a65
|
|
@ -0,0 +1,75 @@
|
||||||
|
import selectors
|
||||||
|
import socket
|
||||||
|
import types
|
||||||
|
import time
|
||||||
|
import sys
|
||||||
|
import select
|
||||||
|
import tty
|
||||||
|
import termios
|
||||||
|
def isData():
|
||||||
|
return select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], [])
|
||||||
|
|
||||||
|
|
||||||
|
HOST = '127.0.0.1' # The server's hostname or IP address
|
||||||
|
PORT = 80 # The port used by the server
|
||||||
|
sel = selectors.DefaultSelector()
|
||||||
|
i = 0
|
||||||
|
message = ""
|
||||||
|
def service_connection(key, mask, message):
|
||||||
|
sock = key.fileobj
|
||||||
|
data = key.data
|
||||||
|
if mask & selectors.EVENT_READ:
|
||||||
|
recv_data = sock.recv(1024) # Should be ready to read
|
||||||
|
if recv_data:
|
||||||
|
print('received', repr(recv_data),flush=True)
|
||||||
|
|
||||||
|
if mask & selectors.EVENT_WRITE:
|
||||||
|
if message:
|
||||||
|
data.outb += message
|
||||||
|
message = ""
|
||||||
|
print('sending', repr(data.outb))
|
||||||
|
sent = sock.send(data.outb.encode('utf-8')) # Should be ready to write
|
||||||
|
data.outb = data.outb[sent:]
|
||||||
|
|
||||||
|
def start_connection(host, port):
|
||||||
|
server_addr = (host, port)
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
sock.setblocking(False)
|
||||||
|
sock.connect_ex(server_addr)
|
||||||
|
events = selectors.EVENT_READ | selectors.EVENT_WRITE
|
||||||
|
data = types.SimpleNamespace(outb='')
|
||||||
|
sel.register(sock, events, data=data)
|
||||||
|
|
||||||
|
|
||||||
|
start_connection(HOST, PORT)
|
||||||
|
|
||||||
|
old_settings = termios.tcgetattr(sys.stdin)
|
||||||
|
|
||||||
|
try:
|
||||||
|
tty.setcbreak(sys.stdin.fileno())
|
||||||
|
|
||||||
|
temp_message = ""
|
||||||
|
while 1:
|
||||||
|
events = sel.select(timeout=None)
|
||||||
|
for key, mask in events:
|
||||||
|
if key.data is None:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
service_connection(key, mask, message)
|
||||||
|
message = ""
|
||||||
|
|
||||||
|
if isData():
|
||||||
|
c = sys.stdin.read(1)
|
||||||
|
|
||||||
|
if c == '\x1b': # x1b is ESC
|
||||||
|
break
|
||||||
|
temp_message = temp_message+c
|
||||||
|
if c == '\n':
|
||||||
|
message = temp_message
|
||||||
|
temp_message = ""
|
||||||
|
finally:
|
||||||
|
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
115
server.py
115
server.py
|
|
@ -2,26 +2,83 @@
|
||||||
import socket
|
import socket
|
||||||
import selectors
|
import selectors
|
||||||
import types
|
import types
|
||||||
sel = selectors.DefaultSelector()
|
|
||||||
host = '127.0.0.1'
|
|
||||||
port = 80
|
|
||||||
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
lsock.bind((host, port))
|
|
||||||
lsock.listen()
|
|
||||||
print('listening on', (host, port))
|
|
||||||
lsock.setblocking(False)
|
|
||||||
sel.register(lsock, selectors.EVENT_READ, data=None)
|
|
||||||
|
|
||||||
clients = []
|
def decodeMessage(message, addr):
|
||||||
|
global last_client_id
|
||||||
|
global rooms
|
||||||
|
global clients
|
||||||
|
global temporary_clients
|
||||||
|
# first get the client sending the message
|
||||||
|
client = temporary_clients[addr]
|
||||||
|
decodedMessage = message.split(":")
|
||||||
|
if len(decodedMessage) < 1: print("Invalid message received"); return
|
||||||
|
messageType = decodedMessage[0]
|
||||||
|
|
||||||
|
if not client.logged_in and messageType == '0' and len(decodedMessage) == 3:
|
||||||
|
client.username = decodedMessage[1]
|
||||||
|
client.id = last_client_id
|
||||||
|
last_client_id = last_client_id+1
|
||||||
|
#probaby check password too against a database of some sort where we store lots of good stuff
|
||||||
|
client.logged_in = True
|
||||||
|
#add to this list too
|
||||||
|
clients[client.id] = client
|
||||||
|
print("sending back login success")
|
||||||
|
client.outb += f"0:{client.id}:\n"
|
||||||
|
elif client.logged_in:
|
||||||
|
if messageType == '1':
|
||||||
|
|
||||||
|
response = "1:" + ",".join([room.name+"-"+str(len(room.clients)) for room in rooms.values()]) + "\n"
|
||||||
|
client.outb += response
|
||||||
|
if messageType == '2' and len(decodedMessage) > 1:
|
||||||
|
#join or create a room
|
||||||
|
roomName = decodedMessage[1]
|
||||||
|
if roomName == '-1':
|
||||||
|
#leave the room
|
||||||
|
try:
|
||||||
|
rooms[client.room].clients.remove(client)
|
||||||
|
if(len(rooms[client.room].clients) == 0):
|
||||||
|
del rooms[client.room]
|
||||||
|
except Exception as e:
|
||||||
|
print("not in room")
|
||||||
|
client.room = ''
|
||||||
|
else:
|
||||||
|
if roomName in rooms:
|
||||||
|
#join the room
|
||||||
|
rooms[roomName].clients.append(client)
|
||||||
|
else:
|
||||||
|
#create the room and join
|
||||||
|
rooms[roomName] = types.SimpleNamespace(name=roomName,clients=[client])
|
||||||
|
client.room = roomName #client joins the room
|
||||||
|
#send everyone in the room a message
|
||||||
|
for client in rooms[roomName].clients:
|
||||||
|
client.outb += f"2:{client.id}:1\n"
|
||||||
|
if messageType == '3' and len(decodedMessage) > 2:
|
||||||
|
subMessageType = decodedMessage[1]
|
||||||
|
if subMessageType == '0':
|
||||||
|
#send a message to everyone in the room
|
||||||
|
for c in rooms[client.room].clients:
|
||||||
|
c.outb += f"3:{client.id}:{decodedMessage[2]}\n"
|
||||||
|
|
||||||
|
elif subMessageType == '1':
|
||||||
|
for c in rooms[client.room].clients:
|
||||||
|
if client.id != c.id:
|
||||||
|
c.outb += f"3:{client.id}:{decodedMessage[2]}\n"
|
||||||
|
pass
|
||||||
|
elif subMessageType == '2':
|
||||||
|
#send a message to the client ids indicated
|
||||||
|
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def accept_wrapper(sock):
|
def accept_wrapper(sock):
|
||||||
conn, addr = sock.accept() # Should be ready to read
|
conn, addr = sock.accept() # Should be ready to read
|
||||||
print('accepted connection from', addr)
|
print('accepted connection from', addr)
|
||||||
conn.setblocking(False)
|
conn.setblocking(False)
|
||||||
client = types.SimpleNamespace(addr=addr, inb='', outb='')
|
client = types.SimpleNamespace(id=-1, addr=addr, inb='', outb='', logged_in=False, username='',room='') #surrogate for class
|
||||||
events = selectors.EVENT_READ | selectors.EVENT_WRITE
|
events = selectors.EVENT_READ | selectors.EVENT_WRITE
|
||||||
sel.register(conn, events, data=client)
|
sel.register(conn, events, data=client)
|
||||||
clients.append(client)
|
temporary_clients[addr] = client #add to the clients dictionary
|
||||||
|
|
||||||
|
|
||||||
def service_connection(key, mask):
|
def service_connection(key, mask):
|
||||||
sock = key.fileobj
|
sock = key.fileobj
|
||||||
|
|
@ -36,14 +93,19 @@ def service_connection(key, mask):
|
||||||
messages[0]= data.inb + messages[0]
|
messages[0]= data.inb + messages[0]
|
||||||
data.inb = ""
|
data.inb = ""
|
||||||
for message in messages[:-1]:
|
for message in messages[:-1]:
|
||||||
#send to other clients
|
decodeMessage(message, data.addr)
|
||||||
for client in clients:
|
|
||||||
if client.addr != data.addr:
|
|
||||||
client.outb += message + "\n"
|
|
||||||
data.inb += messages[-1]
|
data.inb += messages[-1]
|
||||||
|
|
||||||
else:
|
else:
|
||||||
print('closing connection to', data.addr)
|
print('closing connection to', data.addr)
|
||||||
|
client = temporary_clients[data.addr]
|
||||||
|
temporary_clients.pop(data.addr)
|
||||||
|
if client.logged_in:
|
||||||
|
clients.pop(client.id)
|
||||||
|
if(client.room != ''):
|
||||||
|
rooms[client.room].clients.remove(client)
|
||||||
|
if(len(rooms[client.room].clients) == 0):
|
||||||
|
del rooms[client.room]
|
||||||
sel.unregister(sock)
|
sel.unregister(sock)
|
||||||
sock.close()
|
sock.close()
|
||||||
if mask & selectors.EVENT_WRITE:
|
if mask & selectors.EVENT_WRITE:
|
||||||
|
|
@ -52,6 +114,26 @@ def service_connection(key, mask):
|
||||||
sent = sock.send(data.outb.encode('utf-8')) # Should be ready to write
|
sent = sock.send(data.outb.encode('utf-8')) # Should be ready to write
|
||||||
data.outb = data.outb[sent:]
|
data.outb = data.outb[sent:]
|
||||||
|
|
||||||
|
sel = selectors.DefaultSelector()
|
||||||
|
host = '127.0.0.1'
|
||||||
|
port = 80
|
||||||
|
temporary_clients = {} #organized by addr
|
||||||
|
clients = {} #clients is a dictionary organized by an increasing id number. For now, passwords are irrelevant
|
||||||
|
rooms = {}
|
||||||
|
|
||||||
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as lsock:
|
||||||
|
lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
|
|
||||||
|
lsock.bind((host, port))
|
||||||
|
lsock.listen()
|
||||||
|
print('listening on', (host, port))
|
||||||
|
lsock.setblocking(False)
|
||||||
|
sel.register(lsock, selectors.EVENT_READ, data=None)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
last_client_id = 0
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
events = sel.select(timeout=None)
|
events = sel.select(timeout=None)
|
||||||
for key, mask in events:
|
for key, mask in events:
|
||||||
|
|
@ -62,3 +144,4 @@ while True:
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -0,0 +1,211 @@
|
||||||
|
|
||||||
|
import socket
|
||||||
|
from _thread import *
|
||||||
|
import threading
|
||||||
|
import types
|
||||||
|
#each room gets a message queue. This is created upon joining a room.
|
||||||
|
#List of room message queues is a locked resource for all threads.
|
||||||
|
#List of clients is a locked resource for all threads.
|
||||||
|
#Each client has a message queue.
|
||||||
|
#client write thread draws from this message queue
|
||||||
|
#client read thread reads from client, and when it gets a complete message, adds it to the message queue
|
||||||
|
#client has a read thread, write thread,
|
||||||
|
|
||||||
|
rooms = {} #will be a list of room objects. #this must be locked when when adding or removing rooms
|
||||||
|
rooms_lock = threading.Lock()
|
||||||
|
client_dict = {} #will be a list of client objects. #this must be locked when adding or removing clients
|
||||||
|
client_lock = threading.Lock()
|
||||||
|
HOST = ""
|
||||||
|
PORT = 80
|
||||||
|
|
||||||
|
|
||||||
|
def send_synced_room_message(roomName, message, exclude_client=None): #guaranteed to be received by all clients in order, mostly use for handling ownership
|
||||||
|
rooms_lock.acquire()
|
||||||
|
if roomName in rooms:
|
||||||
|
room_lock = rooms[roomName].room_lock
|
||||||
|
clients = rooms[roomName].clients
|
||||||
|
else:
|
||||||
|
rooms_lock.release()
|
||||||
|
return
|
||||||
|
rooms_lock.release()
|
||||||
|
room_lock.acquire()
|
||||||
|
for c in clients:
|
||||||
|
if (exclude_client != None) and (c.id == exclude_client.id): continue
|
||||||
|
send_client_message(c,message)
|
||||||
|
room_lock.release()
|
||||||
|
|
||||||
|
def send_room_message(roomName, message, exclude_client=None): #not guaranteed to be received by all clients in order
|
||||||
|
rooms_lock.acquire()
|
||||||
|
if roomName in rooms:
|
||||||
|
clients = rooms[roomName].clients
|
||||||
|
else:
|
||||||
|
rooms_lock.release()
|
||||||
|
return
|
||||||
|
rooms_lock.release()
|
||||||
|
for c in clients:
|
||||||
|
if (exclude_client != None) and (c.id == exclude_client.id): continue
|
||||||
|
send_client_message(c,message)
|
||||||
|
|
||||||
|
def send_client_message(client, message):
|
||||||
|
client.message_lock.acquire()
|
||||||
|
client.message_queue.append(message)
|
||||||
|
client.message_lock.release()
|
||||||
|
client.message_ready.set()
|
||||||
|
|
||||||
|
def decode_message(client,message):
|
||||||
|
global rooms
|
||||||
|
global rooms_lock
|
||||||
|
decodedMessage = message.split(":")
|
||||||
|
if len(decodedMessage) < 1: print("Invalid message received"); return
|
||||||
|
messageType = decodedMessage[0]
|
||||||
|
|
||||||
|
if not client.logged_in and messageType == '0' and len(decodedMessage) == 3:
|
||||||
|
client.username = decodedMessage[1]
|
||||||
|
#probaby check password too against a database of some sort where we store lots of good stuff
|
||||||
|
client.logged_in = True
|
||||||
|
send_client_message(client,f"0:{client.id}:\n")
|
||||||
|
|
||||||
|
elif client.logged_in:
|
||||||
|
if messageType == '1':
|
||||||
|
rooms_lock.acquire()
|
||||||
|
response = "1:" + ",".join([room.name+"-"+str(len(room.clients)) for room in rooms.values()]) + "\n"
|
||||||
|
rooms_lock.release()
|
||||||
|
send_client_message(client,response)
|
||||||
|
if messageType == '2' and len(decodedMessage) > 1:
|
||||||
|
|
||||||
|
#join or create a room
|
||||||
|
|
||||||
|
roomName = decodedMessage[1]
|
||||||
|
if client.room == roomName: #don't join the same room
|
||||||
|
pass
|
||||||
|
elif (roomName == '-1') and client.room != '': #can't leave a room if you aren't in one
|
||||||
|
#leave the room
|
||||||
|
rooms_lock.acquire()
|
||||||
|
try:
|
||||||
|
|
||||||
|
rooms[client.room].clients.remove(client)
|
||||||
|
if(len(rooms[client.room].clients) == 0):
|
||||||
|
del rooms[client.room]
|
||||||
|
except Exception as e:
|
||||||
|
print("not in room")
|
||||||
|
rooms_lock.release()
|
||||||
|
send_room_message(client.room, f"2:{client.id}:\n")
|
||||||
|
send_client_message(client,f"2:{client.id}:\n")
|
||||||
|
client.room = ''
|
||||||
|
else: #join or create the room
|
||||||
|
rooms_lock.acquire()
|
||||||
|
if roomName in rooms:
|
||||||
|
#join the room
|
||||||
|
rooms[roomName].clients.append(client)
|
||||||
|
else:
|
||||||
|
#create the room and join
|
||||||
|
rooms[roomName] = types.SimpleNamespace(name=roomName,clients=[client],room_lock=threading.Lock())
|
||||||
|
rooms_lock.release()
|
||||||
|
|
||||||
|
if (client.room != '') and (client.room != roomName): #client left the previous room
|
||||||
|
send_room_message(client.room, f"2:{client.id}:{roomName}:\n")
|
||||||
|
|
||||||
|
client.room = roomName #client joins the new room
|
||||||
|
#send a message to the clients new room that they joined!
|
||||||
|
send_room_message(roomName, f"2:{client.id}:{client.room}\n")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if messageType == '3' and len(decodedMessage) > 2:
|
||||||
|
subMessageType = decodedMessage[1]
|
||||||
|
if subMessageType == '0':
|
||||||
|
#send a message to everyone in the room (not synced)
|
||||||
|
send_room_message(client.room,f"3:{client.id}:{decodedMessage[2]}\n",client)
|
||||||
|
elif subMessageType == '1':
|
||||||
|
send_room_message(client.room,f"3:{client.id}:{decodedMessage[2]}\n")
|
||||||
|
elif subMessageType == '2':
|
||||||
|
#send a message to everyone in the room (not synced)
|
||||||
|
send_synced_room_message(client.room,f"3:{client.id}:{decodedMessage[2]}\n",client)
|
||||||
|
elif subMessageType == '3':
|
||||||
|
send_synced_room_message(client.room,f"3:{client.id}:{decodedMessage[2]}\n")
|
||||||
|
|
||||||
|
def client_read_thread(conn, addr, client):
|
||||||
|
global rooms
|
||||||
|
global rooms_lock
|
||||||
|
global client_dict
|
||||||
|
global client_lock
|
||||||
|
while client.alive:
|
||||||
|
try:
|
||||||
|
recv_data = conn.recv(1024)
|
||||||
|
except Exception as e:
|
||||||
|
client.alive = False
|
||||||
|
client.message_ready.set()
|
||||||
|
continue
|
||||||
|
if not recv_data:
|
||||||
|
client.alive = False
|
||||||
|
client.message_ready.set() #in case it's waiting for a message
|
||||||
|
else:
|
||||||
|
m = recv_data.decode("utf-8")
|
||||||
|
messages = m.split("\n")
|
||||||
|
if len(messages) > 1:
|
||||||
|
messages[0]= client.inb + messages[0]
|
||||||
|
client.inb = ""
|
||||||
|
for message in messages[:-1]:
|
||||||
|
decode_message(client, message)
|
||||||
|
client.inb += messages[-1]
|
||||||
|
while not client.write_thread_dead:
|
||||||
|
client.message_ready.set()
|
||||||
|
pass
|
||||||
|
#now we can kill the client, removing the client from the rooms
|
||||||
|
client_lock.acquire()
|
||||||
|
rooms_lock.acquire()
|
||||||
|
if client.room != '':
|
||||||
|
rooms[client.room].clients.remove(client)
|
||||||
|
if(len(rooms[client.room].clients) == 0):
|
||||||
|
del rooms[client.room]
|
||||||
|
del client_dict[client.id] #remove the client from the list of clients...
|
||||||
|
rooms_lock.release()
|
||||||
|
client_lock.release()
|
||||||
|
print("sending room message")
|
||||||
|
send_room_message(client.room, f"2:{client.id}:\n")
|
||||||
|
print("client destroyed")
|
||||||
|
def client_write_thread(conn, addr, client):
|
||||||
|
while client.alive:
|
||||||
|
|
||||||
|
client.message_lock.acquire()
|
||||||
|
for message in client.message_queue:
|
||||||
|
try:
|
||||||
|
conn.sendall(message.encode('utf-8'))
|
||||||
|
except:
|
||||||
|
break #if the client is dead the read thread will catch it
|
||||||
|
client.message_queue = []
|
||||||
|
client.message_lock.release()
|
||||||
|
client.message_ready.wait()
|
||||||
|
client.message_ready.clear()
|
||||||
|
client.write_thread_dead = True
|
||||||
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
||||||
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
|
|
||||||
|
sock.bind((HOST, PORT))
|
||||||
|
sock.listen()
|
||||||
|
next_client_id = 0
|
||||||
|
while True:
|
||||||
|
|
||||||
|
c, addr = sock.accept() #blocks until a connection is made
|
||||||
|
client = types.SimpleNamespace(id=next_client_id,
|
||||||
|
alive=True,
|
||||||
|
message_queue=[],
|
||||||
|
message_lock=threading.Lock(),
|
||||||
|
inb='',
|
||||||
|
message_ready=threading.Event(),
|
||||||
|
logged_in=False,
|
||||||
|
username='',
|
||||||
|
room='',
|
||||||
|
write_thread_dead=False
|
||||||
|
)
|
||||||
|
client_lock.acquire()
|
||||||
|
client_dict[next_client_id] = client
|
||||||
|
client_lock.release()
|
||||||
|
|
||||||
|
next_client_id += 1
|
||||||
|
|
||||||
|
start_new_thread(client_read_thread, (c, addr, client))
|
||||||
|
start_new_thread(client_write_thread, (c, addr, client))
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -0,0 +1,41 @@
|
||||||
|
import socket
|
||||||
|
import time
|
||||||
|
from _thread import *
|
||||||
|
import threading
|
||||||
|
|
||||||
|
server_addr = ('127.0.0.1', 80)
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
sock.connect_ex(server_addr)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def readThread(sock):
|
||||||
|
|
||||||
|
while True:
|
||||||
|
data = sock.recv(1024)
|
||||||
|
print(data.decode('utf-8'));
|
||||||
|
|
||||||
|
def writeThread(sock):
|
||||||
|
i = 0
|
||||||
|
sock.sendall('0::\n'.encode('utf-8'))
|
||||||
|
sock.sendall('2:myroom\n'.encode('utf-8'))
|
||||||
|
|
||||||
|
|
||||||
|
while True:
|
||||||
|
sock.sendall(f'3:0:{"thasdl;fjasd;lfjasl;dfjal;skdjlask;dflasd;jkjfjkjfsfjfjakfjafjdfjakjflfjadjf;jfakdjfdjfakdjfsdj;ldjf;laskdflsdjfasdkjfkdjflskdjfskdjflkfjlkdjfskdjfkjfskdjf;kfjs;kfjadkfjas;ldfalsdkfsdkfjasdkjfasdkfjlkdjfkdjflkdjf;djfadkfjaldkfjalkfja;kfja;kfjadkfjadkfja;sdkfa;dkfj;dfkjaslkfjas;dkfs;dkfjsldfjasdfjaldfjaldkfj;lkj"}\n'.encode('utf-8'))
|
||||||
|
i = i+1
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
start_new_thread(readThread,(sock,))
|
||||||
|
start_new_thread(writeThread,(sock,))
|
||||||
|
|
||||||
|
while True:
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
sock.close()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
Loading…
Reference in New Issue