核心概念
一个基本的 SSH 服务器需要处理以下核心任务:

- 监听端口:在指定的端口(默认是 22)上等待客户端的连接。
- 身份验证:当客户端连接时,服务器需要验证其身份,最常见的是用户名/密码验证,也可以使用公钥/私钥验证。
- 创建 Shell:验证通过后,服务器需要为客户端创建一个交互式的 Shell 环境,客户端可以在其中输入命令并看到输出。
- 处理 I/O:服务器的核心工作是在客户端的输入/输出流和本地 Shell 的输入/输出流之间进行数据中转,将用户输入的命令发送给本地 Shell 执行,然后将 Shell 的输出结果返回给客户端。
使用 paramiko 实现一个基础 SSH 服务器
这是最直接的方法,适合学习和理解底层原理。
安装 Paramiko
你需要安装 paramiko 库,如果还没有安装,请打开终端或命令行运行:
pip install paramiko
完整代码示例
这个例子实现了一个支持用户名/密码认证,并能提供一个简单 Shell 的服务器。
# server.py
import socket
import threading
import paramiko
import sys
import os
from paramiko import SSHServerInterface, AutoAddPolicy
from paramiko.ssh_exception import SSHException, AuthenticationException
# --- 1. 自定义 SSH 服务器处理类 ---
# 这个类继承自 SSHServerInterface,用于处理 SSH 协议的各种事件
class MySSHServer(SSHServerInterface):
"""
自定义的 SSH 服务器处理逻辑。
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.event = threading.Event()
# --- 身份验证 ---
def check_auth_password(self, username, password):
"""
检查用户名/密码。
在这里实现你自己的认证逻辑。
"""
print(f"Auth attempt: username='{username}', password='{password}'")
# 简单的硬编码用户名/密码,实际应用中应查询数据库或配置文件
if username == 'admin' and password == 'password123':
return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED
def check_auth_publickey(self, username, key):
"""
检查公钥认证(可选)。
"""
print(f"Public key auth attempt: username='{username}', key fingerprint={key.get_fingerprint().hex()}")
# 这里可以添加公钥验证逻辑
return paramiko.AUTH_FAILED
def get_allowed_auths(self, username):
"""
告诉客户端支持哪些认证方式。
"""
return 'password,publickey'
# --- 创建 Shell 或执行命令 ---
def check_channel_request(self, kind, chanid):
"""
检查客户端请求的通道类型。
我们只支持 'session' 类型,用于创建交互式 shell。
"""
if kind == 'session':
return paramiko.OPEN_SUCCEEDED
return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
def check_channel_pty_request(self, channel, term, width, height, pixelwidth, pixelheight, modes):
"""
为通道分配一个伪终端。
"""
# 可以在这里设置终端类型和大小
print(f"PTY request: term={term}, size={width}x{height}")
return True
def check_channel_shell_request(self, channel):
"""
当客户端请求一个 shell 时被调用。
"""
print(f"Shell request for channel {channel.get_id()}")
# 启动一个线程来处理这个通道的 I/O
threading.Thread(target=self.handle_shell, args=(channel,)).start()
return True
# --- 核心 Shell 处理逻辑 ---
def handle_shell(self, channel):
"""
在一个单独的线程中处理 shell 的输入和输出。
"""
# 获取本地系统的标准输入/输出/错误流
# 注意:在更复杂的应用中,可能需要使用 subprocess.Popen 来获得一个干净的 shell 环境
import subprocess
process = subprocess.Popen(
['/bin/bash'], # 使用 bash 作为 shell
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=0,
close_fds=True
)
try:
# I/O 中转循环
while not channel.closed or not process.poll():
# 从 channel 读取客户端输入,并写入到进程的 stdin
if channel.recv_ready():
data = channel.recv(1024)
if not data:
break
process.stdin.write(data.decode('utf-8'))
process.stdin.flush()
# 从进程的 stdout 读取输出,并写入到 channel
if process.stdout.readable() and process.stdout.poll() is None:
data = process.stdout.read(1024)
if data:
channel.send(data)
# 从进程的 stderr 读取错误,并写入到 channel
if process.stderr.readable() and process.stderr.poll() is None:
data = process.stderr.read(1024)
if data:
channel.send_stderr(data)
# 短暂休眠,避免 CPU 占用过高
import time
time.sleep(0.01)
except EOFError:
print("Client disconnected.")
except Exception as e:
print(f"Error in shell handler: {e}")
finally:
# 清理资源
process.terminate()
channel.close()
# --- 2. 服务器主程序 ---
def start_server(host_key, port=2222):
"""
启动 SSH 服务器。
"""
# 创建一个 Socket 服务器
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', port))
sock.listen(100)
print(f'Starting SSH server on port {port}...')
except Exception as e:
print(f'*** Failed to listen on port {port}: {e}')
sys.exit(1)
# 加载主机密钥,服务器必须有一对主机密钥。
# 如果没有,可以使用 paramiko.RSAKey.generate(2048) 生成一个。
# 这里我们假设你有一个 'host_key' 文件
try:
server_key = paramiko.RSAKey.from_private_key_file(host_key)
except SSHException as e:
print(f'*** Unable to load host key: {e}')
print('*** 生成一个新的主机密钥:')
key = paramiko.RSAKey.generate(2048)
key.write_private_key_file('host_key')
print('*** 已生成新的 host_key 文件,请重新运行脚本。')
sys.exit(1)
# 主循环,接受新的客户端连接
try:
while True:
print('Waiting for a new connection...')
client, addr = sock.accept()
print(f"Got a connection from {addr[0]}:{addr[1]}")
# 为每个客户端连接创建一个新的 Transport 对象
transport = paramiko.Transport(client)
transport.add_server_key(server_key)
# 创建我们自定义的服务器处理程序
server_handler = MySSHServer()
# 启动 SSH 服务器协议
try:
transport.start_server(server=server_handler)
except Exception as e:
print(f'*** Caught exception: {e}')
transport.close()
continue
# 等待认证完成
chan = transport.accept(20)
if chan is None:
print('*** No channel.')
transport.close()
continue
print(f"Authenticated channel opened from {addr[0]}:{addr[1]}")
except KeyboardInterrupt:
print('\n*** Exiting...')
finally:
sock.close()
if __name__ == '__main__':
# 检查主机密钥文件是否存在,如果不存在则生成一个
HOST_KEY_PATH = 'host_key'
if not os.path.exists(HOST_KEY_PATH):
print(f"Host key file '{HOST_KEY_PATH}' not found. Generating a new one...")
key = paramiko.RSAKey.generate(2048)
key.write_private_key_file(HOST_KEY_PATH)
# 启动服务器
start_server(host_key=HOST_KEY_PATH, port=2222)
如何运行和测试
- 保存代码:将上面的代码保存为
server.py。 - 运行服务器:在终端中运行
python server.py。host_key文件不存在,它会自动生成一个。请务必保护好这个文件,它是服务器的身份凭证。- 你会看到
Starting SSH server on port 2222...的提示。
- 使用客户端连接:打开另一个终端,使用标准的
ssh命令连接到你的服务器。# -p 指定端口,因为默认是 22 ssh admin@localhost -p 2222
- 输入密码:当提示输入密码时,输入
password123。 - 交互:连接成功后,你就可以在本地输入命令(如
ls,pwd,echo hello),这些命令会在服务器的 Shell 中执行,结果会返回到你的客户端窗口。
使用 asyncssh 实现一个更现代、更高效的 SSH 服务器
asyncssh 是一个基于 Python asyncio 的 SSH 协议库,它非常适合构建高性能、高并发的网络服务,它的 API 设计得非常现代和简洁。

安装 asyncssh
pip install asyncssh
完整代码示例
asyncssh 的代码通常更短,因为很多底层细节被封装好了。
# async_server.py
import asyncssh
import sys
import os
import logging
# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- 自定义 SSH 服务器 ---
class MySSHServer(asyncssh.SSHServer):
"""
自定义的 SSH 服务器类。
"""
def __init__(self):
self._semaphore = asyncio.Semaphore(10) # 限制并发连接数
async def begin_auth(self, username):
"""
如果需要认证,返回 True。
"""
logger.info(f"Begin auth for user: {username}")
return True
async def validate_password(self, username, password):
"""
验证用户名/密码。
"""
logger.info(f"Password auth attempt for: {username}")
if username == 'admin' and password == 'password123':
return True
return False
async def validate_publickey(self, username, key):
"""
验证公钥。
"""
logger.info(f"Public key auth attempt for: {username}")
# 这里可以添加公钥验证逻辑
return False
async def connection_made(self, conn):
"""
当新的 SSH 连接建立时被调用。
"""
logger.info(f"SSH connection from {conn.get_extra_info('peername')[0]}")
async def connection_lost(self, exc):
"""
当 SSH 连接断开时被调用。
"""
if exc:
logger.error(f"SSH connection lost due to exception: {exc}")
else:
logger.info("SSH connection closed.")
async def create_shell(self, process):
"""
为客户端创建一个交互式 shell。
"""
logger.info(f"Creating shell for session {process}")
# asyncssh 会自动处理 I/O 重定向,我们只需要提供一个本地的 shell
await process.stdout.write("Welcome to my AsyncSSH Server!\n")
await process.stdout.write("Type 'exit' to quit.\n")
# 使用 subprocess 来运行一个真实的 shell
# asyncssh 提供了更简单的方式:process.create_process()
# 但为了展示灵活性,我们也可以手动做 I/O 中转
import subprocess
proc = subprocess.Popen(
['/bin/bash'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
async def forward_stdin():
while not process.stdin.is_eof():
chunk = await process.stdin.read(1024)
if not chunk:
break
proc.stdin.write(chunk)
proc.stdin.flush()
async def forward_stdout():
while proc.poll() is None:
chunk = proc.stdout.read(1024)
if not chunk:
break
await process.stdout.write(chunk)
async def forward_stderr():
while proc.poll() is None:
chunk = proc.stderr.read(1024)
if not chunk:
break
await process.stderr.write(chunk)
# 并行运行所有 I/O 任务
await asyncio.gather(
forward_stdin(),
forward_stdout(),
forward_stderr(),
return_exceptions=True
)
proc.terminate()
logger.info(f"Shell for session {process} closed.")
async def main():
"""
启动 SSH 服务器。
"""
HOST_KEY_PATH = 'async_host_key'
if not os.path.exists(HOST_KEY_PATH):
logger.info(f"Host key file '{HOST_KEY_PATH}' not found. Generating a new one...")
key = asyncssh.generate_private_key('ssh-rsa', 2048)
key.write_private_key_file(HOST_KEY_PATH)
logger.info(f'已生成新的 {HOST_KEY_PATH} 文件。')
try:
# 使用 asyncssh.create_server 启动服务器
server = await asyncio.wait_for(
asyncssh.create_server(
lambda: MySSHServer(),
'',
2223, # 使用不同的端口
server_host_keys=[HOST_KEY_PATH]
),
timeout=10
)
logger.info(f'Starting AsyncSSH server on port 2223...')
# 保持服务器运行
await server.wait_closed()
except (OSError, asyncio.TimeoutError) as exc:
logger.error(f'Error starting server: {exc}')
sys.exit(1)
if __name__ == '__main__':
asyncio.run(main())
如何运行和测试
- 保存代码:将代码保存为
async_server.py。 - 运行服务器:
python async_server.py。 - 连接测试:和之前一样,使用
ssh客户端连接。ssh admin@localhost -p 2223
输入密码
password123。
总结与选择
| 特性 | paramiko (方法一) |
asyncssh (方法二) |
|---|---|---|
| 底层原理 | 需要手动处理 Socket、Transport、Channel 等概念,更接近 SSH 协议本身。 | 基于 asyncio,高度封装,API 更现代化。 |
| 代码复杂度 | 相对较高,需要编写更多的样板代码(如 I/O 中转循环)。 | 相对较低,特别是对于创建 Shell,API 非常简洁。 |
| 性能 | 多线程模型,适合处理中等数量的并发连接。 | 异步 I/O 模型,能轻松处理数千甚至数万个并发连接,性能更高。 |
| 适用场景 | 学习 SSH 协议、需要精细控制服务器行为、项目依赖 paramiko。 |
构建高性能网络服务、微服务、需要处理大量并发连接的现代 Python 应用。 |
| 依赖 | paramiko |
asyncssh (内部依赖 cryptography 等) |
给你的建议:
- 如果你是初学者,或者想深入了解 SSH 服务器的工作原理,从
paramiko的方法一 开始是最好的选择,它能让你看到每个组件是如何协同工作的。 - 如果你正在开发一个需要高性能、高并发的生产级应用,或者你的项目已经大量使用
asyncio,那么毫无疑问应该选择asyncssh的方法二,它更高效,代码也更优雅。
无论选择哪种方法,请务必在生产环境中使用强密码或公钥认证,并妥善保管你的主机密钥文件。

