凌峰创科服务平台

服务器端多线程socket如何高效实现?

我们将使用 Python 来实现,因为它语法简洁,能让我们更专注于理解多线程 Socket 的核心逻辑,而不是被复杂的底层细节所困扰。

服务器端多线程socket如何高效实现?-图1
(图片来源网络,侵删)

核心思想

传统的单线程 Socket 服务器一次只能处理一个客户端的请求,当它正在与客户端 A 通信时,如果客户端 B 发来请求,客户端 B 必须等待,直到服务器处理完客户端 A 的请求,这显然效率很低。

多线程 Socket 服务器的核心思想是:

  1. 主线程:负责监听端口,等待新的客户端连接,一旦有新的客户端连接,它就立即创建一个新的子线程来专门处理这个客户端的后续所有通信。
  2. 子线程:负责与对应的客户端进行一对一的通信(接收数据、处理数据、发送数据),这样,主线程就可以立即返回去监听其他客户端的连接请求,不会被阻塞。
  3. 线程池:为了避免为每个新连接都创建和销毁一个线程所带来的巨大性能开销(创建和销毁线程是昂贵的操作),更高级的实现会使用线程池,线程池中有一组预先创建好的线程,当有新连接时,从池中取出一个空闲线程来处理,处理完毕后线程并不销毁,而是返回线程池等待下一个任务。

我们将从最基础的“为每个连接创建一个线程”的模型开始,然后介绍更优化的线程池模型。


场景设定

  • 服务器:接收客户端发送的字符串,将其转换为大写,然后返回给客户端。
  • 客户端:连接服务器,发送一行文本,接收服务器返回的大写文本,然后断开连接。

第一部分:基础实现 - 为每个连接创建一个线程

这个模型简单直观,适合初学者理解。

服务器端多线程socket如何高效实现?-图2
(图片来源网络,侵删)

服务器端代码 (server_threaded.py)

import socket
import threading
# 定义服务器地址和端口
HOST = '127.0.0.1'  # 本地回环地址,代表本机
PORT = 65432        # 监听的端口号 (大于1023)
# 这个函数将在每个子线程中运行,用于处理单个客户端的通信
def handle_client(conn, addr):
    """处理客户端连接的函数"""
    print(f"[新连接] {addr} 已连接。")
    try:
        # 循环接收客户端发送的数据
        with conn:
            while True:
                # recv(1024) 表示每次最多接收1024字节的数据
                # 如果客户端正常关闭连接,recv() 会返回空字节 b''
                data = conn.recv(1024)
                if not data:
                    # 如果没有收到数据,说明客户端已断开连接
                    print(f"[客户端断开] {addr} 断开了连接。")
                    break
                # 将接收到的字节数据解码为字符串
                message = data.decode('utf-8')
                print(f"[来自 {addr}] 收到消息: '{message}'")
                # 处理数据:转换为大写
                response_message = message.upper()
                # 将处理后的字符串编码为字节流并发送回客户端
                conn.sendall(response_message.encode('utf-8'))
                print(f"[发送至 {addr}] 已发送: '{response_message}'")
    except ConnectionResetError:
        print(f"[客户端异常] {addr} 异常断开连接。")
def start_server():
    """启动服务器的主函数"""
    # 创建一个 TCP socket (SOCK_STREAM)
    # 使用 with 语句可以确保 socket 在服务器关闭时被正确关闭
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # 设置 SO_REUSEADDR 选项,允许端口在服务器关闭后立即被重用
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # 将 socket 绑定到指定的地址和端口
        s.bind((HOST, PORT))
        # 开始监听 incoming connections, backlog 参数为 5
        # 表示等待队列中最多可以有 5 个待处理的连接
        s.listen()
        print(f"服务器正在监听 {HOST}:{PORT}...")
        # 进入一个无限循环,持续接受新的客户端连接
        while True:
            # accept() 会阻塞,直到有新的客户端连接
            # conn 是一个新的 socket 对象,用于与这个特定客户端通信
            # addr 是客户端的地址 (ip, port)
            conn, addr = s.accept()
            # 创建一个新的线程来处理这个客户端连接
            # target 指定线程要执行的函数
            # args 指定传递给 target 函数的参数,必须是元组
            client_thread = threading.Thread(target=handle_client, args=(conn, addr))
            # 将线程设置为守护线程
            # 当主线程结束时,所有守护线程都会被强制终止
            # 这样可以确保服务器程序能够正常退出
            client_thread.daemon = True
            # 启动线程,handle_client 函数会在新线程中开始执行
            client_thread.start()
            # 打印当前活跃的线程数
            print(f"[活动连接数] {threading.active_count() - 1}")
if __name__ == '__main__':
    start_server()

客户端代码 (client.py)

为了方便测试,我们也写一个简单的客户端。

import socket
HOST = '127.0.0.1'  # The server's hostname or IP address
PORT = 65432        # The port used by the server
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect((HOST, PORT))
    # 发送一条消息
    message_to_send = "hello from client"
    s.sendall(message_to_send.encode('utf-8'))
    print(f"[客户端] 已发送: '{message_to_send}'")
    # 接收服务器返回的响应
    data = s.recv(1024)
