RabbitMQ ve .NET Core ile Tüketim Toplumu
RabbitMQ ve .NET Core ile Tüketim Toplumu

Herkese selamlar. Uzunca bir süredir yazı yayınlamıyordum. Bugün farklı bir konu hakkında konuşacağız. Ancak öyle çok fazla da detaya girmek istemiyorum açıkcası.
Bu yazıda şu konulara değineceğim
- RabbitMQ Nedir?
- AMQP Nedir?
- RabbitMQ ile Bir Mesaj Oluşturma İşlemini Nasıl Yaparız
- RabbitMQ ile Oluşturulan Mesajları Nasıl Consume Ederiz?
RabbitMQ Nedir?
RabbitMQ, Erlang dili ile yazılmış bir message queue sistemidir. AMQP yani Advanced Message Queuing Protocol kullanır. İlk hali küfür gibi ama olsun :)
Yapısal olarak ise gerçek dünya örneği olarak şöyle düşünebiliriz “Şirketler reklam verir, kanallar bu reklamı alır, izleyiciye iletir ve izleyici bu reklamı izlemiş olur”. Çok anlaşılır gibi değil. O zaman gelin şöyle bakalım.
RabbitMQ’da producer’lar vardır. Bunlar göndericilerdir. Yine RabbitMQ’da consumer’lar vardır. Bunlar ise tüketicilerdir.
Bir noktadan alınan X bir data, bir başka noktaya teslim edilmek üzere çalışan bir sistemden bahsetmiş olduk aslında. Arada kalan ise RabbitMQ sunucusu olmaktadır.
RabbitMQ ise burada produce edilen mesajları taşır ve bunları depolar. Saklı olarak bekleyen bir mesaj consume edilene dek bu şekilde bekler. Daha sonrasında ise eğer hata yok ise, RabbitMQ’ya “bak ben bu mesajı işledim işim bitti” gibi bir onay verir. Ve bu sayede işlenen mesaj kuyruktan da silinir. Diyelim hata oldu. Bu durumda da mesaj tekrar işlenir. Tabiiki bunların da senaryoları vardır. (Şuraya alayım sizi). Daha fazla detaya girmek istemem sizi bu detaylarda boğmak da istemem.
- Resmi Web Sitesi: https://www.rabbitmq.com/
- GitHub Organizasyonu: https://github.com/rabbitmq
AMQP Nedir?

Evet geldik okurken sakıncalı hale gelebilmesi muhtemelen o noktaya. Açılımı Advanced Message Queuing Protocol desek daha tatlı geliyor.
Birçok büyük finans şirketi tarafından da kullanılan AMQP aslında bir hayli gelişmiş bir mesajlaşma protokolüdür. Bunu platformdan bağımsız bir şekilde uygulamaların kuyruk yapılarını kullanarak yapmaktadır.
AMQP, geleneksel mesajlaşma protokollerinden farklı şekilde çalışır. Aslında gelişmiş özelliklere sahiptir. Bunu yukarıda da söyledik. Bazı mesajlaşam protokolleri “pub-sub” yapısında çalışırken, AMQP ise “queue” yapısında çalışır bu sayede esnek bir hale bürünür. AMQP modeli hakkında konuşurken, bir mesajın depolandığı kuyruğu söyleriz. Herhangi bir bağımsız platformdan bu kuyruğa bağlanan clientlar bu mesajları alarak onu tüketirler.
AMQP TLS desteği de sunmaktadır. Ayrıca priority, management ve processing gibi kavramlarda da ön plana çıkar.
Resmi Web Sitesi: https://www.amqp.org/
RabbitMQ’nun Kurulması
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.11-management
Yukarıdaki komut ile RabbitMQ’yu local docker sunucumuza kuruyoruz. sonrasında 2 tane port bizi karşılıyor. Birisi AMQP portu, bir diğeri de expose edilen UI portu. 15672 portunu kullanarak web arayüzüne erişebiliriz.
Oluşan adres şöyle olacaktır: http://localhost:15672/
İlk girişte kullanıcı adı ve parola guest oluyor 😛 üff ne sekürity ama.
Harika login page

Harika dashboard

RabbitMQ ile Bir Mesaj Oluşturma İşlemini Nasıl Yaparız
Buradaki örneği .NET core ile gerçekleştireceğim. Öyle karmaşık configlere vs. gerek yok. Her ikisi de console uygulaması olacak.
Öncelikle 2 tane proje oluşturalım. Birinin “producer” diğerinin de “consumer” olacağını biliyoruz. Çünkü RabbitMQ nasıl çalışır anladık. Bu her 2 projeye de RabbitMQ.Client paketini kurmalıyız.

