From 46d44dae75d53f07b4837ab439e935da75552dc4 Mon Sep 17 00:00:00 2001 From: yuzelin <33053040+yuzelin@users.noreply.github.com> Date: Thu, 31 Oct 2024 17:21:33 +0800 Subject: [PATCH 1/8] [hotfix] Fix runtime exception on windows (#23) (cherry picked from commit 0d0237b9e1d023f57f958dcbb83cdbf77aac8e5b) --- paimon_python_java/gateway_server.py | 1 + 1 file changed, 1 insertion(+) diff --git a/paimon_python_java/gateway_server.py b/paimon_python_java/gateway_server.py index fc05ddc..7285e98 100644 --- a/paimon_python_java/gateway_server.py +++ b/paimon_python_java/gateway_server.py @@ -63,6 +63,7 @@ def launch_gateway_server_process(env): *main_args ] + preexec_fn = None if not on_windows(): def preexec_func(): # ignore ctrl-c / SIGINT From cbb81bbb66b86895fa18538fbeacfb0bd3dd758a Mon Sep 17 00:00:00 2001 From: yuzelin <33053040+yuzelin@users.noreply.github.com> Date: Wed, 6 Nov 2024 17:49:17 +0800 Subject: [PATCH 2/8] Read interface support predicate (#22) (cherry picked from commit 5e7d4687e1106f4fb928fb9bbab1e50d15b99ff9) --- .github/workflows/paimon-python-checks.yml | 7 + paimon_python_api/__init__.py | 5 +- paimon_python_api/predicate.py | 95 +++++ paimon_python_api/read_builder.py | 13 +- paimon_python_java/__init__.py | 7 +- paimon_python_java/gateway_server.py | 2 +- paimon_python_java/java_gateway.py | 1 + .../apache/paimon/python/PredicationUtil.java | 111 +++++ paimon_python_java/pypaimon.py | 103 ++++- paimon_python_java/tests/test_preicates.py | 394 ++++++++++++++++++ paimon_python_java/tests/utils.py | 5 + 11 files changed, 735 insertions(+), 8 deletions(-) create mode 100644 paimon_python_api/predicate.py create mode 100644 paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PredicationUtil.java create mode 100644 paimon_python_java/tests/test_preicates.py diff --git a/.github/workflows/paimon-python-checks.yml b/.github/workflows/paimon-python-checks.yml index e94820b..195783f 100644 --- a/.github/workflows/paimon-python-checks.yml +++ b/.github/workflows/paimon-python-checks.yml @@ -43,7 +43,14 @@ jobs: with: java-version: ${{ env.JDK_VERSION }} distribution: 'adopt' + - name: Set up hadoop dependency + run: | + mkdir -p ${{ github.workspace }}/temp + curl -L -o ${{ github.workspace }}/temp/bundled-hadoop.jar \ + https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar - name: Run lint-python.sh + env: + _PYPAIMON_HADOOP_CLASSPATH: ${{ github.workspace }}/temp/bundled-hadoop.jar run: | chmod +x dev/lint-python.sh ./dev/lint-python.sh diff --git a/paimon_python_api/__init__.py b/paimon_python_api/__init__.py index 86090c9..44717bf 100644 --- a/paimon_python_api/__init__.py +++ b/paimon_python_api/__init__.py @@ -19,6 +19,7 @@ from .split import Split from .table_read import TableRead from .table_scan import TableScan, Plan +from .predicate import Predicate, PredicateBuilder from .read_builder import ReadBuilder from .commit_message import CommitMessage from .table_commit import BatchTableCommit @@ -39,5 +40,7 @@ 'BatchWriteBuilder', 'Table', 'Schema', - 'Catalog' + 'Catalog', + 'Predicate', + 'PredicateBuilder' ] diff --git a/paimon_python_api/predicate.py b/paimon_python_api/predicate.py new file mode 100644 index 0000000..46280d1 --- /dev/null +++ b/paimon_python_api/predicate.py @@ -0,0 +1,95 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor 
license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# + +from abc import ABC, abstractmethod +from typing import Any, List + + +class Predicate(ABC): + """Predicate which evaluates to a boolean. Now it doesn't have + any methods because only paimon_python_java implement it and + the Java implementation convert it to Java object.""" + + +class PredicateBuilder(ABC): + """A utility class to create Predicate object for common filter conditions.""" + + @abstractmethod + def equal(self, field: str, literal: Any) -> Predicate: + """field = literal""" + + @abstractmethod + def not_equal(self, field: str, literal: Any) -> Predicate: + """field <> literal""" + + @abstractmethod + def less_than(self, field: str, literal: Any) -> Predicate: + """field < literal""" + + @abstractmethod + def less_or_equal(self, field: str, literal: Any) -> Predicate: + """field <= literal""" + + @abstractmethod + def greater_than(self, field: str, literal: Any) -> Predicate: + """field > literal""" + + @abstractmethod + def greater_or_equal(self, field: str, literal: Any) -> Predicate: + """field >= literal""" + + @abstractmethod + def is_null(self, field: str) -> Predicate: + """field IS NULL""" + + @abstractmethod + def is_not_null(self, field: str) -> Predicate: + """field IS NOT NULL""" + + @abstractmethod + def startswith(self, field: str, pattern_literal: Any) -> Predicate: + """field.startswith""" + + @abstractmethod + def endswith(self, field: str, pattern_literal: Any) -> Predicate: + """field.endswith()""" + + @abstractmethod + def contains(self, field: str, pattern_literal: Any) -> Predicate: + """literal in field""" + + @abstractmethod + def is_in(self, field: str, literals: List[Any]) -> Predicate: + """field IN literals""" + + @abstractmethod + def is_not_in(self, field: str, literals: List[Any]) -> Predicate: + """field NOT IN literals""" + + @abstractmethod + def between(self, field: str, included_lower_bound: Any, included_upper_bound: Any) \ + -> Predicate: + """field BETWEEN included_lower_bound AND included_upper_bound""" + + @abstractmethod + def and_predicates(self, predicates: List[Predicate]) -> Predicate: + """predicate1 AND predicate2 AND ...""" + + @abstractmethod + def or_predicates(self, predicates: List[Predicate]) -> Predicate: + """predicate1 OR predicate2 OR ...""" diff --git a/paimon_python_api/read_builder.py b/paimon_python_api/read_builder.py index 94ec073..ad5e6d6 100644 --- a/paimon_python_api/read_builder.py +++ b/paimon_python_api/read_builder.py @@ -17,13 +17,20 @@ ################################################################################# from abc import ABC, abstractmethod -from paimon_python_api import TableRead, TableScan +from paimon_python_api import TableRead, TableScan, Predicate, PredicateBuilder from typing import List class 
ReadBuilder(ABC): """An interface for building the TableScan and TableRead.""" + @abstractmethod + def with_filter(self, predicate: Predicate): + """ + Push filters, will filter the data as much as possible, + but it is not guaranteed that it is a complete filter. + """ + @abstractmethod def with_projection(self, projection: List[List[int]]) -> 'ReadBuilder': """Push nested projection.""" @@ -39,3 +46,7 @@ def new_scan(self) -> TableScan: @abstractmethod def new_read(self) -> TableRead: """Create a TableRead to read splits.""" + + @abstractmethod + def new_predicate_builder(self) -> PredicateBuilder: + """Create a builder for Predicate.""" diff --git a/paimon_python_java/__init__.py b/paimon_python_java/__init__.py index 6e97d9e..9b0d002 100644 --- a/paimon_python_java/__init__.py +++ b/paimon_python_java/__init__.py @@ -18,7 +18,8 @@ from .util import constants from .pypaimon import (Catalog, Table, ReadBuilder, TableScan, Plan, Split, TableRead, - BatchWriteBuilder, BatchTableWrite, CommitMessage, BatchTableCommit) + BatchWriteBuilder, BatchTableWrite, CommitMessage, BatchTableCommit, + Predicate, PredicateBuilder) __all__ = [ 'constants', @@ -32,5 +33,7 @@ 'BatchWriteBuilder', 'BatchTableWrite', 'CommitMessage', - 'BatchTableCommit' + 'BatchTableCommit', + 'Predicate', + 'PredicateBuilder' ] diff --git a/paimon_python_java/gateway_server.py b/paimon_python_java/gateway_server.py index 7285e98..2061d59 100644 --- a/paimon_python_java/gateway_server.py +++ b/paimon_python_java/gateway_server.py @@ -103,7 +103,7 @@ def _get_hadoop_classpath(env): return env[constants.PYPAIMON_HADOOP_CLASSPATH] if 'HADOOP_CLASSPATH' in env: - return None + return env['HADOOP_CLASSPATH'] else: raise EnvironmentError(f"You haven't set '{constants.PYPAIMON_HADOOP_CLASSPATH}', \ and 'HADOOP_CLASSPATH' is also not set. Ensure one of them is set.") diff --git a/paimon_python_java/java_gateway.py b/paimon_python_java/java_gateway.py index f2b1621..3dabcfd 100644 --- a/paimon_python_java/java_gateway.py +++ b/paimon_python_java/java_gateway.py @@ -109,6 +109,7 @@ def import_paimon_view(gateway): java_import(gateway.jvm, 'org.apache.paimon.types.*') java_import(gateway.jvm, 'org.apache.paimon.python.*') java_import(gateway.jvm, "org.apache.paimon.data.*") + java_import(gateway.jvm, "org.apache.paimon.predicate.PredicateBuilder") class Watchdog(object): diff --git a/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PredicationUtil.java b/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PredicationUtil.java new file mode 100644 index 0000000..a863dfd --- /dev/null +++ b/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PredicationUtil.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.python; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.RowType; + +import java.util.List; +import java.util.stream.Collectors; + +/** For building Predicate. */ +public class PredicationUtil { + + public static Predicate build( + RowType rowType, + PredicateBuilder builder, + String method, + int index, + List literals) { + literals = + literals.stream() + .map(l -> convertJavaObject(rowType.getTypeAt(index), l)) + .collect(Collectors.toList()); + switch (method) { + case "equal": + return builder.equal(index, literals.get(0)); + case "notEqual": + return builder.notEqual(index, literals.get(0)); + case "lessThan": + return builder.lessThan(index, literals.get(0)); + case "lessOrEqual": + return builder.lessOrEqual(index, literals.get(0)); + case "greaterThan": + return builder.greaterThan(index, literals.get(0)); + case "greaterOrEqual": + return builder.greaterOrEqual(index, literals.get(0)); + case "isNull": + return builder.isNull(index); + case "isNotNull": + return builder.isNotNull(index); + case "startsWith": + return builder.startsWith(index, literals.get(0)); + case "endsWith": + return builder.endsWith(index, literals.get(0)); + case "contains": + return builder.contains(index, literals.get(0)); + case "in": + return builder.in(index, literals); + case "notIn": + return builder.notIn(index, literals); + case "between": + return builder.between(index, literals.get(0), literals.get(1)); + default: + throw new UnsupportedOperationException( + "Unknown PredicateBuilder method " + method); + } + } + + /** Some type is not convenient to transfer from Python to Java. 
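For example, string literals are wrapped into BinaryString and numeric literals are narrowed to the column's FLOAT/TINYINT/SMALLINT/BIGINT representation, as convertJavaObject below does.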
*/ + private static Object convertJavaObject(DataType literalType, Object literal) { + switch (literalType.getTypeRoot()) { + case BOOLEAN: + case DOUBLE: + case INTEGER: + return literal; + case CHAR: + case VARCHAR: + return BinaryString.fromString((String) literal); + case FLOAT: + return ((Number) literal).floatValue(); + case TINYINT: + return ((Number) literal).byteValue(); + case SMALLINT: + return ((Number) literal).shortValue(); + case BIGINT: + return ((Number) literal).longValue(); + default: + throw new UnsupportedOperationException( + "Unsupported predicate leaf type " + literalType.getTypeRoot().name()); + } + } + + public static Predicate buildAnd(List predicates) { + // 'and' is keyword of Python + return PredicateBuilder.and(predicates); + } + + public static Predicate buildOr(List predicates) { + // 'or' is keyword of Python + return PredicateBuilder.or(predicates); + } +} diff --git a/paimon_python_java/pypaimon.py b/paimon_python_java/pypaimon.py index fcf0695..0d3101b 100644 --- a/paimon_python_java/pypaimon.py +++ b/paimon_python_java/pypaimon.py @@ -22,8 +22,9 @@ from paimon_python_java.java_gateway import get_gateway from paimon_python_java.util import java_utils, constants from paimon_python_api import (catalog, table, read_builder, table_scan, split, table_read, - write_builder, table_write, commit_message, table_commit, Schema) -from typing import List, Iterator, Optional + write_builder, table_write, commit_message, table_commit, Schema, + predicate) +from typing import List, Iterator, Optional, Any class Catalog(catalog.Catalog): @@ -85,6 +86,10 @@ def __init__(self, j_read_builder, j_row_type, catalog_options: dict, arrow_sche self._catalog_options = catalog_options self._arrow_schema = arrow_schema + def with_filter(self, predicate: 'Predicate'): + self._j_read_builder.withFilter(predicate.to_j_predicate()) + return self + def with_projection(self, projection: List[List[int]]) -> 'ReadBuilder': self._j_read_builder.withProjection(projection) return self @@ -98,9 +103,12 @@ def new_scan(self) -> 'TableScan': return TableScan(j_table_scan) def new_read(self) -> 'TableRead': - j_table_read = self._j_read_builder.newRead() + j_table_read = self._j_read_builder.newRead().executeFilter() return TableRead(j_table_read, self._j_row_type, self._catalog_options, self._arrow_schema) + def new_predicate_builder(self) -> 'PredicateBuilder': + return PredicateBuilder(self._j_row_type) + class TableScan(table_scan.TableScan): @@ -257,3 +265,92 @@ def commit(self, commit_messages: List[CommitMessage]): def close(self): self._j_batch_table_commit.close() + + +class Predicate(predicate.Predicate): + + def __init__(self, j_predicate): + self._j_predicate = j_predicate + + def to_j_predicate(self): + return self._j_predicate + + +class PredicateBuilder(predicate.PredicateBuilder): + + def __init__(self, j_row_type): + self._field_names = j_row_type.getFieldNames() + self._j_row_type = j_row_type + self._j_predicate_builder = get_gateway().jvm.PredicateBuilder(j_row_type) + + def _build(self, method: str, field: str, literals: Optional[List[Any]] = None): + error = ValueError(f'The field {field} is not in field list {self._field_names}.') + try: + index = self._field_names.index(field) + if index == -1: + raise error + except ValueError: + raise error + + if literals is None: + literals = [] + + j_predicate = get_gateway().jvm.PredicationUtil.build( + self._j_row_type, + self._j_predicate_builder, + method, + index, + literals + ) + return Predicate(j_predicate) + + def 
equal(self, field: str, literal: Any) -> Predicate: + return self._build('equal', field, [literal]) + + def not_equal(self, field: str, literal: Any) -> Predicate: + return self._build('notEqual', field, [literal]) + + def less_than(self, field: str, literal: Any) -> Predicate: + return self._build('lessThan', field, [literal]) + + def less_or_equal(self, field: str, literal: Any) -> Predicate: + return self._build('lessOrEqual', field, [literal]) + + def greater_than(self, field: str, literal: Any) -> Predicate: + return self._build('greaterThan', field, [literal]) + + def greater_or_equal(self, field: str, literal: Any) -> Predicate: + return self._build('greaterOrEqual', field, [literal]) + + def is_null(self, field: str) -> Predicate: + return self._build('isNull', field) + + def is_not_null(self, field: str) -> Predicate: + return self._build('isNotNull', field) + + def startswith(self, field: str, pattern_literal: Any) -> Predicate: + return self._build('startsWith', field, [pattern_literal]) + + def endswith(self, field: str, pattern_literal: Any) -> Predicate: + return self._build('endsWith', field, [pattern_literal]) + + def contains(self, field: str, pattern_literal: Any) -> Predicate: + return self._build('contains', field, [pattern_literal]) + + def is_in(self, field: str, literals: List[Any]) -> Predicate: + return self._build('in', field, literals) + + def is_not_in(self, field: str, literals: List[Any]) -> Predicate: + return self._build('notIn', field, literals) + + def between(self, field: str, included_lower_bound: Any, included_upper_bound: Any) \ + -> Predicate: + return self._build('between', field, [included_lower_bound, included_upper_bound]) + + def and_predicates(self, predicates: List[Predicate]) -> Predicate: + predicates = list(map(lambda p: p.to_j_predicate(), predicates)) + return Predicate(get_gateway().jvm.PredicationUtil.buildAnd(predicates)) + + def or_predicates(self, predicates: List[Predicate]) -> Predicate: + predicates = list(map(lambda p: p.to_j_predicate(), predicates)) + return Predicate(get_gateway().jvm.PredicationUtil.buildOr(predicates)) diff --git a/paimon_python_java/tests/test_preicates.py b/paimon_python_java/tests/test_preicates.py new file mode 100644 index 0000000..7ee1a91 --- /dev/null +++ b/paimon_python_java/tests/test_preicates.py @@ -0,0 +1,394 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +import os +import shutil +import tempfile +import unittest +import random +import pandas as pd +import pyarrow as pa + +from paimon_python_api import Schema +from paimon_python_java import Catalog +from paimon_python_java.tests import utils +from setup_utils import java_setuputils + + +def _check_filtered_result(read_builder, expected_df): + scan = read_builder.new_scan() + read = read_builder.new_read() + actual_df = read.to_pandas(scan.plan().splits()) + pd.testing.assert_frame_equal( + actual_df.reset_index(drop=True), expected_df.reset_index(drop=True)) + + +# TODO: parquet has bug now +def _random_format(): + return random.choice(['avro', 'orc']) + + +class PredicateTest(unittest.TestCase): + + @classmethod + def setUpClass(cls): + java_setuputils.setup_java_bridge() + cls.hadoop_path = tempfile.mkdtemp() + utils.setup_hadoop_bundle_jar(cls.hadoop_path) + cls.warehouse = tempfile.mkdtemp() + + catalog = Catalog.create({'warehouse': cls.warehouse}) + catalog.create_database('default', False) + + pa_schema = pa.schema([ + ('f0', pa.int64()), + ('f1', pa.string()), + ]) + catalog.create_table('default.test_append', + Schema(pa_schema, options={'file.format': _random_format()}), + False) + catalog.create_table('default.test_pk', + Schema(pa_schema, primary_keys=['f0'], + options={'bucket': '1', 'file.format': _random_format()}), + False) + + df = pd.DataFrame({ + 'f0': [1, 2, 3, 4, 5], + 'f1': ['abc', 'abbc', 'bc', 'd', None], + }) + + append_table = catalog.get_table('default.test_append') + write_builder = append_table.new_batch_write_builder() + write = write_builder.new_write() + commit = write_builder.new_commit() + write.write_pandas(df) + commit.commit(write.prepare_commit()) + write.close() + commit.close() + + pk_table = catalog.get_table('default.test_pk') + write_builder = pk_table.new_batch_write_builder() + write = write_builder.new_write() + commit = write_builder.new_commit() + write.write_pandas(df) + commit.commit(write.prepare_commit()) + write.close() + commit.close() + + cls.catalog = catalog + cls.df = df + + @classmethod + def tearDownClass(cls): + java_setuputils.clean() + if os.path.exists(cls.hadoop_path): + shutil.rmtree(cls.hadoop_path) + if os.path.exists(cls.warehouse): + shutil.rmtree(cls.warehouse) + + def testWrongFieldName(self): + table = self.catalog.get_table('default.test_append') + predicate_builder = table.new_read_builder().new_predicate_builder() + with self.assertRaises(ValueError) as e: + predicate_builder.equal('f2', 'a') + self.assertEqual(str(e.exception), "The field f2 is not in field list ['f0', 'f1'].") + + def testAppendWithDuplicate(self): + pa_schema = pa.schema([ + ('f0', pa.int64()), + ('f1', pa.string()), + ]) + self.catalog.create_table('default.test_append_with_duplicate', Schema(pa_schema), False) + + df = pd.DataFrame({ + 'f0': [1, 1, 2, 2], + 'f1': ['a', 'b', 'c', 'd'], + }) + + table = self.catalog.get_table('default.test_append_with_duplicate') + write_builder = table.new_batch_write_builder() + write = write_builder.new_write() + commit = write_builder.new_commit() + write.write_pandas(df) + commit.commit(write.prepare_commit()) + write.close() + commit.close() + + predicate_builder = table.new_read_builder().new_predicate_builder() + + predicate = predicate_builder.equal('f0', 1) + _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[0:1]) + + predicate = predicate_builder.equal('f0', 0) + read_builder = 
table.new_read_builder().with_filter(predicate) + scan = read_builder.new_scan() + read = read_builder.new_read() + actual_df = read.to_pandas(scan.plan().splits()) + self.assertEqual(len(actual_df), 0) + + def testAllFieldTypesWithEqual(self): + pa_schema = pa.schema([ + # int + ('_tinyint', pa.int8()), + ('_smallint', pa.int16()), + ('_int', pa.int32()), + ('_bigint', pa.int64()), + # float + ('_float16', pa.float32()), # NOTE: cannot write pa.float16() data into Paimon + ('_float32', pa.float32()), + ('_double', pa.float64()), + # string + ('_string', pa.string()), + # bool + ('_boolean', pa.bool_()) + ]) + self.catalog.create_table('default.test_all_field_types', + Schema(pa_schema, options={'file.format': _random_format()}), + False) + table = self.catalog.get_table('default.test_all_field_types') + write_builder = table.new_batch_write_builder() + write = write_builder.new_write() + commit = write_builder.new_commit() + + df = pd.DataFrame({ + '_tinyint': pd.Series([1, 2], dtype='int8'), + '_smallint': pd.Series([10, 20], dtype='int16'), + '_int': pd.Series([100, 200], dtype='int32'), + '_bigint': pd.Series([1000, 2000], dtype='int64'), + '_float16': pd.Series([1.0, 2.0], dtype='float16'), + '_float32': pd.Series([1.00, 2.00], dtype='float32'), + '_double': pd.Series([1.000, 2.000], dtype='double'), + '_string': pd.Series(['A', 'B'], dtype='object'), + '_boolean': [True, False] + }) + record_batch = pa.RecordBatch.from_pandas(df, schema=pa_schema) + # prepare for assertion + df['_float16'] = df['_float16'].astype('float32') + + write.write_arrow_batch(record_batch) + commit.commit(write.prepare_commit()) + write.close() + commit.close() + + predicate_builder = table.new_read_builder().new_predicate_builder() + + predicate = predicate_builder.equal('_tinyint', 1) + _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]]) + + predicate = predicate_builder.equal('_smallint', 20) + _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[1]]) + + predicate = predicate_builder.equal('_int', 100) + _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]]) + + predicate = predicate_builder.equal('_bigint', 2000) + _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[1]]) + + predicate = predicate_builder.equal('_float16', 1.0) + _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]]) + + predicate = predicate_builder.equal('_float32', 2.00) + _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[1]]) + + predicate = predicate_builder.equal('_double', 1.000) + _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]]) + + predicate = predicate_builder.equal('_string', 'B') + _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[1]]) + + predicate = predicate_builder.equal('_boolean', True) + _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]]) + + def testEqualPk(self): + table = self.catalog.get_table('default.test_pk') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.equal('f0', 1) + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[0]]) + + def testNotEqualAppend(self): + table = self.catalog.get_table('default.test_append') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.not_equal('f0', 1) + 
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[1:4]) + + def testNotEqualPk(self): + table = self.catalog.get_table('default.test_pk') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.not_equal('f0', 1) + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[1:4]) + + def testLessThanAppend(self): + table = self.catalog.get_table('default.test_append') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.less_than('f0', 3) + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1]) + + def testLessThanPk(self): + table = self.catalog.get_table('default.test_pk') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.less_than('f0', 3) + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1]) + + def testLessOrEqualAppend(self): + table = self.catalog.get_table('default.test_append') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.less_or_equal('f0', 3) + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2]) + + def testLessOrEqualPk(self): + table = self.catalog.get_table('default.test_pk') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.less_or_equal('f0', 3) + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2]) + + def testGreaterThanAppend(self): + table = self.catalog.get_table('default.test_append') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.greater_than('f0', 3) + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[3:4]) + + def testGreaterThanPk(self): + table = self.catalog.get_table('default.test_pk') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.greater_than('f0', 3) + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[3:4]) + + def testGreaterOrEqualAppend(self): + table = self.catalog.get_table('default.test_append') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.greater_or_equal('f0', 3) + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[2:4]) + + def testGreaterOrEqualPk(self): + table = self.catalog.get_table('default.test_pk') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.greater_or_equal('f0', 3) + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[2:4]) + + def testIsNullAppend(self): + table = self.catalog.get_table('default.test_append') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.is_null('f1') + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[4]]) + + def testIsNullPk(self): + table = self.catalog.get_table('default.test_pk') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.is_null('f1') + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[4]]) + + def testIsNotNullAppend(self): + table = self.catalog.get_table('default.test_append') + predicate_builder = 
table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.is_not_null('f1') + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:3]) + + def testIsNotNullPk(self): + table = self.catalog.get_table('default.test_pk') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.is_not_null('f1') + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:3]) + + def testStartswithAppend(self): + table = self.catalog.get_table('default.test_append') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.startswith('f1', 'ab') + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1]) + + def testStartswithPk(self): + table = self.catalog.get_table('default.test_pk') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.startswith('f1', 'ab') + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1]) + + def testEndswithAppend(self): + table = self.catalog.get_table('default.test_append') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.endswith('f1', 'bc') + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2]) + + def testEndswithPk(self): + table = self.catalog.get_table('default.test_pk') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.endswith('f1', 'bc') + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2]) + + def testContainsAppend(self): + table = self.catalog.get_table('default.test_append') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.contains('f1', 'bb') + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[1]]) + + def testContainsPk(self): + table = self.catalog.get_table('default.test_pk') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.contains('f1', 'bb') + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[1]]) + + def testIsInAppend(self): + table = self.catalog.get_table('default.test_append') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.is_in('f0', [1, 2]) + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1]) + + def testIsInPk(self): + table = self.catalog.get_table('default.test_pk') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.is_in('f1', ['abc', 'd']) + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[0, 3]]) + + def testIsNotInAppend(self): + table = self.catalog.get_table('default.test_append') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.is_not_in('f0', [1, 2]) + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[2:4]) + + def testIsNotInPk(self): + table = self.catalog.get_table('default.test_pk') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.is_not_in('f1', ['abc', 'd']) + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[1:2]) + + def 
testBetweenAppend(self): + table = self.catalog.get_table('default.test_append') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.between('f0', 1, 3) + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2]) + + def testBetweenPk(self): + table = self.catalog.get_table('default.test_pk') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate = predicate_builder.between('f0', 1, 3) + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2]) + + def testAndPredicates(self): + table = self.catalog.get_table('default.test_append') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate1 = predicate_builder.greater_than('f0', 1) + predicate2 = predicate_builder.startswith('f1', 'ab') + predicate = predicate_builder.and_predicates([predicate1, predicate2]) + _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[1]]) + + def testOrPredicates(self): + table = self.catalog.get_table('default.test_append') + predicate_builder = table.new_read_builder().new_predicate_builder() + predicate1 = predicate_builder.greater_than('f0', 3) + predicate2 = predicate_builder.less_than('f0', 2) + predicate = predicate_builder.or_predicates([predicate1, predicate2]) + _check_filtered_result(table.new_read_builder().with_filter(predicate), + self.df.loc[[0, 3, 4]]) diff --git a/paimon_python_java/tests/utils.py b/paimon_python_java/tests/utils.py index 2af2fe0..350f80e 100644 --- a/paimon_python_java/tests/utils.py +++ b/paimon_python_java/tests/utils.py @@ -23,6 +23,11 @@ def setup_hadoop_bundle_jar(hadoop_dir): + if constants.PYPAIMON_HADOOP_CLASSPATH in os.environ: + file = os.environ[constants.PYPAIMON_HADOOP_CLASSPATH] + if os.path.isfile(file): + return + url = 'https://repo.maven.apache.org/maven2/org/apache/flink/' \ 'flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar' From 81252f8eb154122833b89f3769364147b6ab9f33 Mon Sep 17 00:00:00 2001 From: yuzelin <33053040+yuzelin@users.noreply.github.com> Date: Tue, 12 Nov 2024 13:53:06 +0800 Subject: [PATCH 3/8] Fix that field nullability affects write (#24) (cherry picked from commit 00af9ad1dae60963a0cfce1c19225b97c413faf6) --- .../org/apache/paimon/python/BytesWriter.java | 37 ++++++++++ paimon_python_java/pypaimon.py | 21 +++--- .../tests/test_write_and_read.py | 74 +++++++++++++++++++ 3 files changed, 121 insertions(+), 11 deletions(-) diff --git a/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/BytesWriter.java b/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/BytesWriter.java index 7cf6267..f2ca4e1 100644 --- a/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/BytesWriter.java +++ b/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/BytesWriter.java @@ -18,6 +18,7 @@ package org.apache.paimon.python; +import org.apache.paimon.arrow.ArrowUtils; import org.apache.paimon.arrow.reader.ArrowBatchReader; import org.apache.paimon.data.InternalRow; import org.apache.paimon.table.sink.TableWrite; @@ -27,8 +28,11 @@ import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.arrow.vector.types.pojo.Field; import java.io.ByteArrayInputStream; +import java.util.List; +import 
java.util.stream.Collectors; /** Write Arrow bytes to Paimon. */ public class BytesWriter { @@ -36,17 +40,30 @@ public class BytesWriter { private final TableWrite tableWrite; private final ArrowBatchReader arrowBatchReader; private final BufferAllocator allocator; + private final List arrowFields; public BytesWriter(TableWrite tableWrite, RowType rowType) { this.tableWrite = tableWrite; this.arrowBatchReader = new ArrowBatchReader(rowType); this.allocator = new RootAllocator(); + arrowFields = + rowType.getFields().stream() + .map(f -> ArrowUtils.toArrowField(f.name(), f.type())) + .collect(Collectors.toList()); } public void write(byte[] bytes) throws Exception { ByteArrayInputStream bais = new ByteArrayInputStream(bytes); ArrowStreamReader arrowStreamReader = new ArrowStreamReader(bais, allocator); VectorSchemaRoot vsr = arrowStreamReader.getVectorSchemaRoot(); + if (!checkTypesIgnoreNullability(arrowFields, vsr.getSchema().getFields())) { + throw new RuntimeException( + String.format( + "Input schema isn't consistent with table schema.\n" + + "\tTable schema is: %s\n" + + "\tInput schema is: %s", + arrowFields, vsr.getSchema().getFields())); + } while (arrowStreamReader.loadNextBatch()) { Iterable rows = arrowBatchReader.readBatch(vsr); @@ -60,4 +77,24 @@ public void write(byte[] bytes) throws Exception { public void close() { allocator.close(); } + + private boolean checkTypesIgnoreNullability( + List expectedFields, List actualFields) { + if (expectedFields.size() != actualFields.size()) { + return false; + } + + for (int i = 0; i < expectedFields.size(); i++) { + Field expectedField = expectedFields.get(i); + Field actualField = actualFields.get(i); + // ArrowType doesn't have nullability (similar to DataTypeRoot) + if (!actualField.getType().equals(expectedField.getType()) + || !checkTypesIgnoreNullability( + expectedField.getChildren(), actualField.getChildren())) { + return false; + } + } + + return true; + } } diff --git a/paimon_python_java/pypaimon.py b/paimon_python_java/pypaimon.py index 0d3101b..16c7a69 100644 --- a/paimon_python_java/pypaimon.py +++ b/paimon_python_java/pypaimon.py @@ -218,24 +218,23 @@ def __init__(self, j_batch_table_write, j_row_type, arrow_schema: pa.Schema): def write_arrow(self, table): for record_batch in table.to_reader(): - # TODO: can we use a reusable stream? - stream = pa.BufferOutputStream() - with pa.RecordBatchStreamWriter(stream, self._arrow_schema) as writer: - writer.write(record_batch) - arrow_bytes = stream.getvalue().to_pybytes() - self._j_bytes_writer.write(arrow_bytes) + # TODO: can we use a reusable stream in #_write_arrow_batch ? 
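+            # each batch is serialized to Arrow IPC bytes and handed to the Java-side BytesWriter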
+ self._write_arrow_batch(record_batch) def write_arrow_batch(self, record_batch): + self._write_arrow_batch(record_batch) + + def write_pandas(self, dataframe: pd.DataFrame): + record_batch = pa.RecordBatch.from_pandas(dataframe, schema=self._arrow_schema) + self._write_arrow_batch(record_batch) + + def _write_arrow_batch(self, record_batch): stream = pa.BufferOutputStream() - with pa.RecordBatchStreamWriter(stream, self._arrow_schema) as writer: + with pa.RecordBatchStreamWriter(stream, record_batch.schema) as writer: writer.write(record_batch) arrow_bytes = stream.getvalue().to_pybytes() self._j_bytes_writer.write(arrow_bytes) - def write_pandas(self, dataframe: pd.DataFrame): - record_batch = pa.RecordBatch.from_pandas(dataframe, schema=self._arrow_schema) - self.write_arrow_batch(record_batch) - def prepare_commit(self) -> List['CommitMessage']: j_commit_messages = self._j_batch_table_write.prepareCommit() return list(map(lambda cm: CommitMessage(cm), j_commit_messages)) diff --git a/paimon_python_java/tests/test_write_and_read.py b/paimon_python_java/tests/test_write_and_read.py index e1ea72b..b468e9f 100644 --- a/paimon_python_java/tests/test_write_and_read.py +++ b/paimon_python_java/tests/test_write_and_read.py @@ -22,6 +22,7 @@ import unittest import pandas as pd import pyarrow as pa +from py4j.protocol import Py4JJavaError from paimon_python_api import Schema from paimon_python_java import Catalog @@ -371,3 +372,76 @@ def test_overwrite(self): df2['f0'] = df2['f0'].astype('int32') pd.testing.assert_frame_equal( actual_df2.reset_index(drop=True), df2.reset_index(drop=True)) + + def testWriteWrongSchema(self): + schema = Schema(self.simple_pa_schema) + self.catalog.create_table('default.test_wrong_schema', schema, False) + table = self.catalog.get_table('default.test_wrong_schema') + + data = { + 'f0': [1, 2, 3], + 'f1': ['a', 'b', 'c'], + } + df = pd.DataFrame(data) + schema = pa.schema([ + ('f0', pa.int64()), + ('f1', pa.string()) + ]) + record_batch = pa.RecordBatch.from_pandas(df, schema) + + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + + with self.assertRaises(Py4JJavaError) as e: + table_write.write_arrow_batch(record_batch) + self.assertEqual( + str(e.exception.java_exception), + '''java.lang.RuntimeException: Input schema isn't consistent with table schema. 
+\tTable schema is: [f0: Int(32, true), f1: Utf8] +\tInput schema is: [f0: Int(64, true), f1: Utf8]''') + + def testIgnoreNullable(self): + pa_schema1 = pa.schema([ + ('f0', pa.int32(), False), + ('f1', pa.string()) + ]) + + pa_schema2 = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()) + ]) + + # write nullable to non-null + self._testIgnoreNullableImpl('test_ignore_nullable1', pa_schema1, pa_schema2) + + # write non-null to nullable + self._testIgnoreNullableImpl('test_ignore_nullable2', pa_schema2, pa_schema1) + + def _testIgnoreNullableImpl(self, table_name, table_schema, data_schema): + schema = Schema(table_schema) + self.catalog.create_table(f'default.{table_name}', schema, False) + table = self.catalog.get_table(f'default.{table_name}') + + data = { + 'f0': [1, 2, 3], + 'f1': ['a', 'b', 'c'], + } + df = pd.DataFrame(data) + record_batch = pa.RecordBatch.from_pandas(pd.DataFrame(data), data_schema) + + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + table_write.write_arrow_batch(record_batch) + table_commit.commit(table_write.prepare_commit()) + + table_write.close() + table_commit.close() + + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual_df = table_read.to_pandas(table_scan.plan().splits()) + df['f0'] = df['f0'].astype('int32') + pd.testing.assert_frame_equal( + actual_df.reset_index(drop=True), df.reset_index(drop=True)) From e8e56945560a050c5841db7a822647172bba0ea2 Mon Sep 17 00:00:00 2001 From: yuzelin <33053040+yuzelin@users.noreply.github.com> Date: Tue, 12 Nov 2024 13:53:25 +0800 Subject: [PATCH 4/8] [release-tool] Minor fix create_binary_release.sh (#25) (cherry picked from commit 42efa85fcc414f6522cd1754e891f718539eb700) --- tools/releasing/create_binary_release.sh | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tools/releasing/create_binary_release.sh b/tools/releasing/create_binary_release.sh index 01bea1d..f216e9a 100755 --- a/tools/releasing/create_binary_release.sh +++ b/tools/releasing/create_binary_release.sh @@ -61,9 +61,12 @@ source dev/.conda/bin/activate dev/build-wheels.sh WHEEL_FILE_NAME="paimon_python-${RELEASE_VERSION}-py3-none-any.whl" -WHEEL_FILE="${RELEASE_DIR}/${WHEEL_FILE_NAME}" -cp "dist/${WHEEL_FILE_NAME}" ${WHEEL_FILE} +cp "dist/${WHEEL_FILE_NAME}" "${RELEASE_DIR}/${WHEEL_FILE_NAME}" + +cd ${RELEASE_DIR} # Sign sha the wheel package -gpg --armor --detach-sig ${WHEEL_FILE} -$SHASUM ${WHEEL_FILE} > "${WHEEL_FILE}.sha512" +gpg --armor --detach-sig ${WHEEL_FILE_NAME} +$SHASUM ${WHEEL_FILE_NAME} > "${WHEEL_FILE_NAME}.sha512" + +cd ${CURR_DIR} From c7a08df6b6e29a7c0d5d63cdd64d526593e257c9 Mon Sep 17 00:00:00 2001 From: yuzelin <33053040+yuzelin@users.noreply.github.com> Date: Fri, 22 Nov 2024 17:08:13 +0800 Subject: [PATCH 5/8] [license] Update license/notice for project and inner python-java-bridge module and add ci for java module license check (#26) (cherry picked from commit c4bbf32b29420bf39d1fd004b03186215cfbd3ea) --- .../workflows/check-java-bridge-licensing.yml | 55 ++++ LICENSE | 20 ++ NOTICE | 267 +++++++++++++++++- .../paimon-python-java-bridge/pom.xml | 98 +++++-- .../src/main/resources/META-INF/NOTICE | 21 ++ .../tools/ci/log4j.properties | 43 +++ 6 files changed, 486 insertions(+), 18 deletions(-) create mode 100644 .github/workflows/check-java-bridge-licensing.yml create mode 100644 
paimon_python_java/paimon-python-java-bridge/src/main/resources/META-INF/NOTICE create mode 100644 paimon_python_java/paimon-python-java-bridge/tools/ci/log4j.properties diff --git a/.github/workflows/check-java-bridge-licensing.yml b/.github/workflows/check-java-bridge-licensing.yml new file mode 100644 index 0000000..1072532 --- /dev/null +++ b/.github/workflows/check-java-bridge-licensing.yml @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Check Java Bridge Licensing + +on: [push, pull_request] + +concurrency: + group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }} + cancel-in-progress: true + +jobs: + build: + runs-on: ubuntu-latest + env: + MVN_COMMON_OPTIONS: -U -B --no-transfer-progress + MVN_BUILD_OUTPUT_FILE: "/tmp/mvn_build_output.out" + MVN_VALIDATION_DIR: "/tmp/paimon-validation-deployment" + + steps: + - uses: actions/checkout@v2 + + - name: Set JDK + uses: actions/setup-java@v2 + with: + java-version: 8 + distribution: 'adopt' + - name: Build + run: | + set -o pipefail + cd paimon_python_java/paimon-python-java-bridge + mvn clean deploy ${{ env.MVN_COMMON_OPTIONS }} -DskipTests \ + -DaltDeploymentRepository=validation_repository::default::file:${{ env.MVN_VALIDATION_DIR }} \ + | tee ${{ env.MVN_BUILD_OUTPUT_FILE }} + + - name: Check licensing + run: | + cd paimon_python_java/paimon-python-java-bridge + mvn ${{ env.MVN_COMMON_OPTIONS }} exec:java@check-licensing -N \ + -Dexec.args="${{ env.MVN_BUILD_OUTPUT_FILE }} $(pwd) ${{ env.MVN_VALIDATION_DIR }}" \ + -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties + env: + MAVEN_OPTS: -Xmx4096m diff --git a/LICENSE b/LICENSE index 261eeb9..f51b94d 100644 --- a/LICENSE +++ b/LICENSE @@ -199,3 +199,23 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + +------------------------------------------------------------------------------------ + +This product bundles various third-party components under other open source licenses. +This section summarizes those components and their licenses. See licenses/ +for text of these licenses. 
+ +Apache Software Foundation License 2.0 +-------------------------------------- + +paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/FileLock.java +paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/NetUtils +paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PythonEnvUtils +paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PythonGatewayServer + +paimon_python_java/gateway_server.py +paimon_python_java/java_gateway.py +paimon_python_java/util/exceptions.py + +from http://flink.apache.org/ version 1.20.0 diff --git a/NOTICE b/NOTICE index b666c9f..81978c0 100644 --- a/NOTICE +++ b/NOTICE @@ -2,4 +2,269 @@ Apache Paimon Python Copyright 2024 The Apache Software Foundation This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). \ No newline at end of file +The Apache Software Foundation (http://www.apache.org/). + + +This project bundles the following dependencies under the under the MIT license +- org.slf4j:slf4j-api:1.7.32 + +This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt) +- org.apache.logging.log4j:log4j-1.2-api:2.17.1 +- org.apache.arrow:arrow-vector:14.0.0 +- org.apache.arrow:arrow-format:14.0.0 +- org.apache.arrow:arrow-memory-core:14.0.0 +- org.apache.arrow:arrow-memory-unsafe:14.0.0 +- org.apache.arrow:arrow-c-data:14.0.0 +- com.google.flatbuffers:flatbuffers-java:1.12.0 + +This project bundles the following dependencies under the 2-Clause BSD License +- net.sf.py4j:py4j:0.10.9.7 + + +Apache Paimon-shade (incubating) +Copyright 2023-2024 The Apache Software Foundation + +Paimon : Bundle +Copyright 2023-2024 The Apache Software Foundation + +paimon-common +Copyright 2023-2024 The Apache Software Foundation + +This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt) +- org.roaringbitmap:RoaringBitmap:1.0.5 +- org.apache.datasketches:datasketches-java:4.2.0 +- org.apache.datasketches:datasketches-memory:2.2.0 + +This project bundles the following dependencies under the BSD 3-clause license. +You find them under licenses/LICENSE.antlr-runtime and licenses/LICENSE.janino. + +- org.antlr:antlr4-runtime:4.9.3 +- org.codehaus.janino:janino:3.0.11 +- org.codehaus.janino:commons-compiler:3.0.11 +- it.unimi.dsi:fastutil:8.5.12 +- net.openhft:zero-allocation-hashing:0.16 +- com.github.davidmoten:hilbert-curve:0.2.2 +- com.github.davidmoten:guava-mini:0.1.3 + +datasketches-java +Copyright 2015-2022 The Apache Software Foundation + +Apache DataSketches Memory +Copyright 2022 - The Apache Software Foundation + +Copyright 2015-2018 Yahoo Inc. +Copyright 2019-2020 Verizon Media +Copyright 2021 Yahoo Inc. + +Prior to moving to ASF, the software for this project was developed at +Yahoo Inc. (https://developer.yahoo.com). + +Paimon : Core +Copyright 2023-2024 The Apache Software Foundation + +Paimon : Code Gen Loader +Copyright 2023-2024 The Apache Software Foundation + +paimon-format +Copyright 2023-2024 The Apache Software Foundation + +This project bundles the following dependencies under the Apache Software License 2.0. 
(http://www.apache.org/licenses/LICENSE-2.0.txt) + +- org.apache.orc:orc-core:1.9.2 +- org.apache.orc:orc-shims:1.9.2 +- org.apache.hive:hive-storage-api:2.8.1 +- io.airlift:aircompressor:0.27 +- commons-lang:commons-lang:2.6 +- org.apache.commons:commons-lang3:3.12.0 + +- org.apache.avro:avro:1.11.3 +- com.fasterxml.jackson.core:jackson-core:2.14.2 +- com.fasterxml.jackson.core:jackson-databind:2.14.2 +- com.fasterxml.jackson.core:jackson-annotations:2.14.2 +- org.apache.commons:commons-compress:1.22 + +- org.apache.parquet:parquet-hadoop:1.13.1 +- org.apache.parquet:parquet-column:1.13.1 +- org.apache.parquet:parquet-common:1.13.1 +- org.apache.parquet:parquet-encoding:1.13.1 +- org.apache.parquet:parquet-format-structures:1.13.1 +- org.apache.parquet:parquet-jackson:1.13.1 +- commons-pool:commons-pool:1.6 + +This project bundles the following dependencies under the BSD license. +You find it under licenses/LICENSE.protobuf, licenses/LICENSE.zstd-jni +and licenses/LICENSE.threeten-extra + +- com.google.protobuf:protobuf-java:3.19.6 +- com.github.luben:zstd-jni:1.5.5-11 +- org.threeten:threeten-extra:1.7.1 + +# Jackson JSON processor + +Jackson is a high-performance, Free/Open Source JSON processing library. +It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has +been in development since 2007. +It is currently developed by a community of developers. + +## Licensing + +Jackson 2.x core and extension components are licensed under Apache License 2.0 +To find the details that apply to this artifact see the accompanying LICENSE file. + +## Credits + +A list of contributors may be found from CREDITS(-2.x) file, which is included +in some artifacts (usually source distributions); but is always available +from the source code management (SCM) system project uses. + +Apache Commons Lang +Copyright 2001-2011 The Apache Software Foundation + +This product includes software developed by +The Apache Software Foundation (http://www.apache.org/). + +ORC Core +Copyright 2013-2023 The Apache Software Foundation + +ORC Shims +Copyright 2013-2023 The Apache Software Foundation + +Apache Commons Lang +Copyright 2001-2021 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (https://www.apache.org/). + +Hive Storage API +Copyright 2020 The Apache Software Foundation + +Apache Avro +Copyright 2009-2023 The Apache Software Foundation + +Apache Commons Compress +Copyright 2002-2022 The Apache Software Foundation + +--- + +The files in the package org.apache.commons.compress.archivers.sevenz +were derived from the LZMA SDK, version 9.20 (C/ and CPP/7zip/), +which has been placed in the public domain: + +"LZMA SDK is placed in the public domain." (http://www.7-zip.org/sdk.html) + +The test file lbzip2_32767.bz2 has been copied from libbzip2's source +repository: + +This program, "bzip2", the associated library "libbzip2", and all +documentation, are copyright (C) 1996-2019 Julian R Seward. All +rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + +1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + +2. The origin of this software must not be misrepresented; you must + not claim that you wrote the original software. If you use this + software in a product, an acknowledgment in the product + documentation would be appreciated but is not required. 
+ +3. Altered source versions must be plainly marked as such, and must + not be misrepresented as being the original software. + +4. The name of the author may not be used to endorse or promote + products derived from this software without specific prior written + permission. + +THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS +OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE +GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +Julian Seward, jseward@acm.org + +Apache Commons Pool +Copyright 2001-2012 The Apache Software Foundation + +Paimon : Hive Catalog +Copyright 2023-2024 The Apache Software Foundation + +Paimon : Hive Common +Copyright 2023-2024 The Apache Software Foundation + +paimon-shade-jackson-2 +Copyright 2023-2024 The Apache Software Foundation + +This project includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt) + +- com.fasterxml.jackson.core:jackson-annotations:2.14.2 +- com.fasterxml.jackson.core:jackson-core:2.14.2 +- com.fasterxml.jackson.core:jackson-databind:2.14.2 +- com.fasterxml.jackson.dataformat:jackson-dataformat-csv:2.14.2 +- com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.14.2 +- com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.14.2 +- com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.2 +- org.yaml:snakeyaml:1.33 + +Jackson is a high-performance, Free/Open Source JSON processing library. +It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has +been in development since 2007. +It is currently developed by a community of developers, as well as supported +commercially by FasterXML.com. + +Jackson core and extension components may be licensed under different licenses. +To find the details that apply to this artifact see the accompanying LICENSE file. +For more information, including possible other licensing options, contact +FasterXML.com (http://fasterxml.com). + +A list of contributors may be found from CREDITS file, which is included +in some artifacts (usually source distributions); but is always available +from the source code management (SCM) system project uses. 
+ +paimon-shade-guava-30 +Copyright 2023-2024 The Apache Software Foundation + +- com.google.guava:guava:30.1.1-jre +- com.google.guava:failureaccess:1.0.1 + +paimon-shade-caffeine-2 +Copyright 2023-2024 The Apache Software Foundation + +- com.github.ben-manes.caffeine:caffeine:2.9.3 + +Paimon : Arrow +Copyright 2023-2024 The Apache Software Foundation + +Apache Log4j 1.x Compatibility API +Copyright 1999-1969 The Apache Software Foundation + +Arrow Vectors +Copyright 2023 The Apache Software Foundation + +Arrow Format +Copyright 2023 The Apache Software Foundation + +Arrow Memory - Core +Copyright 2023 The Apache Software Foundation + +Arrow Memory - Unsafe +Copyright 2023 The Apache Software Foundation + +Arrow Java C Data Interface +Copyright 2023 The Apache Software Foundation + +Apache Flink +Copyright 2014-2024 The Apache Software Foundation + diff --git a/paimon_python_java/paimon-python-java-bridge/pom.xml b/paimon_python_java/paimon-python-java-bridge/pom.xml index 52bc061..dd7f7f2 100644 --- a/paimon_python_java/paimon-python-java-bridge/pom.xml +++ b/paimon_python_java/paimon-python-java-bridge/pom.xml @@ -23,10 +23,11 @@ org.apache.paimon paimon-python-java-bridge - 0.1-SNAPSHOT + 0.9-SNAPSHOT Paimon : Python-Java Bridge jar + 2024 0.9.0 @@ -37,6 +38,7 @@ package 14.0.0 1.8 + 0.9.0 @@ -132,21 +134,21 @@ - org.apache.maven.plugins - maven-compiler-plugin - 3.8.0 - - ${target.java.version} - ${target.java.version} - - false - - - -Xpkginfo:always - -Xlint:deprecation - - - + org.apache.maven.plugins + maven-compiler-plugin + 3.8.0 + + ${target.java.version} + ${target.java.version} + + false + + + -Xpkginfo:always + -Xlint:deprecation + + + org.apache.maven.plugins @@ -159,6 +161,9 @@ shade + false + false + true org.apache.paimon:paimon-bundle @@ -174,6 +179,18 @@ net.sf.py4j:py4j + + + + + + Apache Paimon + ${project.inceptionYear} + UTF-8 + + com.fasterxml.jackson @@ -184,7 +201,54 @@ + + + org.apache.maven.plugins + maven-clean-plugin + + + + ${project.basedir} + + dependency-reduced-pom.xml + + + + + - + + + + org.codehaus.mojo + exec-maven-plugin + 3.1.0 + false + + + check-license + + none + + java + + + + + org.apache.paimon.tools.ci.licensecheck.LicenseChecker + true + false + + + + org.apache.paimon + paimon-ci-tools + ${paimon.ci.tools.version} + + + + + + diff --git a/paimon_python_java/paimon-python-java-bridge/src/main/resources/META-INF/NOTICE b/paimon_python_java/paimon-python-java-bridge/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..db5800c --- /dev/null +++ b/paimon_python_java/paimon-python-java-bridge/src/main/resources/META-INF/NOTICE @@ -0,0 +1,21 @@ +paimon-python-java-bridge +Copyright 2024 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). 
+
+This project bundles the following dependencies under the MIT license
+- org.slf4j:slf4j-api:1.7.32
+
+This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
+- org.apache.logging.log4j:log4j-1.2-api:2.17.1
+- org.apache.arrow:arrow-vector:14.0.0
+- org.apache.arrow:arrow-format:14.0.0
+- org.apache.arrow:arrow-memory-core:14.0.0
+- org.apache.arrow:arrow-memory-unsafe:14.0.0
+- org.apache.arrow:arrow-c-data:14.0.0
+- com.google.flatbuffers:flatbuffers-java:1.12.0
+
+This project bundles the following dependencies under the 2-Clause BSD License
+- net.sf.py4j:py4j:0.10.9.7
+
diff --git a/paimon_python_java/paimon-python-java-bridge/tools/ci/log4j.properties b/paimon_python_java/paimon-python-java-bridge/tools/ci/log4j.properties
new file mode 100644
index 0000000..7daf1c3
--- /dev/null
+++ b/paimon_python_java/paimon-python-java-bridge/tools/ci/log4j.properties
@@ -0,0 +1,43 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################ + +rootLogger.level = INFO +rootLogger.appenderRef.out.ref = ConsoleAppender + +# ----------------------------------------------------------------------------- +# Console (use 'console') +# ----------------------------------------------------------------------------- + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} [%20t] %-5p %-60c %x - %m%n + +# ----------------------------------------------------------------------------- +# File (use 'file') +# ----------------------------------------------------------------------------- +appender.file.name = FileAppender +appender.file.type = FILE +appender.file.fileName = ${sys:log.dir}/mvn-${sys:mvn.forkNumber:-output}.log +appender.file.layout.type = PatternLayout +appender.file.layout.pattern = %d{HH:mm:ss,SSS} [%20t] %-5p %-60c %x - %m%n +appender.file.createOnDemand = true + +# suppress the irrelevant (wrong) warnings from the netty channel handler +logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline +logger.netty.level = ERROR From e0df3e439f1c5202f39600549d59647787471add Mon Sep 17 00:00:00 2001 From: yuzelin <33053040+yuzelin@users.noreply.github.com> Date: Mon, 25 Nov 2024 20:40:20 +0800 Subject: [PATCH 6/8] Refactor ReadBuilder#with_projection to accept field names for better using (#27) (cherry picked from commit f09dc5887fdcab90b7bcac361e54414fab089fc9) --- paimon_python_api/read_builder.py | 2 +- paimon_python_java/pypaimon.py | 43 ++++++------- .../tests/test_write_and_read.py | 62 +++++++++++++++++++ paimon_python_java/util/java_utils.py | 9 +++ 4 files changed, 92 insertions(+), 24 deletions(-) diff --git a/paimon_python_api/read_builder.py b/paimon_python_api/read_builder.py index ad5e6d6..a031a05 100644 --- a/paimon_python_api/read_builder.py +++ b/paimon_python_api/read_builder.py @@ -32,7 +32,7 @@ def with_filter(self, predicate: Predicate): """ @abstractmethod - def with_projection(self, projection: List[List[int]]) -> 'ReadBuilder': + def with_projection(self, projection: List[str]) -> 'ReadBuilder': """Push nested projection.""" @abstractmethod diff --git a/paimon_python_java/pypaimon.py b/paimon_python_java/pypaimon.py index 16c7a69..b884fa4 100644 --- a/paimon_python_java/pypaimon.py +++ b/paimon_python_java/pypaimon.py @@ -61,37 +61,36 @@ class Table(table.Table): def __init__(self, j_table, catalog_options: dict): self._j_table = j_table self._catalog_options = catalog_options - # init arrow schema - schema_bytes = get_gateway().jvm.SchemaUtil.getArrowSchema(j_table.rowType()) - schema_reader = pa.RecordBatchStreamReader(pa.BufferReader(schema_bytes)) - self._arrow_schema = schema_reader.schema - schema_reader.close() def new_read_builder(self) -> 'ReadBuilder': j_read_builder = get_gateway().jvm.InvocationUtil.getReadBuilder(self._j_table) - return ReadBuilder( - j_read_builder, self._j_table.rowType(), self._catalog_options, self._arrow_schema) + return ReadBuilder(j_read_builder, self._j_table.rowType(), self._catalog_options) def new_batch_write_builder(self) -> 'BatchWriteBuilder': java_utils.check_batch_write(self._j_table) j_batch_write_builder = get_gateway().jvm.InvocationUtil.getBatchWriteBuilder(self._j_table) - return BatchWriteBuilder(j_batch_write_builder, self._j_table.rowType(), self._arrow_schema) + return BatchWriteBuilder(j_batch_write_builder) class ReadBuilder(read_builder.ReadBuilder): 
- def __init__(self, j_read_builder, j_row_type, catalog_options: dict, arrow_schema: pa.Schema): + def __init__(self, j_read_builder, j_row_type, catalog_options: dict): self._j_read_builder = j_read_builder self._j_row_type = j_row_type self._catalog_options = catalog_options - self._arrow_schema = arrow_schema def with_filter(self, predicate: 'Predicate'): self._j_read_builder.withFilter(predicate.to_j_predicate()) return self - def with_projection(self, projection: List[List[int]]) -> 'ReadBuilder': - self._j_read_builder.withProjection(projection) + def with_projection(self, projection: List[str]) -> 'ReadBuilder': + field_names = list(map(lambda field: field.name(), self._j_row_type.getFields())) + int_projection = list(map(lambda p: field_names.index(p), projection)) + gateway = get_gateway() + int_projection_arr = gateway.new_array(gateway.jvm.int, len(projection)) + for i in range(len(projection)): + int_projection_arr[i] = int_projection[i] + self._j_read_builder.withProjection(int_projection_arr) return self def with_limit(self, limit: int) -> 'ReadBuilder': @@ -104,7 +103,7 @@ def new_scan(self) -> 'TableScan': def new_read(self) -> 'TableRead': j_table_read = self._j_read_builder.newRead().executeFilter() - return TableRead(j_table_read, self._j_row_type, self._catalog_options, self._arrow_schema) + return TableRead(j_table_read, self._j_read_builder.readType(), self._catalog_options) def new_predicate_builder(self) -> 'PredicateBuilder': return PredicateBuilder(self._j_row_type) @@ -141,12 +140,12 @@ def to_j_split(self): class TableRead(table_read.TableRead): - def __init__(self, j_table_read, j_row_type, catalog_options, arrow_schema): + def __init__(self, j_table_read, j_read_type, catalog_options): self._j_table_read = j_table_read - self._j_row_type = j_row_type + self._j_read_type = j_read_type self._catalog_options = catalog_options self._j_bytes_reader = None - self._arrow_schema = arrow_schema + self._arrow_schema = java_utils.to_arrow_schema(j_read_type) def to_arrow(self, splits): record_batch_reader = self.to_arrow_batch_reader(splits) @@ -174,7 +173,7 @@ def _init(self): if max_workers <= 0: raise ValueError("max_workers must be greater than 0") self._j_bytes_reader = get_gateway().jvm.InvocationUtil.createParallelBytesReader( - self._j_table_read, self._j_row_type, max_workers) + self._j_table_read, self._j_read_type, max_workers) def _batch_generator(self) -> Iterator[pa.RecordBatch]: while True: @@ -188,10 +187,8 @@ def _batch_generator(self) -> Iterator[pa.RecordBatch]: class BatchWriteBuilder(write_builder.BatchWriteBuilder): - def __init__(self, j_batch_write_builder, j_row_type, arrow_schema: pa.Schema): + def __init__(self, j_batch_write_builder): self._j_batch_write_builder = j_batch_write_builder - self._j_row_type = j_row_type - self._arrow_schema = arrow_schema def overwrite(self, static_partition: Optional[dict] = None) -> 'BatchWriteBuilder': if static_partition is None: @@ -201,7 +198,7 @@ def overwrite(self, static_partition: Optional[dict] = None) -> 'BatchWriteBuild def new_write(self) -> 'BatchTableWrite': j_batch_table_write = self._j_batch_write_builder.newWrite() - return BatchTableWrite(j_batch_table_write, self._j_row_type, self._arrow_schema) + return BatchTableWrite(j_batch_table_write, self._j_batch_write_builder.rowType()) def new_commit(self) -> 'BatchTableCommit': j_batch_table_commit = self._j_batch_write_builder.newCommit() @@ -210,11 +207,11 @@ def new_commit(self) -> 'BatchTableCommit': class 
BatchTableWrite(table_write.BatchTableWrite): - def __init__(self, j_batch_table_write, j_row_type, arrow_schema: pa.Schema): + def __init__(self, j_batch_table_write, j_row_type): self._j_batch_table_write = j_batch_table_write self._j_bytes_writer = get_gateway().jvm.InvocationUtil.createBytesWriter( j_batch_table_write, j_row_type) - self._arrow_schema = arrow_schema + self._arrow_schema = java_utils.to_arrow_schema(j_row_type) def write_arrow(self, table): for record_batch in table.to_reader(): diff --git a/paimon_python_java/tests/test_write_and_read.py b/paimon_python_java/tests/test_write_and_read.py index b468e9f..337b9f5 100644 --- a/paimon_python_java/tests/test_write_and_read.py +++ b/paimon_python_java/tests/test_write_and_read.py @@ -445,3 +445,65 @@ def _testIgnoreNullableImpl(self, table_name, table_schema, data_schema): df['f0'] = df['f0'].astype('int32') pd.testing.assert_frame_equal( actual_df.reset_index(drop=True), df.reset_index(drop=True)) + + def testProjection(self): + pa_schema = pa.schema([ + ('f0', pa.int64()), + ('f1', pa.string()), + ('f2', pa.bool_()), + ('f3', pa.string()) + ]) + schema = Schema(pa_schema) + self.catalog.create_table('default.test_projection', schema, False) + table = self.catalog.get_table('default.test_projection') + + # prepare data + data = { + 'f0': [1, 2, 3], + 'f1': ['a', 'b', 'c'], + 'f2': [True, True, False], + 'f3': ['A', 'B', 'C'] + } + df = pd.DataFrame(data) + + # write and commit data + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + table_write.write_pandas(df) + commit_messages = table_write.prepare_commit() + table_commit.commit(commit_messages) + + table_write.close() + table_commit.close() + + # case 1: read empty + read_builder = table.new_read_builder().with_projection([]) + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + splits = table_scan.plan().splits() + result1 = table_read.to_pandas(splits) + self.assertTrue(result1.empty) + + # case 2: read fully + read_builder = table.new_read_builder().with_projection(['f0', 'f1', 'f2', 'f3']) + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + splits = table_scan.plan().splits() + result2 = table_read.to_pandas(splits) + pd.testing.assert_frame_equal( + result2.reset_index(drop=True), df.reset_index(drop=True)) + + # case 3: read partially + read_builder = table.new_read_builder().with_projection(['f3', 'f2']) + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + splits = table_scan.plan().splits() + result3 = table_read.to_pandas(splits) + expected_df = pd.DataFrame({ + 'f3': ['A', 'B', 'C'], + 'f2': [True, True, False] + }) + pd.testing.assert_frame_equal( + result3.reset_index(drop=True), expected_df.reset_index(drop=True)) diff --git a/paimon_python_java/util/java_utils.py b/paimon_python_java/util/java_utils.py index 8c4f276..ce0404a 100644 --- a/paimon_python_java/util/java_utils.py +++ b/paimon_python_java/util/java_utils.py @@ -91,3 +91,12 @@ def _to_j_type(name, pa_type): return jvm.DataTypes.STRING() else: raise ValueError(f'Found unsupported data type {str(pa_type)} for field {name}.') + + +def to_arrow_schema(j_row_type): + # init arrow schema + schema_bytes = get_gateway().jvm.SchemaUtil.getArrowSchema(j_row_type) + schema_reader = pa.RecordBatchStreamReader(pa.BufferReader(schema_bytes)) + arrow_schema = schema_reader.schema + schema_reader.close() + return arrow_schema From 
9f8fe2dad80f43c51540dea718de0356763335bb Mon Sep 17 00:00:00 2001 From: yuzelin <33053040+yuzelin@users.noreply.github.com> Date: Tue, 26 Nov 2024 13:08:14 +0800 Subject: [PATCH 7/8] Add read API to convert result to DuckDB and Ray (#28) (cherry picked from commit 03108ec2edb1b08ffb53de52c5452dc75adf2b34) --- dev/dev-requirements.txt | 2 + paimon_python_api/table_read.py | 16 +++++- paimon_python_java/pypaimon.py | 15 ++++++ .../tests/test_write_and_read.py | 49 ++++++++++++++----- 4 files changed, 70 insertions(+), 12 deletions(-) diff --git a/dev/dev-requirements.txt b/dev/dev-requirements.txt index 7fd1aeb..4ed964e 100755 --- a/dev/dev-requirements.txt +++ b/dev/dev-requirements.txt @@ -26,3 +26,5 @@ numpy>=1.22.4 python-dateutil>=2.8.0,<3 pytz>=2018.3 pytest~=7.0 +duckdb>=0.5.0,<2.0.0 +ray~=2.10.0 diff --git a/paimon_python_api/table_read.py b/paimon_python_api/table_read.py index 24095b4..f0a7b59 100644 --- a/paimon_python_api/table_read.py +++ b/paimon_python_api/table_read.py @@ -18,10 +18,12 @@ import pandas as pd import pyarrow as pa +import ray from abc import ABC, abstractmethod +from duckdb.duckdb import DuckDBPyConnection from paimon_python_api import Split -from typing import List +from typing import List, Optional class TableRead(ABC): @@ -38,3 +40,15 @@ def to_arrow_batch_reader(self, splits: List[Split]) -> pa.RecordBatchReader: @abstractmethod def to_pandas(self, splits: List[Split]) -> pd.DataFrame: """Read data from splits and converted to pandas.DataFrame format.""" + + @abstractmethod + def to_duckdb( + self, + splits: List[Split], + table_name: str, + connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection: + """Convert splits into an in-memory DuckDB table which can be queried.""" + + @abstractmethod + def to_ray(self, splits: List[Split]) -> ray.data.dataset.Dataset: + """Convert splits into a Ray dataset format.""" diff --git a/paimon_python_java/pypaimon.py b/paimon_python_java/pypaimon.py index b884fa4..803540c 100644 --- a/paimon_python_java/pypaimon.py +++ b/paimon_python_java/pypaimon.py @@ -16,9 +16,12 @@ # limitations under the License. 
################################################################################ +import duckdb import pandas as pd import pyarrow as pa +import ray +from duckdb.duckdb import DuckDBPyConnection from paimon_python_java.java_gateway import get_gateway from paimon_python_java.util import java_utils, constants from paimon_python_api import (catalog, table, read_builder, table_scan, split, table_read, @@ -161,6 +164,18 @@ def to_arrow_batch_reader(self, splits): def to_pandas(self, splits: List[Split]) -> pd.DataFrame: return self.to_arrow(splits).to_pandas() + def to_duckdb( + self, + splits: List[Split], + table_name: str, + connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection: + con = connection or duckdb.connect(database=":memory:") + con.register(table_name, self.to_arrow(splits)) + return con + + def to_ray(self, splits: List[Split]) -> ray.data.dataset.Dataset: + return ray.data.from_arrow(self.to_arrow(splits)) + def _init(self): if self._j_bytes_reader is None: # get thread num diff --git a/paimon_python_java/tests/test_write_and_read.py b/paimon_python_java/tests/test_write_and_read.py index 337b9f5..e2c631d 100644 --- a/paimon_python_java/tests/test_write_and_read.py +++ b/paimon_python_java/tests/test_write_and_read.py @@ -267,6 +267,12 @@ def testAllWriteAndReadApi(self): table_write.close() table_commit.close() + all_data = pd.DataFrame({ + 'f0': [1, 2, 3, 4, 5, 6, 7, 8, 9], + 'f1': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'], + }) + all_data['f0'] = all_data['f0'].astype('int32') + read_builder = table.new_read_builder() table_scan = read_builder.new_scan() table_read = read_builder.new_read() @@ -274,10 +280,7 @@ def testAllWriteAndReadApi(self): # to_arrow actual = table_read.to_arrow(splits) - expected = pa.Table.from_pydict({ - 'f0': [1, 2, 3, 4, 5, 6, 7, 8, 9], - 'f1': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'], - }, schema=self.simple_pa_schema) + expected = pa.Table.from_pandas(all_data, schema=self.simple_pa_schema) self.assertEqual(actual, expected) # to_arrow_batch_reader @@ -286,18 +289,42 @@ def testAllWriteAndReadApi(self): for batch in table_read.to_arrow_batch_reader(splits) ] actual = pd.concat(data_frames) - expected = pd.DataFrame({ - 'f0': [1, 2, 3, 4, 5, 6, 7, 8, 9], - 'f1': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'], - }) - expected['f0'] = expected['f0'].astype('int32') pd.testing.assert_frame_equal( - actual.reset_index(drop=True), expected.reset_index(drop=True)) + actual.reset_index(drop=True), all_data.reset_index(drop=True)) # to_pandas actual = table_read.to_pandas(splits) pd.testing.assert_frame_equal( - actual.reset_index(drop=True), expected.reset_index(drop=True)) + actual.reset_index(drop=True), all_data.reset_index(drop=True)) + + # to_duckdb + duckdb_con = table_read.to_duckdb(splits, 'duckdb_table') + # select * + result1 = duckdb_con.query("SELECT * FROM duckdb_table").fetchdf() + pd.testing.assert_frame_equal( + result1.reset_index(drop=True), all_data.reset_index(drop=True)) + # select * where + result2 = duckdb_con.query("SELECT * FROM duckdb_table WHERE f0 < 4").fetchdf() + expected2 = pd.DataFrame({ + 'f0': [1, 2, 3], + 'f1': ['a', 'b', 'c'] + }) + expected2['f0'] = expected2['f0'].astype('int32') + pd.testing.assert_frame_equal( + result2.reset_index(drop=True), expected2.reset_index(drop=True)) + # select f0 where + result3 = duckdb_con.query("SELECT f0 FROM duckdb_table WHERE f0 < 4").fetchdf() + expected3 = pd.DataFrame({ + 'f0': [1, 2, 3] + }) + expected3['f0'] = expected3['f0'].astype('int32') + 
pd.testing.assert_frame_equal( + result3.reset_index(drop=True), expected3.reset_index(drop=True)) + + # to_ray + ray_dataset = table_read.to_ray(splits) + pd.testing.assert_frame_equal( + ray_dataset.to_pandas().reset_index(drop=True), all_data.reset_index(drop=True)) def test_overwrite(self): schema = Schema(self.simple_pa_schema, partition_keys=['f0'], From b18032950ef210a8f5a9bc3df3f668475a6554ac Mon Sep 17 00:00:00 2001 From: yuzelin <33053040+yuzelin@users.noreply.github.com> Date: Tue, 26 Nov 2024 20:51:09 +0800 Subject: [PATCH 8/8] [Rename] Rename module and package to pypaimon (#29) (cherry picked from commit 33d5253d059c82c027faa566226c1eb8329c62f1) --- .../workflows/check-java-bridge-licensing.yml | 4 ++-- pypaimon/__init__.py | 21 +++++++++++++++++++ .../api}/__init__.py | 0 .../api}/catalog.py | 2 +- .../api}/commit_message.py | 0 .../api}/predicate.py | 0 .../api}/read_builder.py | 2 +- {paimon_python_api => pypaimon/api}/split.py | 0 {paimon_python_api => pypaimon/api}/table.py | 2 +- .../api}/table_commit.py | 2 +- .../api}/table_read.py | 2 +- .../api}/table_scan.py | 2 +- .../api}/table_write.py | 2 +- .../api}/write_builder.py | 2 +- .../py4j}/__init__.py | 7 ++++--- .../py4j}/gateway_server.py | 4 ++-- .../py4j}/java_gateway.py | 6 +++--- .../py4j/java_implementation.py | 13 +++++++----- .../paimon-python-java-bridge/copyright.txt | 0 .../py4j}/paimon-python-java-bridge/pom.xml | 0 .../org/apache/paimon/python/BytesWriter.java | 0 .../org/apache/paimon/python/FileLock.java | 0 .../apache/paimon/python/InvocationUtil.java | 0 .../org/apache/paimon/python/NetUtils.java | 0 .../paimon/python/ParallelBytesReader.java | 0 .../apache/paimon/python/PredicationUtil.java | 0 .../apache/paimon/python/PythonEnvUtils.java | 0 .../paimon/python/PythonGatewayServer.java | 0 .../paimon/python/RecordBytesIterator.java | 0 .../org/apache/paimon/python/SchemaUtil.java | 0 .../src/main/resources/META-INF/NOTICE | 0 .../tools/ci/log4j.properties | 0 .../tools/maven/checkstyle.xml | 0 .../tools/maven/suppressions.xml | 0 .../py4j}/tests/__init__.py | 0 .../py4j}/tests/test_data_types.py | 8 +++---- .../py4j}/tests/test_preicates.py | 6 +++--- .../py4j}/tests/test_write_and_read.py | 10 ++++----- .../py4j}/tests/utils.py | 2 +- .../py4j}/util/__init__.py | 0 .../py4j}/util/constants.py | 0 .../py4j}/util/exceptions.py | 0 .../py4j}/util/java_utils.py | 4 ++-- setup.py | 11 +++++----- setup_utils/java_setuputils.py | 2 +- tools/releasing/create_binary_release.sh | 2 +- tools/releasing/create_source_release.sh | 2 +- tox.ini | 2 +- 48 files changed, 73 insertions(+), 47 deletions(-) create mode 100644 pypaimon/__init__.py rename {paimon_python_api => pypaimon/api}/__init__.py (100%) rename {paimon_python_api => pypaimon/api}/catalog.py (97%) rename {paimon_python_api => pypaimon/api}/commit_message.py (100%) rename {paimon_python_api => pypaimon/api}/predicate.py (100%) rename {paimon_python_api => pypaimon/api}/read_builder.py (95%) rename {paimon_python_api => pypaimon/api}/split.py (100%) rename {paimon_python_api => pypaimon/api}/table.py (96%) rename {paimon_python_api => pypaimon/api}/table_commit.py (97%) rename {paimon_python_api => pypaimon/api}/table_read.py (98%) rename {paimon_python_api => pypaimon/api}/table_scan.py (97%) rename {paimon_python_api => pypaimon/api}/table_write.py (97%) rename {paimon_python_api => pypaimon/api}/write_builder.py (96%) rename {paimon_python_java => pypaimon/py4j}/__init__.py (84%) rename {paimon_python_java => pypaimon/py4j}/gateway_server.py (97%) 
rename {paimon_python_java => pypaimon/py4j}/java_gateway.py (96%) rename paimon_python_java/pypaimon.py => pypaimon/py4j/java_implementation.py (97%) rename {paimon_python_java => pypaimon/py4j}/paimon-python-java-bridge/copyright.txt (100%) rename {paimon_python_java => pypaimon/py4j}/paimon-python-java-bridge/pom.xml (100%) rename {paimon_python_java => pypaimon/py4j}/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/BytesWriter.java (100%) rename {paimon_python_java => pypaimon/py4j}/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/FileLock.java (100%) rename {paimon_python_java => pypaimon/py4j}/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/InvocationUtil.java (100%) rename {paimon_python_java => pypaimon/py4j}/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/NetUtils.java (100%) rename {paimon_python_java => pypaimon/py4j}/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/ParallelBytesReader.java (100%) rename {paimon_python_java => pypaimon/py4j}/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PredicationUtil.java (100%) rename {paimon_python_java => pypaimon/py4j}/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PythonEnvUtils.java (100%) rename {paimon_python_java => pypaimon/py4j}/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PythonGatewayServer.java (100%) rename {paimon_python_java => pypaimon/py4j}/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/RecordBytesIterator.java (100%) rename {paimon_python_java => pypaimon/py4j}/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/SchemaUtil.java (100%) rename {paimon_python_java => pypaimon/py4j}/paimon-python-java-bridge/src/main/resources/META-INF/NOTICE (100%) rename {paimon_python_java => pypaimon/py4j}/paimon-python-java-bridge/tools/ci/log4j.properties (100%) rename {paimon_python_java => pypaimon/py4j}/paimon-python-java-bridge/tools/maven/checkstyle.xml (100%) rename {paimon_python_java => pypaimon/py4j}/paimon-python-java-bridge/tools/maven/suppressions.xml (100%) rename {paimon_python_java => pypaimon/py4j}/tests/__init__.py (100%) rename {paimon_python_java => pypaimon/py4j}/tests/test_data_types.py (95%) rename {paimon_python_java => pypaimon/py4j}/tests/test_preicates.py (99%) rename {paimon_python_java => pypaimon/py4j}/tests/test_write_and_read.py (98%) rename {paimon_python_java => pypaimon/py4j}/tests/utils.py (97%) rename {paimon_python_java => pypaimon/py4j}/util/__init__.py (100%) rename {paimon_python_java => pypaimon/py4j}/util/constants.py (100%) rename {paimon_python_java => pypaimon/py4j}/util/exceptions.py (100%) rename {paimon_python_java => pypaimon/py4j}/util/java_utils.py (97%) diff --git a/.github/workflows/check-java-bridge-licensing.yml b/.github/workflows/check-java-bridge-licensing.yml index 1072532..153dfeb 100644 --- a/.github/workflows/check-java-bridge-licensing.yml +++ b/.github/workflows/check-java-bridge-licensing.yml @@ -40,14 +40,14 @@ jobs: - name: Build run: | set -o pipefail - cd paimon_python_java/paimon-python-java-bridge + cd pypaimon/py4j/paimon-python-java-bridge mvn clean deploy ${{ env.MVN_COMMON_OPTIONS }} -DskipTests \ -DaltDeploymentRepository=validation_repository::default::file:${{ env.MVN_VALIDATION_DIR }} \ | tee ${{ env.MVN_BUILD_OUTPUT_FILE }} - name: Check licensing run: | - cd paimon_python_java/paimon-python-java-bridge + cd pypaimon/py4j/paimon-python-java-bridge mvn ${{ env.MVN_COMMON_OPTIONS }} 
exec:java@check-licensing -N \ -Dexec.args="${{ env.MVN_BUILD_OUTPUT_FILE }} $(pwd) ${{ env.MVN_VALIDATION_DIR }}" \ -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties diff --git a/pypaimon/__init__.py b/pypaimon/__init__.py new file mode 100644 index 0000000..0cd0eb5 --- /dev/null +++ b/pypaimon/__init__.py @@ -0,0 +1,21 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# + +from .api import Schema + +__all__ = ['Schema'] diff --git a/paimon_python_api/__init__.py b/pypaimon/api/__init__.py similarity index 100% rename from paimon_python_api/__init__.py rename to pypaimon/api/__init__.py diff --git a/paimon_python_api/catalog.py b/pypaimon/api/catalog.py similarity index 97% rename from paimon_python_api/catalog.py rename to pypaimon/api/catalog.py index a4a863f..3132159 100644 --- a/paimon_python_api/catalog.py +++ b/pypaimon/api/catalog.py @@ -18,7 +18,7 @@ from abc import ABC, abstractmethod from typing import Optional -from paimon_python_api import Table, Schema +from pypaimon.api import Table, Schema class Catalog(ABC): diff --git a/paimon_python_api/commit_message.py b/pypaimon/api/commit_message.py similarity index 100% rename from paimon_python_api/commit_message.py rename to pypaimon/api/commit_message.py diff --git a/paimon_python_api/predicate.py b/pypaimon/api/predicate.py similarity index 100% rename from paimon_python_api/predicate.py rename to pypaimon/api/predicate.py diff --git a/paimon_python_api/read_builder.py b/pypaimon/api/read_builder.py similarity index 95% rename from paimon_python_api/read_builder.py rename to pypaimon/api/read_builder.py index a031a05..68b7d46 100644 --- a/paimon_python_api/read_builder.py +++ b/pypaimon/api/read_builder.py @@ -17,7 +17,7 @@ ################################################################################# from abc import ABC, abstractmethod -from paimon_python_api import TableRead, TableScan, Predicate, PredicateBuilder +from pypaimon.api import TableRead, TableScan, Predicate, PredicateBuilder from typing import List diff --git a/paimon_python_api/split.py b/pypaimon/api/split.py similarity index 100% rename from paimon_python_api/split.py rename to pypaimon/api/split.py diff --git a/paimon_python_api/table.py b/pypaimon/api/table.py similarity index 96% rename from paimon_python_api/table.py rename to pypaimon/api/table.py index 0170cb1..7eef7b4 100644 --- a/paimon_python_api/table.py +++ b/pypaimon/api/table.py @@ -19,7 +19,7 @@ import pyarrow as pa from abc import ABC, abstractmethod -from paimon_python_api import ReadBuilder, BatchWriteBuilder +from pypaimon.api import ReadBuilder, BatchWriteBuilder from typing import 
Optional, List diff --git a/paimon_python_api/table_commit.py b/pypaimon/api/table_commit.py similarity index 97% rename from paimon_python_api/table_commit.py rename to pypaimon/api/table_commit.py index d76ed6a..eed8a82 100644 --- a/paimon_python_api/table_commit.py +++ b/pypaimon/api/table_commit.py @@ -17,7 +17,7 @@ ################################################################################# from abc import ABC, abstractmethod -from paimon_python_api import CommitMessage +from pypaimon.api import CommitMessage from typing import List diff --git a/paimon_python_api/table_read.py b/pypaimon/api/table_read.py similarity index 98% rename from paimon_python_api/table_read.py rename to pypaimon/api/table_read.py index f0a7b59..9fcb78c 100644 --- a/paimon_python_api/table_read.py +++ b/pypaimon/api/table_read.py @@ -22,7 +22,7 @@ from abc import ABC, abstractmethod from duckdb.duckdb import DuckDBPyConnection -from paimon_python_api import Split +from pypaimon.api import Split from typing import List, Optional diff --git a/paimon_python_api/table_scan.py b/pypaimon/api/table_scan.py similarity index 97% rename from paimon_python_api/table_scan.py rename to pypaimon/api/table_scan.py index d998462..8830676 100644 --- a/paimon_python_api/table_scan.py +++ b/pypaimon/api/table_scan.py @@ -18,7 +18,7 @@ from abc import ABC, abstractmethod from typing import List -from paimon_python_api import Split +from pypaimon.api import Split class TableScan(ABC): diff --git a/paimon_python_api/table_write.py b/pypaimon/api/table_write.py similarity index 97% rename from paimon_python_api/table_write.py rename to pypaimon/api/table_write.py index d1d39a7..8839620 100644 --- a/paimon_python_api/table_write.py +++ b/pypaimon/api/table_write.py @@ -20,7 +20,7 @@ import pyarrow as pa from abc import ABC, abstractmethod -from paimon_python_api import CommitMessage +from pypaimon.api import CommitMessage from typing import List diff --git a/paimon_python_api/write_builder.py b/pypaimon/api/write_builder.py similarity index 96% rename from paimon_python_api/write_builder.py rename to pypaimon/api/write_builder.py index 7835179..ce9cba3 100644 --- a/paimon_python_api/write_builder.py +++ b/pypaimon/api/write_builder.py @@ -17,7 +17,7 @@ ################################################################################# from abc import ABC, abstractmethod -from paimon_python_api import BatchTableCommit, BatchTableWrite +from pypaimon.api import BatchTableCommit, BatchTableWrite from typing import Optional diff --git a/paimon_python_java/__init__.py b/pypaimon/py4j/__init__.py similarity index 84% rename from paimon_python_java/__init__.py rename to pypaimon/py4j/__init__.py index 9b0d002..c3b70ff 100644 --- a/paimon_python_java/__init__.py +++ b/pypaimon/py4j/__init__.py @@ -17,9 +17,10 @@ ################################################################################ from .util import constants -from .pypaimon import (Catalog, Table, ReadBuilder, TableScan, Plan, Split, TableRead, - BatchWriteBuilder, BatchTableWrite, CommitMessage, BatchTableCommit, - Predicate, PredicateBuilder) +from .java_implementation import \ + (Catalog, Table, ReadBuilder, TableScan, Plan, Split, + TableRead, BatchWriteBuilder, BatchTableWrite, CommitMessage, + BatchTableCommit, Predicate, PredicateBuilder) __all__ = [ 'constants', diff --git a/paimon_python_java/gateway_server.py b/pypaimon/py4j/gateway_server.py similarity index 97% rename from paimon_python_java/gateway_server.py rename to pypaimon/py4j/gateway_server.py index 
2061d59..9a259e0 100644 --- a/paimon_python_java/gateway_server.py +++ b/pypaimon/py4j/gateway_server.py @@ -22,7 +22,7 @@ import signal from subprocess import Popen, PIPE -from paimon_python_java import constants +from pypaimon.py4j import constants def on_windows(): @@ -74,7 +74,7 @@ def preexec_func(): stdin=PIPE, stderr=PIPE, preexec_fn=preexec_fn, env=env) -_JAVA_IMPL_MODULE = 'paimon_python_java' +_JAVA_IMPL_MODULE = 'pypaimon.py4j' _JAVA_DEPS = 'java_dependencies' _JAVA_BRIDGE = 'paimon-python-java-bridge' diff --git a/paimon_python_java/java_gateway.py b/pypaimon/py4j/java_gateway.py similarity index 96% rename from paimon_python_java/java_gateway.py rename to pypaimon/py4j/java_gateway.py index 3dabcfd..41d7a0d 100644 --- a/paimon_python_java/java_gateway.py +++ b/pypaimon/py4j/java_gateway.py @@ -26,9 +26,9 @@ CallbackServerParameters) from threading import RLock -from paimon_python_java.gateway_server import launch_gateway_server_process -from paimon_python_java import constants -from paimon_python_java.util.exceptions import install_py4j_hooks +from pypaimon.py4j.gateway_server import launch_gateway_server_process +from pypaimon.py4j import constants +from pypaimon.py4j.util.exceptions import install_py4j_hooks _gateway = None _lock = RLock() diff --git a/paimon_python_java/pypaimon.py b/pypaimon/py4j/java_implementation.py similarity index 97% rename from paimon_python_java/pypaimon.py rename to pypaimon/py4j/java_implementation.py index 803540c..17c6eda 100644 --- a/paimon_python_java/pypaimon.py +++ b/pypaimon/py4j/java_implementation.py @@ -16,17 +16,20 @@ # limitations under the License. ################################################################################ +# pypaimon.api implementation based on Java code & py4j lib + import duckdb import pandas as pd import pyarrow as pa import ray from duckdb.duckdb import DuckDBPyConnection -from paimon_python_java.java_gateway import get_gateway -from paimon_python_java.util import java_utils, constants -from paimon_python_api import (catalog, table, read_builder, table_scan, split, table_read, - write_builder, table_write, commit_message, table_commit, Schema, - predicate) +from pypaimon.py4j.java_gateway import get_gateway +from pypaimon.py4j.util import java_utils, constants +from pypaimon.api import \ + (catalog, table, read_builder, table_scan, split, + table_read, write_builder, table_write, commit_message, + table_commit, Schema, predicate) from typing import List, Iterator, Optional, Any diff --git a/paimon_python_java/paimon-python-java-bridge/copyright.txt b/pypaimon/py4j/paimon-python-java-bridge/copyright.txt similarity index 100% rename from paimon_python_java/paimon-python-java-bridge/copyright.txt rename to pypaimon/py4j/paimon-python-java-bridge/copyright.txt diff --git a/paimon_python_java/paimon-python-java-bridge/pom.xml b/pypaimon/py4j/paimon-python-java-bridge/pom.xml similarity index 100% rename from paimon_python_java/paimon-python-java-bridge/pom.xml rename to pypaimon/py4j/paimon-python-java-bridge/pom.xml diff --git a/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/BytesWriter.java b/pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/BytesWriter.java similarity index 100% rename from paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/BytesWriter.java rename to pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/BytesWriter.java diff --git 
a/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/FileLock.java b/pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/FileLock.java similarity index 100% rename from paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/FileLock.java rename to pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/FileLock.java diff --git a/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/InvocationUtil.java b/pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/InvocationUtil.java similarity index 100% rename from paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/InvocationUtil.java rename to pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/InvocationUtil.java diff --git a/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/NetUtils.java b/pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/NetUtils.java similarity index 100% rename from paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/NetUtils.java rename to pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/NetUtils.java diff --git a/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/ParallelBytesReader.java b/pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/ParallelBytesReader.java similarity index 100% rename from paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/ParallelBytesReader.java rename to pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/ParallelBytesReader.java diff --git a/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PredicationUtil.java b/pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PredicationUtil.java similarity index 100% rename from paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PredicationUtil.java rename to pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PredicationUtil.java diff --git a/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PythonEnvUtils.java b/pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PythonEnvUtils.java similarity index 100% rename from paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PythonEnvUtils.java rename to pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PythonEnvUtils.java diff --git a/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PythonGatewayServer.java b/pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PythonGatewayServer.java similarity index 100% rename from paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PythonGatewayServer.java rename to pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PythonGatewayServer.java diff --git a/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/RecordBytesIterator.java b/pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/RecordBytesIterator.java similarity index 100% rename from 
paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/RecordBytesIterator.java rename to pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/RecordBytesIterator.java diff --git a/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/SchemaUtil.java b/pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/SchemaUtil.java similarity index 100% rename from paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/SchemaUtil.java rename to pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/SchemaUtil.java diff --git a/paimon_python_java/paimon-python-java-bridge/src/main/resources/META-INF/NOTICE b/pypaimon/py4j/paimon-python-java-bridge/src/main/resources/META-INF/NOTICE similarity index 100% rename from paimon_python_java/paimon-python-java-bridge/src/main/resources/META-INF/NOTICE rename to pypaimon/py4j/paimon-python-java-bridge/src/main/resources/META-INF/NOTICE diff --git a/paimon_python_java/paimon-python-java-bridge/tools/ci/log4j.properties b/pypaimon/py4j/paimon-python-java-bridge/tools/ci/log4j.properties similarity index 100% rename from paimon_python_java/paimon-python-java-bridge/tools/ci/log4j.properties rename to pypaimon/py4j/paimon-python-java-bridge/tools/ci/log4j.properties diff --git a/paimon_python_java/paimon-python-java-bridge/tools/maven/checkstyle.xml b/pypaimon/py4j/paimon-python-java-bridge/tools/maven/checkstyle.xml similarity index 100% rename from paimon_python_java/paimon-python-java-bridge/tools/maven/checkstyle.xml rename to pypaimon/py4j/paimon-python-java-bridge/tools/maven/checkstyle.xml diff --git a/paimon_python_java/paimon-python-java-bridge/tools/maven/suppressions.xml b/pypaimon/py4j/paimon-python-java-bridge/tools/maven/suppressions.xml similarity index 100% rename from paimon_python_java/paimon-python-java-bridge/tools/maven/suppressions.xml rename to pypaimon/py4j/paimon-python-java-bridge/tools/maven/suppressions.xml diff --git a/paimon_python_java/tests/__init__.py b/pypaimon/py4j/tests/__init__.py similarity index 100% rename from paimon_python_java/tests/__init__.py rename to pypaimon/py4j/tests/__init__.py diff --git a/paimon_python_java/tests/test_data_types.py b/pypaimon/py4j/tests/test_data_types.py similarity index 95% rename from paimon_python_java/tests/test_data_types.py rename to pypaimon/py4j/tests/test_data_types.py index 3920cd2..5fb809e 100644 --- a/paimon_python_java/tests/test_data_types.py +++ b/pypaimon/py4j/tests/test_data_types.py @@ -24,10 +24,10 @@ import pyarrow as pa import unittest -from paimon_python_api import Schema -from paimon_python_java import Catalog -from paimon_python_java.tests import utils -from paimon_python_java.util import java_utils +from pypaimon import Schema +from pypaimon.py4j import Catalog +from pypaimon.py4j.tests import utils +from pypaimon.py4j.util import java_utils from setup_utils import java_setuputils diff --git a/paimon_python_java/tests/test_preicates.py b/pypaimon/py4j/tests/test_preicates.py similarity index 99% rename from paimon_python_java/tests/test_preicates.py rename to pypaimon/py4j/tests/test_preicates.py index 7ee1a91..5b63759 100644 --- a/paimon_python_java/tests/test_preicates.py +++ b/pypaimon/py4j/tests/test_preicates.py @@ -24,9 +24,9 @@ import pandas as pd import pyarrow as pa -from paimon_python_api import Schema -from paimon_python_java import Catalog -from paimon_python_java.tests import utils +from pypaimon 
import Schema +from pypaimon.py4j import Catalog +from pypaimon.py4j.tests import utils from setup_utils import java_setuputils diff --git a/paimon_python_java/tests/test_write_and_read.py b/pypaimon/py4j/tests/test_write_and_read.py similarity index 98% rename from paimon_python_java/tests/test_write_and_read.py rename to pypaimon/py4j/tests/test_write_and_read.py index e2c631d..27528d1 100644 --- a/paimon_python_java/tests/test_write_and_read.py +++ b/pypaimon/py4j/tests/test_write_and_read.py @@ -24,11 +24,11 @@ import pyarrow as pa from py4j.protocol import Py4JJavaError -from paimon_python_api import Schema -from paimon_python_java import Catalog -from paimon_python_java.java_gateway import get_gateway -from paimon_python_java.tests import utils -from paimon_python_java.util import java_utils +from pypaimon import Schema +from pypaimon.py4j import Catalog +from pypaimon.py4j.java_gateway import get_gateway +from pypaimon.py4j.tests import utils +from pypaimon.py4j.util import java_utils from setup_utils import java_setuputils diff --git a/paimon_python_java/tests/utils.py b/pypaimon/py4j/tests/utils.py similarity index 97% rename from paimon_python_java/tests/utils.py rename to pypaimon/py4j/tests/utils.py index 350f80e..18d924e 100644 --- a/paimon_python_java/tests/utils.py +++ b/pypaimon/py4j/tests/utils.py @@ -19,7 +19,7 @@ import os import urllib.request -from paimon_python_java.util import constants +from pypaimon.py4j import constants def setup_hadoop_bundle_jar(hadoop_dir): diff --git a/paimon_python_java/util/__init__.py b/pypaimon/py4j/util/__init__.py similarity index 100% rename from paimon_python_java/util/__init__.py rename to pypaimon/py4j/util/__init__.py diff --git a/paimon_python_java/util/constants.py b/pypaimon/py4j/util/constants.py similarity index 100% rename from paimon_python_java/util/constants.py rename to pypaimon/py4j/util/constants.py diff --git a/paimon_python_java/util/exceptions.py b/pypaimon/py4j/util/exceptions.py similarity index 100% rename from paimon_python_java/util/exceptions.py rename to pypaimon/py4j/util/exceptions.py diff --git a/paimon_python_java/util/java_utils.py b/pypaimon/py4j/util/java_utils.py similarity index 97% rename from paimon_python_java/util/java_utils.py rename to pypaimon/py4j/util/java_utils.py index ce0404a..0beb527 100644 --- a/paimon_python_java/util/java_utils.py +++ b/pypaimon/py4j/util/java_utils.py @@ -18,8 +18,8 @@ import pyarrow as pa -from paimon_python_api import Schema -from paimon_python_java.java_gateway import get_gateway +from pypaimon import Schema +from pypaimon.py4j.java_gateway import get_gateway def to_j_catalog_context(catalog_options: dict): diff --git a/setup.py b/setup.py index 999e3c5..628a6b8 100644 --- a/setup.py +++ b/setup.py @@ -50,13 +50,14 @@ def run(self): try: PACKAGES = [ - 'paimon_python_api', - 'paimon_python_java', - 'paimon_python_java.util' + 'pypaimon', + 'pypaimon.api', + 'pypaimon.py4j', + 'pypaimon.py4j.util' ] PACKAGE_DATA = { - 'paimon_python_java': java_setuputils.get_package_data() + 'pypaimon.py4j': java_setuputils.get_package_data() } install_requires = [ @@ -72,7 +73,7 @@ def run(self): [Doc](https://paimon.apache.org/docs/master/program-api/python-api/) for usage.' 
setup( - name='paimon_python', + name='pypaimon', version=setup_utils.version.__version__, packages=PACKAGES, include_package_data=True, diff --git a/setup_utils/java_setuputils.py b/setup_utils/java_setuputils.py index 49e2ea9..01b02e8 100755 --- a/setup_utils/java_setuputils.py +++ b/setup_utils/java_setuputils.py @@ -22,7 +22,7 @@ from xml.etree import ElementTree -_JAVA_IMPL_MODULE = 'paimon_python_java' +_JAVA_IMPL_MODULE = 'pypaimon/py4j' _JAVA_DEPS = 'java_dependencies' _JAVA_BRIDGE = 'paimon-python-java-bridge' diff --git a/tools/releasing/create_binary_release.sh b/tools/releasing/create_binary_release.sh index f216e9a..8188fb4 100755 --- a/tools/releasing/create_binary_release.sh +++ b/tools/releasing/create_binary_release.sh @@ -60,7 +60,7 @@ source dev/.conda/bin/activate # build dev/build-wheels.sh -WHEEL_FILE_NAME="paimon_python-${RELEASE_VERSION}-py3-none-any.whl" +WHEEL_FILE_NAME="pypaimon-${RELEASE_VERSION}-py3-none-any.whl" cp "dist/${WHEEL_FILE_NAME}" "${RELEASE_DIR}/${WHEEL_FILE_NAME}" cd ${RELEASE_DIR} diff --git a/tools/releasing/create_source_release.sh b/tools/releasing/create_source_release.sh index 4d07db5..7d6a8a9 100755 --- a/tools/releasing/create_source_release.sh +++ b/tools/releasing/create_source_release.sh @@ -66,7 +66,7 @@ echo "Creating source package" git clone ${PROJECT_ROOT} ${CLONE_DIR} cd ${CLONE_DIR} -JAVA_ROOT="paimon_python_java/paimon-python-java-bridge" +JAVA_ROOT="pypaimon/py4j/paimon-python-java-bridge" rsync -a \ --exclude ".DS_Store" --exclude ".asf.yaml" --exclude ".git" \ --exclude ".github" --exclude ".gitignore" --exclude ".idea" \ diff --git a/tox.ini b/tox.ini index da9b377..8ce06f8 100644 --- a/tox.ini +++ b/tox.ini @@ -45,6 +45,6 @@ max-line-length=100 exclude=.tox/*,dev/*,build/*,dist/* [mypy] -files=paimon_python_api/*.py +files=pypaimon/api/*.py ignore_missing_imports = True strict_optional=False
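
Taken together, [PATCH 6/8] lets callers project columns by field name instead of by nested index lists. Below is a minimal end-to-end sketch mirroring the testProjection case above, written with the post-rename import paths from [PATCH 8/8]; the warehouse path and the 'default.demo' table name are illustrative, and Catalog.create(...) is assumed from the project's Python API docs:

    import pandas as pd
    import pyarrow as pa

    from pypaimon import Schema
    from pypaimon.py4j import Catalog

    # Illustrative setup: a filesystem catalog under an assumed warehouse path.
    catalog = Catalog.create({'warehouse': '/tmp/warehouse'})
    pa_schema = pa.schema([('f0', pa.int64()), ('f1', pa.string()),
                           ('f2', pa.bool_()), ('f3', pa.string())])
    catalog.create_table('default.demo', Schema(pa_schema), False)
    table = catalog.get_table('default.demo')

    # Write and commit a few rows, as in testProjection.
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    table_write.write_pandas(pd.DataFrame({
        'f0': [1, 2, 3], 'f1': ['a', 'b', 'c'],
        'f2': [True, True, False], 'f3': ['A', 'B', 'C']}))
    table_commit.commit(table_write.prepare_commit())
    table_write.close()
    table_commit.close()

    # Project by field name; the result preserves the requested order (f3, f2).
    read_builder = table.new_read_builder().with_projection(['f3', 'f2'])
    splits = read_builder.new_scan().plan().splits()
    df = read_builder.new_read().to_pandas(splits)

Internally the names are resolved to field indices against the table row type, so a misspelled field name fails fast with a ValueError from list.index rather than surfacing later on the Java side.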
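
The conversions added in [PATCH 7/8] compose with the same read path. Continuing the sketch above, a hedged example of the DuckDB and Ray hand-offs; 'demo' is just the registration name chosen here:

    read_builder = table.new_read_builder()
    splits = read_builder.new_scan().plan().splits()
    table_read = read_builder.new_read()

    # to_duckdb registers the Arrow result as an in-memory DuckDB table;
    # pass an existing DuckDBPyConnection to reuse a connection instead.
    con = table_read.to_duckdb(splits, 'demo')
    print(con.query("SELECT f1, f3 FROM demo WHERE f0 < 3").fetchdf())

    # to_ray wraps the same Arrow data in a ray.data.Dataset.
    ray_dataset = table_read.to_ray(splits)
    print(ray_dataset.to_pandas())

Because the registration is Arrow-based, the DuckDB queries run against in-memory data, and the registered table can be combined with any other relations already on that connection.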
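
Finally, after the [PATCH 8/8] rename only the import paths change for API users, while the wheel name moves from paimon_python to pypaimon (see create_binary_release.sh above). A before/after sketch:

    # Before the rename:
    #   from paimon_python_api import Schema
    #   from paimon_python_java import Catalog
    # After the rename:
    from pypaimon import Schema
    from pypaimon.py4j import Catalog

The java_dependencies directory and the paimon-python-java-bridge module keep their names; they simply live under pypaimon/py4j now.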