(资料图片仅供参考)
public void registerStartUpInfo(final boolean enabled) { //开始所有的监听器 listenerManager.startAllListeners(); //选举leader /{namespace}/leader/election/instance 放置选举出来的服务器 leaderService.electLeader(); //{namespace}/{ipservers} 设置enable处理 serverService.persistOnline(enabled); //临时节点 /{namespave}/instances 放置运行服务实例信息 instanceService.persistOnline(); //开启一个异步服务 if (!reconcileService.isRunning()) { reconcileService.startAsync(); }}listenerManager.startAllListeners();会开启一个选举相关的listenerManager ElectionListenerManager.classleaderService.electLeader();执行选举功能 第一步:执行选举功能
public void electLeader() { log.debug("Elect a new leader now."); this.jobNodeStorage.executeInLeader("leader/election/latch", new LeaderService.LeaderElectionExecutionCallback()); log.debug("Leader election completed.");}
public void executeInLeader(String key, LeaderExecutionCallback callback) { try { LeaderLatch latch = new LeaderLatch(this.client, key); try { latch.start(); latch.await(); callback.execute(); } catch (Throwable var7) { try { latch.close(); } catch (Throwable var6) { var7.addSuppressed(var6); } throw var7; } latch.close(); } catch (Exception var8) { this.handleException(var8); }}{job name}/leader/election/latch节点加zk锁,在抢到锁之后,调用callback对象中的execute方法
class LeaderElectionExecutionCallback implements LeaderExecutionCallback { @Override public void execute() { //{jobname}/leader/election/instance 不存在 if (!hasLeader()) { //创建临时节点 {jobname}/leader/election/instance 值为 当前运行的实例值 例如:10.100.16.75@-@134642 前面是ip地址,后面是产生的随机数 //当应用实例与zk断开重新连接时,该节点信息会清除 jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId()); } }}第二步:ElectionListenerManager.class开启监听
@Overridepublic void start() { addDataListener(new LeaderElectionJobListener()); addDataListener(new LeaderAbdicationJobListener());}执行start方法有两个监听LeaderElectionJobListener:用于leader宕机之后重新选举监听LeaderAbdicationJobListener :用于监听leader宕机数据处理 LeaderElectionJobListener.java
@Overridepublic void onChange(final DataChangedEvent event) { //1.schedulerMap 和 jobInstanceMap 没有job信息 //2.{jobname}/service/{ip} 节点数据为DISABLE 或者 ({jobname}/leader/election/instance 节点的类型为删除且{jobname}/servers 节点的值是ENABLED 且 {jobname}/instances 节点下有其他的在线实例) //当前运行的job实例宕机,并且有其他运行的实例 if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(event.getKey(), event.getValue()) || isPassiveElection(event.getKey(), event.getType()))) { //重新选举 leaderService.electLeader(); }}
LeaderAbdicationJobListener.java
@Overridepublic void onChange(final DataChangedEvent event) { //{jobname}/leader/election/instance节点的实例id和JobRegistry对象中的实例id相等 //{jobname}/service/{ip}/ 是DISABLED //就是实例下线 if (leaderService.isLeader() && isLocalServerDisabled(event.getKey(), event.getValue())) { //删除{jobname}/leader/election/instance 节点 leaderService.removeLeader(); }}
下一篇:最后一页
X 关闭
X 关闭
中新网上海3月30日电 (记者 陈静)上海正面临常态化防控以来疫情形势最严峻复杂的挑战,单日新增阳性感染者数量不断刷新纪录。记者30
中新网3月30日电 据国家地震台网官方微博消息,中国地震台网正式测定:3月30日18时14分在新疆和田地区皮山县(北纬36 01度,东经77 89
上海市委常委会今天上午(3月30日)举行会议,听取当前疫情应急处置和核酸筛查相关工作汇报,研究部署下一步疫情防控重点工作。市委书记
(抗击新冠肺炎)江苏无锡一男子隐匿行程轨迹被警方立案侦查 中新网无锡3月30日电 (记者 孙权)3月30日,无锡市在“应检尽检”人员核
(抗击新冠肺炎)官方称吉林市疫情扩散势头得到遏制 中新网吉林3月30日电 (记者 石洪宇)记者30日从吉林市政府新闻办召开的疫情防控
中新网唐山3月30日电 (白云水 孟潮)3月30日,河北省唐山市召开新冠肺炎疫情防控工作新闻发布会通报称,3月29日0时至24时,唐山市新增
浙江省嘉兴市秀洲区新型冠状病毒感染肺炎疫情防控指挥部办公室发布通告: 3月30日上午,秀洲区发现1例新冠肺炎阳性感染者,该感染者
今天(3月30日)下午,新疆乌鲁木齐市人民政府新闻办公室召开疫情防控新闻发布会,通报乌鲁木齐市新冠肺炎疫情和疫情防控最新情况。会上
中新网天津3月30日电 (记者 王君妍)记者30日从天津市水务局获悉,为充分发挥河湖长制优势,近日,天津市将南水北调中线天津干线(天津
(抗击新冠肺炎)河北廊坊累计治愈出院673例 5县区恢复域内交通 中新网廊坊3月30日电 (宋敏涛 郭京泉)30日,河北省廊坊市召