/*++

Copyright (c) 2000  Microsoft Corporation

Module Name:

    receive.c

Abstract:

    Receive handler and sends reply packets

Author:

    Ahmed Mohamed (ahmedm) 12, 01, 2000

Revision History:

--*/

#include "gs.h"
#include "gsp.h"
// NOTE(review): the names of the two system headers that followed were lost
// from the source text.  <assert.h> and <string.h> are used here because this
// file calls assert() and memcpy(); confirm against the original file.
#include <assert.h>
#include <string.h>

//
// Log every message on the group's receive queue, then the queue's head/next
// pointers and the <mseq, bnum> pair recorded in gd->g_recv.
// Walking more than 100 entries is treated as a corrupted list and halts.
//
void
GspDumpQueue(gs_group_t *gd)
{
    gs_msg_t *q;
    int i = 0;

    for (q = gd->g_recv.r_head; q != NULL; q = q->m_next) {
        state_log(("Msg %x: nid %d gid %d type %d mseq %d bnum %d flags %x cnt %d\n",
                   q, q->m_hdr.h_sid, q->m_hdr.h_gid, q->m_hdr.h_type,
                   q->m_hdr.h_mseq, q->m_hdr.h_bnum, q->m_hdr.h_flags,
                   q->m_refcnt));
        i++;
        if (i > 100) {
            err_log(("Infinite loop\n"));
            halt(1);
        }
    }

    state_log(("Head %x Next %x expecting <%d, %d.\n",
               gd->g_recv.r_head, gd->g_recv.r_next,
               gd->g_recv.r_mseq, gd->g_recv.r_bnum));
}

//
// Release msg.  If msg is flagged GS_FLAGS_QUEUED, every message ahead of it
// on the receive queue is unflagged, unlinked and passed to msg_free() first;
// msg itself is then unlinked unless it is flagged GS_FLAGS_CONTINUED.
// msg_free(msg) is called in all cases.
//
void
GspRemoveMsg(gs_group_t *gd, gs_msg_t *msg)
{
    gs_msg_t *q;

    gs_log(("Remove gid %d seq %d msg %x\n", gd->g_id, msg->m_hdr.h_mseq, msg));
    GspDumpQueue(gd);

    if (msg->m_hdr.h_flags & GS_FLAGS_QUEUED) {
        // discard everything queued in front of msg
        while ((q = gd->g_recv.r_head) != msg) {
            if (q == NULL) {
                err_log(("Internal error: null head during remove %x\n", msg));
                GspDumpQueue(gd);
                halt(1);
                break;
            }
            q->m_hdr.h_flags &= ~GS_FLAGS_QUEUED;
            gd->g_recv.r_head = q->m_next;
            msg_free(q);
        }

        // delay the freeing of continued messages to simplify recovery
        if (!(msg->m_hdr.h_flags & GS_FLAGS_CONTINUED)) {
            msg->m_refcnt--;
            msg->m_hdr.h_flags &= ~GS_FLAGS_QUEUED;
            gd->g_recv.r_head = msg->m_next;
            // r_next must not keep pointing into the message being unlinked
            if (&msg->m_next == gd->g_recv.r_next) {
                gd->g_recv.r_next = &gd->g_recv.r_head;
            }
        }
    }

    msg_free(msg);
    GspDumpQueue(gd);
}

//
// Unlink and free messages from the head of the receive queue for as long as
// their h_mseq is strictly below mseq.
//
void
GspCleanQueue(gs_group_t *gd, gs_sequence_t mseq)
{
    gs_msg_t *q;

    gs_log(("Clean gid %d seq %d\n", gd->g_id, mseq));
    GspDumpQueue(gd);

    while ((q = gd->g_recv.r_head) != NULL && q->m_hdr.h_mseq < mseq) {
        // r_next must not keep pointing into the message being unlinked
        if (&q->m_next == gd->g_recv.r_next) {
            gd->g_recv.r_next = &gd->g_recv.r_head;
        }
        q->m_hdr.h_flags &= ~GS_FLAGS_QUEUED;
        gd->g_recv.r_head = q->m_next;
        msg_free(q);
    }

    GspDumpQueue(gd);
}

