open-nomad/api/fs_test.go

256 lines
5.3 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package api
import (
"bytes"
"errors"
"fmt"
"io"
"strings"
"testing"
"time"
"github.com/docker/go-units"
"github.com/hashicorp/nomad/api/internal/testutil"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
)
// TestFS_Logs registers a batch job whose mock_driver task is configured with
// a large stdout payload of numbered lines, waits for the evaluation and the
// allocation to reach "complete", and then streams the task's stdout log three
// times. Each pass checks that the streamed bytes have the same length as the
// payload and that the lines arrive in order.
func TestFS_Logs(t *testing.T) {
	testutil.RequireRoot(t)
	testutil.Parallel(t)

	c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
		c.DevMode = true
	})
	defer s.Stop()

	// The returned node is not needed: its ModifyIndex used to seed the wait
	// index but was always overwritten before being read. The call itself is
	// kept because it receives t and may assert on the node list.
	// NOTE(review): presumably it checks that exactly one node is registered
	// — confirm against the helper.
	oneNodeFromNodeList(t, c.Nodes())

	// Build the expected stdout: one decimal counter per line.
	var input strings.Builder
	input.Grow(units.MB)
	lines := 80 * units.KB
	for i := 0; i < lines; i++ {
		// Writes to a strings.Builder do not fail, so the result is ignored.
		_, _ = fmt.Fprintf(&input, "%d\n", i)
	}

	job := &Job{
		ID:          pointerOf("TestFS_Logs"),
		Region:      pointerOf("global"),
		Datacenters: []string{"dc1"},
		Type:        pointerOf("batch"),
		TaskGroups: []*TaskGroup{
			{
				Name: pointerOf("TestFS_LogsGroup"),
				Tasks: []*Task{
					{
						Name:   "logger",
						Driver: "mock_driver",
						Config: map[string]interface{}{
							"stdout_string": input.String(),
						},
					},
				},
			},
		},
	}

	jobs := c.Jobs()
	jobResp, _, err := jobs.Register(job, nil)
	must.NoError(t, err)

	// index is the wait index for the queries below; it is advanced each time
	// the evaluation is fetched.
	index := jobResp.EvalCreateIndex
	evaluations := c.Evaluations()

	// evalComplete reports an error until the job's evaluation is complete.
	evalComplete := func() error {
		resp, qm, err := evaluations.Info(jobResp.EvalID, &QueryOptions{WaitIndex: index})
		if err != nil {
			return fmt.Errorf("failed to get evaluation info: %w", err)
		}
		must.Eq(t, "", resp.BlockedEval)
		index = qm.LastIndex
		if resp.Status != "complete" {
			return fmt.Errorf("evaluation status is not complete, got: %s", resp.Status)
		}
		return nil
	}
	must.Wait(t, wait.InitialSuccess(
		wait.ErrorFunc(evalComplete),
		wait.Timeout(10*time.Second),
		wait.Gap(1*time.Second),
	))

	// allocComplete reports an error until the job has exactly one allocation
	// and that allocation is complete; on success it records the ID.
	allocID := ""
	allocComplete := func() error {
		allocs, _, err := jobs.Allocations(*job.ID, true, &QueryOptions{WaitIndex: index})
		if err != nil {
			return fmt.Errorf("failed to get allocations: %w", err)
		}
		if n := len(allocs); n != 1 {
			return fmt.Errorf("expected 1 allocation, got: %d", n)
		}
		if allocs[0].ClientStatus != "complete" {
			return fmt.Errorf("allocation not complete: %s", allocs[0].ClientStatus)
		}
		allocID = allocs[0].ID
		return nil
	}
	must.Wait(t, wait.InitialSuccess(
		wait.ErrorFunc(allocComplete),
		wait.Timeout(10*time.Second),
		wait.Gap(1*time.Second),
	))

	alloc, _, err := c.Allocations().Info(allocID, nil)
	must.NoError(t, err)

	for attempt := 0; attempt < 3; attempt++ {
		// Each pass runs in its own function so that the deferred close of
		// stopCh fires when the pass ends instead of accumulating until the
		// whole test returns.
		func() {
			stopCh := make(chan struct{})
			defer close(stopCh)

			// The error channel is named errCh so that it does not shadow the
			// imported errors package.
			frames, errCh := c.AllocFS().Logs(alloc, false, "logger", "stdout", "start", 0, stopCh, nil)

			var result bytes.Buffer
		READ_FRAMES:
			for {
				select {
				case frame := <-frames:
					// A nil frame (for example from a closed channel) ends
					// the stream.
					if frame == nil {
						break READ_FRAMES
					}
					result.Write(frame.Data)
				case err := <-errCh:
					// Don't Fatal here as the other assertions may
					// contain helpful information.
					// NOTE(review): the loop keeps selecting after an error;
					// if frames is never closed in that case this blocks
					// until the test timeout — confirm against Logs.
					t.Errorf("Error: %v", err)
				}
			}

			// Check length
			must.Eq(t, input.Len(), result.Len())

			// Check complete ordering
			for lineNo := 0; lineNo < lines; lineNo++ {
				line, readErr := result.ReadBytes('\n')
				must.NoError(t, readErr, must.Sprintf("unexpected error on line %d: %v", lineNo, readErr))
				must.Eq(t, fmt.Sprintf("%d\n", lineNo), string(line))
			}
		}()
	}
}
// TestFS_FrameReader queues three frames on a buffered channel, reads them
// back through a FrameReader, and checks the bytes read, the reported offset,
// and that Close leaves the cancel channel closed.
func TestFS_FrameReader(t *testing.T) {
	testutil.Parallel(t)

	// Channels backing the reader: frames in, errors in, cancellation out.
	frames := make(chan *StreamFrame, 3)
	errs := make(chan error)
	cancel := make(chan struct{})
	reader := NewFrameReader(frames, errs, cancel)

	// Queue every frame up front, then close the channel so the reader sees
	// the end of the stream after the last one.
	for _, frame := range []*StreamFrame{
		{File: "foo", Offset: 5, Data: []byte("hello")},
		{File: "foo", Offset: 10, Data: []byte(", wor")},
		{File: "foo", Offset: 12, Data: []byte("ld")},
	} {
		frames <- frame
	}
	close(frames)

	want := []byte("hello, world")

	// First a short read into the head of the buffer.
	buf := make([]byte, 12)
	n, err := reader.Read(buf[:5])
	must.NoError(t, err)
	must.Eq(t, n, reader.Offset())

	// Then keep reading into the rest of the buffer until EOF.
	filled := n
	for {
		n, err = reader.Read(buf[filled:])
		if err == io.EOF {
			break
		}
		must.NoError(t, err)
		filled += n
	}
	must.Eq(t, want, buf)

	// After Close, a receive on the cancel channel must report it as closed.
	must.NoError(t, reader.Close())
	_, open := <-cancel
	must.False(t, open)
	must.Eq(t, len(want), reader.Offset())
}
// TestFS_FrameReader_Unblock checks that, with an unblock time set, a Read on
// a reader with no frames returns zero bytes and no error, and that after the
// unblock time is reset to zero a Read does not return within 300ms.
func TestFS_FrameReader_Unblock(t *testing.T) {
	testutil.Parallel(t)

	// Channels backing the reader; nothing is ever sent on them in this test.
	frames := make(chan *StreamFrame, 3)
	errs := make(chan error)
	cancel := make(chan struct{})
	reader := NewFrameReader(frames, errs, cancel)
	reader.SetUnblockTime(10 * time.Millisecond)

	// With the unblock time set, the read comes back empty.
	buf := make([]byte, 12)
	n, err := reader.Read(buf)
	must.NoError(t, err)
	must.Zero(t, n)

	// Unset the unblock
	reader.SetUnblockTime(0)

	// done is closed only if the background Read returns.
	done := make(chan struct{})
	go func() {
		// The result is irrelevant; only whether Read returns matters.
		_, _ = reader.Read(buf)
		close(done)
	}()

	timeout := time.NewTimer(300 * time.Millisecond)
	defer timeout.Stop()

	select {
	case <-done:
		must.Unreachable(t, must.Sprint("must not have unblocked"))
	case <-timeout.C:
	}
}
// TestFS_FrameReader_Error checks that an error queued on the reader's error
// channel is returned from Read.
func TestFS_FrameReader_Error(t *testing.T) {
	testutil.Parallel(t)

	// The error channel is buffered so the send below does not block.
	frames := make(chan *StreamFrame, 3)
	errs := make(chan error, 1)
	cancel := make(chan struct{})
	reader := NewFrameReader(frames, errs, cancel)
	reader.SetUnblockTime(10 * time.Millisecond)

	// Queue the error before reading.
	wantErr := errors.New("test error")
	errs <- wantErr

	buf := make([]byte, 12)
	_, gotErr := reader.Read(buf)
	must.ErrorIs(t, gotErr, wantErr)
}