Android 集成websocket小结

tech2023-11-05  105

1、GitHub地址

 https://github.com/TooTallNate/Java-WebSocket 

implementation "org.java-websocket:Java-WebSocket:1.5.1"

2、项目新建module->im_websocket

module->build.gradle

// Build script of the im_websocket library module.
apply plugin: 'com.android.library'

android {
    // SDK levels are looked up in the rootProject.ext.android map.
    compileSdkVersion rootProject.ext.android["compileSdkVersion"]

    defaultConfig {
        minSdkVersion rootProject.ext.android["minSdkVersion"]
        targetSdkVersion rootProject.ext.android["targetSdkVersion"]
        // The module carries its own fixed version instead of the values in the shared map.
        versionCode 1
        versionName "1.0"
    }
}

dependencies {
    // Picks up every jar placed in this module's libs directory.
    api fileTree(include: ['*.jar'], dir: 'libs')
    // WebSocket client library wrapped by this module.
    implementation "org.java-websocket:Java-WebSocket:1.5.1"
}

其中配置文件来自统一配置文件config.gradle

// Shared build constants (excerpt; "..." marks parts the article leaves out).
ext {
    // Android SDK / app version settings read by the module build scripts.
    android = [
            compileSdkVersion: 28,
            buildToolsVersion: "28.0.3",
            minSdkVersion : 21,
            targetSdkVersion : 28,
            // NOTE(review): VERSION_CODE and APP_VERSION are not defined in this excerpt;
            // presumably they come from gradle.properties - confirm.
            versionCode : VERSION_CODE as int,
            versionName : APP_VERSION
    ]

    // Library version numbers referenced when building the dependency coordinates below.
    version = [
            androidSupportSdkVersion: "28.0.0",
            retrofitSdkVersion : "2.2.0",
            butterknifeSdkVersion : "8.8.1",
            constraintLayout : "1.1.2",
            espressoSdkVersion : "2.2.2",
            canarySdkVersion : "1.5.1",
            dagger2SdkVersion : "2.10",
            rxlifecycleSdkVersion : "1.0",
            rxlifecycle2SdkVersion : "2.0.1"
    ]

    // Dependency coordinates keyed by a short alias.
    dependencies = [
            //support
            "appcompat-v7": "com.android.support:appcompat-v7:${version["androidSupportSdkVersion"]}",
            ...
    ]
    ...
}

3、封装

JWebSocketClient.java文件: package com.example.im_websocket; import android.util.Log; import org.java_websocket.client.WebSocketClient; import org.java_websocket.drafts.Draft_6455; import org.java_websocket.handshake.ServerHandshake; import java.net.URI; public class JWebSocketClient extends WebSocketClient { public JWebSocketClient(URI serverUri) { super(serverUri, new Draft_6455()); } @Override public void onOpen(ServerHandshake handshakedata) { Log.e("JWebSocketClient", "onOpen()"); } @Override public void onMessage(String message) { Log.e("JWebSocketClient", "onMessage()"); } @Override public void onClose(int code, String reason, boolean remote) { Log.e("JWebSocketClient", "onClose()"); } @Override public void onError(Exception ex) { Log.e("JWebSocketClient", "onError()"); } }

ImClient:

