mirror of
https://github.com/go-i2p/go-sam-go.git
synced 2025-06-16 03:58:45 -04:00
re-add raw datagrams, likely does not work yet
This commit is contained in:
23
raw/README.md
Normal file
23
raw/README.md
Normal file
@ -0,0 +1,23 @@
|
||||
# go-sam-go/raw
|
||||
|
||||
High-level raw datagram library for unencrypted message delivery over I2P using the SAMv3 protocol.
|
||||
|
||||
## Installation
|
||||
|
||||
Install using Go modules with the package path `github.com/go-i2p/go-sam-go/raw`.
|
||||
|
||||
## Usage
|
||||
|
||||
The package provides unencrypted raw datagram messaging over I2P networks. [`RawSession`](raw/types.go) manages the session lifecycle, [`RawReader`](raw/types.go) handles incoming raw datagrams, [`RawWriter`](raw/types.go) sends outgoing raw datagrams, and [`RawConn`](raw/types.go) implements the standard `net.PacketConn` interface for seamless integration with existing Go networking code.
|
||||
|
||||
Create sessions using [`NewRawSession`](raw/session.go), send messages with [`SendDatagram()`](raw/session.go), and receive messages using [`ReceiveDatagram()`](raw/session.go). The implementation supports I2P address resolution, configurable tunnel parameters, and comprehensive error handling with proper resource cleanup.
|
||||
|
||||
Key features include full `net.PacketConn` and `net.Conn` compatibility, I2P destination management, base64 payload encoding, and concurrent raw datagram processing with proper synchronization.
|
||||
|
||||
## Dependencies
|
||||
|
||||
- github.com/go-i2p/go-sam-go/common - Core SAM protocol implementation
|
||||
- github.com/go-i2p/i2pkeys - I2P cryptographic key handling
|
||||
- github.com/go-i2p/logger - Logging functionality
|
||||
- github.com/sirupsen/logrus - Structured logging
|
||||
- github.com/samber/oops - Enhanced error handling
|
85
raw/SAM.go
Normal file
85
raw/SAM.go
Normal file
@ -0,0 +1,85 @@
|
||||
package raw
|
||||
|
||||
import (
|
||||
"github.com/go-i2p/go-sam-go/common"
|
||||
"github.com/go-i2p/i2pkeys"
|
||||
"github.com/samber/oops"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// SAM wraps common.SAM to provide raw-specific functionality
|
||||
type SAM struct {
|
||||
*common.SAM
|
||||
}
|
||||
|
||||
// NewRawSession creates a new raw session with the SAM bridge
|
||||
func (s *SAM) NewRawSession(id string, keys i2pkeys.I2PKeys, options []string) (*RawSession, error) {
|
||||
return NewRawSession(s.SAM, id, keys, options)
|
||||
}
|
||||
|
||||
// NewRawSessionWithSignature creates a new raw session with custom signature type
|
||||
func (s *SAM) NewRawSessionWithSignature(id string, keys i2pkeys.I2PKeys, options []string, sigType string) (*RawSession, error) {
|
||||
logger := log.WithFields(logrus.Fields{
|
||||
"id": id,
|
||||
"options": options,
|
||||
"sigType": sigType,
|
||||
})
|
||||
logger.Debug("Creating new RawSession with signature")
|
||||
|
||||
// Create the base session using the common package with signature
|
||||
session, err := s.SAM.NewGenericSessionWithSignature("RAW", id, keys, sigType, options)
|
||||
if err != nil {
|
||||
logger.WithError(err).Error("Failed to create generic session with signature")
|
||||
return nil, oops.Errorf("failed to create raw session: %w", err)
|
||||
}
|
||||
|
||||
baseSession, ok := session.(*common.BaseSession)
|
||||
if !ok {
|
||||
logger.Error("Session is not a BaseSession")
|
||||
session.Close()
|
||||
return nil, oops.Errorf("invalid session type")
|
||||
}
|
||||
|
||||
rs := &RawSession{
|
||||
BaseSession: baseSession,
|
||||
sam: s.SAM,
|
||||
options: options,
|
||||
}
|
||||
|
||||
logger.Debug("Successfully created RawSession with signature")
|
||||
return rs, nil
|
||||
}
|
||||
|
||||
// NewRawSessionWithPorts creates a new raw session with port specifications
|
||||
func (s *SAM) NewRawSessionWithPorts(id, fromPort, toPort string, keys i2pkeys.I2PKeys, options []string) (*RawSession, error) {
|
||||
logger := log.WithFields(logrus.Fields{
|
||||
"id": id,
|
||||
"fromPort": fromPort,
|
||||
"toPort": toPort,
|
||||
"options": options,
|
||||
})
|
||||
logger.Debug("Creating new RawSession with ports")
|
||||
|
||||
// Create the base session using the common package with ports
|
||||
session, err := s.SAM.NewGenericSessionWithSignatureAndPorts("RAW", id, fromPort, toPort, keys, common.SIG_EdDSA_SHA512_Ed25519, options)
|
||||
if err != nil {
|
||||
logger.WithError(err).Error("Failed to create generic session with ports")
|
||||
return nil, oops.Errorf("failed to create raw session: %w", err)
|
||||
}
|
||||
|
||||
baseSession, ok := session.(*common.BaseSession)
|
||||
if !ok {
|
||||
logger.Error("Session is not a BaseSession")
|
||||
session.Close()
|
||||
return nil, oops.Errorf("invalid session type")
|
||||
}
|
||||
|
||||
rs := &RawSession{
|
||||
BaseSession: baseSession,
|
||||
sam: s.SAM,
|
||||
options: options,
|
||||
}
|
||||
|
||||
logger.Debug("Successfully created RawSession with ports")
|
||||
return rs, nil
|
||||
}
|
82
raw/dial.go
Normal file
82
raw/dial.go
Normal file
@ -0,0 +1,82 @@
|
||||
package raw
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/go-i2p/i2pkeys"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// Dial establishes a raw connection to the specified destination
|
||||
func (rs *RawSession) Dial(destination string) (net.PacketConn, error) {
|
||||
return rs.DialTimeout(destination, 30*time.Second)
|
||||
}
|
||||
|
||||
// DialTimeout establishes a raw connection with a timeout
|
||||
func (rs *RawSession) DialTimeout(destination string, timeout time.Duration) (net.PacketConn, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
return rs.DialContext(ctx, destination)
|
||||
}
|
||||
|
||||
// DialContext establishes a raw connection with context support
|
||||
func (rs *RawSession) DialContext(ctx context.Context, destination string) (net.PacketConn, error) {
|
||||
logger := log.WithFields(logrus.Fields{
|
||||
"destination": destination,
|
||||
})
|
||||
logger.Debug("Dialing raw destination")
|
||||
|
||||
// Create a raw connection
|
||||
conn := &RawConn{
|
||||
session: rs,
|
||||
reader: rs.NewReader(),
|
||||
writer: rs.NewWriter(),
|
||||
}
|
||||
|
||||
// Start the reader loop
|
||||
go conn.reader.receiveLoop()
|
||||
|
||||
logger.WithField("session_id", rs.ID()).Debug("Successfully created raw connection")
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
// DialI2P establishes a raw connection to an I2P address
|
||||
func (rs *RawSession) DialI2P(addr i2pkeys.I2PAddr) (net.PacketConn, error) {
|
||||
return rs.DialI2PTimeout(addr, 30*time.Second)
|
||||
}
|
||||
|
||||
// DialI2PTimeout establishes a raw connection to an I2P address with timeout
|
||||
func (rs *RawSession) DialI2PTimeout(addr i2pkeys.I2PAddr, timeout time.Duration) (net.PacketConn, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
return rs.DialI2PContext(ctx, addr)
|
||||
}
|
||||
|
||||
// DialI2PContext establishes a raw connection to an I2P address with context support
|
||||
func (rs *RawSession) DialI2PContext(ctx context.Context, addr i2pkeys.I2PAddr) (net.PacketConn, error) {
|
||||
logger := log.WithFields(logrus.Fields{
|
||||
"destination": addr.Base32(),
|
||||
})
|
||||
logger.Debug("Dialing I2P raw destination")
|
||||
|
||||
// Create a raw connection
|
||||
conn := &RawConn{
|
||||
session: rs,
|
||||
reader: rs.NewReader(),
|
||||
writer: rs.NewWriter(),
|
||||
}
|
||||
|
||||
// Start the reader loop
|
||||
go conn.reader.receiveLoop()
|
||||
|
||||
logger.WithField("session_id", rs.ID()).Debug("Successfully created I2P raw connection")
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
// generateSessionID generates a unique session identifier
|
||||
func generateSessionID() string {
|
||||
return fmt.Sprintf("raw_%d", time.Now().UnixNano())
|
||||
}
|
31
raw/listen.go
Normal file
31
raw/listen.go
Normal file
@ -0,0 +1,31 @@
|
||||
package raw
|
||||
|
||||
import (
|
||||
"github.com/samber/oops"
|
||||
)
|
||||
|
||||
func (s *RawSession) Listen() (*RawListener, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
if s.closed {
|
||||
return nil, oops.Errorf("session is closed")
|
||||
}
|
||||
|
||||
logger := log.WithField("id", s.ID())
|
||||
logger.Debug("Creating RawListener")
|
||||
|
||||
listener := &RawListener{
|
||||
session: s,
|
||||
reader: s.NewReader(),
|
||||
acceptChan: make(chan *RawConn, 10), // Buffer for incoming connections
|
||||
errorChan: make(chan error, 1),
|
||||
closeChan: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Start accepting raw connections in a goroutine
|
||||
go listener.acceptLoop()
|
||||
|
||||
logger.Debug("Successfully created RawListener")
|
||||
return listener, nil
|
||||
}
|
7
raw/log.go
Normal file
7
raw/log.go
Normal file
@ -0,0 +1,7 @@
|
||||
package raw
|
||||
|
||||
import (
|
||||
"github.com/go-i2p/logger"
|
||||
)
|
||||
|
||||
var log = logger.GetGoI2PLogger()
|
137
raw/packetconn.go
Normal file
137
raw/packetconn.go
Normal file
@ -0,0 +1,137 @@
|
||||
package raw
|
||||
|
||||
import (
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/samber/oops"
|
||||
)
|
||||
|
||||
// ReadFrom reads a raw datagram from the connection
|
||||
func (c *RawConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
|
||||
c.mu.RLock()
|
||||
if c.closed {
|
||||
c.mu.RUnlock()
|
||||
return 0, nil, oops.Errorf("connection is closed")
|
||||
}
|
||||
c.mu.RUnlock()
|
||||
|
||||
// Start receive loop if not already started
|
||||
go c.reader.receiveLoop()
|
||||
|
||||
datagram, err := c.reader.ReceiveDatagram()
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
|
||||
// Copy data to the provided buffer
|
||||
n = copy(p, datagram.Data)
|
||||
addr = &RawAddr{addr: datagram.Source}
|
||||
|
||||
return n, addr, nil
|
||||
}
|
||||
|
||||
// WriteTo writes a raw datagram to the specified address
|
||||
func (c *RawConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
|
||||
c.mu.RLock()
|
||||
if c.closed {
|
||||
c.mu.RUnlock()
|
||||
return 0, oops.Errorf("connection is closed")
|
||||
}
|
||||
c.mu.RUnlock()
|
||||
|
||||
// Convert address to I2P address
|
||||
i2pAddr, ok := addr.(*RawAddr)
|
||||
if !ok {
|
||||
return 0, oops.Errorf("address must be a RawAddr")
|
||||
}
|
||||
|
||||
err = c.writer.SendDatagram(p, i2pAddr.addr)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
// Close closes the raw connection
|
||||
func (c *RawConn) Close() error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.closed {
|
||||
return nil
|
||||
}
|
||||
|
||||
logger := log.WithField("session_id", c.session.ID())
|
||||
logger.Debug("Closing RawConn")
|
||||
|
||||
c.closed = true
|
||||
|
||||
// Close reader and writer - these are owned by this connection
|
||||
if c.reader != nil {
|
||||
c.reader.Close()
|
||||
}
|
||||
|
||||
// DO NOT close the session - it's a shared resource that may be used by other connections
|
||||
|
||||
logger.Debug("Successfully closed RawConn")
|
||||
return nil
|
||||
}
|
||||
|
||||
// LocalAddr returns the local address
|
||||
func (c *RawConn) LocalAddr() net.Addr {
|
||||
return &RawAddr{addr: c.session.Addr()}
|
||||
}
|
||||
|
||||
// SetDeadline sets the read and write deadlines
|
||||
func (c *RawConn) SetDeadline(t time.Time) error {
|
||||
if err := c.SetReadDeadline(t); err != nil {
|
||||
return err
|
||||
}
|
||||
return c.SetWriteDeadline(t)
|
||||
}
|
||||
|
||||
// SetReadDeadline sets the deadline for future ReadFrom calls
|
||||
func (c *RawConn) SetReadDeadline(t time.Time) error {
|
||||
// For raw datagrams, we handle timeouts differently
|
||||
// This is a placeholder implementation
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetWriteDeadline sets the deadline for future WriteTo calls
|
||||
func (c *RawConn) SetWriteDeadline(t time.Time) error {
|
||||
// Calculate timeout duration
|
||||
if !t.IsZero() {
|
||||
timeout := time.Until(t)
|
||||
c.writer.SetTimeout(timeout)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Read implements net.Conn by wrapping ReadFrom
|
||||
func (c *RawConn) Read(b []byte) (n int, err error) {
|
||||
n, addr, err := c.ReadFrom(b)
|
||||
if addr != nil {
|
||||
c.remoteAddr = &addr.(*RawAddr).addr
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
// RemoteAddr returns the remote address of the connection
|
||||
func (c *RawConn) RemoteAddr() net.Addr {
|
||||
if c.remoteAddr != nil {
|
||||
return &RawAddr{addr: *c.remoteAddr}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write implements net.Conn by wrapping WriteTo
|
||||
func (c *RawConn) Write(b []byte) (n int, err error) {
|
||||
if c.remoteAddr == nil {
|
||||
return 0, oops.Errorf("no remote address set, use WriteTo or Read first")
|
||||
}
|
||||
|
||||
addr := &RawAddr{addr: *c.remoteAddr}
|
||||
return c.WriteTo(b, addr)
|
||||
}
|
113
raw/packetlistener.go
Normal file
113
raw/packetlistener.go
Normal file
@ -0,0 +1,113 @@
|
||||
package raw
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
"github.com/samber/oops"
|
||||
)
|
||||
|
||||
// Accept waits for and returns the next raw connection to the listener
|
||||
func (l *RawListener) Accept() (net.Conn, error) {
|
||||
l.mu.RLock()
|
||||
if l.closed {
|
||||
l.mu.RUnlock()
|
||||
return nil, oops.Errorf("listener is closed")
|
||||
}
|
||||
l.mu.RUnlock()
|
||||
|
||||
select {
|
||||
case conn := <-l.acceptChan:
|
||||
return conn, nil
|
||||
case err := <-l.errorChan:
|
||||
return nil, err
|
||||
case <-l.closeChan:
|
||||
return nil, oops.Errorf("listener is closed")
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes the raw listener
|
||||
func (l *RawListener) Close() error {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
|
||||
if l.closed {
|
||||
return nil
|
||||
}
|
||||
|
||||
logger := log.WithField("session_id", l.session.ID())
|
||||
logger.Debug("Closing RawListener")
|
||||
|
||||
l.closed = true
|
||||
close(l.closeChan)
|
||||
|
||||
// Close the reader
|
||||
if l.reader != nil {
|
||||
l.reader.Close()
|
||||
}
|
||||
|
||||
logger.Debug("Successfully closed RawListener")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Addr returns the listener's network address
|
||||
func (l *RawListener) Addr() net.Addr {
|
||||
return &RawAddr{addr: l.session.Addr()}
|
||||
}
|
||||
|
||||
// acceptLoop continuously accepts incoming raw connections
|
||||
func (l *RawListener) acceptLoop() {
|
||||
logger := log.WithField("session_id", l.session.ID())
|
||||
logger.Debug("Starting raw accept loop")
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-l.closeChan:
|
||||
logger.Debug("Raw accept loop terminated - listener closed")
|
||||
return
|
||||
default:
|
||||
conn, err := l.acceptRawConnection()
|
||||
if err != nil {
|
||||
l.mu.RLock()
|
||||
closed := l.closed
|
||||
l.mu.RUnlock()
|
||||
|
||||
if !closed {
|
||||
logger.WithError(err).Error("Failed to accept raw connection")
|
||||
select {
|
||||
case l.errorChan <- err:
|
||||
case <-l.closeChan:
|
||||
return
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case l.acceptChan <- conn:
|
||||
logger.Debug("Successfully accepted new raw connection")
|
||||
case <-l.closeChan:
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// acceptRawConnection creates a new raw connection for incoming datagrams
|
||||
func (l *RawListener) acceptRawConnection() (*RawConn, error) {
|
||||
logger := log.WithField("session_id", l.session.ID())
|
||||
logger.Debug("Creating new raw connection")
|
||||
|
||||
// For raw sessions, we create a new RawConn that shares the session
|
||||
// but has its own reader/writer for handling the specific connection
|
||||
conn := &RawConn{
|
||||
session: l.session,
|
||||
reader: l.session.NewReader(),
|
||||
writer: l.session.NewWriter(),
|
||||
}
|
||||
|
||||
// Start the reader loop for this connection
|
||||
go conn.reader.receiveLoop()
|
||||
|
||||
return conn, nil
|
||||
}
|
181
raw/read.go
Normal file
181
raw/read.go
Normal file
@ -0,0 +1,181 @@
|
||||
package raw
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/base64"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/go-i2p/i2pkeys"
|
||||
"github.com/samber/oops"
|
||||
)
|
||||
|
||||
// ReceiveDatagram receives a raw datagram from any source
|
||||
func (r *RawReader) ReceiveDatagram() (*RawDatagram, error) {
|
||||
// Check if closed first, but don't rely on this check for safety
|
||||
r.mu.RLock()
|
||||
if r.closed {
|
||||
r.mu.RUnlock()
|
||||
return nil, oops.Errorf("reader is closed")
|
||||
}
|
||||
r.mu.RUnlock()
|
||||
|
||||
// Use select with closeChan to handle concurrent close operations safely
|
||||
select {
|
||||
case datagram := <-r.recvChan:
|
||||
return datagram, nil
|
||||
case err := <-r.errorChan:
|
||||
return nil, err
|
||||
case <-r.closeChan:
|
||||
return nil, oops.Errorf("reader is closed")
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RawReader) Close() error {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
if r.closed {
|
||||
return nil
|
||||
}
|
||||
|
||||
logger := log.WithField("session_id", r.session.ID())
|
||||
logger.Debug("Closing RawReader")
|
||||
|
||||
r.closed = true
|
||||
|
||||
// Signal termination to receiveLoop
|
||||
close(r.closeChan)
|
||||
|
||||
// Wait for receiveLoop to signal it has exited
|
||||
select {
|
||||
case <-r.doneChan:
|
||||
// receiveLoop has confirmed it stopped
|
||||
case <-time.After(5 * time.Second):
|
||||
// Timeout protection - log warning but continue cleanup
|
||||
logger.Warn("Timeout waiting for receive loop to stop")
|
||||
}
|
||||
|
||||
// Now safe to close the receiver channels since receiveLoop has stopped
|
||||
close(r.recvChan)
|
||||
close(r.errorChan)
|
||||
|
||||
logger.Debug("Successfully closed RawReader")
|
||||
return nil
|
||||
}
|
||||
|
||||
// receiveLoop continuously receives incoming raw datagrams
|
||||
func (r *RawReader) receiveLoop() {
|
||||
logger := log.WithField("session_id", r.session.ID())
|
||||
logger.Debug("Starting raw receive loop")
|
||||
|
||||
// Signal completion when this loop exits
|
||||
defer func() {
|
||||
if r.doneChan != nil {
|
||||
close(r.doneChan)
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
// Check for closure in a non-blocking way first
|
||||
select {
|
||||
case <-r.closeChan:
|
||||
logger.Debug("Raw receive loop terminated - reader closed")
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Now perform the blocking read operation
|
||||
datagram, err := r.receiveDatagram()
|
||||
if err != nil {
|
||||
// Use atomic check and send pattern to avoid race
|
||||
select {
|
||||
case r.errorChan <- err:
|
||||
logger.WithError(err).Error("Failed to receive raw datagram")
|
||||
case <-r.closeChan:
|
||||
// Reader was closed during error handling
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Send the datagram or handle closure atomically
|
||||
select {
|
||||
case r.recvChan <- datagram:
|
||||
logger.Debug("Successfully received raw datagram")
|
||||
case <-r.closeChan:
|
||||
// Reader was closed during datagram send
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// receiveDatagram handles the low-level raw datagram reception
|
||||
func (r *RawReader) receiveDatagram() (*RawDatagram, error) {
|
||||
logger := log.WithField("session_id", r.session.ID())
|
||||
|
||||
// Read from the session connection for incoming raw datagrams
|
||||
buf := make([]byte, 4096)
|
||||
n, err := r.session.Read(buf)
|
||||
if err != nil {
|
||||
return nil, oops.Errorf("failed to read from session: %w", err)
|
||||
}
|
||||
|
||||
response := string(buf[:n])
|
||||
logger.WithField("response", response).Debug("Received raw datagram data")
|
||||
|
||||
// Parse the RAW RECEIVED response
|
||||
scanner := bufio.NewScanner(strings.NewReader(response))
|
||||
scanner.Split(bufio.ScanWords)
|
||||
|
||||
var source, data string
|
||||
for scanner.Scan() {
|
||||
word := scanner.Text()
|
||||
switch {
|
||||
case word == "RAW":
|
||||
continue
|
||||
case word == "RECEIVED":
|
||||
continue
|
||||
case strings.HasPrefix(word, "DESTINATION="):
|
||||
source = word[12:]
|
||||
case strings.HasPrefix(word, "SIZE="):
|
||||
continue // We'll get the actual data size from the payload
|
||||
default:
|
||||
// Remaining data is the base64-encoded payload
|
||||
if data == "" {
|
||||
data = word
|
||||
} else {
|
||||
data += " " + word
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if source == "" {
|
||||
return nil, oops.Errorf("no source in raw datagram")
|
||||
}
|
||||
|
||||
if data == "" {
|
||||
return nil, oops.Errorf("no data in raw datagram")
|
||||
}
|
||||
|
||||
// Parse the source destination
|
||||
sourceAddr, err := i2pkeys.NewI2PAddrFromString(source)
|
||||
if err != nil {
|
||||
return nil, oops.Errorf("failed to parse source address: %w", err)
|
||||
}
|
||||
|
||||
// Decode the base64 data
|
||||
decodedData, err := base64.StdEncoding.DecodeString(data)
|
||||
if err != nil {
|
||||
return nil, oops.Errorf("failed to decode raw datagram data: %w", err)
|
||||
}
|
||||
|
||||
// Create the raw datagram
|
||||
datagram := &RawDatagram{
|
||||
Data: decodedData,
|
||||
Source: sourceAddr,
|
||||
Local: r.session.Addr(),
|
||||
}
|
||||
|
||||
return datagram, nil
|
||||
}
|
125
raw/session.go
Normal file
125
raw/session.go
Normal file
@ -0,0 +1,125 @@
|
||||
package raw
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"github.com/go-i2p/go-sam-go/common"
|
||||
"github.com/go-i2p/i2pkeys"
|
||||
"github.com/samber/oops"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// NewRawSession creates a new raw session
|
||||
func NewRawSession(sam *common.SAM, id string, keys i2pkeys.I2PKeys, options []string) (*RawSession, error) {
|
||||
logger := log.WithFields(logrus.Fields{
|
||||
"id": id,
|
||||
"options": options,
|
||||
})
|
||||
logger.Debug("Creating new RawSession")
|
||||
|
||||
// Create the base session using the common package
|
||||
session, err := sam.NewGenericSession("RAW", id, keys, options)
|
||||
if err != nil {
|
||||
logger.WithError(err).Error("Failed to create generic session")
|
||||
return nil, oops.Errorf("failed to create raw session: %w", err)
|
||||
}
|
||||
|
||||
baseSession, ok := session.(*common.BaseSession)
|
||||
if !ok {
|
||||
logger.Error("Session is not a BaseSession")
|
||||
session.Close()
|
||||
return nil, oops.Errorf("invalid session type")
|
||||
}
|
||||
|
||||
rs := &RawSession{
|
||||
BaseSession: baseSession,
|
||||
sam: sam,
|
||||
options: options,
|
||||
}
|
||||
|
||||
logger.Debug("Successfully created RawSession")
|
||||
return rs, nil
|
||||
}
|
||||
|
||||
// NewReader creates a RawReader for receiving raw datagrams
|
||||
func (s *RawSession) NewReader() *RawReader {
|
||||
return &RawReader{
|
||||
session: s,
|
||||
recvChan: make(chan *RawDatagram, 10), // Buffer for incoming datagrams
|
||||
errorChan: make(chan error, 1),
|
||||
closeChan: make(chan struct{}),
|
||||
doneChan: make(chan struct{}),
|
||||
closed: false,
|
||||
mu: sync.RWMutex{},
|
||||
}
|
||||
}
|
||||
|
||||
// NewWriter creates a RawWriter for sending raw datagrams
|
||||
func (s *RawSession) NewWriter() *RawWriter {
|
||||
return &RawWriter{
|
||||
session: s,
|
||||
timeout: 30, // Default timeout in seconds
|
||||
}
|
||||
}
|
||||
|
||||
// PacketConn returns a net.PacketConn interface for this session
|
||||
func (s *RawSession) PacketConn() net.PacketConn {
|
||||
return &RawConn{
|
||||
session: s,
|
||||
reader: s.NewReader(),
|
||||
writer: s.NewWriter(),
|
||||
}
|
||||
}
|
||||
|
||||
// SendDatagram sends a raw datagram to the specified destination
|
||||
func (s *RawSession) SendDatagram(data []byte, dest i2pkeys.I2PAddr) error {
|
||||
return s.NewWriter().SendDatagram(data, dest)
|
||||
}
|
||||
|
||||
// ReceiveDatagram receives a raw datagram from any source
|
||||
func (s *RawSession) ReceiveDatagram() (*RawDatagram, error) {
|
||||
reader := s.NewReader()
|
||||
// Start the receive loop
|
||||
go reader.receiveLoop()
|
||||
return reader.ReceiveDatagram()
|
||||
}
|
||||
|
||||
// Close closes the raw session and all associated resources
|
||||
func (s *RawSession) Close() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.closed {
|
||||
return nil
|
||||
}
|
||||
|
||||
logger := log.WithField("id", s.ID())
|
||||
logger.Debug("Closing RawSession")
|
||||
|
||||
s.closed = true
|
||||
|
||||
// Close the base session
|
||||
if err := s.BaseSession.Close(); err != nil {
|
||||
logger.WithError(err).Error("Failed to close base session")
|
||||
return oops.Errorf("failed to close raw session: %w", err)
|
||||
}
|
||||
|
||||
logger.Debug("Successfully closed RawSession")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Addr returns the I2P address of this session
|
||||
func (s *RawSession) Addr() i2pkeys.I2PAddr {
|
||||
return s.Keys().Addr()
|
||||
}
|
||||
|
||||
// Network returns the network type
|
||||
func (a *RawAddr) Network() string {
|
||||
return "i2p-raw"
|
||||
}
|
||||
|
||||
// String returns the string representation of the address
|
||||
func (a *RawAddr) String() string {
|
||||
return a.addr.Base32()
|
||||
}
|
68
raw/types.go
Normal file
68
raw/types.go
Normal file
@ -0,0 +1,68 @@
|
||||
package raw
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-i2p/go-sam-go/common"
|
||||
"github.com/go-i2p/i2pkeys"
|
||||
)
|
||||
|
||||
// RawSession represents a raw session that can send and receive raw datagrams
|
||||
type RawSession struct {
|
||||
*common.BaseSession
|
||||
sam *common.SAM
|
||||
options []string
|
||||
mu sync.RWMutex
|
||||
closed bool
|
||||
}
|
||||
|
||||
// RawReader handles incoming raw datagram reception
|
||||
type RawReader struct {
|
||||
session *RawSession
|
||||
recvChan chan *RawDatagram
|
||||
errorChan chan error
|
||||
closeChan chan struct{}
|
||||
doneChan chan struct{}
|
||||
closed bool
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// RawWriter handles outgoing raw datagram transmission
|
||||
type RawWriter struct {
|
||||
session *RawSession
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
// RawDatagram represents an I2P raw datagram message
|
||||
type RawDatagram struct {
|
||||
Data []byte
|
||||
Source i2pkeys.I2PAddr
|
||||
Local i2pkeys.I2PAddr
|
||||
}
|
||||
|
||||
// RawAddr implements net.Addr for I2P raw addresses
|
||||
type RawAddr struct {
|
||||
addr i2pkeys.I2PAddr
|
||||
}
|
||||
|
||||
// RawConn implements net.PacketConn for I2P raw datagrams
|
||||
type RawConn struct {
|
||||
session *RawSession
|
||||
reader *RawReader
|
||||
writer *RawWriter
|
||||
remoteAddr *i2pkeys.I2PAddr
|
||||
mu sync.RWMutex
|
||||
closed bool
|
||||
}
|
||||
|
||||
// RawListener implements net.Listener for I2P raw connections
|
||||
type RawListener struct {
|
||||
session *RawSession
|
||||
reader *RawReader
|
||||
acceptChan chan *RawConn
|
||||
errorChan chan error
|
||||
closeChan chan struct{}
|
||||
closed bool
|
||||
mu sync.RWMutex
|
||||
}
|
96
raw/write.go
Normal file
96
raw/write.go
Normal file
@ -0,0 +1,96 @@
|
||||
package raw
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/go-i2p/i2pkeys"
|
||||
"github.com/samber/oops"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// SetTimeout sets the timeout for raw datagram operations
|
||||
func (w *RawWriter) SetTimeout(timeout time.Duration) *RawWriter {
|
||||
w.timeout = timeout
|
||||
return w
|
||||
}
|
||||
|
||||
// SendDatagram sends a raw datagram to the specified destination
|
||||
func (w *RawWriter) SendDatagram(data []byte, dest i2pkeys.I2PAddr) error {
|
||||
w.session.mu.RLock()
|
||||
if w.session.closed {
|
||||
w.session.mu.RUnlock()
|
||||
return oops.Errorf("session is closed")
|
||||
}
|
||||
w.session.mu.RUnlock()
|
||||
|
||||
logger := log.WithFields(logrus.Fields{
|
||||
"session_id": w.session.ID(),
|
||||
"destination": dest.Base32(),
|
||||
"size": len(data),
|
||||
})
|
||||
logger.Debug("Sending raw datagram")
|
||||
|
||||
// Encode the data as base64
|
||||
encodedData := base64.StdEncoding.EncodeToString(data)
|
||||
|
||||
// Create the RAW SEND command
|
||||
sendCmd := fmt.Sprintf("RAW SEND ID=%s DESTINATION=%s SIZE=%d\n%s\n",
|
||||
w.session.ID(), dest.Base64(), len(data), encodedData)
|
||||
|
||||
logger.WithField("command", strings.Split(sendCmd, "\n")[0]).Debug("Sending RAW SEND")
|
||||
|
||||
// Send the command
|
||||
_, err := w.session.Write([]byte(sendCmd))
|
||||
if err != nil {
|
||||
logger.WithError(err).Error("Failed to send raw datagram")
|
||||
return oops.Errorf("failed to send raw datagram: %w", err)
|
||||
}
|
||||
|
||||
// Read the response
|
||||
buf := make([]byte, 1024)
|
||||
n, err := w.session.Read(buf)
|
||||
if err != nil {
|
||||
logger.WithError(err).Error("Failed to read send response")
|
||||
return oops.Errorf("failed to read send response: %w", err)
|
||||
}
|
||||
|
||||
response := string(buf[:n])
|
||||
logger.WithField("response", response).Debug("Received send response")
|
||||
|
||||
// Parse the response
|
||||
if err := w.parseSendResponse(response); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Debug("Successfully sent raw datagram")
|
||||
return nil
|
||||
}
|
||||
|
||||
// parseSendResponse parses the RAW STATUS response
|
||||
func (w *RawWriter) parseSendResponse(response string) error {
|
||||
if strings.Contains(response, "RESULT=OK") {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch {
|
||||
case strings.Contains(response, "RESULT=CANT_REACH_PEER"):
|
||||
return oops.Errorf("cannot reach peer")
|
||||
case strings.Contains(response, "RESULT=I2P_ERROR"):
|
||||
return oops.Errorf("I2P internal error")
|
||||
case strings.Contains(response, "RESULT=INVALID_KEY"):
|
||||
return oops.Errorf("invalid destination key")
|
||||
case strings.Contains(response, "RESULT=INVALID_ID"):
|
||||
return oops.Errorf("invalid session ID")
|
||||
case strings.Contains(response, "RESULT=TIMEOUT"):
|
||||
return oops.Errorf("send timeout")
|
||||
default:
|
||||
if strings.HasPrefix(response, "RAW STATUS RESULT=") {
|
||||
result := strings.TrimSpace(response[18:])
|
||||
return oops.Errorf("send failed: %s", result)
|
||||
}
|
||||
return oops.Errorf("unexpected response format: %s", response)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user