Merge pull request #4991 from hashicorp/dani/b-fix-leaked-procs

vendor: Update go-plugin
This commit is contained in:
Danielle Tomlinson 2019-01-09 00:54:52 +01:00 committed by GitHub
commit 4e412e0665
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 880 additions and 266 deletions

View File

@ -109,7 +109,7 @@ high-level steps that must be done. Examples are available in the
1. Choose the interface(s) you want to expose for plugins.
2. For each interface, implement an implementation of that interface
that communicates over a `net/rpc` connection or other a
that communicates over a `net/rpc` connection or over a
[gRPC](http://www.grpc.io) connection or both. You'll have to implement
both a client and server implementation.
@ -150,19 +150,19 @@ user experience.
When we started using plugins (late 2012, early 2013), plugins over RPC
were the only option since Go didn't support dynamic library loading. Today,
Go still doesn't support dynamic library loading, but they do intend to.
Since 2012, our plugin system has stabilized from millions of users using it,
and has many benefits we've come to value greatly.
Go supports the [plugin](https://golang.org/pkg/plugin/) standard library with
a number of limitations. Since 2012, our plugin system has stabilized
from tens of millions of users using it, and has many benefits we've come to
value greatly.
For example, we intend to use this plugin system in
[Vault](https://www.vaultproject.io), and dynamic library loading will
simply never be acceptable in Vault for security reasons. That is an extreme
For example, we use this plugin system in
[Vault](https://www.vaultproject.io) where dynamic library loading is
not acceptable for security reasons. That is an extreme
example, but we believe our library system has more upsides than downsides
over dynamic library loading and since we've had it built and tested for years,
we'll likely continue to use it.
we'll continue to use it.
Shared libraries have one major advantage over our system which is much
higher performance. In real world scenarios across our various tools,
we've never required any more performance out of our plugin system and it
has seen very high throughput, so this isn't a concern for us at the moment.

View File

@ -5,6 +5,8 @@ import (
"context"
"crypto/subtle"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"errors"
"fmt"
"hash"
@ -70,16 +72,31 @@ var (
//
// See NewClient and ClientConfig for using a Client.
type Client struct {
config *ClientConfig
exited bool
doneLogging chan struct{}
l sync.Mutex
address net.Addr
process *os.Process
client ClientProtocol
protocol Protocol
logger hclog.Logger
doneCtx context.Context
config *ClientConfig
exited bool
l sync.Mutex
address net.Addr
process *os.Process
client ClientProtocol
protocol Protocol
logger hclog.Logger
doneCtx context.Context
ctxCancel context.CancelFunc
negotiatedVersion int
// clientWaitGroup is used to manage the lifecycle of the plugin management
// goroutines.
clientWaitGroup sync.WaitGroup
// processKilled is used for testing only, to flag when the process was
// forcefully killed.
processKilled bool
}
// NegotiatedVersion returns the protocol version negotiated with the server.
// This is only valid after Start() is called.
func (c *Client) NegotiatedVersion() int {
return c.negotiatedVersion
}
// ClientConfig is the configuration used to initialize a new
@ -90,7 +107,13 @@ type ClientConfig struct {
HandshakeConfig
// Plugins are the plugins that can be consumed.
Plugins map[string]Plugin
// The implied version of this PluginSet is the Handshake.ProtocolVersion.
Plugins PluginSet
// VersionedPlugins is a map of PluginSets for specific protocol versions.
// These can be used to negotiate a compatible version between client and
// server. If this is set, Handshake.ProtocolVersion is not required.
VersionedPlugins map[int]PluginSet
// One of the following must be set, but not both.
//
@ -157,6 +180,29 @@ type ClientConfig struct {
// Logger is the logger that the client will used. If none is provided,
// it will default to hclog's default logger.
Logger hclog.Logger
// AutoMTLS has the client and server automatically negotiate mTLS for
// transport authentication. This ensures that only the original client will
// be allowed to connect to the server, and all other connections will be
// rejected. The client will also refuse to connect to any server that isn't
// the original instance started by the client.
//
// In this mode of operation, the client generates a one-time use tls
// certificate, sends the public x.509 certificate to the new server, and
// the server generates a one-time use tls certificate, and sends the public
// x.509 certificate back to the client. These are used to authenticate all
// rpc connections between the client and server.
//
// Setting AutoMTLS to true implies that the server must support the
// protocol, and correctly negotiate the tls certificates, or a connection
// failure will result.
//
// The client should not set TLSConfig, nor should the server set a
// TLSProvider, because AutoMTLS implies that a new certificate and tls
// configuration will be generated at startup.
//
// You cannot Reattach to a server with this option enabled.
AutoMTLS bool
}
// ReattachConfig is used to configure a client to reattach to an
@ -331,6 +377,14 @@ func (c *Client) Exited() bool {
return c.exited
}
// killed is used in tests to check if a process failed to exit gracefully, and
// needed to be killed.
func (c *Client) killed() bool {
c.l.Lock()
defer c.l.Unlock()
return c.processKilled
}
// End the executing subprocess (if it is running) and perform any cleanup
// tasks necessary such as capturing any remaining logs and so on.
//
@ -342,14 +396,24 @@ func (c *Client) Kill() {
c.l.Lock()
process := c.process
addr := c.address
doneCh := c.doneLogging
c.l.Unlock()
// If there is no process, we never started anything. Nothing to kill.
// If there is no process, there is nothing to kill.
if process == nil {
return
}
defer func() {
// Wait for the all client goroutines to finish.
c.clientWaitGroup.Wait()
// Make sure there is no reference to the old process after it has been
// killed.
c.l.Lock()
c.process = nil
c.l.Unlock()
}()
// We need to check for address here. It is possible that the plugin
// started (process != nil) but has no address (addr == nil) if the
// plugin failed at startup. If we do have an address, we need to close
@ -370,6 +434,8 @@ func (c *Client) Kill() {
// kill in a moment anyways.
c.logger.Warn("error closing client during Kill", "err", err)
}
} else {
c.logger.Error("client", "error", err)
}
}
@ -378,17 +444,20 @@ func (c *Client) Kill() {
// doneCh which would be closed if the process exits.
if graceful {
select {
case <-doneCh:
case <-c.doneCtx.Done():
c.logger.Debug("plugin exited")
return
case <-time.After(250 * time.Millisecond):
case <-time.After(2 * time.Second):
}
}
// If graceful exiting failed, just kill it
c.logger.Warn("plugin failed to exit gracefully")
process.Kill()
// Wait for the client to finish logging so we have a complete log
<-doneCh
c.l.Lock()
c.processKilled = true
c.l.Unlock()
}
// Starts the underlying subprocess, communicating with it to negotiate
@ -407,7 +476,7 @@ func (c *Client) Start() (addr net.Addr, err error) {
// If one of cmd or reattach isn't set, then it is an error. We wrap
// this in a {} for scoping reasons, and hopeful that the escape
// analysis will pop the stock here.
// analysis will pop the stack here.
{
cmdSet := c.config.Cmd != nil
attachSet := c.config.Reattach != nil
@ -421,77 +490,49 @@ func (c *Client) Start() (addr net.Addr, err error) {
}
}
// Create the logging channel for when we kill
c.doneLogging = make(chan struct{})
// Create a context for when we kill
var ctxCancel context.CancelFunc
c.doneCtx, ctxCancel = context.WithCancel(context.Background())
if c.config.Reattach != nil {
// Verify the process still exists. If not, then it is an error
p, err := os.FindProcess(c.config.Reattach.Pid)
if err != nil {
return nil, err
}
return c.reattach()
}
// Attempt to connect to the addr since on Unix systems FindProcess
// doesn't actually return an error if it can't find the process.
conn, err := net.Dial(
c.config.Reattach.Addr.Network(),
c.config.Reattach.Addr.String())
if err != nil {
p.Kill()
return nil, ErrProcessNotFound
}
conn.Close()
if c.config.VersionedPlugins == nil {
c.config.VersionedPlugins = make(map[int]PluginSet)
}
// Goroutine to mark exit status
go func(pid int) {
// Wait for the process to die
pidWait(pid)
// handle all plugins as versioned, using the handshake config as the default.
version := int(c.config.ProtocolVersion)
// Log so we can see it
c.logger.Debug("reattached plugin process exited")
// Make sure we're not overwriting a real version 0. If ProtocolVersion was
// non-zero, then we have to just assume the user made sure that
// VersionedPlugins doesn't conflict.
if _, ok := c.config.VersionedPlugins[version]; !ok && c.config.Plugins != nil {
c.config.VersionedPlugins[version] = c.config.Plugins
}
// Mark it
c.l.Lock()
defer c.l.Unlock()
c.exited = true
// Close the logging channel since that doesn't work on reattach
close(c.doneLogging)
// Cancel the context
ctxCancel()
}(p.Pid)
// Set the address and process
c.address = c.config.Reattach.Addr
c.process = p
c.protocol = c.config.Reattach.Protocol
if c.protocol == "" {
// Default the protocol to net/rpc for backwards compatibility
c.protocol = ProtocolNetRPC
}
return c.address, nil
var versionStrings []string
for v := range c.config.VersionedPlugins {
versionStrings = append(versionStrings, strconv.Itoa(v))
}
env := []string{
fmt.Sprintf("%s=%s", c.config.MagicCookieKey, c.config.MagicCookieValue),
fmt.Sprintf("PLUGIN_MIN_PORT=%d", c.config.MinPort),
fmt.Sprintf("PLUGIN_MAX_PORT=%d", c.config.MaxPort),
fmt.Sprintf("PLUGIN_PROTOCOL_VERSIONS=%s", strings.Join(versionStrings, ",")),
}
stdout_r, stdout_w := io.Pipe()
stderr_r, stderr_w := io.Pipe()
cmd := c.config.Cmd
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Env = append(cmd.Env, env...)
cmd.Stdin = os.Stdin
cmd.Stderr = stderr_w
cmd.Stdout = stdout_w
cmdStdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
cmdStderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
if c.config.SecureConfig != nil {
if ok, err := c.config.SecureConfig.Check(cmd.Path); err != nil {
@ -501,6 +542,29 @@ func (c *Client) Start() (addr net.Addr, err error) {
}
}
// Setup a temporary certificate for client/server mtls, and send the public
// certificate to the plugin.
if c.config.AutoMTLS {
c.logger.Info("configuring client automatic mTLS")
certPEM, keyPEM, err := generateCert()
if err != nil {
c.logger.Error("failed to generate client certificate", "error", err)
return nil, err
}
cert, err := tls.X509KeyPair(certPEM, keyPEM)
if err != nil {
c.logger.Error("failed to parse client certificate", "error", err)
return nil, err
}
cmd.Env = append(cmd.Env, fmt.Sprintf("PLUGIN_CLIENT_CERT=%s", certPEM))
c.config.TLSConfig = &tls.Config{
Certificates: []tls.Certificate{cert},
ServerName: "localhost",
}
}
c.logger.Debug("starting plugin", "path", cmd.Path, "args", cmd.Args)
err = cmd.Start()
if err != nil {
@ -509,6 +573,7 @@ func (c *Client) Start() (addr net.Addr, err error) {
// Set the process
c.process = cmd.Process
c.logger.Debug("plugin started", "path", cmd.Path, "pid", c.process.Pid)
// Make sure the command is properly cleaned up if there is an error
defer func() {
@ -523,27 +588,37 @@ func (c *Client) Start() (addr net.Addr, err error) {
}
}()
// Start goroutine to wait for process to exit
exitCh := make(chan struct{})
// Create a context for when we kill
c.doneCtx, c.ctxCancel = context.WithCancel(context.Background())
c.clientWaitGroup.Add(1)
go func() {
// Make sure we close the write end of our stderr/stdout so
// that the readers send EOF properly.
defer stderr_w.Close()
defer stdout_w.Close()
// ensure the context is cancelled when we're done
defer c.ctxCancel()
defer c.clientWaitGroup.Done()
// get the cmd info early, since the process information will be removed
// in Kill.
pid := c.process.Pid
path := cmd.Path
// Wait for the command to end.
cmd.Wait()
err := cmd.Wait()
debugMsgArgs := []interface{}{
"path", path,
"pid", pid,
}
if err != nil {
debugMsgArgs = append(debugMsgArgs,
[]interface{}{"error", err.Error()}...)
}
// Log and make sure to flush the logs write away
c.logger.Debug("plugin process exited", "path", cmd.Path)
c.logger.Debug("plugin process exited", debugMsgArgs...)
os.Stderr.Sync()
// Mark that we exited
close(exitCh)
// Cancel the context, marking that we exited
ctxCancel()
// Set that we exited, which takes a lock
c.l.Lock()
defer c.l.Unlock()
@ -551,32 +626,33 @@ func (c *Client) Start() (addr net.Addr, err error) {
}()
// Start goroutine that logs the stderr
go c.logStderr(stderr_r)
c.clientWaitGroup.Add(1)
// logStderr calls Done()
go c.logStderr(cmdStderr)
// Start a goroutine that is going to be reading the lines
// out of stdout
linesCh := make(chan []byte)
linesCh := make(chan string)
c.clientWaitGroup.Add(1)
go func() {
defer c.clientWaitGroup.Done()
defer close(linesCh)
buf := bufio.NewReader(stdout_r)
for {
line, err := buf.ReadBytes('\n')
if line != nil {
linesCh <- line
}
if err == io.EOF {
return
}
scanner := bufio.NewScanner(cmdStdout)
for scanner.Scan() {
linesCh <- scanner.Text()
}
}()
// Make sure after we exit we read the lines from stdout forever
// so they don't block since it is an io.Pipe
// so they don't block since it is a pipe.
// The scanner goroutine above will close this, but track it with a wait
// group for completeness.
c.clientWaitGroup.Add(1)
defer func() {
go func() {
for _ = range linesCh {
defer c.clientWaitGroup.Done()
for range linesCh {
}
}()
}()
@ -589,12 +665,12 @@ func (c *Client) Start() (addr net.Addr, err error) {
select {
case <-timeout:
err = errors.New("timeout while waiting for plugin to start")
case <-exitCh:
case <-c.doneCtx.Done():
err = errors.New("plugin exited before we could connect")
case lineBytes := <-linesCh:
case line := <-linesCh:
// Trim the line and split by "|" in order to get the parts of
// the output.
line := strings.TrimSpace(string(lineBytes))
line = strings.TrimSpace(line)
parts := strings.SplitN(line, "|", 6)
if len(parts) < 4 {
err = fmt.Errorf(
@ -622,20 +698,18 @@ func (c *Client) Start() (addr net.Addr, err error) {
}
}
// Parse the protocol version
var protocol int64
protocol, err = strconv.ParseInt(parts[1], 10, 0)
// Test the API version
version, pluginSet, err := c.checkProtoVersion(parts[1])
if err != nil {
err = fmt.Errorf("Error parsing protocol version: %s", err)
return
return addr, err
}
// Test the API version
if uint(protocol) != c.config.ProtocolVersion {
err = fmt.Errorf("Incompatible API version with plugin. "+
"Plugin version: %s, Core version: %d", parts[1], c.config.ProtocolVersion)
return
}
// set the Plugins value to the compatible set, so the version
// doesn't need to be passed through to the ClientProtocol
// implementation.
c.config.Plugins = pluginSet
c.negotiatedVersion = version
c.logger.Debug("using plugin", "version", version)
switch parts[2] {
case "tcp":
@ -663,15 +737,125 @@ func (c *Client) Start() (addr net.Addr, err error) {
if !found {
err = fmt.Errorf("Unsupported plugin protocol %q. Supported: %v",
c.protocol, c.config.AllowedProtocols)
return
return addr, err
}
// See if we have a TLS certificate from the server.
// Checking if the length is > 50 rules out catching the unused "extra"
// data returned from some older implementations.
if len(parts) >= 6 && len(parts[5]) > 50 {
err := c.loadServerCert(parts[5])
if err != nil {
return nil, fmt.Errorf("error parsing server cert: %s", err)
}
}
}
c.address = addr
return
}
// loadServerCert is used by AutoMTLS to read an x.509 cert returned by the
// server, and load it as the RootCA for the client TLSConfig.
func (c *Client) loadServerCert(cert string) error {
certPool := x509.NewCertPool()
asn1, err := base64.RawStdEncoding.DecodeString(cert)
if err != nil {
return err
}
x509Cert, err := x509.ParseCertificate([]byte(asn1))
if err != nil {
return err
}
certPool.AddCert(x509Cert)
c.config.TLSConfig.RootCAs = certPool
return nil
}
func (c *Client) reattach() (net.Addr, error) {
// Verify the process still exists. If not, then it is an error
p, err := os.FindProcess(c.config.Reattach.Pid)
if err != nil {
return nil, err
}
// Attempt to connect to the addr since on Unix systems FindProcess
// doesn't actually return an error if it can't find the process.
conn, err := net.Dial(
c.config.Reattach.Addr.Network(),
c.config.Reattach.Addr.String())
if err != nil {
p.Kill()
return nil, ErrProcessNotFound
}
conn.Close()
// Create a context for when we kill
c.doneCtx, c.ctxCancel = context.WithCancel(context.Background())
c.clientWaitGroup.Add(1)
// Goroutine to mark exit status
go func(pid int) {
defer c.clientWaitGroup.Done()
// ensure the context is cancelled when we're done
defer c.ctxCancel()
// Wait for the process to die
pidWait(pid)
// Log so we can see it
c.logger.Debug("reattached plugin process exited")
// Mark it
c.l.Lock()
defer c.l.Unlock()
c.exited = true
}(p.Pid)
// Set the address and process
c.address = c.config.Reattach.Addr
c.process = p
c.protocol = c.config.Reattach.Protocol
if c.protocol == "" {
// Default the protocol to net/rpc for backwards compatibility
c.protocol = ProtocolNetRPC
}
return c.address, nil
}
// checkProtoVersion returns the negotiated version and PluginSet.
// This returns an error if the server returned an incompatible protocol
// version, or an invalid handshake response.
func (c *Client) checkProtoVersion(protoVersion string) (int, PluginSet, error) {
serverVersion, err := strconv.Atoi(protoVersion)
if err != nil {
return 0, nil, fmt.Errorf("Error parsing protocol version %q: %s", protoVersion, err)
}
// record these for the error message
var clientVersions []int
// all versions, including the legacy ProtocolVersion have been added to
// the versions set
for version, plugins := range c.config.VersionedPlugins {
clientVersions = append(clientVersions, version)
if serverVersion != version {
continue
}
return version, plugins, nil
}
return 0, nil, fmt.Errorf("Incompatible API version with plugin. "+
"Plugin version: %d, Client versions: %d", serverVersion, clientVersions)
}
// ReattachConfig returns the information that must be provided to NewClient
// to reattach to the plugin process that this client started. This is
// useful for plugins that detach from their parent process.
@ -750,43 +934,40 @@ func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) {
}
func (c *Client) logStderr(r io.Reader) {
bufR := bufio.NewReader(r)
defer c.clientWaitGroup.Done()
scanner := bufio.NewScanner(r)
l := c.logger.Named(filepath.Base(c.config.Cmd.Path))
for {
line, err := bufR.ReadString('\n')
if line != "" {
c.config.Stderr.Write([]byte(line))
line = strings.TrimRightFunc(line, unicode.IsSpace)
for scanner.Scan() {
line := scanner.Text()
c.config.Stderr.Write([]byte(line + "\n"))
line = strings.TrimRightFunc(line, unicode.IsSpace)
entry, err := parseJSON(line)
// If output is not JSON format, print directly to Debug
if err != nil {
l.Debug(line)
} else {
out := flattenKVPairs(entry.KVPairs)
entry, err := parseJSON(line)
// If output is not JSON format, print directly to Debug
if err != nil {
l.Debug(line)
} else {
out := flattenKVPairs(entry.KVPairs)
out = append(out, "timestamp", entry.Timestamp.Format(hclog.TimeFormat))
switch hclog.LevelFromString(entry.Level) {
case hclog.Trace:
l.Trace(entry.Message, out...)
case hclog.Debug:
l.Debug(entry.Message, out...)
case hclog.Info:
l.Info(entry.Message, out...)
case hclog.Warn:
l.Warn(entry.Message, out...)
case hclog.Error:
l.Error(entry.Message, out...)
}
out = append(out, "timestamp", entry.Timestamp.Format(hclog.TimeFormat))
switch hclog.LevelFromString(entry.Level) {
case hclog.Trace:
l.Trace(entry.Message, out...)
case hclog.Debug:
l.Debug(entry.Message, out...)
case hclog.Info:
l.Info(entry.Message, out...)
case hclog.Warn:
l.Warn(entry.Message, out...)
case hclog.Error:
l.Error(entry.Message, out...)
}
}
if err == io.EOF {
break
}
}
// Flag that we've completed logging for others
close(c.doneLogging)
if err := scanner.Err(); err != nil {
l.Error("reading plugin stderr", "error", err)
}
}

13
vendor/github.com/hashicorp/go-plugin/go.mod generated vendored Normal file
View File

@ -0,0 +1,13 @@
module github.com/hashicorp/go-plugin
require (
github.com/golang/protobuf v1.2.0
github.com/hashicorp/go-hclog v0.0.0-20180709165350-ff2cf002a8dd
github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb
github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77
github.com/oklog/run v1.0.0
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d
golang.org/x/text v0.3.0 // indirect
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 // indirect
google.golang.org/grpc v1.14.0
)

18
vendor/github.com/hashicorp/go-plugin/go.sum generated vendored Normal file
View File

@ -0,0 +1,18 @@
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/hashicorp/go-hclog v0.0.0-20180709165350-ff2cf002a8dd h1:rNuUHR+CvK1IS89MMtcF0EpcVMZtjKfPRp4MEmt/aTs=
github.com/hashicorp/go-hclog v0.0.0-20180709165350-ff2cf002a8dd/go.mod h1:9bjs9uLqI8l75knNv3lV1kA55veR+WUPSiKIWcQHudI=
github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb h1:b5rjCoWHc7eqmAS4/qyk21ZsHyb6Mxv/jykxvNTkU4M=
github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77 h1:7GoSOOW2jpsfkntVKaS2rAr1TJqfcxotyaUcuxoZSzg=
github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw=
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d h1:g9qWBGx4puODJTMVyoPrpoxPFgVGd+z1DZwjfRu4d0I=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.14.0 h1:ArxJuB1NWfPY6r9Gp9gqwplT0Ge7nqv9msgu03lHLmo=
google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=

View File

@ -11,6 +11,8 @@ import (
"sync/atomic"
"time"
"github.com/hashicorp/go-plugin/internal/proto"
"github.com/oklog/run"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
@ -19,14 +21,14 @@ import (
// streamer interface is used in the broker to send/receive connection
// information.
type streamer interface {
Send(*ConnInfo) error
Recv() (*ConnInfo, error)
Send(*proto.ConnInfo) error
Recv() (*proto.ConnInfo, error)
Close()
}
// sendErr is used to pass errors back during a send.
type sendErr struct {
i *ConnInfo
i *proto.ConnInfo
ch chan error
}
@ -38,7 +40,7 @@ type gRPCBrokerServer struct {
send chan *sendErr
// recv is used to receive connection info from the gRPC stream.
recv chan *ConnInfo
recv chan *proto.ConnInfo
// quit closes down the stream.
quit chan struct{}
@ -50,7 +52,7 @@ type gRPCBrokerServer struct {
func newGRPCBrokerServer() *gRPCBrokerServer {
return &gRPCBrokerServer{
send: make(chan *sendErr),
recv: make(chan *ConnInfo),
recv: make(chan *proto.ConnInfo),
quit: make(chan struct{}),
}
}
@ -58,7 +60,7 @@ func newGRPCBrokerServer() *gRPCBrokerServer {
// StartStream implements the GRPCBrokerServer interface and will block until
// the quit channel is closed or the context reports Done. The stream will pass
// connection information to/from the client.
func (s *gRPCBrokerServer) StartStream(stream GRPCBroker_StartStreamServer) error {
func (s *gRPCBrokerServer) StartStream(stream proto.GRPCBroker_StartStreamServer) error {
doneCh := stream.Context().Done()
defer s.Close()
@ -97,7 +99,7 @@ func (s *gRPCBrokerServer) StartStream(stream GRPCBroker_StartStreamServer) erro
// Send is used by the GRPCBroker to pass connection information into the stream
// to the client.
func (s *gRPCBrokerServer) Send(i *ConnInfo) error {
func (s *gRPCBrokerServer) Send(i *proto.ConnInfo) error {
ch := make(chan error)
defer close(ch)
@ -115,7 +117,7 @@ func (s *gRPCBrokerServer) Send(i *ConnInfo) error {
// Recv is used by the GRPCBroker to pass connection information that has been
// sent from the client from the stream to the broker.
func (s *gRPCBrokerServer) Recv() (*ConnInfo, error) {
func (s *gRPCBrokerServer) Recv() (*proto.ConnInfo, error) {
select {
case <-s.quit:
return nil, errors.New("broker closed")
@ -136,13 +138,13 @@ func (s *gRPCBrokerServer) Close() {
// streamer interfaces.
type gRPCBrokerClientImpl struct {
// client is the underlying GRPC client used to make calls to the server.
client GRPCBrokerClient
client proto.GRPCBrokerClient
// send is used to send connection info to the gRPC stream.
send chan *sendErr
// recv is used to receive connection info from the gRPC stream.
recv chan *ConnInfo
recv chan *proto.ConnInfo
// quit closes down the stream.
quit chan struct{}
@ -153,9 +155,9 @@ type gRPCBrokerClientImpl struct {
func newGRPCBrokerClient(conn *grpc.ClientConn) *gRPCBrokerClientImpl {
return &gRPCBrokerClientImpl{
client: NewGRPCBrokerClient(conn),
client: proto.NewGRPCBrokerClient(conn),
send: make(chan *sendErr),
recv: make(chan *ConnInfo),
recv: make(chan *proto.ConnInfo),
quit: make(chan struct{}),
}
}
@ -207,7 +209,7 @@ func (s *gRPCBrokerClientImpl) StartStream() error {
// Send is used by the GRPCBroker to pass connection information into the stream
// to the plugin.
func (s *gRPCBrokerClientImpl) Send(i *ConnInfo) error {
func (s *gRPCBrokerClientImpl) Send(i *proto.ConnInfo) error {
ch := make(chan error)
defer close(ch)
@ -225,7 +227,7 @@ func (s *gRPCBrokerClientImpl) Send(i *ConnInfo) error {
// Recv is used by the GRPCBroker to pass connection information that has been
// sent from the plugin to the broker.
func (s *gRPCBrokerClientImpl) Recv() (*ConnInfo, error) {
func (s *gRPCBrokerClientImpl) Recv() (*proto.ConnInfo, error) {
select {
case <-s.quit:
return nil, errors.New("broker closed")
@ -266,7 +268,7 @@ type GRPCBroker struct {
}
type gRPCBrokerPending struct {
ch chan *ConnInfo
ch chan *proto.ConnInfo
doneCh chan struct{}
}
@ -288,7 +290,7 @@ func (b *GRPCBroker) Accept(id uint32) (net.Listener, error) {
return nil, err
}
err = b.streamer.Send(&ConnInfo{
err = b.streamer.Send(&proto.ConnInfo{
ServiceId: id,
Network: listener.Addr().Network(),
Address: listener.Addr().String(),
@ -363,7 +365,7 @@ func (b *GRPCBroker) Close() error {
// Dial opens a connection by ID.
func (b *GRPCBroker) Dial(id uint32) (conn *grpc.ClientConn, err error) {
var c *ConnInfo
var c *proto.ConnInfo
// Open the stream
p := b.getStream(id)
@ -433,7 +435,7 @@ func (m *GRPCBroker) getStream(id uint32) *gRPCBrokerPending {
}
m.streams[id] = &gRPCBrokerPending{
ch: make(chan *ConnInfo, 1),
ch: make(chan *proto.ConnInfo, 1),
doneCh: make(chan struct{}),
}
return m.streams[id]

View File

@ -6,6 +6,7 @@ import (
"net"
"time"
"github.com/hashicorp/go-plugin/internal/proto"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
@ -16,12 +17,9 @@ func dialGRPCConn(tls *tls.Config, dialer func(string, time.Duration) (net.Conn,
// Build dialing options.
opts := make([]grpc.DialOption, 0, 5)
// We use a custom dialer so that we can connect over unix domain sockets
// We use a custom dialer so that we can connect over unix domain sockets.
opts = append(opts, grpc.WithDialer(dialer))
// go-plugin expects to block the connection
opts = append(opts, grpc.WithBlock())
// Fail right away
opts = append(opts, grpc.FailOnNonTempDialError(true))
@ -58,12 +56,15 @@ func newGRPCClient(doneCtx context.Context, c *Client) (*GRPCClient, error) {
go broker.Run()
go brokerGRPCClient.StartStream()
return &GRPCClient{
Conn: conn,
Plugins: c.config.Plugins,
doneCtx: doneCtx,
broker: broker,
}, nil
cl := &GRPCClient{
Conn: conn,
Plugins: c.config.Plugins,
doneCtx: doneCtx,
broker: broker,
controller: proto.NewGRPCControllerClient(conn),
}
return cl, nil
}
// GRPCClient connects to a GRPCServer over gRPC to dispense plugin types.
@ -73,11 +74,14 @@ type GRPCClient struct {
doneCtx context.Context
broker *GRPCBroker
controller proto.GRPCControllerClient
}
// ClientProtocol impl.
func (c *GRPCClient) Close() error {
c.broker.Close()
c.controller.Shutdown(c.doneCtx, &proto.Empty{})
return c.Conn.Close()
}

View File

@ -0,0 +1,23 @@
package plugin
import (
"context"
"github.com/hashicorp/go-plugin/internal/proto"
)
// GRPCControllerServer handles shutdown calls to terminate the server when the
// plugin client is closed.
type grpcControllerServer struct {
server *GRPCServer
}
// Shutdown stops the grpc server. It first will attempt a graceful stop, then a
// full stop on the server.
func (s *grpcControllerServer) Shutdown(ctx context.Context, _ *proto.Empty) (*proto.Empty, error) {
resp := &proto.Empty{}
// TODO: figure out why GracefullStop doesn't work.
s.server.Stop()
return resp, nil
}

View File

@ -8,6 +8,8 @@ import (
"io"
"net"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin/internal/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
@ -52,6 +54,8 @@ type GRPCServer struct {
config GRPCServerConfig
server *grpc.Server
broker *GRPCBroker
logger hclog.Logger
}
// ServerProtocol impl.
@ -71,10 +75,16 @@ func (s *GRPCServer) Init() error {
// Register the broker service
brokerServer := newGRPCBrokerServer()
RegisterGRPCBrokerServer(s.server, brokerServer)
proto.RegisterGRPCBrokerServer(s.server, brokerServer)
s.broker = newGRPCBroker(brokerServer, s.TLS)
go s.broker.Run()
// Register the controller
controllerServer := &grpcControllerServer{
server: s,
}
proto.RegisterGRPCControllerServer(s.server, controllerServer)
// Register all our plugins onto the gRPC server.
for k, raw := range s.Plugins {
p, ok := raw.(GRPCPlugin)
@ -83,7 +93,7 @@ func (s *GRPCServer) Init() error {
}
if err := p.GRPCServer(s.broker, s.server); err != nil {
return fmt.Errorf("error registring %q: %s", k, err)
return fmt.Errorf("error registering %q: %s", k, err)
}
}
@ -117,11 +127,11 @@ func (s *GRPCServer) Config() string {
}
func (s *GRPCServer) Serve(lis net.Listener) {
// Start serving in a goroutine
go s.server.Serve(lis)
// Wait until graceful completion
<-s.DoneCh
defer close(s.DoneCh)
err := s.server.Serve(lis)
if err != nil {
s.logger.Error("grpc server", "error", err)
}
}
// GRPCServerConfig is the extra configuration passed along for consumers

View File

@ -0,0 +1,3 @@
//go:generate protoc -I ./ ./grpc_broker.proto ./grpc_controller.proto --go_out=plugins=grpc:.
package proto

View File

@ -1,24 +1,14 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: grpc_broker.proto
/*
Package plugin is a generated protocol buffer package.
It is generated from these files:
grpc_broker.proto
It has these top-level messages:
ConnInfo
*/
package plugin
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
package proto
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
@ -33,15 +23,38 @@ var _ = math.Inf
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type ConnInfo struct {
ServiceId uint32 `protobuf:"varint,1,opt,name=service_id,json=serviceId" json:"service_id,omitempty"`
Network string `protobuf:"bytes,2,opt,name=network" json:"network,omitempty"`
Address string `protobuf:"bytes,3,opt,name=address" json:"address,omitempty"`
ServiceId uint32 `protobuf:"varint,1,opt,name=service_id,json=serviceId,proto3" json:"service_id,omitempty"`
Network string `protobuf:"bytes,2,opt,name=network,proto3" json:"network,omitempty"`
Address string `protobuf:"bytes,3,opt,name=address,proto3" json:"address,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ConnInfo) Reset() { *m = ConnInfo{} }
func (m *ConnInfo) String() string { return proto.CompactTextString(m) }
func (*ConnInfo) ProtoMessage() {}
func (*ConnInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *ConnInfo) Reset() { *m = ConnInfo{} }
func (m *ConnInfo) String() string { return proto.CompactTextString(m) }
func (*ConnInfo) ProtoMessage() {}
func (*ConnInfo) Descriptor() ([]byte, []int) {
return fileDescriptor_802e9beed3ec3b28, []int{0}
}
func (m *ConnInfo) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ConnInfo.Unmarshal(m, b)
}
func (m *ConnInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ConnInfo.Marshal(b, m, deterministic)
}
func (m *ConnInfo) XXX_Merge(src proto.Message) {
xxx_messageInfo_ConnInfo.Merge(m, src)
}
func (m *ConnInfo) XXX_Size() int {
return xxx_messageInfo_ConnInfo.Size(m)
}
func (m *ConnInfo) XXX_DiscardUnknown() {
xxx_messageInfo_ConnInfo.DiscardUnknown(m)
}
var xxx_messageInfo_ConnInfo proto.InternalMessageInfo
func (m *ConnInfo) GetServiceId() uint32 {
if m != nil {
@ -65,7 +78,24 @@ func (m *ConnInfo) GetAddress() string {
}
func init() {
proto.RegisterType((*ConnInfo)(nil), "plugin.ConnInfo")
proto.RegisterType((*ConnInfo)(nil), "proto.ConnInfo")
}
func init() { proto.RegisterFile("grpc_broker.proto", fileDescriptor_802e9beed3ec3b28) }
var fileDescriptor_802e9beed3ec3b28 = []byte{
// 164 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4c, 0x2f, 0x2a, 0x48,
0x8e, 0x4f, 0x2a, 0xca, 0xcf, 0x4e, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05,
0x53, 0x4a, 0xb1, 0x5c, 0x1c, 0xce, 0xf9, 0x79, 0x79, 0x9e, 0x79, 0x69, 0xf9, 0x42, 0xb2, 0x5c,
0x5c, 0xc5, 0xa9, 0x45, 0x65, 0x99, 0xc9, 0xa9, 0xf1, 0x99, 0x29, 0x12, 0x8c, 0x0a, 0x8c, 0x1a,
0xbc, 0x41, 0x9c, 0x50, 0x11, 0xcf, 0x14, 0x21, 0x09, 0x2e, 0xf6, 0xbc, 0xd4, 0x92, 0xf2, 0xfc,
0xa2, 0x6c, 0x09, 0x26, 0x05, 0x46, 0x0d, 0xce, 0x20, 0x18, 0x17, 0x24, 0x93, 0x98, 0x92, 0x52,
0x94, 0x5a, 0x5c, 0x2c, 0xc1, 0x0c, 0x91, 0x81, 0x72, 0x8d, 0x1c, 0xb9, 0xb8, 0xdc, 0x83, 0x02,
0x9c, 0x9d, 0xc0, 0x36, 0x0b, 0x19, 0x73, 0x71, 0x07, 0x97, 0x24, 0x16, 0x95, 0x04, 0x97, 0x14,
0xa5, 0x26, 0xe6, 0x0a, 0xf1, 0x43, 0x9c, 0xa2, 0x07, 0x73, 0x80, 0x14, 0xba, 0x80, 0x06, 0xa3,
0x01, 0x63, 0x12, 0x1b, 0x58, 0xcc, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x7a, 0xda, 0xd5, 0x84,
0xc4, 0x00, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@ -76,8 +106,9 @@ var _ grpc.ClientConn
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// Client API for GRPCBroker service
// GRPCBrokerClient is the client API for GRPCBroker service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type GRPCBrokerClient interface {
StartStream(ctx context.Context, opts ...grpc.CallOption) (GRPCBroker_StartStreamClient, error)
}
@ -91,7 +122,7 @@ func NewGRPCBrokerClient(cc *grpc.ClientConn) GRPCBrokerClient {
}
func (c *gRPCBrokerClient) StartStream(ctx context.Context, opts ...grpc.CallOption) (GRPCBroker_StartStreamClient, error) {
stream, err := grpc.NewClientStream(ctx, &_GRPCBroker_serviceDesc.Streams[0], c.cc, "/plugin.GRPCBroker/StartStream", opts...)
stream, err := c.cc.NewStream(ctx, &_GRPCBroker_serviceDesc.Streams[0], "/proto.GRPCBroker/StartStream", opts...)
if err != nil {
return nil, err
}
@ -121,8 +152,7 @@ func (x *gRPCBrokerStartStreamClient) Recv() (*ConnInfo, error) {
return m, nil
}
// Server API for GRPCBroker service
// GRPCBrokerServer is the server API for GRPCBroker service.
type GRPCBrokerServer interface {
StartStream(GRPCBroker_StartStreamServer) error
}
@ -158,7 +188,7 @@ func (x *gRPCBrokerStartStreamServer) Recv() (*ConnInfo, error) {
}
var _GRPCBroker_serviceDesc = grpc.ServiceDesc{
ServiceName: "plugin.GRPCBroker",
ServiceName: "proto.GRPCBroker",
HandlerType: (*GRPCBrokerServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
@ -171,20 +201,3 @@ var _GRPCBroker_serviceDesc = grpc.ServiceDesc{
},
Metadata: "grpc_broker.proto",
}
func init() { proto.RegisterFile("grpc_broker.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 170 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4c, 0x2f, 0x2a, 0x48,
0x8e, 0x4f, 0x2a, 0xca, 0xcf, 0x4e, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2b,
0xc8, 0x29, 0x4d, 0xcf, 0xcc, 0x53, 0x8a, 0xe5, 0xe2, 0x70, 0xce, 0xcf, 0xcb, 0xf3, 0xcc, 0x4b,
0xcb, 0x17, 0x92, 0xe5, 0xe2, 0x2a, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0x8d, 0xcf, 0x4c, 0x91,
0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0d, 0xe2, 0x84, 0x8a, 0x78, 0xa6, 0x08, 0x49, 0x70, 0xb1, 0xe7,
0xa5, 0x96, 0x94, 0xe7, 0x17, 0x65, 0x4b, 0x30, 0x29, 0x30, 0x6a, 0x70, 0x06, 0xc1, 0xb8, 0x20,
0x99, 0xc4, 0x94, 0x94, 0xa2, 0xd4, 0xe2, 0x62, 0x09, 0x66, 0x88, 0x0c, 0x94, 0x6b, 0xe4, 0xcc,
0xc5, 0xe5, 0x1e, 0x14, 0xe0, 0xec, 0x04, 0xb6, 0x5a, 0xc8, 0x94, 0x8b, 0x3b, 0xb8, 0x24, 0xb1,
0xa8, 0x24, 0xb8, 0xa4, 0x28, 0x35, 0x31, 0x57, 0x48, 0x40, 0x0f, 0xe2, 0x08, 0x3d, 0x98, 0x0b,
0xa4, 0x30, 0x44, 0x34, 0x18, 0x0d, 0x18, 0x93, 0xd8, 0xc0, 0x4e, 0x36, 0x06, 0x04, 0x00, 0x00,
0xff, 0xff, 0x7b, 0x5d, 0xfb, 0xe1, 0xc7, 0x00, 0x00, 0x00,
}

View File

@ -1,5 +1,5 @@
syntax = "proto3";
package plugin;
package proto;
message ConnInfo {
uint32 service_id = 1;

View File

@ -0,0 +1,143 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: grpc_controller.proto
package proto
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Empty struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Empty) Reset() { *m = Empty{} }
func (m *Empty) String() string { return proto.CompactTextString(m) }
func (*Empty) ProtoMessage() {}
func (*Empty) Descriptor() ([]byte, []int) {
return fileDescriptor_23c2c7e42feab570, []int{0}
}
func (m *Empty) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Empty.Unmarshal(m, b)
}
func (m *Empty) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Empty.Marshal(b, m, deterministic)
}
func (m *Empty) XXX_Merge(src proto.Message) {
xxx_messageInfo_Empty.Merge(m, src)
}
func (m *Empty) XXX_Size() int {
return xxx_messageInfo_Empty.Size(m)
}
func (m *Empty) XXX_DiscardUnknown() {
xxx_messageInfo_Empty.DiscardUnknown(m)
}
var xxx_messageInfo_Empty proto.InternalMessageInfo
func init() {
proto.RegisterType((*Empty)(nil), "proto.Empty")
}
func init() { proto.RegisterFile("grpc_controller.proto", fileDescriptor_23c2c7e42feab570) }
var fileDescriptor_23c2c7e42feab570 = []byte{
// 97 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4d, 0x2f, 0x2a, 0x48,
0x8e, 0x4f, 0xce, 0xcf, 0x2b, 0x29, 0xca, 0xcf, 0xc9, 0x49, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f,
0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0xec, 0x5c, 0xac, 0xae, 0xb9, 0x05, 0x25, 0x95, 0x46, 0x16,
0x5c, 0x7c, 0xee, 0x41, 0x01, 0xce, 0xce, 0x70, 0x75, 0x42, 0x6a, 0x5c, 0x1c, 0xc1, 0x19, 0xa5,
0x25, 0x29, 0xf9, 0xe5, 0x79, 0x42, 0x3c, 0x10, 0x5d, 0x7a, 0x60, 0xb5, 0x52, 0x28, 0xbc, 0x24,
0x36, 0x30, 0xc7, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x69, 0xa1, 0xad, 0x79, 0x69, 0x00, 0x00,
0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// GRPCControllerClient is the client API for GRPCController service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type GRPCControllerClient interface {
Shutdown(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error)
}
type gRPCControllerClient struct {
cc *grpc.ClientConn
}
func NewGRPCControllerClient(cc *grpc.ClientConn) GRPCControllerClient {
return &gRPCControllerClient{cc}
}
func (c *gRPCControllerClient) Shutdown(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error) {
out := new(Empty)
err := c.cc.Invoke(ctx, "/proto.GRPCController/Shutdown", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// GRPCControllerServer is the server API for GRPCController service.
type GRPCControllerServer interface {
Shutdown(context.Context, *Empty) (*Empty, error)
}
func RegisterGRPCControllerServer(s *grpc.Server, srv GRPCControllerServer) {
s.RegisterService(&_GRPCController_serviceDesc, srv)
}
func _GRPCController_Shutdown_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GRPCControllerServer).Shutdown(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.GRPCController/Shutdown",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GRPCControllerServer).Shutdown(ctx, req.(*Empty))
}
return interceptor(ctx, in, info, handler)
}
var _GRPCController_serviceDesc = grpc.ServiceDesc{
ServiceName: "proto.GRPCController",
HandlerType: (*GRPCControllerServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Shutdown",
Handler: _GRPCController_Shutdown_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "grpc_controller.proto",
}

View File

@ -0,0 +1,10 @@
syntax = "proto3";
package proto;
message Empty {
}
// The GRPCController is responsible for telling the plugin server to shutdown.
service GRPCController {
rpc Shutdown(Empty) returns (Empty);
}

73
vendor/github.com/hashicorp/go-plugin/mtls.go generated vendored Normal file
View File

@ -0,0 +1,73 @@
package plugin
import (
"bytes"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"math/big"
"time"
)
// generateCert generates a temporary certificate for plugin authentication. The
// certificate and private key are returns in PEM format.
func generateCert() (cert []byte, privateKey []byte, err error) {
key, err := ecdsa.GenerateKey(elliptic.P521(), rand.Reader)
if err != nil {
return nil, nil, err
}
serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128)
sn, err := rand.Int(rand.Reader, serialNumberLimit)
if err != nil {
return nil, nil, err
}
host := "localhost"
template := &x509.Certificate{
Subject: pkix.Name{
CommonName: host,
Organization: []string{"HashiCorp"},
},
DNSNames: []string{host},
ExtKeyUsage: []x509.ExtKeyUsage{
x509.ExtKeyUsageClientAuth,
x509.ExtKeyUsageServerAuth,
},
KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment | x509.KeyUsageKeyAgreement | x509.KeyUsageCertSign,
BasicConstraintsValid: true,
SerialNumber: sn,
NotBefore: time.Now().Add(-30 * time.Second),
NotAfter: time.Now().Add(262980 * time.Hour),
IsCA: true,
}
der, err := x509.CreateCertificate(rand.Reader, template, template, key.Public(), key)
if err != nil {
return nil, nil, err
}
var certOut bytes.Buffer
if err := pem.Encode(&certOut, &pem.Block{Type: "CERTIFICATE", Bytes: der}); err != nil {
return nil, nil, err
}
keyBytes, err := x509.MarshalECPrivateKey(key)
if err != nil {
return nil, nil, err
}
var keyOut bytes.Buffer
if err := pem.Encode(&keyOut, &pem.Block{Type: "EC PRIVATE KEY", Bytes: keyBytes}); err != nil {
return nil, nil, err
}
cert = certOut.Bytes()
privateKey = keyOut.Bytes()
return cert, privateKey, nil
}

View File

@ -2,6 +2,7 @@ package plugin
import (
"crypto/tls"
"crypto/x509"
"encoding/base64"
"errors"
"fmt"
@ -11,7 +12,9 @@ import (
"os"
"os/signal"
"runtime"
"sort"
"strconv"
"strings"
"sync/atomic"
"github.com/hashicorp/go-hclog"
@ -36,6 +39,8 @@ type HandshakeConfig struct {
// ProtocolVersion is the version that clients must match on to
// agree they can communicate. This should match the ProtocolVersion
// set on ClientConfig when using a plugin.
// This field is not required if VersionedPlugins are being used in the
// Client or Server configurations.
ProtocolVersion uint
// MagicCookieKey and value are used as a very basic verification
@ -46,6 +51,10 @@ type HandshakeConfig struct {
MagicCookieValue string
}
// PluginSet is a set of plugins provided to be registered in the plugin
// server.
type PluginSet map[string]Plugin
// ServeConfig configures what sorts of plugins are served.
type ServeConfig struct {
// HandshakeConfig is the configuration that must match clients.
@ -55,7 +64,13 @@ type ServeConfig struct {
TLSProvider func() (*tls.Config, error)
// Plugins are the plugins that are served.
Plugins map[string]Plugin
// The implied version of this PluginSet is the Handshake.ProtocolVersion.
Plugins PluginSet
// VersionedPlugins is a map of PluginSets for specific protocol versions.
// These can be used to negotiate a compatible version between client and
// server. If this is set, Handshake.ProtocolVersion is not required.
VersionedPlugins map[int]PluginSet
// GRPCServer should be non-nil to enable serving the plugins over
// gRPC. This is a function to create the server when needed with the
@ -72,14 +87,83 @@ type ServeConfig struct {
Logger hclog.Logger
}
// Protocol returns the protocol that this server should speak.
func (c *ServeConfig) Protocol() Protocol {
result := ProtocolNetRPC
if c.GRPCServer != nil {
result = ProtocolGRPC
// protocolVersion determines the protocol version and plugin set to be used by
// the server. In the event that there is no suitable version, the last version
// in the config is returned leaving the client to report the incompatibility.
func protocolVersion(opts *ServeConfig) (int, Protocol, PluginSet) {
protoVersion := int(opts.ProtocolVersion)
pluginSet := opts.Plugins
protoType := ProtocolNetRPC
// Check if the client sent a list of acceptable versions
var clientVersions []int
if vs := os.Getenv("PLUGIN_PROTOCOL_VERSIONS"); vs != "" {
for _, s := range strings.Split(vs, ",") {
v, err := strconv.Atoi(s)
if err != nil {
fmt.Fprintf(os.Stderr, "server sent invalid plugin version %q", s)
continue
}
clientVersions = append(clientVersions, v)
}
}
return result
// We want to iterate in reverse order, to ensure we match the newest
// compatible plugin version.
sort.Sort(sort.Reverse(sort.IntSlice(clientVersions)))
// set the old un-versioned fields as if they were versioned plugins
if opts.VersionedPlugins == nil {
opts.VersionedPlugins = make(map[int]PluginSet)
}
if pluginSet != nil {
opts.VersionedPlugins[protoVersion] = pluginSet
}
// Sort the version to make sure we match the latest first
var versions []int
for v := range opts.VersionedPlugins {
versions = append(versions, v)
}
sort.Sort(sort.Reverse(sort.IntSlice(versions)))
// See if we have multiple versions of Plugins to choose from
for _, version := range versions {
// Record each version, since we guarantee that this returns valid
// values even if they are not a protocol match.
protoVersion = version
pluginSet = opts.VersionedPlugins[version]
// If we have a configured gRPC server we should select a protocol
if opts.GRPCServer != nil {
// All plugins in a set must use the same transport, so check the first
// for the protocol type
for _, p := range pluginSet {
switch p.(type) {
case GRPCPlugin:
protoType = ProtocolGRPC
default:
protoType = ProtocolNetRPC
}
break
}
}
for _, clientVersion := range clientVersions {
if clientVersion == protoVersion {
return protoVersion, protoType, pluginSet
}
}
}
// Return the lowest version as the fallback.
// Since we iterated over all the versions in reverse order above, these
// values are from the lowest version number plugins (which may be from
// a combination of the Handshake.ProtocolVersion and ServeConfig.Plugins
// fields). This allows serving the oldest version of our plugins to a
// legacy client that did not send a PLUGIN_PROTOCOL_VERSIONS list.
return protoVersion, protoType, pluginSet
}
// Serve serves the plugins given by ServeConfig.
@ -107,6 +191,10 @@ func Serve(opts *ServeConfig) {
os.Exit(1)
}
// negotiate the version and plugins
// start with default version in the handshake config
protoVersion, protoType, pluginSet := protocolVersion(opts)
// Logging goes to the original stderr
log.SetOutput(os.Stderr)
@ -155,12 +243,47 @@ func Serve(opts *ServeConfig) {
}
}
var serverCert string
clientCert := os.Getenv("PLUGIN_CLIENT_CERT")
// If the client is configured using AutoMTLS, the certificate will be here,
// and we need to generate our own in response.
if tlsConfig == nil && clientCert != "" {
logger.Info("configuring server automatic mTLS")
clientCertPool := x509.NewCertPool()
if !clientCertPool.AppendCertsFromPEM([]byte(clientCert)) {
logger.Error("client cert provided but failed to parse", "cert", clientCert)
}
certPEM, keyPEM, err := generateCert()
if err != nil {
logger.Error("failed to generate client certificate", "error", err)
panic(err)
}
cert, err := tls.X509KeyPair(certPEM, keyPEM)
if err != nil {
logger.Error("failed to parse client certificate", "error", err)
panic(err)
}
tlsConfig = &tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: clientCertPool,
MinVersion: tls.VersionTLS12,
}
// We send back the raw leaf cert data for the client rather than the
// PEM, since the protocol can't handle newlines.
serverCert = base64.RawStdEncoding.EncodeToString(cert.Certificate[0])
}
// Create the channel to tell us when we're done
doneCh := make(chan struct{})
// Build the server type
var server ServerProtocol
switch opts.Protocol() {
switch protoType {
case ProtocolNetRPC:
// If we have a TLS configuration then we wrap the listener
// ourselves and do it at that level.
@ -170,7 +293,7 @@ func Serve(opts *ServeConfig) {
// Create the RPC server to dispense
server = &RPCServer{
Plugins: opts.Plugins,
Plugins: pluginSet,
Stdout: stdout_r,
Stderr: stderr_r,
DoneCh: doneCh,
@ -179,16 +302,17 @@ func Serve(opts *ServeConfig) {
case ProtocolGRPC:
// Create the gRPC server
server = &GRPCServer{
Plugins: opts.Plugins,
Plugins: pluginSet,
Server: opts.GRPCServer,
TLS: tlsConfig,
Stdout: stdout_r,
Stderr: stderr_r,
DoneCh: doneCh,
logger: logger,
}
default:
panic("unknown server protocol: " + opts.Protocol())
panic("unknown server protocol: " + protoType)
}
// Initialize the servers
@ -197,25 +321,16 @@ func Serve(opts *ServeConfig) {
return
}
// Build the extra configuration
extra := ""
if v := server.Config(); v != "" {
extra = base64.StdEncoding.EncodeToString([]byte(v))
}
if extra != "" {
extra = "|" + extra
}
logger.Debug("plugin address", "network", listener.Addr().Network(), "address", listener.Addr().String())
// Output the address and service name to stdout so that core can bring it up.
fmt.Printf("%d|%d|%s|%s|%s%s\n",
// Output the address and service name to stdout so that the client can bring it up.
fmt.Printf("%d|%d|%s|%s|%s|%s\n",
CoreProtocolVersion,
opts.ProtocolVersion,
protoVersion,
listener.Addr().Network(),
listener.Addr().String(),
opts.Protocol(),
extra)
protoType,
serverCert)
os.Stdout.Sync()
// Eat the interrupts

View File

@ -7,6 +7,8 @@ import (
"net"
"net/rpc"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin/internal/proto"
"github.com/mitchellh/go-testing-interface"
"google.golang.org/grpc"
)
@ -140,9 +142,11 @@ func TestPluginGRPCConn(t testing.T, ps map[string]Plugin) (*GRPCClient, *GRPCSe
// Start up the server
server := &GRPCServer{
Plugins: ps,
DoneCh: make(chan struct{}),
Server: DefaultGRPCServer,
Stdout: new(bytes.Buffer),
Stderr: new(bytes.Buffer),
logger: hclog.Default(),
}
if err := server.Init(); err != nil {
t.Fatalf("err: %s", err)
@ -165,10 +169,11 @@ func TestPluginGRPCConn(t testing.T, ps map[string]Plugin) (*GRPCClient, *GRPCSe
// Create the client
client := &GRPCClient{
Conn: conn,
Plugins: ps,
broker: broker,
doneCtx: context.Background(),
Conn: conn,
Plugins: ps,
broker: broker,
doneCtx: context.Background(),
controller: proto.NewGRPCControllerClient(conn),
}
return client, server

3
vendor/vendor.json vendored
View File

@ -189,7 +189,8 @@
{"path":"github.com/hashicorp/go-memdb","checksumSHA1":"FMAvwDar2bQyYAW4XMFhAt0J5xA=","revision":"20ff6434c1cc49b80963d45bf5c6aa89c78d8d57","revisionTime":"2017-08-31T20:15:40Z"},
{"path":"github.com/hashicorp/go-msgpack/codec","checksumSHA1":"TNlVzNR1OaajcNi3CbQ3bGbaLGU=","revision":"fa3f63826f7c23912c15263591e65d54d080b458"},
{"path":"github.com/hashicorp/go-multierror","checksumSHA1":"lrSl49G23l6NhfilxPM0XFs5rZo=","revision":"d30f09973e19c1dfcd120b2d9c4f168e68d6b5d5"},
{"path":"github.com/hashicorp/go-plugin","checksumSHA1":"lbG9uwM7qJlTIBg+8mjCC88sCPc=","revision":"e8d22c780116115ae5624720c9af0c97afe4f551","revisionTime":"2018-03-31T00:25:53Z"},
{"path":"github.com/hashicorp/go-plugin","checksumSHA1":"IFwYSAmxxM+fV0w9LwdWKqFOCgg=","revision":"f444068e8f5a19853177f7aa0aea7e7d95b5b528","revisionTime":"2018-12-12T15:08:38Z"},
{"path":"github.com/hashicorp/go-plugin/internal/proto","checksumSHA1":"Ikbb1FngsPR79bHhr2UmKk4CblI=","revision":"f444068e8f5a19853177f7aa0aea7e7d95b5b528","revisionTime":"2018-12-12T15:08:38Z"},
{"path":"github.com/hashicorp/go-retryablehttp","checksumSHA1":"/yKfFSspjuDzyOe/bBslrPzyfYM=","revision":"e651d75abec6fbd4f2c09508f72ae7af8a8b7171","revisionTime":"2018-07-18T19:50:05Z"},
{"path":"github.com/hashicorp/go-rootcerts","checksumSHA1":"A1PcINvF3UiwHRKn8UcgARgvGRs=","revision":"6bb64b370b90e7ef1fa532be9e591a81c3493e00","revisionTime":"2016-05-03T14:34:40Z"},
{"path":"github.com/hashicorp/go-safetemp","checksumSHA1":"CduvzBFfTv77nhjtXPGdIjQQLMI=","revision":"b1a1dbde6fdc11e3ae79efd9039009e22d4ae240","revisionTime":"2018-03-26T21:11:50Z"},