联机游戏机制--续

导入

上一篇博客简单介绍了联机游戏机制,这篇直接找一个简单的单机游戏改成联机,希望对读者理解联机游戏机制有帮助。

油管一搜pygame就找到了这个游戏,链接,一个经典的雷电游戏的简单python实现,一共只有300多行代码。那就这个了。

正文

一个简单的联机游戏我习惯用下面这种简单的框架:

  • 客户端

    1. 建立连接,处理玩家输入

    2. 利用tcp或者可靠udp协议发送输入到服务器端。同时本地利用玩家输入进行本地的预测

    3. 接收服务器的返回数据(一般另外使用后台守护线程专门负责接收服务器数据),根据服务器的权威状态和本地预测状态进行和解

    4. 对其他玩家的角色状态进行插值

    5. 游戏渲染

  • 服务器端

    1. 多线程,线程池或者端口复用应对客户端连接

    2. 根据连接类型判断新连接还是接收客户端数据。新连接则分配玩家id,保存socket描述符,新建玩家,保存到所有玩家字典。如果是客户端数据,则解码数据,传输数据利用json或者序列化,解码对应。保存输入数据到对应玩家结构中。

    3. 如此循环监听,直到满足服务器tick同步条件,则进行同步

    4. 同步首先记录同步时间,之后保存到快照中,便于客户端判断快照时间顺序和用来判断快照是否已确认。然后处理所有玩家字典中的输入,进行权威状态的计算。对于每一个玩家,依次执行输入队列中的所有操作,直到队列为空。如果有多种类型的输入,根据type类型判断分别处理,同时要进行碰撞检测。这样依次处理完所有玩家的输入后,就得到这个服务器同步tick时间间隔内整个游戏状态的推进(权威状态推进),把推进后的权威状态选出客户端需要的加入到服务器快照中(比如tick,玩家位置,子弹位置,碰撞检测结果等),然后广播快照给所有客户端。

    5. 更新最新同步tick时间,再次循环

这种C/S联机架构非常简单,个人觉得很适合联机小游戏的制作,直接根据这个流程修改即可。然后因为这个游戏代码并没有重构,整体结构类似函数式编程,我也只简单修改了下,花了小半天时间,并没有帮他重构代码。

首先就是客户端连接服务器:

def connect_to_server():
        max_retries = 3
        global sock,player_id,connected
        for attempt in range(max_retries):
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.settimeout(5)
                sock.connect((server_ip, 5555))
                data = recv_until_null(sock)
                if data:
                    init_data = json.loads(data)
                    player_id = init_data['player_id']
                    print(f"Connected as {player_id}")

                    connected = True
                    return True

            except Exception as e:
                print(f"Connection attempt {attempt+1} failed: {e}")
                time.sleep(1)
        return False

为了提高鲁棒性,设置三次重连机会。这里我为了避免不同次数据之间的影响,和接收完整度的考虑,设计了每次循环接收到确认数据结束,并且设置结束符\0。接收函数如下:

def recv_until_null(sock=None, buffer_size=4096):
        data = b''
        while True:
            try:
                chunk = sock.recv(buffer_size)
                if not chunk:
                    return None
                data += chunk
                # 如果接收data中有b'\0'结束符,说明接收完毕
                if b'\0' in data:
                    msg, _ = data.split(b'\0', 1)
                    return msg.decode()
            except socket.timeout:
                return None
            except Exception as e:
                print(f"Recv error: {e}")
                return None

然后服务器需要接受到这个信息,并且分配玩家id,并进行一些初始化:

def accept_new_connection(self):
        conn, addr = self.server.accept()
        # 服务器分配玩家id
        player_id = f"player_{len(self.players)}"

        # 立即发送玩家ID
        init_data = {'player_id': player_id}
        conn.sendall((json.dumps(init_data) + '\0').encode())

        self.inputs.append(conn)

        self.players[conn] = {
            'id': player_id,
            'position': [250, 590],
            'input_queue': deque(maxlen=60), # 保存客户端玩家输入向量的队列
            'last_processed': 0,
            'health': 100,
            'lives': 3,
            'score': 0,
        }
        self.player_sprites[conn] = Player(player_id)
        print(f"New connection from {addr} as {player_id}")

这里用的是上个博客中的例子服务器代码直接修改,偷懒了,此处最好直接使用pygame的精灵类Sprite继承后的Player类,直接把客户端的Player类复制过来即可。既然提到这,就说有一下,对于这个游戏代码而言,他定义了玩家类,岩石类,子弹类,还有特效的爆炸类,buff类,都是继承自精灵类Sprite。对于联机游戏而言,岩石,子弹,buff都是由服务器控制的,客户端只负责渲染的部分,比如爆炸特效类,因此客户端只需要加载图片资源,然后根据服务器端返回的位置数据渲染即可。所以涉及计算的岩石,子弹,buff类,包括玩家类都要移动到服务器端,客户端保留爆炸特效类和一个玩家类的复制即可。

