2014-01-09 16:35:49 +01:00
|
|
|
package model
|
|
|
|
|
|
|
|
import (
|
|
|
|
"log"
|
|
|
|
"sort"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
)
|
|
|
|
|
|
|
|
type Monitor interface {
|
|
|
|
FileBegins(<-chan content) error
|
|
|
|
FileDone() error
|
|
|
|
}
|
|
|
|
|
|
|
|
type FileQueue struct {
|
2014-01-13 10:29:23 -07:00
|
|
|
files queuedFileList
|
|
|
|
lock sync.Mutex
|
|
|
|
sorted bool
|
|
|
|
availability map[string][]string
|
2014-01-09 16:35:49 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
type queuedFile struct {
|
|
|
|
name string
|
|
|
|
blocks []Block
|
|
|
|
activeBlocks []bool
|
|
|
|
given int
|
|
|
|
remaining int
|
|
|
|
channel chan content
|
|
|
|
nodes []string
|
|
|
|
nodesChecked time.Time
|
|
|
|
monitor Monitor
|
|
|
|
}
|
|
|
|
|
|
|
|
// content pairs the data of one completed block with the offset that was
// reported for it to FileQueue.Done. Values of this type are what travels
// over a queued file's channel.
type content struct {
	offset int64
	data   []byte
}
|
|
|
|
|
|
|
|
// queuedFileList is a slice of queuedFile values. Its Len, Swap and Less
// methods make it usable with sort.Sort.
type queuedFileList []queuedFile
|
|
|
|
|
|
|
|
// Len reports how many files the list holds (sort.Interface).
func (l queuedFileList) Len() int {
	return len(l)
}
|
|
|
|
|
|
|
|
// Swap exchanges the entries at positions a and b (sort.Interface).
func (l queuedFileList) Swap(a, b int) {
	tmp := l[a]
	l[a] = l[b]
	l[b] = tmp
}
|
|
|
|
|
|
|
|
func (l queuedFileList) Less(a, b int) bool {
|
|
|
|
// Sort by most blocks already given out, then alphabetically
|
|
|
|
if l[a].given != l[b].given {
|
|
|
|
return l[a].given > l[b].given
|
|
|
|
}
|
|
|
|
return l[a].name < l[b].name
|
|
|
|
}
|
|
|
|
|
|
|
|
type queuedBlock struct {
|
|
|
|
name string
|
|
|
|
block Block
|
|
|
|
index int
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *FileQueue) Add(name string, blocks []Block, monitor Monitor) {
|
|
|
|
q.lock.Lock()
|
|
|
|
defer q.lock.Unlock()
|
|
|
|
|
|
|
|
q.files = append(q.files, queuedFile{
|
|
|
|
name: name,
|
|
|
|
blocks: blocks,
|
|
|
|
activeBlocks: make([]bool, len(blocks)),
|
|
|
|
remaining: len(blocks),
|
|
|
|
channel: make(chan content),
|
|
|
|
monitor: monitor,
|
|
|
|
})
|
|
|
|
q.sorted = false
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *FileQueue) Len() int {
|
|
|
|
q.lock.Lock()
|
|
|
|
defer q.lock.Unlock()
|
|
|
|
|
|
|
|
return len(q.files)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *FileQueue) Get(nodeID string) (queuedBlock, bool) {
|
|
|
|
q.lock.Lock()
|
|
|
|
defer q.lock.Unlock()
|
|
|
|
|
|
|
|
if !q.sorted {
|
|
|
|
sort.Sort(q.files)
|
|
|
|
q.sorted = true
|
|
|
|
}
|
|
|
|
|
|
|
|
for i := range q.files {
|
2014-01-13 10:29:23 -07:00
|
|
|
qf := &q.files[i]
|
2014-01-09 16:35:49 +01:00
|
|
|
|
2014-01-13 10:29:23 -07:00
|
|
|
if len(q.availability[qf.name]) == 0 {
|
2014-01-09 16:35:49 +01:00
|
|
|
// Noone has the file we want; abort.
|
2014-01-13 10:29:23 -07:00
|
|
|
if qf.remaining != len(qf.blocks) {
|
2014-01-09 16:35:49 +01:00
|
|
|
// We have already started on this file; close it down
|
2014-01-13 10:29:23 -07:00
|
|
|
close(qf.channel)
|
|
|
|
if mon := qf.monitor; mon != nil {
|
2014-01-09 16:35:49 +01:00
|
|
|
mon.FileDone()
|
|
|
|
}
|
|
|
|
}
|
2014-01-13 10:29:23 -07:00
|
|
|
q.deleteAt(i)
|
2014-01-09 16:35:49 +01:00
|
|
|
return queuedBlock{}, false
|
|
|
|
}
|
|
|
|
|
2014-01-13 10:29:23 -07:00
|
|
|
for _, ni := range q.availability[qf.name] {
|
2014-01-09 16:35:49 +01:00
|
|
|
// Find and return the next block in the queue
|
|
|
|
if ni == nodeID {
|
|
|
|
for j, b := range qf.blocks {
|
|
|
|
if !qf.activeBlocks[j] {
|
2014-01-13 10:29:23 -07:00
|
|
|
qf.activeBlocks[j] = true
|
|
|
|
qf.given++
|
2014-01-09 16:35:49 +01:00
|
|
|
return queuedBlock{
|
|
|
|
name: qf.name,
|
|
|
|
block: b,
|
|
|
|
index: j,
|
|
|
|
}, true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// We found nothing to do
|
|
|
|
return queuedBlock{}, false
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *FileQueue) Done(file string, offset int64, data []byte) {
|
|
|
|
q.lock.Lock()
|
|
|
|
defer q.lock.Unlock()
|
|
|
|
|
|
|
|
c := content{
|
|
|
|
offset: offset,
|
|
|
|
data: data,
|
|
|
|
}
|
2014-01-13 10:29:23 -07:00
|
|
|
for i := range q.files {
|
|
|
|
qf := &q.files[i]
|
|
|
|
|
2014-01-09 16:35:49 +01:00
|
|
|
if qf.name == file {
|
|
|
|
if qf.monitor != nil && qf.remaining == len(qf.blocks) {
|
|
|
|
err := qf.monitor.FileBegins(qf.channel)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("WARNING: %s: %v (not synced)", qf.name, err)
|
2014-01-13 10:29:23 -07:00
|
|
|
q.deleteAt(i)
|
2014-01-09 16:35:49 +01:00
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
qf.channel <- c
|
2014-01-13 10:29:23 -07:00
|
|
|
qf.remaining--
|
2014-01-09 16:35:49 +01:00
|
|
|
|
2014-01-13 10:29:23 -07:00
|
|
|
if qf.remaining == 0 {
|
2014-01-09 16:35:49 +01:00
|
|
|
close(qf.channel)
|
|
|
|
if qf.monitor != nil {
|
|
|
|
err := qf.monitor.FileDone()
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("WARNING: %s: %v", qf.name, err)
|
|
|
|
}
|
|
|
|
}
|
2014-01-13 10:29:23 -07:00
|
|
|
q.deleteAt(i)
|
2014-01-09 16:35:49 +01:00
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
panic("unreachable")
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *FileQueue) Queued(file string) bool {
|
|
|
|
q.lock.Lock()
|
|
|
|
defer q.lock.Unlock()
|
|
|
|
|
|
|
|
for _, qf := range q.files {
|
|
|
|
if qf.name == file {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *FileQueue) QueuedFiles() (files []string) {
|
|
|
|
q.lock.Lock()
|
|
|
|
defer q.lock.Unlock()
|
|
|
|
|
|
|
|
for _, qf := range q.files {
|
|
|
|
files = append(files, qf.name)
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2014-01-13 10:29:23 -07:00
|
|
|
func (q *FileQueue) deleteAt(i int) {
|
2014-01-09 16:35:49 +01:00
|
|
|
q.files = q.files[:i+copy(q.files[i:], q.files[i+1:])]
|
|
|
|
}
|
2014-01-13 10:29:23 -07:00
|
|
|
|
2014-01-13 11:22:57 -07:00
|
|
|
func (q *FileQueue) deleteFile(n string) {
|
|
|
|
for i, file := range q.files {
|
|
|
|
if n == file.name {
|
|
|
|
q.deleteAt(i)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-01-13 10:29:23 -07:00
|
|
|
func (q *FileQueue) SetAvailable(file, node string) {
|
|
|
|
q.lock.Lock()
|
|
|
|
defer q.lock.Unlock()
|
|
|
|
if q.availability == nil {
|
|
|
|
q.availability = make(map[string][]string)
|
|
|
|
}
|
|
|
|
q.availability[file] = []string{node}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *FileQueue) AddAvailable(file, node string) {
|
|
|
|
q.lock.Lock()
|
|
|
|
defer q.lock.Unlock()
|
|
|
|
if q.availability == nil {
|
|
|
|
q.availability = make(map[string][]string)
|
|
|
|
}
|
|
|
|
q.availability[file] = append(q.availability[file], node)
|
|
|
|
}
|
2014-01-13 11:22:57 -07:00
|
|
|
|
|
|
|
func (q *FileQueue) RemoveAvailable(toRemove string) {
|
|
|
|
q.lock.Lock()
|
|
|
|
defer q.lock.Unlock()
|
|
|
|
for file, nodes := range q.availability {
|
|
|
|
for i, node := range nodes {
|
|
|
|
if node == toRemove {
|
|
|
|
q.availability[file] = nodes[:i+copy(nodes[i:], nodes[i+1:])]
|
|
|
|
if len(q.availability[file]) == 0 {
|
|
|
|
q.deleteFile(file)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|