package nomad import ( "fmt" "time" metrics "github.com/armon/go-metrics" memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" ) // ACL endpoint is used for manipulating ACL tokens and policies type ACL struct { srv *Server } // UpsertPolicies is used to create or update a set of policies func (a *ACL) UpsertPolicies(args *structs.ACLPolicyUpsertRequest, reply *structs.GenericResponse) error { if done, err := a.srv.forward("ACL.UpsertPolicies", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "acl", "upsert_policies"}, time.Now()) // Validate non-zero set of policies if len(args.Policies) == 0 { return fmt.Errorf("must specify as least one policy") } // Validate each policy for idx, policy := range args.Policies { if err := policy.Validate(); err != nil { return fmt.Errorf("policy %d invalid: %v", idx, err) } } // Update via Raft _, index, err := a.srv.raftApply(structs.ACLPolicyUpsertRequestType, args) if err != nil { return err } // Update the index reply.Index = index return nil } // DeletePolicies is used to delete policies func (a *ACL) DeletePolicies(args *structs.ACLPolicyDeleteRequest, reply *structs.GenericResponse) error { if done, err := a.srv.forward("ACL.DeletePolicies", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "acl", "delete_policies"}, time.Now()) // Validate non-zero set of policies if len(args.Names) == 0 { return fmt.Errorf("must specify as least one policy") } // Update via Raft _, index, err := a.srv.raftApply(structs.ACLPolicyDeleteRequestType, args) if err != nil { return err } // Update the index reply.Index = index return nil } // ListPolicies is used to list the policies func (a *ACL) ListPolicies(args *structs.ACLPolicyListRequest, reply *structs.ACLPolicyListResponse) error { if done, err := a.srv.forward("ACL.ListPolicies", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "acl", "list_policies"}, time.Now()) // Setup the blocking query opts := blockingOptions{ queryOpts: &args.QueryOptions, queryMeta: &reply.QueryMeta, run: func(ws memdb.WatchSet, state *state.StateStore) error { // Iterate over all the policies var err error var iter memdb.ResultIterator if prefix := args.QueryOptions.Prefix; prefix != "" { iter, err = state.ACLPolicyByNamePrefix(ws, prefix) } else { iter, err = state.ACLPolicies(ws) } if err != nil { return err } // Convert all the policies to a list stub reply.Policies = nil for { raw := iter.Next() if raw == nil { break } policy := raw.(*structs.ACLPolicy) reply.Policies = append(reply.Policies, policy.Stub()) } // Use the last index that affected the policy table index, err := state.Index("acl_policy") if err != nil { return err } reply.Index = index return nil }} return a.srv.blockingRPC(&opts) } // GetPolicy is used to get a specific policy func (a *ACL) GetPolicy(args *structs.ACLPolicySpecificRequest, reply *structs.SingleACLPolicyResponse) error { if done, err := a.srv.forward("ACL.GetPolicy", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "acl", "get_policy"}, time.Now()) // Setup the blocking query opts := blockingOptions{ queryOpts: &args.QueryOptions, queryMeta: &reply.QueryMeta, run: func(ws memdb.WatchSet, state *state.StateStore) error { // Look for the policy out, err := state.ACLPolicyByName(ws, args.Name) if err != nil { return err } // Setup the output reply.Policy = out if out != nil { reply.Index = out.ModifyIndex } else { // Use the last index that affected the policy table index, err := state.Index("acl_policy") if err != nil { return err } reply.Index = index } return nil }} return a.srv.blockingRPC(&opts) } // UpsertTokens is used to create or update a set of tokens func (a *ACL) UpsertTokens(args *structs.ACLTokenUpsertRequest, reply *structs.GenericResponse) error { if done, err := a.srv.forward("ACL.UpsertTokens", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "acl", "upsert_tokens"}, time.Now()) // Validate non-zero set of tokens if len(args.Tokens) == 0 { return fmt.Errorf("must specify as least one token") } // Validate each token for idx, token := range args.Tokens { if err := token.Validate(); err != nil { return fmt.Errorf("token %d invalid: %v", idx, err) } } // Update via Raft _, index, err := a.srv.raftApply(structs.ACLTokenUpsertRequestType, args) if err != nil { return err } // Update the index reply.Index = index return nil } // DeleteTokens is used to delete tokens func (a *ACL) DeleteTokens(args *structs.ACLTokenDeleteRequest, reply *structs.GenericResponse) error { if done, err := a.srv.forward("ACL.DeleteTokens", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "acl", "delete_tokens"}, time.Now()) // Validate non-zero set of tokens if len(args.AccessorIDs) == 0 { return fmt.Errorf("must specify as least one token") } // Update via Raft _, index, err := a.srv.raftApply(structs.ACLTokenDeleteRequestType, args) if err != nil { return err } // Update the index reply.Index = index return nil } // ListTokens is used to list the tokens func (a *ACL) ListTokens(args *structs.ACLTokenListRequest, reply *structs.ACLTokenListResponse) error { if done, err := a.srv.forward("ACL.ListTokens", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "acl", "list_tokens"}, time.Now()) // Setup the blocking query opts := blockingOptions{ queryOpts: &args.QueryOptions, queryMeta: &reply.QueryMeta, run: func(ws memdb.WatchSet, state *state.StateStore) error { // Iterate over all the tokens var err error var iter memdb.ResultIterator if prefix := args.QueryOptions.Prefix; prefix != "" { iter, err = state.ACLTokenByAccessorIDPrefix(ws, prefix) } else { iter, err = state.ACLTokens(ws) } if err != nil { return err } // Convert all the tokens to a list stub reply.Tokens = nil for { raw := iter.Next() if raw == nil { break } token := raw.(*structs.ACLToken) reply.Tokens = append(reply.Tokens, token.Stub()) } // Use the last index that affected the token table index, err := state.Index("acl_token") if err != nil { return err } reply.Index = index return nil }} return a.srv.blockingRPC(&opts) } // GetToken is used to get a specific token func (a *ACL) GetToken(args *structs.ACLTokenSpecificRequest, reply *structs.SingleACLTokenResponse) error { if done, err := a.srv.forward("ACL.GetToken", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"nomad", "acl", "get_token"}, time.Now()) // Setup the blocking query opts := blockingOptions{ queryOpts: &args.QueryOptions, queryMeta: &reply.QueryMeta, run: func(ws memdb.WatchSet, state *state.StateStore) error { // Look for the token out, err := state.ACLTokenByAccessorID(ws, args.AccessorID) if err != nil { return err } // Setup the output reply.Token = out if out != nil { reply.Index = out.ModifyIndex } else { // Use the last index that affected the token table index, err := state.Index("acl_token") if err != nil { return err } reply.Index = index } return nil }} return a.srv.blockingRPC(&opts) }