Database
 sql >> Teknologi Basis Data >  >> RDS >> Database

Mengonfigurasi Pialang Layanan untuk Pemrosesan Asinkron

Dalam artikel terakhir saya, saya berbicara tentang manfaat menerapkan pemrosesan asinkron menggunakan Service Broker di SQL Server dibandingkan metode lain yang ada untuk pemrosesan tugas-tugas panjang yang dipisahkan. Dalam artikel ini, kita akan membahas semua komponen yang perlu dikonfigurasi untuk konfigurasi dasar Service Broker dalam satu database, dan pertimbangan penting untuk manajemen percakapan antara layanan broker. Untuk memulai, kita perlu membuat database dan mengaktifkan database untuk penggunaan Service Broker:

CREATE DATABASE AsyncProcessingDemo;
GO
 
IF (SELECT is_broker_enabled FROM sys.databases WHERE name = N'AsyncProcessingDemo') = 0
BEGIN
  ALTER DATABASE AsyncProcessingDemo SET ENABLE_BROKER;
END
GO
 
USE AsyncProcessingDemo;
GO

Mengonfigurasi komponen broker

Objek dasar yang perlu dibuat dalam database adalah jenis pesan untuk pesan, kontrak yang mendefinisikan bagaimana pesan akan dikirim antara layanan, antrian dan layanan inisiator, dan antrian dan layanan target. Banyak contoh online untuk broker layanan menunjukkan penamaan objek yang kompleks untuk jenis pesan, kontrak, dan layanan untuk Pialang Layanan. Namun, tidak ada persyaratan untuk nama yang rumit, dan nama objek sederhana dapat digunakan untuk objek apa pun.

Untuk pesan, kita perlu membuat jenis pesan untuk permintaan, yang akan disebut AsyncRequest , dan jenis pesan untuk hasilnya, yang akan disebut AsyncResult . Keduanya akan menggunakan XML yang akan divalidasi sebagaimana dibentuk dengan benar oleh layanan broker untuk mengirim dan menerima data yang diperlukan oleh layanan.

-- Create the message types
CREATE MESSAGE TYPE [AsyncRequest] VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE [AsyncResult]  VALIDATION = WELL_FORMED_XML;

Kontrak menetapkan bahwa AsyncRequest akan dikirim oleh layanan yang memulai ke layanan target dan bahwa layanan target akan mengembalikan AsyncResult pesan kembali ke layanan awal. Kontrak juga dapat menentukan beberapa jenis pesan untuk inisiator dan target, atau bahwa jenis pesan tertentu dapat dikirim oleh layanan apa pun, jika pemrosesan tertentu memerlukannya.

-- Create the contract
CREATE CONTRACT [AsyncContract] 
(
  [AsyncRequest] SENT BY INITIATOR, 
  [AsyncResult]  SENT BY TARGET
);

Untuk setiap layanan, antrian harus dibuat untuk menyediakan penyimpanan pesan yang diterima oleh layanan. Layanan target tempat permintaan akan dikirim harus dibuat dengan menetapkan AsyncContract untuk memungkinkan pesan dikirim ke layanan. Dalam hal ini layanan tersebut bernama ProcessingService dan akan dibuat di ProcessingQueue dalam database. Layanan inisiasi tidak memerlukan kontrak untuk ditentukan, yang membuatnya hanya dapat menerima pesan sebagai tanggapan atas percakapan yang dimulai darinya.

-- Create the processing queue and service - specify the contract to allow sending to the service
CREATE QUEUE ProcessingQueue;
CREATE SERVICE [ProcessingService] ON QUEUE ProcessingQueue ([AsyncContract]);
 
-- Create the request queue and service 
CREATE QUEUE RequestQueue;
CREATE SERVICE [RequestService] ON QUEUE RequestQueue;

Mengirim Pesan untuk Diproses

Seperti yang saya jelaskan di artikel sebelumnya, saya lebih suka menerapkan prosedur tersimpan pembungkus untuk mengirim pesan baru ke layanan broker, sehingga dapat dimodifikasi sekali untuk mengukur kinerja jika diperlukan. Prosedur ini adalah pembungkus sederhana untuk membuat percakapan baru dan mengirim pesan ke ProcessingService .

-- Create the wrapper procedure for sending messages
CREATE PROCEDURE dbo.SendBrokerMessage 
	@FromService SYSNAME,
	@ToService   SYSNAME,
	@Contract    SYSNAME,
	@MessageType SYSNAME,
	@MessageBody XML
AS
BEGIN
  SET NOCOUNT ON;
 
  DECLARE @conversation_handle UNIQUEIDENTIFIER;
 
  BEGIN TRANSACTION;
 
  BEGIN DIALOG CONVERSATION @conversation_handle
    FROM SERVICE @FromService
    TO SERVICE @ToService
    ON CONTRACT @Contract
    WITH ENCRYPTION = OFF;
 
  SEND ON CONVERSATION @conversation_handle
    MESSAGE TYPE @MessageType(@MessageBody);
 
  COMMIT TRANSACTION;
END
GO

