Close Docker sockets on errors
This commit is contained in:
@@ -127,6 +127,7 @@ class DockerExecSession(Session):
|
|||||||
|
|
||||||
def _request_json(self, method: str, path: str, payload: dict | None = None) -> dict:
|
def _request_json(self, method: str, path: str, payload: dict | None = None) -> dict:
|
||||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
try:
|
||||||
sock.connect(self._socket_path)
|
sock.connect(self._socket_path)
|
||||||
body = json.dumps(payload or {}).encode("utf-8") if payload is not None else b""
|
body = json.dumps(payload or {}).encode("utf-8") if payload is not None else b""
|
||||||
headers = [
|
headers = [
|
||||||
@@ -141,6 +142,7 @@ class DockerExecSession(Session):
|
|||||||
request = "\r\n".join(headers).encode("utf-8") + body
|
request = "\r\n".join(headers).encode("utf-8") + body
|
||||||
sock.sendall(request)
|
sock.sendall(request)
|
||||||
status, _headers, body_bytes = self._read_http_response(sock)
|
status, _headers, body_bytes = self._read_http_response(sock)
|
||||||
|
finally:
|
||||||
sock.close()
|
sock.close()
|
||||||
if status < 200 or status >= 300:
|
if status < 200 or status >= 300:
|
||||||
detail = body_bytes.decode("utf-8", errors="replace")
|
detail = body_bytes.decode("utf-8", errors="replace")
|
||||||
@@ -170,6 +172,7 @@ class DockerExecSession(Session):
|
|||||||
|
|
||||||
def _start_exec_socket(self, exec_id: str) -> socket.socket:
|
def _start_exec_socket(self, exec_id: str) -> socket.socket:
|
||||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
try:
|
||||||
sock.connect(self._socket_path)
|
sock.connect(self._socket_path)
|
||||||
payload = json.dumps({"Detach": False, "Tty": True}).encode("utf-8")
|
payload = json.dumps({"Detach": False, "Tty": True}).encode("utf-8")
|
||||||
headers = [
|
headers = [
|
||||||
@@ -185,13 +188,15 @@ class DockerExecSession(Session):
|
|||||||
sock.sendall("\r\n".join(headers).encode("utf-8") + payload)
|
sock.sendall("\r\n".join(headers).encode("utf-8") + payload)
|
||||||
status, _headers, body = self._read_http_response(sock)
|
status, _headers, body = self._read_http_response(sock)
|
||||||
if status not in (101,) and (status < 200 or status >= 300):
|
if status not in (101,) and (status < 200 or status >= 300):
|
||||||
sock.close()
|
|
||||||
detail = body.decode("utf-8", errors="replace")
|
detail = body.decode("utf-8", errors="replace")
|
||||||
raise RuntimeError(f"Docker API exec start failed ({status}): {detail}")
|
raise RuntimeError(f"Docker API exec start failed ({status}): {detail}")
|
||||||
# Don't save body from HTTP upgrade - it contains protocol handshake data,
|
# Don't save body from HTTP upgrade - it contains protocol handshake data,
|
||||||
# not real terminal output (e.g., device attribute responses like "\x1b[?1;10;0c")
|
# not real terminal output (e.g., device attribute responses like "\x1b[?1;10;0c")
|
||||||
sock.settimeout(None)
|
sock.settimeout(None)
|
||||||
return sock
|
return sock
|
||||||
|
except Exception:
|
||||||
|
sock.close()
|
||||||
|
raise
|
||||||
|
|
||||||
def _resize_exec(self, width: int, height: int) -> None:
|
def _resize_exec(self, width: int, height: int) -> None:
|
||||||
assert self._exec_id is not None
|
assert self._exec_id is not None
|
||||||
|
|||||||
@@ -81,6 +81,7 @@ class DockerStatsCollector:
|
|||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
def _sync_request() -> bytes | None:
|
def _sync_request() -> bytes | None:
|
||||||
|
sock: socket.socket | None = None
|
||||||
try:
|
try:
|
||||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
sock.settimeout(10.0) # Increased timeout
|
sock.settimeout(10.0) # Increased timeout
|
||||||
@@ -97,11 +98,13 @@ class DockerStatsCollector:
|
|||||||
if not chunk:
|
if not chunk:
|
||||||
break
|
break
|
||||||
chunks.append(chunk)
|
chunks.append(chunk)
|
||||||
sock.close()
|
|
||||||
return b"".join(chunks)
|
return b"".join(chunks)
|
||||||
except (OSError, TimeoutError) as e:
|
except (OSError, TimeoutError) as e:
|
||||||
log.debug("Socket error for %s: %s", path, e)
|
log.debug("Socket error for %s: %s", path, e)
|
||||||
return None
|
return None
|
||||||
|
finally:
|
||||||
|
if sock is not None:
|
||||||
|
sock.close()
|
||||||
|
|
||||||
response = await loop.run_in_executor(None, _sync_request)
|
response = await loop.run_in_executor(None, _sync_request)
|
||||||
if response is None:
|
if response is None:
|
||||||
|
|||||||
Reference in New Issue
Block a user