项目总结:GetX + Kotlin 协程实现跨端音乐播放实时同步 ...

打印 上一主题 下一主题

主题 1579|帖子 1579|积分 4737

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
一、GetX 状态管理的设计

1. 深入明白 ExoPlayer 与状态封装

ExoPlayer 是 Android 平台上强盛的媒体播放引擎,它具有丰富的状态和事件回调。在使用 GetX 进行状态管理时,必要深入明白 ExoPlayer 的各种状态,如 STATE_IDLE、STATE_BUFFERING、STATE_READY 和 STATE_ENDED 等,以及播放位置、缓冲位置等信息。
  1. import com.google.android.exoplayer2.ExoPlayer
  2. import com.google.android.exoplayer2.Player
  3. import com.google.android.exoplayer2.source.MediaSource
  4. import com.google.android.exoplayer2.source.ProgressiveMediaSource
  5. import com.google.android.exoplayer2.upstream.DefaultDataSourceFactory
  6. import com.google.android.exoplayer2.util.Util
  7. import io.reactivex.rxjava3.subjects.BehaviorSubject
  8. import io.reactivex.rxjava3.subjects.PublishSubject
  9. import kotlinx.coroutines.*
  10. import kotlinx.coroutines.flow.*
  11. import org.koin.core.component.KoinComponent
  12. import org.koin.core.component.inject
  13. import java.util.concurrent.TimeUnit
  14. class MusicPlayerController : KoinComponent {
  15.     private val context: android.content.Context by inject()
  16.     private val player: ExoPlayer = ExoPlayer.Builder(context).build()
  17.     // 封装播放状态
  18.     private val _playStateSubject = BehaviorSubject.createDefault<PlayState>(PlayState.IDLE)
  19.     val playStateFlow: Flow<PlayState> = _playStateSubject.asFlow()
  20.     // 封装播放位置
  21.     private val _positionSubject = BehaviorSubject.createDefault(0L)
  22.     val positionFlow: Flow<Long> = _positionSubject.asFlow()
  23.     // 封装缓冲位置
  24.     private val _bufferedPositionSubject = BehaviorSubject.createDefault(0L)
  25.     val bufferedPositionFlow: Flow<Long> = _bufferedPositionSubject.asFlow()
  26.     init {
  27.         player.addListener(object : Player.EventListener {
  28.             override fun onPlayerStateChanged(playWhenReady: Boolean, playbackState: Int) {
  29.                 val newState = when (playbackState) {
  30.                     Player.STATE_IDLE -> PlayState.IDLE
  31.                     Player.STATE_BUFFERING -> PlayState.BUFFERING
  32.                     Player.STATE_READY -> if (playWhenReady) PlayState.PLAYING else PlayState.PAUSED
  33.                     Player.STATE_ENDED -> PlayState.ENDED
  34.                     else -> PlayState.IDLE
  35.                 }
  36.                 _playStateSubject.onNext(newState)
  37.             }
  38.             override fun onPositionDiscontinuity(reason: Int) {
  39.                 _positionSubject.onNext(player.currentPosition)
  40.             }
  41.             override fun onIsPlayingChanged(isPlaying: Boolean) {
  42.                 if (isPlaying) {
  43.                     startPositionUpdates()
  44.                 } else {
  45.                     stopPositionUpdates()
  46.                 }
  47.             }
  48.             override fun onBufferingChanged(isBuffering: Boolean) {
  49.                 if (isBuffering) {
  50.                     _bufferedPositionSubject.onNext(player.bufferedPosition)
  51.                 }
  52.             }
  53.         })
  54.     }
  55.     private var positionUpdateJob: Job? = null
  56.     private fun startPositionUpdates() {
  57.         positionUpdateJob = GlobalScope.launch {
  58.             while (isActive) {
  59.                 _positionSubject.onNext(player.currentPosition)
  60.                 delay(100) // 每 100ms 更新一次位置
  61.             }
  62.         }
  63.     }
  64.     private fun stopPositionUpdates() {
  65.         positionUpdateJob?.cancel()
  66.     }
  67.     fun play(url: String) {
  68.         val dataSourceFactory = DefaultDataSourceFactory(context, Util.getUserAgent(context, "MusicPlayer"))
  69.         val mediaSource: MediaSource = ProgressiveMediaSource.Factory(dataSourceFactory)
  70.            .createMediaSource(android.net.Uri.parse(url))
  71.         player.setMediaSource(mediaSource)
  72.         player.prepare()
  73.         player.play()
  74.     }
  75.     fun pause() {
  76.         player.pause()
  77.     }
  78.     fun seekTo(position: Long) {
  79.         player.seekTo(position)
  80.     }
  81.     fun release() {
  82.         player.release()
  83.     }
  84. }
  85. enum class PlayState {
  86.     IDLE, BUFFERING, PLAYING, PAUSED, ENDED
  87. }
