Rewrite the whole StreamSession thing, which still doesn't work

This commit is contained in:
eyedeekay
2025-05-27 18:54:18 -04:00
parent d3f085b2c8
commit 60fcf85129
16 changed files with 982 additions and 498 deletions

85
stream/SAM.go Normal file
View File

@ -0,0 +1,85 @@
package stream
import (
"github.com/go-i2p/go-sam-go/common"
"github.com/go-i2p/i2pkeys"
"github.com/samber/oops"
"github.com/sirupsen/logrus"
)
// SAM wraps common.SAM to provide stream-specific functionality
type SAM struct {
*common.SAM
}
// NewStreamSession creates a new streaming session with the SAM bridge
func (s *SAM) NewStreamSession(id string, keys i2pkeys.I2PKeys, options []string) (*StreamSession, error) {
return NewStreamSession(s.SAM, id, keys, options)
}
// NewStreamSessionWithSignature creates a new streaming session with custom signature type
func (s *SAM) NewStreamSessionWithSignature(id string, keys i2pkeys.I2PKeys, options []string, sigType string) (*StreamSession, error) {
logger := log.WithFields(logrus.Fields{
"id": id,
"options": options,
"sigType": sigType,
})
logger.Debug("Creating new StreamSession with signature")
// Create the base session using the common package with signature
session, err := s.SAM.NewGenericSessionWithSignature("STREAM", id, keys, sigType, options)
if err != nil {
logger.WithError(err).Error("Failed to create generic session with signature")
return nil, oops.Errorf("failed to create stream session: %w", err)
}
baseSession, ok := session.(*common.BaseSession)
if !ok {
logger.Error("Session is not a BaseSession")
session.Close()
return nil, oops.Errorf("invalid session type")
}
ss := &StreamSession{
BaseSession: baseSession,
sam: s.SAM,
options: options,
}
logger.Debug("Successfully created StreamSession with signature")
return ss, nil
}
// NewStreamSessionWithPorts creates a new streaming session with port specifications
func (s *SAM) NewStreamSessionWithPorts(id, fromPort, toPort string, keys i2pkeys.I2PKeys, options []string) (*StreamSession, error) {
logger := log.WithFields(logrus.Fields{
"id": id,
"fromPort": fromPort,
"toPort": toPort,
"options": options,
})
logger.Debug("Creating new StreamSession with ports")
// Create the base session using the common package with ports
session, err := s.SAM.NewGenericSessionWithSignatureAndPorts("STREAM", id, fromPort, toPort, keys, common.SIG_EdDSA_SHA512_Ed25519, options)
if err != nil {
logger.WithError(err).Error("Failed to create generic session with ports")
return nil, oops.Errorf("failed to create stream session: %w", err)
}
baseSession, ok := session.(*common.BaseSession)
if !ok {
logger.Error("Session is not a BaseSession")
session.Close()
return nil, oops.Errorf("invalid session type")
}
ss := &StreamSession{
BaseSession: baseSession,
sam: s.SAM,
options: options,
}
logger.Debug("Successfully created StreamSession with ports")
return ss, nil
}

View File