Dengan menggunakan prosedur tersimpan pembungkus, sekarang kita dapat mengirim pesan pengujian ke ProcessingService untuk memvalidasi bahwa kami telah menyiapkan layanan broker dengan benar.

-- Send a request
EXECUTE dbo.SendBrokerMessage
  @FromService = N'RequestService',
  @ToService   = N'ProcessingService',
  @Contract    = N'AsyncContract',
  @MessageType = N'AsyncRequest',
  @MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>';
 
-- Check for message on processing queue
SELECT CAST(message_body AS XML) FROM ProcessingQueue;
GO

Memproses Pesan

Meskipun kami dapat memproses pesan secara manual dari ProcessingQueue , kita mungkin ingin pesan diproses secara otomatis saat dikirim ke ProcessingService . Untuk melakukan ini, prosedur tersimpan aktivasi perlu dibuat yang akan kami uji dan kemudian mengikat ke antrian untuk mengotomatisasi pemrosesan pada aktivasi antrian. Untuk memproses pesan, kita perlu RECEIVE pesan dari antrian dalam transaksi, bersama dengan jenis pesan dan pegangan percakapan untuk pesan tersebut. Jenis pesan memastikan bahwa logika yang sesuai diterapkan pada pesan yang sedang diproses, dan pegangan percakapan memungkinkan respons untuk dikirim kembali ke layanan awal saat pesan telah diproses.

RECEIVE perintah memungkinkan satu pesan atau beberapa pesan dalam pegangan percakapan atau grup yang sama untuk diproses dalam satu transaksi. Untuk memproses beberapa pesan, variabel tabel harus digunakan, atau untuk melakukan pemrosesan pesan tunggal, variabel lokal dapat digunakan. Prosedur aktivasi di bawah ini mengambil satu pesan dari antrian, memeriksa jenis pesan untuk menentukan apakah itu AsyncRequest pesan, dan kemudian melakukan proses yang berjalan lama berdasarkan informasi pesan yang diterima. Jika tidak menerima pesan dalam loop, itu akan menunggu hingga 5000ms, atau 5 detik, untuk pesan lain masuk antrian sebelum keluar dari loop dan menghentikan eksekusinya. Setelah memproses pesan, itu membangun AsyncResult pesan dan mengirimkannya kembali ke pemrakarsa pada pegangan percakapan yang sama dari mana pesan itu diterima. Prosedur ini juga memeriksa jenis pesan untuk menentukan apakah EndDialog atau Error pesan telah diterima untuk membersihkan percakapan dengan mengakhirinya.

-- Create processing procedure for processing queue
CREATE PROCEDURE dbo.ProcessingQueueActivation
AS
BEGIN
  SET NOCOUNT ON;
 
  DECLARE @conversation_handle UNIQUEIDENTIFIER;
  DECLARE @message_body XML;
  DECLARE @message_type_name sysname;
 
  WHILE (1=1)
  BEGIN
    BEGIN TRANSACTION;
 
    WAITFOR
    (
      RECEIVE TOP (1)
        @conversation_handle = conversation_handle,
        @message_body = CAST(message_body AS XML),
        @message_type_name = message_type_name
      FROM ProcessingQueue
    ), TIMEOUT 5000;
 
    IF (@@ROWCOUNT = 0)
    BEGIN
      ROLLBACK TRANSACTION;
      BREAK;
    END
 
    IF @message_type_name = N'AsyncRequest'
    BEGIN
      -- Handle complex long processing here
      -- For demonstration we'll pull the account number and send a reply back only
 
      DECLARE @AccountNumber INT = @message_body.value('(AsyncRequest/AccountNumber)[1]', 'INT');
 
      -- Build reply message and send back
      DECLARE @reply_message_body XML = N'
        ' + CAST(@AccountNumber AS NVARCHAR(11)) + '
      ';
 
      SEND ON CONVERSATION @conversation_handle
        MESSAGE TYPE [AsyncResult] (@reply_message_body);
    END
 
    -- If end dialog message, end the dialog
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    BEGIN
      END CONVERSATION @conversation_handle;
    END
 
    -- If error message, log and end conversation
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
    BEGIN
      -- Log the error code and perform any required handling here
      -- End the conversation for the error
      END CONVERSATION @conversation_handle;
    END
 
    COMMIT TRANSACTION;
  END
END
GO

RequestQueue juga perlu memproses pesan yang dikirim ke sana, jadi prosedur tambahan untuk memproses AsyncResult pesan yang dikembalikan oleh prosedur ProcessingQueueActivation perlu dibuat. Karena kita tahu bahwa pesan AsnycResult berarti bahwa semua pekerjaan pemrosesan telah selesai, percakapan dapat diakhiri setelah kita memproses pesan itu, yang akan mengirim pesan EndDialog ke ProcessingService, yang kemudian akan diproses oleh prosedur aktivasinya untuk mengakhiri percakapan membersihkan semuanya dan menghindari api dan melupakan masalah yang terjadi ketika percakapan diakhiri dengan benar.

