Windows Azure Service Bus / Queue

Windows Azure Service Bus yapısının en önemli konularından biri olan QUEUE başlığı ile  bu yazımızda queue kavramının genel yapısının cloud ortamında ne gibi işlemlerden geçtiğinden bahsedip ufak bir demo ile devam ediyor olacağız.

Yazıya başlamadan önce kısaca Queue mantığından bahsetmem gerekiyor. Queue yapısında, FİFO yani ilk giren ilk çıkar mantığına göre verilerin sıralanması olarak özetlenebilir. Örnek olarak işletim sistemi üzerinde bir işlem yapmak istediğiniz zaman misal dosyaları silmek olabilir. Bu dosyaları seçtiğiniz sırada siler. Dosya silme işlemi devam ederken başka bir dosya silmeyi tetiklerseniz sıraya eklenir ver ilk silme işlemi bittikten sonra sıradaki işlem gerçekleşir. Bu gibi senaryolarda queue yapısına sıkça başvurulmaktadır.

sb-queues-08

Peki Windows azure service bus yapısının sunduğu queue mantığının yukarıdaki bilgilerden ne farkı olabilir ?

Öncelikle queue yapısına operasyon yapan veri gönderen kişi ile bu verileri okuyan kullanıcılar için çevrim içi şartı olma zorunluluğu bulunmamaktadır. Yani Mesajınızı gönderirsiniz ancak karşı taraf online olmadığı için mesajları göremez diye bir durum söz konusu değildir. Service bus altyapısında tutulan gönderdiğiniz mesajlar ne zaman alıcı tarafından istenirse o an gönderilir ve iletişim sağlanır.

Queue servisinin bir diğer önemli noktası, sıra mantığı işlediğinden dolayı sıra içerisindeki veriyi istemci alır ve bir sonraki veriyi talep eder şeklinde bir döngüyle devam eder. Ancak aynı veri başka bir istemci tarafından talep edilirse veriye ulaşamaz çünkü o veri ilk istemci tarafından alınmıştır ve kuyruktan çıkmıştır. Her kuyruk elemanı tek bir istemci tarafından ulaşılacak şekilde ilerlenir.

Bu kadar bilgiden sonra uygulama olarak demomuzda neler yapabileceğimizi görebiliriz.

Herzaman olduğu şekilde Management Portal Üzerinden service bus hizmetimizi oluşturmamız gerekmektedir.

Capture

Servisi oluşturduktan sonra bize sunulan Connection Information tarafından Connection String alanını oluşturduğumuz Console Application içerisindeki App.config dosyasına aşağıdaki gibi yerleştirmemiz gerekmektedir.

Capture

<appSettings>
key="Microsoft.ServiceBus.ConnectionString" value="Connection String Alanı Buraya Gelecek"
</appSettings>

Bu uygulamada biraz farklı olarak elimizde bir data listesi olan .csv uzantılı dosyada bulunan verileri queue ile sıralayarak karşı tarafa gönderme işlemini gerçekleştiriyor olacağız. bunun için aşağıdaki örnekteki gibi bir .txt dosyası içerisine data listesi oluşturup bu dosyayı .csv olarak farklı kaydediyoruz ve console application içerisine yerleştiriyoruz.

IssueID,IssueTitle,CustomerID,CategoryID,SupportPackage,Priority,Severity,Resolved
1,lost,1,1,Basic,5,1,FALSE
2,damaged,1,1,Basic,5,1,FALSE
3,defective,1,2,Premium,5,2,FALSE
4,damaged,2,2,Premium,5,2,FALSE
5,lost,2,2,Basic,5,2,TRUE
6,lost,3,2,Basic,5,2,FALSE
7,damaged,3,7,Premium,5,3,FALSE
8,defective,3,2,Premium,5,3,FALSE
9,damaged,4,6,Premium,5,3,TRUE
10,lost,4,8,Basic,5,3,FALSE
11,damaged,5,4,Basic,5,4,FALSE
12,defective,5,4,Basic,5,4,FALSE
13,lost,6,8,Basic,5,4,FALSE
14,damaged,6,7,Premium,5,5,FALSE
15,defective,6,2,Premium,5,5,FALSE

Capture

Bu işlem sonrasında referans olarak Nuget packages olarak Service Bus paketini yükleyerek gerekli .dll listesini referanslarımıza ekliyoruz. Referanslar tamamlandıktan sonra oluşturduğumuz data.csv dosyasına yönelik bir parse operasyonu gerçekleştirerek verilerin istediğimiz düzende oluşturulmasını sağlamamız gerekmektedir. Bunun için öncelikle DataTable sınıfından türeyen bir ParseCSVFile() metod oluşturarak parse işlemini gerçekleştiriyoruz.

 static DataTable ParseCSVFile()
        {
            DataTable tableIssues = new DataTable("Issues");
            string path = @"C:\Users\onur\Desktop\data.csv";
            try
            {
               using (StreamReader readFile = new StreamReader(path))
                {
                    string line;
                    string[] row;

                    // create the columns
                    line = readFile.ReadLine();
                    foreach (string columnTitle in line.Split(','))
                    {
                        tableIssues.Columns.Add(columnTitle);
                    }

                    while ((line = readFile.ReadLine()) != null)
                    {
                        row = line.Split(',');
                        tableIssues.Rows.Add(row);
                    }
                }
            }
            catch (Exception e)
            {
                Console.WriteLine("Error : " + e.ToString());
            }
            return tableIssues;
        }

