2017-05-11 21:38:54 +00:00
package cassandra
2017-04-13 20:48:32 +00:00
import (
2017-12-14 22:03:11 +00:00
"context"
2017-04-13 20:48:32 +00:00
"crypto/tls"
"fmt"
"strings"
"sync"
"time"
"github.com/mitchellh/mapstructure"
"github.com/gocql/gocql"
2018-03-21 19:05:56 +00:00
"github.com/hashicorp/errwrap"
2019-04-12 21:54:35 +00:00
"github.com/hashicorp/vault/sdk/helper/certutil"
"github.com/hashicorp/vault/sdk/helper/parseutil"
2019-04-12 22:26:54 +00:00
"github.com/hashicorp/vault/sdk/helper/tlsutil"
2017-05-11 21:38:54 +00:00
"github.com/hashicorp/vault/plugins/helper/database/connutil"
2018-10-15 15:06:03 +00:00
"github.com/hashicorp/vault/plugins/helper/database/dbutil"
2017-04-13 20:48:32 +00:00
)
2017-05-11 21:38:54 +00:00
// cassandraConnectionProducer implements ConnectionProducer and provides an
2017-04-13 20:48:32 +00:00
// interface for cassandra databases to make connections.
2017-05-11 21:38:54 +00:00
type cassandraConnectionProducer struct {
2019-02-10 23:34:50 +00:00
Hosts string ` json:"hosts" structs:"hosts" mapstructure:"hosts" `
Port int ` json:"port" structs:"port" mapstructure:"port" `
Username string ` json:"username" structs:"username" mapstructure:"username" `
Password string ` json:"password" structs:"password" mapstructure:"password" `
TLS bool ` json:"tls" structs:"tls" mapstructure:"tls" `
InsecureTLS bool ` json:"insecure_tls" structs:"insecure_tls" mapstructure:"insecure_tls" `
ProtocolVersion int ` json:"protocol_version" structs:"protocol_version" mapstructure:"protocol_version" `
ConnectTimeoutRaw interface { } ` json:"connect_timeout" structs:"connect_timeout" mapstructure:"connect_timeout" `
SocketKeepAliveRaw interface { } ` json:"socket_keep_alive" structs:"socket_keep_alive" mapstructure:"socket_keep_alive" `
TLSMinVersion string ` json:"tls_min_version" structs:"tls_min_version" mapstructure:"tls_min_version" `
Consistency string ` json:"consistency" structs:"consistency" mapstructure:"consistency" `
2019-03-18 13:28:20 +00:00
LocalDatacenter string ` json:"local_datacenter" structs:"local_datacenter" mapstructure:"local_datacenter" `
2019-02-10 23:34:50 +00:00
PemBundle string ` json:"pem_bundle" structs:"pem_bundle" mapstructure:"pem_bundle" `
PemJSON string ` json:"pem_json" structs:"pem_json" mapstructure:"pem_json" `
connectTimeout time . Duration
socketKeepAlive time . Duration
certificate string
privateKey string
issuingCA string
rawConfig map [ string ] interface { }
2017-05-03 22:36:49 +00:00
Initialized bool
Type string
session * gocql . Session
2017-04-13 20:48:32 +00:00
sync . Mutex
}
2017-12-14 22:03:11 +00:00
func ( c * cassandraConnectionProducer ) Initialize ( ctx context . Context , conf map [ string ] interface { } , verifyConnection bool ) error {
2018-03-21 19:05:56 +00:00
_ , err := c . Init ( ctx , conf , verifyConnection )
return err
}
func ( c * cassandraConnectionProducer ) Init ( ctx context . Context , conf map [ string ] interface { } , verifyConnection bool ) ( map [ string ] interface { } , error ) {
2017-04-13 20:48:32 +00:00
c . Lock ( )
defer c . Unlock ( )
2018-03-21 19:05:56 +00:00
c . rawConfig = conf
2017-06-15 01:59:27 +00:00
err := mapstructure . WeakDecode ( conf , c )
2017-04-13 20:48:32 +00:00
if err != nil {
2018-03-21 19:05:56 +00:00
return nil , err
2017-04-13 20:48:32 +00:00
}
2017-05-03 20:45:27 +00:00
if c . ConnectTimeoutRaw == nil {
c . ConnectTimeoutRaw = "0s"
}
2017-05-03 20:11:30 +00:00
c . connectTimeout , err = parseutil . ParseDurationSecond ( c . ConnectTimeoutRaw )
if err != nil {
2018-03-21 19:05:56 +00:00
return nil , errwrap . Wrapf ( "invalid connect_timeout: {{err}}" , err )
2017-05-03 20:11:30 +00:00
}
2019-02-10 23:34:50 +00:00
if c . SocketKeepAliveRaw == nil {
c . SocketKeepAliveRaw = "0s"
}
c . socketKeepAlive , err = parseutil . ParseDurationSecond ( c . SocketKeepAliveRaw )
if err != nil {
return nil , errwrap . Wrapf ( "invalid socket_keep_alive: {{err}}" , err )
}
2017-05-03 22:36:49 +00:00
switch {
case len ( c . Hosts ) == 0 :
2018-03-21 19:05:56 +00:00
return nil , fmt . Errorf ( "hosts cannot be empty" )
2017-05-03 22:36:49 +00:00
case len ( c . Username ) == 0 :
2018-03-21 19:05:56 +00:00
return nil , fmt . Errorf ( "username cannot be empty" )
2017-05-03 22:36:49 +00:00
case len ( c . Password ) == 0 :
2018-03-21 19:05:56 +00:00
return nil , fmt . Errorf ( "password cannot be empty" )
2017-05-03 22:36:49 +00:00
}
var certBundle * certutil . CertBundle
var parsedCertBundle * certutil . ParsedCertBundle
switch {
case len ( c . PemJSON ) != 0 :
parsedCertBundle , err = certutil . ParsePKIJSON ( [ ] byte ( c . PemJSON ) )
if err != nil {
2018-03-21 19:05:56 +00:00
return nil , errwrap . Wrapf ( "could not parse given JSON; it must be in the format of the output of the PKI backend certificate issuing command: {{err}}" , err )
2017-05-03 22:36:49 +00:00
}
certBundle , err = parsedCertBundle . ToCertBundle ( )
if err != nil {
2018-03-21 19:05:56 +00:00
return nil , errwrap . Wrapf ( "Error marshaling PEM information: {{err}}" , err )
2017-05-03 22:36:49 +00:00
}
c . certificate = certBundle . Certificate
c . privateKey = certBundle . PrivateKey
c . issuingCA = certBundle . IssuingCA
c . TLS = true
case len ( c . PemBundle ) != 0 :
parsedCertBundle , err = certutil . ParsePEMBundle ( c . PemBundle )
if err != nil {
2018-03-21 19:05:56 +00:00
return nil , errwrap . Wrapf ( "Error parsing the given PEM information: {{err}}" , err )
2017-05-03 22:36:49 +00:00
}
certBundle , err = parsedCertBundle . ToCertBundle ( )
if err != nil {
2018-03-21 19:05:56 +00:00
return nil , errwrap . Wrapf ( "Error marshaling PEM information: {{err}}" , err )
2017-05-03 22:36:49 +00:00
}
c . certificate = certBundle . Certificate
c . privateKey = certBundle . PrivateKey
c . issuingCA = certBundle . IssuingCA
c . TLS = true
}
2017-05-11 21:38:54 +00:00
// Set initialized to true at this point since all fields are set,
// and the connection can be established at a later time.
c . Initialized = true
2017-04-13 20:48:32 +00:00
if verifyConnection {
2017-12-14 22:03:11 +00:00
if _ , err := c . Connection ( ctx ) ; err != nil {
2018-03-21 19:05:56 +00:00
return nil , errwrap . Wrapf ( "error verifying connection: {{err}}" , err )
2017-04-13 20:48:32 +00:00
}
}
2017-05-11 21:38:54 +00:00
2018-03-21 19:05:56 +00:00
return conf , nil
2017-04-13 20:48:32 +00:00
}
2017-12-14 22:03:11 +00:00
func ( c * cassandraConnectionProducer ) Connection ( _ context . Context ) ( interface { } , error ) {
2017-04-13 20:48:32 +00:00
if ! c . Initialized {
2017-05-11 21:38:54 +00:00
return nil , connutil . ErrNotInitialized
2017-04-13 20:48:32 +00:00
}
// If we already have a DB, return it
2017-12-19 15:26:46 +00:00
if c . session != nil && ! c . session . Closed ( ) {
2017-04-13 20:48:32 +00:00
return c . session , nil
}
session , err := c . createSession ( )
if err != nil {
return nil , err
}
// Store the session in backend for reuse
c . session = session
return session , nil
}
2017-05-11 21:38:54 +00:00
func ( c * cassandraConnectionProducer ) Close ( ) error {
2017-04-13 20:48:32 +00:00
// Grab the write lock
c . Lock ( )
defer c . Unlock ( )
if c . session != nil {
c . session . Close ( )
}
c . session = nil
return nil
}
2017-05-11 21:38:54 +00:00
func ( c * cassandraConnectionProducer ) createSession ( ) ( * gocql . Session , error ) {
2017-09-08 03:04:40 +00:00
hosts := strings . Split ( c . Hosts , "," )
clusterConfig := gocql . NewCluster ( hosts ... )
2017-04-13 20:48:32 +00:00
clusterConfig . Authenticator = gocql . PasswordAuthenticator {
Username : c . Username ,
Password : c . Password ,
}
2017-09-08 03:04:40 +00:00
if c . Port != 0 {
clusterConfig . Port = c . Port
}
2017-04-13 20:48:32 +00:00
clusterConfig . ProtoVersion = c . ProtocolVersion
if clusterConfig . ProtoVersion == 0 {
clusterConfig . ProtoVersion = 2
}
2017-05-03 20:11:30 +00:00
clusterConfig . Timeout = c . connectTimeout
2019-02-10 23:34:50 +00:00
clusterConfig . SocketKeepalive = c . socketKeepAlive
2017-04-13 20:48:32 +00:00
if c . TLS {
var tlsConfig * tls . Config
2017-05-03 22:36:49 +00:00
if len ( c . certificate ) > 0 || len ( c . issuingCA ) > 0 {
if len ( c . certificate ) > 0 && len ( c . privateKey ) == 0 {
2017-04-23 01:02:57 +00:00
return nil , fmt . Errorf ( "found certificate for TLS authentication but no private key" )
2017-04-13 20:48:32 +00:00
}
certBundle := & certutil . CertBundle { }
2017-05-03 22:36:49 +00:00
if len ( c . certificate ) > 0 {
certBundle . Certificate = c . certificate
certBundle . PrivateKey = c . privateKey
2017-04-13 20:48:32 +00:00
}
2017-05-03 22:36:49 +00:00
if len ( c . issuingCA ) > 0 {
certBundle . IssuingCA = c . issuingCA
2017-04-13 20:48:32 +00:00
}
parsedCertBundle , err := certBundle . ToParsedCertBundle ( )
if err != nil {
2018-03-21 19:05:56 +00:00
return nil , errwrap . Wrapf ( "failed to parse certificate bundle: {{err}}" , err )
2017-04-13 20:48:32 +00:00
}
tlsConfig , err = parsedCertBundle . GetTLSConfig ( certutil . TLSClient )
if err != nil || tlsConfig == nil {
2018-03-21 19:05:56 +00:00
return nil , errwrap . Wrapf ( fmt . Sprintf ( "failed to get TLS configuration: tlsConfig:%#v err:{{err}}" , tlsConfig ) , err )
2017-04-13 20:48:32 +00:00
}
tlsConfig . InsecureSkipVerify = c . InsecureTLS
if c . TLSMinVersion != "" {
var ok bool
tlsConfig . MinVersion , ok = tlsutil . TLSLookup [ c . TLSMinVersion ]
if ! ok {
return nil , fmt . Errorf ( "invalid 'tls_min_version' in config" )
}
} else {
// MinVersion was not being set earlier. Reset it to
// zero to gracefully handle upgrades.
tlsConfig . MinVersion = 0
}
}
clusterConfig . SslOpts = & gocql . SslOptions {
2017-04-19 18:19:29 +00:00
Config : tlsConfig ,
2017-04-13 20:48:32 +00:00
}
}
2019-03-14 20:37:28 +00:00
if c . LocalDatacenter != "" {
clusterConfig . PoolConfig . HostSelectionPolicy = gocql . DCAwareRoundRobinPolicy ( c . LocalDatacenter )
}
2017-04-13 20:48:32 +00:00
session , err := clusterConfig . CreateSession ( )
if err != nil {
2018-03-21 19:05:56 +00:00
return nil , errwrap . Wrapf ( "error creating session: {{err}}" , err )
2017-04-13 20:48:32 +00:00
}
// Set consistency
if c . Consistency != "" {
consistencyValue , err := gocql . ParseConsistencyWrapper ( c . Consistency )
if err != nil {
return nil , err
}
session . SetConsistency ( consistencyValue )
}
// Verify the info
2017-10-18 18:24:12 +00:00
err = session . Query ( ` LIST ALL ` ) . Exec ( )
2018-10-15 15:06:03 +00:00
if err != nil && len ( c . Username ) != 0 && strings . Contains ( err . Error ( ) , "not authorized" ) {
rowNum := session . Query ( dbutil . QueryHelper ( ` LIST CREATE ON ALL ROLES OF ' {{ username }} '; ` , map [ string ] string {
"username" : c . Username ,
} ) ) . Iter ( ) . NumRows ( )
if rowNum < 1 {
return nil , errwrap . Wrapf ( "error validating connection info: No role create permissions found, previous error: {{err}}" , err )
}
} else if err != nil {
2018-03-21 19:05:56 +00:00
return nil , errwrap . Wrapf ( "error validating connection info: {{err}}" , err )
2017-04-13 20:48:32 +00:00
}
return session , nil
}
2018-03-21 19:05:56 +00:00
func ( c * cassandraConnectionProducer ) secretValues ( ) map [ string ] interface { } {
return map [ string ] interface { } {
c . Password : "[password]" ,
c . PemBundle : "[pem_bundle]" ,
c . PemJSON : "[pem_json]" ,
}
}