-- Create procedure for processing replies to the request queue
CREATE PROCEDURE dbo.RequestQueueActivation
AS
BEGIN
  SET NOCOUNT ON;
 
  DECLARE @conversation_handle UNIQUEIDENTIFIER;
  DECLARE @message_body XML;
  DECLARE @message_type_name sysname;
 
  WHILE (1=1)
  BEGIN
    BEGIN TRANSACTION;
 
    WAITFOR
    (
      RECEIVE TOP (1)
        @conversation_handle = conversation_handle,
        @message_body = CAST(message_body AS XML),
        @message_type_name = message_type_name
      FROM RequestQueue
    ), TIMEOUT 5000;
 
    IF (@@ROWCOUNT = 0)
    BEGIN
      ROLLBACK TRANSACTION;
      BREAK;
    END
 
    IF @message_type_name = N'AsyncResult'
    BEGIN
      -- If necessary handle the reply message here
      DECLARE @AccountNumber INT = @message_body.value('(AsyncResult/AccountNumber)[1]', 'INT');
 
      -- Since this is all the work being done, end the conversation to send the EndDialog message
      END CONVERSATION @conversation_handle;
    END
 
    -- If end dialog message, end the dialog
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    BEGIN
       END CONVERSATION @conversation_handle;
    END
 
    -- If error message, log and end conversation
    ELSE IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
    BEGIN
       END CONVERSATION @conversation_handle;
    END
 
    COMMIT TRANSACTION;
  END
END
GO

Menguji Prosedur

Sebelum mengotomatiskan pemrosesan antrean untuk layanan kami, penting untuk menguji prosedur aktivasi untuk memastikan bahwa prosedur tersebut memproses pesan dengan tepat, dan untuk mencegah antrean dinonaktifkan jika terjadi kesalahan yang tidak ditangani dengan benar. Karena sudah ada pesan di ProcessingQueue ProcessingQueueActivation prosedur dapat dieksekusi untuk memproses pesan itu. Perlu diingat bahwa WAITFOR akan menyebabkan prosedur membutuhkan waktu 5 detik untuk dihentikan, meskipun pesan langsung diproses dari antrian. Setelah memproses pesan, kami dapat memverifikasi prosedur bekerja dengan benar dengan menanyakan RequestQueue untuk melihat apakah AsyncResult pesan ada, dan kemudian kami dapat memverifikasi bahwa RequestQueueActivation prosedur berfungsi dengan benar dengan menjalankannya.

-- Process the message from the processing queue
EXECUTE dbo.ProcessingQueueActivation;
GO
 
-- Check for reply message on request queue
SELECT CAST(message_body AS XML) FROM RequestQueue;
GO
 
-- Process the message from the request queue
EXECUTE dbo.RequestQueueActivation;
GO

Mengotomatiskan Pemrosesan

Pada titik ini, semua komponen telah selesai untuk sepenuhnya mengotomatisasi pemrosesan kami. Satu-satunya yang tersisa adalah mengikat prosedur aktivasi ke antrean yang sesuai, lalu mengirim pesan pengujian lain untuk memvalidasinya agar diproses dan tidak ada yang tersisa di antrean setelahnya.

-- Alter the processing queue to specify internal activation
ALTER QUEUE ProcessingQueue
    WITH ACTIVATION
    ( 
      STATUS = ON,
      PROCEDURE_NAME = dbo.ProcessingQueueActivation,
      MAX_QUEUE_READERS = 10,
      EXECUTE AS SELF
    );
GO
 
-- Alter the request queue to specify internal activation
ALTER QUEUE RequestQueue
    WITH ACTIVATION
    ( 
      STATUS = ON,
      PROCEDURE_NAME = dbo.RequestQueueActivation,
      MAX_QUEUE_READERS = 10,
      EXECUTE AS SELF
    );
GO
 
-- Test automated activation
-- Send a request
 
EXECUTE dbo.SendBrokerMessage
	@FromService = N'RequestService',
	@ToService   = N'ProcessingService',
	@Contract    = N'AsyncContract',
	@MessageType = N'AsyncRequest',
	@MessageBody = N'<AsyncRequest><AccountNumber>12345</AccountNumber></AsyncRequest>';
 
-- Check for message on processing queue 
-- nothing is there because it was automatically processed
SELECT CAST(message_body AS XML) FROM ProcessingQueue;
GO
 
-- Check for reply message on request queue 
-- nothing is there because it was automatically processed
SELECT CAST(message_body AS XML) FROM RequestQueue;
GO

Ringkasan

Komponen dasar untuk pemrosesan asinkron otomatis di SQL Server Service Broker dapat dikonfigurasi dalam satu pengaturan database untuk memungkinkan pemrosesan terpisah dari tugas yang berjalan lama. Ini bisa menjadi alat yang ampuh untuk meningkatkan kinerja aplikasi, dari pengalaman pengguna akhir, dengan memisahkan pemrosesan dari interaksi pengguna akhir dengan aplikasi.


  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Mencegah Serangan Injeksi SQL Dengan Python

  2. Memahami peran arsitek data dalam tata kelola data

  3. Penghentian Salesforce TLS 1.0

  4. Mengaktifkan Otentikasi Dua Faktor untuk ScaleGrid DBaaS

  5. Kasus Penggunaan Sederhana untuk Indeks pada Kunci Utama