use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::io::{self, Read};
use std::sync::Mutex;
use crate::agent::AgentState;
use crate::stream::Stream;
use crate::{Agent, Proxy};
use log::debug;
use url::Url;
/// Holder of idle connections that can be reused by later requests.
///
/// All mutable state lives in `inner` behind a mutex; the two limits are
/// fixed at construction time. If either limit is 0 the pool never stores
/// anything (see `noop`).
pub(crate) struct ConnectionPool {
// Mutable pool state (stored streams plus their age ordering).
inner: Mutex<Inner>,
// Upper bound on the total number of idle streams kept in the pool.
max_idle_connections: usize,
// Upper bound on the number of idle streams kept for a single `PoolKey`.
max_idle_connections_per_host: usize,
}
/// The mutex-protected state of a `ConnectionPool`.
///
/// The pool code relies on `lru` holding exactly one `PoolKey` entry for
/// each stream stored in `recycle`; a mismatch is treated as a broken
/// invariant and panics.
struct Inner {
// Idle streams grouped by key. Within a key, the back of the deque is the
// most recently added stream. Empty deques are removed from the map.
recycle: HashMap<PoolKey, VecDeque<Stream>>,
// One key per stored stream, ordered oldest (front) to newest (back).
lru: VecDeque<PoolKey>,
}
/// Debug output shows the two configured limits and the number of streams
/// currently held, never the streams themselves.
impl fmt::Debug for ConnectionPool {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("ConnectionPool");
        out.field("max_idle", &self.max_idle_connections);
        out.field("max_idle_per_host", &self.max_idle_connections_per_host);
        // `lru` has one entry per pooled stream, so its length is the
        // number of idle connections.
        let pooled = self.inner.lock().unwrap().lru.len();
        out.field("connections", &pooled);
        out.finish()
    }
}
/// Removes the front-most element of `list` that equals `key`.
///
/// Returns the removed element, or `None` (leaving `list` untouched) when
/// nothing matches.
fn remove_first_match(list: &mut VecDeque<PoolKey>, key: &PoolKey) -> Option<PoolKey> {
    let index = list.iter().position(|candidate| candidate == key)?;
    list.remove(index)
}
/// Removes the back-most element of `list` that equals `key`.
///
/// Returns the removed element, or `None` (leaving `list` untouched) when
/// nothing matches.
fn remove_last_match(list: &mut VecDeque<PoolKey>, key: &PoolKey) -> Option<PoolKey> {
    let found = list.iter().rposition(|candidate| candidate == key);
    found.and_then(|index| list.remove(index))
}
impl ConnectionPool {
pub(crate) fn new_with_limits(
max_idle_connections: usize,
max_idle_connections_per_host: usize,
) -> Self {
ConnectionPool {
inner: Mutex::new(Inner {
recycle: HashMap::new(),
lru: VecDeque::new(),
}),
max_idle_connections,
max_idle_connections_per_host,
}
}
fn noop(&self) -> bool {
self.max_idle_connections == 0 || self.max_idle_connections_per_host == 0
}
pub fn try_get_connection(&self, url: &Url, proxy: Option<Proxy>) -> Option<Stream> {
let key = PoolKey::new(url, proxy);
self.remove(&key)
}
fn remove(&self, key: &PoolKey) -> Option<Stream> {
let mut inner = self.inner.lock().unwrap();
match inner.recycle.entry(key.clone()) {
Entry::Occupied(mut occupied_entry) => {
let streams = occupied_entry.get_mut();
let stream = streams.pop_back();
let stream = stream.expect("invariant failed: empty VecDeque in `recycle`");
if streams.is_empty() {
occupied_entry.remove();
}
remove_last_match(&mut inner.lru, key)
.expect("invariant failed: key in recycle but not in lru");
debug!("pulling stream from pool: {:?} -> {:?}", key, stream);
Some(stream)
}
Entry::Vacant(_) => None,
}
}
pub(crate) fn add(&self, key: &PoolKey, stream: Stream) {
if self.noop() {
return;
}
debug!("adding stream to pool: {:?} -> {:?}", key, stream);
let mut inner = self.inner.lock().unwrap();
match inner.recycle.entry(key.clone()) {
Entry::Occupied(mut occupied_entry) => {
let streams = occupied_entry.get_mut();
streams.push_back(stream);
if streams.len() > self.max_idle_connections_per_host {
let stream = streams.pop_front().expect("empty streams list");
debug!(
"host {:?} has {} conns, dropping oldest: {:?}",
key,
streams.len(),
stream
);
remove_first_match(&mut inner.lru, key)
.expect("invariant failed: key in recycle but not in lru");
}
}
Entry::Vacant(vacant_entry) => {
vacant_entry.insert(vec![stream].into());
}
}
inner.lru.push_back(key.clone());
if inner.lru.len() > self.max_idle_connections {
drop(inner);
self.remove_oldest()
}
}
fn remove_oldest(&self) {
assert!(!self.noop(), "remove_oldest called on Pool with max of 0");
let mut inner = self.inner.lock().unwrap();
let key = inner.lru.pop_front();
let key = key.expect("tried to remove oldest but no entries found!");
match inner.recycle.entry(key) {
Entry::Occupied(mut occupied_entry) => {
let streams = occupied_entry.get_mut();
let stream = streams
.pop_front()
.expect("invariant failed: key existed in recycle but no streams available");
debug!("dropping oldest stream in pool: {:?}", stream);
if streams.is_empty() {
occupied_entry.remove();
}
}
Entry::Vacant(_) => panic!("invariant failed: key existed in lru but not in recycle"),
}
}
#[cfg(test)]
pub fn len(&self) -> usize {
self.inner.lock().unwrap().lru.len()
}
}
/// Key under which pooled streams are stored and looked up.
///
/// Equality and hashing are derived over all four fields, so two keys only
/// match when scheme, hostname, port and proxy are all equal.
#[derive(PartialEq, Clone, Eq, Hash)]
pub(crate) struct PoolKey {
// URL scheme, stored as given.
scheme: String,
// Host name; an empty string when the URL has no host (see `PoolKey::new`).
hostname: String,
// Port, if known.
port: Option<u16>,
// Proxy used for the connection, if any. Not shown in the Debug output.
proxy: Option<Proxy>,
}
use std::fmt;
/// Compact debug form: `scheme|hostname|port`, where a missing port is
/// printed as `0`. The proxy is not included.
impl fmt::Debug for PoolKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let port = self.port.unwrap_or(0);
        write!(f, "{}|{}|{}", self.scheme, self.hostname, port)
    }
}
impl PoolKey {
fn new(url: &Url, proxy: Option<Proxy>) -> Self {
let port = url.port_or_known_default();
PoolKey {
scheme: url.scheme().to_string(),
hostname: url.host_str().unwrap_or("").to_string(),
port,
proxy,
}
}
pub(crate) fn from_parts(scheme: &str, hostname: &str, port: u16) -> Self {
PoolKey {
scheme: scheme.to_string(),
hostname: hostname.to_string(),
port: Some(port),
proxy: None,
}
}
}
/// Describes where a stream should be put back once it is reusable.
///
/// Holds a weak reference to the owning agent's state together with the key
/// the stream is pooled under. When `inner` is `None`, `return_to_pool`
/// does nothing and the stream is simply dropped.
#[derive(Clone, Debug)]
pub(crate) struct PoolReturner {
// Weak, so a returner does not keep the agent state alive by itself.
inner: Option<(std::sync::Weak<AgentState>, PoolKey)>,
}
impl PoolReturner {
pub(crate) fn new(agent: &Agent, pool_key: PoolKey) -> Self {
Self {
inner: Some((agent.weak_state(), pool_key)),
}
}
pub(crate) fn none() -> Self {
Self { inner: None }
}
pub(crate) fn return_to_pool(&self, stream: Stream) {
if let Some((weak_state, pool_key)) = &self.inner {
if let Some(state) = weak_state.upgrade() {
state.pool.add(pool_key, stream);
}
}
}
}
/// Reader wrapper that converts its inner reader back into a `Stream` and
/// returns that stream to the pool once a read yields 0 bytes.
pub(crate) struct PoolReturnRead<R: Read + Sized + Into<Stream>> {
// The wrapped reader. Set to `None` once the connection has been handed
// back, after which reads return 0.
reader: Option<R>,
}
impl<R: Read + Sized + Into<Stream>> PoolReturnRead<R> {
pub fn new(reader: R) -> Self {
PoolReturnRead {
reader: Some(reader),
}
}
fn return_connection(&mut self) -> io::Result<()> {
if let Some(reader) = self.reader.take() {
let stream: Stream = reader.into();
stream.return_to_pool()?;
}
Ok(())
}
fn do_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self.reader.as_mut() {
None => Ok(0),
Some(reader) => reader.read(buf),
}
}
}
impl<R: Read + Sized + Into<Stream>> Read for PoolReturnRead<R> {
    /// Reads from the wrapped reader and returns the connection to the pool
    /// once the reader reports end of input.
    ///
    /// A result of 0 bytes only counts as end of input when `buf` is
    /// non-empty. A read into a zero-length buffer also yields 0 without the
    /// body being exhausted, and pooling the connection at that point would
    /// leave unread response bytes on it for the next request.
    ///
    /// # Errors
    ///
    /// Propagates errors from the wrapped reader and from returning the
    /// stream to the pool.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let amount = self.do_read(buf)?;
        // Only a 0-byte read into a real buffer means the body is done and
        // the socket can be reused for another request.
        if amount == 0 && !buf.is_empty() {
            self.return_connection()?;
        }
        Ok(amount)
    }
}
// Unit tests for the connection pool: eviction limits, key matching, and
// returning streams to the pool after a body has been read.
#[cfg(test)]
mod tests {
use std::io;
use crate::stream::{remote_addr_for_test, Stream};
use crate::ReadWrite;
use super::*;
// Stand-in transport that performs no real I/O.
#[derive(Debug)]
struct NoopStream;
impl NoopStream {
// Wraps a NoopStream in a `Stream` that uses the given returner.
fn stream(pool_returner: PoolReturner) -> Stream {
Stream::new(NoopStream, remote_addr_for_test(), pool_returner)
}
}
impl Read for NoopStream {
// Reports the whole buffer as filled without writing to it, so this
// reader never signals end of input for a non-empty buffer.
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
Ok(buf.len())
}
}
impl std::io::Write for NoopStream {
// Pretends every byte was written.
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl ReadWrite for NoopStream {
// There is no underlying TCP socket.
fn socket(&self) -> Option<&std::net::TcpStream> {
None
}
}
// Smoke test: building a key from a URL with a non-standard scheme must
// not panic.
#[test]
fn poolkey_new() {
PoolKey::new(&Url::parse("zzz:///example.com").unwrap(), None);
}
// Adds twice as many distinct keys as the overall limit allows. Only the
// most recently added half should remain, and removing those should leave
// the pool empty.
#[test]
fn pool_connections_limit() {
let pool = ConnectionPool::new_with_limits(10, 1);
let hostnames = (0..pool.max_idle_connections * 2).map(|i| format!("{}.example", i));
let poolkeys = hostnames.map(|hostname| PoolKey {
scheme: "https".to_string(),
hostname,
port: Some(999),
proxy: None,
});
for key in poolkeys.clone() {
pool.add(&key, NoopStream::stream(PoolReturner::none()));
}
assert_eq!(pool.len(), pool.max_idle_connections);
// The first `max_idle_connections` keys were evicted; the rest remain.
for key in poolkeys.skip(pool.max_idle_connections) {
let result = pool.remove(&key);
assert!(result.is_some(), "expected key was not in pool");
}
assert_eq!(pool.len(), 0)
}
// Adds twice the per-host limit of streams under a single key. Only the
// per-host limit should be kept, and removing that many should leave the
// pool empty.
#[test]
fn pool_per_host_connections_limit() {
let pool = ConnectionPool::new_with_limits(10, 2);
let poolkey = PoolKey {
scheme: "https".to_string(),
hostname: "example.com".to_string(),
port: Some(999),
proxy: None,
};
for _ in 0..pool.max_idle_connections_per_host * 2 {
pool.add(&poolkey, NoopStream::stream(PoolReturner::none()))
}
assert_eq!(pool.len(), pool.max_idle_connections_per_host);
for _ in 0..pool.max_idle_connections_per_host {
let result = pool.remove(&poolkey);
assert!(result.is_some(), "expected key was not in pool");
}
assert_eq!(pool.len(), 0);
}
// Keys for the same URL but different proxy settings must be treated as
// different keys. With a per-host limit of 1, the pool can only grow to 3
// if each add lands under its own key.
#[test]
fn pool_checks_proxy() {
let pool = ConnectionPool::new_with_limits(10, 1);
let url = Url::parse("zzz:///example.com").unwrap();
let pool_key = PoolKey::new(&url, None);
pool.add(&pool_key, NoopStream::stream(PoolReturner::none()));
assert_eq!(pool.len(), 1);
let pool_key = PoolKey::new(&url, Some(Proxy::new("localhost:9999").unwrap()));
pool.add(&pool_key, NoopStream::stream(PoolReturner::none()));
assert_eq!(pool.len(), 2);
let pool_key = PoolKey::new(
&url,
Some(Proxy::new("user:password@localhost:9999").unwrap()),
);
pool.add(&pool_key, NoopStream::stream(PoolReturner::none()));
assert_eq!(pool.len(), 3);
}
// Reads exactly as many bytes as the `LimitedRead` limit and expects the
// stream to be in the agent's pool afterwards.
// NOTE(review): the return to the pool presumably happens inside
// `LimitedRead` once its limit is reached — confirm in crate::response.
#[test]
fn read_exact() {
use crate::response::LimitedRead;
let url = Url::parse("https:///example.com").unwrap();
let mut out_buf = [0u8; 500];
let agent = Agent::new();
let pool_key = PoolKey::new(&url, None);
let stream = NoopStream::stream(PoolReturner::new(&agent, pool_key));
let mut limited_read = LimitedRead::new(stream, std::num::NonZeroUsize::new(500).unwrap());
limited_read.read_exact(&mut out_buf).unwrap();
assert_eq!(agent.state.pool.len(), 1);
}
// Reads a chunked, gzip-compressed body through `PoolReturnRead` and
// expects the stream to be in the agent's pool once the body has been
// copied out in full.
#[test]
#[cfg(feature = "gzip")]
fn read_exact_chunked_gzip() {
use crate::response::Compression;
use std::io::Cursor;
// Chunked framing: three chunks of 0xE, 0xE and 0x7 bytes followed by
// the terminating zero-length chunk. The payload begins with the gzip
// magic bytes 0x1F 0x8B.
let gz_body = vec![
b'E', b'\r', b'\n', 0x1F, 0x8B, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x03, 0xCB, 0x48, 0xCD, 0xC9,
b'\r', b'\n', b'E', b'\r', b'\n', 0xC9, 0x57, 0x28, 0xCF, 0x2F, 0xCA, 0x49, 0x51, 0xC8, 0x18, 0xBC, 0x6C, 0x00, 0xA5,
b'\r', b'\n', b'7', b'\r', b'\n', 0x5C, 0x7C, 0xEF, 0xA7, 0x00, 0x00, 0x00, b'\r', b'\n', b'0', b'\r', b'\n', b'\r', b'\n', ];
let agent = Agent::new();
assert_eq!(agent.state.pool.len(), 0);
let ro = crate::test::TestStream::new(Cursor::new(gz_body), std::io::sink());
let stream = Stream::new(
ro,
"1.1.1.1:4343".parse().unwrap(),
PoolReturner::new(&agent, PoolKey::from_parts("http", "1.1.1.1", 8080)),
);
// Layering, innermost first: stream -> chunked decoder ->
// PoolReturnRead -> gzip decompression.
let chunked = crate::chunked::Decoder::new(stream);
let pool_return_read: Box<(dyn Read + Send + Sync + 'static)> =
Box::new(PoolReturnRead::new(chunked));
let compression = Compression::Gzip;
let mut stream = compression.wrap_reader(pool_return_read);
io::copy(&mut stream, &mut io::sink()).unwrap();
assert_eq!(agent.state.pool.len(), 1);
}
}