Parse işlemini tamamlayıp alacağımız mesajımızın şeklini şemalini tamamladıktan sonra şimdide mesajımızı oluşturmak için GenerateMessage metodu oluşturarak service bus queue hizmetimizin barındıracağı mesaj içeriğinin katmanını yazabiliriz.

static List<BrokeredMessage> GenerateMessages(DataTable issues)
        {
            // Instantiate the brokered list object
            var result = new List<BrokeredMessage>();

            // Iterate through the table and create a brokered message for each row
            foreach (DataRow item in issues.Rows)
            {
                var message = new BrokeredMessage();
                foreach (DataColumn property in issues.Columns)
                {
                    message.Properties.Add(property.ColumnName, item[property]);
                }
                result.Add(message);
            }
            return result;
        }

Mesaj oluşturmak için gerekli yapıyı sağladıktan sonra queue katmanını oluşturarak azure hesabımız ile iletişimini saplamamız ve data.csv dosyası içerisinde parese edilmiş dataların kuyruk halinde brokeredmessage olarak Service Bus tarafına aktraımını sağlmamaız gerekmekte bunun için öncelikle Credential oluşturarak başlıyoruz. Hesabımız tarafından bize sunulan credential key sağlanıyor ve bu değerler bizim mesaj listemizin azure tarafına aktarımı için kullanılıcaktır.

Capture

static void Queue()
        {

// Create management credentials

            TokenProvider credentials = TokenProvider.CreateSharedSecretTokenProvider("owner", "CREDENTIAL KEY");

// Create namespace client

            NamespaceManager namespaceClient = new NamespaceManager(ServiceBusEnvironment.CreateServiceUri("sb", "servicebusFeed1", string.Empty), credentials);

            namespaceClient.CreateQueue("IssueTrackingQueue");

            MessagingFactory factory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri("sb", "servicebusFeed1", string.Empty), credentials);

            QueueClient myQueueClient = factory.CreateQueueClient("IssueTrackingQueue");

            // Send messages
            Console.WriteLine("Now sending messages to the Queue.");

            for (int count = 0; count < MessageList.Count; count++)
            {
                var issue = MessageList[count];

                issue.Label = issue.Properties["IssueTitle"].ToString();
                var CustomerID=issue.Properties["CustomerID"].ToString();
                var SupportPackage = issue.Properties["SupportPackage"].ToString();
                var Priority = issue.Properties["Priority"].ToString();
                var Severity=issue.Properties["Severity"].ToString();
                var Resolved = issue.Properties["Resolved"].ToString();

                myQueueClient.Send(issue);
                Console.WriteLine(string.Format("Message sent: {0}, {1},{2},{3},{4},{5},{6}","Issue Title : "+ issue.Label, "Customer ID : "+ CustomerID, SupportPackage, Priority, Severity, Resolved, issue.MessageId));
            }

            Console.WriteLine("Now receiving messages from Queue.");

            BrokeredMessage message;
            while ((message = myQueueClient.Receive(new TimeSpan(hours: 0, minutes: 0, seconds: 5))) != null)
            {
                Console.WriteLine(string.Format("Message received: {0}, {1}, {2}, {3}", message.SequenceNumber, message.Label, message.MessageId,message.ContentType));
                message.Complete();

                Console.WriteLine("Processing message (sleeping...)");
                Thread.Sleep(1000);
            }

            factory.Close();
            myQueueClient.Close();
            namespaceClient.DeleteQueue("IssueTrackingQueue");
        }

Son olarak Main bloğu üzerinde oluşturduğumuz metodları çağırarak programı çalıştırabiliriz…

private static DataTable issues;
        private static List<BrokeredMessage> MessageList;

        static void Main(string[] args)
        {
            issues = ParseCSVFile();
            MessageList = GenerateMessages(issues);
            Queue();
        }

Capture


Windows Azure Service Bus / Queue” için bir yanıt

Bir Cevap Yazın

Aşağıya bilgilerinizi girin veya oturum açmak için bir simgeye tıklayın:

WordPress.com Logosu

WordPress.com hesabınızı kullanarak yorum yapıyorsunuz. Çıkış  Yap /  Değiştir )

Facebook fotoğrafı

Facebook hesabınızı kullanarak yorum yapıyorsunuz. Çıkış  Yap /  Değiştir )

Connecting to %s