print(f"[客户端] 收到服务器返回: {data.decode('utf-8')}")

如何运行

  1. 启动服务器: 在一个终端中运行:

    python server_threaded.py

    你会看到输出:

    服务器正在监听 127.0.0.1:65432...
  2. 启动一个或多个客户端: 在另一个或另外多个新的终端中分别运行:

    服务器端多线程socket如何高效实现?-图3
    (图片来源网络,侵删)
    python client.py

    每次运行客户端,你都会看到服务器终端的输出变化。

服务器端输出示例:

服务器正在监听 127.0.0.1:65432...
[新连接] ('127.0.0.1', 54321) 已连接。
[活动连接数] 1
[来自 ('127.0.0.1', 54321)] 收到消息: 'hello from client'
[发送至 ('127.0.0.1', 54321)] 已发送: 'HELLO FROM CLIENT'
[客户端断开] ('127.0.0.1', 54321) 断开了连接。

客户端输出示例:

[客户端] 已发送: 'hello from client'
[客户端] 收到服务器返回: HELLO FROM CLIENT

你可以同时打开多个客户端终端,服务器会为每个客户端创建一个独立的线程,互不干扰地处理它们的请求。


第二部分:优化实现 - 使用线程池

上面的模型虽然简单,但在高并发场景下,频繁创建和销毁线程会成为性能瓶颈,使用线程池是更好的解决方案,我们将 Python 内置的 concurrent.futures.ThreadPoolExecutor 来实现。

ThreadPoolExecutor 会管理一个线程池,我们只需将任务(处理客户端连接)提交给它即可。

服务器端代码 (server_threadpool.py)

import socket
import threading
from concurrent.futures import ThreadPoolExecutor
# 定义服务器地址和端口
HOST = '127.0.0.1'
PORT = 65432
# 处理客户端连接的函数,与之前相同
def handle_client(conn, addr):
    """处理客户端连接的函数"""
    print(f"[新连接] {addr} 已连接。")
    try:
        with conn:
            while True:
                data = conn.recv(1024)
                if not data:
                    print(f"[客户端断开] {addr} 断开了连接。")
                    break
                message = data.decode('utf-8')
                print(f"[来自 {addr}] 收到消息: '{message}'")
                response_message = message.upper()
                conn.sendall(response_message.encode('utf-8'))
                print(f"[发送至 {addr}] 已发送: '{response_message}'")
    except ConnectionResetError:
        print(f"[客户端异常] {addr} 异常断开连接。")
def start_server():
    """启动服务器的主函数,使用线程池"""
    # max_workers=5 表示线程池中最多有5个线程
    # 你可以根据需要调整这个数值
    with ThreadPoolExecutor(max_workers=5) as executor:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind((HOST, PORT))
            s.listen()
            print(f"服务器 (线程池模型) 正在监听 {HOST}:{PORT}...")
            while True:
                conn, addr = s.accept()
                # 使用 submit 方法将任务(handle_client函数)提交给线程池
                # executor 会自动从池中取出一个空闲线程来执行这个任务
                # 如果没有空闲线程,任务会进入队列等待
                executor.submit(handle_client, conn, addr)
                print(f"[新任务提交] {addr} 的任务已提交到线程池。")
if __name__ == '__main__':
    start_server()

这个版本的代码改动很小,但性能和稳定性得到了很大提升。ThreadPoolExecutor 帮我们处理了线程的创建、销毁和复用,我们只需要专注于业务逻辑。


关键点总结与对比

特性 为每个连接创建线程 线程池
核心思想 来一个连接,创建一个新线程。 预先创建一组线程,复用它们来处理连接。
优点 实现简单,逻辑直观。
线程隔离性好,一个线程崩溃不会直接影响其他线程(除非共享资源未处理好)。
性能高:避免了频繁创建/销毁线程的开销。
资源可控:可以限制最大线程数,防止因过多连接导致系统资源耗尽(如内存耗尽)。
管理方便ThreadPoolExecutor 提供了优雅的接口。
缺点 性能开销大:创建和销毁线程是昂贵的操作。
资源消耗多:并发连接数多时,会创建大量线程,消耗大量内存和CPU上下文切换资源,可能导致系统不稳定。
实现相对复杂一点点(但 ThreadPoolExecutor 已大大简化)。
如果任务处理时间非常长,可能会阻塞线程池中的其他任务。
适用场景 - 学习和演示。
- 并发连接数非常少且固定的场景。
- 绝大多数生产环境
- 高并发、高吞吐量的服务器应用。

最终建议

对于任何实际的项目,强烈推荐使用线程池模型,它更健壮、更高效,是构建高性能网络服务器的标准做法,Python 的 concurrent.futures.ThreadPoolExecutor 是实现这一模式的利器。

分享:
扫描分享到社交APP
上一篇
下一篇