import { ArrowAliasV2, DataSet, DataSetView } from '../../interfaces/arrow-alias.interface';
import { GeneralHelpers } from '../../helpers/general.helper'
import { tableFromIPC, tableToIPC } from 'apache-arrow';
import { PutStreamMachine } from './state-machine/put-stream';
import { signal } from '@angular/core'

const DEFAULT_BATCH_SIZE = 9000;
const MAX_BATCH_SIZE = 250000;

export type ResponseHeader = {
  message: string,
  trans_id: number,
}

export type CreateTableParams = {
  name: string,
  bucket_name: string,
  bucket_path: string,
  src_bucket: string,
  src_path: string,
  config_id: string|undefined,
}

enum ArrowClientState {
  DISCONNECTED,
  AWAIT_CONNECT,
  AWAIT_DS_LIST,
  OPEN,
}

type ReadInfo = {
  buffer: Buffer,
  promise: Promise<Uint8Array>,
}

type ReadCallbacks = {
  queryFailed: (reason: any) => void,
  querySuccess: (data: Uint8Array) => void,
  asTable: boolean,
}

export class ArrowClient {
  #ws!: WebSocket;
  #state = ArrowClientState.DISCONNECTED;
  #transId = 1;
  #disconnecting = false;
  #currentPut: PutStreamMachine | undefined = undefined;
  #resolveConnected = () => {};
  #pending = new Map<number, (message: any) => void>()
  #dataSets = signal(new Map<string, any>());
  /** signal with the datasets for this connection keyed by name */
  dataSets = this.#dataSets.asReadonly();
  #views = new Map<string, any>();
  #reads = new Map<number, ReadInfo>();
  #completeReads = new Map<number, ReadCallbacks>();
  #readySignal = signal<boolean>(false);
  /** signal that the client is ready to use */
  ready = this.#readySignal.asReadonly();

  // callback to be called when the websocket is closed
  wsClosedCallback = () => {};

  constructor(public alias: ArrowAliasV2, public readonly url: string) {
    try {
      this.#ws = new WebSocket(url);
    } catch (e) {
      console.error('ArrowClient: Failed to create WebSocket', e);
    }
  }

  /**
   * Returns true if the client is connected to the server
   *
   * @readonly
   */
  get connected() {
    return this.#state === ArrowClientState.OPEN && this.#ws.readyState === WebSocket.OPEN;
  }

