팬 아웃, 팬 인
예제 87: 팬 아웃
여러 코루틴이 동시에 채널을 구독할 수 있습니다.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) {
send(x++)
delay(100L)
}
}
fun CoroutineScope.processNumber(id: Int, channel: ReceiveChannel<Int>) = launch {
channel.consumeEach {
println("${id}가 ${it}을 받았습니다.")
}
}
fun main() = runBlocking<Unit> {
val producer = produceNumbers()
repeat (5) {
processNumber(it, producer)
}
delay(1000L)
producer.cancel()
}
예제 88: 팬 인
팬 인은 반대로 생산자가 많은 것입니다.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
suspend fun produceNumbers(channel: SendChannel<Int>, from: Int, interval: Long) {
var x = from
while (true) {
channel.send(x)
x += 2
delay(interval)
}
}
fun CoroutineScope.processNumber(channel: ReceiveChannel<Int>) = launch {
channel.consumeEach {
println("${it}을 받았습니다.")
}
}
fun main() = runBlocking<Unit> {
val channel = Channel<Int>()
launch {
produceNumbers(channel, 1, 100L)
}
launch {
produceNumbers(channel, 2, 150L)
}
processNumber(channel)
delay(1000L)
coroutineContext.cancelChildren()
}
coroutineContext
의 자식이 아닌 본인을 취소하면 어떻게 될까요?processNumber
를 suspend 함수의 형태로 변형하면 어떻게 될까요?- 다른 방법으로 취소할 수 있을까요?
예제 89: 공정한 채널
두 개의 코루틴에서 채널을 서로 사용할 때 공정하게 기회를 준다는 것을 알 수 있습니다.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
suspend fun someone(channel: Channel<String>, name: String) {
for (comment in channel) {
println("${name}: ${comment}")
channel.send(comment.drop(1) + comment.first())
delay(100L)
}
}
fun main() = runBlocking<Unit> {
val channel = Channel<String>()
launch {
someone(channel, "민준")
}
launch {
someone(channel, "서연")
}
channel.send("패스트 캠퍼스")
delay(1000L)
coroutineContext.cancelChildren()
}
예제 90: select
먼저 끝나는 요청을 처리하는 것이 중요할 수 있습니다. 이 경우에 select
를 쓸 수 있습니다.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
fun CoroutineScope.sayFast() = produce<String> {
while (true) {
delay(100L)
send("패스트")
}
}
fun CoroutineScope.sayCampus() = produce<String> {
while (true) {
delay(150L)
send("캠퍼스")
}
}
fun main() = runBlocking<Unit> {
val fasts = sayFast()
val campuses = sayCampus()
repeat (5) {
select<Unit> {
fasts.onReceive {
println("fast: $it")
}
campuses.onReceive {
println("campus: $it")
}
}
}
coroutineContext.cancelChildren()
}
채널에 대해 onReceive
를 사용하는 것 이외에도 아래의 상황에서 사용할 수 있습니다.
Job
-onJoin
Deferred
-onAwait
SendChannel
-onSend
ReceiveChannel
-onReceive
,onReceiveCatching
delay
-onTimeout