K8S-利用Exec Websocket接口实现Pod间的文件拷贝,


 需求

想想咱们遇到以下问题一般怎么解决?

新建了一个Pod, 想把另外一个Pod中的文件拷贝到新Pod中进行分析, 怎么实现呢?

如何在项目中, 如何像kubectl cp拷贝文件一样, 实现Pod间文件拷贝呢?

新Pod与实例Pod共享pvc? 或者封装一个带认证上下文的kubectl执行命令行?

简介

本文通过K8S的exec websocket接口结合tar将文件树压缩为数据流以及解压还原, 实现两个Pod间拷贝文件的功能.

关于exec接口请参考:https://www.cnblogs.com/a00ium/p/10905279.html

请看图

流程说明

  • 首先初始化信号通道, 用于协程间的信号通知, 收到信号的协程执行暂停/退出循环/关闭通道等操作
  • 初始化数据通道srcStdOutCh, 类型为字节数组[]byte, 用于将源Pod的标准输出放入通道, 发送给目的Pod标准输入的数据就是从该数据通道中读取
  • 拼接exec接口的访问地址(集群连接,token), tar压缩命令, 标准输入/输出,tty, pod名,容器名等参数. tar czf - /var/log/xxx.log 表示将该文件树结构压缩为数据流
  • 调用websocket的Dialer方法与源Pod容器建立websocket连接, 并开启协程将标准输出写入数据通道srcStdOutCh
  • 参考源pod exec接口, 拼接目的Pod exec访问连接, tar xzf - -C /tmp表示从标准输入读取数据流, 并解压成文件树结构(注意:解压后包含文件目录树结构)
  • 与目的Pod建立wss连接, 开启协程从数据通道srcStdOutCh中读取源Pod标准输出, 并写入目的Pod的标准输入, 如果从数据通道读取超时,则表示数据已经传输完毕, 此时停止向目的容器输入数据, 并发送通知信号, 通知主协程可以退出,关闭源Pod的wss连接

注意事项

  • wesocket连上源Pod时, 标准输出中会输出空数据, tar命令输出等干扰数据, 所以接收数据的时候需要传入一个过滤器回调函数, 用于数据过滤
  • 向目的容器发送数据时, 需要将源容器收到的第一个字节删除, 一般为1, 表示标准输出标识, 发送给目的容器是不需要该字节的
  • 发送数据时, 需要设置第一个字节为0, 表示发送到标准输入

参考代码

