From c6ee463731a9fed2eba32312ec82fc49fb2b9b03 Mon Sep 17 00:00:00 2001 From: Leto_b Date: Wed, 22 Apr 2026 14:41:35 +0800 Subject: [PATCH] add python support object from 2083 --- .../Programming-Python-Native-API_timecho.md | 183 +++++++++++++++++- .../Programming-Python-Native-API_timecho.md | 180 +++++++++++++++++ .../Programming-Python-Native-API_timecho.md | 180 ++++++++++++++++- .../Programming-Python-Native-API_timecho.md | 178 +++++++++++++++++ 4 files changed, 718 insertions(+), 3 deletions(-) diff --git a/src/UserGuide/Master/Table/API/Programming-Python-Native-API_timecho.md b/src/UserGuide/Master/Table/API/Programming-Python-Native-API_timecho.md index 391382e4f..4dd1fe4ff 100644 --- a/src/UserGuide/Master/Table/API/Programming-Python-Native-API_timecho.md +++ b/src/UserGuide/Master/Table/API/Programming-Python-Native-API_timecho.md @@ -66,6 +66,19 @@ if has_next: **Note:** Avoid mixing different traversal methods (e.g., combining `todf()` with `next_df()`), which may cause unexpected errors. +**Since V2.0.8.3**, the Python client has supported `TSDataType.OBJECT` for Tablet batch write and Session value serialization. Query results are read via the `Field` object. The related interfaces are defined as follows: + +| Function Name | Description | Parameters | Return Value | +|---------------|-------------|------------|--------------| +| `encode_object_cell` | Encodes a single OBJECT cell into wire-format bytes | `is_eof: bool`,
`offset: int`,
`content: bytes` | `bytes`: `\|[eof 1B]\|[offset 8B BE]\|[payload]\|` | +| `decode_object_cell` | Parses a wire-format cell back into `eof`, `offset`, and `payload` | `cell: bytes` (length ≥ 9) | `Tuple[bool, int, bytes]`: `(is_eof, offset, payload)` | +| `Tablet.add_value_object` | Writes an OBJECT cell at the specified row and column (internally calls `encode_object_cell`) | `row_index: int`,
`column_index: int`,
`is_eof: bool`,
`offset: int`,
`content: bytes` | `None` | +| `Tablet.add_value_object_by_name` | Same as `Tablet.add_value_object`, but locates the column by name | `column_name: str`,
`row_index: int`,
`is_eof: bool`,
`offset: int`,
`content: bytes` | `None` | +| `NumpyTablet.add_value_object` | Same semantics as `Tablet.add_value_object`, but column data is stored as `ndarray` | Same as `Tablet.add_value_object` (`row_index`, `column_index`, `is_eof`, `offset`, `content`) | `None` | +| `Field.get_object_value` | Converts the value to a Python value based on the **target type** | `data_type: TSDataType` | Depends on type:
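For OBJECT: a `str` obtained by decoding the entire `self.value` as UTF-8 (see [Field.py](https://github.com/apache/iotdb/blob/master/iotdb-client/client-py/iotdb/utils/Field.py)) | +| `Field.get_string_value` | Returns a string representation | None | `str`;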
For OBJECT: `self.value.decode("utf-8")` | +| `Field.get_binary_value` | Gets the binary data of TEXT/STRING/BLOB | None | `bytes` or `None`;
**Throws an error for OBJECT columns and should not be called** | + #### Sample Code @@ -511,7 +524,7 @@ def query_data(): while res.has_next(): print(res.next()) - print("get data from table1") + print("get data from table1") with session.execute_query_statement("select * from table1") as res: while res.has_next(): print(res.next()) @@ -521,7 +534,7 @@ def query_data(): with session.execute_query_statement("select * from table0") as res: while res.has_next_df(): print(res.next_df()) - + session.close() @@ -571,3 +584,169 @@ session_pool.close() print("example is finished!") ``` +**Object Type Usage Example** + +```python +import os + +import numpy as np +import pytest + +from iotdb.utils.IoTDBConstants import TSDataType +from iotdb.utils.NumpyTablet import NumpyTablet +from iotdb.utils.Tablet import Tablet, ColumnType +from iotdb.utils.object_column import decode_object_cell + + +def _require_thrift(): + pytest.importorskip("iotdb.thrift.common.ttypes") + + +def _session_endpoint(): + host = os.environ.get("IOTDB_HOST", "127.0.0.1") + port = int(os.environ.get("IOTDB_PORT", "6667")) + return host, port + + +@pytest.fixture(scope="module") +def table_session(): + _require_thrift() + from iotdb.Session import Session + from iotdb.table_session import TableSession, TableSessionConfig + + host, port = _session_endpoint() + cfg = TableSessionConfig( + node_urls=[f"{host}:{port}"], + username=os.environ.get("IOTDB_USER", Session.DEFAULT_USER), + password=os.environ.get("IOTDB_PASSWORD", Session.DEFAULT_PASSWORD), + ) + ts = TableSession(cfg) + yield ts + ts.close() + + +def test_table_numpy_tablet_object_columns(table_session): + """ + Table model: Tablet.add_value_object / add_value_object_by_name, + NumpyTablet.add_value_object, insert + query Field + decode_object_cell; + Also includes writing OBJECT in two segments at the same timestamp + (first with is_eof=False/offset=0, then with is_eof=True/offset=length of the first segment), + and verifies the complete 
concatenated bytes using read_object(f1). + """ + db = "test_py_object_e2e" + table = "obj_tbl" + table_session.execute_non_query_statement(f"CREATE DATABASE IF NOT EXISTS {db}") + table_session.execute_non_query_statement(f"USE {db}") + table_session.execute_non_query_statement(f"DROP TABLE IF EXISTS {table}") + table_session.execute_non_query_statement( + f"CREATE TABLE {table}(" + "device STRING TAG, temp FLOAT FIELD, f1 OBJECT FIELD, f2 OBJECT FIELD)" + ) + + column_names = ["device", "temp", "f1", "f2"] + data_types = [ + TSDataType.STRING, + TSDataType.FLOAT, + TSDataType.OBJECT, + TSDataType.OBJECT, + ] + column_types = [ + ColumnType.TAG, + ColumnType.FIELD, + ColumnType.FIELD, + ColumnType.FIELD, + ] + timestamps = [100, 200] + values = [ + ["d1", 1.5, None, None], + ["d1", 2.5, None, None], + ] + + tablet = Tablet( + table, column_names, data_types, values, timestamps, column_types + ) + tablet.add_value_object(0, 2, True, 0, b"first-row-obj") + # Single-segment write for the entire object: is_eof=True and offset=0; + # Segmented sequential writes must pass server-side offset/length validation + tablet.add_value_object_by_name("f2", 0, True, 0, b"seg") + tablet.add_value_object(1, 2, True, 0, b"second-f1") + tablet.add_value_object(1, 3, True, 0, b"second-f2") + table_session.insert(tablet) + + ts_arr = np.array([300, 400], dtype=TSDataType.INT64.np_dtype()) + np_vals = [ + np.array(["d1", "d1"]), + np.array([1.0, 2.0], dtype=np.float32), + np.array([None, None], dtype=object), + np.array([None, None], dtype=object), + ] + np_tab = NumpyTablet( + table, column_names, data_types, np_vals, ts_arr, column_types=column_types + ) + np_tab.add_value_object(0, 2, True, 0, b"np-r0-f1") + np_tab.add_value_object(0, 3, True, 0, b"np-r0-f2") + np_tab.add_value_object(1, 2, True, 0, b"np-r1-f1") + np_tab.add_value_object(1, 3, True, 0, b"np-r1-f2") + table_session.insert(np_tab) + + # Segmented OBJECT: first with is_eof=False (continue transmission), + # then with 
is_eof=True (last segment); offset is the length of written bytes + chunk0 = bytes((i % 256) for i in range(512)) + chunk1 = b"\xab" * 64 + expected_segmented = chunk0 + chunk1 + seg1 = Tablet( + table, + column_names, + data_types, + [["d1", 3.0, None, None]], + [500], + column_types, + ) + seg1.add_value_object(0, 2, False, 0, chunk0) + seg1.add_value_object(0, 3, True, 0, b"f2-seg") + table_session.insert(seg1) + seg2 = Tablet( + table, + column_names, + data_types, + [["d1", 3.0, None, None]], + [500], + column_types, + ) + seg2.add_value_object(0, 2, True, 512, chunk1) + seg2.add_value_object(0, 3, True, 0, b"f2-seg") + table_session.insert(seg2) + + with table_session.execute_query_statement( + f"SELECT read_object(f1) FROM {table} WHERE time = 500" + ) as ds: + assert ds.has_next() + row = ds.next() + blob = row.get_fields()[0].get_binary_value() + assert blob == expected_segmented + assert not ds.has_next() + + seen = 0 + with table_session.execute_query_statement( + f"SELECT device, temp, f1, f2 FROM {table} ORDER BY time" + ) as ds: + while ds.has_next(): + row = ds.next() + fields = row.get_fields() + assert fields[0].get_object_value(TSDataType.STRING) == "d1" + assert fields[1].get_object_value(TSDataType.FLOAT) is not None + for j in (2, 3): + raw = fields[j].value + assert isinstance(raw, (bytes, bytearray)) + eof, off, body = decode_object_cell(bytes(raw)) + assert isinstance(eof, bool) and isinstance(off, int) + assert isinstance(body, bytes) + fields[j].get_string_value() + fields[j].get_object_value(TSDataType.OBJECT) + seen += 1 + assert seen == 5 + + +if __name__ == "__main__": + pytest.main([__file__, "-v", "-rs"]) +``` \ No newline at end of file diff --git a/src/UserGuide/latest-Table/API/Programming-Python-Native-API_timecho.md b/src/UserGuide/latest-Table/API/Programming-Python-Native-API_timecho.md index fef00468e..4dd1fe4ff 100644 --- a/src/UserGuide/latest-Table/API/Programming-Python-Native-API_timecho.md +++ 
b/src/UserGuide/latest-Table/API/Programming-Python-Native-API_timecho.md @@ -66,6 +66,20 @@ if has_next: **Note:** Avoid mixing different traversal methods (e.g., combining `todf()` with `next_df()`), which may cause unexpected errors. +**Since V2.0.8.3**, the Python client has supported `TSDataType.OBJECT` for Tablet batch write and Session value serialization. Query results are read via the `Field` object. The related interfaces are defined as follows: + +| Function Name | Description | Parameters | Return Value | +|---------------|-------------|------------|--------------| +| `encode_object_cell` | Encodes a single OBJECT cell into wire-format bytes | `is_eof: bool`,
`offset: int`,
`content: bytes` | `bytes`: `\|[eof 1B]\|[offset 8B BE]\|[payload]\|` | +| `decode_object_cell` | Parses a wire-format cell back into `eof`, `offset`, and `payload` | `cell: bytes` (length ≥ 9) | `Tuple[bool, int, bytes]`: `(is_eof, offset, payload)` | +| `Tablet.add_value_object` | Writes an OBJECT cell at the specified row and column (internally calls `encode_object_cell`) | `row_index: int`,
`column_index: int`,
`is_eof: bool`,
`offset: int`,
`content: bytes` | `None` | +| `Tablet.add_value_object_by_name` | Same as `Tablet.add_value_object`, but locates the column by name | `column_name: str`,
`row_index: int`,
`is_eof: bool`,
`offset: int`,
`content: bytes` | `None` | +| `NumpyTablet.add_value_object` | Same semantics as `Tablet.add_value_object`, but column data is stored as `ndarray` | Same as `Tablet.add_value_object` (`row_index`, `column_index`, `is_eof`, `offset`, `content`) | `None` | +| `Field.get_object_value` | Converts the value to a Python value based on the **target type** | `data_type: TSDataType` | Depends on type:
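For OBJECT: a `str` obtained by decoding the entire `self.value` as UTF-8 (see [Field.py](https://github.com/apache/iotdb/blob/master/iotdb-client/client-py/iotdb/utils/Field.py)) | +| `Field.get_string_value` | Returns a string representation | None | `str`;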
For OBJECT: `self.value.decode("utf-8")` | +| `Field.get_binary_value` | Gets the binary data of TEXT/STRING/BLOB | None | `bytes` or `None`;
**Throws an error for OBJECT columns and should not be called** | + + #### Sample Code ```Python @@ -570,3 +584,169 @@ session_pool.close() print("example is finished!") ``` +**Object Type Usage Example** + +```python +import os + +import numpy as np +import pytest + +from iotdb.utils.IoTDBConstants import TSDataType +from iotdb.utils.NumpyTablet import NumpyTablet +from iotdb.utils.Tablet import Tablet, ColumnType +from iotdb.utils.object_column import decode_object_cell + + +def _require_thrift(): + pytest.importorskip("iotdb.thrift.common.ttypes") + + +def _session_endpoint(): + host = os.environ.get("IOTDB_HOST", "127.0.0.1") + port = int(os.environ.get("IOTDB_PORT", "6667")) + return host, port + + +@pytest.fixture(scope="module") +def table_session(): + _require_thrift() + from iotdb.Session import Session + from iotdb.table_session import TableSession, TableSessionConfig + + host, port = _session_endpoint() + cfg = TableSessionConfig( + node_urls=[f"{host}:{port}"], + username=os.environ.get("IOTDB_USER", Session.DEFAULT_USER), + password=os.environ.get("IOTDB_PASSWORD", Session.DEFAULT_PASSWORD), + ) + ts = TableSession(cfg) + yield ts + ts.close() + + +def test_table_numpy_tablet_object_columns(table_session): + """ + Table model: Tablet.add_value_object / add_value_object_by_name, + NumpyTablet.add_value_object, insert + query Field + decode_object_cell; + Also includes writing OBJECT in two segments at the same timestamp + (first with is_eof=False/offset=0, then with is_eof=True/offset=length of the first segment), + and verifies the complete concatenated bytes using read_object(f1). 
+ """ + db = "test_py_object_e2e" + table = "obj_tbl" + table_session.execute_non_query_statement(f"CREATE DATABASE IF NOT EXISTS {db}") + table_session.execute_non_query_statement(f"USE {db}") + table_session.execute_non_query_statement(f"DROP TABLE IF EXISTS {table}") + table_session.execute_non_query_statement( + f"CREATE TABLE {table}(" + "device STRING TAG, temp FLOAT FIELD, f1 OBJECT FIELD, f2 OBJECT FIELD)" + ) + + column_names = ["device", "temp", "f1", "f2"] + data_types = [ + TSDataType.STRING, + TSDataType.FLOAT, + TSDataType.OBJECT, + TSDataType.OBJECT, + ] + column_types = [ + ColumnType.TAG, + ColumnType.FIELD, + ColumnType.FIELD, + ColumnType.FIELD, + ] + timestamps = [100, 200] + values = [ + ["d1", 1.5, None, None], + ["d1", 2.5, None, None], + ] + + tablet = Tablet( + table, column_names, data_types, values, timestamps, column_types + ) + tablet.add_value_object(0, 2, True, 0, b"first-row-obj") + # Single-segment write for the entire object: is_eof=True and offset=0; + # Segmented sequential writes must pass server-side offset/length validation + tablet.add_value_object_by_name("f2", 0, True, 0, b"seg") + tablet.add_value_object(1, 2, True, 0, b"second-f1") + tablet.add_value_object(1, 3, True, 0, b"second-f2") + table_session.insert(tablet) + + ts_arr = np.array([300, 400], dtype=TSDataType.INT64.np_dtype()) + np_vals = [ + np.array(["d1", "d1"]), + np.array([1.0, 2.0], dtype=np.float32), + np.array([None, None], dtype=object), + np.array([None, None], dtype=object), + ] + np_tab = NumpyTablet( + table, column_names, data_types, np_vals, ts_arr, column_types=column_types + ) + np_tab.add_value_object(0, 2, True, 0, b"np-r0-f1") + np_tab.add_value_object(0, 3, True, 0, b"np-r0-f2") + np_tab.add_value_object(1, 2, True, 0, b"np-r1-f1") + np_tab.add_value_object(1, 3, True, 0, b"np-r1-f2") + table_session.insert(np_tab) + + # Segmented OBJECT: first with is_eof=False (continue transmission), + # then with is_eof=True (last segment); offset is the 
length of written bytes + chunk0 = bytes((i % 256) for i in range(512)) + chunk1 = b"\xab" * 64 + expected_segmented = chunk0 + chunk1 + seg1 = Tablet( + table, + column_names, + data_types, + [["d1", 3.0, None, None]], + [500], + column_types, + ) + seg1.add_value_object(0, 2, False, 0, chunk0) + seg1.add_value_object(0, 3, True, 0, b"f2-seg") + table_session.insert(seg1) + seg2 = Tablet( + table, + column_names, + data_types, + [["d1", 3.0, None, None]], + [500], + column_types, + ) + seg2.add_value_object(0, 2, True, 512, chunk1) + seg2.add_value_object(0, 3, True, 0, b"f2-seg") + table_session.insert(seg2) + + with table_session.execute_query_statement( + f"SELECT read_object(f1) FROM {table} WHERE time = 500" + ) as ds: + assert ds.has_next() + row = ds.next() + blob = row.get_fields()[0].get_binary_value() + assert blob == expected_segmented + assert not ds.has_next() + + seen = 0 + with table_session.execute_query_statement( + f"SELECT device, temp, f1, f2 FROM {table} ORDER BY time" + ) as ds: + while ds.has_next(): + row = ds.next() + fields = row.get_fields() + assert fields[0].get_object_value(TSDataType.STRING) == "d1" + assert fields[1].get_object_value(TSDataType.FLOAT) is not None + for j in (2, 3): + raw = fields[j].value + assert isinstance(raw, (bytes, bytearray)) + eof, off, body = decode_object_cell(bytes(raw)) + assert isinstance(eof, bool) and isinstance(off, int) + assert isinstance(body, bytes) + fields[j].get_string_value() + fields[j].get_object_value(TSDataType.OBJECT) + seen += 1 + assert seen == 5 + + +if __name__ == "__main__": + pytest.main([__file__, "-v", "-rs"]) +``` \ No newline at end of file diff --git a/src/zh/UserGuide/Master/Table/API/Programming-Python-Native-API_timecho.md b/src/zh/UserGuide/Master/Table/API/Programming-Python-Native-API_timecho.md index 81c09a0f2..b927a1827 100644 --- a/src/zh/UserGuide/Master/Table/API/Programming-Python-Native-API_timecho.md +++ 
b/src/zh/UserGuide/Master/Table/API/Programming-Python-Native-API_timecho.md @@ -67,6 +67,21 @@ if has_next: **注意:** 不要混合使用不同的遍历方式,如(todf函数与 next_df 混用),否则会出现预期外的错误。 + +自 V2.0.8.3 版本起,Python 客户端在 `Tablet`批量写入与 `Session` 值序列化中支持 `TSDataType.OBJECT` ,查询结果经 `Field` 读取,相关接口定义如下: + +| 函数名 | 功能 | 参数 | 返回值 | +| ------------------------------------- | ------------------------------------------------------------ | --------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------- | +| encode\_object\_cell | 将一格 OBJECT 编成线格式字节 | is\_eof: bool,offset: int,content: bytes | bytes:\|[eof 1B]\|[offset 8B BE]\|[payload]\| | +| decode\_object\_cell | 把线格式一格解析回 eof、offset、payload | cell: bytes(长度 ≥ 9) | Tuple[bool, int, bytes]:(is\_eof, offset, payload) | +| Tablet.add\_value\_object | 在指定行列写入一格 OBJECT(内部调用 encode\_object\_cell) | row\_index: int,column\_index: int,is\_eof: bool,offset: int,content: bytes | None | +| Tablet.add\_value\_object\_by\_name | 同上,按列名定位列 | column\_name: str,row\_index: int,is\_eof: bool,offset: int,content: bytes | None | +| NumpyTablet.add\_value\_object | 与 Tablet.add\_value\_object 相同语义,列数据为 ndarray | 同上(row\_index、column\_index、…) | None | +| Field.get\_object\_value | 按「目标类型」把 value 转成 Python 值 | data\_type: TSDataType | 随类型:OBJECT 时为 self.value 整段 UTF-8 解码 得到的 str(见[Field.py](https://github.com/apache/iotdb/blob/master/iotdb-client/client-py/iotdb/utils/Field.py)) | +| Field.get\_string\_value | 字符串化展示 | 无 | str;OBJECT 时为 self.value.decode("utf-8") | +| Field.get\_binary\_value | 取 TEXT/STRING/BLOB 的二进制 | 无 | bytes 或 None;OBJECT 列会抛错,不应调用 | + + #### 2.1.3 接口展示 **TableSession:** @@ -524,7 +539,7 @@ def query_data(): # 使用分批DataFrame方式查询表数据(推荐大数据量场景) print("get data from table0 using batch DataFrame") - with session.execute_query_statement("select * from table0") as res: + with session.execute_query_statement("select * from table0") as res: while 
res.has_next_df(): print(res.next_df()) @@ -577,3 +592,166 @@ session_pool.close() print("example is finished!") ``` +Object 类型使用示例: + +```Python +import os + +import numpy as np +import pytest + +from iotdb.utils.IoTDBConstants import TSDataType +from iotdb.utils.NumpyTablet import NumpyTablet +from iotdb.utils.Tablet import Tablet, ColumnType +from iotdb.utils.object_column import decode_object_cell + + +def _require_thrift(): + pytest.importorskip("iotdb.thrift.common.ttypes") + + +def _session_endpoint(): + host = os.environ.get("IOTDB_HOST", "127.0.0.1") + port = int(os.environ.get("IOTDB_PORT", "6667")) + return host, port + + +@pytest.fixture(scope="module") +def table_session(): + _require_thrift() + from iotdb.Session import Session + from iotdb.table_session import TableSession, TableSessionConfig + + host, port = _session_endpoint() + cfg = TableSessionConfig( + node_urls=[f"{host}:{port}"], + username=os.environ.get("IOTDB_USER", Session.DEFAULT_USER), + password=os.environ.get("IOTDB_PASSWORD", Session.DEFAULT_PASSWORD), + ) + ts = TableSession(cfg) + yield ts + ts.close() + + +def test_table_numpy_tablet_object_columns(table_session): + """ + Table model: Tablet.add_value_object / add_value_object_by_name, + NumpyTablet.add_value_object, insert + query Field + decode_object_cell; + 另含同一 time 上分两段写入 OBJECT(先 is_eof=False/offset=0,再 is_eof=True/offset=首段长度), + 并用 read_object(f1) 校验拼接后的完整字节。 + """ + db = "test_py_object_e2e" + table = "obj_tbl" + table_session.execute_non_query_statement(f"create database if not exists {db}") + table_session.execute_non_query_statement(f"use {db}") + table_session.execute_non_query_statement(f"drop table if exists {table}") + table_session.execute_non_query_statement( + f"create table {table}(" + "device STRING TAG, temp FLOAT FIELD, f1 OBJECT FIELD, f2 OBJECT FIELD)" + ) + + column_names = ["device", "temp", "f1", "f2"] + data_types = [ + TSDataType.STRING, + TSDataType.FLOAT, + TSDataType.OBJECT, + TSDataType.OBJECT, + 
] + column_types = [ + ColumnType.TAG, + ColumnType.FIELD, + ColumnType.FIELD, + ColumnType.FIELD, + ] + timestamps = [100, 200] + values = [ + ["d1", 1.5, None, None], + ["d1", 2.5, None, None], + ] + + tablet = Tablet( + table, column_names, data_types, values, timestamps, column_types + ) + tablet.add_value_object(0, 2, True, 0, b"first-row-obj") + # 整对象单段写入:is_eof=True 且 offset=0;分段续写需满足服务端 offset/长度校验 + tablet.add_value_object_by_name("f2", 0, True, 0, b"seg") + tablet.add_value_object(1, 2, True, 0, b"second-f1") + tablet.add_value_object(1, 3, True, 0, b"second-f2") + table_session.insert(tablet) + + ts_arr = np.array([300, 400], dtype=TSDataType.INT64.np_dtype()) + np_vals = [ + np.array(["d1", "d1"]), + np.array([1.0, 2.0], dtype=np.float32), + np.array([None, None], dtype=object), + np.array([None, None], dtype=object), + ] + np_tab = NumpyTablet( + table, column_names, data_types, np_vals, ts_arr, column_types=column_types + ) + np_tab.add_value_object(0, 2, True, 0, b"np-r0-f1") + np_tab.add_value_object(0, 3, True, 0, b"np-r0-f2") + np_tab.add_value_object(1, 2, True, 0, b"np-r1-f1") + np_tab.add_value_object(1, 3, True, 0, b"np-r1-f2") + table_session.insert(np_tab) + + # 分段 OBJECT:先 is_eof=False(续传),再 is_eof=True(末段);offset 为已写入字节长度 + chunk0 = bytes((i % 256) for i in range(512)) + chunk1 = b"\xab" * 64 + expected_segmented = chunk0 + chunk1 + seg1 = Tablet( + table, + column_names, + data_types, + [["d1", 3.0, None, None]], + [500], + column_types, + ) + seg1.add_value_object(0, 2, False, 0, chunk0) + seg1.add_value_object(0, 3, True, 0, b"f2-seg") + table_session.insert(seg1) + seg2 = Tablet( + table, + column_names, + data_types, + [["d1", 3.0, None, None]], + [500], + column_types, + ) + seg2.add_value_object(0, 2, True, 512, chunk1) + seg2.add_value_object(0, 3, True, 0, b"f2-seg") + table_session.insert(seg2) + + with table_session.execute_query_statement( + f"select read_object(f1) from {table} where time = 500" + ) as ds: + assert 
ds.has_next() + row = ds.next() + blob = row.get_fields()[0].get_binary_value() + assert blob == expected_segmented + assert not ds.has_next() + + seen = 0 + with table_session.execute_query_statement( + f"select device, temp, f1, f2 from {table} order by time" + ) as ds: + while ds.has_next(): + row = ds.next() + fields = row.get_fields() + assert fields[0].get_object_value(TSDataType.STRING) == "d1" + assert fields[1].get_object_value(TSDataType.FLOAT) is not None + for j in (2, 3): + raw = fields[j].value + assert isinstance(raw, (bytes, bytearray)) + eof, off, body = decode_object_cell(bytes(raw)) + assert isinstance(eof, bool) and isinstance(off, int) + assert isinstance(body, bytes) + fields[j].get_string_value() + fields[j].get_object_value(TSDataType.OBJECT) + seen += 1 + assert seen == 5 + + +if __name__ == "__main__": + pytest.main([__file__, "-v", "-rs"]) +``` diff --git a/src/zh/UserGuide/latest-Table/API/Programming-Python-Native-API_timecho.md b/src/zh/UserGuide/latest-Table/API/Programming-Python-Native-API_timecho.md index 20925783d..b927a1827 100644 --- a/src/zh/UserGuide/latest-Table/API/Programming-Python-Native-API_timecho.md +++ b/src/zh/UserGuide/latest-Table/API/Programming-Python-Native-API_timecho.md @@ -67,6 +67,21 @@ if has_next: **注意:** 不要混合使用不同的遍历方式,如(todf函数与 next_df 混用),否则会出现预期外的错误。 + +自 V2.0.8.3 版本起,Python 客户端在 `Tablet`批量写入与 `Session` 值序列化中支持 `TSDataType.OBJECT` ,查询结果经 `Field` 读取,相关接口定义如下: + +| 函数名 | 功能 | 参数 | 返回值 | +| ------------------------------------- | ------------------------------------------------------------ | --------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------- | +| encode\_object\_cell | 将一格 OBJECT 编成线格式字节 | is\_eof: bool,offset: int,content: bytes | bytes:\|[eof 1B]\|[offset 8B BE]\|[payload]\| | +| decode\_object\_cell | 把线格式一格解析回 eof、offset、payload | cell: bytes(长度 ≥ 9) | Tuple[bool, int, 
bytes]:(is\_eof, offset, payload) | +| Tablet.add\_value\_object | 在指定行列写入一格 OBJECT(内部调用 encode\_object\_cell) | row\_index: int,column\_index: int,is\_eof: bool,offset: int,content: bytes | None | +| Tablet.add\_value\_object\_by\_name | 同上,按列名定位列 | column\_name: str,row\_index: int,is\_eof: bool,offset: int,content: bytes | None | +| NumpyTablet.add\_value\_object | 与 Tablet.add\_value\_object 相同语义,列数据为 ndarray | 同上(row\_index、column\_index、…) | None | +| Field.get\_object\_value | 按「目标类型」把 value 转成 Python 值 | data\_type: TSDataType | 随类型:OBJECT 时为 self.value 整段 UTF-8 解码 得到的 str(见[Field.py](https://github.com/apache/iotdb/blob/master/iotdb-client/client-py/iotdb/utils/Field.py)) | +| Field.get\_string\_value | 字符串化展示 | 无 | str;OBJECT 时为 self.value.decode("utf-8") | +| Field.get\_binary\_value | 取 TEXT/STRING/BLOB 的二进制 | 无 | bytes 或 None;OBJECT 列会抛错,不应调用 | + + #### 2.1.3 接口展示 **TableSession:** @@ -577,3 +592,166 @@ session_pool.close() print("example is finished!") ``` +Object 类型使用示例: + +```Python +import os + +import numpy as np +import pytest + +from iotdb.utils.IoTDBConstants import TSDataType +from iotdb.utils.NumpyTablet import NumpyTablet +from iotdb.utils.Tablet import Tablet, ColumnType +from iotdb.utils.object_column import decode_object_cell + + +def _require_thrift(): + pytest.importorskip("iotdb.thrift.common.ttypes") + + +def _session_endpoint(): + host = os.environ.get("IOTDB_HOST", "127.0.0.1") + port = int(os.environ.get("IOTDB_PORT", "6667")) + return host, port + + +@pytest.fixture(scope="module") +def table_session(): + _require_thrift() + from iotdb.Session import Session + from iotdb.table_session import TableSession, TableSessionConfig + + host, port = _session_endpoint() + cfg = TableSessionConfig( + node_urls=[f"{host}:{port}"], + username=os.environ.get("IOTDB_USER", Session.DEFAULT_USER), + password=os.environ.get("IOTDB_PASSWORD", Session.DEFAULT_PASSWORD), + ) + ts = TableSession(cfg) + yield ts + ts.close() + + +def 
test_table_numpy_tablet_object_columns(table_session): + """ + Table model: Tablet.add_value_object / add_value_object_by_name, + NumpyTablet.add_value_object, insert + query Field + decode_object_cell; + 另含同一 time 上分两段写入 OBJECT(先 is_eof=False/offset=0,再 is_eof=True/offset=首段长度), + 并用 read_object(f1) 校验拼接后的完整字节。 + """ + db = "test_py_object_e2e" + table = "obj_tbl" + table_session.execute_non_query_statement(f"create database if not exists {db}") + table_session.execute_non_query_statement(f"use {db}") + table_session.execute_non_query_statement(f"drop table if exists {table}") + table_session.execute_non_query_statement( + f"create table {table}(" + "device STRING TAG, temp FLOAT FIELD, f1 OBJECT FIELD, f2 OBJECT FIELD)" + ) + + column_names = ["device", "temp", "f1", "f2"] + data_types = [ + TSDataType.STRING, + TSDataType.FLOAT, + TSDataType.OBJECT, + TSDataType.OBJECT, + ] + column_types = [ + ColumnType.TAG, + ColumnType.FIELD, + ColumnType.FIELD, + ColumnType.FIELD, + ] + timestamps = [100, 200] + values = [ + ["d1", 1.5, None, None], + ["d1", 2.5, None, None], + ] + + tablet = Tablet( + table, column_names, data_types, values, timestamps, column_types + ) + tablet.add_value_object(0, 2, True, 0, b"first-row-obj") + # 整对象单段写入:is_eof=True 且 offset=0;分段续写需满足服务端 offset/长度校验 + tablet.add_value_object_by_name("f2", 0, True, 0, b"seg") + tablet.add_value_object(1, 2, True, 0, b"second-f1") + tablet.add_value_object(1, 3, True, 0, b"second-f2") + table_session.insert(tablet) + + ts_arr = np.array([300, 400], dtype=TSDataType.INT64.np_dtype()) + np_vals = [ + np.array(["d1", "d1"]), + np.array([1.0, 2.0], dtype=np.float32), + np.array([None, None], dtype=object), + np.array([None, None], dtype=object), + ] + np_tab = NumpyTablet( + table, column_names, data_types, np_vals, ts_arr, column_types=column_types + ) + np_tab.add_value_object(0, 2, True, 0, b"np-r0-f1") + np_tab.add_value_object(0, 3, True, 0, b"np-r0-f2") + np_tab.add_value_object(1, 2, True, 0, 
b"np-r1-f1") + np_tab.add_value_object(1, 3, True, 0, b"np-r1-f2") + table_session.insert(np_tab) + + # 分段 OBJECT:先 is_eof=False(续传),再 is_eof=True(末段);offset 为已写入字节长度 + chunk0 = bytes((i % 256) for i in range(512)) + chunk1 = b"\xab" * 64 + expected_segmented = chunk0 + chunk1 + seg1 = Tablet( + table, + column_names, + data_types, + [["d1", 3.0, None, None]], + [500], + column_types, + ) + seg1.add_value_object(0, 2, False, 0, chunk0) + seg1.add_value_object(0, 3, True, 0, b"f2-seg") + table_session.insert(seg1) + seg2 = Tablet( + table, + column_names, + data_types, + [["d1", 3.0, None, None]], + [500], + column_types, + ) + seg2.add_value_object(0, 2, True, 512, chunk1) + seg2.add_value_object(0, 3, True, 0, b"f2-seg") + table_session.insert(seg2) + + with table_session.execute_query_statement( + f"select read_object(f1) from {table} where time = 500" + ) as ds: + assert ds.has_next() + row = ds.next() + blob = row.get_fields()[0].get_binary_value() + assert blob == expected_segmented + assert not ds.has_next() + + seen = 0 + with table_session.execute_query_statement( + f"select device, temp, f1, f2 from {table} order by time" + ) as ds: + while ds.has_next(): + row = ds.next() + fields = row.get_fields() + assert fields[0].get_object_value(TSDataType.STRING) == "d1" + assert fields[1].get_object_value(TSDataType.FLOAT) is not None + for j in (2, 3): + raw = fields[j].value + assert isinstance(raw, (bytes, bytearray)) + eof, off, body = decode_object_cell(bytes(raw)) + assert isinstance(eof, bool) and isinstance(off, int) + assert isinstance(body, bytes) + fields[j].get_string_value() + fields[j].get_object_value(TSDataType.OBJECT) + seen += 1 + assert seen == 5 + + +if __name__ == "__main__": + pytest.main([__file__, "-v", "-rs"]) +```