Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 57 additions & 22 deletions mssql_python/pybind/connection/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,16 @@ void Connection::setAutocommit(bool enable) {
}
SQLINTEGER value = enable ? SQL_AUTOCOMMIT_ON : SQL_AUTOCOMMIT_OFF;
LOG("Setting autocommit=%d", enable);
SQLRETURN ret =
SQLSetConnectAttr_ptr(_dbcHandle->get(), SQL_ATTR_AUTOCOMMIT,
reinterpret_cast<SQLPOINTER>(static_cast<SQLULEN>(value)), 0);
SQLRETURN ret;
{
// Release the GIL during the blocking ODBC call. Holding the GIL
// here can deadlock when the network path goes through another
// Python thread (e.g. an in-process SSH tunnel via paramiko +
// sshtunnel), since that thread also needs the GIL to run.
py::gil_scoped_release release;
ret = SQLSetConnectAttr_ptr(_dbcHandle->get(), SQL_ATTR_AUTOCOMMIT,
reinterpret_cast<SQLPOINTER>(static_cast<SQLULEN>(value)), 0);
}
checkError(ret);
if (value == SQL_AUTOCOMMIT_ON) {
LOG("Autocommit enabled");
Expand Down Expand Up @@ -286,9 +293,15 @@ SQLRETURN Connection::setAttribute(SQLINTEGER attribute, py::object value) {
// Get the integer value
int64_t longValue = value.cast<int64_t>();

SQLRETURN ret = SQLSetConnectAttr_ptr(
_dbcHandle->get(), attribute,
reinterpret_cast<SQLPOINTER>(static_cast<SQLULEN>(longValue)), SQL_IS_INTEGER);
SQLRETURN ret;
{
// Release the GIL around the ODBC call for consistency with the
// other connection-attribute paths; some attributes can block.
py::gil_scoped_release release;
ret = SQLSetConnectAttr_ptr(
_dbcHandle->get(), attribute,
reinterpret_cast<SQLPOINTER>(static_cast<SQLULEN>(longValue)), SQL_IS_INTEGER);
}

if (!SQL_SUCCEEDED(ret)) {
LOG("Failed to set integer attribute=%d, ret=%d", attribute, ret);
Expand All @@ -302,11 +315,19 @@ SQLRETURN Connection::setAttribute(SQLINTEGER attribute, py::object value) {

SQLPOINTER ptr;
SQLINTEGER length;

ptr = reinterpretU16stringAsSqlWChar(this->wstrStringBuffer);
length = static_cast<SQLINTEGER>(this->wstrStringBuffer.length() * sizeof(SQLWCHAR));

SQLRETURN ret = SQLSetConnectAttr_ptr(_dbcHandle->get(), attribute, ptr, length);
// Copy to a stack-local buffer so that releasing the GIL below
// doesn't expose a race where another thread overwrites the
// member wstrStringBuffer (and reallocates) while the ODBC
// driver is still reading from ptr.
std::u16string localStrBuffer = this->wstrStringBuffer;
ptr = reinterpretU16stringAsSqlWChar(localStrBuffer);
length = static_cast<SQLINTEGER>(localStrBuffer.length() * sizeof(SQLWCHAR));

SQLRETURN ret;
{
py::gil_scoped_release release;
ret = SQLSetConnectAttr_ptr(_dbcHandle->get(), attribute, ptr, length);
Comment thread
saurabh500 marked this conversation as resolved.
}
if (!SQL_SUCCEEDED(ret)) {
LOG("Failed to set string attribute=%d, ret=%d", attribute, ret);
} else {
Expand All @@ -319,13 +340,18 @@ SQLRETURN Connection::setAttribute(SQLINTEGER attribute, py::object value) {
}
} else if (py::isinstance<py::bytes>(value) || py::isinstance<py::bytearray>(value)) {
try {
std::string binary_data = value.cast<std::string>();
this->strBytesBuffer.clear();
this->strBytesBuffer = std::move(binary_data);
SQLPOINTER ptr = const_cast<char*>(this->strBytesBuffer.c_str());
SQLINTEGER length = static_cast<SQLINTEGER>(this->strBytesBuffer.size());

SQLRETURN ret = SQLSetConnectAttr_ptr(_dbcHandle->get(), attribute, ptr, length);
// Copy to a stack-local buffer so that releasing the GIL
// doesn't expose a race where another thread overwrites the
// member strBytesBuffer while the driver reads from ptr.
std::string localBytesBuffer = value.cast<std::string>();
SQLPOINTER ptr = const_cast<char*>(localBytesBuffer.c_str());
SQLINTEGER length = static_cast<SQLINTEGER>(localBytesBuffer.size());

SQLRETURN ret;
{
py::gil_scoped_release release;
ret = SQLSetConnectAttr_ptr(_dbcHandle->get(), attribute, ptr, length);
}
if (!SQL_SUCCEEDED(ret)) {
LOG("Failed to set binary attribute=%d, ret=%d", attribute, ret);
} else {
Expand Down Expand Up @@ -376,8 +402,14 @@ bool Connection::reset() {
ThrowStdException("Connection handle not allocated");
}
LOG("Resetting connection via SQL_ATTR_RESET_CONNECTION");
SQLRETURN ret = SQLSetConnectAttr_ptr(_dbcHandle->get(), SQL_ATTR_RESET_CONNECTION,
(SQLPOINTER)SQL_RESET_CONNECTION_YES, SQL_IS_INTEGER);
SQLRETURN ret;
{
// Release the GIL around the ODBC call for consistency with the
// other connection-attribute paths; some attributes can block.
py::gil_scoped_release release;
ret = SQLSetConnectAttr_ptr(_dbcHandle->get(), SQL_ATTR_RESET_CONNECTION,
(SQLPOINTER)SQL_RESET_CONNECTION_YES, SQL_IS_INTEGER);
}
if (!SQL_SUCCEEDED(ret)) {
LOG("Failed to reset connection (ret=%d). Marking as dead.", ret);
return false;
Expand All @@ -387,8 +419,11 @@ bool Connection::reset() {
// Explicitly reset it to the default (SQL_TXN_READ_COMMITTED) to prevent
// isolation level settings from leaking between pooled connection usages.
LOG("Resetting transaction isolation level to READ COMMITTED");
ret = SQLSetConnectAttr_ptr(_dbcHandle->get(), SQL_ATTR_TXN_ISOLATION,
(SQLPOINTER)SQL_TXN_READ_COMMITTED, SQL_IS_INTEGER);
{
py::gil_scoped_release release;
ret = SQLSetConnectAttr_ptr(_dbcHandle->get(), SQL_ATTR_TXN_ISOLATION,
(SQLPOINTER)SQL_TXN_READ_COMMITTED, SQL_IS_INTEGER);
}
if (!SQL_SUCCEEDED(ret)) {
LOG("Failed to reset transaction isolation level (ret=%d). Marking as dead.", ret);
return false;
Expand Down
75 changes: 49 additions & 26 deletions mssql_python/pybind/connection/connection_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ std::shared_ptr<Connection> ConnectionPool::acquire(const std::u16string& connSt
std::vector<std::shared_ptr<Connection>> to_disconnect;
std::shared_ptr<Connection> valid_conn = nullptr;
bool needs_connect = false;

// Phase 1: Prune stale connections (under mutex — no ODBC calls).
{
std::lock_guard<std::mutex> lock(_mutex);
auto now = std::chrono::steady_clock::now();
size_t before = _pool.size();

// Phase 1: Remove stale connections, collect for later disconnect
_pool.erase(std::remove_if(_pool.begin(), _pool.end(),
[&](const std::shared_ptr<Connection>& conn) {
auto idle_time =
Expand All @@ -38,40 +39,62 @@ std::shared_ptr<Connection> ConnectionPool::acquire(const std::u16string& connSt
_pool.end());

size_t pruned = before - _pool.size();
// Decrement _current_size eagerly so new slots can be reserved while
// stale connections are being disconnected (Phase 4). This means
// _current_size tracks *reserved capacity* (pooled + checked-out +
// in-flight new), not necessarily live ODBC handles.
_current_size = (_current_size >= pruned) ? (_current_size - pruned) : 0;
}
Comment thread
saurabh500 marked this conversation as resolved.

// Phase 2: Attempt to reuse healthy connections
while (!_pool.empty()) {
auto conn = _pool.front();
_pool.pop_front();
if (conn->isAlive()) {
if (!conn->reset()) {
to_disconnect.push_back(conn);
--_current_size;
continue;
// Phase 2: Pop one candidate at a time and validate it outside the
// mutex. isAlive() and reset() perform ODBC calls that release the
// GIL; calling them while holding the mutex would create a mutex/GIL
// lock-ordering deadlock when multiple threads acquire concurrently.
while (true) {
std::shared_ptr<Connection> candidate;
{
std::lock_guard<std::mutex> lock(_mutex);
if (_pool.empty()) {
// No more candidates — try to reserve a slot for a new connection.
if (_current_size < _max_size) {
valid_conn = std::make_shared<Connection>(connStr, true);
++_current_size;
needs_connect = true;
} else {
// NOTE: Another thread may be validating a popped candidate
// outside the mutex right now. If that candidate fails, a
// slot will open up — but we can't wait for it here without
// adding a condition-variable retry loop. This is an
// acceptable trade-off: transient "pool full" errors under
// heavy contention are rare and callers can retry.
throw std::runtime_error("ConnectionPool::acquire: pool size limit reached");
}
valid_conn = conn;
break;
} else {
to_disconnect.push_back(conn);
--_current_size;
}
candidate = _pool.front();
_pool.pop_front();
}

// Validate the candidate outside the mutex.
try {
if (candidate->isAlive() && candidate->reset()) {
valid_conn = candidate;
break;
}
} catch (const std::exception& ex) {
LOG("Candidate connection validation failed: %s", ex.what());
}

// Reserve a slot for a new connection if none reusable.
// The actual connect() call happens outside the mutex to avoid
// holding the mutex during the blocking ODBC call (which releases
// the GIL and could otherwise cause a mutex/GIL deadlock).
if (!valid_conn && _current_size < _max_size) {
valid_conn = std::make_shared<Connection>(connStr, true);
++_current_size;
needs_connect = true;
} else if (!valid_conn) {
throw std::runtime_error("ConnectionPool::acquire: pool size limit reached");
// Candidate is dead or reset failed — mark for disconnect and
// decrement the pool size.
to_disconnect.push_back(candidate);
{
std::lock_guard<std::mutex> lock(_mutex);
if (_current_size > 0) --_current_size;
}
}

// Phase 2.5: Connect the new connection outside the mutex.
// Phase 3: Connect the new connection outside the mutex.
if (needs_connect) {
try {
valid_conn->connect(attrs_before);
Expand All @@ -85,7 +108,7 @@ std::shared_ptr<Connection> ConnectionPool::acquire(const std::u16string& connSt
}
}

// Phase 3: Disconnect expired/bad connections outside lock
// Phase 4: Disconnect expired/bad connections outside lock.
for (auto& conn : to_disconnect) {
try {
conn->disconnect();
Expand Down
Loading
Loading