api: remove `mapstructure` tags from `Port` struct (#12916)

This PR fixes a defect in the deserialization of api.Port structs returned from the EventStream.

Previously, the api.Port struct's fields were decorated with both mapstructure and hcl tags to support the network.port stanza's use of the keyword static when posting a static port value. This works fine when posting a job, and when retrieving any struct that embeds an api.Port instance, as long as the value is deserialized using JSON decoding. The EventStream, however, uses mapstructure to decode event payloads in the api package. mapstructure expects an underlying field named static, which does not exist in the payload, so the Port.Value field would always be set to 0.
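For illustration, here is a minimal sketch of that failure mode. It is not taken from the PR: the `oldPort` type and the `payload` map are hypothetical stand-ins for the pre-change `api.Port` struct and a JSON-shaped event payload.

```go
package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

// oldPort mirrors the pre-change api.Port tags (hypothetical stand-in).
type oldPort struct {
	Label string
	Value int `mapstructure:"static"`
}

func main() {
	// Event payloads are produced by encoding/json, so the key is "Value",
	// not "static".
	payload := map[string]interface{}{"Label": "db", "Value": 9000}

	var p oldPort
	if err := mapstructure.Decode(payload, &p); err != nil {
		panic(err)
	}
	fmt.Println(p.Value) // prints 0: mapstructure only looks for a key named "static"
}
```

Because the tag redirects mapstructure to a key named static that never appears in the payload, the field silently keeps its zero value.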

Upon further inspection, a few things became apparent:

- The struct already has hcl tags that support the indirection during job submission.
- Serialization and deserialization with both the json and hcl packages produce the desired result (see the sketch after this list).
- The mapstructure tags provided no value, since the Port struct contains only fields with primitive types.
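As a rough check of the hcl observation above, a hypothetical snippet (not part of the PR): the `port` type and the inline HCL string are stand-ins, while the real parser decodes an `ast.Node` with `hcl.DecodeObject`, as shown in the jobspec diff further down.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/hcl"
)

// port is a hypothetical stand-in for api.Port carrying only hcl tags.
type port struct {
	Value int `hcl:"static"`
	To    int `hcl:"to"`
}

func main() {
	var p port
	// Decode an HCL body the way a network.port block supplies "static".
	if err := hcl.Decode(&p, `static = 9000`); err != nil {
		panic(err)
	}
	fmt.Println(p.Value) // prints 9000; no mapstructure pass required
}
```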
This PR:

- Removes the mapstructure tags from the api.Port struct.
- Updates the job parsing logic to use hcl instead of mapstructure when decoding Port instances.
Closes #11044

Co-authored-by: DerekStrickland <dstrickland@hashicorp.com>
Co-authored-by: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com>
Derek Strickland 2022-11-08 05:26:28 -05:00 committed by GitHub
parent 1859559134
commit 80b6f27efd
10 changed files with 187 additions and 13 deletions

.changelog/12916.txt (new file)

@@ -0,0 +1,3 @@
```release-note:bug
event_stream: fixed a bug where dynamic port values would fail to serialize in the event stream
```


@@ -98,9 +98,9 @@ func (r *Resources) Merge(other *Resources) {
 type Port struct {
 	Label       string `hcl:",label"`
-	Value       int    `mapstructure:"static" hcl:"static,optional"`
-	To          int    `mapstructure:"to" hcl:"to,optional"`
-	HostNetwork string `mapstructure:"host_network" hcl:"host_network,optional"`
+	Value       int    `hcl:"static,optional"`
+	To          int    `hcl:"to,optional"`
+	HostNetwork string `hcl:"host_network,optional"`
 }
 
 type DNSConfig struct {


@@ -727,6 +727,7 @@ func TestClient_Init(t *testing.T) {
	ci.Parallel(t)
	dir := t.TempDir()
	allocDir := filepath.Join(dir, "alloc")
	config := config.DefaultConfig()


@@ -10,8 +10,10 @@ import (
 	"testing"
 	"time"
 
+	"github.com/hashicorp/nomad/api"
 	"github.com/hashicorp/nomad/ci"
 	"github.com/hashicorp/nomad/helper/uuid"
+	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/testutil"
 	"github.com/stretchr/testify/assert"
@@ -202,3 +204,103 @@ func TestEventStream_QueryParse(t *testing.T) {
		})
	}
}

func TestHTTP_Alloc_Port_Response(t *testing.T) {
	ci.Parallel(t)

	httpTest(t, nil, func(srv *TestAgent) {
		client := srv.Client()
		defer srv.Shutdown()
		defer client.Close()

		testutil.WaitForLeader(t, srv.Agent.RPC)
		testutil.WaitForClient(t, srv.Agent.Client().RPC, srv.Agent.Client().NodeID(), srv.Agent.Client().Region())

		// Register a job that the TestAgent can actually run.
		job := MockRunnableJob()

		resp, _, err := client.Jobs().Register(job, nil)
		require.NoError(t, err)
		require.NotEmpty(t, resp.EvalID)

		// Upsert an allocation for the job directly into server state.
		alloc := mock.Alloc()
		alloc.Job = ApiJobToStructJob(job)
		alloc.JobID = *job.ID
		alloc.NodeID = srv.client.NodeID()

		require.Nil(t, srv.server.State().UpsertJobSummary(101, mock.JobSummary(alloc.JobID)))
		require.Nil(t, srv.server.State().UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc}))

		// Wait for the allocation to report running.
		running := false
		testutil.WaitForResult(func() (bool, error) {
			upsertResult, stateErr := srv.server.State().AllocByID(nil, alloc.ID)
			if stateErr != nil {
				return false, stateErr
			}
			if upsertResult.ClientStatus == structs.AllocClientStatusRunning {
				running = true
				return true, nil
			}
			return false, nil
		}, func(err error) {
			require.NoError(t, err, "allocation query failed")
		})
		require.True(t, running)

		// Subscribe to allocation events for this job.
		topics := map[api.Topic][]string{
			api.TopicAllocation: {*job.ID},
		}

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		events := client.EventStream()
		streamCh, err := events.Stream(ctx, topics, 1, nil)
		require.NoError(t, err)

		var allocEvents []api.Event
		// gather job alloc events
		go func() {
			for {
				select {
				case event, ok := <-streamCh:
					if !ok {
						return
					}
					if event.IsHeartbeat() {
						continue
					}
					allocEvents = append(allocEvents, event.Events...)
				case <-time.After(10 * time.Second):
					require.Fail(t, "failed waiting for event stream event")
				}
			}
		}()

		// Wait for an allocation update event that carries the expected port values.
		var networkResource *api.NetworkResource
		testutil.WaitForResult(func() (bool, error) {
			for _, e := range allocEvents {
				if e.Type == structs.TypeAllocationUpdated {
					eventAlloc, err := e.Allocation()
					if err != nil {
						return false, err
					}
					if len(eventAlloc.AllocatedResources.Tasks["web"].Networks) == 0 {
						return false, nil
					}
					networkResource = eventAlloc.AllocatedResources.Tasks["web"].Networks[0]
					if networkResource.ReservedPorts[0].Value == 5000 {
						return true, nil
					}
				}
			}
			return false, nil
		}, func(e error) {
			require.NoError(t, e)
		})

		require.NotNil(t, networkResource)
		require.Equal(t, 5000, networkResource.ReservedPorts[0].Value)
		require.NotEqual(t, 0, networkResource.DynamicPorts[0].Value)
	})
}


@@ -104,3 +104,21 @@ func MockRegionalJob() *api.Job {
	j.Region = pointer.Of("north-america")
	return j
}

// MockRunnableJob returns a mock job that has a configuration that allows it to be
// placed on a TestAgent.
func MockRunnableJob() *api.Job {
	job := MockJob()

	// Configure job so it can be run on a TestAgent
	job.Constraints = nil
	job.TaskGroups[0].Constraints = nil
	job.TaskGroups[0].Count = pointer.Of(1)
	job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
	job.TaskGroups[0].Tasks[0].Services = nil
	job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
		"run_for": "10s",
	}

	return job
}


@@ -16,6 +16,12 @@ job "deployment_auto.nomad" {
       min_healthy_time = "1s"
     }
 
+    network {
+      port "db" {
+        static = 9000
+      }
+    }
+
     task "one" {
       driver = "raw_exec"
 
@@ -24,10 +30,12 @@ job "deployment_auto.nomad" {
       }
 
       config {
-        command = "/bin/sleep"
+        image   = "busybox:1"
+        command = "nc"
         # change args to update the job, the only changes
-        args = ["1000000"]
+        args  = ["-ll", "-p", "1234", "-e", "/bin/cat"]
+        ports = ["db"]
       }
 
       resources {


@@ -15,17 +15,25 @@ job "deployment_auto.nomad" {
       canary = 2
     }
 
+    network {
+      port "db" {
+        static = 9000
+      }
+    }
+
     task "one" {
-      driver = "raw_exec"
+      driver = "docker"
 
       env {
         version = "0"
       }
 
       config {
-        command = "/bin/sleep"
+        image   = "busybox:1"
+        command = "nc"
         # change args to update the job, the only changes
-        args = ["1000000"]
+        args  = ["-ll", "-p", "1234", "-e", "/bin/cat"]
+        ports = ["db"]
       }
 
       resources {


@@ -93,14 +93,12 @@ func parsePorts(networkObj *ast.ObjectList, nw *api.NetworkResource) error {
 		if knownPortLabels[l] {
 			return fmt.Errorf("found a port label collision: %s", label)
 		}
-		var p map[string]interface{}
 		var res api.Port
-		if err := hcl.DecodeObject(&p, port.Val); err != nil {
-			return err
-		}
-		if err := mapstructure.WeakDecode(p, &res); err != nil {
+		if err := hcl.DecodeObject(&res, port.Val); err != nil {
 			return err
 		}
 		res.Label = label
 		if res.Value > 0 {
 			nw.ReservedPorts = append(nw.ReservedPorts, res)


@@ -1924,3 +1924,28 @@ func TestIncorrectKey(t *testing.T) {
		t.Fatalf("Expected key error; got %v", err)
	}
}

// TestPortParsing validates that the removal of the mapstructure tags on the
// Port struct doesn't cause issues with HCL 1 parsing.
//
// TODO: in the future, see if we need `mapstructure` tags on any of the API
func TestPortParsing(t *testing.T) {
	ci.Parallel(t)

	var err error
	var path string
	var job *api.Job

	path, err = filepath.Abs(filepath.Join("./test-fixtures", "parse-ports.hcl"))
	require.NoError(t, err, "Can't get absolute path for file: parse-ports.hcl")

	job, err = ParseFile(path)
	require.NoError(t, err, "cannot parse job")
	require.NotNil(t, job)

	require.Len(t, job.TaskGroups, 1)
	require.Len(t, job.TaskGroups[0].Networks, 1)
	require.Len(t, job.TaskGroups[0].Networks[0].ReservedPorts, 1)
	require.Len(t, job.TaskGroups[0].Networks[0].DynamicPorts, 1)
	require.Equal(t, 9000, job.TaskGroups[0].Networks[0].ReservedPorts[0].Value)
	require.Equal(t, 0, job.TaskGroups[0].Networks[0].DynamicPorts[0].Value)
}


@@ -0,0 +1,11 @@
job "parse-ports" {
  group "group" {
    network {
      port "static" {
        static = 9000
      }

      port "dynamic" {}
    }
  }
}