Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,19 @@ if has_next:

**Note:** Avoid mixing different traversal methods (e.g., combining `todf()` with `next_df()`), which may cause unexpected errors.

**Since V2.0.8.3**, the Python client has supported `TSDataType.OBJECT` for Tablet batch write and Session value serialization. Query results are read via the `Field` object. The related interfaces are defined as follows:

| Function Name | Description | Parameters | Return Value |
|---------------|-------------|------------|--------------|
| `encode_object_cell` | Encodes a single OBJECT cell into wire-format bytes | `is_eof: bool`,<br>`offset: int`,<br>`content: bytes` | `bytes`: `|[eof 1B]|[offset 8B BE]|[payload]|` |
| `decode_object_cell` | Parses a wire-format cell back into `eof`, `offset`, and `payload` | `cell: bytes` (length ≥ 9) | `Tuple[bool, int, bytes]`: `(is_eof, offset, payload)` |
| `Tablet.add_value_object` | Writes an OBJECT cell at the specified row and column (internally calls `encode_object_cell`) | `row_index: int`,<br>`column_index: int`,<br>`is_eof: bool`,<br>`offset: int`,<br>`content: bytes` | `None` |
| `Tablet.add_value_object_by_name` | Same as above, locates column by name | `column_name: str`,<br>`row_index: int`,<br>`is_eof: bool`,<br>`offset: int`,<br>`content: bytes` | `None` |
| `NumpyTablet.add_value_object` | Same semantics as `Tablet.add_value_object`, column data is stored as `ndarray` | Same as above (`row_index`, `column_index`, ...) | `None` |
| `Field.get_object_value` | Converts the value to a Python value based on the **target type** | `data_type: TSDataType` | Depends on type:<br>For OBJECT: `str` decoded from the entire `self.value` in UTF-8 (see Field.py) |
| `Field.get_string_value` | Returns a string representation | None | `str`;<br>For OBJECT: `self.value.decode("utf-8")` |
| `Field.get_binary_value` | Gets the binary data of TEXT/STRING/BLOB | None | `bytes` or `None`;<br>**Throws an error for OBJECT columns and should not be called** |


#### Sample Code

Expand Down Expand Up @@ -511,7 +524,7 @@ def query_data():
while res.has_next():
print(res.next())

print("get data from table1")
print("get data from table1")
with session.execute_query_statement("select * from table1") as res:
while res.has_next():
print(res.next())
Expand All @@ -521,7 +534,7 @@ def query_data():
with session.execute_query_statement("select * from table0") as res:
while res.has_next_df():
print(res.next_df())

session.close()


Expand Down Expand Up @@ -571,3 +584,169 @@ session_pool.close()
print("example is finished!")
```

**Object Type Usage Example**

```python
import os

import numpy as np
import pytest

from iotdb.utils.IoTDBConstants import TSDataType
from iotdb.utils.NumpyTablet import NumpyTablet
from iotdb.utils.Tablet import Tablet, ColumnType
from iotdb.utils.object_column import decode_object_cell


def _require_thrift():
pytest.importorskip("iotdb.thrift.common.ttypes")


def _session_endpoint():
host = os.environ.get("IOTDB_HOST", "127.0.0.1")
port = int(os.environ.get("IOTDB_PORT", "6667"))
return host, port


@pytest.fixture(scope="module")
def table_session():
_require_thrift()
from iotdb.Session import Session
from iotdb.table_session import TableSession, TableSessionConfig

host, port = _session_endpoint()
cfg = TableSessionConfig(
node_urls=[f"{host}:{port}"],
username=os.environ.get("IOTDB_USER", Session.DEFAULT_USER),
password=os.environ.get("IOTDB_PASSWORD", Session.DEFAULT_PASSWORD),
)
ts = TableSession(cfg)
yield ts
ts.close()


def test_table_numpy_tablet_object_columns(table_session):
"""
Table model: Tablet.add_value_object / add_value_object_by_name,
NumpyTablet.add_value_object, insert + query Field + decode_object_cell;
Also includes writing OBJECT in two segments at the same timestamp
(first with is_eof=False/offset=0, then with is_eof=True/offset=length of the first segment),
and verifies the complete concatenated bytes using read_object(f1).
"""
db = "test_py_object_e2e"
table = "obj_tbl"
table_session.execute_non_query_statement(f"CREATE DATABASE IF NOT EXISTS {db}")
table_session.execute_non_query_statement(f"USE {db}")
table_session.execute_non_query_statement(f"DROP TABLE IF EXISTS {table}")
table_session.execute_non_query_statement(
f"CREATE TABLE {table}("
"device STRING TAG, temp FLOAT FIELD, f1 OBJECT FIELD, f2 OBJECT FIELD)"
)

column_names = ["device", "temp", "f1", "f2"]
data_types = [
TSDataType.STRING,
TSDataType.FLOAT,
TSDataType.OBJECT,
TSDataType.OBJECT,
]
column_types = [
ColumnType.TAG,
ColumnType.FIELD,
ColumnType.FIELD,
ColumnType.FIELD,
]
timestamps = [100, 200]
values = [
["d1", 1.5, None, None],
["d1", 2.5, None, None],
]

tablet = Tablet(
table, column_names, data_types, values, timestamps, column_types
)
tablet.add_value_object(0, 2, True, 0, b"first-row-obj")
# Single-segment write for the entire object: is_eof=True and offset=0;
# Segmented sequential writes must pass server-side offset/length validation
tablet.add_value_object_by_name("f2", 0, True, 0, b"seg")
tablet.add_value_object(1, 2, True, 0, b"second-f1")
tablet.add_value_object(1, 3, True, 0, b"second-f2")
table_session.insert(tablet)

ts_arr = np.array([300, 400], dtype=TSDataType.INT64.np_dtype())
np_vals = [
np.array(["d1", "d1"]),
np.array([1.0, 2.0], dtype=np.float32),
np.array([None, None], dtype=object),
np.array([None, None], dtype=object),
]
np_tab = NumpyTablet(
table, column_names, data_types, np_vals, ts_arr, column_types=column_types
)
np_tab.add_value_object(0, 2, True, 0, b"np-r0-f1")
np_tab.add_value_object(0, 3, True, 0, b"np-r0-f2")
np_tab.add_value_object(1, 2, True, 0, b"np-r1-f1")
np_tab.add_value_object(1, 3, True, 0, b"np-r1-f2")
table_session.insert(np_tab)

# Segmented OBJECT: first with is_eof=False (continue transmission),
# then with is_eof=True (last segment); offset is the length of written bytes
chunk0 = bytes((i % 256) for i in range(512))
chunk1 = b"\xab" * 64
expected_segmented = chunk0 + chunk1
seg1 = Tablet(
table,
column_names,
data_types,
[["d1", 3.0, None, None]],
[500],
column_types,
)
seg1.add_value_object(0, 2, False, 0, chunk0)
seg1.add_value_object(0, 3, True, 0, b"f2-seg")
table_session.insert(seg1)
seg2 = Tablet(
table,
column_names,
data_types,
[["d1", 3.0, None, None]],
[500],
column_types,
)
seg2.add_value_object(0, 2, True, 512, chunk1)
seg2.add_value_object(0, 3, True, 0, b"f2-seg")
table_session.insert(seg2)

with table_session.execute_query_statement(
f"SELECT read_object(f1) FROM {table} WHERE time = 500"
) as ds:
assert ds.has_next()
row = ds.next()
blob = row.get_fields()[0].get_binary_value()
assert blob == expected_segmented
assert not ds.has_next()

seen = 0
with table_session.execute_query_statement(
f"SELECT device, temp, f1, f2 FROM {table} ORDER BY time"
) as ds:
while ds.has_next():
row = ds.next()
fields = row.get_fields()
assert fields[0].get_object_value(TSDataType.STRING) == "d1"
assert fields[1].get_object_value(TSDataType.FLOAT) is not None
for j in (2, 3):
raw = fields[j].value
assert isinstance(raw, (bytes, bytearray))
eof, off, body = decode_object_cell(bytes(raw))
assert isinstance(eof, bool) and isinstance(off, int)
assert isinstance(body, bytes)
fields[j].get_string_value()
fields[j].get_object_value(TSDataType.OBJECT)
seen += 1
assert seen == 5


if __name__ == "__main__":
pytest.main([__file__, "-v", "-rs"])
```
Loading