Dalam artikel terakhir saya, saya berbicara tentang manfaat menerapkan pemrosesan asinkron menggunakan Service Broker di SQL Server dibandingkan metode lain yang ada untuk pemrosesan tugas-tugas panjang yang dipisahkan. Dalam artikel ini, kita akan membahas semua komponen yang perlu dikonfigurasi untuk konfigurasi dasar Service Broker dalam satu database, dan pertimbangan penting untuk manajemen percakapan antara layanan broker. Untuk memulai, kita perlu membuat database dan mengaktifkan database untuk penggunaan Service Broker:
CREATE DATABASE AsyncProcessingDemo; GO IF (SELECT is_broker_enabled FROM sys.databases WHERE name = N'AsyncProcessingDemo') = 0 BEGIN ALTER DATABASE AsyncProcessingDemo SET ENABLE_BROKER; END GO USE AsyncProcessingDemo; GO
Mengonfigurasi komponen broker
Objek dasar yang perlu dibuat dalam database adalah jenis pesan untuk pesan, kontrak yang mendefinisikan bagaimana pesan akan dikirim antara layanan, antrian dan layanan inisiator, dan antrian dan layanan target. Banyak contoh online untuk broker layanan menunjukkan penamaan objek yang kompleks untuk jenis pesan, kontrak, dan layanan untuk Pialang Layanan. Namun, tidak ada persyaratan untuk nama yang rumit, dan nama objek sederhana dapat digunakan untuk objek apa pun.
Untuk pesan, kita perlu membuat jenis pesan untuk permintaan, yang akan disebut AsyncRequest , dan jenis pesan untuk hasilnya, yang akan disebut AsyncResult . Keduanya akan menggunakan XML yang akan divalidasi sebagaimana dibentuk dengan benar oleh layanan broker untuk mengirim dan menerima data yang diperlukan oleh layanan.
-- Create the message types CREATE MESSAGE TYPE [AsyncRequest] VALIDATION = WELL_FORMED_XML; CREATE MESSAGE TYPE [AsyncResult] VALIDATION = WELL_FORMED_XML;
Kontrak menetapkan bahwa AsyncRequest akan dikirim oleh layanan yang memulai ke layanan target dan bahwa layanan target akan mengembalikan AsyncResult pesan kembali ke layanan awal. Kontrak juga dapat menentukan beberapa jenis pesan untuk inisiator dan target, atau bahwa jenis pesan tertentu dapat dikirim oleh layanan apa pun, jika pemrosesan tertentu memerlukannya.
-- Create the contract CREATE CONTRACT [AsyncContract] ( [AsyncRequest] SENT BY INITIATOR, [AsyncResult] SENT BY TARGET );
Untuk setiap layanan, antrian harus dibuat untuk menyediakan penyimpanan pesan yang diterima oleh layanan. Layanan target tempat permintaan akan dikirim harus dibuat dengan menetapkan AsyncContract untuk memungkinkan pesan dikirim ke layanan. Dalam hal ini layanan tersebut bernama ProcessingService dan akan dibuat di ProcessingQueue dalam database. Layanan inisiasi tidak memerlukan kontrak untuk ditentukan, yang membuatnya hanya dapat menerima pesan sebagai tanggapan atas percakapan yang dimulai darinya.
-- Create the processing queue and service - specify the contract to allow sending to the service CREATE QUEUE ProcessingQueue; CREATE SERVICE [ProcessingService] ON QUEUE ProcessingQueue ([AsyncContract]); -- Create the request queue and service CREATE QUEUE RequestQueue; CREATE SERVICE [RequestService] ON QUEUE RequestQueue;
Mengirim Pesan untuk Diproses
Seperti yang saya jelaskan di artikel sebelumnya, saya lebih suka menerapkan prosedur tersimpan pembungkus untuk mengirim pesan baru ke layanan broker, sehingga dapat dimodifikasi sekali untuk mengukur kinerja jika diperlukan. Prosedur ini adalah pembungkus sederhana untuk membuat percakapan baru dan mengirim pesan ke ProcessingService .
-- Create the wrapper procedure for sending messages
CREATE PROCEDURE dbo.SendBrokerMessage
@FromService SYSNAME,
@ToService SYSNAME,
@Contract SYSNAME,
@MessageType SYSNAME,
@MessageBody XML
AS
BEGIN
SET NOCOUNT ON;
DECLARE @conversation_handle UNIQUEIDENTIFIER;
BEGIN TRANSACTION;
BEGIN DIALOG CONVERSATION @conversation_handle
FROM SERVICE @FromService
TO SERVICE @ToService
ON CONTRACT @Contract
WITH ENCRYPTION = OFF;
SEND ON CONVERSATION @conversation_handle
MESSAGE TYPE @MessageType(@MessageBody);
COMMIT TRANSACTION;
END
GO
Dengan menggunakan prosedur tersimpan pembungkus, sekarang kita dapat mengirim pesan pengujian ke ProcessingService untuk memvalidasi bahwa kami telah menyiapkan layanan broker dengan benar.
-- Send a request EXECUTE dbo.SendBrokerMessage @FromService = N'RequestService', @ToService = N'ProcessingService', @Contract = N'AsyncContract', @MessageType = N'AsyncRequest', @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>'; -- Check for message on processing queue SELECT CAST(message_body AS XML) FROM ProcessingQueue; GO
Memproses Pesan
Meskipun kami dapat memproses pesan secara manual dari ProcessingQueue , kita mungkin ingin pesan diproses secara otomatis saat dikirim ke ProcessingService . Untuk melakukan ini, prosedur tersimpan aktivasi perlu dibuat yang akan kami uji dan kemudian mengikat ke antrian untuk mengotomatisasi pemrosesan pada aktivasi antrian. Untuk memproses pesan, kita perlu RECEIVE pesan dari antrian dalam transaksi, bersama dengan jenis pesan dan pegangan percakapan untuk pesan tersebut. Jenis pesan memastikan bahwa logika yang sesuai diterapkan pada pesan yang sedang diproses, dan pegangan percakapan memungkinkan respons untuk dikirim kembali ke layanan awal saat pesan telah diproses.
RECEIVE perintah memungkinkan satu pesan atau beberapa pesan dalam pegangan percakapan atau grup yang sama untuk diproses dalam satu transaksi. Untuk memproses beberapa pesan, variabel tabel harus digunakan, atau untuk melakukan pemrosesan pesan tunggal, variabel lokal dapat digunakan. Prosedur aktivasi di bawah ini mengambil satu pesan dari antrian, memeriksa jenis pesan untuk menentukan apakah itu AsyncRequest pesan, dan kemudian melakukan proses yang berjalan lama berdasarkan informasi pesan yang diterima. Jika tidak menerima pesan dalam loop, itu akan menunggu hingga 5000ms, atau 5 detik, untuk pesan lain masuk antrian sebelum keluar dari loop dan menghentikan eksekusinya. Setelah memproses pesan, itu membangun AsyncResult pesan dan mengirimkannya kembali ke pemrakarsa pada pegangan percakapan yang sama dari mana pesan itu diterima. Prosedur ini juga memeriksa jenis pesan untuk menentukan apakah EndDialog atau Error pesan telah diterima untuk membersihkan percakapan dengan mengakhirinya.
-- Create processing procedure for processing queue
CREATE PROCEDURE dbo.ProcessingQueueActivation
AS
BEGIN
SET NOCOUNT ON;
DECLARE @conversation_handle UNIQUEIDENTIFIER;
DECLARE @message_body XML;
DECLARE @message_type_name sysname;
WHILE (1=1)
BEGIN
BEGIN TRANSACTION;
WAITFOR
(
RECEIVE TOP (1)
@conversation_handle = conversation_handle,
@message_body = CAST(message_body AS XML),
@message_type_name = message_type_name
FROM ProcessingQueue
), TIMEOUT 5000;
IF (@@ROWCOUNT = 0)
BEGIN
ROLLBACK TRANSACTION;
BREAK;
END
IF @message_type_name = N'AsyncRequest'
BEGIN
-- Handle complex long processing here
-- For demonstration we'll pull the account number and send a reply back only
DECLARE @AccountNumber INT = @message_body.value('(AsyncRequest/AccountNumber)[1]', 'INT');
-- Build reply message and send back
DECLARE @reply_message_body XML = N'
' + CAST(@AccountNumber AS NVARCHAR(11)) + '
';
SEND ON CONVERSATION @conversation_handle
MESSAGE TYPE [AsyncResult] (@reply_message_body);
END
-- If end dialog message, end the dialog
ELSE IF @message_type_name = N'https://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
BEGIN
END CONVERSATION @conversation_handle;
END
-- If error message, log and end conversation
ELSE IF @message_type_name = N'https://schemas.microsoft.com/SQL/ServiceBroker/Error'
BEGIN
-- Log the error code and perform any required handling here
-- End the conversation for the error
END CONVERSATION @conversation_handle;
END
COMMIT TRANSACTION;
END
END
GO
RequestQueue juga perlu memproses pesan yang dikirim ke sana, jadi prosedur tambahan untuk memproses AsyncResult pesan yang dikembalikan oleh prosedur ProcessingQueueActivation perlu dibuat. Karena kita tahu bahwa pesan AsnycResult berarti bahwa semua pekerjaan pemrosesan telah selesai, percakapan dapat diakhiri setelah kita memproses pesan itu, yang akan mengirim pesan EndDialog ke ProcessingService, yang kemudian akan diproses oleh prosedur aktivasinya untuk mengakhiri percakapan membersihkan semuanya dan menghindari api dan melupakan masalah yang terjadi ketika percakapan diakhiri dengan benar.
-- Create procedure for processing replies to the request queue
CREATE PROCEDURE dbo.RequestQueueActivation
AS
BEGIN
SET NOCOUNT ON;
DECLARE @conversation_handle UNIQUEIDENTIFIER;
DECLARE @message_body XML;
DECLARE @message_type_name sysname;
WHILE (1=1)
BEGIN
BEGIN TRANSACTION;
WAITFOR
(
RECEIVE TOP (1)
@conversation_handle = conversation_handle,
@message_body = CAST(message_body AS XML),
@message_type_name = message_type_name
FROM RequestQueue
), TIMEOUT 5000;
IF (@@ROWCOUNT = 0)
BEGIN
ROLLBACK TRANSACTION;
BREAK;
END
IF @message_type_name = N'AsyncResult'
BEGIN
-- If necessary handle the reply message here
DECLARE @AccountNumber INT = @message_body.value('(AsyncResult/AccountNumber)[1]', 'INT');
-- Since this is all the work being done, end the conversation to send the EndDialog message
END CONVERSATION @conversation_handle;
END
-- If end dialog message, end the dialog
ELSE IF @message_type_name = N'https://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
BEGIN
END CONVERSATION @conversation_handle;
END
-- If error message, log and end conversation
ELSE IF @message_type_name = N'https://schemas.microsoft.com/SQL/ServiceBroker/Error'
BEGIN
END CONVERSATION @conversation_handle;
END
COMMIT TRANSACTION;
END
END
GO Menguji Prosedur
Sebelum mengotomatiskan pemrosesan antrean untuk layanan kami, penting untuk menguji prosedur aktivasi untuk memastikan bahwa prosedur tersebut memproses pesan dengan tepat, dan untuk mencegah antrean dinonaktifkan jika terjadi kesalahan yang tidak ditangani dengan benar. Karena sudah ada pesan di ProcessingQueue ProcessingQueueActivation prosedur dapat dieksekusi untuk memproses pesan itu. Perlu diingat bahwa WAITFOR akan menyebabkan prosedur membutuhkan waktu 5 detik untuk dihentikan, meskipun pesan langsung diproses dari antrian. Setelah memproses pesan, kami dapat memverifikasi prosedur bekerja dengan benar dengan menanyakan RequestQueue untuk melihat apakah AsyncResult pesan ada, dan kemudian kami dapat memverifikasi bahwa RequestQueueActivation prosedur berfungsi dengan benar dengan menjalankannya.
-- Process the message from the processing queue EXECUTE dbo.ProcessingQueueActivation; GO -- Check for reply message on request queue SELECT CAST(message_body AS XML) FROM RequestQueue; GO -- Process the message from the request queue EXECUTE dbo.RequestQueueActivation; GO
Mengotomatiskan Pemrosesan
Pada titik ini, semua komponen telah selesai untuk sepenuhnya mengotomatisasi pemrosesan kami. Satu-satunya yang tersisa adalah mengikat prosedur aktivasi ke antrean yang sesuai, lalu mengirim pesan pengujian lain untuk memvalidasinya agar diproses dan tidak ada yang tersisa di antrean setelahnya.
-- Alter the processing queue to specify internal activation
ALTER QUEUE ProcessingQueue
WITH ACTIVATION
(
STATUS = ON,
PROCEDURE_NAME = dbo.ProcessingQueueActivation,
MAX_QUEUE_READERS = 10,
EXECUTE AS SELF
);
GO
-- Alter the request queue to specify internal activation
ALTER QUEUE RequestQueue
WITH ACTIVATION
(
STATUS = ON,
PROCEDURE_NAME = dbo.RequestQueueActivation,
MAX_QUEUE_READERS = 10,
EXECUTE AS SELF
);
GO
-- Test automated activation
-- Send a request
EXECUTE dbo.SendBrokerMessage
@FromService = N'RequestService',
@ToService = N'ProcessingService',
@Contract = N'AsyncContract',
@MessageType = N'AsyncRequest',
@MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>';
-- Check for message on processing queue
-- nothing is there because it was automatically processed
SELECT CAST(message_body AS XML) FROM ProcessingQueue;
GO
-- Check for reply message on request queue
-- nothing is there because it was automatically processed
SELECT CAST(message_body AS XML) FROM RequestQueue;
GO Ringkasan
Komponen dasar untuk pemrosesan asinkron otomatis di SQL Server Service Broker dapat dikonfigurasi dalam satu pengaturan database untuk memungkinkan pemrosesan terpisah dari tugas yang berjalan lama. Ini bisa menjadi alat yang ampuh untuk meningkatkan kinerja aplikasi, dari pengalaman pengguna akhir, dengan memisahkan pemrosesan dari interaksi pengguna akhir dengan aplikasi.