diff --git a/.github/workflows/check-java-bridge-licensing.yml b/.github/workflows/check-java-bridge-licensing.yml
new file mode 100644
index 0000000..153dfeb
--- /dev/null
+++ b/.github/workflows/check-java-bridge-licensing.yml
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Check Java Bridge Licensing
+
+on: [push, pull_request]
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }}
+ cancel-in-progress: true
+
+jobs:
+ build:
+ runs-on: ubuntu-latest
+ env:
+ MVN_COMMON_OPTIONS: -U -B --no-transfer-progress
+ MVN_BUILD_OUTPUT_FILE: "/tmp/mvn_build_output.out"
+ MVN_VALIDATION_DIR: "/tmp/paimon-validation-deployment"
+
+ steps:
+ - uses: actions/checkout@v2
+
+ - name: Set JDK
+ uses: actions/setup-java@v2
+ with:
+ java-version: 8
+ distribution: 'adopt'
+ - name: Build
+ run: |
+ set -o pipefail
+ cd pypaimon/py4j/paimon-python-java-bridge
+ mvn clean deploy ${{ env.MVN_COMMON_OPTIONS }} -DskipTests \
+ -DaltDeploymentRepository=validation_repository::default::file:${{ env.MVN_VALIDATION_DIR }} \
+ | tee ${{ env.MVN_BUILD_OUTPUT_FILE }}
+
+ - name: Check licensing
+ run: |
+ cd pypaimon/py4j/paimon-python-java-bridge
+ mvn ${{ env.MVN_COMMON_OPTIONS }} exec:java@check-licensing -N \
+ -Dexec.args="${{ env.MVN_BUILD_OUTPUT_FILE }} $(pwd) ${{ env.MVN_VALIDATION_DIR }}" \
+ -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties
+ env:
+ MAVEN_OPTS: -Xmx4096m
diff --git a/.github/workflows/paimon-python-checks.yml b/.github/workflows/paimon-python-checks.yml
index e94820b..195783f 100644
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -43,7 +43,14 @@ jobs:
with:
java-version: ${{ env.JDK_VERSION }}
distribution: 'adopt'
+ - name: Set up hadoop dependency
+ run: |
+ mkdir -p ${{ github.workspace }}/temp
+ curl -L -o ${{ github.workspace }}/temp/bundled-hadoop.jar \
+ https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
- name: Run lint-python.sh
+ env:
+ _PYPAIMON_HADOOP_CLASSPATH: ${{ github.workspace }}/temp/bundled-hadoop.jar
run: |
chmod +x dev/lint-python.sh
./dev/lint-python.sh
diff --git a/LICENSE b/LICENSE
index 261eeb9..f51b94d 100644
--- a/LICENSE
+++ b/LICENSE
@@ -199,3 +199,23 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
+
+------------------------------------------------------------------------------------
+
+This product bundles various third-party components under other open source licenses.
+This section summarizes those components and their licenses. See licenses/
+for text of these licenses.
+
+Apache License 2.0
+------------------
+
+pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/FileLock.java
+pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/NetUtils.java
+pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PythonEnvUtils.java
+pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PythonGatewayServer.java
+
+pypaimon/py4j/gateway_server.py
+pypaimon/py4j/java_gateway.py
+pypaimon/py4j/util/exceptions.py
+
+from http://flink.apache.org/ version 1.20.0
diff --git a/NOTICE b/NOTICE
index b666c9f..81978c0 100644
--- a/NOTICE
+++ b/NOTICE
@@ -2,4 +2,269 @@ Apache Paimon Python
Copyright 2024 The Apache Software Foundation
This product includes software developed at
-The Apache Software Foundation (http://www.apache.org/).
\ No newline at end of file
+The Apache Software Foundation (http://www.apache.org/).
+
+
+This project bundles the following dependencies under the MIT license
+- org.slf4j:slf4j-api:1.7.32
+
+This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
+- org.apache.logging.log4j:log4j-1.2-api:2.17.1
+- org.apache.arrow:arrow-vector:14.0.0
+- org.apache.arrow:arrow-format:14.0.0
+- org.apache.arrow:arrow-memory-core:14.0.0
+- org.apache.arrow:arrow-memory-unsafe:14.0.0
+- org.apache.arrow:arrow-c-data:14.0.0
+- com.google.flatbuffers:flatbuffers-java:1.12.0
+
+This project bundles the following dependencies under the 2-Clause BSD License
+- net.sf.py4j:py4j:0.10.9.7
+
+
+Apache Paimon-shade (incubating)
+Copyright 2023-2024 The Apache Software Foundation
+
+Paimon : Bundle
+Copyright 2023-2024 The Apache Software Foundation
+
+paimon-common
+Copyright 2023-2024 The Apache Software Foundation
+
+This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
+- org.roaringbitmap:RoaringBitmap:1.0.5
+- org.apache.datasketches:datasketches-java:4.2.0
+- org.apache.datasketches:datasketches-memory:2.2.0
+
+This project bundles the following dependencies under the BSD 3-clause license.
+You find them under licenses/LICENSE.antlr-runtime and licenses/LICENSE.janino.
+
+- org.antlr:antlr4-runtime:4.9.3
+- org.codehaus.janino:janino:3.0.11
+- org.codehaus.janino:commons-compiler:3.0.11
+- it.unimi.dsi:fastutil:8.5.12
+- net.openhft:zero-allocation-hashing:0.16
+- com.github.davidmoten:hilbert-curve:0.2.2
+- com.github.davidmoten:guava-mini:0.1.3
+
+datasketches-java
+Copyright 2015-2022 The Apache Software Foundation
+
+Apache DataSketches Memory
+Copyright 2022 - The Apache Software Foundation
+
+Copyright 2015-2018 Yahoo Inc.
+Copyright 2019-2020 Verizon Media
+Copyright 2021 Yahoo Inc.
+
+Prior to moving to ASF, the software for this project was developed at
+Yahoo Inc. (https://developer.yahoo.com).
+
+Paimon : Core
+Copyright 2023-2024 The Apache Software Foundation
+
+Paimon : Code Gen Loader
+Copyright 2023-2024 The Apache Software Foundation
+
+paimon-format
+Copyright 2023-2024 The Apache Software Foundation
+
+This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+- org.apache.orc:orc-core:1.9.2
+- org.apache.orc:orc-shims:1.9.2
+- org.apache.hive:hive-storage-api:2.8.1
+- io.airlift:aircompressor:0.27
+- commons-lang:commons-lang:2.6
+- org.apache.commons:commons-lang3:3.12.0
+
+- org.apache.avro:avro:1.11.3
+- com.fasterxml.jackson.core:jackson-core:2.14.2
+- com.fasterxml.jackson.core:jackson-databind:2.14.2
+- com.fasterxml.jackson.core:jackson-annotations:2.14.2
+- org.apache.commons:commons-compress:1.22
+
+- org.apache.parquet:parquet-hadoop:1.13.1
+- org.apache.parquet:parquet-column:1.13.1
+- org.apache.parquet:parquet-common:1.13.1
+- org.apache.parquet:parquet-encoding:1.13.1
+- org.apache.parquet:parquet-format-structures:1.13.1
+- org.apache.parquet:parquet-jackson:1.13.1
+- commons-pool:commons-pool:1.6
+
+This project bundles the following dependencies under the BSD license.
+You find it under licenses/LICENSE.protobuf, licenses/LICENSE.zstd-jni
+and licenses/LICENSE.threeten-extra
+
+- com.google.protobuf:protobuf-java:3.19.6
+- com.github.luben:zstd-jni:1.5.5-11
+- org.threeten:threeten-extra:1.7.1
+
+# Jackson JSON processor
+
+Jackson is a high-performance, Free/Open Source JSON processing library.
+It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+been in development since 2007.
+It is currently developed by a community of developers.
+
+## Licensing
+
+Jackson 2.x core and extension components are licensed under Apache License 2.0
+To find the details that apply to this artifact see the accompanying LICENSE file.
+
+## Credits
+
+A list of contributors may be found from CREDITS(-2.x) file, which is included
+in some artifacts (usually source distributions); but is always available
+from the source code management (SCM) system project uses.
+
+Apache Commons Lang
+Copyright 2001-2011 The Apache Software Foundation
+
+This product includes software developed by
+The Apache Software Foundation (http://www.apache.org/).
+
+ORC Core
+Copyright 2013-2023 The Apache Software Foundation
+
+ORC Shims
+Copyright 2013-2023 The Apache Software Foundation
+
+Apache Commons Lang
+Copyright 2001-2021 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (https://www.apache.org/).
+
+Hive Storage API
+Copyright 2020 The Apache Software Foundation
+
+Apache Avro
+Copyright 2009-2023 The Apache Software Foundation
+
+Apache Commons Compress
+Copyright 2002-2022 The Apache Software Foundation
+
+---
+
+The files in the package org.apache.commons.compress.archivers.sevenz
+were derived from the LZMA SDK, version 9.20 (C/ and CPP/7zip/),
+which has been placed in the public domain:
+
+"LZMA SDK is placed in the public domain." (http://www.7-zip.org/sdk.html)
+
+The test file lbzip2_32767.bz2 has been copied from libbzip2's source
+repository:
+
+This program, "bzip2", the associated library "libbzip2", and all
+documentation, are copyright (C) 1996-2019 Julian R Seward. All
+rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+
+1. Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+
+2. The origin of this software must not be misrepresented; you must
+ not claim that you wrote the original software. If you use this
+ software in a product, an acknowledgment in the product
+ documentation would be appreciated but is not required.
+
+3. Altered source versions must be plainly marked as such, and must
+ not be misrepresented as being the original software.
+
+4. The name of the author may not be used to endorse or promote
+ products derived from this software without specific prior written
+ permission.
+
+THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
+OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
+DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
+GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+Julian Seward, jseward@acm.org
+
+Apache Commons Pool
+Copyright 2001-2012 The Apache Software Foundation
+
+Paimon : Hive Catalog
+Copyright 2023-2024 The Apache Software Foundation
+
+Paimon : Hive Common
+Copyright 2023-2024 The Apache Software Foundation
+
+paimon-shade-jackson-2
+Copyright 2023-2024 The Apache Software Foundation
+
+This project includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+- com.fasterxml.jackson.core:jackson-annotations:2.14.2
+- com.fasterxml.jackson.core:jackson-core:2.14.2
+- com.fasterxml.jackson.core:jackson-databind:2.14.2
+- com.fasterxml.jackson.dataformat:jackson-dataformat-csv:2.14.2
+- com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.14.2
+- com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.14.2
+- com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.2
+- org.yaml:snakeyaml:1.33
+
+Jackson is a high-performance, Free/Open Source JSON processing library.
+It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+been in development since 2007.
+It is currently developed by a community of developers, as well as supported
+commercially by FasterXML.com.
+
+Jackson core and extension components may be licensed under different licenses.
+To find the details that apply to this artifact see the accompanying LICENSE file.
+For more information, including possible other licensing options, contact
+FasterXML.com (http://fasterxml.com).
+
+A list of contributors may be found from CREDITS file, which is included
+in some artifacts (usually source distributions); but is always available
+from the source code management (SCM) system project uses.
+
+paimon-shade-guava-30
+Copyright 2023-2024 The Apache Software Foundation
+
+- com.google.guava:guava:30.1.1-jre
+- com.google.guava:failureaccess:1.0.1
+
+paimon-shade-caffeine-2
+Copyright 2023-2024 The Apache Software Foundation
+
+- com.github.ben-manes.caffeine:caffeine:2.9.3
+
+Paimon : Arrow
+Copyright 2023-2024 The Apache Software Foundation
+
+Apache Log4j 1.x Compatibility API
+Copyright 1999-2021 The Apache Software Foundation
+
+Arrow Vectors
+Copyright 2023 The Apache Software Foundation
+
+Arrow Format
+Copyright 2023 The Apache Software Foundation
+
+Arrow Memory - Core
+Copyright 2023 The Apache Software Foundation
+
+Arrow Memory - Unsafe
+Copyright 2023 The Apache Software Foundation
+
+Arrow Java C Data Interface
+Copyright 2023 The Apache Software Foundation
+
+Apache Flink
+Copyright 2014-2024 The Apache Software Foundation
+
diff --git a/dev/dev-requirements.txt b/dev/dev-requirements.txt
index 7fd1aeb..4ed964e 100755
--- a/dev/dev-requirements.txt
+++ b/dev/dev-requirements.txt
@@ -26,3 +26,5 @@ numpy>=1.22.4
python-dateutil>=2.8.0,<3
pytz>=2018.3
pytest~=7.0
+duckdb>=0.5.0,<2.0.0
+ray~=2.10.0
diff --git a/pypaimon/__init__.py b/pypaimon/__init__.py
new file mode 100644
index 0000000..0cd0eb5
--- /dev/null
+++ b/pypaimon/__init__.py
@@ -0,0 +1,21 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+from .api import Schema
+
+__all__ = ['Schema']
diff --git a/paimon_python_api/__init__.py b/pypaimon/api/__init__.py
similarity index 93%
rename from paimon_python_api/__init__.py
rename to pypaimon/api/__init__.py
index 86090c9..44717bf 100644
--- a/paimon_python_api/__init__.py
+++ b/pypaimon/api/__init__.py
@@ -19,6 +19,7 @@
from .split import Split
from .table_read import TableRead
from .table_scan import TableScan, Plan
+from .predicate import Predicate, PredicateBuilder
from .read_builder import ReadBuilder
from .commit_message import CommitMessage
from .table_commit import BatchTableCommit
@@ -39,5 +40,7 @@
'BatchWriteBuilder',
'Table',
'Schema',
- 'Catalog'
+ 'Catalog',
+ 'Predicate',
+ 'PredicateBuilder'
]
diff --git a/paimon_python_api/catalog.py b/pypaimon/api/catalog.py
similarity index 97%
rename from paimon_python_api/catalog.py
rename to pypaimon/api/catalog.py
index a4a863f..3132159 100644
--- a/paimon_python_api/catalog.py
+++ b/pypaimon/api/catalog.py
@@ -18,7 +18,7 @@
from abc import ABC, abstractmethod
from typing import Optional
-from paimon_python_api import Table, Schema
+from pypaimon.api import Table, Schema
class Catalog(ABC):
diff --git a/paimon_python_api/commit_message.py b/pypaimon/api/commit_message.py
similarity index 100%
rename from paimon_python_api/commit_message.py
rename to pypaimon/api/commit_message.py
diff --git a/pypaimon/api/predicate.py b/pypaimon/api/predicate.py
new file mode 100644
index 0000000..46280d1
--- /dev/null
+++ b/pypaimon/api/predicate.py
@@ -0,0 +1,95 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+
+from abc import ABC, abstractmethod
+from typing import Any, List
+
+
+class Predicate(ABC):
+ """Predicate which evaluates to a boolean. Now it doesn't have
+    any methods because only pypaimon.py4j implements it and
+    the Java implementation converts it to a Java object."""
+
+
+class PredicateBuilder(ABC):
+ """A utility class to create Predicate object for common filter conditions."""
+
+ @abstractmethod
+ def equal(self, field: str, literal: Any) -> Predicate:
+ """field = literal"""
+
+ @abstractmethod
+ def not_equal(self, field: str, literal: Any) -> Predicate:
+ """field <> literal"""
+
+ @abstractmethod
+ def less_than(self, field: str, literal: Any) -> Predicate:
+ """field < literal"""
+
+ @abstractmethod
+ def less_or_equal(self, field: str, literal: Any) -> Predicate:
+ """field <= literal"""
+
+ @abstractmethod
+ def greater_than(self, field: str, literal: Any) -> Predicate:
+ """field > literal"""
+
+ @abstractmethod
+ def greater_or_equal(self, field: str, literal: Any) -> Predicate:
+ """field >= literal"""
+
+ @abstractmethod
+ def is_null(self, field: str) -> Predicate:
+ """field IS NULL"""
+
+ @abstractmethod
+ def is_not_null(self, field: str) -> Predicate:
+ """field IS NOT NULL"""
+
+ @abstractmethod
+ def startswith(self, field: str, pattern_literal: Any) -> Predicate:
+ """field.startswith"""
+
+ @abstractmethod
+ def endswith(self, field: str, pattern_literal: Any) -> Predicate:
+ """field.endswith()"""
+
+ @abstractmethod
+ def contains(self, field: str, pattern_literal: Any) -> Predicate:
+ """literal in field"""
+
+ @abstractmethod
+ def is_in(self, field: str, literals: List[Any]) -> Predicate:
+ """field IN literals"""
+
+ @abstractmethod
+ def is_not_in(self, field: str, literals: List[Any]) -> Predicate:
+ """field NOT IN literals"""
+
+ @abstractmethod
+ def between(self, field: str, included_lower_bound: Any, included_upper_bound: Any) \
+ -> Predicate:
+ """field BETWEEN included_lower_bound AND included_upper_bound"""
+
+ @abstractmethod
+ def and_predicates(self, predicates: List[Predicate]) -> Predicate:
+ """predicate1 AND predicate2 AND ..."""
+
+ @abstractmethod
+ def or_predicates(self, predicates: List[Predicate]) -> Predicate:
+ """predicate1 OR predicate2 OR ..."""
diff --git a/paimon_python_api/read_builder.py b/pypaimon/api/read_builder.py
similarity index 75%
rename from paimon_python_api/read_builder.py
rename to pypaimon/api/read_builder.py
index 94ec073..68b7d46 100644
--- a/paimon_python_api/read_builder.py
+++ b/pypaimon/api/read_builder.py
@@ -17,7 +17,7 @@
#################################################################################
from abc import ABC, abstractmethod
-from paimon_python_api import TableRead, TableScan
+from pypaimon.api import TableRead, TableScan, Predicate, PredicateBuilder
from typing import List
@@ -25,7 +25,14 @@ class ReadBuilder(ABC):
"""An interface for building the TableScan and TableRead."""
@abstractmethod
- def with_projection(self, projection: List[List[int]]) -> 'ReadBuilder':
+ def with_filter(self, predicate: Predicate):
+ """
+ Push filters, will filter the data as much as possible,
+ but it is not guaranteed that it is a complete filter.
+ """
+
+ @abstractmethod
+ def with_projection(self, projection: List[str]) -> 'ReadBuilder':
"""Push nested projection."""
@abstractmethod
@@ -39,3 +46,7 @@ def new_scan(self) -> TableScan:
@abstractmethod
def new_read(self) -> TableRead:
"""Create a TableRead to read splits."""
+
+ @abstractmethod
+ def new_predicate_builder(self) -> PredicateBuilder:
+ """Create a builder for Predicate."""
diff --git a/paimon_python_api/split.py b/pypaimon/api/split.py
similarity index 100%
rename from paimon_python_api/split.py
rename to pypaimon/api/split.py
diff --git a/paimon_python_api/table.py b/pypaimon/api/table.py
similarity index 96%
rename from paimon_python_api/table.py
rename to pypaimon/api/table.py
index 0170cb1..7eef7b4 100644
--- a/paimon_python_api/table.py
+++ b/pypaimon/api/table.py
@@ -19,7 +19,7 @@
import pyarrow as pa
from abc import ABC, abstractmethod
-from paimon_python_api import ReadBuilder, BatchWriteBuilder
+from pypaimon.api import ReadBuilder, BatchWriteBuilder
from typing import Optional, List
diff --git a/paimon_python_api/table_commit.py b/pypaimon/api/table_commit.py
similarity index 97%
rename from paimon_python_api/table_commit.py
rename to pypaimon/api/table_commit.py
index d76ed6a..eed8a82 100644
--- a/paimon_python_api/table_commit.py
+++ b/pypaimon/api/table_commit.py
@@ -17,7 +17,7 @@
#################################################################################
from abc import ABC, abstractmethod
-from paimon_python_api import CommitMessage
+from pypaimon.api import CommitMessage
from typing import List
diff --git a/paimon_python_api/table_read.py b/pypaimon/api/table_read.py
similarity index 74%
rename from paimon_python_api/table_read.py
rename to pypaimon/api/table_read.py
index 24095b4..9fcb78c 100644
--- a/paimon_python_api/table_read.py
+++ b/pypaimon/api/table_read.py
@@ -18,10 +18,12 @@
import pandas as pd
import pyarrow as pa
+import ray
from abc import ABC, abstractmethod
-from paimon_python_api import Split
-from typing import List
+from duckdb.duckdb import DuckDBPyConnection
+from pypaimon.api import Split
+from typing import List, Optional
class TableRead(ABC):
@@ -38,3 +40,15 @@ def to_arrow_batch_reader(self, splits: List[Split]) -> pa.RecordBatchReader:
@abstractmethod
def to_pandas(self, splits: List[Split]) -> pd.DataFrame:
"""Read data from splits and converted to pandas.DataFrame format."""
+
+ @abstractmethod
+ def to_duckdb(
+ self,
+ splits: List[Split],
+ table_name: str,
+ connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
+ """Convert splits into an in-memory DuckDB table which can be queried."""
+
+ @abstractmethod
+ def to_ray(self, splits: List[Split]) -> ray.data.dataset.Dataset:
+ """Convert splits into a Ray dataset format."""
diff --git a/paimon_python_api/table_scan.py b/pypaimon/api/table_scan.py
similarity index 97%
rename from paimon_python_api/table_scan.py
rename to pypaimon/api/table_scan.py
index d998462..8830676 100644
--- a/paimon_python_api/table_scan.py
+++ b/pypaimon/api/table_scan.py
@@ -18,7 +18,7 @@
from abc import ABC, abstractmethod
from typing import List
-from paimon_python_api import Split
+from pypaimon.api import Split
class TableScan(ABC):
diff --git a/paimon_python_api/table_write.py b/pypaimon/api/table_write.py
similarity index 97%
rename from paimon_python_api/table_write.py
rename to pypaimon/api/table_write.py
index d1d39a7..8839620 100644
--- a/paimon_python_api/table_write.py
+++ b/pypaimon/api/table_write.py
@@ -20,7 +20,7 @@
import pyarrow as pa
from abc import ABC, abstractmethod
-from paimon_python_api import CommitMessage
+from pypaimon.api import CommitMessage
from typing import List
diff --git a/paimon_python_api/write_builder.py b/pypaimon/api/write_builder.py
similarity index 96%
rename from paimon_python_api/write_builder.py
rename to pypaimon/api/write_builder.py
index 7835179..ce9cba3 100644
--- a/paimon_python_api/write_builder.py
+++ b/pypaimon/api/write_builder.py
@@ -17,7 +17,7 @@
#################################################################################
from abc import ABC, abstractmethod
-from paimon_python_api import BatchTableCommit, BatchTableWrite
+from pypaimon.api import BatchTableCommit, BatchTableWrite
from typing import Optional
diff --git a/paimon_python_java/__init__.py b/pypaimon/py4j/__init__.py
similarity index 81%
rename from paimon_python_java/__init__.py
rename to pypaimon/py4j/__init__.py
index 6e97d9e..c3b70ff 100644
--- a/paimon_python_java/__init__.py
+++ b/pypaimon/py4j/__init__.py
@@ -17,8 +17,10 @@
################################################################################
from .util import constants
-from .pypaimon import (Catalog, Table, ReadBuilder, TableScan, Plan, Split, TableRead,
- BatchWriteBuilder, BatchTableWrite, CommitMessage, BatchTableCommit)
+from .java_implementation import \
+ (Catalog, Table, ReadBuilder, TableScan, Plan, Split,
+ TableRead, BatchWriteBuilder, BatchTableWrite, CommitMessage,
+ BatchTableCommit, Predicate, PredicateBuilder)
__all__ = [
'constants',
@@ -32,5 +34,7 @@
'BatchWriteBuilder',
'BatchTableWrite',
'CommitMessage',
- 'BatchTableCommit'
+ 'BatchTableCommit',
+ 'Predicate',
+ 'PredicateBuilder'
]
diff --git a/paimon_python_java/gateway_server.py b/pypaimon/py4j/gateway_server.py
similarity index 96%
rename from paimon_python_java/gateway_server.py
rename to pypaimon/py4j/gateway_server.py
index fc05ddc..9a259e0 100644
--- a/paimon_python_java/gateway_server.py
+++ b/pypaimon/py4j/gateway_server.py
@@ -22,7 +22,7 @@
import signal
from subprocess import Popen, PIPE
-from paimon_python_java import constants
+from pypaimon.py4j import constants
def on_windows():
@@ -63,6 +63,7 @@ def launch_gateway_server_process(env):
*main_args
]
+ preexec_fn = None
if not on_windows():
def preexec_func():
# ignore ctrl-c / SIGINT
@@ -73,7 +74,7 @@ def preexec_func():
stdin=PIPE, stderr=PIPE, preexec_fn=preexec_fn, env=env)
-_JAVA_IMPL_MODULE = 'paimon_python_java'
+_JAVA_IMPL_MODULE = 'pypaimon.py4j'
_JAVA_DEPS = 'java_dependencies'
_JAVA_BRIDGE = 'paimon-python-java-bridge'
@@ -102,7 +103,7 @@ def _get_hadoop_classpath(env):
return env[constants.PYPAIMON_HADOOP_CLASSPATH]
if 'HADOOP_CLASSPATH' in env:
- return None
+ return env['HADOOP_CLASSPATH']
else:
raise EnvironmentError(f"You haven't set '{constants.PYPAIMON_HADOOP_CLASSPATH}', \
and 'HADOOP_CLASSPATH' is also not set. Ensure one of them is set.")
diff --git a/paimon_python_java/java_gateway.py b/pypaimon/py4j/java_gateway.py
similarity index 94%
rename from paimon_python_java/java_gateway.py
rename to pypaimon/py4j/java_gateway.py
index f2b1621..41d7a0d 100644
--- a/paimon_python_java/java_gateway.py
+++ b/pypaimon/py4j/java_gateway.py
@@ -26,9 +26,9 @@
CallbackServerParameters)
from threading import RLock
-from paimon_python_java.gateway_server import launch_gateway_server_process
-from paimon_python_java import constants
-from paimon_python_java.util.exceptions import install_py4j_hooks
+from pypaimon.py4j.gateway_server import launch_gateway_server_process
+from pypaimon.py4j import constants
+from pypaimon.py4j.util.exceptions import install_py4j_hooks
_gateway = None
_lock = RLock()
@@ -109,6 +109,7 @@ def import_paimon_view(gateway):
java_import(gateway.jvm, 'org.apache.paimon.types.*')
java_import(gateway.jvm, 'org.apache.paimon.python.*')
java_import(gateway.jvm, "org.apache.paimon.data.*")
+ java_import(gateway.jvm, "org.apache.paimon.predicate.PredicateBuilder")
class Watchdog(object):
diff --git a/paimon_python_java/pypaimon.py b/pypaimon/py4j/java_implementation.py
similarity index 56%
rename from paimon_python_java/pypaimon.py
rename to pypaimon/py4j/java_implementation.py
index fcf0695..17c6eda 100644
--- a/paimon_python_java/pypaimon.py
+++ b/pypaimon/py4j/java_implementation.py
@@ -16,14 +16,21 @@
# limitations under the License.
################################################################################
+# pypaimon.api implementation based on Java code & py4j lib
+
+import duckdb
import pandas as pd
import pyarrow as pa
+import ray
-from paimon_python_java.java_gateway import get_gateway
-from paimon_python_java.util import java_utils, constants
-from paimon_python_api import (catalog, table, read_builder, table_scan, split, table_read,
- write_builder, table_write, commit_message, table_commit, Schema)
-from typing import List, Iterator, Optional
+from duckdb.duckdb import DuckDBPyConnection
+from pypaimon.py4j.java_gateway import get_gateway
+from pypaimon.py4j.util import java_utils, constants
+from pypaimon.api import \
+ (catalog, table, read_builder, table_scan, split,
+ table_read, write_builder, table_write, commit_message,
+ table_commit, Schema, predicate)
+from typing import List, Iterator, Optional, Any
class Catalog(catalog.Catalog):
@@ -60,33 +67,36 @@ class Table(table.Table):
def __init__(self, j_table, catalog_options: dict):
self._j_table = j_table
self._catalog_options = catalog_options
- # init arrow schema
- schema_bytes = get_gateway().jvm.SchemaUtil.getArrowSchema(j_table.rowType())
- schema_reader = pa.RecordBatchStreamReader(pa.BufferReader(schema_bytes))
- self._arrow_schema = schema_reader.schema
- schema_reader.close()
def new_read_builder(self) -> 'ReadBuilder':
j_read_builder = get_gateway().jvm.InvocationUtil.getReadBuilder(self._j_table)
- return ReadBuilder(
- j_read_builder, self._j_table.rowType(), self._catalog_options, self._arrow_schema)
+ return ReadBuilder(j_read_builder, self._j_table.rowType(), self._catalog_options)
def new_batch_write_builder(self) -> 'BatchWriteBuilder':
java_utils.check_batch_write(self._j_table)
j_batch_write_builder = get_gateway().jvm.InvocationUtil.getBatchWriteBuilder(self._j_table)
- return BatchWriteBuilder(j_batch_write_builder, self._j_table.rowType(), self._arrow_schema)
+ return BatchWriteBuilder(j_batch_write_builder)
class ReadBuilder(read_builder.ReadBuilder):
- def __init__(self, j_read_builder, j_row_type, catalog_options: dict, arrow_schema: pa.Schema):
+ def __init__(self, j_read_builder, j_row_type, catalog_options: dict):
self._j_read_builder = j_read_builder
self._j_row_type = j_row_type
self._catalog_options = catalog_options
- self._arrow_schema = arrow_schema
- def with_projection(self, projection: List[List[int]]) -> 'ReadBuilder':
- self._j_read_builder.withProjection(projection)
+ def with_filter(self, predicate: 'Predicate'):
+ self._j_read_builder.withFilter(predicate.to_j_predicate())
+ return self
+
+ def with_projection(self, projection: List[str]) -> 'ReadBuilder':
+ field_names = list(map(lambda field: field.name(), self._j_row_type.getFields()))
+ int_projection = list(map(lambda p: field_names.index(p), projection))
+ gateway = get_gateway()
+ int_projection_arr = gateway.new_array(gateway.jvm.int, len(projection))
+ for i in range(len(projection)):
+ int_projection_arr[i] = int_projection[i]
+ self._j_read_builder.withProjection(int_projection_arr)
return self
def with_limit(self, limit: int) -> 'ReadBuilder':
@@ -98,8 +108,11 @@ def new_scan(self) -> 'TableScan':
return TableScan(j_table_scan)
def new_read(self) -> 'TableRead':
- j_table_read = self._j_read_builder.newRead()
- return TableRead(j_table_read, self._j_row_type, self._catalog_options, self._arrow_schema)
+ j_table_read = self._j_read_builder.newRead().executeFilter()
+ return TableRead(j_table_read, self._j_read_builder.readType(), self._catalog_options)
+
+ def new_predicate_builder(self) -> 'PredicateBuilder':
+ return PredicateBuilder(self._j_row_type)
class TableScan(table_scan.TableScan):
@@ -133,12 +146,12 @@ def to_j_split(self):
class TableRead(table_read.TableRead):
- def __init__(self, j_table_read, j_row_type, catalog_options, arrow_schema):
+ def __init__(self, j_table_read, j_read_type, catalog_options):
self._j_table_read = j_table_read
- self._j_row_type = j_row_type
+ self._j_read_type = j_read_type
self._catalog_options = catalog_options
self._j_bytes_reader = None
- self._arrow_schema = arrow_schema
+ self._arrow_schema = java_utils.to_arrow_schema(j_read_type)
def to_arrow(self, splits):
record_batch_reader = self.to_arrow_batch_reader(splits)
@@ -154,6 +167,18 @@ def to_arrow_batch_reader(self, splits):
def to_pandas(self, splits: List[Split]) -> pd.DataFrame:
return self.to_arrow(splits).to_pandas()
+ def to_duckdb(
+ self,
+ splits: List[Split],
+ table_name: str,
+ connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
+ con = connection or duckdb.connect(database=":memory:")
+ con.register(table_name, self.to_arrow(splits))
+ return con
+
+ def to_ray(self, splits: List[Split]) -> ray.data.dataset.Dataset:
+ return ray.data.from_arrow(self.to_arrow(splits))
+
def _init(self):
if self._j_bytes_reader is None:
# get thread num
@@ -166,7 +191,7 @@ def _init(self):
if max_workers <= 0:
raise ValueError("max_workers must be greater than 0")
self._j_bytes_reader = get_gateway().jvm.InvocationUtil.createParallelBytesReader(
- self._j_table_read, self._j_row_type, max_workers)
+ self._j_table_read, self._j_read_type, max_workers)
def _batch_generator(self) -> Iterator[pa.RecordBatch]:
while True:
@@ -180,10 +205,8 @@ def _batch_generator(self) -> Iterator[pa.RecordBatch]:
class BatchWriteBuilder(write_builder.BatchWriteBuilder):
- def __init__(self, j_batch_write_builder, j_row_type, arrow_schema: pa.Schema):
+ def __init__(self, j_batch_write_builder):
self._j_batch_write_builder = j_batch_write_builder
- self._j_row_type = j_row_type
- self._arrow_schema = arrow_schema
def overwrite(self, static_partition: Optional[dict] = None) -> 'BatchWriteBuilder':
if static_partition is None:
@@ -193,7 +216,7 @@ def overwrite(self, static_partition: Optional[dict] = None) -> 'BatchWriteBuild
def new_write(self) -> 'BatchTableWrite':
j_batch_table_write = self._j_batch_write_builder.newWrite()
- return BatchTableWrite(j_batch_table_write, self._j_row_type, self._arrow_schema)
+ return BatchTableWrite(j_batch_table_write, self._j_batch_write_builder.rowType())
def new_commit(self) -> 'BatchTableCommit':
j_batch_table_commit = self._j_batch_write_builder.newCommit()
@@ -202,32 +225,31 @@ def new_commit(self) -> 'BatchTableCommit':
class BatchTableWrite(table_write.BatchTableWrite):
- def __init__(self, j_batch_table_write, j_row_type, arrow_schema: pa.Schema):
+ def __init__(self, j_batch_table_write, j_row_type):
self._j_batch_table_write = j_batch_table_write
self._j_bytes_writer = get_gateway().jvm.InvocationUtil.createBytesWriter(
j_batch_table_write, j_row_type)
- self._arrow_schema = arrow_schema
+ self._arrow_schema = java_utils.to_arrow_schema(j_row_type)
def write_arrow(self, table):
for record_batch in table.to_reader():
- # TODO: can we use a reusable stream?
- stream = pa.BufferOutputStream()
- with pa.RecordBatchStreamWriter(stream, self._arrow_schema) as writer:
- writer.write(record_batch)
- arrow_bytes = stream.getvalue().to_pybytes()
- self._j_bytes_writer.write(arrow_bytes)
+ # TODO: can we use a reusable stream in #_write_arrow_batch ?
+ self._write_arrow_batch(record_batch)
def write_arrow_batch(self, record_batch):
+ self._write_arrow_batch(record_batch)
+
+ def write_pandas(self, dataframe: pd.DataFrame):
+ record_batch = pa.RecordBatch.from_pandas(dataframe, schema=self._arrow_schema)
+ self._write_arrow_batch(record_batch)
+
+ def _write_arrow_batch(self, record_batch):
stream = pa.BufferOutputStream()
- with pa.RecordBatchStreamWriter(stream, self._arrow_schema) as writer:
+ with pa.RecordBatchStreamWriter(stream, record_batch.schema) as writer:
writer.write(record_batch)
arrow_bytes = stream.getvalue().to_pybytes()
self._j_bytes_writer.write(arrow_bytes)
- def write_pandas(self, dataframe: pd.DataFrame):
- record_batch = pa.RecordBatch.from_pandas(dataframe, schema=self._arrow_schema)
- self.write_arrow_batch(record_batch)
-
def prepare_commit(self) -> List['CommitMessage']:
j_commit_messages = self._j_batch_table_write.prepareCommit()
return list(map(lambda cm: CommitMessage(cm), j_commit_messages))
@@ -257,3 +279,92 @@ def commit(self, commit_messages: List[CommitMessage]):
def close(self):
self._j_batch_table_commit.close()
+
+
+class Predicate(predicate.Predicate):
+
+ def __init__(self, j_predicate):
+ self._j_predicate = j_predicate
+
+ def to_j_predicate(self):
+ return self._j_predicate
+
+
+class PredicateBuilder(predicate.PredicateBuilder):
+
+ def __init__(self, j_row_type):
+ self._field_names = j_row_type.getFieldNames()
+ self._j_row_type = j_row_type
+ self._j_predicate_builder = get_gateway().jvm.PredicateBuilder(j_row_type)
+
+ def _build(self, method: str, field: str, literals: Optional[List[Any]] = None):
+ error = ValueError(f'The field {field} is not in field list {self._field_names}.')
+ try:
+ index = self._field_names.index(field)
+ if index == -1:
+ raise error
+ except ValueError:
+ raise error
+
+ if literals is None:
+ literals = []
+
+ j_predicate = get_gateway().jvm.PredicationUtil.build(
+ self._j_row_type,
+ self._j_predicate_builder,
+ method,
+ index,
+ literals
+ )
+ return Predicate(j_predicate)
+
+ def equal(self, field: str, literal: Any) -> Predicate:
+ return self._build('equal', field, [literal])
+
+ def not_equal(self, field: str, literal: Any) -> Predicate:
+ return self._build('notEqual', field, [literal])
+
+ def less_than(self, field: str, literal: Any) -> Predicate:
+ return self._build('lessThan', field, [literal])
+
+ def less_or_equal(self, field: str, literal: Any) -> Predicate:
+ return self._build('lessOrEqual', field, [literal])
+
+ def greater_than(self, field: str, literal: Any) -> Predicate:
+ return self._build('greaterThan', field, [literal])
+
+ def greater_or_equal(self, field: str, literal: Any) -> Predicate:
+ return self._build('greaterOrEqual', field, [literal])
+
+ def is_null(self, field: str) -> Predicate:
+ return self._build('isNull', field)
+
+ def is_not_null(self, field: str) -> Predicate:
+ return self._build('isNotNull', field)
+
+ def startswith(self, field: str, pattern_literal: Any) -> Predicate:
+ return self._build('startsWith', field, [pattern_literal])
+
+ def endswith(self, field: str, pattern_literal: Any) -> Predicate:
+ return self._build('endsWith', field, [pattern_literal])
+
+ def contains(self, field: str, pattern_literal: Any) -> Predicate:
+ return self._build('contains', field, [pattern_literal])
+
+ def is_in(self, field: str, literals: List[Any]) -> Predicate:
+ return self._build('in', field, literals)
+
+ def is_not_in(self, field: str, literals: List[Any]) -> Predicate:
+ return self._build('notIn', field, literals)
+
+ def between(self, field: str, included_lower_bound: Any, included_upper_bound: Any) \
+ -> Predicate:
+ return self._build('between', field, [included_lower_bound, included_upper_bound])
+
+ def and_predicates(self, predicates: List[Predicate]) -> Predicate:
+ predicates = list(map(lambda p: p.to_j_predicate(), predicates))
+ return Predicate(get_gateway().jvm.PredicationUtil.buildAnd(predicates))
+
+ def or_predicates(self, predicates: List[Predicate]) -> Predicate:
+ predicates = list(map(lambda p: p.to_j_predicate(), predicates))
+ return Predicate(get_gateway().jvm.PredicationUtil.buildOr(predicates))
diff --git a/paimon_python_java/paimon-python-java-bridge/copyright.txt b/pypaimon/py4j/paimon-python-java-bridge/copyright.txt
similarity index 100%
rename from paimon_python_java/paimon-python-java-bridge/copyright.txt
rename to pypaimon/py4j/paimon-python-java-bridge/copyright.txt
diff --git a/paimon_python_java/paimon-python-java-bridge/pom.xml b/pypaimon/py4j/paimon-python-java-bridge/pom.xml
similarity index 63%
rename from paimon_python_java/paimon-python-java-bridge/pom.xml
rename to pypaimon/py4j/paimon-python-java-bridge/pom.xml
index 52bc061..dd7f7f2 100644
--- a/paimon_python_java/paimon-python-java-bridge/pom.xml
+++ b/pypaimon/py4j/paimon-python-java-bridge/pom.xml
@@ -23,10 +23,11 @@
org.apache.paimon
paimon-python-java-bridge
- 0.1-SNAPSHOT
+ 0.9-SNAPSHOT
Paimon : Python-Java Bridge
jar
+ 2024
0.9.0
@@ -37,6 +38,7 @@
package
14.0.0
1.8
+ 0.9.0
@@ -132,21 +134,21 @@
- org.apache.maven.plugins
- maven-compiler-plugin
- 3.8.0
-
- ${target.java.version}
- ${target.java.version}
-
- false
-
-
- -Xpkginfo:always
- -Xlint:deprecation
-
-
-
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.8.0
+
+ ${target.java.version}
+ ${target.java.version}
+
+ false
+
+
+ -Xpkginfo:always
+ -Xlint:deprecation
+
+
+
org.apache.maven.plugins
@@ -159,6 +161,9 @@
shade
+ false
+ false
+ true
org.apache.paimon:paimon-bundle
@@ -174,6 +179,18 @@
net.sf.py4j:py4j
+
+
+
+
+
+ Apache Paimon
+ ${project.inceptionYear}
+ UTF-8
+
+
com.fasterxml.jackson
@@ -184,7 +201,54 @@
+
+
+ org.apache.maven.plugins
+ maven-clean-plugin
+
+
+
+ ${project.basedir}
+
+ dependency-reduced-pom.xml
+
+
+
+
+
-
+
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+ 3.1.0
+ false
+
+
+ check-license
+
+ none
+
+ java
+
+
+
+
+ org.apache.paimon.tools.ci.licensecheck.LicenseChecker
+ true
+ false
+
+
+
+ org.apache.paimon
+ paimon-ci-tools
+ ${paimon.ci.tools.version}
+
+
+
+
+
+
diff --git a/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/BytesWriter.java b/pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/BytesWriter.java
similarity index 59%
rename from paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/BytesWriter.java
rename to pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/BytesWriter.java
index 7cf6267..f2ca4e1 100644
--- a/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/BytesWriter.java
+++ b/pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/BytesWriter.java
@@ -18,6 +18,7 @@
package org.apache.paimon.python;
+import org.apache.paimon.arrow.ArrowUtils;
import org.apache.paimon.arrow.reader.ArrowBatchReader;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.table.sink.TableWrite;
@@ -27,8 +28,11 @@
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.types.pojo.Field;
import java.io.ByteArrayInputStream;
+import java.util.List;
+import java.util.stream.Collectors;
/** Write Arrow bytes to Paimon. */
public class BytesWriter {
@@ -36,17 +40,30 @@ public class BytesWriter {
private final TableWrite tableWrite;
private final ArrowBatchReader arrowBatchReader;
private final BufferAllocator allocator;
+ private final List arrowFields;
public BytesWriter(TableWrite tableWrite, RowType rowType) {
this.tableWrite = tableWrite;
this.arrowBatchReader = new ArrowBatchReader(rowType);
this.allocator = new RootAllocator();
+ arrowFields =
+ rowType.getFields().stream()
+ .map(f -> ArrowUtils.toArrowField(f.name(), f.type()))
+ .collect(Collectors.toList());
}
public void write(byte[] bytes) throws Exception {
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
ArrowStreamReader arrowStreamReader = new ArrowStreamReader(bais, allocator);
VectorSchemaRoot vsr = arrowStreamReader.getVectorSchemaRoot();
+ if (!checkTypesIgnoreNullability(arrowFields, vsr.getSchema().getFields())) {
+ throw new RuntimeException(
+ String.format(
+ "Input schema isn't consistent with table schema.\n"
+ + "\tTable schema is: %s\n"
+ + "\tInput schema is: %s",
+ arrowFields, vsr.getSchema().getFields()));
+ }
while (arrowStreamReader.loadNextBatch()) {
Iterable rows = arrowBatchReader.readBatch(vsr);
@@ -60,4 +77,24 @@ public void write(byte[] bytes) throws Exception {
public void close() {
allocator.close();
}
+
+ private boolean checkTypesIgnoreNullability(
+ List expectedFields, List actualFields) {
+ if (expectedFields.size() != actualFields.size()) {
+ return false;
+ }
+
+ for (int i = 0; i < expectedFields.size(); i++) {
+ Field expectedField = expectedFields.get(i);
+ Field actualField = actualFields.get(i);
+ // ArrowType doesn't have nullability (similar to DataTypeRoot)
+ if (!actualField.getType().equals(expectedField.getType())
+ || !checkTypesIgnoreNullability(
+ expectedField.getChildren(), actualField.getChildren())) {
+ return false;
+ }
+ }
+
+ return true;
+ }
}
diff --git a/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/FileLock.java b/pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/FileLock.java
similarity index 100%
rename from paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/FileLock.java
rename to pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/FileLock.java
diff --git a/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/InvocationUtil.java b/pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/InvocationUtil.java
similarity index 100%
rename from paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/InvocationUtil.java
rename to pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/InvocationUtil.java
diff --git a/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/NetUtils.java b/pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/NetUtils.java
similarity index 100%
rename from paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/NetUtils.java
rename to pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/NetUtils.java
diff --git a/paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/ParallelBytesReader.java b/pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/ParallelBytesReader.java
similarity index 100%
rename from paimon_python_java/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/ParallelBytesReader.java
rename to pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/ParallelBytesReader.java
diff --git a/pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PredicationUtil.java b/pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PredicationUtil.java
new file mode 100644
index 0000000..a863dfd
--- /dev/null
+++ b/pypaimon/py4j/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/PredicationUtil.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.python;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** For building Predicate. */
+public class PredicationUtil {
+
+ public static Predicate build(
+ RowType rowType,
+ PredicateBuilder builder,
+ String method,
+ int index,
+ List