复制代码
在上述代码中,我们创建了一个 MusicPlayerController 类,它继承自 KoinComponent 以便使用依赖注入。通过 BehaviorSubject 封装了播放状态、播放位置和缓冲位置,如允许以方便地将这些状态袒露为 Flow,供 UI 层订阅。在 init 块中,我们为 ExoPlayer 添加了事件监听器,根据不同的状态更新相应的 Subject。同时,为了实时更新播放位置,我们使用协程每隔 100ms 更新一次位置。
2. 跨页面状态共享与 UI 自动更新

GetX 的 Rx 响应式变量使得状态的变革能够自动通知到订阅的 UI 组件。在 Flutter 中,我们可以使用 Obx 或 GetX 组件来监听状态的变革。
  1. import 'package:flutter/material.dart';
  2. import 'package:get/get.dart';
  3. import 'package:your_app/music_player_controller.dart';
  4. class MusicPlayerPage extends StatelessWidget {
  5.   final MusicPlayerController controller = Get.put(MusicPlayerController());
  6.   @override
  7.   Widget build(BuildContext context) {
  8.     return Scaffold(
  9.       appBar: AppBar(
  10.         title: Text('Music Player'),
  11.       ),
  12.       body: Column(
  13.         mainAxisAlignment: MainAxisAlignment.center,
  14.         children: [
  15.           Obx(() => Text('Play State: ${controller.playStateFlow.value}')),
  16.           Obx(() => Text('Position: ${controller.positionFlow.value} ms')),
  17.           Obx(() => Text('Buffered Position: ${controller.bufferedPositionFlow.value} ms')),
  18.           ElevatedButton(
  19.             onPressed: () => controller.play('your_music_url'),
  20.             child: Text('Play'),
  21.           ),
  22.           ElevatedButton(
  23.             onPressed: () => controller.pause(),
  24.             child: Text('Pause'),
  25.           ),
  26.           ElevatedButton(
  27.             onPressed: () => controller.seekTo(5000), // 跳到 5s 位置
  28.             child: Text('Seek to 5s'),
  29.           ),
  30.         ],
  31.       ),
  32.     );
  33.   }
  34. }
复制代码
在这个 Flutter 页面中,我们使用 Get.put 方法将 MusicPlayerController 实例化并注入到 GetX 管理中。通过 Obx 组件监听 playStateFlow、positionFlow 和 bufferedPositionFlow 的变革,当这些状态发生变革时,UI 会自动更新。
3. 处理多端播放进度精准对齐

