Skip to content

Instantly share code, notes, and snippets.

@martin-tarjanyi
Created June 1, 2024 22:01
Show Gist options
  • Select an option

  • Save martin-tarjanyi/1ceabd93914c64ff8869caf9cd8e79b5 to your computer and use it in GitHub Desktop.

Select an option

Save martin-tarjanyi/1ceabd93914c64ff8869caf9cd8e79b5 to your computer and use it in GitHub Desktop.
Non-blocking HTTP calls with the Apache HttpClient 5 async client and Kotlin Coroutines
package org.example.apache
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse
import org.apache.hc.client5.http.classic.methods.HttpGet
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient
import org.apache.hc.client5.http.impl.async.HttpAsyncClients
import org.apache.hc.client5.http.impl.classic.BasicHttpClientResponseHandler
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient
import org.apache.hc.client5.http.impl.classic.HttpClients
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager
import org.apache.hc.core5.concurrent.FutureCallback
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime
/**
 * Demo entry point comparing the classic (blocking) and async Apache HttpClient 5
 * clients when driven from coroutines.
 *
 * Flow:
 * 1. Build one async and one classic client, each backed by a pooling connection
 *    manager limited to 100 connections in total and per route.
 * 2. Issue one request through each client before the measured section.
 * 3. Launch 8 concurrent requests, each wrapped in a 1500 ms timeout, and print the
 *    total elapsed time.
 * 4. Wait one more second before shutting down.
 *
 * Both clients are scoped with `use`, so they are closed on every exit path,
 * including when a request failure cancels the enclosing scope.
 */
@OptIn(ExperimentalTime::class)
suspend fun main(): Unit =
    coroutineScope {
        HttpAsyncClients.custom()
            .setConnectionManager(
                PoolingAsyncClientConnectionManager().apply {
                    maxTotal = 100
                    defaultMaxPerRoute = 100
                },
            )
            .build()
            .use { asyncClient ->
                // The async client must be started before it can execute requests.
                asyncClient.start()

                HttpClients.custom()
                    .setConnectionManager(
                        PoolingHttpClientConnectionManager().apply {
                            maxTotal = 100
                            defaultMaxPerRoute = 100
                        },
                    )
                    .build()
                    .use { syncClient ->
                        // One request per client ahead of the timed section.
                        executeWithSync(syncClient)
                        executeWithAsync(asyncClient)

                        try {
                            measureTime {
                                (1..8)
                                    .map {
                                        async {
                                            withTimeout(1500.milliseconds) {
                                                // executeWithSync(syncClient)
                                                executeWithAsync(asyncClient)
                                            }
                                        }
                                    }
                                    .awaitAll()
                            }.also { println("Time: $it") }
                        } catch (e: Exception) {
                            // Anything rethrown by awaitAll (including a withTimeout expiry)
                            // is only reported, so the demo continues to the delay below.
                            e.printStackTrace()
                        }

                        // Kept inside the `use` scopes so the clients stay open during the
                        // wait, exactly as before; they are closed right after it.
                        delay(1.seconds)
                    }
            }
    }
/**
 * Sends `GET http://localhost:8080/log` through [client] and suspends until the
 * client reports an outcome through its [FutureCallback].
 *
 * - `completed`: prints "responded" and resumes with the response.
 * - `failed`: resumes with the reported exception, which is thrown to the caller.
 * - `cancelled`: prints "cancelled" and cancels the suspended continuation.
 *
 * If the coroutine is cancelled while suspended, the in-flight request future is
 * cancelled with `cancel(true)`. After resuming, the name of the current thread is
 * printed before the response is returned.
 *
 * @param client async client used to execute the request.
 * @return the response delivered to the `completed` callback.
 */
private suspend fun executeWithAsync(client: CloseableHttpAsyncClient): SimpleHttpResponse =
    suspendCancellableCoroutine<SimpleHttpResponse> { continuation ->
        val request = SimpleHttpRequest.create("GET", "http://localhost:8080/log")
        val inFlight =
            client.execute(
                request,
                object : FutureCallback<SimpleHttpResponse> {
                    override fun completed(response: SimpleHttpResponse) {
                        println("responded")
                        continuation.resumeWith(Result.success(response))
                    }

                    override fun failed(cause: Exception) {
                        continuation.resumeWith(Result.failure(cause))
                    }

                    override fun cancelled() {
                        println("cancelled")
                        continuation.cancel()
                    }
                },
            )
        // Propagate coroutine cancellation (e.g. a timeout in the caller) to the request.
        continuation.invokeOnCancellation { inFlight.cancel(true) }
    }.also { println(Thread.currentThread().name) }
/**
 * Sends `GET http://localhost:8080/log` through the classic (blocking) [client] and
 * returns whatever [BasicHttpClientResponseHandler] produces for the response.
 *
 * The call blocks its thread for the whole exchange, so it is moved to
 * [Dispatchers.IO], the dispatcher intended for blocking I/O, instead of occupying a
 * thread of the CPU-bound default pool.
 *
 * @param client classic client used to execute the request.
 * @return the value produced by the response handler.
 */
private suspend fun executeWithSync(client: CloseableHttpClient): Any =
    withContext(Dispatchers.IO) {
        client.execute(HttpGet("http://localhost:8080/log"), BasicHttpClientResponseHandler())
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment