前言
Microsoft.AspNetCore.ConcurrencyLimiter AspNetCore3.0后增加的,用于傳入的請求進行排隊處理,避免線程池的不足.
我們日常開發中可能常做的給某web服務器配置連接數以及,請求隊列大小,那么今天我們看看如何在通過中間件形式實現一個并發量以及隊列長度限制.
Queue策略
添加Nuget
Install-Package Microsoft.AspNetCore.ConcurrencyLimiter
Copy public void ConfigureServices(IServiceCollection services)
{
services.AddQueuePolicy(options =>
{
//最大并發請求數
options.MaxConcurrentRequests = 2;
//請求隊列長度限制
options.RequestQueueLimit = 1;
});
services.AddControllers();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
//添加并發限制中間件
app.UseConcurrencyLimiter();
app.Run(async context =>
{
Task.Delay(100).Wait(); // 100ms sync-over-async
await context.Response.WriteAsync("Hello World!");
});
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseHttpsRedirection();
app.UseRouting();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
}
通過上面簡單的配置,我們就可以將他引入到我們的代碼中,從而做并發量限制,以及隊列的長度;那么問題來了,他是怎么實現的呢?
Copy public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Actionconfigure){ services.Configure(configure); services.AddSingleton ();
return services;
}
QueuePolicy采用的是SemaphoreSlim信號量設計,SemaphoreSlim、Semaphore(信號量)支持并發多線程進入被保護代碼,對象在初始化時會指定 最大任務數量,當線程請求訪問資源,信號量遞減,而當他們釋放時,信號量計數又遞增。
Copy ////// 構造方法(初始化Queue策略) /// /// public QueuePolicy(IOptionsoptions) { _maxConcurrentRequests = options.Value.MaxConcurrentRequests; if (_maxConcurrentRequests <= 0) { throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer."); } _requestQueueLimit = options.Value.RequestQueueLimit; if (_requestQueueLimit < 0)
{
throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number.");
}
//使用SemaphoreSlim來限制任務最大個數
_serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests);
}
ConcurrencyLimiterMiddleware中間件
Copy ////// Invokes the logic of the middleware. /// /// The. /// A that completes when the request leaves.
public async Task Invoke(HttpContext context)
{
var waitInQueueTask = _queuePolicy.TryEnterAsync();
// Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
bool result;
if (waitInQueueTask.IsCompleted)
{
ConcurrencyLimiterEventSource.Log.QueueSkipped();
result = waitInQueueTask.Result;
}
else
{
using (ConcurrencyLimiterEventSource.Log.QueueTimer())
{
result = await waitInQueueTask;
}
}
if (result)
{
try
{
await _next(context);
}
finally
{
_queuePolicy.OnExit();
}
}
else
{
ConcurrencyLimiterEventSource.Log.RequestRejected();
ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
await _onRejected(context);
}
}
每次當我們請求的時候首先會調用_queuePolicy.TryEnterAsync(),進入該方法后先開啟一個私有lock鎖,再接著判斷總請求量是否≥(請求隊列限制的大小+最大并發請求數),如果當前數量超出了,那么我直接拋出,送你個503狀態;
Copy if (result)
{
try
{
await _next(context);
}
finally
{
_queuePolicy.OnExit();
}
}
else
{
ConcurrencyLimiterEventSource.Log.RequestRejected();
ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
await _onRejected(context);
}
問題來了,我這邊如果說還沒到你設置的大小呢,我這個請求沒有給你服務器造不成壓力,那么你給我處理一下吧.
await _serverSemaphore.WaitAsync();異步等待進入信號量,如果沒有線程被授予對信號量的訪問權限,則進入執行保護代碼;否則此線程將在此處等待,直到信號量被釋放為止
Copy lock (_totalRequestsLock)
{
if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)
{
return false;
}
TotalRequests++;
}
//異步等待進入信號量,如果沒有線程被授予對信號量的訪問權限,則進入執行保護代碼;否則此線程將在此處等待,直到信號量被釋放為止
await _serverSemaphore.WaitAsync();
return true;
}
返回成功后那么中間件這邊再進行處理,_queuePolicy.OnExit();通過該調用進行調用_serverSemaphore.Release();釋放信號燈,再對總請求數遞減
Stack策略
再來看看另一種方法,棧策略,他是怎么做的呢?一起來看看.再附加上如何使用的代碼.
Copy public void ConfigureServices(IServiceCollection services)
{
services.AddStackPolicy(options =>
{
//最大并發請求數
options.MaxConcurrentRequests = 2;
//請求隊列長度限制
options.RequestQueueLimit = 1;
});
services.AddControllers();
}
通過上面的配置,我們便可以對我們的應用程序執行出相應的策略.下面再來看看他是怎么實現的呢
Copy public static IServiceCollection AddStackPolicy(this IServiceCollection services, Actionconfigure) { services.Configure(configure); services.AddSingleton ();
return services;
}
可以看到這次是通過StackPolicy類做的策略.來一起來看看主要的方法
Copy ////// 構造方法(初始化參數) /// /// public StackPolicy(IOptionsoptions) { //棧分配 _buffer = new List ();
//隊列大小
_maxQueueCapacity = options.Value.RequestQueueLimit;
//最大并發請求數
_maxConcurrentRequests = options.Value.MaxConcurrentRequests;
//剩余可用空間
_freeServerSpots = options.Value.MaxConcurrentRequests;
}
當我們通過中間件請求調用,_queuePolicy.TryEnterAsync()時,首先會判斷我們是否還有訪問請求次數,如果_freeServerSpots>0,那么則直接給我們返回true,讓中間件直接去執行下一步,如果當前隊列=我們設置的隊列大小的話,那我們需要取消先前請求;每次取消都是先取消之前的保留后面的請求;
Copy public ValueTaskTryEnterAsync() { lock (_bufferLock) { if (_freeServerSpots > 0) { _freeServerSpots--; return _trueTask; } // 如果隊列滿了,取消先前的請求 if (_queueLength == _maxQueueCapacity) { _hasReachedCapacity = true; _buffer[_head].Complete(false); _queueLength--; } var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this); _cachedResettableTCS = null; if (_hasReachedCapacity || _queueLength < _buffer.Count)
{
_buffer[_head] = tcs;
}
else
{
_buffer.Add(tcs);
}
_queueLength++;
// increment _head for next time
_head++;
if (_head == _maxQueueCapacity)
{
_head = 0;
}
return tcs.GetValueTask();
}
}
當我們請求后調用_queuePolicy.OnExit();出棧,再將請求長度遞減
Copy public void OnExit()
{
lock (_bufferLock)
{
if (_queueLength == 0)
{
_freeServerSpots++;
if (_freeServerSpots > _maxConcurrentRequests)
{
_freeServerSpots--;
throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
}
return;
}
// step backwards and launch a new task
if (_head == 0)
{
_head = _maxQueueCapacity - 1;
}
else
{
_head--;
}
//退出,出棧
_buffer[_head].Complete(true);
_queueLength--;
}
}
總結
基于棧結構的特點,在實際應用中,通常只會對棧執行以下兩種操作:
- 向棧中添加元素,此過程被稱為"進棧"(入棧或壓棧);
- 從棧中提取出指定元素,此過程被稱為"出棧"(或彈棧);
隊列存儲結構的實現有以下兩種方式:
- 順序隊列:在順序表的基礎上實現的隊列結構;
- 鏈隊列:在鏈表的基礎上實現的隊列結構;
本文為企業推廣,本網站不做任何建議,僅提供參考,作為信息展示!
推薦閱讀:手機充電寶品牌排行
網友評論
請登錄后進行評論|
0條評論
請文明發言,還可以輸入140字
您的評論已經發表成功,請等候審核
小提示:您要為您發表的言論后果負責,請各位遵守法紀注意語言文明