手记

[Python] 多用户,多房间全双工聊天室

需求描述


创建一个多用户,多房间的全双工聊天室。

多用户,多房间的意思是可以有多个聊天室,每个聊天室里可以有多个用户,并且用户可以通过输入房间号进入聊天室。
全双工的意思是聊天室中的用户在接收其他用户的信息的同时,也能发送信息给其他用户。而不用等待一个用户发送完信息,等其他用户接收到之后,才能允许下个用户再次发送信息。

Python I/O多路复用


全双工功能的实现,可以通过多线程,I/O多路复用等方式,我在这边采用了I/O多路复用方案。
Python的select模块提供三种I/O多路复用的具体实现——select,poll,epoll,我在这里选用select.select(下面用select代替)。

select会监听socket或者文件描述符的I/O状态变化,并返回变化的socket或者文件描述符对象

select(rlist, wlist, xlist[, timeout]) -> (rlist, wlist, xlist)

这是Python select方法的原型,接收4个参数
rlist:list类型,监听其中的socket或者文件描述符是否变为可读状态,返回那些可读的socket或者文件描述符组成的list
wlist:list类型,监听其中的socket或者文件描述符是否变为可写状态,返回那些可写的socket或者文件描述符组成的list
xlist:list类型,监听其中的socket或者文件描述符是否出错,返回那些出错的socket或者文件描述符组成的list
timeout:设置select的超时时间,设置为None代表永远不会超时,即阻塞。

注意:Python的select方法在Windows和Linux环境下的表现是不一样的,Windows下它只支持socket对象,不支持文件描述符(file descriptions),而Linux两者都支持。

Linux下,可以通过sys.stdin标准输入流获取用户的输入,而sys.stdin就是一个文件描述符。
所以可以用下面的代码来获取用户输入

rlist, wlist, xlist = select.select( [sys.stdin], [], [] )print rlist[0].readline()

由于只监听了sys.stdin,当用户输入之后,只会返回sys.stdin对象,可以通过readline方法来获取用户输入的内容。

聊天室服务端


服务端要完成三件事:

  1. 接收多个客户端的连接

  2. 管理用户的聊天室分组

  3. 将一个客户端输入的消息广播到他所在聊天室的所有其他客户端

第一件事,定义一个list类型变量_current_in_list来表示监听多个socket连接的可读事件,利用上面说的select来处理I/O多路复用,代码如下:

rlist, wlist, xlist = select.select(_current_in_list, [], [])

当select返回时,说明rlist上有可读的socket了,这里又有两种情况:
1.如果返回的是service socket(服务器创建的socket,用来监听客户端是否连接的),表示有新的客户端连接了,调用socket.accept()方法获取新的客户端socket对象和地址(ip和port组成的元组),将新的客户端socket加入到_current_in_list。
2.如果返回的是其他socket(客户端socket),表示有客户端发送数据到服务端了,调用socket.recv()方法获取数据。

为了实现用户分组,我规定每个客户端在连接服务器之前都要先输入聊天室的房间号,并且每次发送到服务器的数据都要带上房间号,最后定义了一个dict类型的变量_room用来存储用户和房间的对应关系,客户端传递的房间号就是_room的key,而它的value则是一个客户端socket的列表。数据格式如下:

<RID:111>Welcome to Chat Room</RID:111>

对于接收到的数据,首先通过正则表达式检查是否符合规定,然后提取房间号和用户发送的消息。
判断是否新加入到聊天室的用户,如果是则发送广播通知聊天室的其他用户有新人加入,否则发送用户消息给聊天室的其他用户。关键代码如下

rgx_message = CONFORM_MSG.match(raw_message)if rgx_message:
    room_id = rgx_message.group(1)
    message = rgx_message.group(2)    if sock not in _room.setdefault(room_id, []):
        _room[room_id].append(sock)
        broadcast_message(room_id, sock, '\n[%s:%s] entered room.\n'\
                                                 % sock.getpeername())    else:
        broadcast_message(room_id, sock, \                             "\n<" +str(sock.getpeername()) + ">" + message)

根据房间号将消息广播给聊天室中除发送用户之外的所有其他用户

def broadcast_message(room_id, sock, message):
    for member in _room[room_id]:        if member is not sock:            try:
                member.send(message)            except socket.error:
                member.close()
                 _current_in_list .remove(member)
                _room[room_id].remove(member)