package com.example.im_websocket; import android.content.Context; import android.text.TextUtils; import android.util.Log; import org.java_websocket.handshake.ServerHandshake; import java.net.URI; public class ImClient { public JWebSocketClient client; private static String TAG = "ImClient"; private URI allUri = null; private static ImClient.OnReceiveMessageListener onReceiveMessageListener; public static void init(Context context, String appKey) { init(context, appKey, true); } private static void init(Context context, String appKey, boolean isPush) { if (TextUtils.isEmpty(appKey)) { Log.e(TAG, "appkey is null"); return; } SingletonHolder.sInstance.initClientSet(context, appKey); } //连接服务器 public static ImClient connect(final ConnectCallback callback) { SingletonHolder.sInstance.connectServer(new ConnectCallback() { @Override public void onSuccess(String var1) { if (callback != null) callback.onSuccess(var1); } @Override public void onError(String var1) { if (callback != null) callback.onError(var1); } @Override public void onMessage(String message) { if (callback != null) callback.onMessage(message); } @Override public void onTokenIncorrect() { if (callback != null) callback.onTokenIncorrect(); } }); return SingletonHolder.sInstance; } public static void disConnectionSocket() { SingletonHolder.sInstance.closeConnect(); } public static void reConnectionWebsoket() throws Exception{ SingletonHolder.sInstance.reConnectionSelfWebsoket(); } public static boolean isNullReconnectionWebsocket() { return SingletonHolder.sInstance.isNullReconnectionSelfWebsocket(); } public static boolean isCloseConnectionWebsocket() { return SingletonHolder.sInstance.isCloseReconnectionSelfWebsocket(); } public static void setOnReceiveMessageListener(OnReceiveMessageListener listener) { onReceiveMessageListener = listener; } private void initClientSet(Context context, String appKey) { allUri = URI.create(appKey); } private void connectServer(final ConnectCallback connectCallback) { try { if 
(client == null) { client = new JWebSocketClient(allUri) { @Override public void onMessage(String message) { Log.e(TAG, "onMessage message=" + message); if (connectCallback != null) connectCallback.onMessage(message); if (onReceiveMessageListener != null) { onReceiveMessageListener.onReceived(message, 1); } } @Override public void onOpen(ServerHandshake handshakedata) { super.onOpen(handshakedata); Log.e(TAG, "onOpen =" + handshakedata.getHttpStatusMessage()); if (connectCallback != null) connectCallback.onSuccess(handshakedata.getHttpStatusMessage()); } @Override public void onClose(int code, String reason, boolean remote) { super.onClose(code, reason, remote); Log.e(TAG, "onClose code=" + code + ",reason=" + reason + ",remote" + remote); if (connectCallback != null) connectCallback.onError("onClose code=" + code + ",reason=" + reason + ",remote=" + remote); } @Override public void onError(Exception ex) { super.onError(ex); Log.e(TAG, "onError ex=" + ex.getMessage()); if (connectCallback != null) connectCallback.onError("onError ex=" + ex.getMessage()); } }; client.connectBlocking(); } } catch (InterruptedException e) { e.printStackTrace(); Log.e(TAG, "connect onError ex=" + e.getMessage()); } } //是否关闭 private boolean isCloseReconnectionSelfWebsocket() { if (client == null) return true; return client.isClosed(); } //是否client为空 private boolean isNullReconnectionSelfWebsocket() { return client == null; } //重连websoket private void reConnectionSelfWebsoket() throws Exception { if (client != null) { try { client.reconnectBlocking(); } catch (InterruptedException e) { e.printStackTrace(); throw e; } } } public static void sendMessage(String message) { if (TextUtils.isEmpty(message)) { Log.e(TAG, "Message sending cannot null "); return; } SingletonHolder.sInstance.send(message); } private void send(String message) { Log.e(TAG, "send() message=" + message); if (client != null && client.isOpen()) { client.send(message); } else { Log.e(TAG, "send() client error null"); } } 
/** * 断开连接 */ private void closeConnect() { Log.e(TAG, "closeConnect() message="); try { if (null != client) { client.close(); } } catch (Exception e) { e.printStackTrace(); } finally { client = null; } } public interface OnReceiveMessageListener { boolean onReceived(String var1, int var2); } public abstract static class ConnectCallback extends ImClient.ResultCallback<String> { public ConnectCallback() { } public abstract void onTokenIncorrect(); } public abstract static class ResultCallback<T> { public ResultCallback() { } public abstract void onMessage(String message); public abstract void onSuccess(T var1); public abstract void onError(String var1); public static class Result<T> { public T t; public Result() { } } } private static class SingletonHolder { static ImClient sInstance = new ImClient(); private SingletonHolder() { } } }

