diff --git a/CHANGELOG.md b/CHANGELOG.md index 3470d909..3328250a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,10 @@ 2. [#674](https://github.com/influxdata/influxdb-client-python/pull/674): Add type linting to client.flux_table.FluxTable, remove duplicated `from pathlib import Path` at setup.py 3. [#675](https://github.com/influxdata/influxdb-client-python/pull/675): Ensures WritePrecision in Point is preferred to `DEFAULT_PRECISION` +### Features + +1. [#678](https://github.com/influxdata/influxdb-client-python/pull/678): Implement flush method in `Write_api` + ## 1.46.0 [2024-09-13] ### Bug Fixes diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 3b3db68f..88ae4991 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -387,8 +387,8 @@ def write_payload(payload): def flush(self): """Flush data.""" - # TODO - pass + if self._subject: + self._subject.on_completed() def close(self): """Flush data and dispose a batching buffer.""" @@ -411,8 +411,8 @@ def __exit__(self, exc_type, exc_val, exc_tb): def __del__(self): """Close WriteApi.""" + self.flush() if self._subject: - self._subject.on_completed() self._subject.dispose() self._subject = None diff --git a/tests/test_WriteApiBatching.py b/tests/test_WriteApiBatching.py index 8befd7e5..7bc19b56 100644 --- a/tests/test_WriteApiBatching.py +++ b/tests/test_WriteApiBatching.py @@ -17,6 +17,7 @@ from influxdb_client.client.exceptions import InfluxDBError from influxdb_client.client.write.point import Point from influxdb_client.client.write_api import WriteOptions, WriteApi, PointSettings +from tests.base_test import BaseTest class BatchingWriteTest(unittest.TestCase): @@ -736,6 +737,29 @@ def __call__(self, conf: (str, str, str), data: str, error: InfluxDBError): self.assertIsInstance(callback.error, InfluxDBError) self.assertEqual(429, callback.error.response.status) +class BatchingWriteFlushTest(BaseTest): + + def setUp(self): + return super().setUp() + + def test_flush(self): + write_client = self.client.write_api() + + bucket = self.create_test_bucket() + + write_client.write(bucket.name, self.org, "h2o_feet,location=coyote_creek level\\ water_level=1 1") + + write_client.flush() + + time.sleep(1) + + query = 'from(bucket:"' + bucket.name + '") |> range(start: 1970-01-01T00:00:00.000000001Z)' + flux_result = self.client.query_api().query(query) + + self.assertEqual(1, len(flux_result)) + + self.delete_test_bucket(bucket) + if __name__ == '__main__': unittest.main()