/*++ Copyright (c) 2001 Microsoft Corporation Module Name: crs.c Abstract: Implements Consistency Replica Set Algorithm Author: Ahmed Mohamed (ahmedm) 1-Jan-2001 Revision History: --*/ #include #include #include #include #include #include #include #include "crs.h" #define xmalloc(size) VirtualAlloc(NULL, size,MEM_RESERVE|MEM_COMMIT,PAGE_READWRITE) #define xfree(buffer) VirtualFree(buffer, 0, MEM_RELEASE|MEM_DECOMMIT) #define CrspEqual(r1,r2) ((r1)->hdr.seq == (r2)->hdr.seq && \ (r1)->hdr.epoch == (r2)->hdr.epoch && \ (r1)->hdr.state == (r2)->hdr.state) DWORD CrsForcedQuorumSize = 0xffff; void WINAPI CrsSetForcedQuorumSize(DWORD size) { CrsForcedQuorumSize = size; } DWORD CrspFindLast(CrsInfo_t *p, DWORD logsz) { CrsRecord_t *rec, *last_rec; BOOL err; DWORD n, i; n = SetFilePointer(p->fh, 0, NULL, FILE_BEGIN); if (n == INVALID_SET_FILE_POINTER) { return GetLastError(); } err = ReadFile(p->fh, p->buf, logsz, &n, NULL); if (!err) return GetLastError(); if (n != logsz) { CrsLog(("Crs%d: failed to load complete file, read %d expected %d\n", p->lid, n, logsz)); return ERROR_BAD_LENGTH; } ASSERT(p->max_records * CRS_RECORD_SZ == (int)n); if(p->max_records * CRS_RECORD_SZ != (int)n) { CrsLog(("Crs%d: unable to load log file %d bytes, got %d bytes\n", p->lid, n, logsz)); return ERROR_BAD_LENGTH; } CrsLog(("Crs%d: loaded %d bytes, %d records\n", p->lid, n, p->max_records)); last_rec = NULL; rec = p->buf; for (i = 0; i < logsz; i += CRS_RECORD_SZ, rec++) { if (rec->hdr.tag != CRS_TAG) { CrsLog(("crs%d: Bad record %d, got %x expected %x\n", p->lid, i/CRS_RECORD_SZ, rec->hdr.tag, CRS_TAG)); return ERROR_BAD_FORMAT; } if (!last_rec || rec->hdr.epoch > last_rec->hdr.epoch || (rec->hdr.epoch == last_rec->hdr.epoch && (rec->hdr.seq > last_rec->hdr.seq))) { last_rec = rec; } } ASSERT(last_rec); // make sure only the last record is not committed or aborted rec = p->buf; for (i = 0; i < logsz; i += CRS_RECORD_SZ, rec++) { if (!(rec->hdr.state & (CRS_COMMIT | CRS_ABORT))) { if (rec != last_rec) { CrsLog(("crs:%d Bad record %d state %x expected commit|abort\n", p->lid, i/CRS_RECORD_SZ, rec->hdr.state)); return ERROR_INTERNAL_ERROR; } } } p->last_record = (int) (last_rec - p->buf); p->seq = last_rec->hdr.seq; p->epoch = last_rec->hdr.epoch; return ERROR_SUCCESS; } #define CrspFlush(p,offset) CrspWrite(p,offset, CRS_SECTOR_SZ) static DWORD CrspWrite(CrsInfo_t *p, int offset, DWORD length) { DWORD n; p->pending = FALSE; n = (DWORD) offset; // write out last sector, assumes lock is held ASSERT(offset < p->max_records); offset = offset / CRS_RECORDS_PER_SECTOR; CrsLog(("Crs%d: flush %d bytes record %d -> %d,%d\n", p->lid, length, n, offset, offset*CRS_SECTOR_SZ)); n = SetFilePointer(p->fh, offset * CRS_SECTOR_SZ, NULL, FILE_BEGIN); if (n == INVALID_SET_FILE_POINTER) { return GetLastError(); } n = 0; if (WriteFile(p->fh, (PVOID) &p->buf[offset*CRS_RECORDS_PER_SECTOR], length, &n, NULL)) { if (n != length) { CrsLog(("Write count mismatch, wrote %d, expected %d\n", n, length)); return ERROR_BAD_LENGTH; } return ERROR_SUCCESS; } n = GetLastError(); CrsLog(("Crs%d: flush record %d failed err %d\n", p->lid, offset, n)); if (n == ERROR_UNEXP_NET_ERR) { // repeat the write one more time p->pending = TRUE; } return n; } static DWORD CrspAppendRecord(CrsInfo_t *p, CrsRecord_t *rr, CrsRecord_t **rec) { CrsRecord_t *q; DWORD err; // tag record rr->hdr.tag = CRS_TAG; // assumes lock is held if ((p->last_record & CRS_SECTOR_MASK) == CRS_SECTOR_MASK) { // flush current sector err = CrspFlush(p, p->last_record); if (err != ERROR_SUCCESS) return err; } // advance last record p->last_record++; if (p->last_record == p->max_records) p->last_record = 0; CrsLog(("Crs%d: append record %d epoch %I64d seq %I64d state %x\n", p->lid, p->last_record, rr->hdr.epoch, rr->hdr.seq, rr->hdr.state)); // copy record q = &p->buf[p->last_record]; memcpy((PVOID)q, (PVOID) rr, CRS_RECORD_SZ); // flush it out now err = CrspFlush(p, p->last_record); if (err == ERROR_SUCCESS) { if (rec) *rec = q; } else { if (p->last_record == 0) p->last_record = p->max_records; p->last_record--; } return err; } // NextRecord: // if seq is null, fill in last record and return SUCCESS // if seq is not found, return NOT_FOUND // if seq is last record, return EOF // otherwise return next record after seq in lrec and SUCCESS DWORD CrspNextLogRecord(CrsInfo_t *info, CrsRecord_t *seq, CrsRecord_t *lrec, BOOLEAN this_flag) { CrsRecord_t *last, *p; DWORD err = ERROR_SUCCESS; if (lrec == NULL || info == NULL) { return ERROR_INVALID_PARAMETER; } // read record EnterCriticalSection(&info->lock); last = &info->buf[info->last_record]; if (seq == NULL) { CrsLog(("Crs%d: last record %d %I64d %I64d\n", info->lid, info->last_record, last->hdr.epoch, last->hdr.seq)); // read last record memcpy(lrec, last, CRS_RECORD_SZ); } else if (seq->hdr.epoch != last->hdr.epoch || seq->hdr.seq != last->hdr.seq) { int i; CrsLog(("Crs%d: last record %d %I64d %I64d search %I64d %I64d\n", info->lid, info->last_record, last->hdr.epoch, last->hdr.seq, seq->hdr.epoch, seq->hdr.seq)); // assume we don't have it p = seq; seq = NULL; // do a search instead of index, so that // seq can be reset as epoch increments for (i = 0; i < info->max_records; i++) { last = &info->buf[i]; if (p->hdr.epoch == last->hdr.epoch && p->hdr.seq == last->hdr.seq) { seq = last; break; } } if (seq != NULL) { if (this_flag == FALSE) { // return record after this one i++; if (i >= info->max_records) i = 0; seq = &info->buf[i]; } CrsLog(("Crs%d: search found %d %I64d, %I64d\n", info->lid, seq - info->buf, seq->hdr.epoch, seq->hdr.seq)); memcpy(lrec, seq, CRS_RECORD_SZ); } else { err = ERROR_NOT_FOUND; } } else { CrsLog(("Crs%d: reached last record %d %I64d %I64d, %I64d %I64d\n", info->lid, info->last_record, last->hdr.epoch, last->hdr.seq, seq->hdr.epoch, seq->hdr.seq)); if (this_flag == TRUE) { // we are trying to read the last record memcpy(lrec, last, CRS_RECORD_SZ); err = ERROR_SUCCESS; } else { err = ERROR_HANDLE_EOF; } } LeaveCriticalSection(&info->lock); if (err == ERROR_SUCCESS && lrec->hdr.epoch == 0) { // invalid rec, log is empty err = ERROR_HANDLE_EOF; } return err; } // Call into fs with // undo: pass replica in recovery due to a conflict // replay: replica is missing change, if replay fails with abort, we // do a full copy; otherwise we issue a skip record // query: ask replica if record was completed or not // done: signal end of recovery and pass in new wset, rset // we silently handle // abort: add a skip record // epoch records: just log it as is DWORD CrspReplay(LPVOID rec) { CrsRecoveryBlk_t *rr; CrsInfo_t *info, *minfo; CrsRecord_t *p, *q; CrsRecord_t lrec, mlrec; DWORD err; rr = (CrsRecoveryBlk_t *) rec; info = rr->info; minfo = rr->minfo; CrsLog(("CrsReplay%d mid %d, lid %d leader_id %d\n", rr->nid, rr->mid, info->lid, info->leader_id)); do { p = NULL; // read last record err = CrspNextLogRecord(info, NULL, &lrec, FALSE); if (err != ERROR_SUCCESS) { CrsLog(("CrsReplay%d: unable to read last record %d\n", info->lid, err)); break; } // find our last record in master replica q = &lrec; p = &mlrec; err = CrspNextLogRecord(minfo, q, p, TRUE); // if found and consistent with master, no undo if (err == ERROR_SUCCESS && p->hdr.state == q->hdr.state) { CrsLog(("CrsReplay%d: last record %I64d, %I64d consistent %x %x\n", info->lid, q->hdr.epoch, q->hdr.seq, p->hdr.state, q->hdr.state)); break; } if (err != ERROR_SUCCESS) { CrsLog(("CrsReplay%d: missing lrec %I64d, %I64d in disk %d, err %d\n", info->lid, q->hdr.epoch, q->hdr.seq, minfo->lid, err)); } else { CrsLog(("CrsReplay%d: undo last record %I64d, %I64d %x needs %x\n", info->lid, q->hdr.epoch, q->hdr.seq, q->hdr.state, p->hdr.state)); ASSERT(p->hdr.state & (CRS_COMMIT|CRS_ABORT)); } // last record is in conflict, we must undo it first if (!(q->hdr.state & CRS_EPOCH)) { // if we found this record in master and a conflict is detected, // we undo it. Otherwise, we need to do a full copy if (err == ERROR_SUCCESS) { ASSERT(p->hdr.state & (CRS_COMMIT|CRS_ABORT)); ASSERT(q->hdr.state & CRS_PREPARE); err = info->callback(info->callback_arg, rr->nid, q, CRS_ACTION_UNDO, rr->mid); } } else { // A missing epoch record doesn't mean we are old. A regroup // could have happened but no new data records got added. We // undo it, and continue; err = STATUS_SUCCESS; } if (err == STATUS_SUCCESS) { // update current record, sequence, epoch info->buf[info->last_record].hdr.state = 0; info->buf[info->last_record].hdr.epoch = 0; info->buf[info->last_record].hdr.seq = 0; if (info->last_record == 0) { info->last_record = info->max_records; } info->last_record--; info->seq = info->buf[info->last_record].hdr.seq; info->epoch = info->buf[info->last_record].hdr.epoch; CrsLog(("CrsReplay%d: new last record %d %I64d, %I64d\n", info->lid, info->last_record, info->epoch, info->seq)); } else { // can't undo it, do full copy and readjust our log CrsLog(("CrsReplay%d: Unable to undo record %I64d, %I64d\n", info->lid, q->hdr.epoch, q->hdr.seq)); p = NULL; } } while (err == STATUS_SUCCESS && info->state == CRS_STATE_RECOVERY); while (p != NULL && info->state == CRS_STATE_RECOVERY) { // read master copy err = CrspNextLogRecord(minfo, p, &mlrec, FALSE); if (err != ERROR_SUCCESS) { if (err == ERROR_HANDLE_EOF) { CrsLog(("CrsReplay%d: last record %I64d, %I64d in disk %d\n", info->lid, q->hdr.epoch, q->hdr.seq, minfo->lid)); // the last record is where we are at info->seq = info->buf[info->last_record].hdr.seq; info->epoch = info->buf[info->last_record].hdr.epoch; // we reached the end, signal end of recovery err = info->callback(info->callback_arg, rr->nid, p, CRS_ACTION_DONE, rr->mid); goto exit; } break; } p = &mlrec; if ((p->hdr.state & CRS_EPOCH) || (p->hdr.state & CRS_ABORT)) { CrsLog(("CrsReplay%d: skip record %I64d, %I64d %x\n", info->lid, p->hdr.epoch, p->hdr.seq, p->hdr.state)); err = !STATUS_SUCCESS; } else if (p->hdr.state & CRS_COMMIT) { err = info->callback(info->callback_arg, rr->nid, p, CRS_ACTION_REPLAY, rr->mid); if (err == STATUS_TRANSACTION_ABORTED) { CrsLog(("CrsReplay: failed nid %d seq %I64d err %d\n", rr->nid, p->hdr.seq, err)); break; } } else { ASSERT(p->hdr.state & CRS_PREPARE); // what if the record is prepared but not yet committed or // aborted; in transit record. // stop now CrsLog(("CrsReplay%d: bad record seq %I64d state %x\n", rr->nid, p->hdr.seq, p->hdr.state)); break; } if (err != STATUS_SUCCESS) { // add record err = CrspAppendRecord(info, p, NULL); if (err != ERROR_SUCCESS) { CrsLog(("CrsReplay%d: failed append seq %I64d err %d\n", rr->nid, p->hdr.seq, err)); break; } if (p->hdr.state & CRS_EPOCH) { ; //ASSERT(info->epoch+1 == p->hdr.epoch); } else { ASSERT(info->epoch == p->hdr.epoch); ASSERT(info->seq+1 == p->hdr.seq); } info->seq = p->hdr.seq; info->epoch = p->hdr.epoch; } else { // make sure we have added it ASSERT(info->seq == p->hdr.seq); ASSERT(info->epoch == p->hdr.epoch); ASSERT(info->buf[info->last_record].hdr.seq == p->hdr.seq); ASSERT(info->buf[info->last_record].hdr.epoch == p->hdr.epoch); // Propagate dubious bit if (p->hdr.state & CRS_DUBIOUS) { info->buf[info->last_record].hdr.state |= CRS_DUBIOUS; } ASSERT(info->buf[info->last_record].hdr.state == p->hdr.state); } } if (p == NULL || err != STATUS_SUCCESS) { CrsLog(("CrsReplay%d: Full copy from disk %d\n", info->lid, minfo->lid)); // we are out of date or need full recovery, do a full copy err = info->callback(info->callback_arg, rr->nid, NULL, CRS_ACTION_COPY, rr->mid); if (err == STATUS_SUCCESS) { DWORD len; // we now copy our master log and flush it ASSERT(minfo->max_records == info->max_records); len = info->max_records * CRS_RECORD_SZ; memcpy(info->buf, minfo->buf, len); err = CrspWrite(info, 0, len); if (err == ERROR_SUCCESS) { // adjust our state info->last_record = minfo->last_record; info->seq = info->buf[info->last_record].hdr.seq; info->epoch = info->buf[info->last_record].hdr.epoch; // we reached the end, signal end of recovery err = info->callback(info->callback_arg, rr->nid, p, CRS_ACTION_DONE, rr->mid); } } } exit: CrsLog(("CrsReplay%d mid %d status 0x%x\n", rr->nid, rr->mid, err)); return err; } /////////////////////// Public Functions ////////////////////// DWORD WINAPI CrsOpen(crs_callback_t callback, PVOID callback_arg, USHORT lid, WCHAR *log_name, int max_logsectors, HANDLE *outhdl) { // Open the log file // If the file in newly create, set the proper size // If the file size is not the same size, we need to either // expand or truncate the file. (truncate needs copy) // Scan file to locate last sector and record // If last record hasn't been commited, issue a query. // If query succeeded then, mark it as committed. // Set epoch,seq DWORD status; HANDLE maph; CrsInfo_t *p; int logsz; if (outhdl == NULL) { return ERROR_INVALID_PARAMETER; } *outhdl = NULL; p = (CrsInfo_t *) malloc(sizeof(*p)); if (p == NULL) { return ERROR_NOT_ENOUGH_MEMORY; } memset((PVOID) p, 0, sizeof(*p)); CrsLog(("Crs%d file '%S'\n", lid, log_name)); p->lid = lid; p->callback = callback; p->callback_arg = callback_arg; p->pending = FALSE; // Create log file, and set size of newly created p->fh = CreateFileW(log_name, GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_ALWAYS, FILE_FLAG_WRITE_THROUGH, NULL); status = GetLastError(); if(p->fh == INVALID_HANDLE_VALUE){ free((char *) p); return status; } // acquire an exclusive lock on the whole file if (!LockFile(p->fh, 0, 0, (DWORD)-1, (DWORD)-1)) { FILE_FULL_EA_INFORMATION ea[2] = {0}; IO_STATUS_BLOCK ios; NTSTATUS err; // get status status = GetLastError(); // change the ea to cause a notification to happen ea[0].NextEntryOffset = 0; ea[0].Flags = 0; ea[0].EaNameLength = 1; ea[0].EaValueLength = 1; ea[0].EaName[0] = 'X'; err = NtSetEaFile(p->fh, &ios, (PVOID) ea, sizeof(ea)); CrsLog(("Crs%d Setting EA %x\n", lid, err)); goto error; } if (status == ERROR_ALREADY_EXISTS) { // todo: compare current file size to new size and adjust file // size accordingly. For now, just use old size logsz = GetFileSize(p->fh, NULL); CrsLog(("Crs%d: Filesz %d max_sec %d\n", lid, logsz, max_logsectors)); ASSERT(logsz == max_logsectors * CRS_SECTOR_SZ); } else { //extend the file pointer to max size logsz = max_logsectors * CRS_SECTOR_SZ; SetFilePointer(p->fh, logsz, NULL, FILE_BEGIN); SetEndOfFile(p->fh); CrsLog(("Crs%d: Set Filesz %d max_sec %d\n", lid, logsz, max_logsectors)); } // allocate file copy in memory p->buf = xmalloc(logsz); if (p->buf == NULL) { status = ERROR_NOT_ENOUGH_MEMORY; goto error; } // set max record p->max_records = logsz / CRS_RECORD_SZ; if (status == ERROR_ALREADY_EXISTS) { // load file and compute last epoch/seq status = CrspFindLast(p, logsz); } else { status = !ERROR_SUCCESS; } // init the file, when we detect a read failure or first time if (status != ERROR_SUCCESS) { CrsRecord_t *r; int i; // initialize file p->seq = 0; p->epoch = 0; p->last_record = 0; r = p->buf; for (i = 0; i < logsz; i+= CRS_RECORD_SZ, r++) { r->hdr.epoch = p->epoch; r->hdr.seq = p->seq; r->hdr.tag = CRS_TAG; r->hdr.state = CRS_COMMIT | CRS_PREPARE | CRS_EPOCH; } status = CrspWrite(p, 0, logsz); } if (status != ERROR_SUCCESS) { goto error; } CrsLog(("Crs%d: %x Last record %d max %d epoch %I64d seq %I64d\n", p->lid, p->fh, p->last_record, p->max_records, p->epoch, p->seq)); // initialize rest of state p->state = CRS_STATE_INIT; p->refcnt = 1; p->leader_id = 0; InitializeCriticalSection(&p->lock); *outhdl = p; return ERROR_SUCCESS; error: CloseHandle(p->fh); if (p->buf) { xfree(p->buf); } free((PVOID) p); return status; } // DWORD WINAPI CrsStart(PVOID *hdls, ULONG alive_set, int cluster_sz, ULONG *write_set, ULONG *read_set, ULONG *evict_set) { DWORD status; CrsInfo_t **info = (CrsInfo_t **) hdls; int i, active_sz, mid; ULONG mask, active_set, fail_set; CrsInfo_t *p; CrsRecord_t *q, *mlrec; if (write_set) *write_set = 0; if (read_set) *read_set = 0; if (evict_set) *evict_set = 0; // no alive node if (cluster_sz == 0 || alive_set == 0) { // nothing to do return ERROR_WRITE_PROTECT; } // scan each hdl and make sure it is initialized and lock all hdls mask = alive_set; for (i = 0; mask != 0; i++, mask = mask >> 1) { if (!(mask & 0x1)) { continue; } p = info[i]; if (p == NULL) { continue; } EnterCriticalSection(&p->lock); // check the state of the last record p = info[i]; q = &p->buf[p->last_record]; CrsLog(("Crs%d last record %d epoch %I64d seq %I64d state %x\n", p->lid, p->last_record, q->hdr.epoch, q->hdr.seq, q->hdr.state)); } mid = 0; mlrec = NULL; // select master replica for (i = 0, mask = alive_set; mask != 0; i++, mask = mask >> 1) { if (!(mask & 0x1)) { continue; } p = info[i]; if (p == NULL) continue; q = &p->buf[p->last_record]; if (!mlrec || mlrec->hdr.epoch < q->hdr.epoch || (mlrec->hdr.epoch == q->hdr.epoch && mlrec->hdr.seq < q->hdr.seq) || (mlrec->hdr.epoch == q->hdr.epoch && mlrec->hdr.seq == q->hdr.seq && mlrec->hdr.state != q->hdr.state && (q->hdr.state & CRS_COMMIT))) { mid = i; mlrec = q; } } ASSERT(mid != 0); // if master last record is in doubt, query filesystem. If the filesystem // is certain that the operation has occured, it returns STATUS_SUCCESS for // COMMIT, STATUS_CANCELLED for ABORT, and STATUS_NOT_FOUND for can't tell. // All undetermined IO must be undone and redone in all non-master replicas // to ensure all replicas reach consistency. This statement is true even // for replicas that are currently absent from our set. We tag such records // we both COMMIT and ABORT, so that the replay thread issues replay for // new records and undo,replay for last records p = info[mid]; p->leader_id = (USHORT) mid; ASSERT(mlrec != NULL); if (!(mlrec->hdr.state & (CRS_COMMIT | CRS_ABORT))) { ASSERT(mlrec->hdr.state & CRS_PREPARE); status = p->callback(p->callback_arg, p->lid, mlrec, CRS_ACTION_QUERY, p->lid); if (status == STATUS_SUCCESS) { mlrec->hdr.state |= CRS_COMMIT; } else if (status == STATUS_CANCELLED) { mlrec->hdr.state |= CRS_ABORT; } else if (status == STATUS_NOT_FOUND) { // assume it is committed, but mark it for undo during recovery mlrec->hdr.state |= (CRS_COMMIT | CRS_DUBIOUS); } // todo: if status == TRANSACTION_ABORTED, we need to bail out since // must master is dead // no need to flush, I think! // CrspFlush(p, p->last_record); // todo: what if the flush fails here, I am assuming that // an append will equally fail. } ASSERT(mlrec->hdr.state & (CRS_COMMIT | CRS_ABORT)); // compute sync and recovery masks fail_set = 0; active_set = 0; active_sz = 0; for (i = 0, mask = alive_set; mask != 0; i++, mask = mask >> 1) { if (!(mask & 0x1)) { continue; } p = info[i]; if (p == NULL) { continue; } // set leader id p->leader_id = (USHORT) mid; q = &p->buf[p->last_record]; if (CrspEqual(mlrec, q)) { ASSERT(q->hdr.state & (CRS_COMMIT | CRS_ABORT)); p->state = CRS_STATE_READ; active_set |= (1 << i); active_sz++; } else if (p->state != CRS_STATE_RECOVERY) { CrsRecoveryBlk_t rrbuf; CrsRecoveryBlk_t *rr = &rrbuf; // recover replica rr->nid = i; rr->mid = mid; rr->info = p; rr->minfo = info[mid]; // set recovery state p->state = CRS_STATE_RECOVERY; status = CrspReplay((LPVOID) rr); // if we fail, evict this replica if (status != ERROR_SUCCESS) { fail_set |= (1 << i); } else { // repeat this replica again i--; mask = mask << 1; } } } // assume success status = ERROR_SUCCESS; // set read sets if (read_set) *read_set = active_set; if (!CRS_QUORUM(active_sz, cluster_sz)) { CrsLog(("No quorum active %d cluster %d\n", active_sz, cluster_sz)); mid = 0; status = ERROR_WRITE_PROTECT; } else { int pass_cnt = 0; ULONG pass_set = 0; // Enable writes on all active replicas for (i = 0, mask = active_set; mask != 0; i++, mask = mask >> 1) { CrsRecord_t rec; if (!(mask & 0x1)) { continue; } p = info[i]; if (p == NULL) continue; p->state = CRS_STATE_WRITE; // we now generate a new epoch and flush it to the disk p->epoch++; if (p->epoch == 0) p->epoch = 1; // reset seq to zero p->seq = 0; // write new epoch now, if not a majority replicas succeeded in writing // the new we fail rec.hdr.epoch = p->epoch; rec.hdr.seq = p->seq; rec.hdr.state = CRS_PREPARE | CRS_COMMIT | CRS_EPOCH; memset(rec.data, 0, sizeof(rec.data)); if (CrspAppendRecord(p, &rec, NULL) == ERROR_SUCCESS) { pass_cnt++; pass_set |= (1 << i); } else { fail_set |= (1 << i); } } // Recheck to make sure all replicas have advanced epoch if (!CRS_QUORUM(pass_cnt, cluster_sz)) { CrsLog(("No quorum due to error pass %d cluster %d\n", pass_cnt, cluster_sz)); mid = 0; pass_set = 0; pass_cnt = 0; status = ERROR_WRITE_PROTECT; } if (pass_cnt != active_sz) { // some replicas have died for (i = 0, mask = pass_set; mask != 0; i++, mask = mask >> 1) { if ((alive_set & (1 << i)) && (!mask & (1 << i))) { p = info[i]; ASSERT(p != NULL); p->state = CRS_STATE_READ; } } } // set write set if (write_set) *write_set = pass_set; } if (evict_set) *evict_set = fail_set; // unlock all hdls and set new master if any for (i = 0, mask = alive_set; mask != 0; i++, mask = mask >> 1) { if (!(mask & 0x1)) { continue; } p = info[i]; if (p == NULL) continue; p->leader_id = (USHORT) mid; LeaveCriticalSection(&p->lock); } return status; } void WINAPI CrsClose(PVOID hd) { DWORD err; CrsInfo_t *info = (CrsInfo_t *) hd; // If we any recovery threads running, make sure we terminate them first // before close and free all of this stuff if (info == NULL) { CrsLog(("CrsClose: try to close a null handle!\n")); return; } // Flush everything out and close the file EnterCriticalSection(&info->lock); // flush CrspFlush(info, info->last_record); LeaveCriticalSection(&info->lock); DeleteCriticalSection(&info->lock); err = CloseHandle(info->fh); CrsLog(("Crs%d: %x Closed %d\n", info->fh, info->lid, err)); xfree(info->buf); free((char *) info); } void WINAPI CrsFlush(PVOID hd) { CrsInfo_t *info = (CrsInfo_t *) hd; // if we have a commit or abort that isn't flushed yet, flush it now EnterCriticalSection(&info->lock); if (info->pending == TRUE) { CrspFlush(info, info->last_record); } LeaveCriticalSection(&info->lock); } PVOID WINAPI CrsPrepareRecord(PVOID hd, PVOID lrec, crs_id_t id) { CrsRecord_t *p = (CrsRecord_t *)lrec; CrsInfo_t *info = (CrsInfo_t *) hd; DWORD err; // move to correct slot in this sector. If we need a new sector, // read it from the file. Make sure we flush any pending commits on // current sector before we over write our in memory sector buffer. // prepare record, if seq none 0 then we are skipping the next sequence EnterCriticalSection(&info->lock); if (info->state == CRS_STATE_WRITE || (info->state == CRS_STATE_RECOVERY && id != NULL && id[0] != 0)) { if (id != NULL && id[0] != 0) { CrsHdr_t *tmp = (CrsHdr_t *) id; assert(id[0] == info->seq+1); p->hdr.seq = tmp->seq; p->hdr.epoch = tmp->epoch; } else { p->hdr.seq = info->seq+1; p->hdr.epoch = info->epoch; } p->hdr.state = CRS_PREPARE; err = CrspAppendRecord(info, p, &p); if (err == ERROR_SUCCESS) { // we return with the lock held, gets release on commitorabort CrsLog(("Crs%d prepare %x seq %I64d\n",info->lid, p, p->hdr.seq)); return p; } CrsLog(("Crs%d: Append failed seq %I64%d\n", info->lid, p->hdr.seq)); } else { CrsLog(("Crs%d: Prepare bad state %d id %x\n", info->lid, info->state, id)); } LeaveCriticalSection(&info->lock); return NULL; } int WINAPI CrsCommitOrAbort(PVOID hd, PVOID lrec, int commit) { CrsRecord_t *p = (CrsRecord_t *)lrec; CrsInfo_t *info = (CrsInfo_t *) hd; if (p == NULL || info == NULL) { return ERROR_INVALID_PARAMETER; } // update state of record if (p->hdr.seq != info->seq+1) { CrsLog(("Crs: sequence mis-match on commit|abort %I64d %I64d\n", p->hdr.seq, info->seq)); assert(0); return ERROR_INVALID_PARAMETER; } assert(!(p->hdr.state & (CRS_COMMIT | CRS_ABORT))); // todo: this is wrong, what if one replica succeeds // and others abort. Now, the others will reuse the // same seq for a different update and when the // succeeded replica rejoins it can't tell that the // sequence got reused. if (commit == TRUE) { p->hdr.state |= CRS_COMMIT; // advance the sequence info->seq++; CrsLog(("Crs%d: commit last %d leader %d seq %I64d\n", info->lid, info->last_record, info->leader_id, p->hdr.seq)); } else { p->hdr.state |= CRS_ABORT; // we need to re-adjust our last record if (info->last_record == 0) { info->last_record = info->max_records; } info->last_record--; CrsLog(("Crs%d: abort last %d leader %d seq %I64d\n", info->lid, info->last_record, info->leader_id, p->hdr.seq)); } info->pending = TRUE; LeaveCriticalSection(&info->lock); return ERROR_SUCCESS; } int WINAPI CrsCanWrite(PVOID hd) { CrsInfo_t *info = (CrsInfo_t *) hd; int err; // do we have a quorm or not EnterCriticalSection(&info->lock); err = (info->state == CRS_STATE_WRITE); LeaveCriticalSection(&info->lock); return err; }