多端播放进度精准对齐是一个复杂的题目,主要难点在于不同装备的时间戳可能不一致,网络延迟也会影响同步的准确性。我们可以通过以下步骤来解决:


  • 使用 WebSocket 实时推送播放事件:当用户在某一端进行播放操纵(如播放、暂停、Seek 等)时,该端将操纵事件和对应的时间戳通过 WebSocket 发送到服务器。
  • 服务器广播事件:服务器接收到事件后,将其广播给全部毗连的客户端。
  • 客户端接收事件并调解进度:客户端接收到事件后,根据事件中的时间戳和本地时间戳盘算时间差,然后调解本地播放进度。
  1. import kotlinx.coroutines.*
  2. import kotlinx.coroutines.flow.*
  3. import okhttp3.*
  4. import java.io.IOException
  5. import java.util.concurrent.TimeUnit
  6. class WebSocketManager {
  7.     private val client = OkHttpClient.Builder()
  8.        .pingInterval(10, TimeUnit.SECONDS)
  9.        .build()
  10.     private var webSocket: WebSocket? = null
  11.     private val eventFlow = MutableSharedFlow<PlayEvent>()
  12.     fun connect(url: String) {
  13.         val request = Request.Builder()
  14.            .url(url)
  15.            .build()
  16.         webSocket = client.newWebSocket(request, object : WebSocketListener() {
  17.             override fun onOpen(webSocket: WebSocket, response: Response) {
  18.                 println("WebSocket connected")
  19.             }
  20.             override fun onMessage(webSocket: WebSocket, text: String) {
  21.                 val event = PlayEvent.fromJson(text)
  22.                 GlobalScope.launch {
  23.                     eventFlow.emit(event)
  24.                 }
  25.             }
  26.             override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
  27.                 println("WebSocket failure: ${t.message}")
  28.                 reconnect(url)
  29.             }
  30.             override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
  31.                 println("WebSocket closed: $code - $reason")
  32.                 reconnect(url)
  33.             }
  34.         })
  35.     }
  36.     private fun reconnect(url: String) {
  37.         GlobalScope.launch {
  38.             delay(5000) // 5s 后重试
  39.             connect(url)
  40.         }
  41.     }
  42.     fun sendEvent(event: PlayEvent) {
  43.         val json = event.toJson()
  44.         webSocket?.send(json)
  45.     }
  46.     fun observeEvents(): Flow<PlayEvent> = eventFlow
  47. }
  48. data class PlayEvent(
  49.     val eventType: EventType,
  50.     val position: Long,
  51.     val timestamp: Long
  52. ) {
  53.     enum class EventType {
  54.         PLAY, PAUSE, SEEK
  55.     }
  56.     fun toJson(): String {
  57.         // 实现 JSON 序列化
  58.         return ""
  59.     }
  60.     companion object {
  61.         fun fromJson(json: String): PlayEvent {
  62.             // 实现 JSON 反序列化
  63.             return PlayEvent(EventType.PLAY, 0, 0)
  64.         }
  65.     }
  66. }
复制代码
在上述代码中,我们创建了一个 WebSocketManager 类,用于管理 WebSocket 毗连。通过 connect 方法毗连到服务器,当接收到消息时,将其剖析为 PlayEvent 并通过 eventFlow 发射出去。如果毗连失败或关闭,会在 5s 后重试毗连。同时,提供了 sendEvent 方法用于发送播放事件。
二、网络与 WebSocket 的结合

1. 架构设计

HTTP 哀求用于初始化数据,如歌曲列表、用户信息等。这些数据通常是静态的或不常常变革的,使用 HTTP 哀求可以利用其成熟的缓存机制和错误处理机制。WebSocket 则负责实时同步,如其他装备切换歌曲、进度更新等。这种架构设计可以充实发挥两种协议的优势,提高体系的性能和实时性。
  1. import okhttp3.*
  2. import java.io.IOException
  3. class HttpManager {
  4.     private val client = OkHttpClient()
  5.     fun getSongList(url: String, callback: Callback) {
  6.         val request = Request.Builder()
  7.            .url(url)
  8.            .build()
  9.         client.newCall(request).enqueue(callback)
  10.     }
  11.     fun getUserInfo(url: String, callback: Callback) {
  12.         val request = Request.Builder()
  13.            .url(url)
  14.            .build()
  15.         client.newCall(request).enqueue(callback)
  16.     }
  17. }