//
// Link the chain head..tail into the receive queue, starting the search at
// r_next and inserting before the first entry whose h_mseq is greater than
// mseq (or at the end if there is none).  bnum is only logged.
//
void
GspUOrderInsert(gs_group_t *gd, gs_msg_t *head, gs_msg_t *tail,
                gs_sequence_t mseq, gs_sequence_t bnum)
{
    gs_msg_t **p;

    // insert msg into proper order in receive queue
    // this routine needs to check for duplicates
    gs_log(("Add ucast gid %d mseq %d,%d head %x tail %x @ next %x\n",
            gd->g_id, mseq, bnum, head, tail, gd->g_recv.r_next));

    p = gd->g_recv.r_next;
    while (*p) {
        if ((*p)->m_hdr.h_mseq > mseq) {
            tail->m_next = *p;
            *p = head;
            return;
        }
        p = &(*p)->m_next;
    }

    // add at tail of history queue
    tail->m_next = *p;
    *p = head;

    GspDumpQueue(gd);
}

//
// Link the chain head..tail into the receive queue ordered by <mseq, bnum>.
//
// The message is freed instead of queued when:
//   - <mseq, bnum> is below the <r_mseq, r_bnum> recorded in gd->g_recv, or
//   - it is flagged GS_FLAGS_REPLAY and an entry with the same <mseq, bnum>
//     already sits between r_head and r_next, or
//   - an entry with the same <mseq, bnum> is found at or after r_next.
//
void
GspOrderInsert(gs_group_t *gd, gs_msg_t *head, gs_msg_t *tail,
               gs_sequence_t mseq, gs_sequence_t bnum)
{
    gs_msg_t **p;

    // check if we have already processed this sequence
    if (mseq < gd->g_recv.r_mseq ||
        (mseq == gd->g_recv.r_mseq && bnum < gd->g_recv.r_bnum)) {
        gs_log(("Droping msg %d,%d @ %d,%d\n", mseq, bnum,
                gd->g_recv.r_mseq, gd->g_recv.r_bnum));
        msg_free(head);
        return;
    }

    if (head->m_hdr.h_flags & GS_FLAGS_REPLAY) {
        // scan the entries in front of r_next for a duplicate
        p = &gd->g_recv.r_head;
        while (p != gd->g_recv.r_next && *p != NULL) {
            if ((*p)->m_hdr.h_mseq == mseq && (*p)->m_hdr.h_bnum == bnum) {
                gs_log(("duplicate pending type %d mseq %d bnum %d\n",
                        head->m_hdr.h_type, mseq, bnum));
                msg_free(head);
                return;
            }
            // step to the next entry; without this the scan never terminates
            // when the first entry is not the duplicate
            p = &(*p)->m_next;
        }
    }

    // insert msg into proper order in receive queue
    // this routine needs to check for duplicates
    gs_log(("Add gid %d mseq %d,%d head %x tail %x @ next %x\n",
            gd->g_id, mseq, bnum, head, tail, gd->g_recv.r_next));

    p = gd->g_recv.r_next;
    while (*p) {
        if ((*p)->m_hdr.h_mseq > mseq ||
            ((*p)->m_hdr.h_mseq == mseq && (*p)->m_hdr.h_bnum > bnum)) {
            tail->m_next = *p;
            *p = head;
            return;
        } else if ((*p)->m_hdr.h_mseq == mseq && (*p)->m_hdr.h_bnum == bnum) {
            assert(head->m_hdr.h_flags & GS_FLAGS_REPLAY);
            assert(head == tail);
            gs_log(("duplicate type %d mseq %d bnum %d\n",
                    head->m_hdr.h_type, mseq, bnum));
            msg_free(head);
            return;
        }
        p = &(*p)->m_next;
    }

    // add at tail of history queue
    tail->m_next = *p;
    *p = head;

    GspDumpQueue(gd);
}