İlk mesaj oluşturma işlemimiz şu şekilde olsun.
using RabbitMQ.Client;
using System.Text;
var connectionFactory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = connectionFactory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
var someMessage = "Hello world!";
var queueForMessage = "some_message_queue";
channel.QueueDeclare(queue: queueForMessage,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var body = Encoding.UTF8.GetBytes(someMessage);
channel.BasicPublish(exchange: "",
routingKey: queueForMessage,
basicProperties: null,
body: body);
Console.WriteLine($"Message sent: {someMessage}");
}
}
Basit olarak yukarıdaki kodu açıklamak gerekir ise öncelikle ConnectionFactory sınıfı ile client propertylerini set ediyoruz. Ben açık bir şekilde bunu set ettim ancak class’ın kaynak koduna bakarsanız localhost demenize gerek yok.

Sonrasında CreateConnection() ile bağlantı sağlıyoruz. Bu aşamadan sonra da bir channel oluşturma işlemi yapıyoruz. Burada CreateModel() bu konuda bize yardımcı oluyor. Bu method ile yeni bi channel, session ve model oluşturabiliyoruz.
Sonra 2 tane değişken görüyoruz. Birisi kuyruğa atılacak olan datayı taşıyan someMessage bir diğeri ise, datanın atılacağı kuyruğun adını taşıyan queueForMessage değişkeni. Kuyruktaki data someMessage değerini içerir. Kuyruk ise queueForMessage içerisinde tanımlanmıştır.
Sonrasında ise QueueDeclare ile bir kuyruk oluştururuz. Buradaki parametreleri de açıklayalım;
queue: Kuyruk adı.
durable: Broker restart edilirse, queue hala hayatta olacak mı olmayacak mı?
exclusive: Bu, declare edilen kuyruğun bağlantısıyla mı sınırlı olmalı? Bu tarz bir kuyrukta, declare eden bağlantı sonlandığında, kuyruk da silinir.
autoDelete: Eğer var ise son consumer, kuyruktan ayrıldığında bu kuyruk otomatik olarak silinmeli mi?
arguments: Bu ise dictionary bir değer alır. Opsiyonel olarak yollamak istediğiniz argumanlar burada tutulur. IDictionary<string, object> şeklindedir.
body kısmını açıklamayacağım ancak BasicPublish() methodu ile artık mesajımızı kuyruğa ekliyoruz.
Bu methodda da bazı parametreler var. Bunlar şöyle;
exchange: Kuyruğa giden mesajlar aslında direkt kuyruğa gitmez. Yönlendirme aracı gibidir. Burada boş olduğu için doğrudan kuyruğa gönderilecek.
routingKey: Exchange’e iletilen mesaj, burada yer alan routingKey yardımı ile kuyruğa iletilir.
basicProperties: Yaygın kullanılan AMQP propertylerini içerir. AppId, ClusterId vs. gibi.
body: Gönderilen mesaj
Hadi bu projeyi çalıştıralım

Şimdi oluşan kuyruğumuza gidelim ve bakalım. Bunu yapmak için web ui kullanılabilir.
Aşağıdaki görselde queue’da bir mesajın consume edilmeye hazır olduğunu görmekteyiz.

Gönderilen mesajımız şurada açıkca görüntülenebilir

RabbitMQ ile Oluşturulan Mesajları Nasıl Consume Ederiz?
Öncelikle kodumuzu aşağıda paylaşayım
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
var connectionFactory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = connectionFactory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
var queueForMessage = "some_message_queue";
channel.QueueDeclare(queue: queueForMessage,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"Message received: {message}");
};
// Mesajların alınacağı kuyruğa abone ol
channel.BasicConsume(queue: queueForMessage,
autoAck: true,
consumer: consumer);
}
}
Burada EventingBasicConsumer ile basit bir consumer event class’ını init ediyoruz. Aldığı değer ise bir IModel interfacenin implementasyonudur yani burada bir channel almakta.
Bu consumer’ın Received event’i fırlatıldığı anda ise, consume edilecek data ne ise burada görebiliyoruz. Bu örnekte tabii string bir değer olduğu için böyle basit bir işlem gerçekleştirdik.
Consume edilen mesajı byte array olarak ele alıyoruz. Sonrasında da bu byte array’den yolladığımız mesajı elde ediyoruz.
En önemli kısım ise BasicConsume() methodu oluyor. Buradaki parametreleri de şöyle açıklayalım;
queue: Kuyruk adı
autoAck: Bu parametre true olarak ayarlanır ise, mesaj başarılı bir şekilde consume edildiğinde otomatik olarak onaylanır.
consumer: Bu da consumer 😄 Yani tüketim toplumu 😛
Bu kodu çalıştırdığımızda şöyle bir çıktı alırız

Ayrıca RabbitMQ UI’ına da giderek queue’muza bakalım

Görüldüğü üzere consume edilmeyi bekleyen datamız artık mevcut değil.
Evet bu yazı bu kadar :) Okuduğunuz için teşekkür ederim.
Yazı Sonu Kedisi
