// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.

package events

import (
	"encoding/json"
	"fmt"
	"sync"
	"testing"
	"time"
)

const timeout = time.Second

func init() {
	runningTests = true
}

func TestNewLogger(t *testing.T) {
	l := NewLogger()
	if l == nil {
		t.Fatal("Unexpected nil Logger")
	}
}

func TestSubscriber(t *testing.T) {
	l := NewLogger()
	defer l.Stop()
	go l.Serve()

	s := l.Subscribe(0)
	defer s.Unsubscribe()
	if s == nil {
		t.Fatal("Unexpected nil Subscription")
	}
}

func TestTimeout(t *testing.T) {
	l := NewLogger()
	defer l.Stop()
	go l.Serve()

	s := l.Subscribe(0)
	defer s.Unsubscribe()
	_, err := s.Poll(timeout)
	if err != ErrTimeout {
		t.Fatal("Unexpected non-Timeout error:", err)
	}
}

func TestEventBeforeSubscribe(t *testing.T) {
	l := NewLogger()
	defer l.Stop()
	go l.Serve()

	l.Log(DeviceConnected, "foo")
	s := l.Subscribe(0)
	defer s.Unsubscribe()

	_, err := s.Poll(timeout)
	if err != ErrTimeout {
		t.Fatal("Unexpected non-Timeout error:", err)
	}
}

func TestEventAfterSubscribe(t *testing.T) {
	l := NewLogger()
	defer l.Stop()
	go l.Serve()

	s := l.Subscribe(AllEvents)
	defer s.Unsubscribe()
	l.Log(DeviceConnected, "foo")

	ev, err := s.Poll(timeout)

	if err != nil {
		t.Fatal("Unexpected error:", err)
	}
	if ev.Type != DeviceConnected {
		t.Error("Incorrect event type", ev.Type)
	}
	switch v := ev.Data.(type) {
	case string:
		if v != "foo" {
			t.Error("Incorrect Data string", v)
		}
	default:
		t.Errorf("Incorrect Data type %#v", v)
	}
}

func TestEventAfterSubscribeIgnoreMask(t *testing.T) {
	l := NewLogger()
	defer l.Stop()
	go l.Serve()

	s := l.Subscribe(DeviceDisconnected)
	defer s.Unsubscribe()
	l.Log(DeviceConnected, "foo")

	_, err := s.Poll(timeout)
	if err != ErrTimeout {
		t.Fatal("Unexpected non-Timeout error:", err)
	}
}

func TestBufferOverflow(t *testing.T) {
	l := NewLogger()
	defer l.Stop()
	go l.Serve()

	s := l.Subscribe(AllEvents)
	defer s.Unsubscribe()

	// The first BufferSize events will be logged pretty much
	// instantaneously. The next BufferSize events will each block for up to
	// 15ms, plus overhead from race detector and thread scheduling latency
	// etc. This latency can sometimes be significant and is incurred for
	// each call. We just verify that the whole test completes in a
	// reasonable time, taking no more than 15 seconds in total.

	t0 := time.Now()
	const nEvents = BufferSize * 2
	for i := 0; i < nEvents; i++ {
		l.Log(DeviceConnected, "foo")
	}
	if d := time.Since(t0); d > 15*time.Second {
		t.Fatal("Logging took too long,", d, "avg", d/nEvents, "expected <", eventLogTimeout)
	}
}

func TestUnsubscribe(t *testing.T) {
	l := NewLogger()
	defer l.Stop()
	go l.Serve()

	s := l.Subscribe(AllEvents)
	l.Log(DeviceConnected, "foo")

	_, err := s.Poll(timeout)
	if err != nil {
		t.Fatal("Unexpected error:", err)
	}

	s.Unsubscribe()
	l.Log(DeviceConnected, "foo")

	_, err = s.Poll(timeout)
	if err != ErrClosed {
		t.Fatal("Unexpected non-Closed error:", err)
	}
}