@ -4,55 +4,119 @@ import (
"net"
"time"
"github.com/go-i2p/i2pkeys"
"github.com/samber/oops"
"github.com/sirupsen/logrus"
)
// Implements net.Conn
func (sc *StreamConn) Read(buf []byte) (int, error) {
n, err := sc.conn.Read(buf)
// Read reads data from the connection
func (c *StreamConn) Read(b []byte) (int, error) {
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
return 0, oops.Errorf("connection is closed")
}
conn := c.conn
c.mu.RUnlock()
n, err := conn.Read(b)
if err != nil {
log.WithFields(logrus.Fields{
"local": c.laddr.Base32(),
"remote": c.raddr.Base32(),
}).WithError(err).Debug("Read error")
}
return n, err
}
// Implements net.Conn
func (sc *StreamConn) Write(buf []byte) (int, error) {
n, err := sc.conn.Write(buf)
// Write writes data to the connection
func (c *StreamConn) Write(b []byte) (int, error) {
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
return 0, oops.Errorf("connection is closed")
}
conn := c.conn
c.mu.RUnlock()
n, err := conn.Write(b)
if err != nil {
log.WithFields(logrus.Fields{
"local": c.laddr.Base32(),
"remote": c.raddr.Base32(),
}).WithError(err).Debug("Write error")
}
return n, err
}
// Implements net.Conn
func (sc *StreamConn) Close() error {
return sc.conn.Close()
// Close closes the connection
func (c *StreamConn) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return nil
}
logger := log.WithFields(logrus.Fields{
"local": c.laddr.Base32(),
"remote": c.raddr.Base32(),
})
logger.Debug("Closing StreamConn")
c.closed = true
if c.conn != nil {
err := c.conn.Close()
if err != nil {
logger.WithError(err).Error("Failed to close underlying connection")
return oops.Errorf("failed to close connection: %w", err)
}
}
logger.Debug("Successfully closed StreamConn")
return nil
}
func (sc *StreamConn) LocalAddr() net.Addr {
return sc.localAddr()
// LocalAddr returns the local network address
func (c *StreamConn) LocalAddr() net.Addr {
return &i2pAddr{addr: c.laddr}
}
// Implements net.Conn
func (sc *StreamConn) localAddr() i2pkeys.I2PAddr {
return sc.laddr
// RemoteAddr returns the remote network address
func (c *StreamConn) RemoteAddr() net.Addr {
return &i2pAddr{addr: c.raddr}
}
func (sc *StreamConn) RemoteAddr() net.Addr {
return sc.remoteAddr()
// SetDeadline sets the read and write deadlines
func (c *StreamConn) SetDeadline(t time.Time) error {
if err := c.SetReadDeadline(t); err != nil {
return err
}
return c.SetWriteDeadline(t)
}
// Implements net.Conn
func (sc *StreamConn) remoteAddr() i2pkeys.I2PAddr {
return sc.raddr
// SetReadDeadline sets the deadline for future Read calls
func (c *StreamConn) SetReadDeadline(t time.Time) error {
c.mu.RLock()
conn := c.conn
c.mu.RUnlock()
if conn == nil {
return oops.Errorf("connection is nil")
}
return conn.SetReadDeadline(t)
}
// Implements net.Conn
func (sc *StreamConn) SetDeadline(t time.Time) error {
return sc.conn.SetDeadline(t)
}
// SetWriteDeadline sets the deadline for future Write calls
func (c *StreamConn) SetWriteDeadline(t time.Time) error {
c.mu.RLock()
conn := c.conn
c.mu.RUnlock()
// Implements net.Conn
func (sc *StreamConn) SetReadDeadline(t time.Time) error {
return sc.conn.SetReadDeadline(t)
}
if conn == nil {
return oops.Errorf("connection is nil")
}
// Implements net.Conn
func (sc *StreamConn) SetWriteDeadline(t time.Time) error {
return sc.conn.SetWriteDeadline(t)
return conn.SetWriteDeadline(t)
}

View File

@ -1,11 +0,0 @@
package stream
const (
ResultOK = "RESULT=OK"
ResultCantReachPeer = "RESULT=CANT_REACH_PEER"
ResultI2PError = "RESULT=I2P_ERROR"
ResultInvalidKey = "RESULT=INVALID_KEY"
ResultInvalidID = "RESULT=INVALID_ID"
ResultTimeout = "RESULT=TIMEOUT"
StreamConnectCommand = "STREAM CONNECT ID="
)

167
stream/dialer.go Normal file
View File

@ -0,0 +1,167 @@
package stream
import (
"bufio"
"context"
"fmt"
"strings"
"github.com/go-i2p/go-sam-go/common"
"github.com/go-i2p/i2pkeys"
"github.com/samber/oops"
"github.com/sirupsen/logrus"
)
// Dial establishes a connection to the specified destination
func (d *StreamDialer) Dial(destination string) (*StreamConn, error) {
return d.DialContext(context.Background(), destination)
}
// DialI2P establishes a connection to the specified I2P address
func (d *StreamDialer) DialI2P(addr i2pkeys.I2PAddr) (*StreamConn, error) {
return d.DialI2PContext(context.Background(), addr)
}
// DialContext establishes a connection with context support
func (d *StreamDialer) DialContext(ctx context.Context, destination string) (*StreamConn, error) {
// First resolve the destination
addr, err := d.session.sam.Lookup(destination)
if err != nil {
return nil, oops.Errorf("failed to resolve destination %s: %w", destination, err)
}
return d.DialI2PContext(ctx, addr)
}
// DialI2PContext establishes a connection to an I2P address with context support
func (d *StreamDialer) DialI2PContext(ctx context.Context, addr i2pkeys.I2PAddr) (*StreamConn, error) {
d.session.mu.RLock()
if d.session.closed {
d.session.mu.RUnlock()
return nil, oops.Errorf("session is closed")
}
d.session.mu.RUnlock()
logger := log.WithFields(logrus.Fields{
"session_id": d.session.ID(),
"destination": addr.Base32(),
})
logger.Debug("Dialing I2P destination")
// Create a new SAM connection for this dial
sam, err := common.NewSAM(d.session.sam.Sam())
if err != nil {
logger.WithError(err).Error("Failed to create SAM connection")
return nil, oops.Errorf("failed to create SAM connection: %w", err)
}
// Set up timeout if specified
var cancel context.CancelFunc
if d.timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, d.timeout)
defer cancel()
}
// Perform the dial with timeout
connChan := make(chan *StreamConn, 1)
errChan := make(chan error, 1)
go func() {
conn, err := d.performDial(sam, addr)
if err != nil {
errChan <- err
return
}
connChan <- conn
}()
select {
case conn := <-connChan:
logger.Debug("Successfully established connection")
return conn, nil
case err := <-errChan:
sam.Close()
logger.WithError(err).Error("Failed to establish connection")
return nil, err
case <-ctx.Done():
sam.Close()
logger.Error("Connection attempt timed out")
return nil, oops.Errorf("connection attempt timed out: %w", ctx.Err())
}
}
// performDial handles the actual SAM protocol for establishing connections
func (d *StreamDialer) performDial(sam *common.SAM, addr i2pkeys.I2PAddr) (*StreamConn, error) {
logger := log.WithFields(logrus.Fields{
"session_id": d.session.ID(),
"destination": addr.Base32(),
})
// Send STREAM CONNECT command
connectCmd := fmt.Sprintf("STREAM CONNECT ID=%s DESTINATION=%s SILENT=false\n",
d.session.ID(), addr.Base64())
logger.WithField("command", strings.TrimSpace(connectCmd)).Debug("Sending STREAM CONNECT")
_, err := sam.Write([]byte(connectCmd))
if err != nil {
return nil, oops.Errorf("failed to send STREAM CONNECT: %w", err)
}
// Read the response
buf := make([]byte, 4096)
n, err := sam.Read(buf)
if err != nil {
return nil, oops.Errorf("failed to read STREAM CONNECT response: %w", err)
}
response := string(buf[:n])
logger.WithField("response", response).Debug("Received STREAM CONNECT response")
// Parse the response
if err := d.parseConnectResponse(response); err != nil {
return nil, err
}
// Create the StreamConn
conn := &StreamConn{
session: d.session,
conn: sam,
laddr: d.session.Addr(),
raddr: addr,
}
return conn, nil
}
// parseConnectResponse parses the STREAM STATUS response
func (d *StreamDialer) parseConnectResponse(response string) error {
scanner := bufio.NewScanner(strings.NewReader(response))
scanner.Split(bufio.ScanWords)
for scanner.Scan() {
word := scanner.Text()
switch word {
case "STREAM", "STATUS":
continue
case "RESULT=OK":
return nil
case "RESULT=CANT_REACH_PEER":
return oops.Errorf("cannot reach peer")
case "RESULT=I2P_ERROR":
return oops.Errorf("I2P internal error")
case "RESULT=INVALID_KEY":
return oops.Errorf("invalid destination key")
case "RESULT=INVALID_ID":
return oops.Errorf("invalid session ID")
case "RESULT=TIMEOUT":
return oops.Errorf("connection timeout")
default:
if strings.HasPrefix(word, "RESULT=") {
return oops.Errorf("connection failed: %s", word[7:])
}
}
}
return oops.Errorf("unexpected response format: %s", response)
}

100
stream/dialer_test.go Normal file
View File

@ -0,0 +1,100 @@
package stream
import (
"context"
"testing"
"time"
)
func TestStreamSession_Dial(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
sam, keys := setupTestSAM(t)
defer sam.Close()
session, err := NewStreamSession(sam, "test_dial", keys, []string{
"inbound.length=1", "outbound.length=1",
})
if err != nil {
t.Fatalf("Failed to create session: %v", err)
}
defer session.Close()
// Test dialing to a known I2P destination
// This test might fail if the destination is not reachable
// but it tests the basic dial functionality
_, err = session.Dial("idk.i2p")
// We don't fail the test if dial fails since it depends on network conditions
// but we log it for debugging
if err != nil {
t.Logf("Dial to idk.i2p failed (expected in some network conditions): %v", err)
}
}
func TestStreamSession_DialI2P(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
sam, keys := setupTestSAM(t)
defer sam.Close()
session, err := NewStreamSession(sam, "test_dial_i2p", keys, []string{
"inbound.length=1", "outbound.length=1",
})
if err != nil {
t.Fatalf("Failed to create session: %v", err)
}
defer session.Close()
// Try to lookup a destination first
addr, err := sam.Lookup("zzz.i2p")
if err != nil {
t.Skipf("Failed to lookup destination: %v", err)
}
// Test dialing to the looked up address
_, err = session.DialI2P(addr)
if err != nil {
t.Logf("DialI2P failed (expected in some network conditions): %v", err)
}
}
func TestStreamSession_DialContext(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
sam, keys := setupTestSAM(t)
defer sam.Close()
session, err := NewStreamSession(sam, "test_dial_context", keys, nil)
if err != nil {
t.Fatalf("Failed to create session: %v", err)
}
defer session.Close()
t.Run("dial with context timeout", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := session.DialContext(ctx, "nonexistent.i2p")
if err == nil {
t.Log("Dial succeeded unexpectedly")
} else {
t.Logf("Dial failed as expected: %v", err)
}
})
t.Run("dial with cancelled context", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel() // Cancel immediately
_, err := session.DialContext(ctx, "test.i2p")
if err == nil {
t.Error("Expected dial to fail with cancelled context")
}
})
}

View File

@ -1,138 +0,0 @@
package stream
import (
"bufio"
"bytes"
"context"
"io"
"net"
"strings"
"time"
"github.com/go-i2p/go-sam-go/common"
"github.com/go-i2p/i2pkeys"
"github.com/samber/oops"
"github.com/sirupsen/logrus"
)
// context-aware dialer, eventually...
func (s *StreamSession) DialContext(ctx context.Context, n, addr string) (net.Conn, error) {
log.WithFields(logrus.Fields{"network": n, "addr": addr}).Debug("DialContext called")
return s.DialContextI2P(ctx, n, addr)
}
// context-aware dialer, eventually...
func (s *StreamSession) DialContextI2P(ctx context.Context, n, addr string) (*StreamConn, error) {
log.WithFields(logrus.Fields{"network": n, "addr": addr}).Debug("DialContextI2P called")
if ctx == nil {
log.Panic("nil context")
panic("nil context")
}
deadline := s.deadline(ctx, time.Now())
if !deadline.IsZero() {
if d, ok := ctx.Deadline(); !ok || deadline.Before(d) {
subCtx, cancel := context.WithDeadline(ctx, deadline)
defer cancel()
ctx = subCtx
}
}
i2paddr, err := i2pkeys.NewI2PAddrFromString(addr)
if err != nil {
log.WithError(err).Error("Failed to create I2P address from string")
return nil, err
}
return s.DialI2P(i2paddr)
}
// implement net.Dialer
func (s *StreamSession) Dial(n, addr string) (c net.Conn, err error) {
log.WithFields(logrus.Fields{"network": n, "addr": addr}).Debug("Dial called")
var i2paddr i2pkeys.I2PAddr
var host string
host, _, err = net.SplitHostPort(addr)
// log.Println("Dialing:", host)
if err = common.IgnorePortError(err); err == nil {
// check for name
if strings.HasSuffix(host, ".b32.i2p") || strings.HasSuffix(host, ".i2p") {
// name lookup
i2paddr, err = s.Lookup(host)
log.WithFields(logrus.Fields{"host": host, "i2paddr": i2paddr}).Debug("Looked up I2P address")
} else {
// probably a destination
i2paddr, err = i2pkeys.NewI2PAddrFromBytes([]byte(host))
// i2paddr = i2pkeys.I2PAddr(host)
// log.Println("Destination:", i2paddr, err)
log.WithFields(logrus.Fields{"host": host, "i2paddr": i2paddr}).Debug("Created I2P address from bytes")
}
if err == nil {
return s.DialI2P(i2paddr)
}
}
log.WithError(err).Error("Dial failed")
return
}
// Dials to an I2P destination and returns a SAMConn, which implements a net.Conn.
func (s *StreamSession) DialI2P(addr i2pkeys.I2PAddr) (*StreamConn, error) {
log.WithField("addr", addr).Debug("DialI2P called")
sam, err := common.NewSAM(s.SAM().Sam())
if err != nil {
log.WithError(err).Error("Failed to create new SAM instance")
return nil, err
}
conn := sam.Conn
_, err = conn.Write([]byte("STREAM CONNECT ID=" + s.SAM().ID() + s.SAM().FromPort() + s.SAM().ToPort() + " DESTINATION=" + addr.Base64() + " SILENT=false\n"))
if err != nil {
log.WithError(err).Error("Failed to write STREAM CONNECT command")
conn.Close()
return nil, err
}
buf := make([]byte, 4096)
n, err := conn.Read(buf)
if err != nil && err != io.EOF {
log.WithError(err).Error("Failed to write STREAM CONNECT command")
conn.Close()
return nil, err
}
scanner := bufio.NewScanner(bytes.NewReader(buf[:n]))
scanner.Split(bufio.ScanWords)
for scanner.Scan() {
switch scanner.Text() {
case "STREAM":
continue
case "STATUS":
continue
case ResultOK:
log.Debug("Successfully connected to I2P destination")
return &StreamConn{s.Addr(), addr, conn}, nil
case ResultCantReachPeer:
log.Error("Can't reach peer")
conn.Close()
return nil, oops.Errorf("Can not reach peer")
case ResultI2PError:
log.Error("I2P internal error")
conn.Close()
return nil, oops.Errorf("I2P internal error")
case ResultInvalidKey:
log.Error("Invalid key - Stream Session")
conn.Close()
return nil, oops.Errorf("Invalid key - Stream Session")
case ResultInvalidID:
log.Error("Invalid tunnel ID")
conn.Close()
return nil, oops.Errorf("Invalid tunnel ID")
case ResultTimeout:
log.Error("Connection timeout")
conn.Close()
return nil, oops.Errorf("Timeout")
default:
log.WithField("error", scanner.Text()).Error("Unknown error")
conn.Close()
return nil, oops.Errorf("Unknown error: %s : %s", scanner.Text(), string(buf[:n]))
}
}
log.Panic("Unexpected end of StreamSession.DialI2P()")
panic("sam3 go library error in StreamSession.DialI2P()")
}

View File

@ -1,11 +0,0 @@
package stream
import "github.com/sirupsen/logrus"
// create a new stream listener to accept inbound connections
func (s *StreamSession) Listen() (*StreamListener, error) {
log.WithFields(logrus.Fields{"id": s.SAM().ID(), "laddr": s.Addr()}).Debug("Creating new StreamListener")
return &StreamListener{
session: s,
}, nil
}

View File

@ -2,98 +2,167 @@ package stream
import (
"bufio"
"errors"
"io"
"net"
"strings"
"github.com/sirupsen/logrus"
"github.com/go-i2p/go-sam-go/common"
"github.com/go-i2p/i2pkeys"
"github.com/samber/oops"
)
func (l *StreamListener) From() string {
return l.session.SAM().Fromport
}
func (l *StreamListener) To() string {
return l.session.SAM().Toport
}
// get our address
// implements net.Listener
func (l *StreamListener) Addr() net.Addr {
return l.session.Addr()
}
// implements net.Listener
func (l *StreamListener) Close() error {
return l.session.Close()
}
// implements net.Listener
// Accept waits for and returns the next connection to the listener
func (l *StreamListener) Accept() (net.Conn, error) {
return l.AcceptI2P()
return l.AcceptStream()
}
// accept a new inbound connection
func (l *StreamListener) AcceptI2P() (*StreamConn, error) {
log.Debug("StreamListener.AcceptI2P() called")
s, err := common.NewSAM(l.session.SAM().Sam())
if err == nil {
log.Debug("Connected to SAM bridge")
// we connected to sam
// send accept() command
_, err = io.WriteString(s.Conn, "STREAM ACCEPT ID="+l.session.SAM().ID()+" SILENT=false\n")
if err != nil {
log.WithError(err).Error("Failed to send STREAM ACCEPT command")
s.Close()
return nil, err
}
// read reply
rd := bufio.NewReader(s.Conn)
// read first line
line, err := rd.ReadString(10)
if err != nil {
log.WithError(err).Error("Failed to read SAM bridge response")
s.Close()
return nil, err
}
log.WithField("response", line).Debug("Received SAM bridge response")
log.Println(line)
if strings.HasPrefix(line, "STREAM STATUS RESULT=OK") {
// we gud read destination line
destline, err := rd.ReadString(10)
if err == nil {
dest := common.ExtractDest(destline)
l.session.SAM().Fromport = common.ExtractPairString(destline, "FROM_PORT")
l.session.SAM().Toport = common.ExtractPairString(destline, "TO_PORT")
// return wrapped connection
dest = strings.Trim(dest, "\n")
log.WithFields(logrus.Fields{
"dest": dest,
"from": l.From(),
"to": l.To(),
}).Debug("Accepted new I2P connection")
return &StreamConn{
laddr: l.session.Addr(),
raddr: i2pkeys.I2PAddr(dest),
conn: s.Conn,
}, nil
} else {
log.WithError(err).Error("Failed to read destination line")
s.Close()
return nil, err
}
} else {
log.WithField("line", line).Error("Invalid SAM response")
s.Close()
return nil, errors.New("invalid sam line: " + line)
}
} else {
log.WithError(err).Error("Failed to connect to SAM bridge")
s.Close()
// AcceptStream waits for and returns the next I2P streaming connection
func (l *StreamListener) AcceptStream() (*StreamConn, error) {
l.mu.RLock()
if l.closed {
l.mu.RUnlock()
return nil, oops.Errorf("listener is closed")
}
l.mu.RUnlock()
select {
case conn := <-l.acceptChan:
return conn, nil
case err := <-l.errorChan:
return nil, err
case <-l.closeChan:
return nil, oops.Errorf("listener is closed")
}
}
// Close closes the listener
func (l *StreamListener) Close() error {
l.mu.Lock()
defer l.mu.Unlock()
if l.closed {
return nil
}
logger := log.WithField("session_id", l.session.ID())
logger.Debug("Closing StreamListener")
l.closed = true
close(l.closeChan)
logger.Debug("Successfully closed StreamListener")
return nil
}
// Addr returns the listener's network address
func (l *StreamListener) Addr() net.Addr {
return &i2pAddr{addr: l.session.Addr()}
}
// acceptLoop continuously accepts incoming connections
func (l *StreamListener) acceptLoop() {
logger := log.WithField("session_id", l.session.ID())
logger.Debug("Starting accept loop")
for {
select {
case <-l.closeChan:
logger.Debug("Accept loop terminated - listener closed")
return
default:
conn, err := l.acceptConnection()
if err != nil {
l.mu.RLock()
closed := l.closed
l.mu.RUnlock()
if !closed {
logger.WithError(err).Error("Failed to accept connection")
select {
case l.errorChan <- err:
case <-l.closeChan:
return
}
}
continue
}
select {
case l.acceptChan <- conn:
logger.Debug("Successfully accepted new connection")
case <-l.closeChan:
conn.Close()
return
}
}
}
}
// acceptConnection handles the low-level connection acceptance
func (l *StreamListener) acceptConnection() (*StreamConn, error) {
logger := log.WithField("session_id", l.session.ID())
// Read from the session connection for incoming connection requests
buf := make([]byte, 4096)
n, err := l.session.Read(buf)
if err != nil {
return nil, oops.Errorf("failed to read from session: %w", err)
}
response := string(buf[:n])
logger.WithField("response", response).Debug("Received connection request")
// Parse the STREAM STATUS response
scanner := bufio.NewScanner(strings.NewReader(response))
scanner.Split(bufio.ScanWords)
var status, dest string
for scanner.Scan() {
word := scanner.Text()
switch {
case word == "STREAM":
continue
case word == "STATUS":
continue
case strings.HasPrefix(word, "RESULT="):
status = word[7:]
case strings.HasPrefix(word, "DESTINATION="):
dest = word[12:]
}
}
if status != "OK" {
return nil, oops.Errorf("connection failed with status: %s", status)
}
if dest == "" {
return nil, oops.Errorf("no destination in connection request")
}
// Parse the remote destination
remoteAddr, err := i2pkeys.NewI2PAddrFromString(dest)
if err != nil {
return nil, oops.Errorf("failed to parse remote address: %w", err)
}
// Create a new connection object
streamConn := &StreamConn{
session: l.session,
conn: l.session.BaseSession, // Use the session connection
laddr: l.session.Addr(),
raddr: remoteAddr,
}
return streamConn, nil
}
// i2pAddr implements net.Addr for I2P addresses
type i2pAddr struct {
addr i2pkeys.I2PAddr
}
func (a *i2pAddr) Network() string {
return "i2p"
}
func (a *i2pAddr) String() string {
return a.addr.Base32()
}

61
stream/listener_test.go Normal file
View File

@ -0,0 +1,61 @@
package stream
import (
"testing"
"time"
)
func TestStreamSession_Listen(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
sam, keys := setupTestSAM(t)
defer sam.Close()
session, err := NewStreamSession(sam, "test_listen", keys, []string{
"inbound.length=0", "outbound.length=0",
})
if err != nil {
t.Fatalf("Failed to create session: %v", err)
}
defer session.Close()
listener, err := session.Listen()
if err != nil {
t.Fatalf("Failed to create listener: %v", err)
}
defer listener.Close()
// Verify listener properties
if listener.Addr().String() != session.Addr().String() {
t.Error("Listener address doesn't match session address")
}
}
func TestStreamSession_NewDialer(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
sam, keys := setupTestSAM(t)
defer sam.Close()
session, err := NewStreamSession(sam, "test_dialer", keys, nil)
if err != nil {
t.Fatalf("Failed to create session: %v", err)
}
defer session.Close()
dialer := session.NewDialer()
if dialer == nil {
t.Fatal("NewDialer returned nil")
}
// Test setting timeout
newTimeout := 45 * time.Second
dialer.SetTimeout(newTimeout)
if dialer.timeout != newTimeout {
t.Errorf("SetTimeout() timeout = %v, want %v", dialer.timeout, newTimeout)
}
}

View File

@ -2,106 +2,126 @@ package stream
import (
"context"
"net"
"time"
"github.com/go-i2p/go-sam-go/common"
"github.com/go-i2p/i2pkeys"
"github.com/samber/oops"
"github.com/sirupsen/logrus"
)
// Read reads data from the stream.
func (s *StreamSession) Read(buf []byte) (int, error) {
return s.SAM().Conn.Read(buf)
// NewStreamSession creates a new streaming session
func NewStreamSession(sam *common.SAM, id string, keys i2pkeys.I2PKeys, options []string) (*StreamSession, error) {
logger := log.WithFields(logrus.Fields{
"id": id,
"options": options,
})
logger.Debug("Creating new StreamSession")
// Create the base session using the common package
session, err := sam.NewGenericSession("STREAM", id, keys, options)
if err != nil {
logger.WithError(err).Error("Failed to create generic session")
return nil, oops.Errorf("failed to create stream session: %w", err)
}
baseSession, ok := session.(*common.BaseSession)
if !ok {
logger.Error("Session is not a BaseSession")
session.Close()
return nil, oops.Errorf("invalid session type")
}
ss := &StreamSession{
BaseSession: baseSession,
sam: sam,
options: options,
}
logger.Debug("Successfully created StreamSession")
return ss, nil
}
// Write sends data over the stream.
func (s *StreamSession) Write(data []byte) (int, error) {
return s.SAM().Conn.Write(data)
// Listen creates a StreamListener that accepts incoming connections
func (s *StreamSession) Listen() (*StreamListener, error) {
s.mu.RLock()
defer s.mu.RUnlock()
if s.closed {
return nil, oops.Errorf("session is closed")
}
logger := log.WithField("id", s.ID())
logger.Debug("Creating StreamListener")
listener := &StreamListener{
session: s,
acceptChan: make(chan *StreamConn, 10), // Buffer for incoming connections
errorChan: make(chan error, 1),
closeChan: make(chan struct{}),
}
// Start accepting connections in a goroutine
go listener.acceptLoop()
logger.Debug("Successfully created StreamListener")
return listener, nil
}
func (s *StreamSession) SetDeadline(t time.Time) error {
log.WithField("deadline", t).Debug("Setting deadline for StreamSession")
return s.SAM().Conn.SetDeadline(t)
// NewDialer creates a StreamDialer for establishing outbound connections
func (s *StreamSession) NewDialer() *StreamDialer {
return &StreamDialer{
session: s,
timeout: 30 * time.Second, // Default timeout
}
}
func (s *StreamSession) SetReadDeadline(t time.Time) error {
log.WithField("readDeadline", t).Debug("Setting read deadline for StreamSession")
return s.SAM().Conn.SetReadDeadline(t)
// SetTimeout sets the default timeout for new dialers
func (d *StreamDialer) SetTimeout(timeout time.Duration) *StreamDialer {
d.timeout = timeout
return d
}
func (s *StreamSession) SetWriteDeadline(t time.Time) error {
log.WithField("writeDeadline", t).Debug("Setting write deadline for StreamSession")
return s.SAM().Conn.SetWriteDeadline(t)
// Dial establishes a connection to the specified I2P destination
func (s *StreamSession) Dial(destination string) (*StreamConn, error) {
return s.NewDialer().Dial(destination)
}
func (s *StreamSession) From() string {
return s.SAM().Fromport
// DialI2P establishes a connection to the specified I2P address
func (s *StreamSession) DialI2P(addr i2pkeys.I2PAddr) (*StreamConn, error) {
return s.NewDialer().DialI2P(addr)
}
func (s *StreamSession) To() string {
return s.SAM().Toport
}
func (s *StreamSession) SignatureType() string {
return s.SAM().SignatureType()
// DialContext establishes a connection with context support
func (s *StreamSession) DialContext(ctx context.Context, destination string) (*StreamConn, error) {
return s.NewDialer().DialContext(ctx, destination)
}
// Close closes the streaming session and all associated resources
func (s *StreamSession) Close() error {
log.WithField("id", s.SAM().ID()).Debug("Closing StreamSession")
return s.SAM().Conn.Close()
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return nil
}
logger := log.WithField("id", s.ID())
logger.Debug("Closing StreamSession")
s.closed = true
// Close the base session
if err := s.BaseSession.Close(); err != nil {
logger.WithError(err).Error("Failed to close base session")
return oops.Errorf("failed to close stream session: %w", err)
}
logger.Debug("Successfully closed StreamSession")
return nil
}
// Returns the I2P destination (the address) of the stream session
// Addr returns the I2P address of this session
func (s *StreamSession) Addr() i2pkeys.I2PAddr {
return s.Keys().Address
}
func (s *StreamSession) LocalAddr() net.Addr {
return s.Addr()
}
// Returns the keys associated with the stream session
func (s *StreamSession) Keys() i2pkeys.I2PKeys {
return *s.SAM().DestinationKeys
}
// lookup name, convenience function
func (s *StreamSession) Lookup(name string) (i2pkeys.I2PAddr, error) {
log.WithField("name", name).Debug("Looking up address")
sam, err := common.NewSAM(s.SAM().Sam())
if err == nil {
addr, err := sam.Lookup(name)
defer sam.Close()
if err != nil {
log.WithError(err).Error("Lookup failed")
} else {
log.WithField("addr", addr).Debug("Lookup successful")
}
return addr, err
}
log.WithError(err).Error("Failed to create SAM instance for lookup")
return i2pkeys.I2PAddr(""), err
}
/*
func (s *StreamSession) Cancel() chan *StreamSession {
ch := make(chan *StreamSession)
ch <- s
return ch
}*/
// deadline returns the earliest of:
// - now+Timeout
// - d.Deadline
// - the context's deadline
//
// Or zero, if none of Timeout, Deadline, or context's deadline is set.
func (s *StreamSession) deadline(ctx context.Context, now time.Time) (earliest time.Time) {
if s.Timeout != 0 { // including negative, for historical reasons
earliest = now.Add(s.Timeout)
}
if d, ok := ctx.Deadline(); ok {
earliest = minNonzeroTime(earliest, d)
}
return minNonzeroTime(earliest, s.Deadline)
return s.Keys().Addr()
}

186
stream/session_test.go Normal file
View File

@ -0,0 +1,186 @@
package stream
import (
"testing"
"github.com/go-i2p/go-sam-go/common"
"github.com/go-i2p/i2pkeys"
)
const testSAMAddr = "127.0.0.1:7656"
func setupTestSAM(t *testing.T) (*common.SAM, i2pkeys.I2PKeys) {
t.Helper()
sam, err := common.NewSAM(testSAMAddr)
if err != nil {
t.Fatalf("Failed to create SAM connection: %v", err)
}
keys, err := sam.NewKeys()
if err != nil {
sam.Close()
t.Fatalf("Failed to generate keys: %v", err)
}
return sam, keys
}
func TestNewStreamSession(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
tests := []struct {
name string
id string
options []string
wantErr bool
}{
{
name: "basic session creation",
id: "test_stream_session",
options: nil,
wantErr: false,
},
{
name: "session with options",
id: "test_stream_with_opts",
options: []string{"inbound.length=1", "outbound.length=1"},
wantErr: false,
},
{
name: "session with small tunnel config",
id: "test_stream_small",
options: []string{
"inbound.length=0",
"outbound.length=0",
"inbound.lengthVariance=0",
"outbound.lengthVariance=0",
"inbound.quantity=1",
"outbound.quantity=1",
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sam, keys := setupTestSAM(t)
defer sam.Close()
session, err := NewStreamSession(sam, tt.id, keys, tt.options)
if (err != nil) != tt.wantErr {
t.Errorf("NewStreamSession() error = %v, wantErr %v", err, tt.wantErr)
return
}
if err == nil {
// Verify session properties
if session.ID() != tt.id {
t.Errorf("Session ID = %v, want %v", session.ID(), tt.id)
}
if session.Keys().Addr().Base32() != keys.Addr().Base32() {
t.Error("Session keys don't match provided keys")
}
addr := session.Addr()
if addr.Base32() == "" {
t.Error("Session address is empty")
}
// Clean up
if err := session.Close(); err != nil {
t.Errorf("Failed to close session: %v", err)
}
}
})
}
}
func TestStreamSession_Close(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
sam, keys := setupTestSAM(t)
defer sam.Close()
session, err := NewStreamSession(sam, "test_close", keys, nil)
if err != nil {
t.Fatalf("Failed to create session: %v", err)
}
// Close the session
err = session.Close()
if err != nil {
t.Errorf("Close() error = %v", err)
}
// Closing again should not error
err = session.Close()
if err != nil {
t.Errorf("Second Close() error = %v", err)
}
// Operations on closed session should fail
_, err = session.Listen()
if err == nil {
t.Error("Listen() on closed session should fail")
}
}
func TestStreamSession_Addr(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
sam, keys := setupTestSAM(t)
defer sam.Close()
session, err := NewStreamSession(sam, "test_addr", keys, nil)
if err != nil {
t.Fatalf("Failed to create session: %v", err)
}
defer session.Close()
addr := session.Addr()
expectedAddr := keys.Addr()
if addr.Base32() != expectedAddr.Base32() {
t.Errorf("Addr() = %v, want %v", addr.Base32(), expectedAddr.Base32())
}
}
func TestStreamSession_ConcurrentOperations(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
sam, keys := setupTestSAM(t)
defer sam.Close()
session, err := NewStreamSession(sam, "test_concurrent", keys, nil)
if err != nil {
t.Fatalf("Failed to create session: %v", err)
}
defer session.Close()
// Test concurrent dialer creation
done := make(chan bool, 10)
for i := 0; i < 10; i++ {
go func() {
dialer := session.NewDialer()
if dialer == nil {
t.Error("NewDialer returned nil")
}
done <- true
}()
}
// Wait for all goroutines
for i := 0; i < 10; i++ {
<-done
}
}

View File

@ -1,56 +0,0 @@
package stream
import (
"github.com/go-i2p/i2pkeys"
"github.com/sirupsen/logrus"
)
// Creates a new StreamSession with the I2CP- and streaminglib options as
// specified. See the I2P documentation for a full list of options.
func (sam SAM) NewStreamSession(id string, keys i2pkeys.I2PKeys, options []string) (*StreamSession, error) {
log.WithFields(logrus.Fields{"id": id, "options": options}).Debug("Creating new StreamSession")
conn, err := sam.NewGenericSession("STREAM", id, keys, []string{})
if err != nil {
return nil, err
}
log.WithField("id", id).Debug("Created new StreamSession")
streamSession := &StreamSession{
sam: sam,
}
streamSession.Conn = conn
return streamSession, nil
}
// Creates a new StreamSession with the I2CP- and streaminglib options as
// specified. See the I2P documentation for a full list of options.
func (sam SAM) NewStreamSessionWithSignature(id string, keys i2pkeys.I2PKeys, options []string, sigType string) (*StreamSession, error) {
log.WithFields(logrus.Fields{"id": id, "options": options, "sigType": sigType}).Debug("Creating new StreamSession with signature")
conn, err := sam.NewGenericSessionWithSignature("STREAM", id, keys, sigType, []string{})
if err != nil {
return nil, err
}
log.WithFields(logrus.Fields{"id": id, "sigType": sigType}).Debug("Created new StreamSession with signature")
log.WithField("id", id).Debug("Created new StreamSession")
streamSession := &StreamSession{
sam: sam,
}
streamSession.Conn = conn
return streamSession, nil
}
// Creates a new StreamSession with the I2CP- and streaminglib options as
// specified. See the I2P documentation for a full list of options.
func (sam SAM) NewStreamSessionWithSignatureAndPorts(id, from, to string, keys i2pkeys.I2PKeys, options []string, sigType string) (*StreamSession, error) {
log.WithFields(logrus.Fields{"id": id, "from": from, "to": to, "options": options, "sigType": sigType}).Debug("Creating new StreamSession with signature and ports")
conn, err := sam.NewGenericSessionWithSignatureAndPorts("STREAM", id, from, to, keys, sigType, []string{})
if err != nil {
return nil, err
}
log.WithFields(logrus.Fields{"id": id, "from": from, "to": to, "sigType": sigType}).Debug("Created new StreamSession with signature and ports")
log.WithField("id", id).Debug("Created new StreamSession")
streamSession := &StreamSession{
sam: sam,
}
streamSession.Conn = conn
return streamSession, nil
}

View File

@ -1,59 +0,0 @@
package stream
import (
"testing"
)
func TestNewStreamSession_Integration(t *testing.T) {
tests := []struct {
name string
id string
options []string
wantErr bool
}{
{
name: "basic session",
id: "test1",
options: nil,
wantErr: false,
},
{
name: "with options",
id: "test2",
options: []string{"inbound.length=2", "outbound.length=2"},
wantErr: false,
},
{
name: "invalid options",
id: "test3",
options: []string{"invalid=true"},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create a fresh SAM connection for each test
sam, err := NewSAM("127.0.0.1:7656")
if err != nil {
t.Fatalf("NewSAM() error = %v", err)
}
// Generate keys through the SAM bridge
keys, err := sam.NewKeys()
if err != nil {
t.Fatalf("NewKeys() error = %v", err)
}
session, err := sam.NewStreamSession(tt.id, keys, tt.options)
if (err != nil) != tt.wantErr {
t.Errorf("NewStreamSession() error = %v, wantErr %v", err, tt.wantErr)
return
}
if err == nil {
session.Close()
}
sam.Close()
})
}
}

View File

@ -2,35 +2,44 @@ package stream
import (
"net"
"sync"
"time"
"github.com/go-i2p/go-sam-go/common"
"github.com/go-i2p/i2pkeys"
)
type SAM struct {
common.SAM
}
// Represents a streaming session.
// StreamSession represents a streaming session that can create listeners and dialers
type StreamSession struct {
sam SAM
Timeout time.Duration
Deadline time.Time
net.Conn
}
func (s *StreamSession) SAM() *SAM {
return &s.sam
*common.BaseSession
sam *common.SAM
options []string
mu sync.RWMutex
closed bool
}
// StreamListener implements net.Listener for I2P streaming connections
type StreamListener struct {
// parent stream session
session *StreamSession
session *StreamSession
acceptChan chan *StreamConn
errorChan chan error
closeChan chan struct{}
closed bool
mu sync.RWMutex
}
// StreamConn implements net.Conn for I2P streaming connections
type StreamConn struct {
laddr i2pkeys.I2PAddr
raddr i2pkeys.I2PAddr
conn net.Conn
session *StreamSession
conn net.Conn
laddr i2pkeys.I2PAddr
raddr i2pkeys.I2PAddr
closed bool
mu sync.RWMutex
}
// StreamDialer handles client-side connection establishment
type StreamDialer struct {
session *StreamSession
timeout time.Duration
}

11
stream/types_test.go Normal file
View File

@ -0,0 +1,11 @@
package stream
import (
"net"
"github.com/go-i2p/go-sam-go/common"
)
var ss common.Session = &StreamSession{}
var sl net.Listener = &StreamListener{}
var sc net.Conn = &StreamConn{}

View File

@ -1,13 +0,0 @@
package stream
import "time"
func minNonzeroTime(a, b time.Time) time.Time {
if a.IsZero() {
return b
}
if b.IsZero() || a.Before(b) {
return a
}
return b
}