Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,51 @@ client.write_dataframe(
)
```

### Accept partial writes and inspect failed lines
`accept_partial` defaults to `True` and allows partial success when a batch contains invalid lines.
On partial failure, the client raises `InfluxDBPartialWriteError` with structured `line_errors`.

```python
from influxdb_client_3 import InfluxDBClient3
from influxdb_client_3.exceptions import InfluxDBPartialWriteError

client = InfluxDBClient3(host="http://localhost:8181", token="token", database="db")
lp = "m v=1i 1\nm v=1.2 2"

try:
client.write(lp) # accept_partial=True by default
except InfluxDBPartialWriteError as e:
for line_err in e.line_errors:
print(f"line {line_err.line_number} failed: {line_err.error_message} ({line_err.original_line})")
```

Disable partial writes:
```python
from influxdb_client_3 import WriteOptions, write_client_options

client = InfluxDBClient3(
host="http://localhost:8181",
token="token",
database="db",
write_client_options=write_client_options(
write_options=WriteOptions(accept_partial=False)
),
)
```

### V2 compatibility mode (Clustered)
Set `use_v2_api=True` to route writes through `/api/v2/write` for Clustered/v2-compatible backends.

`use_v2_api` can be configured by:
- `WriteOptions(use_v2_api=True)`
- constructor kwarg: `write_use_v2_api=True`
- env var: `INFLUX_WRITE_USE_V2_API=true`

When `use_v2_api=True`:
- `accept_partial` is ignored by the backend
- `no_sync=True` is invalid and rejected before dispatch with:
`invalid write options: no_sync cannot be used with use_v2_api`

## Querying

### Querying with SQL
Expand Down
51 changes: 38 additions & 13 deletions influxdb_client_3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
INFLUX_AUTH_SCHEME = "INFLUX_AUTH_SCHEME"
INFLUX_GZIP_THRESHOLD = "INFLUX_GZIP_THRESHOLD"
INFLUX_WRITE_NO_SYNC = "INFLUX_WRITE_NO_SYNC"
INFLUX_WRITE_ACCEPT_PARTIAL = "INFLUX_WRITE_ACCEPT_PARTIAL"
INFLUX_WRITE_USE_V2_API = "INFLUX_WRITE_USE_V2_API"
INFLUX_WRITE_TIMEOUT = "INFLUX_WRITE_TIMEOUT"
INFLUX_QUERY_TIMEOUT = "INFLUX_QUERY_TIMEOUT"
INFLUX_DISABLE_GRPC_COMPRESSION = "INFLUX_DISABLE_GRPC_COMPRESSION"
Expand Down Expand Up @@ -155,19 +157,23 @@ def _parse_gzip_threshold(threshold: str) -> int:
return threshold


def _parse_write_no_sync(write_no_sync: str):
def _parse_write_bool(value):
"""
Parses and validates the provided write no sync value.
Parses a truthy/falsy value for write options.

This function ensures that the given value is a valid boolean,
and it raises an appropriate error if the value is not valid.
The input is normalized to string and matched against common truthy values.
Any non-truthy value is treated as False.

:param write_no_sync: The input value to be parsed and validated.
:type write_no_sync: Any
:return: The validated write no sync value as an boolean.
:param value: The input value to be parsed and validated.
:type value: Any
:return: Parsed boolean value.
:rtype: bool
"""
Comment thread
alespour marked this conversation as resolved.
return write_no_sync.strip().lower() in ['true', '1', 't', 'y', 'yes']
return str(value).strip().lower() in ['true', '1', 't', 'y', 'yes']


def _parse_write_no_sync(write_no_sync: str):
return _parse_write_bool(write_no_sync)


def _parse_timeout(to: str) -> int:
Expand Down Expand Up @@ -233,6 +239,8 @@ def __init__(
:key str password: ``password`` to authenticate via username and password credentials to the InfluxDB 2.x
:key str query_timeout: int value used to set the client query API timeout in milliseconds.
:key str write_timeout: int value used to set the client write API timeout in milliseconds.
:key bool write_accept_partial: allow partial writes when some lines fail.
:key bool write_use_v2_api: route writes through /api/v2/write compatibility endpoint.
:key list[str] profilers: list of enabled Flux profilers
"""
self._org = org if org is not None else "default"
Expand All @@ -243,22 +251,34 @@ def __init__(
write_type = DefaultWriteOptions.write_type.value
write_precision = DefaultWriteOptions.write_precision.value
write_no_sync = DefaultWriteOptions.no_sync.value
write_accept_partial = DefaultWriteOptions.accept_partial.value
write_use_v2_api = DefaultWriteOptions.use_v2_api.value
write_timeout = DefaultWriteOptions.timeout.value

if isinstance(write_client_options, dict) and write_client_options.get('write_options') is not None:
write_opts = write_client_options['write_options']
write_type = getattr(write_opts, 'write_type', write_type)
write_precision = getattr(write_opts, 'write_precision', write_precision)
write_no_sync = getattr(write_opts, 'no_sync', write_no_sync)
write_accept_partial = getattr(write_opts, 'accept_partial', write_accept_partial)
write_use_v2_api = getattr(write_opts, 'use_v2_api', write_use_v2_api)
write_timeout = getattr(write_opts, 'timeout', write_timeout)

if kw_keys.__contains__('write_timeout'):
write_timeout = kwargs.get('write_timeout')

if kw_keys.__contains__('write_accept_partial'):
write_accept_partial = _parse_write_bool(kwargs.get('write_accept_partial'))

if kw_keys.__contains__('write_use_v2_api'):
write_use_v2_api = _parse_write_bool(kwargs.get('write_use_v2_api'))

write_options = WriteOptions(
write_type=write_type,
write_precision=write_precision,
no_sync=write_no_sync,
accept_partial=write_accept_partial,
use_v2_api=write_use_v2_api,
)

self._write_client_options = {
Expand Down Expand Up @@ -347,7 +367,15 @@ def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3':

write_no_sync = os.getenv(INFLUX_WRITE_NO_SYNC)
if write_no_sync is not None:
write_options.no_sync = _parse_write_no_sync(write_no_sync)
write_options.no_sync = _parse_write_bool(write_no_sync)

write_accept_partial = os.getenv(INFLUX_WRITE_ACCEPT_PARTIAL)
if write_accept_partial is not None:
write_options.accept_partial = _parse_write_bool(write_accept_partial)

write_use_v2_api = os.getenv(INFLUX_WRITE_USE_V2_API)
if write_use_v2_api is not None:
write_options.use_v2_api = _parse_write_bool(write_use_v2_api)

precision = os.getenv(INFLUX_PRECISION)
if precision is not None:
Expand Down Expand Up @@ -402,10 +430,7 @@ def write(self, record=None, database=None, **kwargs):
if database is None:
database = self._database

try:
return self._write_api.write(bucket=database, record=record, **kwargs)
except InfluxDBError as e:
raise e
return self._write_api.write(bucket=database, record=record, **kwargs)

def write_dataframe(
self,
Expand Down
3 changes: 2 additions & 1 deletion influxdb_client_3/exceptions/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# flake8: noqa

from .exceptions import InfluxDB3ClientQueryError, InfluxDBError, InfluxDB3ClientError
from .exceptions import InfluxDB3ClientQueryError, InfluxDBError, InfluxDB3ClientError, InfluxDBPartialWriteError, \
InfluxDBPartialWriteLineError
169 changes: 150 additions & 19 deletions influxdb_client_3/exceptions/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""Exceptions utils for InfluxDB."""

import json
import logging
from dataclasses import dataclass
from typing import List, Optional, Tuple

from urllib3 import HTTPResponse

Expand Down Expand Up @@ -39,6 +42,104 @@ def __init__(self, error_message, *args, **kwargs):
self.message = error_message


def _is_partial_write_error(error_message) -> bool:
if not isinstance(error_message, str) or not error_message:
return False
normalized = error_message.lower()
return (
"partial write of line protocol occurred" in normalized or
"parsing failed for write_lp endpoint" in normalized
)


def _parse_partial_write_data_item(item) -> Optional[Tuple[str, int, str]]:
if item is None:
return None
if not isinstance(item, dict):
raise ValueError("array item is not an object")

error_message = item.get("error_message")
if not isinstance(error_message, str):
raise ValueError("error_message must be string")
if not error_message:
return None

line_number_raw = item.get("line_number")
if line_number_raw is None:
line_number = 0
elif isinstance(line_number_raw, int):
line_number = line_number_raw
else:
raise ValueError("line_number must be int")

original_line_raw = item.get("original_line")
if original_line_raw is None:
original_line = ""
elif isinstance(original_line_raw, str):
original_line = original_line_raw
else:
raise ValueError("original_line must be string")

return error_message, line_number, original_line


def _parse_typed_partial_write_array(data) -> Optional[List[Tuple[str, int, str]]]:
if not isinstance(data, list):
return None
line_errors: List[Tuple[str, int, str]] = []
try:
for item in data:
parsed = _parse_partial_write_data_item(item)
if parsed is None:
continue
line_errors.append(parsed)
except ValueError:
return None
return line_errors if len(line_errors) > 0 else None


def _parse_typed_partial_write_object_or_none(data) -> Optional[Tuple[str, int, str]]:
try:
return _parse_partial_write_data_item(data)
except ValueError:
return None


def _format_partial_write_details(line_errors: List[Tuple[str, int, str]]) -> List[str]:
details: List[str] = []
for error_message, line_number, original_line in line_errors:
if line_number != 0 and original_line != "":
details.append(f"\tline {line_number}: {error_message} ({original_line})")
elif error_message:
details.append(f"\t{error_message}")
return details


def _parse_partial_write_line_error_info(data) -> Tuple[List[Tuple[str, int, str]], List[str]]:
if data is None:
return [], []

typed_array = _parse_typed_partial_write_array(data)
if typed_array is not None:
return typed_array, _format_partial_write_details(typed_array)

if isinstance(data, list):
details: List[str] = []
for item in data:
if item is None:
continue
raw = json.dumps(item, separators=(',', ':'))
if raw and raw.lower() != "null":
details.append(raw)
return [], details

typed_single = _parse_typed_partial_write_object_or_none(data)
if typed_single is not None:
return [typed_single], _format_partial_write_details([typed_single])

return [], []


# This error is for all write operations
class InfluxDBError(InfluxDB3ClientError):
"""Raised when a server error occurs."""
Expand All @@ -56,10 +157,7 @@ def __init__(self, response: HTTPResponse = None, message: str = None):
super().__init__(self.message)

def _get_message(self, response):
# Body
if response.data:
import json

def get(d, key):
if not key or d is None:
return d
Expand All @@ -80,23 +178,15 @@ def get(d, key):
# "data": [ { "error_message": "...", "line_number": 2, "original_line": "..." }, ... ]
# }
error_text = node.get("error")
data = node.get("data")
if error_text and isinstance(data, list):
details = []
for item in data:
if not isinstance(item, dict):
continue
line_number = item.get("line_number")
error_message = item.get("error_message")
original_line = item.get("original_line")
if line_number is not None and error_message and original_line:
details.append(
f"\tline {line_number}: {error_message} ({original_line})"
)
elif error_message:
details.append(f"\t{error_message}")
if error_text and _is_partial_write_error(error_text):
_, details = _parse_partial_write_line_error_info(node.get("data"))
if details:
return error_text + ":\n" + "\n".join(details)
return error_text + ":\n" + "\n".join(
detail if detail.startswith("\t") else f"\t{detail}"
for detail in details
)
return error_text
if error_text:
return error_text
for key in [['message'], ['data', 'error_message'], ['error']]:
value = get(node, key)
Expand All @@ -119,3 +209,44 @@ def get(d, key):
def getheaders(self):
"""Helper method to make response headers more accessible."""
return self.response.getheaders()


@dataclass(frozen=True)
class InfluxDBPartialWriteLineError:
line_number: int
error_message: str
original_line: str


class InfluxDBPartialWriteError(InfluxDBError):
"""Structured partial-write error with per-line failures."""

def __init__(self, response: HTTPResponse, line_errors: List[InfluxDBPartialWriteLineError]):
super().__init__(response=response)
Comment thread
alespour marked this conversation as resolved.
self.line_errors = line_errors

@classmethod
def from_response(cls, response: HTTPResponse):
if response is None or not response.data:
return None
try:
node = json.loads(response.data)
except Exception:
return None
if not isinstance(node, dict):
return None
error_text = node.get("error")
if not _is_partial_write_error(error_text):
return None
parsed_line_errors, _ = _parse_partial_write_line_error_info(node.get("data"))
if not parsed_line_errors:
return None
line_errors = [
InfluxDBPartialWriteLineError(
line_number=line_number,
error_message=error_message,
original_line=original_line,
)
for error_message, line_number, original_line in parsed_line_errors
]
return cls(response=response, line_errors=line_errors)
Loading
Loading