使用 ObjectPool + Microsoft.Extensions.Caching.Memory 通过缓存SshClient来实现简单的SSH连接池
目标
按照 user@ip:port 作为端点key, 缓存最大并发数(如10)个连接,当并发数降低或长时间没有使用时,逐渐关闭链接。
实现
准备
先定义一个SshTargetKey : user@ip:port , 用来描述一个ssh端点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class SshTargetKey { public SshTargetKey(string username, string password, IPEndPoint endpoint) { Username = username; Password = password; Endpoint = endpoint; }
public string Username { get; set; } public string Password { get; set; } public IPEndPoint Endpoint { get; set; }
public override string ToString() { return $"{nameof(SshTargetKey)}::{Username}@{Endpoint}"; } }
|
然后是对SshClient的包装,由于SshClient需要的连接信息需要运行时才可以知道,所以需要通过Init
方法初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class SshClientWrapper : IDisposable { private readonly object _syncRoot = new(); public SshClient SshClient { get; private set; }
public bool IsOk => SshClient?.IsConnected == true;
public void Init(SshTargetKey cacheKey, TimeSpan? keepAliveInterval = null) { lock (_syncRoot) { SshClient ??= new SshClient(cacheKey.Endpoint.Address.ToString(), cacheKey.Endpoint.Port, cacheKey.Username, cacheKey.Password) { KeepAliveInterval = keepAliveInterval ?? TimeSpan.FromSeconds(30) }; if (!SshClient.IsConnected) { SshClient.Connect(); } } }
public void Dispose() { SshClient?.Dispose(); } }
|
封装好 SshTargetKey 和 SshClientWrapper 后,我们需要一个SshPooledPolicy来描述我们的SshPool在ObjectPool中的Create、Return等逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class SshPooledPolicy : PooledObjectPolicy<SshClientWrapper> { public override SshClientWrapper Create() { return new SshClientWrapper(); }
public override bool Return(SshClientWrapper obj) { return obj.IsOk; } }
|
SshPooledPolicy
中的Return
用来确定当代码中使用完成后,如果ssh没有初始化或者ssh链接断开了,则不会将其放回到pool中
使用
首先在ServiceCollection中注册MemoryCache和ObjectPool
1 2
| services.AddSingleton<ObjectPoolProvider, DefaultObjectPoolProvider>(); services.AddMemoryCache();
|
获取sshPool (假设注入的MemoryCache为_memCache)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| private static readonly object SyncRoot = new(); private ObjectPool<SshClientWrapper> GetSshPool(SshTargetKey sshCacheKey) { if (!_memCache.TryGetValue<ObjectPool<SshClientWrapper>>(sshCacheKey.ToString(), out var sshPool)) { lock (SyncRoot) { if (!_memCache.TryGetValue(sshCacheKey.ToString(), out sshPool)) { sshPool = _poolProvider.Create(new SshPooledPolicy()); var entryOptions = new MemoryCacheEntryOptions { SlidingExpiration = TimeSpan.FromSeconds(120) }; entryOptions.RegisterPostEvictionCallback((_, value, _, _) => { if (value is IDisposable disposable) disposable.Dispose(); }); _memCache.Set(sshCacheKey.ToString(), sshPool, entryOptions); } } } return sshPool; }
|
使用sshPool
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| var sshPool = GetSshPool(sshCacheKey); var sshWrapper = sshPool.Get(); try { if (!sshWrapper.IsOk) { sshWrapper.Init(sshCacheKey); }
var ssh = sshWrapper.SshClient; if (!ssh.IsConnected) { ssh.Connect(); }
ssh.RunCommand(cmd); } finally { sshPool.Return(sshWrapper); }
|
由于MemoryCache本身对过期key的处理是lazy的,所以我们需要一个HostedService来定期的触发MemoryCache的清理行为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| protected override async Task ExecuteAsync(CancellationToken stoppingToken) { if (_memCache is MemoryCache mc) { while (!stoppingToken.IsCancellationRequested) { try { mc.Compact(0); } catch (Exception e) } finally { await Task.Delay(_options.Value.CacheCompactInterval, stoppingToken); } } } }
|