//
// Handle a reply/ack message: look up the group and the waiting context named
// in the header, verify the context's outstanding message carries the same
// h_mseq, and pass the payload plus the NTSTATUS stored in h_tag to
// GspProcessReply().  The message is freed before returning.
//
void
GspReplyMsgHandler(gs_msg_t *msg)
{
    gs_msg_hdr_t *hdr;
    gs_group_t *gd;
    gs_context_t *ctx;

    hdr = &msg->m_hdr;

    // find group using group internal identifier
    gd = GspLookupGroup(hdr->h_gid);
    if (gd == NULL) {
        // unknown group: drop the message, as the other handlers do
        msg_free(msg);
        return;
    }

    GsLockEnter(gd->g_lock);

    // find context in waiting queue
    ctx = GspLookupContext(gd, hdr->h_cid);
    assert(ctx != NULL);

    if (ctx->ctx_msg == NULL) {
        err_log(("Internal error gid %d ctx %d mseq %d bnum %d flags %x mask %x\n",
                 ctx->ctx_gid, ctx->ctx_id, ctx->ctx_mseq, ctx->ctx_bnum,
                 ctx->ctx_flags, ctx->ctx_mask));
        err_log(("Internal error msg sid %d mid %d gid %d ctx %d mseq %d bnum %d flags %x\n",
                 hdr->h_sid, hdr->h_mid, hdr->h_gid, hdr->h_cid,
                 hdr->h_mseq, hdr->h_bnum, hdr->h_flags));
        halt(1);
    }
    assert(ctx->ctx_msg != NULL);

    if (ctx->ctx_msg->m_hdr.h_mseq != hdr->h_mseq) {
        err_log(("Internal error ctx %d %d reply %d mismatch %d\n",
                 ctx->ctx_id, hdr->h_cid, hdr->h_mseq,
                 ctx->ctx_msg->m_hdr.h_mseq));
        halt(1);
    }

    GspProcessReply(gd, ctx, msg->m_hdr.h_sid, msg->m_buf, msg->m_hdr.h_len,
                    *((NTSTATUS *)msg->m_hdr.h_tag));

    GsLockExit(gd->g_lock);

    msg_free(msg);
}

//
// Acknowledge msg with the given status.  Nothing is sent when the header's
// h_cid is (gs_cookie_t)-1.  If the sender is another node, a zero-length
// GS_MSG_TYPE_ACK carrying status in h_tag is sent back to it; if the sender
// is this node, the reply is fed directly to GspProcessReply().
//
void
GspSendAck(gs_group_t *gd, gs_msg_t *msg, NTSTATUS status)
{
    gs_msg_hdr_t *hdr;

    hdr = &msg->m_hdr;

    if (hdr->h_cid == (gs_cookie_t) -1) {
        return;
    }

    gs_log(("Ack nid %d msg %x flags %x\n", hdr->h_sid, msg, msg->m_hdr.h_flags));

    if (hdr->h_sid != gd->g_nid) {
        gs_msg_hdr_t rhdr;

        // reply header is a copy of the request with source/destination swapped
        memcpy(&rhdr, hdr, sizeof(rhdr));
        rhdr.h_sid = (gs_memberid_t) gd->g_nid;
        rhdr.h_mid = hdr->h_sid;
        rhdr.h_type = GS_MSG_TYPE_ACK;
        rhdr.h_len = 0;
        *((NTSTATUS *)rhdr.h_tag) = status;
        msg_send(hdr->h_sid, &rhdr, NULL, 0);
    } else {
        gs_context_t *ctx;

        ctx = GspLookupContext(gd, hdr->h_cid);
        assert(ctx != NULL);
        GspProcessReply(gd, ctx, gd->g_nid, NULL, 0, status);
    }
}