下面再次修改客户端,客户端需要接收这个玩家id,然后开启一个后台线程,持续接收服务器消息。

def start_network_thread():
    threading.Thread(target=recv_server_data, daemon=True).start()

def recv_server_data():
    buffer = b''
    global connected, server_states
    while running and connected:
        try:
            data = sock.recv(4096)
            if not data:
                print("Server closed connection")
                connected = False
                break

            buffer += data
            while b'\0' in buffer:
                msg, buffer = buffer.split(b'\0', 1)
                if msg:
                    try:
                        snapshot = json.loads(msg.decode())
                        # 保存快照
                        server_states.append(snapshot)
                        # 和解
                        reconcile_states()
                    except json.JSONDecodeError:
                        print("Invalid JSON data from server")
        except ConnectionResetError:
            print("Connection reset by server")
            connected = False
            break
        except Exception as e:
            print(f"Network error: {e}")
            connected = False
            break

然后客户端处理玩家输入,并且进行本地预测:

def send_player_input(input_vec):
        global connected, player_id
        if not connected:
            return

        input_cmd = {
            'type':'move',
            'tick': int(time.time() * 1000), # 输入操作的时间戳(秒)
            'input': input_vec,  # 输入按键
            'player_id': player_id  # 玩家id
        }

        try:
            # 末尾+ 结束符'\0' 发送到服务器
            sock.sendall((json.dumps(input_cmd) + '\0').encode())
            # 并且本地进行预测
            predict_movement(input_vec)
        except ConnectionResetError:
            print("Connection lost during send")
            connected = False
        except Exception as e:
            print(f"Send error: {e}")
            connected = False

def predict_movement(input_vec):
    global render_x
    new_x = render_x + input_vec[0] * 5
    render_x = new_x

本地预测的结果会和后台接收的服务器权威状态进行和解:

def reconcile_states():
    global last_confirmed_tick, render_x, render_y, entities
    if not server_states:
        return

    latest = server_states[-1]
    if latest['tick'] > last_confirmed_tick:

        server_entity = latest['state'].get(player_id)

        if server_entity:
            render_x = server_entity[0]
            render_y = server_entity[1]

        last_confirmed_tick = latest['tick']
        entities = latest['state']

这是具体的和解代码,和上篇博客中使用的差不多。

然后我们再回到服务器端,服务器对输入进行处理:

def handle_client_data(self, sock):
        try:
            data = sock.recv(2048)
            if data:
                messages = data.split(b'\0')
                for msg in messages:
                    if msg:
                        try:
                            cmd = json.loads(msg.decode())
                            self.players[sock]['input_queue'].append(cmd)
                        except json.JSONDecodeError:
                            print("Invalid JSON data received")
            else:
                self.cleanup_connection(sock)
        except ConnectionResetError:
            self.cleanup_connection(sock)
        except Exception as e:
            print(f"Error handling client data: {e}")
            self.cleanup_connection(sock)

然后服务器进行同步,生成快照,并且广播:

def game_tick(self):

        current_tick = int(time.time() * 1000)
        active_collisions = {}
        for conn, player in self.players.items():                         
            while player['input_queue']:
                input_cmd = player['input_queue'].popleft()

                if input_cmd['type'] == 'move':
                    if input_cmd['tick'] > player['last_processed']:
                        dx = input_cmd['input'][0]
                        player['position'][0] += dx * 5
                        # 更新时间戳
                        player['last_processed'] = input_cmd['tick']
                elif input_cmd['type'] == 'shoot':
                    self.handle_shoot(player, conn)

        for conn, player_sprite in self.player_sprites.items():
            # 更新玩家精灵,用于碰撞检测
            player_sprite.update(self.players[conn]['position'][0], self.players[conn]['position'][1])      
        # 碰撞检测与处理
        hits = pygame.sprite.groupcollide(self.bullets_group, self.rocks_group , True, True)
        for hit in hits:
            self.players[hit.sock_id]['score'] += 1
            if random.random() > 0.5:
                power = Power(hit.rect.centerx,hit.rect.bottom)
                self.powers_group.add(power)
            r = Rock()
            self.rocks_group.add(r)
            active_collisions['lg'].append(hit.rect.center)

        for conn, player_sprite in self.player_sprites.items():
            hits = pygame.sprite.spritecollide(player_sprite, self.rocks_group, True, pygame.sprite.collide_circle)
            for hit in hits:
                self.players[conn]['health'] -= 50
                r = Rock()
                self.rocks_group.add(r)
                if self.players[conn]['health'] <= 0:
                    global DEAD,TRUE_DEAD
                    DEAD[self.players[conn]['id']] = True               
                    self.players[conn]['lives'] -= 1
                    self.players[conn]['health'] = 100
                    if self.players[conn]['lives'] <= 0:
                        TRUE_DEAD[self.players[conn]['id']] = True
                    player_sprite.hide()
                active_collisions['sm'].append(player_sprite.rect.center)

        for conn, player_sprite in self.player_sprites.items():
            hits = pygame.sprite.spritecollide(player_sprite, self.powers_group, True)
            for hit in hits:
                if hit.type == 'shield':
                    self.players[conn]['health'] += 20
                    if self.players[conn]['health'] > 100:
                        self.players[conn]['health'] = 100
                elif hit.type == 'gun':
                    player_sprite.gun_mode += 1

        snapshot = {
            'tick': current_tick,
            'state': {s.id: [s.rect.centerx, s.rect.bottom] for s in self.player_sprites.values()},
            'bullets':[[b.rect.left,b.rect.top] for b in self.bullets_group],
            'rocks':[[r.rect.left,r.rect.top] for r in self.rocks_group],
            'score':{p['id']: p['score'] for p in self.players.values()},
            'health':{p['id']: p['health'] for p in self.players.values()},
            'lives':{p['id']: p['lives'] for p in self.players.values()},
            'dead': DEAD,
            'true_dead': TRUE_DEAD,
            'collisions': active_collisions,
            'powers': {s.type: [s.rect.left,s.rect.top] for s in self.powers_group},
        }

        # 广播快照
        self.broadcast_snapshot(snapshot)

    def broadcast_snapshot(self, snapshot):
        data = (json.dumps(snapshot) + '\0').encode()
        for conn in list(self.players.keys()):
            try:
                conn.sendall(data)
            except (ConnectionResetError, BrokenPipeError):
                self.cleanup_connection(conn)
            except Exception as e:
                print(f"Broadcast error: {e}")
                self.cleanup_connection(conn)

关于碰撞检测,直接使用pygame的内置函数即可。至于snapshot就是服务器上主要传输的数据,也就是快照数据,可以看到包含所有玩家和子弹,岩石,buff的位置,还有其他服务器计算的重要属性,包含了客户端渲染所需要的一切信息。

接着客户端对其他玩家的位置进行插值:

def interpolate_entities():
    global entities
    # 插值要求服务器状态快照队列中至少有2个快照才能插值
    if len(server_states) < 2:
        return

    t1 = server_states[-2]['tick']
    t2 = server_states[-1]['tick']
    if t1 == t2:
        return
    # 生成0~1之间的插值比例,根据时间戳计算插值比例
    alpha = min(1.0, max(0.0, (time.time()*1000 - t1) / (t2 - t1)))

    for entity_id, entity in server_states[-1]['state'].items():
        if entity_id != player_id:
            prev_entity = server_states[-2]['state'].get(entity_id, entity)
            interp_x = prev_entity[0] + (entity[0] - prev_entity[0]) * alpha

            entities[entity_id] = [interp_x]

插值后,我们获取了其他玩家的位置,又利用快照数据知道了自己玩家的新的位置,知道了岩石,子弹,buff的位置,知道了玩家血量,生命等信息,客户端直接加载图片资源,然后利用这些快照中的位置进行渲染即可。

def render_bullet():
    if not server_states:
        return
    bullets = server_states[-1]['bullets']

    if not bullets:
        return
    # 绘制子弹
    for bullet in bullets:
        screen.blit(bullet_img, (bullet[0], bullet[1]))

渲染函数大致如此,其他也是一样。

可以看看2人联机的效果,还是不错的。

总结

对于简单的联机游戏,使用我提出的简单框架即可。而对于socket连接而言,简单的几人联机小游戏可以通过多线程或者线程池来处理,但是对于更大型的多人游戏,多线程的开销太大,还是要使用端口复用或者更强大服务器集群来处理,多服务器集群还涉及到不同服务器处理不同游戏地图区域的技术等等,总之联机技术还是很复杂的,但是掌握博客提到的这些基本的联机游戏机制,对一些中小型游戏来说已经足够了。

至于连接方式选择TCP还是UDP,现代游戏一般都是改进UDP协议,而不使用TCP,因为TCP更复杂,校验更多,延迟也就更高,而改进后的UDP即可靠又可以抛去TCP中冗余的环节,因此更受欢迎。