func TestGlobalIDs(t *testing.T) {
	l := NewLogger()
	defer l.Stop()
	go l.Serve()

	s := l.Subscribe(AllEvents)
	defer s.Unsubscribe()
	l.Log(DeviceConnected, "foo")
	l.Subscribe(AllEvents)
	l.Log(DeviceConnected, "bar")

	ev, err := s.Poll(timeout)
	if err != nil {
		t.Fatal("Unexpected error:", err)
	}
	if ev.Data.(string) != "foo" {
		t.Fatal("Incorrect event:", ev)
	}
	id := ev.GlobalID

	ev, err = s.Poll(timeout)
	if err != nil {
		t.Fatal("Unexpected error:", err)
	}
	if ev.Data.(string) != "bar" {
		t.Fatal("Incorrect event:", ev)
	}
	if ev.GlobalID != id+1 {
		t.Fatalf("ID not incremented (%d != %d)", ev.GlobalID, id+1)
	}
}

func TestSubscriptionIDs(t *testing.T) {
	l := NewLogger()
	defer l.Stop()
	go l.Serve()

	s := l.Subscribe(DeviceConnected)
	defer s.Unsubscribe()

	l.Log(DeviceDisconnected, "a")
	l.Log(DeviceConnected, "b")
	l.Log(DeviceConnected, "c")
	l.Log(DeviceDisconnected, "d")

	ev, err := s.Poll(timeout)
	if err != nil {
		t.Fatal("Unexpected error:", err)
	}

	if ev.GlobalID != 2 {
		t.Fatal("Incorrect GlobalID:", ev.GlobalID)
	}
	if ev.SubscriptionID != 1 {
		t.Fatal("Incorrect SubscriptionID:", ev.SubscriptionID)
	}

	ev, err = s.Poll(timeout)
	if err != nil {
		t.Fatal("Unexpected error:", err)
	}
	if ev.GlobalID != 3 {
		t.Fatal("Incorrect GlobalID:", ev.GlobalID)
	}
	if ev.SubscriptionID != 2 {
		t.Fatal("Incorrect SubscriptionID:", ev.SubscriptionID)
	}

	ev, err = s.Poll(timeout)
	if err != ErrTimeout {
		t.Fatal("Unexpected error:", err)
	}
}

func TestBufferedSub(t *testing.T) {
	l := NewLogger()
	defer l.Stop()
	go l.Serve()

	s := l.Subscribe(AllEvents)
	defer s.Unsubscribe()
	bs := NewBufferedSubscription(s, 10*BufferSize)

	go func() {
		for i := 0; i < 10*BufferSize; i++ {
			l.Log(DeviceConnected, fmt.Sprintf("event-%d", i))
			if i%30 == 0 {
				// Give the buffer routine time to pick up the events
				time.Sleep(20 * time.Millisecond)
			}
		}
	}()

	recv := 0
	for recv < 10*BufferSize {
		evs := bs.Since(recv, nil, time.Minute)
		for _, ev := range evs {
			if ev.GlobalID != recv+1 {
				t.Fatalf("Incorrect ID; %d != %d", ev.GlobalID, recv+1)
			}
			recv = ev.GlobalID
		}
	}
}

func BenchmarkBufferedSub(b *testing.B) {
	l := NewLogger()
	defer l.Stop()
	go l.Serve()

	s := l.Subscribe(AllEvents)
	defer s.Unsubscribe()
	bufferSize := BufferSize
	bs := NewBufferedSubscription(s, bufferSize)

	// The coord channel paces the sender according to the receiver,
	// ensuring that no events are dropped. The benchmark measures sending +
	// receiving + synchronization overhead.

	coord := make(chan struct{}, bufferSize)
	for i := 0; i < bufferSize-1; i++ {
		coord <- struct{}{}
	}

	// Receive the events
	done := make(chan error)
	go func() {
		recv := 0
		var evs []Event
		for i := 0; i < b.N; {
			evs = bs.Since(recv, evs[:0], time.Minute)
			for _, ev := range evs {
				if ev.GlobalID != recv+1 {
					done <- fmt.Errorf("skipped event %v %v", ev.GlobalID, recv)
					return
				}
				recv = ev.GlobalID
				coord <- struct{}{}
			}
			i += len(evs)
		}
		done <- nil
	}()

	// Send the events
	eventData := map[string]string{
		"foo":   "bar",
		"other": "data",
		"and":   "something else",
	}
	for i := 0; i < b.N; i++ {
		l.Log(DeviceConnected, eventData)
		<-coord
	}

	if err := <-done; err != nil {
		b.Error(err)
	}
	b.ReportAllocs()
}

