diff --git a/.github/workflows/docker-build-push.yml b/.github/workflows/docker-build-push.yml index 49f82a042..f2940012c 100644 --- a/.github/workflows/docker-build-push.yml +++ b/.github/workflows/docker-build-push.yml @@ -67,7 +67,7 @@ jobs: password: ${{ secrets.DOCKERHUB_TOKEN }} - name: 'Build & Push to Docker Hub' - uses: docker/build-push-action@v5 + uses: docker/build-push-action@v6 with: context: docker platforms: linux/amd64,linux/arm64 @@ -83,7 +83,7 @@ jobs: - name: 'Build & Push to Docker Hub (Latest)' if: startsWith(github.ref, 'refs/tags/v') - uses: docker/build-push-action@v5 + uses: docker/build-push-action@v6 with: context: docker platforms: linux/amd64,linux/arm64 diff --git a/.github/workflows/early-access.yml b/.github/workflows/early-access.yml index 63d5d2c4b..f0facd3e6 100644 --- a/.github/workflows/early-access.yml +++ b/.github/workflows/early-access.yml @@ -37,7 +37,7 @@ jobs: ./mvnw -ntp -B --file pom.xml -Pall,dist package - name: 'Upload build artifact' - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: artifacts path: | @@ -54,7 +54,7 @@ jobs: clean: true - name: 'Download all artifacts' - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 - name: 'Set up Java' uses: actions/setup-java@v3 @@ -81,11 +81,11 @@ jobs: JRELEASER_GPG_PUBLIC_KEY: ${{ secrets.GPG_PUBLIC_KEY }} JRELEASER_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }} JRELEASER_GPG_SECRET_KEY: ${{ secrets.GPG_PRIVATE_KEY }} - run: ./mvnw -ntp -B --file ./pom.xml -Prelease -DartifactsDir=artifacts jreleaser:full-release + run: ./mvnw -ntp -B --file ./connect-file-pulse-plugin/pom.xml -Prelease -DartifactsDir=artifacts jreleaser:full-release - name: 'JReleaser output' if: always() - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: jreleaser-logs path: | diff --git a/.github/workflows/github-page.yml b/.github/workflows/github-page.yml index 8586b411e..d1196aee0 100644 --- a/.github/workflows/github-page.yml +++ b/.github/workflows/github-page.yml @@ -28,7 +28,7 @@ jobs: git submodule update --init --recursive - name: Setup Hugo - uses: peaceiris/actions-hugo@v2 + uses: peaceiris/actions-hugo@v3 with: hugo-version: '0.96.0' extended: true @@ -43,7 +43,7 @@ jobs: run: (cd ./docs; hugo --minify -v -s . 
-d ./public) - name: Deploy - uses: peaceiris/actions-gh-pages@v3 + uses: peaceiris/actions-gh-pages@v4 if: ${{ github.ref == 'refs/heads/master' }} with: github_token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/maven-deploy.yml b/.github/workflows/maven-deploy.yml index 80cd9de88..a9a04224e 100644 --- a/.github/workflows/maven-deploy.yml +++ b/.github/workflows/maven-deploy.yml @@ -40,7 +40,7 @@ jobs: - name: 'Set up Maven settings' run: | - echo "falseossrh${{ secrets.OSSRH_USERNAME }}${{ secrets.OSSRH_PASSWORD }}"> ./settings.xml + echo "falsesonatype-central${{ secrets.OSSRH_USERNAME }}${{ secrets.OSSRH_PASSWORD }}"> ./settings.xml - name: 'Deploy Maven Central' run: | diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 927f2193b..013a964ab 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -9,15 +9,13 @@ name: Releases on: workflow_dispatch: inputs: - newVersion: + version: type: string required: false description: "New version (if null use current version)" - createTag: - type: boolean - required: true - description: "Create a Tag" - default: true + next: + description: "Next version" + required: false generateDoc: type: boolean required: true @@ -33,6 +31,7 @@ jobs: outputs: HEAD: ${{ steps.version.outputs.HEAD }} RELEASE_VERSION: ${{ steps.version.outputs.RELEASE_VERSION }} + NEXT_VERSION: ${{ steps.version.outputs.NEXT_VERSION }} steps: - name: 'Checkout GitHub repository' uses: actions/checkout@v4 @@ -56,78 +55,73 @@ jobs: - name: 'Grant execute permission to MVN Wrapper' run: chmod +x ./mvnw - - name: Update release version - if: "${{ github.event.inputs.newVersion == '' }}" - run: | - echo 'Remove snapshot from maven version' - ./mvnw -q versions:set -DremoveSnapshot -DprocessAllModules -DgenerateBackupPoms=false - - - name: Set specific version to release - if: "${{ github.event.inputs.newVersion != '' }}" - run: | - ./mvnw -q versions:set -DnewVersion=${{ github.event.inputs.newVersion }} - - - name: 'Set env RELEASE_VERSION' - run: | - RELEASE_VERSION=$(./mvnw org.apache.maven.plugins:maven-help-plugin:3.1.0:evaluate -Dexpression=project.version -q -DforceStdout) - echo "RELEASE_VERSION=$RELEASE_VERSION">> $GITHUB_ENV - - - name: 'Update Documentation' - if: "${{ github.event.inputs.generateDoc == 'true' }}" - run: | - DOC_BASEDIR="./docs/content/en/docs" - RELEASE_DOC_VERSION=$(echo ${{ env.RELEASE_VERSION }} | sed 's/\([0-9]\)\s*$/\x/') - RELEASE_DOC_DIR="$DOC_BASEDIR/Archives/v${{ env.RELEASE_VERSION }}" - RELEASE_DOC_LINK=$(echo ${{ env.RELEASE_VERSION }} | sed -r 's/\.+/-/g') - RELEASE_DOC_LINK=${RELEASE_DOC_LINK%??} - DIRS=( - "Developer Guide" - "Examples" - "FAQ" - "Getting started" - "Overview" - "Project Info" - ) - echo "Creating release site documentation: v$RELEASE_DOC_VERSION" - mkdir -p "$RELEASE_DOC_DIR" - for DIR in "${DIRS[@]}"; do - echo "Copying $DIR to $DOC_BASEDIR/$DIR"; - cp -r "$DOC_BASEDIR/$DIR" "$RELEASE_DOC_DIR"; - done - - echo "Creating $RELEASE_DOC_DIR/_index.md" - cat> "$RELEASE_DOC_DIR/_index.md" <> "./docs/config.toml" < "$RELEASE_DOC_DIR/_index.md" <> "./docs/config.toml" <> $GITHUB_OUTPUT echo "RELEASE_VERSION=$RELEASE_VERSION">> $GITHUB_OUTPUT + echo "NEXT_VERSION=$NEXT_VERSION">> $GITHUB_OUTPUT + build-distribution: needs: [ set-release-version ] @@ -153,7 +147,7 @@ jobs: ./mvnw -ntp -B --file pom.xml -Pall,dist package - name: 'Upload build artifact' - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: artifacts path: | @@ -171,7 
+165,7 @@ jobs: fetch-depth: 0 - name: 'Download all artifacts' - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 - name: 'Set up Java' uses: actions/setup-java@v3 @@ -197,30 +191,23 @@ jobs: JRELEASER_GITHUB_TOKEN: ${{ secrets.PAT }} JRELEASER_GPG_PUBLIC_KEY: ${{ secrets.GPG_PUBLIC_KEY }} JRELEASER_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }} - JRELEASER_GPG_SECRET_KEY: ${{ secrets.GPG_PRIVATE_KEY }} - run: ./mvnw -ntp -B --file ./pom.xml -Prelease -DartifactsDir=artifacts jreleaser:full-release + JRELEASER_GPG_SECRET_KEY: ${{ secrets.GPG_PRIVATE_KEY }} + run: ./mvnw -ntp -B --file ./connect-file-pulse-plugin/pom.xml -Prelease -DartifactsDir=artifacts jreleaser:full-release - name: 'JReleaser output' if: always() - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: jreleaser-logs path: | target/jreleaser/trace.log target/jreleaser/output.properties - - name: 'Bump version for next iteration' - if: "${{ github.event.inputs.newVersion == '' }}" - run: | - ./mvnw -q build-helper:parse-version versions:set \ - -DnewVersion=\${parsedVersion.majorVersion}.\${parsedVersion.nextMinorVersion}.0-SNAPSHOT \ - versions:commit - NEXT_VERSION=$(./mvnw org.apache.maven.plugins:maven-help-plugin:3.1.0:evaluate -Dexpression=project.version -q -DforceStdout) - echo "NEXT_VERSION=$NEXT_VERSION">> $GITHUB_ENV - - - name: 'Commit Bump Version' - if: "${{ github.event.inputs.newVersion == '' }}" + - name: 'Commit Next Version' + env: + NEXT_VERSION: ${{ needs.set-release-version.outputs.NEXT_VERSION }} run: | + ./mvnw -ntp -B versions:set versions:commit -DnewVersion=${{ env.NEXT_VERSION }} find . -name 'pom.xml' | xargs git add git commit -m "ci: bump version for next iteration to ${{ env.NEXT_VERSION }} π€" - git push origin HEAD:${{ needs.set-release-version.outputs.HEAD }} + git push origin HEAD:${GITHUB_REF#refs/heads/} diff --git a/.github/workflows/trivy.yml b/.github/workflows/trivy.yml index 9b63fcf5a..4caeb179f 100644 --- a/.github/workflows/trivy.yml +++ b/.github/workflows/trivy.yml @@ -16,7 +16,7 @@ jobs: runs-on: ubuntu-20.04 steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Run Trivy vulnerability scanner in repo mode uses: aquasecurity/trivy-action@master diff --git a/connect-file-pulse-api/pom.xml b/connect-file-pulse-api/pom.xml index a02584ee4..a2bc2fc6e 100644 --- a/connect-file-pulse-api/pom.xml +++ b/connect-file-pulse-api/pom.xml @@ -22,7 +22,7 @@ limitations under the License. 
io.streamthoughts kafka-connect-filepulse-reactor - 2.15.0-SNAPSHOT + 2.17.0-SNAPSHOT kafka-connect-filepulse-api diff --git a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipeline.java b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipeline.java index 828f70029..c8e15cb17 100644 --- a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipeline.java +++ b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipeline.java @@ -11,6 +11,7 @@ import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable; import io.streamthoughts.kafka.connect.filepulse.source.FileObjectContext; import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta; +import io.streamthoughts.kafka.connect.filepulse.source.FileObjectOffset; import io.streamthoughts.kafka.connect.filepulse.source.FileRecord; import io.streamthoughts.kafka.connect.filepulse.source.FileRecordOffset; import io.streamthoughts.kafka.connect.filepulse.source.TypedFileRecord; @@ -89,6 +90,18 @@ public RecordsIterable> apply(final RecordsIterable> flushed = node + .flush(newContextFor(FileObjectOffset::empty, fileObjectObject.metadata())); + results.addAll(flushed); + node = node.onSuccess; + } + } + return new RecordsIterable(results); } diff --git a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FileObjectOffset.java b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FileObjectOffset.java index be5060b89..f9bedff9b 100644 --- a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FileObjectOffset.java +++ b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/source/FileObjectOffset.java @@ -9,7 +9,6 @@ import com.jsoniter.annotation.JsonCreator; import com.jsoniter.annotation.JsonProperty; import java.util.Objects; -import org.apache.kafka.common.utils.SystemTime; /** * An object representing the position of next bytes to read in the input source. 
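(Aside on the DefaultRecordFilterPipeline change above, not part of the patch: when apply() is given an empty iterable and no records remain, the pipeline now walks the filter chain and flushes each node's buffered records so nothing buffered is lost at end of input. Below is a minimal standalone Java sketch of that flush pass under simplified types; FilterNode, its String records, and flushAll are stand-ins for the RecordFilter/FileRecord types used in the diff, not the project's API.)

    import java.util.ArrayList;
    import java.util.List;

    // Simplified model of the end-of-input flush: each filter node drains its buffer,
    // head first, so buffered records are emitted in pipeline order.
    final class FilterChainFlushSketch {

        static final class FilterNode {
            final List<String> buffer = new ArrayList<>(); // records held back by this filter
            FilterNode onSuccess;                          // next filter in the chain, or null

            List<String> flush() {                         // drain this node's buffer
                List<String> drained = new ArrayList<>(buffer);
                buffer.clear();
                return drained;
            }
        }

        static List<String> flushAll(FilterNode head) {
            List<String> results = new ArrayList<>();
            for (FilterNode node = head; node != null; node = node.onSuccess) {
                results.addAll(node.flush());
            }
            return results;
        }
    }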
@@ -23,7 +22,7 @@ public class FileObjectOffset { private final long timestamp; public static FileObjectOffset empty() { - return new FileObjectOffset(-1, 0, SystemTime.SYSTEM.milliseconds()); + return new FileObjectOffset(-1, 0, System.currentTimeMillis()); } /** diff --git a/connect-file-pulse-api/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipelineTest.java b/connect-file-pulse-api/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipelineTest.java index 5061a3cb5..d05ffb606 100644 --- a/connect-file-pulse-api/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipelineTest.java +++ b/connect-file-pulse-api/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipelineTest.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.kafka.common.config.ConfigDef; import org.junit.Test; +import org.junit.jupiter.api.Assertions; public class DefaultRecordFilterPipelineTest { @@ -190,6 +191,101 @@ public void shouldNotFlushBufferedRecordsGivenNoAcceptFilterAndThereIsNoRemainin assertEquals(record2, records.collect().get(0)); } + @Test + public void shouldFlushBufferedRecordsGivenAcceptFilterEmptyRecordsIterableAndNoRemainingRecords() { + + final FileRecord record1 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value1"); + final FileRecord record2 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value2"); + + List> bufferedRecords = List.of(record1, record2); + TestFilter filter1 = new TestFilter() + .setBuffer(bufferedRecords); + + DefaultRecordFilterPipeline pipeline = new DefaultRecordFilterPipeline(Collections.singletonList(filter1)); + pipeline.init(context); + + RecordsIterable> records = pipeline.apply(new RecordsIterable(), false); + + assertNotNull(records); + List> filteredRecords = records.collect(); + Assertions.assertIterableEquals(bufferedRecords, filteredRecords); + } + + @Test + public void shouldFlushBufferedRecordsFromFirstFilterGivenAcceptFiltersEmptyRecordsIterableAndNoRemainingRecods() { + + final FileRecord record1 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value1"); + final FileRecord record2 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value2"); + + List> bufferedRecords = List.of(record1, record2); + TestFilter filter1 = new TestFilter() + .setBuffer(bufferedRecords); + TestFilter filter2 = new TestFilter() + .setFunction(((context1, record, hasNext) -> RecordsIterable.of(record))); + + DefaultRecordFilterPipeline pipeline = new DefaultRecordFilterPipeline(List.of(filter1, filter2)); + pipeline.init(context); + + RecordsIterable> records = pipeline.apply(new RecordsIterable(), false); + + assertNotNull(records); + List> filteredRecords = records.collect(); + Assertions.assertIterableEquals(bufferedRecords, filteredRecords); + } + + @Test + public void shouldFlushBufferedRecordsFromLastFilterGivenAcceptFiltersEmptyRecordsIterableAndNoRemainingRecods() { + + final FileRecord record1 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value1"); + final FileRecord record2 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value2"); + + List> bufferedRecords = List.of(record1, record2); + TestFilter filter1 = new TestFilter(); + TestFilter filter2 = new TestFilter() + .setBuffer(bufferedRecords); + DefaultRecordFilterPipeline pipeline = new DefaultRecordFilterPipeline(List.of(filter1, filter2)); + pipeline.init(context); + + RecordsIterable> records = pipeline.apply(new RecordsIterable(), 
false); + + assertNotNull(records); + List> filteredRecords = records.collect(); + Assertions.assertIterableEquals(bufferedRecords, filteredRecords); + } + + @Test + public void shouldFlushBufferedRecordsFromAllFiltersGivenAcceptFiltersEmptyRecordsIterableAndNoRemainingRecods() { + + final FileRecord record1 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value1"); + final FileRecord record2 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value2"); + final FileRecord record3 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value3"); + final FileRecord record4 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value4"); + final FileRecord record5 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value5"); + + List> allBuffered = List.of(record1, record2, record3, record4, record5); + + List> bufferedRecords1 = List.of(record1); + List> bufferedRecords2 = List.of(record2, record3); + List> bufferedRecords3 = List.of(record4, record5); + TestFilter filter1 = new TestFilter() + .setFunction(((context1, record, hasNext) -> RecordsIterable.of(record))) + .setBuffer(bufferedRecords1); + TestFilter filter2 = new TestFilter() + .setFunction(((context1, record, hasNext) -> RecordsIterable.of(record))) + .setBuffer(bufferedRecords2); + TestFilter filter3 = new TestFilter() + .setFunction(((context1, record, hasNext) -> RecordsIterable.of(record))) + .setBuffer(bufferedRecords3); + DefaultRecordFilterPipeline pipeline = new DefaultRecordFilterPipeline(List.of(filter1, filter2, filter3)); + pipeline.init(context); + + RecordsIterable> records = pipeline.apply(new RecordsIterable(), false); + + assertNotNull(records); + List> filteredRecords = records.collect(); + Assertions.assertIterableEquals(allBuffered, filteredRecords); + } + @Test public void shouldReturnRecordUnchangedGivenNoFilter() { diff --git a/connect-file-pulse-dataformat/pom.xml b/connect-file-pulse-dataformat/pom.xml index 621279ec7..270fb8c41 100644 --- a/connect-file-pulse-dataformat/pom.xml +++ b/connect-file-pulse-dataformat/pom.xml @@ -11,7 +11,7 @@ kafka-connect-filepulse-reactor io.streamthoughts - 2.15.0-SNAPSHOT + 2.17.0-SNAPSHOT 4.0.0 diff --git a/connect-file-pulse-expression/pom.xml b/connect-file-pulse-expression/pom.xml index 69141f713..d25daf961 100644 --- a/connect-file-pulse-expression/pom.xml +++ b/connect-file-pulse-expression/pom.xml @@ -11,7 +11,7 @@ kafka-connect-filepulse-reactor io.streamthoughts - 2.15.0-SNAPSHOT + 2.17.0-SNAPSHOT 4.0.0 @@ -25,7 +25,7 @@ UTF-8 ${project.parent.basedir} ${project.parent.basedir}/header - 4.13.1 + 4.13.2 diff --git a/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/pom.xml b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/pom.xml index 4aa3803c7..f94d24a44 100644 --- a/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/pom.xml +++ b/connect-file-pulse-filesystems/filepulse-aliyunoss-fs/pom.xml @@ -12,7 +12,7 @@ io.streamthoughts kafka-connect-filepulse-filesystems - 2.15.0-SNAPSHOT + 2.17.0-SNAPSHOT 4.0.0 diff --git a/connect-file-pulse-filesystems/filepulse-amazons3-fs/pom.xml b/connect-file-pulse-filesystems/filepulse-amazons3-fs/pom.xml index 8bed44e5c..f4cbc320c 100644 --- a/connect-file-pulse-filesystems/filepulse-amazons3-fs/pom.xml +++ b/connect-file-pulse-filesystems/filepulse-amazons3-fs/pom.xml @@ -11,7 +11,7 @@ io.streamthoughts kafka-connect-filepulse-filesystems - 2.15.0-SNAPSHOT + 2.17.0-SNAPSHOT 4.0.0 @@ -22,7 +22,7 @@ ${project.parent.basedir}/.. 
${project.parent.basedir}/../header - 1.12.555 + 1.12.772 1.1.9 diff --git a/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/AmazonS3Storage.java b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/AmazonS3Storage.java index 8ddae43aa..74a70a07c 100644 --- a/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/AmazonS3Storage.java +++ b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/AmazonS3Storage.java @@ -23,6 +23,7 @@ import java.net.URI; import java.util.HashMap; import java.util.Objects; +import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -257,8 +258,11 @@ private static FileObjectMeta createFileObjectMeta(final S3BucketKey s3Object, objectMetadata.getUserMetadata().forEach((k, v) -> userDefinedMetadata.put("s3.object.user.metadata." + k, v)); userDefinedMetadata.put("s3.object.summary.bucketName", s3Object.bucketName()); userDefinedMetadata.put("s3.object.summary.key", s3Object.key()); - userDefinedMetadata.put("s3.object.summary.etag", objectMetadata.getETag()); - userDefinedMetadata.put("s3.object.summary.storageClass", objectMetadata.getStorageClass()); + + Optional.ofNullable(objectMetadata.getETag()) + .ifPresent(it -> userDefinedMetadata.put("s3.object.summary.etag", it)); + Optional.ofNullable(objectMetadata.getStorageClass()) + .ifPresent(it -> userDefinedMetadata.put("s3.object.summary.storageClass", it)); final String contentMD5 = objectMetadata.getETag(); diff --git a/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/AmazonS3MoveCleanupPolicy.java b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/AmazonS3MoveCleanupPolicy.java index e74ecd00b..ff261d874 100644 --- a/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/AmazonS3MoveCleanupPolicy.java +++ b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/AmazonS3MoveCleanupPolicy.java @@ -6,9 +6,12 @@ */ package io.streamthoughts.kafka.connect.filepulse.fs.clean; +import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy.Config.EXCLUDE_SOURCE_PREFIX_PATH_CONFIG; import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy.Config.FAILURES_AWS_BUCKET_NAME_CONFIG; +import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy.Config.FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH; import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy.Config.FAILURES_AWS_PREFIX_PATH_CONFIG; import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy.Config.SUCCESS_AWS_BUCKET_NAME_CONFIG; +import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy.Config.SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH; import static io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy.Config.SUCCESS_AWS_PREFIX_PATH_CONFIG; import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicy; @@ -29,7 +32,10 @@ public class AmazonS3MoveCleanupPolicy implements FileCleanupPolicy { private static final Logger LOG = 
LoggerFactory.getLogger(AmazonS3MoveCleanupPolicy.class); private AmazonS3Storage storage; - + + private boolean includeSuccessSourcePrefixPath; + private boolean includeFailuresSourcePrefixPath; + private Config config; /** @@ -38,6 +44,8 @@ public class AmazonS3MoveCleanupPolicy implements FileCleanupPolicy { @Override public void configure(final Map configs) { this.config = new Config(configs); + this.includeSuccessSourcePrefixPath = this.config.getBoolean(SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH); + this.includeFailuresSourcePrefixPath = this.config.getBoolean(FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH); } /** @@ -45,7 +53,11 @@ public void configure(final Map configs) { */ @Override public boolean onSuccess(final FileObject source) { - return move(source, SUCCESS_AWS_BUCKET_NAME_CONFIG, SUCCESS_AWS_PREFIX_PATH_CONFIG); + return move( + source, + SUCCESS_AWS_BUCKET_NAME_CONFIG, + SUCCESS_AWS_PREFIX_PATH_CONFIG, + includeSuccessSourcePrefixPath); } /** @@ -53,12 +65,17 @@ public boolean onSuccess(final FileObject source) { */ @Override public boolean onFailure(final FileObject source) { - return move(source, FAILURES_AWS_BUCKET_NAME_CONFIG, FAILURES_AWS_PREFIX_PATH_CONFIG); + return move( + source, + FAILURES_AWS_BUCKET_NAME_CONFIG, + FAILURES_AWS_PREFIX_PATH_CONFIG, + includeFailuresSourcePrefixPath); } private boolean move(final FileObject source, final String destinationS3BucketConfig, - final String destinationS3PrefixConfig) { + final String destinationS3PrefixConfig, + final boolean includeSourcePrefixPath) { checkState(); URI sourceURI = source.metadata().uri(); if (!storage.exists(sourceURI)) { @@ -67,6 +84,11 @@ private boolean move(final FileObject source, } S3BucketKey sourceBucketKey = S3BucketKey.fromURI(sourceURI); + String relativeSourcePrefix = extractPrefix( + sourceBucketKey.key().replaceAll(sourceBucketKey.objectName(), "")); + String newObjectKey = includeSourcePrefixPath ? + relativeSourcePrefix + sourceBucketKey.objectName() : sourceBucketKey.objectName(); + var destS3BucketName = Optional .ofNullable(config.getString(destinationS3BucketConfig)) .orElse(sourceBucketKey.bucketName()); @@ -74,11 +96,23 @@ private boolean move(final FileObject source, var destBucketKey = new S3BucketKey( destS3BucketName, config.getString(destinationS3PrefixConfig), - sourceBucketKey.objectName() + newObjectKey ); return storage.move(sourceURI, destBucketKey.toURI()); } + private String extractPrefix(final String p) { + String excludeSourcePrefixPath = Optional + .ofNullable(config.getString(EXCLUDE_SOURCE_PREFIX_PATH_CONFIG)) + .orElse(""); + String prefix = p.replaceAll(excludeSourcePrefixPath, ""); + prefix = prefix.replaceAll("^/+", ""); + // if there are no subdirectories, return an empty string + if (prefix.length() == 0) { + return ""; + } + return prefix.endsWith("/") ? 
prefix : prefix + "/"; + } /** * {@inheritDoc} */ @@ -110,6 +144,14 @@ public static class Config extends AbstractConfig { private static final String SUCCESS_AWS_PREFIX_PATH_DOC = "The prefix to be used for defining the key of an S3 object to move into the destination bucket."; + public static final String SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH = + CONFIG_PREFIX + "success.aws.include.source.prefix.path"; + private static final String SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH_DOC = + "Indicates whether to include the source prefix path in the destination key."; + public static final String FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH = + CONFIG_PREFIX + "failure.aws.include.source.prefix.path"; + private static final String FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH_DOC = + "Indicates whether to include the source prefix path in the destination key."; public static final String FAILURES_AWS_BUCKET_NAME_CONFIG = CONFIG_PREFIX + "failure.aws.bucket.name"; private static final String FAILURES_AWS_BUCKET_NAME_DOC = @@ -120,6 +162,11 @@ public static class Config extends AbstractConfig { private static final String FAILURES_AWS_PREFIX_PATH_DOC = "The prefix to be used for defining the key of S3 object to move into the destination bucket."; + public static final String EXCLUDE_SOURCE_PREFIX_PATH_CONFIG = + CONFIG_PREFIX + "exclude.source.prefix.path"; + private static final String EXCLUDE_SOURCE_PREFIX_PATH_DOC = + "Indicates whether to exclude the source prefix path from the destination key."; + /** * Creates a new {@link Config} instance. */ @@ -152,6 +199,17 @@ static ConfigDef configDef() { ConfigDef.Width.NONE, SUCCESS_AWS_PREFIX_PATH_CONFIG ) + .define( + SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH, + ConfigDef.Type.BOOLEAN, + false, + ConfigDef.Importance.LOW, + SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH_DOC, + CONFIG_GROUP, + groupCounter++, + ConfigDef.Width.NONE, + SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH + ) .define( FAILURES_AWS_BUCKET_NAME_CONFIG, ConfigDef.Type.STRING, @@ -173,6 +231,28 @@ static ConfigDef configDef() { groupCounter++, ConfigDef.Width.NONE, FAILURES_AWS_PREFIX_PATH_CONFIG + ) + .define( + FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH, + ConfigDef.Type.BOOLEAN, + false, + ConfigDef.Importance.LOW, + FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH_DOC, + CONFIG_GROUP, + groupCounter++, + ConfigDef.Width.NONE, + FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH + ) + .define( + EXCLUDE_SOURCE_PREFIX_PATH_CONFIG, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.LOW, + EXCLUDE_SOURCE_PREFIX_PATH_DOC, + CONFIG_GROUP, + groupCounter++, + ConfigDef.Width.NONE, + EXCLUDE_SOURCE_PREFIX_PATH_CONFIG ); } } diff --git a/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/BaseBzipAmazonS3Test.java b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/BaseBzipAmazonS3Test.java new file mode 100644 index 000000000..97ef97670 --- /dev/null +++ b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/BaseBzipAmazonS3Test.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright (c) StreamThoughts + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.streamthoughts.kafka.connect.filepulse.fs; + +import static io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3ClientConfig.*; + +import com.amazonaws.services.s3.AmazonS3; +import 
io.findify.s3mock.S3Mock; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import org.junit.After; +import org.junit.Before; + +public class BaseBzipAmazonS3Test { + + public static final String S3_TEST_BUCKET = "testbucket"; + protected S3Mock s3Mock; + protected AmazonS3 client; + protected String endpointConfiguration; + protected Map unmodifiableCommonsProperties; + + @Before + public void setUp() throws Exception { + final Random generator = new Random(); + final int s3Port = generator.nextInt(10000) + 10000; + s3Mock = new S3Mock.Builder().withPort(s3Port).withInMemoryBackend().build(); + s3Mock.start(); + + endpointConfiguration = "http://localhost:" + s3Port; + unmodifiableCommonsProperties = new HashMap(); + unmodifiableCommonsProperties.put(AWS_ACCESS_KEY_ID_CONFIG, "test_key_id"); + unmodifiableCommonsProperties.put(AWS_SECRET_ACCESS_KEY_CONFIG, "test_secret_key"); + unmodifiableCommonsProperties.put(AWS_S3_BUCKET_NAME_CONFIG, S3_TEST_BUCKET); + unmodifiableCommonsProperties.put(AWS_S3_REGION_CONFIG, "us-west-2"); + unmodifiableCommonsProperties.put("compression.method", "bzip"); + unmodifiableCommonsProperties = Collections.unmodifiableMap(unmodifiableCommonsProperties); + + client = AmazonS3ClientUtils.createS3Client( + new AmazonS3ClientConfig(unmodifiableCommonsProperties), + endpointConfiguration + ); + } + + @After + public void tearDown() throws Exception { + client.shutdown(); + s3Mock.shutdown(); + } +} diff --git a/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/BaseGzipAmazonS3Test.java b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/BaseGzipAmazonS3Test.java new file mode 100644 index 000000000..0de96b685 --- /dev/null +++ b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/BaseGzipAmazonS3Test.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright (c) StreamThoughts + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.streamthoughts.kafka.connect.filepulse.fs; + +import static io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3ClientConfig.*; + +import com.amazonaws.services.s3.AmazonS3; +import io.findify.s3mock.S3Mock; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import org.junit.After; +import org.junit.Before; + +public class BaseGzipAmazonS3Test { + + public static final String S3_TEST_BUCKET = "testbucket"; + protected S3Mock s3Mock; + protected AmazonS3 client; + protected String endpointConfiguration; + protected Map unmodifiableCommonsProperties; + + @Before + public void setUp() throws Exception { + final Random generator = new Random(); + final int s3Port = generator.nextInt(10000) + 10000; + s3Mock = new S3Mock.Builder().withPort(s3Port).withInMemoryBackend().build(); + s3Mock.start(); + + endpointConfiguration = "http://localhost:" + s3Port; + unmodifiableCommonsProperties = new HashMap(); + unmodifiableCommonsProperties.put(AWS_ACCESS_KEY_ID_CONFIG, "test_key_id"); + unmodifiableCommonsProperties.put(AWS_SECRET_ACCESS_KEY_CONFIG, "test_secret_key"); + unmodifiableCommonsProperties.put(AWS_S3_BUCKET_NAME_CONFIG, S3_TEST_BUCKET); + unmodifiableCommonsProperties.put(AWS_S3_REGION_CONFIG, "us-west-2"); + 
unmodifiableCommonsProperties.put("compression.method", "gzip"); + unmodifiableCommonsProperties = Collections.unmodifiableMap(unmodifiableCommonsProperties); + + client = AmazonS3ClientUtils.createS3Client( + new AmazonS3ClientConfig(unmodifiableCommonsProperties), + endpointConfiguration + ); + } + + @After + public void tearDown() throws Exception { + client.shutdown(); + s3Mock.shutdown(); + } +} diff --git a/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/BaseZipAmazonS3Test.java b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/BaseZipAmazonS3Test.java new file mode 100644 index 000000000..c7d5ce314 --- /dev/null +++ b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/BaseZipAmazonS3Test.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright (c) StreamThoughts + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.streamthoughts.kafka.connect.filepulse.fs; + +import static io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3ClientConfig.*; + +import com.amazonaws.services.s3.AmazonS3; +import io.findify.s3mock.S3Mock; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import org.junit.After; +import org.junit.Before; + +public class BaseZipAmazonS3Test { + + public static final String S3_TEST_BUCKET = "testbucket"; + protected S3Mock s3Mock; + protected AmazonS3 client; + protected String endpointConfiguration; + protected Map unmodifiableCommonsProperties; + + @Before + public void setUp() throws Exception { + final Random generator = new Random(); + final int s3Port = generator.nextInt(10000) + 10000; + s3Mock = new S3Mock.Builder().withPort(s3Port).withInMemoryBackend().build(); + s3Mock.start(); + + endpointConfiguration = "http://localhost:" + s3Port; + unmodifiableCommonsProperties = new HashMap(); + unmodifiableCommonsProperties.put(AWS_ACCESS_KEY_ID_CONFIG, "test_key_id"); + unmodifiableCommonsProperties.put(AWS_SECRET_ACCESS_KEY_CONFIG, "test_secret_key"); + unmodifiableCommonsProperties.put(AWS_S3_BUCKET_NAME_CONFIG, S3_TEST_BUCKET); + unmodifiableCommonsProperties.put(AWS_S3_REGION_CONFIG, "us-west-2"); + unmodifiableCommonsProperties.put("compression.method", "zip"); + unmodifiableCommonsProperties = Collections.unmodifiableMap(unmodifiableCommonsProperties); + + client = AmazonS3ClientUtils.createS3Client( + new AmazonS3ClientConfig(unmodifiableCommonsProperties), + endpointConfiguration + ); + } + + @After + public void tearDown() throws Exception { + client.shutdown(); + s3Mock.shutdown(); + } +} diff --git a/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/AmazonS3MoveCleanupPolicyTest.java b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/AmazonS3MoveCleanupPolicyTest.java index c768c35a7..462a85f52 100644 --- a/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/AmazonS3MoveCleanupPolicyTest.java +++ b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/AmazonS3MoveCleanupPolicyTest.java @@ -22,6 +22,8 @@ public class AmazonS3MoveCleanupPolicyTest 
extends BaseAmazonS3Test { public static final String S3_TEST_BUCKET = "bucket"; public static final String OBJECT_NAME = "object"; public static final String S3_OBJECT_KEY = "input/" + OBJECT_NAME; + public static final String S3_OBJECT_KEY_WITH_PREFIX = "input/prefix/" + OBJECT_NAME; + public static final String EXCLUDE_SOURCE_PREFIX_PATH = "input"; private AmazonS3Storage storage; @@ -58,7 +60,36 @@ public void should_move_object_on_success() { Assert.assertTrue(storage.exists(new S3BucketKey(S3_TEST_BUCKET, "/success/" + OBJECT_NAME).toURI())); } + @Test + public void should_move_object_on_success_with_prefix() { + // GIVEN + client.createBucket(S3_TEST_BUCKET); + client.putObject(S3_TEST_BUCKET, S3_OBJECT_KEY_WITH_PREFIX, "contents"); + var cleaner = new AmazonS3MoveCleanupPolicy(); + cleaner.setStorage(storage); + cleaner.configure(Map.of( + AmazonS3MoveCleanupPolicy.Config.SUCCESS_AWS_PREFIX_PATH_CONFIG, "/success/", + AmazonS3MoveCleanupPolicy.Config.FAILURES_AWS_PREFIX_PATH_CONFIG, "/failure/", + AmazonS3MoveCleanupPolicy.Config.SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH, true, + AmazonS3MoveCleanupPolicy.Config.FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH, true, + AmazonS3MoveCleanupPolicy.Config.EXCLUDE_SOURCE_PREFIX_PATH_CONFIG, EXCLUDE_SOURCE_PREFIX_PATH + )); + + // WHEN + FileObjectMeta objectMetadata = storage.getObjectMetadata(new S3BucketKey(S3_TEST_BUCKET, S3_OBJECT_KEY_WITH_PREFIX)); + cleaner.onSuccess(new FileObject( + objectMetadata, + FileObjectOffset.empty(), + FileObjectStatus.COMPLETED + ) + ); + + // THEN + Assert.assertFalse(storage.exists(objectMetadata.uri())); + Assert.assertTrue(storage.exists(new S3BucketKey(S3_TEST_BUCKET, "/success/prefix/" + OBJECT_NAME).toURI())); + } + @Test public void should_move_object_on_failure() { // GIVEN @@ -85,4 +116,34 @@ public void should_move_object_on_failure() { Assert.assertFalse(storage.exists(objectMetadata.uri())); Assert.assertTrue(storage.exists(new S3BucketKey(S3_TEST_BUCKET, "/failure/" + OBJECT_NAME).toURI())); } + + @Test + public void should_move_object_on_failure_with_prefix() { + // GIVEN + client.createBucket(S3_TEST_BUCKET); + client.putObject(S3_TEST_BUCKET, S3_OBJECT_KEY_WITH_PREFIX, "contents"); + + var cleaner = new AmazonS3MoveCleanupPolicy(); + cleaner.setStorage(storage); + cleaner.configure(Map.of( + AmazonS3MoveCleanupPolicy.Config.SUCCESS_AWS_PREFIX_PATH_CONFIG, "/success/", + AmazonS3MoveCleanupPolicy.Config.FAILURES_AWS_PREFIX_PATH_CONFIG, "/failure/", + AmazonS3MoveCleanupPolicy.Config.SUCCESS_AWS_INCLUDE_SOURCE_PREFIX_PATH, true, + AmazonS3MoveCleanupPolicy.Config.FAILURES_AWS_INCLUDE_SOURCE_PREFIX_PATH, true, + AmazonS3MoveCleanupPolicy.Config.EXCLUDE_SOURCE_PREFIX_PATH_CONFIG, EXCLUDE_SOURCE_PREFIX_PATH + )); + + // WHEN + FileObjectMeta objectMetadata = storage.getObjectMetadata(new S3BucketKey(S3_TEST_BUCKET, S3_OBJECT_KEY_WITH_PREFIX)); + cleaner.onFailure(new FileObject( + objectMetadata, + FileObjectOffset.empty(), + FileObjectStatus.COMPLETED + ) + ); + + // THEN + Assert.assertFalse(storage.exists(objectMetadata.uri())); + Assert.assertTrue(storage.exists(new S3BucketKey(S3_TEST_BUCKET, "/failure/prefix/" + OBJECT_NAME).toURI())); + } } \ No newline at end of file diff --git a/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AmazonS3RowBzipFileInputReaderTest.java b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AmazonS3RowBzipFileInputReaderTest.java 
new file mode 100644 index 000000000..c190bc6f2 --- /dev/null +++ b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AmazonS3RowBzipFileInputReaderTest.java @@ -0,0 +1,90 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright (c) StreamThoughts + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.streamthoughts.kafka.connect.filepulse.fs.reader; + +import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct; +import io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3Storage; +import io.streamthoughts.kafka.connect.filepulse.fs.BaseBzipAmazonS3Test; +import io.streamthoughts.kafka.connect.filepulse.fs.S3BucketKey; +import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator; +import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable; +import io.streamthoughts.kafka.connect.filepulse.source.FileRecord; +import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta; +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class AmazonS3RowBzipFileInputReaderTest extends BaseBzipAmazonS3Test{ + + private static final String LF = "\n"; + + private static final int NLINES = 10; + + @Rule + public TemporaryFolder testFolder = new TemporaryFolder(); + + private File objectFile; + + private AmazonS3RowFileInputReader reader; + + @Before + public void setUp() throws Exception { + super.setUp(); + objectFile = testFolder.newFile(); + System.out.println(objectFile.toPath()); + BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new BZip2CompressorOutputStream(new FileOutputStream(objectFile.toPath().toString())), StandardCharsets.UTF_8)); + generateLines(writer); + + reader = new AmazonS3RowFileInputReader(); + reader.setStorage(new AmazonS3Storage(client)); + reader.configure(unmodifiableCommonsProperties); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + reader.close(); + } + + @Test + public void should_read_all_zip_lines() { + client.createBucket(S3_TEST_BUCKET); + client.putObject(S3_TEST_BUCKET, "my-key", objectFile); + + final GenericFileObjectMeta meta = new GenericFileObjectMeta.Builder() + .withUri(new S3BucketKey(S3_TEST_BUCKET, "my-key").toURI()) + .build(); + + final FileInputIterator> iterator = reader.newIterator(meta.uri()); + List> results = new ArrayList(); + while (iterator.hasNext()) { + final RecordsIterable> next = iterator.next(); + results.addAll(next.collect()); + } + Assert.assertEquals(10, results.size()); + } + + private void generateLines(final BufferedWriter writer) throws IOException { + + for (int i = 0; i < NLINES; i++) { + String line = "00000000-" + i; + writer.write(line); + if (i + 1 < NLINES) { + writer.write(LF); + } + } + writer.flush(); + writer.close(); + } +} \ No newline at end of file diff --git a/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AmazonS3RowGzipFileInputReaderTest.java b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AmazonS3RowGzipFileInputReaderTest.java new 
file mode 100644 index 000000000..90581610b --- /dev/null +++ b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AmazonS3RowGzipFileInputReaderTest.java @@ -0,0 +1,90 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright (c) StreamThoughts + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.streamthoughts.kafka.connect.filepulse.fs.reader; + +import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct; +import io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3Storage; +import io.streamthoughts.kafka.connect.filepulse.fs.BaseGzipAmazonS3Test; +import io.streamthoughts.kafka.connect.filepulse.fs.S3BucketKey; +import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator; +import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable; +import io.streamthoughts.kafka.connect.filepulse.source.FileRecord; +import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta; +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.zip.GZIPOutputStream; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class AmazonS3RowGzipFileInputReaderTest extends BaseGzipAmazonS3Test { + + private static final String LF = "\n"; + + private static final int NLINES = 10; + + @Rule + public TemporaryFolder testFolder = new TemporaryFolder(); + + private File objectFile; + + private AmazonS3RowFileInputReader reader; + + @Before + public void setUp() throws Exception { + super.setUp(); + objectFile = testFolder.newFile(); + System.out.println(objectFile.toPath()); + BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new GZIPOutputStream(new FileOutputStream(objectFile.toPath().toString())), StandardCharsets.UTF_8)); + generateLines(writer); + + reader = new AmazonS3RowFileInputReader(); + reader.setStorage(new AmazonS3Storage(client)); + reader.configure(unmodifiableCommonsProperties); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + reader.close(); + } + + @Test + public void should_read_all_gzip_lines() { + client.createBucket(S3_TEST_BUCKET); + client.putObject(S3_TEST_BUCKET, "my-key", objectFile); + + final GenericFileObjectMeta meta = new GenericFileObjectMeta.Builder() + .withUri(new S3BucketKey(S3_TEST_BUCKET, "my-key").toURI()) + .build(); + + final FileInputIterator> iterator = reader.newIterator(meta.uri()); + List> results = new ArrayList(); + while (iterator.hasNext()) { + final RecordsIterable> next = iterator.next(); + results.addAll(next.collect()); + } + Assert.assertEquals(10, results.size()); + } + + private void generateLines(final BufferedWriter writer) throws IOException { + + for (int i = 0; i < NLINES; i++) { + String line = "00000000-" + i; + writer.write(line); + if (i + 1 < NLINES) { + writer.write(LF); + } + } + writer.flush(); + writer.close(); + } +} \ No newline at end of file diff --git a/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AmazonS3RowZipFileInputReaderTest.java b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AmazonS3RowZipFileInputReaderTest.java new file mode 100644 index 000000000..bb7700116 --- /dev/null 
+++ b/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AmazonS3RowZipFileInputReaderTest.java @@ -0,0 +1,100 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright (c) StreamThoughts + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.streamthoughts.kafka.connect.filepulse.fs.reader; + +import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct; +import io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3Storage; +import io.streamthoughts.kafka.connect.filepulse.fs.BaseZipAmazonS3Test; +import io.streamthoughts.kafka.connect.filepulse.fs.S3BucketKey; +import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator; +import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable; +import io.streamthoughts.kafka.connect.filepulse.source.FileRecord; +import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta; +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.nio.file.attribute.FileTime; +import java.util.ArrayList; +import java.util.List; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class AmazonS3RowZipFileInputReaderTest extends BaseZipAmazonS3Test{ + + private static final String LF = "\n"; + + private static final int NLINES = 10; + + @Rule + public TemporaryFolder testFolder = new TemporaryFolder(); + + private File objectFile; + + private AmazonS3RowFileInputReader reader; + + @Before + public void setUp() throws Exception { + super.setUp(); + objectFile = testFolder.newFile(); + System.out.println(objectFile.toPath()); + ZipOutputStream zos =new ZipOutputStream(new FileOutputStream(objectFile.toPath().toString())); + ZipEntry entry = new ZipEntry(objectFile.toPath().toFile().getName()); + entry.setCreationTime(FileTime.fromMillis(objectFile.toPath().toFile().lastModified())); + entry.setComment("created by jimbo"); + zos.putNextEntry(entry); + + BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(zos, StandardCharsets.UTF_8)); + generateLines(writer); + zos.closeEntry(); + writer.close(); + + reader = new AmazonS3RowFileInputReader(); + reader.setStorage(new AmazonS3Storage(client)); + reader.configure(unmodifiableCommonsProperties); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + reader.close(); + } + + @Test + public void should_read_all_zip_lines() { + client.createBucket(S3_TEST_BUCKET); + client.putObject(S3_TEST_BUCKET, "my-key", objectFile); + + final GenericFileObjectMeta meta = new GenericFileObjectMeta.Builder() + .withUri(new S3BucketKey(S3_TEST_BUCKET, "my-key").toURI()) + .build(); + + final FileInputIterator> iterator = reader.newIterator(meta.uri()); + List> results = new ArrayList(); + while (iterator.hasNext()) { + final RecordsIterable> next = iterator.next(); + results.addAll(next.collect()); + } + Assert.assertEquals(10, results.size()); + } + + private void generateLines(final BufferedWriter writer) throws IOException { + + for (int i = 0; i < NLINES; i++) { + String line = "00000000-" + i; + writer.write(line); + if (i + 1 < NLINES) { + writer.write(LF); + } + } + writer.flush(); + + } +} \ No newline at end of file diff --git a/connect-file-pulse-filesystems/filepulse-azure-storage-fs/pom.xml 
b/connect-file-pulse-filesystems/filepulse-azure-storage-fs/pom.xml index 8b3299a48..67cb92eb5 100644 --- a/connect-file-pulse-filesystems/filepulse-azure-storage-fs/pom.xml +++ b/connect-file-pulse-filesystems/filepulse-azure-storage-fs/pom.xml @@ -13,7 +13,7 @@ io.streamthoughts kafka-connect-filepulse-filesystems - 2.15.0-SNAPSHOT + 2.17.0-SNAPSHOT Kafka Connect Source File Pulse Azure Storage FS @@ -25,7 +25,7 @@ ${project.parent.basedir}/.. ${project.parent.basedir}/../header 12.25.2 - 1.11.4 + 1.12.2 diff --git a/connect-file-pulse-filesystems/filepulse-commons-fs/pom.xml b/connect-file-pulse-filesystems/filepulse-commons-fs/pom.xml index f45d85f35..44b0e07c9 100644 --- a/connect-file-pulse-filesystems/filepulse-commons-fs/pom.xml +++ b/connect-file-pulse-filesystems/filepulse-commons-fs/pom.xml @@ -10,7 +10,7 @@ io.streamthoughts kafka-connect-filepulse-filesystems - 2.15.0-SNAPSHOT + 2.17.0-SNAPSHOT 4.0.0 kafka-connect-filepulse-commons-fs diff --git a/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/BytesRecordOffset.java b/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/BytesRecordOffset.java index 373e7282d..ac7f219f9 100644 --- a/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/BytesRecordOffset.java +++ b/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/BytesRecordOffset.java @@ -9,7 +9,6 @@ import io.streamthoughts.kafka.connect.filepulse.source.FileObjectOffset; import io.streamthoughts.kafka.connect.filepulse.source.TimestampedRecordOffset; import java.util.Objects; -import org.apache.kafka.common.utils.SystemTime; public class BytesRecordOffset extends TimestampedRecordOffset { @@ -21,7 +20,8 @@ public static BytesRecordOffset empty() { return new BytesRecordOffset( -1, -1, - SystemTime.SYSTEM.milliseconds()); + System.currentTimeMillis() + ); } /** @@ -32,7 +32,7 @@ public static BytesRecordOffset empty() { */ public BytesRecordOffset(long startPosition, long endPosition) { - this(startPosition, endPosition, SystemTime.SYSTEM.milliseconds()); + this(startPosition, endPosition, System.currentTimeMillis()); } /** diff --git a/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileInputIteratorConfig.java b/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileInputIteratorConfig.java index 647237b5f..1460b603d 100644 --- a/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileInputIteratorConfig.java +++ b/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileInputIteratorConfig.java @@ -21,6 +21,10 @@ public class RowFileInputIteratorConfig extends AbstractConfig { public static final String FILE_ENCODING_DOC = "The text file encoding to use (default = UTF_8)"; public static final String FILE_ENCODING_DEFAULT = StandardCharsets.UTF_8.displayName(); + public static final String COMPRESSION_METHOD_CONFIG = "compression.method"; + public static final String COMPRESSION_METHOD_DOC = "The compression method - gzip zip bzip"; + public static final String COMPRESSION_METHOD_DEFAULT = ""; + 
public static final String BUFFER_INIT_BYTES_SIZE_CONFIG = "buffer.initial.bytes.size"; public static final String BUFFER_INIT_BYTES_SIZE_DOC = "The initial buffer size used to read input files"; public static final int BUFFER_INIT_BYTES_SIZE_DEFAULT = 4096; @@ -48,6 +52,8 @@ public RowFileInputIteratorConfig(final Map originals) { super(configDef(), originals); } + public String compressionMethod() {return getString(COMPRESSION_METHOD_CONFIG);} + public int bufferInitialBytesSize() { return getInt(BUFFER_INIT_BYTES_SIZE_CONFIG); } @@ -86,6 +92,10 @@ private static ConfigDef configDef() { ConfigDef.Importance.LOW, READER_WAIT_MAX_MS_DOC) .define(READER_FIELD_FOOTER_CONFIG, ConfigDef.Type.INT, READER_FIELD_FOOTER_DEFAULT, - ConfigDef.Importance.HIGH, READER_FIELD_FOOTER_DOC); + ConfigDef.Importance.HIGH, READER_FIELD_FOOTER_DOC) + + .define(COMPRESSION_METHOD_CONFIG, ConfigDef.Type.STRING, COMPRESSION_METHOD_DEFAULT, + ConfigDef.Importance.HIGH, COMPRESSION_METHOD_DOC); + } } diff --git a/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileInputIteratorFactory.java b/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileInputIteratorFactory.java index 091f2177c..236f48169 100644 --- a/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileInputIteratorFactory.java +++ b/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileInputIteratorFactory.java @@ -15,6 +15,10 @@ import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta; import io.streamthoughts.kafka.connect.filepulse.source.FileRecord; import java.net.URI; +import java.util.Objects; +import java.util.zip.GZIPInputStream; +import java.util.zip.ZipInputStream; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; public class RowFileInputIteratorFactory implements FileInputIteratorFactory { @@ -56,13 +60,46 @@ public FileInputIterator> newIterator(final URI objectUR .withIteratorManager(iteratorManager) .withReaderSupplier(() -> { try { - var br = new NonBlockingBufferReader( - storage.getInputStream(objectURI), - configs.bufferInitialBytesSize(), - configs.charset() - ); - br.disableAutoFlush(); - return br; + if (Objects.equals(configs.compressionMethod(), "gzip")){ + var br = new NonBlockingBufferReader( + new GZIPInputStream(storage.getInputStream(objectURI)), + configs.bufferInitialBytesSize(), + configs.charset() + ); + br.disableAutoFlush(); + return br; + } + else if (Objects.equals(configs.compressionMethod(), "bzip")){ + var br = new NonBlockingBufferReader( + new BZip2CompressorInputStream(storage.getInputStream(objectURI)), + configs.bufferInitialBytesSize(), + configs.charset() + ); + + br.disableAutoFlush(); + return br; + } + else if (Objects.equals(configs.compressionMethod(), "zip")){ + ZipInputStream zis =new ZipInputStream(storage.getInputStream(objectURI)); + zis.getNextEntry(); + var br = new NonBlockingBufferReader( + zis, + configs.bufferInitialBytesSize(), + configs.charset() + ); + br.disableAutoFlush(); + return br; + } + else { + var br = new NonBlockingBufferReader( + storage.getInputStream(objectURI), + configs.bufferInitialBytesSize(), + configs.charset() + ); + + br.disableAutoFlush(); + return br; + } } catch (Exception e) { throw new ReaderException("Failed to get 
InputStream for object: " + objectMetadata, e); } diff --git a/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileRecordOffset.java b/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileRecordOffset.java index 6bf5621cf..9e1f78c35 100644 --- a/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileRecordOffset.java +++ b/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileRecordOffset.java @@ -8,7 +8,6 @@ import io.streamthoughts.kafka.connect.filepulse.source.FileObjectOffset; import java.util.Objects; -import org.apache.kafka.common.utils.SystemTime; /** * Represents the position of a record into a text XML file. @@ -24,8 +23,9 @@ public static RowFileRecordOffset empty() { -1, -1, 0, - SystemTime.SYSTEM.milliseconds(), - 0); + System.currentTimeMillis(), + 0 + ); } public static RowFileRecordOffset with(long startPosition, long endPosition) { @@ -33,8 +33,9 @@ public static RowFileRecordOffset with(long startPosition, long endPosition) { startPosition, endPosition, 0, - SystemTime.SYSTEM.milliseconds(), - endPosition - startPosition); + System.currentTimeMillis(), + endPosition - startPosition + ); } /** diff --git a/connect-file-pulse-filesystems/filepulse-google-cloud-storage-fs/pom.xml b/connect-file-pulse-filesystems/filepulse-google-cloud-storage-fs/pom.xml index 90b95b808..f2ce213ee 100644 --- a/connect-file-pulse-filesystems/filepulse-google-cloud-storage-fs/pom.xml +++ b/connect-file-pulse-filesystems/filepulse-google-cloud-storage-fs/pom.xml @@ -13,7 +13,7 @@ io.streamthoughts kafka-connect-filepulse-filesystems - 2.15.0-SNAPSHOT + 2.17.0-SNAPSHOT Kafka Connect Source File Pulse Google Cloud Storage FS @@ -63,7 +63,7 @@ com.google.cloud google-cloud-nio - 0.127.6 + 0.127.20 test diff --git a/connect-file-pulse-filesystems/filepulse-local-fs/pom.xml b/connect-file-pulse-filesystems/filepulse-local-fs/pom.xml index e29d144ff..a6db94be1 100644 --- a/connect-file-pulse-filesystems/filepulse-local-fs/pom.xml +++ b/connect-file-pulse-filesystems/filepulse-local-fs/pom.xml @@ -11,7 +11,7 @@ io.streamthoughts kafka-connect-filepulse-filesystems - 2.15.0-SNAPSHOT + 2.17.0-SNAPSHOT 4.0.0 diff --git a/connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListing.java b/connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListing.java index 1dad2547c..e4e611c2f 100644 --- a/connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListing.java +++ b/connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListing.java @@ -49,7 +49,6 @@ public LocalFSDirectoryListing() { this(Collections.emptyList()); } - /** * Creates a new {@link LocalFSDirectoryListing} instance. * @@ -86,8 +85,7 @@ private Collection toSourceObjects(final Collection allFil } catch (ConnectFilePulseException e) { LOG.warn( "Failed to read metadata. 
Object file is ignored: {}", - e.getMessage() - ); + e.getMessage()); return Optional.empty(); } }) @@ -105,22 +103,54 @@ public void setFilter(final FileListFilter filter) { private List listEligibleFiles(final Path input) { final List listingLocalFiles = new LinkedList(); - if (!Files.isReadable(input)) { - LOG.warn("Cannot get directory listing for '{}'. Input path is not readable.", input); - return listingLocalFiles; - } - - if (!Files.isDirectory(input)) { - LOG.warn("Cannot get directory listing for '{}'. Input path is not a directory.", input); + if (!isPathReadable(input)) { return listingLocalFiles; } - if (isHidden(input)) { + if (!isPathDirectory(input) || isHidden(input)) { return listingLocalFiles; } final List decompressedDirs = new LinkedList(); final List directories = new LinkedList(); + processFiles(input, listingLocalFiles, directories, decompressedDirs); + + if (config.isRecursiveScanEnable() && !directories.isEmpty()) { + listingLocalFiles.addAll(scanRecursiveDirectories(directories, decompressedDirs)); + } + return listingLocalFiles; + } + + private boolean isPathReadable(Path path) { + if (!Files.isReadable(path)) { + LOG.warn("Cannot get directory listing for '{}'. Input path is not readable.", path); + return false; + } + return true; + } + + private boolean isPathDirectory(Path path) { + if (!Files.isDirectory(path)) { + LOG.warn("Cannot get directory listing for '{}'. Input path is not a directory.", path); + return false; + } + return true; + } + + private boolean isHidden(final Path input) { + try { + return Files.isHidden(input); + } catch (IOException e) { + LOG.warn( + "Error while checking if input file is hidden '{}': {}", + input, + e.getLocalizedMessage()); + return false; + } + } + + private void processFiles(Path input, List listingLocalFiles, List directories, + List decompressedDirs) { try (DirectoryStream stream = Files.newDirectoryStream(input)) { for (Path path : stream) { if (Files.isDirectory(path)) { @@ -139,8 +169,14 @@ private List listEligibleFiles(final Path input) { final Path decompressed = codec.decompress(file).toPath(); listingLocalFiles.addAll(listEligibleFiles(decompressed)); decompressedDirs.add(decompressed); - } catch (IOException e) { - LOG.warn("Error while decompressing input file '{}'. Skip and continue.", path, e); + LOG.debug("Compressed file extracted successfully : {}", path); + handleFileDeletion(file, path); + } catch (IOException | SecurityException e) { + if (e instanceof IOException) { + LOG.warn("Error while decompressing input file '{}'. Skip and continue.", path, e); + } else if (e instanceof SecurityException) { + LOG.warn("Error while deleting input file '{}'. 
Skip and continue.", path, e); + } } } else { // If no codec was found for the input file, @@ -152,37 +188,29 @@ private List listEligibleFiles(final Path input) { } } } catch (IOException e) { - LOG.error( - "Error while getting directory listing for {}: {}", - input, - e.getLocalizedMessage() - ); + LOG.error("Error while getting directory listing for {}: {}", input, e.getLocalizedMessage()); throw new ConnectException(e); } - if (config.isRecursiveScanEnable() && !directories.isEmpty()) { - listingLocalFiles.addAll(directories.stream() - .filter(f -> !decompressedDirs.contains(f)) - .flatMap(f -> listEligibleFiles(f).stream()) - .collect(Collectors.toList()) - ); - } - return listingLocalFiles; } - private boolean isHidden(final Path input) { - try { - return Files.isHidden(input); - } catch (IOException e) { - LOG.warn( - "Error while checking if input file is hidden '{}': {}", - input, - e.getLocalizedMessage() - ); - return false; + private void handleFileDeletion(File file, Path path) { + if (config.isDeleteCompressFileEnable() && file.exists()) { + if (file.delete()) { + LOG.debug("Compressed file deleted successfully : {}", path); + } else { + LOG.warn("Error while deleting input file '{}'. Skip and continue.", path); + } } } + private List scanRecursiveDirectories(List directories, List decompressedDirs) { + return directories.stream() + .filter(f -> !decompressedDirs.contains(f)) + .flatMap(f -> listEligibleFiles(f).stream()) + .collect(Collectors.toList()); + } + /** * {@inheritDoc} */ diff --git a/connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListingConfig.java b/connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListingConfig.java index 96e8e508c..8c59e0c88 100644 --- a/connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListingConfig.java +++ b/connect-file-pulse-filesystems/filepulse-local-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListingConfig.java @@ -12,30 +12,37 @@ public class LocalFSDirectoryListingConfig extends AbstractConfig { - public static final String FS_LISTING_DIRECTORY_PATH = "fs.listing.directory.path"; public static final String FS_LISTING_DIRECTORY_DOC = "The input directory to scan"; - public static final String FS_RECURSIVE_SCAN_ENABLE_CONFIG = "fs.listing.recursive.enabled"; - private static final String FS_RECURSIVE_SCAN_ENABLE_DOC = "Boolean indicating whether local directory " + - "should be recursively scanned (default true)."; + public static final String FS_RECURSIVE_SCAN_ENABLE_CONFIG = "fs.listing.recursive.enabled"; + private static final String FS_RECURSIVE_SCAN_ENABLE_DOC = "Boolean indicating whether local directory " + + "should be recursively scanned (default true)."; + + public static final String FS_DELETE_COMPRESS_FILES_ENABLED_CONFIG = "fs.delete.compress.files.enabled"; + private static final String FS_DELETE_COMPRESS_FILES_ENABLE_DOC = "Flag indicating whether compressed file " + + "should be deleted after extraction (default false)"; public static ConfigDef getConf() { return new ConfigDef() - .define( - FS_LISTING_DIRECTORY_PATH, - ConfigDef.Type.STRING, - ConfigDef.Importance.HIGH, - FS_LISTING_DIRECTORY_DOC - ) - - .define( - FS_RECURSIVE_SCAN_ENABLE_CONFIG, - ConfigDef.Type.BOOLEAN, - true, - ConfigDef.Importance.MEDIUM, - FS_RECURSIVE_SCAN_ENABLE_DOC - ); + .define( + 
FS_LISTING_DIRECTORY_PATH, + ConfigDef.Type.STRING, + ConfigDef.Importance.HIGH, + FS_LISTING_DIRECTORY_DOC) + + .define( + FS_RECURSIVE_SCAN_ENABLE_CONFIG, + ConfigDef.Type.BOOLEAN, + true, + ConfigDef.Importance.MEDIUM, + FS_RECURSIVE_SCAN_ENABLE_DOC) + .define( + FS_DELETE_COMPRESS_FILES_ENABLED_CONFIG, + ConfigDef.Type.BOOLEAN, + false, + ConfigDef.Importance.MEDIUM, + FS_DELETE_COMPRESS_FILES_ENABLE_DOC); } /** @@ -53,4 +60,8 @@ public boolean isRecursiveScanEnable() { public String listingDirectoryPath() { return this.getString(FS_LISTING_DIRECTORY_PATH); } + + public boolean isDeleteCompressFileEnable() { + return getBoolean(FS_DELETE_COMPRESS_FILES_ENABLED_CONFIG); + } } diff --git a/connect-file-pulse-filesystems/filepulse-local-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListingTest.java b/connect-file-pulse-filesystems/filepulse-local-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListingTest.java index b53ad815a..bec14e7c6 100644 --- a/connect-file-pulse-filesystems/filepulse-local-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListingTest.java +++ b/connect-file-pulse-filesystems/filepulse-local-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/LocalFSDirectoryListingTest.java @@ -27,7 +27,7 @@ public class LocalFSDirectoryListingTest { private static final String DEFAULT_ENTRY_FILE_NAME = "file-entry-0.txt"; - private static final String DEFAULT_ARCHIVE_NAME = "archive"; + private static final String DEFAULT_ARCHIVE_NAME = "archive"; private static final String TEST_SCAN_DIRECTORY = "test-scan"; @Rule @@ -54,19 +54,22 @@ public void shouldExtractXZGipCompressedFilesPathWhileScanningGivenRecursiveScan zos.closeEntry(); } - scanner.configure(new HashMap() {{ - put(LocalFSDirectoryListingConfig.FS_RECURSIVE_SCAN_ENABLE_CONFIG, false); - put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath()); - }}); + scanner.configure(new HashMap() { + { + put(LocalFSDirectoryListingConfig.FS_RECURSIVE_SCAN_ENABLE_CONFIG, false); + put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath()); + } + }); final Collection scanned = scanner.listObjects(); Assert.assertEquals(1, scanned.size()); - String expected = String.join(File.separator, Arrays.asList(inputDirectory.getCanonicalPath(), DEFAULT_ARCHIVE_NAME, DEFAULT_ENTRY_FILE_NAME)); + String expected = String.join(File.separator, + Arrays.asList(inputDirectory.getCanonicalPath(), DEFAULT_ARCHIVE_NAME, DEFAULT_ENTRY_FILE_NAME)); Assert.assertEquals(expected, getCanonicalPath(scanned.iterator().next())); } @Test - public void shouldExtractGzipCompressedFiles() throws IOException { + public void shouldExtractGzipCompressedFilesAndKeepGzipFileAfterExtraction() throws IOException { File archiveFile = new File(inputDirectory, DEFAULT_ARCHIVE_NAME + ".gz"); try (GZIPOutputStream os = new GZIPOutputStream(new FileOutputStream(archiveFile))) { @@ -74,27 +77,56 @@ public void shouldExtractGzipCompressedFiles() throws IOException { os.write(data, 0, data.length); } - scanner.configure(new HashMap() {{ - put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath()); - }}); + scanner.configure(new HashMap() { + { + put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath()); + } + }); final Collection scanned = scanner.listObjects(); Assert.assertEquals(1, scanned.size()); String expected = 
String.join(File.separator, Arrays.asList(inputDirectory.getCanonicalPath(), DEFAULT_ARCHIVE_NAME, DEFAULT_ARCHIVE_NAME)); Assert.assertEquals(expected, getCanonicalPath(scanned.iterator().next())); + Assert.assertTrue(archiveFile.exists()); } - + + @Test + public void shouldExtractGzipCompressedFilesAndDeleteGzipFileAfterExtraction() throws IOException { + File archiveFile = new File(inputDirectory, DEFAULT_ARCHIVE_NAME + ".gz"); + + try (GZIPOutputStream os = new GZIPOutputStream(new FileOutputStream(archiveFile))) { + byte[] data = "dummy".getBytes(); + os.write(data, 0, data.length); + } + + scanner.configure(new HashMap() { + { + put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath()); + put(LocalFSDirectoryListingConfig.FS_DELETE_COMPRESS_FILES_ENABLED_CONFIG, true); + } + }); + + final Collection scanned = scanner.listObjects(); + Assert.assertEquals(1, scanned.size()); + String expected = String.join(File.separator, Arrays.asList(inputDirectory.getCanonicalPath(), + DEFAULT_ARCHIVE_NAME, DEFAULT_ARCHIVE_NAME)); + Assert.assertEquals(expected, getCanonicalPath(scanned.iterator().next())); + Assert.assertTrue(!archiveFile.exists()); + } + @Test public void shouldListFilesGivenRecursiveScanEnable() throws IOException { - folder.newFolder(TEST_SCAN_DIRECTORY , "sub-directory"); + folder.newFolder(TEST_SCAN_DIRECTORY, "sub-directory"); final File file1 = folder.newFile(TEST_SCAN_DIRECTORY + "/test-file1.txt"); final File file2 = folder.newFile(TEST_SCAN_DIRECTORY + "/sub-directory/test-file2.txt"); - scanner.configure(new HashMap(){{ - put(LocalFSDirectoryListingConfig.FS_RECURSIVE_SCAN_ENABLE_CONFIG, true); - put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath()); - }}); + scanner.configure(new HashMap() { + { + put(LocalFSDirectoryListingConfig.FS_RECURSIVE_SCAN_ENABLE_CONFIG, true); + put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath()); + } + }); final Collection scanned = scanner .listObjects() @@ -117,14 +149,16 @@ private String getCanonicalPath(final FileObjectMeta s) { @Test public void shouldListFilesGivenRecursiveScanDisable() throws IOException { - folder.newFolder(TEST_SCAN_DIRECTORY , "sub-directory"); + folder.newFolder(TEST_SCAN_DIRECTORY, "sub-directory"); final File file1 = folder.newFile(TEST_SCAN_DIRECTORY + "/test-file1.txt"); folder.newFile(TEST_SCAN_DIRECTORY + "/sub-directory/test-file2.txt"); // will not be scanned - scanner.configure(new HashMap(){{ - put(LocalFSDirectoryListingConfig.FS_RECURSIVE_SCAN_ENABLE_CONFIG, false); - put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath()); - }}); + scanner.configure(new HashMap() { + { + put(LocalFSDirectoryListingConfig.FS_RECURSIVE_SCAN_ENABLE_CONFIG, false); + put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, inputDirectory.getAbsolutePath()); + } + }); final Collection scanned = scanner .listObjects() diff --git a/connect-file-pulse-filesystems/filepulse-sftp-fs/pom.xml b/connect-file-pulse-filesystems/filepulse-sftp-fs/pom.xml index 09c126efe..d2c2f4e80 100644 --- a/connect-file-pulse-filesystems/filepulse-sftp-fs/pom.xml +++ b/connect-file-pulse-filesystems/filepulse-sftp-fs/pom.xml @@ -11,7 +11,7 @@ io.streamthoughts kafka-connect-filepulse-filesystems - 2.15.0-SNAPSHOT + 2.17.0-SNAPSHOT 4.0.0 @@ -23,8 +23,8 @@ ${project.parent.basedir}/../header 0.1.55 5.5.0 - 3.24.2 - 1.18.30 + 3.26.3 + 1.18.34 diff --git 
a/connect-file-pulse-filesystems/pom.xml b/connect-file-pulse-filesystems/pom.xml index 0c9141b15..d6491b02f 100644 --- a/connect-file-pulse-filesystems/pom.xml +++ b/connect-file-pulse-filesystems/pom.xml @@ -20,7 +20,7 @@ limitations under the License. kafka-connect-filepulse-reactor io.streamthoughts - 2.15.0-SNAPSHOT + 2.17.0-SNAPSHOT 4.0.0 diff --git a/connect-file-pulse-filters/pom.xml b/connect-file-pulse-filters/pom.xml index 8d327a1ab..d5bcf2611 100644 --- a/connect-file-pulse-filters/pom.xml +++ b/connect-file-pulse-filters/pom.xml @@ -13,7 +13,7 @@ io.streamthoughts kafka-connect-filepulse-reactor - 2.15.0-SNAPSHOT + 2.17.0-SNAPSHOT kafka-connect-filepulse-filters @@ -77,7 +77,7 @@ org.json json - 20230618 + 20240303 diff --git a/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/AbstractDelimitedRowFilter.java b/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/AbstractDelimitedRowFilter.java index 86725b805..963e07ee4 100644 --- a/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/AbstractDelimitedRowFilter.java +++ b/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/AbstractDelimitedRowFilter.java @@ -42,6 +42,8 @@ public abstract class AbstractDelimitedRowFilter columnsTypesByIndex = new HashMap(); /** @@ -103,6 +105,11 @@ public RecordsIterable apply(final FilterContext context, if (schema == null || isSchemaDynamic()) { inferSchemaFromRecord(record, columnValues.length); } + + if (schema != null && configs.extractColumnName() != null && shouldInferSchema(record)) { + inferSchemaFromRecord(record, columnValues.length); + } + final TypedStruct struct = buildStructForFields(columnValues); return RecordsIterable.of(struct); } @@ -115,12 +122,23 @@ public boolean isSchemaDynamic() { configs.isAutoGenerateColumnNames(); } + private boolean shouldInferSchema(TypedStruct record) { + if (cachedHeaders == null) { + return false; + } + final String fieldName = configs.extractColumnName(); + String field = record.first(fieldName).getString(); + return cachedHeaders.length() == field.length() && !cachedHeaders.equals(field); + } + private void inferSchemaFromRecord(final TypedStruct record, int numColumns) { schema = Schema.struct(); if (configs.extractColumnName() != null) { final String fieldName = configs.extractColumnName(); String field = record.first(fieldName).getString(); + cachedHeaders = field; + if (field == null) { throw new FilterException( "Cannot find field for name '" + fieldName + "' to determine columns names" diff --git a/connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/CSVFilterTest.java b/connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/CSVFilterTest.java index 36ceb836c..0792709bf 100644 --- a/connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/CSVFilterTest.java +++ b/connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/CSVFilterTest.java @@ -39,6 +39,100 @@ public void setUp() { configs.put(CSVFilter.PARSER_SEPARATOR_CONFIG, ";"); } + @Test + public void should_extract_column_names_from_diff_order_headers() { + configs.put(READER_EXTRACT_COLUMN_NAME_CONFIG, "headers"); + filter.configure(configs, alias -> null); + + RecordsIterable output = filter.apply(null, DEFAULT_STRUCT, false); + Assert.assertNotNull(output); + Assert.assertEquals(1, 
output.size()); + + final TypedStruct record = output.iterator().next(); + Assert.assertEquals("value1", record.getString("col1")); + Assert.assertEquals("2", record.getString("col2")); + Assert.assertEquals("true", record.getString("col3")); + + final TypedStruct input1 = TypedStruct.create() + .put("message", "false;3;value2") + .put("headers", Arrays.asList("col3;col2;col1")); + RecordsIterable output1 = filter.apply(null, input1, false); + Assert.assertNotNull(output1); + Assert.assertEquals(1, output1.size()); + + final TypedStruct record1 = output1.iterator().next(); + Assert.assertEquals("value2", record1.getString("col1")); + Assert.assertEquals("3", record1.getString("col2")); + Assert.assertEquals("false", record1.getString("col3")); + + final TypedStruct input2 = TypedStruct.create() + .put("message", "4;false;value3") + .put("headers", Arrays.asList("col2;col3;col1")); + + RecordsIterable output2 = filter.apply(null, input2, false); + Assert.assertNotNull(output2); + Assert.assertEquals(1, output2.size()); + + final TypedStruct record2 = output2.iterator().next(); + Assert.assertEquals("value3", record2.getString("col1")); + Assert.assertEquals("4", record2.getString("col2")); + Assert.assertEquals("false", record2.getString("col3")); + } + + @Test + public void should_extract_column_names_from_diff_order_headers_and_null_value() { + configs.put(READER_EXTRACT_COLUMN_NAME_CONFIG, "headers"); + filter.configure(configs, alias -> null); + + RecordsIterable output = filter.apply(null, DEFAULT_STRUCT, false); + Assert.assertNotNull(output); + Assert.assertEquals(1, output.size()); + + final TypedStruct record = output.iterator().next(); + Assert.assertEquals("value1", record.getString("col1")); + Assert.assertEquals("2", record.getString("col2")); + Assert.assertEquals("true", record.getString("col3")); + + final TypedStruct input1 = TypedStruct.create() + .put("message", "false;;") + .put("headers", Arrays.asList("col3;col2;col1")); + RecordsIterable output1 = filter.apply(null, input1, false); + Assert.assertNotNull(output1); + Assert.assertEquals(1, output1.size()); + + final TypedStruct record1 = output1.iterator().next(); + Assert.assertNull(record1.getString("col1")); + Assert.assertNull(record1.getString("col2")); + Assert.assertEquals("false", record1.getString("col3")); + } + + @Test + public void should_extract_column_names_from_diff_order_headers_and_diff_size() { + configs.put(READER_EXTRACT_COLUMN_NAME_CONFIG, "headers"); + filter.configure(configs, alias -> null); + + RecordsIterable output = filter.apply(null, DEFAULT_STRUCT, false); + Assert.assertNotNull(output); + Assert.assertEquals(1, output.size()); + + final TypedStruct record = output.iterator().next(); + Assert.assertEquals("value1", record.getString("col1")); + Assert.assertEquals("2", record.getString("col2")); + Assert.assertEquals("true", record.getString("col3")); + + final TypedStruct input1 = TypedStruct.create() + .put("message", "false;4;") + .put("headers", Arrays.asList("col3;col2")); + RecordsIterable output1 = filter.apply(null, input1, false); + Assert.assertNotNull(output1); + Assert.assertEquals(1, output1.size()); + + final TypedStruct record1 = output1.iterator().next(); + Assert.assertEquals("false", record1.getString("col1")); + Assert.assertEquals("4", record1.getString("col2")); + Assert.assertNull(record1.getString("col3")); + } + @Test public void should_auto_generate_schema_given_no_schema_field() { filter.configure(configs, alias -> null); diff --git 
a/connect-file-pulse-plugin/pom.xml b/connect-file-pulse-plugin/pom.xml index 0ba432eeb..28e5a3a93 100644 --- a/connect-file-pulse-plugin/pom.xml +++ b/connect-file-pulse-plugin/pom.xml @@ -13,7 +13,7 @@ io.streamthoughts kafka-connect-filepulse-reactor - 2.15.0-SNAPSHOT + 2.17.0-SNAPSHOT kafka-connect-filepulse-plugin diff --git a/connect-file-pulse-plugin/src/integration-test/java/io/streamthoughts/kafka/connect/filepulse/AbstractKafkaConnectTest.java b/connect-file-pulse-plugin/src/integration-test/java/io/streamthoughts/kafka/connect/filepulse/AbstractKafkaConnectTest.java index 5bcf74123..0d0ef0475 100644 --- a/connect-file-pulse-plugin/src/integration-test/java/io/streamthoughts/kafka/connect/filepulse/AbstractKafkaConnectTest.java +++ b/connect-file-pulse-plugin/src/integration-test/java/io/streamthoughts/kafka/connect/filepulse/AbstractKafkaConnectTest.java @@ -55,7 +55,7 @@ public String getConnectWorker() { } private GenericContainer> createConnectWorkerContainer() { - return new GenericContainer(DockerImageName.parse("confluentinc/cp-kafka-connect-base:6.2.1")) + return new GenericContainer(DockerImageName.parse("confluentinc/cp-kafka-connect-base:7.9.0")) .withLogConsumer(new Slf4jLogConsumer(LOG)) .withNetwork(kafka.getKafkaNetwork()) .withExposedPorts(CONNECT_PORT) diff --git a/connect-file-pulse-plugin/src/integration-test/java/io/streamthoughts/kafka/connect/filepulse/RedpandaKafkaContainer.java b/connect-file-pulse-plugin/src/integration-test/java/io/streamthoughts/kafka/connect/filepulse/RedpandaKafkaContainer.java index af231e85f..0c3ba36f6 100644 --- a/connect-file-pulse-plugin/src/integration-test/java/io/streamthoughts/kafka/connect/filepulse/RedpandaKafkaContainer.java +++ b/connect-file-pulse-plugin/src/integration-test/java/io/streamthoughts/kafka/connect/filepulse/RedpandaKafkaContainer.java @@ -28,7 +28,7 @@ */ public final class RedpandaKafkaContainer extends GenericContainer { - public static final String VECTORIZED_REDPANDA_LATEST = "vectorized/redpanda:latest"; + public static final String VECTORIZED_REDPANDA_LATEST = "redpandadata/redpanda:latest"; private static final Logger LOG = LoggerFactory.getLogger(RedpandaKafkaContainer.class); diff --git a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/RegexRouterCleanupPolicy.java b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/RegexRouterCleanupPolicy.java index 746f6db3d..779ef8b2c 100644 --- a/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/RegexRouterCleanupPolicy.java +++ b/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/RegexRouterCleanupPolicy.java @@ -28,13 +28,13 @@ public final class RegexRouterCleanupPolicy implements FileCleanupPolicy { public static final String SUCCESS_ROUTE_TOPIC_REGEX_CONFIG = CONFIG_PREFIX + "success.uri.regex"; private static final String SUCCESS_ROUTE_TOPIC_REGEX_DOC = - "Regular expression to use for matching objects in success."; + "Regular expression to use for matching objects in success."; public static final String SUCCESS_ROUTE_TOPIC_REPLACEMENT_CONFIG = CONFIG_PREFIX + "success.uri.replacement"; private static final String SUCCESS_ROUTE_TOPIC_REPLACEMENT_DOC = "Replacement string."; public static final String FAILURE_ROUTE_TOPIC_REGEX_CONFIG = CONFIG_PREFIX + "failure.uri.regex"; private static final String FAILURE_ROUTE_TOPIC_REGEX_DOC = - "Regular expression to use for matching objects in 
failure."; + "Regular expression to use for matching objects in failure."; public static final String FAILURE_ROUTE_TOPIC_REPLACEMENT_CONFIG = CONFIG_PREFIX + "failure.uri.replacement"; private static final String FAILURE_ROUTE_TOPIC_REPLACEMENT_DOC = "Replacement string."; @@ -57,8 +57,8 @@ public void configure(final Map configs) { successReplacement = simpleConfig.getString(SUCCESS_ROUTE_TOPIC_REPLACEMENT_CONFIG); successRegex = Pattern.compile(simpleConfig.getString(SUCCESS_ROUTE_TOPIC_REGEX_CONFIG)); - failureReplacement = simpleConfig.getString(SUCCESS_ROUTE_TOPIC_REPLACEMENT_CONFIG); - failureRegex = Pattern.compile(simpleConfig.getString(SUCCESS_ROUTE_TOPIC_REGEX_CONFIG)); + failureReplacement = simpleConfig.getString(FAILURE_ROUTE_TOPIC_REPLACEMENT_CONFIG); + failureRegex = Pattern.compile(simpleConfig.getString(FAILURE_ROUTE_TOPIC_REGEX_CONFIG)); } /** diff --git a/connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/RegexRouterCleanupPolicyTest.java b/connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/RegexRouterCleanupPolicyTest.java index f081636f6..4e93229b0 100644 --- a/connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/RegexRouterCleanupPolicyTest.java +++ b/connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/clean/RegexRouterCleanupPolicyTest.java @@ -76,8 +76,8 @@ void should_rename_failure_file_given_cleanup_with_custom_config() { try (RegexRouterCleanupPolicy policy = new RegexRouterCleanupPolicy()) { policy.setStorage(new LocalFileStorage()); policy.configure(Map.of( - RegexRouterCleanupPolicy.SUCCESS_ROUTE_TOPIC_REGEX_CONFIG,"(.*).txt", - RegexRouterCleanupPolicy.SUCCESS_ROUTE_TOPIC_REPLACEMENT_CONFIG, "1γγ«-DONE-FAILURE.txt" + RegexRouterCleanupPolicy.FAILURE_ROUTE_TOPIC_REGEX_CONFIG,"(.*).txt", + RegexRouterCleanupPolicy.FAILURE_ROUTE_TOPIC_REPLACEMENT_CONFIG, "1γγ«-DONE-FAILURE.txt" )); URI targetURI = policy.routeOnFailure(source.metadata().uri()); diff --git a/connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/offset/DefaultOffsetPolicyTest.java b/connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/offset/DefaultOffsetPolicyTest.java index 9462a3058..929811051 100644 --- a/connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/offset/DefaultOffsetPolicyTest.java +++ b/connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/offset/DefaultOffsetPolicyTest.java @@ -13,19 +13,28 @@ import java.util.Collections; import java.util.Map; import java.util.stream.Collectors; +import org.apache.commons.lang3.SystemUtils; import org.junit.Assert; import org.junit.Test; public class DefaultOffsetPolicyTest { + private String getValidPath() { + String validPath = SystemUtils.OS_NAME; + if (validPath.contains("Windows")) { + return "C:\\tmp\\path"; + } else { + return "/tmp/path"; + } + } + private static final GenericFileObjectMeta metadata = new GenericFileObjectMeta( URI.create("file:///tmp/path/test"), "test", 0L, 123L, new FileObjectMeta.ContentDigest("789", "dummy"), - Collections.singletonMap(LocalFileObjectMeta.SYSTEM_FILE_INODE_META_KEY, "456L") - ); + Collections.singletonMap(LocalFileObjectMeta.SYSTEM_FILE_INODE_META_KEY, "456L")); @Test(expected = IllegalArgumentException.class) public void should_throw_illegal_argument_given_empty_strategy() { @@ -46,7 +55,7 @@ public void 
should_throw_npe_given_unknown_strategy() { public void should_get_offset_based_on_path() { Map result = new DefaultSourceOffsetPolicy("PATH").toPartitionMap(metadata); Assert.assertEquals(1, result.size()); - Assert.assertEquals("/tmp/path", result.get("path")); + Assert.assertEquals(getValidPath(), result.get("path")); } @Test @@ -76,7 +85,7 @@ public void should_get_offset_based_on_name() { public void should_get_composed_offset_based_on_path_and_hash() { Map result = new DefaultSourceOffsetPolicy("PATH+HASH").toPartitionMap(metadata); Assert.assertEquals(2, result.size()); - Assert.assertEquals("/tmp/path", result.get("path")); + Assert.assertEquals(getValidPath(), result.get("path")); Assert.assertEquals("789", result.get("hash")); } diff --git a/docker-compose-debug.yml b/docker-compose-debug.yml index 7b318c876..9f3e6f242 100644 --- a/docker-compose-debug.yml +++ b/docker-compose-debug.yml @@ -1,7 +1,7 @@ version: '3' services: cp-zookeeper: - image: confluentinc/cp-zookeeper:7.5.0 + image: confluentinc/cp-zookeeper:7.9.0 hostname: zookeeper container_name: zookeeper ports: @@ -13,7 +13,7 @@ services: - kafka-connect cp-kafka: - image: confluentinc/cp-kafka:7.5.0 + image: confluentinc/cp-kafka:7.9.0 hostname: kafka container_name: kafka depends_on: @@ -36,7 +36,7 @@ services: - kafka-connect cp-schema-registry: - image: confluentinc/cp-schema-registry:7.5.0 + image: confluentinc/cp-schema-registry:7.9.0 hostname: schema-registry container_name: schema-registry depends_on: diff --git a/docker-compose.yml b/docker-compose.yml index 6a863e26e..ae9823071 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,11 +1,11 @@ version: '3' services: cp-zookeeper: - image: confluentinc/cp-zookeeper:7.5.0 + image: confluentinc/cp-zookeeper:7.9.0 hostname: zookeeper container_name: zookeeper ports: - - "2181:2181" + - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 @@ -13,14 +13,14 @@ services: - kafka-connect cp-kafka: - image: confluentinc/cp-kafka:7.5.0 + image: confluentinc/cp-kafka:7.9.0 hostname: kafka container_name: kafka depends_on: - - cp-zookeeper + - cp-zookeeper ports: - - "29092:29092" - - "9092:9092" + - "29092:29092" + - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' @@ -36,13 +36,13 @@ services: - kafka-connect cp-schema-registry: - image: confluentinc/cp-schema-registry:7.5.0 + image: confluentinc/cp-schema-registry:7.9.0 hostname: schema-registry container_name: schema-registry depends_on: - - cp-kafka + - cp-kafka ports: - - "8081:8081" + - "8081:8081" environment: SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:29092 SCHEMA_REGISTRY_HOST_NAME: schema-registry @@ -50,13 +50,13 @@ services: - kafka-connect connect-file-pulse: - image: streamthoughts/kafka-connect-file-pulse:2.13.0 + image: streamthoughts/kafka-connect-file-pulse:2.14.1 container_name: connect depends_on: - - cp-kafka + - cp-kafka ports: - - "8083:8083" - - "8000:8000" + - "8083:8083" + - "8000:8000" environment: CONNECT_BOOTSTRAP_SERVERS: 'kafka:29092' CONNECT_REST_ADVERTISED_HOST_NAME: connect diff --git a/docker/Dockerfile b/docker/Dockerfile index 78eea4b2e..bcdf5e681 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -5,7 +5,7 @@ # Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 # # Kafka Connect File Pulse -FROM confluentinc/cp-kafka-connect-base:7.6.0 +FROM confluentinc/cp-kafka-connect-base:7.9.0 USER root diff --git 
a/docs/content/en/docs/Developer Guide/file-system-listing/local-filesystem.md b/docs/content/en/docs/Developer Guide/file-system-listing/local-filesystem.md index 09054b0cb..a358a379d 100644 --- a/docs/content/en/docs/Developer Guide/file-system-listing/local-filesystem.md +++ b/docs/content/en/docs/Developer Guide/file-system-listing/local-filesystem.md @@ -19,10 +19,11 @@ Use the following property in your Connector's configuration: The following table describes the properties that can be used to configure the `LocalFSDirectoryListing`: -| Configuration | Description | Type | Default | Importance | -|--------------------------------|-----------------------------------------------------------------------|-----------|---------|------------| -| `fs.listing.directory.path` | The input directory to scan | `string` | - | HIGH | -| `fs.listing.recursive.enabled` | Flag indicating whether local directory should be recursively scanned | `boolean` | `true` | MEDIUM | +| Configuration | Description | Type | Default | Importance | +| ---------------------------------- | -------------------------------------------------------------------------- | --------- | ------- | ---------- | +| `fs.listing.directory.path` | The input directory to scan | `string` | - | HIGH | +| `fs.listing.recursive.enabled` | Flag indicating whether local directory should be recursively scanned | `boolean` | `true` | MEDIUM | +| `fs.delete.compress.files.enabled` | Flag indicating whether compressed file should be deleted after extraction | `boolean` | `false` | MEDIUM | ## Supported File types diff --git a/docs/content/en/docs/Getting started/_index.md b/docs/content/en/docs/Getting started/_index.md index 21373714b..0214965ee 100644 --- a/docs/content/en/docs/Getting started/_index.md +++ b/docs/content/en/docs/Getting started/_index.md @@ -1,5 +1,5 @@ --- -date: 2022εΉ΄06ζ06ζ₯ +date: 2024εΉ΄09ζ09ζ₯ title: "Getting Started" linkTitle: "Getting Started" weight: 10 @@ -69,7 +69,7 @@ curl -X GET http://localhost:8083/connectors/connect-file-pulse-quickstart-log4j $ docker exec -it -e KAFKA_OPTS="" connect kafka-avro-console-consumer \ --topic connect-file-pulse-quickstart-log4j \ --from-beginning \ ---bootstrap-server broker:29092 \ +--bootstrap-server kafka:29092 \ --property schema.registry.url=http://schema-registry:8081 ``` @@ -91,7 +91,7 @@ Connect File Pulse use an internal topic to track the current state of files bei $ docker exec -it -e KAFKA_OPTS="" connect kafka-console-consumer \ --topic connect-file-pulse-status \ --from-beginning \ ---bootstrap-server broker:29092 +--bootstrap-server kafka:29092 ``` (output) @@ -146,7 +146,7 @@ docker logs --tail="all" -f connect | grep "source task is transitioning to IDLE $ docker exec -it connect kafka-avro-console-consumer \ --topic connect-file-pulse-quickstart-csv \ --from-beginning \ ---bootstrap-server broker:29092 \ +--bootstrap-server kafka:29092 \ --property schema.registry.url=http://schema-registry:8081 ``` diff --git a/docs/package-lock.json b/docs/package-lock.json index 0be8d0165..41e65dcd3 100644 --- a/docs/package-lock.json +++ b/docs/package-lock.json @@ -82,12 +82,23 @@ "dev": true }, "braces": { - "version": "3.0.2", - "resolved": "https://registry.npmjs.org/braces/-/braces-3.0.2.tgz", - "integrity": "sha512-b8um+L1RzM3WDSzvhm6gIz1yfTbBt6YTlcEKAvsmqCZZFw46z626lVj9j1yEPW33H5H+lBQpZMP1k8l+78Ha0A==", + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/braces/-/braces-3.0.3.tgz", + "integrity": 
"sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA==", "dev": true, "requires": { - "fill-range": "^7.0.1" + "fill-range": "^7.1.1" + }, + "dependencies": { + "fill-range": { + "version": "7.1.1", + "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.1.1.tgz", + "integrity": "sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg==", + "dev": true, + "requires": { + "to-regex-range": "^5.0.1" + } + } } }, "browserslist": { @@ -206,15 +217,6 @@ "reusify": "^1.0.4" } }, - "fill-range": { - "version": "7.0.1", - "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.0.1.tgz", - "integrity": "sha512-qOo9F+dMUmC2Lcb4BbVvnKJxTPjCm+RRpe4gDuGrzkL7mEVl/djYSu2OdQ2Pa302N4oqkSg9ir6jaLWJ2USVpQ==", - "dev": true, - "requires": { - "to-regex-range": "^5.0.1" - } - }, "fraction.js": { "version": "4.2.0", "resolved": "https://registry.npmjs.org/fraction.js/-/fraction.js-4.2.0.tgz", diff --git a/examples/connect-file-pulse-example-override-topic-and-key.json b/examples/connect-file-pulse-example-override-topic-and-key.json index f6cd0eaa5..2be32793e 100644 --- a/examples/connect-file-pulse-example-override-topic-and-key.json +++ b/examples/connect-file-pulse-example-override-topic-and-key.json @@ -19,9 +19,9 @@ "topic": "connect-file-pulse-quickstart-csv", "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader", "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore", - "tasks.file.status.storage.bootstrap.servers": "broker:29092", + "tasks.file.status.storage.bootstrap.servers": "kafka:29092", "tasks.file.status.storage.topic": "connect-file-pulse-status", "tasks.file.status.storage.topic.partitions": 10, "tasks.file.status.storage.topic.replication.factor": 1, "tasks.max": 1 -} \ No newline at end of file +} diff --git a/examples/connect-file-pulse-quickstart-avro.json b/examples/connect-file-pulse-quickstart-avro.json index ce66d7de9..e73e38216 100644 --- a/examples/connect-file-pulse-quickstart-avro.json +++ b/examples/connect-file-pulse-quickstart-avro.json @@ -10,9 +10,9 @@ "topic": "connect-file-pulse-quickstart-avro", "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalAvroFileInputReader", "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore", - "tasks.file.status.storage.bootstrap.servers": "broker:29092", + "tasks.file.status.storage.bootstrap.servers": "kafka:29092", "tasks.file.status.storage.topic": "connect-file-pulse-status", "tasks.file.status.storage.topic.partitions": 10, "tasks.file.status.storage.topic.replication.factor": 1, "tasks.max": 1 -} \ No newline at end of file +} diff --git a/examples/connect-file-pulse-quickstart-csv.json b/examples/connect-file-pulse-quickstart-csv.json index 067b0d421..d1c125cac 100644 --- a/examples/connect-file-pulse-quickstart-csv.json +++ b/examples/connect-file-pulse-quickstart-csv.json @@ -21,7 +21,7 @@ "topic": "connect-file-pulse-quickstart-csv", "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader", "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore", - "tasks.file.status.storage.bootstrap.servers": "broker:29092", + "tasks.file.status.storage.bootstrap.servers": "kafka:29092", "tasks.file.status.storage.topic": 
"connect-file-pulse-status", "tasks.file.status.storage.topic.partitions": 10, "tasks.file.status.storage.topic.replication.factor": 1, diff --git a/examples/connect-file-pulse-quickstart-log4j.json b/examples/connect-file-pulse-quickstart-log4j.json index ebd7c69f3..1e00a034b 100644 --- a/examples/connect-file-pulse-quickstart-log4j.json +++ b/examples/connect-file-pulse-quickstart-log4j.json @@ -22,9 +22,9 @@ "topic": "connect-file-pulse-quickstart-log4j", "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader", "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore", - "tasks.file.status.storage.bootstrap.servers": "broker:29092", + "tasks.file.status.storage.bootstrap.servers": "kafka:29092", "tasks.file.status.storage.topic": "connect-file-pulse-status", "tasks.file.status.storage.topic.partitions": 10, "tasks.file.status.storage.topic.replication.factor": 1, "tasks.max": 1 -} \ No newline at end of file +} diff --git a/examples/connect-file-pulse-quickstart-raw.json b/examples/connect-file-pulse-quickstart-raw.json index 9e849b006..28c73602a 100644 --- a/examples/connect-file-pulse-quickstart-raw.json +++ b/examples/connect-file-pulse-quickstart-raw.json @@ -10,7 +10,7 @@ "topic": "connect-file-pulse-quickstart-csv", "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalBytesArrayInputReader", "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore", - "tasks.file.status.storage.bootstrap.servers": "broker:29092", + "tasks.file.status.storage.bootstrap.servers": "kafka:29092", "tasks.file.status.storage.topic": "connect-file-pulse-status", "tasks.file.status.storage.topic.partitions": 10, "tasks.file.status.storage.topic.replication.factor": 1, diff --git a/examples/connect-file-pulse-quickstart-xml.json b/examples/connect-file-pulse-quickstart-xml.json index 849ffa8fa..c934e016a 100644 --- a/examples/connect-file-pulse-quickstart-xml.json +++ b/examples/connect-file-pulse-quickstart-xml.json @@ -12,9 +12,9 @@ "reader.xml.parser.namespace.aware.enabled": true, "reader.xml.exclude.empty.elements": true, "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore", - "tasks.file.status.storage.bootstrap.servers": "broker:29092", + "tasks.file.status.storage.bootstrap.servers": "kafka:29092", "tasks.file.status.storage.topic": "connct-file-pulse-status", "tasks.file.status.storage.topic.partitions": 10, "tasks.file.status.storage.topic.replication.factor": 1, "tasks.max": 1 -} \ No newline at end of file +} diff --git a/pom.xml b/pom.xml index 4db4cb3a0..632de5380 100644 --- a/pom.xml +++ b/pom.xml @@ -13,7 +13,7 @@ io.streamthoughts kafka-connect-filepulse-reactor pom - 2.15.0-SNAPSHOT + 2.17.0-SNAPSHOT Kafka Connect Source File Pulse Reactor Connect File Pulse is a multipurpose source connector for streaming files from a local filesystem to @@ -58,13 +58,9 @@ https://github.com/streamthoughts/kafka-connect-file-pulse - - ossrh - https://oss.sonatype.org/content/repositories/snapshots - - ossrh - https://oss.sonatype.org/service/local/staging/deploy/maven2/ + sonatype-central + https://central.sonatype.com/api/v1/publisher/maven @@ -79,31 +75,31 @@ ${jdk.version} ${jdk.version} - 3.2.5 + 3.5.2 3.0.0-M7 - 3.3.0 + 3.3.1 3.6.0 - 3.1.0 + 3.2.7 3.6.0 - 3.11.0 + 3.13.0 3.3.0 1.8.2 - 1.11.0 + 1.16.0 https://packages.confluent.io/maven/ - 7.6.0 - 
3.6.1 - 5.10.2 - 1.19.1 + 7.9.0 + 3.9.0 + 5.11.0 + 1.19.7 0.12.0 ${basedir} 2.17.0 - 2.17.0 - 2.15.2 + 2.17.2 + 2.18.1 2.0.9 4.8.3.1 1.13.1 - 3.3.6 + 3.4.1 0.8.9 streamthoughts @@ -117,11 +113,11 @@ ${project.basedir}/header 2.39.0 3.7.1 - 12.3 + 12.5 0.9.23 - 1.11.3 + 1.12.0 1.26.1 - 2.15.1 + 2.16.1 @@ -243,7 +239,7 @@ com.github.spotbugs spotbugs - 4.8.3 + 4.9.3 @@ -258,7 +254,7 @@ com.h3xstream.findsecbugs findsecbugs-plugin - 1.12.0 + 1.13.0 @@ -412,14 +408,14 @@ - org.sonatype.plugins - nexus-staging-maven-plugin - 1.6.13 + org.sonatype.central + central-publishing-maven-plugin + 0.7.0 true - ossrh - https://oss.sonatype.org/ - false + sonatype-central + true + published
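
Editor's note on the `RowFileInputIteratorFactory` change above: the new compression support repeats the reader construction once per branch. The sketch below shows the same technique — wrapping the storage `InputStream` with a decompressor chosen from the configured compression method — factored into a single helper. Only the stream classes and the `"gzip"`/`"bzip"`/`"zip"` values come from the change itself; the class and method names here are illustrative, not part of the PR.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipInputStream;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;

final class CompressionStreams {

    /**
     * Wraps the raw object stream with a decompressor selected by the configured
     * compression method ("gzip", "bzip", "zip"); any other value (or null) falls
     * through to the raw stream, matching the factory's default branch.
     */
    static InputStream wrap(final InputStream raw, final String compressionMethod) throws IOException {
        if (compressionMethod == null) {
            return raw;
        }
        switch (compressionMethod) {
            case "gzip":
                return new GZIPInputStream(raw);
            case "bzip":
                return new BZip2CompressorInputStream(raw);
            case "zip":
                // Position the stream on the first archive entry so callers can read it as plain text.
                ZipInputStream zis = new ZipInputStream(raw);
                zis.getNextEntry();
                return zis;
            default:
                return raw;
        }
    }
}
```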
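Editor's note on the `LocalFSDirectoryListing`/`LocalFSDirectoryListingConfig` change: it adds `fs.delete.compress.files.enabled` to remove compressed archives after extraction. A minimal usage sketch, mirroring the configuration pattern used in `LocalFSDirectoryListingTest`; the input directory path is a placeholder and the surrounding class is an assumption for illustration only.

```java
import io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing;
import io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListingConfig;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class DeleteAfterExtractionExample {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        // Placeholder input directory.
        props.put(LocalFSDirectoryListingConfig.FS_LISTING_DIRECTORY_PATH, "/tmp/connect-file-pulse/input");
        // New flag: delete the original archive (e.g. a .gz file) once it has been extracted.
        props.put(LocalFSDirectoryListingConfig.FS_DELETE_COMPRESS_FILES_ENABLED_CONFIG, true);

        LocalFSDirectoryListing listing = new LocalFSDirectoryListing();
        listing.configure(props);

        // Archives found during listing are decompressed; with the flag enabled the
        // compressed source file is removed after a successful extraction.
        Collection<FileObjectMeta> scanned = listing.listObjects();
        scanned.forEach(meta -> System.out.println(meta.uri()));
    }
}
```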
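Editor's note on the `AbstractDelimitedRowFilter` change: it caches the last header line and re-infers the schema when a record carries the same columns in a different order. A small worked illustration of the new `shouldInferSchema` condition follows, using header strings taken from `CSVFilterTest` (the initial `col1;col2;col3` order is inferred from the test's expected values). As the third test shows, a header line of a different length (e.g. `col3;col2`) does not trigger re-inference.

```java
public class HeaderReinferenceIllustration {
    public static void main(String[] args) {
        // Condition added in AbstractDelimitedRowFilter#shouldInferSchema: rebuild the
        // schema only when the record's header field has the same length as the cached
        // header line but different content (same columns, different order).
        String cachedHeaders = "col1;col2;col3"; // header line cached from the first record (assumed from CSVFilterTest)
        String reordered = "col3;col2;col1";     // same columns, different order
        String truncated = "col3;col2";          // shorter header line

        System.out.println(shouldReinfer(cachedHeaders, reordered)); // true  -> columns are re-mapped for this record
        System.out.println(shouldReinfer(cachedHeaders, truncated)); // false -> the previous mapping is kept
    }

    static boolean shouldReinfer(String cachedHeaders, String field) {
        return cachedHeaders.length() == field.length() && !cachedHeaders.equals(field);
    }
}
```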
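Editor's note on the `RegexRouterCleanupPolicy` fix: the failure route now reads its own `failure.uri.*` properties instead of silently reusing the success ones. A hedged sketch of configuring both routes, following `RegexRouterCleanupPolicyTest`; the `LocalFileStorage` import path, the sample URI, the `$1` back-reference syntax and the replacement strings are assumptions for illustration.

```java
import io.streamthoughts.kafka.connect.filepulse.fs.LocalFileStorage; // assumed package
import io.streamthoughts.kafka.connect.filepulse.fs.clean.RegexRouterCleanupPolicy;
import java.net.URI;
import java.util.Map;

public class RegexRouterCleanupExample {
    public static void main(String[] args) throws Exception {
        try (RegexRouterCleanupPolicy policy = new RegexRouterCleanupPolicy()) {
            policy.setStorage(new LocalFileStorage());
            policy.configure(Map.of(
                RegexRouterCleanupPolicy.SUCCESS_ROUTE_TOPIC_REGEX_CONFIG, "(.*)\\.txt",
                RegexRouterCleanupPolicy.SUCCESS_ROUTE_TOPIC_REPLACEMENT_CONFIG, "1ドル-DONE.txt",
                // Before the fix these two keys were ignored and the success values were applied instead.
                RegexRouterCleanupPolicy.FAILURE_ROUTE_TOPIC_REGEX_CONFIG, "(.*)\\.txt",
                RegexRouterCleanupPolicy.FAILURE_ROUTE_TOPIC_REPLACEMENT_CONFIG, "1ドル-FAILED.txt"
            ));
            // Illustrative object URI: the failure route renames data.txt to data-FAILED.txt.
            URI failed = policy.routeOnFailure(URI.create("file:///tmp/connect-file-pulse/input/data.txt"));
            System.out.println(failed);
        }
    }
}
```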