0%

简单实现一个SSH连接池

使用 ObjectPool + Microsoft.Extensions.Caching.Memory 通过缓存SshClient来实现简单的SSH连接池

目标

按照 user@ip:port 作为端点key, 缓存最大并发数(如10)个连接,当并发数降低或长时间没有使用时,逐渐关闭链接。

实现

准备

先定义一个SshTargetKey : user@ip:port , 用来描述一个ssh端点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class SshTargetKey
{
public SshTargetKey(string username, string password, IPEndPoint endpoint)
{
Username = username;
Password = password;
Endpoint = endpoint;
}

public string Username { get; set; }
public string Password { get; set; }
public IPEndPoint Endpoint { get; set; }

public override string ToString()
{
return $"{nameof(SshTargetKey)}::{Username}@{Endpoint}";
}
}

然后是对SshClient的包装,由于SshClient需要的连接信息需要运行时才可以知道,所以需要通过Init方法初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class SshClientWrapper : IDisposable
{
private readonly object _syncRoot = new();
public SshClient SshClient { get; private set; }

public bool IsOk => SshClient?.IsConnected == true;

public void Init(SshTargetKey cacheKey, TimeSpan? keepAliveInterval = null)
{
lock (_syncRoot)
{
SshClient ??= new SshClient(cacheKey.Endpoint.Address.ToString(), cacheKey.Endpoint.Port, cacheKey.Username, cacheKey.Password)
{
KeepAliveInterval = keepAliveInterval ?? TimeSpan.FromSeconds(30)
};
if (!SshClient.IsConnected)
{
SshClient.Connect();
}
}
}

public void Dispose()
{
SshClient?.Dispose();
}
}

封装好 SshTargetKey 和 SshClientWrapper 后,我们需要一个SshPooledPolicy来描述我们的SshPool在ObjectPool中的Create、Return等逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
public class SshPooledPolicy : PooledObjectPolicy<SshClientWrapper>
{
public override SshClientWrapper Create()
{
return new SshClientWrapper();
}

public override bool Return(SshClientWrapper obj)
{
return obj.IsOk;
}
}

SshPooledPolicy 中的Return用来确定当代码中使用完成后,如果ssh没有初始化或者ssh链接断开了,则不会将其放回到pool中

使用

首先在ServiceCollection中注册MemoryCache和ObjectPool

1
2
services.AddSingleton<ObjectPoolProvider, DefaultObjectPoolProvider>();
services.AddMemoryCache();

获取sshPool (假设注入的MemoryCache为_memCache)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private static readonly object SyncRoot = new();
private ObjectPool<SshClientWrapper> GetSshPool(SshTargetKey sshCacheKey)
{
// ReSharper disable once InconsistentlySynchronizedField
if (!_memCache.TryGetValue<ObjectPool<SshClientWrapper>>(sshCacheKey.ToString(), out var sshPool))
{
lock (SyncRoot)
{
if (!_memCache.TryGetValue(sshCacheKey.ToString(), out sshPool))
{
sshPool = _poolProvider.Create(new SshPooledPolicy());//初始化Pool
var entryOptions = new MemoryCacheEntryOptions
{
SlidingExpiration = TimeSpan.FromSeconds(120)//120s没有被使用则标记为过期
};
entryOptions.RegisterPostEvictionCallback((_, value, _, _) => //注册 PostEvictionCallback,实现IDisposable的清理
{
if (value is IDisposable disposable)
disposable.Dispose();
});
_memCache.Set(sshCacheKey.ToString(), sshPool, entryOptions);
}
}
}
return sshPool;
}

使用sshPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
var sshPool = GetSshPool(sshCacheKey);
var sshWrapper = sshPool.Get();//获取wrapper
try
{
if (!sshWrapper.IsOk)
{
sshWrapper.Init(sshCacheKey);
}

var ssh = sshWrapper.SshClient;
if (!ssh.IsConnected)
{
ssh.Connect();
}

ssh.RunCommand(cmd);// ...
}
finally
{
sshPool.Return(sshWrapper);
}

由于MemoryCache本身对过期key的处理是lazy的,所以我们需要一个HostedService来定期的触发MemoryCache的清理行为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (_memCache is MemoryCache mc)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
mc.Compact(0); //仅触发清理行为
}
catch (Exception e)
//
}
finally
{
await Task.Delay(_options.Value.CacheCompactInterval, stoppingToken);
}
}
}
}