  async connect() : Promise<void> {
    return new Promise<void>((resolve, reject) => {
      this.#resolveConnected = resolve;
      this.#ws.binaryType = 'arraybuffer';
      this.#ws.onopen = () => {
        this.#ws.send(JSON.stringify({ message: 'Connect', url: this.alias.url, trans_id: this.#transId++ }))
      };
      this.#ws.onmessage = (event) => {
        this.#handleMessage(event);
      };
      this.#ws.onclose = () => {
        this.wsClosedCallback();
        this.#state = ArrowClientState.DISCONNECTED;
        this.#readySignal.set(false);
        if (!this.#disconnecting) {
          reject('ArrowClient: Connection closed unexpectedly');
        }
      };
    });
  }

  async disconnect() : Promise<void> {
    const msg = { message: 'Disconnect', trans_id: this.#transId++ };
    this.#disconnecting = true;
    this.#ws.send(JSON.stringify(msg));
  }

  listDataSets(): DataSet[] {
    return Array.from(this.#dataSets().values());
  }

  refreshDataSetList():  Promise<DataSet[]> {
    return new Promise<DataSet[]>((resolve, reject) => {
      const tid = this.#transId++;
      this.#pending.set(tid, (message) => {
        if (message.message === 'Error') {
          reject(new Error(`ArrowClient: Error refreshing DataSets: ${message.description}`));
          return;
        }
        this.#handleDataSetList(message);
        resolve(this.listDataSets());
      });
      this.#ws.send(JSON.stringify({ message: 'DoAction', action: 'DSList', argument: '', trans_id: tid }));
    });
  }

  createDataSet(dataSetName: string, base_path: string, partition_cols: any[], batchSize: number) : Promise<void> {
    const bucketName = this.#getBucketName();
    if (!bucketName) {
      return Promise.reject('ArrowClient: Cloud configuration not set');
    }
    batchSize = this.#massageBatchSize(batchSize);
    return new Promise<void>((resolve, reject) => {
      const tid = this.#transId++;
      this.#pending.set(tid, (message) => {
        if (message.message === 'Error') {
          reject(new Error(`ArrowClient: Error creating DataSet: ${message.description}`));
          return;
        }
        const ds = JSON.parse(message.results);
        ds.partitionCols = JSON.parse(ds.partition_cols);
        this.#dataSets.update(map => {
          map.set(dataSetName, ds);
          return map;
        });
        resolve();
      });
      const argStr = JSON.stringify({ name: dataSetName, base_path, partition_cols,
        bucket_name: bucketName, batch_size: batchSize });
      this.#ws.send(JSON.stringify({ message: 'DoAction', action: 'DSCreate', argument: argStr,
        trans_id: tid }));
    });
  }

  deleteDataSet(dataSetName: string) {
    return new Promise<void>((resolve, reject) => {
      const dataSet = this.#dataSets().get(dataSetName);
      if (!dataSet) {
        reject(new Error(`ArrowClient: DataSet ${dataSetName} not found`));
        return;
      }
      const tid = this.#transId++;
      this.#pending.set(tid, (message) => {
        if (message.message === 'Error') {
          reject(new Error(`ArrowClient: Error deleting DataSet: ${message.description}`));
          return;
        }
        this.#dataSets.update(map => {
          map.delete(dataSetName);
          return map;
        });
        resolve();
      });
      this.#ws.send(JSON.stringify({ message: 'DoAction', action: 'DSDelete', argument: `${dataSetName}`, trans_id: tid }));
    });
  }

  createView(dataSetName: string, viewName: string, expiresAfter: number,
    partitionRegex: string, query: string) {
    return new Promise<string>((resolve, reject) => {
      let dataSet = this.#dataSets().get(dataSetName);
      if (!dataSet) {
        reject(new Error(`ArrowClient: DataSet ${dataSetName} not found`));
        return;
      }
      const tid = this.#transId++;
      this.#pending.set(tid, (message) => {
        if (message.message === 'Error') {
          reject(new Error(`ArrowClient: Error creating View '${viewName}': ${message.description}`));
          return;
        }
        const view = JSON.parse(message.results);
        let views = this.#views.get(dataSetName);
        if (!views) {
          views = [view];
          this.#views.set(dataSetName, views);
        }
        views.push(view);
        resolve(view);
      });
      const argStr = JSON.stringify({ dataset_id: dataSet.dataset_id, name: viewName,
        expires_after: expiresAfter, partition_regex: partitionRegex, query });
      this.#ws.send(JSON.stringify({ message: 'DoAction', action: 'DSView', argument: argStr,
        trans_id: tid }));
    });
  }

  deleteView(dataSetName: string, viewName: string) {
    return new Promise<void>((resolve, reject) => {
      let dataSet = this.#dataSets().get(dataSetName);
      if (!dataSet) {
        reject(new Error(`ArrowClient: DataSet ${dataSetName} not found`));
        return;
      }
      let views = this.#views.get(dataSetName);
      if (!views) {
        reject(new Error(`ArrowClient: Views for DataSet ${dataSetName} not found. Did you forget to list them?`));
        return;
      }
      let view = views.find((v: any) => v.name === viewName);
      if (!view) {
        reject(new Error(`ArrowClient: View ${viewName} not found in DataSet ${dataSetName}`));
        return;
      }
      const tid = this.#transId++;
      this.#pending.set(tid, (message) => {
        if (message.message === 'Error') {
          reject(new Error(`ArrowClient: Error deleting View: ${message.description}`));
          return;
        }
        views.splice(views.indexOf(view), 1);
        this.#views.set(dataSetName, views);
        resolve();
      });
      this.#ws.send(JSON.stringify({ message: 'DoAction', action: 'DSViewDelete',
        argument: view.view_id.toString(), trans_id: tid }));
    });
  }

  /// send a user-specified action to the server and return the results
  doAction(action: string, argument: string): Promise<string> {
    return new Promise<string>((resolve, reject) => {
      const tid = this.#transId++;
      this.#pending.set(tid, (message) => {
        if (message.message === 'Error') {
          reject(new Error(`ArrowClient: Error doing action: ${message.description}`));
          return;
        }
        resolve(message.results);
      });
      this.#ws.send(JSON.stringify({ message: 'DoAction', action, argument, trans_id: tid }));
    });
  }

  initCache(dataSetName: string) : Promise<void> {
    return new Promise<void>((resolve, reject) => {
      if (!dataSetName) {
        reject(new Error('ArrowClient: No dataSetName provided'));
        return;
      }
      const tid = this.#transId++;
      this.#pending.set(tid, (message) => {
        resolve();
      });
      this.#ws.send(JSON.stringify({ message: 'DoAction', action: 'DSCache', argument: dataSetName, trans_id: tid }));
    });
  }

  /** Returns undefined if listViews() was not previously called. */
  getCachedViews(dataSetName: string): DataSetView[] {
    return this.#views.get(dataSetName) || [];
  }

  listViews(dataSetName: string) {
    return new Promise<DataSetView[]>((resolve, reject) => {
      const tid = this.#transId++;
      this.#pending.set(tid, (message) => {
        if (message.message === 'Error') {
          reject(new Error(`ArrowClient: Error listing views: ${message.description}`));
          return;
        }
        const views = JSON.parse(message.results);
        this.#views.set(dataSetName, views);
        resolve(views);
      });
      this.#ws.send(JSON.stringify({ message: 'DoAction', action: 'DSViewList', argument: dataSetName, trans_id: tid }));
    });
  }

  getSchema(dataSetName: string) {
    return new Promise<Object>((resolve, reject) => {
      const tid = this.#transId++;
      this.#pending.set(tid, (message) => {
        const data = JSON.parse(message.results);
        resolve(data);
      });
      this.#ws.send(JSON.stringify({ message: 'DoAction', action: 'DSGetSchema', argument: dataSetName, trans_id: tid }));
    });
  }

  listCloud(prefix?: string): Promise<string[]> {
    return new Promise<string[]>((resolve, reject) => {
      const tid = this.#transId++;
      this.#pending.set(tid, (message) => {
        const data = JSON.parse(message.results);
        resolve(data);
      });
      this.#ws.send(JSON.stringify({ message: 'DoAction', action: 'ListCloud', argument: prefix ||
        '', trans_id: tid }));
    });
  }

  cloudBasePathExists(basePath: string): Promise<boolean> {
    return new Promise<boolean>((resolve, reject) => {
      const tid = this.#transId++;
      this.#pending.set(tid, (message) => {
        const data = JSON.parse(message.results);
        const idx = data.indexOf(basePath + '/');
        resolve(idx !== -1);
      });
      this.#ws.send(JSON.stringify({ message: 'DoAction', action: 'ListCloud', argument: basePath, trans_id: tid }));
    });
  }

  /**
   *
   * @param dataSetName The name of the dataset to query
   * @param query The query to run against the view and or any static tables
   * @param asTable If true, the actual arrow table will be returned. otherwise, a binary representation of the table will be returned
   * @param params Additional parameters to pass to the query to replace placeholders
   * @returns an Arrow table in table format or IPC format
   */
    performQuery(dataSetName: string|undefined, query: string, asTable: boolean = false, params?: Object) : Promise<Uint8Array> {
      if (!this.#ws || this.#ws.readyState !== WebSocket.OPEN) {
        return Promise.reject(new Error('ArrowClient: WebSocket not connected'));
      }
      let flight_id = 0;
      if (dataSetName) {
        let dataSet = this.#dataSets().get(dataSetName);
        if (dataSet) {
          flight_id = dataSet.dataset_id;
        }
      }

      const tid = this.#transId++;
      const info = { buffer: Buffer.alloc(0), promise: new Promise<Uint8Array>((resolve, reject) => {
        this.#completeReads.set(tid, {querySuccess: resolve, queryFailed: reject, asTable });
      }) };
      this.#reads.set(tid, info);
      this.#pending.set(tid, (data) => {
        if (data.message === 'Error') {
          // there was an error reading the table
          const comp = this.#completeReads.get(tid);
          if (comp) {
            comp.queryFailed(new Error(`Error performing query: ${data.description}`));
            this.#completeReads.delete(tid);
          }
          this.#reads.delete(tid);
        } else if (data.message === 'Success' && this.#completeReads.get(tid)) {
          // success w/o data means the resulting table is empty
          this.#completeReads.get(tid)?.queryFailed(new Error('No data found'));
          this.#completeReads.delete(tid);
        }
      });
      // create the ticket
      const ticket = JSON.stringify({
        flight_id,
        query,
        path: dataSetName,
        first_batch_only: false,
        query_params: params,
      });
      this.#ws.send(JSON.stringify({ message: 'Read', trans_id: tid, ticket }));
      return info.promise;
    }

  /**
   *
   * @param connectParams The connection parameters to the Postgres database
   * @param query The query to run against the view and or any static tables
   * @param asTable If true, the actual arrow table will be returned. otherwise, a binary representation of the table will be returned
   * @param params Additional parameters to pass to the query to replace placeholders
   * @returns an Arrow table in table format or IPC format
   */
  performSqlQuery(connectParams: string, query: string, asTable: boolean = false) : Promise<Uint8Array> {
    if (!this.#ws || this.#ws.readyState !== WebSocket.OPEN) {
      return Promise.reject(new Error('ArrowClient: WebSocket not connected'));
    }
    const tid = this.#transId++;
    const info = { buffer: Buffer.alloc(0), promise: new Promise<Uint8Array>((resolve, reject) => {
      this.#completeReads.set(tid, {querySuccess: resolve, queryFailed: reject, asTable });
    }) };
    this.#reads.set(tid, info);
    this.#pending.set(tid, (data) => {
      if (data.message === 'Error') {
        // there was an error reading the table
        const comp = this.#completeReads.get(tid);
        if (comp) {
          comp.queryFailed(new Error(`Error performing sql query: ${data.description}`));
          this.#completeReads.delete(tid);
        }
        this.#reads.delete(tid);
      } else if (data.message === 'Success' && this.#completeReads.get(tid)) {
        // success w/o data means the resulting table is empty
        this.#completeReads.get(tid)?.queryFailed(new Error('No sql data returned'));
        this.#completeReads.delete(tid);
      }
    });
    // create the ticket
    const msg = JSON.stringify({
      message: 'ReadSQL',
      connect_params: connectParams,
      sql: query,
      trans_id: tid,
    });
    this.#ws.send(msg);
    return info.promise;
  }

  /**
   *
   * @param connectParams The connection parameters to the Postgres database
   * @param query The query to run against the view and or any static tables
   * @param asTable If true, the actual arrow table will be returned. otherwise, a binary representation of the table will be returned
   * @param params Additional parameters to pass to the query to replace placeholders
   * @returns an Arrow table in table format or IPC format
   */
  saveSqlQueryAsTable(connectParams: string, query: string, tableName: string) : Promise<void> {
    if (!this.#ws || this.#ws.readyState !== WebSocket.OPEN) {
      return Promise.reject(new Error('ArrowClient: WebSocket not connected'));
    }
    const tid = this.#transId++;
    return new Promise<void>((resolve, reject) => {
      this.#pending.set(tid, (data) => {
        if (data.message === 'Error') {
          reject(new Error(`Error performing sql query: ${data.description}`));
        } else {
          resolve();
        }
      });
      // create the ticket
      const msg = JSON.stringify({
        message: 'ReadSQL',
        connect_params: connectParams,
        sql: query,
        table_name: tableName,
        trans_id: tid,
      });
      this.#ws.send(msg);
    });
  }

  /**
   *
   * @param query The query to run against views and or static tables
   * @param tableName The name of the table to save the results as. If the table already exists, an error will be returned
   * @returns The table definition
   */
  saveQueryAsTable(query: string, tableName: string) : Promise<void> {
    if (!this.#ws || this.#ws.readyState !== WebSocket.OPEN) {
      return Promise.reject(new Error('ArrowClient: WebSocket not connected'));
    }
    const tid = this.#transId++;
    return new Promise<void>((resolve, reject) => {
      this.#pending.set(tid, (data) => {
        if (data.message === 'Error') {
          reject(new Error(`Error performing sql query: ${data.description}`));
        } else {
          resolve(data.results);
        }
      });
      const args = { query, table_name: tableName };
      // create the ticket
      const msg = JSON.stringify({
        message: 'DoAction',
        action: 'SaveQuery',
        argument: JSON.stringify(args),
        trans_id: tid,
      });
      this.#ws.send(msg);
    });
  }

  /**
   *
   * @param query The query to run against views and or static tables
   * @param tableName The name of the table to save the results as. If the table already exists, an error will be returned
   * @returns The table definition
   */
  saveQueryToCloud(query: string, bucketName: string, bucketPath: string) : Promise<void> {
    if (!this.#ws || this.#ws.readyState !== WebSocket.OPEN) {
      return Promise.reject(new Error('ArrowClient: WebSocket not connected'));
    }
    const tid = this.#transId++;
    return new Promise<void>((resolve, reject) => {
      this.#pending.set(tid, (data) => {
        if (data.message === 'Error') {
          reject(new Error(`Error performing sql query: ${data.description}`));
        } else {
          resolve();
        }
      });
      const args = {
        query, bucket_name: bucketName, bucket_path: bucketPath,
      };
      const msg = JSON.stringify({
        message: 'DoAction',
        action: 'SaveQuery',
        argument: JSON.stringify(args),
        trans_id: tid,
      });
      this.#ws.send(msg);
    });
  }

  createPut(dataSetName: string, base_path: string, partition_cols: any[], batchSize: number, table: Buffer) {
    return new Promise<number>((resolve, reject) => {
      const bucketName = this.#getBucketName();
      if (!bucketName) {
        reject(new Error('ArrowClient: Cloud configuration not set'));
        return;
      }
      batchSize = this.#massageBatchSize(batchSize);
      const tid = this.#transId++;
      const mdStr = JSON.stringify({ name: dataSetName, base_path, partition_cols,
        bucket_name: bucketName, batch_size: batchSize, });
      this.#currentPut = new PutStreamMachine(tid, undefined, mdStr, table, this.#ws);
      this.#currentPut.errorHandler = (reason: any) => { reject(new Error(reason)); this.#currentPut = undefined; };
      this.#currentPut.resolveHandler = (putId: number) => {
        const list_tid = this.#transId++;
        this.#pending.set(list_tid, (message) => {
          this.#handleDataSetList(message);
        });
        this.#ws.send(JSON.stringify({ message: 'DoAction', action: 'DSList', argument: '', trans_id: list_tid }));
        resolve(putId);
        this.#currentPut = undefined;
      };
      this.#currentPut.start();
    });
  }

  initPut(dataSetName: string) {
    let dataSet = this.#dataSets().get(dataSetName);
    if (!dataSet) {
      return Promise.reject(new Error(`ArrowClient: DataSet ${dataSetName} not found`));
    }
    return new Promise<number>((resolve, reject) => {
      const tid = this.#transId++;
      this.#pending.set(tid, (message) => {
        const data = JSON.parse(message.results);
        console.log(`got put_id = ${data.put_command.put_id}`);
        resolve(data.put_command.put_id);
      });
      const argStr = JSON.stringify({ dataset_id: dataSet.dataset_id, update_views: true });
      this.#ws.send(JSON.stringify({ message: 'DoAction', action: 'DSInitPut', argument: argStr, trans_id: tid }));
    });
  }

  continuePut(put_id: number, table: Buffer) {
    if (this.#currentPut) {
      return Promise.reject(new Error('ArrowClient: Put already in progress'));
    }
    return new Promise<void>((resolve, reject) => {
      const tid = this.#transId++;
      this.#currentPut = new PutStreamMachine(tid, undefined, undefined, table, this.#ws);
      this.#currentPut.errorHandler = (reason: any) => { reject(new Error(reason)); this.#currentPut = undefined; };
      this.#currentPut.resolveHandler = (putId: number) => {
        resolve();
        this.#currentPut = undefined;
      };
      this.#currentPut.start(put_id);
    });
  }

  endPut(put_id: number) {
    const argStr = JSON.stringify({"command":"DSEndPut", put_id});
    return new Promise<void>((resolve, reject) => {
      const tid = this.#transId++;
      this.#pending.set(tid, (message) => {
        if (message.message === 'Error') {
          reject(new Error(`ArrowClient: Error ending Put: ${message.description}`));
          return;
        }
        resolve();
      });
      this.#ws.send(JSON.stringify({ message: 'DoAction', action: 'DSEndPut', argument: argStr, trans_id: tid }));
    });
  }

  listTables() {
    return new Promise<Object[]>((resolve, reject) => {
      const tid = this.#transId++;
      this.#pending.set(tid, (message) => {
        const data = JSON.parse(message.results);
        resolve(data);
      });
      this.#ws.send(JSON.stringify({ message: 'DoAction', action: 'STList', argument: '', trans_id: tid }));
    })
  }

  createTable(args: CreateTableParams) {
    const bname = this.#getBucketName();
    if (!args.bucket_name && bname) {
      args.bucket_name = bname;
    }
    if (args.src_path && !args.src_bucket && bname) {
      args.src_bucket = bname;
    }
    args.config_id = this.alias.aliasId.toString()
    return new Promise<Object>((resolve, reject) => {
      const tid = this.#transId++;
      this.#pending.set(tid, (message) => {
        if (message.message === 'Error') {
          reject(new Error(`ArrowClient: Error creating table: ${message.description}`));
          return;
        }
        const table = JSON.parse(message.results);
        resolve(table);
      });
      this.#ws.send(JSON.stringify({ message: 'DoAction', action: 'STCreate', argument: JSON.stringify(args), trans_id: tid }));
    });
  }

  uploadTable(tableName: string, table: Buffer) {
    if (this.#currentPut) {
      return Promise.reject(new Error('ArrowClient: Put already in progress'));
    }
    return new Promise<void>((resolve, reject) => {
      const tid = this.#transId++;
      this.#currentPut = new PutStreamMachine(tid, tableName, undefined, table, this.#ws);
      this.#currentPut.errorHandler = (reason: any) => { reject(new Error(reason)); this.#currentPut = undefined; };
      this.#currentPut.resolveHandler = (putId: number) => {
        resolve();
        this.#currentPut = undefined;
        console.log('table uploaded');
      };
      this.#currentPut.start();
    });
  }

  updateTables() {
    return new Promise<void>((resolve, reject) => {
      const tid = this.#transId++;
      this.#pending.set(tid, (message) => {
        if (message.message === 'Error') {
          reject(new Error(`ArrowClient: Error updating tables: ${message.description}`));
          return;
        }
        resolve();
      });
      this.#ws.send(JSON.stringify({ message: 'DoAction', action: 'STUpdate', argument: '', trans_id: tid }));
    })
  }

  deleteTable(name: string) {
    return new Promise<void>((resolve, reject) => {
      const tid = this.#transId++;
      this.#pending.set(tid, (message) => {
        if (message.message === 'Error') {
          reject(new Error(`ArrowClient: Error deleting table: ${message.description}`));
          return;
        }
        resolve();
      });
      this.#ws.send(JSON.stringify({ message: 'DoAction', action: 'STDelete', argument: name, trans_id: tid }));
    })
  }

  presignUrl(bucket_name: string, path: string, duration: number|undefined) {
    return new Promise<string>((resolve, reject) => {
      const tid = this.#transId++;
      this.#pending.set(tid, (message) => {
        resolve(message.results);
      });
      const argStr = JSON.stringify({ bucket_name, path, duration });
      this.#ws.send(JSON.stringify({ message: 'DoAction', action: 'SignResource', argument: argStr, trans_id: tid }));
    });
  }

  // private methods

  #getBucketName() {
    return this.alias.cloudConfig?.bucket;
  }

  #massageBatchSize(batchSize: number) {
    if (batchSize < 1) {
      batchSize = DEFAULT_BATCH_SIZE;
    } else if (batchSize > MAX_BATCH_SIZE) {
      batchSize = MAX_BATCH_SIZE;
    }
    return batchSize;
  }

  #handleMessage(event: MessageEvent) {
    if (typeof event.data === "string") {
      this.#handleJsonMessage(event.data);
    } else if (event.data instanceof Blob) {
      GeneralHelpers.blobToBuffer(event.data).then((buffer) => {
        this.#handleBinaryMessage(buffer);
      });
    } else if (event.data instanceof ArrayBuffer) {
      this.#handleBinaryMessage(event.data);
    }
  }

  #handleBinaryMessage(data: ArrayBuffer) {
    const transIdBuffer = new Uint8Array(data.slice(0, 20));
    const transIdStr = Array.from(transIdBuffer).map((i) => String.fromCharCode(i)).join('');
    let trans_id = parseInt(transIdStr, 10);
    const batchStr = Array.from(new Uint8Array(data.slice(20, 30))).map((i) => String.fromCharCode(i)).join('');
    const batch_id = parseInt(batchStr, 10);
    let readInfo = this.#reads.get(trans_id);
    if (!readInfo) {
      console.log(`ArrowClient: Received binary data for an unknown transaction tid=${trans_id} batch=${batch_id}`);
      return;
    }
    const udata = new Uint8Array(data);
    if (udata.length === 30) {
      const comp = this.#completeReads.get(trans_id);
      if (comp) {
        const table = tableFromIPC(readInfo.buffer);
        if (comp.asTable) {
          comp.querySuccess(table as any);
        } else {
          const arrayBuffer: Uint8Array = tableToIPC(table, 'file');
          comp.querySuccess(arrayBuffer);
        }
        this.#completeReads.delete(trans_id);
      }
      // this is the a flag that all the data has been read
      this.#reads.delete(trans_id);
      return;
    }
    let buf_data = udata.subarray(30);
    readInfo.buffer = Buffer.concat([readInfo.buffer, Buffer.from(buf_data)]);
    console.log(`ArrowClient: Received ${buf_data.length} bytes of binary data tid=${trans_id}`);

  }

  #handleJsonMessage(str: string) {
    const data = JSON.parse(str);
    if (!data.message || !data.trans_id) {
      console.log(`ArrowClient: Invalid message received: ${str}`);
      return;
    }
    const header = data as ResponseHeader;
    if (this.#currentPut && this.#currentPut.transId === header.trans_id) {
      this.#currentPut.handleResponse(data);
    } else if (this.#pending.has(header.trans_id)) {
      this.#pending.get(header.trans_id)?.call(this, data);
      this.#pending.delete(header.trans_id);
    } else {
      // manully handle the message
      switch (header.message) {
        case 'Connected': {
          this.#state = ArrowClientState.AWAIT_DS_LIST;
          const list_tid = this.#transId++;
          this.#pending.set(list_tid, (message) => {
            this.#handleDataSetList(message);
            this.#state = ArrowClientState.OPEN;
            this.#readySignal.set(true);
            this.#resolveConnected();
            this.#resolveConnected = () => {};
          });
          this.#ws.send(JSON.stringify({ message: 'DoAction', action: 'DSList', argument: '', trans_id: list_tid }));
        }
          break;
        case 'Error':
          console.error(`ArrowClient: Error message: ${data.description}`);
          break;
        case 'StartBinary':
        case 'Continue':
        case 'PutComplete':
          if (this.#currentPut) {
            this.#currentPut.handleResponse(data);
          } else {
            console.error('ArrowClient: Put response received but no currentPut');
            this.#currentPut = undefined;
          }
          break;
        case 'Success':
        case 'Disconnected':
          break;
        default:
          console.log(`ArrowClient: Unhandled message: ${header.message}`);
          break;
      }
    }
  }

  #handleDataSetList(data: any) {
    let map = new Map<string, DataSet>();
    const dsets = JSON.parse(data.results);
    for (const ds of dsets) {
      ds.partitionCols = JSON.parse(ds.partition_cols);
      map.set(ds.name, ds);
      if (ds.views) {
        this.#views.set(ds.name, ds.views);
      }
    }
    this.#dataSets.set(map);
  }
}