复制代码
在这个 HttpManager 类中,我们使用 OkHttp 库来处理 HTTP 哀求。通过 getSongList 和 getUserInfo 方法分别获取歌曲列表和用户信息,使用 enqueue 方法进行异步哀求。
2. Kotlin 协程优化

Kotlin 协程可以方便地处理异步操纵,制止阻塞主线程。在处理 WebSocket 心跳、数据剖析等耗时操纵时,我们可以使用 withContext(Dispatchers.IO) 来切换到 IO 线程。
  1. import kotlinx.coroutines.*
  2. import kotlinx.coroutines.flow.*
  3. import okhttp3.*
  4. import java.io.IOException
  5. import java.util.concurrent.TimeUnit
  6. class WebSocketManager {
  7.     private val client = OkHttpClient.Builder()
  8.        .pingInterval(10, TimeUnit.SECONDS)
  9.        .build()
  10.     private var webSocket: WebSocket? = null
  11.     private val eventFlow = MutableSharedFlow<PlayEvent>()
  12.     suspend fun connect(url: String) = withContext(Dispatchers.IO) {
  13.         val request = Request.Builder()
  14.            .url(url)
  15.            .build()
  16.         webSocket = client.newWebSocket(request, object : WebSocketListener() {
  17.             override fun onOpen(webSocket: WebSocket, response: Response) {
  18.                 println("WebSocket connected")
  19.             }
  20.             override fun onMessage(webSocket: WebSocket, text: String) {
  21.                 val event = PlayEvent.fromJson(text)
  22.                 GlobalScope.launch {
  23.                     eventFlow.emit(event)
  24.                 }
  25.             }
  26.             override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
  27.                 println("WebSocket failure: ${t.message}")
  28.                 GlobalScope.launch {
  29.                     reconnect(url)
  30.                 }
  31.             }
  32.             override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
  33.                 println("WebSocket closed: $code - $reason")
  34.                 GlobalScope.launch {
  35.                     reconnect(url)
  36.                 }
  37.             }
  38.         })
  39.     }
  40.     private suspend fun reconnect(url: String) = withContext(Dispatchers.IO) {
  41.         delay(5000) // 5s 后重试
  42.         connect(url)
  43.     }
  44.     fun sendEvent(event: PlayEvent) {
  45.         val json = event.toJson()
  46.         webSocket?.send(json)
  47.     }
  48.     fun observeEvents(): Flow<PlayEvent> = eventFlow
  49. }
复制代码
在这个改进后的 WebSocketManager 类中,connect 和 reconnect 方法都使用了 withContext(Dispatchers.IO) 来切换到 IO 线程,制止阻塞主线程。同时,使用 MutableSharedFlow 来处理事件的发射和订阅。
3. Flow 处理数据流与 UI 映射

利用 Flow 可以方便地处理数据流,通过 collectAsState() 将实时数据映射到 UI 层。
  1. import 'package:flutter/material.dart';
  2. import 'package:get/get.dart';
  3. import 'package:your_app/music_player_controller.dart';
  4. import 'package:your_app/websocket_manager.dart';
  5. class MusicPlayerPage extends StatelessWidget {
  6.   final MusicPlayerController controller = Get.put(MusicPlayerController());
  7.   final WebSocketManager webSocketManager = Get.put(WebSocketManager());
  8.   @override
  9.   Widget build(BuildContext context) {
  10.     return Scaffold(
  11.       appBar: AppBar(
  12.         title: Text('Music Player'),
  13.       ),
  14.       body: Column(
  15.         mainAxisAlignment: MainAxisAlignment.center,
  16.         children: [
  17.           Obx(() => Text('Play State: ${controller.playStateFlow.value}')),
  18.           Obx(() => Text('Position: ${controller.positionFlow.value} ms')),
  19.           Obx(() => Text('Buffered Position: ${controller.bufferedPositionFlow.value} ms')),
  20.           ElevatedButton(
  21.             onPressed: () => controller.play('your_music_url'),
  22.             child: Text('Play'),
  23.           ),
  24.           ElevatedButton(
  25.             onPressed: () => controller.pause(),
  26.             child: Text('Pause'),
  27.           ),
  28.           ElevatedButton(
  29.             onPressed: () => controller.seekTo(5000), // 跳到 5s 位置
  30.             child: Text('Seek to 5s'),
  31.           ),
  32.           StreamBuilder<PlayEvent>(
  33.             stream: webSocketManager.observeEvents().asStream(),
  34.             builder: (context, snapshot) {
  35.               if (snapshot.hasData) {
  36.                 return Text('Received event: ${snapshot.data?.eventType} at ${snapshot.data?.position} ms');
  37.               } else {
  38.                 return Text('No event received');
  39.               }
  40.             },
  41.           ),
  42.         ],
  43.       ),
  44.     );
  45.   }
  46. }
