介绍

// Pipe creates a synchronous in-memory pipe. // It can be used to connect code expecting an io.Reader // with code expecting an io.Writer. // // Reads and Writes on the pipe are matched one to one // except when multiple Reads are needed to consume a single Write. // That is, each Write to the PipeWriter blocks until it has satisfied // one or more Reads from the PipeReader that fully consume // the written data. // The data is copied directly from the Write to the corresponding // Read (or Reads); there is no internal buffering. // // It is safe to call Read and Write in parallel with each other or with Close. // Parallel calls to Read and parallel calls to Write are also safe: // the individual calls will be gated sequentially.

源码位置github

使用例子

下面代码原链接为 https://luckymrwang.github.io/2020/12/17/Examples-For-Using-io-Pipe-in-Go/ 定义

pr, pw := io.Pipe()

Example 1: JSON to HTTP Request

pr, pw := io.Pipe()

go func() {
    // close the writer, so the reader knows there's no more data
    defer pw.Close()

    // write json data to the PipeReader through the PipeWriter
    if err := json.NewEncoder(pw).Encode(&PayLoad{Content: "Hello Pipe!"}); err != nil {
        log.Fatal(err)
    }
}()

// JSON from the PipeWriter lands in the PipeReader
// ...and we send it off...
if _, err := http.Post("http://example.com", "application/json", pr); err != nil {
    log.Fatal(err)
}

Example 2: Split up Data with TeeReader

pr, pw := io.Pipe()

// we need to wait for everything to be done
wg := sync.WaitGroup{}
wg.Add(2)

// we get some file as input
f, err := os.Open("./fruit.txt")
if err != nil {
    log.Fatal(err)
}

// TeeReader gets the data from the file and also writes it to the PipeWriter
tr := io.TeeReader(f, pw) 

go func() {
    defer wg.Done()
    defer pw.Close()

    // get data from the TeeReader, which feeds the PipeReader through the PipeWriter
    _, err := http.Post("https://example.com", "text/html", tr)
    if err != nil {
        log.Fatal(err)
    }
}()

go func() {
    defer wg.Done()
    // read from the PipeReader to stdout
    if _, err := io.Copy(os.Stdout, pr); err != nil {
        log.Fatal(err)
    }
}()

wg.Wait()

Example 3: Piping the output of Shell commands

pr, pw := io.Pipe()
defer pw.Close()

// tell the command to write to our pipe
cmd := exec.Command("cat", "fruit.txt")
cmd.Stdout = pw

go func() {
    defer pr.Close()
    // copy the data written to the PipeReader via the cmd to stdout
    if _, err := io.Copy(os.Stdout, pr); err != nil {
        log.Fatal(err)
    }
}()

// run the command, which writes all output to the PipeWriter
// which then ends up in the PipeReader
if err := cmd.Run(); err != nil {
    log.Fatal(err)
}

Example 4: Piping the output of Http response

package main

import (
	"io"
	"net/http"
	"os/exec"
)

var (
	BUF_LEN = 1024
)

func handler(w http.ResponseWriter, r *http.Request) {
	cmd := exec.Command("./build.sh")
	pipeReader, pipeWriter := io.Pipe()
	cmd.Stdout = pipeWriter
	cmd.Stderr = pipeWriter
	go writeCmdOutput(w, pipeReader)
	cmd.Run()
	pipeWriter.Close()
}

func writeCmdOutput(res http.ResponseWriter, pipeReader *io.PipeReader) {
	buffer := make([]byte, BUF_LEN)
	for {
		n, err := pipeReader.Read(buffer)
		if err != nil {
			pipeReader.Close()
			break
		}

		data := buffer[0:n]
		res.Write(data)
		if f, ok := res.(http.Flusher); ok {
			f.Flush()
		}
		//reset buffer
		for i := 0; i < n; i++ {
			buffer[i] = 0
		}
	}
}

func main() {
	http.HandleFunc("/", handler)
	http.ListenAndServe(":8080", nil)
}

Example 4+: Piping the output of Http response

package main

import (
	"io"
	"net/http"
	"os/exec"
)

func handler(w http.ResponseWriter, r *http.Request) {
	cmd := exec.Command("ls")
	pipeReader, pipeWriter := io.Pipe()
	cmd.Stdout = pipeWriter
	cmd.Stderr = pipeWriter
	go io.Copy(w, pipeReader)
	cmd.Run()
	pipeWriter.Close()
}

func main() {
	http.HandleFunc("/", handler)
	http.ListenAndServe(":8080", nil)
}

参考