// Copyright (C) 2024 The Syncthing Authors. // // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this file, // You can obtain one at https://mozilla.org/MPL/2.0/. package main import ( "context" "fmt" "io" amqp "github.com/rabbitmq/amqp091-go" "github.com/thejerf/suture/v4" ) type amqpReplicator struct { suture.Service broker string sender *amqpSender receiver *amqpReceiver outbox chan ReplicationRecord } func newAMQPReplicator(broker, clientID string, db database) *amqpReplicator { svc := suture.New("amqpReplicator", suture.Spec{PassThroughPanics: true}) sender := &amqpSender{ broker: broker, clientID: clientID, outbox: make(chan ReplicationRecord, replicationOutboxSize), } svc.Add(sender) receiver := &amqpReceiver{ broker: broker, clientID: clientID, db: db, } svc.Add(receiver) return &amqpReplicator{ Service: svc, broker: broker, sender: sender, receiver: receiver, outbox: make(chan ReplicationRecord, replicationOutboxSize), } } func (s *amqpReplicator) send(key string, ps []DatabaseAddress, seen int64) { s.sender.send(key, ps, seen) } type amqpSender struct { broker string clientID string outbox chan ReplicationRecord } func (s *amqpSender) Serve(ctx context.Context) error { conn, ch, err := amqpChannel(s.broker) if err != nil { return err } defer ch.Close() defer conn.Close() buf := make([]byte, 1024) for { select { case rec := <-s.outbox: size := rec.Size() if len(buf) < size { buf = make([]byte, size) } n, err := rec.MarshalTo(buf) if err != nil { replicationSendsTotal.WithLabelValues("error").Inc() return fmt.Errorf("replication marshal: %w", err) } err = ch.PublishWithContext(ctx, "discovery", // exchange "", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "application/protobuf", Body: buf[:n], AppId: s.clientID, }) if err != nil { replicationSendsTotal.WithLabelValues("error").Inc() return fmt.Errorf("replication publish: %w", err) } replicationSendsTotal.WithLabelValues("success").Inc() case <-ctx.Done(): return nil } } } func (s *amqpSender) String() string { return fmt.Sprintf("amqpSender(%q)", s.broker) } func (s *amqpSender) send(key string, ps []DatabaseAddress, seen int64) { item := ReplicationRecord{ Key: key, Addresses: ps, Seen: seen, } // The send should never block. The inbox is suitably buffered for at // least a few seconds of stalls, which shouldn't happen in practice. select { case s.outbox <- item: default: replicationSendsTotal.WithLabelValues("drop").Inc() } } type amqpReceiver struct { broker string clientID string db database } func (s *amqpReceiver) Serve(ctx context.Context) error { conn, ch, err := amqpChannel(s.broker) if err != nil { return err } defer ch.Close() defer conn.Close() msgs, err := amqpConsume(ch) if err != nil { return err } for { select { case msg, ok := <-msgs: if !ok { return fmt.Errorf("subscription closed: %w", io.EOF) } // ignore messages from ourself if msg.AppId == s.clientID { continue } var rec ReplicationRecord if err := rec.Unmarshal(msg.Body); err != nil { replicationRecvsTotal.WithLabelValues("error").Inc() return fmt.Errorf("replication unmarshal: %w", err) } if err := s.db.merge(rec.Key, rec.Addresses, rec.Seen); err != nil { return fmt.Errorf("replication database merge: %w", err) } replicationRecvsTotal.WithLabelValues("success").Inc() case <-ctx.Done(): return nil } } } func (s *amqpReceiver) String() string { return fmt.Sprintf("amqpReceiver(%q)", s.broker) } func amqpChannel(dst string) (*amqp.Connection, *amqp.Channel, error) { conn, err := amqp.Dial(dst) if err != nil { return nil, nil, fmt.Errorf("AMQP dial: %w", err) } ch, err := conn.Channel() if err != nil { return nil, nil, fmt.Errorf("AMQP channel: %w", err) } err = ch.ExchangeDeclare( "discovery", // name "fanout", // type false, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) if err != nil { return nil, nil, fmt.Errorf("AMQP declare exchange: %w", err) } return conn, ch, nil } func amqpConsume(ch *amqp.Channel) (<-chan amqp.Delivery, error) { q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when unused true, // exclusive false, // no-wait nil, // arguments ) if err != nil { return nil, fmt.Errorf("AMQP declare queue: %w", err) } err = ch.QueueBind( q.Name, // queue name "", // routing key "discovery", // exchange false, nil, ) if err != nil { return nil, fmt.Errorf("AMQP bind queue: %w", err) } msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { return nil, fmt.Errorf("AMQP consume: %w", err) } return msgs, nil }