//
// Send the reply for a previously delivered message.
//
//   cookie - the gs_msg_t * that was handed to the group callback
//   buf    - reply payload
//   len    - payload length; must not exceed the message's h_rlen
//   status - status carried in the reply
//
// Returns ERROR_SUCCESS when the reply was issued, ERROR_INVALID_PARAMETER
// for a NULL cookie, a negative or oversized len, or an unknown group, and
// ERROR_INVALID_OPERATION when the message is already flagged
// GS_FLAGS_REPLY.  On success the message is passed to msg_free().
//
NTSTATUS
WINAPI
GsSendReply(HANDLE cookie, PVOID buf, int len, NTSTATUS status)
{
    gs_group_t *gd;
    gs_msg_t *msg = (gs_msg_t *)cookie;
    NTSTATUS err = ERROR_SUCCESS;

    if (msg == NULL || len < 0 || msg->m_hdr.h_rlen < len) {
        return ERROR_INVALID_PARAMETER;
    }

    // find group
    gd = GspLookupGroup(msg->m_hdr.h_gid);
    if (gd == NULL) {
        return ERROR_INVALID_PARAMETER;
    }

    GsLockEnter(gd->g_lock);

    if (!(msg->m_hdr.h_flags & GS_FLAGS_REPLY) && msg->m_hdr.h_rlen >= len) {
        // mark msg state
        msg->m_hdr.h_flags |= GS_FLAGS_REPLY;

        gs_log(("Reply msg %x flags %x len %x ubuf %x ulen %x\n", msg,
                msg->m_hdr.h_flags, msg->m_hdr.h_rlen, buf, len));

        // local reply
        if (msg->m_hdr.h_sid == gd->g_nid) {
            gs_context_t *ctx;

            // find context in waiting queue
            ctx = GspLookupContext(gd, msg->m_hdr.h_cid);
            assert(ctx != NULL);
            assert(ctx->ctx_msg->m_hdr.h_mseq == msg->m_hdr.h_mseq);

            GspProcessReply(gd, ctx, msg->m_hdr.h_sid, (char *)buf, len, status);
        } else {
            gs_msg_hdr_t rhdr;

            // reply header is a copy of the request with source/destination swapped
            memcpy(&rhdr, &msg->m_hdr, sizeof(rhdr));
            rhdr.h_sid = gd->g_nid;
            rhdr.h_mid = msg->m_hdr.h_sid;
            rhdr.h_type = GS_MSG_TYPE_REPLY;
            rhdr.h_len = (UINT16) len;
            *((NTSTATUS *)rhdr.h_tag) = status;
            msg_send(rhdr.h_mid, &rhdr, (const char *)buf, len);
        }

        // release msg
        msg_free(msg);
    } else {
        gs_log(("Reply failed %x: flags %x len %x ubuf %x ulen %x\n", msg,
                msg->m_hdr.h_flags, msg->m_hdr.h_rlen, buf, len));
        err = ERROR_INVALID_OPERATION;
    }

    GsLockExit(gd->g_lock);

    return err;
}

// Event id reported to the group callback, indexed by message type.
static gs_eventid_t GsTypeToEventId[] = {
    GsEventInvalid,
    GsEventInvalid,
    GsEventData,
    GsEventInvalid,
    GsEventSingleData,
    GsEventInvalid,
    GsEventInvalid,
    GsEventInvalid,
    GsEventInvalid,
    GsEventMemberJoin,
    GsEventMemberUp,
    GsEventInvalid,
    GsEventMemberEvicted,
    GsEventInvalid,
    GsEventMemberDown
};