复制代码
在这个 Flutter 页面中,我们使用 StreamBuilder 来监听 WebSocketManager 的 eventFlow,当接收到新的事件时,更新 UI 显示事件信息。
4. 网络波动处理

当检测到网络波动时,协程自动重试 WebSocket 毗连,并缓存未同步的播放事件,网络恢复后批量补发。
 
  1. import kotlinx.coroutines.*
  2. import kotlinx.coroutines.flow.*
  3. import okhttp3.*
  4. import java.io.IOException
  5. import java.util.concurrent.TimeUnit
  6. class WebSocketManager {
  7.     private val client = OkHttpClient.Builder()
  8.        .pingInterval(10, TimeUnit.SECONDS)
  9.        .build()
  10.     private var webSocket: WebSocket? = null
  11.     private val eventFlow = MutableSharedFlow<PlayEvent>()
  12.     private val pendingEvents = mutableListOf<PlayEvent>()
  13.     suspend fun connect(url: String) = withContext(Dispatchers.IO) {
  14.         val request = Request.Builder()
  15.            .url(url)
  16.            .build()
  17.         webSocket = client.newWebSocket(request, object : WebSocketListener() {
  18.             override fun onOpen(webSocket: WebSocket, response: Response) {
  19.                 println("WebSocket connected")
  20.                 sendPendingEvents()
  21.             }
  22.             override fun onMessage(webSocket: WebSocket, text: String) {
  23.                 val event = PlayEvent.fromJson(text)
  24.                 GlobalScope.launch {
  25.                     eventFlow.emit(event)
  26.                 }
  27.             }
  28.             override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
  29.                 println("WebSocket failure: ${t.message}")
  30.                 GlobalScope.launch {
  31.                     reconnect(url)
  32.                 }
  33.             }
  34.             override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
  35.                 println("WebSocket closed: $code - $reason")
  36.                 GlobalScope.launch {
  37.                     reconnect(url)
  38.                 }
  39.             }
  40.         })
  41.     }
  42.     private suspend fun reconnect(url: String) = withContext(Dispatchers.IO) {
  43.         delay(5000) // 5s 后重试
  44.         connect(url)
  45.     }
  46.     fun sendEvent(event: PlayEvent) {
  47.         if (webSocket?.connectionState() == WebSocket.State.OPEN) {
  48.             val json = event.toJson()
  49.             webSocket?.send(json)
  50.         } else {
  51.             pendingEvents.add(event)
  52.         }
  53.     }
  54.     private fun sendPendingEvents() {
  55.         pendingEvents.forEach { event ->
  56.             val json = event.toJson()
  57.             webSocket?.send(json)
  58.         }
  59.         pendingEvents.clear()
  60.     }
  61.     fun observeEvents(): Flow<PlayEvent> = eventFlow
  62. }
复制代码
 在这个改进后的 WebSocketManager 类中,我们添加了一个 pendingEvents 列表来缓存未同步的播放事件。当毗连乐成时,调用 sendPendingEvents 方法批量发送这些事件。在 sendEvent 方法中,如果毗连未打开,则将事件添加到 pendingEvents 列表中。
感谢观看!!!


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

用户云卷云舒

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表