4、app调用

websocket的功能实现类:WebSocketIm import android.content.Context; import android.text.TextUtils; import com.example.im_websocket.ImClient; /** * websocket的功能实现类 */ public class WebSocketIm implements ImOperation<String> { private final static String TAG = "WebSocketIm"; /** * @param context 传入Application类的Context。 * @param appKey 注册应用的连接地址。 */ @Override public void init(Context context, String appKey) { LogUtils.e("lxd", appKey); ImClient.init(context, appKey); } /** * @param callback 连接回调。 * @return ImClient 客户端核心类的实例。 */ @Override public void connect(String token, final ConnectCallback callback) { //IM ImClient.connect(new ImClient.ConnectCallback() { @Override public void onTokenIncorrect() { if (callback != null) callback.onError(ErrorMessage.valueOf(ErrorMessage.TOKEN_ERROR)); } @Override public void onMessage(String message) { if (TextUtils.isEmpty(message)) { LogUtils.e(TAG,"IM接收消息为空"); } } @Override public void onSuccess(String var1) { LogUtils.e(TAG, "onMessage message=" + var1); if (callback != null) callback.onSuccess(SessionProvider.Companion.get().getUserId()); //链接成功开始发送链接消息 开始发送webSocket连接信息 ImClient.sendMessage(SendChatMessageBean.Companion.getConnectionMessage()); } @Override public void onError(String var1) { if (callback != null) callback.onError(ErrorMessage.valueOf(-101, var1)); } }); } @Override public void logout(boolean recPushMsg) { //IM ImClient.disConnectionSocket(); } /** * 这需要回调回去 * * @param listener */ @Override public void onReceiveMessage(final OnParserBeforeReceiveMessageListener<String> listener) { if (listener == null) return; ImClient.setOnReceiveMessageListener((message, i) -> { listener.onReceived(message, i); return false; }); } /** * 重新连接websoket */ @Override public void reConnectionWebsoket() throws Exception { ImClient.reConnectionWebsoket(); } /** * client 是否为空 */ @Override public boolean isNullReconnectionWebsocket() { return ImClient.isNullReconnectionWebsocket(); } /** * 判断client是否为关闭状态 */ @Override public boolean 
isCloseConnectionWebsocket() { return ImClient.isCloseConnectionWebsocket(); } /** * 发送心跳包 */ @Override public void sendHeartPackage() { //开始发送webSocket心跳包 ImClient.sendMessage(SendChatMessageBean.Companion.getPingMessage()); } }

 

以下代码可封装为单独的module供整个项目使用---------------------------------------------

Im操作接口 ImOperation

import android.content.Context; /** * Im操作类接口 M具体解析的类型 */ public interface ImOperation<M> { /** * 初始化 * * @param context 上下文 * @param appKey appkey */ void init(Context context, String appKey); /** * 链接服务器 * * @param token 链接标识 * @param callback 回调 */ void connect(String token, final ConnectCallback callback); /** * 退出 * * @param b */ void logout(boolean b); /** * 设置解析之前的消息接收监听 这个需要具体的第三方库回调回来 * * @param receive 消息回调 */ void onReceiveMessage(OnParserBeforeReceiveMessageListener<M> receive); /** * 重新连接websoket */ void reConnectionWebsoket() throws Exception; /** * client 是否为空 */ boolean isNullReconnectionWebsocket(); /** * 判断client是否为关闭状态 */ boolean isCloseConnectionWebsocket(); /** * 发送心跳包 */ void sendHeartPackage(); } ConnectCallback 连接状态接口 public interface ConnectCallback { void onSuccess(String userId); void onError(ErrorMessage errorMessage); } ErrorMessage public class ErrorMessage { private int mCode; private String mMsg; public final static int UNKNOWN_CODE = -1; public final static int TOKEN_ERROR = 1; public ErrorMessage() { } public ErrorMessage(int code, String msg) { this.mCode = code; this.mMsg = msg; } public int getCode() { return mCode; } public void setCode(int code) { this.mCode = code; } public String getMsg() { return mMsg; } public void setMsg(String msg) { this.mMsg = msg; } public int getValue() { return this.mCode; } public String getMessage() { return this.mMsg; } public static ErrorMessage valueOf(int code) { ErrorMessage errorMessage = new ErrorMessage(); errorMessage.setCode(code); switch (code) { case UNKNOWN_CODE: errorMessage.setMsg("未知错误"); break; case TOKEN_ERROR: errorMessage.setMsg("token错误"); break; default: errorMessage.setMsg("未知错误"); break; } return errorMessage; } public static ErrorMessage valueOf(int code, String msg) { return new ErrorMessage(code, msg); } } IMManager管理类 import android.content.Context; /** * IM功能管理类 */ public class IMManager { private volatile static IMManager mInstance; private ImOperation mImOperation; 
private ImParser mImParser; private OnReceiveImMessageListener mOnReceiveImMessageListener; private IMManager() { } public static IMManager getInstance() { if (mInstance == null) { synchronized (IMManager.class) { if (mInstance == null) { mInstance = new IMManager(); } } } return mInstance; } /** * 设置操作引擎 * * @param imOperation */ public IMManager imOperation(ImOperation imOperation) { mImOperation = imOperation; return mInstance; } /** * 设置解析引擎 * * @param imParser */ public IMManager imParser(ImParser imParser) { mImParser = imParser; return mInstance; } /** * 初始化 * * @param context * @param appKey */ public void init(Context context, String appKey) { if (mImOperation == null) return; mImOperation.init(context.getApplicationContext(), appKey); } /** * 链接服务器 * * @param token * @param callback */ public void connect(String token, ConnectCallback callback) { if (mImOperation == null) return; mImOperation.connect(token, callback); } /** * 设置接受消息的回掉 * * @param listener li */ public void setOnReceiveMessageListener(final OnReceiveImMessageListener listener) { mOnReceiveImMessageListener = listener; onReceived(); } /** * 设置消息解析之后的回调 */ private void onReceived() { if (mImOperation == null) return; if (mImParser == null) return; mImOperation.onReceiveMessage(new OnParserBeforeReceiveMessageListener() { @Override public void onReceived(Object message, int i) { mOnReceiveImMessageListener.onReceived(mImParser.parser(message), i); } }); } /** * 退出 */ public void logout() { if (mImOperation == null) return; mImOperation.logout(false); } /** * 重连websocket */ public void reConnectionWebsocket() throws Exception{ if (mImOperation == null) return; mImOperation.reConnectionWebsoket(); } /** * client 是否为空 */ public boolean isNullReconnectionWebsocket() { if (mImOperation == null) return true; return mImOperation.isNullReconnectionWebsocket(); } /** * 判断client是否为关闭状态 */ public boolean isCloseConnectionWebsocket() { if (mImOperation == null) return true; return 
mImOperation.isCloseConnectionWebsocket(); } /** * 发送心跳包 */ public void sendHeartPackage() { if (mImOperation == null) return; mImOperation.sendHeartPackage(); } } ImParser public interface ImParser<T> { ImMessage parser(T message); } OnReceiveImMessageListener public interface OnReceiveImMessageListener { void onReceived(ImMessage message,int i ); }

 

---------------------------------------------------回到主项目-----------------------------------------------------------

IMHelper(加入和退出房间可忽略)

import android.app.ActivityManager; import android.content.Context; import android.text.TextUtils; import org.jetbrains.annotations.Nullable; import java.util.List; import java.util.Objects; import io.reactivex.subscribers.DisposableSubscriber; /** * im业务逻辑处理类 * webSocket */ public class IMHelper { private static IMHelper mInstance; private String TAG = "IMHelper"; private IMHelper() { } public static IMHelper getInstance() { if (mInstance == null) { synchronized (IMHelper.class) { if (mInstance == null) { mInstance = new IMHelper(); } } } return mInstance; } public void initIm(Context context) { initIm(context, false); } public void initIm(Context context, boolean forcedInit) { initIm(context, forcedInit, null); } //是否初始化成功 private boolean initImOk; /** * 初始化 * * @param context * @param forcedInit 是否必须重新初始化 如果为false就只初始化一次 * @param connectCallback */ public void initIm(final Context context, final boolean forcedInit, final ConnectCallback connectCallback) { if (initImOk && !forcedInit) { return; } ActivityManager manager = (ActivityManager) context.getSystemService(Context.ACTIVITY_SERVICE); List<ActivityManager.RunningAppProcessInfo> processes = manager .getRunningAppProcesses(); for (ActivityManager.RunningAppProcessInfo process : processes) { if (TextUtils.equals(context.getPackageName(), process.processName)) { //websocket取当前地址 String appkey = BuildConfig.IM_SERVER_URL; IMManager.getInstance() .imOperation(new WebSocketIm()) .imParser(new WebSocketImParser()) .init(context, appkey); IMManager.getInstance().connect(null, new ConnectCallback() { @Override public void onSuccess(String userId) { initImOk = true; LogUtils.e(TAG, "webSocket连接成功 userId=" + userId); IMManager.getInstance().setOnReceiveMessageListener(new IMReceive(context)); if (connectCallback != null) { connectCallback.onSuccess(userId); } } @Override public void onError(ErrorMessage errorMessage) { initImOk = false; LogUtils.e(TAG, "webSocket连接失败 errorMessage=" + errorMessage.getMessage()); if 
(connectCallback != null) { connectCallback.onError(errorMessage); } } }); } } } //重连websocket public void reConnectionWebsocket() throws Exception { IMManager.getInstance().reConnectionWebsocket(); } //判断client是否为空 public boolean isNullConnectionWebsocket() { return IMManager.getInstance().isNullReconnectionWebsocket(); } //判断client是否为关闭 public boolean isCloseConnectionWebsocket() { return IMManager.getInstance().isCloseConnectionWebsocket(); } public void addRoom(Context context, String roomId, OperationCallback callback) { addRoom(context, roomId, true, callback); } public void addRoom(Context context, String roomId) { addRoom(context, roomId, null); } int connCount; /** * @param roomId * @param first true表示第一次加入 false表示重连 */ private synchronized void addRoom(final Context context, final String roomId, boolean first, final OperationCallback callback) { if (first) { connCount = 0; } connCount++; //加入房间 ChatMessageContract chatMessageContract = new JoinOrOutChatRoom(); chatMessageContract.joinChatRoom(roomId, new DisposableSubscriber<JoinChatRoomBean>() { @Override public void onNext(JoinChatRoomBean joinChatRoomBean) { try { if (joinChatRoomBean != null && !TextUtils.isEmpty(joinChatRoomBean.getStatus()) && Objects.requireNonNull(joinChatRoomBean.getStatus()).contains("success")) { LogUtils.e(TAG, "加入房间成功---->" + roomId); //注意 这个是在主线程调用 因为融云自己写了线程调度 if (callback != null) { callback.onSuccess(); } //重连线程 一个小时重连一次 StageThreadManager.getInstance().start(() -> { //注意 这个是在子线程调用 addRoom(context, roomId, true, callback); }, 3600); } else { joinError(Objects.requireNonNull(joinChatRoomBean).getMsg()); } } catch (Exception e) { e.printStackTrace(); } } private void joinError(String msg) { try { LogUtils.e(TAG, "加入房间失败---->" + roomId + "--->" + msg); if (connCount < 6) { LogUtils.e(TAG, "重新连接im-->" + connCount); initIm(context, true, new ConnectCallback() { @Override public void onSuccess(String userId) { addRoom(context, roomId, false, callback); } @Override public void 
onError(ErrorMessage errorMessage) { addRoom(context, roomId, false, callback); } }); } else { LogUtils.e(TAG, "重连超过最大次数!"); //注意 这个是在主线程调用 因为融云自己写了线程调度 if (callback != null) { callback.onError(ErrorMessage.valueOf(10, "重连超过最大次数")); } } } catch (Exception e) { e.printStackTrace(); } } @Override public void onError(Throwable t) { joinError(t.getMessage()); } @Override public void onComplete() { } }); } public void exitIm() { IMManager.getInstance().logout(); } public void exitRoom(final String roomId) { //退出房间 try { ChatMessageContract messageContract = new JoinOrOutChatRoom(); messageContract.exitChatRoom(roomId, new DisposableSubscriber<JoinChatRoomBean>() { @Override public void onNext(JoinChatRoomBean joinChatRoomBean) { if (joinChatRoomBean != null && !TextUtils.isEmpty(joinChatRoomBean.getStatus()) && Objects.requireNonNull(joinChatRoomBean.getStatus()).contains("success")) { LogUtils.e(TAG, "退出房间成功!" + roomId); } else { LogUtils.e(TAG, "退出房间失败!" + roomId + " " + Objects.requireNonNull(joinChatRoomBean).getMsg()); } } @Override public void onError(Throwable t) { LogUtils.e(TAG, "退出房间失败!" + roomId + " " + t.getMessage()); } @Override public void onComplete() { } }); StageThreadManager.getInstance().stop(); } catch (Exception e) { e.printStackTrace(); } } //发送心跳包 public void sendHeartPackage() { IMManager.getInstance().sendHeartPackage(); } }

5、app

JWebSocketClientService.java import android.app.Service; import android.content.BroadcastReceiver; import android.content.Context; import android.content.Intent; import android.content.IntentFilter; import android.os.Binder; import android.os.Handler; import android.os.IBinder; import android.support.v4.content.LocalBroadcastManager; /** * Im服务 */ public class JWebSocketClientService extends Service { private String TAG = "JWebSocketClientService"; private JWebSocketClientBinder mBinder = new JWebSocketClientBinder(); public class JWebSocketClientBinder extends Binder { public JWebSocketClientService getService() { return JWebSocketClientService.this; } } @Override public IBinder onBind(Intent intent) { return mBinder; } @Override public void onCreate() { super.onCreate(); IMInit(); heartBeatRunnable.run(); } @Override public void onDestroy() { if (mHandler != null && heartBeatRunnable != null) { mHandler.removeCallbacks(heartBeatRunnable); } mHandler = null; heartBeatRunnable = null; super.onDestroy(); } private void IMInit() { IMHelper.getInstance().initIm(this, true, new ConnectCallback() { @Override public void onSuccess(String userId) { LogUtils.e(TAG + " onSuccess", "userId=" + userId); } @Override public void onError(ErrorMessage errorMessage) { LogUtils.e(TAG + " onError", errorMessage.getMessage()); } }); } private static final long HEART_BEAT_RATE = 15 * 1000; private Handler mHandler = new Handler(); private Runnable heartBeatRunnable = new Runnable() { @Override public void run() { if (!IMHelper.getInstance().isNullConnectionWebsocket()) { if (IMHelper.getInstance().isCloseConnectionWebsocket()) { LogUtils.e(TAG, "IM重连"); reconnectWs(); } else { //发送心跳包 LogUtils.e(TAG, "IM开始发送心跳包"); IMHelper.getInstance().sendHeartPackage(); } } else { //如果client已为空,重新初始化websocket LogUtils.e(TAG, "重新初始化WebSocket"); IMInit(); } //定时对长连接进行心跳检测 mHandler.postDelayed(this, HEART_BEAT_RATE); } }; /** * 开启重连 */ private void reconnectWs() { 
mHandler.removeCallbacks(heartBeatRunnable); new Thread() { @Override public void run() { try { //尝试重连 IMHelper.getInstance().reConnectionWebsocket(); } catch (Exception e) { e.printStackTrace(); LogUtils.e("==", "IM重连失败" + e.getMessage()); } } }.start(); } }

MainActivity调用:

/**
 * Activity that binds to [JWebSocketClientService] for as long as it is alive.
 */
class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        // The framework requires the super call; omitting it crashes the activity.
        super.onCreate(savedInstanceState)
        initView()
    }

    fun initView() {
        // IM service
        bindIMService()
    }

    override fun onDestroy() {
        unbindIMService()
        super.onDestroy()
    }

    // IM chat service; both references stay null until the service is connected.
    private var binder: JWebSocketClientService.JWebSocketClientBinder? = null
    private var jWebSClientService: JWebSocketClientService? = null

    // True once bindService() has been called with serviceConnection.
    private var imServiceBindRequested = false

    private val serviceConnection: ServiceConnection = object : ServiceConnection {
        override fun onServiceDisconnected(name: ComponentName?) {
            LogUtils.e("==", "IM服务断开连接")
            binder = null
            jWebSClientService = null
        }

        override fun onServiceConnected(name: ComponentName?, service: IBinder?) {
            LogUtils.e("==", "IM服务连接")
            val imBinder = service as? JWebSocketClientService.JWebSocketClientBinder ?: return
            binder = imBinder
            jWebSClientService = imBinder.service
        }
    }

    /**
     * Binds the IM service, creating it if necessary.
     */
    private fun bindIMService() {
        val bound = bindService(
            Intent(this@MainActivity, JWebSocketClientService::class.java),
            serviceConnection,
            Context.BIND_AUTO_CREATE
        )
        imServiceBindRequested = true
        if (!bound) {
            LogUtils.e("==", "IM service could not be bound")
        }
    }

    /**
     * Unbinds the IM service; does nothing when no bind was requested, because unbinding a
     * connection that was never registered throws.
     */
    private fun unbindIMService() {
        if (!imServiceBindRequested) {
            return
        }
        imServiceBindRequested = false
        unbindService(serviceConnection)
    }
}

PS: java-websocket 1.5.1 在某些机型(Android API 低于 24)上的兼容性问题及解决办法:

No virtual method setEndpointIdentificationAlgorithm(Ljava/lang/String;)V in class Ljavax/net/ssl/SSLParameters; or its super classes (declaration of 'javax.net.ssl.SSLParameters' appears in /system/framework/core-libart.jar)

org.java_websocket.client.WebSocketClient.onSetSSLParameters(WebSocketClient.java:528)

解决办法:

If you want to target Android API levels lower than 24, you should do the following:

Override protected void onSetSSLParameters(SSLParameters sslParameters) to not call setEndpointIdentificationAlgorithm()

https://github.com/TooTallNate/Java-WebSocket/wiki/No-such-method-error-setEndpointIdentificationAlgorithm

JWebSocketClient.java:

// Excerpt: "..." stands for the rest of the class shown earlier in the article.
public class JWebSocketClient extends WebSocketClient {
    ...

    /**
     * Calls the library's default SSL parameter setup only on API 24 (N) and above; on older
     * versions the default is skipped, so setEndpointIdentificationAlgorithm() is never
     * reached. Any exception raised by the super call is printed and otherwise ignored.
     */
    @Override
    protected void onSetSSLParameters(SSLParameters sslParameters) {
        try {
            if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N)
                super.onSetSSLParameters(sslParameters);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    ...
}

 

 

 

 

 

 

 

 

 

 

 

 

最新回复(0)