// Map a message type to its event id.  GS_MSG_TYPE_ABORT maps to
// GsEventAbort; any other type outside the table maps to GsEventInvalid
// rather than indexing past the end of GsTypeToEventId.
#define GsMsgTypeToEventId(x) \
    ((x) == GS_MSG_TYPE_ABORT ? GsEventAbort : \
     ((unsigned)(x) < (unsigned)(sizeof(GsTypeToEventId) / sizeof(GsTypeToEventId[0])) \
        ? GsTypeToEventId[(x)] : GsEventInvalid))

//
// Re-send to member mid every non-UCAST message on the receive queue that
// this node originated with h_mseq above mseq, unless the owning send
// context's mask already has mid's bit set.  The bit is set before sending.
//
void
GspSyncMember(gs_group_t *gd, gs_memberid_t mid, gs_sequence_t mseq)
{
    gs_msg_t *p;

    // forward all messages that we have sent with higher sequence number
    for (p = gd->g_recv.r_head; p != NULL; p = p->m_next) {
        if (p->m_hdr.h_sid == gd->g_nid && p->m_hdr.h_mseq > mseq &&
            p->m_hdr.h_type != GS_MSG_TYPE_UCAST) {
            gs_context_t *ctx = &gd->g_send.s_ctxpool[p->m_hdr.h_cid];

            assert(ctx->ctx_msg == p);
            if (!(ctx->ctx_mask & (1 << mid))) {
                recovery_log(("sync node %d mseq %d\n", mid, p->m_hdr.h_mseq));
                ctx->ctx_mask |= (1 << mid);
                msg_send(mid, &p->m_hdr, p->m_buf, p->m_hdr.h_len);
            }
        }
    }
}

//
// Deliver msg to the group's callback.
//
// GS_MSG_TYPE_UP messages first add the member named in h_tag and sync it.
// The message's refcount is raised and the group lock is dropped around the
// callback.  If the callback returns STATUS_PENDING nothing more is done
// here.  Otherwise, if the message is not yet flagged GS_FLAGS_REPLY, it is
// flagged, the hold is dropped and an ack with the callback's status is sent.
// UCAST messages are then removed from the receive queue.
//
// Called with gd->g_lock held; returns with it held.
//
void
GspDeliverMsg(gs_group_t *gd, gs_msg_t *msg)
{
    IO_STATUS_BLOCK ios;
    NTSTATUS status;
    gs_memberid_t mid;

    switch (msg->m_hdr.h_type) {
    case GS_MSG_TYPE_UP:
        mid = *((gs_memberid_t *)msg->m_hdr.h_tag);
        GspAddMember(gd, mid, *(int *)msg->m_buf);
        GspSyncMember(gd, mid, msg->m_hdr.h_mseq);
        recovery_log(("New membership gid %d view %d,%d sz %d set %x\n",
                      gd->g_id, gd->g_startview, gd->g_curview,
                      gd->g_sz, gd->g_mset));
        break;
    default:
        break;
    }

    // hold msg
    msg->m_refcnt++;

    GsLockExit(gd->g_lock);

    ios.Status = GsMsgTypeToEventId(msg->m_hdr.h_type);
    ios.Information = msg->m_hdr.h_len;
    status = gd->g_callback((HANDLE)msg, msg->m_hdr.h_tag, msg->m_buf, &ios);

    GsLockEnter(gd->g_lock);

    if (status == STATUS_PENDING) {
        gs_log(("Reply msg pending %x\n", msg));
        return;
    }

    if (!(msg->m_hdr.h_flags & GS_FLAGS_REPLY)) {
        msg->m_hdr.h_flags |= GS_FLAGS_REPLY;
        // *((NTSTATUS *)msg->m_hdr.h_tag) = status;
        // release msg
        msg->m_refcnt--;
        GspSendAck(gd, msg, status);
    }

    if (msg->m_hdr.h_type == GS_MSG_TYPE_UCAST) {
        // GspRemoveMsg drops one reference itself, so take one for it here
        msg->m_refcnt++;
        msg->m_hdr.h_flags &= ~GS_FLAGS_CONTINUED;
        GspRemoveMsg(gd, msg);
    }
}

