Source File
pipeline.go
Belonging Package
net/textproto
// Copyright 2010 The Go Authors. All rights reserved.// Use of this source code is governed by a BSD-style// license that can be found in the LICENSE file.package textprotoimport ()// A Pipeline manages a pipelined in-order request/response sequence.//// To use a Pipeline p to manage multiple clients on a connection,// each client should run://// id := p.Next() // take a number//// p.StartRequest(id) // wait for turn to send request// «send request»// p.EndRequest(id) // notify Pipeline that request is sent//// p.StartResponse(id) // wait for turn to read response// «read response»// p.EndResponse(id) // notify Pipeline that response is read//// A pipelined server can use the same calls to ensure that// responses computed in parallel are written in the correct order.type Pipeline struct {mu sync.Mutexid uintrequest sequencerresponse sequencer}// Next returns the next id for a request/response pair.func ( *Pipeline) () uint {.mu.Lock():= .id.id++.mu.Unlock()return}// StartRequest blocks until it is time to send (or, if this is a server, receive)// the request with the given id.func ( *Pipeline) ( uint) {.request.Start()}// EndRequest notifies p that the request with the given id has been sent// (or, if this is a server, received).func ( *Pipeline) ( uint) {.request.End()}// StartResponse blocks until it is time to receive (or, if this is a server, send)// the request with the given id.func ( *Pipeline) ( uint) {.response.Start()}// EndResponse notifies p that the response with the given id has been received// (or, if this is a server, sent).func ( *Pipeline) ( uint) {.response.End()}// A sequencer schedules a sequence of numbered events that must// happen in order, one after the other. The event numbering must start// at 0 and increment without skipping. The event number wraps around// safely as long as there are not 2^32 simultaneous events pending.type sequencer struct {mu sync.Mutexid uintwait map[uint]chan struct{}}// Start waits until it is time for the event numbered id to begin.// That is, except for the first event, it waits until End(id-1) has// been called.func ( *sequencer) ( uint) {.mu.Lock()if .id == {.mu.Unlock()return}:= make(chan struct{})if .wait == nil {.wait = make(map[uint]chan struct{})}.wait[] =.mu.Unlock()<-}// End notifies the sequencer that the event numbered id has completed,// allowing it to schedule the event numbered id+1. It is a run-time error// to call End with an id that is not the number of the active event.func ( *sequencer) ( uint) {.mu.Lock()if .id != {.mu.Unlock()panic("out of sync")}++.id =if .wait == nil {.wait = make(map[uint]chan struct{})}, := .wait[]if {delete(.wait, )}.mu.Unlock()if {close()}}