diff --git a/.github/workflows/gradle-wrapper-validation.yml b/.github/workflows/gradle-wrapper-validation.yml
index 5e850d7975..3e905f1c92 100644
--- a/.github/workflows/gradle-wrapper-validation.yml
+++ b/.github/workflows/gradle-wrapper-validation.yml
@@ -9,5 +9,5 @@ jobs:
name: "Validation"
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
+ - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
- uses: gradle/wrapper-validation-action@f9c9c575b8b21b6485636a91ffecd10e558c62f6 # v3.5.0
diff --git a/.github/workflows/gradle_branch.yml b/.github/workflows/gradle_branch.yml
index e34ef65e00..6038849ccc 100644
--- a/.github/workflows/gradle_branch.yml
+++ b/.github/workflows/gradle_branch.yml
@@ -15,14 +15,14 @@ jobs:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
- - name: Set up JDK 8
- uses: actions/setup-java@6a0805fcefea3d4657a47ac4c165951e33482018 # v4.2.2
+ - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
+ - name: Set up JDK 11
+ uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0
with:
distribution: 'zulu'
- java-version: '8'
+ java-version: '11'
- name: Cache Gradle packages
- uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
+ uses: actions/cache@cdf6c1fa76f9f475f3d7449005a359c84ca0f306 # v5.0.3
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-${{ secrets.CACHE_VERSION }}-${{ hashFiles('**/*.gradle') }}
@@ -32,6 +32,6 @@ jobs:
- name: Build RxJava
run: ./gradlew build --stacktrace
- name: Upload to Codecov
- uses: codecov/codecov-action@e28ff129e5465c2c0dcc6f003fc735cb6ae0c673 # v4.5.0
+ uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
- name: Generate Javadoc
run: ./gradlew javadoc --stacktrace
diff --git a/.github/workflows/gradle_jdk11.yml b/.github/workflows/gradle_jdk11.yml
index 9549026590..cb9a43e76e 100644
--- a/.github/workflows/gradle_jdk11.yml
+++ b/.github/workflows/gradle_jdk11.yml
@@ -12,19 +12,22 @@ on:
permissions:
contents: read
+env:
+ BUILD_WITH_11: true
+
jobs:
build:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
+ - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
- name: Set up JDK 11
- uses: actions/setup-java@6a0805fcefea3d4657a47ac4c165951e33482018 # v4.2.2
+ uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0
with:
distribution: 'zulu'
java-version: '11'
- name: Cache Gradle packages
- uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
+ uses: actions/cache@cdf6c1fa76f9f475f3d7449005a359c84ca0f306 # v5.0.3
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-1-${{ hashFiles('**/*.gradle') }}
@@ -35,5 +38,5 @@ jobs:
run: ./gradlew -PjavaCompatibility=9 jar
- name: Build RxJava
run: ./gradlew build --stacktrace
- - name: Generate Javadoc
- run: ./gradlew javadoc --stacktrace
+# - name: Generate Javadoc
+# run: ./gradlew javadoc --stacktrace
diff --git a/.github/workflows/gradle_pr.yml b/.github/workflows/gradle_pr.yml
index d1c17aa766..fbc4f47a2b 100644
--- a/.github/workflows/gradle_pr.yml
+++ b/.github/workflows/gradle_pr.yml
@@ -15,14 +15,14 @@ jobs:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
- - name: Set up JDK 8
- uses: actions/setup-java@6a0805fcefea3d4657a47ac4c165951e33482018 # v4.2.2
+ - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
+ - name: Set up JDK 11
+ uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0
with:
distribution: 'zulu'
- java-version: '8'
+ java-version: '11'
- name: Cache Gradle packages
- uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
+ uses: actions/cache@cdf6c1fa76f9f475f3d7449005a359c84ca0f306 # v5.0.3
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-1-${{ hashFiles('**/*.gradle') }}
@@ -32,6 +32,6 @@ jobs:
- name: Build RxJava
run: ./gradlew build --stacktrace
- name: Upload to Codecov
- uses: codecov/codecov-action@e28ff129e5465c2c0dcc6f003fc735cb6ae0c673 # v4.5.0
+ uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
- name: Generate Javadoc
run: ./gradlew javadoc --stacktrace
diff --git a/.github/workflows/gradle_release.yml b/.github/workflows/gradle_release.yml
index 62a36633e1..e89b7add66 100644
--- a/.github/workflows/gradle_release.yml
+++ b/.github/workflows/gradle_release.yml
@@ -22,14 +22,14 @@ jobs:
env:
CI_BUILD_NUMBER: ${{ github.run_number }}
steps:
- - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
- - name: Set up JDK 8
- uses: actions/setup-java@6a0805fcefea3d4657a47ac4c165951e33482018 # v4.2.2
+ - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
+ - name: Set up JDK 11
+ uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0
with:
distribution: 'zulu'
- java-version: '8'
+ java-version: '11'
- name: Cache Gradle packages
- uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
+ uses: actions/cache@cdf6c1fa76f9f475f3d7449005a359c84ca0f306 # v5.0.3
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-${{ secrets.CACHE_VERSION }}-${{ hashFiles('**/*.gradle') }}
@@ -43,9 +43,18 @@ jobs:
- name: Build RxJava
run: ./gradlew build --stacktrace --no-daemon
- name: Upload to Codecov
- uses: codecov/codecov-action@e28ff129e5465c2c0dcc6f003fc735cb6ae0c673 # v4.5.0
- - name: Upload release
- run: ./gradlew -PreleaseMode=full publish --no-daemon --no-parallel --stacktrace
+ uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
+# - name: Upload release
+# run: ./gradlew -PreleaseMode=full publish --no-daemon --no-parallel --stacktrace
+# env:
+# # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions
+# # ------------------------------------------------------------------------------
+# ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }}
+# ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }}
+# ORG_GRADLE_PROJECT_SIGNING_PRIVATE_KEY: ${{ secrets.SIGNING_PRIVATE_KEY }}
+# ORG_GRADLE_PROJECT_SIGNING_PASSWORD: ${{ secrets.SIGNING_PASSWORD }}
+ - name: Publish release
+ run: ./gradlew -PreleaseMode=full publishAndReleaseToMavenCentral --no-configuration-cache --no-daemon --no-parallel --stacktrace
env:
# Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions
# ------------------------------------------------------------------------------
@@ -53,13 +62,6 @@ jobs:
ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }}
ORG_GRADLE_PROJECT_SIGNING_PRIVATE_KEY: ${{ secrets.SIGNING_PRIVATE_KEY }}
ORG_GRADLE_PROJECT_SIGNING_PASSWORD: ${{ secrets.SIGNING_PASSWORD }}
- - name: Publish release
- run: ./gradlew -PreleaseMode=full closeAndReleaseRepository --no-daemon --no-parallel --stacktrace
- env:
- # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions
- # ------------------------------------------------------------------------------
- ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }}
- ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }}
- name: Push Javadoc
run: ./push_javadoc.sh
env:
diff --git a/.github/workflows/gradle_snapshot.yml b/.github/workflows/gradle_snapshot.yml
index 57a7bea53e..06320aa11c 100644
--- a/.github/workflows/gradle_snapshot.yml
+++ b/.github/workflows/gradle_snapshot.yml
@@ -21,14 +21,14 @@ jobs:
# ------------------------------------------------------------------------------
CI_BUILD_NUMBER: ${{ github.run_number }}
steps:
- - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
- - name: Set up JDK 8
- uses: actions/setup-java@6a0805fcefea3d4657a47ac4c165951e33482018 # v4.2.2
+ - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
+ - name: Set up JDK 11
+ uses: actions/setup-java@f2beeb24e141e01a676f977032f5a29d81c9e27e # v5.1.0
with:
distribution: 'zulu'
- java-version: '8'
+ java-version: '11'
- name: Cache Gradle packages
- uses: actions/cache@0c45773b623bea8c8e75f6c82b208c3cf94ea4f9 # v4.0.2
+ uses: actions/cache@cdf6c1fa76f9f475f3d7449005a359c84ca0f306 # v5.0.3
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-${{ secrets.CACHE_VERSION }}-${{ hashFiles('**/*.gradle') }}
@@ -40,14 +40,14 @@ jobs:
- name: Build RxJava
run: ./gradlew build --stacktrace --no-daemon
- name: Upload Snapshot
- run: ./gradlew -PreleaseMode=branch publish --no-daemon --no-parallel --stacktrace
+ run: ./gradlew -PreleaseMode=branch publishAllPublicationsToMavenCentralRepository --no-daemon --no-parallel --stacktrace
env:
# Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions
# ------------------------------------------------------------------------------
ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }}
ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }}
- name: Upload to Codecov
- uses: codecov/codecov-action@e28ff129e5465c2c0dcc6f003fc735cb6ae0c673 # v4.5.0
+ uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
- name: Push Javadoc
run: ./push_javadoc.sh
# Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions
diff --git a/.github/workflows/scorecard.yml b/.github/workflows/scorecard.yml
index aff218a9c3..88f38756ac 100644
--- a/.github/workflows/scorecard.yml
+++ b/.github/workflows/scorecard.yml
@@ -24,12 +24,12 @@ jobs:
steps:
- name: "Checkout code"
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
+ uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- name: "Run analysis"
- uses: ossf/scorecard-action@62b2cac7ed8198b15735ed49ab1e5cf35480ba46 # v2.4.0
+ uses: ossf/scorecard-action@4eaacf0543bb3f2c246792bd56e8cdeffafb205a # v2.4.3
with:
results_file: results.sarif
results_format: sarif
@@ -46,7 +46,7 @@ jobs:
# Upload the results as artifacts (optional). Commenting out will disable uploads of run results in SARIF
# format to the repository Actions tab.
- name: "Upload artifact"
- uses: actions/upload-artifact@50769540e7f4bd5e21e526ee35c689e35e0d6874 # v4.4.0
+ uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v6.0.0
with:
name: SARIF file
path: results.sarif
@@ -54,6 +54,6 @@ jobs:
# Upload the results to GitHub's code scanning dashboard.
- name: "Upload to code-scanning"
- uses: github/codeql-action/upload-sarif@4dd16135b69a43b6c8efb853346f8437d92d3c93 # v3.26.6
+ uses: github/codeql-action/upload-sarif@b20883b0cd1f46c72ae0ba6d1090936928f9fa30 # v3.29.5
with:
sarif_file: results.sarif
diff --git a/README.md b/README.md
index 1456d2b68c..5276c0bd37 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
[](https://codecov.io/gh/ReactiveX/RxJava/branch/3.x)
-[](https://maven-badges.herokuapp.com/maven-central/io.reactivex.rxjava3/rxjava)
+[](https://maven-badges.sml.io/sonatype-central/io.reactivex.rxjava3/rxjava)
[](https://gitpod.io/#https://github.com/ReactiveX/RxJava)
[](https://securityscorecards.dev/viewer/?uri=github.com/ReactiveX/RxJava)
@@ -48,7 +48,7 @@ The first step is to include RxJava 3 into your project, for example, as a Gradl
implementation "io.reactivex.rxjava3:rxjava:3.x.y"
```
-(Please replace `x` and `y` with the latest version numbers: [](https://maven-badges.herokuapp.com/maven-central/io.reactivex.rxjava3/rxjava)
+(Please replace `x` and `y` with the latest version numbers: [](https://maven-badges.sml.io/sonatype-central/io.reactivex.rxjava3/rxjava)
)
### Hello World
@@ -510,7 +510,7 @@ For further details, consult the [wiki](https://github.com/ReactiveX/RxJava/wiki
- Google Group: [RxJava](http://groups.google.com/d/forum/rxjava)
- Twitter: [@RxJava](http://twitter.com/RxJava)
- [GitHub Issues](https://github.com/ReactiveX/RxJava/issues)
-- StackOverflow: [rx-java](http://stackoverflow.com/questions/tagged/rx-java) and [rx-java2](http://stackoverflow.com/questions/tagged/rx-java2)
+- StackOverflow: [rx-java](http://stackoverflow.com/questions/tagged/rx-java), [rx-java2](http://stackoverflow.com/questions/tagged/rx-java2) and [rx-java3](http://stackoverflow.com/questions/tagged/rx-java3)
- [Gitter.im](https://gitter.im/ReactiveX/RxJava)
## Versioning
@@ -571,11 +571,11 @@ and for Ivy:
### Snapshots
-Snapshots after May 1st, 2021 are available via https://oss.sonatype.org/content/repositories/snapshots/io/reactivex/rxjava3/rxjava/
+Snapshots after May 19st, 2025 are available via https://central.sonatype.com/repository/maven-snapshots/io/reactivex/rxjava3/rxjava/
```groovy
repositories {
- maven { url 'https://oss.sonatype.org/content/repositories/snapshots' }
+ maven { url 'https://central.sonatype.com/repository/maven-snapshots' }
}
dependencies {
@@ -583,7 +583,7 @@ dependencies {
}
```
-JavaDoc snapshots are available at http://reactivex.io/RxJava/3.x/javadoc/snapshot
+JavaDoc snapshots are available at https://reactivex.io/RxJava/3.x/javadoc/snapshot
## Build
diff --git a/build.gradle b/build.gradle
index cbad8caaa4..8bcfddb717 100644
--- a/build.gradle
+++ b/build.gradle
@@ -4,12 +4,13 @@ plugins {
id("eclipse")
id("jacoco")
id("maven-publish")
- id("ru.vyarus.animalsniffer") version "1.7.1"
- id("me.champeau.gradle.jmh") version "0.5.3"
+ id("ru.vyarus.animalsniffer") version "2.0.1"
+ id("me.champeau.jmh") version "0.7.3"
id("com.github.hierynomus.license") version "0.16.1"
id("biz.aQute.bnd.builder") version "6.4.0"
- id("com.vanniktech.maven.publish") version "0.19.0"
- id("org.beryx.jar") version "1.2.0"
+ id("com.vanniktech.maven.publish") version "0.33.0"
+ id("org.beryx.jar") version "2.0.0"
+ id("signing")
}
ext {
@@ -18,7 +19,7 @@ ext {
testNgVersion = "7.5"
mockitoVersion = "4.11.0"
jmhLibVersion = "1.21"
- guavaVersion = "33.3.0-jre"
+ guavaVersion = "33.5.0-jre"
}
def releaseTag = System.getenv("BUILD_TAG")
@@ -49,7 +50,16 @@ dependencies {
testImplementation "com.google.guava:guava:$guavaVersion"
}
+def buildWith11 = System.getenv("BUILD_WITH_11")
java {
+ toolchain {
+ vendor = JvmVendorSpec.ADOPTIUM
+ if ("true".equals(buildWith11)) {
+ languageVersion = JavaLanguageVersion.of(11)
+ } else {
+ languageVersion = JavaLanguageVersion.of(8)
+ }
+ }
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}
@@ -86,12 +96,19 @@ animalsniffer {
annotation = "io.reactivex.rxjava3.internal.util.SuppressAnimalSniffer"
}
+moduleConfig {
+ moduleInfoPath = 'src/main/module/module-info.java'
+ multiReleaseVersion = 9
+ version = project.version
+}
+
jar {
from('.') {
include 'LICENSE'
include 'COPYRIGHT'
into('META-INF/')
}
+ exclude("module-info.class")
// Cover for bnd still not supporting MR Jars: https://github.com/bndtools/bnd/issues/2227
bnd('-fixupmessages': '^Classes found in the wrong directory: \\\\{META-INF/versions/9/module-info\\\\.class=module-info}$')
@@ -106,8 +123,6 @@ jar {
"Bundle-SymbolicName": "io.reactivex.rxjava3.rxjava",
"Multi-Release": "true"
)
-
- moduleInfoPath = 'src/main/module/module-info.java'
}
license {
@@ -126,8 +141,8 @@ jmh {
jvmArgsAppend = ["-Djmh.separateClasspathJAR=true"]
if (project.hasProperty("jmh")) {
- include = [".*" + project.jmh + ".*"]
- logger.info("JMH: {}", include)
+ includes = [".*" + project.jmh + ".*"]
+ logger.info("JMH: {}", includes)
}
}
@@ -166,8 +181,9 @@ jacocoTestReport {
dependsOn testNG
reports {
- xml.enabled = true
- html.enabled = true
+ xml.required.set(true)
+ csv.required.set(false)
+ html.required.set(true)
}
}
@@ -179,44 +195,25 @@ checkstyle {
"checkstyle.suppressions.file": project.file("config/checkstyle/suppressions.xml"),
"checkstyle.header.file" : project.file("config/license/HEADER_JAVA")
]
+ checkstyleMain.exclude '**/module-info.java'
}
if (project.hasProperty("releaseMode")) {
logger.lifecycle("ReleaseMode: {}", project.releaseMode)
- /*
- if ("branch" == project.releaseMode) {
-
- if (version.endsWith("-SNAPSHOT")) {
- publishing {
- repositories {
- maven {
- url = "https://s01.oss.sonatype.org/content/repositories/snapshots/"
- }
- }
- }
-
- mavenPublish {
- nexus {
- stagingProfile = "io.reactivex"
- }
- }
- }
- }
- */
if ("full" == project.releaseMode) {
signing {
if (project.hasProperty("SIGNING_PRIVATE_KEY") && project.hasProperty("SIGNING_PASSWORD")) {
useInMemoryPgpKeys(project.getProperty("SIGNING_PRIVATE_KEY"), project.getProperty("SIGNING_PASSWORD"))
+ sign(publishing.publications)
}
}
- /*
- mavenPublish {
- nexus {
- stagingProfile = "io.reactivex"
- }
- }
- */
+ }
+ mavenPublishing {
+ // or when publishing to https://central.sonatype.com/
+ publishToMavenCentral(com.vanniktech.maven.publish.SonatypeHost.CENTRAL_PORTAL)
+
+ // signAllPublications()
}
}
diff --git a/docs/Additional-Reading.md b/docs/Additional-Reading.md
index 85e7d47077..4badd81308 100644
--- a/docs/Additional-Reading.md
+++ b/docs/Additional-Reading.md
@@ -3,7 +3,7 @@ A more complete and up-to-date list of resources can be found at the [reactivex.
# Introducing Reactive Programming
* [Introduction to Rx](http://www.introtorx.com/): a free, on-line book by Lee Campbell **(1.x)**
* [The introduction to Reactive Programming you've been missing](https://gist.github.com/staltz/868e7e9bc2a7b8c1f754) by Andre Staltz
-* [Mastering Observables](http://docs.couchbase.com/developer/java-2.0/observables.html) from the Couchbase documentation **(1.x)**
+* [Mastering Observables](https://docs.huihoo.com/couchbase/developer-guide/java-2.0/observables.html) from the Couchbase documentation **(1.x)**
* [Reactive Programming in Java 8 With RxJava](http://pluralsight.com/training/Courses/TableOfContents/reactive-programming-java-8-rxjava), a course designed by Russell Elledge **(1.x)**
* [33rd Degree Reactive Java](http://www.slideshare.net/tkowalcz/33rd-degree-reactive-java) by Tomasz Kowalczewski **(1.x)**
* [What Every Hipster Should Know About Functional Reactive Programming](http://www.infoq.com/presentations/game-functional-reactive-programming) - Bodil Stokke demos the creation of interactive game mechanics in RxJS
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index c7d437bbb4..3c44eb1b6f 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.4-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.14-bin.zip
networkTimeout=10000
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
diff --git a/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java b/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java
index c4a0a385a5..6e6ae7e3c6 100644
--- a/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java
@@ -139,9 +139,9 @@ public Observable extends Integer> apply(Integer v) {
}
});
- singleFlatMapHideObservable = Single.just(1).flatMapObservable(new Function>() {
+ singleFlatMapHideObservable = Single.just(1).flatMapObservable(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return arrayObservableHide;
}
});
@@ -153,16 +153,16 @@ public Iterable extends Integer> apply(Integer v) {
}
});
- maybeFlatMapObservable = Maybe.just(1).flatMapObservable(new Function>() {
+ maybeFlatMapObservable = Maybe.just(1).flatMapObservable(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return arrayObservable;
}
});
- maybeFlatMapHideObservable = Maybe.just(1).flatMapObservable(new Function>() {
+ maybeFlatMapHideObservable = Maybe.just(1).flatMapObservable(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return arrayObservableHide;
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableConcatMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableConcatMapMaybePerf.java
index 87ee5a07e4..4310ea2e95 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableConcatMapMaybePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableConcatMapMaybePerf.java
@@ -60,9 +60,9 @@ public Publisher extends Integer> apply(Integer v) {
}
});
- flowableDedicated = source.concatMapMaybe(new Function>() {
+ flowableDedicated = source.concatMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybeEmptyPerf.java
index 8ab19a00c6..699f76c074 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybeEmptyPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybeEmptyPerf.java
@@ -60,9 +60,9 @@ public Publisher extends Integer> apply(Integer v) {
}
});
- flowableDedicated = source.flatMapMaybe(new Function>() {
+ flowableDedicated = source.flatMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.empty();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybePerf.java
index d0f3730b42..f81ed10ec3 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybePerf.java
@@ -60,9 +60,9 @@ public Publisher extends Integer> apply(Integer v) {
}
});
- flowableDedicated = source.flatMapMaybe(new Function>() {
+ flowableDedicated = source.flatMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapSinglePerf.java
index 4f50938647..5a92bf20ff 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapSinglePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapSinglePerf.java
@@ -60,9 +60,9 @@ public Publisher extends Integer> apply(Integer v) {
}
});
- flowableDedicated = source.flatMapSingle(new Function>() {
+ flowableDedicated = source.flatMapSingle(new Function>() {
@Override
- public Single extends Integer> apply(Integer v) {
+ public Single apply(Integer v) {
return Single.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybeEmptyPerf.java
index 83ad00e0f9..46ce694f6d 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybeEmptyPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybeEmptyPerf.java
@@ -60,9 +60,9 @@ public Publisher extends Integer> apply(Integer v) {
}
});
- flowableDedicated = source.switchMapMaybe(new Function>() {
+ flowableDedicated = source.switchMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.empty();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybePerf.java
index e36b49c4d3..e96bbc3919 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybePerf.java
@@ -60,9 +60,9 @@ public Publisher extends Integer> apply(Integer v) {
}
});
- flowableDedicated = source.switchMapMaybe(new Function>() {
+ flowableDedicated = source.switchMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapSinglePerf.java
index 0da6941895..ef06ebfa66 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapSinglePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapSinglePerf.java
@@ -60,9 +60,9 @@ public Publisher extends Integer> apply(Integer v) {
}
});
- flowableDedicated = source.switchMapSingle(new Function>() {
+ flowableDedicated = source.switchMapSingle(new Function>() {
@Override
- public Single extends Integer> apply(Integer v) {
+ public Single apply(Integer v) {
return Single.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapCompletablePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapCompletablePerf.java
index 48b20dc005..2229eed77a 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapCompletablePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapCompletablePerf.java
@@ -45,16 +45,16 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.concatMap(new Function>() {
+ observablePlain = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.empty();
}
});
- observableConvert = source.concatMap(new Function>() {
+ observableConvert = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Completable.complete().toObservable();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybeEmptyPerf.java
index 4528c90b50..cfde5183e5 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybeEmptyPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybeEmptyPerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.concatMap(new Function>() {
+ observablePlain = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.empty();
}
});
- concatMapToObservableEmpty = source.concatMap(new Function>() {
+ concatMapToObservableEmpty = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Maybe.empty().toObservable();
}
});
- observableDedicated = source.concatMapMaybe(new Function>() {
+ observableDedicated = source.concatMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.empty();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybePerf.java
index 204020abfe..75e7506724 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybePerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.concatMap(new Function>() {
+ observablePlain = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.just(v);
}
});
- observableConvert = source.concatMap(new Function>() {
+ observableConvert = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Maybe.just(v).toObservable();
}
});
- observableDedicated = source.concatMapMaybe(new Function>() {
+ observableDedicated = source.concatMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapSinglePerf.java
index e2e34b24f5..4227791222 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapSinglePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapSinglePerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.concatMap(new Function>() {
+ observablePlain = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.just(v);
}
});
- observableConvert = source.concatMap(new Function>() {
+ observableConvert = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Single.just(v).toObservable();
}
});
- observableDedicated = source.concatMapSingle(new Function>() {
+ observableDedicated = source.concatMapSingle(new Function>() {
@Override
- public Single extends Integer> apply(Integer v) {
+ public Single apply(Integer v) {
return Single.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapCompletablePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapCompletablePerf.java
index b6daa57eb6..6a916a68f1 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapCompletablePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapCompletablePerf.java
@@ -45,16 +45,16 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.flatMap(new Function>() {
+ observablePlain = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.empty();
}
});
- observableConvert = source.flatMap(new Function>() {
+ observableConvert = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Completable.complete().toObservable();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybeEmptyPerf.java
index 5d0327fa46..377a8bba93 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybeEmptyPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybeEmptyPerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.flatMap(new Function>() {
+ observablePlain = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.empty();
}
});
- observableConvert = source.flatMap(new Function>() {
+ observableConvert = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Maybe.empty().toObservable();
}
});
- observableDedicated = source.flatMapMaybe(new Function>() {
+ observableDedicated = source.flatMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.empty();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybePerf.java
index e2a7c43bea..248ca98112 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybePerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.flatMap(new Function>() {
+ observablePlain = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.just(v);
}
});
- observableConvert = source.flatMap(new Function>() {
+ observableConvert = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Maybe.just(v).toObservable();
}
});
- observableDedicated = source.flatMapMaybe(new Function>() {
+ observableDedicated = source.flatMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapSinglePerf.java
index add0cd310c..880da95f5a 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapSinglePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapSinglePerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.flatMap(new Function>() {
+ observablePlain = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.just(v);
}
});
- observableConvert = source.flatMap(new Function>() {
+ observableConvert = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Single.just(v).toObservable();
}
});
- observableDedicated = source.flatMapSingle(new Function>() {
+ observableDedicated = source.flatMapSingle(new Function>() {
@Override
- public Single extends Integer> apply(Integer v) {
+ public Single apply(Integer v) {
return Single.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapCompletablePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapCompletablePerf.java
index 69b8e71f18..41964c3dbd 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapCompletablePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapCompletablePerf.java
@@ -45,16 +45,16 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.switchMap(new Function>() {
+ observablePlain = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.empty();
}
});
- observableConvert = source.switchMap(new Function>() {
+ observableConvert = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Completable.complete().toObservable();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybeEmptyPerf.java
index 3930534eb8..6a4ea5c73b 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybeEmptyPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybeEmptyPerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.switchMap(new Function>() {
+ observablePlain = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.empty();
}
});
- observableConvert = source.switchMap(new Function>() {
+ observableConvert = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Maybe.empty().toObservable();
}
});
- observableDedicated = source.switchMapMaybe(new Function>() {
+ observableDedicated = source.switchMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.empty();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybePerf.java
index 30158d012d..f0c3285890 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybePerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.switchMap(new Function>() {
+ observablePlain = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.just(v);
}
});
- observableConvert = source.switchMap(new Function>() {
+ observableConvert = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Maybe.just(v).toObservable();
}
});
- observableDedicated = source.switchMapMaybe(new Function>() {
+ observableDedicated = source.switchMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapSinglePerf.java
index 75aeb504f9..087f32c8e3 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapSinglePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapSinglePerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.switchMap(new Function>() {
+ observablePlain = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.just(v);
}
});
- observableConvert = source.switchMap(new Function>() {
+ observableConvert = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Single.just(v).toObservable();
}
});
- observableDedicated = source.switchMapSingle(new Function>() {
+ observableDedicated = source.switchMapSingle(new Function>() {
@Override
- public Single extends Integer> apply(Integer v) {
+ public Single apply(Integer v) {
return Single.just(v);
}
});
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java
index 36436d1370..11239d04a7 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java
@@ -138,9 +138,12 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(T t) {
if (!queue.offer(t)) {
+ // Error must be set first before calling cancel to avoid race
+ // with hasNext(), which checks for cancel first before checking
+ // for error.
+ error = new QueueOverflowException();
SubscriptionHelper.cancel(this);
-
- onError(new QueueOverflowException());
+ onComplete();
} else {
signalConsumer();
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java
index 469d0dd48b..a7de73213a 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java
@@ -111,7 +111,9 @@ final class OnNext implements Runnable {
@Override
public void run() {
- downstream.onNext(t);
+ if (!w.isDisposed()) {
+ downstream.onNext(t);
+ }
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java
index daa9edd533..99e11259a6 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java
@@ -24,23 +24,7 @@
*
* @param the source element type
*/
-public final class ObservableCache extends AbstractObservableWithUpstream
-implements Observer {
-
- /**
- * The subscription to the source should happen at most once.
- */
- final AtomicBoolean once;
-
- /**
- * The number of items per cached nodes.
- */
- final int capacityHint;
-
- /**
- * The current known array of observer state to notify.
- */
- final AtomicReference[]> observers;
+public final class ObservableCache extends AbstractObservableWithUpstream {
/**
* A shared instance of an empty array of observers to avoid creating
@@ -56,61 +40,49 @@ public final class ObservableCache extends AbstractObservableWithUpstream head;
-
- /**
- * The current tail of the linked structure holding the items.
- */
- Node tail;
-
- /**
- * How many items have been put into the tail node so far.
+ * The subscription to the source should happen at most once.
*/
- int tailOffset;
+ final AtomicBoolean once;
/**
- * If {@link #observers} is {@link #TERMINATED}, this holds the terminal error if not null.
+ * Responsible caching events from the source and multicasting them to each downstream.
*/
- Throwable error;
+ final Multicaster multicaster;
/**
- * True if the source has terminated.
+ * The first node in a singly linked list. Each node has the capacity to hold a specific number of events, and each
+ * points exclusively to the next node (if present). When a new downstream arrives, the subscription is
+ * initialized with a reference to the "head" node, and any events present in the linked list are replayed. As
+ * events are replayed to the new downstream, its 'node' reference advances through the linked list, discarding each
+ * node reference once all events in that node have been replayed. Consequently, once {@code this} instance goes out
+ * of scope, the prefix of nodes up to the first node that is still being replayed becomes unreachable and eligible
+ * for collection.
*/
- volatile boolean done;
+ final Node head;
/**
* Constructs an empty, non-connected cache.
* @param source the source to subscribe to for the first incoming observer
* @param capacityHint the number of items expected (reduce allocation frequency)
*/
- @SuppressWarnings("unchecked")
public ObservableCache(Observable source, int capacityHint) {
super(source);
- this.capacityHint = capacityHint;
this.once = new AtomicBoolean();
Node n = new Node(capacityHint);
this.head = n;
- this.tail = n;
- this.observers = new AtomicReference(EMPTY);
+ this.multicaster = new Multicaster(capacityHint, n);
}
@Override
protected void subscribeActual(Observer super T> t) {
- CacheDisposable consumer = new CacheDisposable(t, this);
+ CacheDisposable consumer = new CacheDisposable(t, multicaster, head);
t.onSubscribe(consumer);
- add(consumer);
+ multicaster.add(consumer);
if (!once.get() && once.compareAndSet(false, true)) {
- source.subscribe(this);
+ source.subscribe(multicaster);
} else {
- replay(consumer);
+ multicaster.replay(consumer);
}
}
@@ -127,7 +99,7 @@ protected void subscribeActual(Observer super T> t) {
* @return true if the cache has observers
*/
/* public */ boolean hasObservers() {
- return observers.get().length != 0;
+ return multicaster.get().length != 0;
}
/**
@@ -135,194 +107,241 @@ protected void subscribeActual(Observer super T> t) {
* @return the number of currently cached event count
*/
/* public */ long cachedEventCount() {
- return size;
+ return multicaster.size;
}
- /**
- * Atomically adds the consumer to the {@link #observers} copy-on-write array
- * if the source has not yet terminated.
- * @param consumer the consumer to add
- */
- void add(CacheDisposable consumer) {
- for (;;) {
- CacheDisposable[] current = observers.get();
- if (current == TERMINATED) {
- return;
- }
- int n = current.length;
+ static final class Multicaster extends AtomicReference[]> implements Observer {
- @SuppressWarnings("unchecked")
- CacheDisposable[] next = new CacheDisposable[n + 1];
- System.arraycopy(current, 0, next, 0, n);
- next[n] = consumer;
+ /** */
+ private static final long serialVersionUID = 8514643269016498691L;
- if (observers.compareAndSet(current, next)) {
- return;
- }
- }
- }
+ /**
+ * The number of items per cached nodes.
+ */
+ final int capacityHint;
- /**
- * Atomically removes the consumer from the {@link #observers} copy-on-write array.
- * @param consumer the consumer to remove
- */
- @SuppressWarnings("unchecked")
- void remove(CacheDisposable consumer) {
- for (;;) {
- CacheDisposable[] current = observers.get();
- int n = current.length;
- if (n == 0) {
- return;
- }
+ /**
+ * The total number of elements in the list available for reads.
+ */
+ volatile long size;
- int j = -1;
- for (int i = 0; i < n; i++) { - if (current[i] == consumer) { - j = i; - break; - } - } + /** + * The current tail of the linked structure holding the items. + */ + Node tail;
- if (j < 0) { - return; - } - CacheDisposable[] next;
+ /**
+ * How many items have been put into the tail node so far.
+ */
+ int tailOffset;
- if (n == 1) {
- next = EMPTY;
- } else {
- next = new CacheDisposable[n - 1];
- System.arraycopy(current, 0, next, 0, j);
- System.arraycopy(current, j + 1, next, j, n - j - 1);
- }
+ /**
+ * If the observers are {@link #TERMINATED}, this holds the terminal error if not null.
+ */
+ Throwable error;
- if (observers.compareAndSet(current, next)) {
- return;
- }
- }
- }
+ /**
+ * True if the source has terminated.
+ */
+ volatile boolean done;
- /**
- * Replays the contents of this cache to the given consumer based on its
- * current state and number of items requested by it.
- * @param consumer the consumer to continue replaying items to
- */
- void replay(CacheDisposable consumer) {
- // make sure there is only one replay going on at a time
- if (consumer.getAndIncrement() != 0) {
- return;
+ @SuppressWarnings("unchecked")
+ Multicaster(int capacityHint, final Node head) {
+ super(EMPTY);
+ this.tail = head;
+ this.capacityHint = capacityHint;
}
- // see if there were more replay request in the meantime
- int missed = 1;
- // read out state into locals upfront to avoid being re-read due to volatile reads
- long index = consumer.index;
- int offset = consumer.offset;
- Node node = consumer.node;
- Observer super T> downstream = consumer.downstream;
- int capacity = capacityHint;
-
- for (;;) {
- // if the consumer got disposed, clear the node and quit
- if (consumer.disposed) {
- consumer.node = null;
- return;
+ /**
+ * Atomically adds the consumer to the observers copy-on-write array
+ * if the source has not yet terminated.
+ * @param consumer the consumer to add
+ */
+ void add(CacheDisposable consumer) {
+ for (;;) {
+ CacheDisposable[] current = get();
+ if (current == TERMINATED) {
+ return;
+ }
+ int n = current.length;
+
+ @SuppressWarnings("unchecked")
+ CacheDisposable[] next = new CacheDisposable[n + 1];
+ System.arraycopy(current, 0, next, 0, n);
+ next[n] = consumer;
+
+ if (compareAndSet(current, next)) {
+ return;
+ }
}
+ }
- // first see if the source has terminated, read order matters!
- boolean sourceDone = done;
- // and if the number of items is the same as this consumer has received
- boolean empty = size == index;
-
- // if the source is done and we have all items so far, terminate the consumer
- if (sourceDone && empty) {
- // release the node object to avoid leaks through retained consumers
- consumer.node = null;
- // if error is not null then the source failed
- Throwable ex = error;
- if (ex != null) {
- downstream.onError(ex);
+ /**
+ * Atomically removes the consumer from the observers copy-on-write array.
+ * @param consumer the consumer to remove
+ */
+ @SuppressWarnings("unchecked")
+ void remove(CacheDisposable consumer) {
+ for (;;) {
+ CacheDisposable[] current = get();
+ int n = current.length;
+ if (n == 0) {
+ return;
+ }
+
+ int j = -1;
+ for (int i = 0; i < n; i++) { + if (current[i] == consumer) { + j = i; + break; + } + } + + if (j < 0) { + return; + } + CacheDisposable[] next;
+
+ if (n == 1) {
+ next = EMPTY;
} else {
- downstream.onComplete();
+ next = new CacheDisposable[n - 1];
+ System.arraycopy(current, 0, next, 0, j);
+ System.arraycopy(current, j + 1, next, j, n - j - 1);
}
+
+ if (compareAndSet(current, next)) {
+ return;
+ }
+ }
+ }
+
+ /**
+ * Replays the contents of this cache to the given consumer based on its
+ * current state and number of items requested by it.
+ * @param consumer the consumer to continue replaying items to
+ */
+ void replay(CacheDisposable consumer) {
+ // make sure there is only one replay going on at a time
+ if (consumer.getAndIncrement() != 0) {
return;
}
- // there are still items not sent to the consumer
- if (!empty) {
- // if the offset in the current node has reached the node capacity
- if (offset == capacity) {
- // switch to the subsequent node
- node = node.next;
- // reset the in-node offset
- offset = 0;
+ // see if there were more replay request in the meantime
+ int missed = 1;
+ // read out state into locals upfront to avoid being re-read due to volatile reads
+ long index = consumer.index;
+ int offset = consumer.offset;
+ Node node = consumer.node;
+ Observer super T> downstream = consumer.downstream;
+ int capacity = capacityHint;
+
+ for (;;) {
+ // if the consumer got disposed, clear the node and quit
+ if (consumer.disposed) {
+ consumer.node = null;
+ return;
}
- // emit the cached item
- downstream.onNext(node.values[offset]);
-
- // move the node offset forward
- offset++;
- // move the total consumed item count forward
- index++;
+ // first see if the source has terminated, read order matters!
+ boolean sourceDone = done;
+ // and if the number of items is the same as this consumer has received
+ boolean empty = size == index;
+
+ // if the source is done and we have all items so far, terminate the consumer
+ if (sourceDone && empty) {
+ // release the node object to avoid leaks through retained consumers
+ consumer.node = null;
+ // if error is not null then the source failed
+ Throwable ex = error;
+ if (ex != null) {
+ downstream.onError(ex);
+ } else {
+ downstream.onComplete();
+ }
+ return;
+ }
- // retry for the next item/terminal event if any
- continue;
- }
+ // there are still items not sent to the consumer
+ if (!empty) {
+ // if the offset in the current node has reached the node capacity
+ if (offset == capacity) {
+ // switch to the subsequent node
+ node = node.next;
+ // reset the in-node offset
+ offset = 0;
+ }
+
+ // emit the cached item
+ downstream.onNext(node.values[offset]);
+
+ // move the node offset forward
+ offset++;
+ // move the total consumed item count forward
+ index++;
+
+ // retry for the next item/terminal event if any
+ continue;
+ }
- // commit the changed references back
- consumer.index = index;
- consumer.offset = offset;
- consumer.node = node;
- // release the changes and see if there were more replay request in the meantime
- missed = consumer.addAndGet(-missed);
- if (missed == 0) {
- break;
+ // commit the changed references back
+ consumer.index = index;
+ consumer.offset = offset;
+ consumer.node = node;
+ // release the changes and see if there were more replay request in the meantime
+ missed = consumer.addAndGet(-missed);
+ if (missed == 0) {
+ break;
+ }
}
}
- }
- @Override
- public void onSubscribe(Disposable d) {
- // we can't do much with the upstream disposable
- }
-
- @Override
- public void onNext(T t) {
- int tailOffset = this.tailOffset;
- // if the current tail node is full, create a fresh node
- if (tailOffset == capacityHint) {
- Node n = new Node(tailOffset);
- n.values[0] = t;
- this.tailOffset = 1;
- tail.next = n;
- tail = n;
- } else {
- tail.values[tailOffset] = t;
- this.tailOffset = tailOffset + 1;
+ @Override
+ public void onSubscribe(Disposable d) {
+ // we can't do much with the upstream disposable
}
- size++;
- for (CacheDisposable consumer : observers.get()) {
- replay(consumer);
+
+ @Override
+ public void onNext(T t) {
+ int tailOffset = this.tailOffset;
+ // if the current tail node is full, create a fresh node
+ if (tailOffset == capacityHint) {
+ Node n = new Node(tailOffset);
+ n.values[0] = t;
+ this.tailOffset = 1;
+ tail.next = n;
+ tail = n;
+ } else {
+ tail.values[tailOffset] = t;
+ this.tailOffset = tailOffset + 1;
+ }
+ size++;
+ for (CacheDisposable consumer : get()) {
+ replay(consumer);
+ }
}
- }
- @SuppressWarnings("unchecked")
- @Override
- public void onError(Throwable t) {
- error = t;
- done = true;
- for (CacheDisposable consumer : observers.getAndSet(TERMINATED)) {
- replay(consumer);
+ @SuppressWarnings("unchecked")
+ @Override
+ public void onError(Throwable t) {
+ error = t;
+ done = true;
+ // No additional events will arrive, so now we can clear the 'tail' reference
+ tail = null;
+ for (CacheDisposable consumer : getAndSet(TERMINATED)) {
+ replay(consumer);
+ }
}
- }
- @SuppressWarnings("unchecked")
- @Override
- public void onComplete() {
- done = true;
- for (CacheDisposable consumer : observers.getAndSet(TERMINATED)) {
- replay(consumer);
+ @SuppressWarnings("unchecked")
+ @Override
+ public void onComplete() {
+ done = true;
+ // No additional events will arrive, so now we can clear the 'tail' reference
+ tail = null;
+ for (CacheDisposable consumer : getAndSet(TERMINATED)) {
+ replay(consumer);
+ }
}
}
@@ -338,7 +357,7 @@ static final class CacheDisposable extends AtomicInteger
final Observer super T> downstream;
- final ObservableCache parent;
+ final Multicaster parent;
Node node;
@@ -353,11 +372,12 @@ static final class CacheDisposable extends AtomicInteger
* the parent cache object.
* @param downstream the actual consumer
* @param parent the parent that holds onto the cached items
+ * @param head the first node in the linked list
*/
- CacheDisposable(Observer super T> downstream, ObservableCache parent) {
+ CacheDisposable(Observer super T> downstream, Multicaster parent, Node head) {
this.downstream = downstream;
this.parent = parent;
- this.node = parent.head;
+ this.node = head;
}
@Override
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java
index 1801cce1f2..7c01c23f90 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java
@@ -111,7 +111,9 @@ final class OnNext implements Runnable {
@Override
public void run() {
- downstream.onNext(t);
+ if (!w.isDisposed()) {
+ downstream.onNext(t);
+ }
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java b/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java
index d735ff43c0..e8d19c633e 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java
@@ -91,6 +91,8 @@ public T poll() {
// we have to null out the value because we are going to hang on to the node
final T nextValue = nextNode.getAndNullValue();
spConsumerNode(nextNode);
+ // unlink previous consumer to help gc
+ currConsumerNode.soNext(null);
return nextValue;
}
else if (currConsumerNode != lvProducerNode()) {
@@ -101,6 +103,8 @@ else if (currConsumerNode != lvProducerNode()) {
// we have to null out the value because we are going to hang on to the node
final T nextValue = nextNode.getAndNullValue();
spConsumerNode(nextNode);
+ // unlink previous consumer to help gc
+ currConsumerNode.soNext(null);
return nextValue;
}
return null;
diff --git a/src/main/java/io/reactivex/rxjava3/subjects/ReplaySubject.java b/src/main/java/io/reactivex/rxjava3/subjects/ReplaySubject.java
index 4d5eb3335f..e103594ba5 100644
--- a/src/main/java/io/reactivex/rxjava3/subjects/ReplaySubject.java
+++ b/src/main/java/io/reactivex/rxjava3/subjects/ReplaySubject.java
@@ -652,8 +652,6 @@ static final class UnboundedReplayBuffer
final List buffer;
- volatile boolean done;
-
volatile int size;
UnboundedReplayBuffer(int capacityHint) {
@@ -671,7 +669,6 @@ public void addFinal(Object notificationLite) {
buffer.add(notificationLite);
trimHead();
size++;
- done = true;
}
@Override
@@ -772,20 +769,17 @@ public void replay(ReplayDisposable rs) {
Object o = b.get(index);
- if (done) {
- if (index + 1 == s) {
- s = size;
- if (index + 1 == s) {
- if (NotificationLite.isComplete(o)) {
- a.onComplete();
- } else {
- a.onError(NotificationLite.getError(o));
- }
- rs.index = null;
- rs.cancelled = true;
- return;
- }
- }
+ if (NotificationLite.isComplete(o)) {
+ a.onComplete();
+ rs.index = null;
+ rs.cancelled = true;
+ return;
+ } else
+ if (NotificationLite.isError(o)) {
+ a.onError(NotificationLite.getError(o));
+ rs.index = null;
+ rs.cancelled = true;
+ return;
}
a.onNext((T)o);
@@ -856,8 +850,6 @@ static final class SizeBoundReplayBuffer
Node tail;
- volatile boolean done;
-
SizeBoundReplayBuffer(int maxSize) {
this.maxSize = maxSize;
Node h = new Node(null);
@@ -895,7 +887,6 @@ public void addFinal(Object notificationLite) {
t.lazySet(n); // releases both the tail and size
trimHead();
- done = true;
}
/**
@@ -1000,18 +991,17 @@ public void replay(ReplayDisposable rs) {
Object o = n.value;
- if (done) {
- if (n.get() == null) {
-
- if (NotificationLite.isComplete(o)) {
- a.onComplete();
- } else {
- a.onError(NotificationLite.getError(o));
- }
- rs.index = null;
- rs.cancelled = true;
- return;
- }
+ if (NotificationLite.isComplete(o)) {
+ a.onComplete();
+ rs.index = null;
+ rs.cancelled = true;
+ return;
+ } else
+ if (NotificationLite.isError(o)) {
+ a.onError(NotificationLite.getError(o));
+ rs.index = null;
+ rs.cancelled = true;
+ return;
}
a.onNext((T)o);
@@ -1069,8 +1059,6 @@ static final class SizeAndTimeBoundReplayBuffer
TimedNode tail;
- volatile boolean done;
-
SizeAndTimeBoundReplayBuffer(int maxSize, long maxAge, TimeUnit unit, Scheduler scheduler) {
this.maxSize = maxSize;
this.maxAge = maxAge;
@@ -1163,8 +1151,6 @@ public void addFinal(Object notificationLite) {
size++;
t.lazySet(n); // releases both the tail and size
trimFinal();
-
- done = true;
}
/**
@@ -1290,18 +1276,17 @@ public void replay(ReplayDisposable rs) {
Object o = n.value;
- if (done) {
- if (n.get() == null) {
-
- if (NotificationLite.isComplete(o)) {
- a.onComplete();
- } else {
- a.onError(NotificationLite.getError(o));
- }
- rs.index = null;
- rs.cancelled = true;
- return;
- }
+ if (NotificationLite.isComplete(o)) {
+ a.onComplete();
+ rs.index = null;
+ rs.cancelled = true;
+ return;
+ } else
+ if (NotificationLite.isError(o)) {
+ a.onError(NotificationLite.getError(o));
+ rs.index = null;
+ rs.cancelled = true;
+ return;
}
a.onNext((T)o);
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableAmbTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableAmbTest.java
index ba86c69719..1a1ceca926 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableAmbTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableAmbTest.java
@@ -402,7 +402,8 @@ public void disposed() {
@Test
public void manySources() {
- Flowable>[] a = new Flowable[32];
+ @SuppressWarnings("unchecked")
+ Flowable[] a = new Flowable[32];
Arrays.fill(a, Flowable.never());
a[31] = Flowable.just(1);
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelayTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelayTest.java
index 3160f61173..4d67594a17 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelayTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelayTest.java
@@ -20,6 +20,7 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.LockSupport;
import org.junit.*;
import org.mockito.InOrder;
@@ -28,6 +29,7 @@
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.functions.*;
+import io.reactivex.rxjava3.internal.disposables.SequentialDisposable;
import io.reactivex.rxjava3.internal.functions.Functions;
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.schedulers.*;
@@ -1030,4 +1032,38 @@ public Publisher apply(Integer t) throws Exception {
.to(TestHelper.testConsumer())
.assertFailureAndMessage(NullPointerException.class, "The itemDelay returned a null Publisher");
}
+
+ @Test
+ public void cancelShouldPreventRandomSubsequentEmissions() {
+ for (int attempt = 1; attempt < 100; attempt ++) { + + SequentialDisposable disposable = new SequentialDisposable(); + ConcurrentLinkedQueue sink = new ConcurrentLinkedQueue();
+
+ disposable.replace(
+ Flowable.range(1, 10)
+ .delay(1, TimeUnit.MICROSECONDS, Schedulers.computation(), true)
+ .doOnNext(v -> {
+ if (v == 1) {
+ Schedulers.computation().scheduleDirect(disposable::dispose);
+ }
+ sink.offer(v);
+ })
+ .subscribe());
+
+ LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
+
+ Integer last = null;
+
+ while (!sink.isEmpty()) {
+ Integer current = sink.poll();
+
+ if (last != null && last + 1 != current) {
+ fail("Emission hole: " + last + " -> " + current);
+ }
+
+ last = current;
+ }
+ }
+ }
}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipTest.java
index 47bbd491bd..8d3e10dbd2 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipTest.java
@@ -1870,7 +1870,7 @@ public Integer apply(Integer a, Integer b) throws Exception {
public void firstErrorPreventsSecondSubscription() {
final AtomicInteger counter = new AtomicInteger();
- List> flowableList = new ArrayList();
+ List flowableList = new ArrayList();
flowableList.add(Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter e)
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmbTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmbTest.java
index 0a349cd417..5ec9129d9f 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmbTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmbTest.java
@@ -235,7 +235,8 @@ public void ambArraySingleElement() {
@Test
public void manySources() {
- Observable>[] a = new Observable[32];
+ @SuppressWarnings("unchecked")
+ Observable[] a = new Observable[32];
Arrays.fill(a, Observable.never());
a[31] = Observable.just(1);
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBufferTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBufferTest.java
index 8264a66494..7085720f01 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBufferTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBufferTest.java
@@ -1278,9 +1278,9 @@ public Integer apply(Integer integer, Long aLong) {
}
})
.buffer(Observable.interval(0, 200, TimeUnit.MILLISECONDS),
- new Function>() {
+ new Function>() {
@Override
- public Observable> apply(Long a) {
+ public Observable apply(Long a) {
return Observable.just(a).delay(100, TimeUnit.MILLISECONDS);
}
})
@@ -1301,9 +1301,9 @@ public Integer apply(Integer integer, Long aLong) {
}
})
.buffer(Observable.interval(0, 100, TimeUnit.MILLISECONDS),
- new Function>() {
+ new Function>() {
@Override
- public Observable> apply(Long a) {
+ public Observable apply(Long a) {
return Observable.just(a).delay(200, TimeUnit.MILLISECONDS);
}
})
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java
index 7f47ec95d8..74d17c062b 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java
@@ -16,10 +16,15 @@
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import io.reactivex.rxjava3.observables.ConnectableObservable;
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
@@ -355,4 +360,52 @@ public void addRemoveRace() {
);
}
}
+
+ @Test
+ public void valuesAreReclaimable() throws Exception {
+ ConnectableObservable source =
+ Observable.range(0, 200)
+ .map($ -> new byte[1024 * 1024])
+ .publish();
+
+ System.out.println("Bounded Replay Leak check: Wait before GC");
+ Thread.sleep(1000);
+
+ System.out.println("Bounded Replay Leak check: GC");
+ System.gc();
+
+ Thread.sleep(500);
+
+ final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+ MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
+ long initial = memHeap.getUsed();
+
+ System.out.printf("Bounded Replay Leak check: Starting: %.3f MB%n", initial / 1024.0 / 1024.0);
+
+ final AtomicLong after = new AtomicLong();
+
+ source.cache().lastElement().subscribe(new Consumer() {
+ @Override
+ public void accept(byte[] v) throws Exception {
+ System.out.println("Bounded Replay Leak check: Wait before GC 2");
+ Thread.sleep(1000);
+
+ System.out.println("Bounded Replay Leak check: GC 2");
+ System.gc();
+
+ Thread.sleep(500);
+
+ after.set(memoryMXBean.getHeapMemoryUsage().getUsed());
+ }
+ });
+
+ source.connect();
+
+ System.out.printf("Bounded Replay Leak check: After: %.3f MB%n", after.get() / 1024.0 / 1024.0);
+
+ if (initial + 100 * 1024 * 1024 < after.get()) { + fail("Bounded Replay Leak check: Memory leak detected: " + (initial / 1024.0 / 1024.0) + + " -> " + after.get() / 1024.0 / 1024.0);
+ }
+ }
}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelayTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelayTest.java
index ea8d51d996..778d2d4e64 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelayTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelayTest.java
@@ -20,6 +20,7 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.LockSupport;
import org.junit.*;
import org.mockito.InOrder;
@@ -29,6 +30,7 @@
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.functions.*;
+import io.reactivex.rxjava3.internal.disposables.SequentialDisposable;
import io.reactivex.rxjava3.internal.functions.Functions;
import io.reactivex.rxjava3.observers.*;
import io.reactivex.rxjava3.schedulers.*;
@@ -978,4 +980,37 @@ public Observable apply(Integer t) throws Exception {
.to(TestHelper.testConsumer())
.assertFailureAndMessage(NullPointerException.class, "The itemDelay returned a null ObservableSource");
}
-}
+
+ @Test
+ public void cancelShouldPreventRandomSubsequentEmissions() {
+ for (int attempt = 1; attempt < 100; attempt ++) { + + SequentialDisposable disposable = new SequentialDisposable(); + ConcurrentLinkedQueue sink = new ConcurrentLinkedQueue();
+
+ disposable.replace(
+ Observable.range(1, 10)
+ .delay(1, TimeUnit.MICROSECONDS, Schedulers.computation(), true)
+ .doOnNext(v -> {
+ if (v == 1) {
+ Schedulers.computation().scheduleDirect(disposable::dispose);
+ }
+ sink.offer(v);
+ })
+ .subscribe());
+
+ LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
+
+ Integer last = null;
+
+ while (!sink.isEmpty()) {
+ Integer current = sink.poll();
+
+ if (last != null && last + 1 != current) {
+ fail("Emission hole: " + last + " -> " + current);
+ }
+
+ last = current;
+ }
+ }
+ }}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZipTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZipTest.java
index ae7de66d6f..faebee3fa5 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZipTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZipTest.java
@@ -1403,7 +1403,7 @@ public Integer apply(Integer t1, Integer t2) throws Exception {
public void firstErrorPreventsSecondSubscription() {
final AtomicInteger counter = new AtomicInteger();
- List> observableList = new ArrayList();
+ List observableList = new ArrayList();
observableList.add(Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e)
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleAmbTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleAmbTest.java
index 083ee46238..8086aa7c1c 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleAmbTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleAmbTest.java
@@ -249,7 +249,8 @@ public void run() {
@Test
public void manySources() {
- Single>[] sources = new Single[32];
+ @SuppressWarnings("unchecked")
+ Single[] sources = new Single[32];
Arrays.fill(sources, Single.never());
sources[31] = Single.just(31);
diff --git a/src/test/java/io/reactivex/rxjava3/processors/ReplayProcessorTest.java b/src/test/java/io/reactivex/rxjava3/processors/ReplayProcessorTest.java
index aa7026a9ff..35a877911b 100644
--- a/src/test/java/io/reactivex/rxjava3/processors/ReplayProcessorTest.java
+++ b/src/test/java/io/reactivex/rxjava3/processors/ReplayProcessorTest.java
@@ -1832,4 +1832,69 @@ public void timeAndSizeRemoveCorrectNumberOfOld() {
rp.test().assertValuesOnly(4, 5);
}
-}
+
+ @Test
+ public void terminationSubscriptionRaceUnbounded() throws Throwable {
+ for (int i = 1; i <= 10000; i++) { + ReplayProcessor source = ReplayProcessor.create();
+ PublishProcessor sink = PublishProcessor.create();
+ TestSubscriber subscriber = sink.test();
+ Schedulers.computation().scheduleDirect(() -> {
+ // issue signals to the source in adherence to the reactive streams specification
+ source.onSubscribe(new BooleanSubscription());
+ source.onNext("hello");
+ source.onNext("world");
+ source.onComplete();
+ });
+ Schedulers.computation().scheduleDirect(() -> {
+ // connect the source to the sink in parallel with the signals issued to the source
+ // note the cast() operator, which is here to detect non-String escapees
+ source.cast(String.class).subscribe(sink);
+ });
+ subscriber.await().assertValues("hello", "world").assertComplete();
+ }
+ }
+
+ @Test
+ public void terminationSubscriptionRaceSizeBound() throws Throwable {
+ for (int i = 1; i <= 10000; i++) { + ReplayProcessor source = ReplayProcessor.createWithSize(20);
+ PublishProcessor sink = PublishProcessor.create();
+ TestSubscriber subscriber = sink.test();
+ Schedulers.computation().scheduleDirect(() -> {
+ // issue signals to the source in adherence to the reactive streams specification
+ source.onSubscribe(new BooleanSubscription());
+ source.onNext("hello");
+ source.onNext("world");
+ source.onComplete();
+ });
+ Schedulers.computation().scheduleDirect(() -> {
+ // connect the source to the sink in parallel with the signals issued to the source
+ // note the cast() operator, which is here to detect non-String escapees
+ source.cast(String.class).subscribe(sink);
+ });
+ subscriber.await().assertValues("hello", "world").assertComplete();
+ }
+ }
+
+ @Test
+ public void terminationSubscriptionRaceTimeBound() throws Throwable {
+ for (int i = 1; i <= 10000; i++) { + ReplayProcessor source = ReplayProcessor.createWithTime(20, TimeUnit.MINUTES, Schedulers.computation());
+ PublishProcessor sink = PublishProcessor.create();
+ TestSubscriber subscriber = sink.test();
+ Schedulers.computation().scheduleDirect(() -> {
+ // issue signals to the source in adherence to the reactive streams specification
+ source.onSubscribe(new BooleanSubscription());
+ source.onNext("hello");
+ source.onNext("world");
+ source.onComplete();
+ });
+ Schedulers.computation().scheduleDirect(() -> {
+ // connect the source to the sink in parallel with the signals issued to the source
+ // note the cast() operator, which is here to detect non-String escapees
+ source.cast(String.class).subscribe(sink);
+ });
+ subscriber.await().assertValues("hello", "world").assertComplete();
+ }
+ }}
diff --git a/src/test/java/io/reactivex/rxjava3/subjects/ReplaySubjectTest.java b/src/test/java/io/reactivex/rxjava3/subjects/ReplaySubjectTest.java
index e752278b2f..8417b53081 100644
--- a/src/test/java/io/reactivex/rxjava3/subjects/ReplaySubjectTest.java
+++ b/src/test/java/io/reactivex/rxjava3/subjects/ReplaySubjectTest.java
@@ -1378,4 +1378,70 @@ public void timeAndSizeRemoveCorrectNumberOfOld() {
rs.test().assertValuesOnly(4, 5);
}
+
+ @Test
+ public void terminationSubscriptionRaceUnbounded() throws Throwable {
+ for (int i = 1; i <= 10000; i++) { + Subject source = ReplaySubject.create();
+ Subject sink = PublishSubject.create();
+ TestObserver observer = sink.test();
+ Schedulers.computation().scheduleDirect(() -> {
+ // issue signals to the source in adherence to the reactive streams specification
+ source.onSubscribe(Disposable.empty());
+ source.onNext("hello");
+ source.onNext("world");
+ source.onComplete();
+ });
+ Schedulers.computation().scheduleDirect(() -> {
+ // connect the source to the sink in parallel with the signals issued to the source
+ // note the cast() operator, which is here to detect non-String escapees
+ source.cast(String.class).subscribe(sink);
+ });
+ observer.await().assertValues("hello", "world").assertComplete();
+ }
+ }
+
+ @Test
+ public void terminationSubscriptionRaceSizeBound() throws Throwable {
+ for (int i = 1; i <= 10000; i++) { + Subject source = ReplaySubject.createWithSize(20);
+ Subject sink = PublishSubject.create();
+ TestObserver observer = sink.test();
+ Schedulers.computation().scheduleDirect(() -> {
+ // issue signals to the source in adherence to the reactive streams specification
+ source.onSubscribe(Disposable.empty());
+ source.onNext("hello");
+ source.onNext("world");
+ source.onComplete();
+ });
+ Schedulers.computation().scheduleDirect(() -> {
+ // connect the source to the sink in parallel with the signals issued to the source
+ // note the cast() operator, which is here to detect non-String escapees
+ source.cast(String.class).subscribe(sink);
+ });
+ observer.await().assertValues("hello", "world").assertComplete();
+ }
+ }
+
+ @Test
+ public void terminationSubscriptionRaceTimeBound() throws Throwable {
+ for (int i = 1; i <= 10000; i++) { + Subject source = ReplaySubject.createWithTime(20, TimeUnit.MINUTES, Schedulers.computation());
+ Subject sink = PublishSubject.create();
+ TestObserver observer = sink.test();
+ Schedulers.computation().scheduleDirect(() -> {
+ // issue signals to the source in adherence to the reactive streams specification
+ source.onSubscribe(Disposable.empty());
+ source.onNext("hello");
+ source.onNext("world");
+ source.onComplete();
+ });
+ Schedulers.computation().scheduleDirect(() -> {
+ // connect the source to the sink in parallel with the signals issued to the source
+ // note the cast() operator, which is here to detect non-String escapees
+ source.cast(String.class).subscribe(sink);
+ });
+ observer.await().assertValues("hello", "world").assertComplete();
+ }
+ }
}