凌峰创科服务平台

python 实现ssh服务器

核心概念

一个基本的 SSH 服务器需要处理以下核心任务:

python 实现ssh服务器-图1
(图片来源网络,侵删)
  1. 监听端口:在指定的端口(默认是 22)上等待客户端的连接。
  2. 身份验证:当客户端连接时,服务器需要验证其身份,最常见的是用户名/密码验证,也可以使用公钥/私钥验证。
  3. 创建 Shell:验证通过后,服务器需要为客户端创建一个交互式的 Shell 环境,客户端可以在其中输入命令并看到输出。
  4. 处理 I/O:服务器的核心工作是在客户端的输入/输出流和本地 Shell 的输入/输出流之间进行数据中转,将用户输入的命令发送给本地 Shell 执行,然后将 Shell 的输出结果返回给客户端。

使用 paramiko 实现一个基础 SSH 服务器

这是最直接的方法,适合学习和理解底层原理。

安装 Paramiko

你需要安装 paramiko 库,如果还没有安装,请打开终端或命令行运行:

pip install paramiko

完整代码示例

这个例子实现了一个支持用户名/密码认证,并能提供一个简单 Shell 的服务器。

# server.py
import socket
import threading
import paramiko
import sys
import os
from paramiko import SSHServerInterface, AutoAddPolicy
from paramiko.ssh_exception import SSHException, AuthenticationException
# --- 1. 自定义 SSH 服务器处理类 ---
# 这个类继承自 SSHServerInterface,用于处理 SSH 协议的各种事件
class MySSHServer(SSHServerInterface):
    """
    自定义的 SSH 服务器处理逻辑。
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.event = threading.Event()
    # --- 身份验证 ---
    def check_auth_password(self, username, password):
        """
        检查用户名/密码。
        在这里实现你自己的认证逻辑。
        """
        print(f"Auth attempt: username='{username}', password='{password}'")
        # 简单的硬编码用户名/密码,实际应用中应查询数据库或配置文件
        if username == 'admin' and password == 'password123':
            return paramiko.AUTH_SUCCESSFUL
        return paramiko.AUTH_FAILED
    def check_auth_publickey(self, username, key):
        """
        检查公钥认证(可选)。
        """
        print(f"Public key auth attempt: username='{username}', key fingerprint={key.get_fingerprint().hex()}")
        # 这里可以添加公钥验证逻辑
        return paramiko.AUTH_FAILED
    def get_allowed_auths(self, username):
        """
        告诉客户端支持哪些认证方式。
        """
        return 'password,publickey'
    # --- 创建 Shell 或执行命令 ---
    def check_channel_request(self, kind, chanid):
        """
        检查客户端请求的通道类型。
        我们只支持 'session' 类型,用于创建交互式 shell。
        """
        if kind == 'session':
            return paramiko.OPEN_SUCCEEDED
        return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
    def check_channel_pty_request(self, channel, term, width, height, pixelwidth, pixelheight, modes):
        """
        为通道分配一个伪终端。
        """
        # 可以在这里设置终端类型和大小
        print(f"PTY request: term={term}, size={width}x{height}")
        return True
    def check_channel_shell_request(self, channel):
        """
        当客户端请求一个 shell 时被调用。
        """
        print(f"Shell request for channel {channel.get_id()}")
        # 启动一个线程来处理这个通道的 I/O
        threading.Thread(target=self.handle_shell, args=(channel,)).start()
        return True
    # --- 核心 Shell 处理逻辑 ---
    def handle_shell(self, channel):
        """
        在一个单独的线程中处理 shell 的输入和输出。
        """
        # 获取本地系统的标准输入/输出/错误流
        # 注意:在更复杂的应用中,可能需要使用 subprocess.Popen 来获得一个干净的 shell 环境
        import subprocess
        process = subprocess.Popen(
            ['/bin/bash'],  # 使用 bash 作为 shell
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=0,
            close_fds=True
        )
        try:
            # I/O 中转循环
            while not channel.closed or not process.poll():
                # 从 channel 读取客户端输入,并写入到进程的 stdin
                if channel.recv_ready():
                    data = channel.recv(1024)
                    if not data:
                        break
                    process.stdin.write(data.decode('utf-8'))
                    process.stdin.flush()
                # 从进程的 stdout 读取输出,并写入到 channel
                if process.stdout.readable() and process.stdout.poll() is None:
                    data = process.stdout.read(1024)
                    if data:
                        channel.send(data)
                # 从进程的 stderr 读取错误,并写入到 channel
                if process.stderr.readable() and process.stderr.poll() is None:
                    data = process.stderr.read(1024)
                    if data:
                        channel.send_stderr(data)
                # 短暂休眠,避免 CPU 占用过高
                import time
                time.sleep(0.01)
        except EOFError:
            print("Client disconnected.")
        except Exception as e:
            print(f"Error in shell handler: {e}")
        finally:
            # 清理资源
            process.terminate()
            channel.close()
# --- 2. 服务器主程序 ---
def start_server(host_key, port=2222):
    """
    启动 SSH 服务器。
    """
    # 创建一个 Socket 服务器
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('', port))
        sock.listen(100)
        print(f'Starting SSH server on port {port}...')
    except Exception as e:
        print(f'*** Failed to listen on port {port}: {e}')
        sys.exit(1)
    # 加载主机密钥,服务器必须有一对主机密钥。
    # 如果没有,可以使用 paramiko.RSAKey.generate(2048) 生成一个。
    # 这里我们假设你有一个 'host_key' 文件
    try:
        server_key = paramiko.RSAKey.from_private_key_file(host_key)
    except SSHException as e:
        print(f'*** Unable to load host key: {e}')
        print('*** 生成一个新的主机密钥:')
        key = paramiko.RSAKey.generate(2048)
        key.write_private_key_file('host_key')
        print('*** 已生成新的 host_key 文件,请重新运行脚本。')
        sys.exit(1)
    # 主循环,接受新的客户端连接
    try:
        while True:
            print('Waiting for a new connection...')
            client, addr = sock.accept()
            print(f"Got a connection from {addr[0]}:{addr[1]}")
            # 为每个客户端连接创建一个新的 Transport 对象
            transport = paramiko.Transport(client)
            transport.add_server_key(server_key)
            # 创建我们自定义的服务器处理程序
            server_handler = MySSHServer()
            # 启动 SSH 服务器协议
            try:
                transport.start_server(server=server_handler)
            except Exception as e:
                print(f'*** Caught exception: {e}')
                transport.close()
                continue
            # 等待认证完成
            chan = transport.accept(20)
            if chan is None:
                print('*** No channel.')
                transport.close()
                continue
            print(f"Authenticated channel opened from {addr[0]}:{addr[1]}")
    except KeyboardInterrupt:
        print('\n*** Exiting...')
    finally:
        sock.close()
if __name__ == '__main__':
    # 检查主机密钥文件是否存在,如果不存在则生成一个
    HOST_KEY_PATH = 'host_key'
    if not os.path.exists(HOST_KEY_PATH):
        print(f"Host key file '{HOST_KEY_PATH}' not found. Generating a new one...")
        key = paramiko.RSAKey.generate(2048)
        key.write_private_key_file(HOST_KEY_PATH)
    # 启动服务器
    start_server(host_key=HOST_KEY_PATH, port=2222)

如何运行和测试

  1. 保存代码:将上面的代码保存为 server.py
  2. 运行服务器:在终端中运行 python server.py
    • host_key 文件不存在,它会自动生成一个。请务必保护好这个文件,它是服务器的身份凭证。
    • 你会看到 Starting SSH server on port 2222... 的提示。
  3. 使用客户端连接:打开另一个终端,使用标准的 ssh 命令连接到你的服务器。
    # -p 指定端口,因为默认是 22
    ssh admin@localhost -p 2222
  4. 输入密码:当提示输入密码时,输入 password123
  5. 交互:连接成功后,你就可以在本地输入命令(如 ls, pwd, echo hello),这些命令会在服务器的 Shell 中执行,结果会返回到你的客户端窗口。

使用 asyncssh 实现一个更现代、更高效的 SSH 服务器

asyncssh 是一个基于 Python asyncio 的 SSH 协议库,它非常适合构建高性能、高并发的网络服务,它的 API 设计得非常现代和简洁。

python 实现ssh服务器-图2
(图片来源网络,侵删)

安装 asyncssh

pip install asyncssh

完整代码示例

asyncssh 的代码通常更短,因为很多底层细节被封装好了。

# async_server.py
import asyncssh
import sys
import os
import logging
# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- 自定义 SSH 服务器 ---
class MySSHServer(asyncssh.SSHServer):
    """
    自定义的 SSH 服务器类。
    """
    def __init__(self):
        self._semaphore = asyncio.Semaphore(10) # 限制并发连接数
    async def begin_auth(self, username):
        """
        如果需要认证,返回 True。
        """
        logger.info(f"Begin auth for user: {username}")
        return True
    async def validate_password(self, username, password):
        """
        验证用户名/密码。
        """
        logger.info(f"Password auth attempt for: {username}")
        if username == 'admin' and password == 'password123':
            return True
        return False
    async def validate_publickey(self, username, key):
        """
        验证公钥。
        """
        logger.info(f"Public key auth attempt for: {username}")
        # 这里可以添加公钥验证逻辑
        return False
    async def connection_made(self, conn):
        """
        当新的 SSH 连接建立时被调用。
        """
        logger.info(f"SSH connection from {conn.get_extra_info('peername')[0]}")
    async def connection_lost(self, exc):
        """
        当 SSH 连接断开时被调用。
        """
        if exc:
            logger.error(f"SSH connection lost due to exception: {exc}")
        else:
            logger.info("SSH connection closed.")
    async def create_shell(self, process):
        """
        为客户端创建一个交互式 shell。
        """
        logger.info(f"Creating shell for session {process}")
        # asyncssh 会自动处理 I/O 重定向,我们只需要提供一个本地的 shell
        await process.stdout.write("Welcome to my AsyncSSH Server!\n")
        await process.stdout.write("Type 'exit' to quit.\n")
        # 使用 subprocess 来运行一个真实的 shell
        # asyncssh 提供了更简单的方式:process.create_process()
        # 但为了展示灵活性,我们也可以手动做 I/O 中转
        import subprocess
        proc = subprocess.Popen(
            ['/bin/bash'],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        async def forward_stdin():
            while not process.stdin.is_eof():
                chunk = await process.stdin.read(1024)
                if not chunk:
                    break
                proc.stdin.write(chunk)
                proc.stdin.flush()
        async def forward_stdout():
            while proc.poll() is None:
                chunk = proc.stdout.read(1024)
                if not chunk:
                    break
                await process.stdout.write(chunk)
        async def forward_stderr():
            while proc.poll() is None:
                chunk = proc.stderr.read(1024)
                if not chunk:
                    break
                await process.stderr.write(chunk)
        # 并行运行所有 I/O 任务
        await asyncio.gather(
            forward_stdin(),
            forward_stdout(),
            forward_stderr(),
            return_exceptions=True
        )
        proc.terminate()
        logger.info(f"Shell for session {process} closed.")
async def main():
    """
    启动 SSH 服务器。
    """
    HOST_KEY_PATH = 'async_host_key'
    if not os.path.exists(HOST_KEY_PATH):
        logger.info(f"Host key file '{HOST_KEY_PATH}' not found. Generating a new one...")
        key = asyncssh.generate_private_key('ssh-rsa', 2048)
        key.write_private_key_file(HOST_KEY_PATH)
        logger.info(f'已生成新的 {HOST_KEY_PATH} 文件。')
    try:
        # 使用 asyncssh.create_server 启动服务器
        server = await asyncio.wait_for(
            asyncssh.create_server(
                lambda: MySSHServer(),
                '',
                2223, # 使用不同的端口
                server_host_keys=[HOST_KEY_PATH]
            ),
            timeout=10
        )
        logger.info(f'Starting AsyncSSH server on port 2223...')
        # 保持服务器运行
        await server.wait_closed()
    except (OSError, asyncio.TimeoutError) as exc:
        logger.error(f'Error starting server: {exc}')
        sys.exit(1)
if __name__ == '__main__':
    asyncio.run(main())

如何运行和测试

  1. 保存代码:将代码保存为 async_server.py
  2. 运行服务器python async_server.py
  3. 连接测试:和之前一样,使用 ssh 客户端连接。
    ssh admin@localhost -p 2223

    输入密码 password123


总结与选择

特性 paramiko (方法一) asyncssh (方法二)
底层原理 需要手动处理 Socket、Transport、Channel 等概念,更接近 SSH 协议本身。 基于 asyncio,高度封装,API 更现代化。
代码复杂度 相对较高,需要编写更多的样板代码(如 I/O 中转循环)。 相对较低,特别是对于创建 Shell,API 非常简洁。
性能 多线程模型,适合处理中等数量的并发连接。 异步 I/O 模型,能轻松处理数千甚至数万个并发连接,性能更高。
适用场景 学习 SSH 协议、需要精细控制服务器行为、项目依赖 paramiko 构建高性能网络服务、微服务、需要处理大量并发连接的现代 Python 应用。
依赖 paramiko asyncssh (内部依赖 cryptography 等)

给你的建议:

  • 如果你是初学者,或者想深入了解 SSH 服务器的工作原理,从 paramiko 的方法一 开始是最好的选择,它能让你看到每个组件是如何协同工作的。
  • 如果你正在开发一个需要高性能、高并发的生产级应用,或者你的项目已经大量使用 asyncio,那么毫无疑问应该选择 asyncssh 的方法二,它更高效,代码也更优雅。

无论选择哪种方法,请务必在生产环境中使用强密码公钥认证,并妥善保管你的主机密钥文件。

python 实现ssh服务器-图3
(图片来源网络,侵删)
分享:
扫描分享到社交APP
上一篇
下一篇