package net
import (
	"io"
	"os"
	"sync"
	"time"
)
// pipeDeadline tracks one deadline and exposes it as a channel that is closed
// once the deadline has passed.
type pipeDeadline struct {
	mu     sync.Mutex    // held by set and wait while they touch timer and cancel
	timer  *time.Timer   // pending expiry callback; nil when none is scheduled
	cancel chan struct{} // closed when the deadline has expired
}

// makePipeDeadline returns a pipeDeadline with no deadline in effect: cancel
// is an open channel and no timer is scheduled.
func makePipeDeadline() pipeDeadline {
	return pipeDeadline{cancel: make(chan struct{})}
}

// set installs t as the new deadline, replacing any previous one.
//
//   - A zero t removes the deadline; the channel returned by wait is open.
//   - A t in the future schedules the channel to be closed when t is reached.
//   - A t that is now or in the past closes the channel immediately.
//
// If the previous deadline had already expired, a zero or future t replaces
// the closed channel with a fresh one, so callers must fetch the channel with
// wait again after calling set.
func (d *pipeDeadline) set(t time.Time) {
	d.mu.Lock()
	defer d.mu.Unlock()

	if d.timer != nil && !d.timer.Stop() {
		// Stop could not cancel the timer, so its callback is running or has
		// run. Block until it has closed cancel so the state is settled.
		<-d.cancel
	}
	d.timer = nil

	expired := isClosedChan(d.cancel)

	// No deadline requested.
	if t.IsZero() {
		if expired {
			d.cancel = make(chan struct{})
		}
		return
	}

	// Deadline in the future: arm a timer that closes cancel on expiry.
	if wait := time.Until(t); wait > 0 {
		if expired {
			d.cancel = make(chan struct{})
		}
		d.timer = time.AfterFunc(wait, func() {
			close(d.cancel)
		})
		return
	}

	// Deadline already reached: expire now, taking care not to close twice.
	if !expired {
		close(d.cancel)
	}
}

// wait returns the channel that is closed when the current deadline expires.
func (d *pipeDeadline) wait() chan struct{} {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.cancel
}

// isClosedChan reports, without blocking, whether a receive on c would
// succeed right now. For the signal-only channels used in this file, which
// are never sent on, that means c has been closed.
func isClosedChan(c <-chan struct{}) bool {
	select {
	case <-c:
		return true
	default:
		return false
	}
}
// pipeAddr is the address value reported for both ends of a pipe. It carries
// no data; both of its methods return the fixed string "pipe".
type pipeAddr struct{}

// Network returns the network name, "pipe".
func (pipeAddr) Network() string { return "pipe" }

// String returns the textual form of the address, "pipe".
func (pipeAddr) String() string { return "pipe" }
// pipe is one end of an in-memory connection. Two pipe values are created
// together and share their channels crosswise.
type pipe struct {
	// wrMu is held by write for the whole call.
	wrMu sync.Mutex

	// Read side: a chunk is received on rdRx, then the number of bytes
	// consumed from it is sent on rdTx.
	rdRx <-chan []byte
	rdTx chan<- int

	// Write side: a chunk is sent on wrTx, then the number of bytes the
	// reader consumed is received on wrRx.
	wrTx chan<- []byte
	wrRx <-chan int

	// once ensures localDone is closed only a single time.
	once       sync.Once
	localDone  chan struct{}   // closed when this end is closed
	remoteDone <-chan struct{} // checked to detect that the other end is gone

	readDeadline  pipeDeadline
	writeDeadline pipeDeadline
}
func () (Conn, Conn) {
:= make(chan []byte)
:= make(chan []byte)
:= make(chan int)
:= make(chan int)
:= make(chan struct{})
:= make(chan struct{})
:= &pipe{
rdRx: , rdTx: ,
wrTx: , wrRx: ,
localDone: , remoteDone: ,
readDeadline: makePipeDeadline(),
writeDeadline: makePipeDeadline(),
}
:= &pipe{
rdRx: , rdTx: ,
wrTx: , wrRx: ,
localDone: , remoteDone: ,
readDeadline: makePipeDeadline(),
writeDeadline: makePipeDeadline(),
}
return ,
}
func (*pipe) () Addr { return pipeAddr{} }
func (*pipe) () Addr { return pipeAddr{} }
func ( *pipe) ( []byte) (int, error) {
, := .read()
if != nil && != io.EOF && != io.ErrClosedPipe {
= &OpError{Op: "read", Net: "pipe", Err: }
}
return ,
}
func ( *pipe) ( []byte) ( int, error) {
switch {
case isClosedChan(.localDone):
return 0, io.ErrClosedPipe
case isClosedChan(.remoteDone):
return 0, io.EOF
case isClosedChan(.readDeadline.wait()):
return 0, os.ErrDeadlineExceeded
}
select {
case := <-.rdRx:
:= copy(, )
.rdTx <-
return , nil
case <-.localDone:
return 0, io.ErrClosedPipe
case <-.remoteDone:
return 0, io.EOF
case <-.readDeadline.wait():
return 0, os.ErrDeadlineExceeded
}
}
func ( *pipe) ( []byte) (int, error) {
, := .write()
if != nil && != io.ErrClosedPipe {
= &OpError{Op: "write", Net: "pipe", Err: }
}
return ,
}
func ( *pipe) ( []byte) ( int, error) {
switch {
case isClosedChan(.localDone):
return 0, io.ErrClosedPipe
case isClosedChan(.remoteDone):
return 0, io.ErrClosedPipe
case isClosedChan(.writeDeadline.wait()):
return 0, os.ErrDeadlineExceeded
}
.wrMu.Lock()
defer .wrMu.Unlock()
for := true; || len() > 0; = false {
select {
case .wrTx <- :
:= <-.wrRx
= [:]
+=
case <-.localDone:
return , io.ErrClosedPipe
case <-.remoteDone:
return , io.ErrClosedPipe
case <-.writeDeadline.wait():
return , os.ErrDeadlineExceeded
}
}
return , nil
}
func ( *pipe) ( time.Time) error {
if isClosedChan(.localDone) || isClosedChan(.remoteDone) {
return io.ErrClosedPipe
}
.readDeadline.set()
.writeDeadline.set()
return nil
}
func ( *pipe) ( time.Time) error {
if isClosedChan(.localDone) || isClosedChan(.remoteDone) {
return io.ErrClosedPipe
}
.readDeadline.set()
return nil
}
func ( *pipe) ( time.Time) error {
if isClosedChan(.localDone) || isClosedChan(.remoteDone) {
return io.ErrClosedPipe
}
.writeDeadline.set()
return nil
}
func ( *pipe) () error {
.once.Do(func() { close(.localDone) })
return nil
}