//
// Deliver queued messages in order, starting at *r_next, until the queue is
// exhausted, the next message is not the one expected, or a delivery is
// already in progress (g_pending != 0).
//
// A non-UCAST message is deliverable when its <h_mseq, h_bnum> equals
// <r_mseq, r_bnum>; a UCAST message when its h_mseq is not above r_mseq.
// GS_MSG_TYPE_SKIP messages advance the sequence without being delivered.
//
void
GspDispatch(gs_group_t *gd)
{
    gs_msg_t *msg;

    assert(gd->g_recv.r_next != NULL);

    while (gd->g_pending == 0 && (msg = *(gd->g_recv.r_next)) != NULL) {
        int hit = FALSE;
        int flags;

        if (msg->m_hdr.h_type != GS_MSG_TYPE_UCAST) {
            // compare sequence numbers
            if (gd->g_recv.r_mseq == msg->m_hdr.h_mseq &&
                gd->g_recv.r_bnum == msg->m_hdr.h_bnum) {
                // got it
                hit = TRUE;
            }
        } else {
            // compare sequence numbers
            if (gd->g_recv.r_mseq >= msg->m_hdr.h_mseq) {
                // got it
                hit = TRUE;
            }
        }

        if (hit == FALSE) {
            break;
        }

        gd->g_pending = 1;

        msg->m_hdr.h_flags &= ~GS_FLAGS_REPLY;
        // snapshot the flags now; msg may be gone after delivery
        flags = msg->m_hdr.h_flags;

        gs_log(("dispatch seq <%d, %d> flags %x msg %x @ next %x\n",
                msg->m_hdr.h_mseq, msg->m_hdr.h_bnum, flags, msg,
                gd->g_recv.r_next));

        // advance next msg to deliver
        gd->g_recv.r_next = &msg->m_next;

        // don't touch msg beyond this point, it may get freed as part of delivery
        if (msg->m_hdr.h_type != GS_MSG_TYPE_SKIP) {
            GspDeliverMsg(gd, msg);
        }

        // if a continued msg don't advance mseq/bnum
        if (!(flags & GS_FLAGS_CONTINUED)) {
            if (flags & GS_FLAGS_LAST) {
                gd->g_recv.r_bnum = 0;
                gd->g_recv.r_mseq++;
            } else if (!(flags & GS_FLAGS_PTP)) {
                gd->g_recv.r_bnum += (1 << 16);
            }
        } else if (!(flags & GS_FLAGS_PTP)) {
            gd->g_recv.r_bnum++;
        }

        gd->g_pending = 0;
    }

    gs_log(("waiting gid %d expect <%d, %d>\n",
            gd->g_id, gd->g_recv.r_mseq, gd->g_recv.r_bnum));

    GspDumpQueue(gd);
}

#if 0
WINAPI
GsReceiveRequest(gd, buf, len, ios)
{
    GsLockEnter(gd->recv_lock);

    m = gd->recv_last;
    // advance receive window
    if (m && m->state == MSG_STATE_DELIVERED) {
        if (m->flags & GS_FLAGS_DELIVERED) {
            msg_send_reply(m->srcid, m->mseq, m->cseq..);
            m->reply = 1;
        }
        m->state = MSG_STATE_DONE;
        // check if this msg can be freed before moving to next one
        m = m->next;
    }

    if (m && m->state == MSG_STATE_READY) {
        m->state = MSG_STATE_DELIVERED;
        GsLockExit(gd->recv_lock);

        memcpy(buf, m->data, m->len);
        Ios->status = m->srcid;
        Ios->information = m->len;
        Return SUCCESS;
    }

    // queue request
    irp->next = gd->recv_pending_queue;
    gd->recv_pending_queue = irp;

    GsLockExit(gd->recv_lock);

    Return PENDING;
}
#endif