func TestSinceUsesSubscriptionId(t *testing.T) {
	l := NewLogger()
	defer l.Stop()
	go l.Serve()

	s := l.Subscribe(DeviceConnected)
	defer s.Unsubscribe()
	bs := NewBufferedSubscription(s, 10*BufferSize)

	l.Log(DeviceConnected, "a") // SubscriptionID = 1
	l.Log(DeviceDisconnected, "b")
	l.Log(DeviceDisconnected, "c")
	l.Log(DeviceConnected, "d") // SubscriptionID = 2

	// We need to loop for the events, as they may not all have been
	// delivered to the buffered subscription when we get here.
	t0 := time.Now()
	for time.Since(t0) < time.Second {
		events := bs.Since(0, nil, time.Minute)
		if len(events) == 2 {
			break
		}
		if len(events) > 2 {
			t.Fatal("Incorrect number of events:", len(events))
		}
	}

	events := bs.Since(1, nil, time.Minute)
	if len(events) != 1 {
		t.Fatal("Incorrect number of events:", len(events))
	}
}

func TestUnmarshalEvent(t *testing.T) {
	var event Event

	s := `
	{
		"id": 1,
		"globalID": 1,
		"time": "2006-01-02T15:04:05.999999999Z",
		"type": "Starting",
		"data": {}
	}`

	if err := json.Unmarshal([]byte(s), &event); err != nil {
		t.Fatal("Failed to unmarshal event:", err)
	}
}

func TestUnsubscribeContention(t *testing.T) {
	// Check that we can unsubscribe without blocking the whole system.

	const (
		listeners = 50
		senders   = 1000
	)

	l := NewLogger()
	defer l.Stop()
	go l.Serve()

	// Start listeners. These will poll until the stop channel is closed,
	// then exit and unsubscribe.

	stopListeners := make(chan struct{})
	var listenerWg sync.WaitGroup
	listenerWg.Add(listeners)
	for i := 0; i < listeners; i++ {
		go func() {
			defer listenerWg.Done()

			s := l.Subscribe(AllEvents)
			defer s.Unsubscribe()

			for {
				select {
				case <-s.C():

				case <-stopListeners:
					return
				}
			}
		}()
	}

	// Start senders. These send pointless events until the stop channel is
	// closed.

	stopSenders := make(chan struct{})
	defer close(stopSenders)
	var senderWg sync.WaitGroup
	senderWg.Add(senders)
	for i := 0; i < senders; i++ {
		go func() {
			defer senderWg.Done()

			t := time.NewTicker(time.Millisecond)

			for {
				select {
				case <-t.C:
					l.Log(StateChanged, nil)

				case <-stopSenders:
					return
				}
			}
		}()
	}

	// Give everything time to start up.

	time.Sleep(time.Second)

	// Stop the listeners and wait for them to exit. This should happen in a
	// reasonable time frame.

	t0 := time.Now()
	close(stopListeners)
	listenerWg.Wait()
	if d := time.Since(t0); d > time.Minute {
		t.Error("It should not take", d, "to unsubscribe from an event stream")
	}
}

func BenchmarkLogEvent(b *testing.B) {
	l := NewLogger()
	defer l.Stop()
	go l.Serve()

	s := l.Subscribe(AllEvents)
	defer s.Unsubscribe()
	NewBufferedSubscription(s, 1) // runs in the background

	for i := 0; i < b.N; i++ {
		l.Log(StateChanged, nil)
	}
}