vendor go-plugin
This commit is contained in:
parent
3cd5dd1839
commit
5fac259ae6
|
@ -0,0 +1,353 @@
|
|||
Mozilla Public License, version 2.0
|
||||
|
||||
1. Definitions
|
||||
|
||||
1.1. “Contributor”
|
||||
|
||||
means each individual or legal entity that creates, contributes to the
|
||||
creation of, or owns Covered Software.
|
||||
|
||||
1.2. “Contributor Version”
|
||||
|
||||
means the combination of the Contributions of others (if any) used by a
|
||||
Contributor and that particular Contributor’s Contribution.
|
||||
|
||||
1.3. “Contribution”
|
||||
|
||||
means Covered Software of a particular Contributor.
|
||||
|
||||
1.4. “Covered Software”
|
||||
|
||||
means Source Code Form to which the initial Contributor has attached the
|
||||
notice in Exhibit A, the Executable Form of such Source Code Form, and
|
||||
Modifications of such Source Code Form, in each case including portions
|
||||
thereof.
|
||||
|
||||
1.5. “Incompatible With Secondary Licenses”
|
||||
means
|
||||
|
||||
a. that the initial Contributor has attached the notice described in
|
||||
Exhibit B to the Covered Software; or
|
||||
|
||||
b. that the Covered Software was made available under the terms of version
|
||||
1.1 or earlier of the License, but not also under the terms of a
|
||||
Secondary License.
|
||||
|
||||
1.6. “Executable Form”
|
||||
|
||||
means any form of the work other than Source Code Form.
|
||||
|
||||
1.7. “Larger Work”
|
||||
|
||||
means a work that combines Covered Software with other material, in a separate
|
||||
file or files, that is not Covered Software.
|
||||
|
||||
1.8. “License”
|
||||
|
||||
means this document.
|
||||
|
||||
1.9. “Licensable”
|
||||
|
||||
means having the right to grant, to the maximum extent possible, whether at the
|
||||
time of the initial grant or subsequently, any and all of the rights conveyed by
|
||||
this License.
|
||||
|
||||
1.10. “Modifications”
|
||||
|
||||
means any of the following:
|
||||
|
||||
a. any file in Source Code Form that results from an addition to, deletion
|
||||
from, or modification of the contents of Covered Software; or
|
||||
|
||||
b. any new file in Source Code Form that contains any Covered Software.
|
||||
|
||||
1.11. “Patent Claims” of a Contributor
|
||||
|
||||
means any patent claim(s), including without limitation, method, process,
|
||||
and apparatus claims, in any patent Licensable by such Contributor that
|
||||
would be infringed, but for the grant of the License, by the making,
|
||||
using, selling, offering for sale, having made, import, or transfer of
|
||||
either its Contributions or its Contributor Version.
|
||||
|
||||
1.12. “Secondary License”
|
||||
|
||||
means either the GNU General Public License, Version 2.0, the GNU Lesser
|
||||
General Public License, Version 2.1, the GNU Affero General Public
|
||||
License, Version 3.0, or any later versions of those licenses.
|
||||
|
||||
1.13. “Source Code Form”
|
||||
|
||||
means the form of the work preferred for making modifications.
|
||||
|
||||
1.14. “You” (or “Your”)
|
||||
|
||||
means an individual or a legal entity exercising rights under this
|
||||
License. For legal entities, “You” includes any entity that controls, is
|
||||
controlled by, or is under common control with You. For purposes of this
|
||||
definition, “control” means (a) the power, direct or indirect, to cause
|
||||
the direction or management of such entity, whether by contract or
|
||||
otherwise, or (b) ownership of more than fifty percent (50%) of the
|
||||
outstanding shares or beneficial ownership of such entity.
|
||||
|
||||
|
||||
2. License Grants and Conditions
|
||||
|
||||
2.1. Grants
|
||||
|
||||
Each Contributor hereby grants You a world-wide, royalty-free,
|
||||
non-exclusive license:
|
||||
|
||||
a. under intellectual property rights (other than patent or trademark)
|
||||
Licensable by such Contributor to use, reproduce, make available,
|
||||
modify, display, perform, distribute, and otherwise exploit its
|
||||
Contributions, either on an unmodified basis, with Modifications, or as
|
||||
part of a Larger Work; and
|
||||
|
||||
b. under Patent Claims of such Contributor to make, use, sell, offer for
|
||||
sale, have made, import, and otherwise transfer either its Contributions
|
||||
or its Contributor Version.
|
||||
|
||||
2.2. Effective Date
|
||||
|
||||
The licenses granted in Section 2.1 with respect to any Contribution become
|
||||
effective for each Contribution on the date the Contributor first distributes
|
||||
such Contribution.
|
||||
|
||||
2.3. Limitations on Grant Scope
|
||||
|
||||
The licenses granted in this Section 2 are the only rights granted under this
|
||||
License. No additional rights or licenses will be implied from the distribution
|
||||
or licensing of Covered Software under this License. Notwithstanding Section
|
||||
2.1(b) above, no patent license is granted by a Contributor:
|
||||
|
||||
a. for any code that a Contributor has removed from Covered Software; or
|
||||
|
||||
b. for infringements caused by: (i) Your and any other third party’s
|
||||
modifications of Covered Software, or (ii) the combination of its
|
||||
Contributions with other software (except as part of its Contributor
|
||||
Version); or
|
||||
|
||||
c. under Patent Claims infringed by Covered Software in the absence of its
|
||||
Contributions.
|
||||
|
||||
This License does not grant any rights in the trademarks, service marks, or
|
||||
logos of any Contributor (except as may be necessary to comply with the
|
||||
notice requirements in Section 3.4).
|
||||
|
||||
2.4. Subsequent Licenses
|
||||
|
||||
No Contributor makes additional grants as a result of Your choice to
|
||||
distribute the Covered Software under a subsequent version of this License
|
||||
(see Section 10.2) or under the terms of a Secondary License (if permitted
|
||||
under the terms of Section 3.3).
|
||||
|
||||
2.5. Representation
|
||||
|
||||
Each Contributor represents that the Contributor believes its Contributions
|
||||
are its original creation(s) or it has sufficient rights to grant the
|
||||
rights to its Contributions conveyed by this License.
|
||||
|
||||
2.6. Fair Use
|
||||
|
||||
This License is not intended to limit any rights You have under applicable
|
||||
copyright doctrines of fair use, fair dealing, or other equivalents.
|
||||
|
||||
2.7. Conditions
|
||||
|
||||
Sections 3.1, 3.2, 3.3, and 3.4 are conditions of the licenses granted in
|
||||
Section 2.1.
|
||||
|
||||
|
||||
3. Responsibilities
|
||||
|
||||
3.1. Distribution of Source Form
|
||||
|
||||
All distribution of Covered Software in Source Code Form, including any
|
||||
Modifications that You create or to which You contribute, must be under the
|
||||
terms of this License. You must inform recipients that the Source Code Form
|
||||
of the Covered Software is governed by the terms of this License, and how
|
||||
they can obtain a copy of this License. You may not attempt to alter or
|
||||
restrict the recipients’ rights in the Source Code Form.
|
||||
|
||||
3.2. Distribution of Executable Form
|
||||
|
||||
If You distribute Covered Software in Executable Form then:
|
||||
|
||||
a. such Covered Software must also be made available in Source Code Form,
|
||||
as described in Section 3.1, and You must inform recipients of the
|
||||
Executable Form how they can obtain a copy of such Source Code Form by
|
||||
reasonable means in a timely manner, at a charge no more than the cost
|
||||
of distribution to the recipient; and
|
||||
|
||||
b. You may distribute such Executable Form under the terms of this License,
|
||||
or sublicense it under different terms, provided that the license for
|
||||
the Executable Form does not attempt to limit or alter the recipients’
|
||||
rights in the Source Code Form under this License.
|
||||
|
||||
3.3. Distribution of a Larger Work
|
||||
|
||||
You may create and distribute a Larger Work under terms of Your choice,
|
||||
provided that You also comply with the requirements of this License for the
|
||||
Covered Software. If the Larger Work is a combination of Covered Software
|
||||
with a work governed by one or more Secondary Licenses, and the Covered
|
||||
Software is not Incompatible With Secondary Licenses, this License permits
|
||||
You to additionally distribute such Covered Software under the terms of
|
||||
such Secondary License(s), so that the recipient of the Larger Work may, at
|
||||
their option, further distribute the Covered Software under the terms of
|
||||
either this License or such Secondary License(s).
|
||||
|
||||
3.4. Notices
|
||||
|
||||
You may not remove or alter the substance of any license notices (including
|
||||
copyright notices, patent notices, disclaimers of warranty, or limitations
|
||||
of liability) contained within the Source Code Form of the Covered
|
||||
Software, except that You may alter any license notices to the extent
|
||||
required to remedy known factual inaccuracies.
|
||||
|
||||
3.5. Application of Additional Terms
|
||||
|
||||
You may choose to offer, and to charge a fee for, warranty, support,
|
||||
indemnity or liability obligations to one or more recipients of Covered
|
||||
Software. However, You may do so only on Your own behalf, and not on behalf
|
||||
of any Contributor. You must make it absolutely clear that any such
|
||||
warranty, support, indemnity, or liability obligation is offered by You
|
||||
alone, and You hereby agree to indemnify every Contributor for any
|
||||
liability incurred by such Contributor as a result of warranty, support,
|
||||
indemnity or liability terms You offer. You may include additional
|
||||
disclaimers of warranty and limitations of liability specific to any
|
||||
jurisdiction.
|
||||
|
||||
4. Inability to Comply Due to Statute or Regulation
|
||||
|
||||
If it is impossible for You to comply with any of the terms of this License
|
||||
with respect to some or all of the Covered Software due to statute, judicial
|
||||
order, or regulation then You must: (a) comply with the terms of this License
|
||||
to the maximum extent possible; and (b) describe the limitations and the code
|
||||
they affect. Such description must be placed in a text file included with all
|
||||
distributions of the Covered Software under this License. Except to the
|
||||
extent prohibited by statute or regulation, such description must be
|
||||
sufficiently detailed for a recipient of ordinary skill to be able to
|
||||
understand it.
|
||||
|
||||
5. Termination
|
||||
|
||||
5.1. The rights granted under this License will terminate automatically if You
|
||||
fail to comply with any of its terms. However, if You become compliant,
|
||||
then the rights granted under this License from a particular Contributor
|
||||
are reinstated (a) provisionally, unless and until such Contributor
|
||||
explicitly and finally terminates Your grants, and (b) on an ongoing basis,
|
||||
if such Contributor fails to notify You of the non-compliance by some
|
||||
reasonable means prior to 60 days after You have come back into compliance.
|
||||
Moreover, Your grants from a particular Contributor are reinstated on an
|
||||
ongoing basis if such Contributor notifies You of the non-compliance by
|
||||
some reasonable means, this is the first time You have received notice of
|
||||
non-compliance with this License from such Contributor, and You become
|
||||
compliant prior to 30 days after Your receipt of the notice.
|
||||
|
||||
5.2. If You initiate litigation against any entity by asserting a patent
|
||||
infringement claim (excluding declaratory judgment actions, counter-claims,
|
||||
and cross-claims) alleging that a Contributor Version directly or
|
||||
indirectly infringes any patent, then the rights granted to You by any and
|
||||
all Contributors for the Covered Software under Section 2.1 of this License
|
||||
shall terminate.
|
||||
|
||||
5.3. In the event of termination under Sections 5.1 or 5.2 above, all end user
|
||||
license agreements (excluding distributors and resellers) which have been
|
||||
validly granted by You or Your distributors under this License prior to
|
||||
termination shall survive termination.
|
||||
|
||||
6. Disclaimer of Warranty
|
||||
|
||||
Covered Software is provided under this License on an “as is” basis, without
|
||||
warranty of any kind, either expressed, implied, or statutory, including,
|
||||
without limitation, warranties that the Covered Software is free of defects,
|
||||
merchantable, fit for a particular purpose or non-infringing. The entire
|
||||
risk as to the quality and performance of the Covered Software is with You.
|
||||
Should any Covered Software prove defective in any respect, You (not any
|
||||
Contributor) assume the cost of any necessary servicing, repair, or
|
||||
correction. This disclaimer of warranty constitutes an essential part of this
|
||||
License. No use of any Covered Software is authorized under this License
|
||||
except under this disclaimer.
|
||||
|
||||
7. Limitation of Liability
|
||||
|
||||
Under no circumstances and under no legal theory, whether tort (including
|
||||
negligence), contract, or otherwise, shall any Contributor, or anyone who
|
||||
distributes Covered Software as permitted above, be liable to You for any
|
||||
direct, indirect, special, incidental, or consequential damages of any
|
||||
character including, without limitation, damages for lost profits, loss of
|
||||
goodwill, work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses, even if such party shall have been
|
||||
informed of the possibility of such damages. This limitation of liability
|
||||
shall not apply to liability for death or personal injury resulting from such
|
||||
party’s negligence to the extent applicable law prohibits such limitation.
|
||||
Some jurisdictions do not allow the exclusion or limitation of incidental or
|
||||
consequential damages, so this exclusion and limitation may not apply to You.
|
||||
|
||||
8. Litigation
|
||||
|
||||
Any litigation relating to this License may be brought only in the courts of
|
||||
a jurisdiction where the defendant maintains its principal place of business
|
||||
and such litigation shall be governed by laws of that jurisdiction, without
|
||||
reference to its conflict-of-law provisions. Nothing in this Section shall
|
||||
prevent a party’s ability to bring cross-claims or counter-claims.
|
||||
|
||||
9. Miscellaneous
|
||||
|
||||
This License represents the complete agreement concerning the subject matter
|
||||
hereof. If any provision of this License is held to be unenforceable, such
|
||||
provision shall be reformed only to the extent necessary to make it
|
||||
enforceable. Any law or regulation which provides that the language of a
|
||||
contract shall be construed against the drafter shall not be used to construe
|
||||
this License against a Contributor.
|
||||
|
||||
|
||||
10. Versions of the License
|
||||
|
||||
10.1. New Versions
|
||||
|
||||
Mozilla Foundation is the license steward. Except as provided in Section
|
||||
10.3, no one other than the license steward has the right to modify or
|
||||
publish new versions of this License. Each version will be given a
|
||||
distinguishing version number.
|
||||
|
||||
10.2. Effect of New Versions
|
||||
|
||||
You may distribute the Covered Software under the terms of the version of
|
||||
the License under which You originally received the Covered Software, or
|
||||
under the terms of any subsequent version published by the license
|
||||
steward.
|
||||
|
||||
10.3. Modified Versions
|
||||
|
||||
If you create software not governed by this License, and you want to
|
||||
create a new license for such software, you may create and use a modified
|
||||
version of this License if you rename the license and remove any
|
||||
references to the name of the license steward (except to note that such
|
||||
modified license differs from this License).
|
||||
|
||||
10.4. Distributing Source Code Form that is Incompatible With Secondary Licenses
|
||||
If You choose to distribute Source Code Form that is Incompatible With
|
||||
Secondary Licenses under the terms of this version of the License, the
|
||||
notice described in Exhibit B of this License must be attached.
|
||||
|
||||
Exhibit A - Source Code Form License Notice
|
||||
|
||||
This Source Code Form is subject to the
|
||||
terms of the Mozilla Public License, v.
|
||||
2.0. If a copy of the MPL was not
|
||||
distributed with this file, You can
|
||||
obtain one at
|
||||
http://mozilla.org/MPL/2.0/.
|
||||
|
||||
If it is not possible or desirable to put the notice in a particular file, then
|
||||
You may include the notice in a location (such as a LICENSE file in a relevant
|
||||
directory) where a recipient would be likely to look for such a notice.
|
||||
|
||||
You may add additional accurate notices of copyright ownership.
|
||||
|
||||
Exhibit B - “Incompatible With Secondary Licenses” Notice
|
||||
|
||||
This Source Code Form is “Incompatible
|
||||
With Secondary Licenses”, as defined by
|
||||
the Mozilla Public License, v. 2.0.
|
|
@ -0,0 +1,161 @@
|
|||
# Go Plugin System over RPC
|
||||
|
||||
`go-plugin` is a Go (golang) plugin system over RPC. It is the plugin system
|
||||
that has been in use by HashiCorp tooling for over 3 years. While initially
|
||||
created for [Packer](https://www.packer.io), it has since been used by
|
||||
[Terraform](https://www.terraform.io) and [Otto](https://www.ottoproject.io),
|
||||
with plans to also use it for [Nomad](https://www.nomadproject.io) and
|
||||
[Vault](https://www.vaultproject.io).
|
||||
|
||||
While the plugin system is over RPC, it is currently only designed to work
|
||||
over a local [reliable] network. Plugins over a real network are not supported
|
||||
and will lead to unexpected behavior.
|
||||
|
||||
This plugin system has been used on millions of machines across many different
|
||||
projects and has proven to be battle hardened and ready for production use.
|
||||
|
||||
## Features
|
||||
|
||||
The HashiCorp plugin system supports a number of features:
|
||||
|
||||
**Plugins are Go interface implementations.** This makes writing and consuming
|
||||
plugins feel very natural. To a plugin author: you just implement an
|
||||
interface as if it were going to run in the same process. For a plugin user:
|
||||
you just use and call functions on an interface as if it were in the same
|
||||
process. This plugin system handles the communication in between.
|
||||
|
||||
**Complex arguments and return values are supported.** This library
|
||||
provides APIs for handling complex arguments and return values such
|
||||
as interfaces, `io.Reader/Writer`, etc. We do this by giving you a library
|
||||
(`MuxBroker`) for creating new connections between the client/server to
|
||||
serve additional interfaces or transfer raw data.
|
||||
|
||||
**Bidirectional communication.** Because the plugin system supports
|
||||
complex arguments, the host process can send it interface implementations
|
||||
and the plugin can call back into the host process.
|
||||
|
||||
**Built-in Logging.** Any plugins that use the `log` standard library
|
||||
will have log data automatically sent to the host process. The host
|
||||
process will mirror this output prefixed with the path to the plugin
|
||||
binary. This makes debugging with plugins simple.
|
||||
|
||||
**Protocol Versioning.** A very basic "protocol version" is supported that
|
||||
can be incremented to invalidate any previous plugins. This is useful when
|
||||
interface signatures are changing, protocol level changes are necessary,
|
||||
etc. When a protocol version is incompatible, a human friendly error
|
||||
message is shown to the end user.
|
||||
|
||||
**Stdout/Stderr Syncing.** While plugins are subprocesses, they can continue
|
||||
to use stdout/stderr as usual and the output will get mirrored back to
|
||||
the host process. The host process can control what `io.Writer` these
|
||||
streams go to to prevent this from happening.
|
||||
|
||||
**TTY Preservation.** Plugin subprocesses are connected to the identical
|
||||
stdin file descriptor as the host process, allowing software that requires
|
||||
a TTY to work. For example, a plugin can execute `ssh` and even though there
|
||||
are multiple subprocesses and RPC happening, it will look and act perfectly
|
||||
to the end user.
|
||||
|
||||
**Host upgrade while a plugin is running.** Plugins can be "reattached"
|
||||
so that the host process can be upgraded while the plugin is still running.
|
||||
This requires the host/plugin to know this is possible and daemonize
|
||||
properly. `NewClient` takes a `ReattachConfig` to determine if and how to
|
||||
reattach.
|
||||
|
||||
## Architecture
|
||||
|
||||
The HashiCorp plugin system works by launching subprocesses and communicating
|
||||
over RPC (using standard `net/rpc`). A single connection is made between
|
||||
any plugin and the host process, and we use a
|
||||
[connection multiplexing](https://github.com/hashicorp/yamux)
|
||||
library to multiplex any other connections on top.
|
||||
|
||||
This architecture has a number of benefits:
|
||||
|
||||
* Plugins can't crash your host process: A panic in a plugin doesn't
|
||||
panic the plugin user.
|
||||
|
||||
* Plugins are very easy to write: just write a Go application and `go build`.
|
||||
Theoretically you could also use another language as long as it can
|
||||
communicate the Go `net/rpc` protocol but this hasn't yet been tried.
|
||||
|
||||
* Plugins are very easy to install: just put the binary in a location where
|
||||
the host will find it (depends on the host but this library also provides
|
||||
helpers), and the plugin host handles the rest.
|
||||
|
||||
* Plugins can be relatively secure: The plugin only has access to the
|
||||
interfaces and args given to it, not to the entire memory space of the
|
||||
process. More security features are planned (see the coming soon section
|
||||
below).
|
||||
|
||||
## Usage
|
||||
|
||||
To use the plugin system, you must take the following steps. These are
|
||||
high-level steps that must be done. Examples are available in the
|
||||
`examples/` directory.
|
||||
|
||||
1. Choose the interface(s) you want to expose for plugins.
|
||||
|
||||
2. For each interface, implement an implementation of that interface
|
||||
that communicates over an `*rpc.Client` (from the standard `net/rpc`
|
||||
package) for every function call. Likewise, implement the RPC server
|
||||
struct this communicates to which is then communicating to a real,
|
||||
concrete implementation.
|
||||
|
||||
3. Create a `Plugin` implementation that knows how to create the RPC
|
||||
client/server for a given plugin type.
|
||||
|
||||
4. Plugin authors call `plugin.Serve` to serve a plugin from the
|
||||
`main` function.
|
||||
|
||||
5. Plugin users use `plugin.Client` to launch a subprocess and request
|
||||
an interface implementation over RPC.
|
||||
|
||||
That's it! In practice, step 2 is the most tedious and time consuming step.
|
||||
Even so, it isn't very difficult and you can see examples in the `examples/`
|
||||
directory as well as throughout our various open source projects.
|
||||
|
||||
For complete API documentation, see [GoDoc](https://godoc.org/github.com/hashicorp/go-plugin).
|
||||
|
||||
## Roadmap
|
||||
|
||||
Our plugin system is constantly evolving. As we use the plugin system for
|
||||
new projects or for new features in existing projects, we constantly find
|
||||
improvements we can make.
|
||||
|
||||
At this point in time, the roadmap for the plugin system is:
|
||||
|
||||
**Cryptographically Secure Plugins.** We'll implement signing plugins
|
||||
and loading signed plugins in order to allow Vault to make use of multi-process
|
||||
in a secure way.
|
||||
|
||||
**Semantic Versioning.** Plugins will be able to implement a semantic version.
|
||||
This plugin system will give host processes a system for constraining
|
||||
versions. This is in addition to the protocol versioning already present
|
||||
which is more for larger underlying changes.
|
||||
|
||||
**Plugin fetching.** We will integrate with [go-getter](https://github.com/hashicorp/go-getter)
|
||||
to support automatic download + install of plugins. Paired with cryptographically
|
||||
secure plugins (above), we can make this a safe operation for an amazing
|
||||
user experience.
|
||||
|
||||
## What About Shared Libraries?
|
||||
|
||||
When we started using plugins (late 2012, early 2013), plugins over RPC
|
||||
were the only option since Go didn't support dynamic library loading. Today,
|
||||
Go still doesn't support dynamic library loading, but they do intend to.
|
||||
Since 2012, our plugin system has stabilized from millions of users using it,
|
||||
and has many benefits we've come to value greatly.
|
||||
|
||||
For example, we intend to use this plugin system in
|
||||
[Vault](https://www.vaultproject.io), and dynamic library loading will
|
||||
simply never be acceptable in Vault for security reasons. That is an extreme
|
||||
example, but we believe our library system has more upsides than downsides
|
||||
over dynamic library loading and since we've had it built and tested for years,
|
||||
we'll likely continue to use it.
|
||||
|
||||
Shared libraries have one major advantage over our system which is much
|
||||
higher performance. In real world scenarios across our various tools,
|
||||
we've never required any more performance out of our plugin system and it
|
||||
has seen very high throughput, so this isn't a concern for us at the moment.
|
||||
|
|
@ -0,0 +1,666 @@
|
|||
package plugin
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"crypto/subtle"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unicode"
|
||||
)
|
||||
|
||||
// If this is 1, then we've called CleanupClients. This can be used
|
||||
// by plugin RPC implementations to change error behavior since you
|
||||
// can expected network connection errors at this point. This should be
|
||||
// read by using sync/atomic.
|
||||
var Killed uint32 = 0
|
||||
|
||||
// This is a slice of the "managed" clients which are cleaned up when
|
||||
// calling Cleanup
|
||||
var managedClients = make([]*Client, 0, 5)
|
||||
var managedClientsLock sync.Mutex
|
||||
|
||||
// Error types
|
||||
var (
|
||||
// ErrProcessNotFound is returned when a client is instantiated to
|
||||
// reattach to an existing process and it isn't found.
|
||||
ErrProcessNotFound = errors.New("Reattachment process not found")
|
||||
|
||||
// ErrChecksumsDoNotMatch is returned when binary's checksum doesn't match
|
||||
// the one provided in the SecureConfig.
|
||||
ErrChecksumsDoNotMatch = errors.New("checksums did not match")
|
||||
|
||||
// ErrSecureNoChecksum is returned when an empty checksum is provided to the
|
||||
// SecureConfig.
|
||||
ErrSecureConfigNoChecksum = errors.New("no checksum provided")
|
||||
|
||||
// ErrSecureNoHash is returned when a nil Hash object is provided to the
|
||||
// SecureConfig.
|
||||
ErrSecureConfigNoHash = errors.New("no hash implementation provided")
|
||||
|
||||
// ErrSecureConfigAndReattach is returned when both Reattach and
|
||||
// SecureConfig are set.
|
||||
ErrSecureConfigAndReattach = errors.New("only one of Reattach or SecureConfig can be set")
|
||||
)
|
||||
|
||||
// Client handles the lifecycle of a plugin application. It launches
|
||||
// plugins, connects to them, dispenses interface implementations, and handles
|
||||
// killing the process.
|
||||
//
|
||||
// Plugin hosts should use one Client for each plugin executable. To
|
||||
// dispense a plugin type, use the `Client.Client` function, and then
|
||||
// cal `Dispense`. This awkward API is mostly historical but is used to split
|
||||
// the client that deals with subprocess management and the client that
|
||||
// does RPC management.
|
||||
//
|
||||
// See NewClient and ClientConfig for using a Client.
|
||||
type Client struct {
|
||||
config *ClientConfig
|
||||
exited bool
|
||||
doneLogging chan struct{}
|
||||
l sync.Mutex
|
||||
address net.Addr
|
||||
process *os.Process
|
||||
client *RPCClient
|
||||
}
|
||||
|
||||
// ClientConfig is the configuration used to initialize a new
|
||||
// plugin client. After being used to initialize a plugin client,
|
||||
// that configuration must not be modified again.
|
||||
type ClientConfig struct {
|
||||
// HandshakeConfig is the configuration that must match servers.
|
||||
HandshakeConfig
|
||||
|
||||
// Plugins are the plugins that can be consumed.
|
||||
Plugins map[string]Plugin
|
||||
|
||||
// One of the following must be set, but not both.
|
||||
//
|
||||
// Cmd is the unstarted subprocess for starting the plugin. If this is
|
||||
// set, then the Client starts the plugin process on its own and connects
|
||||
// to it.
|
||||
//
|
||||
// Reattach is configuration for reattaching to an existing plugin process
|
||||
// that is already running. This isn't common.
|
||||
Cmd *exec.Cmd
|
||||
Reattach *ReattachConfig
|
||||
|
||||
// SecureConfig is configuration for verifying the integrity of the
|
||||
// executable. It can not be used with Reattach.
|
||||
SecureConfig *SecureConfig
|
||||
|
||||
// TLSConfig is used to enable TLS on the RPC client.
|
||||
TLSConfig *tls.Config
|
||||
|
||||
// Managed represents if the client should be managed by the
|
||||
// plugin package or not. If true, then by calling CleanupClients,
|
||||
// it will automatically be cleaned up. Otherwise, the client
|
||||
// user is fully responsible for making sure to Kill all plugin
|
||||
// clients. By default the client is _not_ managed.
|
||||
Managed bool
|
||||
|
||||
// The minimum and maximum port to use for communicating with
|
||||
// the subprocess. If not set, this defaults to 10,000 and 25,000
|
||||
// respectively.
|
||||
MinPort, MaxPort uint
|
||||
|
||||
// StartTimeout is the timeout to wait for the plugin to say it
|
||||
// has started successfully.
|
||||
StartTimeout time.Duration
|
||||
|
||||
// If non-nil, then the stderr of the client will be written to here
|
||||
// (as well as the log). This is the original os.Stderr of the subprocess.
|
||||
// This isn't the output of synced stderr.
|
||||
Stderr io.Writer
|
||||
|
||||
// SyncStdout, SyncStderr can be set to override the
|
||||
// respective os.Std* values in the plugin. Care should be taken to
|
||||
// avoid races here. If these are nil, then this will automatically be
|
||||
// hooked up to os.Stdin, Stdout, and Stderr, respectively.
|
||||
//
|
||||
// If the default values (nil) are used, then this package will not
|
||||
// sync any of these streams.
|
||||
SyncStdout io.Writer
|
||||
SyncStderr io.Writer
|
||||
}
|
||||
|
||||
// ReattachConfig is used to configure a client to reattach to an
|
||||
// already-running plugin process. You can retrieve this information by
|
||||
// calling ReattachConfig on Client.
|
||||
type ReattachConfig struct {
|
||||
Addr net.Addr
|
||||
Pid int
|
||||
}
|
||||
|
||||
// SecureConfig is used to configure a client to verify the integrity of an
|
||||
// executable before running. It does this by verifying the checksum is
|
||||
// expected. Hash is used to specify the hashing method to use when checksumming
|
||||
// the file. The configuration is verified by the client by calling the
|
||||
// SecureConfig.Check() function.
|
||||
//
|
||||
// The host process should ensure the checksum was provided by a trusted and
|
||||
// authoritative source. The binary should be installed in such a way that it
|
||||
// can not be modified by an unauthorized user between the time of this check
|
||||
// and the time of execution.
|
||||
type SecureConfig struct {
|
||||
Checksum []byte
|
||||
Hash hash.Hash
|
||||
}
|
||||
|
||||
// Check takes the filepath to an executable and returns true if the checksum of
|
||||
// the file matches the checksum provided in the SecureConfig.
|
||||
func (s *SecureConfig) Check(filePath string) (bool, error) {
|
||||
if len(s.Checksum) == 0 {
|
||||
return false, ErrSecureConfigNoChecksum
|
||||
}
|
||||
|
||||
if s.Hash == nil {
|
||||
return false, ErrSecureConfigNoHash
|
||||
}
|
||||
|
||||
file, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
_, err = io.Copy(s.Hash, file)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
sum := s.Hash.Sum(nil)
|
||||
|
||||
return subtle.ConstantTimeCompare(sum, s.Checksum) == 1, nil
|
||||
}
|
||||
|
||||
// This makes sure all the managed subprocesses are killed and properly
|
||||
// logged. This should be called before the parent process running the
|
||||
// plugins exits.
|
||||
//
|
||||
// This must only be called _once_.
|
||||
func CleanupClients() {
|
||||
// Set the killed to true so that we don't get unexpected panics
|
||||
atomic.StoreUint32(&Killed, 1)
|
||||
|
||||
// Kill all the managed clients in parallel and use a WaitGroup
|
||||
// to wait for them all to finish up.
|
||||
var wg sync.WaitGroup
|
||||
managedClientsLock.Lock()
|
||||
for _, client := range managedClients {
|
||||
wg.Add(1)
|
||||
|
||||
go func(client *Client) {
|
||||
client.Kill()
|
||||
wg.Done()
|
||||
}(client)
|
||||
}
|
||||
managedClientsLock.Unlock()
|
||||
|
||||
log.Println("[DEBUG] plugin: waiting for all plugin processes to complete...")
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// Creates a new plugin client which manages the lifecycle of an external
|
||||
// plugin and gets the address for the RPC connection.
|
||||
//
|
||||
// The client must be cleaned up at some point by calling Kill(). If
|
||||
// the client is a managed client (created with NewManagedClient) you
|
||||
// can just call CleanupClients at the end of your program and they will
|
||||
// be properly cleaned.
|
||||
func NewClient(config *ClientConfig) (c *Client) {
|
||||
if config.MinPort == 0 && config.MaxPort == 0 {
|
||||
config.MinPort = 10000
|
||||
config.MaxPort = 25000
|
||||
}
|
||||
|
||||
if config.StartTimeout == 0 {
|
||||
config.StartTimeout = 1 * time.Minute
|
||||
}
|
||||
|
||||
if config.Stderr == nil {
|
||||
config.Stderr = ioutil.Discard
|
||||
}
|
||||
|
||||
if config.SyncStdout == nil {
|
||||
config.SyncStdout = ioutil.Discard
|
||||
}
|
||||
if config.SyncStderr == nil {
|
||||
config.SyncStderr = ioutil.Discard
|
||||
}
|
||||
|
||||
c = &Client{config: config}
|
||||
if config.Managed {
|
||||
managedClientsLock.Lock()
|
||||
managedClients = append(managedClients, c)
|
||||
managedClientsLock.Unlock()
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Client returns an RPC client for the plugin.
|
||||
//
|
||||
// Subsequent calls to this will return the same RPC client.
|
||||
func (c *Client) Client() (*RPCClient, error) {
|
||||
addr, err := c.Start()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c.l.Lock()
|
||||
defer c.l.Unlock()
|
||||
|
||||
if c.client != nil {
|
||||
return c.client, nil
|
||||
}
|
||||
|
||||
// Connect to the client
|
||||
conn, err := net.Dial(addr.Network(), addr.String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if tcpConn, ok := conn.(*net.TCPConn); ok {
|
||||
// Make sure to set keep alive so that the connection doesn't die
|
||||
tcpConn.SetKeepAlive(true)
|
||||
}
|
||||
|
||||
if c.config.TLSConfig != nil {
|
||||
conn = tls.Client(conn, c.config.TLSConfig)
|
||||
}
|
||||
|
||||
// Create the actual RPC client
|
||||
c.client, err = NewRPCClient(conn, c.config.Plugins)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Begin the stream syncing so that stdin, out, err work properly
|
||||
err = c.client.SyncStreams(
|
||||
c.config.SyncStdout,
|
||||
c.config.SyncStderr)
|
||||
if err != nil {
|
||||
c.client.Close()
|
||||
c.client = nil
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return c.client, nil
|
||||
}
|
||||
|
||||
// Tells whether or not the underlying process has exited.
|
||||
func (c *Client) Exited() bool {
|
||||
c.l.Lock()
|
||||
defer c.l.Unlock()
|
||||
return c.exited
|
||||
}
|
||||
|
||||
// End the executing subprocess (if it is running) and perform any cleanup
|
||||
// tasks necessary such as capturing any remaining logs and so on.
|
||||
//
|
||||
// This method blocks until the process successfully exits.
|
||||
//
|
||||
// This method can safely be called multiple times.
|
||||
func (c *Client) Kill() {
|
||||
// Grab a lock to read some private fields.
|
||||
c.l.Lock()
|
||||
process := c.process
|
||||
addr := c.address
|
||||
doneCh := c.doneLogging
|
||||
c.l.Unlock()
|
||||
|
||||
// If there is no process, we never started anything. Nothing to kill.
|
||||
if process == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// We need to check for address here. It is possible that the plugin
|
||||
// started (process != nil) but has no address (addr == nil) if the
|
||||
// plugin failed at startup. If we do have an address, we need to close
|
||||
// the plugin net connections.
|
||||
graceful := false
|
||||
if addr != nil {
|
||||
// Close the client to cleanly exit the process.
|
||||
client, err := c.Client()
|
||||
if err == nil {
|
||||
err = client.Close()
|
||||
|
||||
// If there is no error, then we attempt to wait for a graceful
|
||||
// exit. If there was an error, we assume that graceful cleanup
|
||||
// won't happen and just force kill.
|
||||
graceful = err == nil
|
||||
if err != nil {
|
||||
// If there was an error just log it. We're going to force
|
||||
// kill in a moment anyways.
|
||||
log.Printf(
|
||||
"[WARN] plugin: error closing client during Kill: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we're attempting a graceful exit, then we wait for a short period
|
||||
// of time to allow that to happen. To wait for this we just wait on the
|
||||
// doneCh which would be closed if the process exits.
|
||||
if graceful {
|
||||
select {
|
||||
case <-doneCh:
|
||||
return
|
||||
case <-time.After(250 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
|
||||
// If graceful exiting failed, just kill it
|
||||
process.Kill()
|
||||
|
||||
// Wait for the client to finish logging so we have a complete log
|
||||
<-doneCh
|
||||
}
|
||||
|
||||
// Starts the underlying subprocess, communicating with it to negotiate
|
||||
// a port for RPC connections, and returning the address to connect via RPC.
|
||||
//
|
||||
// This method is safe to call multiple times. Subsequent calls have no effect.
|
||||
// Once a client has been started once, it cannot be started again, even if
|
||||
// it was killed.
|
||||
func (c *Client) Start() (addr net.Addr, err error) {
|
||||
c.l.Lock()
|
||||
defer c.l.Unlock()
|
||||
|
||||
if c.address != nil {
|
||||
return c.address, nil
|
||||
}
|
||||
|
||||
// If one of cmd or reattach isn't set, then it is an error. We wrap
|
||||
// this in a {} for scoping reasons, and hopeful that the escape
|
||||
// analysis will pop the stock here.
|
||||
{
|
||||
cmdSet := c.config.Cmd != nil
|
||||
attachSet := c.config.Reattach != nil
|
||||
secureSet := c.config.SecureConfig != nil
|
||||
if cmdSet == attachSet {
|
||||
return nil, fmt.Errorf("Only one of Cmd or Reattach must be set")
|
||||
}
|
||||
|
||||
if secureSet && attachSet {
|
||||
return nil, ErrSecureConfigAndReattach
|
||||
}
|
||||
}
|
||||
|
||||
// Create the logging channel for when we kill
|
||||
c.doneLogging = make(chan struct{})
|
||||
|
||||
if c.config.Reattach != nil {
|
||||
// Verify the process still exists. If not, then it is an error
|
||||
p, err := os.FindProcess(c.config.Reattach.Pid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Attempt to connect to the addr since on Unix systems FindProcess
|
||||
// doesn't actually return an error if it can't find the process.
|
||||
conn, err := net.Dial(
|
||||
c.config.Reattach.Addr.Network(),
|
||||
c.config.Reattach.Addr.String())
|
||||
if err != nil {
|
||||
p.Kill()
|
||||
return nil, ErrProcessNotFound
|
||||
}
|
||||
conn.Close()
|
||||
|
||||
// Goroutine to mark exit status
|
||||
go func(pid int) {
|
||||
// Wait for the process to die
|
||||
pidWait(pid)
|
||||
|
||||
// Log so we can see it
|
||||
log.Printf("[DEBUG] plugin: reattached plugin process exited\n")
|
||||
|
||||
// Mark it
|
||||
c.l.Lock()
|
||||
defer c.l.Unlock()
|
||||
c.exited = true
|
||||
|
||||
// Close the logging channel since that doesn't work on reattach
|
||||
close(c.doneLogging)
|
||||
}(p.Pid)
|
||||
|
||||
// Set the address and process
|
||||
c.address = c.config.Reattach.Addr
|
||||
c.process = p
|
||||
|
||||
return c.address, nil
|
||||
}
|
||||
|
||||
env := []string{
|
||||
fmt.Sprintf("%s=%s", c.config.MagicCookieKey, c.config.MagicCookieValue),
|
||||
fmt.Sprintf("PLUGIN_MIN_PORT=%d", c.config.MinPort),
|
||||
fmt.Sprintf("PLUGIN_MAX_PORT=%d", c.config.MaxPort),
|
||||
}
|
||||
|
||||
stdout_r, stdout_w := io.Pipe()
|
||||
stderr_r, stderr_w := io.Pipe()
|
||||
|
||||
cmd := c.config.Cmd
|
||||
cmd.Env = append(cmd.Env, os.Environ()...)
|
||||
cmd.Env = append(cmd.Env, env...)
|
||||
cmd.Stdin = os.Stdin
|
||||
cmd.Stderr = stderr_w
|
||||
cmd.Stdout = stdout_w
|
||||
|
||||
if c.config.SecureConfig != nil {
|
||||
if ok, err := c.config.SecureConfig.Check(cmd.Path); err != nil {
|
||||
return nil, fmt.Errorf("error verifying checksum: %s", err)
|
||||
} else if !ok {
|
||||
return nil, ErrChecksumsDoNotMatch
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("[DEBUG] plugin: starting plugin: %s %#v", cmd.Path, cmd.Args)
|
||||
err = cmd.Start()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Set the process
|
||||
c.process = cmd.Process
|
||||
|
||||
// Make sure the command is properly cleaned up if there is an error
|
||||
defer func() {
|
||||
r := recover()
|
||||
|
||||
if err != nil || r != nil {
|
||||
cmd.Process.Kill()
|
||||
}
|
||||
|
||||
if r != nil {
|
||||
panic(r)
|
||||
}
|
||||
}()
|
||||
|
||||
// Start goroutine to wait for process to exit
|
||||
exitCh := make(chan struct{})
|
||||
go func() {
|
||||
// Make sure we close the write end of our stderr/stdout so
|
||||
// that the readers send EOF properly.
|
||||
defer stderr_w.Close()
|
||||
defer stdout_w.Close()
|
||||
|
||||
// Wait for the command to end.
|
||||
cmd.Wait()
|
||||
|
||||
// Log and make sure to flush the logs write away
|
||||
log.Printf("[DEBUG] plugin: %s: plugin process exited\n", cmd.Path)
|
||||
os.Stderr.Sync()
|
||||
|
||||
// Mark that we exited
|
||||
close(exitCh)
|
||||
|
||||
// Set that we exited, which takes a lock
|
||||
c.l.Lock()
|
||||
defer c.l.Unlock()
|
||||
c.exited = true
|
||||
}()
|
||||
|
||||
// Start goroutine that logs the stderr
|
||||
go c.logStderr(stderr_r)
|
||||
|
||||
// Start a goroutine that is going to be reading the lines
|
||||
// out of stdout
|
||||
linesCh := make(chan []byte)
|
||||
go func() {
|
||||
defer close(linesCh)
|
||||
|
||||
buf := bufio.NewReader(stdout_r)
|
||||
for {
|
||||
line, err := buf.ReadBytes('\n')
|
||||
if line != nil {
|
||||
linesCh <- line
|
||||
}
|
||||
|
||||
if err == io.EOF {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Make sure after we exit we read the lines from stdout forever
|
||||
// so they don't block since it is an io.Pipe
|
||||
defer func() {
|
||||
go func() {
|
||||
for _ = range linesCh {
|
||||
}
|
||||
}()
|
||||
}()
|
||||
|
||||
// Some channels for the next step
|
||||
timeout := time.After(c.config.StartTimeout)
|
||||
|
||||
// Start looking for the address
|
||||
log.Printf("[DEBUG] plugin: waiting for RPC address for: %s", cmd.Path)
|
||||
select {
|
||||
case <-timeout:
|
||||
err = errors.New("timeout while waiting for plugin to start")
|
||||
case <-exitCh:
|
||||
err = errors.New("plugin exited before we could connect")
|
||||
case lineBytes := <-linesCh:
|
||||
// Trim the line and split by "|" in order to get the parts of
|
||||
// the output.
|
||||
line := strings.TrimSpace(string(lineBytes))
|
||||
parts := strings.SplitN(line, "|", 4)
|
||||
if len(parts) < 4 {
|
||||
err = fmt.Errorf(
|
||||
"Unrecognized remote plugin message: %s\n\n"+
|
||||
"This usually means that the plugin is either invalid or simply\n"+
|
||||
"needs to be recompiled to support the latest protocol.", line)
|
||||
return
|
||||
}
|
||||
|
||||
// Check the core protocol. Wrapped in a {} for scoping.
|
||||
{
|
||||
var coreProtocol int64
|
||||
coreProtocol, err = strconv.ParseInt(parts[0], 10, 0)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Error parsing core protocol version: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
if int(coreProtocol) != CoreProtocolVersion {
|
||||
err = fmt.Errorf("Incompatible core API version with plugin. "+
|
||||
"Plugin version: %s, Ours: %d\n\n"+
|
||||
"To fix this, the plugin usually only needs to be recompiled.\n"+
|
||||
"Please report this to the plugin author.", parts[0], CoreProtocolVersion)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Parse the protocol version
|
||||
var protocol int64
|
||||
protocol, err = strconv.ParseInt(parts[1], 10, 0)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Error parsing protocol version: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Test the API version
|
||||
if uint(protocol) != c.config.ProtocolVersion {
|
||||
err = fmt.Errorf("Incompatible API version with plugin. "+
|
||||
"Plugin version: %s, Ours: %d", parts[1], c.config.ProtocolVersion)
|
||||
return
|
||||
}
|
||||
|
||||
switch parts[2] {
|
||||
case "tcp":
|
||||
addr, err = net.ResolveTCPAddr("tcp", parts[3])
|
||||
case "unix":
|
||||
addr, err = net.ResolveUnixAddr("unix", parts[3])
|
||||
default:
|
||||
err = fmt.Errorf("Unknown address type: %s", parts[3])
|
||||
}
|
||||
}
|
||||
|
||||
c.address = addr
|
||||
return
|
||||
}
|
||||
|
||||
// ReattachConfig returns the information that must be provided to NewClient
|
||||
// to reattach to the plugin process that this client started. This is
|
||||
// useful for plugins that detach from their parent process.
|
||||
//
|
||||
// If this returns nil then the process hasn't been started yet. Please
|
||||
// call Start or Client before calling this.
|
||||
func (c *Client) ReattachConfig() *ReattachConfig {
|
||||
c.l.Lock()
|
||||
defer c.l.Unlock()
|
||||
|
||||
if c.address == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if c.config.Cmd != nil && c.config.Cmd.Process == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// If we connected via reattach, just return the information as-is
|
||||
if c.config.Reattach != nil {
|
||||
return c.config.Reattach
|
||||
}
|
||||
|
||||
return &ReattachConfig{
|
||||
Addr: c.address,
|
||||
Pid: c.config.Cmd.Process.Pid,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) logStderr(r io.Reader) {
|
||||
bufR := bufio.NewReader(r)
|
||||
for {
|
||||
line, err := bufR.ReadString('\n')
|
||||
if line != "" {
|
||||
c.config.Stderr.Write([]byte(line))
|
||||
|
||||
line = strings.TrimRightFunc(line, unicode.IsSpace)
|
||||
log.Printf("[DEBUG] plugin: %s: %s", filepath.Base(c.config.Cmd.Path), line)
|
||||
}
|
||||
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Flag that we've completed logging for others
|
||||
close(c.doneLogging)
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
package plugin
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
// Discover discovers plugins that are in a given directory.
|
||||
//
|
||||
// The directory doesn't need to be absolute. For example, "." will work fine.
|
||||
//
|
||||
// This currently assumes any file matching the glob is a plugin.
|
||||
// In the future this may be smarter about checking that a file is
|
||||
// executable and so on.
|
||||
//
|
||||
// TODO: test
|
||||
func Discover(glob, dir string) ([]string, error) {
|
||||
var err error
|
||||
|
||||
// Make the directory absolute if it isn't already
|
||||
if !filepath.IsAbs(dir) {
|
||||
dir, err = filepath.Abs(dir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return filepath.Glob(filepath.Join(dir, glob))
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
package plugin
|
||||
|
||||
// This is a type that wraps error types so that they can be messaged
|
||||
// across RPC channels. Since "error" is an interface, we can't always
|
||||
// gob-encode the underlying structure. This is a valid error interface
|
||||
// implementer that we will push across.
|
||||
type BasicError struct {
|
||||
Message string
|
||||
}
|
||||
|
||||
// NewBasicError is used to create a BasicError.
|
||||
//
|
||||
// err is allowed to be nil.
|
||||
func NewBasicError(err error) *BasicError {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &BasicError{err.Error()}
|
||||
}
|
||||
|
||||
func (e *BasicError) Error() string {
|
||||
return e.Message
|
||||
}
|
|
@ -0,0 +1,204 @@
|
|||
package plugin
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/yamux"
|
||||
)
|
||||
|
||||
// MuxBroker is responsible for brokering multiplexed connections by unique ID.
|
||||
//
|
||||
// It is used by plugins to multiplex multiple RPC connections and data
|
||||
// streams on top of a single connection between the plugin process and the
|
||||
// host process.
|
||||
//
|
||||
// This allows a plugin to request a channel with a specific ID to connect to
|
||||
// or accept a connection from, and the broker handles the details of
|
||||
// holding these channels open while they're being negotiated.
|
||||
//
|
||||
// The Plugin interface has access to these for both Server and Client.
|
||||
// The broker can be used by either (optionally) to reserve and connect to
|
||||
// new multiplexed streams. This is useful for complex args and return values,
|
||||
// or anything else you might need a data stream for.
|
||||
type MuxBroker struct {
|
||||
nextId uint32
|
||||
session *yamux.Session
|
||||
streams map[uint32]*muxBrokerPending
|
||||
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
type muxBrokerPending struct {
|
||||
ch chan net.Conn
|
||||
doneCh chan struct{}
|
||||
}
|
||||
|
||||
func newMuxBroker(s *yamux.Session) *MuxBroker {
|
||||
return &MuxBroker{
|
||||
session: s,
|
||||
streams: make(map[uint32]*muxBrokerPending),
|
||||
}
|
||||
}
|
||||
|
||||
// Accept accepts a connection by ID.
|
||||
//
|
||||
// This should not be called multiple times with the same ID at one time.
|
||||
func (m *MuxBroker) Accept(id uint32) (net.Conn, error) {
|
||||
var c net.Conn
|
||||
p := m.getStream(id)
|
||||
select {
|
||||
case c = <-p.ch:
|
||||
close(p.doneCh)
|
||||
case <-time.After(5 * time.Second):
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
delete(m.streams, id)
|
||||
|
||||
return nil, fmt.Errorf("timeout waiting for accept")
|
||||
}
|
||||
|
||||
// Ack our connection
|
||||
if err := binary.Write(c, binary.LittleEndian, id); err != nil {
|
||||
c.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// AcceptAndServe is used to accept a specific stream ID and immediately
|
||||
// serve an RPC server on that stream ID. This is used to easily serve
|
||||
// complex arguments.
|
||||
//
|
||||
// The served interface is always registered to the "Plugin" name.
|
||||
func (m *MuxBroker) AcceptAndServe(id uint32, v interface{}) {
|
||||
conn, err := m.Accept(id)
|
||||
if err != nil {
|
||||
log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
serve(conn, "Plugin", v)
|
||||
}
|
||||
|
||||
// Close closes the connection and all sub-connections.
|
||||
func (m *MuxBroker) Close() error {
|
||||
return m.session.Close()
|
||||
}
|
||||
|
||||
// Dial opens a connection by ID.
|
||||
func (m *MuxBroker) Dial(id uint32) (net.Conn, error) {
|
||||
// Open the stream
|
||||
stream, err := m.session.OpenStream()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Write the stream ID onto the wire.
|
||||
if err := binary.Write(stream, binary.LittleEndian, id); err != nil {
|
||||
stream.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Read the ack that we connected. Then we're off!
|
||||
var ack uint32
|
||||
if err := binary.Read(stream, binary.LittleEndian, &ack); err != nil {
|
||||
stream.Close()
|
||||
return nil, err
|
||||
}
|
||||
if ack != id {
|
||||
stream.Close()
|
||||
return nil, fmt.Errorf("bad ack: %d (expected %d)", ack, id)
|
||||
}
|
||||
|
||||
return stream, nil
|
||||
}
|
||||
|
||||
// NextId returns a unique ID to use next.
|
||||
//
|
||||
// It is possible for very long-running plugin hosts to wrap this value,
|
||||
// though it would require a very large amount of RPC calls. In practice
|
||||
// we've never seen it happen.
|
||||
func (m *MuxBroker) NextId() uint32 {
|
||||
return atomic.AddUint32(&m.nextId, 1)
|
||||
}
|
||||
|
||||
// Run starts the brokering and should be executed in a goroutine, since it
|
||||
// blocks forever, or until the session closes.
|
||||
//
|
||||
// Uses of MuxBroker never need to call this. It is called internally by
|
||||
// the plugin host/client.
|
||||
func (m *MuxBroker) Run() {
|
||||
for {
|
||||
stream, err := m.session.AcceptStream()
|
||||
if err != nil {
|
||||
// Once we receive an error, just exit
|
||||
break
|
||||
}
|
||||
|
||||
// Read the stream ID from the stream
|
||||
var id uint32
|
||||
if err := binary.Read(stream, binary.LittleEndian, &id); err != nil {
|
||||
stream.Close()
|
||||
continue
|
||||
}
|
||||
|
||||
// Initialize the waiter
|
||||
p := m.getStream(id)
|
||||
select {
|
||||
case p.ch <- stream:
|
||||
default:
|
||||
}
|
||||
|
||||
// Wait for a timeout
|
||||
go m.timeoutWait(id, p)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MuxBroker) getStream(id uint32) *muxBrokerPending {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
p, ok := m.streams[id]
|
||||
if ok {
|
||||
return p
|
||||
}
|
||||
|
||||
m.streams[id] = &muxBrokerPending{
|
||||
ch: make(chan net.Conn, 1),
|
||||
doneCh: make(chan struct{}),
|
||||
}
|
||||
return m.streams[id]
|
||||
}
|
||||
|
||||
func (m *MuxBroker) timeoutWait(id uint32, p *muxBrokerPending) {
|
||||
// Wait for the stream to either be picked up and connected, or
|
||||
// for a timeout.
|
||||
timeout := false
|
||||
select {
|
||||
case <-p.doneCh:
|
||||
case <-time.After(5 * time.Second):
|
||||
timeout = true
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
// Delete the stream so no one else can grab it
|
||||
delete(m.streams, id)
|
||||
|
||||
// If we timed out, then check if we have a channel in the buffer,
|
||||
// and if so, close it.
|
||||
if timeout {
|
||||
select {
|
||||
case s := <-p.ch:
|
||||
s.Close()
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
// The plugin package exposes functions and helpers for communicating to
|
||||
// plugins which are implemented as standalone binary applications.
|
||||
//
|
||||
// plugin.Client fully manages the lifecycle of executing the application,
|
||||
// connecting to it, and returning the RPC client for dispensing plugins.
|
||||
//
|
||||
// plugin.Serve fully manages listeners to expose an RPC server from a binary
|
||||
// that plugin.Client can connect to.
|
||||
package plugin
|
||||
|
||||
import (
|
||||
"net/rpc"
|
||||
)
|
||||
|
||||
// Plugin is the interface that is implemented to serve/connect to an
|
||||
// inteface implementation.
|
||||
type Plugin interface {
|
||||
// Server should return the RPC server compatible struct to serve
|
||||
// the methods that the Client calls over net/rpc.
|
||||
Server(*MuxBroker) (interface{}, error)
|
||||
|
||||
// Client returns an interface implementation for the plugin you're
|
||||
// serving that communicates to the server end of the plugin.
|
||||
Client(*MuxBroker, *rpc.Client) (interface{}, error)
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
package plugin
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// pidAlive checks whether a pid is alive.
|
||||
func pidAlive(pid int) bool {
|
||||
return _pidAlive(pid)
|
||||
}
|
||||
|
||||
// pidWait blocks for a process to exit.
|
||||
func pidWait(pid int) error {
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for range ticker.C {
|
||||
if !pidAlive(pid) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
// +build !windows
|
||||
|
||||
package plugin
|
||||
|
||||
import (
|
||||
"os"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
// _pidAlive tests whether a process is alive or not by sending it Signal 0,
|
||||
// since Go otherwise has no way to test this.
|
||||
func _pidAlive(pid int) bool {
|
||||
proc, err := os.FindProcess(pid)
|
||||
if err == nil {
|
||||
err = proc.Signal(syscall.Signal(0))
|
||||
}
|
||||
|
||||
return err == nil
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
package plugin
|
||||
|
||||
import (
|
||||
"syscall"
|
||||
)
|
||||
|
||||
const (
|
||||
// Weird name but matches the MSDN docs
|
||||
exit_STILL_ACTIVE = 259
|
||||
|
||||
processDesiredAccess = syscall.STANDARD_RIGHTS_READ |
|
||||
syscall.PROCESS_QUERY_INFORMATION |
|
||||
syscall.SYNCHRONIZE
|
||||
)
|
||||
|
||||
// _pidAlive tests whether a process is alive or not
|
||||
func _pidAlive(pid int) bool {
|
||||
h, err := syscall.OpenProcess(processDesiredAccess, false, uint32(pid))
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
var ec uint32
|
||||
if e := syscall.GetExitCodeProcess(h, &ec); e != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return ec == exit_STILL_ACTIVE
|
||||
}
|
|
@ -0,0 +1,123 @@
|
|||
package plugin
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/rpc"
|
||||
|
||||
"github.com/hashicorp/yamux"
|
||||
)
|
||||
|
||||
// RPCClient connects to an RPCServer over net/rpc to dispense plugin types.
|
||||
type RPCClient struct {
|
||||
broker *MuxBroker
|
||||
control *rpc.Client
|
||||
plugins map[string]Plugin
|
||||
|
||||
// These are the streams used for the various stdout/err overrides
|
||||
stdout, stderr net.Conn
|
||||
}
|
||||
|
||||
// NewRPCClient creates a client from an already-open connection-like value.
|
||||
// Dial is typically used instead.
|
||||
func NewRPCClient(conn io.ReadWriteCloser, plugins map[string]Plugin) (*RPCClient, error) {
|
||||
// Create the yamux client so we can multiplex
|
||||
mux, err := yamux.Client(conn, nil)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Connect to the control stream.
|
||||
control, err := mux.Open()
|
||||
if err != nil {
|
||||
mux.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Connect stdout, stderr streams
|
||||
stdstream := make([]net.Conn, 2)
|
||||
for i, _ := range stdstream {
|
||||
stdstream[i], err = mux.Open()
|
||||
if err != nil {
|
||||
mux.Close()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Create the broker and start it up
|
||||
broker := newMuxBroker(mux)
|
||||
go broker.Run()
|
||||
|
||||
// Build the client using our broker and control channel.
|
||||
return &RPCClient{
|
||||
broker: broker,
|
||||
control: rpc.NewClient(control),
|
||||
plugins: plugins,
|
||||
stdout: stdstream[0],
|
||||
stderr: stdstream[1],
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SyncStreams should be called to enable syncing of stdout,
|
||||
// stderr with the plugin.
|
||||
//
|
||||
// This will return immediately and the syncing will continue to happen
|
||||
// in the background. You do not need to launch this in a goroutine itself.
|
||||
//
|
||||
// This should never be called multiple times.
|
||||
func (c *RPCClient) SyncStreams(stdout io.Writer, stderr io.Writer) error {
|
||||
go copyStream("stdout", stdout, c.stdout)
|
||||
go copyStream("stderr", stderr, c.stderr)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the connection. The client is no longer usable after this
|
||||
// is called.
|
||||
func (c *RPCClient) Close() error {
|
||||
// Call the control channel and ask it to gracefully exit. If this
|
||||
// errors, then we save it so that we always return an error but we
|
||||
// want to try to close the other channels anyways.
|
||||
var empty struct{}
|
||||
returnErr := c.control.Call("Control.Quit", true, &empty)
|
||||
|
||||
// Close the other streams we have
|
||||
if err := c.control.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.stdout.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.stderr.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.broker.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Return back the error we got from Control.Quit. This is very important
|
||||
// since we MUST return non-nil error if this fails so that Client.Kill
|
||||
// will properly try a process.Kill.
|
||||
return returnErr
|
||||
}
|
||||
|
||||
func (c *RPCClient) Dispense(name string) (interface{}, error) {
|
||||
p, ok := c.plugins[name]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unknown plugin type: %s", name)
|
||||
}
|
||||
|
||||
var id uint32
|
||||
if err := c.control.Call(
|
||||
"Dispenser.Dispense", name, &id); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn, err := c.broker.Dial(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return p.Client(c.broker, rpc.NewClient(conn))
|
||||
}
|
|
@ -0,0 +1,185 @@
|
|||
package plugin
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/rpc"
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/yamux"
|
||||
)
|
||||
|
||||
// RPCServer listens for network connections and then dispenses interface
|
||||
// implementations over net/rpc.
|
||||
//
|
||||
// After setting the fields below, they shouldn't be read again directly
|
||||
// from the structure which may be reading/writing them concurrently.
|
||||
type RPCServer struct {
|
||||
Plugins map[string]Plugin
|
||||
|
||||
// Stdout, Stderr are what this server will use instead of the
|
||||
// normal stdin/out/err. This is because due to the multi-process nature
|
||||
// of our plugin system, we can't use the normal process values so we
|
||||
// make our own custom one we pipe across.
|
||||
Stdout io.Reader
|
||||
Stderr io.Reader
|
||||
|
||||
// DoneCh should be set to a non-nil channel that will be closed
|
||||
// when the control requests the RPC server to end.
|
||||
DoneCh chan<- struct{}
|
||||
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
// Accept accepts connections on a listener and serves requests for
|
||||
// each incoming connection. Accept blocks; the caller typically invokes
|
||||
// it in a go statement.
|
||||
func (s *RPCServer) Accept(lis net.Listener) {
|
||||
for {
|
||||
conn, err := lis.Accept()
|
||||
if err != nil {
|
||||
log.Printf("[ERR] plugin: plugin server: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
go s.ServeConn(conn)
|
||||
}
|
||||
}
|
||||
|
||||
// ServeConn runs a single connection.
|
||||
//
|
||||
// ServeConn blocks, serving the connection until the client hangs up.
|
||||
func (s *RPCServer) ServeConn(conn io.ReadWriteCloser) {
|
||||
// First create the yamux server to wrap this connection
|
||||
mux, err := yamux.Server(conn, nil)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
log.Printf("[ERR] plugin: error creating yamux server: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Accept the control connection
|
||||
control, err := mux.Accept()
|
||||
if err != nil {
|
||||
mux.Close()
|
||||
if err != io.EOF {
|
||||
log.Printf("[ERR] plugin: error accepting control connection: %s", err)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Connect the stdstreams (in, out, err)
|
||||
stdstream := make([]net.Conn, 2)
|
||||
for i, _ := range stdstream {
|
||||
stdstream[i], err = mux.Accept()
|
||||
if err != nil {
|
||||
mux.Close()
|
||||
log.Printf("[ERR] plugin: accepting stream %d: %s", i, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Copy std streams out to the proper place
|
||||
go copyStream("stdout", stdstream[0], s.Stdout)
|
||||
go copyStream("stderr", stdstream[1], s.Stderr)
|
||||
|
||||
// Create the broker and start it up
|
||||
broker := newMuxBroker(mux)
|
||||
go broker.Run()
|
||||
|
||||
// Use the control connection to build the dispenser and serve the
|
||||
// connection.
|
||||
server := rpc.NewServer()
|
||||
server.RegisterName("Control", &controlServer{
|
||||
server: s,
|
||||
})
|
||||
server.RegisterName("Dispenser", &dispenseServer{
|
||||
broker: broker,
|
||||
plugins: s.Plugins,
|
||||
})
|
||||
server.ServeConn(control)
|
||||
}
|
||||
|
||||
// done is called internally by the control server to trigger the
|
||||
// doneCh to close which is listened to by the main process to cleanly
|
||||
// exit.
|
||||
func (s *RPCServer) done() {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
if s.DoneCh != nil {
|
||||
close(s.DoneCh)
|
||||
s.DoneCh = nil
|
||||
}
|
||||
}
|
||||
|
||||
// dispenseServer dispenses variousinterface implementations for Terraform.
|
||||
type controlServer struct {
|
||||
server *RPCServer
|
||||
}
|
||||
|
||||
func (c *controlServer) Quit(
|
||||
null bool, response *struct{}) error {
|
||||
// End the server
|
||||
c.server.done()
|
||||
|
||||
// Always return true
|
||||
*response = struct{}{}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// dispenseServer dispenses variousinterface implementations for Terraform.
|
||||
type dispenseServer struct {
|
||||
broker *MuxBroker
|
||||
plugins map[string]Plugin
|
||||
}
|
||||
|
||||
func (d *dispenseServer) Dispense(
|
||||
name string, response *uint32) error {
|
||||
// Find the function to create this implementation
|
||||
p, ok := d.plugins[name]
|
||||
if !ok {
|
||||
return fmt.Errorf("unknown plugin type: %s", name)
|
||||
}
|
||||
|
||||
// Create the implementation first so we know if there is an error.
|
||||
impl, err := p.Server(d.broker)
|
||||
if err != nil {
|
||||
// We turn the error into an errors error so that it works across RPC
|
||||
return errors.New(err.Error())
|
||||
}
|
||||
|
||||
// Reserve an ID for our implementation
|
||||
id := d.broker.NextId()
|
||||
*response = id
|
||||
|
||||
// Run the rest in a goroutine since it can only happen once this RPC
|
||||
// call returns. We wait for a connection for the plugin implementation
|
||||
// and serve it.
|
||||
go func() {
|
||||
conn, err := d.broker.Accept(id)
|
||||
if err != nil {
|
||||
log.Printf("[ERR] go-plugin: plugin dispense error: %s: %s", name, err)
|
||||
return
|
||||
}
|
||||
|
||||
serve(conn, "Plugin", impl)
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func serve(conn io.ReadWriteCloser, name string, v interface{}) {
|
||||
server := rpc.NewServer()
|
||||
if err := server.RegisterName(name, v); err != nil {
|
||||
log.Printf("[ERR] go-plugin: plugin dispense error: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
server.ServeConn(conn)
|
||||
}
|
|
@ -0,0 +1,235 @@
|
|||
package plugin
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// CoreProtocolVersion is the ProtocolVersion of the plugin system itself.
|
||||
// We will increment this whenever we change any protocol behavior. This
|
||||
// will invalidate any prior plugins but will at least allow us to iterate
|
||||
// on the core in a safe way. We will do our best to do this very
|
||||
// infrequently.
|
||||
const CoreProtocolVersion = 1
|
||||
|
||||
// HandshakeConfig is the configuration used by client and servers to
|
||||
// handshake before starting a plugin connection. This is embedded by
|
||||
// both ServeConfig and ClientConfig.
|
||||
//
|
||||
// In practice, the plugin host creates a HandshakeConfig that is exported
|
||||
// and plugins then can easily consume it.
|
||||
type HandshakeConfig struct {
|
||||
// ProtocolVersion is the version that clients must match on to
|
||||
// agree they can communicate. This should match the ProtocolVersion
|
||||
// set on ClientConfig when using a plugin.
|
||||
ProtocolVersion uint
|
||||
|
||||
// MagicCookieKey and value are used as a very basic verification
|
||||
// that a plugin is intended to be launched. This is not a security
|
||||
// measure, just a UX feature. If the magic cookie doesn't match,
|
||||
// we show human-friendly output.
|
||||
MagicCookieKey string
|
||||
MagicCookieValue string
|
||||
}
|
||||
|
||||
// ServeConfig configures what sorts of plugins are served.
|
||||
type ServeConfig struct {
|
||||
// HandshakeConfig is the configuration that must match clients.
|
||||
HandshakeConfig
|
||||
|
||||
// Plugins are the plugins that are served.
|
||||
Plugins map[string]Plugin
|
||||
|
||||
// TLSProvider is a function that returns a configured tls.Config.
|
||||
TLSProvider func() (*tls.Config, error)
|
||||
}
|
||||
|
||||
// Serve serves the plugins given by ServeConfig.
|
||||
//
|
||||
// Serve doesn't return until the plugin is done being executed. Any
|
||||
// errors will be outputted to the log.
|
||||
//
|
||||
// This is the method that plugins should call in their main() functions.
|
||||
func Serve(opts *ServeConfig) {
|
||||
// Validate the handshake config
|
||||
if opts.MagicCookieKey == "" || opts.MagicCookieValue == "" {
|
||||
fmt.Fprintf(os.Stderr,
|
||||
"Misconfigured ServeConfig given to serve this plugin: no magic cookie\n"+
|
||||
"key or value was set. Please notify the plugin author and report\n"+
|
||||
"this as a bug.\n")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// First check the cookie
|
||||
if os.Getenv(opts.MagicCookieKey) != opts.MagicCookieValue {
|
||||
fmt.Fprintf(os.Stderr,
|
||||
"This binary is a plugin. These are not meant to be executed directly.\n"+
|
||||
"Please execute the program that consumes these plugins, which will\n"+
|
||||
"load any plugins automatically\n")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Logging goes to the original stderr
|
||||
log.SetOutput(os.Stderr)
|
||||
|
||||
// Create our new stdout, stderr files. These will override our built-in
|
||||
// stdout/stderr so that it works across the stream boundary.
|
||||
stdout_r, stdout_w, err := os.Pipe()
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error preparing plugin: %s\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
stderr_r, stderr_w, err := os.Pipe()
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error preparing plugin: %s\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Register a listener so we can accept a connection
|
||||
listener, err := serverListener()
|
||||
if err != nil {
|
||||
log.Printf("[ERR] plugin: plugin init: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
if opts.TLSProvider != nil {
|
||||
tlsConfig, err := opts.TLSProvider()
|
||||
if err != nil {
|
||||
log.Printf("[ERR] plugin: plugin tls init: %s", err)
|
||||
return
|
||||
}
|
||||
listener = tls.NewListener(listener, tlsConfig)
|
||||
}
|
||||
defer listener.Close()
|
||||
|
||||
// Create the channel to tell us when we're done
|
||||
doneCh := make(chan struct{})
|
||||
|
||||
// Create the RPC server to dispense
|
||||
server := &RPCServer{
|
||||
Plugins: opts.Plugins,
|
||||
Stdout: stdout_r,
|
||||
Stderr: stderr_r,
|
||||
DoneCh: doneCh,
|
||||
}
|
||||
|
||||
// Output the address and service name to stdout so that core can bring it up.
|
||||
log.Printf("[DEBUG] plugin: plugin address: %s %s\n",
|
||||
listener.Addr().Network(), listener.Addr().String())
|
||||
fmt.Printf("%d|%d|%s|%s\n",
|
||||
CoreProtocolVersion,
|
||||
opts.ProtocolVersion,
|
||||
listener.Addr().Network(),
|
||||
listener.Addr().String())
|
||||
os.Stdout.Sync()
|
||||
|
||||
// Eat the interrupts
|
||||
ch := make(chan os.Signal, 1)
|
||||
signal.Notify(ch, os.Interrupt)
|
||||
go func() {
|
||||
var count int32 = 0
|
||||
for {
|
||||
<-ch
|
||||
newCount := atomic.AddInt32(&count, 1)
|
||||
log.Printf(
|
||||
"[DEBUG] plugin: received interrupt signal (count: %d). Ignoring.",
|
||||
newCount)
|
||||
}
|
||||
}()
|
||||
|
||||
// Set our new out, err
|
||||
os.Stdout = stdout_w
|
||||
os.Stderr = stderr_w
|
||||
|
||||
// Serve
|
||||
go server.Accept(listener)
|
||||
|
||||
// Wait for the graceful exit
|
||||
<-doneCh
|
||||
}
|
||||
|
||||
func serverListener() (net.Listener, error) {
|
||||
if runtime.GOOS == "windows" {
|
||||
return serverListener_tcp()
|
||||
}
|
||||
|
||||
return serverListener_unix()
|
||||
}
|
||||
|
||||
func serverListener_tcp() (net.Listener, error) {
|
||||
minPort, err := strconv.ParseInt(os.Getenv("PLUGIN_MIN_PORT"), 10, 32)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
maxPort, err := strconv.ParseInt(os.Getenv("PLUGIN_MAX_PORT"), 10, 32)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for port := minPort; port <= maxPort; port++ {
|
||||
address := fmt.Sprintf("127.0.0.1:%d", port)
|
||||
listener, err := net.Listen("tcp", address)
|
||||
if err == nil {
|
||||
return listener, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, errors.New("Couldn't bind plugin TCP listener")
|
||||
}
|
||||
|
||||
func serverListener_unix() (net.Listener, error) {
|
||||
tf, err := ioutil.TempFile("", "plugin")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
path := tf.Name()
|
||||
|
||||
// Close the file and remove it because it has to not exist for
|
||||
// the domain socket.
|
||||
if err := tf.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := os.Remove(path); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
l, err := net.Listen("unix", path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Wrap the listener in rmListener so that the Unix domain socket file
|
||||
// is removed on close.
|
||||
return &rmListener{
|
||||
Listener: l,
|
||||
Path: path,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// rmListener is an implementation of net.Listener that forwards most
|
||||
// calls to the listener but also removes a file as part of the close. We
|
||||
// use this to cleanup the unix domain socket on close.
|
||||
type rmListener struct {
|
||||
net.Listener
|
||||
Path string
|
||||
}
|
||||
|
||||
func (l *rmListener) Close() error {
|
||||
// Close the listener itself
|
||||
if err := l.Listener.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Remove the file
|
||||
return os.Remove(l.Path)
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
package plugin
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
)
|
||||
|
||||
// ServeMuxMap is the type that is used to configure ServeMux
|
||||
type ServeMuxMap map[string]*ServeConfig
|
||||
|
||||
// ServeMux is like Serve, but serves multiple types of plugins determined
|
||||
// by the argument given on the command-line.
|
||||
//
|
||||
// This command doesn't return until the plugin is done being executed. Any
|
||||
// errors are logged or output to stderr.
|
||||
func ServeMux(m ServeMuxMap) {
|
||||
if len(os.Args) != 2 {
|
||||
fmt.Fprintf(os.Stderr,
|
||||
"Invoked improperly. This is an internal command that shouldn't\n"+
|
||||
"be manually invoked.\n")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
opts, ok := m[os.Args[1]]
|
||||
if !ok {
|
||||
fmt.Fprintf(os.Stderr, "Unknown plugin: %s\n", os.Args[1])
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
Serve(opts)
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
package plugin
|
||||
|
||||
import (
|
||||
"io"
|
||||
"log"
|
||||
)
|
||||
|
||||
func copyStream(name string, dst io.Writer, src io.Reader) {
|
||||
if src == nil {
|
||||
panic(name + ": src is nil")
|
||||
}
|
||||
if dst == nil {
|
||||
panic(name + ": dst is nil")
|
||||
}
|
||||
if _, err := io.Copy(dst, src); err != nil && err != io.EOF {
|
||||
log.Printf("[ERR] plugin: stream copy '%s' error: %s", name, err)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
package plugin
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"net"
|
||||
"net/rpc"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// The testing file contains test helpers that you can use outside of
|
||||
// this package for making it easier to test plugins themselves.
|
||||
|
||||
// TestConn is a helper function for returning a client and server
|
||||
// net.Conn connected to each other.
|
||||
func TestConn(t *testing.T) (net.Conn, net.Conn) {
|
||||
// Listen to any local port. This listener will be closed
|
||||
// after a single connection is established.
|
||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Start a goroutine to accept our client connection
|
||||
var serverConn net.Conn
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
defer close(doneCh)
|
||||
defer l.Close()
|
||||
var err error
|
||||
serverConn, err = l.Accept()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Connect to the server
|
||||
clientConn, err := net.Dial("tcp", l.Addr().String())
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Wait for the server side to acknowledge it has connected
|
||||
<-doneCh
|
||||
|
||||
return clientConn, serverConn
|
||||
}
|
||||
|
||||
// TestRPCConn returns a rpc client and server connected to each other.
|
||||
func TestRPCConn(t *testing.T) (*rpc.Client, *rpc.Server) {
|
||||
clientConn, serverConn := TestConn(t)
|
||||
|
||||
server := rpc.NewServer()
|
||||
go server.ServeConn(serverConn)
|
||||
|
||||
client := rpc.NewClient(clientConn)
|
||||
return client, server
|
||||
}
|
||||
|
||||
// TestPluginRPCConn returns a plugin RPC client and server that are connected
|
||||
// together and configured.
|
||||
func TestPluginRPCConn(t *testing.T, ps map[string]Plugin) (*RPCClient, *RPCServer) {
|
||||
// Create two net.Conns we can use to shuttle our control connection
|
||||
clientConn, serverConn := TestConn(t)
|
||||
|
||||
// Start up the server
|
||||
server := &RPCServer{Plugins: ps, Stdout: new(bytes.Buffer), Stderr: new(bytes.Buffer)}
|
||||
go server.ServeConn(serverConn)
|
||||
|
||||
// Connect the client to the server
|
||||
client, err := NewRPCClient(clientConn, ps)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
return client, server
|
||||
}
|
|
@ -804,6 +804,12 @@
|
|||
"revision": "ed905158d87462226a13fe39ddf685ea65f1c11f",
|
||||
"revisionTime": "2016-12-16T18:43:04Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "FOLPOFo4xuUaErsL99EC8azEUjw=",
|
||||
"path": "github.com/hashicorp/go-plugin",
|
||||
"revision": "b6691c5cfe7f0ec984114b056889cc90e51e38d0",
|
||||
"revisionTime": "2017-04-12T21:16:38Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "ErJHGU6AVPZM9yoY/xV11TwSjQs=",
|
||||
"path": "github.com/hashicorp/go-retryablehttp",
|
||||
|
|
Loading…
Reference in New Issue