Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

net/http: pool transport gzip readers #61390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
AlexanderYastrebov wants to merge 2 commits into golang:master
base: master
Choose a base branch
Loading
from AlexanderYastrebov:go61353-2
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 75 additions & 27 deletions src/net/http/serve_test.go
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"compress/gzip"
"compress/zlib"
"context"
crand "crypto/rand"
"crypto/tls"
"crypto/x509"
"encoding/json"
Expand Down Expand Up @@ -5281,8 +5282,8 @@ func benchmarkClientServerParallel(b *testing.B, parallelism int, mode testMode)
func BenchmarkServer(b *testing.B) {
b.ReportAllocs()
// Child process mode;
if url := os.Getenv("TEST_BENCH_SERVER_URL"); url != "" {
n, err := strconv.Atoi(os.Getenv("TEST_BENCH_CLIENT_N"))
if url := os.Getenv("GO_TEST_BENCH_SERVER_URL"); url != "" {
n, err := strconv.Atoi(os.Getenv("GO_TEST_BENCH_CLIENT_N"))
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -5316,8 +5317,8 @@ func BenchmarkServer(b *testing.B) {

cmd := testenv.Command(b, os.Args[0], "-test.run=^$", "-test.bench=^BenchmarkServer$")
cmd.Env = append([]string{
fmt.Sprintf("TEST_BENCH_CLIENT_N=%d", b.N),
fmt.Sprintf("TEST_BENCH_SERVER_URL=%s", ts.URL),
fmt.Sprintf("GO_TEST_BENCH_CLIENT_N=%d", b.N),
fmt.Sprintf("GO_TEST_BENCH_SERVER_URL=%s", ts.URL),
}, os.Environ()...)
out, err := cmd.CombinedOutput()
if err != nil {
Expand All @@ -5338,39 +5339,63 @@ func getNoBody(urlStr string) (*Response, error) {
// A benchmark for profiling the client without the HTTP server code.
// The server code runs in a subprocess.
func BenchmarkClient(b *testing.B) {
var data = []byte("Hello world.\n")

url := startClientBenchmarkServer(b, HandlerFunc(func(w ResponseWriter, _ *Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Write(data)
}))

// Do b.N requests to the server.
b.StartTimer()
for i := 0; i < b.N; i++ {
res, err := Get(url)
if err != nil {
b.Fatalf("Get: %v", err)
}
body, err := io.ReadAll(res.Body)
res.Body.Close()
if err != nil {
b.Fatalf("ReadAll: %v", err)
}
if !bytes.Equal(body, data) {
b.Fatalf("Got body: %q", body)
}
}
b.StopTimer()
}

func startClientBenchmarkServer(b *testing.B, handler Handler) string {
b.ReportAllocs()
b.StopTimer()
defer afterTest(b)

var data = []byte("Hello world.\n")
if server := os.Getenv("TEST_BENCH_SERVER"); server != "" {
if server := os.Getenv("GO_TEST_BENCH_SERVER"); server != "" {
// Server process mode.
port := os.Getenv("TEST_BENCH_SERVER_PORT") // can be set by user
port := os.Getenv("GO_TEST_BENCH_SERVER_PORT") // can be set by user
if port == "" {
port = "0"
}
ln, err := net.Listen("tcp", "localhost:"+port)
if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
log.Fatal(err)
}
fmt.Println(ln.Addr().String())

HandleFunc("/", func(w ResponseWriter, r *Request) {
r.ParseForm()
if r.Form.Get("stop") != "" {
os.Exit(0)
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Write(data)
handler.ServeHTTP(w, r)
})
var srv Server
log.Fatal(srv.Serve(ln))
}

// Start server process.
ctx, cancel := context.WithCancel(context.Background())
cmd := testenv.CommandContext(b, ctx, os.Args[0], "-test.run=^$", "-test.bench=^BenchmarkClient$")
cmd.Env = append(cmd.Environ(), "TEST_BENCH_SERVER=yes")
cmd := testenv.CommandContext(b, ctx, os.Args[0], "-test.run=^$", "-test.bench=^"+b.Name()+"$")
cmd.Env = append(cmd.Environ(), "GO_TEST_BENCH_SERVER=yes")
cmd.Stderr = os.Stderr
stdout, err := cmd.StdoutPipe()
if err != nil {
Expand All @@ -5385,10 +5410,6 @@ func BenchmarkClient(b *testing.B) {
done <- cmd.Wait()
close(done)
}()
defer func() {
cancel()
<-done
}()

// Wait for the server in the child process to respond and tell us
// its listening address, once it's started listening:
Expand All @@ -5401,29 +5422,56 @@ func BenchmarkClient(b *testing.B) {
b.Fatalf("initial probe of child process failed: %v", err)
}

// Instruct server process to stop.
b.Cleanup(func() {
getNoBody(url + "?stop=yes")
if err := <-done; err != nil {
b.Fatalf("subprocess failed: %v", err)
}

cancel()
<-done

afterTest(b)
})

return url
}

func BenchmarkClientGzip(b *testing.B) {
const responseSize = 1024 * 1024

var buf bytes.Buffer
gz := gzip.NewWriter(&buf)
if _, err := io.CopyN(gz, crand.Reader, responseSize); err != nil {
b.Fatal(err)
}
gz.Close()

data := buf.Bytes()

url := startClientBenchmarkServer(b, HandlerFunc(func(w ResponseWriter, _ *Request) {
w.Header().Set("Content-Encoding", "gzip")
w.Write(data)
}))

// Do b.N requests to the server.
b.StartTimer()
for i := 0; i < b.N; i++ {
res, err := Get(url)
if err != nil {
b.Fatalf("Get: %v", err)
}
body, err := io.ReadAll(res.Body)
n, err := io.Copy(io.Discard, res.Body)
res.Body.Close()
if err != nil {
b.Fatalf("ReadAll: %v", err)
}
if !bytes.Equal(body, data) {
b.Fatalf("Got body: %q", body)
if n != responseSize {
b.Fatalf("ReadAll: expected %d bytes, got %d", responseSize, n)
}
}
b.StopTimer()

// Instruct server process to stop.
getNoBody(url + "?stop=yes")
if err := <-done; err != nil {
b.Fatalf("subprocess failed: %v", err)
}
}

func BenchmarkServerFakeConnNoKeepAlive(b *testing.B) {
Expand Down
89 changes: 76 additions & 13 deletions src/net/http/transport.go
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package http

import (
"bufio"
"compress/flate"
"compress/gzip"
"container/list"
"context"
Expand Down Expand Up @@ -2988,6 +2989,7 @@ type bodyEOFSignal struct {
}

var errReadOnClosedResBody = errors.New("http: read on closed response body")
var errConcurrentReadOnResBody = errors.New("http: concurrent read on response body")

func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
es.mu.Lock()
Expand Down Expand Up @@ -3037,37 +3039,98 @@ func (es *bodyEOFSignal) condfn(err error) error {
}

// gzipReader wraps a response body so it can lazily
// call gzip.NewReader on the first call to Read
// get gzip.Reader from the pool on the first call to Read.
// After Close is called it puts gzip.Reader to the pool immediately
// if there is no Read in progress or later when Read completes.
type gzipReader struct {
_ incomparable
body *bodyEOFSignal // underlying HTTP/1 response body framing
zr *gzip.Reader // lazily-initialized gzip reader
zerr error // any error from gzip.NewReader; sticky
mu sync.Mutex // guards zr and zerr
zr *gzip.Reader
zerr error
}

func (gz *gzipReader) Read(p []byte) (n int, err error) {
type eofReader struct{}

func (eofReader) Read([]byte) (int, error) { return 0, io.EOF }
func (eofReader) ReadByte() (byte, error) { return 0, io.EOF }

var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }}

// gzipPoolGet gets a gzip.Reader from the pool and resets it to read from r.
func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
zr := gzipPool.Get().(*gzip.Reader)
if err := zr.Reset(r); err != nil {
gzipPoolPut(zr)
return nil, err
}
return zr, nil
}

// gzipPoolPut puts a gzip.Reader back into the pool.
func gzipPoolPut(zr *gzip.Reader) {
// Reset will allocate bufio.Reader if we pass it anything
// other than a flate.Reader, so ensure that it's getting one.
var r flate.Reader = eofReader{}
zr.Reset(r)
gzipPool.Put(zr)
}

// acquire returns a gzip.Reader for reading response body.
// The reader must be released after use.
func (gz *gzipReader) acquire() (*gzip.Reader, error) {
gz.mu.Lock()
defer gz.mu.Unlock()
if gz.zerr != nil {
return nil, gz.zerr
}
if gz.zr == nil {
if gz.zerr == nil {
gz.zr, gz.zerr = gzip.NewReader(gz.body)
}
gz.zr, gz.zerr = gzipPoolGet(gz.body)
if gz.zerr != nil {
return 0, gz.zerr
return nil, gz.zerr
}
}
ret := gz.zr
gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
return ret, nil
}

// release returns the gzip.Reader to the pool if Close was called during Read.
func (gz *gzipReader) release(zr *gzip.Reader) {
gz.mu.Lock()
defer gz.mu.Unlock()
if gz.zerr == errConcurrentReadOnResBody {
gz.zr, gz.zerr = zr, nil
} else { // errReadOnClosedResBody
gzipPoolPut(zr)
}
}

gz.body.mu.Lock()
if gz.body.closed {
err = errReadOnClosedResBody
// close returns the gzip.Reader to the pool immediately or
// signals release to do so after Read completes.
func (gz *gzipReader) close() {
gz.mu.Lock()
defer gz.mu.Unlock()
if gz.zerr == nil && gz.zr != nil {
gzipPoolPut(gz.zr)
gz.zr = nil
}
gz.body.mu.Unlock()
gz.zerr = errReadOnClosedResBody
}

func (gz *gzipReader) Read(p []byte) (n int, err error) {
zr, err := gz.acquire()
if err != nil {
return 0, err
}
return gz.zr.Read(p)
defer gz.release(zr)

return zr.Read(p)
}

func (gz *gzipReader) Close() error {
gz.close()

return gz.body.Close()
}

Expand Down

AltStyle によって変換されたページ (->オリジナル) /