//
// Handle a multicast message: if the group exists and the message's view
// number validates, queue it in order, dispatch whatever is deliverable and
// clean the queue up to the h_lseq carried in the header.  Otherwise the
// message is freed.
//
void
GspMcastMsgHandler(gs_msg_t *msg)
{
    gs_msg_hdr_t *hdr;
    gs_group_t *gd;

    hdr = &msg->m_hdr;

    gd = GspLookupGroup(hdr->h_gid);

    // accept messages only if in a valid view
    if (gd && GspValidateView(gd, msg->m_hdr.h_viewnum)) {
        // read h_lseq up front; msg may be freed by insert/dispatch below
        gs_sequence_t lseq = msg->m_hdr.h_lseq;

        GsLockEnter(gd->g_lock);

        hdr->h_flags |= GS_FLAGS_QUEUED;

        // insert msg into dispatch queue at proper order
        GspOrderInsert(gd, msg, msg, hdr->h_mseq, hdr->h_bnum);

        GspDispatch(gd);

        GspCleanQueue(gd, lseq);

        GsLockExit(gd->g_lock);
    } else {
        msg_free(msg);
    }
}

//
// Handle a unicast message: same flow as the multicast handler, but the
// message is queued with GspUOrderInsert().  Messages for an unknown group or
// an invalid view are logged and freed.
//
void
GspUcastMsgHandler(gs_msg_t *msg)
{
    gs_msg_hdr_t *hdr;
    gs_group_t *gd;

    hdr = &msg->m_hdr;

    gd = GspLookupGroup(hdr->h_gid);

    if (gd && GspValidateView(gd, msg->m_hdr.h_viewnum)) {
        // read h_lseq up front; msg may be freed by dispatch below
        gs_sequence_t lseq = msg->m_hdr.h_lseq;

        GsLockEnter(gd->g_lock);

        hdr->h_flags |= GS_FLAGS_QUEUED;

        // insert msg into dispatch queue at proper order
        GspUOrderInsert(gd, msg, msg, hdr->h_mseq, hdr->h_bnum);

        GspDispatch(gd);

        GspCleanQueue(gd, lseq);

        GsLockExit(gd->g_lock);
    } else {
        gs_log(("Dropping ucast: gid %d nid %d mseq %d view %d\n",
                hdr->h_gid, hdr->h_mid, hdr->h_mseq, hdr->h_viewnum));
        msg_free(msg);
    }
}

//
// Handle a sequence-allocation request: take the next value of the group's
// g_global_seq together with the current view number and send them back to
// the requester as a GS_MSG_TYPE_SEQREPLY, reusing the request header.
// The request is freed in all cases.
//
void
GspSeqAllocMsgHandler(gs_msg_t *msg)
{
    gs_msg_hdr_t *hdr;
    gs_seq_info_t info;
    gs_group_t *gd;

    hdr = &msg->m_hdr;

    gd = GspLookupGroup(hdr->h_gid);
    if (gd) {
        GsLockEnter(gd->g_lock);
        info.mseq = gd->g_global_seq++;
        info.viewnum = gd->g_curview;
        GsLockExit(gd->g_lock);

        // turn the request header around so it addresses the requester
        hdr->h_mid = hdr->h_sid;
        hdr->h_sid = gd->g_nid;
        hdr->h_type = GS_MSG_TYPE_SEQREPLY;
        hdr->h_len = sizeof(info);

        gs_log(("SeqAlloc: nid %d mseq %d view %d\n",
                hdr->h_mid, info.mseq, info.viewnum));

        msg_send(hdr->h_mid, hdr, (char *) &info, sizeof(info));
    }

    msg_free(msg);
}