如果发送报错,可能socket已经被关闭,所以将它从_current_in_list_room中删除,因为socket已经被关闭了,但还保留在_current_in_list中,select会报错。

完整的聊天室服务端代码如下:

import socketimport selectimport re

HOST = "localhost"PORT = 9898ADDR = (HOST, PORT)
BUFSIZE = 1024CONFORM_MSG = re.compile(r'^<RID:(\d+)>([\s\S]*?)</RID:\1>')


_service_socket = socket.socket()
_service_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
_service_socket.bind(ADDR)
_service_socket.listen(10)

_current_in_list = [_service_socket]
_room = dict()def broadcast_message(room_id, sock, message):
    for member in _room[room_id]:        if member is not sock:            try:
                member.send(message)            except socket.error:
                member.close()
                 _current_in_list .remove(member)
                _room[room_id].remove(member)def main():
    while True:
        rlist, wlist, xlist = select.select(_current_in_list, [], [])        for sock in rlist:            if sock is _service_socket:
                client, addr = sock.accept()
                _current_in_list.append(client)                print "Client (%s:%s) connected." % addr            else:                try:
                    raw_message = sock.recv(BUFSIZE)                    if raw_message:
                        rgx_message = CONFORM_MSG.match(raw_message)                        if rgx_message:
                            room_id = rgx_message.group(1)
                            message = rgx_message.group(2)                            if sock not in _room.setdefault(room_id, []):
                                _room[room_id].append(sock)
                                broadcast_message(room_id, sock, '\n[%s:%s] entered room.\n'\
                                                                         % sock.getpeername())                            else:
                                broadcast_message(room_id, sock, \                                                "\n<" +str(sock.getpeername()) + ">" + message)                        else:                            print "Invalid format message,", raw_message                except socket.error:                    print "Client (%s, %s) is offline" % sock.getpeername()
                    sock.close()
                    _current_in_list .remove(member)                    for room_id, socks in _room.iteritems():                        for _ in socks:                            if _ is sock:
                                _room[room_id].remove(_)                                break
                        else:                            continue
                        breakif __name__ == '__main__':
    main()

聊天室客户端


客户端也要实现三个功能:

  1. 确定房间号

  2. 根据规定的协议规则组合房间号和消息并发送给服务器

  3. 接收服务器广播的消息

客户端相对服务端的代码逻辑来的简单,房间号直接用raw_input来让用户输入获取。

用到了select I/O多路复用来实现全双工,_current_in_list中加入sys.stdin和socket,一旦用户输入或者socket接到服务器广播的消息,就返回rlist。
遍历rlist,如果是socket就通过socket.recv()接收广播消息,如果是sys.stdin则通过sys.stdin.readline()从标准输入流中获取用户输入的消息。

完整的客户单代码:

import socketimport selectimport sys

HOST = "localhost"PORT = 9898ADDR = (HOST, PORT)
BUFSIZE = 1024_current_in_list = [sys.stdin]def prompt():
    sys.stdout.write('<You> ')
    sys.stdout.flush()def gen_message(room_id, raw_message):
    return '<RID:{}>{}</RID:{}>'.format(room_id, raw_message, room_id)def main():
    room_id = raw_input('<Room ID> ')

    client_socket = socket.socket()
    client_socket.settimeout(2)    try:
        client_socket.connect(ADDR)
        _current_in_list.append(client_socket)        # notify all room's user that new client is entered
        client_socket.send(gen_message(room_id, ''))    except socket.error:        print "Unable to connect"
        sys.exit()    print 'Connected to remote host. Start sending messages'
    prompt()    while True:
        rlist, wlist, xlist = select.select(_current_in_list, [], [])        for sock in rlist:            if sock is client_socket:
                message = sock.recv(BUFSIZE)                if not message:                    print '\nDisconnected from chat server.'
                    sys.exit()                else:
                    sys.stdout.write(message)
                    prompt()            else:
                raw_message = sys.stdin.readline()
                client_socket.send(gen_message(room_id, raw_message))
                prompt()if __name__ == '__main__':
    main()



作者:四明羽客
链接:https://www.jianshu.com/p/0bd2efdadaf4


0人推荐
随时随地看视频
慕课网APP