cp.go

  1. /* 
  2. 总结: 1.不带缓冲的通道需要先读后写 2.websocket ReadMessage方法是阻塞读取的, 如果要中断读取, 关闭连接, 捕获错误即可 
  3. */ 
  4. package cpFilePod2Pod 
  5.  
  6. import ( 
  7.   "crypto/tls" 
  8.   "errors" 
  9.   "fmt" 
  10.   "log" 
  11.   "net/url" 
  12.   "regexp" 
  13.   "strings" 
  14.   "sync" 
  15.   "time" 
  16.  
  17.   "github.com/gorilla/websocket" 
  18.  
  19. // 定义过滤器回调函数 
  20. type filterCallback func(input string) bool 
  21.  
  22. // 带有互斥锁的Websocket连接对象 
  23. type WsConn struct { 
  24.   Conn *websocket.Conn 
  25.   mu   sync.Mutex 
  26.  
  27. // 发送字符串, 自动添加换行符 
  28. func (self *WsConn) Send(sender string, str string) { 
  29.   self.mu.Lock() 
  30.   defer self.mu.Unlock() 
  31.   // 利用k8s exec websocket接口发送数据时, 第一个字节需要设置为0, 表示将数据发送到标准输入 
  32.   data := []byte{0} 
  33.   data = append(data, []byte(str+"\n")...) 
  34.   err := self.Conn.WriteMessage(websocket.BinaryMessage, data) //发送二进制数据类型 
  35.   if err != nil { 
  36.     log.Printf("发送错误, %s", err.Error()) 
  37.   } 
  38.   log.Printf("%s, 数据:%s, 字节:%+v", sender, str, []byte(str+"\n")) 
  39.  
  40. //发送字符串, 不添加换行符, 内部做字节过滤,等操作 
  41. func (self *WsConn) SendWithFilter(sender string, str string) { 
  42.   self.mu.Lock() 
  43.   defer self.mu.Unlock() 
  44.   // log.Printf("向目的容器发送数据:%s", str) 
  45.   str = strings.ReplaceAll(str, "\r\n", "\n") // /r=13, /n=10, windows换行符转Linux换行符 
  46.   //去掉第一个字节(标准输出1, byte:[0 1 ...]), 因为从源容器输出的字节中, 第一位标识了标准输出1, 给目的容器发送字节时, 需要去除该标志 
  47.   //当WebSocket建立连接后,发送数据时需要在字节Buffer第一个字节设置为stdin(buf[0] = 0),而接受数据时, 需要判断第一个字节, stdout(buf[0] = 1)或stderr(buf[0] = 2) 
  48.   strByte := append([]byte(str)[:0], []byte(str)[1:]...) 
  49.   data := []byte{0} 
  50.   data = append(data, strByte...) 
  51.   err := self.Conn.WriteMessage(websocket.BinaryMessage, data) 
  52.   log.Printf("向目的容器标准输入发送数据:\n%s, 字节数:%d, 字节:%+v", string(data), len(data), data) 
  53.   if err != nil { 
  54.     log.Printf("发送错误, %s", err.Error()) 
  55.   } 
  56.  
  57. //从连接中获取数据流, 并写入字节数组通道中, 内部执行过滤器(回调函数) 
  58. func (self *WsConn) Receive(receiver string, ch chan []byte, filter filterCallback) error { 
  59.   self.mu.Lock() 
  60.   defer self.mu.Unlock() 
  61.   msgType, msgByte, err := self.Conn.ReadMessage() //阻塞读取, 类型为2表示二进制数据, 1表示文本, -1表示连接已关闭:websocket: close 1000 (normal) 
  62.   log.Printf("%s, 读取到数据:%s, 类型:%d, 字节数:%d, 字节:%+v", receiver, string(msgByte), msgType, len(msgByte), msgByte) 
  63.   if err != nil { 
  64.     log.Printf("%s, 读取出错, %s", receiver, err.Error()) 
  65.     return err 
  66.   } 
  67.   if filter(string(msgByte)) && len(msgByte) > 1 { 
  68.     ch <- msgByte 
  69.   } else { 
  70.     log.Printf("%s, 数据不满足, 直接丢弃数据, 字符:%s, 字节数:%d, 字节:%v", receiver, string(msgByte), len(msgByte), msgByte) 
  71.   } 
  72.   return nil 
  73.  
  74. func NewWsConn(host string, path string, params map[string]string, headers map[string][]string) (*websocket.Conn, error) { 
  75.   paramArray := []string{} 
  76.   for k, v := range params { 
  77.     paramArray = append(paramArray, fmt.Sprintf("%s=%s", k, v)) 
  78.   } 
  79.   u := url.URL{Scheme: "wss", Host: host, Path: path, RawQuery: strings.Join(paramArray, "&")} 
  80.   log.Printf("API:%s", u.String()) 
  81.   dialer := websocket.Dialer{TLSClientConfig: &tls.Config{RootCAs: nil, InsecureSkipVerify: true}} 
  82.   conn, _, err := dialer.Dial(u.String(), headers) 
  83.   if err != nil { 
  84.     return nil, errors.New(fmt.Sprintf("连接错误:%s", err.Error())) 
  85.   } 
  86.   return conn, nil 
  87.  
  88. //核心: tar -cf - 将具有文件夹结构的数据转换成数据流, 通过 tar -xf - 将数据流转换成 linux 文件系统 
  89. func CpPod2Pod() { 
  90.   //通知主函数可以退出的信号通道 
  91.   signalExit := make(chan bool, 1) 
  92.   defer close(signalExit) 
  93.  
  94.   //下发不要给目的容器发送数据的信号 
  95.   signalStopDstSend := make(chan bool, 1) 
  96.   defer close(signalStopDstSend) 
  97.  
  98.   //下发不要从源容器读取数据的信号 
  99.   signalStopSrcRead := make(chan bool, 1) 
  100.   defer close(signalStopSrcRead) 
  101.  
  102.   //下发不要从目的容器读取数据的信号 
  103.   signalStopDstRead := make(chan bool, 1) 
  104.   defer close(signalStopDstRead) 
  105.  
  106.   //下发不要打印目的容器的输出数据 
  107.   signalStopPrintDstStdout := make(chan bool, 1) 
  108.   defer close(signalStopPrintDstStdout) 
  109.  
  110.   //连接pod 
  111.   host := "172.16.xxx.xxx:6443" 
  112.   token := "xxx" 
  113.   headers := map[string][]string{"authorization": {fmt.Sprintf("Bearer %s", token)}} 
  114.  
  115.   pathSrc := "/api/v1/namespaces/xxx/pods/xxx/exec" 
  116.   commandSrc := "tar&command=czf&command=-&command=/var/log/mysql/slow.log" //tar czf - sourceFile 
  117.   paraSrc := map[string]string{"stdout": "1", "stdin": "0", "stderr": "1", "tty": "0", "container": "xxx", "command": commandSrc} 
  118.   srcConn, err := NewWsConn(host, pathSrc, paraSrc, headers) 
  119.   if err != nil { 
  120.     log.Printf("源Pod连接出错, %s", err.Error()) 
  121.   } 
  122.  
  123.   pathDst := "/api/v1/namespaces/xxx/pods/xxx/exec" 
  124.   commandDst := "tar&command=xzf&command=-&command=-C&command=/tmp" // tar xzf - -C /tmp 
  125.   // paraDst := map[string]string{"stdout": "1", "stdin": "1", "stderr": "1", "tty": "0", "container": "xxx", "command": commandDst} 
  126.   paraDst := map[string]string{"stdout": "0", "stdin": "1", "stderr": "0", "tty": "0", "container": "xxx", "command": commandDst} //关闭目的Pod标准输出和错误输出 
  127.   dstConn, err := NewWsConn(host, pathDst, paraDst, headers) 
  128.   if err != nil { 
  129.     log.Printf("目的Pod连接出错, %s", err.Error()) 
  130.   } 
  131.  
  132.   wsSrc := WsConn{ 
  133.     Conn: srcConn, 
  134.   } 
  135.  
  136.   wsDst := WsConn{ 
  137.     Conn: dstConn, 
  138.   } 
  139.  
  140.   defer srcConn.Close() 
  141.   defer dstConn.Close() 
  142.  
  143.   srcStdOutCh := make(chan []byte, 2048) 
  144.   dstStdOutCh := make(chan []byte) 
  145.   defer close(srcStdOutCh) 
  146.   defer close(dstStdOutCh) 
  147.  
  148.   // 接收源容器标准输出到数据通道中 
  149.   go func() { 
  150.     i := 1 
  151.     for { 
  152.       log.Printf("第%d次, 从源容器读取标准输出", i) 
  153.       i++ 
  154.       //定义匿名过滤器回调方法, 对源容器标准输出中不需要的数据进行过滤 
  155.       err := wsSrc.Receive("源容器", srcStdOutCh, func(input string) bool { 
  156.         if input == "cat /var/log/mysql/slow.log" { 
  157.           return false 
  158.           // } else if match, _ := regexp.MatchString("root@(.+)#", input); match { 
  159.           //   return false 
  160.           // } else if match, _ := regexp.MatchString("cat /(.+).log", input); match { 
  161.           //   return false 
  162.           // } else if match, _ := regexp.MatchString("cat /tmp/(.+)", input); match { 
  163.           //   return false 
  164.         } else if match, _ := regexp.MatchString("tar: Removing leading(.+)", input); match { 
  165.           return false 
  166.         } else if len(input) == 0 { //过滤空消息 
  167.           // log.Printf("读取到标准错误输出") 
  168.           return false 
  169.         } 
  170.         return true 
  171.       }) 
  172.       if err != nil { 
  173.         log.Printf("读取源容器标准输出失败") 
  174.         // signalExit <- true 
  175.         break 
  176.       } 
  177.       // time.Sleep(time.Microsecond * 100) 
  178.     } 
  179.   }() 
  180.  
  181.   /* 注意, 这里不能开启并发协程去读取目的容器的标准输出, 如果开启可能会与发送数据的协程抢锁, 从而阻塞向目的容器发送数据*/ 
  182.   // // 从目的容器获取标准输出到数据通道中 
  183.   // go func() { 
  184.   //   // i := 0 
  185.   //   for { 
  186.   //     // 该过滤器直接返回true, 仅占位 
  187.   //     err := wsDst.Receive("目的容器", dstStdOutCh, func(input string) bool { 
  188.   //       return true 
  189.   //     }) 
  190.   //     if err != nil { 
  191.   //       log.Printf("从目的容器读取数据失败") 
  192.   //       break 
  193.   //     } 
  194.   //     // wsDst.Send() 
  195.   //     time.Sleep(time.Microsecond * 100000) 
  196.   //   } 
  197.   //   // log.Printf("从目的容器读取数据, 第%d次循环", i) 
  198.   //   // i++ 
  199.   // }() 
  200.  
  201.   // //从数据通道中读取, 目的容器的标准输出, 并打印 
  202.   // go func() { 
  203.   // BreakPrintDstPodStdout: 
  204.   //   for { 
  205.   //     select { 
  206.   //     case data := <-dstStdOutCh: 
  207.   //       log.Printf("目的容器标准输出:%s", string(data)) 
  208.   //       // time.Sleep(time.Microsecond * 200) 
  209.   //     case <-signalStopPrintDstStdout: 
  210.   //       log.Printf("收到信号, 停止打印目的容器标准输出") 
  211.   //       // close(dataOutput) 
  212.   //       // close(dataCh) 
  213.   //       // signalStopRead <- true 
  214.   //       // log.Printf("发送停止读信号") 
  215.   //       // close(dataOutput) 
  216.   //       // close(dataCh) 
  217.   //       break BreakPrintDstPodStdout 
  218.   //     } 
  219.   //     // time.Sleep(time.Microsecond * 100) 
  220.   //   } 
  221.   // }() 
  222.  
  223.   //从源容器标准输出的数据通道获取数据, 然后发送给目的容器标准输入 
  224.   //定义超时时间 
  225.   timeOutSecond := 3 
  226.   timer := time.NewTimer(time.Second * time.Duration(timeOutSecond)) 
  227. Break2Main: 
  228.   for { 
  229.     select { 
  230.     case data := <-srcStdOutCh: 
  231.       wsDst.SendWithFilter("向目的容器发送", string(data)) 
  232.       // time.Sleep(time.Millisecond * 200) 
  233.       timer.Reset(time.Second * time.Duration(timeOutSecond)) 
  234.     case <-timer.C: 
  235.       // time.Sleep(time.Second * 5) 
  236.       log.Printf("================ 源容器标准输出,没有新的数据,获取超时,停止向目的容器发送数据 ================") 
  237.       // log.Printf("发送信号:停止打印目的容器标准输出") 
  238.       // signalStopPrintDstStdout <- true 
  239.       log.Printf("发送信号:停止从源容器读取数据") 
  240.       wsSrc.Conn.Close() 
  241.       // log.Printf("发送信号:停止从目的容器读取数据") 
  242.       // wsDst.Conn.Close() 
  243.       log.Printf("发送信号:主函数可以退出了") 
  244.       signalExit <- true 
  245.       log.Printf("所有信号发送完毕") 
  246.       log.Printf("================== 跳出循环 =================") 
  247.       break Break2Main 
  248.     } 
  249.     // time.Sleep(time.Microsecond * 1000) 
  250.   } 
  251.  
  252.   // signalStopRead <- true 
  253.   <-signalExit //阻塞通道, 直到收到一个信号 
  254.   // signalStopRead <- true 
  255.   log.Printf("主函数收到信号, 准备退出") 
  256.   // close(dataCh) 
  257.   // time.Sleep(time.Second) 
  258.   // close(dataOutput) 
  259.   // time.Sleep(time.Second) 
  260.   // select {} 

cp_test.go

  1. package cpFilePod2Pod 
  2.  
  3. import ( 
  4.   "log" 
  5.   "testing" 
  6.  
  7. // go test -race -test.run TestCpPod2Pod  切到该目录执行该测试 
  8. func TestCpPod2Pod(t *testing.T) { 
  9.   log.Printf("开始测试") 
  10.   CpPod2Pod() 

  1. 参考结果: 
  2. 源容器: 
  3. root@xxx-mysql-0:/var/log/mysql# md5sum slow.log 
  4. 16577613b6ea957ecb5d9d5e976d9c50  slow.log 
  5. 目的容器: 
  6. root@xxx-75bdcdb8cf-hq9wf:/tmp/var/log/mysql# md5sum slow.log 
  7. 16577613b6ea957ecb5d9d5e976d9c50  slow.log 

参考文档

Kubernetes exec API串接分析:https://www.cnblogs.com/a00ium/p/10905279.html

kubernetes-client-go-实现-kubectl-copy:https://ica10888.com/2019/08/31/kubernetes-client-go-%E5%AE%9E%E7%8E%B0-kubectl-copy.html

相关内容