mirror of
https://github.com/octoleo/syncthing.git
synced 2024-12-22 19:08:58 +00:00
247 lines
5.1 KiB
Go
247 lines
5.1 KiB
Go
// Copyright (C) 2024 The Syncthing Authors.
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
|
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
"github.com/thejerf/suture/v4"
|
|
)
|
|
|
|
type amqpReplicator struct {
|
|
suture.Service
|
|
broker string
|
|
sender *amqpSender
|
|
receiver *amqpReceiver
|
|
outbox chan ReplicationRecord
|
|
}
|
|
|
|
func newAMQPReplicator(broker, clientID string, db database) *amqpReplicator {
|
|
svc := suture.New("amqpReplicator", suture.Spec{PassThroughPanics: true})
|
|
|
|
sender := &amqpSender{
|
|
broker: broker,
|
|
clientID: clientID,
|
|
outbox: make(chan ReplicationRecord, replicationOutboxSize),
|
|
}
|
|
svc.Add(sender)
|
|
|
|
receiver := &amqpReceiver{
|
|
broker: broker,
|
|
clientID: clientID,
|
|
db: db,
|
|
}
|
|
svc.Add(receiver)
|
|
|
|
return &amqpReplicator{
|
|
Service: svc,
|
|
broker: broker,
|
|
sender: sender,
|
|
receiver: receiver,
|
|
outbox: make(chan ReplicationRecord, replicationOutboxSize),
|
|
}
|
|
}
|
|
|
|
func (s *amqpReplicator) send(key string, ps []DatabaseAddress, seen int64) {
|
|
s.sender.send(key, ps, seen)
|
|
}
|
|
|
|
type amqpSender struct {
|
|
broker string
|
|
clientID string
|
|
outbox chan ReplicationRecord
|
|
}
|
|
|
|
func (s *amqpSender) Serve(ctx context.Context) error {
|
|
conn, ch, err := amqpChannel(s.broker)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer ch.Close()
|
|
defer conn.Close()
|
|
|
|
buf := make([]byte, 1024)
|
|
for {
|
|
select {
|
|
case rec := <-s.outbox:
|
|
size := rec.Size()
|
|
if len(buf) < size {
|
|
buf = make([]byte, size)
|
|
}
|
|
|
|
n, err := rec.MarshalTo(buf)
|
|
if err != nil {
|
|
replicationSendsTotal.WithLabelValues("error").Inc()
|
|
return fmt.Errorf("replication marshal: %w", err)
|
|
}
|
|
|
|
err = ch.PublishWithContext(ctx,
|
|
"discovery", // exchange
|
|
"", // routing key
|
|
false, // mandatory
|
|
false, // immediate
|
|
amqp.Publishing{
|
|
ContentType: "application/protobuf",
|
|
Body: buf[:n],
|
|
AppId: s.clientID,
|
|
})
|
|
if err != nil {
|
|
replicationSendsTotal.WithLabelValues("error").Inc()
|
|
return fmt.Errorf("replication publish: %w", err)
|
|
}
|
|
|
|
replicationSendsTotal.WithLabelValues("success").Inc()
|
|
|
|
case <-ctx.Done():
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *amqpSender) String() string {
|
|
return fmt.Sprintf("amqpSender(%q)", s.broker)
|
|
}
|
|
|
|
func (s *amqpSender) send(key string, ps []DatabaseAddress, seen int64) {
|
|
item := ReplicationRecord{
|
|
Key: key,
|
|
Addresses: ps,
|
|
Seen: seen,
|
|
}
|
|
|
|
// The send should never block. The inbox is suitably buffered for at
|
|
// least a few seconds of stalls, which shouldn't happen in practice.
|
|
select {
|
|
case s.outbox <- item:
|
|
default:
|
|
replicationSendsTotal.WithLabelValues("drop").Inc()
|
|
}
|
|
}
|
|
|
|
type amqpReceiver struct {
|
|
broker string
|
|
clientID string
|
|
db database
|
|
}
|
|
|
|
func (s *amqpReceiver) Serve(ctx context.Context) error {
|
|
conn, ch, err := amqpChannel(s.broker)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer ch.Close()
|
|
defer conn.Close()
|
|
|
|
msgs, err := amqpConsume(ch)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case msg, ok := <-msgs:
|
|
if !ok {
|
|
return fmt.Errorf("subscription closed: %w", io.EOF)
|
|
}
|
|
|
|
// ignore messages from ourself
|
|
if msg.AppId == s.clientID {
|
|
continue
|
|
}
|
|
|
|
var rec ReplicationRecord
|
|
if err := rec.Unmarshal(msg.Body); err != nil {
|
|
replicationRecvsTotal.WithLabelValues("error").Inc()
|
|
return fmt.Errorf("replication unmarshal: %w", err)
|
|
}
|
|
|
|
if err := s.db.merge(rec.Key, rec.Addresses, rec.Seen); err != nil {
|
|
return fmt.Errorf("replication database merge: %w", err)
|
|
}
|
|
|
|
replicationRecvsTotal.WithLabelValues("success").Inc()
|
|
|
|
case <-ctx.Done():
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *amqpReceiver) String() string {
|
|
return fmt.Sprintf("amqpReceiver(%q)", s.broker)
|
|
}
|
|
|
|
func amqpChannel(dst string) (*amqp.Connection, *amqp.Channel, error) {
|
|
conn, err := amqp.Dial(dst)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("AMQP dial: %w", err)
|
|
}
|
|
|
|
ch, err := conn.Channel()
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("AMQP channel: %w", err)
|
|
}
|
|
|
|
err = ch.ExchangeDeclare(
|
|
"discovery", // name
|
|
"fanout", // type
|
|
false, // durable
|
|
false, // auto-deleted
|
|
false, // internal
|
|
false, // no-wait
|
|
nil, // arguments
|
|
)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("AMQP declare exchange: %w", err)
|
|
}
|
|
|
|
return conn, ch, nil
|
|
}
|
|
|
|
func amqpConsume(ch *amqp.Channel) (<-chan amqp.Delivery, error) {
|
|
q, err := ch.QueueDeclare(
|
|
"", // name
|
|
false, // durable
|
|
false, // delete when unused
|
|
true, // exclusive
|
|
false, // no-wait
|
|
nil, // arguments
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("AMQP declare queue: %w", err)
|
|
}
|
|
|
|
err = ch.QueueBind(
|
|
q.Name, // queue name
|
|
"", // routing key
|
|
"discovery", // exchange
|
|
false,
|
|
nil,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("AMQP bind queue: %w", err)
|
|
}
|
|
|
|
msgs, err := ch.Consume(
|
|
q.Name, // queue
|
|
"", // consumer
|
|
true, // auto-ack
|
|
false, // exclusive
|
|
false, // no-local
|
|
false, // no-wait
|
|
nil, // args
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("AMQP consume: %w", err)
|
|
}
|
|
|
|
return msgs, nil
|
|
}
|