Implementing a Kubernetes Pod Exec Terminal Tool over WebSocket
How It Works
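To build a Kubernetes exec command-line terminal over WebSocket, we need to provide an HTTP server: the front-end page sends its requests to the HTTP server, and the HTTP server talks to the k8s API server.
The figure below is only an example. In a real system, parameter validation and permission checks must come first, because account-related sensitive data must not appear in the WebSocket request. For example, the front end first calls a backend endpoint with parameters such as clusterId, podName, namespace, and containerName. That endpoint performs basic parameter validation and checks the pod state (a pod in a terminal state cannot be exec'd into) and, most importantly, performs the permission check. Once the check passes, the endpoint generates a token string that stands for the key information, and the front end passes this token as a parameter when requesting the server: wss://xx.com/exec?s=BbiMvEr8CpM0cmeNdd1yVQ
This time-limited token can be generated as a JWT. Alternatively, encode the data with a digest algorithm, store the result as a key in redis with the source data as the value, and set an expiration time.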
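On the front end we use xterm.js for interaction. On the back end we build the HTTP server with the golang gin framework, use the github.com/gorilla/websocket package to upgrade to the WebSocket protocol, and use the remotecommand package under client-go to connect to Kubernetes. client-go v1.19 or later is needed for this remotecommand tool; the kubectl exec command itself is built on this package.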
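A WebSocket connection is established on top of an ordinary HTTP GET request. In that GET request, the client must add several key header fields before the backend server can upgrade to a WebSocket connection.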
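- Connection: Upgrade (the connection needs to be upgraded)
- Upgrade: websocket (which protocol to upgrade to; this depends on which protocols the backend supports)
- Sec-WebSocket-Version: 13 (the WebSocket version)
- Sec-WebSocket-Key: TdSpMGWJu22IiPM6B6oSWQ== (the WebSocket handshake key)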
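To make the role of Sec-WebSocket-Key concrete, here is a minimal sketch of how a server derives the Sec-WebSocket-Accept response header as defined in RFC 6455. Libraries such as gorilla/websocket do this internally; the function name acceptKey is only an illustrative choice, and the sample key is the one from the RFC.

```go
package main

import (
	"crypto/sha1"
	"encoding/base64"
	"fmt"
)

// websocketGUID is the fixed GUID defined by RFC 6455.
const websocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

// acceptKey computes the Sec-WebSocket-Accept value for a given
// Sec-WebSocket-Key: base64(sha1(key + GUID)).
func acceptKey(clientKey string) string {
	sum := sha1.Sum([]byte(clientKey + websocketGUID))
	return base64.StdEncoding.EncodeToString(sum[:])
}

func main() {
	// Sample key taken from RFC 6455, section 1.3.
	fmt.Println(acceptKey("dGhlIHNhbXBsZSBub25jZQ=="))
	// prints "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="
}
```

The client compares this value with its own computation, which proves that the server actually understood the WebSocket handshake.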
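The ResponseWriter of the gin HTTP framework already implements the http.Hijacker interface. If the caller invokes Hijacker.Hijack before responding to an HTTP GET request, the HTTP server library hands the underlying connection over to the caller's goroutine, and the front end and back end can then read and write WebSocket data over this connection. The HTTP server and the API server of the Kubernetes cluster, in turn, establish a long-lived connection over the SPDY protocol and open streams on it for reading and writing data. First, let's look at the HTTP request issued by kubectl exec: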
kubectl exec -t container-cms-5b75bc47c5-rj2j7 -v 8 -n container-team -- date
Part of the HTTP header information is shown below:
I0419 09:47:35.741716 17438 round_trippers.go:416] POST https://156.0.1.9:9443/api/v1/namespaces/team/pods/container-cms-5b75bc47c5-rj2j7/exec?command=date&container=container-cms&stderr=true&stdout=true
I0419 09:47:35.741767 17438 round_trippers.go:423] Request Headers:
I0419 09:47:35.741794 17438 round_trippers.go:426] X-Stream-Protocol-Version: v4.channel.k8s.io
I0419 09:47:35.741817 17438 round_trippers.go:426] X-Stream-Protocol-Version: v3.channel.k8s.io
I0419 09:47:35.741855 17438 round_trippers.go:426] X-Stream-Protocol-Version: v2.channel.k8s.io
I0419 09:47:35.741876 17438 round_trippers.go:426] X-Stream-Protocol-Version: channel.k8s.io
I0419 09:47:35.741907 17438 round_trippers.go:426] User-Agent: kubectl/v1.15.5 (linux/amd64) kubernetes/20c265f
I0419 09:47:35.794169 17438 round_trippers.go:441] Response Status: 101 Switching Protocols in 52 milliseconds
I0419 09:47:35.794224 17438 round_trippers.go:444] Response Headers:
I0419 09:47:35.794243 17438 round_trippers.go:447] Connection: Upgrade
I0419 09:47:35.794269 17438 round_trippers.go:447] Upgrade: SPDY/3.1
I0419 09:47:35.794288 17438 round_trippers.go:447] X-Stream-Protocol-Version: v4.channel.k8s.io
I0419 09:47:35.794303 17438 round_trippers.go:447] Date: Tue, 19 Apr 2022 01:47:35 GMT
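The request line in the log shows how the exec options are carried as query parameters on the pod's exec subresource. The following sketch rebuilds that URL with the standard library only; execURL is a hypothetical helper for illustration, while the real request in this article is built by client-go.

```go
package main

import (
	"fmt"
	"net/url"
)

// execURL assembles the exec subresource URL for a pod.
// Each element of command becomes its own "command" query parameter.
func execURL(host, namespace, pod, container string, command []string) string {
	q := url.Values{}
	for _, c := range command {
		q.Add("command", c)
	}
	q.Set("container", container)
	q.Set("stdout", "true")
	q.Set("stderr", "true")
	u := url.URL{
		Scheme:   "https",
		Host:     host,
		Path:     "/api/v1/namespaces/" + namespace + "/pods/" + pod + "/exec",
		RawQuery: q.Encode(), // Encode sorts the parameters by key
	}
	return u.String()
}

func main() {
	fmt.Println(execURL("156.0.1.9:9443", "team",
		"container-cms-5b75bc47c5-rj2j7", "container-cms", []string{"date"}))
}
```

With the values from the log, the result matches the POST URL shown above, including the parameter order.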
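Exec Flow
The figure below walks through what happens when kubectl exec runs. The service that finally does the work is the Stream Server started on the node, which normally starts together with the CRI shim.
- When kubectl exec is run, the request first goes to the cluster's apiServer, which then calls the Exec API of the kubelet.
- After receiving the request, the kubelet calls the Exec interface of the CRI, which is answered by the corresponding CRI shim, for example the Docker shim.
- The CRI shim does not call the backend container runtime (such as Docker) directly. It only returns a URL to the kubelet, namely the address of the Stream Server that belongs to the CRI shim.
- Once the kubelet has the URL, it returns it to the apiServer as a redirect, and the apiServer follows the redirect to call Exec on the Stream Server and establish a long-lived connection.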
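Backend Code
Since the backend server offers a WebSocket service, it also needs a WebSocket connection manager. A full manager would count the connections held by the system, extend connection lifetimes under certain conditions, automatically close connections without any data exchange, and record how long each connection lives and how much data it reads and writes. It could even expose Prometheus metrics so that per-connection indicators can be watched in real time and limits applied. To keep things simple, this article only provides a basic manager that prints some logs and automatically extends connection lifetimes.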
package cluster

import (
	"sync"
	"time"

	log "container-cms/logger"
	"container-cms/server"

	"github.com/gorilla/websocket"
)

var webSocketManager *WebSocketManager

// WebSocketManager tracks live WebSocket connections and their expiry times.
type WebSocketManager struct {
	sync.Mutex
	pool        map[*websocket.Conn]time.Time
	timer       *time.Ticker
	liveMinutes int
}

func init() {
	webSocketManager = &WebSocketManager{}
	webSocketManager.pool = make(map[*websocket.Conn]time.Time)
	webSocketManager.timer = time.NewTicker(time.Second * 10)
	liveMinutes := server.GetOptions().WebSocketLiveMinutes
	// Without any input, a connection lives for 10 minutes by default.
	if liveMinutes < 2 {
		liveMinutes = 10
	}
	webSocketManager.liveMinutes = liveMinutes
}

// Put registers a connection and sets its initial expiry time.
func (wm *WebSocketManager) Put(conn *websocket.Conn) {
	if conn == nil {
		return
	}
	wm.Lock()
	defer wm.Unlock()
	wm.pool[conn] = time.Now().Add(time.Minute * time.Duration(wm.liveMinutes))
	log.GetLogger().WithField("total", len(wm.pool)).Info("add one conn")
}

// Del removes a connection from the pool without closing it.
func (wm *WebSocketManager) Del(conn *websocket.Conn) {
	if conn == nil {
		return
	}
	wm.Lock()
	defer wm.Unlock()
	if _, exist := wm.pool[conn]; exist {
		delete(wm.pool, conn)
		log.GetLogger().WithField("total", len(wm.pool)).Info("release one conn")
	}
}

// Renew extends the lifetime of a WebSocket connection.
func (wm *WebSocketManager) Renew(conn *websocket.Conn) {
	if conn == nil {
		return
	}
	wm.Lock()
	defer wm.Unlock()
	if _, ok := wm.pool[conn]; ok {
		expireTime := time.Now().Add(time.Minute * time.Duration(wm.liveMinutes))
		wm.pool[conn] = expireTime
		log.GetLogger().WithField("client", conn.RemoteAddr()).Infof("renew, expire time: %v", expireTime)
	}
}

// DelIfExpire automatically closes expired WebSocket connections.
func (wm *WebSocketManager) DelIfExpire() {
	wm.Lock()
	defer wm.Unlock()
	now := time.Now()
	for k, v := range wm.pool {
		if now.After(v) {
			log.GetLogger().WithField("client", k.RemoteAddr().String()).Info("connect will be killed by websocket manager")
			delete(wm.pool, k)
			k.Close()
			log.GetLogger().WithField("total", len(wm.pool)).Info("release one conn")
		}
	}
}

// Run starts the WebSocket manager; it blocks and sweeps on every tick.
func (wm *WebSocketManager) Run() {
	for range wm.timer.C {
		wm.DelIfExpire()
	}
}

func GetWebSocketManager() *WebSocketManager {
	return webSocketManager
}
Add go cluster.GetWebSocketManager().Run() to the main function to start the WebSocket manager. Before a Stream in the remotecommand package is opened, its stream handlers must be set through the StreamOptions struct:
// StreamOptions holds information pertaining to the current streaming session:
// input/output streams, if the client is requesting a TTY, and a terminal size queue to
// support terminal resizing.
type StreamOptions struct {
	Stdin             io.Reader
	Stdout            io.Writer
	Stderr            io.Writer
	Tty               bool
	TerminalSizeQueue TerminalSizeQueue
}

// TerminalSizeQueue is capable of returning terminal resize events as they occur.
type TerminalSizeQueue interface {
	// Next returns the new terminal size after the terminal has been resized. It returns nil when
	// monitoring has been stopped.
	Next() *TerminalSize
}
StreamHandler
We can therefore write a StreamHandler that implements the io.Reader, io.Writer, and TerminalSizeQueue interfaces. Once the Stream is open, it calls our custom handler to read and write data.
// Besides the imports shown earlier, this code needs "errors" and
// "k8s.io/client-go/tools/remotecommand".
type WebStreamHandler struct {
	// WebSocket connection from the web front end to the back end
	conn *websocket.Conn
	sync.Mutex
	// Condition variable, similar to lock.Condition in Java; the lock must be held before calling it.
	// Without a wait/notify mechanism the reader would spin like for{} and waste CPU.
	cond        *sync.Cond
	buffer      []byte
	resizeEvent chan remotecommand.TerminalSize
	// Closed once the user has cancelled the WebSocket
	cancel     chan struct{}
	cancelOnce sync.Once
}

func NewWebStreamHandler(conn *websocket.Conn) *WebStreamHandler {
	handler := &WebStreamHandler{
		conn:        conn,
		resizeEvent: make(chan remotecommand.TerminalSize),
		cancel:      make(chan struct{}),
	}
	return handler
}

func (wsh *WebStreamHandler) InitCond() {
	wsh.cond = sync.NewCond(wsh)
}

// Resize gives up once the handler is cancelled, so the caller never blocks forever.
func (wsh *WebStreamHandler) Resize(ts remotecommand.TerminalSize) {
	select {
	case wsh.resizeEvent <- ts:
	case <-wsh.cancel:
	}
}

// Cancel must wake up the waiters, otherwise a goroutine still waiting on the condition may leak.
// The channel is closed instead of being sent to, so every later IsCanceled call keeps returning true.
func (wsh *WebStreamHandler) Cancel() {
	wsh.cancelOnce.Do(func() { close(wsh.cancel) })
	wsh.Lock()
	defer wsh.Unlock()
	wsh.cond.Broadcast()
}

func (wsh *WebStreamHandler) IsCanceled() bool {
	select {
	case <-wsh.cancel:
		return true
	default:
		return false
	}
}

// WriteBuffer is called by the HTTP handler goroutine when data arrives;
// it notifies the Stream goroutine that there is something to read.
func (wsh *WebStreamHandler) WriteBuffer(data []byte) {
	wsh.Lock()
	defer wsh.Unlock()
	wsh.buffer = append(wsh.buffer, data...)
	wsh.cond.Signal()
}

// Read is how the stream reads input data; it implements io.Reader.
func (wsh *WebStreamHandler) Read(b []byte) (size int, err error) {
	wsh.Lock()
	defer wsh.Unlock()
	// Use for rather than if: with several goroutines a spurious wakeup is possible.
	for len(wsh.buffer) == 0 {
		if wsh.IsCanceled() {
			log.GetLogger().WithField("clientKey", wsh.conn.RemoteAddr()).Info("connect cancel")
			return 0, errors.New("connect has been canceled")
		}
		log.GetLogger().WithField("clientKey", wsh.conn.RemoteAddr()).Info("no data, waiting condition")
		wsh.cond.Wait()
		log.GetLogger().WithField("clientKey", wsh.conn.RemoteAddr()).Info("condition signal")
	}
	size = copy(b, wsh.buffer)
	wsh.buffer = wsh.buffer[size:]
	return
}

// Write is how the stream emits output data; it implements io.Writer.
func (wsh *WebStreamHandler) Write(b []byte) (size int, err error) {
	if wsh.IsCanceled() {
		log.GetLogger().WithField("clientKey", wsh.conn.RemoteAddr()).Info("connect cancel")
		return 0, errors.New("connect has been canceled")
	}
	size = len(b)
	log.GetLogger().WithField("data", string(b)).Info("stream handler write data")
	err = wsh.conn.WriteMessage(websocket.TextMessage, b)
	if err != nil {
		log.GetLogger().WithField("err", err).Info("stream data write to websocket front error")
	}
	return
}

// Next delivers terminal resize events to the stream; it implements TerminalSizeQueue.
// It returns nil after cancellation, which tells the stream to stop monitoring.
func (wsh *WebStreamHandler) Next() *remotecommand.TerminalSize {
	select {
	case e := <-wsh.resizeEvent:
		return &e
	case <-wsh.cancel:
		return nil
	}
}
HttpHandler
With the custom WebStreamHandler in place, the next step is an HTTP handler that accepts the WebSocket request from the front end.
type ExecParam struct {
	Namespace     string `json:"namespace" form:"namespace"`
	ClusterId     string `json:"clusterId" form:"clusterId"`
	AccountId     string `json:"accountId" form:"accountId"`
	Region        string `json:"region" form:"region"`
	PodName       string `json:"podName" form:"podName"`
	ContainerName string `json:"containerName" form:"containerName"`
}

type XtermMessage struct {
	MsgType string `json:"type"`  // message type: "resize" when the client resizes the terminal, "input" for client input
	Input   string `json:"input"` // used when type is "input"
	Rows    uint16 `json:"rows"`  // used when type is "resize"
	Cols    uint16 `json:"cols"`  // used when type is "resize"
}
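As a quick illustration of the wire format, the sketch below decodes the two message types the front end sends. The XtermMessage struct is repeated so the example is self-contained, and the describe helper is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// XtermMessage mirrors the struct above.
type XtermMessage struct {
	MsgType string `json:"type"`
	Input   string `json:"input"`
	Rows    uint16 `json:"rows"`
	Cols    uint16 `json:"cols"`
}

// describe decodes one raw WebSocket payload and summarizes it.
func describe(raw string) (string, error) {
	var m XtermMessage
	if err := json.Unmarshal([]byte(raw), &m); err != nil {
		return "", err
	}
	switch m.MsgType {
	case "input":
		return fmt.Sprintf("input %q", m.Input), nil
	case "resize":
		return fmt.Sprintf("resize %dx%d", m.Cols, m.Rows), nil
	default:
		return "", fmt.Errorf("unsupported message type %q", m.MsgType)
	}
}

func main() {
	for _, raw := range []string{
		`{"type":"input","input":"ls\r"}`,
		`{"type":"resize","rows":24,"cols":80}`,
	} {
		fmt.Println(describe(raw))
	}
}
```

Wrapping keystrokes and resize events in one JSON envelope lets a single WebSocket carry both without a separate control channel.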
The permission checks in the HTTP handler were discussed earlier; since this is a demo, they are simply skipped here.
func (api *CmsApi) PodExec(c *gin.Context) {
	var param common.ExecParam
	log := logger.GetLoggerFromContext(c)
	err := c.ShouldBindQuery(&param)
	if err != nil {
		log.WithField("err", err).Error("PodExec parse param error")
		c.JSON(http.StatusBadRequest, common.BadRequestErr)
		return
	}
	if len(param.Region) == 0 || len(param.AccountId) == 0 || len(param.ClusterId) == 0 {
		log.Error("region、accountId、clusterId required")
		c.JSON(http.StatusBadRequest, common.BadRequestErr)
		return
	}
	if len(param.Namespace) == 0 || len(param.PodName) == 0 {
		log.Error("namespace、podName required")
		c.JSON(http.StatusBadRequest, common.BadRequestErr)
		return
	}
	// Build the executor before upgrading: once the connection has been hijacked,
	// a JSON error response can no longer be written.
	executor, err := cluster.GetPodExecReqExecutor(&param)
	if err != nil {
		log.WithField("err", err).Error("get exec executor error")
		c.JSON(http.StatusInternalServerError, common.Err(err.Error()))
		return
	}
	var upgrader = websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
		// Demo only: accepts every origin. Restrict this in production.
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
	}
	// Upgrade directly to a WebSocket connection
	ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		log.WithField("err", err).Error("upgrade websocket error")
		return
	}
	cluster.GetWebSocketManager().Put(ws)
	websocketKey := c.GetHeader("Sec-WebSocket-Key")
	clientKey := websocketKey + "=>" + ws.RemoteAddr().String()
	log.WithField("client", clientKey).Info("websocket connected")
	defer func(ws *websocket.Conn) {
		cluster.GetWebSocketManager().Del(ws)
		log.WithField("client", clientKey).Info("websocket closing in api...")
		ws.Close()
	}(ws)
	streamHandler := cluster.NewWebStreamHandler(ws)
	streamHandler.InitCond()
	go cluster.OpenStream(ws, streamHandler, executor)
	var lastInputTime = time.Now()
	for {
		_, p, err := ws.ReadMessage()
		if err != nil {
			log.WithField("err", err).Error("read front message error")
			// When the user closes the page, send an exit command to the container
			// to end the shell process, then wait 2 seconds.
			streamHandler.WriteBuffer([]byte("exit\r\n"))
			time.Sleep(2 * time.Second)
			// must signal goroutine
			streamHandler.Cancel()
			break
		}
		log.WithField("xtermMessage", string(p)).Info("receive xterm message")
		var xtermMsg common.XtermMessage
		if err := json.Unmarshal(p, &xtermMsg); err != nil {
			log.WithField("err", err).Error("parse xterm message error")
			continue
		}
		switch xtermMsg.MsgType {
		case "input":
			// Limit how often the WebSocket manager is called: renew at most once per minute.
			if time.Since(lastInputTime) > time.Minute {
				cluster.GetWebSocketManager().Renew(ws)
				lastInputTime = time.Now()
			}
			streamHandler.WriteBuffer([]byte(xtermMsg.Input))
		case "resize":
			ts := remotecommand.TerminalSize{
				Width:  xtermMsg.Cols,
				Height: xtermMsg.Rows,
			}
			streamHandler.Resize(ts)
		default:
			log.Info("unsupported xterm message")
		}
	}
	log.WithField("client", clientKey).Info("websocket request end")
}
Stream
The GetPodExecReqExecutor method builds the executor that establishes the long-lived SPDY connection to the specified cluster.
func GetPodExecReqExecutor(param *common.ExecParam) (remotecommand.Executor, error) {
	// Fetch the kubeConfig of the specified cluster; implement this method yourself.
	kubeConfig, err := GetKubeConfig(param.ClusterId, param.Region, param.AccountId)
	if err != nil {
		return nil, err
	}
	restConfig, err := clientcmd.RESTConfigFromKubeConfig(kubeConfig)
	if err != nil {
		log.GetLogger().WithField("err", err).Error("init rest config error")
		return nil, err
	}
	client, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		log.GetLogger().WithField("err", err).Error("new k8s client error")
		return nil, err
	}
	opts := &v1.PodExecOptions{
		Command: []string{"sh"},
		Stdin:   true,
		Stdout:  true,
		Stderr:  true,
		TTY:     true,
	}
	if len(param.ContainerName) > 0 {
		opts.Container = strings.TrimSpace(param.ContainerName)
	}
	req := client.CoreV1().RESTClient().Post().
		Resource("pods").
		Name(param.PodName).
		Namespace(param.Namespace).
		SubResource("exec").
		VersionedParams(opts, scheme.ParameterCodec)
	executor, err := remotecommand.NewSPDYExecutor(restConfig, "POST", req.URL())
	if err != nil {
		log.GetLogger().WithField("err", err).Error("failed to new executor")
	}
	return executor, err
}
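Once the Executor has been obtained, we open a Stream on the connection for reading and writing data and set the corresponding stream handlers. The executor.Stream method blocks, so it is run in a separate goroutine. After the Stream is open, if any of the Stdin, Stdout, or Stderr handlers returns an error, the Stream method returns immediately.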
func OpenStream(conn *websocket.Conn, streamHandler *WebStreamHandler, executor remotecommand.Executor) {
	defer func(conn *websocket.Conn) {
		GetWebSocketManager().Del(conn)
		log.GetLogger().WithField("client", conn.RemoteAddr()).Info("websocket in open stream goroutine quit...")
		conn.Close()
	}(conn)
	opts := remotecommand.StreamOptions{
		Stdin:             streamHandler,
		Stdout:            streamHandler,
		Stderr:            streamHandler,
		TerminalSizeQueue: streamHandler,
		Tty:               true,
	}
	log.GetLogger().WithField("clientKey", conn.RemoteAddr()).Info("opening stream")
	err := executor.Stream(opts)
	if err != nil {
		log.GetLogger().WithField("err", err).Error("failed to open stream to pod")
		return
	}
	log.GetLogger().Info("open stream goroutine over")
}
Finally, register the HTTP handler.
router := gin.New()
auth := router.Group("/auth", middleware.LoginCheck(), middleware.PermissionCheck())
auth.GET("/pod/exec", api.PodExec)
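Front-End Code
The front end mainly uses the xterm.js component. The current version is 4.0, and the official recommendation is to use ES6 module syntax, which requires WebPack to build and bundle the front-end JS modules.
Besides the xterm component itself, several addon modules it relies on also need to be downloaded, such as webLinks, winptyCompat, and fit; search GitHub for what each of them does. The result is shown in the screenshot below.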
npm -v
npm install xterm
npm install xterm-addon-fit
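Because this project does not use WebPack, I took a 3.0 version of xterm.js together with its addons and included them in the project directly. Please implement the related front-end HTML/JS/CSS yourself; what follows is only a trimmed-down reference.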
HTML
<html>
<head>
<title>demo</title>
<link href="/static/bootstrap/css/bootstrap.min.css" rel="stylesheet">
<link href="https://use.fontawesome.com/releases/v5.6.3/css/all.css" rel="stylesheet">
<link href="/static/css/custom.css" rel="stylesheet">
<link href="/static/css/xterm.css" rel="stylesheet">
<script src="/static/js/jquery-2.1.4.min.js"></script>
<script src="/static/js/xterm/xterm.js"></script>
<script src="/static/js/xterm/addon/fit/fit.js"></script>
<script src="/static/js/xterm/addon/webLinks/webLinks.js"></script>
<script src="/static/js/xterm/addon/winptyCompat/winptyCompat.js"></script>
</head>
<body>
<div class="container">
<div class="custom-exec-page" id="execTerminalDiv">
<div class="terminal-title">
<span class="terminal-span-key">namespace</span>
<input id="execTerminalNamespace" class="terminal-span-value" type="text" value="" readonly>
<span class="terminal-span-key">pod</span>
<input id="execTerminalPod" class="terminal-span-value" type="text" value="" readonly>
<span class="terminal-span-key">container</span>
<span>
<select id="execTerminalContainerSelect" class="terminal-span-value">
</select>
</span>
<span>
<button type="button" class="btn btn-sm" id="execTerminalConfirmBtn">OK</button>
</span>
<span>
<button type="button" class="btn btn-sm" id="execTerminalCloseBtn">Close</button>
</span>
</div>
<div class="full-view" id="execTerminal"></div>
</div>
</div>
</body>
</html>
Partial styles
.terminal-span-key {
font-weight: 800;
}
.terminal-span-value {
max-width: 140px;
overflow: hidden;
}
.custom-exec-page {
width:100%;
height: 100%;
position:fixed;
top:0;
left:0;
right:0;
margin:auto;
z-index:9999;
overflow-x: hidden;
display: none;
}
.full-view {
width:100%;
height: 100%;
}
JS module
let execConn;
// Declared here so that both click handlers and initExecTerminal share the same terminal.
let term;
$(function(){
    Terminal.applyAddon(fit);
    Terminal.applyAddon(webLinks);
    Terminal.applyAddon(winptyCompat);
    $("#execTerminalConfirmBtn").click(function(){
        let ns = $("#execTerminalNamespace").val();
        let podName = $("#execTerminalPod").val();
        let containerName = $("#execTerminalContainerSelect").val();
        if (containerName == null) {
            containerName = "";
        }
        if (ns === '' || podName === '') {
            alert("namespace and podName can't be empty");
            return false;
        }
        $(this).attr("disabled", true);
        let param = {};
        fillCommonParam(param);
        param.namespace = ns;
        param.podName = podName;
        if (containerName !== '') {
            param.containerName = containerName;
        }
        let qs = getReqQueryParam(param);
        let websocketUrl = 'ws://localhost:8080';
        if (websocketUrl === '') {
            alert("websocket domain lost");
            return false;
        }
        term = initExecTerminal();
        execConn = new WebSocket(websocketUrl + "/auth/pod/exec?" + qs);
        execConn.onopen = function(evt) {
            console.log("websocket onopen", evt);
            term.writeln('Welcome, websocket connected successfully');
        }
        execConn.onclose = function(evt) {
            console.log("websocket close", evt);
            term.write('websocket has been closed')
        }
        execConn.onmessage = function(evt) {
            term.write(evt.data);
        }
        execConn.onerror = function(evt) {
            console.log("websocket connect error", evt);
            term.write("websocket connect error")
        }
        window.addEventListener("resize", function () {
            term.fit()
            let msg = {type: "resize", rows: term.rows, cols: term.cols}
            execConn.send(JSON.stringify(msg))
        });
        term.on('data', function(input) {
            let msg = {type: "input", input: input}
            execConn.send(JSON.stringify(msg))
        });
    });
    $("#execTerminalCloseBtn").click(function(){
        if (term) {
            term.dispose();
        }
        resetExecTerminalPage();
    });
});
function initExecTerminal() {
    term = new Terminal({
        cursorStyle: "block",
        scrollback: 100,
        rendererType: "canvas",
        cursorBlink: true
    });
    term.open(document.getElementById("execTerminal"));
    term.fit();
    term.winptyCompatInit();
    term.webLinksInit();
    term.focus();
    return term;
}
function fillCommonParam(param) {
    let accountId = 'xxxx';
    let region = 'xxxx';
    let clusterId = 'xxxx';
    param.accountId = accountId;
    param.clusterId = clusterId;
    param.region = region;
}
// Builds the query string; keys and values are URL-encoded.
function getReqQueryParam(param) {
    let qs = "";
    for (let key in param) {
        let pair = encodeURIComponent(key) + "=" + encodeURIComponent(param[key]);
        if (qs.length > 0) {
            qs = qs + "&" + pair;
            continue;
        }
        qs = pair;
    }
    return qs;
}
function resetExecTerminalPage() {
    $("#execTerminalDiv").hide();
    if (execConn && execConn.readyState === WebSocket.OPEN) {
        execConn.close();
    }
    $("#execTerminalNamespace").val("");
    $("#execTerminalPod").val("");
    $("#execTerminalContainerSelect option").remove();
    $("#execTerminalConfirmBtn").attr("disabled", false);
}