mirror of
https://github.com/go-i2p/go-sam-go.git
synced 2025-06-16 05:44:42 -04:00
Work on concurrent reader operations in datagrams
This commit is contained in:
@ -45,53 +45,66 @@ func (r *DatagramReader) Close() error {
|
||||
// Signal termination to receiveLoop
|
||||
close(r.closeChan)
|
||||
|
||||
// Wait for receiveLoop to signal completion with timeout protection
|
||||
// Wait for receiveLoop to signal completion with shorter timeout
|
||||
select {
|
||||
case <-r.doneChan:
|
||||
// receiveLoop has confirmed it stopped
|
||||
logger.Debug("Receive loop terminated gracefully")
|
||||
case <-time.After(5 * time.Second):
|
||||
// Timeout protection - log warning but continue cleanup
|
||||
logger.Warn("Timeout waiting for receive loop to stop")
|
||||
case <-time.After(2 * time.Second):
|
||||
// Shorter timeout to prevent test hangs
|
||||
logger.Warn("Timeout waiting for receive loop to stop, forcing cleanup")
|
||||
}
|
||||
|
||||
// Close receiver channels only after receiveLoop has stopped
|
||||
// Use non-blocking close to avoid deadlock if channels are already closed
|
||||
// Force close channels to prevent goroutine leaks
|
||||
r.safeCloseChannel()
|
||||
|
||||
logger.Debug("Successfully closed DatagramReader")
|
||||
return nil
|
||||
}
|
||||
|
||||
// safeCloseChannel safely closes channels with panic recovery
|
||||
// Improved channel closing with better error handling
|
||||
func (r *DatagramReader) safeCloseChannel() {
|
||||
// Close channels in order of dependency
|
||||
defer func() {
|
||||
if recover() != nil {
|
||||
// Channel was already closed - this is expected in some race conditions
|
||||
// Channels already closed - expected in concurrent scenarios
|
||||
}
|
||||
}()
|
||||
|
||||
// First close the done channel
|
||||
select {
|
||||
case <-r.doneChan:
|
||||
// Already closed
|
||||
default:
|
||||
close(r.doneChan)
|
||||
}
|
||||
|
||||
// Then close data channels
|
||||
close(r.recvChan)
|
||||
close(r.errorChan)
|
||||
}
|
||||
|
||||
// receiveLoop continuously receives incoming datagrams
|
||||
func (r *DatagramReader) receiveLoop() {
|
||||
logger := log.WithField("session_id", r.session.ID())
|
||||
logger.Debug("Starting receive loop")
|
||||
|
||||
// Ensure doneChan is properly signaled when loop exits
|
||||
defer func() {
|
||||
// Use non-blocking send to avoid deadlock if Close() isn't waiting
|
||||
// Use non-blocking send with recovery to prevent panics
|
||||
defer func() {
|
||||
if recover() != nil {
|
||||
// doneChan already closed or other error - ignore
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case r.doneChan <- struct{}{}:
|
||||
default:
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
// Timeout on done signal - continue cleanup anyway
|
||||
}
|
||||
logger.Debug("Receive loop goroutine terminated")
|
||||
}()
|
||||
|
||||
for {
|
||||
// Check for closure signal before any blocking operations
|
||||
// Check for closure signal with immediate return
|
||||
select {
|
||||
case <-r.closeChan:
|
||||
logger.Debug("Receive loop terminated - reader closed")
|
||||
@ -99,15 +112,25 @@ func (r *DatagramReader) receiveLoop() {
|
||||
default:
|
||||
}
|
||||
|
||||
// Perform the blocking read operation
|
||||
// Add timeout to the blocking read to prevent indefinite hangs
|
||||
r.session.SetReadDeadline(time.Now().Add(1 * time.Second))
|
||||
datagram, err := r.receiveDatagram()
|
||||
r.session.SetReadDeadline(time.Time{}) // Clear deadline
|
||||
|
||||
if err != nil {
|
||||
// Use atomic send pattern with close check to prevent panic
|
||||
// Check if this is a timeout error during shutdown
|
||||
select {
|
||||
case <-r.closeChan:
|
||||
logger.Debug("Receive loop terminated during read timeout")
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Use atomic send pattern with close check
|
||||
select {
|
||||
case r.errorChan <- err:
|
||||
logger.WithError(err).Error("Failed to receive datagram")
|
||||
case <-r.closeChan:
|
||||
// Reader was closed during error handling - exit gracefully
|
||||
logger.Debug("Receive loop terminated during error handling")
|
||||
return
|
||||
}
|
||||
@ -119,7 +142,6 @@ func (r *DatagramReader) receiveLoop() {
|
||||
case r.recvChan <- datagram:
|
||||
logger.Debug("Successfully received datagram")
|
||||
case <-r.closeChan:
|
||||
// Reader was closed during datagram send - exit gracefully
|
||||
logger.Debug("Receive loop terminated during datagram send")
|
||||
return
|
||||
}
|
||||
|
87
datagram/read_test.go
Normal file
87
datagram/read_test.go
Normal file
@ -0,0 +1,87 @@
|
||||
package datagram
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestDatagramSession_ConcurrentOperations(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping integration test in short mode")
|
||||
}
|
||||
|
||||
// Add overall test timeout
|
||||
timeout := time.After(30 * time.Second)
|
||||
done := make(chan bool)
|
||||
|
||||
go func() {
|
||||
sam, keys := setupTestSAM(t)
|
||||
defer sam.Close()
|
||||
|
||||
session, err := NewDatagramSession(sam, "test_concurrent", keys, nil)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to create session: %v", err)
|
||||
done <- false
|
||||
return
|
||||
}
|
||||
defer session.Close()
|
||||
|
||||
// Test concurrent reader creation
|
||||
readerDone := make(chan bool, 10)
|
||||
for i := 0; i < 10; i++ {
|
||||
go func() {
|
||||
reader := session.NewReader()
|
||||
if reader == nil {
|
||||
t.Error("NewReader returned nil")
|
||||
}
|
||||
// Immediately close reader to prevent resource leaks
|
||||
reader.Close()
|
||||
readerDone <- true
|
||||
}()
|
||||
}
|
||||
|
||||
// Test concurrent writer creation
|
||||
writerDone := make(chan bool, 10)
|
||||
for i := 0; i < 10; i++ {
|
||||
go func() {
|
||||
writer := session.NewWriter()
|
||||
if writer == nil {
|
||||
t.Error("NewWriter returned nil")
|
||||
}
|
||||
writerDone <- true
|
||||
}()
|
||||
}
|
||||
|
||||
// Wait for all goroutines with timeout
|
||||
for i := 0; i < 10; i++ {
|
||||
select {
|
||||
case <-readerDone:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Error("Timeout waiting for reader creation")
|
||||
done <- false
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
select {
|
||||
case <-writerDone:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Error("Timeout waiting for writer creation")
|
||||
done <- false
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
done <- true
|
||||
}()
|
||||
|
||||
select {
|
||||
case success := <-done:
|
||||
if !success {
|
||||
t.Fatal("Test failed")
|
||||
}
|
||||
case <-timeout:
|
||||
t.Fatal("Test timeout - likely goroutine leak or blocking operation")
|
||||
}
|
||||
}
|
@ -2,7 +2,6 @@ package datagram
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-i2p/go-sam-go/common"
|
||||
"github.com/go-i2p/i2pkeys"
|
||||
@ -248,55 +247,6 @@ func TestDatagramSession_PacketConn(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDatagramSession_ConcurrentOperations(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping integration test in short mode")
|
||||
}
|
||||
|
||||
sam, keys := setupTestSAM(t)
|
||||
defer sam.Close()
|
||||
|
||||
session, err := NewDatagramSession(sam, "test_concurrent", keys, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create session: %v", err)
|
||||
}
|
||||
defer session.Close()
|
||||
|
||||
// Test concurrent reader creation
|
||||
done := make(chan bool, 10)
|
||||
for i := 0; i < 10; i++ {
|
||||
go func() {
|
||||
reader := session.NewReader()
|
||||
if reader == nil {
|
||||
t.Error("NewReader returned nil")
|
||||
}
|
||||
done <- true
|
||||
}()
|
||||
}
|
||||
|
||||
// Test concurrent writer creation
|
||||
for i := 0; i < 10; i++ {
|
||||
go func() {
|
||||
writer := session.NewWriter()
|
||||
if writer == nil {
|
||||
t.Error("NewWriter returned nil")
|
||||
}
|
||||
done <- true
|
||||
}()
|
||||
}
|
||||
|
||||
// Wait for all goroutines
|
||||
timeout := time.After(5 * time.Second)
|
||||
for i := 0; i < 20; i++ {
|
||||
select {
|
||||
case <-done:
|
||||
// OK
|
||||
case <-timeout:
|
||||
t.Fatal("Timeout waiting for concurrent operations")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestDatagramAddr_Network(t *testing.T) {
|
||||
addr := &DatagramAddr{}
|
||||
if addr.Network() != "i2p-datagram" {
|
||||
|
Reference in New Issue
Block a user