diff --git a/command/agent/scada.go b/command/agent/scada.go index 05416c311..74df8a427 100644 --- a/command/agent/scada.go +++ b/command/agent/scada.go @@ -2,11 +2,14 @@ package agent import ( "crypto/tls" + "errors" "fmt" "io" "log" "net" "strconv" + "sync" + "time" "github.com/hashicorp/scada-client" ) @@ -60,13 +63,126 @@ func NewProvider(c *Config, logOutput io.Writer) (*client.Provider, net.Listener InsecureSkipVerify: true, } - // TODO: Setup the handlers - config.Handlers["http"] = nil + // TODO: AtlasACLToken + + // Create an HTTP listener and handler + list := newScadaListener(c.AtlasCluster) + config.Handlers["http"] = func(capability string, meta map[string]string, + conn io.ReadWriteCloser) error { + return list.PushRWC(conn) + } // Create the provider provider, err := client.NewProvider(config) if err != nil { + list.Close() return nil, nil, err } - return provider, nil, nil + return provider, list, nil +} + +// scadaListener is used to return a net.Listener for +// incoming SCADA connections +type scadaListener struct { + addr *scadaAddr + pending chan net.Conn + + closed bool + closedCh chan struct{} + l sync.Mutex +} + +// newScadaListener returns a new listener +func newScadaListener(cluster string) *scadaListener { + l := &scadaListener{ + addr: &scadaAddr{cluster}, + pending: make(chan net.Conn), + closedCh: make(chan struct{}), + } + return l +} + +// PushRWC is used to push a io.ReadWriteCloser as a net.Conn +func (s *scadaListener) PushRWC(conn io.ReadWriteCloser) error { + // Check if this already implements net.Conn + if nc, ok := conn.(net.Conn); ok { + return s.Push(nc) + } + + // Wrap to implement the interface + wrapped := &scadaRWC{conn, s.addr} + return s.Push(wrapped) +} + +// Push is used to add a connection to the queu +func (s *scadaListener) Push(conn net.Conn) error { + select { + case s.pending <- conn: + return nil + case <-s.closedCh: + return fmt.Errorf("scada listener closed") + } +} + +func (s *scadaListener) Accept() (net.Conn, error) { + select { + case conn := <-s.pending: + return conn, nil + case <-s.closedCh: + return nil, fmt.Errorf("scada listener closed") + } +} + +func (s *scadaListener) Close() error { + s.l.Lock() + defer s.l.Unlock() + if s.closed { + return nil + } + s.closed = true + close(s.pending) + close(s.closedCh) + return nil +} + +func (s *scadaListener) Addr() net.Addr { + return s.addr +} + +// scadaAddr is used to return a net.Addr for SCADA +type scadaAddr struct { + cluster string +} + +func (s *scadaAddr) Network() string { + return "SCADA" +} + +func (s *scadaAddr) String() string { + return fmt.Sprintf("SCADA::Atlas::%s", s.cluster) +} + +type scadaRWC struct { + io.ReadWriteCloser + addr *scadaAddr +} + +func (s *scadaRWC) LocalAddr() net.Addr { + return s.addr +} + +func (s *scadaRWC) RemoteAddr() net.Addr { + return s.addr +} + +func (s *scadaRWC) SetDeadline(t time.Time) error { + return errors.New("SCADA.Conn does not support deadlines") +} + +func (s *scadaRWC) SetReadDeadline(t time.Time) error { + return errors.New("SCADA.Conn does not support deadlines") +} + +func (s *scadaRWC) SetWriteDeadline(t time.Time) error { + return errors.New("SCADA.Conn does not support deadlines") }