Dalam artikel terakhir saya, saya berbicara tentang manfaat menerapkan pemrosesan asinkron menggunakan Service Broker di SQL Server dibandingkan metode lain yang ada untuk pemrosesan tugas-tugas panjang yang dipisahkan. Dalam artikel ini, kita akan membahas semua komponen yang perlu dikonfigurasi untuk konfigurasi dasar Service Broker dalam satu database, dan pertimbangan penting untuk manajemen percakapan antara layanan broker. Untuk memulai, kita perlu membuat database dan mengaktifkan database untuk penggunaan Service Broker:
CREATE DATABASE AsyncProcessingDemo; GO IF (SELECT is_broker_enabled FROM sys.databases WHERE name = N'AsyncProcessingDemo') = 0 BEGIN ALTER DATABASE AsyncProcessingDemo SET ENABLE_BROKER; END GO USE AsyncProcessingDemo; GO
Mengonfigurasi komponen broker
Objek dasar yang perlu dibuat dalam database adalah jenis pesan untuk pesan, kontrak yang mendefinisikan bagaimana pesan akan dikirim antara layanan, antrian dan layanan inisiator, dan antrian dan layanan target. Banyak contoh online untuk broker layanan menunjukkan penamaan objek yang kompleks untuk jenis pesan, kontrak, dan layanan untuk Pialang Layanan. Namun, tidak ada persyaratan untuk nama yang rumit, dan nama objek sederhana dapat digunakan untuk objek apa pun.
Untuk pesan, kita perlu membuat jenis pesan untuk permintaan, yang akan disebut AsyncRequest
, dan jenis pesan untuk hasilnya, yang akan disebut AsyncResult
. Keduanya akan menggunakan XML yang akan divalidasi sebagaimana dibentuk dengan benar oleh layanan broker untuk mengirim dan menerima data yang diperlukan oleh layanan.
-- Create the message types CREATE MESSAGE TYPE [AsyncRequest] VALIDATION = WELL_FORMED_XML; CREATE MESSAGE TYPE [AsyncResult] VALIDATION = WELL_FORMED_XML;
Kontrak menetapkan bahwa AsyncRequest
akan dikirim oleh layanan yang memulai ke layanan target dan bahwa layanan target akan mengembalikan AsyncResult
pesan kembali ke layanan awal. Kontrak juga dapat menentukan beberapa jenis pesan untuk inisiator dan target, atau bahwa jenis pesan tertentu dapat dikirim oleh layanan apa pun, jika pemrosesan tertentu memerlukannya.
-- Create the contract CREATE CONTRACT [AsyncContract] ( [AsyncRequest] SENT BY INITIATOR, [AsyncResult] SENT BY TARGET );
Untuk setiap layanan, antrian harus dibuat untuk menyediakan penyimpanan pesan yang diterima oleh layanan. Layanan target tempat permintaan akan dikirim harus dibuat dengan menetapkan AsyncContract
untuk memungkinkan pesan dikirim ke layanan. Dalam hal ini layanan tersebut bernama ProcessingService
dan akan dibuat di ProcessingQueue
dalam database. Layanan inisiasi tidak memerlukan kontrak untuk ditentukan, yang membuatnya hanya dapat menerima pesan sebagai tanggapan atas percakapan yang dimulai darinya.
-- Create the processing queue and service - specify the contract to allow sending to the service CREATE QUEUE ProcessingQueue; CREATE SERVICE [ProcessingService] ON QUEUE ProcessingQueue ([AsyncContract]); -- Create the request queue and service CREATE QUEUE RequestQueue; CREATE SERVICE [RequestService] ON QUEUE RequestQueue;
Mengirim Pesan untuk Diproses
Seperti yang saya jelaskan di artikel sebelumnya, saya lebih suka menerapkan prosedur tersimpan pembungkus untuk mengirim pesan baru ke layanan broker, sehingga dapat dimodifikasi sekali untuk mengukur kinerja jika diperlukan. Prosedur ini adalah pembungkus sederhana untuk membuat percakapan baru dan mengirim pesan ke ProcessingService
.
-- Create the wrapper procedure for sending messages CREATE PROCEDURE dbo.SendBrokerMessage @FromService SYSNAME, @ToService SYSNAME, @Contract SYSNAME, @MessageType SYSNAME, @MessageBody XML AS BEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; BEGIN TRANSACTION; BEGIN DIALOG CONVERSATION @conversation_handle FROM SERVICE @FromService TO SERVICE @ToService ON CONTRACT @Contract WITH ENCRYPTION = OFF; SEND ON CONVERSATION @conversation_handle MESSAGE TYPE @MessageType(@MessageBody); COMMIT TRANSACTION; END GO
Dengan menggunakan prosedur tersimpan pembungkus, sekarang kita dapat mengirim pesan pengujian ke ProcessingService
untuk memvalidasi bahwa kami telah menyiapkan layanan broker dengan benar.
-- Send a request EXECUTE dbo.SendBrokerMessage @FromService = N'RequestService', @ToService = N'ProcessingService', @Contract = N'AsyncContract', @MessageType = N'AsyncRequest', @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>'; -- Check for message on processing queue SELECT CAST(message_body AS XML) FROM ProcessingQueue; GO
Memproses Pesan
Meskipun kami dapat memproses pesan secara manual dari ProcessingQueue
, kita mungkin ingin pesan diproses secara otomatis saat dikirim ke ProcessingService
. Untuk melakukan ini, prosedur tersimpan aktivasi perlu dibuat yang akan kami uji dan kemudian mengikat ke antrian untuk mengotomatisasi pemrosesan pada aktivasi antrian. Untuk memproses pesan, kita perlu RECEIVE
pesan dari antrian dalam transaksi, bersama dengan jenis pesan dan pegangan percakapan untuk pesan tersebut. Jenis pesan memastikan bahwa logika yang sesuai diterapkan pada pesan yang sedang diproses, dan pegangan percakapan memungkinkan respons untuk dikirim kembali ke layanan awal saat pesan telah diproses.
RECEIVE
perintah memungkinkan satu pesan atau beberapa pesan dalam pegangan percakapan atau grup yang sama untuk diproses dalam satu transaksi. Untuk memproses beberapa pesan, variabel tabel harus digunakan, atau untuk melakukan pemrosesan pesan tunggal, variabel lokal dapat digunakan. Prosedur aktivasi di bawah ini mengambil satu pesan dari antrian, memeriksa jenis pesan untuk menentukan apakah itu AsyncRequest
pesan, dan kemudian melakukan proses yang berjalan lama berdasarkan informasi pesan yang diterima. Jika tidak menerima pesan dalam loop, itu akan menunggu hingga 5000ms, atau 5 detik, untuk pesan lain masuk antrian sebelum keluar dari loop dan menghentikan eksekusinya. Setelah memproses pesan, itu membangun AsyncResult
pesan dan mengirimkannya kembali ke pemrakarsa pada pegangan percakapan yang sama dari mana pesan itu diterima. Prosedur ini juga memeriksa jenis pesan untuk menentukan apakah EndDialog
atau Error
pesan telah diterima untuk membersihkan percakapan dengan mengakhirinya.
-- Create processing procedure for processing queue CREATE PROCEDURE dbo.ProcessingQueueActivation AS BEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; DECLARE @message_body XML; DECLARE @message_type_name sysname; WHILE (1=1) BEGIN BEGIN TRANSACTION; WAITFOR ( RECEIVE TOP (1) @conversation_handle = conversation_handle, @message_body = CAST(message_body AS XML), @message_type_name = message_type_name FROM ProcessingQueue ), TIMEOUT 5000; IF (@@ROWCOUNT = 0) BEGIN ROLLBACK TRANSACTION; BREAK; END IF @message_type_name = N'AsyncRequest' BEGIN -- Handle complex long processing here -- For demonstration we'll pull the account number and send a reply back only DECLARE @AccountNumber INT = @message_body.value('(AsyncRequest/AccountNumber)[1]', 'INT'); -- Build reply message and send back DECLARE @reply_message_body XML = N' ' + CAST(@AccountNumber AS NVARCHAR(11)) + ' '; SEND ON CONVERSATION @conversation_handle MESSAGE TYPE [AsyncResult] (@reply_message_body); END -- If end dialog message, end the dialog ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @conversation_handle; END -- If error message, log and end conversation ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN -- Log the error code and perform any required handling here -- End the conversation for the error END CONVERSATION @conversation_handle; END COMMIT TRANSACTION; END END GO
RequestQueue
juga perlu memproses pesan yang dikirim ke sana, jadi prosedur tambahan untuk memproses AsyncResult
pesan yang dikembalikan oleh prosedur ProcessingQueueActivation perlu dibuat. Karena kita tahu bahwa pesan AsnycResult berarti bahwa semua pekerjaan pemrosesan telah selesai, percakapan dapat diakhiri setelah kita memproses pesan itu, yang akan mengirim pesan EndDialog ke ProcessingService, yang kemudian akan diproses oleh prosedur aktivasinya untuk mengakhiri percakapan membersihkan semuanya dan menghindari api dan melupakan masalah yang terjadi ketika percakapan diakhiri dengan benar.
-- Create procedure for processing replies to the request queue CREATE PROCEDURE dbo.RequestQueueActivation AS BEGIN SET NOCOUNT ON; DECLARE @conversation_handle UNIQUEIDENTIFIER; DECLARE @message_body XML; DECLARE @message_type_name sysname; WHILE (1=1) BEGIN BEGIN TRANSACTION; WAITFOR ( RECEIVE TOP (1) @conversation_handle = conversation_handle, @message_body = CAST(message_body AS XML), @message_type_name = message_type_name FROM RequestQueue ), TIMEOUT 5000; IF (@@ROWCOUNT = 0) BEGIN ROLLBACK TRANSACTION; BREAK; END IF @message_type_name = N'AsyncResult' BEGIN -- If necessary handle the reply message here DECLARE @AccountNumber INT = @message_body.value('(AsyncResult/AccountNumber)[1]', 'INT'); -- Since this is all the work being done, end the conversation to send the EndDialog message END CONVERSATION @conversation_handle; END -- If end dialog message, end the dialog ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @conversation_handle; END -- If error message, log and end conversation ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error' BEGIN END CONVERSATION @conversation_handle; END COMMIT TRANSACTION; END END GO
Menguji Prosedur
Sebelum mengotomatiskan pemrosesan antrean untuk layanan kami, penting untuk menguji prosedur aktivasi untuk memastikan bahwa prosedur tersebut memproses pesan dengan tepat, dan untuk mencegah antrean dinonaktifkan jika terjadi kesalahan yang tidak ditangani dengan benar. Karena sudah ada pesan di ProcessingQueue
ProcessingQueueActivation
prosedur dapat dieksekusi untuk memproses pesan itu. Perlu diingat bahwa WAITFOR
akan menyebabkan prosedur membutuhkan waktu 5 detik untuk dihentikan, meskipun pesan langsung diproses dari antrian. Setelah memproses pesan, kami dapat memverifikasi prosedur bekerja dengan benar dengan menanyakan RequestQueue
untuk melihat apakah AsyncResult
pesan ada, dan kemudian kami dapat memverifikasi bahwa RequestQueueActivation
prosedur berfungsi dengan benar dengan menjalankannya.
-- Process the message from the processing queue EXECUTE dbo.ProcessingQueueActivation; GO -- Check for reply message on request queue SELECT CAST(message_body AS XML) FROM RequestQueue; GO -- Process the message from the request queue EXECUTE dbo.RequestQueueActivation; GO
Mengotomatiskan Pemrosesan
Pada titik ini, semua komponen telah selesai untuk sepenuhnya mengotomatisasi pemrosesan kami. Satu-satunya yang tersisa adalah mengikat prosedur aktivasi ke antrean yang sesuai, lalu mengirim pesan pengujian lain untuk memvalidasinya agar diproses dan tidak ada yang tersisa di antrean setelahnya.
-- Alter the processing queue to specify internal activation ALTER QUEUE ProcessingQueue WITH ACTIVATION ( STATUS = ON, PROCEDURE_NAME = dbo.ProcessingQueueActivation, MAX_QUEUE_READERS = 10, EXECUTE AS SELF ); GO -- Alter the request queue to specify internal activation ALTER QUEUE RequestQueue WITH ACTIVATION ( STATUS = ON, PROCEDURE_NAME = dbo.RequestQueueActivation, MAX_QUEUE_READERS = 10, EXECUTE AS SELF ); GO -- Test automated activation -- Send a request EXECUTE dbo.SendBrokerMessage @FromService = N'RequestService', @ToService = N'ProcessingService', @Contract = N'AsyncContract', @MessageType = N'AsyncRequest', @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>'; -- Check for message on processing queue -- nothing is there because it was automatically processed SELECT CAST(message_body AS XML) FROM ProcessingQueue; GO -- Check for reply message on request queue -- nothing is there because it was automatically processed SELECT CAST(message_body AS XML) FROM RequestQueue; GO
Ringkasan
Komponen dasar untuk pemrosesan asinkron otomatis di SQL Server Service Broker dapat dikonfigurasi dalam satu pengaturan database untuk memungkinkan pemrosesan terpisah dari tugas yang berjalan lama. Ini bisa menjadi alat yang ampuh untuk meningkatkan kinerja aplikasi, dari pengalaman pengguna akhir, dengan memisahkan pemrosesan dari interaksi pengguna akhir dengan aplikasi.