//
// Handle a sequence-allocation reply: when the group exists, both the header
// view and the view inside the payload validate, and the sender equals
// gd->g_mid, hand the gs_seq_info_t payload to GspProcessWaitQueue().
// The message is freed in all cases.
//
void
GspSeqReplyMsgHandler(gs_msg_t *msg)
{
    gs_msg_hdr_t *hdr;
    gs_group_t *gd;

    hdr = &msg->m_hdr;

    assert(hdr->h_len == sizeof(gs_seq_info_t));

    // find group using group internal identifier
    gd = GspLookupGroup(hdr->h_gid);
    if (gd != NULL && GspValidateView(gd, hdr->h_viewnum)) {
        gs_seq_info_t *info = (gs_seq_info_t *)msg->m_buf;

        GsLockEnter(gd->g_lock);
        if (GspValidateView(gd, info->viewnum) && hdr->h_sid == gd->g_mid) {
            GspProcessWaitQueue(gd, info);
        }
        GsLockExit(gd->g_lock);
    }

    msg_free(msg);
}

//
// Handle a join request: reply to the requester with a GS_MSG_TYPE_REPLY
// carrying the next g_global_seq value, the current view number, the member
// set and the group size.  The request is freed in all cases.
//
void
GspJoinRequestMsgHandler(gs_msg_t *msg)
{
    gs_msg_hdr_t *hdr;
    gs_join_info_t info;
    gs_group_t *gd;

    hdr = &msg->m_hdr;

    gd = GspLookupGroup(hdr->h_gid);
    if (gd) {
        GsLockEnter(gd->g_lock);
        info.mseq = gd->g_global_seq++;
        info.viewnum = gd->g_curview;
        info.mset = gd->g_mset;
        info.sz = gd->g_sz;
        GsLockExit(gd->g_lock);

        // turn the request header around so it addresses the requester
        hdr->h_mid = hdr->h_sid;
        hdr->h_sid = gd->g_nid;
        hdr->h_type = GS_MSG_TYPE_REPLY;
        hdr->h_len = sizeof(info);

        msg_send(hdr->h_mid, hdr, (char *) &info, sizeof(info));
    }

    msg_free(msg);
}

//
// Handle a join/up message: if the group exists and the view validates, queue
// the message in order and dispatch.  Unlike the multicast handler the queue
// is not cleaned afterwards.  Otherwise the message is freed.
//
void
GspJoinUpMsgHandler(gs_msg_t *msg)
{
    gs_group_t *gd;
    gs_msg_hdr_t *hdr;

    hdr = &msg->m_hdr;

    gd = GspLookupGroup(hdr->h_gid);

    // accept messages only if in a valid view
    if (gd && GspValidateView(gd, msg->m_hdr.h_viewnum)) {
        GsLockEnter(gd->g_lock);

        hdr->h_flags |= GS_FLAGS_QUEUED;

        // insert msg into dispatch queue at proper order
        GspOrderInsert(gd, msg, msg, hdr->h_mseq, hdr->h_bnum);

        GspDispatch(gd);

        GsLockExit(gd->g_lock);
    } else {
        msg_free(msg);
    }
}

void GspInfoMsgHandler(gs_msg_t *);
void GspMmMsgHandler(gs_msg_t *);
void GspRecoveryMsgHandler(gs_msg_t *);
void GspSyncMsgHandler(gs_msg_t *);

// Table of message handlers.
// NOTE(review): presumably indexed by message type, like GsTypeToEventId -
// confirm against the dispatch code.
gs_msg_handler_t gs_msg_handler[] = {
    GspSeqAllocMsgHandler,
    GspSeqReplyMsgHandler,
    GspMcastMsgHandler,
    GspReplyMsgHandler,
    GspUcastMsgHandler,
    GspReplyMsgHandler,
    GspInfoMsgHandler,
    GspMmMsgHandler,
    GspJoinRequestMsgHandler,
    GspJoinUpMsgHandler,        // join
    GspJoinUpMsgHandler,        // up
    NULL,                       // evict request
    NULL,                       // evict
    GspRecoveryMsgHandler,
    GspSyncMsgHandler
};